#include <assert.h>
#include <io.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "handle-inl.h"
#include "internal.h"
#include "req-inl.h"
#include "stream-inl.h"
#include "uv-common.h"
#include "uv.h"
#include <aclapi.h>
#include <accctrl.h>
static char uv_zero_[] = "";
static const uv_buf_t uv_null_buf_ = { 0, NULL };
static const int64_t eof_timeout = 50;
static const int default_pending_pipe_instances = 4;
static char pipe_prefix[] = "\\\\?\\pipe";
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
typedef struct {
uv__ipc_socket_xfer_type_t xfer_type;
uv__ipc_socket_xfer_info_t xfer_info;
QUEUE member;
} uv__ipc_xfer_queue_item_t;
enum {
UV__IPC_FRAME_HAS_DATA = 0x01,
UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
UV__IPC_FRAME_XFER_FLAGS = 0x06,
UV__IPC_FRAME_VALID_FLAGS = 0x07
};
typedef struct {
uint32_t flags;
uint32_t reserved1;
uint32_t data_length;
uint32_t reserved2;
} uv__ipc_frame_header_t;
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
typedef struct {
uv_write_t req;
uv_write_t* user_req;
} uv__coalesced_write_t;
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
}
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
handle->reqs_pending = 0;
handle->handle = INVALID_HANDLE_VALUE;
handle->name = NULL;
handle->pipe.conn.ipc_remote_pid = 0;
handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
handle->pipe.conn.ipc_xfer_queue_length = 0;
handle->ipc = ipc;
handle->pipe.conn.non_overlapped_writes_tail = NULL;
return 0;
}
static void uv__pipe_connection_init(uv_pipe_t* handle) {
assert(!(handle->flags & UV_HANDLE_PIPESERVER));
uv__connection_init((uv_stream_t*) handle);
handle->read_req.data = handle;
handle->pipe.conn.eof_timer = NULL;
}
static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
HANDLE pipeHandle;
pipeHandle = CreateFileW(name,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
return pipeHandle;
}
if (GetLastError() == ERROR_ACCESS_DENIED) {
pipeHandle = CreateFileW(name,
GENERIC_READ | FILE_WRITE_ATTRIBUTES,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_READABLE;
return pipeHandle;
}
}
if (GetLastError() == ERROR_ACCESS_DENIED) {
pipeHandle = CreateFileW(name,
GENERIC_WRITE | FILE_READ_ATTRIBUTES,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
*duplex_flags = UV_HANDLE_WRITABLE;
return pipeHandle;
}
}
return INVALID_HANDLE_VALUE;
}
static void close_pipe(uv_pipe_t* pipe) {
assert(pipe->u.fd == -1 || pipe->u.fd > 2);
if (pipe->u.fd == -1)
CloseHandle(pipe->handle);
else
close(pipe->u.fd);
pipe->u.fd = -1;
pipe->handle = INVALID_HANDLE_VALUE;
}
static int uv__pipe_server(
HANDLE* pipeHandle_ptr, DWORD access,
char* name, size_t nameSize, char* random) {
HANDLE pipeHandle;
int err;
for (;;) {
uv__unique_pipe_name(random, name, nameSize);
pipeHandle = CreateNamedPipeA(name,
access | FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
break;
}
err = GetLastError();
if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
goto error;
}
random++;
}
*pipeHandle_ptr = pipeHandle;
return 0;
error:
if (pipeHandle != INVALID_HANDLE_VALUE)
CloseHandle(pipeHandle);
return err;
}
static int uv__create_pipe_pair(
HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
unsigned int server_flags, unsigned int client_flags,
int inherit_client, char* random) {
char pipe_name[64];
SECURITY_ATTRIBUTES sa;
DWORD server_access;
DWORD client_access;
HANDLE server_pipe;
HANDLE client_pipe;
int err;
server_pipe = INVALID_HANDLE_VALUE;
client_pipe = INVALID_HANDLE_VALUE;
server_access = 0;
if (server_flags & UV_READABLE_PIPE)
server_access |= PIPE_ACCESS_INBOUND;
if (server_flags & UV_WRITABLE_PIPE)
server_access |= PIPE_ACCESS_OUTBOUND;
if (server_flags & UV_NONBLOCK_PIPE)
server_access |= FILE_FLAG_OVERLAPPED;
server_access |= WRITE_DAC;
client_access = 0;
if (client_flags & UV_READABLE_PIPE)
client_access |= GENERIC_READ;
else
client_access |= FILE_READ_ATTRIBUTES;
if (client_flags & UV_WRITABLE_PIPE)
client_access |= GENERIC_WRITE;
else
client_access |= FILE_WRITE_ATTRIBUTES;
client_access |= WRITE_DAC;
err = uv__pipe_server(&server_pipe,
server_access,
pipe_name,
sizeof(pipe_name),
random);
if (err)
goto error;
sa.nLength = sizeof sa;
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = inherit_client;
client_pipe = CreateFileA(pipe_name,
client_access,
0,
&sa,
OPEN_EXISTING,
(client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
NULL);
if (client_pipe == INVALID_HANDLE_VALUE) {
err = GetLastError();
goto error;
}
#ifndef NDEBUG
{
DWORD mode;
BOOL r;
r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
if (r == TRUE) {
assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
} else {
fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
}
}
#endif
if (!ConnectNamedPipe(server_pipe, NULL)) {
if (GetLastError() != ERROR_PIPE_CONNECTED) {
err = GetLastError();
goto error;
}
}
*client_pipe_ptr = client_pipe;
*server_pipe_ptr = server_pipe;
return 0;
error:
if (server_pipe != INVALID_HANDLE_VALUE)
CloseHandle(server_pipe);
if (client_pipe != INVALID_HANDLE_VALUE)
CloseHandle(client_pipe);
return err;
}
int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
uv_file temp[2];
int err;
HANDLE readh;
HANDLE writeh;
read_flags |= UV_READABLE_PIPE;
write_flags |= UV_WRITABLE_PIPE;
err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
if (err != 0)
return err;
temp[0] = _open_osfhandle((intptr_t) readh, 0);
if (temp[0] == -1) {
if (errno == UV_EMFILE)
err = UV_EMFILE;
else
err = UV_UNKNOWN;
CloseHandle(readh);
CloseHandle(writeh);
return err;
}
temp[1] = _open_osfhandle((intptr_t) writeh, 0);
if (temp[1] == -1) {
if (errno == UV_EMFILE)
err = UV_EMFILE;
else
err = UV_UNKNOWN;
_close(temp[0]);
CloseHandle(writeh);
return err;
}
fds[0] = temp[0];
fds[1] = temp[1];
return 0;
}
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
HANDLE server_pipe;
HANDLE client_pipe;
unsigned int server_flags;
unsigned int client_flags;
int err;
uv__pipe_connection_init(parent_pipe);
server_pipe = INVALID_HANDLE_VALUE;
client_pipe = INVALID_HANDLE_VALUE;
server_flags = 0;
client_flags = 0;
if (flags & UV_READABLE_PIPE) {
server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
client_flags |= UV_READABLE_PIPE;
}
if (flags & UV_WRITABLE_PIPE) {
server_flags |= UV_READABLE_PIPE;
client_flags |= UV_WRITABLE_PIPE;
}
server_flags |= UV_NONBLOCK_PIPE;
if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
client_flags |= UV_NONBLOCK_PIPE;
}
err = uv__create_pipe_pair(&server_pipe, &client_pipe,
server_flags, client_flags, 1, (char*) server_pipe);
if (err)
goto error;
if (CreateIoCompletionPort(server_pipe,
loop->iocp,
(ULONG_PTR) parent_pipe,
0) == NULL) {
err = GetLastError();
goto error;
}
parent_pipe->handle = server_pipe;
*child_pipe_ptr = client_pipe;
if (flags & UV_READABLE_PIPE)
parent_pipe->flags |= UV_HANDLE_WRITABLE;
if (flags & UV_WRITABLE_PIPE)
parent_pipe->flags |= UV_HANDLE_READABLE;
return 0;
error:
if (server_pipe != INVALID_HANDLE_VALUE)
CloseHandle(server_pipe);
if (client_pipe != INVALID_HANDLE_VALUE)
CloseHandle(client_pipe);
return err;
}
static int uv__set_pipe_handle(uv_loop_t* loop,
uv_pipe_t* handle,
HANDLE pipeHandle,
int fd,
DWORD duplex_flags) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_MODE_INFORMATION mode_info;
DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
DWORD current_mode = 0;
DWORD err = 0;
assert(handle->flags & UV_HANDLE_CONNECTION);
assert(!(handle->flags & UV_HANDLE_PIPESERVER));
if (handle->flags & UV_HANDLE_CLOSING)
return UV_EINVAL;
if (handle->handle != INVALID_HANDLE_VALUE)
return UV_EBUSY;
if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
err = GetLastError();
if (err == ERROR_ACCESS_DENIED) {
if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL,
NULL, NULL, 0)) {
return uv_translate_sys_error(GetLastError());
} else if (current_mode & PIPE_NOWAIT) {
return UV_EACCES;
}
} else {
if (err == ERROR_INVALID_PARAMETER) {
return UV_ENOTSOCK;
}
return uv_translate_sys_error(err);
}
}
nt_status = pNtQueryInformationFile(pipeHandle,
&io_status,
&mode_info,
sizeof(mode_info),
FileModeInformation);
if (nt_status != STATUS_SUCCESS) {
return uv_translate_sys_error(err);
}
if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
handle->pipe.conn.readfile_thread_handle = NULL;
InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
} else {
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR) handle,
0) == NULL) {
handle->flags |= UV_HANDLE_EMULATE_IOCP;
}
}
handle->handle = pipeHandle;
handle->u.fd = fd;
handle->flags |= duplex_flags;
return 0;
}
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
uv_pipe_accept_t* req, BOOL firstInstance) {
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
req->pipeHandle =
CreateNamedPipeW(handle->name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
(firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
if (req->pipeHandle == INVALID_HANDLE_VALUE) {
return 0;
}
if (CreateIoCompletionPort(req->pipeHandle,
loop->iocp,
(ULONG_PTR) handle,
0) == NULL) {
uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
}
handle->handle = req->pipeHandle;
return 1;
}
static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
uv_loop_t* loop;
uv_pipe_t* handle;
uv_shutdown_t* req;
req = (uv_shutdown_t*) parameter;
assert(req);
handle = (uv_pipe_t*) req->handle;
assert(handle);
loop = handle->loop;
assert(loop);
FlushFileBuffers(handle->handle);
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
DWORD result;
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
assert(handle->flags & UV_HANDLE_CONNECTION);
assert(req != NULL);
assert(handle->stream.conn.write_reqs_pending == 0);
SET_REQ_SUCCESS(req);
if (handle->flags & UV_HANDLE_CLOSING) {
uv__insert_pending_req(loop, (uv_req_t*) req);
return;
}
nt_status = pNtQueryInformationFile(handle->handle,
&io_status,
&pipe_info,
sizeof pipe_info,
FilePipeLocalInformation);
if (nt_status != STATUS_SUCCESS) {
SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
handle->flags |= UV_HANDLE_WRITABLE;
uv__insert_pending_req(loop, (uv_req_t*) req);
return;
}
if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
uv__insert_pending_req(loop, (uv_req_t*) req);
return;
}
result = QueueUserWorkItem(pipe_shutdown_thread_proc,
req,
WT_EXECUTELONGFUNCTION);
if (!result) {
SET_REQ_ERROR(req, GetLastError());
handle->flags |= UV_HANDLE_WRITABLE;
uv__insert_pending_req(loop, (uv_req_t*) req);
return;
}
}
void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
uv__ipc_xfer_queue_item_t* xfer_queue_item;
assert(handle->reqs_pending == 0);
assert(handle->flags & UV_HANDLE_CLOSING);
assert(!(handle->flags & UV_HANDLE_CLOSED));
if (handle->flags & UV_HANDLE_CONNECTION) {
while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
QUEUE* q;
SOCKET socket;
q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
QUEUE_REMOVE(q);
xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
socket = WSASocketW(FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
FROM_PROTOCOL_INFO,
&xfer_queue_item->xfer_info.socket_info,
0,
WSA_FLAG_OVERLAPPED);
uv__free(xfer_queue_item);
if (socket != INVALID_SOCKET)
closesocket(socket);
}
handle->pipe.conn.ipc_xfer_queue_length = 0;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(handle->read_req.wait_handle);
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
if (handle->read_req.event_handle != NULL) {
CloseHandle(handle->read_req.event_handle);
handle->read_req.event_handle = NULL;
}
}
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
if (handle->flags & UV_HANDLE_PIPESERVER) {
assert(handle->pipe.serv.accept_reqs);
uv__free(handle->pipe.serv.accept_reqs);
handle->pipe.serv.accept_reqs = NULL;
}
uv__handle_close(handle);
}
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
if (handle->flags & UV_HANDLE_BOUND)
return;
handle->pipe.serv.pending_instances = count;
handle->flags |= UV_HANDLE_PIPESERVER;
}
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
uv_loop_t* loop = handle->loop;
int i, err, nameSize;
uv_pipe_accept_t* req;
if (handle->flags & UV_HANDLE_BOUND) {
return UV_EINVAL;
}
if (!name) {
return UV_EINVAL;
}
if (uv__is_closing(handle)) {
return UV_EINVAL;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
handle->pipe.serv.pending_instances = default_pending_pipe_instances;
}
handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
if (!handle->pipe.serv.accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
}
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
req = &handle->pipe.serv.accept_reqs[i];
UV_REQ_INIT(req, UV_ACCEPT);
req->data = handle;
req->pipeHandle = INVALID_HANDLE_VALUE;
req->next_pending = NULL;
}
nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
handle->name = uv__malloc(nameSize);
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
}
if (!MultiByteToWideChar(CP_UTF8,
0,
name,
-1,
handle->name,
nameSize / sizeof(WCHAR))) {
err = GetLastError();
goto error;
}
if (!pipe_alloc_accept(loop,
handle,
&handle->pipe.serv.accept_reqs[0],
TRUE)) {
err = GetLastError();
if (err == ERROR_ACCESS_DENIED) {
err = WSAEADDRINUSE;
} else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
err = WSAEACCES;
}
goto error;
}
handle->pipe.serv.pending_accepts = NULL;
handle->flags |= UV_HANDLE_PIPESERVER;
handle->flags |= UV_HANDLE_BOUND;
return 0;
error:
if (handle->name) {
uv__free(handle->name);
handle->name = NULL;
}
return uv_translate_sys_error(err);
}
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
uv_loop_t* loop;
uv_pipe_t* handle;
uv_connect_t* req;
HANDLE pipeHandle = INVALID_HANDLE_VALUE;
DWORD duplex_flags;
req = (uv_connect_t*) parameter;
assert(req);
handle = (uv_pipe_t*) req->handle;
assert(handle);
loop = handle->loop;
assert(loop);
while (WaitNamedPipeW(handle->name, 30000)) {
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
if (pipeHandle != INVALID_HANDLE_VALUE)
break;
SwitchToThread();
}
if (pipeHandle != INVALID_HANDLE_VALUE) {
SET_REQ_SUCCESS(req);
req->u.connect.pipeHandle = pipeHandle;
req->u.connect.duplex_flags = duplex_flags;
} else {
SET_REQ_ERROR(req, GetLastError());
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
const char* name, uv_connect_cb cb) {
uv_loop_t* loop = handle->loop;
int err, nameSize;
HANDLE pipeHandle = INVALID_HANDLE_VALUE;
DWORD duplex_flags;
UV_REQ_INIT(req, UV_CONNECT);
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
req->u.connect.duplex_flags = 0;
if (handle->flags & UV_HANDLE_PIPESERVER) {
err = ERROR_INVALID_PARAMETER;
goto error;
}
if (handle->flags & UV_HANDLE_CONNECTION) {
err = ERROR_PIPE_BUSY;
goto error;
}
uv__pipe_connection_init(handle);
nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
handle->name = uv__malloc(nameSize);
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
}
if (!MultiByteToWideChar(CP_UTF8,
0,
name,
-1,
handle->name,
nameSize / sizeof(WCHAR))) {
err = GetLastError();
goto error;
}
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
if (pipeHandle == INVALID_HANDLE_VALUE) {
if (GetLastError() == ERROR_PIPE_BUSY) {
if (!QueueUserWorkItem(&pipe_connect_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
err = GetLastError();
goto error;
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
return;
}
err = GetLastError();
goto error;
}
req->u.connect.pipeHandle = pipeHandle;
req->u.connect.duplex_flags = duplex_flags;
SET_REQ_SUCCESS(req);
uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
return;
error:
if (handle->name) {
uv__free(handle->name);
handle->name = NULL;
}
if (pipeHandle != INVALID_HANDLE_VALUE)
CloseHandle(pipeHandle);
SET_REQ_ERROR(req, err);
uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
return;
}
void uv__pipe_interrupt_read(uv_pipe_t* handle) {
BOOL r;
if (!(handle->flags & UV_HANDLE_READ_PENDING))
return;
if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
return;
if (handle->handle == INVALID_HANDLE_VALUE)
return;
if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
assert(r || GetLastError() == ERROR_NOT_FOUND);
(void) r;
} else {
HANDLE thread;
volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
thread = *thread_ptr;
if (thread == NULL) {
*thread_ptr = INVALID_HANDLE_VALUE;
} else {
while (thread != INVALID_HANDLE_VALUE) {
r = CancelSynchronousIo(thread);
assert(r || GetLastError() == ERROR_NOT_FOUND);
SwitchToThread();
thread = *thread_ptr;
}
}
LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
}
handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
}
void uv__pipe_read_stop(uv_pipe_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(handle->loop, handle);
uv__pipe_interrupt_read(handle);
}
void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
int i;
HANDLE pipeHandle;
if (handle->flags & UV_HANDLE_READING) {
handle->flags &= ~UV_HANDLE_READING;
DECREASE_ACTIVE_COUNT(loop, handle);
}
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
DECREASE_ACTIVE_COUNT(loop, handle);
}
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
uv__handle_closing(handle);
uv__pipe_interrupt_read(handle);
if (handle->name) {
uv__free(handle->name);
handle->name = NULL;
}
if (handle->flags & UV_HANDLE_PIPESERVER) {
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
}
}
handle->handle = INVALID_HANDLE_VALUE;
}
if (handle->flags & UV_HANDLE_CONNECTION) {
eof_timer_destroy(handle);
}
if ((handle->flags & UV_HANDLE_CONNECTION)
&& handle->handle != INVALID_HANDLE_VALUE) {
close_pipe(handle);
}
if (handle->reqs_pending == 0)
uv__want_endgame(loop, (uv_handle_t*) handle);
}
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
uv_pipe_accept_t* req, BOOL firstInstance) {
assert(handle->flags & UV_HANDLE_LISTENING);
if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
SET_REQ_ERROR(req, GetLastError());
uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
GetLastError() != ERROR_IO_PENDING) {
if (GetLastError() == ERROR_PIPE_CONNECTED) {
SET_REQ_SUCCESS(req);
} else {
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
SET_REQ_ERROR(req, GetLastError());
}
uv__insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
return;
}
handle->reqs_pending++;
}
int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
uv_loop_t* loop = server->loop;
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
QUEUE* q;
uv__ipc_xfer_queue_item_t* item;
int err;
if (server->ipc) {
if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
return WSAEWOULDBLOCK;
}
q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
QUEUE_REMOVE(q);
server->pipe.conn.ipc_xfer_queue_length--;
item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
err = uv__tcp_xfer_import(
(uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
if (err != 0)
return err;
uv__free(item);
} else {
pipe_client = (uv_pipe_t*) client;
uv__pipe_connection_init(pipe_client);
req = server->pipe.serv.pending_accepts;
if (!req) {
return WSAEWOULDBLOCK;
}
pipe_client->handle = req->pipeHandle;
pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
server->pipe.serv.pending_accepts = req->next_pending;
req->next_pending = NULL;
req->pipeHandle = INVALID_HANDLE_VALUE;
server->handle = INVALID_HANDLE_VALUE;
if (!(server->flags & UV_HANDLE_CLOSING)) {
uv__pipe_queue_accept(loop, server, req, FALSE);
}
}
return 0;
}
int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
uv_loop_t* loop = handle->loop;
int i;
if (handle->flags & UV_HANDLE_LISTENING) {
handle->stream.serv.connection_cb = cb;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
return WSAEINVAL;
}
if (handle->flags & UV_HANDLE_READING) {
return WSAEISCONN;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
return ERROR_NOT_SUPPORTED;
}
if (handle->ipc) {
return WSAEINVAL;
}
handle->flags |= UV_HANDLE_LISTENING;
INCREASE_ACTIVE_COUNT(loop, handle);
handle->stream.serv.connection_cb = cb;
assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
}
return 0;
}
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
uv_read_t* req = (uv_read_t*) arg;
uv_pipe_t* handle = (uv_pipe_t*) req->data;
uv_loop_t* loop = handle->loop;
volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
HANDLE thread;
DWORD bytes;
DWORD err;
assert(req->type == UV_READ);
assert(handle->type == UV_NAMED_PIPE);
err = 0;
if (!DuplicateHandle(GetCurrentProcess(),
GetCurrentThread(),
GetCurrentProcess(),
&thread,
0,
FALSE,
DUPLICATE_SAME_ACCESS)) {
err = GetLastError();
goto out1;
}
EnterCriticalSection(lock);
if (*thread_ptr == INVALID_HANDLE_VALUE) {
err = ERROR_OPERATION_ABORTED;
} else {
assert(*thread_ptr == NULL);
*thread_ptr = thread;
}
LeaveCriticalSection(lock);
if (err)
goto out2;
if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
err = GetLastError();
assert(thread == *thread_ptr);
*thread_ptr = INVALID_HANDLE_VALUE;
EnterCriticalSection(lock);
LeaveCriticalSection(lock);
out2:
CloseHandle(thread);
out1:
if (err)
SET_REQ_ERROR(req, err);
else
SET_REQ_SUCCESS(req);
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
int result;
DWORD bytes;
uv_write_t* req = (uv_write_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->handle;
uv_loop_t* loop = handle->loop;
assert(req != NULL);
assert(req->type == UV_WRITE);
assert(handle->type == UV_NAMED_PIPE);
result = WriteFile(handle->handle,
req->write_buffer.base,
req->write_buffer.len,
&bytes,
NULL);
if (!result) {
SET_REQ_ERROR(req, GetLastError());
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
uv_read_t* req;
uv_tcp_t* handle;
req = (uv_read_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->data;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->u.io.overlapped.InternalHigh,
0,
&req->u.io.overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
uv_write_t* req;
uv_tcp_t* handle;
req = (uv_write_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->handle;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->u.io.overlapped.InternalHigh,
0,
&req->u.io.overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
uv_read_t* req;
int result;
assert(handle->flags & UV_HANDLE_READING);
assert(!(handle->flags & UV_HANDLE_READ_PENDING));
assert(handle->handle != INVALID_HANDLE_VALUE);
req = &handle->read_req;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
handle->pipe.conn.readfile_thread_handle = NULL;
if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
} else {
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
assert(req->event_handle != NULL);
req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
}
result = ReadFile(handle->handle,
&uv_zero_,
0,
NULL,
&req->u.io.overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle == INVALID_HANDLE_VALUE) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion_read_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
}
}
}
eof_timer_start(handle);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
return;
error:
uv__insert_pending_req(loop, (uv_req_t*)req);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
}
int uv__pipe_read_start(uv_pipe_t* handle,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
uv_loop_t* loop = handle->loop;
handle->flags |= UV_HANDLE_READING;
INCREASE_ACTIVE_COUNT(loop, handle);
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
handle->read_req.event_handle == NULL) {
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
if (handle->read_req.event_handle == NULL) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
uv__pipe_queue_read(loop, handle);
}
return 0;
}
static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
uv_write_t* req) {
req->next_req = NULL;
if (handle->pipe.conn.non_overlapped_writes_tail) {
req->next_req =
handle->pipe.conn.non_overlapped_writes_tail->next_req;
handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
handle->pipe.conn.non_overlapped_writes_tail = req;
} else {
req->next_req = (uv_req_t*)req;
handle->pipe.conn.non_overlapped_writes_tail = req;
}
}
static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
uv_write_t* req;
if (handle->pipe.conn.non_overlapped_writes_tail) {
req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
if (req == handle->pipe.conn.non_overlapped_writes_tail) {
handle->pipe.conn.non_overlapped_writes_tail = NULL;
} else {
handle->pipe.conn.non_overlapped_writes_tail->next_req =
req->next_req;
}
return req;
} else {
return NULL;
}
}
static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
if (req) {
if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
uv_fatal_error(GetLastError(), "QueueUserWorkItem");
}
}
}
static int uv__build_coalesced_write_req(uv_write_t* user_req,
const uv_buf_t bufs[],
size_t nbufs,
uv_write_t** req_out,
uv_buf_t* write_buf_out) {
char* heap_buffer;
size_t heap_buffer_length, heap_buffer_offset;
uv__coalesced_write_t* coalesced_write_req;
char* data_start;
size_t data_length;
unsigned int i;
data_length = 0;
for (i = 0; i < nbufs; i++)
data_length += bufs[i].len;
if (data_length > UINT32_MAX)
return WSAENOBUFS;
heap_buffer_length = sizeof *coalesced_write_req +
data_length;
heap_buffer = uv__malloc(heap_buffer_length);
if (heap_buffer == NULL)
return ERROR_NOT_ENOUGH_MEMORY;
coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
coalesced_write_req->req = *user_req;
coalesced_write_req->req.coalesced = 1;
coalesced_write_req->user_req = user_req;
heap_buffer_offset = sizeof *coalesced_write_req;
data_start = &heap_buffer[heap_buffer_offset];
for (i = 0; i < nbufs; i++) {
memcpy(&heap_buffer[heap_buffer_offset],
bufs[i].base,
bufs[i].len);
heap_buffer_offset += bufs[i].len;
}
assert(heap_buffer_offset == heap_buffer_length);
*req_out = &coalesced_write_req->req;
*write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
return 0;
}
static int uv__pipe_write_data(uv_loop_t* loop,
uv_write_t* req,
uv_pipe_t* handle,
const uv_buf_t bufs[],
size_t nbufs,
uv_write_cb cb,
int copy_always) {
int err;
int result;
uv_buf_t write_buf;
assert(handle->handle != INVALID_HANDLE_VALUE);
UV_REQ_INIT(req, UV_WRITE);
req->handle = (uv_stream_t*) handle;
req->send_handle = NULL;
req->cb = cb;
req->coalesced = 0;
req->event_handle = NULL;
req->wait_handle = INVALID_HANDLE_VALUE;
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (req->event_handle == NULL) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
}
req->write_buffer = uv_null_buf_;
if (nbufs == 0) {
write_buf = uv_null_buf_;
} else if (nbufs == 1 && !copy_always) {
write_buf = bufs[0];
} else {
err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
if (err != 0)
return err;
}
if ((handle->flags &
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
DWORD bytes;
result =
WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
if (!result) {
err = GetLastError();
return err;
} else {
req->u.io.queued_bytes = 0;
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = write_buf;
uv__insert_non_overlapped_write_req(handle, req);
if (handle->stream.conn.write_reqs_pending == 0) {
uv__queue_non_overlapped_write(handle);
}
req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
result = WriteFile(handle->handle,
write_buf.base,
write_buf.len,
NULL,
&req->u.io.overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
err = GetLastError();
CloseHandle(req->event_handle);
req->event_handle = NULL;
return err;
}
if (result) {
req->u.io.queued_bytes = 0;
} else {
req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
if (WaitForSingleObject(req->event_handle, INFINITE) !=
WAIT_OBJECT_0) {
err = GetLastError();
CloseHandle(req->event_handle);
req->event_handle = NULL;
return err;
}
}
CloseHandle(req->event_handle);
req->event_handle = NULL;
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
return 0;
} else {
result = WriteFile(handle->handle,
write_buf.base,
write_buf.len,
NULL,
&req->u.io.overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
return GetLastError();
}
if (result) {
req->u.io.queued_bytes = 0;
} else {
req->u.io.queued_bytes = write_buf.len;
handle->write_queue_size += req->u.io.queued_bytes;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion_write_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
return GetLastError();
}
}
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->stream.conn.write_reqs_pending++;
return 0;
}
static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
if (*pid == 0)
*pid = GetCurrentProcessId();
return *pid;
}
int uv__pipe_write_ipc(uv_loop_t* loop,
uv_write_t* req,
uv_pipe_t* handle,
const uv_buf_t data_bufs[],
size_t data_buf_count,
uv_stream_t* send_handle,
uv_write_cb cb) {
uv_buf_t stack_bufs[6];
uv_buf_t* bufs;
size_t buf_count, buf_index;
uv__ipc_frame_header_t frame_header;
uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
uv__ipc_socket_xfer_info_t xfer_info;
uint64_t data_length;
size_t i;
int err;
data_length = 0;
for (i = 0; i < data_buf_count; i++)
data_length += data_bufs[i].len;
if (data_length > UINT32_MAX)
return WSAENOBUFS;
if (send_handle != NULL) {
uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
if (send_tcp_handle->type != UV_TCP)
return ERROR_NOT_SUPPORTED;
err = uv__tcp_xfer_export(send_tcp_handle,
uv__pipe_get_ipc_remote_pid(handle),
&xfer_type,
&xfer_info);
if (err != 0)
return err;
}
buf_count = 1 + data_buf_count;
if (send_handle != NULL)
buf_count += 1;
if (buf_count < ARRAY_SIZE(stack_bufs)) {
bufs = stack_bufs;
} else {
bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
if (bufs == NULL)
return ERROR_NOT_ENOUGH_MEMORY;
}
buf_index = 0;
memset(&frame_header, 0, sizeof frame_header);
bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
if (send_handle != NULL) {
switch (xfer_type) {
case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
break;
case UV__IPC_SOCKET_XFER_TCP_SERVER:
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
break;
default:
assert(0);
}
bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
}
if (data_length > 0) {
frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
frame_header.data_length = (uint32_t) data_length;
for (i = 0; i < data_buf_count; i++)
bufs[buf_index++] = data_bufs[i];
}
err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
if (bufs != stack_bufs) {
uv__free(bufs);
}
return err;
}
int uv__pipe_write(uv_loop_t* loop,
uv_write_t* req,
uv_pipe_t* handle,
const uv_buf_t bufs[],
size_t nbufs,
uv_stream_t* send_handle,
uv_write_cb cb) {
if (handle->ipc) {
return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
} else {
assert(send_handle == NULL);
return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
}
}
static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
uv_buf_t buf) {
eof_timer_destroy(handle);
uv_read_stop((uv_stream_t*) handle);
handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
}
static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
uv_buf_t buf) {
eof_timer_destroy(handle);
uv_read_stop((uv_stream_t*) handle);
handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
}
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
int error, uv_buf_t buf) {
if (error == ERROR_BROKEN_PIPE) {
uv__pipe_read_eof(loop, handle, buf);
} else {
uv__pipe_read_error(loop, handle, error, buf);
}
}
static void uv__pipe_queue_ipc_xfer_info(
uv_pipe_t* handle,
uv__ipc_socket_xfer_type_t xfer_type,
uv__ipc_socket_xfer_info_t* xfer_info) {
uv__ipc_xfer_queue_item_t* item;
item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
if (item == NULL)
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
item->xfer_type = xfer_type;
item->xfer_info = *xfer_info;
QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
handle->pipe.conn.ipc_xfer_queue_length++;
}
static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
DWORD bytes_read, bytes_read_now;
bytes_read = 0;
while (bytes_read < count) {
if (!ReadFile(h,
(char*) buffer + bytes_read,
count - bytes_read,
&bytes_read_now,
NULL)) {
return GetLastError();
}
bytes_read += bytes_read_now;
}
assert(bytes_read == count);
return 0;
}
static DWORD uv__pipe_read_data(uv_loop_t* loop,
uv_pipe_t* handle,
DWORD suggested_bytes,
DWORD max_bytes) {
DWORD bytes_read;
uv_buf_t buf;
buf = uv_buf_init(NULL, 0);
handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
return 0;
}
if (max_bytes > buf.len)
max_bytes = buf.len;
if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
return 0;
}
handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
return bytes_read;
}
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
int err;
if (*data_remaining > 0) {
DWORD bytes_read =
uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
*data_remaining -= bytes_read;
return bytes_read;
} else {
uv__ipc_frame_header_t frame_header;
uint32_t xfer_flags;
uv__ipc_socket_xfer_type_t xfer_type;
uv__ipc_socket_xfer_info_t xfer_info;
err = uv__pipe_read_exactly(
handle->handle, &frame_header, sizeof frame_header);
if (err)
goto error;
if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
goto invalid;
if (frame_header.reserved2 != 0)
goto invalid;
xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
? UV__IPC_SOCKET_XFER_TCP_CONNECTION
: UV__IPC_SOCKET_XFER_TCP_SERVER;
} else if (xfer_flags == 0) {
xfer_type = UV__IPC_SOCKET_XFER_NONE;
} else {
goto invalid;
}
if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
*data_remaining = frame_header.data_length;
} else if (frame_header.data_length != 0) {
goto invalid;
}
if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
return sizeof frame_header;
err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
if (err)
goto error;
uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
return sizeof frame_header + sizeof xfer_info;
}
invalid:
err = WSAECONNABORTED;
error:
uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
return 0;
}
void uv__process_pipe_read_req(uv_loop_t* loop,
uv_pipe_t* handle,
uv_req_t* req) {
assert(handle->type == UV_NAMED_PIPE);
handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
DECREASE_PENDING_REQ_COUNT(handle);
eof_timer_stop(handle);
if (!(handle->flags & UV_HANDLE_READING))
return;
if (!REQ_SUCCESS(req)) {
DWORD err = GET_REQ_ERROR(req);
if (err != ERROR_OPERATION_ABORTED)
uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
} else {
DWORD avail;
avail = 0;
if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
while (avail > 0 && handle->flags & UV_HANDLE_READING) {
DWORD bytes_read =
handle->ipc ? uv__pipe_read_ipc(loop, handle)
: uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
if (bytes_read == 0)
break;
if (bytes_read > avail)
break;
avail -= bytes_read;
}
}
if ((handle->flags & UV_HANDLE_READING) &&
!(handle->flags & UV_HANDLE_READ_PENDING)) {
uv__pipe_queue_read(loop, handle);
}
}
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_write_t* req) {
int err;
assert(handle->type == UV_NAMED_PIPE);
assert(handle->write_queue_size >= req->u.io.queued_bytes);
handle->write_queue_size -= req->u.io.queued_bytes;
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
err = GET_REQ_ERROR(req);
if (req->coalesced) {
uv__coalesced_write_t* coalesced_write =
container_of(req, uv__coalesced_write_t, req);
req = coalesced_write->user_req;
uv__free(coalesced_write);
}
if (req->cb) {
req->cb(req, uv_translate_sys_error(err));
}
handle->stream.conn.write_reqs_pending--;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
handle->pipe.conn.non_overlapped_writes_tail) {
assert(handle->stream.conn.write_reqs_pending > 0);
uv__queue_non_overlapped_write(handle);
}
if (handle->stream.conn.write_reqs_pending == 0)
if (handle->flags & UV_HANDLE_SHUTTING)
uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* raw_req) {
uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
assert(handle->type == UV_NAMED_PIPE);
if (handle->flags & UV_HANDLE_CLOSING) {
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
if (REQ_SUCCESS(req)) {
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
req->next_pending = handle->pipe.serv.pending_accepts;
handle->pipe.serv.pending_accepts = req;
if (handle->stream.serv.connection_cb) {
handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
}
} else {
if (req->pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
}
if (!(handle->flags & UV_HANDLE_CLOSING)) {
uv__pipe_queue_accept(loop, handle, req, FALSE);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_connect_t* req) {
HANDLE pipeHandle;
DWORD duplex_flags;
int err;
assert(handle->type == UV_NAMED_PIPE);
UNREGISTER_HANDLE_REQ(loop, handle, req);
err = 0;
if (REQ_SUCCESS(req)) {
pipeHandle = req->u.connect.pipeHandle;
duplex_flags = req->u.connect.duplex_flags;
err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
if (err)
CloseHandle(pipeHandle);
} else {
err = uv_translate_sys_error(GET_REQ_ERROR(req));
}
if (req->cb)
req->cb(req, err);
DECREASE_PENDING_REQ_COUNT(handle);
}
void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_shutdown_t* req) {
int err;
assert(handle->type == UV_NAMED_PIPE);
handle->stream.conn.shutdown_req = NULL;
handle->flags &= ~UV_HANDLE_SHUTTING;
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (handle->flags & UV_HANDLE_CLOSING) {
err = UV_ECANCELED;
} else if (!REQ_SUCCESS(req)) {
err = uv_translate_sys_error(GET_REQ_ERROR(req));
} else {
if (handle->flags & UV_HANDLE_READABLE) {
eof_timer_init(handle);
if (handle->flags & UV_HANDLE_READ_PENDING) {
eof_timer_start(handle);
}
} else {
close_pipe(handle);
}
err = 0;
}
if (req->cb)
req->cb(req, err);
DECREASE_PENDING_REQ_COUNT(handle);
}
static void eof_timer_init(uv_pipe_t* pipe) {
int r;
assert(pipe->pipe.conn.eof_timer == NULL);
assert(pipe->flags & UV_HANDLE_CONNECTION);
pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
assert(r == 0);
(void) r;
pipe->pipe.conn.eof_timer->data = pipe;
uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
}
static void eof_timer_start(uv_pipe_t* pipe) {
assert(pipe->flags & UV_HANDLE_CONNECTION);
if (pipe->pipe.conn.eof_timer != NULL) {
uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
}
}
static void eof_timer_stop(uv_pipe_t* pipe) {
assert(pipe->flags & UV_HANDLE_CONNECTION);
if (pipe->pipe.conn.eof_timer != NULL) {
uv_timer_stop(pipe->pipe.conn.eof_timer);
}
}
static void eof_timer_cb(uv_timer_t* timer) {
uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
uv_loop_t* loop = timer->loop;
assert(pipe->type == UV_NAMED_PIPE);
assert(pipe->flags & UV_HANDLE_READ_PENDING);
if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
return;
}
close_pipe(pipe);
uv_read_stop((uv_stream_t*) pipe);
uv__pipe_read_eof(loop, pipe, uv_null_buf_);
}
static void eof_timer_destroy(uv_pipe_t* pipe) {
assert(pipe->flags & UV_HANDLE_CONNECTION);
if (pipe->pipe.conn.eof_timer) {
uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
pipe->pipe.conn.eof_timer = NULL;
}
}
static void eof_timer_close_cb(uv_handle_t* handle) {
assert(handle->type == UV_TIMER);
uv__free(handle);
}
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
HANDLE os_handle = uv__get_osfhandle(file);
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_ACCESS_INFORMATION access;
DWORD duplex_flags = 0;
int err;
if (os_handle == INVALID_HANDLE_VALUE)
return UV_EBADF;
if (pipe->flags & UV_HANDLE_PIPESERVER)
return UV_EINVAL;
if (pipe->flags & UV_HANDLE_CONNECTION)
return UV_EBUSY;
uv__pipe_connection_init(pipe);
uv__once_init();
if (file <= 2) {
if (!DuplicateHandle(INVALID_HANDLE_VALUE,
os_handle,
INVALID_HANDLE_VALUE,
&os_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS))
return uv_translate_sys_error(GetLastError());
assert(os_handle != INVALID_HANDLE_VALUE);
file = -1;
}
nt_status = pNtQueryInformationFile(os_handle,
&io_status,
&access,
sizeof(access),
FileAccessInformation);
if (nt_status != STATUS_SUCCESS)
return UV_EINVAL;
if (pipe->ipc) {
if (!(access.AccessFlags & FILE_WRITE_DATA) ||
!(access.AccessFlags & FILE_READ_DATA)) {
return UV_EINVAL;
}
}
if (access.AccessFlags & FILE_WRITE_DATA)
duplex_flags |= UV_HANDLE_WRITABLE;
if (access.AccessFlags & FILE_READ_DATA)
duplex_flags |= UV_HANDLE_READABLE;
err = uv__set_pipe_handle(pipe->loop,
pipe,
os_handle,
file,
duplex_flags);
if (err) {
if (file == -1)
CloseHandle(os_handle);
return err;
}
if (pipe->ipc) {
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
}
return 0;
}
static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_NAME_INFORMATION tmp_name_info;
FILE_NAME_INFORMATION* name_info;
WCHAR* name_buf;
unsigned int addrlen;
unsigned int name_size;
unsigned int name_len;
int err;
uv__once_init();
name_info = NULL;
if (handle->name != NULL) {
name_buf = handle->name;
name_len = wcslen(name_buf);
addrlen = WideCharToMultiByte(CP_UTF8,
0,
name_buf,
name_len,
NULL,
0,
NULL,
NULL);
if (!addrlen) {
*size = 0;
err = uv_translate_sys_error(GetLastError());
return err;
} else if (addrlen >= *size) {
*size = addrlen + 1;
err = UV_ENOBUFS;
goto error;
}
addrlen = WideCharToMultiByte(CP_UTF8,
0,
name_buf,
name_len,
buffer,
addrlen,
NULL,
NULL);
if (!addrlen) {
*size = 0;
err = uv_translate_sys_error(GetLastError());
return err;
}
*size = addrlen;
buffer[addrlen] = '\0';
return 0;
}
if (handle->handle == INVALID_HANDLE_VALUE) {
*size = 0;
return UV_EINVAL;
}
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
uv__pipe_interrupt_read((uv_pipe_t*) handle);
}
nt_status = pNtQueryInformationFile(handle->handle,
&io_status,
&tmp_name_info,
sizeof tmp_name_info,
FileNameInformation);
if (nt_status == STATUS_BUFFER_OVERFLOW) {
name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
name_info = uv__malloc(name_size);
if (!name_info) {
*size = 0;
err = UV_ENOMEM;
goto cleanup;
}
nt_status = pNtQueryInformationFile(handle->handle,
&io_status,
name_info,
name_size,
FileNameInformation);
}
if (nt_status != STATUS_SUCCESS) {
*size = 0;
err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
goto error;
}
if (!name_info) {
name_buf = tmp_name_info.FileName;
name_len = tmp_name_info.FileNameLength;
} else {
name_buf = name_info->FileName;
name_len = name_info->FileNameLength;
}
if (name_len == 0) {
*size = 0;
err = 0;
goto error;
}
name_len /= sizeof(WCHAR);
addrlen = WideCharToMultiByte(CP_UTF8,
0,
name_buf,
name_len,
NULL,
0,
NULL,
NULL);
if (!addrlen) {
*size = 0;
err = uv_translate_sys_error(GetLastError());
goto error;
} else if (pipe_prefix_len + addrlen >= *size) {
*size = pipe_prefix_len + addrlen + 1;
err = UV_ENOBUFS;
goto error;
}
memcpy(buffer, pipe_prefix, pipe_prefix_len);
addrlen = WideCharToMultiByte(CP_UTF8,
0,
name_buf,
name_len,
buffer+pipe_prefix_len,
*size-pipe_prefix_len,
NULL,
NULL);
if (!addrlen) {
*size = 0;
err = uv_translate_sys_error(GetLastError());
goto error;
}
addrlen += pipe_prefix_len;
*size = addrlen;
buffer[addrlen] = '\0';
err = 0;
error:
uv__free(name_info);
cleanup:
return err;
}
int uv_pipe_pending_count(uv_pipe_t* handle) {
if (!handle->ipc)
return 0;
return handle->pipe.conn.ipc_xfer_queue_length;
}
int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
if (handle->flags & UV_HANDLE_BOUND)
return uv__pipe_getname(handle, buffer, size);
if (handle->flags & UV_HANDLE_CONNECTION ||
handle->handle != INVALID_HANDLE_VALUE) {
*size = 0;
return 0;
}
return UV_EBADF;
}
int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
if (handle->flags & UV_HANDLE_BOUND)
return UV_ENOTCONN;
if (handle->handle != INVALID_HANDLE_VALUE)
return uv__pipe_getname(handle, buffer, size);
if (handle->flags & UV_HANDLE_CONNECTION) {
if (handle->name != NULL)
return uv__pipe_getname(handle, buffer, size);
}
return UV_EBADF;
}
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
if (!handle->ipc)
return UV_UNKNOWN_HANDLE;
if (handle->pipe.conn.ipc_xfer_queue_length == 0)
return UV_UNKNOWN_HANDLE;
else
return UV_TCP;
}
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
PACL old_dacl, new_dacl;
PSECURITY_DESCRIPTOR sd;
EXPLICIT_ACCESS ea;
PSID everyone;
int error;
if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
return UV_EBADF;
if (mode != UV_READABLE &&
mode != UV_WRITABLE &&
mode != (UV_WRITABLE | UV_READABLE))
return UV_EINVAL;
if (!AllocateAndInitializeSid(&sid_world,
1,
SECURITY_WORLD_RID,
0, 0, 0, 0, 0, 0, 0,
&everyone)) {
error = GetLastError();
goto done;
}
if (GetSecurityInfo(handle->handle,
SE_KERNEL_OBJECT,
DACL_SECURITY_INFORMATION,
NULL,
NULL,
&old_dacl,
NULL,
&sd)) {
error = GetLastError();
goto clean_sid;
}
memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
if (mode & UV_READABLE)
ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
if (mode & UV_WRITABLE)
ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
ea.grfAccessPermissions |= SYNCHRONIZE;
ea.grfAccessMode = SET_ACCESS;
ea.grfInheritance = NO_INHERITANCE;
ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
ea.Trustee.ptstrName = (LPTSTR)everyone;
if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
error = GetLastError();
goto clean_sd;
}
if (SetSecurityInfo(handle->handle,
SE_KERNEL_OBJECT,
DACL_SECURITY_INFORMATION,
NULL,
NULL,
new_dacl,
NULL)) {
error = GetLastError();
goto clean_dacl;
}
error = 0;
clean_dacl:
LocalFree((HLOCAL) new_dacl);
clean_sd:
LocalFree((HLOCAL) sd);
clean_sid:
FreeSid(everyone);
done:
return uv_translate_sys_error(error);
}