Skip to content

Commit 5e4da90

Browse files
committed
make logs JSON parseable and searchable
1 parent cf3dfb6 commit 5e4da90

14 files changed

+74
-55
lines changed

packages/event-pipeline/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
"graphql-request": "^3.1.0",
4646
"node-cron": "^2.0.3",
4747
"pg": "^7.5.0",
48+
"pino": "^6.11.1",
4849
"reflect-metadata": "^0.1.13",
4950
"typeorm": "^0.2.20",
5051
"typescript": "3.1.1",

packages/event-pipeline/src/data_sources/events/utils.ts

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logUtils } from '@0x/utils';
1+
import { logger } from '../../utils/logger';
22
import { DecodedLogArgs, LogWithDecodedArgs } from 'ethereum-types';
33

44
const NUM_RETRIES = 1; // Number of retries if a request fails or times out.
@@ -28,7 +28,7 @@ export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogAr
2828
for (let fromBlock = startBlock; fromBlock <= endBlock; fromBlock += numPaginationBlocks(fromBlock)) {
2929
const toBlock = Math.min(fromBlock + numPaginationBlocks(fromBlock) - 1, endBlock);
3030

31-
logUtils.log(`Query for events in block range ${fromBlock}-${toBlock}`);
31+
logger.child({ fromBlock, toBlock }).info(`Query for events in block range ${fromBlock}-${toBlock}`);
3232

3333
const eventsInRange = await _getEventsWithRetriesAsync(getEventsAsync, NUM_RETRIES, fromBlock, toBlock);
3434
if (eventsInRange === null) {
@@ -38,7 +38,9 @@ export async function getEventsWithPaginationAsync<ArgsType extends DecodedLogAr
3838
}
3939
}
4040

41-
logUtils.log(`Retrieved ${events.length} events from block range ${startBlock}-${endBlock}`);
41+
logger
42+
.child({ count: events.length, startBlock, endBlock })
43+
.info(`Retrieved ${events.length} events from block range ${startBlock}-${endBlock}`);
4244
return events;
4345
}
4446

@@ -58,14 +60,14 @@ export async function _getEventsWithRetriesAsync<ArgsType extends DecodedLogArgs
5860
): Promise<Array<LogWithDecodedArgs<ArgsType>> | null> {
5961
let eventsInRange: Array<LogWithDecodedArgs<ArgsType>> = [];
6062
for (let i = 0; i <= numRetries; i++) {
61-
logUtils.log(`Retry ${i}: ${fromBlock}-${toBlock}`);
63+
logger.child({ retry: i, fromBlock, toBlock }).info(`Retry ${i}: ${fromBlock}-${toBlock}`);
6264
try {
6365
eventsInRange = await getEventsAsync(fromBlock, toBlock);
6466
} catch (err) {
6567
if (isErrorRetryable(err) && i < numRetries) {
6668
continue;
6769
} else {
68-
logUtils.log(err);
70+
logger.error(err);
6971
return null;
7072
}
7173
}

packages/event-pipeline/src/data_sources/web3/index.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Web3ProviderEngine } from '@0x/subproviders';
2-
import { logUtils } from '@0x/utils';
2+
import { logger } from '../../utils/logger';
33
import { Web3Wrapper } from '@0x/web3-wrapper';
44
import { BlockWithoutTransactionData, Transaction, BlockWithTransactionData, RawLog } from 'ethereum-types';
55

@@ -122,7 +122,7 @@ export class Web3Source {
122122

123123
public async getBlockInfoAsync(blockNumber: number): Promise<BlockWithoutTransactionData> {
124124
try {
125-
logUtils.log(`Fetching block ${blockNumber}`);
125+
logger.child({ blockNumber }).info(`Fetching block ${blockNumber}`);
126126

127127
const block = await this._web3Wrapper.getBlockIfExistsAsync(blockNumber);
128128

packages/event-pipeline/src/index.ts

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ config({ path: resolve(__dirname, '../../.env') });
66

77
import { ConnectionOptions, createConnection } from 'typeorm';
88
import * as ormConfig from './ormconfig';
9-
import { SECONDS_BETWEEN_RUNS } from './config';
109

1110
import { EventScraper } from './scripts/pull_and_save_events';
1211
import { DeploymentScraper } from './scripts/pull_and_save_deployment';

packages/event-pipeline/src/scripts/pull_and_save_deployment.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { web3Factory } from '@0x/dev-utils';
2-
import { logUtils } from '@0x/utils';
2+
import { logger } from '../utils/logger';
33
import { Connection } from 'typeorm';
44
import { Web3Source } from '../data_sources/web3';
55

@@ -14,7 +14,7 @@ const web3Source = new Web3Source(provider, ETHEREUM_RPC_URL);
1414

1515
export class DeploymentScraper {
1616
public async getParseSaveStakingProxyContractDeployment(connection: Connection): Promise<void> {
17-
logUtils.log(`pulling deployment info for transaction ${STAKING_PROXY_DEPLOYMENT_TRANSACTION}`);
17+
logger.info(`pulling deployment info for transaction ${STAKING_PROXY_DEPLOYMENT_TRANSACTION}`);
1818
const deploymentTxInfo = await web3Source.getTransactionInfoAsync(STAKING_PROXY_DEPLOYMENT_TRANSACTION);
1919
const deploymentBlockInfo = await web3Source.getBlockInfoAsync(Number(deploymentTxInfo.blockNumber));
2020

@@ -27,7 +27,7 @@ export class DeploymentScraper {
2727
stakingProxyDeployment.blockNumber = Number(deploymentBlockInfo.number);
2828
stakingProxyDeployment.blockTimestamp = Number(deploymentBlockInfo.timestamp);
2929

30-
logUtils.log(`finished pulling staking proxy deployment info`);
30+
logger.info(`finished pulling staking proxy deployment info`);
3131

3232
await this._deleteAndSaveDeploymentAsync(connection, stakingProxyDeployment);
3333
}
@@ -49,7 +49,7 @@ export class DeploymentScraper {
4949
// commit transaction now:
5050
await queryRunner.commitTransaction();
5151
} catch (err) {
52-
logUtils.log(err);
52+
logger.error(err);
5353
// since we have errors lets rollback changes we made
5454
await queryRunner.rollbackTransaction();
5555
} finally {

packages/event-pipeline/src/scripts/pull_and_save_events.ts

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { web3Factory } from '@0x/dev-utils';
22
import { Web3ProviderEngine } from '@0x/subproviders';
3-
import { logUtils } from '@0x/utils';
3+
import { logger } from '../utils/logger';
44
import { Web3Wrapper } from '@0x/web3-wrapper';
55
import 'reflect-metadata';
66
import { Connection } from 'typeorm';
@@ -85,12 +85,12 @@ async function dummyAsync(): Promise<void> {}
8585
export class EventScraper {
8686
public async getParseSaveEventsAsync(connection: Connection): Promise<void> {
8787
const startTime = new Date().getTime();
88-
logUtils.log(`pulling events`);
88+
logger.info(`pulling events`);
8989
const latestBlockWithOffset = await calculateEndBlockAsync(provider);
9090
const latestBlockTimestampWithOffset = await (await web3Source.getBlockInfoAsync(latestBlockWithOffset))
9191
.timestamp;
9292

93-
logUtils.log(`latest block with offset: ${latestBlockWithOffset}`);
93+
logger.child({ latestBlockWithOffset }).info(`latest block with offset: ${latestBlockWithOffset}`);
9494

9595
await Promise.all([
9696
pullAndSaveTheGraphEvents.getParseSaveUniswapSwapsAsync(
@@ -260,8 +260,10 @@ export class EventScraper {
260260
]);
261261

262262
const endTime = new Date().getTime();
263-
logUtils.log(`finished pulling events and blocks`);
264-
logUtils.log(`It took ${(endTime - startTime) / 1000} seconds to complete`);
263+
const scriptDurationSeconds = (endTime - startTime) / 1000;
264+
logger
265+
.child({ scriptDurationSeconds, scriptName: `pull_and_save_events` })
266+
.info(`Finished pulling events and blocks, it took ${scriptDurationSeconds} seconds to complete`);
265267
}
266268
}
267269

packages/event-pipeline/src/scripts/pull_and_save_events_by_topic.ts

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { web3Factory } from '@0x/dev-utils';
2-
import { logUtils } from '@0x/utils';
2+
import { logger } from '../utils/logger';
33
import { Connection } from 'typeorm';
44
import { Web3Source } from '../data_sources/web3';
55
import { calculateEndBlockAsync } from './utils/shared_utils';
@@ -59,10 +59,10 @@ const pullAndSaveEventsByTopic = new PullAndSaveEventsByTopic();
5959
export class EventsByTopicScraper {
6060
public async getParseSaveEventsAsync(connection: Connection): Promise<void> {
6161
const startTime = new Date().getTime();
62-
logUtils.log(`pulling events`);
62+
logger.info(`pulling events`);
6363
const latestBlockWithOffset = await calculateEndBlockAsync(web3Source);
6464

65-
logUtils.log(`latest block with offset: ${latestBlockWithOffset}`);
65+
logger.child({ latestBlockWithOffset }).info(`latest block with offset: ${latestBlockWithOffset}`);
6666

6767
await Promise.all([
6868
pullAndSaveEventsByTopic.getParseSaveEventsByTopic<TransformedERC20Event>(
@@ -188,7 +188,9 @@ export class EventsByTopicScraper {
188188
]);
189189

190190
const endTime = new Date().getTime();
191-
logUtils.log(`finished pulling events by topic`);
192-
logUtils.log(`It took ${(endTime - startTime) / 1000} seconds to complete`);
191+
const scriptDurationSeconds = (endTime - startTime) / 1000;
192+
logger
193+
.child({ scriptDurationSeconds, scriptName: `pull_and_save_events_by_topic` })
194+
.info(`Finished pulling events by topic, it took ${scriptDurationSeconds} seconds to complete`);
193195
}
194196
}

packages/event-pipeline/src/scripts/pull_and_save_pool_metadata.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logUtils } from '@0x/utils';
1+
import { logger } from '../utils/logger';
22
import { Connection } from 'typeorm';
33

44
import { StakingPoolRegistrySource } from '../data_sources/staking-pool-registry';
@@ -14,7 +14,7 @@ const stakingPoolSource = new StakingPoolRegistrySource(stakingPoolsUrl, poolMet
1414

1515
export class MetadataScraper {
1616
public async getParseSaveMetadataAsync(connection: Connection): Promise<void> {
17-
logUtils.log(`pulling metadata`);
17+
logger.info(`pulling metadata`);
1818

1919
const stakingPools = await stakingPoolSource.getStakingPoolsAsync();
2020
const poolMetadata = await stakingPoolSource.getStakingPoolMetadata();
@@ -23,8 +23,8 @@ export class MetadataScraper {
2323

2424
const repostiory = connection.getRepository(StakingPoolMetadata);
2525

26-
logUtils.log('Saving metadata');
26+
logger.info('Saving metadata');
2727
await repostiory.save(parsedPools);
28-
logUtils.log(`finished updating metadata`);
28+
logger.info(`finished updating metadata`);
2929
}
3030
}

packages/event-pipeline/src/scripts/utils/event_abi_utils.ts

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logUtils } from '@0x/utils';
1+
import { logger } from '../../utils/logger';
22
import { Connection } from 'typeorm';
33

44
import { RawLogEntry } from 'ethereum-types';
@@ -36,7 +36,9 @@ export class PullAndSaveEventsByTopic {
3636
);
3737
const endBlock = Math.min(latestBlockWithOffset, startBlock + (MAX_BLOCKS_TO_SEARCH - 1));
3838

39-
logUtils.log(`Searching for ${eventName} between blocks ${startBlock} and ${endBlock}`);
39+
logger
40+
.child({ eventName, startBlock, endBlock })
41+
.info(`Searching for ${eventName} between blocks ${startBlock}-${endBlock}`);
4042

4143
// assert(topics.length === 1);
4244

@@ -53,7 +55,9 @@ export class PullAndSaveEventsByTopic {
5355
rawLogsArray.map(async rawLogs => {
5456
const parsedLogs = rawLogs.logs.map((encodedLog: RawLogEntry) => parser(encodedLog));
5557

56-
logUtils.log(`Saving ${parsedLogs.length} ${eventName} events`);
58+
logger
59+
.child({ numLogs: parsedLogs.length, eventName })
60+
.info(`Saving ${parsedLogs.length} ${eventName} events`);
5761

5862
await this._deleteOverlapAndSaveAsync<EVENT>(
5963
connection,
@@ -86,7 +90,7 @@ export class PullAndSaveEventsByTopic {
8690
`SELECT last_processed_block_number FROM events.last_block_processed WHERE event_name = '${eventName}'`,
8791
);
8892

89-
logUtils.log(queryResult);
93+
logger.child({ ...queryResult, eventName }).info(`Last processed block number for ${eventName}`);
9094
const lastKnownBlock = queryResult[0] || { last_processed_block_number: defaultStartBlock };
9195

9296
return Math.min(
@@ -139,7 +143,7 @@ export class PullAndSaveEventsByTopic {
139143
// commit transaction now:
140144
await queryRunner.commitTransaction();
141145
} catch (err) {
142-
logUtils.log(err);
146+
logger.error(err);
143147
// since we have errors lets rollback changes we made
144148
await queryRunner.rollbackTransaction();
145149
} finally {

packages/event-pipeline/src/scripts/utils/event_utils.ts

+8-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logUtils } from '@0x/utils';
1+
import { logger } from '../../utils/logger';
22
import { Connection } from 'typeorm';
33

44
import { FIRST_SEARCH_BLOCK, MAX_BLOCKS_TO_SEARCH, START_BLOCK_OFFSET } from '../../config';
@@ -17,16 +17,18 @@ export class PullAndSaveEvents {
1717
const startBlock = await this._getStartBlockAsync(eventName, connection, latestBlockWithOffset);
1818
const endBlock = Math.min(latestBlockWithOffset, startBlock + (MAX_BLOCKS_TO_SEARCH - 1));
1919

20-
logUtils.log(`Searching for ${eventName} between blocks ${startBlock} and ${endBlock}`);
20+
logger.child({ eventName, startBlock, endBlock }).info(`Searching for events`);
2121
const eventLogs = await getterFunction(startBlock, endBlock);
2222

2323
if (eventLogs === null) {
24-
logUtils.log(`Encountered an error searching for ${eventName} events. Waiting until next iteration.`);
24+
logger
25+
.child({ eventName })
26+
.info(`Encountered an error searching for events. Waiting until next iteration.`);
2527
} else {
2628
const parsedEventLogs = eventLogs.map(log => parser(log));
2729
const lastBlockProcessed: LastBlockProcessed = await this._lastBlockProcessedAsync(eventName, endBlock);
2830

29-
logUtils.log(`saving ${parsedEventLogs.length} ${eventName} events`);
31+
logger.child({ count: parsedEventLogs.length, eventName }).info(`Saving events`);
3032

3133
await this._deleteOverlapAndSaveAsync<EVENT>(
3234
connection,
@@ -56,7 +58,7 @@ export class PullAndSaveEvents {
5658
`SELECT last_processed_block_number FROM events.last_block_processed WHERE event_name = '${eventName}'`,
5759
);
5860

59-
logUtils.log(queryResult);
61+
logger.child({ ...queryResult, eventName }).info(`Last processed block number for ${eventName}`);
6062
const lastKnownBlock = queryResult[0] || { last_processed_block_number: FIRST_SEARCH_BLOCK };
6163

6264
return Math.min(
@@ -89,7 +91,7 @@ export class PullAndSaveEvents {
8991
// commit transaction now:
9092
await queryRunner.commitTransaction();
9193
} catch (err) {
92-
logUtils.log(err);
94+
logger.error(err);
9395
// since we have errors lets rollback changes we made
9496
await queryRunner.rollbackTransaction();
9597
} finally {

packages/event-pipeline/src/scripts/utils/thegraph_utils.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logUtils } from '@0x/utils';
1+
import { logger } from '../../utils/logger';
22
import { Connection } from 'typeorm';
33
import { ERC20BridgeTransferEvent, LastBlockProcessed } from '../../entities';
44

@@ -19,7 +19,7 @@ export class PullAndSaveTheGraphEvents {
1919
): Promise<void> {
2020
const startTime = await this._getStartTimestampAsync(connection, latestBlockTimestampWithOffset, protocol);
2121
const endTime = Math.min(latestBlockTimestampWithOffset, startTime + MAX_TIME_TO_SEARCH);
22-
logUtils.log(`Grabbing swap events between ${startTime} and ${endTime}`);
22+
logger.child({ startTime, endTime }).info(`Grabbing swap events`);
2323
const rawSwaps = await uniswapV2Source.getSwapEventsAsync(startTime, endTime, endpoint, 100);
2424
const parsedSwaps = rawSwaps.map(rawSwap => parseUniswapSushiswapEvents(rawSwap, protocol));
2525

@@ -29,7 +29,7 @@ export class PullAndSaveTheGraphEvents {
2929
return acc;
3030
}, 99999999999999);
3131

32-
logUtils.log(`saving ${parsedSwaps.length} external swap events`);
32+
logger.child({ count: parsedSwaps.length }).info(`saving external swap events`);
3333

3434
const lastBlockProcessed = await this._lastBlockProcessedAsync(protocol, endTime);
3535

@@ -59,7 +59,7 @@ export class PullAndSaveTheGraphEvents {
5959
`SELECT last_processed_block_timestamp FROM events.last_block_processed WHERE event_name = '${eventName}'`,
6060
);
6161

62-
logUtils.log(queryResult);
62+
logger.child({ ...queryResult, eventName }).info(`Last processed block timestamp for ${eventName}`);
6363
const lastKnownBlock = queryResult[0] || { last_processed_block_timestamp: START_DIRECT_UNISWAP_SEARCH };
6464

6565
return Math.min(
@@ -92,7 +92,7 @@ export class PullAndSaveTheGraphEvents {
9292
// commit transaction now:
9393
await queryRunner.commitTransaction();
9494
} catch (err) {
95-
logUtils.log(err);
95+
logger.error(err);
9696
// since we have errors lets rollback changes we made
9797
await queryRunner.rollbackTransaction();
9898
} finally {

0 commit comments

Comments
 (0)