Skip to content

Commit

Permalink
Merge pull request #1643 from skalenetwork/beta
Browse files Browse the repository at this point in the history
2.0.1
  • Loading branch information
DmytroNazarenko authored Nov 17, 2023
2 parents 792d205 + b3d9d0c commit f57d757
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 359 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0
2.0.1
224 changes: 114 additions & 110 deletions agent/bls.mjs

Large diffs are not rendered by default.

83 changes: 47 additions & 36 deletions agent/loop.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ async function singleTransferLoopPartM2S( optsLoop, strLogPrefix ) {
imaState.chainProperties.mn.joAccount,
imaState.chainProperties.sc.ethersProvider,
imaState.joMessageProxySChain,

imaState.chainProperties.sc.joAccount,
imaState.chainProperties.mn.strChainName,
imaState.chainProperties.sc.strChainName,
Expand Down Expand Up @@ -309,7 +308,6 @@ async function singleTransferLoopPartS2M( optsLoop, strLogPrefix ) {
imaState.chainProperties.sc.joAccount,
imaState.chainProperties.mn.ethersProvider,
imaState.joMessageProxyMainNet,

imaState.chainProperties.mn.joAccount,
imaState.chainProperties.sc.strChainName,
imaState.chainProperties.mn.strChainName,
Expand Down Expand Up @@ -512,17 +510,21 @@ const gArrClients = [];
export function notifyCacheChangedSNB( arrSChainsCached ) {
const cntWorkers = gArrWorkers.length;
if( cntWorkers == 0 ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.warning( "Will skip chainsCacheChanged dispatch event with " ) +
cc.warning( "no chains arrived in " ) + threadInfo.threadDescription() + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.warning( "Will skip chainsCacheChanged dispatch event with " ) +
cc.warning( "no chains arrived in " ) + threadInfo.threadDescription() + "\n" );
}
}
return;
}
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module will broadcast arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module will broadcast arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
}
}
for( let idxWorker = 0; idxWorker < cntWorkers; ++ idxWorker ) {
const jo = {
Expand All @@ -531,34 +533,44 @@ export function notifyCacheChangedSNB( arrSChainsCached ) {
"arrSChainsCached": arrSChainsCached
}
};
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache will be sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker..." ) +
"\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache will be sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker..." ) +
"\n" );
}
}
gArrClients[idxWorker].send( jo );
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache did sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker" ) +
"\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write( cc.debug( "S-Chains cache did sent to " ) +
cc.notice( gArrClients[idxWorker].url ) + cc.debug( " loop worker" ) +
"\n" );
}
}
}
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module did finished broadcasting arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module did finished broadcasting arrSChainsCached event to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
}
}
}

if( log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Subscribe to chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
log.write( cc.debug( "Subscribe to chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
}
}
skaleObserver.events.on( "chainsCacheChanged", function( eventData ) {
if( log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Did arrived chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Did arrived chainsCacheChanged event in " ) +
threadInfo.threadDescription() + "\n" );
}
}
notifyCacheChangedSNB( eventData.detail.arrSChainsCached );
} );
Expand Down Expand Up @@ -757,14 +769,12 @@ export async function ensureHaveWorkers( opts ) {
"nMaxTransactionsM2S": opts.imaState.nMaxTransactionsM2S,
"nMaxTransactionsS2M": opts.imaState.nMaxTransactionsS2M,
"nMaxTransactionsS2S": opts.imaState.nMaxTransactionsS2S,

"nBlockAwaitDepthM2S": opts.imaState.nBlockAwaitDepthM2S,
"nBlockAwaitDepthS2M": opts.imaState.nBlockAwaitDepthS2M,
"nBlockAwaitDepthS2S": opts.imaState.nBlockAwaitDepthS2S,
"nBlockAgeM2S": opts.imaState.nBlockAgeM2S,
"nBlockAgeS2M": opts.imaState.nBlockAgeS2M,
"nBlockAgeS2S": opts.imaState.nBlockAgeS2S,

"nLoopPeriodSeconds": opts.imaState.nLoopPeriodSeconds,
"nNodeNumber": opts.imaState.nNodeNumber,
"nNodesCount": opts.imaState.nNodesCount,
Expand Down Expand Up @@ -836,7 +846,7 @@ export async function ensureHaveWorkers( opts ) {
};
while( ! aClient.logicalInitComplete ) {
if( log.verboseGet() >= log.verboseReversed().info )
log.write( "LOOP server is not inited yet...\n" );
log.write( "LOOP server is not initialized yet...\n" );
await threadInfo.sleep( 1000 );
aClient.send( jo );
}
Expand All @@ -846,12 +856,14 @@ export async function ensureHaveWorkers( opts ) {
cc.info( gArrWorkers.length ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "" ) + "\n" );
}
if( log.verboseGet() >= log.verboseReversed().trace ) {
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded &&
log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Subscribe to inThread-arrSChainsCached event in " ) +
threadInfo.threadDescription() + "\n" );
}
skaleObserver.events.on( "inThread-arrSChainsCached", function( eventData ) {
if( log.verboseGet() >= log.verboseReversed().trace ) {
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded &&
log.verboseGet() >= log.verboseReversed().trace ) {
log.write( cc.debug( "Did arrived inThread-arrSChainsCached event in " ) +
threadInfo.threadDescription() + "\n" );
}
Expand All @@ -861,9 +873,8 @@ export async function ensureHaveWorkers( opts ) {
// Force broadcast what we have in SNB right now because works above can start later than SNB
// is finished download connected chains quickly
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop module will do first initial broadcast of arrSChainsCached to its " ) +
cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
log.write( cc.debug( "Loop module will do first initial broadcast of arrSChainsCached " +
"to its " ) + cc.info( cntWorkers ) + cc.debug( " worker(s) in " ) +
threadInfo.threadDescription() + cc.debug( "..." ) + "\n" );
}
notifyCacheChangedSNB( skaleObserver.getLastCachedSChains() );
Expand Down
26 changes: 18 additions & 8 deletions agent/loopWorker.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,17 @@ class ObserverServer extends SocketServer {
const isFlush = true;
socket.send( jo, isFlush );
} );
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop worker " ) + cc.notice( workerData.url ) +
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
log.write(
cc.debug( "Loop worker " ) + cc.notice( workerData.url ) +
cc.debug( " will save cached S-Chains..." ) + "\n" );
}
}
if( ! self.opts.imaState.optsLoop.enableStepS2S )
threadInfo.joCustomThreadProperties.isSChainsCacheNeeded = false;
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
log.write( cc.debug( "Loop worker " ) + cc.notice( workerData.url ) +
cc.debug( " will save cached S-Chains..." ) + "\n" );
}
skaleObserver.setLastCachedSChains( self.opts.imaState.arrSChainsCached );
Expand Down Expand Up @@ -200,11 +208,13 @@ class ObserverServer extends SocketServer {
imaState.joSChainNetworkInfo = joMessage.joSChainNetworkInfo;
};
self.mapApiHandlers.schainsCached = function( joMessage, joAnswer, eventData, socket ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
self.log( cc.debug( "S-Chains cache did arrived to " ) +
cc.notice( workerData.url ) + cc.debug( " loop worker in " ) +
threadInfo.threadDescription() + cc.debug( ": " ) +
cc.j( joMessage.message.arrSChainsCached ) + "\n" );
if( threadInfo.joCustomThreadProperties.isSChainsCacheNeeded ) {
if( log.verboseGet() >= log.verboseReversed().debug ) {
self.log( cc.debug( "S-Chains cache did arrived to " ) +
cc.notice( workerData.url ) + cc.debug( " loop worker in " ) +
threadInfo.threadDescription() + cc.debug( ": " ) +
cc.j( joMessage.message.arrSChainsCached ) + "\n" );
}
}
skaleObserver.setLastCachedSChains( joMessage.message.arrSChainsCached );
};
Expand Down
7 changes: 6 additions & 1 deletion agent/main.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,13 @@ function initJsonRpcServer() {
}
break;
default:
throw new Error( "Unknown method name \"" + joMessage.method + "\" was specified" );
joAnswer.error = `Unknown method name ${joMessage.method} was specified`;
break;
} // switch( joMessage.method )
if( ( !joAnswer ) || typeof joAnswer != "object" ) {
joAnswer = {};
joAnswer.error = "internal error, null data returned";
}
} catch ( err ) {
if( log.verboseGet() >= log.verboseReversed().error ) {
const strError = owaspUtils.extractErrorMessage( err );
Expand Down
2 changes: 1 addition & 1 deletion agent/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ BASE_OPTIONS="--gas-price-multiplier=$GAS_PRICE_MULTIPLIER \
--monitoring-port=$MONITORING_PORT \
--pwa \
--no-expose-pwa \
--auto-exit=86400"
--auto-exit=3600"

echo "Running loop cmd..."
node "$DIR/main.mjs" --loop $BASE_OPTIONS
5 changes: 5 additions & 0 deletions agent/threadInfo.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import * as cc from "../npms/skale-cc/cc.mjs";
const Worker = worker_threads.Worker;
export { Worker };

const joCustomThreadProperties = {
"isSChainsCacheNeeded": true // by default is set to true
};
export { joCustomThreadProperties };

export const sleep = ( milliseconds ) => {
return new Promise( resolve => setTimeout( resolve, milliseconds ) );
};
Expand Down
32 changes: 29 additions & 3 deletions npms/skale-ima/imaEventLogScan.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,26 @@ export function createProgressiveEventsScanPlan( details, nLatestBlockNumber ) {
return arrProgressiveEventsScanPlan;
}

function generateWhileTransferringLogMessageSuffix( optsChainPair ) {
if( ! optsChainPair )
return "";
if( ! optsChainPair.strDirection )
return "";
if( optsChainPair.strDirection == "S2S" ) {
return cc.debug( " (while performing " ) + cc.attention( optsChainPair.strDirection ) +
cc.debug( " transfer with external S-Chain " ) +
cc.info( optsChainPair.optsSpecificS2S.joSChain.data.name ) + cc.debug( " / " ) +
cc.notice( optsChainPair.optsSpecificS2S.joSChain.data.computed.chainId ) +
cc.debug( " node " ) + cc.info( optsChainPair.optsSpecificS2S.idxNode ) +
cc.debug( ")" );
}
return cc.debug( " (while performing " ) + cc.attention( optsChainPair.strDirection ) +
cc.debug( " transfer)" );
}

export async function safeGetPastEventsProgressive(
details, strLogPrefix,
ethersProvider, attempts, joContract, strEventName,
nBlockFrom, nBlockTo, joFilter
details, strLogPrefix, ethersProvider, attempts, joContract, strEventName,
nBlockFrom, nBlockTo, joFilter, optsChainPair
) {
if( ! imaTransferErrorHandling.getEnabledProgressiveEventsScan() ) {
details.write( strLogPrefix +
Expand All @@ -104,6 +120,16 @@ export async function safeGetPastEventsProgressive(
nBlockFrom, nBlockTo, joFilter
);
}
if( log.verboseGet() >= log.verboseReversed().information ) {
details.write( strLogPrefix +
cc.info( "Will run progressive event log search for event " ) +
cc.j( strEventName ) + cc.info( " via URL " ) +
cc.u( owaspUtils.ethersProviderToUrl( ethersProvider ) ) +
generateWhileTransferringLogMessageSuffix( optsChainPair ) +
cc.info( ", from block " ) + cc.notice( nBlockFrom ) +
cc.info( ", to block " ) + cc.notice( nBlockTo ) +
cc.info( "..." ) + "\n" );
}
const nLatestBlockNumber = owaspUtils.toBN(
await imaHelperAPIs.safeGetBlockNumber( details, 10, ethersProvider ) );
const nLatestBlockNumberPlus1 = nLatestBlockNumber.add( owaspUtils.toBN( 1 ) );
Expand Down
Loading

0 comments on commit f57d757

Please sign in to comment.