diff --git a/api/test/test-helpers.js b/api/test/test-helpers.js
index f8c4075..0e3387a 100644
--- a/api/test/test-helpers.js
+++ b/api/test/test-helpers.js
@@ -1,5 +1,9 @@
 import { AssertionError } from 'node:assert'
 
+/**
+ * @param {Response} res
+ * @param {number} status
+ */
 export const assertResponseStatus = async (res, status) => {
   if (res.status !== status) {
     throw new AssertionError({
diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js
index c690784..f0b21f0 100644
--- a/backend/bin/deal-observer-backend.js
+++ b/backend/bin/deal-observer-backend.js
@@ -11,6 +11,7 @@ import { countStoredActiveDealsWithUnresolvedPayloadCid, resolvePayloadCids, cou
 import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js'
 import { payloadCidRequest } from '../lib/piece-indexer-service.js'
 /** @import {Queryable} from '@filecoin-station/deal-observer-db' */
+/** @import {MakeRpcRequest, MakePayloadCidRequest} from '../lib/typings.d.ts' */
 
 const {
   INFLUXDB_TOKEN,
@@ -37,6 +38,10 @@ assert(finalityEpochs <= maxPastEpochs)
 const pgPool = await createPgPool()
 const { recordTelemetry } = createInflux(INFLUXDB_TOKEN)
 
+/**
+ * @param {MakeRpcRequest} makeRpcRequest
+ * @param {Queryable} pgPool
+ */
 const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
   while (true) {
     const start = Date.now()
@@ -47,7 +52,7 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
       const numberOfRevertedActiveDeals = await countRevertedActiveDeals(pgPool)
       if (INFLUXDB_TOKEN) {
         recordTelemetry('observed_deals_stats', point => {
-          point.intField('last_searched_epoch', newLastInsertedDeal.activated_at_epoch)
+          point.intField('last_searched_epoch', newLastInsertedDeal?.activated_at_epoch ?? 0)
           point.intField('number_of_stored_active_deals', numberOfStoredDeals)
           point.intField('number_of_reverted_active_deals', numberOfRevertedActiveDeals)
         })
@@ -116,6 +121,11 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken,
   }
 }
 
+/**
+ * @param {MakeRpcRequest} makeRpcRequest
+ * @param {MakePayloadCidRequest} makePayloadCidRequest
+ * @param {Queryable} pgPool
+ */
export const resolvePayloadCidsLoop = async (makeRpcRequest, makePayloadCidRequest, pgPool) => {
   while (true) {
     const start = Date.now()
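
Note: the `?? 0` fallback above matters on a fresh database. `fetchDealWithHighestActivatedEpoch` yields `undefined` while `active_deals` is empty, so the old property access failed under `strict` type checking and could crash the telemetry callback at runtime. A minimal sketch of the behaviour (illustrative values, not part of the patch):

```js
// `newLastInsertedDeal` is undefined while the active_deals table is empty.
const newLastInsertedDeal = undefined

// Old form: `newLastInsertedDeal.activated_at_epoch` throws
// "Cannot read properties of undefined" at runtime.

// New form: optional chaining short-circuits to undefined, and `??`
// substitutes a safe default for the telemetry field.
const epoch = newLastInsertedDeal?.activated_at_epoch ?? 0
console.log(epoch) // 0
```
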
diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js
index 2b1565c..bc2008d 100644
--- a/backend/lib/deal-observer.js
+++ b/backend/lib/deal-observer.js
@@ -1,14 +1,14 @@
-/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
-/** @import { Static } from '@sinclair/typebox' */
-
 import { getActorEvents, getActorEventsFilter, getChainHead } from './rpc-service/service.js'
 import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js'
 import { Value } from '@sinclair/typebox/value'
 import { convertBlockEventToActiveDealDbEntry } from './utils.js'
+/** @import {Queryable, QueryResultWithUnknownRows} from '@filecoin-station/deal-observer-db' */
+/** @import { Static } from '@sinclair/typebox' */
+/** @import {MakeRpcRequest} from './typings.d.ts' */
 
 /**
  * @param {Queryable} pgPool
- * @param {(method:string,params:any[]) => Promise<any>} makeRpcRequest
+ * @param {MakeRpcRequest} makeRpcRequest
  * @param {number} maxPastEpochs
  * @param {number} finalityEpochs
  * @returns {Promise<void>}
@@ -18,7 +18,7 @@ export const observeBuiltinActorEvents = async (pgPool, makeRpcRequest, maxPastE
   const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
   const startEpoch = Math.max(
     currentChainHead.Height - maxPastEpochs,
-    (lastInsertedDeal?.activated_at_epoch + 1) || 0
+    (lastInsertedDeal?.activated_at_epoch ?? -1) + 1
   )
   const endEpoch = currentChainHead.Height - finalityEpochs
   for (let epoch = startEpoch; epoch <= endEpoch; epoch++) {
@@ -29,7 +29,7 @@ export const observeBuiltinActorEvents = async (pgPool, makeRpcRequest, maxPastE
 /**
  * @param {number} blockHeight
  * @param {Queryable} pgPool
- * @param {(method:string,params:any[]) => Promise<any>} makeRpcRequest
+ * @param {MakeRpcRequest} makeRpcRequest
  */
 export const fetchAndStoreActiveDeals = async (blockHeight, pgPool, makeRpcRequest) => {
   const eventType = 'claim'
@@ -123,11 +123,13 @@ export async function storeActiveDeals (activeDeals, pgPool) {
 /**
  * @param {Queryable} pgPool
  * @param {string} query
- * @param {Array<any>} args
+ * @param {Array<unknown>} args
  * @returns {Promise<Array<Static<typeof ActiveDealDbEntry>>>}
  */
 export async function loadDeals (pgPool, query, args = []) {
-  const result = (await pgPool.query(query, args)).rows.map(deal => {
+  /** @type {QueryResultWithUnknownRows} */
+  const { rows } = await pgPool.query(query, args)
+  const result = rows.map((deal) => {
     // SQL used null, typebox needs undefined for null values
     Object.keys(deal).forEach(key => {
       if (deal[key] === null) {
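
Note: the rewritten `startEpoch` expression is behaviour-preserving. The old form relied on `undefined + 1` evaluating to `NaN` and `NaN || 0` collapsing to `0`; the new form avoids arithmetic on a possibly-undefined value, which `strict` rejects. A quick standalone check of the equivalence (a sketch, not repository code):

```js
/** @param {number | undefined} epoch */
const nextEpoch = (epoch) => (epoch ?? -1) + 1

console.log(nextEpoch(undefined)) // 0 (old form: (undefined + 1) || 0 -> NaN || 0 -> 0)
console.log(nextEpoch(0))         // 1 (old form: (0 + 1) || 0 -> 1)
console.log(nextEpoch(4622129))   // 4622130
```
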
diff --git a/backend/lib/resolve-payload-cids.js b/backend/lib/resolve-payload-cids.js
index 42d386a..ccf355f 100644
--- a/backend/lib/resolve-payload-cids.js
+++ b/backend/lib/resolve-payload-cids.js
@@ -11,8 +11,8 @@ const THREE_DAYS_IN_MILLISECONDS = 1000 * 60 * 60 * 24 * 3
 
 /**
  *
- * @param {function} makeRpcRequest
- * @param {function} makePayloadCidRequest
+ * @param {import('./typings.js').MakeRpcRequest} makeRpcRequest
+ * @param {import('./typings.js').MakePayloadCidRequest} makePayloadCidRequest
  * @param {Queryable} pgPool
  * @param {number} maxDeals
  * @returns {Promise<void>}
@@ -21,7 +21,8 @@ export const resolvePayloadCids = async (makeRpcRequest, makePayloadCidRequest,
   let payloadCidsResolved = 0
   for (const deal of await fetchDealsWithUnresolvedPayloadCid(pgPool, maxDeals, new Date(now - THREE_DAYS_IN_MILLISECONDS))) {
     const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest)
-    deal.payload_cid = await makePayloadCidRequest(minerPeerId, deal.piece_cid)
+    const payloadCid = await makePayloadCidRequest(minerPeerId, deal.piece_cid)
+    if (payloadCid) deal.payload_cid = payloadCid
     if (!deal.payload_cid) {
       if (deal.last_payload_retrieval_attempt) {
         deal.payload_retrievability_state = PayloadRetrievabilityState.TerminallyUnretrievable
@@ -49,6 +50,10 @@ export async function fetchDealsWithUnresolvedPayloadCid (pgPool, maxDeals, now)
   return await loadDeals(pgPool, query, [now, maxDeals])
 }
 
+/**
+ * @param {Queryable} pgPool
+ * @returns {Promise<bigint>}
+ */
 export async function countStoredActiveDealsWithUnresolvedPayloadCid (pgPool) {
   const query = 'SELECT COUNT(*) FROM active_deals WHERE payload_cid IS NULL'
   const result = await pgPool.query(query)
@@ -70,7 +75,7 @@ export async function countRevertedActiveDeals (pgPool) {
  * @param {Static<typeof ActiveDealDbEntry>} deal
  * @param {Static< typeof PayloadRetrievabilityStateType>} newPayloadRetrievalState
  * @param {Date} lastRetrievalAttemptTimestamp
- * @param {string} newPayloadCid
+ * @param {string | undefined} newPayloadCid
 * @returns { Promise<void>}
 */
 async function updatePayloadCidInActiveDeal (pgPool, deal, newPayloadRetrievalState, lastRetrievalAttemptTimestamp, newPayloadCid) {
diff --git a/backend/lib/rpc-service/data-types.js b/backend/lib/rpc-service/data-types.js
index 2a1483f..0719da1 100644
--- a/backend/lib/rpc-service/data-types.js
+++ b/backend/lib/rpc-service/data-types.js
@@ -24,8 +24,8 @@ const RawActorEvent = Type.Object({
   entries: Type.Array(Entry),
   height: Type.Number(),
   reverted: Type.Boolean(),
-  msgCid: Type.Any(),
-  tipsetKey: Type.Array(Type.Any())
+  msgCid: Type.Unknown(),
+  tipsetKey: Type.Array(Type.Unknown())
 })
 
 const BlockEvent = Type.Object({
@@ -35,8 +35,14 @@ const BlockEvent = Type.Object({
   reverted: Type.Boolean()
 })
 
-const RpcRespone = Type.Object({
-  result: Type.Any()
+const RpcResponse = Type.Object({
+  result: Type.Unknown()
+})
+
+const ChainHead = Type.Object({
+  Height: Type.Number(),
+  Blocks: Type.Unknown(),
+  Cids: Type.Unknown()
 })
 
 export {
@@ -44,5 +50,6 @@ export {
   Entry,
   RawActorEvent,
   BlockEvent,
-  RpcRespone
+  RpcResponse,
+  ChainHead
 }
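
Note: swapping `Type.Any()` for `Type.Unknown()` leaves runtime validation unchanged but types the values as `unknown`, so call sites must narrow them explicitly. The new `ChainHead` schema validates only the field the backend actually reads. A sketch of how `Value.Parse` behaves with such a schema, assuming the TypeBox version this repo uses (illustrative values):

```js
import { Type } from '@sinclair/typebox'
import { Value } from '@sinclair/typebox/value'

// Mirrors the ChainHead schema above: only Height gets a concrete type.
const ChainHead = Type.Object({
  Height: Type.Number(),
  Blocks: Type.Unknown(),
  Cids: Type.Unknown()
})

// Value.Parse returns the typed value, or throws when validation fails.
const head = Value.Parse(ChainHead, { Height: 4622129, Blocks: [], Cids: [] })
console.log(head.Height) // 4622129; Height is number, Blocks/Cids stay `unknown`
// Value.Parse(ChainHead, {}) would throw: Height is required.
```
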
diff --git a/backend/lib/rpc-service/service.js b/backend/lib/rpc-service/service.js
index 7112ab1..a286ec1 100644
--- a/backend/lib/rpc-service/service.js
+++ b/backend/lib/rpc-service/service.js
@@ -4,14 +4,16 @@ import { encode as cborEncode } from '@ipld/dag-cbor'
 import { rawEventEntriesToEvent } from './utils.js'
 import { Value } from '@sinclair/typebox/value'
 import * as util from 'node:util'
-import { ClaimEvent, RawActorEvent, BlockEvent, RpcRespone } from './data-types.js'
+import { ClaimEvent, RawActorEvent, BlockEvent, RpcResponse, ChainHead } from './data-types.js'
 import pRetry from 'p-retry'
+
 /** @import { Static } from '@sinclair/typebox' */
+/** @import {MakeRpcRequest} from '../typings.d.ts' */
 
 /**
  * @param {string} method
- * @param {Object} params
- * @returns {Promise<any>}
+ * @param {unknown[]} params
+ * @returns {Promise<unknown>}
  */
 export const rpcRequest = async (method, params) => {
   const reqBody = JSON.stringify({ method, params, id: 1, jsonrpc: '2.0' })
@@ -30,7 +32,7 @@ export const rpcRequest = async (method, params) => {
   }
   const json = await response.json()
   try {
-    const parsedRpcResponse = Value.Parse(RpcRespone, json).result
+    const parsedRpcResponse = Value.Parse(RpcResponse, json).result
     return parsedRpcResponse
   } catch (error) {
     throw Error(util.format('Failed to parse RPC response: %o', json), { cause: error })
@@ -40,12 +42,13 @@ export const rpcRequest = async (method, params) => {
 }
 /**
- * @param {object} actorEventFilter
+ * @param {{fromHeight:number,toHeight:number,fields: unknown}} actorEventFilter
  * Returns actor events filtered by the given actorEventFilter
+ * @param {MakeRpcRequest} makeRpcRequest
  * @returns {Promise<Array<Static<typeof ClaimEvent>>>}
  */
 export async function getActorEvents (actorEventFilter, makeRpcRequest) {
-  const rawEvents = await makeRpcRequest('Filecoin.GetActorEventsRaw', [actorEventFilter])
+  const rawEvents = /** @type {unknown[]} */(await makeRpcRequest('Filecoin.GetActorEventsRaw', [actorEventFilter]))
   if (!rawEvents || rawEvents.length === 0) {
     console.debug(`No actor events found in the height range ${actorEventFilter.fromHeight} - ${actorEventFilter.toHeight}.`)
     return []
@@ -81,22 +84,31 @@
 }
 
 /**
- * @param {function} makeRpcRequest
- * @returns {Promise<object>}
+ * @param {MakeRpcRequest} makeRpcRequest
+ * @returns {Promise<Static<typeof ChainHead>>}
  */
 export async function getChainHead (makeRpcRequest) {
-  return await makeRpcRequest('Filecoin.ChainHead', [])
+  const result = await makeRpcRequest('Filecoin.ChainHead', [])
+  try {
+    return Value.Parse(ChainHead, result)
+  } catch (e) {
+    throw Error(util.format('Failed to parse chain head: %o', result))
+  }
 }
 
 /**
  * @param {number} minerId
- * @param {function} makeRpcRequest
+ * @param {MakeRpcRequest} makeRpcRequest
  * @returns {Promise<string>}
  */
 export async function getMinerPeerId (minerId, makeRpcRequest) {
+  /** @typedef {{
+   * PeerId: string;
+   * }} MinerInfo
+   */
   try {
     const params = getMinerInfoCallParams(minerId)
-    const res = await makeRpcRequest('Filecoin.StateMinerInfo', params)
+    const res = /** @type {MinerInfo} */(await makeRpcRequest('Filecoin.StateMinerInfo', params))
     if (!res || !res.PeerId) {
       throw Error(`Failed to get peer ID for miner ${minerId}, result: ${res}`)
     }
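
Note: with `MakeRpcRequest` returning `Promise<unknown>`, each call site narrows the result in its own way: `getChainHead` by schema parsing, `getMinerPeerId` by a JSDoc cast. A cast is compile-time only, which is why the runtime guard on `res.PeerId` is kept. A hedged sketch of the pattern (the typedef and function names below are hypothetical):

```js
// The cast tells the type checker what shape to assume; it validates
// nothing at runtime, so the explicit guard is still required.

/** @typedef {{ PeerId: string }} MinerInfoShape */

/** @param {unknown} raw */
const readPeerId = (raw) => {
  const res = /** @type {MinerInfoShape} */ (raw)
  if (!res || !res.PeerId) {
    throw Error('StateMinerInfo response did not include a PeerId')
  }
  return res.PeerId
}
```
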
diff --git a/backend/lib/rpc-service/utils.js b/backend/lib/rpc-service/utils.js
index 3d54b8e..f263f24 100644
--- a/backend/lib/rpc-service/utils.js
+++ b/backend/lib/rpc-service/utils.js
@@ -2,6 +2,10 @@ import { base64pad } from 'multiformats/bases/base64'
 import { decode as cborDecode } from '@ipld/dag-cbor'
 import * as util from 'node:util'
 
+/**
+ * @param {string} data
+ * @returns {unknown}
+ */
 const decodeCborInBase64 = (data) => {
   return cborDecode(base64pad.baseDecode(data))
 }
@@ -14,7 +18,9 @@ const decodeCborInBase64 = (data) => {
  */
 const rawEventEntriesToEvent = (rawEventEntries) => {
   // Each event is defined by a list of event entries which will parsed into a typed event
+  /** @type {Record<string, unknown>} */
   const event = {}
+  /** @type {string | undefined} */
   let eventType
   for (const entry of rawEventEntries) {
     // The key returned by the Lotus API is kebab-case, we convert it to camelCase
@@ -22,13 +28,13 @@ const rawEventEntriesToEvent = (rawEventEntries) => {
     let value = decodeCborInBase64(entry.Value)
     // In each entry exists an event type declaration which we need to extract
     if (key === '$type') {
-      eventType = value
+      eventType = /** @type {string} */(value)
       // The type entry is not part of the event itself
       continue
     }
     // Convert CID instanes to the string representation
-    if (value[Symbol.toStringTag] === 'CID') {
+    if (typeof value === 'object' && value && (Symbol.toStringTag in value) && value[Symbol.toStringTag] === 'CID') {
       value = value.toString()
     } else if (typeof value !== 'number') {
       throw new Error(util.format(
diff --git a/backend/lib/telemetry.js b/backend/lib/telemetry.js
index db3dd08..04ce457 100644
--- a/backend/lib/telemetry.js
+++ b/backend/lib/telemetry.js
@@ -3,6 +3,10 @@ import createDebug from 'debug'
 
 const debug = createDebug('spark:deal-observer:telemetry')
 
+/**
+ * @param {string | undefined} token
+ * @returns {{influx: InfluxDB,recordTelemetry: (name: string, fn: (p: Point) => void) => void}}
+ */
 export const createInflux = token => {
   const influx = new InfluxDB({
     url: 'https://eu-central-1-1.aws.cloud2.influxdata.com',
diff --git a/backend/lib/typings.d.ts b/backend/lib/typings.d.ts
new file mode 100644
index 0000000..0901370
--- /dev/null
+++ b/backend/lib/typings.d.ts
@@ -0,0 +1,2 @@
+export type MakeRpcRequest = (method: string, params: unknown[]) => Promise<unknown>;
+export type MakePayloadCidRequest = (providerId:string,pieceCid:string) => Promise<string | null>;
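
Note: the two signatures in `typings.d.ts` are the single contract that the loops, service helpers, and test mocks all implement. A sketch of stubs satisfying them (hypothetical in-memory implementations, similar in shape to the test mocks further down; the return type generics are reconstructed here, since the extraction stripped angle brackets):

```js
/** @import {MakeRpcRequest, MakePayloadCidRequest} from './typings.d.ts' */

/** @type {MakeRpcRequest} */
const stubRpcRequest = async (method, params) => {
  // Params are `unknown[]`; implementations narrow them per method.
  if (method === 'Filecoin.ChainHead') return { Height: 1, Blocks: [], Cids: [] }
  throw Error(`Unknown method: ${method}`)
}

/** @type {MakePayloadCidRequest} */
const stubPayloadCidRequest = async (providerId, pieceCid) => {
  // Null signals "no payload CID known", matching the nullable return type.
  return null
}
```
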
diff --git a/backend/package.json b/backend/package.json
index b0a9968..566110f 100644
--- a/backend/package.json
+++ b/backend/package.json
@@ -9,6 +9,9 @@
     "test": "node --test --test-reporter=spec --test-concurrency=1"
   },
   "devDependencies": {
+    "@types/debug": "^4.1.12",
+    "@types/pg-cursor": "^2.7.2",
+    "@types/slug": "^5.0.9",
     "standard": "^17.1.2"
   },
   "dependencies": {
diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js
index 5b36563..d5c3d84 100644
--- a/backend/test/deal-observer.test.js
+++ b/backend/test/deal-observer.test.js
@@ -3,15 +3,19 @@ import { after, before, beforeEach, describe, it } from 'node:test'
 import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db'
 import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, storeActiveDeals, observeBuiltinActorEvents } from '../lib/deal-observer.js'
 import { Value } from '@sinclair/typebox/value'
-import { BlockEvent } from '../lib/rpc-service/data-types.js'
+import { BlockEvent, ClaimEvent } from '../lib/rpc-service/data-types.js'
 import { convertBlockEventToActiveDealDbEntry } from '../lib/utils.js'
 import { PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js'
 import { chainHeadTestData } from './test_data/chainHead.js'
 import { rawActorEventTestData } from './test_data/rawActorEvent.js'
 import { parse } from '@ipld/dag-json'
 import { countRevertedActiveDeals } from '../lib/resolve-payload-cids.js'
+/** @import { Static } from '@sinclair/typebox' */
 
 describe('deal-observer-backend', () => {
+  /**
+   * @type {import('@filecoin-station/deal-observer-db').PgPool}
+   */
   let pgPool
   before(async () => {
     pgPool = await createPgPool()
@@ -27,33 +31,28 @@
   })
 
   it('adds new FIL+ deals from built-in actor events to storage', async () => {
-    const eventData = {
-      id: 1,
-      provider: 2,
-      client: 3,
-      pieceCid: 'baga6ea4seaqc4z4432snwkztsadyx2rhoa6rx3wpfzu26365wvcwlb2wyhb5yfi',
-      pieceSize: 4n,
-      termStart: 5,
-      termMin: 12340,
-      termMax: 12340,
-      sector: 6n,
-      payload_cid: undefined
-    }
-    const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false })
+    const event = Value.Parse(BlockEvent, {
+      height: 1,
+      event: {
+        id: 1,
+        provider: 2,
+        client: 3,
+        pieceCid: 'baga6ea4seaqc4z4432snwkztsadyx2rhoa6rx3wpfzu26365wvcwlb2wyhb5yfi',
+        pieceSize: 4n,
+        termStart: 5,
+        termMin: 12340,
+        termMax: 12340,
+        sector: 6n,
+        payload_cid: undefined
+      },
+      emitter: 'f06',
+      reverted: false
+    })
     const dbEntry = convertBlockEventToActiveDealDbEntry(event)
     await storeActiveDeals([dbEntry], pgPool)
     const actualData = await loadDeals(pgPool, 'SELECT * FROM active_deals')
     const expectedData = {
-      activated_at_epoch: event.height,
-      miner_id: eventData.provider,
-      client_id: eventData.client,
-      piece_cid: eventData.pieceCid,
-      piece_size: eventData.pieceSize,
-      term_start_epoch: eventData.termStart,
-      term_min: eventData.termMin,
-      term_max: eventData.termMax,
-      sector_id: eventData.sector,
-      payload_cid: undefined,
+      ...dbEntry,
       payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
       last_payload_retrieval_attempt: undefined,
       reverted: false
@@ -84,12 +83,15 @@
   })
 
   it('check number of stored deals', async () => {
+    /**
+     * @param {Static<typeof ClaimEvent>} eventData
+     */
     const storeBlockEvent = async (eventData) => {
       const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false })
       const dbEntry = convertBlockEventToActiveDealDbEntry(event)
       await storeActiveDeals([dbEntry], pgPool)
     }
-    const data = {
+    const data = Value.Parse(ClaimEvent, {
       id: 1,
       provider: 2,
       client: 3,
@@ -99,7 +101,7 @@
       termMin: 12340,
       termMax: 12340,
       sector: 6n
-    }
+    })
     assert.strictEqual(await countStoredActiveDeals(pgPool), 0n)
     await storeBlockEvent(data)
     assert.strictEqual(await countStoredActiveDeals(pgPool), 1n)
@@ -111,6 +113,9 @@
   })
 
   it('serially processes claims for a piece stored twice in the same sector', async () => {
+    /**
+     * @param {Static<typeof ClaimEvent>} eventData
+     */
     const storeDeal = async (eventData) => {
       const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false })
       const dbEntry = convertBlockEventToActiveDealDbEntry(event)
@@ -141,6 +146,9 @@
     assert.strictEqual(actual.length, 1)
   })
   it('simultaneously processes claims for a piece stored twice in the same sector', async () => {
+    /**
+     * @param {Array<Static<typeof ClaimEvent>>} events
+     */
     const storeDeal = async (events) => {
       const dbEntries = events.map(eventData => {
         const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false })
@@ -148,7 +156,7 @@
       })
       await storeActiveDeals(dbEntries, pgPool)
     }
-    const eventData = {
+    const eventData = Value.Parse(ClaimEvent, {
       id: 1,
       provider: 2,
       client: 3,
@@ -161,19 +169,23 @@
       payload_cid: undefined,
       payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
       last_payload_retrieval_attempt: undefined
-    }
+    })
     await storeDeal([eventData, { ...eventData, id: 2 }])
     const actual = await loadDeals(pgPool, 'SELECT * FROM active_deals')
     // Only one of the events will be stored in the database
     assert.strictEqual(actual.length, 1)
   })
   it('check number of reverted stored deals', async () => {
+    /**
+     * @param {Static<typeof ClaimEvent>} eventData
+     * @param {boolean} reverted
+     */
     const storeBlockEvent = async (eventData, reverted) => {
       const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted })
       const dbEntry = convertBlockEventToActiveDealDbEntry(event)
       await storeActiveDeals([dbEntry], pgPool)
     }
-    const data = {
+    const data = Value.Parse(ClaimEvent, {
       id: 1,
       provider: 2,
       client: 3,
@@ -183,7 +195,7 @@
       termMin: 12340,
       termMax: 12340,
       sector: 6n
-    }
+    })
     assert.strictEqual(await countRevertedActiveDeals(pgPool), 0n)
     await storeBlockEvent(data, false)
     assert.strictEqual(await countRevertedActiveDeals(pgPool), 0n)
@@ -203,13 +215,23 @@
 })
 
 describe('deal-observer-backend built in actor event observer', () => {
+  /**
+   * @type {import('@filecoin-station/deal-observer-db').PgPool}
+   */
   let pgPool
+  /**
+   * @type {import('../lib/typings.js').MakeRpcRequest}
+   */
   const makeRpcRequest = async (method, params) => {
     switch (method) {
       case 'Filecoin.ChainHead':
         return parse(JSON.stringify(chainHeadTestData))
-      case 'Filecoin.GetActorEventsRaw':
-        return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
+      case 'Filecoin.GetActorEventsRaw':{
+        assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object')
+        const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0])
+        assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number')
+        assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number')
+        return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) }
       default:
         console.error('Unknown method')
     }
@@ -237,8 +259,7 @@
     let deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')
     assert.strictEqual(deals.length, 25)
     const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
-    assert.strictEqual(lastInsertedDeal.activated_at_epoch, 4622129)
-
+    assert.strictEqual(lastInsertedDeal?.activated_at_epoch, 4622129)
     // The deal observer function should pick up from the current storage
     await observeBuiltinActorEvents(pgPool, makeRpcRequest, 100, 0)
     deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')
diff --git a/backend/test/resolve-payload-cids.test.js b/backend/test/resolve-payload-cids.test.js
index ad4e63d..8af435f 100644
--- a/backend/test/resolve-payload-cids.test.js
+++ b/backend/test/resolve-payload-cids.test.js
@@ -12,18 +12,30 @@ import { ActiveDealDbEntry, PayloadRetrievabilityState } from '@filecoin-station
 import { countStoredActiveDealsWithUnresolvedPayloadCid, resolvePayloadCids } from '../lib/resolve-payload-cids.js'
 
 describe('deal-observer-backend resolve payload CIDs', () => {
+  /**
+   * @type {import('../lib/typings.d.ts').MakeRpcRequest}
+   * */
   const makeRpcRequest = async (method, params) => {
     switch (method) {
       case 'Filecoin.ChainHead':
         return parse(JSON.stringify(chainHeadTestData))
-      case 'Filecoin.GetActorEventsRaw':
-        return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
+      case 'Filecoin.GetActorEventsRaw': {
+        assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object')
+        const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0])
+        assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number')
+        assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number')
+        return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight)
+      }
       case 'Filecoin.StateMinerInfo':
+        assert(typeof params[0] === 'string', 'params[0] must be a string')
         return minerPeerIds.get(params[0])
       default:
         console.error('Unknown method')
     }
   }
+  /**
+   * @type {import('@filecoin-station/deal-observer-db').PgPool}
+   * */
   let pgPool
   before(async () => {
     pgPool = await createPgPool()
@@ -48,17 +60,20 @@
 
   it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => {
     const resolvePayloadCidCalls = []
-    const resolvePayloadCid = async (providerId, pieceCid) => {
+    /**
+     * @type {import('../lib/typings.d.ts').MakePayloadCidRequest}
+     * */
+    const makePayloadCidRequest = async (providerId, pieceCid) => {
      resolvePayloadCidCalls.push({ providerId, pieceCid })
      const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid }))
-      return payloadCid?.payloadCid
+      return payloadCid ? payloadCid.payloadCid : null
    }
    assert.strictEqual(
      (await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length,
      336
    )
 
-    await resolvePayloadCids(makeRpcRequest, resolvePayloadCid, pgPool, 10000)
+    await resolvePayloadCids(makeRpcRequest, makePayloadCidRequest, pgPool, 10000)
     assert.strictEqual(resolvePayloadCidCalls.length, 336)
     assert.strictEqual(
       (await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length,
@@ -70,10 +85,13 @@
     let unresolvedPayloadCids = await countStoredActiveDealsWithUnresolvedPayloadCid(pgPool)
     assert.strictEqual(unresolvedPayloadCids, 336n)
     const resolvePayloadCidCalls = []
+    /**
+     * @type {import('../lib/typings.d.ts').MakePayloadCidRequest}
+     * */
     const resolvePayloadCid = async (providerId, pieceCid) => {
       resolvePayloadCidCalls.push({ providerId, pieceCid })
       const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid }))
-      return payloadCid?.payloadCid
+      return payloadCid ? payloadCid.payloadCid : null
     }
     await resolvePayloadCids(makeRpcRequest, resolvePayloadCid, pgPool, 10000)
 
@@ -83,6 +101,9 @@
 })
 
 describe('deal-observer-backend piece indexer payload retrieval', () => {
+  /**
+   * @type {import('@filecoin-station/deal-observer-db').PgPool}
+   */
   let pgPool
   const payloadCid = 'PAYLOAD_CID'
   const minerPeerId = 'MINER_PEER_ID'
diff --git a/backend/test/rpc-client.test.js b/backend/test/rpc-client.test.js
index 9ab57e6..6da0c0f 100644
--- a/backend/test/rpc-client.test.js
+++ b/backend/test/rpc-client.test.js
@@ -7,13 +7,21 @@ import { getActorEvents, getActorEventsFilter, getChainHead, rpcRequest } from '
 import { ClaimEvent } from '../lib/rpc-service/data-types.js'
 import { Value } from '@sinclair/typebox/value'
 
+/** @import {MakeRpcRequest} from '../lib/typings.d.ts' */
+
 describe('RpcApiClient', () => {
+  /** @type {MakeRpcRequest} */
   const makeRpcRequest = async (method, params) => {
     switch (method) {
       case 'Filecoin.ChainHead':
         return parse(JSON.stringify(chainHeadTestData))
-      case 'Filecoin.GetActorEventsRaw':
-        return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
+      case 'Filecoin.GetActorEventsRaw': {
+        assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object')
+        const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0])
+        assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number')
+        assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number')
+        return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight)
+      }
       default:
         console.error('Unknown method')
     }
@@ -23,7 +31,8 @@ it('gets chain head', async () => {
     const chainHead = await getChainHead(makeRpcRequest)
     assert(chainHead)
     const expected = parse(JSON.stringify(chainHeadTestData))
-    assert.deepStrictEqual(chainHead, expected)
+    assert(chainHead.Height)
+    assert.deepStrictEqual(expected.Height, chainHead.Height)
   })
 
   for (let blockHeight = 4622129; blockHeight < 4622129 + 11; blockHeight++) {
diff --git a/backend/test/spark-api-submit-deals.test.js b/backend/test/spark-api-submit-deals.test.js
index e44a05b..5f49efc 100644
--- a/backend/test/spark-api-submit-deals.test.js
+++ b/backend/test/spark-api-submit-deals.test.js
@@ -4,7 +4,12 @@ import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observ
 import { calculateActiveDealEpochs, daysAgo, daysFromNow, today } from './test-helpers.js'
 import { findAndSubmitUnsubmittedDeals } from '../lib/spark-api-submit-deals.js'
 
+/** @import {PgPool, Queryable, QueryResultWithUnknownRows} from '@filecoin-station/deal-observer-db' */
+
 describe('Submit deals to spark-api', () => {
+  /**
+   * @type {PgPool}
+   */
   let pgPool
 
   before(async () => {
@@ -34,6 +39,7 @@
     const mockSubmitEligibleDeals = createSubmitEligibleDealsMock()
 
     const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals)
+    /** @type {QueryResultWithUnknownRows} */
     const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL')
     assert.strictEqual(submitted, 2)
     assert.strictEqual(ingested, 2)
@@ -48,6 +54,7 @@
 
     // two deals are eligible for submission, batchSize is 1
     const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals)
+    /** @type {QueryResultWithUnknownRows} */
     const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL')
     assert.strictEqual(submitted, 2)
     assert.strictEqual(ingested, 2)
@@ -65,6 +72,7 @@
 
     // two deals are eligible for submission, batchSize is 1
     const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals)
+    /** @type {QueryResultWithUnknownRows} */
     const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL')
     assert.strictEqual(submitted, 1)
     assert.strictEqual(ingested, 1)
@@ -82,6 +90,7 @@
 
     // two deals are eligible for submission, batchSize is 1
     const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals)
+    /** @type {QueryResultWithUnknownRows} */
     const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL')
     assert.strictEqual(submitted, 2)
     assert.strictEqual(ingested, 1)
@@ -91,6 +100,19 @@
   })
 })
 
+/**
+ * @param {Queryable} pgPool
+ * @param {Object} activeDeal
+ * @param {string} activeDeal.createdAt
+ * @param {string} activeDeal.startsAt
+ * @param {string} activeDeal.expiresAt
+ * @param {number} [activeDeal.minerId=2]
+ * @param {number} [activeDeal.clientId=3]
+ * @param {string} [activeDeal.pieceCid='cidone']
+ * @param {string | null} [activeDeal.payloadCid=null]
+ * @param {boolean} [activeDeal.reverted=false]
+ * @returns {Promise<void>}
+ */
 const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId = 2, clientId = 3, pieceCid = 'cidone', payloadCid = null, reverted = false }) => {
   const { activatedAtEpoch, termStart, termMin, termMax } = calculateActiveDealEpochs(createdAt, startsAt, expiresAt)
   await pgPool.query(
@@ -103,12 +125,7 @@ const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId
 
 // TODO: allow callers of this helper to define how many deals should be reported as skipped
 const createSubmitEligibleDealsMock = () => {
-  return mock.fn(
-    // original - unused param
-    () => {},
-    // implementation
-    async (deals) => {
-      return { ingested: deals.length, skipped: 0 }
-    }
-  )
+  return mock.fn(async (deals) => {
+    return { ingested: deals.length, skipped: 0 }
+  })
 }
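
Note: the simplified mock drops the unused "original" argument. `node:test`'s `mock.fn()` accepts a single function that serves as the implementation while still tracking calls. A short usage sketch:

```js
import { mock } from 'node:test'

// Single-function form: the async implementation is the mock itself.
const submitMock = mock.fn(async (deals) => ({ ingested: deals.length, skipped: 0 }))

await submitMock([1, 2, 3]) // resolves to { ingested: 3, skipped: 0 }
console.log(submitMock.mock.callCount())        // 1
console.log(submitMock.mock.calls[0].arguments) // [ [ 1, 2, 3 ] ]
```
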
diff --git a/backend/test/test-helpers.js b/backend/test/test-helpers.js
index 84370ad..d8c5f06 100644
--- a/backend/test/test-helpers.js
+++ b/backend/test/test-helpers.js
@@ -14,8 +14,8 @@ export const getLocalDayAsISOString = (d) => {
 
 export const today = () => getLocalDayAsISOString(new Date())
 export const yesterday = () => getLocalDayAsISOString(new Date(Date.now() - 24 * 60 * 60 * 1000))
-export const daysAgo = (n) => getLocalDayAsISOString(new Date(Date.now() - n * 24 * 60 * 60 * 1000))
-export const daysFromNow = (n) => getLocalDayAsISOString(new Date(Date.now() + n * 24 * 60 * 60 * 1000))
+export const daysAgo = (/** @type {number} */ n) => getLocalDayAsISOString(new Date(Date.now() - n * 24 * 60 * 60 * 1000))
+export const daysFromNow = (/** @type {number} */ n) => getLocalDayAsISOString(new Date(Date.now() + n * 24 * 60 * 60 * 1000))
 
 /**
  * Calculates activated at, term start, term min, and term max.
diff --git a/db/index.js b/db/index.js
index 752f8dd..5c37660 100644
--- a/db/index.js
+++ b/db/index.js
@@ -6,6 +6,8 @@ import Postgrator from 'postgrator'
 // re-export types
 /** @typedef {import('./typings.js').Queryable} Queryable */
 /** @typedef {import('./typings.js').PgPool} PgPool */
+/** @typedef {import('./typings.js').UnknownRow} UnknownRow */
+/** @typedef {import('./typings.js').QueryResultWithUnknownRows} QueryResultWithUnknownRows */
 
 // Configure node-postgres to deserialize BIGINT values as BigInt, not String
 pg.types.setTypeParser(20, BigInt) // Type Id 20 = BIGINT | BIGSERIAL
@@ -33,7 +35,11 @@ const poolConfig = {
   maxLifetimeSeconds: 60
 }
 
-const onError = err => {
+/**
+ * @param {Error} err
+ * @returns {void}
+ */
+const onError = (err) => {
   // Prevent crashing the process on idle client errors, the pool will recover
   // itself. If all connections are lost, the process will still crash.
   // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
diff --git a/db/typings.d.ts b/db/typings.d.ts
index 419a754..35aa4e0 100644
--- a/db/typings.d.ts
+++ b/db/typings.d.ts
@@ -4,3 +4,5 @@ export type PgPool = pg.Pool
 
 // Copied from import('@types/pg').
 export type Queryable = Pick<pg.Pool, 'query'>
+export type UnknownRow = Record<string, unknown>
+export type QueryResultWithUnknownRows = pg.QueryResult<UnknownRow>
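
Note: `QueryResultWithUnknownRows` types each row as `Record<string, unknown>` instead of pg's default `any`, so column values must be narrowed before use. A hedged sketch of a call site (a hypothetical helper; recall that db/index.js configures BIGINT columns to arrive as `BigInt`):

```js
/** @import {Queryable, QueryResultWithUnknownRows} from '@filecoin-station/deal-observer-db' */

/** @param {Queryable} pgPool */
const countSubmittedDeals = async (pgPool) => {
  /** @type {QueryResultWithUnknownRows} */
  const { rows } = await pgPool.query(
    'SELECT COUNT(*) AS count FROM active_deals WHERE submitted_at IS NOT NULL'
  )
  const count = rows[0].count // typed `unknown`, not `any`
  if (typeof count !== 'bigint') throw Error('expected a BIGINT count')
  return count
}
```
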
diff --git a/package-lock.json b/package-lock.json
index cbb819d..e398ca5 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -52,6 +52,9 @@
         "pg-cursor": "^2.12.3"
       },
       "devDependencies": {
+        "@types/debug": "^4.1.12",
+        "@types/pg-cursor": "^2.7.2",
+        "@types/slug": "^5.0.9",
         "standard": "^17.1.2"
       }
     },
@@ -1079,6 +1082,16 @@
         "@types/node": "*"
       }
     },
+    "node_modules/@types/debug": {
+      "version": "4.1.12",
+      "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz",
+      "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==",
+      "dev": true,
+      "license": "MIT",
+      "dependencies": {
+        "@types/ms": "*"
+      }
+    },
     "node_modules/@types/json5": {
       "version": "0.0.29",
       "dev": true,
@@ -1107,6 +1120,17 @@
         "pg-types": "^4.0.1"
       }
     },
+    "node_modules/@types/pg-cursor": {
+      "version": "2.7.2",
+      "resolved": "https://registry.npmjs.org/@types/pg-cursor/-/pg-cursor-2.7.2.tgz",
+      "integrity": "sha512-m3xT8bVFCvx98LuzbvXyuCdT/Hjdd/v8ml4jL4K1QF70Y8clOfCFdgoaEB1FWdcSwcpoFYZTJQaMD9/GQ27efQ==",
+      "dev": true,
+      "license": "MIT",
+      "dependencies": {
+        "@types/node": "*",
+        "@types/pg": "*"
+      }
+    },
     "node_modules/@types/pg-pool": {
       "version": "2.0.6",
       "resolved": "https://registry.npmjs.org/@types/pg-pool/-/pg-pool-2.0.6.tgz",
@@ -1124,6 +1148,13 @@
       "version": "1.2.0",
       "license": "MIT"
     },
+    "node_modules/@types/slug": {
+      "version": "5.0.9",
+      "resolved": "https://registry.npmjs.org/@types/slug/-/slug-5.0.9.tgz",
+      "integrity": "sha512-6Yp8BSplP35Esa/wOG1wLNKiqXevpQTEF/RcL/NV6BBQaMmZh4YlDwCgrrFSoUE4xAGvnKd5c+lkQJmPrBAzfQ==",
+      "dev": true,
+      "license": "MIT"
+    },
     "node_modules/@types/tedious": {
       "version": "4.0.14",
       "license": "MIT",
diff --git a/tsconfig.json b/tsconfig.json
index 04bbb8d..8af993b 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -12,8 +12,7 @@
     "isolatedModules": true,
     "verbatimModuleSyntax": true,
 
-    // TODO
-    // "strict": true,
+    "strict": true,
 
     "forceConsistentCasingInFileNames": true,
     "declaration": true,
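
Note: enabling `"strict": true` is what motivates most of the changes above; implicit `any` parameters, arithmetic on possibly-undefined values, and unchecked property access all become compile errors. A small sketch of what the checker now rejects in these JSDoc-typed sources:

```js
/** @param {{ activated_at_epoch: number } | undefined} deal */
const lastSearchedEpoch = (deal) => {
  // return deal.activated_at_epoch     // strict error: `deal` is possibly undefined
  // return deal.activated_at_epoch + 1 // strict error: same, plus arithmetic on undefined
  return deal?.activated_at_epoch ?? 0  // OK: narrowed, with an explicit default
}
```
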