Skip to content

Commit

Permalink
Reaper 2.0 (#508)
Browse files Browse the repository at this point in the history
* Work in progress

* Add unit tests

* More tests

* Pull expiry status from validation

* Set cron to max timeout

* Add comments

* Fix lint

* Lint fix
  • Loading branch information
codyborn authored Jan 27, 2025
1 parent 65613ff commit 1d605f5
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 147 deletions.
2 changes: 1 addition & 1 deletion bin/stacks/cron-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class CronStack extends cdk.NestedStack {
runtime: cdk.aws_lambda.Runtime.NODEJS_18_X,
entry: path.join(__dirname, '../../lib/crons/gs-reaper.ts'),
handler: 'handler',
timeout: cdk.Duration.minutes(1),
timeout: cdk.Duration.minutes(15),
memorySize: 512,
bundling: {
minify: true,
Expand Down
201 changes: 175 additions & 26 deletions lib/crons/gs-reaper.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
import { EventBridgeEvent, ScheduledHandler } from 'aws-lambda'
import { DynamoDB } from 'aws-sdk'
import { default as bunyan, default as Logger } from 'bunyan'

import { metricScope, MetricsLogger, Unit } from 'aws-embedded-metrics'
import { ORDER_STATUS, UniswapXOrderEntity } from '../entities'
import { BaseOrdersRepository } from '../repositories/base'
import { ORDER_STATUS, SettledAmount, UniswapXOrderEntity } from '../entities'
import { BaseOrdersRepository, QueryResult } from '../repositories/base'
import { DutchOrdersRepository } from '../repositories/dutch-orders-repository'
import { DYNAMO_BATCH_WRITE_MAX, ONE_HOUR_IN_SECONDS } from '../util/constants'
import { BLOCK_RANGE, CRON_MAX_ATTEMPTS, DYNAMO_BATCH_WRITE_MAX, OLDEST_BLOCK_BY_CHAIN } from '../util/constants'
import { ethers } from 'ethers'
import { CosignedPriorityOrder, CosignedV2DutchOrder, CosignedV3DutchOrder, DutchOrder, OrderType, OrderValidation, OrderValidator, REACTOR_ADDRESS_MAPPING, UniswapXEventWatcher, UniswapXOrder } from '@uniswap/uniswapx-sdk'
import { parseOrder } from '../handlers/OrderParser'
import { getSettledAmounts } from '../handlers/check-order-status/util'
import { ChainId } from '../util/chain'

export const handler: ScheduledHandler = metricScope((metrics) => async (_event: EventBridgeEvent<string, void>) => {

Check warning on line 15 in lib/crons/gs-reaper.ts

View workflow job for this annotation

GitHub Actions / lint-and-test

'_event' is defined but never used
await main(metrics)
})

/**
* The Reaper is a cron job that runs daily and checks for any orphaned orders
* that have been filled, cancelled or expired
* @param metrics - The metrics logger
*/
async function main(metrics: MetricsLogger) {
metrics.setNamespace('Uniswap')
metrics.setDimensions({ Service: 'UniswapXServiceCron' })
Expand All @@ -21,37 +30,177 @@ async function main(metrics: MetricsLogger) {
level: 'info',
})
const repo = DutchOrdersRepository.create(new DynamoDB.DocumentClient())
await deleteStaleOrders(repo, log, metrics)
const providers = new Map<ChainId, ethers.providers.StaticJsonRpcProvider>()
for (const chainIdKey of Object.keys(OLDEST_BLOCK_BY_CHAIN)) {
const chainId = Number(chainIdKey) as keyof typeof OLDEST_BLOCK_BY_CHAIN
const rpcURL = process.env[`RPC_${chainId}`]
const provider = new ethers.providers.StaticJsonRpcProvider(rpcURL, chainId)
providers.set(chainId, provider)
}
await cleanupOrphanedOrders(repo, providers, log, metrics)
}

type OrderUpdate = {
status: ORDER_STATUS,
txHash?: string,
fillBlock?: number,
settledAmounts?: SettledAmount[]
}

export async function deleteStaleOrders(
export async function cleanupOrphanedOrders(
repo: BaseOrdersRepository<UniswapXOrderEntity>,
providers: Map<ChainId, ethers.providers.StaticJsonRpcProvider>,
log: Logger,
metrics?: MetricsLogger
): Promise<void> {
let openOrders = await repo.getByOrderStatus(ORDER_STATUS.OPEN, DYNAMO_BATCH_WRITE_MAX)
for (;;) {
// get orderHashes with deadlines more than 1 hour ago and are still 'open'
const staleOrders = openOrders.orders.flatMap((order) => {
if (order.deadline < Date.now() / 1000 - ONE_HOUR_IN_SECONDS) {
return order.orderHash

for (const chainIdKey of Object.keys(OLDEST_BLOCK_BY_CHAIN)) {
const chainId = Number(chainIdKey) as keyof typeof OLDEST_BLOCK_BY_CHAIN
const provider = providers.get(chainId)
if (!provider) {
log.error(`No provider found for chainId ${chainId}`)
continue
}

// get a map of all open orders from the database
const parsedOrders = await getParsedOrders(repo, chainId)
const orderUpdates = new Map<string, OrderUpdate>()

// Look through events to find if any of the orders have been filled
for (const orderType of Object.keys(REACTOR_ADDRESS_MAPPING[chainId])){
const reactorAddress = REACTOR_ADDRESS_MAPPING[chainId][orderType as OrderType]
if (!reactorAddress) continue
const watcher = new UniswapXEventWatcher(provider, reactorAddress)
const lastProcessedBlock = await provider.getBlockNumber()
let recentErrors = 0
const earliestBlock = OLDEST_BLOCK_BY_CHAIN[chainId]
// TODO: Lookback 1.2 days
// const msPerDay = 1000 * 60 * 60 * 24 * 1.2
// const blocksPerDay = msPerDay / BLOCK_TIME_MS_BY_CHAIN[chainId]
// const earliestBlock = lastProcessedBlock - blocksPerDay

for (let i = lastProcessedBlock; i > earliestBlock; i -= BLOCK_RANGE) {
let attempts = 0
while (attempts < CRON_MAX_ATTEMPTS) {
try {
log.info(`Getting fill events for blocks ${i - BLOCK_RANGE} to ${i}`)
const fillEvents = await watcher.getFillEvents(i - BLOCK_RANGE, i)
recentErrors = Math.max(0, recentErrors - 1)
await Promise.all(fillEvents.map(async (e) => {
if (parsedOrders.has(e.orderHash)) {
log.info(`Fill event found for order ${e.orderHash}`)
// Only get fill info when we know there's a matching event in this
// range due to additional RPC calls that are required for fill info
const fillInfo = await watcher.getFillInfo(i - BLOCK_RANGE, i)
const fillEvent = fillInfo.find((f) => f.orderHash === e.orderHash)
if (fillEvent) {
const [tx, block] = await Promise.all([
provider.getTransaction(fillEvent.txHash),
provider.getBlock(fillEvent.blockNumber),
])
const settledAmounts = getSettledAmounts(
fillEvent,
{
timestamp: block.timestamp,
gasPrice: tx.gasPrice,
maxPriorityFeePerGas: tx.maxPriorityFeePerGas,
maxFeePerGas: tx.maxFeePerGas,
},
parsedOrders.get(e.orderHash)?.order as DutchOrder | CosignedV2DutchOrder | CosignedV3DutchOrder | CosignedPriorityOrder
)
orderUpdates.set(e.orderHash, {
status: ORDER_STATUS.FILLED,
txHash: fillEvent.txHash,
fillBlock: fillEvent.blockNumber,
settledAmounts: settledAmounts,
})
}
else {
orderUpdates.set(e.orderHash, {
status: ORDER_STATUS.FILLED,
})
}
}
}))

break // Success - exit the retry loop
} catch (error) {
attempts++
recentErrors++
console.log(`Failed to get fill events for blocks ${i - BLOCK_RANGE} to ${i}, error: ${error}`)
log.error({ error }, `Failed to get fill events for blocks ${i - BLOCK_RANGE} to ${i}`)
if (attempts === CRON_MAX_ATTEMPTS) {
log.error({ error }, `Failed to get fill events after ${attempts} attempts for blocks ${i - BLOCK_RANGE} to ${i}`)
metrics?.putMetric(`GetFillEventsError`, 1, Unit.Count)
break // Skip this range and continue with the next one
}
// Wait time is determined by the number of recent errors
await new Promise(resolve => setTimeout(resolve, 1000 * recentErrors))
}
}
}
return []
})
log.info({ staleOrders }, `Found ${staleOrders.length} stale orders`)
if (staleOrders.length > 0) {
try {
await repo.deleteOrders(staleOrders)
} catch (e) {
metrics?.putMetric('DeleteStaleOrdersError', 1, Unit.Count)
log.error({ error: e }, 'Failed to delete stale orders')
throw e
}

// Loop through unfilled orders and see if they were cancelled
const quoter = new OrderValidator(provider, chainId)
for (const orderHash of parsedOrders.keys()) {
if (!orderUpdates.has(orderHash)) {
const validation = await quoter.validate({
order: parsedOrders.get(orderHash)!.order,

Check warning on line 149 in lib/crons/gs-reaper.ts

View workflow job for this annotation

GitHub Actions / lint-and-test

Forbidden non-null assertion
signature: parsedOrders.get(orderHash)!.signature,

Check warning on line 150 in lib/crons/gs-reaper.ts

View workflow job for this annotation

GitHub Actions / lint-and-test

Forbidden non-null assertion
})
if (validation === OrderValidation.NonceUsed) {
orderUpdates.set(orderHash, {
status: ORDER_STATUS.CANCELLED,
})
}
if (validation === OrderValidation.Expired) {
orderUpdates.set(orderHash, {
status: ORDER_STATUS.EXPIRED,
})
}
}
}
if (openOrders.cursor) {
openOrders = await repo.getByOrderStatus(ORDER_STATUS.OPEN, DYNAMO_BATCH_WRITE_MAX, openOrders.cursor)
} else {
break

// Update the orders in the database
log.info(`Updating ${orderUpdates.size} incorrect orders`)
for (const [orderHash, orderUpdate] of orderUpdates) {
await repo.updateOrderStatus(
orderHash,
orderUpdate.status,
orderUpdate.txHash,
orderUpdate.fillBlock,
orderUpdate.settledAmounts
)

metrics?.putMetric(`UpdateOrderStatus_${orderUpdate.status}`, 1, Unit.Count)
}
log.info(`Update complete`)
}
}

/**
* Get all open orders from the database and parse them
* @param repo - The orders repository
* @param chainId - The chain ID
* @returns A map of order hashes to their parsed order data
*/
async function getParsedOrders(repo: BaseOrdersRepository<UniswapXOrderEntity>, chainId: ChainId) {

// Collect all open orders
let cursor: string | undefined = undefined
let allOrders: UniswapXOrderEntity[] = []
do {
const openOrders: QueryResult<UniswapXOrderEntity> = await repo.getOrders(DYNAMO_BATCH_WRITE_MAX, {
orderStatus: ORDER_STATUS.OPEN,
chainId: chainId,
cursor: cursor,
})
cursor = openOrders.cursor
allOrders = allOrders.concat(openOrders.orders)

} while (cursor)
const parsedOrders = new Map<string, {order: UniswapXOrder, signature: string, deadline: number}>()
allOrders.forEach((o) => parsedOrders.set(o.orderHash, {order: parseOrder(o, chainId), signature: o.signature, deadline: o.deadline}))
return parsedOrders
}
19 changes: 19 additions & 0 deletions lib/handlers/OrderParser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { CosignedPriorityOrder, CosignedV2DutchOrder, CosignedV3DutchOrder, DutchOrder, UniswapXOrder, OrderType } from '@uniswap/uniswapx-sdk'
import { ChainId } from '../util/chain'
import { UniswapXOrderEntity } from '../entities'

export function parseOrder(order: UniswapXOrderEntity, chainId: ChainId): UniswapXOrder {
switch (order.type) {
case OrderType.Dutch:
case OrderType.Limit:
return DutchOrder.parse(order.encodedOrder, chainId)
case OrderType.Dutch_V2:
return CosignedV2DutchOrder.parse(order.encodedOrder, chainId)
case OrderType.Dutch_V3:
return CosignedV3DutchOrder.parse(order.encodedOrder, chainId)
case OrderType.Priority:
return CosignedPriorityOrder.parse(order.encodedOrder, chainId)
default:
throw new Error(`Unsupported OrderType ${JSON.stringify(order)}, No Parser Configured`)
}
}
23 changes: 2 additions & 21 deletions lib/handlers/check-order-status/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
OrderValidation,
OrderValidator,
UniswapXEventWatcher,
UniswapXOrder,
} from '@uniswap/uniswapx-sdk'
import { ethers } from 'ethers'
import { ORDER_STATUS, RelayOrderEntity, SettledAmount, UniswapXOrderEntity } from '../../entities'
Expand All @@ -22,6 +21,7 @@ import { metrics } from '../../util/metrics'
import { SfnStateInputOutput } from '../base'
import { FillEventLogger } from './fill-event-logger'
import { getSettledAmounts, IS_TERMINAL_STATE } from './util'
import { parseOrder } from '../OrderParser'

const FILL_CHECK_OVERLAP_BLOCK = 20

Expand Down Expand Up @@ -66,7 +66,6 @@ export class CheckOrderStatusService {
orderQuoter,
orderWatcher,
orderStatus,
orderType,
}: CheckOrderStatusRequest): Promise<SfnStateInputOutput> {
const order: UniswapXOrderEntity = checkDefined(
await wrapWithTimerMetric<UniswapXOrderEntity | undefined>(
Expand All @@ -76,25 +75,7 @@ export class CheckOrderStatusService {
`cannot find order by hash when updating order status, hash: ${orderHash}`
)

let parsedOrder: UniswapXOrder
switch (orderType) {
case OrderType.Dutch:
case OrderType.Limit:
parsedOrder = DutchOrder.parse(order.encodedOrder, chainId)
break
case OrderType.Dutch_V2:
parsedOrder = CosignedV2DutchOrder.parse(order.encodedOrder, chainId)
break
case OrderType.Dutch_V3:
parsedOrder = CosignedV3DutchOrder.parse(order.encodedOrder, chainId)
break
case OrderType.Priority:
parsedOrder = CosignedPriorityOrder.parse(order.encodedOrder, chainId)
break
default:
throw new Error(`Unsupported OrderType ${orderType}, No Parser Configured`)
}

const parsedOrder = parseOrder(order, chainId)
const validation = await wrapWithTimerMetric(
orderQuoter.validate({
order: parsedOrder,
Expand Down
17 changes: 16 additions & 1 deletion lib/util/constants.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
import { ChainId } from "./chain"

export const WEBHOOK_CONFIG_BUCKET = 'order-webhook-notification-config'
export const PRODUCTION_WEBHOOK_CONFIG_KEY = 'production.json'
export const BETA_WEBHOOK_CONFIG_KEY = 'beta.json'
export const NATIVE_ADDRESS = '0x0000000000000000000000000000000000000000'
export const ONE_HOUR_IN_SECONDS = 60 * 60
export const ONE_DAY_IN_SECONDS = 60 * 60 * 24
export const ONE_YEAR_IN_SECONDS = 60 * 60 * 24 * 365

export const OLDEST_BLOCK_BY_CHAIN = {
[ChainId.MAINNET]: 20120259,
[ChainId.ARBITRUM_ONE]: 253597707,
[ChainId.BASE]: 22335646,
[ChainId.UNICHAIN]: 6747397,
}
export const BLOCK_TIME_MS_BY_CHAIN = {
[ChainId.MAINNET]: 12000,
[ChainId.ARBITRUM_ONE]: 250,
[ChainId.BASE]: 2000,
[ChainId.UNICHAIN]: 1000,
}
export const BLOCK_RANGE = 10000
export const CRON_MAX_ATTEMPTS = 10
//Dynamo limits batch write to 25
export const DYNAMO_BATCH_WRITE_MAX = 25
Loading

0 comments on commit 1d605f5

Please sign in to comment.