Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v4: Waitpoint PAT completion, fix dequeuing without retry config, fixed metadata #1826

Merged
merged 6 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,20 @@ import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { engine } from "~/v3/runEngine.server";

const { action } = createActionApiRoute(
const { action, loader } = createActionApiRoute(
{
params: z.object({
waitpointFriendlyId: z.string(),
}),
body: CompleteWaitpointTokenRequestBody,
maxContentLength: env.TASK_PAYLOAD_MAXIMUM_SIZE,
method: "POST",
allowJWT: true,
authorization: {
action: "write",
resource: (params) => ({ waitpoints: params.waitpointFriendlyId }),
superScopes: ["write:waitpoints", "admin"],
},
corsStrategy: "all",
},
async ({ authentication, body, params }) => {
// Resume tokens are actually just waitpoints
Expand All @@ -39,6 +45,12 @@ const { action } = createActionApiRoute(
throw json({ error: "Waitpoint not found" }, { status: 404 });
}

if (waitpoint.status === "COMPLETED") {
return json<CompleteWaitpointTokenResponseBody>({
success: true,
});
}

const stringifiedData = await stringifyIO(body.data);
const finalData = await conditionallyExportPacket(
stringifiedData,
Expand All @@ -65,4 +77,4 @@ const { action } = createActionApiRoute(
}
);

export { action };
export { action, loader };
18 changes: 17 additions & 1 deletion apps/webapp/app/routes/api.v1.waitpoints.tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { engine } from "~/v3/runEngine.server";
Expand Down Expand Up @@ -77,12 +78,14 @@ const { action } = createActionApiRoute(
tags: bodyTags,
});

const $responseHeaders = await responseHeaders(authentication.environment);

return json<CreateWaitpointTokenResponseBody>(
{
id: WaitpointId.toFriendlyId(result.waitpoint.id),
isCached: result.isCached,
},
{ status: 200 }
{ status: 200, headers: $responseHeaders }
);
} catch (error) {
if (error instanceof ServiceValidationError) {
Expand All @@ -96,4 +99,17 @@ const { action } = createActionApiRoute(
}
);

async function responseHeaders(
environment: AuthenticatedEnvironment
): Promise<Record<string, string>> {
const claimsHeader = JSON.stringify({
sub: environment.id,
pub: true,
});

return {
"x-trigger-jwt-claims": claimsHeader,
};
}

export { action };
2 changes: 1 addition & 1 deletion apps/webapp/app/services/authorization.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export type AuthorizationAction = "read" | "write" | string; // Add more actions as needed

const ResourceTypes = ["tasks", "tags", "runs", "batch"] as const;
const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints"] as const;

export type AuthorizationResources = {
[key in (typeof ResourceTypes)[number]]?: string | string[];
Expand Down
2 changes: 0 additions & 2 deletions internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
case "TASK_EXECUTION_ABORTED":
case "TASK_EXECUTION_FAILED":
case "TASK_PROCESS_SIGTERM":
case "TASK_DEQUEUED_INVALID_RETRY_CONFIG":
case "TASK_DEQUEUED_NO_RETRY_CONFIG":
case "TASK_DID_CONCURRENT_WAIT":
return "SYSTEM_FAILURE";
default:
Expand Down
34 changes: 1 addition & 33 deletions internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,41 +307,9 @@ export class DequeueSystem {
task: result.task.id,
rawRetryConfig: retryConfig,
});

await this.runAttemptSystem.systemFailure({
runId,
error: {
type: "INTERNAL_ERROR",
code: "TASK_DEQUEUED_INVALID_RETRY_CONFIG",
message: `Invalid retry config: ${retryConfig}`,
},
tx: prisma,
});

return null;
}

if (!parsedConfig.data) {
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): No retry config", {
runId,
task: result.task.id,
rawRetryConfig: retryConfig,
});

await this.runAttemptSystem.systemFailure({
runId,
error: {
type: "INTERNAL_ERROR",
code: "TASK_DEQUEUED_NO_RETRY_CONFIG",
message: `No retry config found`,
},
tx: prisma,
});

return null;
}

maxAttempts = parsedConfig.data.maxAttempts;
maxAttempts = parsedConfig.data?.maxAttempts;
}
//update the run
const lockedTaskRun = await prisma.taskRun.update({
Expand Down
86 changes: 36 additions & 50 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { timeoutError } from "@trigger.dev/core/v3";
import { timeoutError, tryCatch } from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import {
$transaction,
Expand Down Expand Up @@ -66,65 +66,51 @@ export class WaitpointSystem {
isError: boolean;
};
}): Promise<Waitpoint> {
const result = await $transaction(
this.$.prisma,
async (tx) => {
// 1. Find the TaskRuns blocked by this waitpoint
const affectedTaskRuns = await tx.taskRunWaitpoint.findMany({
where: { waitpointId: id },
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
});
// 1. Find the TaskRuns blocked by this waitpoint
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
where: { waitpointId: id },
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
});

if (affectedTaskRuns.length === 0) {
this.$.logger.warn(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
waitpointId: id,
});
}
if (affectedTaskRuns.length === 0) {
this.$.logger.debug(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
waitpointId: id,
});
}

// 2. Update the waitpoint to completed (only if it's pending)
let waitpoint: Waitpoint | null = null;
try {
waitpoint = await tx.waitpoint.update({
where: { id, status: "PENDING" },
data: {
status: "COMPLETED",
completedAt: new Date(),
output: output?.value,
outputType: output?.type,
outputIsError: output?.isError,
},
});
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2025") {
waitpoint = await tx.waitpoint.findFirst({
where: { id },
});
} else {
this.$.logger.log("completeWaitpoint: error updating waitpoint:", { error });
throw error;
}
}
let [waitpointError, waitpoint] = await tryCatch(
this.$.prisma.waitpoint.update({
where: { id, status: "PENDING" },
data: {
status: "COMPLETED",
completedAt: new Date(),
output: output?.value,
outputType: output?.type,
outputIsError: output?.isError,
},
})
);

return { waitpoint, affectedTaskRuns };
},
(error) => {
this.$.logger.error(`completeWaitpoint: Error completing waitpoint ${id}, retrying`, {
error,
if (waitpointError) {
if (
waitpointError instanceof Prisma.PrismaClientKnownRequestError &&
waitpointError.code === "P2025"
) {
waitpoint = await this.$.prisma.waitpoint.findFirst({
where: { id },
});
throw error;
} else {
this.$.logger.log("completeWaitpoint: error updating waitpoint:", { waitpointError });
throw waitpointError;
}
);

if (!result) {
throw new Error(`Waitpoint couldn't be updated`);
}

if (!result.waitpoint) {
if (!waitpoint) {
throw new Error(`Waitpoint ${id} not found`);
}

//schedule trying to continue the runs
for (const run of result.affectedTaskRuns) {
for (const run of affectedTaskRuns) {
await this.$.worker.enqueue({
//this will debounce the call
id: `continueRunIfUnblocked:${run.taskRunId}`,
Expand All @@ -148,7 +134,7 @@ export class WaitpointSystem {
}
}

return result.waitpoint;
return waitpoint;
}

/**
Expand Down
2 changes: 2 additions & 0 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ const zodIpc = new ZodIpcConnection({
return;
}

runMetadataManager.runId = execution.run.id;

const executor = new TaskExecutor(task, {
tracer,
tracingSDK,
Expand Down
2 changes: 2 additions & 0 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ const zodIpc = new ZodIpcConnection({
return;
}

runMetadataManager.runId = execution.run.id;

const executor = new TaskExecutor(task, {
tracer,
tracingSDK,
Expand Down
21 changes: 14 additions & 7 deletions packages/core/src/v3/apiClient/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { calculateNextRetryDelay } from "../utils/retries.js";
import { ApiConnectionError, ApiError, ApiSchemaValidationError } from "./errors.js";

import { Attributes, context, propagation, Span } from "@opentelemetry/api";
import {suppressTracing} from "@opentelemetry/core"
import { suppressTracing } from "@opentelemetry/core";
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
import type { TriggerTracer } from "../tracer.js";
import { accessoryAttributes } from "../utils/styleAttributes.js";
Expand All @@ -27,14 +27,14 @@ export const defaultRetryOptions = {
randomize: false,
} satisfies RetryOptions;

export type ZodFetchOptions<T = unknown> = {
export type ZodFetchOptions<TData = any> = {
retry?: RetryOptions;
tracer?: TriggerTracer;
name?: string;
attributes?: Attributes;
icon?: string;
onResponseBody?: (body: T, span: Span) => void;
prepareData?: (data: T) => Promise<T> | T;
onResponseBody?: (body: TData, span: Span) => void;
prepareData?: (data: TData, response: Response) => Promise<TData> | TData;
};

export type AnyZodFetchOptions = ZodFetchOptions<any>;
Expand Down Expand Up @@ -144,7 +144,14 @@ export function zodfetchOffsetLimitPage<TItemSchema extends z.ZodTypeAny>(

const fetchResult = _doZodFetch(offsetLimitPageSchema, $url.href, requestInit, options);

return new OffsetLimitPagePromise(fetchResult, schema, url, params, requestInit, options);
return new OffsetLimitPagePromise(
fetchResult as Promise<ZodFetchResult<OffsetLimitPageResponse<z.output<TItemSchema>>>>,
schema,
url,
params,
requestInit,
options
);
}

type ZodFetchResult<T> = {
Expand Down Expand Up @@ -188,7 +195,7 @@ async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
schema: TResponseBodySchema,
url: string,
requestInit?: PromiseOrValue<RequestInit>,
options?: ZodFetchOptions
options?: ZodFetchOptions<z.output<TResponseBodySchema>>
): Promise<ZodFetchResult<z.output<TResponseBodySchema>>> {
let $requestInit = await requestInit;

Expand All @@ -202,7 +209,7 @@ async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
}

if (options?.prepareData) {
result.data = await options.prepareData(result.data);
result.data = await options.prepareData(result.data, result.response);
}

return result;
Expand Down
Loading