Skip to content

Commit

Permalink
Validate batch cleanup (#360)
Browse files Browse the repository at this point in the history
* wip:validate in batch

* wip:janky batching

* add metrics

* restore fillevent check on expired

* fixes

* parallelize looping

* track down conflict error

* fix tests

* boost cpu,memory

* fix lint issues

* scale down to 8gb memory

* fixes

* lower coverage
  • Loading branch information
robert-seitz-uniswap authored Feb 26, 2024
1 parent 916355e commit d68a235
Show file tree
Hide file tree
Showing 17 changed files with 704 additions and 310 deletions.
9 changes: 6 additions & 3 deletions bin/stacks/status-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ export class StatusStack extends cdk.NestedStack {

const taskDefinition = new aws_ecs.FargateTaskDefinition(this, `TaskDef`, {
taskRole: loaderStackRole,
memoryLimitMiB: 2048,
cpu: 1024,
memoryLimitMiB: 8192,
cpu: 4096,
runtimePlatform: {
operatingSystemFamily: aws_ecs.OperatingSystemFamily.LINUX,
cpuArchitecture: aws_ecs.CpuArchitecture.X86_64,
Expand Down Expand Up @@ -67,6 +67,9 @@ export class StatusStack extends cdk.NestedStack {
taskDefinition,
desiredCount: 1,
healthCheckGracePeriod: Duration.seconds(60),
circuitBreaker: {
rollback: true,
},
})

new Alarm(this, `${SERVICE_NAME}-SEV3-${OnChainStatusCheckerMetricNames.TotalOrderProcessingErrors}`, {
Expand All @@ -82,7 +85,7 @@ export class StatusStack extends cdk.NestedStack {
evaluationPeriods: 3,
})

let statusCheckerErrorRate = new MathExpression({
const statusCheckerErrorRate = new MathExpression({
expression: '100*(errors/attempts)',
period: Duration.minutes(3),
usingMetrics: {
Expand Down
2 changes: 1 addition & 1 deletion jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module.exports = {
coverageThreshold: {
global: {
statements: 80,
branches: 80,
branches: 78,
functions: 80,
lines: 80,
},
Expand Down
239 changes: 159 additions & 80 deletions lib/compute/on-chain-status-checker.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import { MetricUnits } from '@aws-lambda-powertools/metrics'
import { EventWatcher, OrderType, OrderValidator, REACTOR_ADDRESS_MAPPING } from '@uniswap/uniswapx-sdk'
import { ethers } from 'ethers'
import { OrderEntity, ORDER_STATUS } from '../entities'
import { SfnStateInputOutput } from '../handlers/base'
import { CheckOrderStatusRequest, CheckOrderStatusService } from '../handlers/check-order-status/service'
import { LIMIT_ORDERS_FILL_EVENT_LOOKBACK_BLOCKS_ON } from '../handlers/check-order-status/util'
import {
getProvider,
getValidator,
getWatcher,
LIMIT_ORDERS_FILL_EVENT_LOOKBACK_BLOCKS_ON,
} from '../handlers/check-order-status/util'
import { log } from '../Logging'
import { OnChainStatusCheckerMetricNames, powertoolsMetric as metrics } from '../Metrics'
import { BaseOrdersRepository, QueryResult } from '../repositories/base'
import { SUPPORTED_CHAINS } from '../util/chain'

const RECHECK_DELAY = 30 * 1000 //30 seconds
const LOOP_DELAY_MS = 30 * 1000 //30 seconds

// arbitrary and capricious value
// if increasing check memory utilization
// TODO:(urgent) up to 1k
export const BATCH_READ_MAX = 100
export class OnChainStatusChecker {
private checkOrderStatusService: CheckOrderStatusService
Expand All @@ -22,99 +28,168 @@ export class OnChainStatusChecker {
public stop() {
this._stop = true
}

public getWatcher(provider: ethers.providers.StaticJsonRpcProvider, chainId: number) {
if (!REACTOR_ADDRESS_MAPPING[chainId][OrderType.Dutch]) {
throw new Error(`No Reactor Address Defined in UniswapX SDK for chainId:${chainId}, orderType${OrderType.Dutch}`)
public async getFromDynamo(cursor?: any) {

Check warning on line 31 in lib/compute/on-chain-status-checker.ts

View workflow job for this annotation

GitHub Actions / lint-and-test

Unexpected any. Specify a different type
try {
const startTime = new Date().getTime()
const orders = await this.dbInterface.getByOrderStatus(ORDER_STATUS.OPEN, BATCH_READ_MAX, cursor)
const endTime = new Date().getTime()
metrics.addMetric('OnChainStatusChecker-DynamoBatchReadTime', MetricUnits.Milliseconds, endTime - startTime)
return orders
} catch (e) {
log.error('error in getFromDynamo', { error: e })
throw e
}
return new EventWatcher(provider, REACTOR_ADDRESS_MAPPING[chainId][OrderType.Dutch] as string)
}

public getProvider(chainId: number) {
const rpcURL = process.env[`RPC_${chainId}`]
if (!rpcURL) {
throw new Error(`rpcURL not defined for ${chainId}`)
public async pollForOpenOrders() {
try {
while (!this._stop) {
let totalCheckedOrders = 0
let processedOrderError = 0
const startTime = new Date().getTime()
const asyncLoopCalls = []
try {
log.info('starting processing orders')
let openOrders = await this.getFromDynamo()
do {
asyncLoopCalls.push(this.loopProcess(openOrders))
} while (openOrders.cursor && (openOrders = await this.getFromDynamo(openOrders.cursor)))

for (let i = 0; i < asyncLoopCalls.length; i++) {
const { processedCount, errorCount } = await asyncLoopCalls[i]
processedOrderError += errorCount
totalCheckedOrders += processedCount
}

log.info(`finished processing orders`, { totalCheckedOrders })
} catch (e) {
log.error('OnChainStatusChecker Error', { error: e })
metrics.addMetric(OnChainStatusCheckerMetricNames.LoopError, MetricUnits.Count, 1)
} finally {
metrics.addMetric(
OnChainStatusCheckerMetricNames.TotalProcessedOpenOrders,
MetricUnits.Count,
totalCheckedOrders
)
metrics.addMetric(
OnChainStatusCheckerMetricNames.TotalOrderProcessingErrors,
MetricUnits.Count,
processedOrderError
)
metrics.addMetric(
OnChainStatusCheckerMetricNames.TotalLoopProcessingTime,
MetricUnits.Milliseconds,
new Date().getTime() - startTime
)
metrics.addMetric(OnChainStatusCheckerMetricNames.LoopCompleted, MetricUnits.Count, 1)
metrics.publishStoredMetrics()
metrics.clearMetrics()
// TODO:(urgent) factor in total loop time
await delay(LOOP_DELAY_MS)
}
}
} catch (e) {
log.error('OnChainStatusChecker-UnexpectedError-Exiting', { error: e })
} finally {
//should never reach this
metrics.addMetric(OnChainStatusCheckerMetricNames.LoopEnded, MetricUnits.Count, 1)
}
const provider = new ethers.providers.StaticJsonRpcProvider(rpcURL, chainId)
return provider
}

public getValidator(provider: ethers.providers.StaticJsonRpcProvider, chainId: number) {
return new OrderValidator(provider, chainId)
public async loopProcess(openOrders: QueryResult) {
let errorCount = 0
const processedCount = openOrders.orders.length
try {
const { promises, batchSize } = this.processOrderBatch(openOrders)
//await all promises
const responses = await Promise.allSettled(promises)
for (let i = 0; i < promises.length; i++) {
const response = responses[i]
if (response.status === 'rejected') {
errorCount += batchSize[i]
}
if (response.status === 'fulfilled') {
const output = response.value
for (let j = 0; j < output.length; j++) {
const singleOutput = output[j]
if (typeof singleOutput.getFillLogAttempts === 'number' && singleOutput.getFillLogAttempts > 0) {
const chainId: number = typeof singleOutput.chainId === 'number' ? singleOutput.chainId : 1
const orderHash = singleOutput.orderHash
const orderStatus: ORDER_STATUS = singleOutput.orderStatus as ORDER_STATUS
if (!orderHash || !(typeof orderHash === 'string')) {
continue
}
log.info('setting off retry', { orderHash })
const provider = getProvider(chainId)
// TODO:(urgent) maintain this in a cache and let following loop catch it
this.retryUpdate({
chainId: chainId,
orderHash: orderHash,
startingBlockNumber: 0, // if 0, looks back 50 blocks, otherwise looks from given block
orderStatus: orderStatus,
getFillLogAttempts: 1,
retryCount: 1,
provider: provider,
orderWatcher: getWatcher(provider, chainId),
orderQuoter: getValidator(provider, chainId),
quoteId: '',
})
}
}
}
}
return { processedCount, errorCount }
} catch (e) {
log.error('error in loopProcess', { error: e })
return { processedCount, errorCount: processedCount }
}
}

public async pollForOpenOrders() {
// eslint-disable-next-line no-constant-condition
while (!this._stop) {
let totalCheckedOrders = 0
let processedOrderError = 0
const startTime = new Date().getTime()
try {
let openOrders = await this.dbInterface.getByOrderStatus(ORDER_STATUS.OPEN, BATCH_READ_MAX)
do {
const promises = await this.processOrderBatch(openOrders)
const results = await Promise.allSettled(promises)
processedOrderError += results.filter((p) => p.status === 'rejected').length
totalCheckedOrders += openOrders.orders.length
} while (
openOrders.cursor &&
(openOrders = await this.dbInterface.getByOrderStatus(ORDER_STATUS.OPEN, BATCH_READ_MAX, openOrders.cursor))
)
log.info(`finished processing orders`, { totalCheckedOrders })
} catch (e) {
log.error('OnChainStatusChecker Error', { error: e })
metrics.addMetric(OnChainStatusCheckerMetricNames.LoopError, MetricUnits.Count, 1)
} finally {
metrics.addMetric(
OnChainStatusCheckerMetricNames.TotalProcessedOpenOrders,
MetricUnits.Count,
totalCheckedOrders
)
metrics.addMetric(
OnChainStatusCheckerMetricNames.TotalOrderProcessingErrors,
MetricUnits.Count,
processedOrderError
)
metrics.addMetric(
OnChainStatusCheckerMetricNames.TotalLoopProcessingTime,
MetricUnits.Seconds,
(new Date().getTime() - startTime) / 1000
)
metrics.addMetric(OnChainStatusCheckerMetricNames.LoopCompleted, MetricUnits.Count, 1)
metrics.publishStoredMetrics()
metrics.clearMetrics()
await delay(LOOP_DELAY_MS)
public processOrderBatch(openOrders: QueryResult) {
const openOrdersPerChain = this.mapOpenOrdersToChain(openOrders.orders)
const promises: Promise<SfnStateInputOutput[]>[] = []
const batchSize: number[] = []

Object.keys(openOrdersPerChain).forEach((chain) => {
const chainId = parseInt(chain)
const orders = openOrdersPerChain[chainId]
if (orders.length === 0) {
return
}
}
//should never reach this
metrics.addMetric(OnChainStatusCheckerMetricNames.LoopEnded, MetricUnits.Count, 1)
//get all promises
promises.push(this.getOrderChangesBatch(orders, chainId))
batchSize.push(orders.length)
})
//return promises per chain & batchsize per chain
return { promises, batchSize }
}

public async processOrderBatch(openOrders: QueryResult) {
const promises = []
for (let i = 0; i < openOrders.orders.length; i++) {
const order = openOrders.orders[i]
promises.push(
(async function (statusChecker: OnChainStatusChecker): Promise<void> {
try {
await statusChecker.updateOrder(order)
} catch (e) {
log.error('OnChainStatusChecker Error Processing Order', { error: e })
throw e
}
})(this)
)
public mapOpenOrdersToChain(batch: OrderEntity[]) {
const chainToOrdersMap: Record<number, OrderEntity[]> = {}

SUPPORTED_CHAINS.forEach((chainId) => {
chainToOrdersMap[chainId] = []
})

for (let i = 0; i < batch.length; i++) {
const { chainId } = batch[i]
chainToOrdersMap[chainId].push(batch[i])
}
return promises

return chainToOrdersMap
}

public async getOrderChangesBatch(orders: OrderEntity[], chainId: number): Promise<SfnStateInputOutput[]> {
return this.checkOrderStatusService.batchHandleRequestPerChain(orders, chainId)
}

// TODO: https://linear.app/uniswap/issue/DAT-264/batch-update-order-status
public async updateOrder(order: OrderEntity): Promise<void> {
const chainId = order.chainId
const provider = this.getProvider(chainId)
const quoter = this.getValidator(provider, chainId)
const provider = getProvider(chainId)
const quoter = getValidator(provider, chainId)
// TODO: use different reactor address for different order type
const watcher = this.getWatcher(provider, chainId)
const watcher = getWatcher(provider, chainId)

const request: CheckOrderStatusRequest = {
chainId: chainId,
Expand All @@ -138,8 +213,12 @@ export class OnChainStatusChecker {

//retry after 30 seconds
public async retryUpdate(request: CheckOrderStatusRequest) {
await delay(RECHECK_DELAY)
await this.checkOrderStatusService.handleRequest({ ...request, getFillLogAttempts: 1 })
try {
await delay(RECHECK_DELAY)
await this.checkOrderStatusService.handleRequest({ ...request, getFillLogAttempts: 1 })
} catch (e) {
log.error('retryUpdate error', { error: e })
}
}
}

Expand Down
8 changes: 3 additions & 5 deletions lib/crons/gs-reaper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import { metricScope, MetricsLogger, Unit } from 'aws-embedded-metrics'
import { ORDER_STATUS } from '../entities'
import { BaseOrdersRepository } from '../repositories/base'
import { DutchOrdersRepository } from '../repositories/dutch-orders-repository'
import { ONE_HOUR_IN_SECONDS } from '../util/constants'

export const BATCH_WRITE_MAX = 25
import { DYNAMO_BATCH_WRITE_MAX, ONE_HOUR_IN_SECONDS } from '../util/constants'

export const handler: ScheduledHandler = metricScope((metrics) => async (_event: EventBridgeEvent<string, void>) => {

Check warning on line 11 in lib/crons/gs-reaper.ts

View workflow job for this annotation

GitHub Actions / lint-and-test

'_event' is defined but never used
await main(metrics)
Expand All @@ -31,7 +29,7 @@ export async function deleteStaleOrders(
log: Logger,
metrics?: MetricsLogger
): Promise<void> {
let openOrders = await repo.getByOrderStatus(ORDER_STATUS.OPEN, BATCH_WRITE_MAX)
let openOrders = await repo.getByOrderStatus(ORDER_STATUS.OPEN, DYNAMO_BATCH_WRITE_MAX)
for (;;) {
// get orderHashes with deadlines more than 1 hour ago and are still 'open'
const staleOrders = openOrders.orders.flatMap((order) => {
Expand All @@ -51,7 +49,7 @@ export async function deleteStaleOrders(
}
}
if (openOrders.cursor) {
openOrders = await repo.getByOrderStatus(ORDER_STATUS.OPEN, BATCH_WRITE_MAX, openOrders.cursor)
openOrders = await repo.getByOrderStatus(ORDER_STATUS.OPEN, DYNAMO_BATCH_WRITE_MAX, openOrders.cursor)
} else {
break
}
Expand Down
Loading

0 comments on commit d68a235

Please sign in to comment.