diff --git a/.eslintignore b/.eslintignore index a03c4c0..a9c4c03 100644 --- a/.eslintignore +++ b/.eslintignore @@ -1 +1,2 @@ -*.test.js +*.test.ts +lib diff --git a/.eslintrc.json b/.eslintrc.json index 42fa85e..0b8c10e 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -1,5 +1,17 @@ { - "extends": "airbnb-base", + "parser": "@typescript-eslint/parser", + "plugins": [ + "@typescript-eslint" + ], + "extends": [ + "plugin:@typescript-eslint/recommended", + "prettier/@typescript-eslint", + "plugin:prettier/recommended" + ], + "parserOptions": { + "ecmaVersion": 2020, + "sourceType": "module" + }, "env": { "node": true, "es6": true diff --git a/.npmignore b/.npmignore index cdcff95..6fb3da9 100644 --- a/.npmignore +++ b/.npmignore @@ -1,4 +1,4 @@ -test +__tests__ *.test.js docs coverage diff --git a/.prettierrc.js b/.prettierrc.js new file mode 100644 index 0000000..f053ebf --- /dev/null +++ b/.prettierrc.js @@ -0,0 +1 @@ +module.exports = {}; diff --git a/.travis.yml b/.travis.yml index 8ae0c58..39f4ad6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,7 @@ before_script: done script: - npm run lint + - npm run build - npm test after_success: - './node_modules/.bin/nyc report --reporter=text-lcov | ./node_modules/.bin/coveralls' diff --git a/bin/faktory-work b/bin/faktory-work index a348a81..b2e073c 100755 --- a/bin/faktory-work +++ b/bin/faktory-work @@ -1,8 +1,6 @@ #!/usr/bin/env node const faktory = require("../lib/faktory"); -console.log(faktory); -process.exit(0); const program = require("../lib/cli"); faktory.work(program).catch((error) => { diff --git a/package.json b/package.json index fdd3b5a..cb464cb 100644 --- a/package.json +++ b/package.json @@ -9,13 +9,15 @@ "watch": "tsc -w", "build": "tsc", "clean": "rm -rf lib/*", - "test": "nyc --cache ava src/__tests__/*", + "test": "nyc --cache ava src/__tests__/**/*", + "test:unit": "nyc --cache ava src/__tests__/unit/*", + "test:integration": "nyc --cache ava src/__tests__/integration/*", "test:watch": "npm run test -- -w", "coverage": "nyc report --reporter=html", - "lint": "eslint src", + "lint": "eslint . --ext .ts", "htmldocs": "jsdoc . lib -d docs --readme README.md --pedantic", "docs": "jsdoc2md --files lib/*.js --template docs/template.hbs > docs/api.md", - "preversion": "npm run lint && npm run test && npm run docs", + "preversion": "npm run lint && npm run build && npm run test && npm run docs", "version": "git add docs/ package.json", "postversion": "git push && git push --tags" }, @@ -49,19 +51,27 @@ "engines": { "node": ">=7" }, + "types": "lib/index.d.ts", "devDependencies": { "@ava/typescript": "^1.1.1", + "@types/debug": "^4.1.5", + "@types/generic-pool": "^3.1.9", + "@types/koa-compose": "^3.2.5", "@types/node": "^14.0.11", "@types/uuid": "^8.0.0", + "@typescript-eslint/eslint-plugin": "^3.1.0", + "@typescript-eslint/parser": "^3.1.0", "ava": "^3.8.2", "coveralls": "3.1.0", - "eslint": "^4.10", - "eslint-config-airbnb-base": "13.2.0", - "eslint-plugin-import": "2.18.2", + "eslint": "^7.2.0", + "eslint-config-prettier": "^6.11.0", + "eslint-plugin-import": "^2.21.1", + "eslint-plugin-prettier": "^3.1.3", "get-port": "4.2.0", "jsdoc": "3.6.3", "jsdoc-to-markdown": "^3.1.0-1", "nyc": "13.3.0", + "prettier": "^2.0.5", "sinon": "9.0.2", "typescript": "^3.9.5" }, @@ -69,7 +79,7 @@ "commander": "2.20.0", "debug": "^4.1.0", "generic-pool": "3.7.1", - "koa-compose": "^4.0.0", + "koa-compose": "^4.1.0", "redis-parser": "^3.0.0", "uuid": "3.3.3" } diff --git a/src/@types/redis-parser/index.d.ts b/src/@types/redis-parser/index.d.ts new file mode 100644 index 0000000..5d05c0e --- /dev/null +++ b/src/@types/redis-parser/index.d.ts @@ -0,0 +1,15 @@ +declare module "redis-parser" { + type Config = { + returnReply: (response: string) => void; + returnError: (error: Error) => void; + }; + + class RedisParser { + // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types + constructor(config: Config); + execute(buffer: Buffer): (err: Error | null, response: string) => void; + } + + // export = RedisParser; + export default RedisParser; +} diff --git a/src/__tests__/_helper.ts b/src/__tests__/_helper.ts index 10dda54..81fa5c9 100644 --- a/src/__tests__/_helper.ts +++ b/src/__tests__/_helper.ts @@ -1,10 +1,9 @@ -import { TestInterface } from 'ava'; -import { Socket, createServer, Server } from 'net'; -const uuid = require('uuid/v4'); -import getPort from 'get-port'; -import Client from '../client'; +import { TestInterface } from "ava"; +import { Socket, createServer, Server } from "net"; +import { v4 as uuid } from "uuid"; +import getPort from "get-port"; +import Client from "../client"; import { JobPayload, PartialJobPayload } from "../job"; -import { Command } from "../connection"; export type ServerControl = { socket: Socket; @@ -12,7 +11,7 @@ export type ServerControl = { data?: string; }; -export const mockServer = () => { +export const mockServer = (): Server => { const server = createServer(); server.on("connection", (socket) => { @@ -38,7 +37,11 @@ export const mockServer = () => { return server; }; -export const mocked = async (fn: (server: Server, port: number) => any | void) => { +type ServerUser = { + (server: Server, port: number): Promise; +}; + +export const mocked = async (fn: ServerUser): ReturnType => { const server = mockServer(); const port = await getPort(); server.listen(port, "127.0.0.1"); @@ -81,15 +84,15 @@ mocked.info = () => ({ socket }: ServerControl) => { socket.write(`$${json.length}\r\n${json}\r\n`); }; -export const sleep = (ms: number, value: any = true) => { +export const sleep = (ms: number, value?: unknown): Promise => { return new Promise((resolve) => setTimeout(() => resolve(value), ms)); }; -export const randQueue = (label: string = "test") => { +export const randQueue = (label = "test"): string => { return `${label}-${uuid().slice(0, 6)}`; }; -export const createJob = (...args: any[]): PartialJobPayload => { +export const createJob = (...args: unknown[]): PartialJobPayload => { return { jobtype: "testJob", queue: randQueue(), @@ -97,9 +100,13 @@ export const createJob = (...args: any[]): PartialJobPayload => { }; }; -export const push = async ( - { args, queue, jobtype }: { args?: any[], queue?: string, jobtype?: string } = {} -) => { +export const push = async ({ + args, + queue, + jobtype, +}: { args?: unknown[]; queue?: string; jobtype?: string } = {}): Promise< + JobPayload +> => { const client = new Client(); const job = client.job(jobtype || "test"); @@ -113,9 +120,9 @@ export const push = async ( return job; }; -export const flush = () => new Client().flush(); +export const flush = (): Promise => new Client().flush(); -export function registerCleaner(test: TestInterface) { +export function registerCleaner(test: TestInterface): void { test.beforeEach(async () => { await flush(); }); diff --git a/src/__tests__/client.test.ts b/src/__tests__/client.test.ts index 5b3a201..5b8cec3 100644 --- a/src/__tests__/client.test.ts +++ b/src/__tests__/client.test.ts @@ -25,20 +25,22 @@ test("#buildHello: client builds a passwordless ahoy", (t) => { }); test("#buildHello: client builds a salty ahoy", (t) => { - const client = new Client(); + const client = new Client({ + password: "abcde123", + }); const hello = client.buildHello({ i: 3, s: "123", v: 3 }); t.is( hello.pwdhash, - "3180b4071170db0ae9f666167ed379f53468463f152e3c3cfb57d1de45fd01d6" + "ef646abadf4ffba660d9bbb8de8e45576970de917b4c9da8cad96b49e64636d9" ); }); test("#buildHello: wid is present in HELLO", (t) => { const client = new Client({ wid: "workerid" }); - const hello = client.buildHello({ v: 2, s: 'abc', i: 3 }); + const hello = client.buildHello({ v: 2, s: "abc", i: 3 }); t.is(hello.wid, client.wid, "wid in ahoy does not match"); }); @@ -144,7 +146,7 @@ test("#beat: sends a heartbeat", async (t) => { }); test("#beat: returns a signal from the server", async (t) => { - return mocked(async (server, port) => { + await mocked(async (server, port) => { server.on("BEAT", mocked.beat("quiet")); const client = new Client({ port }); @@ -170,7 +172,7 @@ test("#connect: rejects if handshake is not successful", async (t) => { test("#connect: connects explicitly", async (t) => { t.plan(2); - return mocked(async (server, port) => { + await mocked(async (server, port) => { server .on("HELLO", () => { t.is(1, 1); @@ -202,7 +204,7 @@ test("#ack: ACKs a job", async (t) => { }); test("#fetch: returns null when queue is empty", async (t) => { - return mocked(async (server, port) => { + await mocked(async (server, port) => { server.on("FETCH", ({ socket }) => { // null bulkstring socket.write("$-1\r\n"); @@ -215,7 +217,7 @@ test("#fetch: returns null when queue is empty", async (t) => { test("#push: defaults job payload values according to spec", async (t) => { let serverJob: JobPayload; - return mocked(async (server, port) => { + await mocked(async (server, port) => { server.on("PUSH", ({ data, socket }) => { serverJob = data; socket.write("+OK\r\n"); @@ -282,7 +284,7 @@ test("#job: provides the args to the job", (t) => { }); test("#job: push sends job specification to server", async (t) => { - return mocked(async (server, port) => { + await mocked(async (server, port) => { server.on("PUSH", ({ data, socket }) => { socket.write("+OK\r\n"); const { jobtype, args, custom, retry } = data; @@ -302,7 +304,7 @@ test("#job: push sends job specification to server", async (t) => { }); test("#job: push resolves with the jid", async (t) => { - return mocked(async (server, port) => { + await mocked(async (server, port) => { server.on("PUSH", ({ data, socket }) => { socket.write("+OK\r\n"); }); diff --git a/src/__tests__/connection.test.ts b/src/__tests__/connection.test.ts index 90cae18..6ebd4f4 100644 --- a/src/__tests__/connection.test.ts +++ b/src/__tests__/connection.test.ts @@ -1,31 +1,31 @@ -import test from 'ava'; +import test from "ava"; import Connection from "../connection"; import { mocked, registerCleaner } from "./_helper"; registerCleaner(test); -test("#open: resolves after HI", (t) => { - return mocked(async (server, port) => { +test("#open: resolves after HI", async (t) => { + await mocked(async (server, port) => { let acc = ""; server.on("HI", () => (acc += "A")); const conn = new Connection(port); - const resolved = await conn.open(); + await conn.open(); acc += "B"; t.is(acc, "AB"); }); }); -test("#open: resolves with the server greeting", (t) => { - return mocked(async (_, port) => { +test("#open: resolves with the server greeting", async (t) => { + await mocked(async (_, port) => { const conn = new Connection(port); const greeting = await conn.open(); - t.deepEqual(greeting, { v: 2, s: 'abc', i: 3 }); + t.deepEqual(greeting, { v: 2, s: "abc", i: 3 }); }); }); -test("#close: connects after disconnect", (t) => { - return mocked(async (server, port) => { +test("#close: connects after disconnect", async (t) => { + await mocked(async (server, port) => { let acc = ""; server.on("connection", () => (acc += "Co")); const conn = new Connection(port); @@ -40,8 +40,8 @@ test("#close: connects after disconnect", (t) => { }); }); -test("#close: emits close", (t) => { - return mocked(async (_, port) => { +test("#close: emits close", async (t) => { + await mocked(async (_, port) => { const conn = new Connection(port); conn.on("close", () => t.pass()); await conn.open(); @@ -81,8 +81,8 @@ test.cb("#open: emits error when connection fails to connect", (t) => { .then(); }); -test("#send: resolves with server response", (t) => { - return mocked(async (_, port) => { +test("#send: resolves with server response", async (t) => { + await mocked(async (_, port) => { const conn = new Connection(port); await conn.open(); const resp = await conn.send(["HELLO", '{ "v": 2, "s": "abc", "i": 3 }']); @@ -90,8 +90,8 @@ test("#send: resolves with server response", (t) => { }); }); -test("#sendWithAssert: throws when response does not match assertion", (t) => { - return mocked(async (_, port) => { +test("#sendWithAssert: throws when response does not match assertion", async (t) => { + await mocked(async (_, port) => { const conn = new Connection(port); await conn.open(); return t.throwsAsync( @@ -104,27 +104,31 @@ test("#sendWithAssert: throws when response does not match assertion", (t) => { }); }); -test("#sendWithAssert: does not throw when response matches assertion", (t) => { - return mocked(async (_, port) => { +test("#sendWithAssert: does not throw when response matches assertion", async (t) => { + await mocked(async (_, port) => { const conn = new Connection(port); await conn.open(); - return t.notThrowsAsync(conn.sendWithAssert(["HELLO", '{ "v": 2, "s": "abc", "i": 3 }'], "OK")); + return t.notThrowsAsync( + conn.sendWithAssert(["HELLO", '{ "v": 2, "s": "abc", "i": 3 }'], "OK") + ); }); }); -test("#send: throws when the server responds with error", (t) => { - return mocked(async (server, port) => { +test("#send: throws when the server responds with error", async (t) => { + await mocked(async (server, port) => { server.on("INFO", ({ socket }) => { socket.write("-ERR Something is wrong\r\n"); }); const conn = new Connection(port); await conn.open(); - return t.throwsAsync(conn.send(["INFO"]), { message: /something is wrong/i }); + return t.throwsAsync(conn.send(["INFO"]), { + message: /something is wrong/i, + }); }); }); -test("#send: emits timeout when exceeds deadline", (t) => { - return mocked(async (server, port) => { +test("#send: emits timeout when exceeds deadline", async (t) => { + await mocked(async (server, port) => { let acc = ""; server.on("INFO", ({ socket }) => { setTimeout(() => mocked.ok()({ socket }), 301); diff --git a/src/__tests__/faktory.test.ts b/src/__tests__/faktory.test.ts index 84f1cba..dfae8b8 100644 --- a/src/__tests__/faktory.test.ts +++ b/src/__tests__/faktory.test.ts @@ -1,11 +1,11 @@ import test from "ava"; -import create from '../faktory'; -import Client from '../client'; -import Worker from '../worker'; +import create from "../faktory"; +import Client from "../client"; +import Worker from "../worker"; import { sleep, mocked, registerCleaner } from "./_helper"; -// registerCleaner(test); +registerCleaner(test); test("#register: returns self", (t) => { const faktory = create(); @@ -59,21 +59,23 @@ test(".connect() resolves a client", async (t) => { t.truthy(client instanceof Client); }); -test(".work() creates a worker, runs it and resolve the worker", async (t) => { - t.plan(1); +test(".work() creates a worker, runs, then resolves the worker", async (t) => { + t.plan(3); await mocked(async (server, port) => { server .on("BEAT", ({ socket }) => { socket.write("+OK\r\n"); + t.true(true); }) .on("FETCH", async ({ socket }) => { await sleep(10); + t.true(true); socket.write("$-1\r\n"); }); const faktory = create(); const worker = await faktory.work({ port, concurrency: 1 }); - t.truthy(worker.heartbeat, "worker not started (no heartbeat)"); + t.true(worker instanceof Worker); await worker.stop(); }); diff --git a/src/__tests__/hash.test.ts b/src/__tests__/hash.test.ts index 156aac7..b1b59b8 100644 --- a/src/__tests__/hash.test.ts +++ b/src/__tests__/hash.test.ts @@ -1,6 +1,7 @@ -import test from 'ava'; -import { createHash } from 'crypto'; -import hash from '../hash'; +import test from "ava"; + +import { createHash } from "crypto"; +import hash from "../hash"; test("client builds a hex pwdhash with salt", (t) => { const iterations = 10; diff --git a/src/__tests__/job.test.ts b/src/__tests__/job.test.ts index 67ad314..ba5633c 100644 --- a/src/__tests__/job.test.ts +++ b/src/__tests__/job.test.ts @@ -1,5 +1,5 @@ import test from "ava"; -import Job from '../job'; +import Job from "../job"; import Client from "../client"; test(".jid: generates a uuid jid", (t) => { diff --git a/src/__tests__/middleware.test.ts b/src/__tests__/middleware.test.ts index fd294a6..fd8916b 100644 --- a/src/__tests__/middleware.test.ts +++ b/src/__tests__/middleware.test.ts @@ -1,8 +1,9 @@ import test from "ava"; -import Worker from '../worker'; +import Worker from "../worker"; import faktoryControlCreator from "../faktory"; import { sleep, push, registerCleaner } from "./_helper"; +import { MiddlewareContext } from "../types"; registerCleaner(test); @@ -52,12 +53,12 @@ test("invokes middleware in order", async (t) => { }); await new Promise((resolve) => { - worker.registry[jobtype] = async () => { + worker.register(jobtype, async () => { recorder.push("run 1"); await sleep(1); recorder.push("run 2"); resolve(); - }; + }); worker.work(); }); @@ -83,15 +84,19 @@ test(".use() adds middleware to the stack", (t) => { ); }); +type MyAppContext = { + memo: string[]; +} & MiddlewareContext; + test("middleware context is passed to job thunk", async (t) => { const { queue, jobtype } = await push({ args: [1] }); const control = faktoryControlCreator(); - control.use((ctx, next) => { + control.use((ctx: MyAppContext, next) => { ctx.memo = ["hello"]; return next(); }); - control.use((ctx, next) => { + control.use((ctx: MyAppContext, next) => { ctx.memo.push("world"); return next(); }); diff --git a/src/__tests__/mutation.test.ts b/src/__tests__/mutation.test.ts index 83a74bc..871cba5 100644 --- a/src/__tests__/mutation.test.ts +++ b/src/__tests__/mutation.test.ts @@ -1,7 +1,7 @@ import test from "ava"; import Client from "../client"; -import Mutation from "../mutation"; +import Mutation, { SCHEDULED, RETRIES, DEAD } from "../mutation"; import { mocked, registerCleaner } from "./_helper"; registerCleaner(test); @@ -87,13 +87,13 @@ test("integration: #requeue moves retries to queue", async (t) => { test("#clear clears retries", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "clear", - target: "retries", + target: RETRIES, filter: { jobtype: "clearsRetries", jids: ["123"], @@ -113,13 +113,13 @@ test("#clear clears retries", async (t) => { test("#clear clears scheduled jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "clear", - target: "scheduled", + target: SCHEDULED, filter: { jobtype: "clearsScheduled", jids: ["123"], @@ -139,13 +139,13 @@ test("#clear clears scheduled jobs", async (t) => { test("#clear clears dead jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "clear", - target: "dead", + target: DEAD, filter: { jobtype: "clearsDead", jids: ["123"], @@ -165,13 +165,13 @@ test("#clear clears dead jobs", async (t) => { test("#kill kills retries", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "kill", - target: "retries", + target: RETRIES, filter: { jobtype: "killsRetries", jids: ["123"], @@ -191,13 +191,13 @@ test("#kill kills retries", async (t) => { test("#kill kills scheduled jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "kill", - target: "scheduled", + target: SCHEDULED, filter: { jobtype: "killsScheduled", jids: ["123"], @@ -217,13 +217,13 @@ test("#kill kills scheduled jobs", async (t) => { test("#kill kills dead jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "kill", - target: "dead", + target: DEAD, filter: { jobtype: "killsDead", jids: ["123"], @@ -243,13 +243,13 @@ test("#kill kills dead jobs", async (t) => { test("#discard discards retries", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "discard", - target: "retries", + target: RETRIES, filter: { jobtype: "discardsRetries", jids: ["123"], @@ -269,13 +269,13 @@ test("#discard discards retries", async (t) => { test("#discard discards scheduled jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "discard", - target: "scheduled", + target: SCHEDULED, filter: { jobtype: "discardsScheduled", jids: ["123"], @@ -295,13 +295,13 @@ test("#discard discards scheduled jobs", async (t) => { test("#discard discards dead jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "discard", - target: "dead", + target: DEAD, filter: { jobtype: "discardsDead", jids: ["123"], @@ -321,13 +321,13 @@ test("#discard discards dead jobs", async (t) => { test("#requeue requeues retries", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "requeue", - target: "retries", + target: RETRIES, filter: { jobtype: "requeuesRetries", jids: ["123"], @@ -347,13 +347,13 @@ test("#requeue requeues retries", async (t) => { test("#requeue requeues scheduled jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "requeue", - target: "scheduled", + target: SCHEDULED, filter: { jobtype: "requeuesScheduled", jids: ["123"], @@ -373,13 +373,13 @@ test("#requeue requeues scheduled jobs", async (t) => { test("#requeue requeues dead jobs", async (t) => { t.plan(1); - return mocked(async (server, port) => { + await mocked(async (server, port) => { const client = new Client({ port }); server.on("MUTATE", ({ data, socket }) => { t.deepEqual(data, { cmd: "requeue", - target: "dead", + target: DEAD, filter: { jobtype: "requeuesDead", jids: ["123"], @@ -398,20 +398,26 @@ test("#requeue requeues dead jobs", async (t) => { }); test("#matching disallows nonstrings", (t) => { - t.throws(() => { - const mutation = new Mutation(new Client()); - // @ts-ignore - mutation.matching(new RegExp("something")); - }, { message: /redis SCAN/ }); + t.throws( + () => { + const mutation = new Mutation(new Client()); + // @ts-ignore + mutation.matching(new RegExp("something")); + }, + { message: /redis SCAN/ } + ); }); test("#ofType disallows nonstring argument", (t) => { - t.throws(() => { - const mutation = new Mutation(new Client()); - const MyJob = () => {}; - // @ts-ignore - mutation.ofType(MyJob); - }, { - message: /must be a string/i - }); + t.throws( + () => { + const mutation = new Mutation(new Client()); + const MyJob = () => {}; + // @ts-ignore + mutation.ofType(MyJob); + }, + { + message: /must be a string/i, + } + ); }); diff --git a/src/__tests__/parser.test.ts b/src/__tests__/parser.test.ts index b2b56c9..d7f1576 100644 --- a/src/__tests__/parser.test.ts +++ b/src/__tests__/parser.test.ts @@ -1,5 +1,6 @@ import test from "ava"; -import Parser from '../parser'; + +import Parser from "../parser"; test.cb("parses HI", (t) => { const parser = new Parser(); diff --git a/src/__tests__/signals.test.ts b/src/__tests__/signals.test.ts index b2a2390..acd4b24 100644 --- a/src/__tests__/signals.test.ts +++ b/src/__tests__/signals.test.ts @@ -1,6 +1,6 @@ import test from "ava"; -import { Worker } from '../faktory'; +import { Worker } from "../faktory"; import { sleep, push, mocked, registerCleaner } from "./_helper"; registerCleaner(test); diff --git a/src/__tests__/work.test.ts b/src/__tests__/work.test.ts index 1137071..369df6c 100644 --- a/src/__tests__/work.test.ts +++ b/src/__tests__/work.test.ts @@ -8,7 +8,7 @@ const concurrency = 1; registerCleaner(test); -function create(options: WorkerOptions = {}) { +function create(options: WorkerOptions = {}): Worker { return new Worker(Object.assign({ concurrency }, options)); } @@ -39,7 +39,7 @@ test("awaits async jobfns", async (t) => { const worker = create({ queues: [queue], registry: { - [jobtype]: async (...args: any[]) => { + [jobtype]: async (...args: unknown[]) => { await sleep(1); t.deepEqual(args, [1, 2, "three"], "args do not match"); resolve(); @@ -59,7 +59,7 @@ test("handles sync jobfn and sync thunk", async (t) => { const worker = create({ queues: [queue], registry: { - [jobtype]: (...args: any[]) => ({ job }: MiddlewareContext) => { + [jobtype]: (...args) => ({ job }: MiddlewareContext) => { t.is(job.jid, jid, "jid does not match"); t.deepEqual(args, [1, 2, "three"], "args do not match"); resolve(); @@ -95,20 +95,20 @@ test("handles sync jobfn and async (thunk)", async (t) => { test("handles async jobfn and sync thunk", async (t) => { const args = [1, 2, "three"]; const { queue, jobtype, jid } = await push({ args }); - - await new Promise((resolve) => { - const worker = create({ - queues: [queue], - registry: { - [jobtype]: async (...args) => ({ job }: MiddlewareContext) => { - t.is(job.jid, jid, "jid does not match"); - t.deepEqual(args, [1, 2, "three"], "args do not match"); - resolve(); - }, - }, - }); - - worker.work(); + const worker = create({ queues: [queue] }); + + await new Promise(async (resolve) => { + worker.register( + jobtype, + async (...args) => ({ job }: MiddlewareContext) => { + t.is(job.jid, jid, "jid does not match"); + t.deepEqual(args, [1, 2, "three"], "args do not match"); + resolve(); + } + ); + + await worker.work(); + await worker.stop(); }); }); @@ -120,7 +120,7 @@ test("handles async jobfn and async thunk", async (t) => { const worker = create({ queues: [queue], registry: { - [jobtype]: async (...args: any[]) => async ({ job }: MiddlewareContext) => { + [jobtype]: async (...args) => async ({ job }: MiddlewareContext) => { await sleep(1); t.is(job.jid, jid, "jid does not match"); t.deepEqual(args, [1, 2, "three"], "args do not match"); @@ -134,7 +134,7 @@ test("handles async jobfn and async thunk", async (t) => { }); test(".handle() FAILs and throws when no job is registered", async (t) => { - const job = { jid: "123", jobtype: "Unknown", args: [], queue: 'default' }; + const job = { jid: "123", jobtype: "Unknown", args: [], queue: "default" }; await mocked(async (server, port) => { let worker: Worker; @@ -156,7 +156,7 @@ test(".handle() FAILs and throws when no job is registered", async (t) => { test(".handle() FAILs and throws when the job throws (sync) during execution", async (t) => { const jid = "123"; const jobtype = "failingjob"; - const queue = 'default'; + const queue = "default"; const job = { jid, jobtype, args: [], queue }; await mocked(async (server, port) => { let worker: Worker; @@ -188,7 +188,7 @@ test(".handle() FAILs and throws when the job throws (sync) during execution", a test(".handle() FAILs and throws when the job rejects (async) during execution", async (t) => { const jid = "123"; const jobtype = "failingjob"; - const queue = 'default'; + const queue = "default"; const job = { jid, jobtype, args: [], queue }; await mocked(async (server, port) => { let worker: Worker; @@ -220,7 +220,7 @@ test(".handle() FAILs and throws when the job rejects (async) during execution", test(".handle() FAILs when the job returns a rejected promise with no error", async (t) => { const jid = "123"; const jobtype = "failingjob"; - const queue = 'default'; + const queue = "default"; const job = { jid, jobtype, args: [], queue }; await mocked(async (server, port) => { let worker: Worker; diff --git a/src/__tests__/worker.test.ts b/src/__tests__/worker.test.ts index f8debb4..c5212eb 100644 --- a/src/__tests__/worker.test.ts +++ b/src/__tests__/worker.test.ts @@ -1,7 +1,7 @@ import test from "ava"; import Worker from "../worker"; -import { mocked, registerCleaner } from './_helper'; +import { mocked, registerCleaner } from "./_helper"; registerCleaner(test); @@ -40,11 +40,29 @@ test("passes the password to the client", (t) => { test("passes poolSize option to Client", (t) => { const worker = new Worker({ poolSize: 8 }); - t.is(worker.client.pool._config.max, 8); + t.is(worker.client.pool.size, 8); +}); + +test.only("allows registering job functions", async (t) => { + await mocked(async (server, port) => { + server + .on("BEAT", mocked.beat()) + .on("ACK", mocked.ok()) + .on( + "FETCH", + mocked.fetch({ jid: "123", jobtype: "test", args: [], queue: "defaut" }) + ); + const worker = new Worker({ concurrency: 1, port }); + + worker.register("test", () => t.pass()); + + await worker.work(); + await worker.stop(); + }); }); test("hearbeats", async (t) => { - return mocked(async (server, port) => { + await mocked(async (server, port) => { let worker: Worker; let called = 0; diff --git a/src/cli.ts b/src/cli.ts index 7186020..238d501 100755 --- a/src/cli.ts +++ b/src/cli.ts @@ -1,34 +1,43 @@ #!/usr/bin/env node -const program = require('commander'); -const { version } = require('../package.json'); +// eslint-disable-next-line @typescript-eslint/no-var-requires +const program = require("commander"); +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { version } = require("../package.json"); function collect(val: string, memo: Array) { return memo.concat(val); } -function collectSplit(val: string, memo: Array) { - return memo.concat(val.split(',')); +function collectSplit(val: string, memo: Array) { + return memo.concat(val.split(",")); } module.exports = program .version(`faktory-worker ${version}`) - .usage('[options]') - .description(` + .usage("[options]") + .description( + ` ___ __ __ __ / _/__ _/ /__/ /____ ______ __ _ _____ ____/ /_____ ____ / _/ _ \`/ '_/ __/ _ \\/ __/ // / | |/|/ / _ \\/ __/ '_/ -_) __/ /_/ \\_,_/_/\\_\\\\__/\\___/_/ \\_, / |__,__/\\___/_/ /_/\\_\\\\__/_/ /___/ - `) - .option('-q, --queue ', 'queues to process with optional weights', collectSplit, []) - .option('-c, --concurrency ', 'number of concurrent workers', parseInt) - .option('-t, --timeout ', 'shutdown timeout', parseInt) + ` + ) + .option( + "-q, --queue ", + "queues to process with optional weights", + collectSplit, + [] + ) + .option("-c, --concurrency ", "number of concurrent workers", parseInt) + .option("-t, --timeout ", "shutdown timeout", parseInt) // .option('-e, --environment ', 'application environment') - .option('-l, --label