diff --git a/agent/discoveryTools.mjs b/agent/discoveryTools.mjs index ace9a5752..44f7157b2 100644 --- a/agent/discoveryTools.mjs +++ b/agent/discoveryTools.mjs @@ -52,6 +52,10 @@ export function initialSkaleNetworkScanForS2S() { log.write( strLogPrefix + cc.debug( "Downloading SKALE network information..." ) + "\n" ); } + if( log.verboseGet() >= log.verboseReversed().information ) { + log.write( strLogPrefix + + cc.notice( "Will init periodic S-Chains caching now..." ) + "\n" ); + } const opts = { imaState: imaState, "details": log, @@ -64,18 +68,23 @@ export function initialSkaleNetworkScanForS2S() { "bParallelModeRefreshSNB": ( !!( imaState.optsS2S.bParallelModeRefreshSNB ) ), "isForceMultiAttemptsUntilSuccess": true }; - if( log.verboseGet() >= log.verboseReversed().information ) { - log.write( strLogPrefix + - cc.debug( "Will start periodic S-Chains caching..." ) + "\n" ); - } await skaleObserver.periodicCachingStart( imaState.chainProperties.sc.strChainName, opts - ); - if( log.verboseGet() >= log.verboseReversed().information ) { - log.write( strLogPrefix + - cc.success( "Done, did started periodic S-Chains caching." ) + "\n" ); - } + ).then( function() { + if( log.verboseGet() >= log.verboseReversed().information ) { + log.write( strLogPrefix + + cc.success( "Done, did started periodic S-Chains caching." ) + + "\n" ); + } + } ).catch( function( err ) { + if( log.verboseGet() >= log.verboseReversed().error ) { + const strError = owaspUtils.extractErrorMessage( err ); + log.write( cc.fatal( "CRITICAL ERROR:" ) + + cc.error( " failed to start periodic S-Chains caching" ) + + cc.warning( strError ) + "\n" ); + } + } ); return true; } } ); diff --git a/agent/loop.mjs b/agent/loop.mjs index b5d54bd7f..64673cbb4 100644 --- a/agent/loop.mjs +++ b/agent/loop.mjs @@ -655,6 +655,20 @@ function constructChainProperties( opts ) { }; } +function getDefaultOptsLoop( idxWorker ) { + const optsLoop = { + joRuntimeOpts: { + isInsideWorker: true, idxChainKnownForS2S: 0, cntChainsKnownForS2S: 0 + }, + isDelayFirstRun: false, + enableStepOracle: ( idxWorker == 0 ) ? true : false, + enableStepM2S: ( idxWorker == 0 ) ? true : false, + enableStepS2M: ( idxWorker == 1 ) ? true : false, + enableStepS2S: ( idxWorker == 0 ) ? true : false + }; + return optsLoop; +} + export async function ensureHaveWorkers( opts ) { if( gArrWorkers.length > 0 ) return gArrWorkers; @@ -668,53 +682,54 @@ export async function ensureHaveWorkers( opts ) { const workerData = { url: "ima_loop_server" + idxWorker, cc: { isEnabled: cc.isEnabled() } }; - gArrWorkers.push( - new threadInfo.Worker( - path.join( __dirname, "loopWorker.mjs" ), - { "type": "module", "workerData": workerData } - ) - ); + gArrWorkers.push( new threadInfo.Worker( + path.join( __dirname, "loopWorker.mjs" ), + { "type": "module", "workerData": workerData } + ) ); gArrWorkers[idxWorker].on( "message", jo => { if( networkLayer.outOfWorkerAPIs.onMessage( gArrWorkers[idxWorker], jo ) ) return; } ); - gArrClients.push( new networkLayer.OutOfWorkerSocketClientPipe( - workerData.url, gArrWorkers[idxWorker] ) ); - gArrClients[idxWorker].on( "message", async function( eventData ) { + const aClient = new networkLayer.OutOfWorkerSocketClientPipe( + workerData.url, gArrWorkers[idxWorker] ); + gArrClients.push( aClient ); + aClient.logicalInitComplete = false; + aClient.errorLogicalInit = null; + aClient.on( "message", async function( eventData ) { const joMessage = eventData.message; switch ( joMessage.method ) { + case "init": + if( ! joMessage.error ) { + aClient.logicalInitComplete = true; + break; + } + aClient.errorLogicalInit = joMessage.error; + if( log.verboseGet() >= log.verboseReversed().critical ) { + opts.details.write( cc.fatal( "CRITICAL ERROR:" ) + " " + + cc.debug( "Loop worker thread " ) + cc.info( idxWorker ) + + cc.debug( " reported/returned init error:" ) + " " + + cc.warning( owaspUtils.extractErrorMessage( joMessage.error ) ) + "\n" ); + } + break; case "log": log.write( cc.attention( "LOOP WORKER" ) + " " + cc.notice( workerData.url ) + " " + joMessage.message + "\n" ); break; case "saveTransferError": imaTransferErrorHandling.saveTransferError( - joMessage.message.category, - joMessage.message.textLog, - joMessage.message.ts ); + joMessage.message.category, joMessage.message.textLog, joMessage.message.ts ); break; case "saveTransferSuccess": imaTransferErrorHandling.saveTransferSuccess( joMessage.message.category ); break; } // switch ( joMessage.method ) } ); - await threadInfo.sleep( 3 * 1000 ); - const optsLoop = { - joRuntimeOpts: { - isInsideWorker: true, idxChainKnownForS2S: 0, cntChainsKnownForS2S: 0 - }, - isDelayFirstRun: false, - enableStepOracle: ( idxWorker == 0 ) ? true : false, - enableStepM2S: ( idxWorker == 0 ) ? true : false, - enableStepS2M: ( idxWorker == 1 ) ? true : false, - enableStepS2S: ( idxWorker == 0 ) ? true : false - }; const jo = { "method": "init", "message": { "opts": { "imaState": { - "optsLoop": optsLoop, + "optsLoop": getDefaultOptsLoop( idxWorker ), "verbose_": log.verboseGet(), "expose_details_": log.exposeDetailsGet(), "arrSChainsCached": skaleObserver.getLastCachedSChains(), @@ -791,18 +806,14 @@ export async function ensureHaveWorkers( opts ) { "chainProperties": constructChainProperties( opts ), "joAbiSkaleManager": opts.imaState.joAbiSkaleManager, "bHaveSkaleManagerABI": opts.imaState.bHaveSkaleManagerABI, - "strChainNameOriginChain": opts.imaState.strChainNameOriginChain, - "isPWA": opts.imaState.isPWA, "nTimeoutSecondsPWA": opts.imaState.nTimeoutSecondsPWA, - "strReimbursementChain": opts.imaState.strReimbursementChain, "isShowReimbursementBalance": opts.imaState.isShowReimbursementBalance, "nReimbursementRecharge": opts.imaState.nReimbursementRecharge, "nReimbursementWithdraw": opts.imaState.nReimbursementWithdraw, "nReimbursementRange": opts.imaState.nReimbursementRange, - "joSChainDiscovery": { "isSilentReDiscovery": opts.imaState.joSChainDiscovery.isSilentReDiscovery, @@ -811,7 +822,6 @@ export async function ensureHaveWorkers( opts ) { "periodicDiscoveryInterval": opts.imaState.joSChainDiscovery.periodicDiscoveryInterval }, - "optsS2S": { // S-Chain to S-Chain transfer options "isEnabled": true, "bParallelModeRefreshSNB": @@ -821,7 +831,6 @@ export async function ensureHaveWorkers( opts ) { "secondsToWaitForSkaleNetworkDiscovered": opts.imaState.optsS2S.secondsToWaitForSkaleNetworkDiscovered }, - "nJsonRpcPort": opts.imaState.nJsonRpcPort, "isCrossImaBlsMode": opts.imaState.isCrossImaBlsMode } @@ -829,7 +838,13 @@ export async function ensureHaveWorkers( opts ) { "cc": { "isEnabled": cc.isEnabled() } } }; - gArrClients[idxWorker].send( jo ); + while( ! aClient.logicalInitComplete ) { + if( log.verboseGet() >= log.verboseReversed().info ) + log.write( "LOOP server is not inited yet...\n" ); + + await threadInfo.sleep( 1000 ); + aClient.send( jo ); + } } if( log.verboseGet() >= log.verboseReversed().debug ) { log.write( cc.debug( "Loop module did created its " ) + diff --git a/agent/loopWorker.mjs b/agent/loopWorker.mjs index 52f89fa26..09111c278 100644 --- a/agent/loopWorker.mjs +++ b/agent/loopWorker.mjs @@ -63,11 +63,18 @@ class ObserverServer extends SocketServer { constructor( acceptor ) { super( acceptor ); const self = this; + self.initComplete = false; cc.enable( workerData.cc.isEnabled ); self.opts = null; self.intervalPeriodicSchainsCaching = null; self.bIsPeriodicCachingStepInProgress = false; self.mapApiHandlers.init = function( joMessage, joAnswer, eventData, socket ) { + joAnswer.message = { + "method": "" + joMessage.method, + "error": null + }; + if( self.initComplete ) + return joAnswer; self.log = function() { const args = Array.prototype.slice.call( arguments ); const jo = { @@ -107,10 +114,6 @@ class ObserverServer extends SocketServer { cc.debug( " will save cached S-Chains..." ) + "\n" ); } skaleObserver.setLastCachedSChains( self.opts.imaState.arrSChainsCached ); - joAnswer.message = { - "method": "" + joMessage.method, - "error": null - }; self.opts.imaState.chainProperties.mn.joAccount.address = owaspUtils.fnAddressImpl_; self.opts.imaState.chainProperties.sc.joAccount.address = owaspUtils.fnAddressImpl_; if( self.opts.imaState.chainProperties.mn.strURL && @@ -160,6 +163,7 @@ class ObserverServer extends SocketServer { imaTx.getTransactionCustomizerForSChainTarget(); state.set( imaState ); imaCLI.initContracts(); + self.initComplete = true; if( log.verboseGet() >= log.verboseReversed().information ) { self.log( cc.debug( "IMA loop worker" ) + " " + cc.notice( workerData.url ) + cc.debug( " will do the following work:" ) + "\n" + " " + diff --git a/npms/skale-observer/observer.mjs b/npms/skale-observer/observer.mjs index d42bd585a..47364448e 100644 --- a/npms/skale-observer/observer.mjs +++ b/npms/skale-observer/observer.mjs @@ -1197,9 +1197,23 @@ export async function ensureHaveWorker( opts ) { return; } ); gClient = new networkLayer.OutOfWorkerSocketClientPipe( url, gWorker ); + gClient.logicalInitComplete = false; + gClient.errorLogicalInit = null; gClient.on( "message", function( eventData ) { const joMessage = eventData.message; switch ( joMessage.method ) { + case "init": + if( ! joMessage.error ) { + gClient.logicalInitComplete = true; + break; + } + gClient.errorLogicalInit = joMessage.error; + if( log.verboseGet() >= log.verboseReversed().critical ) { + opts.details.write( cc.fatal( "CRITICAL ERROR:" ) + " " + + cc.debug( "SNB worker thread reported/returned init error:" ) + " " + + cc.warning( owaspUtils.extractErrorMessage( joMessage.error ) ) + "\n" ); + } + break; case "periodicCachingDoNow": if( log.verboseGet() >= log.verboseReversed().debug ) { opts.details.write( @@ -1222,7 +1236,6 @@ export async function ensureHaveWorker( opts ) { break; } // switch ( joMessage.method ) } ); - await threadInfo.sleep( 1000 ); const jo = { "method": "init", "message": { @@ -1318,7 +1331,13 @@ export async function ensureHaveWorker( opts ) { } } }; - gClient.send( jo ); + while( ! gClient.logicalInitComplete ) { + if( log.verboseGet() >= log.verboseReversed().info ) + log.write( "SNB server is not inited yet...\n" ); + + await threadInfo.sleep( 1000 ); + gClient.send( jo ); + } } async function inThreadPeriodicCachingStart( strChainNameConnectedTo, opts ) { @@ -1350,6 +1369,9 @@ async function parallelPeriodicCachingStart( strChainNameConnectedTo, opts ) { try { const nSecondsToWaitParallel = ( opts.secondsToWaitForSkaleNetworkDiscovered > 0 ) ? opts.secondsToWaitForSkaleNetworkDiscovered : ( 2 * 60 ); + owaspUtils.ensureObserverOptionsInitialized( opts ); + await ensureHaveWorker( opts ); + await threadInfo.sleep( 5 * 1000 ); let iv = null; iv = setTimeout( function() { if( iv ) { @@ -1368,8 +1390,6 @@ async function parallelPeriodicCachingStart( strChainNameConnectedTo, opts ) { periodicCachingStop(); inThreadPeriodicCachingStart( strChainNameConnectedTo, opts ); }, nSecondsToWaitParallel * 1000 ); - owaspUtils.ensureObserverOptionsInitialized( opts ); - await ensureHaveWorker( opts ); if( log.verboseGet() >= log.verboseReversed().debug ) { log.write( threadInfo.threadDescription() + cc.debug( " will inform worker thread to start periodic SNB refresh each " ) + diff --git a/npms/skale-observer/observerWorker.mjs b/npms/skale-observer/observerWorker.mjs index 77a813d2d..01ea4fd90 100644 --- a/npms/skale-observer/observerWorker.mjs +++ b/npms/skale-observer/observerWorker.mjs @@ -62,10 +62,17 @@ class ObserverServer extends SocketServer { constructor( acceptor ) { super( acceptor ); const self = this; + self.initComplete = false; self.opts = null; self.intervalPeriodicSchainsCaching = null; self.bIsPeriodicCachingStepInProgress = false; self.mapApiHandlers.init = function( joMessage, joAnswer, eventData, socket ) { + joAnswer.message = { + "method": "" + joMessage.method, + "error": null + }; + if( self.initComplete ) + return joAnswer; self.log = function() { const args = Array.prototype.slice.call( arguments ); const jo = { @@ -83,10 +90,6 @@ class ObserverServer extends SocketServer { cc.enable( joMessage.message.cc.isEnabled ); log.verboseSet( self.opts.imaState.verbose_ ); log.exposeDetailsSet( self.opts.imaState.expose_details_ ); - joAnswer.message = { - "method": "" + joMessage.method, - "error": null - }; self.opts.imaState.chainProperties.mn.joAccount.address = owaspUtils.fnAddressImpl_; self.opts.imaState.chainProperties.sc.joAccount.address = @@ -149,6 +152,7 @@ class ObserverServer extends SocketServer { self.opts.imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi, self.opts.imaState.chainProperties.sc.ethersProvider ); + self.initComplete = true; if( log.verboseGet() >= log.verboseReversed().information ) { self.log( cc.debug( "Full init compete for in-worker SNB server in " ) + threadInfo.threadDescription() + " " +