Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bevyengine
GitHub Repository: bevyengine/bevy
Path: blob/main/crates/bevy_remote/src/http.rs
6598 views
1
//! The BRP transport using JSON-RPC over HTTP.
2
//!
3
//! Adding the [`RemoteHttpPlugin`] to your [`App`] causes Bevy to accept
4
//! connections over HTTP (by default, on port 15702) while your app is running.
5
//!
6
//! Clients are expected to `POST` JSON requests to the root URL; see the `client`
7
//! example for a trivial example of use.
8
9
#![cfg(not(target_family = "wasm"))]
10
11
use crate::{
12
error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpResult, BrpSender,
13
};
14
use anyhow::Result as AnyhowResult;
15
use async_channel::{Receiver, Sender};
16
use async_io::Async;
17
use bevy_app::{App, Plugin, Startup};
18
use bevy_ecs::resource::Resource;
19
use bevy_ecs::system::Res;
20
use bevy_tasks::{futures_lite::StreamExt, IoTaskPool};
21
use core::{
22
convert::Infallible,
23
net::{IpAddr, Ipv4Addr},
24
pin::Pin,
25
task::{Context, Poll},
26
};
27
use http_body_util::{BodyExt as _, Full};
28
use hyper::{
29
body::{Body, Bytes, Frame, Incoming},
30
header::{HeaderName, HeaderValue},
31
server::conn::http1,
32
service, Request, Response,
33
};
34
use serde_json::Value;
35
use smol_hyper::rt::{FuturesIo, SmolTimer};
36
use std::{
37
collections::HashMap,
38
net::{TcpListener, TcpStream},
39
};
40
41
/// Port the HTTP server listens on when no other port is configured.
///
/// The number carries no special meaning; it was picked at random.
pub const DEFAULT_PORT: u16 = 15_702;
45
46
/// Address the HTTP server binds to when no other address is configured.
///
/// This is the IPv4 loopback address, `127.0.0.1`.
pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
48
49
/// A struct that holds a collection of HTTP headers.
50
///
51
/// This struct is used to store a set of HTTP headers as key-value pairs, where the keys are
52
/// of type [`HeaderName`] and the values are of type [`HeaderValue`].
53
///
54
#[derive(Debug, Resource, Clone)]
55
pub struct Headers {
56
headers: HashMap<HeaderName, HeaderValue>,
57
}
58
59
impl Headers {
60
/// Create a new instance of `Headers`.
61
pub fn new() -> Self {
62
Self {
63
headers: HashMap::default(),
64
}
65
}
66
67
/// Insert a key value pair to the `Headers` instance.
68
pub fn insert(
69
mut self,
70
name: impl TryInto<HeaderName>,
71
value: impl TryInto<HeaderValue>,
72
) -> Self {
73
let Ok(header_name) = name.try_into() else {
74
panic!("Invalid header name")
75
};
76
let Ok(header_value) = value.try_into() else {
77
panic!("Invalid header value")
78
};
79
self.headers.insert(header_name, header_value);
80
self
81
}
82
}
83
84
impl Default for Headers {
85
fn default() -> Self {
86
Self::new()
87
}
88
}
89
90
/// Add this plugin to your [`App`] to allow remote connections over HTTP to inspect and modify entities.
91
/// It requires the [`RemotePlugin`](super::RemotePlugin).
92
///
93
/// This BRP transport cannot be used when targeting WASM.
94
///
95
/// The defaults are:
96
/// - [`DEFAULT_ADDR`] : 127.0.0.1.
97
/// - [`DEFAULT_PORT`] : 15702.
98
///
99
pub struct RemoteHttpPlugin {
100
/// The address that Bevy will bind to.
101
address: IpAddr,
102
/// The port that Bevy will listen on.
103
port: u16,
104
/// The headers that Bevy will include in its HTTP responses
105
headers: Headers,
106
}
107
108
impl Default for RemoteHttpPlugin {
109
fn default() -> Self {
110
Self {
111
address: DEFAULT_ADDR,
112
port: DEFAULT_PORT,
113
headers: Headers::new(),
114
}
115
}
116
}
117
118
impl Plugin for RemoteHttpPlugin {
119
fn build(&self, app: &mut App) {
120
app.insert_resource(HostAddress(self.address))
121
.insert_resource(HostPort(self.port))
122
.insert_resource(HostHeaders(self.headers.clone()))
123
.add_systems(Startup, start_http_server);
124
}
125
}
126
127
impl RemoteHttpPlugin {
128
/// Set the IP address that the server will use.
129
#[must_use]
130
pub fn with_address(mut self, address: impl Into<IpAddr>) -> Self {
131
self.address = address.into();
132
self
133
}
134
/// Set the remote port that the server will listen on.
135
#[must_use]
136
pub fn with_port(mut self, port: u16) -> Self {
137
self.port = port;
138
self
139
}
140
/// Set the extra headers that the response will include.
141
///
142
/// ////// /// # Example
143
///
144
/// ```ignore
145
///
146
/// // Create CORS headers
147
/// let cors_headers = Headers::new()
148
/// .insert("Access-Control-Allow-Origin", "*")
149
/// .insert("Access-Control-Allow-Headers", "Content-Type");
150
///
151
/// // Create the Bevy app and add the RemoteHttpPlugin with CORS headers
152
/// fn main() {
153
/// App::new()
154
/// .add_plugins(DefaultPlugins)
155
/// .add_plugins(RemotePlugin::default())
156
/// .add_plugins(RemoteHttpPlugin::default()
157
/// .with_headers(cors_headers))
158
/// .run();
159
/// }
160
/// ```
161
#[must_use]
162
pub fn with_headers(mut self, headers: Headers) -> Self {
163
self.headers = headers;
164
self
165
}
166
/// Add a single header to the response headers.
167
#[must_use]
168
pub fn with_header(
169
mut self,
170
name: impl TryInto<HeaderName>,
171
value: impl TryInto<HeaderValue>,
172
) -> Self {
173
self.headers = self.headers.insert(name, value);
174
self
175
}
176
}
177
178
/// A resource containing the IP address that Bevy will host on.
179
///
180
/// Currently, changing this while the application is running has no effect; this merely
181
/// reflects the IP address that is set during the setup of the [`RemoteHttpPlugin`].
182
#[derive(Debug, Resource)]
183
pub struct HostAddress(pub IpAddr);
184
185
/// A resource containing the port number that Bevy will listen on.
186
///
187
/// Currently, changing this while the application is running has no effect; this merely
188
/// reflects the host that is set during the setup of the [`RemoteHttpPlugin`].
189
#[derive(Debug, Resource)]
190
pub struct HostPort(pub u16);
191
192
/// A resource containing the headers that Bevy will include in its HTTP responses.
193
///
194
#[derive(Debug, Resource)]
195
struct HostHeaders(pub Headers);
196
197
/// A system that starts up the Bevy Remote Protocol HTTP server.
198
fn start_http_server(
199
request_sender: Res<BrpSender>,
200
address: Res<HostAddress>,
201
remote_port: Res<HostPort>,
202
headers: Res<HostHeaders>,
203
) {
204
IoTaskPool::get()
205
.spawn(server_main(
206
address.0,
207
remote_port.0,
208
request_sender.clone(),
209
headers.0.clone(),
210
))
211
.detach();
212
}
213
214
/// The Bevy Remote Protocol server main loop.
215
async fn server_main(
216
address: IpAddr,
217
port: u16,
218
request_sender: Sender<BrpMessage>,
219
headers: Headers,
220
) -> AnyhowResult<()> {
221
listen(
222
Async::<TcpListener>::bind((address, port))?,
223
&request_sender,
224
&headers,
225
)
226
.await
227
}
228
229
async fn listen(
230
listener: Async<TcpListener>,
231
request_sender: &Sender<BrpMessage>,
232
headers: &Headers,
233
) -> AnyhowResult<()> {
234
loop {
235
let (client, _) = listener.accept().await?;
236
237
let request_sender = request_sender.clone();
238
let headers = headers.clone();
239
IoTaskPool::get()
240
.spawn(async move {
241
let _ = handle_client(client, request_sender, headers).await;
242
})
243
.detach();
244
}
245
}
246
247
async fn handle_client(
248
client: Async<TcpStream>,
249
request_sender: Sender<BrpMessage>,
250
headers: Headers,
251
) -> AnyhowResult<()> {
252
http1::Builder::new()
253
.timer(SmolTimer::new())
254
.serve_connection(
255
FuturesIo::new(client),
256
service::service_fn(|request| {
257
process_request_batch(request, &request_sender, &headers)
258
}),
259
)
260
.await?;
261
262
Ok(())
263
}
264
265
/// A helper function for the Bevy Remote Protocol server that handles a batch
266
/// of requests coming from a client.
267
async fn process_request_batch(
268
request: Request<Incoming>,
269
request_sender: &Sender<BrpMessage>,
270
headers: &Headers,
271
) -> AnyhowResult<Response<BrpHttpBody>> {
272
let batch_bytes = request.into_body().collect().await?.to_bytes();
273
let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
274
275
let result = match batch {
276
Ok(BrpBatch::Single(request)) => {
277
let response = process_single_request(request, request_sender).await?;
278
match response {
279
BrpHttpResponse::Complete(res) => {
280
BrpHttpResponse::Complete(serde_json::to_string(&res)?)
281
}
282
BrpHttpResponse::Stream(stream) => BrpHttpResponse::Stream(stream),
283
}
284
}
285
Ok(BrpBatch::Batch(requests)) => {
286
let mut responses = Vec::new();
287
288
for request in requests {
289
let response = process_single_request(request, request_sender).await?;
290
match response {
291
BrpHttpResponse::Complete(res) => responses.push(res),
292
BrpHttpResponse::Stream(BrpStream { id, .. }) => {
293
responses.push(BrpResponse::new(
294
id,
295
Err(BrpError {
296
code: error_codes::INVALID_REQUEST,
297
message: "Streaming can not be used in batch requests".to_string(),
298
data: None,
299
}),
300
));
301
}
302
}
303
}
304
305
BrpHttpResponse::Complete(serde_json::to_string(&responses)?)
306
}
307
Err(err) => {
308
let err = BrpResponse::new(
309
None,
310
Err(BrpError {
311
code: error_codes::INVALID_REQUEST,
312
message: err.to_string(),
313
data: None,
314
}),
315
);
316
317
BrpHttpResponse::Complete(serde_json::to_string(&err)?)
318
}
319
};
320
321
let mut response = match result {
322
BrpHttpResponse::Complete(serialized) => {
323
let mut response = Response::new(BrpHttpBody::Complete(Full::new(Bytes::from(
324
serialized.as_bytes().to_owned(),
325
))));
326
response.headers_mut().insert(
327
hyper::header::CONTENT_TYPE,
328
HeaderValue::from_static("application/json"),
329
);
330
response
331
}
332
BrpHttpResponse::Stream(stream) => {
333
let mut response = Response::new(BrpHttpBody::Stream(stream));
334
response.headers_mut().insert(
335
hyper::header::CONTENT_TYPE,
336
HeaderValue::from_static("text/event-stream"),
337
);
338
response
339
}
340
};
341
for (key, value) in &headers.headers {
342
response.headers_mut().insert(key, value.clone());
343
}
344
Ok(response)
345
}
346
347
/// A helper function for the Bevy Remote Protocol server that processes a single
348
/// request coming from a client.
349
async fn process_single_request(
350
request: Value,
351
request_sender: &Sender<BrpMessage>,
352
) -> AnyhowResult<BrpHttpResponse<BrpResponse, BrpStream>> {
353
// Reach in and get the request ID early so that we can report it even when parsing fails.
354
let id = request.as_object().and_then(|map| map.get("id")).cloned();
355
356
let request: BrpRequest = match serde_json::from_value(request) {
357
Ok(v) => v,
358
Err(err) => {
359
return Ok(BrpHttpResponse::Complete(BrpResponse::new(
360
id,
361
Err(BrpError {
362
code: error_codes::INVALID_REQUEST,
363
message: err.to_string(),
364
data: None,
365
}),
366
)));
367
}
368
};
369
370
if request.jsonrpc != "2.0" {
371
return Ok(BrpHttpResponse::Complete(BrpResponse::new(
372
id,
373
Err(BrpError {
374
code: error_codes::INVALID_REQUEST,
375
message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"),
376
data: None,
377
}),
378
)));
379
}
380
381
let watch = request.method.contains("+watch");
382
let size = if watch { 8 } else { 1 };
383
let (result_sender, result_receiver) = async_channel::bounded(size);
384
385
let _ = request_sender
386
.send(BrpMessage {
387
method: request.method,
388
params: request.params,
389
sender: result_sender,
390
})
391
.await;
392
393
if watch {
394
Ok(BrpHttpResponse::Stream(BrpStream {
395
id: request.id,
396
rx: Box::pin(result_receiver),
397
}))
398
} else {
399
let result = result_receiver.recv().await?;
400
Ok(BrpHttpResponse::Complete(BrpResponse::new(
401
request.id, result,
402
)))
403
}
404
}
405
406
struct BrpStream {
407
id: Option<Value>,
408
rx: Pin<Box<Receiver<BrpResult>>>,
409
}
410
411
impl Body for BrpStream {
412
type Data = Bytes;
413
type Error = Infallible;
414
415
fn poll_frame(
416
mut self: Pin<&mut Self>,
417
cx: &mut Context<'_>,
418
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
419
match self.as_mut().rx.poll_next(cx) {
420
Poll::Ready(result) => match result {
421
Some(result) => {
422
let response = BrpResponse::new(self.id.clone(), result);
423
let serialized = serde_json::to_string(&response).unwrap();
424
let bytes =
425
Bytes::from(format!("data: {serialized}\n\n").as_bytes().to_owned());
426
let frame = Frame::data(bytes);
427
Poll::Ready(Some(Ok(frame)))
428
}
429
None => Poll::Ready(None),
430
},
431
Poll::Pending => Poll::Pending,
432
}
433
}
434
435
fn is_end_stream(&self) -> bool {
436
self.rx.is_closed()
437
}
438
}
439
440
/// The two shapes a response can take: a complete value of type `C`, or a
/// stream of type `S`.
enum BrpHttpResponse<C, S> {
    /// A response that is fully available.
    Complete(C),
    /// A response that is delivered as a stream.
    Stream(S),
}
444
445
enum BrpHttpBody {
446
Complete(Full<Bytes>),
447
Stream(BrpStream),
448
}
449
450
impl Body for BrpHttpBody {
451
type Data = Bytes;
452
type Error = Infallible;
453
454
fn poll_frame(
455
self: Pin<&mut Self>,
456
cx: &mut Context<'_>,
457
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
458
match &mut *self.get_mut() {
459
BrpHttpBody::Complete(body) => Body::poll_frame(Pin::new(body), cx),
460
BrpHttpBody::Stream(body) => Body::poll_frame(Pin::new(body), cx),
461
}
462
}
463
}
464
465