Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(satp-hermes): shutdown of SATP gateway should assure there are no pending transfers before shutting down #3802

Open
wants to merge 1 commit into
base: satp-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import type {
ILocalLogRepository,
IRemoteLogRepository,
} from "../database/repository/interfaces/repository";
import { GatewayShuttingDownError } from "./gateway-errors";

export interface BLODispatcherOptions {
logger: Logger;
Expand Down Expand Up @@ -65,6 +66,7 @@ export class BLODispatcher {
private defaultRepository: boolean;
private localRepository: ILocalLogRepository;
private remoteRepository: IRemoteLogRepository | undefined;
private isShuttingDown = false;

constructor(public readonly options: BLODispatcherOptions) {
const fnTag = `${BLODispatcher.CLASS_NAME}#constructor()`;
Expand Down Expand Up @@ -198,9 +200,19 @@ export class BLODispatcher {
return executeGetStatus(this.level, req, this.manager);
}

/**
* @notice Transact request handler
* @param req TransactRequest
* @throws GatewayShuttingDownError when the flag isShuttingDown is true
* @returns TransactResponse
*/
public async Transact(req: TransactRequest): Promise<TransactResponse> {
//TODO pre-verify verify input
const fnTag = `${BLODispatcher.CLASS_NAME}#transact()`;
this.logger.info(`Transact request: ${req}`);
if (this.isShuttingDown) {
throw new GatewayShuttingDownError(fnTag);
}
const res = await executeTransact(
this.level,
req,
Expand All @@ -215,6 +227,19 @@ export class BLODispatcher {
const res = Array.from(await this.manager.getSessions().keys());
return res;
}

public async getManager(): Promise<SATPManager> {
this.logger.info(`Get SATP Manager request`);
return this.manager;
}

/**
* Changes the isShuttingDown flag to true, stopping all new requests
*/
public setInitiateShutdown(): void {
this.logger.info(`Stopping requests`);
this.isShuttingDown = true;
}
// get channel by caller; give needed client from orchestrator to handler to call
// for all channels, find session id on request
// TODO implement handlers GetAudit, Transact, Cancel, Routes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { asError } from "@hyperledger/cactus-common";
import { RuntimeError } from "run-time-error-cjs";
import { Error as GatewayErrorType } from "../generated/proto/cacti/satp/v02/common/message_pb";
export class GatewayError extends RuntimeError {
protected errorType = GatewayErrorType.UNSPECIFIED;
constructor(
public message: string,
public cause: string | Error | null,
// TODO internal error codes
public code: number = 500,
public traceID?: string,
public trace?: string,
) {
super(message, asError(cause));
this.name = this.constructor.name;
Object.setPrototypeOf(this, new.target.prototype); // make sure prototype chain is set to error
}

public getGatewayErrorType(): GatewayErrorType {
return this.errorType;
}
}

export class GatewayShuttingDownError extends GatewayError {
constructor(tag: string, cause?: string | Error | null) {
super(`${tag}, shutdown initiated not receiving new requests`, cause ?? null, 500);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { v4 as uuidv4 } from "uuid";
import { stringify as safeStableStringify } from "safe-stable-stringify";

import {
Checks,
JsObjectSigner,
LogLevelDesc,
Logger,
LoggerProvider,
} from "@hyperledger/cactus-common";

import {
Type,
MessageStagesHashesSchema,
Expand Down Expand Up @@ -55,6 +63,7 @@ import { create } from "@bufbuild/protobuf";

// Define interface on protos
export interface ISATPSessionOptions {
logLevel?: LogLevelDesc;
contextID: string;
sessionID?: string;
server: boolean;
Expand All @@ -65,8 +74,16 @@ export class SATPSession {
public static readonly CLASS_NAME = "SATPSession";
private clientSessionData: SessionData | undefined;
private serverSessionData: SessionData | undefined;
private readonly logger: Logger;

constructor(ops: ISATPSessionOptions) {
const fnTag = `${SATPSession.CLASS_NAME}#constructor()`;
Checks.truthy(ops, `${fnTag} arg options`);

const level = ops.logLevel || "DEBUG";
const label = this.className;
this.logger = LoggerProvider.getOrCreate({ level, label });

if (!ops.server && !ops.client) {
throw new Error(`${SATPSession.CLASS_NAME}#constructor(), at least one of server or client must be true
`);
Expand Down Expand Up @@ -135,6 +152,10 @@ export class SATPSession {
return this.serverSessionData;
}

public get className(): string {
return SATPSession.CLASS_NAME;
}

public getClientSessionData(): SessionData {
if (this.clientSessionData == undefined) {
throw new Error(
Expand Down Expand Up @@ -219,11 +240,17 @@ export class SATPSession {
}

public getSessionId(): string {
console.log("serverSessionId: ", this.serverSessionData?.id);
console.log("clientSessionId: ", this.clientSessionData?.id);
this.logger.info("serverSessionId: ", this.serverSessionData?.state);
this.logger.info("clientSessionId: ", this.clientSessionData?.state);
return this.serverSessionData?.id || this.clientSessionData?.id || "";
}

public getSessionState(): State {
this.logger.info("serverSessionId: ", this.serverSessionData?.state);
this.logger.info("clientSessionId: ", this.clientSessionData?.state);
return this.serverSessionData?.state || this.clientSessionData?.state || State.UNSPECIFIED;
}

public verify(
tag: string,
type: SessionType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import cors from "cors";
import * as OAS from "../json/openapi-blo-bundled.json";
import type { NetworkId } from "./services/network-identification/chainid-list";
import { knexLocalInstance } from "./database/knexfile";
import schedule, { Job } from "node-schedule";

export class SATPGateway implements IPluginWebService, ICactusPlugin {
@IsDefined()
Expand Down Expand Up @@ -104,6 +105,8 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
public remoteRepository?: IRemoteLogRepository;
private readonly shutdownHooks: ShutdownHook[];
private crashManager?: CrashManager;
private sessionVerificationJob: Job | null = null;


constructor(public readonly options: SATPGatewayConfig) {
const fnTag = `${this.className}#constructor()`;
Expand Down Expand Up @@ -583,7 +586,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
public async shutdown(): Promise<void> {
const fnTag = `${this.className}#getGatewaySeeds()`;
this.logger.debug(`Entering ${fnTag}`);

this.logger.info("Shutting down Node server - BOL");
await this.shutdownBLOServer();
await this.shutdownGOLServer();
Expand Down Expand Up @@ -618,6 +621,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
this.logger.debug(`Entering ${fnTag}`);
if (this.BLOServer) {
try {
await this.verifySessionsState();
await this.BLOServer.closeAllConnections();
await this.BLOServer.close();
this.BLOServer = undefined;
Expand Down Expand Up @@ -649,4 +653,51 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
this.logger.warn("Server is not running.");
}
}
}

/**
* Verify the state of the sessions before shutting down the server.
* This method is called before the server is shut down and awaits ensure that
* all sessions are concluded before the server is terminated.
* After all sessions are concluded, the job is cancelled.
*/
private async verifySessionsState(): Promise<void> {
const fnTag = `${this.className}#verifySessionsState()`;
this.logger.trace(`Entering ${fnTag}`);
if (!this.BLODispatcher) {
throw new Error(`Cannot ${fnTag} because BLODispatcher is erroneous`);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this to a Runtime Error

}
this.BLODispatcher.setInitiateShutdown();
const manager = await this.BLODispatcher.getManager();

await this.startSessionVerificationJob(manager);
this.logger.info("Session verification process started.");
}


/**
* Start a scheduled job to verify session states.
* The job runs every 20 seconds until all sessions are concluded.
*/
private async startSessionVerificationJob(manager: any): Promise<void> {
const fnTag = `${this.className}#startSessionVerificationJob()`;
this.logger.trace(`Entering ${fnTag}`);

this.sessionVerificationJob = schedule.scheduleJob("*/20 * * * * *", async () => {
try {
const status = await manager.getSATPSessionState();
if (!status) {
this.logger.info("Sessions are still pending");
} else {
this.logger.info("All sessions are concluded");
if (this.sessionVerificationJob) {
this.sessionVerificationJob.cancel(); // Stop the job once sessions are concluded
this.logger.info("Session verification job stopped.");
}
}
} catch (error) {
this.logger.error(`Error in session verification job: ${error}`);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import { Stage2SATPHandler } from "../../core/stage-handlers/stage2-handler";
import { Stage3SATPHandler } from "../../core/stage-handlers/stage3-handler";
import { SATPCrossChainManager } from "../../cross-chain-mechanisms/satp-cc-manager";
import { GatewayOrchestrator } from "./gateway-orchestrator";
import { State } from "../../generated/proto/cacti/satp/v02/common/session_pb";
import type { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";
import type { SatpStage0Service } from "../../generated/proto/cacti/satp/v02/service/stage_0_pb";
import type { SatpStage1Service } from "../../generated/proto/cacti/satp/v02/service/stage_1_pb";
Expand Down Expand Up @@ -310,6 +311,21 @@ export class SATPManager {
return this.satpHandlers.get(type);
}

/*
* Function checks if all the sessions are in the completed state
* @returns boolean
*/
public getSATPSessionState(): boolean {
const fnTag = `${SATPManager.CLASS_NAME}#getSATPSessionStatus()`;
this.logger.info(`${fnTag}, Getting SATP Session Status...`);
for (let value of this.sessions.values()) {
if (value.getSessionState() !== State.COMPLETED) {
return false;
}
}
return true;
}

public getOrCreateSession(
sessionId?: string,
contextID?: string,
Expand Down
Loading
Loading