Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
ulixee
GitHub Repository: ulixee/secret-agent
Path: blob/main/puppet/lib/PipeTransport.ts
1029 views
1
/**
2
* Copyright 2020 Data Liberation Foundation, Inc. All rights reserved.
3
*
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
* you may not use this file except in compliance with the License.
6
* You may obtain a copy of the License at
7
*
8
* http://www.apache.org/licenses/LICENSE-2.0
9
*
10
* Unless required by applicable law or agreed to in writing, software
11
* distributed under the License is distributed on an "AS IS" BASIS,
12
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
* See the License for the specific language governing permissions and
14
* limitations under the License.
15
*/
16
import EventSubscriber from '@secret-agent/commons/EventSubscriber';
17
import Log from '@secret-agent/commons/Logger';
18
import { ChildProcess } from 'child_process';
19
import IConnectionTransport from '@secret-agent/interfaces/IConnectionTransport';
20
21
const { log } = Log(module);
22
23
export class PipeTransport implements IConnectionTransport {
24
pipeWrite: NodeJS.WritableStream;
25
pendingMessage: string;
26
eventSubscriber = new EventSubscriber();
27
isClosed = false;
28
29
public onMessageFn: (message: string) => void;
30
public readonly onCloseFns: (() => void)[] = [];
31
32
constructor(childProcess: ChildProcess) {
33
const { 3: pipeWrite, 4: pipeRead } = childProcess.stdio;
34
this.pipeWrite = pipeWrite as NodeJS.WritableStream;
35
this.pipeWrite.on('error', error => {
36
if (this.isClosed) return;
37
log.error('PipeTransport.WriteError', { error, sessionId: null });
38
});
39
this.pendingMessage = '';
40
this.eventSubscriber.on(pipeRead, 'data', this.onData.bind(this));
41
this.eventSubscriber.on(pipeRead, 'close', this.onReadClosed.bind(this));
42
this.eventSubscriber.on(pipeRead, 'error', error =>
43
log.error('PipeTransport.ReadError', { error, sessionId: null }),
44
);
45
this.eventSubscriber.on(pipeWrite, 'error', error =>
46
log.error('PipeTransport.WriteError', { error, sessionId: null }),
47
);
48
}
49
50
send(message: string): boolean {
51
if (!this.isClosed) {
52
this.pipeWrite.write(`${message}\0`);
53
return true;
54
}
55
return false;
56
}
57
58
close(): void {
59
if (this.isClosed) return;
60
this.isClosed = true;
61
this.eventSubscriber.close();
62
}
63
64
private emit(message): void {
65
if (this.onMessageFn) setImmediate(this.onMessageFn, message);
66
}
67
68
private onReadClosed(): void {
69
log.info('PipeTransport.Closed');
70
for (const close of this.onCloseFns) close();
71
this.close();
72
}
73
74
private onData(buffer: Buffer): void {
75
let end = buffer.indexOf('\0');
76
if (end === -1) {
77
this.pendingMessage += buffer.toString();
78
return;
79
}
80
const message = this.pendingMessage + buffer.toString(undefined, 0, end);
81
this.emit(message);
82
83
let start = end + 1;
84
end = buffer.indexOf('\0', start);
85
while (end !== -1) {
86
this.emit(buffer.toString(undefined, start, end));
87
start = end + 1;
88
end = buffer.indexOf('\0', start);
89
}
90
this.pendingMessage = buffer.toString(undefined, start);
91
}
92
}
93
94