Skip to content

Commit

Permalink
Merge pull request #1623 from skalenetwork/ticket-1620/resolve-speed-…
Browse files Browse the repository at this point in the history
…issues-of-IMA-event-scanner

Ticket 1620/resolve speed issues of ima event scanner
  • Loading branch information
DmytroNazarenko authored Oct 26, 2023
2 parents e629be1 + 0003adf commit f43cc3f
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 90 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0
2.0.1
21 changes: 19 additions & 2 deletions agent/clpTools.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -1314,14 +1314,21 @@ export function commandLineTaskTransferM2S() {
idxChainKnownForS2S: 0,
cntChainsKnownForS2S: 0
};
const optsChainPair = {
"strDirection": "M2S",
"chainSrc": imaState.chainProperties.mn,
"chainDst": imaState.chainProperties.sc
};
return await IMA.doTransfer( // main-net --> s-chain
"M2S",
joRuntimeOpts,
imaState.chainProperties.mn.ethersProvider,
imaState.joMessageProxyMainNet,
imaState.chainProperties.mn.joAbiIMA.message_proxy_mainnet_abi,
imaState.chainProperties.mn.joAccount,
imaState.chainProperties.sc.ethersProvider,
imaState.joMessageProxySChain,
imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi,
imaState.chainProperties.sc.joAccount,
imaState.chainProperties.mn.strChainName,
imaState.chainProperties.sc.strChainName,
Expand All @@ -1336,7 +1343,8 @@ export function commandLineTaskTransferM2S() {
imaState.nBlockAgeM2S,
imaBLS.doSignMessagesM2S,
null,
imaState.chainProperties.sc.transactionCustomizer
imaState.chainProperties.sc.transactionCustomizer,
optsChainPair
);
}
} );
Expand All @@ -1354,14 +1362,21 @@ export function commandLineTaskTransferS2M() {
idxChainKnownForS2S: 0,
cntChainsKnownForS2S: 0
};
const optsChainPair = {
"strDirection": "S2M",
"chainSrc": imaState.chainProperties.sc,
"chainDst": imaState.chainProperties.mn
};
return await IMA.doTransfer( // s-chain --> main-net
"S2M",
joRuntimeOpts,
imaState.chainProperties.sc.ethersProvider,
imaState.joMessageProxySChain,
imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi,
imaState.chainProperties.sc.joAccount,
imaState.chainProperties.mn.ethersProvider,
imaState.joMessageProxyMainNet,
imaState.chainProperties.sc.joAbiIMA.message_proxy_mainnet_abi,
imaState.chainProperties.mn.joAccount,
imaState.chainProperties.sc.strChainName,
imaState.chainProperties.mn.strChainName,
Expand All @@ -1376,7 +1391,8 @@ export function commandLineTaskTransferS2M() {
imaState.nBlockAgeS2M,
imaBLS.doSignMessagesS2M,
null,
imaState.chainProperties.mn.transactionCustomizer
imaState.chainProperties.mn.transactionCustomizer,
optsChainPair
);
}
} );
Expand All @@ -1403,6 +1419,7 @@ export function commandLineTaskTransferS2S() {
skaleObserver,
imaState.chainProperties.sc.ethersProvider,
imaState.joMessageProxySChain,
imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi,
imaState.chainProperties.sc.joAccount,
imaState.chainProperties.sc.strChainName,
imaState.chainProperties.sc.chainId,
Expand Down
27 changes: 20 additions & 7 deletions agent/loop.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,21 @@ async function singleTransferLoopPartM2S( optsLoop, strLogPrefix ) {
if( checkTimeFraming( null, "m2s", optsLoop.joRuntimeOpts ) ) {
imaState.loopState.m2s.isInProgress = true;
await pwa.notifyOnLoopStart( imaState, "m2s" );
const optsChainPair = {
"strDirection": "M2S",
"chainSrc": imaState.chainProperties.mn,
"chainDst": imaState.chainProperties.sc
};
b1 = await IMA.doTransfer( // main-net --> s-chain
"M2S",
optsLoop.joRuntimeOpts,

imaState.chainProperties.mn.ethersProvider,
imaState.joMessageProxyMainNet,
imaState.chainProperties.sc.joAbiIMA.message_proxy_mainnet_abi,
imaState.chainProperties.mn.joAccount,
imaState.chainProperties.sc.ethersProvider,
imaState.joMessageProxySChain,

imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi,
imaState.chainProperties.sc.joAccount,
imaState.chainProperties.mn.strChainName,
imaState.chainProperties.sc.strChainName,
Expand All @@ -244,7 +249,8 @@ async function singleTransferLoopPartM2S( optsLoop, strLogPrefix ) {
imaState.nBlockAgeM2S,
imaBLS.doSignMessagesM2S,
null,
imaState.chainProperties.sc.transactionCustomizer
imaState.chainProperties.sc.transactionCustomizer,
optsChainPair
);
imaState.loopState.m2s.isInProgress = false;
await pwa.notifyOnLoopEnd( imaState, "m2s" );
Expand Down Expand Up @@ -300,16 +306,21 @@ async function singleTransferLoopPartS2M( optsLoop, strLogPrefix ) {
if( checkTimeFraming( null, "s2m", optsLoop.joRuntimeOpts ) ) {
imaState.loopState.s2m.isInProgress = true;
await pwa.notifyOnLoopStart( imaState, "s2m" );
const optsChainPair = {
"strDirection": "S2M",
"chainSrc": imaState.chainProperties.sc,
"chainDst": imaState.chainProperties.mn
};
b2 = await IMA.doTransfer( // s-chain --> main-net
"S2M",
optsLoop.joRuntimeOpts,

imaState.chainProperties.sc.ethersProvider,
imaState.joMessageProxySChain,
imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi,
imaState.chainProperties.sc.joAccount,
imaState.chainProperties.mn.ethersProvider,
imaState.joMessageProxyMainNet,

imaState.chainProperties.sc.joAbiIMA.message_proxy_mainnet_abi,
imaState.chainProperties.mn.joAccount,
imaState.chainProperties.sc.strChainName,
imaState.chainProperties.mn.strChainName,
Expand All @@ -324,7 +335,8 @@ async function singleTransferLoopPartS2M( optsLoop, strLogPrefix ) {
imaState.nBlockAgeS2M,
imaBLS.doSignMessagesS2M,
null,
imaState.chainProperties.mn.transactionCustomizer
imaState.chainProperties.mn.transactionCustomizer,
optsChainPair
);
imaState.loopState.s2m.isInProgress = false;
await pwa.notifyOnLoopEnd( imaState, "s2m" );
Expand Down Expand Up @@ -373,6 +385,7 @@ async function singleTransferLoopPartS2S( optsLoop, strLogPrefix ) {
skaleObserver,
imaState.chainProperties.sc.ethersProvider,
imaState.joMessageProxySChain,
imaState.chainProperties.sc.joAbiIMA.message_proxy_chain_abi,
imaState.chainProperties.sc.joAccount,
imaState.chainProperties.sc.strChainName,
imaState.chainProperties.sc.chainId,
Expand Down Expand Up @@ -836,7 +849,7 @@ export async function ensureHaveWorkers( opts ) {
};
while( ! aClient.logicalInitComplete ) {
if( log.verboseGet() >= log.verboseReversed().info )
log.write( "LOOP server is not inited yet...\n" );
log.write( "LOOP server is not initialized yet...\n" );
await threadInfo.sleep( 1000 );
aClient.send( jo );
}
Expand Down
8 changes: 8 additions & 0 deletions agent/test/agentUnitTests.js
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ describe( "tests for `npms/skale-ima` 3", function() {

it( "should return `false` invoke `doTransfer`", async function() {
let joMessageProxySrc; // for `false` output
const joMessageProxySrcABI = null;
const joMessageProxyDstABI = null;
const chainNameSrc = "test";
const chainNameDst = "test";
const nTransactionsCountInBlock = 4;
Expand All @@ -508,9 +510,11 @@ describe( "tests for `npms/skale-ima` 3", function() {
joRuntimeOpts,
ethersProviderSrc,
joMessageProxySrc,
joMessageProxySrcABI,
joAccountSrc,
ethersProviderDst,
joMessageProxyDst,
joMessageProxyDstABI,
joAccountDst,
chainNameSrc,
chainNameDst,
Expand Down Expand Up @@ -542,15 +546,19 @@ describe( "tests for `npms/skale-ima` 3", function() {
idxChainKnownForS2S: 0,
cntChainsKnownForS2S: 0
};
const joMessageProxySrcABI = null;
const joMessageProxyDstABI = null;
// eslint-disable-next-line no-unused-expressions
expect( await IMA.doTransfer(
"M2S",
joRuntimeOpts,
ethersProviderSrc,
joMessageProxySrc,
joMessageProxySrcABI,
joAccountSrc,
ethersProviderDst,
joMessageProxyDst,
joMessageProxyDstABI,
joAccountDst,
chainNameSrc,
chainNameDst,
Expand Down
88 changes: 86 additions & 2 deletions npms/skale-ima/imaEventLogScan.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ import * as rpcCall from "../../agent/rpcCall.mjs";
import * as imaHelperAPIs from "./imaHelperAPIs.mjs";
import * as imaTransferErrorHandling from "./imaTransferErrorHandling.mjs";

import * as childProcessModule from "child_process";
import * as path from "path";
import * as url from "url";

const __dirname = path.dirname( url.fileURLToPath( import.meta.url ) );

export function createProgressiveEventsScanPlan( details, nLatestBlockNumber ) {
// assume Main Net mines 6 blocks per minute
const blocksInOneMinute = 6;
Expand Down Expand Up @@ -86,9 +92,87 @@ export function createProgressiveEventsScanPlan( details, nLatestBlockNumber ) {
return arrProgressiveEventsScanPlan;
}

export function extractEventArg( arg ) {
if( arg && typeof arg == "object" && "type" in arg && typeof arg.type == "string" &&
arg.type == "BigNumber" && "hex" in arg && typeof arg.hex == "string" )
return owaspUtils.toBN( arg.hex );
return arg;
}

function generateWhileTransferringLogMessageSuffix( optsChainPair ) {
if( ! optsChainPair )
return "";
if( ! optsChainPair.strDirection )
return "";
if( optsChainPair.strDirection == "S2S" ) {
return cc.debug( " (while performing " ) + cc.attention( optsChainPair.strDirection ) +
cc.debug( " transfer with external S-Chain " ) +
cc.info( optsChainPair.optsSpecificS2S.joSChain.data.name ) + cc.debug( " / " ) +
cc.notice( optsChainPair.optsSpecificS2S.joSChain.data.computed.chainId ) +
cc.debug( " node " ) + cc.info( optsChainPair.optsSpecificS2S.idxNode ) +
cc.debug( ")" );
}
return cc.debug( " (while performing " ) + cc.attention( optsChainPair.strDirection ) +
cc.debug( " transfer)" );
}

export async function safeGetPastEventsProgressiveExternal(
details, strLogPrefix, ethersProvider, attempts,
joContract, joABI, strEventName,
nBlockFrom, nBlockTo, joFilter, optsChainPair
) {
if( joABI && typeof joABI == "object" ) {
const escapeShell = function( cmd ) {
return "\"" + cmd.replace( /(["'$`\\])/g,"\\$1" ) + "\"";
};
const joArg = {
"url": owaspUtils.ethersProviderToUrl( ethersProvider ),
"attempts": attempts,
"strEventName": strEventName,
"nBlockFrom": nBlockFrom,
"nBlockTo": nBlockTo,
"joFilter": joFilter,
"address": joContract.address,
"abi": joABI
};
const cmd = "node " + path.join( __dirname, "imaExternalLogScan.mjs" ) + " " +
escapeShell( JSON.stringify( joArg ) );
if( log.verboseGet() >= log.verboseReversed().trace ) {
details.write( strLogPrefix +
cc.debug( "Will run external command to search logs for event " ) +
cc.j( strEventName ) + cc.debug( " via URL " ) + cc.u( joArg.url ) +
generateWhileTransferringLogMessageSuffix( optsChainPair ) +
cc.debug( "..." ) + "\n" );
}
const res = childProcessModule.execSync( cmd );
if( "error" in res && res.error ) {
if( log.verboseGet() >= log.verboseReversed().error ) {
details.write( strLogPrefix +
cc.error( "Got error from external command to search logs for event " ) +
cc.j( strEventName ) + cc.error( " via URL " ) + cc.u( joArg.url ) +
generateWhileTransferringLogMessageSuffix( optsChainPair ) +
cc.error( ":" ) + cc.warning( owaspUtils.extractErrorMessage( err ) ) + "\n" );
}
throw new Error( res.error );
}
if( log.verboseGet() >= log.verboseReversed().trace ) {
details.write( strLogPrefix +
cc.debug( "Done running external command to search logs for event " ) +
cc.j( strEventName ) + cc.debug( " via URL " ) + cc.u( joArg.url ) +
generateWhileTransferringLogMessageSuffix( optsChainPair ) +
cc.debug( "." ) + "\n" );
}
return JSON.parse( res ).result;
}
return await safeGetPastEventsProgressive(
details, strLogPrefix, ethersProvider, attempts,
joContract, strEventName,
nBlockFrom, nBlockTo, joFilter );
}

export async function safeGetPastEventsProgressive(
details, strLogPrefix,
ethersProvider, attempts, joContract, strEventName,
details, strLogPrefix, ethersProvider, attempts,
joContract, strEventName,
nBlockFrom, nBlockTo, joFilter
) {
if( ! imaTransferErrorHandling.getEnabledProgressiveEventsScan() ) {
Expand Down
88 changes: 88 additions & 0 deletions npms/skale-ima/imaExternalLogScan.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// SPDX-License-Identifier: AGPL-3.0-only

/**
* @license
* SKALE IMA
*
* SKALE IMA is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* SKALE IMA is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with SKALE IMA. If not, see <https://www.gnu.org/licenses/>.
*/

/**
* @file imaExternalLogScan.mjs
* @copyright SKALE Labs 2019-Present
*/

import * as cc from "../skale-cc/cc.mjs";
import * as log from "../skale-log/log.mjs";
import * as owaspUtils from "../skale-owasp/owaspUtils.mjs";
import * as imaEventLogScan from "./imaEventLogScan.mjs";

const gIsDebugLogging = false; // development option only, must be always false
cc.enable( false );
log.addStdout();

// allow self-signed wss and https
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0;

function finalizeOutput( jo ) {
if( ! jo )
return;
cc.enable( false );
process.stdout.write( cc.j( jo ) );
}

async function run() {
const details = log.createMemoryStream();
try {
if( gIsDebugLogging ) {
log.write( cc.debug( "Process startup arguments array is " ) +
cc.j( process.argv ) + "\n" );
}
if( process.argv.length != 3 )
throw new Error( "Wrong number of command line arguments" );

if( gIsDebugLogging ) {
log.write( cc.debug( "Main argument text is " ) +
cc.j( process.argv[2] ) + "\n" );
}
const joArg = JSON.parse( process.argv[2] );
if( gIsDebugLogging ) {
log.write( cc.debug( "Main argument JSON is " ) +
cc.j( joArg ) + "\n" );
}

const ethersProvider = owaspUtils.getEthersProviderFromURL( joArg.url );
const joContract = new owaspUtils.ethersMod.ethers.Contract(
joArg.address, joArg.abi, ethersProvider );

const arrLogRecordReferencesWalk = await imaEventLogScan.safeGetPastEventsProgressive(
details, "", ethersProvider, joArg.attempts,
joContract, joArg.strEventName,
joArg.nBlockFrom, joArg.nBlockTo, joArg.joFilter );

finalizeOutput( { "result": arrLogRecordReferencesWalk, "error": null } );
process.exit( 0 );
} catch ( err ) {
if( gIsDebugLogging ) {
log.write( cc.error( "Failed to create RPC call: " ) +
cc.j( err ) + "\n" );
}
finalizeOutput( {
"error": owaspUtils.extractErrorMessage( err ),
"output": details.toString()
} );
process.exit( 1 );
}
}
run();
Loading

0 comments on commit f43cc3f

Please sign in to comment.