Skip to content

Commit 613994f

Browse files
authored
fix: Restart proving job if epoch content changes (#12655)
As another fix for #12625, adds a recurring check on the epoch's block headers to the proving job. If a mismatch is found, the job is aborted and restarted with fresh data. Also removes the unused `prove` call on the prover-node. See also #12638
1 parent ae4f935 commit 613994f

File tree

13 files changed

+175
-56
lines changed

13 files changed

+175
-56
lines changed

yarn-project/archiver/src/archiver/archiver.ts

+18
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,24 @@ export class Archiver extends EventEmitter implements ArchiveSource, Traceable {
612612
return blocks.reverse();
613613
}
614614

615+
public async getBlockHeadersForEpoch(epochNumber: bigint): Promise<BlockHeader[]> {
616+
const [start, end] = getSlotRangeForEpoch(epochNumber, this.l1constants);
617+
const blocks: BlockHeader[] = [];
618+
619+
// Walk the list of blocks backwards and filter by slots matching the requested epoch.
620+
// We'll typically ask for blocks for a very recent epoch, so we shouldn't need an index here.
621+
let number = await this.store.getSynchedL2BlockNumber();
622+
let header = await this.getBlockHeader(number);
623+
const slot = (b: BlockHeader) => b.globalVariables.slotNumber.toBigInt();
624+
while (header && slot(header) >= start) {
625+
if (slot(header) <= end) {
626+
blocks.push(header);
627+
}
628+
header = await this.getBlockHeader(--number);
629+
}
630+
return blocks.reverse();
631+
}
632+
615633
public async isEpochComplete(epochNumber: bigint): Promise<boolean> {
616634
// The epoch is complete if the current L2 block is the last one in the epoch (or later)
617635
const header = await this.getBlockHeader('latest');

yarn-project/archiver/src/test/mock_l2_block_source.ts

+4
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ export class MockL2BlockSource implements L2BlockSource {
118118
return Promise.resolve(blocks);
119119
}
120120

121+
getBlockHeadersForEpoch(epochNumber: bigint): Promise<BlockHeader[]> {
122+
return this.getBlocksForEpoch(epochNumber).then(blocks => blocks.map(b => b.header));
123+
}
124+
121125
/**
122126
* Gets a tx effect.
123127
* @param txHash - The hash of a transaction which resulted in the returned tx effect.

yarn-project/foundation/src/promise/running-promise.ts

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export class RunningPromise {
7373
}
7474
};
7575
this.runningPromise = poll();
76+
return this;
7677
}
7778

7879
/**

yarn-project/foundation/src/retry/index.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,12 @@ export async function retry<Result>(
8383
* @param interval - The optional interval, in seconds, between retry attempts. Defaults to 1 second.
8484
* @returns A Promise that resolves with the successful (truthy) result of the provided function, or rejects if timeout is exceeded.
8585
*/
86-
export async function retryUntil<T>(fn: () => Promise<T | undefined>, name = '', timeout = 0, interval = 1) {
86+
export async function retryUntil<T>(
87+
fn: () => (T | undefined) | Promise<T | undefined>,
88+
name = '',
89+
timeout = 0,
90+
interval = 1,
91+
) {
8792
const timer = new Timer();
8893
while (true) {
8994
const result = await fn();

yarn-project/prover-node/src/job/epoch-proving-job.test.ts

+16-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { toArray } from '@aztec/foundation/iterable';
33
import { sleep } from '@aztec/foundation/sleep';
44
import type { PublicProcessor, PublicProcessorFactory } from '@aztec/simulator/server';
55
import { L2Block, type L2BlockSource } from '@aztec/stdlib/block';
6+
import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers';
67
import type { EpochProver, MerkleTreeWriteOperations, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
78
import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging';
89
import { Proof } from '@aztec/stdlib/proofs';
@@ -85,9 +86,12 @@ describe('epoch-proving-job', () => {
8586

8687
l1ToL2MessageSource.getL1ToL2Messages.mockResolvedValue([]);
8788
l2BlockSource.getBlockHeader.mockResolvedValue(initialHeader);
89+
l2BlockSource.getL1Constants.mockResolvedValue({ ethereumSlotDuration: 0.1 } as L1RollupConstants);
90+
l2BlockSource.getBlockHeadersForEpoch.mockResolvedValue(blocks.map(b => b.header));
8891
publicProcessorFactory.create.mockReturnValue(publicProcessor);
8992
db.getInitialHeader.mockReturnValue(initialHeader);
9093
worldState.fork.mockResolvedValue(db);
94+
prover.startNewBlock.mockImplementation(() => sleep(200));
9195
prover.finaliseEpoch.mockResolvedValue({ publicInputs, proof });
9296
publisher.submitEpochProof.mockResolvedValue(true);
9397
publicProcessor.process.mockImplementation(async txs => {
@@ -146,7 +150,6 @@ describe('epoch-proving-job', () => {
146150
});
147151

148152
it('halts if stopped externally', async () => {
149-
prover.startNewBlock.mockImplementation(() => sleep(200));
150153
const job = createJob();
151154
void job.run();
152155
await sleep(100);
@@ -155,4 +158,16 @@ describe('epoch-proving-job', () => {
155158
expect(job.getState()).toEqual('stopped');
156159
expect(publisher.submitEpochProof).not.toHaveBeenCalled();
157160
});
161+
162+
it('halts if a new block for the epoch is found', async () => {
163+
const newBlocks = await timesParallel(NUM_BLOCKS + 1, i => L2Block.random(i + 1, TXS_PER_BLOCK));
164+
l2BlockSource.getBlockHeadersForEpoch.mockResolvedValue(newBlocks.map(b => b.header));
165+
166+
const job = createJob();
167+
await job.run();
168+
169+
expect(job.getState()).toEqual('reorg');
170+
expect(publisher.submitEpochProof).not.toHaveBeenCalled();
171+
expect(prover.cancel).toHaveBeenCalled();
172+
});
158173
});

yarn-project/prover-node/src/job/epoch-proving-job.ts

+42-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { asyncPool } from '@aztec/foundation/async-pool';
22
import { createLogger } from '@aztec/foundation/log';
3-
import { promiseWithResolvers } from '@aztec/foundation/promise';
3+
import { RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise';
44
import { Timer } from '@aztec/foundation/timer';
55
import type { PublicProcessor, PublicProcessorFactory } from '@aztec/simulator/server';
66
import type { L2Block, L2BlockSource } from '@aztec/stdlib/block';
@@ -30,6 +30,7 @@ export class EpochProvingJob implements Traceable {
3030
private uuid: string;
3131

3232
private runPromise: Promise<void> | undefined;
33+
private epochCheckPromise: RunningPromise | undefined;
3334
private deadlineTimeoutHandler: NodeJS.Timeout | undefined;
3435

3536
public readonly tracer: Tracer;
@@ -47,7 +48,6 @@ export class EpochProvingJob implements Traceable {
4748
private metrics: ProverNodeMetrics,
4849
private deadline: Date | undefined,
4950
private config: { parallelBlockLimit: number } = { parallelBlockLimit: 32 },
50-
private cleanUp: (job: EpochProvingJob) => Promise<void> = () => Promise.resolve(),
5151
) {
5252
this.uuid = crypto.randomUUID();
5353
this.tracer = metrics.client.getTracer('EpochProvingJob');
@@ -73,6 +73,7 @@ export class EpochProvingJob implements Traceable {
7373
})
7474
public async run() {
7575
this.scheduleDeadlineStop();
76+
await this.scheduleEpochCheck();
7677

7778
const epochNumber = Number(this.epochNumber);
7879
const epochSizeBlocks = this.blocks.length;
@@ -154,14 +155,18 @@ export class EpochProvingJob implements Traceable {
154155
this.metrics.recordProvingJob(executionTime, timer.ms(), epochSizeBlocks, epochSizeTxs);
155156
} catch (err: any) {
156157
if (err && err.name === 'HaltExecutionError') {
157-
this.log.warn(`Halted execution of epoch ${epochNumber} prover job`, { uuid: this.uuid, epochNumber });
158+
this.log.warn(`Halted execution of epoch ${epochNumber} prover job`, {
159+
uuid: this.uuid,
160+
epochNumber,
161+
details: err.message,
162+
});
158163
return;
159164
}
160165
this.log.error(`Error running epoch ${epochNumber} prover job`, err, { uuid: this.uuid, epochNumber });
161166
this.state = 'failed';
162167
} finally {
163168
clearTimeout(this.deadlineTimeoutHandler);
164-
await this.cleanUp(this);
169+
await this.epochCheckPromise?.stop();
165170
await this.prover.stop();
166171
resolve();
167172
}
@@ -173,12 +178,12 @@ export class EpochProvingJob implements Traceable {
173178
}
174179

175180
private checkState() {
176-
if (this.state === 'timed-out' || this.state === 'stopped' || this.state === 'failed') {
181+
if (this.state === 'timed-out' || this.state === 'stopped' || this.state === 'failed' || this.state === 'reorg') {
177182
throw new HaltExecutionError(this.state);
178183
}
179184
}
180185

181-
public async stop(state: EpochProvingJobState = 'stopped') {
186+
public async stop(state: EpochProvingJobTerminalState = 'stopped') {
182187
this.state = state;
183188
this.prover.cancel();
184189
// TODO(palla/prover): Stop the publisher as well
@@ -207,6 +212,36 @@ export class EpochProvingJob implements Traceable {
207212
}
208213
}
209214

215+
/**
216+
* Kicks off a running promise that queries the archiver for the set of L2 blocks of the current epoch.
217+
* If those changed, stops the proving job with a `rerun` state, so the node re-enqueues it.
218+
*/
219+
private async scheduleEpochCheck() {
220+
const intervalMs = Math.ceil((await this.l2BlockSource.getL1Constants()).ethereumSlotDuration / 2) * 1000;
221+
this.epochCheckPromise = new RunningPromise(
222+
async () => {
223+
const blocks = await this.l2BlockSource.getBlockHeadersForEpoch(this.epochNumber);
224+
const blockHashes = await Promise.all(blocks.map(block => block.hash()));
225+
const thisBlockHashes = await Promise.all(this.blocks.map(block => block.hash()));
226+
if (
227+
blocks.length !== this.blocks.length ||
228+
!blockHashes.every((block, i) => block.equals(thisBlockHashes[i]))
229+
) {
230+
this.log.warn('Epoch blocks changed underfoot', {
231+
uuid: this.uuid,
232+
epochNumber: this.epochNumber,
233+
oldBlockHashes: thisBlockHashes,
234+
newBlockHashes: blockHashes,
235+
});
236+
void this.stop('reorg');
237+
}
238+
},
239+
this.log,
240+
intervalMs,
241+
).start();
242+
this.log.verbose(`Scheduled epoch check for epoch ${this.epochNumber} every ${intervalMs}ms`);
243+
}
244+
210245
/* Returns the header for the given block number, or the genesis block for block zero. */
211246
private async getBlockHeader(blockNumber: number) {
212247
if (blockNumber === 0) {
@@ -249,7 +284,7 @@ export class EpochProvingJob implements Traceable {
249284
}
250285

251286
class HaltExecutionError extends Error {
252-
constructor(state: EpochProvingJobState) {
287+
constructor(public readonly state: EpochProvingJobState) {
253288
super(`Halted execution due to state ${state}`);
254289
this.name = 'HaltExecutionError';
255290
}

yarn-project/prover-node/src/prover-node.test.ts

+41-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { timesParallel } from '@aztec/foundation/collection';
22
import { EthAddress } from '@aztec/foundation/eth-address';
3+
import { promiseWithResolvers } from '@aztec/foundation/promise';
4+
import { retryUntil } from '@aztec/foundation/retry';
5+
import { sleep } from '@aztec/foundation/sleep';
36
import type { PublicProcessorFactory } from '@aztec/simulator/server';
47
import { L2Block, type L2BlockSource } from '@aztec/stdlib/block';
58
import type { ContractDataSource } from '@aztec/stdlib/contract';
@@ -45,11 +48,7 @@ describe('prover-node', () => {
4548
let address: EthAddress;
4649

4750
// List of all jobs ever created by the test prover node and their dependencies
48-
let jobs: {
49-
job: MockProxy<EpochProvingJob>;
50-
cleanUp: (job: EpochProvingJob) => Promise<void>;
51-
epochNumber: bigint;
52-
}[];
51+
let jobs: { job: MockProxy<EpochProvingJob>; epochNumber: bigint }[];
5352

5453
const createProverNode = () =>
5554
new TestProverNode(
@@ -140,39 +139,70 @@ describe('prover-node', () => {
140139
it('starts a proof on a finished epoch', async () => {
141140
await proverNode.handleEpochReadyToProve(10n);
142141
expect(jobs[0].epochNumber).toEqual(10n);
142+
expect(proverNode.totalJobCount).toEqual(1);
143143
});
144144

145145
it('does not start a proof if there are no blocks in the epoch', async () => {
146146
l2BlockSource.getBlocksForEpoch.mockResolvedValue([]);
147147
await proverNode.handleEpochReadyToProve(10n);
148-
expect(jobs.length).toEqual(0);
148+
expect(proverNode.totalJobCount).toEqual(0);
149149
});
150150

151151
it('does not start a proof if there is a tx missing from coordinator', async () => {
152152
mockCoordination.getTxsByHash.mockResolvedValue([]);
153153
await proverNode.handleEpochReadyToProve(10n);
154-
expect(jobs.length).toEqual(0);
154+
expect(proverNode.totalJobCount).toEqual(0);
155155
});
156156

157157
it('does not prove the same epoch twice', async () => {
158+
const firstJob = promiseWithResolvers<void>();
159+
proverNode.nextJobRun = () => firstJob.promise;
160+
proverNode.nextJobState = 'processing';
158161
await proverNode.handleEpochReadyToProve(10n);
159162
await proverNode.handleEpochReadyToProve(10n);
160163

161-
expect(jobs.length).toEqual(1);
164+
firstJob.resolve();
165+
expect(proverNode.totalJobCount).toEqual(1);
166+
});
167+
168+
it('restarts a proof on a reorg', async () => {
169+
proverNode.nextJobState = 'reorg';
170+
await proverNode.handleEpochReadyToProve(10n);
171+
await retryUntil(() => proverNode.totalJobCount === 2, 'job retried', 5);
172+
expect(proverNode.totalJobCount).toEqual(2);
173+
});
174+
175+
it('does not restart a proof on an error', async () => {
176+
proverNode.nextJobState = 'failed';
177+
await proverNode.handleEpochReadyToProve(10n);
178+
await sleep(1000);
179+
expect(proverNode.totalJobCount).toEqual(1);
162180
});
163181

164182
class TestProverNode extends ProverNode {
183+
public totalJobCount = 0;
184+
public nextJobState: EpochProvingJobState = 'completed';
185+
public nextJobRun: () => Promise<void> = () => Promise.resolve();
186+
165187
protected override doCreateEpochProvingJob(
166188
epochNumber: bigint,
167189
_deadline: Date | undefined,
168190
_blocks: L2Block[],
169191
_txs: Tx[],
170192
_publicProcessorFactory: PublicProcessorFactory,
171-
cleanUp: (job: EpochProvingJob) => Promise<void>,
172193
): EpochProvingJob {
173-
const job = mock<EpochProvingJob>({ getState: () => 'processing', run: () => Promise.resolve() });
194+
const state = this.nextJobState;
195+
this.nextJobState = 'completed';
196+
const run = this.nextJobRun;
197+
this.nextJobRun = () => Promise.resolve();
198+
const job = mock<EpochProvingJob>({
199+
getState: () => state,
200+
run,
201+
getEpochNumber: () => epochNumber,
202+
});
174203
job.getId.mockReturnValue(jobs.length.toString());
175-
jobs.push({ epochNumber, job, cleanUp });
204+
jobs.push({ epochNumber, job });
205+
this.totalJobCount++;
176206
return job;
177207
}
178208

0 commit comments

Comments
 (0)