Path: blob/main/client/connections/ConnectionToCore.ts
2605 views
import ICoreRequestPayload from '@secret-agent/interfaces/ICoreRequestPayload';1import ICoreEventPayload from '@secret-agent/interfaces/ICoreEventPayload';2import ICoreResponsePayload from '@secret-agent/interfaces/ICoreResponsePayload';3import { bindFunctions, createPromise } from '@secret-agent/commons/utils';4import IResolvablePromise from '@secret-agent/interfaces/IResolvablePromise';5import Log from '@secret-agent/commons/Logger';6import ISessionCreateOptions from '@secret-agent/interfaces/ISessionCreateOptions';7import ISessionMeta from '@secret-agent/interfaces/ISessionMeta';8import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';9import ICoreConfigureOptions from '@secret-agent/interfaces/ICoreConfigureOptions';10import { TypedEventEmitter } from '@secret-agent/commons/eventUtils';11import SessionClosedOrMissingError from '@secret-agent/commons/SessionClosedOrMissingError';12import Resolvable from '@secret-agent/commons/Resolvable';13import ICoreConnectionEventPayload from '@secret-agent/interfaces/ICoreConnectionEventPayload';14import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';15import CoreCommandQueue from '../lib/CoreCommandQueue';16import CoreSession from '../lib/CoreSession';17import { IAgentCreateOptions } from '../index';18import Agent from '../lib/Agent';19import CoreSessions from '../lib/CoreSessions';20import DisconnectedFromCoreError from './DisconnectedFromCoreError';2122const { log } = Log(module);2324export default abstract class ConnectionToCore extends TypedEventEmitter<{25disconnected: void;26connected: void;27}> {28public readonly commandQueue: CoreCommandQueue;29public readonly hostOrError: Promise<string | Error>;30public options: IConnectionToCoreOptions;31public isDisconnecting = false;32public isConnectionTerminated = false;3334protected resolvedHost: string;3536private connectPromise: IResolvablePromise<Error | null>;37private get connectOptions(): ICoreConfigureOptions & { isPersistent: boolean } {38return {39coreServerPort: this.options.coreServerPort,40localProxyPortStart: this.options.localProxyPortStart,41sessionsDir: this.options.sessionsDir,42isPersistent: this.options.isPersistent,43};44}4546private connectRequestId: string;47private disconnectRequestId: string;48private didAutoConnect = false;49private coreSessions: CoreSessions;50private readonly pendingRequestsById = new Map<string, IResolvablePromiseWithId>();51private lastId = 0;5253constructor(options?: IConnectionToCoreOptions) {54super();55this.options = options ?? { isPersistent: true };56this.commandQueue = new CoreCommandQueue(null, this, null);57this.coreSessions = new CoreSessions(58this.options.maxConcurrency,59this.options.agentTimeoutMillis,60);6162if (this.options.host) {63this.hostOrError = Promise.resolve(this.options.host)64.then(x => {65if (!x.includes('://')) {66return `ws://${x}`;67}68return x;69})70.then(x => {71this.resolvedHost = x;72return this.resolvedHost;73})74.catch(err => err);75} else {76this.hostOrError = Promise.resolve(new Error('No host provided'));77}78bindFunctions(this);79}8081protected abstract internalSendRequest(payload: ICoreRequestPayload): Promise<void>;82protected abstract createConnection(): Promise<Error | null>;83protected abstract destroyConnection(): Promise<any>;8485public async connect(isAutoConnect = false): Promise<Error | null> {86if (!this.connectPromise) {87this.didAutoConnect = isAutoConnect;88this.connectPromise = new Resolvable();89try {90const startTime = new Date();91const connectError = await this.createConnection();92if (connectError) throw connectError;93if (this.isDisconnecting) {94if (this.coreSessions.size > 0 && !this.didAutoConnect) {95throw new DisconnectedFromCoreError(this.resolvedHost);96}97this.connectPromise.resolve();98}99// can be resolved if canceled by a disconnect100if (this.connectPromise.isResolved) return;101102const connectResult = await this.internalSendRequestAndWait({103startDate: startTime,104command: 'Core.connect',105args: [this.connectOptions],106});107if (connectResult?.data) {108const { maxConcurrency } = connectResult.data;109if (110maxConcurrency &&111(!this.options.maxConcurrency || maxConcurrency < this.options.maxConcurrency)112) {113log.info('Overriding max concurrency with Core value', {114maxConcurrency,115sessionId: null,116});117this.coreSessions.concurrency = maxConcurrency;118this.options.maxConcurrency = maxConcurrency;119}120}121this.emit('connected');122} catch (err) {123this.connectPromise.resolve(err);124} finally {125if (!this.connectPromise.isResolved) this.connectPromise.resolve();126}127}128129return this.connectPromise.promise;130}131132public async disconnect(fatalError?: Error): Promise<void> {133// user triggered disconnect sends a disconnect to Core134const startTime = new Date();135await this.internalDisconnect(fatalError, async () => {136try {137await this.internalSendRequestAndWait(138{139command: 'Core.disconnect',140startDate: startTime,141args: [fatalError],142},1432e3,144);145} catch (error) {146// don't do anything147}148});149}150151public willDisconnect(): void {152this.coreSessions.willStop();153this.commandQueue.willStop();154}155/////// PIPE FUNCTIONS /////////////////////////////////////////////////////////////////////////////////////////////156157public async sendRequest(158payload: Omit<ICoreRequestPayload, 'messageId' | 'sendDate'>,159): Promise<ICoreResponsePayload> {160const result = await this.connect();161if (result) throw result;162163return this.internalSendRequestAndWait(payload);164}165166public onMessage(167payload: ICoreResponsePayload | ICoreEventPayload | ICoreConnectionEventPayload,168): void {169if ((payload as ICoreConnectionEventPayload).disconnecting) {170this.willDisconnect();171} else if ((payload as ICoreResponsePayload).responseId) {172payload = payload as ICoreResponsePayload;173this.onResponse(payload.responseId, payload);174} else if ((payload as ICoreEventPayload).listenerId) {175this.onEvent(payload as ICoreEventPayload);176} else {177throw new Error(`message could not be processed: ${JSON.stringify(payload)}`);178}179}180/////// SESSION FUNCTIONS //////////////////////////////////////////////////////////////////////////////////////////181182public useAgent(183options: IAgentCreateOptions,184callbackFn: (agent: Agent) => Promise<any>,185): Promise<void> {186// just kick off187this.connect().catch(() => null);188return this.coreSessions.waitForAvailable(() => {189const agent = new Agent({190...options,191connectionToCore: this,192});193194return callbackFn(agent);195});196}197198public canCreateSessionNow(): boolean {199return this.isDisconnecting === false && this.coreSessions.hasAvailability();200}201202public async createSession(options: ISessionCreateOptions): Promise<CoreSession> {203try {204const sessionMeta = await this.commandQueue.run<ISessionMeta>('Session.create', options);205const session = new CoreSession({ ...sessionMeta, sessionName: options.sessionName }, this);206this.coreSessions.track(session);207return session;208} catch (error) {209if (error instanceof DisconnectedFromCoreError && this.isDisconnecting) return null;210throw error;211}212}213214public getSession(sessionId: string): CoreSession {215return this.coreSessions.get(sessionId);216}217218public closeSession(coreSession: CoreSession): void {219this.coreSessions.untrack(coreSession.sessionId);220}221222public async logUnhandledError(error: Error): Promise<void> {223await this.commandQueue.run('Core.logUnhandledError', error);224}225226protected async internalDisconnect(227fatalError?: Error,228beforeClose?: () => Promise<any>,229): Promise<void> {230if (this.isDisconnecting) return;231this.isDisconnecting = true;232const logid = log.stats('ConnectionToCore.Disconnecting', {233host: this.hostOrError,234sessionId: null,235});236237const hasSessions = this.coreSessions?.size > 0;238239this.cancelPendingRequests();240241if (this.connectPromise) {242if (!this.connectPromise.isResolved && hasSessions && !this.didAutoConnect) {243this.connectPromise.resolve(new DisconnectedFromCoreError(this.resolvedHost));244} else if (beforeClose) {245await beforeClose();246}247}248await this.destroyConnection();249log.stats('ConnectionToCore.Disconnected', {250parentLogId: logid,251host: this.hostOrError,252sessionId: null,253});254255this.emit('disconnected');256}257258protected async internalSendRequestAndWait(259payload: Omit<ICoreRequestPayload, 'messageId' | 'sendDate'>,260timeoutMs?: number,261): Promise<ICoreResponsePayload> {262const { promise, id, resolve } = this.createPendingResult();263const { command } = payload;264265if (command === 'Core.connect') this.connectRequestId = id;266if (command === 'Core.disconnect') this.disconnectRequestId = id;267268let timeout: NodeJS.Timeout;269if (timeoutMs) timeout = setTimeout(() => resolve(null), timeoutMs).unref();270try {271await this.internalSendRequest({272messageId: id,273sendDate: new Date(),274...payload,275});276} catch (error) {277clearTimeout(timeout);278if (error instanceof CanceledPromiseError) {279this.pendingRequestsById.delete(id);280return;281}282throw error;283}284285// now run to completion with timeout286try {287return await promise;288} finally {289clearTimeout(timeout);290}291}292293protected onEvent(payload: ICoreEventPayload): void {294const { meta, listenerId, eventArgs } = payload as ICoreEventPayload;295const session = this.getSession(meta.sessionId);296session?.onEvent(meta, listenerId, eventArgs);297}298299protected async onConnectionTerminated(): Promise<void> {300if (this.isConnectionTerminated) return;301302this.isConnectionTerminated = true;303await this.internalDisconnect();304if (this.connectRequestId) {305this.onResponse(this.connectRequestId, {306data: this.didAutoConnect ? new DisconnectedFromCoreError(this.resolvedHost) : null,307});308}309if (this.disconnectRequestId) {310this.onResponse(this.disconnectRequestId, {311data: null,312});313}314}315316protected onResponse(id: string, message: ICoreResponsePayload): void {317const pending = this.pendingRequestsById.get(id);318if (!pending) return;319this.pendingRequestsById.delete(id);320const isInternalRequest = this.connectRequestId === id || this.disconnectRequestId === id;321if (this.disconnectRequestId === id) this.disconnectRequestId = null;322if (this.connectRequestId === id) this.connectRequestId = null;323324if (message.data instanceof Error) {325let responseError = message.data;326const isDisconnected =327this.isDisconnecting ||328responseError.name === SessionClosedOrMissingError.name ||329(responseError as any).isDisconnecting === true;330delete (responseError as any).isDisconnecting;331332if (!isInternalRequest && isDisconnected) {333responseError = new DisconnectedFromCoreError(this.resolvedHost);334}335this.rejectPendingRequest(pending, responseError);336} else {337pending.resolve({ data: message.data });338}339}340341protected cancelPendingRequests(): void {342const host = String(this.resolvedHost);343for (const entry of this.pendingRequestsById.values()) {344const id = entry.id;345if (this.connectRequestId === id || this.disconnectRequestId === id) {346continue;347}348this.pendingRequestsById.delete(id);349this.rejectPendingRequest(entry, new DisconnectedFromCoreError(host));350}351this.commandQueue.stop(new DisconnectedFromCoreError(host));352this.coreSessions.stop(new DisconnectedFromCoreError(host));353}354355private createPendingResult(): IResolvablePromiseWithId {356const resolvablePromise = createPromise<ICoreResponsePayload>() as IResolvablePromiseWithId;357this.lastId += 1;358const id = this.lastId.toString();359resolvablePromise.id = id;360this.pendingRequestsById.set(id, resolvablePromise);361return this.pendingRequestsById.get(id);362}363364private rejectPendingRequest(pending: IResolvablePromiseWithId, error: Error): void {365error.stack += `\n${'------CONNECTION'.padEnd(50, '-')}\n${pending.stack}`;366pending.reject(error);367}368}369370interface IResolvablePromiseWithId extends IResolvablePromise<ICoreResponsePayload> {371id: string;372}373374375