Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/contrib/libevent/bufferevent_async.c
39475 views
1
/*
2
* Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
3
*
4
* All rights reserved.
5
*
6
* Redistribution and use in source and binary forms, with or without
7
* modification, are permitted provided that the following conditions
8
* are met:
9
* 1. Redistributions of source code must retain the above copyright
10
* notice, this list of conditions and the following disclaimer.
11
* 2. Redistributions in binary form must reproduce the above copyright
12
* notice, this list of conditions and the following disclaimer in the
13
* documentation and/or other materials provided with the distribution.
14
* 3. The name of the author may not be used to endorse or promote products
15
* derived from this software without specific prior written permission.
16
*
17
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27
*/
28
29
#include "event2/event-config.h"
30
#include "evconfig-private.h"
31
32
#ifdef EVENT__HAVE_SYS_TIME_H
33
#include <sys/time.h>
34
#endif
35
36
#include <errno.h>
37
#include <stdio.h>
38
#include <stdlib.h>
39
#include <string.h>
40
#ifdef EVENT__HAVE_STDARG_H
41
#include <stdarg.h>
42
#endif
43
#ifdef EVENT__HAVE_UNISTD_H
44
#include <unistd.h>
45
#endif
46
47
#ifdef _WIN32
48
#include <winsock2.h>
49
#include <winerror.h>
50
#include <ws2tcpip.h>
51
#endif
52
53
#include <sys/queue.h>
54
55
#include "event2/util.h"
56
#include "event2/bufferevent.h"
57
#include "event2/buffer.h"
58
#include "event2/bufferevent_struct.h"
59
#include "event2/event.h"
60
#include "event2/util.h"
61
#include "event-internal.h"
62
#include "log-internal.h"
63
#include "mm-internal.h"
64
#include "bufferevent-internal.h"
65
#include "util-internal.h"
66
#include "iocp-internal.h"
67
68
#ifndef SO_UPDATE_CONNECT_CONTEXT
69
/* Mingw is sometimes missing this */
70
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
71
#endif
72
73
/* Forward declarations for the callbacks stored in bufferevent_ops_async. */
static int be_async_enable(struct bufferevent *bev, short what);
static int be_async_disable(struct bufferevent *bev, short what);
static void be_async_destruct(struct bufferevent *bev);
static int be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode);
static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data);
79
80
struct bufferevent_async {
81
struct bufferevent_private bev;
82
struct event_overlapped connect_overlapped;
83
struct event_overlapped read_overlapped;
84
struct event_overlapped write_overlapped;
85
size_t read_in_progress;
86
size_t write_in_progress;
87
unsigned ok : 1;
88
unsigned read_added : 1;
89
unsigned write_added : 1;
90
};
91
92
const struct bufferevent_ops bufferevent_ops_async = {
93
"socket_async",
94
evutil_offsetof(struct bufferevent_async, bev.bev),
95
be_async_enable,
96
be_async_disable,
97
NULL, /* Unlink */
98
be_async_destruct,
99
bufferevent_generic_adj_timeouts_,
100
be_async_flush,
101
be_async_ctrl,
102
};
103
104
static inline void
105
be_async_run_eventcb(struct bufferevent *bev, short what, int options)
106
{ bufferevent_run_eventcb_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
107
108
static inline void
109
be_async_trigger_nolock(struct bufferevent *bev, short what, int options)
110
{ bufferevent_trigger_nolock_(bev, what, options|BEV_TRIG_DEFER_CALLBACKS); }
111
112
static inline int
113
fatal_error(int err)
114
{
115
switch (err) {
116
/* We may have already associated this fd with a port.
117
* Let's hope it's this port, and that the error code
118
* for doing this neer changes. */
119
case ERROR_INVALID_PARAMETER:
120
return 0;
121
}
122
return 1;
123
}
124
125
static inline struct bufferevent_async *
126
upcast(struct bufferevent *bev)
127
{
128
struct bufferevent_async *bev_a;
129
if (!BEV_IS_ASYNC(bev))
130
return NULL;
131
bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
132
return bev_a;
133
}
134
135
static inline struct bufferevent_async *
136
upcast_connect(struct event_overlapped *eo)
137
{
138
struct bufferevent_async *bev_a;
139
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
140
EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
141
return bev_a;
142
}
143
144
static inline struct bufferevent_async *
145
upcast_read(struct event_overlapped *eo)
146
{
147
struct bufferevent_async *bev_a;
148
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
149
EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
150
return bev_a;
151
}
152
153
static inline struct bufferevent_async *
154
upcast_write(struct event_overlapped *eo)
155
{
156
struct bufferevent_async *bev_a;
157
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
158
EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
159
return bev_a;
160
}
161
162
static void
163
bev_async_del_write(struct bufferevent_async *beva)
164
{
165
struct bufferevent *bev = &beva->bev.bev;
166
167
if (beva->write_added) {
168
beva->write_added = 0;
169
event_base_del_virtual_(bev->ev_base);
170
}
171
}
172
173
static void
174
bev_async_del_read(struct bufferevent_async *beva)
175
{
176
struct bufferevent *bev = &beva->bev.bev;
177
178
if (beva->read_added) {
179
beva->read_added = 0;
180
event_base_del_virtual_(bev->ev_base);
181
}
182
}
183
184
static void
185
bev_async_add_write(struct bufferevent_async *beva)
186
{
187
struct bufferevent *bev = &beva->bev.bev;
188
189
if (!beva->write_added) {
190
beva->write_added = 1;
191
event_base_add_virtual_(bev->ev_base);
192
}
193
}
194
195
static void
196
bev_async_add_read(struct bufferevent_async *beva)
197
{
198
struct bufferevent *bev = &beva->bev.bev;
199
200
if (!beva->read_added) {
201
beva->read_added = 1;
202
event_base_add_virtual_(bev->ev_base);
203
}
204
}
205
206
static void
207
bev_async_consider_writing(struct bufferevent_async *beva)
208
{
209
size_t at_most;
210
int limit;
211
struct bufferevent *bev = &beva->bev.bev;
212
213
/* Don't write if there's a write in progress, or we do not
214
* want to write, or when there's nothing left to write. */
215
if (beva->write_in_progress || beva->bev.connecting)
216
return;
217
if (!beva->ok || !(bev->enabled&EV_WRITE) ||
218
!evbuffer_get_length(bev->output)) {
219
bev_async_del_write(beva);
220
return;
221
}
222
223
at_most = evbuffer_get_length(bev->output);
224
225
/* This is safe so long as bufferevent_get_write_max never returns
226
* more than INT_MAX. That's true for now. XXXX */
227
limit = (int)bufferevent_get_write_max_(&beva->bev);
228
if (at_most >= (size_t)limit && limit >= 0)
229
at_most = limit;
230
231
if (beva->bev.write_suspended) {
232
bev_async_del_write(beva);
233
return;
234
}
235
236
/* XXXX doesn't respect low-water mark very well. */
237
bufferevent_incref_(bev);
238
if (evbuffer_launch_write_(bev->output, at_most,
239
&beva->write_overlapped)) {
240
bufferevent_decref_(bev);
241
beva->ok = 0;
242
be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
243
} else {
244
beva->write_in_progress = at_most;
245
bufferevent_decrement_write_buckets_(&beva->bev, at_most);
246
bev_async_add_write(beva);
247
}
248
}
249
250
static void
251
bev_async_consider_reading(struct bufferevent_async *beva)
252
{
253
size_t cur_size;
254
size_t read_high;
255
size_t at_most;
256
int limit;
257
struct bufferevent *bev = &beva->bev.bev;
258
259
/* Don't read if there is a read in progress, or we do not
260
* want to read. */
261
if (beva->read_in_progress || beva->bev.connecting)
262
return;
263
if (!beva->ok || !(bev->enabled&EV_READ)) {
264
bev_async_del_read(beva);
265
return;
266
}
267
268
/* Don't read if we're full */
269
cur_size = evbuffer_get_length(bev->input);
270
read_high = bev->wm_read.high;
271
if (read_high) {
272
if (cur_size >= read_high) {
273
bev_async_del_read(beva);
274
return;
275
}
276
at_most = read_high - cur_size;
277
} else {
278
at_most = 16384; /* FIXME totally magic. */
279
}
280
281
/* XXXX This over-commits. */
282
/* XXXX see also not above on cast on bufferevent_get_write_max_() */
283
limit = (int)bufferevent_get_read_max_(&beva->bev);
284
if (at_most >= (size_t)limit && limit >= 0)
285
at_most = limit;
286
287
if (beva->bev.read_suspended) {
288
bev_async_del_read(beva);
289
return;
290
}
291
292
bufferevent_incref_(bev);
293
if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
294
beva->ok = 0;
295
be_async_run_eventcb(bev, BEV_EVENT_ERROR, 0);
296
bufferevent_decref_(bev);
297
} else {
298
beva->read_in_progress = at_most;
299
bufferevent_decrement_read_buckets_(&beva->bev, at_most);
300
bev_async_add_read(beva);
301
}
302
303
return;
304
}
305
306
static void
307
be_async_outbuf_callback(struct evbuffer *buf,
308
const struct evbuffer_cb_info *cbinfo,
309
void *arg)
310
{
311
struct bufferevent *bev = arg;
312
struct bufferevent_async *bev_async = upcast(bev);
313
314
/* If we added data to the outbuf and were not writing before,
315
* we may want to write now. */
316
317
bufferevent_incref_and_lock_(bev);
318
319
if (cbinfo->n_added)
320
bev_async_consider_writing(bev_async);
321
322
bufferevent_decref_and_unlock_(bev);
323
}
324
325
static void
326
be_async_inbuf_callback(struct evbuffer *buf,
327
const struct evbuffer_cb_info *cbinfo,
328
void *arg)
329
{
330
struct bufferevent *bev = arg;
331
struct bufferevent_async *bev_async = upcast(bev);
332
333
/* If we drained data from the inbuf and were not reading before,
334
* we may want to read now */
335
336
bufferevent_incref_and_lock_(bev);
337
338
if (cbinfo->n_deleted)
339
bev_async_consider_reading(bev_async);
340
341
bufferevent_decref_and_unlock_(bev);
342
}
343
344
static int
345
be_async_enable(struct bufferevent *buf, short what)
346
{
347
struct bufferevent_async *bev_async = upcast(buf);
348
349
if (!bev_async->ok)
350
return -1;
351
352
if (bev_async->bev.connecting) {
353
/* Don't launch anything during connection attempts. */
354
return 0;
355
}
356
357
if (what & EV_READ)
358
BEV_RESET_GENERIC_READ_TIMEOUT(buf);
359
if (what & EV_WRITE)
360
BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
361
362
/* If we newly enable reading or writing, and we aren't reading or
363
writing already, consider launching a new read or write. */
364
365
if (what & EV_READ)
366
bev_async_consider_reading(bev_async);
367
if (what & EV_WRITE)
368
bev_async_consider_writing(bev_async);
369
return 0;
370
}
371
372
static int
373
be_async_disable(struct bufferevent *bev, short what)
374
{
375
struct bufferevent_async *bev_async = upcast(bev);
376
/* XXXX If we disable reading or writing, we may want to consider
377
* canceling any in-progress read or write operation, though it might
378
* not work. */
379
380
if (what & EV_READ) {
381
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
382
bev_async_del_read(bev_async);
383
}
384
if (what & EV_WRITE) {
385
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
386
bev_async_del_write(bev_async);
387
}
388
389
return 0;
390
}
391
392
static void
393
be_async_destruct(struct bufferevent *bev)
394
{
395
struct bufferevent_async *bev_async = upcast(bev);
396
struct bufferevent_private *bev_p = BEV_UPCAST(bev);
397
evutil_socket_t fd;
398
399
EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
400
!upcast(bev)->read_in_progress);
401
402
bev_async_del_read(bev_async);
403
bev_async_del_write(bev_async);
404
405
fd = evbuffer_overlapped_get_fd_(bev->input);
406
if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
407
(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
408
evutil_closesocket(fd);
409
evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
410
}
411
}
412
413
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
414
* we use WSAGetOverlappedResult to translate. */
415
static void
416
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
417
{
418
DWORD bytes, flags;
419
evutil_socket_t fd;
420
421
fd = evbuffer_overlapped_get_fd_(bev->input);
422
WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
423
}
424
425
static int
426
be_async_flush(struct bufferevent *bev, short what,
427
enum bufferevent_flush_mode mode)
428
{
429
return 0;
430
}
431
432
static void
433
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
434
ev_ssize_t nbytes, int ok)
435
{
436
struct bufferevent_async *bev_a = upcast_connect(eo);
437
struct bufferevent *bev = &bev_a->bev.bev;
438
evutil_socket_t sock;
439
440
BEV_LOCK(bev);
441
442
EVUTIL_ASSERT(bev_a->bev.connecting);
443
bev_a->bev.connecting = 0;
444
sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
445
/* XXXX Handle error? */
446
setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
447
448
if (ok)
449
bufferevent_async_set_connected_(bev);
450
else
451
bev_async_set_wsa_error(bev, eo);
452
453
be_async_run_eventcb(bev, ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
454
455
event_base_del_virtual_(bev->ev_base);
456
457
bufferevent_decref_and_unlock_(bev);
458
}
459
460
static void
461
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
462
ev_ssize_t nbytes, int ok)
463
{
464
struct bufferevent_async *bev_a = upcast_read(eo);
465
struct bufferevent *bev = &bev_a->bev.bev;
466
short what = BEV_EVENT_READING;
467
ev_ssize_t amount_unread;
468
BEV_LOCK(bev);
469
EVUTIL_ASSERT(bev_a->read_in_progress);
470
471
amount_unread = bev_a->read_in_progress - nbytes;
472
evbuffer_commit_read_(bev->input, nbytes);
473
bev_a->read_in_progress = 0;
474
if (amount_unread)
475
bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
476
477
if (!ok)
478
bev_async_set_wsa_error(bev, eo);
479
480
if (bev_a->ok) {
481
if (ok && nbytes) {
482
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
483
be_async_trigger_nolock(bev, EV_READ, 0);
484
bev_async_consider_reading(bev_a);
485
} else if (!ok) {
486
what |= BEV_EVENT_ERROR;
487
bev_a->ok = 0;
488
be_async_run_eventcb(bev, what, 0);
489
} else if (!nbytes) {
490
what |= BEV_EVENT_EOF;
491
bev_a->ok = 0;
492
be_async_run_eventcb(bev, what, 0);
493
}
494
}
495
496
bufferevent_decref_and_unlock_(bev);
497
}
498
499
static void
500
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
501
ev_ssize_t nbytes, int ok)
502
{
503
struct bufferevent_async *bev_a = upcast_write(eo);
504
struct bufferevent *bev = &bev_a->bev.bev;
505
short what = BEV_EVENT_WRITING;
506
ev_ssize_t amount_unwritten;
507
508
BEV_LOCK(bev);
509
EVUTIL_ASSERT(bev_a->write_in_progress);
510
511
amount_unwritten = bev_a->write_in_progress - nbytes;
512
evbuffer_commit_write_(bev->output, nbytes);
513
bev_a->write_in_progress = 0;
514
515
if (amount_unwritten)
516
bufferevent_decrement_write_buckets_(&bev_a->bev,
517
-amount_unwritten);
518
519
520
if (!ok)
521
bev_async_set_wsa_error(bev, eo);
522
523
if (bev_a->ok) {
524
if (ok && nbytes) {
525
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
526
be_async_trigger_nolock(bev, EV_WRITE, 0);
527
bev_async_consider_writing(bev_a);
528
} else if (!ok) {
529
what |= BEV_EVENT_ERROR;
530
bev_a->ok = 0;
531
be_async_run_eventcb(bev, what, 0);
532
} else if (!nbytes) {
533
what |= BEV_EVENT_EOF;
534
bev_a->ok = 0;
535
be_async_run_eventcb(bev, what, 0);
536
}
537
}
538
539
bufferevent_decref_and_unlock_(bev);
540
}
541
542
struct bufferevent *
543
bufferevent_async_new_(struct event_base *base,
544
evutil_socket_t fd, int options)
545
{
546
struct bufferevent_async *bev_a;
547
struct bufferevent *bev;
548
struct event_iocp_port *iocp;
549
550
options |= BEV_OPT_THREADSAFE;
551
552
if (!(iocp = event_base_get_iocp_(base)))
553
return NULL;
554
555
if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
556
if (fatal_error(GetLastError()))
557
return NULL;
558
}
559
560
if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
561
return NULL;
562
563
bev = &bev_a->bev.bev;
564
if (!(bev->input = evbuffer_overlapped_new_(fd))) {
565
mm_free(bev_a);
566
return NULL;
567
}
568
if (!(bev->output = evbuffer_overlapped_new_(fd))) {
569
evbuffer_free(bev->input);
570
mm_free(bev_a);
571
return NULL;
572
}
573
574
if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
575
options)<0)
576
goto err;
577
578
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
579
evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
580
581
event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
582
event_overlapped_init_(&bev_a->read_overlapped, read_complete);
583
event_overlapped_init_(&bev_a->write_overlapped, write_complete);
584
585
bufferevent_init_generic_timeout_cbs_(bev);
586
587
bev_a->ok = fd >= 0;
588
589
return bev;
590
err:
591
bufferevent_free(&bev_a->bev.bev);
592
return NULL;
593
}
594
595
void
596
bufferevent_async_set_connected_(struct bufferevent *bev)
597
{
598
struct bufferevent_async *bev_async = upcast(bev);
599
bev_async->ok = 1;
600
/* Now's a good time to consider reading/writing */
601
be_async_enable(bev, bev->enabled);
602
}
603
604
int
605
bufferevent_async_can_connect_(struct bufferevent *bev)
606
{
607
const struct win32_extension_fns *ext =
608
event_get_win32_extension_fns_();
609
610
if (BEV_IS_ASYNC(bev) &&
611
event_base_get_iocp_(bev->ev_base) &&
612
ext && ext->ConnectEx)
613
return 1;
614
615
return 0;
616
}
617
618
int
619
bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
620
const struct sockaddr *sa, int socklen)
621
{
622
BOOL rc;
623
struct bufferevent_async *bev_async = upcast(bev);
624
struct sockaddr_storage ss;
625
const struct win32_extension_fns *ext =
626
event_get_win32_extension_fns_();
627
628
EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
629
630
/* ConnectEx() requires that the socket be bound to an address
631
* with bind() before using, otherwise it will fail. We attempt
632
* to issue a bind() here, taking into account that the error
633
* code is set to WSAEINVAL when the socket is already bound. */
634
memset(&ss, 0, sizeof(ss));
635
if (sa->sa_family == AF_INET) {
636
struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
637
sin->sin_family = AF_INET;
638
sin->sin_addr.s_addr = INADDR_ANY;
639
} else if (sa->sa_family == AF_INET6) {
640
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
641
sin6->sin6_family = AF_INET6;
642
sin6->sin6_addr = in6addr_any;
643
} else {
644
/* Well, the user will have to bind() */
645
return -1;
646
}
647
if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
648
WSAGetLastError() != WSAEINVAL)
649
return -1;
650
651
event_base_add_virtual_(bev->ev_base);
652
bufferevent_incref_(bev);
653
rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
654
&bev_async->connect_overlapped.overlapped);
655
if (rc || WSAGetLastError() == ERROR_IO_PENDING)
656
return 0;
657
658
event_base_del_virtual_(bev->ev_base);
659
bufferevent_decref_(bev);
660
661
return -1;
662
}
663
664
static int
665
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
666
union bufferevent_ctrl_data *data)
667
{
668
switch (op) {
669
case BEV_CTRL_GET_FD:
670
data->fd = evbuffer_overlapped_get_fd_(bev->input);
671
return 0;
672
case BEV_CTRL_SET_FD: {
673
struct bufferevent_async *bev_a = upcast(bev);
674
struct event_iocp_port *iocp;
675
676
if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
677
return 0;
678
if (!(iocp = event_base_get_iocp_(bev->ev_base)))
679
return -1;
680
if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) {
681
if (fatal_error(GetLastError()))
682
return -1;
683
}
684
evbuffer_overlapped_set_fd_(bev->input, data->fd);
685
evbuffer_overlapped_set_fd_(bev->output, data->fd);
686
bev_a->ok = data->fd >= 0;
687
return 0;
688
}
689
case BEV_CTRL_CANCEL_ALL: {
690
struct bufferevent_async *bev_a = upcast(bev);
691
evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
692
if (fd != (evutil_socket_t)EVUTIL_INVALID_SOCKET &&
693
(bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
694
closesocket(fd);
695
evbuffer_overlapped_set_fd_(bev->input, EVUTIL_INVALID_SOCKET);
696
}
697
bev_a->ok = 0;
698
return 0;
699
}
700
case BEV_CTRL_GET_UNDERLYING:
701
default:
702
return -1;
703
}
704
}
705
706
707
708