From c1ecd45ef7de5efb1a217085d42122e678c19545 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 4 Feb 2025 10:38:20 +0000 Subject: [PATCH 01/22] add strict type checking --- api/test/test-helpers.js | 4 +++ backend/bin/deal-observer-backend.js | 14 ++++++-- backend/lib/deal-observer.js | 6 ++-- backend/lib/rpc-service/data-types.js | 9 ++++- backend/lib/rpc-service/service.js | 16 ++++++--- backend/lib/spark-api-submit-deals.js | 8 ++--- backend/lib/telemetry.js | 4 +++ backend/package.json | 2 ++ backend/test/deal-observer.test.js | 7 ++++ backend/test/piece-indexer.test.js | 16 ++++++++- backend/test/rpc-client.test.js | 10 ++++-- backend/test/spark-api-submit-deals.test.js | 20 ++++++----- backend/test/test-helpers.js | 4 +-- db/index.js | 6 +++- package-lock.json | 38 +++++++++++++++++++++ package.json | 1 + tsconfig.json | 3 +- 17 files changed, 137 insertions(+), 31 deletions(-) diff --git a/api/test/test-helpers.js b/api/test/test-helpers.js index f8c4075..0e3387a 100644 --- a/api/test/test-helpers.js +++ b/api/test/test-helpers.js @@ -1,5 +1,9 @@ import { AssertionError } from 'node:assert' +/** + * @param {Response} res + * @param {number} status + */ export const assertResponseStatus = async (res, status) => { if (res.status !== status) { throw new AssertionError({ diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index dba0640..222c5c1 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -11,6 +11,7 @@ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuil import { indexPieces } from '../lib/piece-indexer.js' import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js' import { getDealPayloadCid } from '../lib/piece-indexer-service.js' +/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ const { INFLUXDB_TOKEN, @@ -37,6 +38,10 @@ assert(finalityEpochs <= maxPastEpochs) const pgPool = await createPgPool() const { recordTelemetry } = createInflux(INFLUXDB_TOKEN) +/** + * @param {(method:string,params:any[]) => Promise} makeRpcRequest + * @param {Queryable} pgPool + */ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => { const LOOP_NAME = 'Observe actor events' while (true) { @@ -46,7 +51,7 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => { const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) const startEpoch = Math.max( currentChainHead.Height - maxPastEpochs, - (lastInsertedDeal?.activated_at_epoch + 1) || 0 + lastInsertedDeal ? (lastInsertedDeal.activated_at_epoch ?? -1) + 1 : 0 ) const endEpoch = currentChainHead.Height - finalityEpochs @@ -57,7 +62,7 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => { const numberOfStoredDeals = await countStoredActiveDeals(pgPool) if (INFLUXDB_TOKEN) { recordTelemetry('observed_deals_stats', point => { - point.intField('last_searched_epoch', newLastInsertedDeal.activated_at_epoch) + point.intField('last_searched_epoch', newLastInsertedDeal?.activated_at_epoch || 0) point.intField('number_of_stored_active_deals', numberOfStoredDeals) }) } @@ -126,6 +131,11 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken, } } +/** + * @param {(method:string,params:object) => object} makeRpcRequest + * @param {(providerId:string,pieceCid:string) => Promise} getDealPayloadCid + * @param {*} pgPool + */ export const pieceIndexerLoop = async (makeRpcRequest, getDealPayloadCid, pgPool) => { const LOOP_NAME = 'Piece Indexer' while (true) { diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js index 6299c1d..05d1c5a 100644 --- a/backend/lib/deal-observer.js +++ b/backend/lib/deal-observer.js @@ -9,7 +9,7 @@ import { convertBlockEventToActiveDealDbEntry } from './utils.js' /** * @param {number} blockHeight * @param {Queryable} pgPool - * @param {(method:string,params:object) => object} makeRpcRequest + * @param {(method:string,params:any[]) => Promise} makeRpcRequest * @returns {Promise} */ export async function observeBuiltinActorEvents (blockHeight, pgPool, makeRpcRequest) { @@ -94,11 +94,11 @@ export async function storeActiveDeals (activeDeals, pgPool) { /** * @param {Queryable} pgPool * @param {string} query - * @param {Array} args + * @param {Array} args * @returns {Promise>>} */ export async function loadDeals (pgPool, query, args = []) { - const result = (await pgPool.query(query, args)).rows.map(deal => { + const result = (await pgPool.query(query, args)).rows.map((/** @type {any} */ deal) => { // SQL used null, typebox needs undefined for null values Object.keys(deal).forEach(key => { if (deal[key] === null) { diff --git a/backend/lib/rpc-service/data-types.js b/backend/lib/rpc-service/data-types.js index 0a219d4..8c8e2e7 100644 --- a/backend/lib/rpc-service/data-types.js +++ b/backend/lib/rpc-service/data-types.js @@ -38,10 +38,17 @@ const RpcRespone = Type.Object({ result: Type.Any() }) +const ChainHead = Type.Object({ + Height: Type.Number(), + Blocks: Type.Any(), + Cids: Type.Any() +}) + export { ClaimEvent, Entry, RawActorEvent, BlockEvent, - RpcRespone + RpcRespone, + ChainHead } diff --git a/backend/lib/rpc-service/service.js b/backend/lib/rpc-service/service.js index cd5e37d..ee9a339 100644 --- a/backend/lib/rpc-service/service.js +++ b/backend/lib/rpc-service/service.js @@ -4,7 +4,7 @@ import { encode as cborEncode } from '@ipld/dag-cbor' import { rawEventEntriesToEvent } from './utils.js' import { Value } from '@sinclair/typebox/value' import * as util from 'node:util' -import { ClaimEvent, RawActorEvent, BlockEvent, RpcRespone } from './data-types.js' +import { ClaimEvent, RawActorEvent, BlockEvent, RpcRespone, ChainHead } from './data-types.js' import pRetry from 'p-retry' /** @import { Static } from '@sinclair/typebox' */ @@ -40,8 +40,9 @@ export const rpcRequest = async (method, params) => { } } /** - * @param {object} actorEventFilter + * @param {{fromHeight:number,toHeight:number,fields: any}} actorEventFilter * Returns actor events filtered by the given actorEventFilter + * @param {(method: string, params: any[]) => Promise} makeRpcRequest * @returns {Promise>>} */ export async function getActorEvents (actorEventFilter, makeRpcRequest) { @@ -52,7 +53,7 @@ export async function getActorEvents (actorEventFilter, makeRpcRequest) { } // TODO: handle reverted events // https://github.com/filecoin-station/deal-observer/issues/22 - const typedRawEventEntries = rawEvents.map((rawEvent) => Value.Parse(RawActorEvent, rawEvent)) + const typedRawEventEntries = rawEvents.map((/** @type {any} */ rawEvent) => Value.Parse(RawActorEvent, rawEvent)) // An emitted event contains the height at which it was emitted, the emitter and the event itself const emittedEvents = [] for (const typedEventEntries of typedRawEventEntries) { @@ -81,10 +82,15 @@ export async function getActorEvents (actorEventFilter, makeRpcRequest) { /** * @param {function} makeRpcRequest - * @returns {Promise} + * @returns {Promise>} */ export async function getChainHead (makeRpcRequest) { - return await makeRpcRequest('Filecoin.ChainHead', []) + const result = await makeRpcRequest('Filecoin.ChainHead', []) + try { + return Value.Parse(ChainHead, result) + } catch (e) { + throw Error(util.format('Failed to parse chain head: %o', result)) + } } /** diff --git a/backend/lib/spark-api-submit-deals.js b/backend/lib/spark-api-submit-deals.js index 0b10718..79ed78e 100644 --- a/backend/lib/spark-api-submit-deals.js +++ b/backend/lib/spark-api-submit-deals.js @@ -7,7 +7,7 @@ import * as Sentry from '@sentry/node' * * @param {PgPool} pgPool * @param {number} batchSize - * @param {(eligibleDeals: Array) => Promise<{ingested: number; skipped: number}>} submitDeals + * @param {(eligibleDeals: Array) => Promise<{ingested: number; skipped: number}>} submitDeals * @returns {Promise<{submitted: number; ingested: number; skipped: number;}>} Number of deals submitted, ingested and skipped */ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDeals) => { @@ -45,7 +45,7 @@ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDea * * @param {PgPool} pgPool * @param {number} batchSize - * @returns {AsyncGenerator} + * @returns {AsyncGenerator>} */ const findUnsubmittedDeals = async function * (pgPool, batchSize) { const client = await pgPool.connect() @@ -82,7 +82,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) { * Mark deals as submitted. * * @param {Queryable} pgPool - * @param {Array} eligibleDeals + * @param {Array} eligibleDeals */ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => { await pgPool.query(` @@ -112,7 +112,7 @@ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => { * * @param {string} sparkApiBaseURL * @param {string} sparkApiToken - * @param {Array} deals + * @param {Array} deals * @returns {Promise<{ingested: number; skipped: number}>} */ export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deals) => { diff --git a/backend/lib/telemetry.js b/backend/lib/telemetry.js index db3dd08..04ce457 100644 --- a/backend/lib/telemetry.js +++ b/backend/lib/telemetry.js @@ -3,6 +3,10 @@ import createDebug from 'debug' const debug = createDebug('spark:deal-observer:telemetry') +/** + * @param {string | undefined} token + * @returns {{influx: InfluxDB,recordTelemetry: (name: string, fn: (p: Point) => void) => void}} + */ export const createInflux = token => { const influx = new InfluxDB({ url: 'https://eu-central-1-1.aws.cloud2.influxdata.com', diff --git a/backend/package.json b/backend/package.json index 82469e6..ef950e9 100644 --- a/backend/package.json +++ b/backend/package.json @@ -9,6 +9,8 @@ "test": "node --test --test-reporter=spec --test-concurrency=1" }, "devDependencies": { + "@types/debug": "^4.1.12", + "@types/pg-cursor": "^2.7.2", "standard": "^17.1.2" }, "dependencies": { diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index e9f3261..366751b 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -5,8 +5,12 @@ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, import { Value } from '@sinclair/typebox/value' import { BlockEvent } from '../lib/rpc-service/data-types.js' import { convertBlockEventToActiveDealDbEntry } from '../lib/utils.js' +/** @import {PgPool} from '@filecoin-station/deal-observer-db' */ describe('deal-observer-backend', () => { + /** + * @type {PgPool} + */ let pgPool before(async () => { pgPool = await createPgPool() @@ -74,6 +78,9 @@ describe('deal-observer-backend', () => { }) it('check number of stored deals', async () => { + /** + * @param {{id:number,provider:number,client:number,pieceCid:string,pieceSize:bigint,termStart:number,termMin:number,termMax:number,sector:bigint}} eventData + */ const storeBlockEvent = async (eventData) => { const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' }) const dbEntry = convertBlockEventToActiveDealDbEntry(event) diff --git a/backend/test/piece-indexer.test.js b/backend/test/piece-indexer.test.js index aa285d1..5483f2a 100644 --- a/backend/test/piece-indexer.test.js +++ b/backend/test/piece-indexer.test.js @@ -8,20 +8,29 @@ import assert from 'assert' import { minerPeerIds } from './test_data/minerInfo.js' import { payloadCIDs } from './test_data/payloadCIDs.js' import { indexPieces } from '../lib/piece-indexer.js' +/** @import {PgPool} from '@filecoin-station/deal-observer-db' */ describe('deal-observer-backend piece indexer', () => { + /** + * @param {string} method + * @param {any[]} params + * @returns + */ const makeRpcRequest = async (method, params) => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) case 'Filecoin.StateMinerInfo': return minerPeerIds.get(params[0]) default: console.error('Unknown method') } } + /** + * @type {PgPool} + */ let pgPool before(async () => { pgPool = await createPgPool() @@ -46,6 +55,11 @@ describe('deal-observer-backend piece indexer', () => { it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => { const getDealPayloadCidCalls = [] + /** + * @param {number} providerId + * @param {string} pieceCid + * @returns {Promise} + */ const getDealPayloadCid = async (providerId, pieceCid) => { getDealPayloadCidCalls.push({ providerId, pieceCid }) const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid })) diff --git a/backend/test/rpc-client.test.js b/backend/test/rpc-client.test.js index 9ab57e6..1335c07 100644 --- a/backend/test/rpc-client.test.js +++ b/backend/test/rpc-client.test.js @@ -8,12 +8,17 @@ import { ClaimEvent } from '../lib/rpc-service/data-types.js' import { Value } from '@sinclair/typebox/value' describe('RpcApiClient', () => { + /** + * @param {string} method + * @param {any[]} params + * @returns + */ const makeRpcRequest = async (method, params) => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) default: console.error('Unknown method') } @@ -23,7 +28,8 @@ describe('RpcApiClient', () => { const chainHead = await getChainHead(makeRpcRequest) assert(chainHead) const expected = parse(JSON.stringify(chainHeadTestData)) - assert.deepStrictEqual(chainHead, expected) + assert(chainHead.Height) + assert.deepStrictEqual(expected.Height, chainHead.Height) }) for (let blockHeight = 4622129; blockHeight < 4622129 + 11; blockHeight++) { diff --git a/backend/test/spark-api-submit-deals.test.js b/backend/test/spark-api-submit-deals.test.js index 741980e..93c2c24 100644 --- a/backend/test/spark-api-submit-deals.test.js +++ b/backend/test/spark-api-submit-deals.test.js @@ -3,8 +3,13 @@ import { after, before, beforeEach, describe, it, mock } from 'node:test' import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db' import { calculateActiveDealEpochs, daysAgo, daysFromNow, today } from './test-helpers.js' import { findAndSubmitUnsubmittedDeals } from '../lib/spark-api-submit-deals.js' +/** @import {PgPool} from '@filecoin-station/deal-observer-db' */ +/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ describe('Submit deals to spark-api', () => { + /** + * @type {PgPool} + */ let pgPool before(async () => { @@ -91,6 +96,10 @@ describe('Submit deals to spark-api', () => { }) }) +/** + * @param {Queryable} pgPool + * @param {*} param1 + */ const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId = 2, clientId = 3, pieceCid = 'cidone', payloadCid = null }) => { const { activatedAtEpoch, termStart, termMin, termMax } = calculateActiveDealEpochs(createdAt, startsAt, expiresAt) await pgPool.query( @@ -103,12 +112,7 @@ const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId // TODO: allow callers of this helper to define how many deals should be reported as skipped const createSubmitEligibleDealsMock = () => { - return mock.fn( - // original - unused param - () => {}, - // implementation - async (deals) => { - return { ingested: deals.length, skipped: 0 } - } - ) + return mock.fn(async (deals) => { + return { ingested: deals.length, skipped: 0 } + }) } diff --git a/backend/test/test-helpers.js b/backend/test/test-helpers.js index 84370ad..d8c5f06 100644 --- a/backend/test/test-helpers.js +++ b/backend/test/test-helpers.js @@ -14,8 +14,8 @@ export const getLocalDayAsISOString = (d) => { export const today = () => getLocalDayAsISOString(new Date()) export const yesterday = () => getLocalDayAsISOString(new Date(Date.now() - 24 * 60 * 60 * 1000)) -export const daysAgo = (n) => getLocalDayAsISOString(new Date(Date.now() - n * 24 * 60 * 60 * 1000)) -export const daysFromNow = (n) => getLocalDayAsISOString(new Date(Date.now() + n * 24 * 60 * 60 * 1000)) +export const daysAgo = (/** @type {number} */ n) => getLocalDayAsISOString(new Date(Date.now() - n * 24 * 60 * 60 * 1000)) +export const daysFromNow = (/** @type {number} */ n) => getLocalDayAsISOString(new Date(Date.now() + n * 24 * 60 * 60 * 1000)) /** * Calculates activated at, term start, term min, and term max. diff --git a/db/index.js b/db/index.js index 752f8dd..3f734e2 100644 --- a/db/index.js +++ b/db/index.js @@ -33,7 +33,11 @@ const poolConfig = { maxLifetimeSeconds: 60 } -const onError = err => { +/** + * @param {Error} err + * @returns {void} + */ +const onError = (err) => { // Prevent crashing the process on idle client errors, the pool will recover // itself. If all connections are lost, the process will still crash. // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405 diff --git a/package-lock.json b/package-lock.json index 0c90e66..0bffcdc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "devDependencies": { "@types/mocha": "^10.0.10", "@types/pg": "^8.11.11", + "@types/slug": "^5.0.9", "prettier": "^3.4.2", "standard": "^17.1.2", "typescript": "^5.7.3" @@ -54,6 +55,8 @@ "slug": "^10.0.0" }, "devDependencies": { + "@types/debug": "^4.1.12", + "@types/pg-cursor": "^2.7.2", "standard": "^17.1.2" } }, @@ -1240,6 +1243,16 @@ "@types/node": "*" } }, + "node_modules/@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/ms": "*" + } + }, "node_modules/@types/json5": { "version": "0.0.29", "resolved": "https://registry.npmjs.org/@types/json5/-/json5-0.0.29.tgz", @@ -1254,6 +1267,13 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/ms": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-2.1.0.tgz", + "integrity": "sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/mysql": { "version": "2.15.26", "resolved": "https://registry.npmjs.org/@types/mysql/-/mysql-2.15.26.tgz", @@ -1283,6 +1303,17 @@ "pg-types": "^4.0.1" } }, + "node_modules/@types/pg-cursor": { + "version": "2.7.2", + "resolved": "https://registry.npmjs.org/@types/pg-cursor/-/pg-cursor-2.7.2.tgz", + "integrity": "sha512-m3xT8bVFCvx98LuzbvXyuCdT/Hjdd/v8ml4jL4K1QF70Y8clOfCFdgoaEB1FWdcSwcpoFYZTJQaMD9/GQ27efQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*", + "@types/pg": "*" + } + }, "node_modules/@types/pg-pool": { "version": "2.0.6", "resolved": "https://registry.npmjs.org/@types/pg-pool/-/pg-pool-2.0.6.tgz", @@ -1304,6 +1335,13 @@ "integrity": "sha512-UE7oxhQLLd9gub6JKIAhDq06T0F6FnztwMNRvYgjeQSBeMc1ZG/tA47EwfduvkuQS8apbkM/lpLpWsaCeYsXVg==", "license": "MIT" }, + "node_modules/@types/slug": { + "version": "5.0.9", + "resolved": "https://registry.npmjs.org/@types/slug/-/slug-5.0.9.tgz", + "integrity": "sha512-6Yp8BSplP35Esa/wOG1wLNKiqXevpQTEF/RcL/NV6BBQaMmZh4YlDwCgrrFSoUE4xAGvnKd5c+lkQJmPrBAzfQ==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/tedious": { "version": "4.0.14", "resolved": "https://registry.npmjs.org/@types/tedious/-/tedious-4.0.14.tgz", diff --git a/package.json b/package.json index 3da2cf6..48ad857 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "devDependencies": { "@types/mocha": "^10.0.10", "@types/pg": "^8.11.11", + "@types/slug": "^5.0.9", "prettier": "^3.4.2", "standard": "^17.1.2", "typescript": "^5.7.3" diff --git a/tsconfig.json b/tsconfig.json index 04bbb8d..8af993b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -12,8 +12,7 @@ "isolatedModules": true, "verbatimModuleSyntax": true, - // TODO - // "strict": true, + "strict": true, "forceConsistentCasingInFileNames": true, "declaration": true, From f3621d6df03e8ad8e28d65d5edd56ee7f79001ec Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 4 Feb 2025 10:41:27 +0000 Subject: [PATCH 02/22] moved slug --- backend/package.json | 1 + package-lock.json | 2 +- package.json | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/package.json b/backend/package.json index ef950e9..a20b0ea 100644 --- a/backend/package.json +++ b/backend/package.json @@ -11,6 +11,7 @@ "devDependencies": { "@types/debug": "^4.1.12", "@types/pg-cursor": "^2.7.2", + "@types/slug": "^5.0.9", "standard": "^17.1.2" }, "dependencies": { diff --git a/package-lock.json b/package-lock.json index 0bffcdc..14d7689 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,6 @@ "devDependencies": { "@types/mocha": "^10.0.10", "@types/pg": "^8.11.11", - "@types/slug": "^5.0.9", "prettier": "^3.4.2", "standard": "^17.1.2", "typescript": "^5.7.3" @@ -57,6 +56,7 @@ "devDependencies": { "@types/debug": "^4.1.12", "@types/pg-cursor": "^2.7.2", + "@types/slug": "^5.0.9", "standard": "^17.1.2" } }, diff --git a/package.json b/package.json index 48ad857..3da2cf6 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,6 @@ "devDependencies": { "@types/mocha": "^10.0.10", "@types/pg": "^8.11.11", - "@types/slug": "^5.0.9", "prettier": "^3.4.2", "standard": "^17.1.2", "typescript": "^5.7.3" From ca64e6d4916081ef94639b9530c4638e478d7dcb Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 4 Feb 2025 10:57:35 +0000 Subject: [PATCH 03/22] fmt --- backend/lib/rpc-service/utils.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/backend/lib/rpc-service/utils.js b/backend/lib/rpc-service/utils.js index 3d54b8e..75655d6 100644 --- a/backend/lib/rpc-service/utils.js +++ b/backend/lib/rpc-service/utils.js @@ -2,6 +2,10 @@ import { base64pad } from 'multiformats/bases/base64' import { decode as cborDecode } from '@ipld/dag-cbor' import * as util from 'node:util' +/** + * @param {string} data + * @returns + */ const decodeCborInBase64 = (data) => { return cborDecode(base64pad.baseDecode(data)) } @@ -14,6 +18,7 @@ const decodeCborInBase64 = (data) => { */ const rawEventEntriesToEvent = (rawEventEntries) => { // Each event is defined by a list of event entries which will parsed into a typed event + /** @type {Record} */ const event = {} let eventType for (const entry of rawEventEntries) { From a2f4e92b0589be1e71036c63cd252ad4ee5224a5 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 4 Feb 2025 12:59:17 +0000 Subject: [PATCH 04/22] use clam event --- backend/test/deal-observer.test.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index 366751b..3eb097b 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -3,9 +3,10 @@ import { after, before, beforeEach, describe, it } from 'node:test' import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db' import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, storeActiveDeals } from '../lib/deal-observer.js' import { Value } from '@sinclair/typebox/value' -import { BlockEvent } from '../lib/rpc-service/data-types.js' +import { BlockEvent, ClaimEvent } from '../lib/rpc-service/data-types.js' import { convertBlockEventToActiveDealDbEntry } from '../lib/utils.js' /** @import {PgPool} from '@filecoin-station/deal-observer-db' */ +/** @import { Static } from '@sinclair/typebox' */ describe('deal-observer-backend', () => { /** @@ -79,14 +80,14 @@ describe('deal-observer-backend', () => { it('check number of stored deals', async () => { /** - * @param {{id:number,provider:number,client:number,pieceCid:string,pieceSize:bigint,termStart:number,termMin:number,termMax:number,sector:bigint}} eventData + * @param {Static} eventData */ const storeBlockEvent = async (eventData) => { const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' }) const dbEntry = convertBlockEventToActiveDealDbEntry(event) await storeActiveDeals([dbEntry], pgPool) } - const data = { + const data = Value.Parse(ClaimEvent, { id: 1, provider: 2, client: 3, @@ -96,7 +97,7 @@ describe('deal-observer-backend', () => { termMin: 12340, termMax: 12340, sector: 6n - } + }) assert.strictEqual(await countStoredActiveDeals(pgPool), 0n) await storeBlockEvent(data) assert.strictEqual(await countStoredActiveDeals(pgPool), 1n) From 686113097a55be7a3c758730d3eb1ab247acec4e Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 5 Feb 2025 08:21:09 +0000 Subject: [PATCH 05/22] fmt --- backend/test/spark-api-submit-deals.test.js | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/backend/test/spark-api-submit-deals.test.js b/backend/test/spark-api-submit-deals.test.js index 93c2c24..aa2a98e 100644 --- a/backend/test/spark-api-submit-deals.test.js +++ b/backend/test/spark-api-submit-deals.test.js @@ -98,7 +98,15 @@ describe('Submit deals to spark-api', () => { /** * @param {Queryable} pgPool - * @param {*} param1 + * @param {Object} activeDeal + * @param {string} activeDeal.createdAt + * @param {string} activeDeal.startsAt + * @param {string} activeDeal.expiresAt + * @param {number} [activeDeal.minerId=2] + * @param {number} [activeDeal.clientId=3] + * @param {string} [activeDeal.pieceCid='cidone'] + * @param {string | null} [activeDeal.payloadCid=null] + * @returns {Promise} */ const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId = 2, clientId = 3, pieceCid = 'cidone', payloadCid = null }) => { const { activatedAtEpoch, termStart, termMin, termMax } = calculateActiveDealEpochs(createdAt, startsAt, expiresAt) From e1a7484fe0d5ad103c9f8a31d69f78be42f449a3 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Tue, 11 Feb 2025 09:30:48 +0000 Subject: [PATCH 06/22] add SubmittableDeal type --- backend/lib/spark-api-submit-deals.js | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/backend/lib/spark-api-submit-deals.js b/backend/lib/spark-api-submit-deals.js index 0b10718..9e5f38f 100644 --- a/backend/lib/spark-api-submit-deals.js +++ b/backend/lib/spark-api-submit-deals.js @@ -1,13 +1,17 @@ /** @import {PgPool, Queryable} from '@filecoin-station/deal-observer-db' */ +/** @import { Static } from '@sinclair/typebox' */ +/** @import { ActiveDealDbEntry, PayloadRetrievabilityStateType } from '@filecoin-station/deal-observer-db/lib/types.js' */ import Cursor from 'pg-cursor' import * as Sentry from '@sentry/node' +import { Type } from '@sinclair/typebox' +import { Value } from '@sinclair/typebox/value' /** * Finds deals that haven't been submitted to the Spark API yet and submits them. * * @param {PgPool} pgPool * @param {number} batchSize - * @param {(eligibleDeals: Array) => Promise<{ingested: number; skipped: number}>} submitDeals + * @param {(eligibleDeals: Array>) => Promise<{ingested: number; skipped: number}>} submitDeals * @returns {Promise<{submitted: number; ingested: number; skipped: number;}>} Number of deals submitted, ingested and skipped */ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDeals) => { @@ -45,7 +49,7 @@ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDea * * @param {PgPool} pgPool * @param {number} batchSize - * @returns {AsyncGenerator} + * @returns {AsyncGenerator>, void, unknown>} */ const findUnsubmittedDeals = async function * (pgPool, batchSize) { const client = await pgPool.connect() @@ -72,7 +76,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) { while (true) { const rows = await cursor.read(batchSize) if (rows.length === 0) break - yield rows + yield rows.map(row => Value.Parse(SubmittableDeal, row)) } client.release() @@ -82,7 +86,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) { * Mark deals as submitted. * * @param {Queryable} pgPool - * @param {Array} eligibleDeals + * @param {Array>} eligibleDeals */ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => { await pgPool.query(` @@ -112,7 +116,7 @@ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => { * * @param {string} sparkApiBaseURL * @param {string} sparkApiToken - * @param {Array} deals + * @param {Array>} deals * @returns {Promise<{ingested: number; skipped: number}>} */ export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deals) => { @@ -147,3 +151,12 @@ export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deal return /** @type {{ingested: number; skipped: number}} */ (await response.json()) } + +const SubmittableDeal = Type.Object({ + miner_id: Type.Number(), + client_id: Type.Number(), + piece_cid: Type.String(), + piece_size: Type.BigInt(), + payload_cid: Type.String(), + expires_at: Type.Date() +}) From 2072510bf9823192a400439c47d82834ac6e1453 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Wed, 12 Feb 2025 10:35:03 +0000 Subject: [PATCH 07/22] Revert "add SubmittableDeal type" This reverts commit e1a7484fe0d5ad103c9f8a31d69f78be42f449a3. --- backend/lib/spark-api-submit-deals.js | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/backend/lib/spark-api-submit-deals.js b/backend/lib/spark-api-submit-deals.js index 9e5f38f..0b10718 100644 --- a/backend/lib/spark-api-submit-deals.js +++ b/backend/lib/spark-api-submit-deals.js @@ -1,17 +1,13 @@ /** @import {PgPool, Queryable} from '@filecoin-station/deal-observer-db' */ -/** @import { Static } from '@sinclair/typebox' */ -/** @import { ActiveDealDbEntry, PayloadRetrievabilityStateType } from '@filecoin-station/deal-observer-db/lib/types.js' */ import Cursor from 'pg-cursor' import * as Sentry from '@sentry/node' -import { Type } from '@sinclair/typebox' -import { Value } from '@sinclair/typebox/value' /** * Finds deals that haven't been submitted to the Spark API yet and submits them. * * @param {PgPool} pgPool * @param {number} batchSize - * @param {(eligibleDeals: Array>) => Promise<{ingested: number; skipped: number}>} submitDeals + * @param {(eligibleDeals: Array) => Promise<{ingested: number; skipped: number}>} submitDeals * @returns {Promise<{submitted: number; ingested: number; skipped: number;}>} Number of deals submitted, ingested and skipped */ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDeals) => { @@ -49,7 +45,7 @@ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDea * * @param {PgPool} pgPool * @param {number} batchSize - * @returns {AsyncGenerator>, void, unknown>} + * @returns {AsyncGenerator} */ const findUnsubmittedDeals = async function * (pgPool, batchSize) { const client = await pgPool.connect() @@ -76,7 +72,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) { while (true) { const rows = await cursor.read(batchSize) if (rows.length === 0) break - yield rows.map(row => Value.Parse(SubmittableDeal, row)) + yield rows } client.release() @@ -86,7 +82,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) { * Mark deals as submitted. * * @param {Queryable} pgPool - * @param {Array>} eligibleDeals + * @param {Array} eligibleDeals */ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => { await pgPool.query(` @@ -116,7 +112,7 @@ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => { * * @param {string} sparkApiBaseURL * @param {string} sparkApiToken - * @param {Array>} deals + * @param {Array} deals * @returns {Promise<{ingested: number; skipped: number}>} */ export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deals) => { @@ -151,12 +147,3 @@ export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deal return /** @type {{ingested: number; skipped: number}} */ (await response.json()) } - -const SubmittableDeal = Type.Object({ - miner_id: Type.Number(), - client_id: Type.Number(), - piece_cid: Type.String(), - piece_size: Type.BigInt(), - payload_cid: Type.String(), - expires_at: Type.Date() -}) From 8e1cb9310d62d6140c0b3f811df46c971093936a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 21 Feb 2025 10:28:12 +0100 Subject: [PATCH 08/22] introduce fully typed MakeRpcRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- backend/bin/deal-observer-backend.js | 6 ++++-- backend/lib/deal-observer.js | 3 ++- backend/lib/piece-indexer.js | 3 ++- backend/lib/rpc-service/service.js | 19 +++++++++++++------ backend/test/piece-indexer.test.js | 18 +++++++++++------- backend/test/rpc-client.test.js | 17 ++++++++++------- 6 files changed, 42 insertions(+), 24 deletions(-) diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index 222c5c1..d507d9b 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -11,7 +11,9 @@ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuil import { indexPieces } from '../lib/piece-indexer.js' import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js' import { getDealPayloadCid } from '../lib/piece-indexer-service.js' + /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ +/** @import {MakeRpcRequest} from '../lib/typings.js' */ const { INFLUXDB_TOKEN, @@ -39,7 +41,7 @@ const pgPool = await createPgPool() const { recordTelemetry } = createInflux(INFLUXDB_TOKEN) /** - * @param {(method:string,params:any[]) => Promise} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @param {Queryable} pgPool */ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => { @@ -132,7 +134,7 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken, } /** - * @param {(method:string,params:object) => object} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @param {(providerId:string,pieceCid:string) => Promise} getDealPayloadCid * @param {*} pgPool */ diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js index 05d1c5a..df6b8e4 100644 --- a/backend/lib/deal-observer.js +++ b/backend/lib/deal-observer.js @@ -1,5 +1,6 @@ /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ /** @import { Static } from '@sinclair/typebox' */ +/** @import {MakeRpcRequest} from './typings.js' */ import { getActorEvents, getActorEventsFilter } from './rpc-service/service.js' import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' @@ -9,7 +10,7 @@ import { convertBlockEventToActiveDealDbEntry } from './utils.js' /** * @param {number} blockHeight * @param {Queryable} pgPool - * @param {(method:string,params:any[]) => Promise} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @returns {Promise} */ export async function observeBuiltinActorEvents (blockHeight, pgPool, makeRpcRequest) { diff --git a/backend/lib/piece-indexer.js b/backend/lib/piece-indexer.js index ab774d7..2436108 100644 --- a/backend/lib/piece-indexer.js +++ b/backend/lib/piece-indexer.js @@ -5,10 +5,11 @@ import { getMinerPeerId } from './rpc-service/service.js' /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ /** @import { Static } from '@sinclair/typebox' */ /** @import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' */ +/** @import {MakeRpcRequest} from './typings.js' */ /** * - * @param {function} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @param {function} getDealPayloadCid * @param {Queryable} pgPool * @param {number} maxDeals diff --git a/backend/lib/rpc-service/service.js b/backend/lib/rpc-service/service.js index ee9a339..d97e26d 100644 --- a/backend/lib/rpc-service/service.js +++ b/backend/lib/rpc-service/service.js @@ -6,7 +6,9 @@ import { Value } from '@sinclair/typebox/value' import * as util from 'node:util' import { ClaimEvent, RawActorEvent, BlockEvent, RpcRespone, ChainHead } from './data-types.js' import pRetry from 'p-retry' + /** @import { Static } from '@sinclair/typebox' */ +/** @import {MakeRpcRequest} from '../typings.js' */ /** * @param {string} method @@ -42,18 +44,19 @@ export const rpcRequest = async (method, params) => { /** * @param {{fromHeight:number,toHeight:number,fields: any}} actorEventFilter * Returns actor events filtered by the given actorEventFilter - * @param {(method: string, params: any[]) => Promise} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @returns {Promise>>} */ export async function getActorEvents (actorEventFilter, makeRpcRequest) { - const rawEvents = await makeRpcRequest('Filecoin.GetActorEventsRaw', [actorEventFilter]) + /** @typedef {unknown[]} ActorEventsRaw */ + const rawEvents = /** @type {ActorEventsRaw} */(await makeRpcRequest('Filecoin.GetActorEventsRaw', [actorEventFilter])) if (!rawEvents || rawEvents.length === 0) { console.debug(`No actor events found in the height range ${actorEventFilter.fromHeight} - ${actorEventFilter.toHeight}.`) return [] } // TODO: handle reverted events // https://github.com/filecoin-station/deal-observer/issues/22 - const typedRawEventEntries = rawEvents.map((/** @type {any} */ rawEvent) => Value.Parse(RawActorEvent, rawEvent)) + const typedRawEventEntries = rawEvents.map((rawEvent) => Value.Parse(RawActorEvent, rawEvent)) // An emitted event contains the height at which it was emitted, the emitter and the event itself const emittedEvents = [] for (const typedEventEntries of typedRawEventEntries) { @@ -81,7 +84,7 @@ export async function getActorEvents (actorEventFilter, makeRpcRequest) { } /** - * @param {function} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @returns {Promise>} */ export async function getChainHead (makeRpcRequest) { @@ -95,13 +98,17 @@ export async function getChainHead (makeRpcRequest) { /** * @param {number} minerId - * @param {function} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @returns {Promise} */ export async function getMinerPeerId (minerId, makeRpcRequest) { + /** @typedef {{ + * PeerId: string; + * }} MinerInfo + */ try { const params = getMinerInfoCallParams(minerId) - const res = await makeRpcRequest('Filecoin.StateMinerInfo', params) + const res = /** @type {MinerInfo} */(await makeRpcRequest('Filecoin.StateMinerInfo', params)) if (!res || !res.PeerId) { throw Error(`Failed to get peer ID for miner ${minerId}, result: ${res}`) } diff --git a/backend/test/piece-indexer.test.js b/backend/test/piece-indexer.test.js index 5483f2a..deded11 100644 --- a/backend/test/piece-indexer.test.js +++ b/backend/test/piece-indexer.test.js @@ -8,21 +8,25 @@ import assert from 'assert' import { minerPeerIds } from './test_data/minerInfo.js' import { payloadCIDs } from './test_data/payloadCIDs.js' import { indexPieces } from '../lib/piece-indexer.js' + /** @import {PgPool} from '@filecoin-station/deal-observer-db' */ +/** @import {MakeRpcRequest} from '../lib/typings.js' */ describe('deal-observer-backend piece indexer', () => { - /** - * @param {string} method - * @param {any[]} params - * @returns - */ + /** @type {MakeRpcRequest} */ const makeRpcRequest = async (method, params) => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + case 'Filecoin.GetActorEventsRaw': { + assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object') + const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0]) + assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number') + assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number') + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) + } case 'Filecoin.StateMinerInfo': + assert(typeof params[0] === 'string', 'params[0] must be a string') return minerPeerIds.get(params[0]) default: console.error('Unknown method') diff --git a/backend/test/rpc-client.test.js b/backend/test/rpc-client.test.js index 1335c07..1db150e 100644 --- a/backend/test/rpc-client.test.js +++ b/backend/test/rpc-client.test.js @@ -7,18 +7,21 @@ import { getActorEvents, getActorEventsFilter, getChainHead, rpcRequest } from ' import { ClaimEvent } from '../lib/rpc-service/data-types.js' import { Value } from '@sinclair/typebox/value' +/** @import {MakeRpcRequest} from '../lib/typings.js' */ + describe('RpcApiClient', () => { - /** - * @param {string} method - * @param {any[]} params - * @returns - */ + /** @type {MakeRpcRequest} */ const makeRpcRequest = async (method, params) => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + case 'Filecoin.GetActorEventsRaw': { + assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object') + const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0]) + assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number') + assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number') + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) + } default: console.error('Unknown method') } From cddc0cafa7f60a07ebca222140385334fc23055c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 21 Feb 2025 10:32:15 +0100 Subject: [PATCH 09/22] introduce shared type alias GetDealPayloadCid MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- backend/bin/deal-observer-backend.js | 4 ++-- backend/lib/piece-indexer.js | 4 ++-- backend/test/piece-indexer.test.js | 10 +++------- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index d507d9b..c1e3fbf 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -13,7 +13,7 @@ import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spa import { getDealPayloadCid } from '../lib/piece-indexer-service.js' /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ -/** @import {MakeRpcRequest} from '../lib/typings.js' */ +/** @import {MakeRpcRequest, GetDealPayloadCid} from '../lib/typings.js' */ const { INFLUXDB_TOKEN, @@ -135,7 +135,7 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken, /** * @param {MakeRpcRequest} makeRpcRequest - * @param {(providerId:string,pieceCid:string) => Promise} getDealPayloadCid + * @param {GetDealPayloadCid} getDealPayloadCid * @param {*} pgPool */ export const pieceIndexerLoop = async (makeRpcRequest, getDealPayloadCid, pgPool) => { diff --git a/backend/lib/piece-indexer.js b/backend/lib/piece-indexer.js index 2436108..e326088 100644 --- a/backend/lib/piece-indexer.js +++ b/backend/lib/piece-indexer.js @@ -5,12 +5,12 @@ import { getMinerPeerId } from './rpc-service/service.js' /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ /** @import { Static } from '@sinclair/typebox' */ /** @import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' */ -/** @import {MakeRpcRequest} from './typings.js' */ +/** @import {MakeRpcRequest, GetDealPayloadCid} from './typings.js' */ /** * * @param {MakeRpcRequest} makeRpcRequest - * @param {function} getDealPayloadCid + * @param {GetDealPayloadCid} getDealPayloadCid * @param {Queryable} pgPool * @param {number} maxDeals * @returns {Promise} diff --git a/backend/test/piece-indexer.test.js b/backend/test/piece-indexer.test.js index deded11..005f7ea 100644 --- a/backend/test/piece-indexer.test.js +++ b/backend/test/piece-indexer.test.js @@ -10,7 +10,7 @@ import { payloadCIDs } from './test_data/payloadCIDs.js' import { indexPieces } from '../lib/piece-indexer.js' /** @import {PgPool} from '@filecoin-station/deal-observer-db' */ -/** @import {MakeRpcRequest} from '../lib/typings.js' */ +/** @import {MakeRpcRequest, GetDealPayloadCid} from '../lib/typings.js' */ describe('deal-observer-backend piece indexer', () => { /** @type {MakeRpcRequest} */ @@ -59,15 +59,11 @@ describe('deal-observer-backend piece indexer', () => { it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => { const getDealPayloadCidCalls = [] - /** - * @param {number} providerId - * @param {string} pieceCid - * @returns {Promise} - */ + /** @type {GetDealPayloadCid} */ const getDealPayloadCid = async (providerId, pieceCid) => { getDealPayloadCidCalls.push({ providerId, pieceCid }) const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid })) - return payloadCid?.payloadCid + return payloadCid?.payloadCid ?? null } assert.strictEqual( From 7fab2e979a0c7b35d6481b727df89a6a9d88341b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 21 Feb 2025 10:46:46 +0100 Subject: [PATCH 10/22] introduce UnknownRow and QueryResultWithUnknownRows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- backend/lib/deal-observer.js | 6 ++++-- backend/lib/piece-indexer.js | 2 +- backend/lib/spark-api-submit-deals.js | 6 +++--- backend/test/spark-api-submit-deals.test.js | 8 ++++++-- db/index.js | 2 ++ db/typings.d.ts | 2 ++ 6 files changed, 18 insertions(+), 8 deletions(-) diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js index df6b8e4..ff07fa8 100644 --- a/backend/lib/deal-observer.js +++ b/backend/lib/deal-observer.js @@ -1,4 +1,4 @@ -/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ +/** @import {Queryable, QueryResultWithUnknownRows} from '@filecoin-station/deal-observer-db' */ /** @import { Static } from '@sinclair/typebox' */ /** @import {MakeRpcRequest} from './typings.js' */ @@ -99,7 +99,9 @@ export async function storeActiveDeals (activeDeals, pgPool) { * @returns {Promise>>} */ export async function loadDeals (pgPool, query, args = []) { - const result = (await pgPool.query(query, args)).rows.map((/** @type {any} */ deal) => { + /** @type {QueryResultWithUnknownRows} */ + const { rows } = await pgPool.query(query, args) + const result = rows.map((deal) => { // SQL used null, typebox needs undefined for null values Object.keys(deal).forEach(key => { if (deal[key] === null) { diff --git a/backend/lib/piece-indexer.js b/backend/lib/piece-indexer.js index e326088..04c8030 100644 --- a/backend/lib/piece-indexer.js +++ b/backend/lib/piece-indexer.js @@ -2,7 +2,7 @@ import { loadDeals } from './deal-observer.js' import * as util from 'node:util' import { getMinerPeerId } from './rpc-service/service.js' -/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ +/** @import {Queryable, UnknownRow} from '@filecoin-station/deal-observer-db' */ /** @import { Static } from '@sinclair/typebox' */ /** @import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' */ /** @import {MakeRpcRequest, GetDealPayloadCid} from './typings.js' */ diff --git a/backend/lib/spark-api-submit-deals.js b/backend/lib/spark-api-submit-deals.js index 79ed78e..32a467c 100644 --- a/backend/lib/spark-api-submit-deals.js +++ b/backend/lib/spark-api-submit-deals.js @@ -1,4 +1,4 @@ -/** @import {PgPool, Queryable} from '@filecoin-station/deal-observer-db' */ +/** @import {PgPool, Queryable, UnknownRow} from '@filecoin-station/deal-observer-db' */ import Cursor from 'pg-cursor' import * as Sentry from '@sentry/node' @@ -45,7 +45,7 @@ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDea * * @param {PgPool} pgPool * @param {number} batchSize - * @returns {AsyncGenerator>} + * @returns {AsyncGenerator} */ const findUnsubmittedDeals = async function * (pgPool, batchSize) { const client = await pgPool.connect() @@ -82,7 +82,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) { * Mark deals as submitted. * * @param {Queryable} pgPool - * @param {Array} eligibleDeals + * @param {Array} eligibleDeals */ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => { await pgPool.query(` diff --git a/backend/test/spark-api-submit-deals.test.js b/backend/test/spark-api-submit-deals.test.js index aa2a98e..40aad08 100644 --- a/backend/test/spark-api-submit-deals.test.js +++ b/backend/test/spark-api-submit-deals.test.js @@ -3,8 +3,8 @@ import { after, before, beforeEach, describe, it, mock } from 'node:test' import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db' import { calculateActiveDealEpochs, daysAgo, daysFromNow, today } from './test-helpers.js' import { findAndSubmitUnsubmittedDeals } from '../lib/spark-api-submit-deals.js' -/** @import {PgPool} from '@filecoin-station/deal-observer-db' */ -/** @import {Queryable} from '@filecoin-station/deal-observer-db' */ + +/** @import {PgPool, Queryable, QueryResultWithUnknownRows} from '@filecoin-station/deal-observer-db' */ describe('Submit deals to spark-api', () => { /** @@ -39,6 +39,7 @@ describe('Submit deals to spark-api', () => { const mockSubmitEligibleDeals = createSubmitEligibleDealsMock() const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals) + /** @type {QueryResultWithUnknownRows} */ const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL') assert.strictEqual(submitted, 2) assert.strictEqual(ingested, 2) @@ -53,6 +54,7 @@ describe('Submit deals to spark-api', () => { // two deals are eligible for submission, batchSize is 1 const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals) + /** @type {QueryResultWithUnknownRows} */ const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL') assert.strictEqual(submitted, 2) assert.strictEqual(ingested, 2) @@ -70,6 +72,7 @@ describe('Submit deals to spark-api', () => { // two deals are eligible for submission, batchSize is 1 const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals) + /** @type {QueryResultWithUnknownRows} */ const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL') assert.strictEqual(submitted, 1) assert.strictEqual(ingested, 1) @@ -87,6 +90,7 @@ describe('Submit deals to spark-api', () => { // two deals are eligible for submission, batchSize is 1 const { submitted, ingested, skipped } = await findAndSubmitUnsubmittedDeals(pgPool, batchSize, mockSubmitEligibleDeals) + /** @type {QueryResultWithUnknownRows} */ const { rows } = await pgPool.query('SELECT * FROM active_deals WHERE submitted_at IS NOT NULL') assert.strictEqual(submitted, 2) assert.strictEqual(ingested, 1) diff --git a/db/index.js b/db/index.js index 3f734e2..5c37660 100644 --- a/db/index.js +++ b/db/index.js @@ -6,6 +6,8 @@ import Postgrator from 'postgrator' // re-export types /** @typedef {import('./typings.js').Queryable} Queryable */ /** @typedef {import('./typings.js').PgPool} PgPool */ +/** @typedef {import('./typings.js').UnknownRow} UnknownRow */ +/** @typedef {import('./typings.js').QueryResultWithUnknownRows} QueryResultWithUnknownRows */ // Configure node-postgres to deserialize BIGINT values as BigInt, not String pg.types.setTypeParser(20, BigInt) // Type Id 20 = BIGINT | BIGSERIAL diff --git a/db/typings.d.ts b/db/typings.d.ts index 419a754..35aa4e0 100644 --- a/db/typings.d.ts +++ b/db/typings.d.ts @@ -4,3 +4,5 @@ export type PgPool = pg.Pool // Copied from import('@types/pg'). export type Queryable = Pick +export type UnknownRow = Record +export type QueryResultWithUnknownRows = pg.QueryResult From 7463a1111c604b6c56eccb204805d5e176ca854e Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Fri, 21 Feb 2025 13:20:20 +0100 Subject: [PATCH 11/22] chore: add typings file --- backend/bin/deal-observer-backend.js | 4 +- backend/lib/deal-observer.js | 13 ++-- backend/lib/resolve-payload-cids.js | 15 ++-- backend/lib/rpc-service/data-types.js | 14 ++-- backend/lib/rpc-service/service.js | 10 +-- backend/lib/typings.d.ts | 2 + backend/test/deal-observer.test.js | 14 +++- backend/test/piece-indexer.test.js | 80 --------------------- backend/test/resolve-payload-cids.test.js | 25 +++++-- backend/test/rpc-client.test.js | 2 +- backend/test/spark-api-submit-deals.test.js | 1 + 11 files changed, 67 insertions(+), 113 deletions(-) create mode 100644 backend/lib/typings.d.ts delete mode 100644 backend/test/piece-indexer.test.js diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index dbdf2e6..3c2a4a2 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -11,7 +11,7 @@ import { countStoredActiveDealsWithUnresolvedPayloadCid, resolvePayloadCids, cou import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js' import { payloadCidRequest } from '../lib/piece-indexer-service.js' /** @import {Queryable} from '@filecoin-station/deal-observer-db' */ -/** @import {MakeRpcRequest, GetDealPayloadCid} from '../lib/typings.js' */ +/** @import {MakeRpcRequest, MakePayloadCidRequest} from '../lib/typings.d.ts' */ const { INFLUXDB_TOKEN, @@ -123,7 +123,7 @@ const sparkApiSubmitDealsLoop = async (pgPool, { sparkApiBaseUrl, sparkApiToken, /** * @param {MakeRpcRequest} makeRpcRequest - * @param {GetDealPayloadCid} getDealPayloadCid + * @param {MakePayloadCidRequest} makePayloadCidRequest * @param {Queryable} pgPool */ export const resolvePayloadCidsLoop = async (makeRpcRequest, makePayloadCidRequest, pgPool) => { diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js index d391b94..a8ba239 100644 --- a/backend/lib/deal-observer.js +++ b/backend/lib/deal-observer.js @@ -1,15 +1,14 @@ -/** @import {Queryable, QueryResultWithUnknownRows} from '@filecoin-station/deal-observer-db' */ -/** @import { Static } from '@sinclair/typebox' */ -/** @import {MakeRpcRequest} from './typings.js' */ - import { getActorEvents, getActorEventsFilter, getChainHead } from './rpc-service/service.js' import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js' import { Value } from '@sinclair/typebox/value' import { convertBlockEventToActiveDealDbEntry } from './utils.js' +/** @import {Queryable, QueryResultWithUnknownRows} from '@filecoin-station/deal-observer-db' */ +/** @import { Static } from '@sinclair/typebox' */ +/** @import {MakeRpcRequest} from './typings.d.ts' */ /** * @param {Queryable} pgPool - * @param {(method:string,params:any[]) => Promise} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest * @param {number} maxPastEpochs * @param {number} finalityEpochs * @returns {Promise} @@ -19,7 +18,7 @@ export const observeBuiltinActorEvents = async (pgPool, makeRpcRequest, maxPastE const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) const startEpoch = Math.max( currentChainHead.Height - maxPastEpochs, - (lastInsertedDeal?.activated_at_epoch + 1) || 0 + ((lastInsertedDeal?.activated_at_epoch ?? -1) + 1) || 0 ) const endEpoch = currentChainHead.Height - finalityEpochs for (let epoch = startEpoch; epoch <= endEpoch; epoch++) { @@ -30,7 +29,7 @@ export const observeBuiltinActorEvents = async (pgPool, makeRpcRequest, maxPastE /** * @param {number} blockHeight * @param {Queryable} pgPool - * @param {(method:string,params:any[]) => Promise} makeRpcRequest + * @param {MakeRpcRequest} makeRpcRequest */ export const fetchAndStoreActiveDeals = async (blockHeight, pgPool, makeRpcRequest) => { const eventType = 'claim' diff --git a/backend/lib/resolve-payload-cids.js b/backend/lib/resolve-payload-cids.js index 42d386a..1a783c0 100644 --- a/backend/lib/resolve-payload-cids.js +++ b/backend/lib/resolve-payload-cids.js @@ -11,8 +11,8 @@ const THREE_DAYS_IN_MILLISECONDS = 1000 * 60 * 60 * 24 * 3 /** * - * @param {function} makeRpcRequest - * @param {function} makePayloadCidRequest + * @param {import('./typings.js').MakeRpcRequest} makeRpcRequest + * @param {import('./typings.js').MakePayloadCidRequest} makePayloadCidRequest * @param {Queryable} pgPool * @param {number} maxDeals * @returns {Promise} @@ -21,7 +21,8 @@ export const resolvePayloadCids = async (makeRpcRequest, makePayloadCidRequest, let payloadCidsResolved = 0 for (const deal of await fetchDealsWithUnresolvedPayloadCid(pgPool, maxDeals, new Date(now - THREE_DAYS_IN_MILLISECONDS))) { const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest) - deal.payload_cid = await makePayloadCidRequest(minerPeerId, deal.piece_cid) + const payloadCid = await makePayloadCidRequest(minerPeerId, deal.piece_cid) + deal.payload_cid = payloadCid !== null ? payloadCid : undefined if (!deal.payload_cid) { if (deal.last_payload_retrieval_attempt) { deal.payload_retrievability_state = PayloadRetrievabilityState.TerminallyUnretrievable @@ -46,9 +47,13 @@ export const resolvePayloadCids = async (makeRpcRequest, makePayloadCidRequest, */ export async function fetchDealsWithUnresolvedPayloadCid (pgPool, maxDeals, now) { const query = "SELECT * FROM active_deals WHERE payload_cid IS NULL AND (payload_retrievability_state = 'PAYLOAD_CID_NOT_QUERIED_YET' OR payload_retrievability_state = 'PAYLOAD_CID_UNRESOLVED') AND (last_payload_retrieval_attempt IS NULL OR last_payload_retrieval_attempt < $1) ORDER BY activated_at_epoch ASC LIMIT $2" - return await loadDeals(pgPool, query, [now, maxDeals]) + return await loadDeals(pgPool, query, [now.toISOString(), maxDeals]) } +/** + * @param {Queryable} pgPool + * @returns {Promise} + */ export async function countStoredActiveDealsWithUnresolvedPayloadCid (pgPool) { const query = 'SELECT COUNT(*) FROM active_deals WHERE payload_cid IS NULL' const result = await pgPool.query(query) @@ -70,7 +75,7 @@ export async function countRevertedActiveDeals (pgPool) { * @param {Static} deal * @param {Static< typeof PayloadRetrievabilityStateType>} newPayloadRetrievalState * @param {Date} lastRetrievalAttemptTimestamp - * @param {string} newPayloadCid + * @param {string | undefined} newPayloadCid * @returns { Promise} */ async function updatePayloadCidInActiveDeal (pgPool, deal, newPayloadRetrievalState, lastRetrievalAttemptTimestamp, newPayloadCid) { diff --git a/backend/lib/rpc-service/data-types.js b/backend/lib/rpc-service/data-types.js index a6322a4..0719da1 100644 --- a/backend/lib/rpc-service/data-types.js +++ b/backend/lib/rpc-service/data-types.js @@ -24,8 +24,8 @@ const RawActorEvent = Type.Object({ entries: Type.Array(Entry), height: Type.Number(), reverted: Type.Boolean(), - msgCid: Type.Any(), - tipsetKey: Type.Array(Type.Any()) + msgCid: Type.Unknown(), + tipsetKey: Type.Array(Type.Unknown()) }) const BlockEvent = Type.Object({ @@ -35,14 +35,14 @@ const BlockEvent = Type.Object({ reverted: Type.Boolean() }) -const RpcRespone = Type.Object({ - result: Type.Any() +const RpcResponse = Type.Object({ + result: Type.Unknown() }) const ChainHead = Type.Object({ Height: Type.Number(), - Blocks: Type.Any(), - Cids: Type.Any() + Blocks: Type.Unknown(), + Cids: Type.Unknown() }) export { @@ -50,6 +50,6 @@ export { Entry, RawActorEvent, BlockEvent, - RpcRespone, + RpcResponse, ChainHead } diff --git a/backend/lib/rpc-service/service.js b/backend/lib/rpc-service/service.js index 428d14a..4fd4f33 100644 --- a/backend/lib/rpc-service/service.js +++ b/backend/lib/rpc-service/service.js @@ -4,16 +4,16 @@ import { encode as cborEncode } from '@ipld/dag-cbor' import { rawEventEntriesToEvent } from './utils.js' import { Value } from '@sinclair/typebox/value' import * as util from 'node:util' -import { ClaimEvent, RawActorEvent, BlockEvent, RpcRespone, ChainHead } from './data-types.js' +import { ClaimEvent, RawActorEvent, BlockEvent, RpcResponse, ChainHead } from './data-types.js' import pRetry from 'p-retry' /** @import { Static } from '@sinclair/typebox' */ -/** @import {MakeRpcRequest} from '../typings.js' */ +/** @import {MakeRpcRequest} from '../typings.d.ts' */ /** * @param {string} method - * @param {Object} params - * @returns {Promise} + * @param {unknown} params + * @returns {Promise} */ export const rpcRequest = async (method, params) => { const reqBody = JSON.stringify({ method, params, id: 1, jsonrpc: '2.0' }) @@ -32,7 +32,7 @@ export const rpcRequest = async (method, params) => { } const json = await response.json() try { - const parsedRpcResponse = Value.Parse(RpcRespone, json).result + const parsedRpcResponse = Value.Parse(RpcResponse, json).result return parsedRpcResponse } catch (error) { throw Error(util.format('Failed to parse RPC response: %o', json), { cause: error }) diff --git a/backend/lib/typings.d.ts b/backend/lib/typings.d.ts new file mode 100644 index 0000000..75e88f8 --- /dev/null +++ b/backend/lib/typings.d.ts @@ -0,0 +1,2 @@ +export type MakeRpcRequest = (method: string, params: Array) => Promise +export type MakePayloadCidRequest = (minerPeerId:string,payloadCid: string) => Promise \ No newline at end of file diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index 189e05f..eeff734 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -119,6 +119,11 @@ describe('deal-observer-backend', () => { }) it('check number of reverted stored deals', async () => { + /** + * @param {Static} eventData + * @param {boolean} reverted + * @returns {Promise} + */ const storeBlockEvent = async (eventData, reverted) => { const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted }) const dbEntry = convertBlockEventToActiveDealDbEntry(event) @@ -154,13 +159,19 @@ describe('deal-observer-backend', () => { }) describe('deal-observer-backend built in actor event observer', () => { + /** + * @type {PgPool} + */ let pgPool + /** + * @type {import('../lib/typings.js').MakeRpcRequest} + */ const makeRpcRequest = async (method, params) => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) default: console.error('Unknown method') } @@ -188,6 +199,7 @@ describe('deal-observer-backend built in actor event observer', () => { let deals = await loadDeals(pgPool, 'SELECT * FROM active_deals') assert.strictEqual(deals.length, 25) const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) + assert(lastInsertedDeal !== null) assert.strictEqual(lastInsertedDeal.activated_at_epoch, 4622129) // The deal observer function should pick up from the current storage diff --git a/backend/test/piece-indexer.test.js b/backend/test/piece-indexer.test.js deleted file mode 100644 index 005f7ea..0000000 --- a/backend/test/piece-indexer.test.js +++ /dev/null @@ -1,80 +0,0 @@ -import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db' -import { before, beforeEach, it, describe, after } from 'node:test' -import { rawActorEventTestData } from './test_data/rawActorEvent.js' -import { chainHeadTestData } from './test_data/chainHead.js' -import { parse } from '@ipld/dag-json' -import { observeBuiltinActorEvents } from '../lib/deal-observer.js' -import assert from 'assert' -import { minerPeerIds } from './test_data/minerInfo.js' -import { payloadCIDs } from './test_data/payloadCIDs.js' -import { indexPieces } from '../lib/piece-indexer.js' - -/** @import {PgPool} from '@filecoin-station/deal-observer-db' */ -/** @import {MakeRpcRequest, GetDealPayloadCid} from '../lib/typings.js' */ - -describe('deal-observer-backend piece indexer', () => { - /** @type {MakeRpcRequest} */ - const makeRpcRequest = async (method, params) => { - switch (method) { - case 'Filecoin.ChainHead': - return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': { - assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object') - const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0]) - assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number') - assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number') - return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) - } - case 'Filecoin.StateMinerInfo': - assert(typeof params[0] === 'string', 'params[0] must be a string') - return minerPeerIds.get(params[0]) - default: - console.error('Unknown method') - } - } - /** - * @type {PgPool} - */ - let pgPool - before(async () => { - pgPool = await createPgPool() - await migrateWithPgClient(pgPool) - }) - - after(async () => { - await pgPool.end() - }) - - beforeEach(async () => { - await pgPool.query('DELETE FROM active_deals') - const startEpoch = 4622129 - for (let blockHeight = startEpoch; blockHeight < startEpoch + 10; blockHeight++) { - await observeBuiltinActorEvents(blockHeight, pgPool, makeRpcRequest) - } - assert.strictEqual( - (await pgPool.query('SELECT * FROM active_deals')).rows.length, - 336 - ) - }) - - it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => { - const getDealPayloadCidCalls = [] - /** @type {GetDealPayloadCid} */ - const getDealPayloadCid = async (providerId, pieceCid) => { - getDealPayloadCidCalls.push({ providerId, pieceCid }) - const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid })) - return payloadCid?.payloadCid ?? null - } - - assert.strictEqual( - (await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length, - 336 - ) - await indexPieces(makeRpcRequest, getDealPayloadCid, pgPool, 10000) - assert.strictEqual(getDealPayloadCidCalls.length, 336) - assert.strictEqual( - (await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length, - 85 // Not all deals have a payload CID in the test data - ) - }) -}) diff --git a/backend/test/resolve-payload-cids.test.js b/backend/test/resolve-payload-cids.test.js index ad4e63d..e8fa8ee 100644 --- a/backend/test/resolve-payload-cids.test.js +++ b/backend/test/resolve-payload-cids.test.js @@ -12,18 +12,24 @@ import { ActiveDealDbEntry, PayloadRetrievabilityState } from '@filecoin-station import { countStoredActiveDealsWithUnresolvedPayloadCid, resolvePayloadCids } from '../lib/resolve-payload-cids.js' describe('deal-observer-backend resolve payload CIDs', () => { + /** + * @type {import('../lib/typings.d.ts').MakeRpcRequest} + * */ const makeRpcRequest = async (method, params) => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) case 'Filecoin.StateMinerInfo': return minerPeerIds.get(params[0]) default: console.error('Unknown method') } } + /** + * @type {import('@filecoin-station/deal-observer-db').PgPool}} + * */ let pgPool before(async () => { pgPool = await createPgPool() @@ -48,17 +54,20 @@ describe('deal-observer-backend resolve payload CIDs', () => { it('piece indexer loop function fetches deals where there exists no payload yet and updates the database entry', async (t) => { const resolvePayloadCidCalls = [] - const resolvePayloadCid = async (providerId, pieceCid) => { + /** + * @type {import('../lib/typings.d.ts').MakePayloadCidRequest} + * */ + const makePayloadCidRequest = async (providerId, pieceCid) => { resolvePayloadCidCalls.push({ providerId, pieceCid }) const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid })) - return payloadCid?.payloadCid + return payloadCid ? payloadCid.payloadCid : null } assert.strictEqual( (await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length, 336 ) - await resolvePayloadCids(makeRpcRequest, resolvePayloadCid, pgPool, 10000) + await resolvePayloadCids(makeRpcRequest, makePayloadCidRequest, pgPool, 10000) assert.strictEqual(resolvePayloadCidCalls.length, 336) assert.strictEqual( (await pgPool.query('SELECT * FROM active_deals WHERE payload_cid IS NULL')).rows.length, @@ -70,10 +79,13 @@ describe('deal-observer-backend resolve payload CIDs', () => { let unresolvedPayloadCids = await countStoredActiveDealsWithUnresolvedPayloadCid(pgPool) assert.strictEqual(unresolvedPayloadCids, 336n) const resolvePayloadCidCalls = [] + /** + * @type {import('../lib/typings.d.ts').MakePayloadCidRequest} + * */ const resolvePayloadCid = async (providerId, pieceCid) => { resolvePayloadCidCalls.push({ providerId, pieceCid }) const payloadCid = payloadCIDs.get(JSON.stringify({ minerId: providerId, pieceCid })) - return payloadCid?.payloadCid + return payloadCid ? payloadCid.payloadCid : null } await resolvePayloadCids(makeRpcRequest, resolvePayloadCid, pgPool, 10000) @@ -83,6 +95,9 @@ describe('deal-observer-backend resolve payload CIDs', () => { }) describe('deal-observer-backend piece indexer payload retrieval', () => { + /** + * @type {import('@filecoin-station/deal-observer-db').PgPool} + */ let pgPool const payloadCid = 'PAYLOAD_CID' const minerPeerId = 'MINER_PEER_ID' diff --git a/backend/test/rpc-client.test.js b/backend/test/rpc-client.test.js index 1db150e..6da0c0f 100644 --- a/backend/test/rpc-client.test.js +++ b/backend/test/rpc-client.test.js @@ -7,7 +7,7 @@ import { getActorEvents, getActorEventsFilter, getChainHead, rpcRequest } from ' import { ClaimEvent } from '../lib/rpc-service/data-types.js' import { Value } from '@sinclair/typebox/value' -/** @import {MakeRpcRequest} from '../lib/typings.js' */ +/** @import {MakeRpcRequest} from '../lib/typings.d.ts' */ describe('RpcApiClient', () => { /** @type {MakeRpcRequest} */ diff --git a/backend/test/spark-api-submit-deals.test.js b/backend/test/spark-api-submit-deals.test.js index c48651d..5f49efc 100644 --- a/backend/test/spark-api-submit-deals.test.js +++ b/backend/test/spark-api-submit-deals.test.js @@ -110,6 +110,7 @@ describe('Submit deals to spark-api', () => { * @param {number} [activeDeal.clientId=3] * @param {string} [activeDeal.pieceCid='cidone'] * @param {string | null} [activeDeal.payloadCid=null] + * @param {boolean} [activeDeal.reverted=false] * @returns {Promise} */ const givenActiveDeal = async (pgPool, { createdAt, startsAt, expiresAt, minerId = 2, clientId = 3, pieceCid = 'cidone', payloadCid = null, reverted = false }) => { From 43e1337897286442665fd5ce4527075a4b3706a4 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Fri, 21 Feb 2025 13:24:53 +0100 Subject: [PATCH 12/22] fix: remove use of any --- backend/lib/rpc-service/service.js | 2 +- backend/lib/rpc-service/utils.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/lib/rpc-service/service.js b/backend/lib/rpc-service/service.js index 4fd4f33..af1eff5 100644 --- a/backend/lib/rpc-service/service.js +++ b/backend/lib/rpc-service/service.js @@ -42,7 +42,7 @@ export const rpcRequest = async (method, params) => { } } /** - * @param {{fromHeight:number,toHeight:number,fields: any}} actorEventFilter + * @param {{fromHeight:number,toHeight:number,fields: unknown}} actorEventFilter * Returns actor events filtered by the given actorEventFilter * @param {MakeRpcRequest} makeRpcRequest * @returns {Promise>>} diff --git a/backend/lib/rpc-service/utils.js b/backend/lib/rpc-service/utils.js index 75655d6..4de009a 100644 --- a/backend/lib/rpc-service/utils.js +++ b/backend/lib/rpc-service/utils.js @@ -18,7 +18,7 @@ const decodeCborInBase64 = (data) => { */ const rawEventEntriesToEvent = (rawEventEntries) => { // Each event is defined by a list of event entries which will parsed into a typed event - /** @type {Record} */ + /** @type {Record} */ const event = {} let eventType for (const entry of rawEventEntries) { From f73e3d867a85824055f3fd41f6df023f4e890fe6 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Fri, 21 Feb 2025 13:29:02 +0100 Subject: [PATCH 13/22] chore: formatting --- backend/bin/deal-observer-backend.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/bin/deal-observer-backend.js b/backend/bin/deal-observer-backend.js index 3c2a4a2..f0b21f0 100644 --- a/backend/bin/deal-observer-backend.js +++ b/backend/bin/deal-observer-backend.js @@ -52,7 +52,7 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => { const numberOfRevertedActiveDeals = await countRevertedActiveDeals(pgPool) if (INFLUXDB_TOKEN) { recordTelemetry('observed_deals_stats', point => { - point.intField('last_searched_epoch', newLastInsertedDeal?.activated_at_epoch || 0) + point.intField('last_searched_epoch', newLastInsertedDeal?.activated_at_epoch ?? 0) point.intField('number_of_stored_active_deals', numberOfStoredDeals) point.intField('number_of_reverted_active_deals', numberOfRevertedActiveDeals) }) From d54f02540b76326571ff47858b986b12e803a83c Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Fri, 21 Feb 2025 13:39:56 +0100 Subject: [PATCH 14/22] chore: formatting --- backend/lib/deal-observer.js | 2 +- backend/lib/resolve-payload-cids.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js index a8ba239..3865870 100644 --- a/backend/lib/deal-observer.js +++ b/backend/lib/deal-observer.js @@ -123,7 +123,7 @@ export async function storeActiveDeals (activeDeals, pgPool) { /** * @param {Queryable} pgPool * @param {string} query - * @param {Array} args + * @param {Array} args * @returns {Promise>>} */ export async function loadDeals (pgPool, query, args = []) { diff --git a/backend/lib/resolve-payload-cids.js b/backend/lib/resolve-payload-cids.js index 1a783c0..8c0335f 100644 --- a/backend/lib/resolve-payload-cids.js +++ b/backend/lib/resolve-payload-cids.js @@ -47,7 +47,7 @@ export const resolvePayloadCids = async (makeRpcRequest, makePayloadCidRequest, */ export async function fetchDealsWithUnresolvedPayloadCid (pgPool, maxDeals, now) { const query = "SELECT * FROM active_deals WHERE payload_cid IS NULL AND (payload_retrievability_state = 'PAYLOAD_CID_NOT_QUERIED_YET' OR payload_retrievability_state = 'PAYLOAD_CID_UNRESOLVED') AND (last_payload_retrieval_attempt IS NULL OR last_payload_retrieval_attempt < $1) ORDER BY activated_at_epoch ASC LIMIT $2" - return await loadDeals(pgPool, query, [now.toISOString(), maxDeals]) + return await loadDeals(pgPool, query, [now, maxDeals]) } /** From 4cc2e503a526457d857989aa9cc3c32159c2d223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 21 Feb 2025 15:25:26 +0100 Subject: [PATCH 15/22] fix MakeRpcRequest --- backend/lib/typings.d.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/lib/typings.d.ts b/backend/lib/typings.d.ts index 75e88f8..08f39cf 100644 --- a/backend/lib/typings.d.ts +++ b/backend/lib/typings.d.ts @@ -1,2 +1,2 @@ -export type MakeRpcRequest = (method: string, params: Array) => Promise -export type MakePayloadCidRequest = (minerPeerId:string,payloadCid: string) => Promise \ No newline at end of file +export type MakeRpcRequest = (method: string, params: unknown[]) => Promise; +export type GetDealPayloadCid = (providerId:string,pieceCid:string) => Promise; From eda49143c1b8ff9e1b485ee9c956f2292126a41c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 21 Feb 2025 15:36:56 +0100 Subject: [PATCH 16/22] fix tsc errors after making MakeRpcRequest stricter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- backend/lib/typings.d.ts | 2 +- backend/test/deal-observer.test.js | 9 +++++++-- backend/test/resolve-payload-cids.test.js | 10 ++++++++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/backend/lib/typings.d.ts b/backend/lib/typings.d.ts index 08f39cf..0901370 100644 --- a/backend/lib/typings.d.ts +++ b/backend/lib/typings.d.ts @@ -1,2 +1,2 @@ export type MakeRpcRequest = (method: string, params: unknown[]) => Promise; -export type GetDealPayloadCid = (providerId:string,pieceCid:string) => Promise; +export type MakePayloadCidRequest = (providerId:string,pieceCid:string) => Promise; diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index eeff734..c575e3d 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -170,8 +170,13 @@ describe('deal-observer-backend built in actor event observer', () => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + case 'Filecoin.GetActorEventsRaw': { + assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object') + const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0]) + assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number') + assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number') + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) + } default: console.error('Unknown method') } diff --git a/backend/test/resolve-payload-cids.test.js b/backend/test/resolve-payload-cids.test.js index e8fa8ee..8af435f 100644 --- a/backend/test/resolve-payload-cids.test.js +++ b/backend/test/resolve-payload-cids.test.js @@ -19,9 +19,15 @@ describe('deal-observer-backend resolve payload CIDs', () => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': - return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= params[0].fromHeight && e.height <= params[0].toHeight) + case 'Filecoin.GetActorEventsRaw': { + assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object') + const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0]) + assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number') + assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number') + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) + } case 'Filecoin.StateMinerInfo': + assert(typeof params[0] === 'string', 'params[0] must be a string') return minerPeerIds.get(params[0]) default: console.error('Unknown method') From fddbd9142736cd9d7e8300518b540f3f9449ba4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 21 Feb 2025 15:51:57 +0100 Subject: [PATCH 17/22] remove `any` from `decodeCborInBase64` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- backend/lib/rpc-service/utils.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/backend/lib/rpc-service/utils.js b/backend/lib/rpc-service/utils.js index 4de009a..f263f24 100644 --- a/backend/lib/rpc-service/utils.js +++ b/backend/lib/rpc-service/utils.js @@ -4,7 +4,7 @@ import * as util from 'node:util' /** * @param {string} data - * @returns + * @returns {unknown} */ const decodeCborInBase64 = (data) => { return cborDecode(base64pad.baseDecode(data)) @@ -20,6 +20,7 @@ const rawEventEntriesToEvent = (rawEventEntries) => { // Each event is defined by a list of event entries which will parsed into a typed event /** @type {Record} */ const event = {} + /** @type {string | undefined} */ let eventType for (const entry of rawEventEntries) { // The key returned by the Lotus API is kebab-case, we convert it to camelCase @@ -27,13 +28,13 @@ const rawEventEntriesToEvent = (rawEventEntries) => { let value = decodeCborInBase64(entry.Value) // In each entry exists an event type declaration which we need to extract if (key === '$type') { - eventType = value + eventType = /** @type {string} */(value) // The type entry is not part of the event itself continue } // Convert CID instanes to the string representation - if (value[Symbol.toStringTag] === 'CID') { + if (typeof value === 'object' && value && (Symbol.toStringTag in value) && value[Symbol.toStringTag] === 'CID') { value = value.toString() } else if (typeof value !== 'number') { throw new Error(util.format( From dffd3cd5006ae66bdb1ad3ba4b0a78a845e8094d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Fri, 21 Feb 2025 15:54:24 +0100 Subject: [PATCH 18/22] rpcRequest accepts an array as params MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- backend/lib/rpc-service/service.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/lib/rpc-service/service.js b/backend/lib/rpc-service/service.js index af1eff5..c392a22 100644 --- a/backend/lib/rpc-service/service.js +++ b/backend/lib/rpc-service/service.js @@ -12,7 +12,7 @@ import pRetry from 'p-retry' /** * @param {string} method - * @param {unknown} params + * @param {unknown[]} params * @returns {Promise} */ export const rpcRequest = async (method, params) => { From 5769366d51a6887f5d0feae092830dc2a22c56af Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl <113891786+NikolasHaimerl@users.noreply.github.com> Date: Fri, 21 Feb 2025 17:24:16 +0100 Subject: [PATCH 19/22] Update backend/lib/rpc-service/service.js MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Miroslav Bajtoš --- backend/lib/rpc-service/service.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/backend/lib/rpc-service/service.js b/backend/lib/rpc-service/service.js index c392a22..a286ec1 100644 --- a/backend/lib/rpc-service/service.js +++ b/backend/lib/rpc-service/service.js @@ -48,8 +48,7 @@ export const rpcRequest = async (method, params) => { * @returns {Promise>>} */ export async function getActorEvents (actorEventFilter, makeRpcRequest) { - /** @typedef {unknown[]} ActorEventsRaw */ - const rawEvents = /** @type {ActorEventsRaw} */(await makeRpcRequest('Filecoin.GetActorEventsRaw', [actorEventFilter])) + const rawEvents = /** @type {unknown[]} */(await makeRpcRequest('Filecoin.GetActorEventsRaw', [actorEventFilter])) if (!rawEvents || rawEvents.length === 0) { console.debug(`No actor events found in the height range ${actorEventFilter.fromHeight} - ${actorEventFilter.toHeight}.`) return [] From dd4a886c2cf0a6ea196da4dd1f4fdaf9de20f2ce Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl <113891786+NikolasHaimerl@users.noreply.github.com> Date: Mon, 24 Feb 2025 10:45:04 +0100 Subject: [PATCH 20/22] Update backend/lib/deal-observer.js Co-authored-by: Julian Gruber --- backend/lib/deal-observer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/lib/deal-observer.js b/backend/lib/deal-observer.js index 3865870..bc2008d 100644 --- a/backend/lib/deal-observer.js +++ b/backend/lib/deal-observer.js @@ -18,7 +18,7 @@ export const observeBuiltinActorEvents = async (pgPool, makeRpcRequest, maxPastE const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) const startEpoch = Math.max( currentChainHead.Height - maxPastEpochs, - ((lastInsertedDeal?.activated_at_epoch ?? -1) + 1) || 0 + (lastInsertedDeal?.activated_at_epoch ?? -1) + 1 ) const endEpoch = currentChainHead.Height - finalityEpochs for (let epoch = startEpoch; epoch <= endEpoch; epoch++) { From b78a4220089541740dd8463ee30a10c97ba49408 Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl <113891786+NikolasHaimerl@users.noreply.github.com> Date: Mon, 24 Feb 2025 10:45:34 +0100 Subject: [PATCH 21/22] Update backend/lib/resolve-payload-cids.js Co-authored-by: Julian Gruber --- backend/lib/resolve-payload-cids.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/lib/resolve-payload-cids.js b/backend/lib/resolve-payload-cids.js index 8c0335f..ccf355f 100644 --- a/backend/lib/resolve-payload-cids.js +++ b/backend/lib/resolve-payload-cids.js @@ -22,7 +22,7 @@ export const resolvePayloadCids = async (makeRpcRequest, makePayloadCidRequest, for (const deal of await fetchDealsWithUnresolvedPayloadCid(pgPool, maxDeals, new Date(now - THREE_DAYS_IN_MILLISECONDS))) { const minerPeerId = await getMinerPeerId(deal.miner_id, makeRpcRequest) const payloadCid = await makePayloadCidRequest(minerPeerId, deal.piece_cid) - deal.payload_cid = payloadCid !== null ? payloadCid : undefined + if (payloadCid) deal.payload_cid = payloadCid if (!deal.payload_cid) { if (deal.last_payload_retrieval_attempt) { deal.payload_retrievability_state = PayloadRetrievabilityState.TerminallyUnretrievable From 5f61e221138d44fcd0c2a02d31ec6753a6ec9f4c Mon Sep 17 00:00:00 2001 From: Nikolas Haimerl Date: Mon, 24 Feb 2025 12:48:48 +0100 Subject: [PATCH 22/22] merged with main --- backend/test/deal-observer.test.js | 125 ++++++++++++++++++++--------- 1 file changed, 89 insertions(+), 36 deletions(-) diff --git a/backend/test/deal-observer.test.js b/backend/test/deal-observer.test.js index c575e3d..d5c3d84 100644 --- a/backend/test/deal-observer.test.js +++ b/backend/test/deal-observer.test.js @@ -5,17 +5,16 @@ import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, import { Value } from '@sinclair/typebox/value' import { BlockEvent, ClaimEvent } from '../lib/rpc-service/data-types.js' import { convertBlockEventToActiveDealDbEntry } from '../lib/utils.js' -/** @import {PgPool} from '@filecoin-station/deal-observer-db' */ -/** @import { Static } from '@sinclair/typebox' */ import { PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/lib/types.js' import { chainHeadTestData } from './test_data/chainHead.js' import { rawActorEventTestData } from './test_data/rawActorEvent.js' import { parse } from '@ipld/dag-json' import { countRevertedActiveDeals } from '../lib/resolve-payload-cids.js' +/** @import { Static } from '@sinclair/typebox' */ describe('deal-observer-backend', () => { /** - * @type {PgPool} + * @type {import('@filecoin-station/deal-observer-db').PgPool} */ let pgPool before(async () => { @@ -32,33 +31,28 @@ describe('deal-observer-backend', () => { }) it('adds new FIL+ deals from built-in actor events to storage', async () => { - const eventData = { - id: 1, - provider: 2, - client: 3, - pieceCid: 'baga6ea4seaqc4z4432snwkztsadyx2rhoa6rx3wpfzu26365wvcwlb2wyhb5yfi', - pieceSize: 4n, - termStart: 5, - termMin: 12340, - termMax: 12340, - sector: 6n, - payload_cid: undefined - } - const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false }) + const event = Value.Parse(BlockEvent, { + height: 1, + event: { + id: 1, + provider: 2, + client: 3, + pieceCid: 'baga6ea4seaqc4z4432snwkztsadyx2rhoa6rx3wpfzu26365wvcwlb2wyhb5yfi', + pieceSize: 4n, + termStart: 5, + termMin: 12340, + termMax: 12340, + sector: 6n, + payload_cid: undefined + }, + emitter: 'f06', + reverted: false + }) const dbEntry = convertBlockEventToActiveDealDbEntry(event) await storeActiveDeals([dbEntry], pgPool) const actualData = await loadDeals(pgPool, 'SELECT * FROM active_deals') const expectedData = { - activated_at_epoch: event.height, - miner_id: eventData.provider, - client_id: eventData.client, - piece_cid: eventData.pieceCid, - piece_size: eventData.pieceSize, - term_start_epoch: eventData.termStart, - term_min: eventData.termMin, - term_max: eventData.termMax, - sector_id: eventData.sector, - payload_cid: undefined, + ...dbEntry, payload_retrievability_state: PayloadRetrievabilityState.NotQueried, last_payload_retrieval_attempt: undefined, reverted: false @@ -118,18 +112,80 @@ describe('deal-observer-backend', () => { assert.strictEqual(await countStoredActiveDeals(pgPool), 2n) }) + it('serially processes claims for a piece stored twice in the same sector', async () => { + /** + * @param {Static} eventData + */ + const storeDeal = async (eventData) => { + const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false }) + const dbEntry = convertBlockEventToActiveDealDbEntry(event) + await storeActiveDeals([dbEntry], pgPool) + } + const eventData = { + id: 1, + provider: 2, + client: 3, + pieceCid: 'baga6ea4seaqc4z4432snwkztsadyx2rhoa6rx3wpfzu26365wvcwlb2wyhb5yfi', + pieceSize: 4n, + termStart: 5, + termMin: 12340, + termMax: 12340, + sector: 6n, + payload_cid: undefined, + payload_retrievability_state: PayloadRetrievabilityState.NotQueried, + last_payload_retrieval_attempt: undefined + } + await storeDeal(eventData) + let actual = await loadDeals(pgPool, 'SELECT * FROM active_deals') + assert.strictEqual(actual.length, 1) + // If we only change the id, the unique constraint which does not include the id will prevent the insertion + // This test verifies that `storeDeal` handles such situation by ignoring the duplicate deal record + eventData.id = 2 + await storeDeal(eventData) + actual = await loadDeals(pgPool, 'SELECT * FROM active_deals') + assert.strictEqual(actual.length, 1) + }) + it('simultaneously processes claims for a piece stored twice in the same sector', async () => { + /** + * @param {Array>} events + */ + const storeDeal = async (events) => { + const dbEntries = events.map(eventData => { + const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false }) + return convertBlockEventToActiveDealDbEntry(event) + }) + await storeActiveDeals(dbEntries, pgPool) + } + const eventData = Value.Parse(ClaimEvent, { + id: 1, + provider: 2, + client: 3, + pieceCid: 'baga6ea4seaqc4z4432snwkztsadyx2rhoa6rx3wpfzu26365wvcwlb2wyhb5yfi', + pieceSize: 4n, + termStart: 5, + termMin: 12340, + termMax: 12340, + sector: 6n, + payload_cid: undefined, + payload_retrievability_state: PayloadRetrievabilityState.NotQueried, + last_payload_retrieval_attempt: undefined + }) + await storeDeal([eventData, { ...eventData, id: 2 }]) + const actual = await loadDeals(pgPool, 'SELECT * FROM active_deals') + // Only one of the events will be stored in the database + assert.strictEqual(actual.length, 1) + }) it('check number of reverted stored deals', async () => { /** * @param {Static} eventData * @param {boolean} reverted - * @returns {Promise} */ const storeBlockEvent = async (eventData, reverted) => { const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted }) const dbEntry = convertBlockEventToActiveDealDbEntry(event) await storeActiveDeals([dbEntry], pgPool) } - const data = { + const data = Value.Parse(ClaimEvent, { id: 1, provider: 2, client: 3, @@ -139,7 +195,7 @@ describe('deal-observer-backend', () => { termMin: 12340, termMax: 12340, sector: 6n - } + }) assert.strictEqual(await countRevertedActiveDeals(pgPool), 0n) await storeBlockEvent(data, false) assert.strictEqual(await countRevertedActiveDeals(pgPool), 0n) @@ -160,7 +216,7 @@ describe('deal-observer-backend', () => { describe('deal-observer-backend built in actor event observer', () => { /** - * @type {PgPool} + * @type {import('@filecoin-station/deal-observer-db').PgPool} */ let pgPool /** @@ -170,13 +226,12 @@ describe('deal-observer-backend built in actor event observer', () => { switch (method) { case 'Filecoin.ChainHead': return parse(JSON.stringify(chainHeadTestData)) - case 'Filecoin.GetActorEventsRaw': { + case 'Filecoin.GetActorEventsRaw':{ assert(typeof params[0] === 'object' && params[0], 'params[0] must be an object') const filter = /** @type {{fromHeight: number; toHeight: number}} */(params[0]) assert(typeof filter.fromHeight === 'number', 'filter.fromHeight must be a number') assert(typeof filter.toHeight === 'number', 'filter.toHeight must be a number') - return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) - } + return parse(JSON.stringify(rawActorEventTestData)).filter((/** @type {{ height: number; }} */ e) => e.height >= filter.fromHeight && e.height <= filter.toHeight) } default: console.error('Unknown method') } @@ -204,9 +259,7 @@ describe('deal-observer-backend built in actor event observer', () => { let deals = await loadDeals(pgPool, 'SELECT * FROM active_deals') assert.strictEqual(deals.length, 25) const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool) - assert(lastInsertedDeal !== null) - assert.strictEqual(lastInsertedDeal.activated_at_epoch, 4622129) - + assert.strictEqual(lastInsertedDeal?.activated_at_epoch, 4622129) // The deal observer function should pick up from the current storage await observeBuiltinActorEvents(pgPool, makeRpcRequest, 100, 0) deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')