Skip to content

Commit

Permalink
refactor concurrent processing and handling of jobs
Browse files Browse the repository at this point in the history
fetch is now reduced to one async tail call with execution thrown onto
the event loop.
to control concurrency we check the in progress against the concurrency
limit and Promise.race the in progress jobs to know when we can continue
fetching.
  • Loading branch information
jbielick committed Jun 5, 2021
1 parent 8e5f9b2 commit 96019af
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 124 deletions.
14 changes: 0 additions & 14 deletions @types/redis-parser/index.d.ts

This file was deleted.

6 changes: 2 additions & 4 deletions bench/work
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#!/usr/bin/env node

const faktory = require('../');
const faktory = require("../");
const concurrency = Number(process.argv[2]);

(async () => {

let completed = 0;
const time = process.hrtime();
let worker;
Expand All @@ -25,11 +24,10 @@ jobs/s: ${Math.round(completed / duration, 2)}
}
};

faktory.register('MyJob', async () => {
faktory.register("MyJob", async () => {
// await new Promise(resolve => setTimeout(resolve, 0));
finish();
});

worker = await faktory.work({ concurrency });

})();
2 changes: 1 addition & 1 deletion bin/server
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ exec docker run \
$@ \
-p 7419:7419 \
-p 7420:7420 \
contribsys/faktory:1.4.1 \
contribsys/faktory:1.5.1 \
/faktory -b :7419
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"@types/uuid": "^8.0.0",
"@typescript-eslint/eslint-plugin": "^4.0.0",
"@typescript-eslint/parser": "4.25.0",
"ava": "^3.8.2",
"ava": "3.15.0",
"coveralls": "3.1.0",
"eslint": "^7.2.0",
"eslint-config-prettier": "^7.1.0",
Expand Down
7 changes: 7 additions & 0 deletions src/@types/generic-pool/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { EventEmitter } from "koa";

declare module "generic-pool" {
export interface Pool<T> {
ready(): PromiseLike<void>;
}
}
4 changes: 2 additions & 2 deletions src/__tests__/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ test("#beat: returns a signal from the server", async (t) => {
});
});

test.skip("#connect: rejects connect when connection cannot be established", async (t) => {
const client = new Client({ url: "tcp://localhost:7488" });
test("#connect: rejects connect when connection cannot be established", async (t) => {
const client = new Client({ url: "tcp://localhost:1" });

await t.throwsAsync(client.connect(), { message: /ECONNREFUSED/ });
});
Expand Down
1 change: 0 additions & 1 deletion src/__tests__/middleware.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ test("invokes middleware", async (t) => {
},
},
});

worker.work();
});
});
Expand Down
4 changes: 2 additions & 2 deletions src/__tests__/signals.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ test(".stop() allows in-progress jobs to finish", async (t) => {
await stop();
});

test("worker drains pool after stop timeout", async (t) => {
test("worker exits the process after stop timeout", async (t) => {
const { queue, jobtype } = await push();
let exited = false;

Expand All @@ -94,7 +94,7 @@ test("worker drains pool after stop timeout", async (t) => {
registry: {
[jobtype]: async () => {
worker.stop();
await sleep(100);
await sleep(1000);
t.truthy(exited);
resolve();
},
Expand Down
3 changes: 1 addition & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ import { default as makeDebug } from "debug";
import { URL } from "url";
import { unescape } from "querystring";
import { hostname } from "os";
import { createPool } from "generic-pool";
import { createPool, Pool } from "generic-pool";

import { encode, hash } from "./utils";
import { Job, JobPayload, JobType } from "./job";
import { Mutation, RETRIES, DEAD, SCHEDULED } from "./mutation";
import { Connection, Greeting, Command } from "./connection";
import { ConnectionFactory } from "./connection-factory";
import { Pool } from "generic-pool";

const debug = makeDebug("faktory-worker:client");
const heartDebug = makeDebug("faktory-worker:client:heart");
Expand Down
6 changes: 4 additions & 2 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class Connection extends EventEmitter {
*/
private listen(): Connection {
this.socket
.on("connect", this.onConnect.bind(this))
.once("connect", this.onConnect.bind(this))
.on("data", this.parser.execute.bind(this.parser))
.on("timeout", this.onTimeout.bind(this))
.on("error", this.onError.bind(this))
Expand Down Expand Up @@ -215,13 +215,15 @@ export class Connection extends EventEmitter {
private onError(err: Error) {
this.lastError = err;
this.emit("error", err);
this.close();
}

/**
* Closes the connection to the server
* @return {Promise} resolved when underlying socket emits "close"
*/
close(): Promise<void> {
async close(): Promise<void> {
if (this.closing) return;
this.closing = true;
return new Promise<void>((resolve) =>
this.socket
Expand Down
41 changes: 41 additions & 0 deletions src/create-execution-chain.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import compose, { ComposedMiddleware } from "koa-compose";
import { Middleware, MiddlewareContext, Registry } from "./worker";

/**
* Builds a koa-compose stack of the middleware functions in addition to
* two worker-added middleware functions for pulling the job function from the
* registry and calling the job function and/or thunk
*
* @private
* @return {function} entrypoint function to the middleware stack
*/

export default function createExecutionChain(
middleware: Middleware[],
registry: Registry
): ComposedMiddleware<MiddlewareContext> {
return compose([
...middleware,
function getJobFnFromRegistry(ctx, next) {
const {
job: { jobtype },
} = ctx;
ctx.fn = registry[jobtype];
return next();
},
async function callJobFn(ctx, next) {
const {
fn,
job: { jobtype, args },
} = ctx;
if (!fn) throw new Error(`No jobtype registered: ${jobtype}`);
const thunkOrPromise = await fn(...args);
if (typeof thunkOrPromise === "function") {
await thunkOrPromise(ctx);
} else {
await thunkOrPromise;
}
return next();
},
]);
}
Loading

0 comments on commit 96019af

Please sign in to comment.