Skip to content

Commit 0c911a2

Browse files
committed
fix(satp-hermes): shutdown of satp gateway
Signed-off-by: Joao Pereira <[email protected]>
1 parent c029d2e commit 0c911a2

File tree

7 files changed

+314
-2
lines changed

7 files changed

+314
-2
lines changed

packages/cactus-plugin-satp-hermes/src/main/typescript/api1/dispatcher.ts

+25
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import type {
3636
ILocalLogRepository,
3737
IRemoteLogRepository,
3838
} from "../database/repository/interfaces/repository";
39+
import { GatewayShuttingDownError } from "./gateway-errors";
3940

4041
export interface BLODispatcherOptions {
4142
logger: Logger;
@@ -65,6 +66,7 @@ export class BLODispatcher {
6566
private defaultRepository: boolean;
6667
private localRepository: ILocalLogRepository;
6768
private remoteRepository: IRemoteLogRepository | undefined;
69+
private isShuttingDown = false;
6870

6971
constructor(public readonly options: BLODispatcherOptions) {
7072
const fnTag = `${BLODispatcher.CLASS_NAME}#constructor()`;
@@ -198,9 +200,19 @@ export class BLODispatcher {
198200
return executeGetStatus(this.level, req, this.manager);
199201
}
200202

203+
/**
204+
* @notice Transact request handler
205+
* @param req TransactRequest
206+
* @throws GatewayShuttingDownError when the flag isShuttingDown is true
207+
* @returns TransactResponse
208+
*/
201209
public async Transact(req: TransactRequest): Promise<TransactResponse> {
202210
//TODO pre-verify verify input
211+
const fnTag = `${BLODispatcher.CLASS_NAME}#transact()`;
203212
this.logger.info(`Transact request: ${req}`);
213+
if (this.isShuttingDown) {
214+
throw new GatewayShuttingDownError(fnTag);
215+
}
204216
const res = await executeTransact(
205217
this.level,
206218
req,
@@ -215,6 +227,19 @@ export class BLODispatcher {
215227
const res = Array.from(await this.manager.getSessions().keys());
216228
return res;
217229
}
230+
231+
public async getManager(): Promise<SATPManager> {
232+
this.logger.info(`Get SATP Manager request`);
233+
return this.manager;
234+
}
235+
236+
/**
237+
* Changes the isShuttingDown flag to true, stopping all new requests
238+
*/
239+
public setInitiateShutdown(): void {
240+
this.logger.info(`Stopping requests`);
241+
this.isShuttingDown = true;
242+
}
218243
// get channel by caller; give needed client from orchestrator to handler to call
219244
// for all channels, find session id on request
220245
// TODO implement handlers GetAudit, Transact, Cancel, Routes
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { asError } from "@hyperledger/cactus-common";
2+
import { RuntimeError } from "run-time-error-cjs";
3+
import { Error as GatewayErrorType } from "../generated/proto/cacti/satp/v02/common/message_pb";
4+
export class GatewayError extends RuntimeError {
5+
protected errorType = GatewayErrorType.UNSPECIFIED;
6+
constructor(
7+
public message: string,
8+
public cause: string | Error | null,
9+
// TODO internal error codes
10+
public code: number = 500,
11+
public traceID?: string,
12+
public trace?: string,
13+
) {
14+
super(message, asError(cause));
15+
this.name = this.constructor.name;
16+
Object.setPrototypeOf(this, new.target.prototype); // make sure prototype chain is set to error
17+
}
18+
19+
public getGatewayErrorType(): GatewayErrorType {
20+
return this.errorType;
21+
}
22+
}
23+
24+
export class GatewayShuttingDownError extends GatewayError {
25+
constructor(tag: string, cause?: string | Error | null) {
26+
super(`${tag}, shutdown initiated not receiving new requests`, cause ?? null, 500);
27+
}
28+
}

packages/cactus-plugin-satp-hermes/src/main/typescript/core/satp-session.ts

+6
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,12 @@ export class SATPSession {
224224
return this.serverSessionData?.id || this.clientSessionData?.id || "";
225225
}
226226

227+
public getSessionState(): State {
228+
console.log("serverSessionId: ", this.serverSessionData?.state);
229+
console.log("clientSessionId: ", this.clientSessionData?.state);
230+
return this.serverSessionData?.state || this.clientSessionData?.state || State.UNSPECIFIED;
231+
}
232+
227233
public verify(
228234
tag: string,
229235
type: SessionType,

packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts

+27-1
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
583583
public async shutdown(): Promise<void> {
584584
const fnTag = `${this.className}#getGatewaySeeds()`;
585585
this.logger.debug(`Entering ${fnTag}`);
586-
586+
587587
this.logger.info("Shutting down Node server - BOL");
588588
await this.shutdownBLOServer();
589589
await this.shutdownGOLServer();
@@ -618,6 +618,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
618618
this.logger.debug(`Entering ${fnTag}`);
619619
if (this.BLOServer) {
620620
try {
621+
await this.verifySessionsState();
621622
await this.BLOServer.closeAllConnections();
622623
await this.BLOServer.close();
623624
this.BLOServer = undefined;
@@ -649,4 +650,29 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
649650
this.logger.warn("Server is not running.");
650651
}
651652
}
653+
654+
/**
655+
* @notice Verify the state of the sessions before shutting down the server.
656+
* This method is called before the server is shut down and awaits ensure that
657+
* all sessions are concluded before the server is terminated.
658+
*/
659+
private async verifySessionsState(): Promise<void> {
660+
const fnTag = `${this.className}#verifySessionsState()`;
661+
this.logger.trace(`Entering ${fnTag}`);
662+
if (!this.BLODispatcher) {
663+
throw new Error(`Cannot ${fnTag} because BLODispatcher is erroneous`);
664+
}
665+
this.BLODispatcher.setInitiateShutdown();
666+
const manager = await this.BLODispatcher.getManager();
667+
let status = false;
668+
while (!status) {
669+
this.logger.debug(`Inside: ${status}`);
670+
status = await manager.getSATPSessionState();
671+
if (!status) {
672+
this.logger.info("Sessions are still pending");
673+
await new Promise(resolve => setTimeout(resolve, 20000));
674+
}
675+
}
676+
this.logger.info("All sessions are concluded");
677+
}
652678
}

packages/cactus-plugin-satp-hermes/src/main/typescript/services/gateway/satp-manager.ts

+12
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { Stage2SATPHandler } from "../../core/stage-handlers/stage2-handler";
3737
import { Stage3SATPHandler } from "../../core/stage-handlers/stage3-handler";
3838
import { SATPCrossChainManager } from "../../cross-chain-mechanisms/satp-cc-manager";
3939
import { GatewayOrchestrator } from "./gateway-orchestrator";
40+
import { State } from "../../generated/proto/cacti/satp/v02/common/session_pb";
4041
import type { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";
4142
import type { SatpStage0Service } from "../../generated/proto/cacti/satp/v02/service/stage_0_pb";
4243
import type { SatpStage1Service } from "../../generated/proto/cacti/satp/v02/service/stage_1_pb";
@@ -308,6 +309,17 @@ export class SATPManager {
308309
return this.satpHandlers.get(type);
309310
}
310311

312+
public getSATPSessionState(): boolean {
313+
const fnTag = `${SATPManager.CLASS_NAME}#getSATPSessionStatus()`;
314+
this.logger.info(`${fnTag}, Getting SATP Session Status...`);
315+
for (let value of this.sessions.values()) {
316+
if (value.getSessionState() !== State.COMPLETED) {
317+
return false;
318+
}
319+
}
320+
return true;
321+
}
322+
311323
public getOrCreateSession(
312324
sessionId?: string,
313325
contextID?: string,

packages/cactus-plugin-satp-hermes/src/test/typescript/integration/bridge/fabric-bridge.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ beforeAll(async () => {
231231
const satpContractRelPath =
232232
"../../../../test/typescript/fabric/contracts/satp-contract/chaincode-typescript";
233233
const wrapperSatpContractRelPath =
234-
"../../../../main/typescript/fabric-contracts/satp-wrapper/chaincode-typescript";
234+
"../../../../main/typescript/cross-chain-mechanisms/satp-bridge/fabric-contracts/satp-wrapper/chaincode-typescript";
235235
const satpContractDir = path.join(__dirname, satpContractRelPath);
236236

237237
// ├── package.json
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
import "jest-extended";
2+
import {
3+
Containers,
4+
pruneDockerAllIfGithubAction,
5+
} from "@hyperledger/cactus-test-tooling";
6+
import {
7+
LogLevelDesc,
8+
LoggerProvider,
9+
} from "@hyperledger/cactus-common";
10+
import {
11+
SATPGateway,
12+
SATPGatewayConfig,
13+
} from "../../../main/typescript/plugin-satp-hermes-gateway";
14+
import { PluginFactorySATPGateway } from "../../../main/typescript/factory/plugin-factory-gateway-orchestrator";
15+
import {
16+
IPluginFactoryOptions,
17+
LedgerType,
18+
PluginImportType,
19+
} from "@hyperledger/cactus-core-api";
20+
import {
21+
SATP_ARCHITECTURE_VERSION,
22+
SATP_CORE_VERSION,
23+
SATP_CRASH_VERSION,
24+
} from "../../../main/typescript/core/constants";
25+
import {
26+
knexClientConnection,
27+
knexSourceRemoteConnection,
28+
} from "../knex.config";
29+
import { SATPSession } from "../../../main/typescript/core/satp-session";
30+
import {
31+
TransactRequest,
32+
TransactRequestSourceAsset
33+
} from "../../../main/typescript";
34+
35+
const logLevel: LogLevelDesc = "DEBUG";
36+
const logger = LoggerProvider.getOrCreate({
37+
level: logLevel,
38+
label: "satp-gateway-orchestrator-init-test",
39+
});
40+
const factoryOptions: IPluginFactoryOptions = {
41+
pluginImportType: PluginImportType.Local,
42+
};
43+
const factory = new PluginFactorySATPGateway(factoryOptions);
44+
45+
let mockSession: SATPSession;
46+
const sessionIDs: string[] = [];
47+
48+
beforeAll(async () => {
49+
pruneDockerAllIfGithubAction({ logLevel })
50+
.then(() => {
51+
logger.info("Pruning throw OK");
52+
})
53+
.catch(async () => {
54+
await Containers.logDiagnostics({ logLevel });
55+
fail("Pruning didn't throw OK");
56+
});
57+
mockSession = new SATPSession({
58+
contextID: "MOCK_CONTEXT_ID",
59+
server: false,
60+
client: true,
61+
});
62+
63+
sessionIDs.push(mockSession.getSessionId());
64+
});
65+
66+
describe("Shutdown Verify State Tests", () => {
67+
test("Gateway waits to verify the sessions state before shutdown", async () => {
68+
const options: SATPGatewayConfig = {
69+
gid: {
70+
id: "mockID",
71+
name: "CustomGateway",
72+
version: [
73+
{
74+
Core: SATP_CORE_VERSION,
75+
Architecture: SATP_ARCHITECTURE_VERSION,
76+
Crash: SATP_CRASH_VERSION,
77+
},
78+
],
79+
connectedDLTs: [],
80+
proofID: "mockProofID10",
81+
gatewayServerPort: 3014,
82+
gatewayClientPort: 3015,
83+
address: "https://localhost",
84+
},
85+
knexLocalConfig: knexClientConnection,
86+
knexRemoteConfig: knexSourceRemoteConnection,
87+
};
88+
89+
const gateway = await factory.create(options);
90+
expect(gateway).toBeInstanceOf(SATPGateway);
91+
92+
const verifySessionsStateSpy = jest.spyOn(gateway as any, "verifySessionsState");
93+
94+
const shutdownBLOServerSpy = jest.spyOn(gateway as any, "shutdownBLOServer");
95+
96+
await gateway.startup();
97+
await gateway.shutdown();
98+
99+
expect(verifySessionsStateSpy).toHaveBeenCalled();
100+
101+
expect(shutdownBLOServerSpy).toHaveBeenCalled();
102+
103+
verifySessionsStateSpy.mockRestore();
104+
shutdownBLOServerSpy.mockRestore();
105+
});
106+
107+
test("Gateway waits for pending sessions to complete before shutdown", async () => {
108+
const options: SATPGatewayConfig = {
109+
gid: {
110+
id: "mockID",
111+
name: "CustomGateway",
112+
version: [
113+
{
114+
Core: SATP_CORE_VERSION,
115+
Architecture: SATP_ARCHITECTURE_VERSION,
116+
Crash: SATP_CRASH_VERSION,
117+
},
118+
],
119+
connectedDLTs: [],
120+
proofID: "mockProofID10",
121+
gatewayServerPort: 3014,
122+
gatewayClientPort: 3015,
123+
address: "https://localhost",
124+
},
125+
knexLocalConfig: knexClientConnection,
126+
knexRemoteConfig: knexSourceRemoteConnection,
127+
};
128+
129+
const gateway = await factory.create(options);
130+
expect(gateway).toBeInstanceOf(SATPGateway);
131+
132+
const satpManager = (gateway as any).BLODispatcher.manager;
133+
134+
let sessionState = false;
135+
satpManager.getSessions().set(mockSession.getSessionId(), mockSession);
136+
137+
await gateway.startup();
138+
139+
const shutdownPromise = gateway.shutdown();
140+
141+
const initialSessionState = await satpManager.getSATPSessionState();
142+
expect(initialSessionState).toBe(false);
143+
144+
const getSATPSessionStateSpy = jest
145+
.spyOn(satpManager, "getSATPSessionState")
146+
.mockImplementation(async () => {
147+
if (!sessionState) {
148+
await new Promise((resolve) => setTimeout(resolve, 30000));
149+
sessionState = true;
150+
}
151+
return sessionState;
152+
});
153+
154+
await shutdownPromise;
155+
156+
const finalSessionState = await satpManager.getSATPSessionState();
157+
expect(finalSessionState).toBe(true);
158+
159+
getSATPSessionStateSpy.mockRestore();
160+
161+
});
162+
163+
test("Gateway does not allow new transactions after shutdown is initiated", async () => {
164+
const options: SATPGatewayConfig = {
165+
gid: {
166+
id: "mockID",
167+
name: "CustomGateway",
168+
version: [
169+
{
170+
Core: SATP_CORE_VERSION,
171+
Architecture: SATP_ARCHITECTURE_VERSION,
172+
Crash: SATP_CRASH_VERSION,
173+
},
174+
],
175+
connectedDLTs: [],
176+
proofID: "mockProofID10",
177+
gatewayServerPort: 3014,
178+
gatewayClientPort: 3015,
179+
address: "https://localhost",
180+
},
181+
knexLocalConfig: knexClientConnection,
182+
knexRemoteConfig: knexSourceRemoteConnection,
183+
};
184+
185+
const gateway = await factory.create(options);
186+
expect(gateway).toBeInstanceOf(SATPGateway);
187+
188+
await gateway.startup();
189+
190+
const shutdownPromise = gateway.shutdown();
191+
192+
const transactRequestSourceAsset: TransactRequestSourceAsset = {
193+
owner: "mockOwner",
194+
ontology: "mockOntology",
195+
contractName: "mockContractName",
196+
};
197+
198+
const transactRequest: TransactRequest = {
199+
contextID: "mockContextID",
200+
fromDLTNetworkID: "mockFromDLTNetworkID",
201+
toDLTNetworkID: "mockToDLTNetworkID",
202+
fromAmount: "100",
203+
toAmount: "100",
204+
beneficiaryPubkey: "mockBeneficiaryPubkey",
205+
originatorPubkey: "mockOriginatorPubkey",
206+
sourceAsset: transactRequestSourceAsset,
207+
receiverAsset: transactRequestSourceAsset,
208+
};
209+
210+
await expect(gateway.BLODispatcherInstance?.Transact(transactRequest)).rejects.toThrow("BLODispatcher#transact(), shutdown initiated not receiving new requests");
211+
212+
await shutdownPromise;
213+
});
214+
});
215+

0 commit comments

Comments
 (0)