Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi-http/src/p3/body.rs
3088 views
1
use crate::p3::bindings::http::types::{ErrorCode, Fields, Trailers};
2
use crate::p3::{WasiHttp, WasiHttpCtxView};
3
use bytes::Bytes;
4
use core::iter;
5
use core::num::NonZeroUsize;
6
use core::pin::Pin;
7
use core::task::{Context, Poll, ready};
8
use http::HeaderMap;
9
use http_body::Body as _;
10
use http_body_util::combinators::UnsyncBoxBody;
11
use std::any::{Any, TypeId};
12
use std::io::Cursor;
13
use std::sync::Arc;
14
use tokio::sync::{mpsc, oneshot};
15
use tokio_util::sync::PollSender;
16
use wasmtime::component::{
17
Access, Destination, FutureConsumer, FutureReader, Resource, Source, StreamConsumer,
18
StreamProducer, StreamReader, StreamResult,
19
};
20
use wasmtime::error::Context as _;
21
use wasmtime::{AsContextMut, StoreContextMut};
22
23
/// The concrete type behind a `wasi:http/types.body` resource.
24
pub(crate) enum Body {
25
/// Body constructed by the guest
26
Guest {
27
/// The body stream
28
contents_rx: Option<StreamReader<u8>>,
29
/// Future, on which guest will write result and optional trailers
30
trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
31
/// Channel, on which transmission result will be written
32
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
33
},
34
/// Body constructed by the host.
35
Host {
36
/// The [`http_body::Body`]
37
body: UnsyncBoxBody<Bytes, ErrorCode>,
38
/// Channel, on which transmission result will be written
39
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
40
},
41
}
42
43
/// [FutureConsumer] implementation for future passed to `consume-body`.
44
struct BodyResultConsumer(
45
Option<oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>>,
46
);
47
48
impl<D> FutureConsumer<D> for BodyResultConsumer
49
where
50
D: 'static,
51
{
52
type Item = Result<(), ErrorCode>;
53
54
fn poll_consume(
55
mut self: Pin<&mut Self>,
56
_: &mut Context<'_>,
57
store: StoreContextMut<D>,
58
mut src: Source<'_, Self::Item>,
59
_: bool,
60
) -> Poll<wasmtime::Result<()>> {
61
let mut res = None;
62
src.read(store, &mut res).context("failed to read result")?;
63
let res = res.context("result value missing")?;
64
let tx = self.0.take().context("polled after returning `Ready`")?;
65
_ = tx.send(Box::new(async { res }));
66
Poll::Ready(Ok(()))
67
}
68
}
69
70
impl Body {
71
/// Implementation of `consume-body` shared between requests and responses
72
pub(crate) fn consume<T>(
73
self,
74
mut store: Access<'_, T, WasiHttp>,
75
fut: FutureReader<Result<(), ErrorCode>>,
76
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
77
) -> (
78
StreamReader<u8>,
79
FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
80
) {
81
match self {
82
Body::Guest {
83
contents_rx: Some(contents_rx),
84
trailers_rx,
85
result_tx,
86
} => {
87
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
88
(contents_rx, trailers_rx)
89
}
90
Body::Guest {
91
contents_rx: None,
92
trailers_rx,
93
result_tx,
94
} => {
95
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
96
(StreamReader::new(&mut store, iter::empty()), trailers_rx)
97
}
98
Body::Host { body, result_tx } => {
99
fut.pipe(&mut store, BodyResultConsumer(Some(result_tx)));
100
let (trailers_tx, trailers_rx) = oneshot::channel();
101
(
102
StreamReader::new(
103
&mut store,
104
HostBodyStreamProducer {
105
body,
106
trailers: Some(trailers_tx),
107
getter,
108
},
109
),
110
FutureReader::new(&mut store, trailers_rx),
111
)
112
}
113
}
114
}
115
116
/// Implementation of `drop` shared between requests and responses
117
pub(crate) fn drop(self, mut store: impl AsContextMut) {
118
if let Body::Guest {
119
contents_rx,
120
mut trailers_rx,
121
..
122
} = self
123
{
124
if let Some(mut contents_rx) = contents_rx {
125
contents_rx.close(&mut store);
126
}
127
trailers_rx.close(store);
128
}
129
}
130
}
131
132
/// [StreamConsumer] implementation for bodies originating in the guest with `Content-Length`
133
/// header set.
134
struct LimitedGuestBodyConsumer {
135
contents_tx: PollSender<Result<Bytes, ErrorCode>>,
136
error_tx: Option<oneshot::Sender<ErrorCode>>,
137
make_error: fn(Option<u64>) -> ErrorCode,
138
/// Limit of bytes to be sent
139
limit: u64,
140
/// Number of bytes sent
141
sent: u64,
142
// `true` when the other side of `contents_tx` was unexpectedly closed
143
closed: bool,
144
}
145
146
impl LimitedGuestBodyConsumer {
147
/// Sends the error constructed by [Self::make_error] on both error channels.
148
/// Does nothing if an error has already been sent on [Self::error_tx].
149
fn send_error(&mut self, sent: Option<u64>) {
150
if let Some(error_tx) = self.error_tx.take() {
151
_ = error_tx.send((self.make_error)(sent));
152
self.contents_tx.abort_send();
153
if let Some(tx) = self.contents_tx.get_ref() {
154
_ = tx.try_send(Err((self.make_error)(sent)))
155
}
156
self.contents_tx.close();
157
}
158
}
159
}
160
161
impl Drop for LimitedGuestBodyConsumer {
162
fn drop(&mut self) {
163
if !self.closed && self.limit != self.sent {
164
self.send_error(Some(self.sent))
165
}
166
}
167
}
168
169
impl<D> StreamConsumer<D> for LimitedGuestBodyConsumer {
170
type Item = u8;
171
172
fn poll_consume(
173
mut self: Pin<&mut Self>,
174
cx: &mut Context<'_>,
175
store: StoreContextMut<D>,
176
src: Source<Self::Item>,
177
finish: bool,
178
) -> Poll<wasmtime::Result<StreamResult>> {
179
debug_assert!(!self.closed);
180
let mut src = src.as_direct(store);
181
let buf = src.remaining();
182
let n = buf.len();
183
184
// Perform `content-length` check early and precompute the next value
185
let Ok(sent) = n.try_into() else {
186
self.send_error(None);
187
return Poll::Ready(Ok(StreamResult::Dropped));
188
};
189
let Some(sent) = self.sent.checked_add(sent) else {
190
self.send_error(None);
191
return Poll::Ready(Ok(StreamResult::Dropped));
192
};
193
if sent > self.limit {
194
self.send_error(Some(sent));
195
return Poll::Ready(Ok(StreamResult::Dropped));
196
}
197
match self.contents_tx.poll_reserve(cx) {
198
Poll::Ready(Ok(())) => {
199
let buf = Bytes::copy_from_slice(buf);
200
match self.contents_tx.send_item(Ok(buf)) {
201
Ok(()) => {
202
src.mark_read(n);
203
// Record new `content-length` only on successful send
204
self.sent = sent;
205
Poll::Ready(Ok(StreamResult::Completed))
206
}
207
Err(..) => {
208
self.closed = true;
209
Poll::Ready(Ok(StreamResult::Dropped))
210
}
211
}
212
}
213
Poll::Ready(Err(..)) => {
214
self.closed = true;
215
Poll::Ready(Ok(StreamResult::Dropped))
216
}
217
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
218
Poll::Pending => Poll::Pending,
219
}
220
}
221
}
222
223
/// [StreamConsumer] implementation for bodies originating in the guest without `Content-Length`
224
/// header set.
225
struct UnlimitedGuestBodyConsumer(PollSender<Result<Bytes, ErrorCode>>);
226
227
impl<D> StreamConsumer<D> for UnlimitedGuestBodyConsumer {
228
type Item = u8;
229
230
fn poll_consume(
231
mut self: Pin<&mut Self>,
232
cx: &mut Context<'_>,
233
store: StoreContextMut<D>,
234
src: Source<Self::Item>,
235
finish: bool,
236
) -> Poll<wasmtime::Result<StreamResult>> {
237
match self.0.poll_reserve(cx) {
238
Poll::Ready(Ok(())) => {
239
let mut src = src.as_direct(store);
240
let buf = src.remaining();
241
let n = buf.len();
242
let buf = Bytes::copy_from_slice(buf);
243
match self.0.send_item(Ok(buf)) {
244
Ok(()) => {
245
src.mark_read(n);
246
Poll::Ready(Ok(StreamResult::Completed))
247
}
248
Err(..) => Poll::Ready(Ok(StreamResult::Dropped)),
249
}
250
}
251
Poll::Ready(Err(..)) => Poll::Ready(Ok(StreamResult::Dropped)),
252
Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)),
253
Poll::Pending => Poll::Pending,
254
}
255
}
256
}
257
258
/// [http_body::Body] implementation for bodies originating in the guest.
259
pub(crate) struct GuestBody {
260
contents_rx: Option<mpsc::Receiver<Result<Bytes, ErrorCode>>>,
261
trailers_rx: Option<oneshot::Receiver<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>>,
262
content_length: Option<u64>,
263
}
264
265
impl GuestBody {
266
/// Construct a new [GuestBody]
267
pub(crate) fn new<T: 'static>(
268
mut store: impl AsContextMut<Data = T>,
269
contents_rx: Option<StreamReader<u8>>,
270
trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>,
271
result_tx: oneshot::Sender<Box<dyn Future<Output = Result<(), ErrorCode>> + Send>>,
272
result_fut: impl Future<Output = Result<(), ErrorCode>> + Send + 'static,
273
content_length: Option<u64>,
274
make_error: fn(Option<u64>) -> ErrorCode,
275
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
276
) -> Self {
277
let (trailers_http_tx, trailers_http_rx) = oneshot::channel();
278
trailers_rx.pipe(
279
&mut store,
280
GuestTrailerConsumer {
281
tx: Some(trailers_http_tx),
282
getter,
283
},
284
);
285
286
let contents_rx = if let Some(rx) = contents_rx {
287
let (http_tx, http_rx) = mpsc::channel(1);
288
let contents_tx = PollSender::new(http_tx);
289
if let Some(limit) = content_length {
290
let (error_tx, error_rx) = oneshot::channel();
291
_ = result_tx.send(Box::new(async move {
292
if let Ok(err) = error_rx.await {
293
return Err(err);
294
};
295
result_fut.await
296
}));
297
rx.pipe(
298
store,
299
LimitedGuestBodyConsumer {
300
contents_tx,
301
error_tx: Some(error_tx),
302
make_error,
303
limit,
304
sent: 0,
305
closed: false,
306
},
307
);
308
} else {
309
_ = result_tx.send(Box::new(result_fut));
310
rx.pipe(store, UnlimitedGuestBodyConsumer(contents_tx));
311
};
312
Some(http_rx)
313
} else {
314
_ = result_tx.send(Box::new(result_fut));
315
None
316
};
317
Self {
318
trailers_rx: Some(trailers_http_rx),
319
contents_rx,
320
content_length,
321
}
322
}
323
}
324
325
impl http_body::Body for GuestBody {
326
type Data = Bytes;
327
type Error = ErrorCode;
328
329
fn poll_frame(
330
mut self: Pin<&mut Self>,
331
cx: &mut Context<'_>,
332
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
333
if let Some(contents_rx) = self.contents_rx.as_mut() {
334
// `contents_rx` has not been closed yet, poll it
335
while let Some(res) = ready!(contents_rx.poll_recv(cx)) {
336
match res {
337
Ok(buf) => {
338
if let Some(n) = self.content_length.as_mut() {
339
// Subtract frame length from `content_length`,
340
// [LimitedGuestBodyConsumer] already performs the validation, so
341
// just keep count as optimization for
342
// `is_end_stream` and `size_hint`
343
*n = n.saturating_sub(buf.len().try_into().unwrap_or(u64::MAX));
344
}
345
return Poll::Ready(Some(Ok(http_body::Frame::data(buf))));
346
}
347
Err(err) => {
348
return Poll::Ready(Some(Err(err)));
349
}
350
}
351
}
352
// Record that `contents_rx` is closed
353
self.contents_rx = None;
354
}
355
356
let Some(trailers_rx) = self.trailers_rx.as_mut() else {
357
// `trailers_rx` has already terminated - this is the end of stream
358
return Poll::Ready(None);
359
};
360
361
let res = ready!(Pin::new(trailers_rx).poll(cx));
362
// Record that `trailers_rx` has terminated
363
self.trailers_rx = None;
364
match res {
365
Ok(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(
366
Arc::unwrap_or_clone(trailers),
367
)))),
368
Ok(Ok(None)) => Poll::Ready(None),
369
Ok(Err(err)) => Poll::Ready(Some(Err(err))),
370
Err(..) => Poll::Ready(None),
371
}
372
}
373
374
fn is_end_stream(&self) -> bool {
375
if let Some(contents_rx) = self.contents_rx.as_ref() {
376
if !contents_rx.is_empty()
377
|| !contents_rx.is_closed()
378
|| self.content_length.is_some_and(|n| n > 0)
379
{
380
// `contents_rx` might still produce data frames
381
return false;
382
}
383
}
384
if let Some(trailers_rx) = self.trailers_rx.as_ref() {
385
if !trailers_rx.is_terminated() {
386
// `trailers_rx` has not terminated yet
387
return false;
388
}
389
}
390
391
// no data left
392
return true;
393
}
394
395
fn size_hint(&self) -> http_body::SizeHint {
396
if let Some(n) = self.content_length {
397
http_body::SizeHint::with_exact(n)
398
} else {
399
http_body::SizeHint::default()
400
}
401
}
402
}
403
404
/// [FutureConsumer] implementation for trailers originating in the guest.
405
struct GuestTrailerConsumer<T> {
406
tx: Option<oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>>,
407
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
408
}
409
410
impl<D> FutureConsumer<D> for GuestTrailerConsumer<D>
411
where
412
D: 'static,
413
{
414
type Item = Result<Option<Resource<Trailers>>, ErrorCode>;
415
416
fn poll_consume(
417
mut self: Pin<&mut Self>,
418
_: &mut Context<'_>,
419
mut store: StoreContextMut<D>,
420
mut src: Source<'_, Self::Item>,
421
_: bool,
422
) -> Poll<wasmtime::Result<()>> {
423
let mut res = None;
424
src.read(&mut store, &mut res)
425
.context("failed to read result")?;
426
let res = match res.context("result value missing")? {
427
Ok(Some(trailers)) => {
428
let WasiHttpCtxView { table, .. } = (self.getter)(store.data_mut());
429
let trailers = table
430
.delete(trailers)
431
.context("failed to delete trailers")?;
432
Ok(Some(Arc::from(trailers)))
433
}
434
Ok(None) => Ok(None),
435
Err(err) => Err(err),
436
};
437
_ = self.tx.take().unwrap().send(res);
438
Poll::Ready(Ok(()))
439
}
440
}
441
442
/// [StreamProducer] implementation for bodies originating in the host.
443
pub(crate) struct HostBodyStreamProducer<T> {
444
pub(crate) body: UnsyncBoxBody<Bytes, ErrorCode>,
445
trailers: Option<oneshot::Sender<Result<Option<Resource<Trailers>>, ErrorCode>>>,
446
getter: fn(&mut T) -> WasiHttpCtxView<'_>,
447
}
448
449
impl<T> Drop for HostBodyStreamProducer<T> {
450
fn drop(&mut self) {
451
self.close(Ok(None))
452
}
453
}
454
455
impl<T> HostBodyStreamProducer<T> {
456
fn close(&mut self, res: Result<Option<Resource<Trailers>>, ErrorCode>) {
457
if let Some(tx) = self.trailers.take() {
458
_ = tx.send(res);
459
}
460
}
461
}
462
463
impl<D> StreamProducer<D> for HostBodyStreamProducer<D>
464
where
465
D: 'static,
466
{
467
type Item = u8;
468
type Buffer = Cursor<Bytes>;
469
470
fn poll_produce<'a>(
471
mut self: Pin<&mut Self>,
472
cx: &mut Context<'_>,
473
mut store: StoreContextMut<'a, D>,
474
mut dst: Destination<'a, Self::Item, Self::Buffer>,
475
finish: bool,
476
) -> Poll<wasmtime::Result<StreamResult>> {
477
let res = 'result: {
478
let cap = match dst.remaining(&mut store).map(NonZeroUsize::new) {
479
Some(Some(cap)) => Some(cap),
480
Some(None) => {
481
// On 0-length the best we can do is check that underlying stream has not
482
// reached the end yet
483
if self.body.is_end_stream() {
484
break 'result Ok(None);
485
} else {
486
return Poll::Ready(Ok(StreamResult::Completed));
487
}
488
}
489
None => None,
490
};
491
match Pin::new(&mut self.body).poll_frame(cx) {
492
Poll::Ready(Some(Ok(frame))) => {
493
match frame.into_data().map_err(http_body::Frame::into_trailers) {
494
Ok(mut frame) => {
495
// Libraries like `Reqwest` generate a 0-length frame after sensing end-of-stream,
496
// so we have to check for the body's end-of-stream indicator here too
497
if frame.len() == 0 && self.body.is_end_stream() {
498
break 'result Ok(None);
499
}
500
501
if let Some(cap) = cap {
502
let n = frame.len();
503
let cap = cap.into();
504
if n > cap {
505
// data frame does not fit in destination, fill it and buffer the rest
506
dst.set_buffer(Cursor::new(frame.split_off(cap)));
507
let mut dst = dst.as_direct(store, cap);
508
dst.remaining().copy_from_slice(&frame);
509
dst.mark_written(cap);
510
} else {
511
// copy the whole frame into the destination
512
let mut dst = dst.as_direct(store, n);
513
dst.remaining()[..n].copy_from_slice(&frame);
514
dst.mark_written(n);
515
}
516
} else {
517
dst.set_buffer(Cursor::new(frame));
518
}
519
return Poll::Ready(Ok(StreamResult::Completed));
520
}
521
Err(Ok(trailers)) => {
522
let trailers = (self.getter)(store.data_mut())
523
.table
524
.push(Fields::new_mutable(trailers))
525
.context("failed to push trailers to table")?;
526
break 'result Ok(Some(trailers));
527
}
528
Err(Err(..)) => break 'result Err(ErrorCode::HttpProtocolError),
529
}
530
}
531
Poll::Ready(Some(Err(err))) => break 'result Err(err),
532
Poll::Ready(None) => break 'result Ok(None),
533
Poll::Pending if finish => return Poll::Ready(Ok(StreamResult::Cancelled)),
534
Poll::Pending => return Poll::Pending,
535
}
536
};
537
self.close(res);
538
Poll::Ready(Ok(StreamResult::Dropped))
539
}
540
541
fn try_into(me: Pin<Box<Self>>, ty: TypeId) -> Result<Box<dyn Any>, Pin<Box<Self>>> {
542
if ty == TypeId::of::<Self>() {
543
let me = Pin::into_inner(me);
544
Ok(me)
545
} else {
546
Err(me)
547
}
548
}
549
}
550
551
/// A wrapper around [http_body::Body], which allows attaching arbitrary state to it
552
pub(crate) struct BodyWithState<T, U> {
553
body: T,
554
_state: U,
555
}
556
557
impl<T, U> http_body::Body for BodyWithState<T, U>
558
where
559
T: http_body::Body + Unpin,
560
U: Unpin,
561
{
562
type Data = T::Data;
563
type Error = T::Error;
564
565
#[inline]
566
fn poll_frame(
567
self: Pin<&mut Self>,
568
cx: &mut Context<'_>,
569
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
570
Pin::new(&mut self.get_mut().body).poll_frame(cx)
571
}
572
573
#[inline]
574
fn is_end_stream(&self) -> bool {
575
self.body.is_end_stream()
576
}
577
578
#[inline]
579
fn size_hint(&self) -> http_body::SizeHint {
580
self.body.size_hint()
581
}
582
}
583
584
/// A wrapper around [http_body::Body], which validates `Content-Length`
585
pub(crate) struct BodyWithContentLength<T, E> {
586
body: T,
587
error_tx: Option<oneshot::Sender<E>>,
588
make_error: fn(Option<u64>) -> E,
589
/// Limit of bytes to be sent
590
limit: u64,
591
/// Number of bytes sent
592
sent: u64,
593
}
594
595
impl<T, E> BodyWithContentLength<T, E> {
596
/// Sends the error constructed by [Self::make_error] on [Self::error_tx].
597
/// Does nothing if an error has already been sent on [Self::error_tx].
598
fn send_error<V>(&mut self, sent: Option<u64>) -> Poll<Option<Result<V, E>>> {
599
if let Some(error_tx) = self.error_tx.take() {
600
_ = error_tx.send((self.make_error)(sent));
601
}
602
Poll::Ready(Some(Err((self.make_error)(sent))))
603
}
604
}
605
606
impl<T, E> http_body::Body for BodyWithContentLength<T, E>
607
where
608
T: http_body::Body<Data = Bytes, Error = E> + Unpin,
609
{
610
type Data = T::Data;
611
type Error = T::Error;
612
613
#[inline]
614
fn poll_frame(
615
mut self: Pin<&mut Self>,
616
cx: &mut Context<'_>,
617
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
618
match ready!(Pin::new(&mut self.as_mut().body).poll_frame(cx)) {
619
Some(Ok(frame)) => {
620
let Some(data) = frame.data_ref() else {
621
return Poll::Ready(Some(Ok(frame)));
622
};
623
let Ok(sent) = data.len().try_into() else {
624
return self.send_error(None);
625
};
626
let Some(sent) = self.sent.checked_add(sent) else {
627
return self.send_error(None);
628
};
629
if sent > self.limit {
630
return self.send_error(Some(sent));
631
}
632
self.sent = sent;
633
Poll::Ready(Some(Ok(frame)))
634
}
635
Some(Err(err)) => Poll::Ready(Some(Err(err))),
636
None if self.limit != self.sent => {
637
// short write
638
let sent = self.sent;
639
self.send_error(Some(sent))
640
}
641
None => Poll::Ready(None),
642
}
643
}
644
645
#[inline]
646
fn is_end_stream(&self) -> bool {
647
self.body.is_end_stream()
648
}
649
650
#[inline]
651
fn size_hint(&self) -> http_body::SizeHint {
652
let n = self.limit.saturating_sub(self.sent);
653
let mut hint = self.body.size_hint();
654
if hint.lower() >= n {
655
hint.set_exact(n)
656
} else if let Some(max) = hint.upper() {
657
hint.set_upper(n.min(max))
658
} else {
659
hint.set_upper(n)
660
}
661
hint
662
}
663
}
664
665
pub(crate) trait BodyExt {
666
fn with_state<T>(self, state: T) -> BodyWithState<Self, T>
667
where
668
Self: Sized,
669
{
670
BodyWithState {
671
body: self,
672
_state: state,
673
}
674
}
675
676
fn with_content_length<E>(
677
self,
678
limit: u64,
679
error_tx: oneshot::Sender<E>,
680
make_error: fn(Option<u64>) -> E,
681
) -> BodyWithContentLength<Self, E>
682
where
683
Self: Sized,
684
{
685
BodyWithContentLength {
686
body: self,
687
error_tx: Some(error_tx),
688
make_error,
689
limit,
690
sent: 0,
691
}
692
}
693
}
694
695
impl<T> BodyExt for T {}
696
697