Skip to content

Commit

Permalink
Use step functions for order status (#362)
Browse files Browse the repository at this point in the history
* use step functions for limit orders

* step function retry

* cleanup

* fix tests

* fix tests

* code review changes
  • Loading branch information
robert-seitz-uniswap authored Feb 27, 2024
1 parent d68a235 commit 8d83309
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 76 deletions.
2 changes: 2 additions & 0 deletions bin/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ export class APIPipeline extends Stack {
FILL_EVENT_DESTINATION_ARN: resourceArnSecret.secretValueFromJson('FILL_EVENT_DESTINATION_ARN_BETA').toString(),
POSTED_ORDER_DESTINATION_ARN: resourceArnSecret.secretValueFromJson('POSTED_ORDER_DESTINATION_BETA').toString(),
THROTTLE_PER_FIVE_MINS: '3000',
REGION: 'us-east-2', //needed in checkOrderStatusHandler to kick off step function retries
},
tableCapacityConfig: {
order: { billingMode: cdk.aws_dynamodb.BillingMode.PAY_PER_REQUEST },
Expand All @@ -180,6 +181,7 @@ export class APIPipeline extends Stack {
FILL_EVENT_DESTINATION_ARN: resourceArnSecret.secretValueFromJson('FILL_EVENT_DESTINATION_ARN_PROD').toString(),
POSTED_ORDER_DESTINATION_ARN: resourceArnSecret.secretValueFromJson('POSTED_ORDER_DESTINATION_PROD').toString(),
THROTTLE_PER_FIVE_MINS: '3000',
REGION: 'us-east-2', //needed in checkOrderStatusHandler to kick off step function retries
},
tableCapacityConfig: PROD_TABLE_CAPACITY,
})
Expand Down
21 changes: 19 additions & 2 deletions bin/definitions/order-tracking-sfn.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
"Next": "latestOrderStatus",
"Retry": [
{
"ErrorEquals": ["States.TaskFailed", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException"],
"ErrorEquals": [
"States.TaskFailed",
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 4,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": [ "States.ALL" ],
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.errorInfo",
"Next": "orderInFailedState"
}
Expand Down Expand Up @@ -46,9 +51,18 @@
{
"Variable": "$.orderStatus",
"StringEquals": "error"
},
{
"Variable": "$.orderStatus",
"StringEquals": "insufficient-funds"
}
],
"Next": "orderInTerminalState"
},
{
"Variable": "$.retryCount",
"NumericGreaterThan": 301,
"Next": "orderRetried"
}
],
"Default": "waitStep"
Expand All @@ -64,6 +78,9 @@
},
"orderInTerminalState": {
"Type": "Succeed"
},
"orderRetried": {
"Type": "Succeed"
}
}
}
15 changes: 8 additions & 7 deletions bin/stacks/lambda-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { STAGE } from '../../lib/util/stage'
import { SERVICE_NAME } from '../constants'
import { CronStack } from './cron-stack'
import { DynamoStack, IndexCapacityConfig, TableCapacityConfig } from './dynamo-stack'
import { StatusStack } from './status-stack'
import { StepFunctionStack } from './step-function-stack'

export interface LambdaStackProps extends cdk.NestedStackProps {
Expand Down Expand Up @@ -94,12 +93,14 @@ export class LambdaStack extends cdk.NestedStack {
indexCapacityConfig,
})

new StatusStack(this, `${SERVICE_NAME}-StatusStack`, {
environmentVariables: {
...props.envVars,
},
stage: props.stage,
})
// tracks limit order status as a fargate task
// currently done by step functions
// new StatusStack(this, `${SERVICE_NAME}-StatusStack`, {
// environmentVariables: {
// ...props.envVars,
// },
// stage: props.stage,
// })

const sfnStack = new StepFunctionStack(this, `${SERVICE_NAME}SfnStack`, {
stage: props.stage as STAGE,
Expand Down
44 changes: 43 additions & 1 deletion lib/handlers/check-order-status/handler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { OrderType } from '@uniswap/uniswapx-sdk'
import { DynamoDB } from 'aws-sdk'
import Joi from 'joi'
import { BaseOrdersRepository } from '../../repositories/base'
import { LimitOrdersRepository } from '../../repositories/limit-orders-repository'
import { SfnInjector, SfnLambdaHandler, SfnStateInputOutput } from '../base'
import { kickoffOrderTrackingSfn } from '../shared/sfn'
import { ContainerInjected, RequestInjected } from './injector'
import { CheckOrderStatusInputJoi } from './schema'
import { CheckOrderStatusService } from './service'

export class CheckOrderStatusHandler extends SfnLambdaHandler<ContainerInjected, RequestInjected> {
private _checkOrderStatusService!: CheckOrderStatusService
private _checkLimitOrderStatusService!: CheckOrderStatusService

private getCheckOrderStatusService(dbInterface: BaseOrdersRepository) {
if (!this._checkOrderStatusService) {
Expand All @@ -15,6 +20,14 @@ export class CheckOrderStatusHandler extends SfnLambdaHandler<ContainerInjected,
return this._checkOrderStatusService
}

private getCheckLimitOrderStatusService() {
if (!this._checkLimitOrderStatusService) {
const dbInterface = LimitOrdersRepository.create(new DynamoDB.DocumentClient())
this._checkLimitOrderStatusService = new CheckOrderStatusService(dbInterface)
}
return this._checkLimitOrderStatusService
}

constructor(handlerName: string, injectorPromise: Promise<SfnInjector<ContainerInjected, RequestInjected>>) {
super(handlerName, injectorPromise)
}
Expand All @@ -23,7 +36,36 @@ export class CheckOrderStatusHandler extends SfnLambdaHandler<ContainerInjected,
containerInjected: ContainerInjected
requestInjected: RequestInjected
}): Promise<SfnStateInputOutput> {
return this.getCheckOrderStatusService(input.containerInjected.dbInterface).handleRequest(input.requestInjected)
//make sure to change "Variable": "$.retryCount", in order-tracking-sfn.json to be 1+retryCount
if (input.requestInjected?.retryCount > 300) {
const stateMachineArn = input.requestInjected.stateMachineArn
await kickoffOrderTrackingSfn(
{
orderHash: input.requestInjected.orderHash,
chainId: input.requestInjected.chainId,
orderStatus: input.requestInjected.orderStatus,
quoteId: input.requestInjected.quoteId,
orderType: input.requestInjected.orderType,
stateMachineArn: input.requestInjected.stateMachineArn,
},
stateMachineArn
)
}
if (input.requestInjected.orderType === OrderType.Limit) {
return {
...(await this.getCheckLimitOrderStatusService().handleRequest(input.requestInjected)),
orderType: input.requestInjected.orderType,
stateMachineArn: input.requestInjected.stateMachineArn,
}
} else {
return {
...(await this.getCheckOrderStatusService(input.containerInjected.dbInterface).handleRequest(
input.requestInjected
)),
orderType: input.requestInjected.orderType,
stateMachineArn: input.requestInjected.stateMachineArn,
}
}
}

protected inputSchema(): Joi.ObjectSchema | null {
Expand Down
5 changes: 4 additions & 1 deletion lib/handlers/check-order-status/injector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { BaseOrdersRepository } from '../../repositories/base'
import { DutchOrdersRepository } from '../../repositories/dutch-orders-repository'
import { setGlobalMetrics } from '../../util/metrics'
import { BaseRInj, SfnInjector, SfnStateInputOutput } from '../base/index'

export interface RequestInjected extends BaseRInj {
chainId: number
quoteId: string
Expand All @@ -21,6 +20,8 @@ export interface RequestInjected extends BaseRInj {
provider: ethers.providers.StaticJsonRpcProvider
orderWatcher: EventWatcher
orderQuoter: OrderValidator
orderType: OrderType
stateMachineArn: string
}

export interface ContainerInjected {
Expand Down Expand Up @@ -70,6 +71,8 @@ export class CheckOrderStatusInjector extends SfnInjector<ContainerInjected, Req
provider: provider,
orderWatcher: watcher,
orderQuoter: quoter,
orderType: event.orderType as OrderType,
stateMachineArn: event.stateMachineArn as string,
}
}
}
39 changes: 12 additions & 27 deletions lib/handlers/post-order/handler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { SFNClient, StartExecutionCommand } from '@aws-sdk/client-sfn'
import { getAddress } from '@ethersproject/address'
import { AddressZero } from '@ethersproject/constants'
import { DutchOrder, OrderType, OrderValidation } from '@uniswap/uniswapx-sdk'
import { Unit } from 'aws-embedded-metrics'
import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'
import Logger from 'bunyan'
import Joi from 'joi'

import { OrderEntity, ORDER_STATUS } from '../../entities'
Expand All @@ -13,16 +11,10 @@ import { metrics } from '../../util/metrics'
import { formatOrderEntity } from '../../util/order'
import { currentTimestampInSeconds } from '../../util/time'
import { APIGLambdaHandler, APIHandleRequestParams, ApiRInj, ErrorCode, ErrorResponse, Response } from '../base'
import { kickoffOrderTrackingSfn } from '../shared/sfn'
import { ContainerInjected } from './injector'
import { PostOrderRequestBody, PostOrderRequestBodyJoi, PostOrderResponse, PostOrderResponseJoi } from './schema'

type OrderTrackingSfnInput = {
orderHash: string
chainId: number
orderStatus: ORDER_STATUS
quoteId: string
}

export class PostOrderHandler extends APIGLambdaHandler<
ContainerInjected,
ApiRInj,
Expand Down Expand Up @@ -145,31 +137,24 @@ export class PostOrderHandler extends APIGLambdaHandler<
},
})

if (orderType === OrderType.Dutch) {
await this.kickoffOrderTrackingSfn(
{ orderHash: id, chainId: chainId, orderStatus: ORDER_STATUS.OPEN, quoteId: quoteId ?? '' },
await kickoffOrderTrackingSfn(
{
orderHash: id,
chainId: chainId,
orderStatus: ORDER_STATUS.OPEN,
quoteId: quoteId ?? '',
orderType,
stateMachineArn,
log
)
}
},
stateMachineArn
)

return {
statusCode: 201,
body: { hash: id },
}
}

private async kickoffOrderTrackingSfn(sfnInput: OrderTrackingSfnInput, stateMachineArn: string, log?: Logger) {
const region = checkDefined(process.env['REGION'])
const sfnClient = new SFNClient({ region: region })
const startExecutionCommand = new StartExecutionCommand({
stateMachineArn: stateMachineArn,
input: JSON.stringify(sfnInput),
name: sfnInput.orderHash,
})
log?.info(startExecutionCommand, 'Starting state machine execution')
await sfnClient.send(startExecutionCommand)
}

protected requestBodySchema(): Joi.ObjectSchema | null {
return PostOrderRequestBodyJoi
}
Expand Down
32 changes: 32 additions & 0 deletions lib/handlers/shared/sfn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { SFNClient, StartExecutionCommand } from '@aws-sdk/client-sfn'
import { OrderType } from '@uniswap/uniswapx-sdk'
import { ORDER_STATUS } from '../../entities'
import { log } from '../../Logging'
import { checkDefined } from '../../preconditions/preconditions'

export type OrderTrackingSfnInput = {
orderHash: string
chainId: number
orderStatus: ORDER_STATUS
quoteId: string
orderType: OrderType
stateMachineArn: string
}

//TODO: remove random for sfn name
//this is to stop collisions in the running step function name
const BIG_NUMBER = 1000000000000000000000

export async function kickoffOrderTrackingSfn(sfnInput: OrderTrackingSfnInput, stateMachineArn: string) {
log.info('starting state machine')
const region = checkDefined(process.env['REGION'])
const sfnClient = new SFNClient({ region: region })
const rand = Math.floor(Math.random() * BIG_NUMBER)
const startExecutionCommand = new StartExecutionCommand({
stateMachineArn: stateMachineArn,
input: JSON.stringify(sfnInput),
name: sfnInput.orderHash + '_' + rand,
})
log.info('Starting state machine execution', { startExecutionCommand })
await sfnClient.send(startExecutionCommand)
}
36 changes: 0 additions & 36 deletions test/unit/handlers/check-order-status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,42 +219,6 @@ describe('Testing check order status handler', () => {
startingBlockNumber: mockedBlockNumber - FILL_EVENT_LOOKBACK_BLOCKS_ON(1),
})
})

it('should do exponential backoff when retry count > 300', async () => {
const injectorPromiseMock: any = buildInjectorPromiseMock(301, ORDER_STATUS.OPEN)
const checkOrderStatusHandler = new CheckOrderStatusHandler('check-order-status', injectorPromiseMock)
validateMock.mockReturnValue(OrderValidation.OK)
const response = await checkOrderStatusHandler.handler(handlerEventMock)
expect(getByHashMock).toBeCalledWith(MOCK_ORDER_HASH)
expect(validateMock).toBeCalled()
expect(updateOrderStatusMock).not.toBeCalled() // there is no update
expect(response).toEqual({
orderHash: MOCK_ORDER_HASH,
orderStatus: 'open',
retryCount: 302,
retryWaitSeconds: 13,
chainId: 1,
startingBlockNumber: mockedBlockNumber - FILL_EVENT_LOOKBACK_BLOCKS_ON(1),
})
})

it('should cap exponential backoff when wait interval reaches 18000 seconds', async () => {
const injectorPromiseMock: any = buildInjectorPromiseMock(500, ORDER_STATUS.OPEN)
const checkOrderStatusHandler = new CheckOrderStatusHandler('check-order-status', injectorPromiseMock)
validateMock.mockReturnValue(OrderValidation.OK)
const response = await checkOrderStatusHandler.handler(handlerEventMock)
expect(getByHashMock).toBeCalledWith(MOCK_ORDER_HASH)
expect(validateMock).toBeCalled()
expect(updateOrderStatusMock).not.toBeCalled() // there is no update
expect(response).toEqual({
orderHash: MOCK_ORDER_HASH,
orderStatus: 'open',
retryCount: 501,
retryWaitSeconds: 18000,
chainId: 1,
startingBlockNumber: mockedBlockNumber - FILL_EVENT_LOOKBACK_BLOCKS_ON(1),
})
})
})

describe('Test getSettledAmounts', () => {
Expand Down
7 changes: 7 additions & 0 deletions test/unit/handlers/post-limit-order.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import { getMaxLimitOpenOrders } from '../../../lib/handlers/post-limit-order/in
import { PostOrderHandler } from '../../../lib/handlers/post-order/handler'
import { ORDER_INFO } from '../fixtures'

jest.mock('../../../lib/handlers/shared/sfn', () => {
return {
kickoffOrderTrackingSfn: jest.fn(),
}
})

const MOCK_ARN_1 = 'MOCK_ARN_1'
const MOCK_ARN_5 = 'MOCK_ARN_5'

Expand Down Expand Up @@ -36,6 +42,7 @@ describe('Testing post limit order handler.', () => {

const encodedOrder = '0x01'
const postRequestBody = {
orderHash: '0x01',
encodedOrder: encodedOrder,
signature:
'0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010',
Expand Down
6 changes: 4 additions & 2 deletions test/unit/handlers/post-order.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { ErrorCode } from '../../../lib/handlers/base'
import { DEFAULT_MAX_OPEN_ORDERS } from '../../../lib/handlers/constants'
import { PostOrderHandler } from '../../../lib/handlers/post-order/handler'
import { getMaxOpenOrders } from '../../../lib/handlers/post-order/injector'
import { kickoffOrderTrackingSfn } from '../../../lib/handlers/shared/sfn'
import { ORDER_INFO } from '../fixtures'

const MOCK_ARN_1 = 'MOCK_ARN_1'
Expand Down Expand Up @@ -61,6 +62,7 @@ describe('Testing post order handler.', () => {
const encodedOrder = '0x01'
const postRequestBody = {
encodedOrder: encodedOrder,
orderHash: '0x01',
signature:
'0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010',
chainId: 1,
Expand Down Expand Up @@ -298,14 +300,14 @@ describe('Testing post order handler.', () => {

it('should call StepFunctions.startExecution method with the correct params', async () => {
const sfnInput = { orderHash: '0xhash', chainId: 1, quoteId: 'quoteId', orderStatus: ORDER_STATUS.OPEN }
expect(async () => await postOrderHandler['kickoffOrderTrackingSfn'](sfnInput, MOCK_ARN_1)).not.toThrow()
expect(async () => await kickoffOrderTrackingSfn(sfnInput, MOCK_ARN_1)).not.toThrow()
expect(mockSfnClient.calls()).toHaveLength(1)

expect(mockSfnClient.call(0).args[0].input).toStrictEqual(
new StartExecutionCommand({
stateMachineArn: MOCK_ARN_1,
input: JSON.stringify(sfnInput),
name: sfnInput.orderHash,
name: expect.any(String),
}).input
)
})
Expand Down

0 comments on commit 8d83309

Please sign in to comment.