Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Kitware
GitHub Repository: Kitware/CMake
Path: blob/master/Utilities/cmlibuv/src/win/tcp.c
3153 views
1
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2
*
3
* Permission is hereby granted, free of charge, to any person obtaining a copy
4
* of this software and associated documentation files (the "Software"), to
5
* deal in the Software without restriction, including without limitation the
6
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7
* sell copies of the Software, and to permit persons to whom the Software is
8
* furnished to do so, subject to the following conditions:
9
*
10
* The above copyright notice and this permission notice shall be included in
11
* all copies or substantial portions of the Software.
12
*
13
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19
* IN THE SOFTWARE.
20
*/
21
22
#include <assert.h>
23
#include <stdlib.h>
24
25
#include "uv.h"
26
#include "internal.h"
27
#include "handle-inl.h"
28
#include "stream-inl.h"
29
#include "req-inl.h"
30
31
32
/*
33
* Threshold of active tcp streams for which to preallocate tcp read buffers.
34
* (Due to node slab allocator performing poorly under this pattern,
35
* the optimization is temporarily disabled (threshold=0). This will be
36
* revisited once node allocator is improved.)
37
*/
38
const unsigned int uv_active_tcp_streams_threshold = 0;
39
40
/*
41
* Number of simultaneous pending AcceptEx calls.
42
*/
43
const unsigned int uv_simultaneous_server_accepts = 32;
44
45
/* A zero-size buffer for use by uv_tcp_read */
46
static char uv_zero_[] = "";
47
48
static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) {
49
if (setsockopt(socket,
50
IPPROTO_TCP,
51
TCP_NODELAY,
52
(const char*)&enable,
53
sizeof enable) == -1) {
54
return WSAGetLastError();
55
}
56
return 0;
57
}
58
59
60
static int uv__tcp_keepalive(uv_tcp_t* handle, SOCKET socket, int enable, unsigned int delay) {
61
if (setsockopt(socket,
62
SOL_SOCKET,
63
SO_KEEPALIVE,
64
(const char*)&enable,
65
sizeof enable) == -1) {
66
return WSAGetLastError();
67
}
68
69
if (enable && setsockopt(socket,
70
IPPROTO_TCP,
71
TCP_KEEPALIVE,
72
(const char*)&delay,
73
sizeof delay) == -1) {
74
return WSAGetLastError();
75
}
76
77
return 0;
78
}
79
80
81
static int uv__tcp_set_socket(uv_loop_t* loop,
82
uv_tcp_t* handle,
83
SOCKET socket,
84
int family,
85
int imported) {
86
DWORD yes = 1;
87
int non_ifs_lsp;
88
int err;
89
90
if (handle->socket != INVALID_SOCKET)
91
return UV_EBUSY;
92
93
/* Set the socket to nonblocking mode */
94
if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) {
95
return WSAGetLastError();
96
}
97
98
/* Make the socket non-inheritable */
99
if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
100
return GetLastError();
101
102
/* Associate it with the I/O completion port. Use uv_handle_t pointer as
103
* completion key. */
104
if (CreateIoCompletionPort((HANDLE)socket,
105
loop->iocp,
106
(ULONG_PTR)socket,
107
0) == NULL) {
108
if (imported) {
109
handle->flags |= UV_HANDLE_EMULATE_IOCP;
110
} else {
111
return GetLastError();
112
}
113
}
114
115
if (family == AF_INET6) {
116
non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
117
} else {
118
non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
119
}
120
121
if (!(handle->flags & UV_HANDLE_EMULATE_IOCP) && !non_ifs_lsp) {
122
UCHAR sfcnm_flags =
123
FILE_SKIP_SET_EVENT_ON_HANDLE | FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
124
if (!SetFileCompletionNotificationModes((HANDLE) socket, sfcnm_flags))
125
return GetLastError();
126
handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
127
}
128
129
if (handle->flags & UV_HANDLE_TCP_NODELAY) {
130
err = uv__tcp_nodelay(handle, socket, 1);
131
if (err)
132
return err;
133
}
134
135
/* TODO: Use stored delay. */
136
if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
137
err = uv__tcp_keepalive(handle, socket, 1, 60);
138
if (err)
139
return err;
140
}
141
142
handle->socket = socket;
143
144
if (family == AF_INET6) {
145
handle->flags |= UV_HANDLE_IPV6;
146
} else {
147
assert(!(handle->flags & UV_HANDLE_IPV6));
148
}
149
150
return 0;
151
}
152
153
154
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
155
int domain;
156
157
/* Use the lower 8 bits for the domain */
158
domain = flags & 0xFF;
159
if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
160
return UV_EINVAL;
161
162
if (flags & ~0xFF)
163
return UV_EINVAL;
164
165
uv__stream_init(loop, (uv_stream_t*) handle, UV_TCP);
166
handle->tcp.serv.accept_reqs = NULL;
167
handle->tcp.serv.pending_accepts = NULL;
168
handle->socket = INVALID_SOCKET;
169
handle->reqs_pending = 0;
170
handle->tcp.serv.func_acceptex = NULL;
171
handle->tcp.conn.func_connectex = NULL;
172
handle->tcp.serv.processed_accepts = 0;
173
handle->delayed_error = 0;
174
175
/* If anything fails beyond this point we need to remove the handle from
176
* the handle queue, since it was added by uv__handle_init in uv__stream_init.
177
*/
178
179
if (domain != AF_UNSPEC) {
180
SOCKET sock;
181
DWORD err;
182
183
sock = socket(domain, SOCK_STREAM, 0);
184
if (sock == INVALID_SOCKET) {
185
err = WSAGetLastError();
186
QUEUE_REMOVE(&handle->handle_queue);
187
return uv_translate_sys_error(err);
188
}
189
190
err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0);
191
if (err) {
192
closesocket(sock);
193
QUEUE_REMOVE(&handle->handle_queue);
194
return uv_translate_sys_error(err);
195
}
196
197
}
198
199
return 0;
200
}
201
202
203
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
204
return uv_tcp_init_ex(loop, handle, AF_UNSPEC);
205
}
206
207
208
void uv__process_tcp_shutdown_req(uv_loop_t* loop, uv_tcp_t* stream, uv_shutdown_t *req) {
209
int err;
210
211
assert(req);
212
assert(stream->stream.conn.write_reqs_pending == 0);
213
assert(!(stream->flags & UV_HANDLE_SHUT));
214
assert(stream->flags & UV_HANDLE_CONNECTION);
215
216
stream->stream.conn.shutdown_req = NULL;
217
stream->flags &= ~UV_HANDLE_SHUTTING;
218
UNREGISTER_HANDLE_REQ(loop, stream, req);
219
220
err = 0;
221
if (stream->flags & UV_HANDLE_CLOSING)
222
/* The user destroyed the stream before we got to do the shutdown. */
223
err = UV_ECANCELED;
224
else if (shutdown(stream->socket, SD_SEND) == SOCKET_ERROR)
225
err = uv_translate_sys_error(WSAGetLastError());
226
else /* Success. */
227
stream->flags |= UV_HANDLE_SHUT;
228
229
if (req->cb)
230
req->cb(req, err);
231
232
DECREASE_PENDING_REQ_COUNT(stream);
233
}
234
235
236
void uv__tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
237
unsigned int i;
238
uv_tcp_accept_t* req;
239
240
assert(handle->flags & UV_HANDLE_CLOSING);
241
assert(handle->reqs_pending == 0);
242
assert(!(handle->flags & UV_HANDLE_CLOSED));
243
assert(handle->socket == INVALID_SOCKET);
244
245
if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->tcp.serv.accept_reqs) {
246
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
247
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
248
req = &handle->tcp.serv.accept_reqs[i];
249
if (req->wait_handle != INVALID_HANDLE_VALUE) {
250
UnregisterWait(req->wait_handle);
251
req->wait_handle = INVALID_HANDLE_VALUE;
252
}
253
if (req->event_handle != NULL) {
254
CloseHandle(req->event_handle);
255
req->event_handle = NULL;
256
}
257
}
258
}
259
260
uv__free(handle->tcp.serv.accept_reqs);
261
handle->tcp.serv.accept_reqs = NULL;
262
}
263
264
if (handle->flags & UV_HANDLE_CONNECTION &&
265
handle->flags & UV_HANDLE_EMULATE_IOCP) {
266
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
267
UnregisterWait(handle->read_req.wait_handle);
268
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
269
}
270
if (handle->read_req.event_handle != NULL) {
271
CloseHandle(handle->read_req.event_handle);
272
handle->read_req.event_handle = NULL;
273
}
274
}
275
276
uv__handle_close(handle);
277
loop->active_tcp_streams--;
278
}
279
280
281
/* Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
282
* allow binding to addresses that are in use by sockets in TIME_WAIT, it
283
* effectively allows 'stealing' a port which is in use by another application.
284
*
285
* SO_EXCLUSIVEADDRUSE is also not good here because it does check all sockets,
286
* regardless of state, so we'd get an error even if the port is in use by a
287
* socket in TIME_WAIT state.
288
*
289
* See issue #1360.
290
*
291
*/
292
static int uv__tcp_try_bind(uv_tcp_t* handle,
293
const struct sockaddr* addr,
294
unsigned int addrlen,
295
unsigned int flags) {
296
DWORD err;
297
int r;
298
299
if (handle->socket == INVALID_SOCKET) {
300
SOCKET sock;
301
302
/* Cannot set IPv6-only mode on non-IPv6 socket. */
303
if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
304
return ERROR_INVALID_PARAMETER;
305
306
sock = socket(addr->sa_family, SOCK_STREAM, 0);
307
if (sock == INVALID_SOCKET) {
308
return WSAGetLastError();
309
}
310
311
err = uv__tcp_set_socket(handle->loop, handle, sock, addr->sa_family, 0);
312
if (err) {
313
closesocket(sock);
314
return err;
315
}
316
}
317
318
#ifdef IPV6_V6ONLY
319
if (addr->sa_family == AF_INET6) {
320
int on;
321
322
on = (flags & UV_TCP_IPV6ONLY) != 0;
323
324
/* TODO: how to handle errors? This may fail if there is no ipv4 stack
325
* available, or when run on XP/2003 which have no support for dualstack
326
* sockets. For now we're silently ignoring the error. */
327
setsockopt(handle->socket,
328
IPPROTO_IPV6,
329
IPV6_V6ONLY,
330
(const char*)&on,
331
sizeof on);
332
}
333
#endif
334
335
r = bind(handle->socket, addr, addrlen);
336
337
if (r == SOCKET_ERROR) {
338
err = WSAGetLastError();
339
if (err == WSAEADDRINUSE) {
340
/* Some errors are not to be reported until connect() or listen() */
341
handle->delayed_error = err;
342
} else {
343
return err;
344
}
345
}
346
347
handle->flags |= UV_HANDLE_BOUND;
348
349
return 0;
350
}
351
352
353
static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
354
uv_req_t* req;
355
uv_tcp_t* handle;
356
357
req = (uv_req_t*) context;
358
assert(req != NULL);
359
handle = (uv_tcp_t*)req->data;
360
assert(handle != NULL);
361
assert(!timed_out);
362
363
if (!PostQueuedCompletionStatus(handle->loop->iocp,
364
req->u.io.overlapped.InternalHigh,
365
0,
366
&req->u.io.overlapped)) {
367
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
368
}
369
}
370
371
372
static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) {
373
uv_write_t* req;
374
uv_tcp_t* handle;
375
376
req = (uv_write_t*) context;
377
assert(req != NULL);
378
handle = (uv_tcp_t*)req->handle;
379
assert(handle != NULL);
380
assert(!timed_out);
381
382
if (!PostQueuedCompletionStatus(handle->loop->iocp,
383
req->u.io.overlapped.InternalHigh,
384
0,
385
&req->u.io.overlapped)) {
386
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
387
}
388
}
389
390
391
static void uv__tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
392
uv_loop_t* loop = handle->loop;
393
BOOL success;
394
DWORD bytes;
395
SOCKET accept_socket;
396
short family;
397
398
assert(handle->flags & UV_HANDLE_LISTENING);
399
assert(req->accept_socket == INVALID_SOCKET);
400
401
/* choose family and extension function */
402
if (handle->flags & UV_HANDLE_IPV6) {
403
family = AF_INET6;
404
} else {
405
family = AF_INET;
406
}
407
408
/* Open a socket for the accepted connection. */
409
accept_socket = socket(family, SOCK_STREAM, 0);
410
if (accept_socket == INVALID_SOCKET) {
411
SET_REQ_ERROR(req, WSAGetLastError());
412
uv__insert_pending_req(loop, (uv_req_t*)req);
413
handle->reqs_pending++;
414
return;
415
}
416
417
/* Make the socket non-inheritable */
418
if (!SetHandleInformation((HANDLE) accept_socket, HANDLE_FLAG_INHERIT, 0)) {
419
SET_REQ_ERROR(req, GetLastError());
420
uv__insert_pending_req(loop, (uv_req_t*)req);
421
handle->reqs_pending++;
422
closesocket(accept_socket);
423
return;
424
}
425
426
/* Prepare the overlapped structure. */
427
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
428
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
429
assert(req->event_handle != NULL);
430
req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
431
}
432
433
success = handle->tcp.serv.func_acceptex(handle->socket,
434
accept_socket,
435
(void*)req->accept_buffer,
436
0,
437
sizeof(struct sockaddr_storage),
438
sizeof(struct sockaddr_storage),
439
&bytes,
440
&req->u.io.overlapped);
441
442
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
443
/* Process the req without IOCP. */
444
req->accept_socket = accept_socket;
445
handle->reqs_pending++;
446
uv__insert_pending_req(loop, (uv_req_t*)req);
447
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
448
/* The req will be processed with IOCP. */
449
req->accept_socket = accept_socket;
450
handle->reqs_pending++;
451
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
452
req->wait_handle == INVALID_HANDLE_VALUE &&
453
!RegisterWaitForSingleObject(&req->wait_handle,
454
req->event_handle, post_completion, (void*) req,
455
INFINITE, WT_EXECUTEINWAITTHREAD)) {
456
SET_REQ_ERROR(req, GetLastError());
457
uv__insert_pending_req(loop, (uv_req_t*)req);
458
}
459
} else {
460
/* Make this req pending reporting an error. */
461
SET_REQ_ERROR(req, WSAGetLastError());
462
uv__insert_pending_req(loop, (uv_req_t*)req);
463
handle->reqs_pending++;
464
/* Destroy the preallocated client socket. */
465
closesocket(accept_socket);
466
/* Destroy the event handle */
467
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
468
CloseHandle(req->event_handle);
469
req->event_handle = NULL;
470
}
471
}
472
}
473
474
475
static void uv__tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
476
uv_read_t* req;
477
uv_buf_t buf;
478
int result;
479
DWORD bytes, flags;
480
481
assert(handle->flags & UV_HANDLE_READING);
482
assert(!(handle->flags & UV_HANDLE_READ_PENDING));
483
484
req = &handle->read_req;
485
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
486
487
/*
488
* Preallocate a read buffer if the number of active streams is below
489
* the threshold.
490
*/
491
if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) {
492
handle->flags &= ~UV_HANDLE_ZERO_READ;
493
handle->tcp.conn.read_buffer = uv_buf_init(NULL, 0);
494
handle->alloc_cb((uv_handle_t*) handle, 65536, &handle->tcp.conn.read_buffer);
495
if (handle->tcp.conn.read_buffer.base == NULL ||
496
handle->tcp.conn.read_buffer.len == 0) {
497
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &handle->tcp.conn.read_buffer);
498
return;
499
}
500
assert(handle->tcp.conn.read_buffer.base != NULL);
501
buf = handle->tcp.conn.read_buffer;
502
} else {
503
handle->flags |= UV_HANDLE_ZERO_READ;
504
buf.base = (char*) &uv_zero_;
505
buf.len = 0;
506
}
507
508
/* Prepare the overlapped structure. */
509
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
510
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
511
assert(req->event_handle != NULL);
512
req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
513
}
514
515
flags = 0;
516
result = WSARecv(handle->socket,
517
(WSABUF*)&buf,
518
1,
519
&bytes,
520
&flags,
521
&req->u.io.overlapped,
522
NULL);
523
524
handle->flags |= UV_HANDLE_READ_PENDING;
525
handle->reqs_pending++;
526
527
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
528
/* Process the req without IOCP. */
529
req->u.io.overlapped.InternalHigh = bytes;
530
uv__insert_pending_req(loop, (uv_req_t*)req);
531
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
532
/* The req will be processed with IOCP. */
533
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
534
req->wait_handle == INVALID_HANDLE_VALUE &&
535
!RegisterWaitForSingleObject(&req->wait_handle,
536
req->event_handle, post_completion, (void*) req,
537
INFINITE, WT_EXECUTEINWAITTHREAD)) {
538
SET_REQ_ERROR(req, GetLastError());
539
uv__insert_pending_req(loop, (uv_req_t*)req);
540
}
541
} else {
542
/* Make this req pending reporting an error. */
543
SET_REQ_ERROR(req, WSAGetLastError());
544
uv__insert_pending_req(loop, (uv_req_t*)req);
545
}
546
}
547
548
549
int uv_tcp_close_reset(uv_tcp_t* handle, uv_close_cb close_cb) {
550
struct linger l = { 1, 0 };
551
552
/* Disallow setting SO_LINGER to zero due to some platform inconsistencies */
553
if (handle->flags & UV_HANDLE_SHUTTING)
554
return UV_EINVAL;
555
556
if (0 != setsockopt(handle->socket, SOL_SOCKET, SO_LINGER, (const char*)&l, sizeof(l)))
557
return uv_translate_sys_error(WSAGetLastError());
558
559
uv_close((uv_handle_t*) handle, close_cb);
560
return 0;
561
}
562
563
564
int uv__tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
565
unsigned int i, simultaneous_accepts;
566
uv_tcp_accept_t* req;
567
int err;
568
569
assert(backlog > 0);
570
571
if (handle->flags & UV_HANDLE_LISTENING) {
572
handle->stream.serv.connection_cb = cb;
573
}
574
575
if (handle->flags & UV_HANDLE_READING) {
576
return WSAEISCONN;
577
}
578
579
if (handle->delayed_error) {
580
return handle->delayed_error;
581
}
582
583
if (!(handle->flags & UV_HANDLE_BOUND)) {
584
err = uv__tcp_try_bind(handle,
585
(const struct sockaddr*) &uv_addr_ip4_any_,
586
sizeof(uv_addr_ip4_any_),
587
0);
588
if (err)
589
return err;
590
if (handle->delayed_error)
591
return handle->delayed_error;
592
}
593
594
if (!handle->tcp.serv.func_acceptex) {
595
if (!uv__get_acceptex_function(handle->socket, &handle->tcp.serv.func_acceptex)) {
596
return WSAEAFNOSUPPORT;
597
}
598
}
599
600
/* If this flag is set, we already made this listen call in xfer. */
601
if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
602
listen(handle->socket, backlog) == SOCKET_ERROR) {
603
return WSAGetLastError();
604
}
605
606
handle->flags |= UV_HANDLE_LISTENING;
607
handle->stream.serv.connection_cb = cb;
608
INCREASE_ACTIVE_COUNT(loop, handle);
609
610
simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
611
: uv_simultaneous_server_accepts;
612
613
if (handle->tcp.serv.accept_reqs == NULL) {
614
handle->tcp.serv.accept_reqs =
615
uv__malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
616
if (!handle->tcp.serv.accept_reqs) {
617
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
618
}
619
620
for (i = 0; i < simultaneous_accepts; i++) {
621
req = &handle->tcp.serv.accept_reqs[i];
622
UV_REQ_INIT(req, UV_ACCEPT);
623
req->accept_socket = INVALID_SOCKET;
624
req->data = handle;
625
626
req->wait_handle = INVALID_HANDLE_VALUE;
627
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
628
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
629
if (req->event_handle == NULL) {
630
uv_fatal_error(GetLastError(), "CreateEvent");
631
}
632
} else {
633
req->event_handle = NULL;
634
}
635
636
uv__tcp_queue_accept(handle, req);
637
}
638
639
/* Initialize other unused requests too, because uv_tcp_endgame doesn't
640
* know how many requests were initialized, so it will try to clean up
641
* {uv_simultaneous_server_accepts} requests. */
642
for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
643
req = &handle->tcp.serv.accept_reqs[i];
644
UV_REQ_INIT(req, UV_ACCEPT);
645
req->accept_socket = INVALID_SOCKET;
646
req->data = handle;
647
req->wait_handle = INVALID_HANDLE_VALUE;
648
req->event_handle = NULL;
649
}
650
}
651
652
return 0;
653
}
654
655
656
int uv__tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
657
uv_loop_t* loop = server->loop;
658
int err = 0;
659
int family;
660
661
uv_tcp_accept_t* req = server->tcp.serv.pending_accepts;
662
663
if (!req) {
664
/* No valid connections found, so we error out. */
665
return WSAEWOULDBLOCK;
666
}
667
668
if (req->accept_socket == INVALID_SOCKET) {
669
return WSAENOTCONN;
670
}
671
672
if (server->flags & UV_HANDLE_IPV6) {
673
family = AF_INET6;
674
} else {
675
family = AF_INET;
676
}
677
678
err = uv__tcp_set_socket(client->loop,
679
client,
680
req->accept_socket,
681
family,
682
0);
683
if (err) {
684
closesocket(req->accept_socket);
685
} else {
686
uv__connection_init((uv_stream_t*) client);
687
/* AcceptEx() implicitly binds the accepted socket. */
688
client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
689
}
690
691
/* Prepare the req to pick up a new connection */
692
server->tcp.serv.pending_accepts = req->next_pending;
693
req->next_pending = NULL;
694
req->accept_socket = INVALID_SOCKET;
695
696
if (!(server->flags & UV_HANDLE_CLOSING)) {
697
/* Check if we're in a middle of changing the number of pending accepts. */
698
if (!(server->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)) {
699
uv__tcp_queue_accept(server, req);
700
} else {
701
/* We better be switching to a single pending accept. */
702
assert(server->flags & UV_HANDLE_TCP_SINGLE_ACCEPT);
703
704
server->tcp.serv.processed_accepts++;
705
706
if (server->tcp.serv.processed_accepts >= uv_simultaneous_server_accepts) {
707
server->tcp.serv.processed_accepts = 0;
708
/*
709
* All previously queued accept requests are now processed.
710
* We now switch to queueing just a single accept.
711
*/
712
uv__tcp_queue_accept(server, &server->tcp.serv.accept_reqs[0]);
713
server->flags &= ~UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
714
server->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
715
}
716
}
717
}
718
719
loop->active_tcp_streams++;
720
721
return err;
722
}
723
724
725
int uv__tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
726
uv_read_cb read_cb) {
727
uv_loop_t* loop = handle->loop;
728
729
handle->flags |= UV_HANDLE_READING;
730
handle->read_cb = read_cb;
731
handle->alloc_cb = alloc_cb;
732
INCREASE_ACTIVE_COUNT(loop, handle);
733
734
/* If reading was stopped and then started again, there could still be a read
735
* request pending. */
736
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
737
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
738
handle->read_req.event_handle == NULL) {
739
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
740
if (handle->read_req.event_handle == NULL) {
741
uv_fatal_error(GetLastError(), "CreateEvent");
742
}
743
}
744
uv__tcp_queue_read(loop, handle);
745
}
746
747
return 0;
748
}
749
750
static int uv__is_loopback(const struct sockaddr_storage* storage) {
751
const struct sockaddr_in* in4;
752
const struct sockaddr_in6* in6;
753
int i;
754
755
if (storage->ss_family == AF_INET) {
756
in4 = (const struct sockaddr_in*) storage;
757
return in4->sin_addr.S_un.S_un_b.s_b1 == 127;
758
}
759
if (storage->ss_family == AF_INET6) {
760
in6 = (const struct sockaddr_in6*) storage;
761
for (i = 0; i < 7; ++i) {
762
if (in6->sin6_addr.u.Word[i] != 0)
763
return 0;
764
}
765
return in6->sin6_addr.u.Word[7] == htons(1);
766
}
767
return 0;
768
}
769
770
// Check if Windows version is 10.0.16299 or later
771
static int uv__is_fast_loopback_fail_supported(void) {
772
OSVERSIONINFOW os_info;
773
if (!pRtlGetVersion)
774
return 0;
775
pRtlGetVersion(&os_info);
776
if (os_info.dwMajorVersion < 10)
777
return 0;
778
if (os_info.dwMajorVersion > 10)
779
return 1;
780
if (os_info.dwMinorVersion > 0)
781
return 1;
782
return os_info.dwBuildNumber >= 16299;
783
}
784
785
static int uv__tcp_try_connect(uv_connect_t* req,
786
uv_tcp_t* handle,
787
const struct sockaddr* addr,
788
unsigned int addrlen,
789
uv_connect_cb cb) {
790
uv_loop_t* loop = handle->loop;
791
TCP_INITIAL_RTO_PARAMETERS retransmit_ioctl;
792
const struct sockaddr* bind_addr;
793
struct sockaddr_storage converted;
794
BOOL success;
795
DWORD bytes;
796
int err;
797
798
err = uv__convert_to_localhost_if_unspecified(addr, &converted);
799
if (err)
800
return err;
801
802
if (handle->delayed_error != 0)
803
goto out;
804
805
if (!(handle->flags & UV_HANDLE_BOUND)) {
806
if (addrlen == sizeof(uv_addr_ip4_any_)) {
807
bind_addr = (const struct sockaddr*) &uv_addr_ip4_any_;
808
} else if (addrlen == sizeof(uv_addr_ip6_any_)) {
809
bind_addr = (const struct sockaddr*) &uv_addr_ip6_any_;
810
} else {
811
abort();
812
}
813
err = uv__tcp_try_bind(handle, bind_addr, addrlen, 0);
814
if (err)
815
return err;
816
if (handle->delayed_error != 0)
817
goto out;
818
}
819
820
if (!handle->tcp.conn.func_connectex) {
821
if (!uv__get_connectex_function(handle->socket, &handle->tcp.conn.func_connectex)) {
822
return WSAEAFNOSUPPORT;
823
}
824
}
825
826
/* This makes connect() fail instantly if the target port on the localhost
827
* is not reachable, instead of waiting for 2s. We do not care if this fails.
828
* This only works on Windows version 10.0.16299 and later.
829
*/
830
if (uv__is_fast_loopback_fail_supported() && uv__is_loopback(&converted)) {
831
memset(&retransmit_ioctl, 0, sizeof(retransmit_ioctl));
832
retransmit_ioctl.Rtt = TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
833
retransmit_ioctl.MaxSynRetransmissions = TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
834
WSAIoctl(handle->socket,
835
SIO_TCP_INITIAL_RTO,
836
&retransmit_ioctl,
837
sizeof(retransmit_ioctl),
838
NULL,
839
0,
840
&bytes,
841
NULL,
842
NULL);
843
}
844
845
out:
846
847
UV_REQ_INIT(req, UV_CONNECT);
848
req->handle = (uv_stream_t*) handle;
849
req->cb = cb;
850
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
851
852
if (handle->delayed_error != 0) {
853
/* Process the req without IOCP. */
854
handle->reqs_pending++;
855
REGISTER_HANDLE_REQ(loop, handle, req);
856
uv__insert_pending_req(loop, (uv_req_t*)req);
857
return 0;
858
}
859
860
success = handle->tcp.conn.func_connectex(handle->socket,
861
(const struct sockaddr*) &converted,
862
addrlen,
863
NULL,
864
0,
865
&bytes,
866
&req->u.io.overlapped);
867
868
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
869
/* Process the req without IOCP. */
870
handle->reqs_pending++;
871
REGISTER_HANDLE_REQ(loop, handle, req);
872
uv__insert_pending_req(loop, (uv_req_t*)req);
873
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
874
/* The req will be processed with IOCP. */
875
handle->reqs_pending++;
876
REGISTER_HANDLE_REQ(loop, handle, req);
877
} else {
878
return WSAGetLastError();
879
}
880
881
return 0;
882
}
883
884
885
int uv_tcp_getsockname(const uv_tcp_t* handle,
886
struct sockaddr* name,
887
int* namelen) {
888
889
return uv__getsockpeername((const uv_handle_t*) handle,
890
getsockname,
891
name,
892
namelen,
893
handle->delayed_error);
894
}
895
896
897
int uv_tcp_getpeername(const uv_tcp_t* handle,
898
struct sockaddr* name,
899
int* namelen) {
900
901
return uv__getsockpeername((const uv_handle_t*) handle,
902
getpeername,
903
name,
904
namelen,
905
handle->delayed_error);
906
}
907
908
909
int uv__tcp_write(uv_loop_t* loop,
910
uv_write_t* req,
911
uv_tcp_t* handle,
912
const uv_buf_t bufs[],
913
unsigned int nbufs,
914
uv_write_cb cb) {
915
int result;
916
DWORD bytes;
917
918
UV_REQ_INIT(req, UV_WRITE);
919
req->handle = (uv_stream_t*) handle;
920
req->cb = cb;
921
922
/* Prepare the overlapped structure. */
923
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
924
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
925
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
926
if (req->event_handle == NULL) {
927
uv_fatal_error(GetLastError(), "CreateEvent");
928
}
929
req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
930
req->wait_handle = INVALID_HANDLE_VALUE;
931
}
932
933
result = WSASend(handle->socket,
934
(WSABUF*) bufs,
935
nbufs,
936
&bytes,
937
0,
938
&req->u.io.overlapped,
939
NULL);
940
941
if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
942
/* Request completed immediately. */
943
req->u.io.queued_bytes = 0;
944
handle->reqs_pending++;
945
handle->stream.conn.write_reqs_pending++;
946
REGISTER_HANDLE_REQ(loop, handle, req);
947
uv__insert_pending_req(loop, (uv_req_t*) req);
948
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
949
/* Request queued by the kernel. */
950
req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
951
handle->reqs_pending++;
952
handle->stream.conn.write_reqs_pending++;
953
REGISTER_HANDLE_REQ(loop, handle, req);
954
handle->write_queue_size += req->u.io.queued_bytes;
955
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
956
!RegisterWaitForSingleObject(&req->wait_handle,
957
req->event_handle, post_write_completion, (void*) req,
958
INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
959
SET_REQ_ERROR(req, GetLastError());
960
uv__insert_pending_req(loop, (uv_req_t*)req);
961
}
962
} else {
963
/* Send failed due to an error, report it later */
964
req->u.io.queued_bytes = 0;
965
handle->reqs_pending++;
966
handle->stream.conn.write_reqs_pending++;
967
REGISTER_HANDLE_REQ(loop, handle, req);
968
SET_REQ_ERROR(req, WSAGetLastError());
969
uv__insert_pending_req(loop, (uv_req_t*) req);
970
}
971
972
return 0;
973
}
974
975
976
int uv__tcp_try_write(uv_tcp_t* handle,
977
const uv_buf_t bufs[],
978
unsigned int nbufs) {
979
int result;
980
DWORD bytes;
981
982
if (handle->stream.conn.write_reqs_pending > 0)
983
return UV_EAGAIN;
984
985
result = WSASend(handle->socket,
986
(WSABUF*) bufs,
987
nbufs,
988
&bytes,
989
0,
990
NULL,
991
NULL);
992
993
if (result == SOCKET_ERROR)
994
return uv_translate_sys_error(WSAGetLastError());
995
else
996
return bytes;
997
}
998
999
1000
void uv__process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
1001
uv_req_t* req) {
1002
DWORD bytes, flags, err;
1003
uv_buf_t buf;
1004
int count;
1005
1006
assert(handle->type == UV_TCP);
1007
1008
handle->flags &= ~UV_HANDLE_READ_PENDING;
1009
1010
if (!REQ_SUCCESS(req)) {
1011
/* An error occurred doing the read. */
1012
if ((handle->flags & UV_HANDLE_READING) ||
1013
!(handle->flags & UV_HANDLE_ZERO_READ)) {
1014
handle->flags &= ~UV_HANDLE_READING;
1015
DECREASE_ACTIVE_COUNT(loop, handle);
1016
buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
1017
uv_buf_init(NULL, 0) : handle->tcp.conn.read_buffer;
1018
1019
err = GET_REQ_SOCK_ERROR(req);
1020
1021
if (err == WSAECONNABORTED) {
1022
/* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with Unix.
1023
*/
1024
err = WSAECONNRESET;
1025
}
1026
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1027
1028
handle->read_cb((uv_stream_t*)handle,
1029
uv_translate_sys_error(err),
1030
&buf);
1031
}
1032
} else {
1033
if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
1034
/* The read was done with a non-zero buffer length. */
1035
if (req->u.io.overlapped.InternalHigh > 0) {
1036
/* Successful read */
1037
handle->read_cb((uv_stream_t*)handle,
1038
req->u.io.overlapped.InternalHigh,
1039
&handle->tcp.conn.read_buffer);
1040
/* Read again only if bytes == buf.len */
1041
if (req->u.io.overlapped.InternalHigh < handle->tcp.conn.read_buffer.len) {
1042
goto done;
1043
}
1044
} else {
1045
/* Connection closed */
1046
if (handle->flags & UV_HANDLE_READING) {
1047
handle->flags &= ~UV_HANDLE_READING;
1048
DECREASE_ACTIVE_COUNT(loop, handle);
1049
}
1050
1051
buf.base = 0;
1052
buf.len = 0;
1053
handle->read_cb((uv_stream_t*)handle, UV_EOF, &handle->tcp.conn.read_buffer);
1054
goto done;
1055
}
1056
}
1057
1058
/* Do nonblocking reads until the buffer is empty */
1059
count = 32;
1060
while ((handle->flags & UV_HANDLE_READING) && (count-- > 0)) {
1061
buf = uv_buf_init(NULL, 0);
1062
handle->alloc_cb((uv_handle_t*) handle, 65536, &buf);
1063
if (buf.base == NULL || buf.len == 0) {
1064
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1065
break;
1066
}
1067
assert(buf.base != NULL);
1068
1069
flags = 0;
1070
if (WSARecv(handle->socket,
1071
(WSABUF*)&buf,
1072
1,
1073
&bytes,
1074
&flags,
1075
NULL,
1076
NULL) != SOCKET_ERROR) {
1077
if (bytes > 0) {
1078
/* Successful read */
1079
handle->read_cb((uv_stream_t*)handle, bytes, &buf);
1080
/* Read again only if bytes == buf.len */
1081
if (bytes < buf.len) {
1082
break;
1083
}
1084
} else {
1085
/* Connection closed */
1086
handle->flags &= ~UV_HANDLE_READING;
1087
DECREASE_ACTIVE_COUNT(loop, handle);
1088
1089
handle->read_cb((uv_stream_t*)handle, UV_EOF, &buf);
1090
break;
1091
}
1092
} else {
1093
err = WSAGetLastError();
1094
if (err == WSAEWOULDBLOCK) {
1095
/* Read buffer was completely empty, report a 0-byte read. */
1096
handle->read_cb((uv_stream_t*)handle, 0, &buf);
1097
} else {
1098
/* Ouch! serious error. */
1099
handle->flags &= ~UV_HANDLE_READING;
1100
DECREASE_ACTIVE_COUNT(loop, handle);
1101
1102
if (err == WSAECONNABORTED) {
1103
/* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with
1104
* Unix. */
1105
err = WSAECONNRESET;
1106
}
1107
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1108
1109
handle->read_cb((uv_stream_t*)handle,
1110
uv_translate_sys_error(err),
1111
&buf);
1112
}
1113
break;
1114
}
1115
}
1116
1117
done:
1118
/* Post another read if still reading and not closing. */
1119
if ((handle->flags & UV_HANDLE_READING) &&
1120
!(handle->flags & UV_HANDLE_READ_PENDING)) {
1121
uv__tcp_queue_read(loop, handle);
1122
}
1123
}
1124
1125
DECREASE_PENDING_REQ_COUNT(handle);
1126
}
1127
1128
1129
void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
1130
uv_write_t* req) {
1131
int err;
1132
1133
assert(handle->type == UV_TCP);
1134
1135
assert(handle->write_queue_size >= req->u.io.queued_bytes);
1136
handle->write_queue_size -= req->u.io.queued_bytes;
1137
1138
UNREGISTER_HANDLE_REQ(loop, handle, req);
1139
1140
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1141
if (req->wait_handle != INVALID_HANDLE_VALUE) {
1142
UnregisterWait(req->wait_handle);
1143
req->wait_handle = INVALID_HANDLE_VALUE;
1144
}
1145
if (req->event_handle != NULL) {
1146
CloseHandle(req->event_handle);
1147
req->event_handle = NULL;
1148
}
1149
}
1150
1151
if (req->cb) {
1152
err = uv_translate_sys_error(GET_REQ_SOCK_ERROR(req));
1153
if (err == UV_ECONNABORTED) {
1154
/* use UV_ECANCELED for consistency with Unix */
1155
err = UV_ECANCELED;
1156
}
1157
req->cb(req, err);
1158
}
1159
1160
handle->stream.conn.write_reqs_pending--;
1161
if (handle->stream.conn.write_reqs_pending == 0) {
1162
if (handle->flags & UV_HANDLE_CLOSING) {
1163
closesocket(handle->socket);
1164
handle->socket = INVALID_SOCKET;
1165
}
1166
if (handle->flags & UV_HANDLE_SHUTTING)
1167
uv__process_tcp_shutdown_req(loop,
1168
handle,
1169
handle->stream.conn.shutdown_req);
1170
}
1171
1172
DECREASE_PENDING_REQ_COUNT(handle);
1173
}
1174
1175
1176
void uv__process_tcp_accept_req(uv_loop_t* loop, uv_tcp_t* handle,
1177
uv_req_t* raw_req) {
1178
uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
1179
int err;
1180
1181
assert(handle->type == UV_TCP);
1182
1183
/* If handle->accepted_socket is not a valid socket, then uv_queue_accept
1184
* must have failed. This is a serious error. We stop accepting connections
1185
* and report this error to the connection callback. */
1186
if (req->accept_socket == INVALID_SOCKET) {
1187
if (handle->flags & UV_HANDLE_LISTENING) {
1188
handle->flags &= ~UV_HANDLE_LISTENING;
1189
DECREASE_ACTIVE_COUNT(loop, handle);
1190
if (handle->stream.serv.connection_cb) {
1191
err = GET_REQ_SOCK_ERROR(req);
1192
handle->stream.serv.connection_cb((uv_stream_t*)handle,
1193
uv_translate_sys_error(err));
1194
}
1195
}
1196
} else if (REQ_SUCCESS(req) &&
1197
setsockopt(req->accept_socket,
1198
SOL_SOCKET,
1199
SO_UPDATE_ACCEPT_CONTEXT,
1200
(char*)&handle->socket,
1201
sizeof(handle->socket)) == 0) {
1202
req->next_pending = handle->tcp.serv.pending_accepts;
1203
handle->tcp.serv.pending_accepts = req;
1204
1205
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
1206
if (handle->stream.serv.connection_cb) {
1207
handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
1208
}
1209
} else {
1210
/* Error related to accepted socket is ignored because the server socket
1211
* may still be healthy. If the server socket is broken uv_queue_accept
1212
* will detect it. */
1213
closesocket(req->accept_socket);
1214
req->accept_socket = INVALID_SOCKET;
1215
if (handle->flags & UV_HANDLE_LISTENING) {
1216
uv__tcp_queue_accept(handle, req);
1217
}
1218
}
1219
1220
DECREASE_PENDING_REQ_COUNT(handle);
1221
}
1222
1223
1224
void uv__process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
1225
uv_connect_t* req) {
1226
int err;
1227
1228
assert(handle->type == UV_TCP);
1229
1230
UNREGISTER_HANDLE_REQ(loop, handle, req);
1231
1232
err = 0;
1233
if (handle->delayed_error) {
1234
/* To smooth over the differences between unixes errors that
1235
* were reported synchronously on the first connect can be delayed
1236
* until the next tick--which is now.
1237
*/
1238
err = handle->delayed_error;
1239
handle->delayed_error = 0;
1240
} else if (REQ_SUCCESS(req)) {
1241
if (handle->flags & UV_HANDLE_CLOSING) {
1242
/* use UV_ECANCELED for consistency with Unix */
1243
err = ERROR_OPERATION_ABORTED;
1244
} else if (setsockopt(handle->socket,
1245
SOL_SOCKET,
1246
SO_UPDATE_CONNECT_CONTEXT,
1247
NULL,
1248
0) == 0) {
1249
uv__connection_init((uv_stream_t*)handle);
1250
handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1251
loop->active_tcp_streams++;
1252
} else {
1253
err = WSAGetLastError();
1254
}
1255
} else {
1256
err = GET_REQ_SOCK_ERROR(req);
1257
}
1258
req->cb(req, uv_translate_sys_error(err));
1259
1260
DECREASE_PENDING_REQ_COUNT(handle);
1261
}
1262
1263
1264
int uv__tcp_xfer_export(uv_tcp_t* handle,
1265
int target_pid,
1266
uv__ipc_socket_xfer_type_t* xfer_type,
1267
uv__ipc_socket_xfer_info_t* xfer_info) {
1268
if (handle->flags & UV_HANDLE_CONNECTION) {
1269
*xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION;
1270
} else {
1271
*xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER;
1272
/* We're about to share the socket with another process. Because this is a
1273
* listening socket, we assume that the other process will be accepting
1274
* connections on it. Thus, before sharing the socket with another process,
1275
* we call listen here in the parent process. */
1276
if (!(handle->flags & UV_HANDLE_LISTENING)) {
1277
if (!(handle->flags & UV_HANDLE_BOUND)) {
1278
return ERROR_NOT_SUPPORTED;
1279
}
1280
if (handle->delayed_error == 0 &&
1281
listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
1282
handle->delayed_error = WSAGetLastError();
1283
}
1284
}
1285
}
1286
1287
if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info))
1288
return WSAGetLastError();
1289
xfer_info->delayed_error = handle->delayed_error;
1290
1291
/* Mark the local copy of the handle as 'shared' so we behave in a way that's
1292
* friendly to the process(es) that we share the socket with. */
1293
handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
1294
1295
return 0;
1296
}
1297
1298
1299
int uv__tcp_xfer_import(uv_tcp_t* tcp,
1300
uv__ipc_socket_xfer_type_t xfer_type,
1301
uv__ipc_socket_xfer_info_t* xfer_info) {
1302
int err;
1303
SOCKET socket;
1304
1305
assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
1306
xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);
1307
1308
socket = WSASocketW(FROM_PROTOCOL_INFO,
1309
FROM_PROTOCOL_INFO,
1310
FROM_PROTOCOL_INFO,
1311
&xfer_info->socket_info,
1312
0,
1313
WSA_FLAG_OVERLAPPED);
1314
1315
if (socket == INVALID_SOCKET) {
1316
return WSAGetLastError();
1317
}
1318
1319
err = uv__tcp_set_socket(
1320
tcp->loop, tcp, socket, xfer_info->socket_info.iAddressFamily, 1);
1321
if (err) {
1322
closesocket(socket);
1323
return err;
1324
}
1325
1326
tcp->delayed_error = xfer_info->delayed_error;
1327
tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;
1328
1329
if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
1330
uv__connection_init((uv_stream_t*)tcp);
1331
tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1332
}
1333
1334
tcp->loop->active_tcp_streams++;
1335
return 0;
1336
}
1337
1338
1339
int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
1340
int err;
1341
1342
if (handle->socket != INVALID_SOCKET) {
1343
err = uv__tcp_nodelay(handle, handle->socket, enable);
1344
if (err)
1345
return uv_translate_sys_error(err);
1346
}
1347
1348
if (enable) {
1349
handle->flags |= UV_HANDLE_TCP_NODELAY;
1350
} else {
1351
handle->flags &= ~UV_HANDLE_TCP_NODELAY;
1352
}
1353
1354
return 0;
1355
}
1356
1357
1358
int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
1359
int err;
1360
1361
if (handle->socket != INVALID_SOCKET) {
1362
err = uv__tcp_keepalive(handle, handle->socket, enable, delay);
1363
if (err)
1364
return uv_translate_sys_error(err);
1365
}
1366
1367
if (enable) {
1368
handle->flags |= UV_HANDLE_TCP_KEEPALIVE;
1369
} else {
1370
handle->flags &= ~UV_HANDLE_TCP_KEEPALIVE;
1371
}
1372
1373
/* TODO: Store delay if handle->socket isn't created yet. */
1374
1375
return 0;
1376
}
1377
1378
1379
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
1380
if (handle->flags & UV_HANDLE_CONNECTION) {
1381
return UV_EINVAL;
1382
}
1383
1384
/* Check if we're already in the desired mode. */
1385
if ((enable && !(handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) ||
1386
(!enable && handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
1387
return 0;
1388
}
1389
1390
/* Don't allow switching from single pending accept to many. */
1391
if (enable) {
1392
return UV_ENOTSUP;
1393
}
1394
1395
/* Check if we're in a middle of changing the number of pending accepts. */
1396
if (handle->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING) {
1397
return 0;
1398
}
1399
1400
handle->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
1401
1402
/* Flip the changing flag if we have already queued multiple accepts. */
1403
if (handle->flags & UV_HANDLE_LISTENING) {
1404
handle->flags |= UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
1405
}
1406
1407
return 0;
1408
}
1409
1410
1411
static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
1412
SOCKET socket;
1413
int non_ifs_lsp;
1414
int reading;
1415
int writing;
1416
1417
socket = tcp->socket;
1418
reading = tcp->flags & UV_HANDLE_READ_PENDING;
1419
writing = tcp->stream.conn.write_reqs_pending > 0;
1420
if (!reading && !writing)
1421
return;
1422
1423
/* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel
1424
* them each explicitly with CancelIoEx (like unix). */
1425
if (reading)
1426
CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
1427
if (writing)
1428
CancelIo((HANDLE) socket);
1429
1430
/* Check if we have any non-IFS LSPs stacked on top of TCP */
1431
non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
1432
uv_tcp_non_ifs_lsp_ipv4;
1433
1434
/* If there are non-ifs LSPs then try to obtain a base handle for the socket.
1435
* This will always fail on Windows XP/3k. */
1436
if (non_ifs_lsp) {
1437
DWORD bytes;
1438
if (WSAIoctl(socket,
1439
SIO_BASE_HANDLE,
1440
NULL,
1441
0,
1442
&socket,
1443
sizeof socket,
1444
&bytes,
1445
NULL,
1446
NULL) != 0) {
1447
/* Failed. We can't do CancelIo. */
1448
return;
1449
}
1450
}
1451
1452
assert(socket != 0 && socket != INVALID_SOCKET);
1453
1454
if (socket != tcp->socket) {
1455
if (reading)
1456
CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
1457
if (writing)
1458
CancelIo((HANDLE) socket);
1459
}
1460
}
1461
1462
1463
void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
1464
if (tcp->flags & UV_HANDLE_CONNECTION) {
1465
if (tcp->flags & UV_HANDLE_READING) {
1466
uv_read_stop((uv_stream_t*) tcp);
1467
}
1468
uv__tcp_try_cancel_reqs(tcp);
1469
} else {
1470
if (tcp->tcp.serv.accept_reqs != NULL) {
1471
/* First close the incoming sockets to cancel the accept operations before
1472
* we free their resources. */
1473
unsigned int i;
1474
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
1475
uv_tcp_accept_t* req = &tcp->tcp.serv.accept_reqs[i];
1476
if (req->accept_socket != INVALID_SOCKET) {
1477
closesocket(req->accept_socket);
1478
req->accept_socket = INVALID_SOCKET;
1479
}
1480
}
1481
}
1482
assert(!(tcp->flags & UV_HANDLE_READING));
1483
}
1484
1485
if (tcp->flags & UV_HANDLE_LISTENING) {
1486
tcp->flags &= ~UV_HANDLE_LISTENING;
1487
DECREASE_ACTIVE_COUNT(loop, tcp);
1488
}
1489
1490
tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1491
uv__handle_closing(tcp);
1492
1493
/* If any overlapped req failed to cancel, calling `closesocket` now would
1494
* cause Win32 to send an RST packet. Try to avoid that for writes, if
1495
* possibly applicable, by waiting to process the completion notifications
1496
* first (which typically should be cancellations). There's not much we can
1497
* do about canceled reads, which also will generate an RST packet. */
1498
if (!(tcp->flags & UV_HANDLE_CONNECTION) ||
1499
tcp->stream.conn.write_reqs_pending == 0) {
1500
closesocket(tcp->socket);
1501
tcp->socket = INVALID_SOCKET;
1502
}
1503
1504
if (tcp->reqs_pending == 0)
1505
uv__want_endgame(loop, (uv_handle_t*) tcp);
1506
}
1507
1508
1509
int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
1510
WSAPROTOCOL_INFOW protocol_info;
1511
int opt_len;
1512
int err;
1513
struct sockaddr_storage saddr;
1514
int saddr_len;
1515
1516
/* Detect the address family of the socket. */
1517
opt_len = (int) sizeof protocol_info;
1518
if (getsockopt(sock,
1519
SOL_SOCKET,
1520
SO_PROTOCOL_INFOW,
1521
(char*) &protocol_info,
1522
&opt_len) == SOCKET_ERROR) {
1523
return uv_translate_sys_error(GetLastError());
1524
}
1525
1526
err = uv__tcp_set_socket(handle->loop,
1527
handle,
1528
sock,
1529
protocol_info.iAddressFamily,
1530
1);
1531
if (err) {
1532
return uv_translate_sys_error(err);
1533
}
1534
1535
/* Support already active socket. */
1536
saddr_len = sizeof(saddr);
1537
if (!uv_tcp_getsockname(handle, (struct sockaddr*) &saddr, &saddr_len)) {
1538
/* Socket is already bound. */
1539
handle->flags |= UV_HANDLE_BOUND;
1540
saddr_len = sizeof(saddr);
1541
if (!uv_tcp_getpeername(handle, (struct sockaddr*) &saddr, &saddr_len)) {
1542
/* Socket is already connected. */
1543
uv__connection_init((uv_stream_t*) handle);
1544
handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1545
}
1546
}
1547
1548
return 0;
1549
}
1550
1551
1552
/* This function is an egress point, i.e. it returns libuv errors rather than
1553
* system errors.
1554
*/
1555
int uv__tcp_bind(uv_tcp_t* handle,
1556
const struct sockaddr* addr,
1557
unsigned int addrlen,
1558
unsigned int flags) {
1559
int err;
1560
1561
err = uv__tcp_try_bind(handle, addr, addrlen, flags);
1562
if (err)
1563
return uv_translate_sys_error(err);
1564
1565
return 0;
1566
}
1567
1568
1569
/* This function is an egress point, i.e. it returns libuv errors rather than
1570
* system errors.
1571
*/
1572
int uv__tcp_connect(uv_connect_t* req,
1573
uv_tcp_t* handle,
1574
const struct sockaddr* addr,
1575
unsigned int addrlen,
1576
uv_connect_cb cb) {
1577
int err;
1578
1579
err = uv__tcp_try_connect(req, handle, addr, addrlen, cb);
1580
if (err)
1581
return uv_translate_sys_error(err);
1582
1583
return 0;
1584
}
1585
1586
#ifndef WSA_FLAG_NO_HANDLE_INHERIT
1587
/* Added in Windows 7 SP1. Specify this to avoid race conditions, */
1588
/* but also manually clear the inherit flag in case this failed. */
1589
#define WSA_FLAG_NO_HANDLE_INHERIT 0x80
1590
#endif
1591
1592
int uv_socketpair(int type, int protocol, uv_os_sock_t fds[2], int flags0, int flags1) {
1593
SOCKET server = INVALID_SOCKET;
1594
SOCKET client0 = INVALID_SOCKET;
1595
SOCKET client1 = INVALID_SOCKET;
1596
SOCKADDR_IN name;
1597
LPFN_ACCEPTEX func_acceptex;
1598
WSAOVERLAPPED overlap;
1599
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
1600
int namelen;
1601
int err;
1602
DWORD bytes;
1603
DWORD flags;
1604
DWORD client0_flags = WSA_FLAG_NO_HANDLE_INHERIT;
1605
DWORD client1_flags = WSA_FLAG_NO_HANDLE_INHERIT;
1606
1607
if (flags0 & UV_NONBLOCK_PIPE)
1608
client0_flags |= WSA_FLAG_OVERLAPPED;
1609
if (flags1 & UV_NONBLOCK_PIPE)
1610
client1_flags |= WSA_FLAG_OVERLAPPED;
1611
1612
server = WSASocketW(AF_INET, type, protocol, NULL, 0,
1613
WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
1614
if (server == INVALID_SOCKET)
1615
goto wsaerror;
1616
if (!SetHandleInformation((HANDLE) server, HANDLE_FLAG_INHERIT, 0))
1617
goto error;
1618
name.sin_family = AF_INET;
1619
name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1620
name.sin_port = 0;
1621
if (bind(server, (SOCKADDR*) &name, sizeof(name)) != 0)
1622
goto wsaerror;
1623
if (listen(server, 1) != 0)
1624
goto wsaerror;
1625
namelen = sizeof(name);
1626
if (getsockname(server, (SOCKADDR*) &name, &namelen) != 0)
1627
goto wsaerror;
1628
client0 = WSASocketW(AF_INET, type, protocol, NULL, 0, client0_flags);
1629
if (client0 == INVALID_SOCKET)
1630
goto wsaerror;
1631
if (!SetHandleInformation((HANDLE) client0, HANDLE_FLAG_INHERIT, 0))
1632
goto error;
1633
if (connect(client0, (SOCKADDR*) &name, sizeof(name)) != 0)
1634
goto wsaerror;
1635
client1 = WSASocketW(AF_INET, type, protocol, NULL, 0, client1_flags);
1636
if (client1 == INVALID_SOCKET)
1637
goto wsaerror;
1638
if (!SetHandleInformation((HANDLE) client1, HANDLE_FLAG_INHERIT, 0))
1639
goto error;
1640
if (!uv__get_acceptex_function(server, &func_acceptex)) {
1641
err = WSAEAFNOSUPPORT;
1642
goto cleanup;
1643
}
1644
memset(&overlap, 0, sizeof(overlap));
1645
if (!func_acceptex(server,
1646
client1,
1647
accept_buffer,
1648
0,
1649
sizeof(struct sockaddr_storage),
1650
sizeof(struct sockaddr_storage),
1651
&bytes,
1652
&overlap)) {
1653
err = WSAGetLastError();
1654
if (err == ERROR_IO_PENDING) {
1655
/* Result should complete immediately, since we already called connect,
1656
* but empirically, we sometimes have to poll the kernel a couple times
1657
* until it notices that. */
1658
while (!WSAGetOverlappedResult(client1, &overlap, &bytes, FALSE, &flags)) {
1659
err = WSAGetLastError();
1660
if (err != WSA_IO_INCOMPLETE)
1661
goto cleanup;
1662
SwitchToThread();
1663
}
1664
}
1665
else {
1666
goto cleanup;
1667
}
1668
}
1669
if (setsockopt(client1, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
1670
(char*) &server, sizeof(server)) != 0) {
1671
goto wsaerror;
1672
}
1673
1674
closesocket(server);
1675
1676
fds[0] = client0;
1677
fds[1] = client1;
1678
1679
return 0;
1680
1681
wsaerror:
1682
err = WSAGetLastError();
1683
goto cleanup;
1684
1685
error:
1686
err = GetLastError();
1687
goto cleanup;
1688
1689
cleanup:
1690
if (server != INVALID_SOCKET)
1691
closesocket(server);
1692
if (client0 != INVALID_SOCKET)
1693
closesocket(client0);
1694
if (client1 != INVALID_SOCKET)
1695
closesocket(client1);
1696
1697
assert(err);
1698
return uv_translate_sys_error(err);
1699
}
1700
1701