Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Kitware
GitHub Repository: Kitware/CMake
Path: blob/master/Utilities/cmcppdap/src/session.cpp
3153 views
1
// Copyright 2019 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
// https://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
#include "content_stream.h"
16
17
#include "dap/any.h"
18
#include "dap/session.h"
19
20
#include "chan.h"
21
#include "json_serializer.h"
22
#include "socket.h"
23
24
#include <stdarg.h>
25
#include <stdio.h>
26
#include <atomic>
27
#include <deque>
28
#include <memory>
29
#include <mutex>
30
#include <thread>
31
#include <unordered_map>
32
#include <vector>
33
34
namespace {
35
36
class Impl : public dap::Session {
37
public:
38
void setOnInvalidData(dap::OnInvalidData onInvalidData_) override {
39
this->onInvalidData = onInvalidData_;
40
}
41
42
void onError(const ErrorHandler& handler) override { handlers.put(handler); }
43
44
void registerHandler(const dap::TypeInfo* typeinfo,
45
const GenericRequestHandler& handler) override {
46
handlers.put(typeinfo, handler);
47
}
48
49
void registerHandler(const dap::TypeInfo* typeinfo,
50
const GenericEventHandler& handler) override {
51
handlers.put(typeinfo, handler);
52
}
53
54
void registerHandler(const dap::TypeInfo* typeinfo,
55
const GenericResponseSentHandler& handler) override {
56
handlers.put(typeinfo, handler);
57
}
58
59
std::function<void()> getPayload() override {
60
auto request = reader.read();
61
if (request.size() > 0) {
62
if (auto payload = processMessage(request)) {
63
return payload;
64
}
65
}
66
return {};
67
}
68
69
void connect(const std::shared_ptr<dap::Reader>& r,
70
const std::shared_ptr<dap::Writer>& w) override {
71
if (isBound.exchange(true)) {
72
handlers.error("Session::connect called twice");
73
return;
74
}
75
76
reader = dap::ContentReader(r, this->onInvalidData);
77
writer = dap::ContentWriter(w);
78
}
79
80
void startProcessingMessages(
81
const ClosedHandler& onClose /* = {} */) override {
82
if (isProcessingMessages.exchange(true)) {
83
handlers.error("Session::startProcessingMessages() called twice");
84
return;
85
}
86
recvThread = std::thread([this, onClose] {
87
while (reader.isOpen()) {
88
if (auto payload = getPayload()) {
89
inbox.put(std::move(payload));
90
}
91
}
92
if (onClose) {
93
onClose();
94
}
95
});
96
97
dispatchThread = std::thread([this] {
98
while (auto payload = inbox.take()) {
99
payload.value()();
100
}
101
});
102
}
103
104
bool send(const dap::TypeInfo* requestTypeInfo,
105
const dap::TypeInfo* responseTypeInfo,
106
const void* request,
107
const GenericResponseHandler& responseHandler) override {
108
int seq = nextSeq++;
109
110
handlers.put(seq, responseTypeInfo, responseHandler);
111
112
dap::json::Serializer s;
113
if (!s.object([&](dap::FieldSerializer* fs) {
114
return fs->field("seq", dap::integer(seq)) &&
115
fs->field("type", "request") &&
116
fs->field("command", requestTypeInfo->name()) &&
117
fs->field("arguments", [&](dap::Serializer* s) {
118
return requestTypeInfo->serialize(s, request);
119
});
120
})) {
121
return false;
122
}
123
return send(s.dump());
124
}
125
126
bool send(const dap::TypeInfo* typeinfo, const void* event) override {
127
dap::json::Serializer s;
128
if (!s.object([&](dap::FieldSerializer* fs) {
129
return fs->field("seq", dap::integer(nextSeq++)) &&
130
fs->field("type", "event") &&
131
fs->field("event", typeinfo->name()) &&
132
fs->field("body", [&](dap::Serializer* s) {
133
return typeinfo->serialize(s, event);
134
});
135
})) {
136
return false;
137
}
138
return send(s.dump());
139
}
140
141
~Impl() override {
142
inbox.close();
143
reader.close();
144
writer.close();
145
if (recvThread.joinable()) {
146
recvThread.join();
147
}
148
if (dispatchThread.joinable()) {
149
dispatchThread.join();
150
}
151
}
152
153
private:
154
using Payload = std::function<void()>;
155
156
class EventHandlers {
157
public:
158
void put(const ErrorHandler& handler) {
159
std::unique_lock<std::mutex> lock(errorMutex);
160
errorHandler = handler;
161
}
162
163
void error(const char* format, ...) {
164
va_list vararg;
165
va_start(vararg, format);
166
std::unique_lock<std::mutex> lock(errorMutex);
167
errorLocked(format, vararg);
168
va_end(vararg);
169
}
170
171
std::pair<const dap::TypeInfo*, GenericRequestHandler> request(
172
const std::string& name) {
173
std::unique_lock<std::mutex> lock(requestMutex);
174
auto it = requestMap.find(name);
175
return (it != requestMap.end()) ? it->second : decltype(it->second){};
176
}
177
178
void put(const dap::TypeInfo* typeinfo,
179
const GenericRequestHandler& handler) {
180
std::unique_lock<std::mutex> lock(requestMutex);
181
auto added =
182
requestMap
183
.emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
184
.second;
185
if (!added) {
186
errorfLocked("Request handler for '%s' already registered",
187
typeinfo->name().c_str());
188
}
189
}
190
191
std::pair<const dap::TypeInfo*, GenericResponseHandler> response(
192
int64_t seq) {
193
std::unique_lock<std::mutex> lock(responseMutex);
194
auto responseIt = responseMap.find(seq);
195
if (responseIt == responseMap.end()) {
196
errorfLocked("Unknown response with sequence %d", seq);
197
return {};
198
}
199
auto out = std::move(responseIt->second);
200
responseMap.erase(seq);
201
return out;
202
}
203
204
void put(int seq,
205
const dap::TypeInfo* typeinfo,
206
const GenericResponseHandler& handler) {
207
std::unique_lock<std::mutex> lock(responseMutex);
208
auto added =
209
responseMap.emplace(seq, std::make_pair(typeinfo, handler)).second;
210
if (!added) {
211
errorfLocked("Response handler for sequence %d already registered",
212
seq);
213
}
214
}
215
216
std::pair<const dap::TypeInfo*, GenericEventHandler> event(
217
const std::string& name) {
218
std::unique_lock<std::mutex> lock(eventMutex);
219
auto it = eventMap.find(name);
220
return (it != eventMap.end()) ? it->second : decltype(it->second){};
221
}
222
223
void put(const dap::TypeInfo* typeinfo,
224
const GenericEventHandler& handler) {
225
std::unique_lock<std::mutex> lock(eventMutex);
226
auto added =
227
eventMap.emplace(typeinfo->name(), std::make_pair(typeinfo, handler))
228
.second;
229
if (!added) {
230
errorfLocked("Event handler for '%s' already registered",
231
typeinfo->name().c_str());
232
}
233
}
234
235
GenericResponseSentHandler responseSent(const dap::TypeInfo* typeinfo) {
236
std::unique_lock<std::mutex> lock(responseSentMutex);
237
auto it = responseSentMap.find(typeinfo);
238
return (it != responseSentMap.end()) ? it->second
239
: decltype(it->second){};
240
}
241
242
void put(const dap::TypeInfo* typeinfo,
243
const GenericResponseSentHandler& handler) {
244
std::unique_lock<std::mutex> lock(responseSentMutex);
245
auto added = responseSentMap.emplace(typeinfo, handler).second;
246
if (!added) {
247
errorfLocked("Response sent handler for '%s' already registered",
248
typeinfo->name().c_str());
249
}
250
}
251
252
private:
253
void errorfLocked(const char* format, ...) {
254
va_list vararg;
255
va_start(vararg, format);
256
errorLocked(format, vararg);
257
va_end(vararg);
258
}
259
260
void errorLocked(const char* format, va_list args) {
261
char buf[2048];
262
vsnprintf(buf, sizeof(buf), format, args);
263
if (errorHandler) {
264
errorHandler(buf);
265
}
266
}
267
268
std::mutex errorMutex;
269
ErrorHandler errorHandler;
270
271
std::mutex requestMutex;
272
std::unordered_map<std::string,
273
std::pair<const dap::TypeInfo*, GenericRequestHandler>>
274
requestMap;
275
276
std::mutex responseMutex;
277
std::unordered_map<int64_t,
278
std::pair<const dap::TypeInfo*, GenericResponseHandler>>
279
responseMap;
280
281
std::mutex eventMutex;
282
std::unordered_map<std::string,
283
std::pair<const dap::TypeInfo*, GenericEventHandler>>
284
eventMap;
285
286
std::mutex responseSentMutex;
287
std::unordered_map<const dap::TypeInfo*, GenericResponseSentHandler>
288
responseSentMap;
289
}; // EventHandlers
290
291
Payload processMessage(const std::string& str) {
292
auto d = dap::json::Deserializer(str);
293
dap::string type;
294
if (!d.field("type", &type)) {
295
handlers.error("Message missing string 'type' field");
296
return {};
297
}
298
299
dap::integer sequence = 0;
300
if (!d.field("seq", &sequence)) {
301
handlers.error("Message missing number 'seq' field");
302
return {};
303
}
304
305
if (type == "request") {
306
return processRequest(&d, sequence);
307
} else if (type == "event") {
308
return processEvent(&d);
309
} else if (type == "response") {
310
processResponse(&d);
311
return {};
312
} else {
313
handlers.error("Unknown message type '%s'", type.c_str());
314
}
315
316
return {};
317
}
318
319
Payload processRequest(dap::json::Deserializer* d, dap::integer sequence) {
320
dap::string command;
321
if (!d->field("command", &command)) {
322
handlers.error("Request missing string 'command' field");
323
return {};
324
}
325
326
const dap::TypeInfo* typeinfo;
327
GenericRequestHandler handler;
328
std::tie(typeinfo, handler) = handlers.request(command);
329
if (!typeinfo) {
330
handlers.error("No request handler registered for command '%s'",
331
command.c_str());
332
return {};
333
}
334
335
auto data = new uint8_t[typeinfo->size()];
336
typeinfo->construct(data);
337
338
if (!d->field("arguments", [&](dap::Deserializer* d) {
339
return typeinfo->deserialize(d, data);
340
})) {
341
handlers.error("Failed to deserialize request");
342
typeinfo->destruct(data);
343
delete[] data;
344
return {};
345
}
346
347
return [=] {
348
handler(
349
data,
350
[=](const dap::TypeInfo* typeinfo, const void* data) {
351
// onSuccess
352
dap::json::Serializer s;
353
s.object([&](dap::FieldSerializer* fs) {
354
return fs->field("seq", dap::integer(nextSeq++)) &&
355
fs->field("type", "response") &&
356
fs->field("request_seq", sequence) &&
357
fs->field("success", dap::boolean(true)) &&
358
fs->field("command", command) &&
359
fs->field("body", [&](dap::Serializer* s) {
360
return typeinfo->serialize(s, data);
361
});
362
});
363
send(s.dump());
364
365
if (auto handler = handlers.responseSent(typeinfo)) {
366
handler(data, nullptr);
367
}
368
},
369
[=](const dap::TypeInfo* typeinfo, const dap::Error& error) {
370
// onError
371
dap::json::Serializer s;
372
s.object([&](dap::FieldSerializer* fs) {
373
return fs->field("seq", dap::integer(nextSeq++)) &&
374
fs->field("type", "response") &&
375
fs->field("request_seq", sequence) &&
376
fs->field("success", dap::boolean(false)) &&
377
fs->field("command", command) &&
378
fs->field("message", error.message);
379
});
380
send(s.dump());
381
382
if (auto handler = handlers.responseSent(typeinfo)) {
383
handler(nullptr, &error);
384
}
385
});
386
typeinfo->destruct(data);
387
delete[] data;
388
};
389
}
390
391
Payload processEvent(dap::json::Deserializer* d) {
392
dap::string event;
393
if (!d->field("event", &event)) {
394
handlers.error("Event missing string 'event' field");
395
return {};
396
}
397
398
const dap::TypeInfo* typeinfo;
399
GenericEventHandler handler;
400
std::tie(typeinfo, handler) = handlers.event(event);
401
if (!typeinfo) {
402
handlers.error("No event handler registered for event '%s'",
403
event.c_str());
404
return {};
405
}
406
407
auto data = new uint8_t[typeinfo->size()];
408
typeinfo->construct(data);
409
410
// "body" is an optional field for some events, such as "Terminated Event".
411
bool body_ok = true;
412
d->field("body", [&](dap::Deserializer* d) {
413
if (!typeinfo->deserialize(d, data)) {
414
body_ok = false;
415
}
416
return true;
417
});
418
419
if (!body_ok) {
420
handlers.error("Failed to deserialize event '%s' body", event.c_str());
421
typeinfo->destruct(data);
422
delete[] data;
423
return {};
424
}
425
426
return [=] {
427
handler(data);
428
typeinfo->destruct(data);
429
delete[] data;
430
};
431
}
432
433
void processResponse(const dap::Deserializer* d) {
434
dap::integer requestSeq = 0;
435
if (!d->field("request_seq", &requestSeq)) {
436
handlers.error("Response missing int 'request_seq' field");
437
return;
438
}
439
440
const dap::TypeInfo* typeinfo;
441
GenericResponseHandler handler;
442
std::tie(typeinfo, handler) = handlers.response(requestSeq);
443
if (!typeinfo) {
444
handlers.error("Unknown response with sequence %d", requestSeq);
445
return;
446
}
447
448
dap::boolean success = false;
449
if (!d->field("success", &success)) {
450
handlers.error("Response missing int 'success' field");
451
return;
452
}
453
454
if (success) {
455
auto data = std::unique_ptr<uint8_t[]>(new uint8_t[typeinfo->size()]);
456
typeinfo->construct(data.get());
457
458
// "body" field in Response is an optional field.
459
d->field("body", [&](const dap::Deserializer* d) {
460
return typeinfo->deserialize(d, data.get());
461
});
462
463
handler(data.get(), nullptr);
464
typeinfo->destruct(data.get());
465
} else {
466
std::string message;
467
if (!d->field("message", &message)) {
468
handlers.error("Failed to deserialize message");
469
return;
470
}
471
auto error = dap::Error("%s", message.c_str());
472
handler(nullptr, &error);
473
}
474
}
475
476
bool send(const std::string& s) {
477
std::unique_lock<std::mutex> lock(sendMutex);
478
if (!writer.isOpen()) {
479
handlers.error("Send failed as the writer is closed");
480
return false;
481
}
482
return writer.write(s);
483
}
484
485
std::atomic<bool> isBound = {false};
486
std::atomic<bool> isProcessingMessages = {false};
487
dap::ContentReader reader;
488
dap::ContentWriter writer;
489
490
std::atomic<bool> shutdown = {false};
491
EventHandlers handlers;
492
std::thread recvThread;
493
std::thread dispatchThread;
494
dap::Chan<Payload> inbox;
495
std::atomic<uint32_t> nextSeq = {1};
496
std::mutex sendMutex;
497
dap::OnInvalidData onInvalidData = dap::kIgnore;
498
};
499
500
} // anonymous namespace
501
502
namespace dap {
503
504
Error::Error(const std::string& message) : message(message) {}
505
506
Error::Error(const char* msg, ...) {
507
char buf[2048];
508
va_list vararg;
509
va_start(vararg, msg);
510
vsnprintf(buf, sizeof(buf), msg, vararg);
511
va_end(vararg);
512
message = buf;
513
}
514
515
Session::~Session() = default;
516
517
std::unique_ptr<Session> Session::create() {
518
return std::unique_ptr<Session>(new Impl());
519
}
520
521
} // namespace dap
522
523