Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/json_rpc.rs
3309 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
6
use tokio::{
7
io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
8
pin,
9
sync::mpsc,
10
};
11
12
use crate::{
13
rpc::{self, MaybeSync, Serialization},
14
util::{
15
errors::InvalidRpcDataError,
16
sync::{Barrier, Receivable},
17
},
18
};
19
use std::io;
20
21
#[derive(Clone)]
22
pub struct JsonRpcSerializer {}
23
24
impl Serialization for JsonRpcSerializer {
25
fn serialize(&self, value: impl serde::Serialize) -> Vec<u8> {
26
let mut v = serde_json::to_vec(&value).unwrap();
27
v.push(b'\n');
28
v
29
}
30
31
fn deserialize<P: serde::de::DeserializeOwned>(
32
&self,
33
b: &[u8],
34
) -> Result<P, crate::util::errors::AnyError> {
35
serde_json::from_slice(b).map_err(|e| InvalidRpcDataError(e.to_string()).into())
36
}
37
}
38
39
/// Creates a new RPC Builder that serializes to JSON.
40
#[allow(dead_code)]
41
pub fn new_json_rpc() -> rpc::RpcBuilder<JsonRpcSerializer> {
42
rpc::RpcBuilder::new(JsonRpcSerializer {})
43
}
44
45
#[allow(dead_code)]
46
pub async fn start_json_rpc<C: Send + Sync + 'static, S: Clone>(
47
dispatcher: rpc::RpcDispatcher<JsonRpcSerializer, C>,
48
read: impl AsyncRead + Unpin,
49
mut write: impl AsyncWrite + Unpin,
50
mut msg_rx: impl Receivable<Vec<u8>>,
51
mut shutdown_rx: Barrier<S>,
52
) -> io::Result<Option<S>> {
53
let (write_tx, mut write_rx) = mpsc::channel::<Vec<u8>>(8);
54
let mut read = BufReader::new(read);
55
56
let mut read_buf = String::new();
57
let shutdown_fut = shutdown_rx.wait();
58
pin!(shutdown_fut);
59
60
loop {
61
tokio::select! {
62
r = &mut shutdown_fut => return Ok(r.ok()),
63
Some(w) = write_rx.recv() => {
64
write.write_all(&w).await?;
65
},
66
Some(w) = msg_rx.recv_msg() => {
67
write.write_all(&w).await?;
68
},
69
n = read.read_line(&mut read_buf) => {
70
let r = match n {
71
Ok(0) => return Ok(None),
72
Ok(n) => dispatcher.dispatch(read_buf[..n].as_bytes()),
73
Err(e) => return Err(e)
74
};
75
76
read_buf.truncate(0);
77
78
match r {
79
MaybeSync::Sync(Some(v)) => {
80
write.write_all(&v).await?;
81
},
82
MaybeSync::Sync(None) => continue,
83
MaybeSync::Future(fut) => {
84
let write_tx = write_tx.clone();
85
tokio::spawn(async move {
86
if let Some(v) = fut.await {
87
let _ = write_tx.send(v).await;
88
}
89
});
90
},
91
MaybeSync::Stream((dto, fut)) => {
92
if let Some(dto) = dto {
93
dispatcher.register_stream(write_tx.clone(), dto).await;
94
}
95
let write_tx = write_tx.clone();
96
tokio::spawn(async move {
97
if let Some(v) = fut.await {
98
let _ = write_tx.send(v).await;
99
}
100
});
101
}
102
}
103
}
104
}
105
}
106
}
107
108