import EventSubscriber from '@secret-agent/commons/EventSubscriber';
import Log from '@secret-agent/commons/Logger';
import { ChildProcess } from 'child_process';
import IConnectionTransport from '@secret-agent/interfaces/IConnectionTransport';
const { log } = Log(module);
export class PipeTransport implements IConnectionTransport {
pipeWrite: NodeJS.WritableStream;
pendingMessage: string;
eventSubscriber = new EventSubscriber();
isClosed = false;
public onMessageFn: (message: string) => void;
public readonly onCloseFns: (() => void)[] = [];
constructor(childProcess: ChildProcess) {
const { 3: pipeWrite, 4: pipeRead } = childProcess.stdio;
this.pipeWrite = pipeWrite as NodeJS.WritableStream;
this.pipeWrite.on('error', error => {
if (this.isClosed) return;
log.error('PipeTransport.WriteError', { error, sessionId: null });
});
this.pendingMessage = '';
this.eventSubscriber.on(pipeRead, 'data', this.onData.bind(this));
this.eventSubscriber.on(pipeRead, 'close', this.onReadClosed.bind(this));
this.eventSubscriber.on(pipeRead, 'error', error =>
log.error('PipeTransport.ReadError', { error, sessionId: null }),
);
this.eventSubscriber.on(pipeWrite, 'error', error =>
log.error('PipeTransport.WriteError', { error, sessionId: null }),
);
}
send(message: string): boolean {
if (!this.isClosed) {
this.pipeWrite.write(`${message}\0`);
return true;
}
return false;
}
close(): void {
if (this.isClosed) return;
this.isClosed = true;
this.eventSubscriber.close();
}
private emit(message): void {
if (this.onMessageFn) setImmediate(this.onMessageFn, message);
}
private onReadClosed(): void {
log.info('PipeTransport.Closed');
for (const close of this.onCloseFns) close();
this.close();
}
private onData(buffer: Buffer): void {
let end = buffer.indexOf('\0');
if (end === -1) {
this.pendingMessage += buffer.toString();
return;
}
const message = this.pendingMessage + buffer.toString(undefined, 0, end);
this.emit(message);
let start = end + 1;
end = buffer.indexOf('\0', start);
while (end !== -1) {
this.emit(buffer.toString(undefined, start, end));
start = end + 1;
end = buffer.indexOf('\0', start);
}
this.pendingMessage = buffer.toString(undefined, start);
}
}