diff --git a/lib/PTask.ts b/lib/PTask.ts index 06a2762..cb8dd8c 100644 --- a/lib/PTask.ts +++ b/lib/PTask.ts @@ -86,13 +86,18 @@ export class PTask { } public async run(): Promise { - const newRes = await ProcessingPriorityQueue.getInstance( - this.queueName - ).enqueue(this); - const result = this.resultsMerge(this.resSoFar, newRes); - this.removeSelfFromQueue(); - this._status = "completed"; - return result; + try { + const newRes = await ProcessingPriorityQueue.getInstance( + this.queueName + ).enqueue(this); + + return this.resultsMerge(this.resSoFar, newRes); + } finally { + this.removeSelfFromQueue(); + if (this._status === 'running') { + this._status = "completed"; + } + } } public async pause(): Promise { diff --git a/tests/unit/PTask.test.ts b/tests/unit/PTask.test.ts index b705002..968fac2 100644 --- a/tests/unit/PTask.test.ts +++ b/tests/unit/PTask.test.ts @@ -670,7 +670,7 @@ describe("PriorityTask", () => { }); }); - it("should be removed from the queue when complete", (done) => { + it("should be removed from the queue when complete successfully", (done) => { const ptask = new PTask({ args: undefined, priority: 1, @@ -685,6 +685,23 @@ describe("PriorityTask", () => { }); expect(PTask.getAllPTasks("krombopulos").length).toBe(1); }); + + it('should be removed from the queue when complete unsuccessfully', (done) => { + const ptask = new PTask({ + args: undefined, + priority: 1, + onRun: async () => + await new Promise((resolve, reject) => setTimeout(reject, 500)), + queueName: "krombopulos-2", + }); + + ptask.run() + .catch(() => { + expect(PTask.getAllPTasks("krombopulos-2").length).toBe(0); + done(); + }); + expect(PTask.getAllPTasks("krombopulos-2").length).toBe(1); + }) it("should not schedule more than concurrencyLimit items", (done) => { const CONCURRENCY_LIMIT = 2;