diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/api1/dispatcher.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/api1/dispatcher.ts index dcd1e8d501..9f3d769754 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/api1/dispatcher.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/api1/dispatcher.ts @@ -36,6 +36,7 @@ import type { ILocalLogRepository, IRemoteLogRepository, } from "../database/repository/interfaces/repository"; +import { GatewayShuttingDownError } from "./gateway-errors"; export interface BLODispatcherOptions { logger: Logger; @@ -65,6 +66,7 @@ export class BLODispatcher { private defaultRepository: boolean; private localRepository: ILocalLogRepository; private remoteRepository: IRemoteLogRepository | undefined; + private isShuttingDown = false; constructor(public readonly options: BLODispatcherOptions) { const fnTag = `${BLODispatcher.CLASS_NAME}#constructor()`; @@ -198,9 +200,19 @@ export class BLODispatcher { return executeGetStatus(this.level, req, this.manager); } + /** + * @notice Transact request handler + * @param req TransactRequest + * @throws GatewayShuttingDownError when the flag isShuttingDown is true + * @returns TransactResponse + */ public async Transact(req: TransactRequest): Promise<TransactResponse> { //TODO pre-verify verify input + const fnTag = `${BLODispatcher.CLASS_NAME}#transact()`; this.logger.info(`Transact request: ${req}`); + if (this.isShuttingDown) { + throw new GatewayShuttingDownError(fnTag); + } const res = await executeTransact( this.level, req, @@ -215,6 +227,19 @@ export class BLODispatcher { const res = Array.from(await this.manager.getSessions().keys()); return res; } + + public async getManager(): Promise<SATPManager> { + this.logger.info(`Get SATP Manager request`); + return this.manager; + } + + /** + * Changes the isShuttingDown flag to true, stopping all new requests + */ + public setInitiateShutdown(): void { + this.logger.info(`Stopping requests`); + this.isShuttingDown = true; + } // get channel by caller; give needed client from orchestrator to handler to call // for all channels, find session id on request // TODO implement handlers GetAudit, Transact, Cancel, Routes diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/api1/gateway-errors.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/api1/gateway-errors.ts new file mode 100644 index 0000000000..ca7c3d27e8 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/api1/gateway-errors.ts @@ -0,0 +1,28 @@ +import { asError } from "@hyperledger/cactus-common"; +import { RuntimeError } from "run-time-error-cjs"; +import { Error as GatewayErrorType } from "../generated/proto/cacti/satp/v02/common/message_pb"; +export class GatewayError extends RuntimeError { + protected errorType = GatewayErrorType.UNSPECIFIED; + constructor( + public message: string, + public cause: string | Error | null, + // TODO internal error codes + public code: number = 500, + public traceID?: string, + public trace?: string, + ) { + super(message, asError(cause)); + this.name = this.constructor.name; + Object.setPrototypeOf(this, new.target.prototype); // make sure prototype chain is set to error + } + + public getGatewayErrorType(): GatewayErrorType { + return this.errorType; + } +} + +export class GatewayShuttingDownError extends GatewayError { + constructor(tag: string, cause?: string | Error | null) { + super(`${tag}, shutdown initiated not receiving new requests`, cause ?? null, 500); + } +} \ No newline at end of file diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/errors/satp-errors.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/errors/satp-errors.ts index 1eeecaf7a8..c23fe9647c 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/errors/satp-errors.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/errors/satp-errors.ts @@ -99,5 +99,11 @@ export class RecoverMessageError extends SATPInternalError { super(`${tag}, failed to recover message: ${message}`, cause ?? null, 500); } } + +export class BLODispatcherErraneousError extends SATPInternalError { + constructor(tag: string, cause?: string | Error | null) { + super(`${tag}, failed because BLODispatcher is erroneous`, cause ?? null, 500); + } +} // TODO client-facing error logic, maps SATPInternalErrors to user friendly errors export class SATPError extends Error {} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts index db40d5e228..2ffbd08a93 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts @@ -1,6 +1,14 @@ import { v4 as uuidv4 } from "uuid"; import { stringify as safeStableStringify } from "safe-stable-stringify"; +import { + Checks, + JsObjectSigner, + LogLevelDesc, + Logger, + LoggerProvider, +} from "@hyperledger/cactus-common"; + import { Type, MessageStagesHashesSchema, @@ -55,6 +63,7 @@ import { create } from "@bufbuild/protobuf"; // Define interface on protos export interface ISATPSessionOptions { + logLevel?: LogLevelDesc; contextID: string; sessionID?: string; server: boolean; @@ -65,8 +74,16 @@ export class SATPSession { public static readonly CLASS_NAME = "SATPSession"; private clientSessionData: SessionData | undefined; private serverSessionData: SessionData | undefined; + private readonly logger: Logger; constructor(ops: ISATPSessionOptions) { + const fnTag = `${SATPSession.CLASS_NAME}#constructor()`; + Checks.truthy(ops, `${fnTag} arg options`); + + const level = ops.logLevel || "DEBUG"; + const label = this.className; + this.logger = LoggerProvider.getOrCreate({ level, label }); + if (!ops.server && !ops.client) { throw new Error(`${SATPSession.CLASS_NAME}#constructor(), at least one of server or client must be true `); @@ -135,6 +152,10 @@ export class SATPSession { return this.serverSessionData; } + public get className(): string { + return SATPSession.CLASS_NAME; + } + public getClientSessionData(): SessionData { if (this.clientSessionData == undefined) { throw new Error( @@ -219,11 +240,17 @@ export class SATPSession { } public getSessionId(): string { - console.log("serverSessionId: ", this.serverSessionData?.id); - console.log("clientSessionId: ", this.clientSessionData?.id); + this.logger.info("serverSessionId: ", this.serverSessionData?.state); + this.logger.info("clientSessionId: ", this.clientSessionData?.state); return this.serverSessionData?.id || this.clientSessionData?.id || ""; } + public getSessionState(): State { + this.logger.info("serverSessionId: ", this.serverSessionData?.state); + this.logger.info("clientSessionId: ", this.clientSessionData?.state); + return this.serverSessionData?.state || this.clientSessionData?.state || State.UNSPECIFIED; + } + public verify( tag: string, type: SessionType, diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts index 5501b7ffb4..50940f7b75 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts @@ -65,6 +65,8 @@ import cors from "cors"; import * as OAS from "../json/openapi-blo-bundled.json"; import type { NetworkId } from "./services/network-identification/chainid-list"; import { knexLocalInstance } from "./database/knexfile"; +import schedule, { Job } from "node-schedule"; +import { BLODispatcherErraneousError } from "./core/errors/satp-errors"; export class SATPGateway implements IPluginWebService, ICactusPlugin { @IsDefined() @@ -104,6 +106,9 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { public remoteRepository?: IRemoteLogRepository; private readonly shutdownHooks: ShutdownHook[]; private crashManager?: CrashManager; + private sessionVerificationJob: Job | null = null; + private activeJobs: Set<schedule.Job> = new Set(); + constructor(public readonly options: SATPGatewayConfig) { const fnTag = `${this.className}#constructor()`; @@ -262,7 +267,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { const fnTag = `${this.className}#getOrCreateWebServices()`; this.logger.trace(`Entering ${fnTag}`); if (!this.BLODispatcher) { - throw new Error(`Cannot ${fnTag} because BLODispatcher is erroneous`); + throw new BLODispatcherErraneousError(fnTag); } let webServices = await this.BLODispatcher?.getOrCreateWebServices(); if (this.OAPIServerEnabled) { @@ -454,7 +459,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { if (!this.BLOApplication || !this.BLOServer) { if (!this.BLODispatcher) { - throw new Error("BLODispatcher is not defined"); + throw new BLODispatcherErraneousError(fnTag); } this.BLOApplication = express(); this.BLOApplication.use(bodyParser.json({ limit: "250mb" })); @@ -583,7 +588,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { public async shutdown(): Promise<void> { const fnTag = `${this.className}#getGatewaySeeds()`; this.logger.debug(`Entering ${fnTag}`); - + this.logger.info("Shutting down Node server - BOL"); await this.shutdownBLOServer(); await this.shutdownGOLServer(); @@ -618,6 +623,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { this.logger.debug(`Entering ${fnTag}`); if (this.BLOServer) { try { + await this.verifySessionsState(); await this.BLOServer.closeAllConnections(); await this.BLOServer.close(); this.BLOServer = undefined; @@ -649,4 +655,76 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { this.logger.warn("Server is not running."); } } -} + + /** + * Verify the state of the sessions before shutting down the server. + * This method is called before the server is shut down and awaits ensure that + * all sessions are concluded before the server is terminated. + * After all sessions are concluded, the job is cancelled. + */ + private async verifySessionsState(): Promise<void> { + const fnTag = `${this.className}#verifySessionsState()`; + this.logger.trace(`Entering ${fnTag}`); + if (!this.BLODispatcher) { + throw new BLODispatcherErraneousError(fnTag); + } + this.BLODispatcher.setInitiateShutdown(); + const manager = await this.BLODispatcher.getManager(); + + await this.startSessionVerificationJob(manager); + } + + /** + * Verifies if the sessions are concluded before shutting down the server. + * If they aren't starts a scheduled job to verify session states. + * The job runs every 20 seconds until all sessions are concluded. + */ + private async startSessionVerificationJob(manager: any): Promise<void> { + return new Promise<void>((resolve) => { + const cleanup = () => { + if (this.sessionVerificationJob) { + this.sessionVerificationJob.cancel(); + this.activeJobs.delete(this.sessionVerificationJob); + this.sessionVerificationJob = null; + } + }; + + const initialCheck = async () => { + try { + const status = await manager.getSATPSessionState(); + if (status) { + this.logger.info("All sessions already concluded"); + cleanup(); + resolve(); + return false; + } + this.logger.info("Initial check: sessions pending"); + } catch (error) { + this.logger.error(`Session check failed: ${error}`); + } + return true; + }; + + initialCheck().then((needsRecurring) => { + if (needsRecurring) { + this.sessionVerificationJob = schedule.scheduleJob( + "*/20 * * * * *", async () => { + try { + const status = await manager.getSATPSessionState(); + if (status) { + this.logger.info("All sessions concluded"); + cleanup(); + resolve(); + } else { + this.logger.info("Sessions still pending"); + } + } catch (error) { + this.logger.error(`Session check failed: ${error}`); + } + }); + this.activeJobs.add(this.sessionVerificationJob); + } + }); + }); + } +} \ No newline at end of file diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/services/gateway/satp-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/services/gateway/satp-manager.ts index 8a19a9abd2..2fa7974e6b 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/services/gateway/satp-manager.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/services/gateway/satp-manager.ts @@ -37,6 +37,7 @@ import { Stage2SATPHandler } from "../../core/stage-handlers/stage2-handler"; import { Stage3SATPHandler } from "../../core/stage-handlers/stage3-handler"; import { SATPCrossChainManager } from "../../cross-chain-mechanisms/satp-cc-manager"; import { GatewayOrchestrator } from "./gateway-orchestrator"; +import { State } from "../../generated/proto/cacti/satp/v02/common/session_pb"; import type { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb"; import type { SatpStage0Service } from "../../generated/proto/cacti/satp/v02/service/stage_0_pb"; import type { SatpStage1Service } from "../../generated/proto/cacti/satp/v02/service/stage_1_pb"; @@ -310,6 +311,21 @@ export class SATPManager { return this.satpHandlers.get(type); } + /* + * Function checks if all the sessions are in the completed state + * @returns boolean + */ + public getSATPSessionState(): boolean { + const fnTag = `${SATPManager.CLASS_NAME}#getSATPSessionStatus()`; + this.logger.info(`${fnTag}, Getting SATP Session Status...`); + for (let value of this.sessions.values()) { + if (value.getSessionState() !== State.COMPLETED) { + return false; + } + } + return true; + } + public getOrCreateSession( sessionId?: string, contextID?: string, diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/shutdown-state.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/shutdown-state.test.ts new file mode 100644 index 0000000000..fc011cab4f --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/shutdown-state.test.ts @@ -0,0 +1,215 @@ +import "jest-extended"; +import { + Containers, + pruneDockerAllIfGithubAction, +} from "@hyperledger/cactus-test-tooling"; +import { + LogLevelDesc, + LoggerProvider, +} from "@hyperledger/cactus-common"; +import { + SATPGateway, + SATPGatewayConfig, +} from "../../../main/typescript/plugin-satp-hermes-gateway"; +import { PluginFactorySATPGateway } from "../../../main/typescript/factory/plugin-factory-gateway-orchestrator"; +import { + IPluginFactoryOptions, + LedgerType, + PluginImportType, +} from "@hyperledger/cactus-core-api"; +import { + SATP_ARCHITECTURE_VERSION, + SATP_CORE_VERSION, + SATP_CRASH_VERSION, +} from "../../../main/typescript/core/constants"; +import { + knexClientConnection, + knexSourceRemoteConnection, +} from "../knex.config"; +import { SATPSession } from "../../../main/typescript/core/satp-session"; +import { + TransactRequest, + TransactRequestSourceAsset + } from "../../../main/typescript"; + +const logLevel: LogLevelDesc = "DEBUG"; +const logger = LoggerProvider.getOrCreate({ + level: logLevel, + label: "satp-gateway-orchestrator-init-test", +}); +const factoryOptions: IPluginFactoryOptions = { + pluginImportType: PluginImportType.Local, +}; +const factory = new PluginFactorySATPGateway(factoryOptions); + +let mockSession: SATPSession; +const sessionIDs: string[] = []; + +beforeAll(async () => { + pruneDockerAllIfGithubAction({ logLevel }) + .then(() => { + logger.info("Pruning throw OK"); + }) + .catch(async () => { + await Containers.logDiagnostics({ logLevel }); + fail("Pruning didn't throw OK"); + }); + mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: false, + client: true, + }); + + sessionIDs.push(mockSession.getSessionId()); +}); + +describe("Shutdown Verify State Tests", () => { + test("Gateway waits to verify the sessions state before shutdown", async () => { + const options: SATPGatewayConfig = { + gid: { + id: "mockID", + name: "CustomGateway", + version: [ + { + Core: SATP_CORE_VERSION, + Architecture: SATP_ARCHITECTURE_VERSION, + Crash: SATP_CRASH_VERSION, + }, + ], + connectedDLTs: [], + proofID: "mockProofID10", + gatewayServerPort: 3014, + gatewayClientPort: 3015, + address: "https://localhost", + }, + knexLocalConfig: knexClientConnection, + knexRemoteConfig: knexSourceRemoteConnection, + }; + + const gateway = await factory.create(options); + expect(gateway).toBeInstanceOf(SATPGateway); + + const verifySessionsStateSpy = jest.spyOn(gateway as any, "verifySessionsState"); + + const shutdownBLOServerSpy = jest.spyOn(gateway as any, "shutdownBLOServer"); + + await gateway.startup(); + await gateway.shutdown(); + + expect(verifySessionsStateSpy).toHaveBeenCalled(); + + expect(shutdownBLOServerSpy).toHaveBeenCalled(); + + verifySessionsStateSpy.mockRestore(); + shutdownBLOServerSpy.mockRestore(); + }); + + test("Gateway waits for pending sessions to complete before shutdown", async () => { + const options: SATPGatewayConfig = { + gid: { + id: "mockID", + name: "CustomGateway", + version: [ + { + Core: SATP_CORE_VERSION, + Architecture: SATP_ARCHITECTURE_VERSION, + Crash: SATP_CRASH_VERSION, + }, + ], + connectedDLTs: [], + proofID: "mockProofID10", + gatewayServerPort: 3014, + gatewayClientPort: 3015, + address: "https://localhost", + }, + knexLocalConfig: knexClientConnection, + knexRemoteConfig: knexSourceRemoteConnection, + }; + + const gateway = await factory.create(options); + expect(gateway).toBeInstanceOf(SATPGateway); + + const satpManager = (gateway as any).BLODispatcher.manager; + + let sessionState = false; + satpManager.getSessions().set(mockSession.getSessionId(), mockSession); + + await gateway.startup(); + + const shutdownPromise = gateway.shutdown(); + + const initialSessionState = await satpManager.getSATPSessionState(); + expect(initialSessionState).toBe(false); + + const getSATPSessionStateSpy = jest + .spyOn(satpManager, "getSATPSessionState") + .mockImplementation(async () => { + if (!sessionState) { + await new Promise((resolve) => setTimeout(resolve, 20000)); + sessionState = true; + } + return sessionState; + }); + + await shutdownPromise; + + const finalSessionState = await satpManager.getSATPSessionState(); + expect(finalSessionState).toBe(true); + + getSATPSessionStateSpy.mockRestore(); + + }); + + test("Gateway does not allow new transactions after shutdown is initiated", async () => { + const options: SATPGatewayConfig = { + gid: { + id: "mockID", + name: "CustomGateway", + version: [ + { + Core: SATP_CORE_VERSION, + Architecture: SATP_ARCHITECTURE_VERSION, + Crash: SATP_CRASH_VERSION, + }, + ], + connectedDLTs: [], + proofID: "mockProofID10", + gatewayServerPort: 3014, + gatewayClientPort: 3015, + address: "https://localhost", + }, + knexLocalConfig: knexClientConnection, + knexRemoteConfig: knexSourceRemoteConnection, + }; + + const gateway = await factory.create(options); + expect(gateway).toBeInstanceOf(SATPGateway); + + await gateway.startup(); + + const shutdownPromise = gateway.shutdown(); + + const transactRequestSourceAsset: TransactRequestSourceAsset = { + owner: "mockOwner", + ontology: "mockOntology", + contractName: "mockContractName", + }; + + const transactRequest: TransactRequest = { + contextID: "mockContextID", + fromDLTNetworkID: "mockFromDLTNetworkID", + toDLTNetworkID: "mockToDLTNetworkID", + fromAmount: "100", + toAmount: "100", + beneficiaryPubkey: "mockBeneficiaryPubkey", + originatorPubkey: "mockOriginatorPubkey", + sourceAsset: transactRequestSourceAsset, + receiverAsset: transactRequestSourceAsset, + }; + + await expect(gateway.BLODispatcherInstance?.Transact(transactRequest)).rejects.toThrow("BLODispatcher#transact(), shutdown initiated not receiving new requests"); + + await shutdownPromise; + }); +}); +