Skip to content

Commit f55503c

Browse files
committedApr 13, 2018
Added Job lifecycle callbacks functionality.
1 parent 8badc22 commit f55503c

File tree

5 files changed

+779
-17
lines changed

5 files changed

+779
-17
lines changed
 

‎Models/Queue.js

+29-4
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,9 @@ export class Queue {
308308

309309
/**
310310
*
311-
* Execute a job.
311+
* Process a job.
312+
*
313+
* Job lifecycle callbacks are called as appropriate throughout the job processing lifecycle.
312314
*
313315
* Job is deleted upon successful completion.
314316
*
@@ -321,25 +323,39 @@ export class Queue {
321323
*/
322324
async processJob(job) {
323325

326+
// Data must be cloned off the realm job object for several lifecycle callbacks to work correctly.
327+
// This is because realm job is deleted before some callbacks are called if job processed successfully.
328+
// More info: https://github.com/billmalarky/react-native-queue/issues/2#issuecomment-361418965
329+
const jobName = job.name;
330+
const jobId = job.id;
331+
const jobPayload = JSON.parse(job.payload);
332+
333+
// Fire onStart job lifecycle callback
334+
this.worker.executeJobLifecycleCallback('onStart', jobName, jobId, jobPayload);
335+
324336
try {
325337

326338
await this.worker.executeJob(job);
327339

328-
// On job completion, remove job
340+
// On successful job completion, remove job
329341
this.realm.write(() => {
330342

331343
this.realm.delete(job);
332344

333345
});
334346

347+
// Job has processed successfully, fire onSuccess and onComplete job lifecycle callbacks.
348+
this.worker.executeJobLifecycleCallback('onSuccess', jobName, jobId, jobPayload);
349+
this.worker.executeJobLifecycleCallback('onComplete', jobName, jobId, jobPayload);
350+
335351
} catch (error) {
336352

337353
// Handle job failure logic, including retries.
354+
let jobData = JSON.parse(job.data);
355+
338356
this.realm.write(() => {
339357

340358
// Increment failed attempts number
341-
let jobData = JSON.parse(job.data);
342-
343359
if (!jobData.failedAttempts) {
344360
jobData.failedAttempts = 1;
345361
} else {
@@ -365,6 +381,15 @@ export class Queue {
365381

366382
});
367383

384+
// Execute job onFailure lifecycle callback.
385+
this.worker.executeJobLifecycleCallback('onFailure', jobName, jobId, jobPayload);
386+
387+
// If job has failed all attempts execute job onFailed and onComplete lifecycle callbacks.
388+
if (jobData.failedAttempts >= jobData.attempts) {
389+
this.worker.executeJobLifecycleCallback('onFailed', jobName, jobId, jobPayload);
390+
this.worker.executeJobLifecycleCallback('onComplete', jobName, jobId, jobPayload);
391+
}
392+
368393
}
369394

370395
}

‎Models/Worker.js

+37-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ export default class Worker {
4141

4242
// Attach options to worker
4343
worker.options = {
44-
concurrency: options.concurrency || 1
44+
concurrency: options.concurrency || 1,
45+
onStart: options.onStart || null,
46+
onSuccess: options.onSuccess || null,
47+
onFailure: options.onFailure || null,
48+
onFailed: options.onFailed || null,
49+
onComplete: options.onComplete || null
4550
};
4651

4752
Worker.workers[jobName] = worker;
@@ -119,4 +124,35 @@ export default class Worker {
119124

120125
}
121126

127+
/**
128+
*
129+
* Execute an asynchronous job lifecycle callback associated with related worker.
130+
*
131+
* @param callbackName {string} - Job lifecycle callback name.
132+
* @param jobName {string} - Name associated with jobs assigned to related worker.
133+
* @param jobId {string} - Unique id associated with job.
134+
* @param jobPayload {object} - Data payload associated with job.
135+
*/
136+
async executeJobLifecycleCallback(callbackName, jobName, jobId, jobPayload) {
137+
138+
// Validate callback name
139+
const validCallbacks = ['onStart', 'onSuccess', 'onFailure', 'onFailed', 'onComplete'];
140+
if (!validCallbacks.includes(callbackName)) {
141+
throw new Error('Invalid job lifecycle callback name.');
142+
}
143+
144+
// Fire job lifecycle callback if set.
145+
// Uses a try catch statement to gracefully degrade errors in production.
146+
if (Worker.workers[jobName].options[callbackName]) {
147+
148+
try {
149+
await Worker.workers[jobName].options[callbackName](jobId, jobPayload);
150+
} catch (error) {
151+
console.error(error); // eslint-disable-line no-console
152+
}
153+
154+
}
155+
156+
}
157+
122158
}

‎README.md

+51-7
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ A React Native at-least-once priority job queue / task queue backed by persisten
1717
* [Example Use Cases](#example-use-cases)
1818
* [Installation](#installation)
1919
* [Basic Usage](#basic-usage)
20-
* [Options](#options)
20+
* [Options and Job Lifecycle Callbacks](#options-and-job-lifecycle-callbacks)
2121
* [Testing with Jest](#testing-with-jest)
2222
* [Caveats](#caveats)
2323
* [Advanced Usage Examples](#advanced-usage-examples)
@@ -30,7 +30,7 @@ A React Native at-least-once priority job queue / task queue backed by persisten
3030
* **Simple API:** Set up job workers and begin creating your jobs in minutes with just two basic API calls
3131
* queue.addWorker(name, workerFunction, options = {})
3232
* queue.createJob(name, payload = {}, options = {}, startQueue = true)
33-
* **Powerful options:** Easily modify default functionality. Set job timeouts, number of retry attempts, priority, and worker concurrency with an options object. Start queue processing with a lifespan to easily meet OS background task time limits.
33+
* **Powerful options:** Easily modify default functionality. Set job timeouts, number of retry attempts, priority, job lifecycle callbacks, and worker concurrency with an options object. Start queue processing with a lifespan to easily meet OS background task time limits.
3434
* **Persistent Jobs:** Jobs are persisted with Realm. Because jobs persist, you can easily continue to process jobs across app restarts or in OS background tasks until completed or failed (or app is uninstalled).
3535
* **Powerful Integrations:** React Native Queue was designed to play well with others. The queue quickly integrates with a variety of OS background task and Worker packages so processing your jobs in a background service or dedicated thread have never been easier.
3636

@@ -147,19 +147,63 @@ console.log('The above jobs are processing in the background of app now.');
147147

148148
```
149149

150-
## Options
150+
## Options and Job Lifecycle Callbacks
151151

152-
#### Worker Options
152+
#### Worker Options (includes async job lifecycle callbacks)
153153

154-
queue.addWorker() accepts an options object in order to tweak standard functionality.
154+
queue.addWorker() accepts an options object in order to tweak standard functionality and allow you to hook into asynchronous job lifecycle callbacks.
155+
156+
**IMPORTANT: Job Lifecycle callbacks are called asynchronously.** They do not block job processing or each other. Don't put logic in onStart that you expect to be completed before the actual job process begins executing. Don't put logic in onFailure you expect to be completed before onFailed is called. You can, of course, assume that the job process has completed (or failed) before onSuccess, onFailure, onFailed, or onComplete are asynchonrously called.
155157

156158
```js
157159

158-
queue.addWorker('job-name-here', (id, payload) => { console.log(id); }, {
160+
queue.addWorker('job-name-here', async (id, payload) => { console.log(id); }, {
159161

160162
// Set max number of jobs for this worker to process concurrently.
161163
// Defaults to 1.
162-
concurrency: 5
164+
concurrency: 5,
165+
166+
// JOB LIFECYCLE CALLBACKS
167+
168+
// onStart job callback handler is fired when a job begins processing.
169+
//
170+
// IMPORTANT: Job lifecycle callbacks are executed asynchronously and do not block job processing
171+
// (even if the callback returns a promise it will not be "awaited" on).
172+
// As such, do not place any logic in onStart that your actual job worker function will depend on,
173+
// this type of logic should of course go inside the job worker function itself.
174+
onStart: async (id, payload) => {
175+
176+
console.log('Job "job-name-here" with id ' + id + ' has started processing.');
177+
178+
},
179+
180+
// onSuccess job callback handler is fired after a job successfully completes processing.
181+
onSuccess: async (id, payload) => {
182+
183+
console.log('Job "job-name-here" with id ' + id + ' was successful.');
184+
185+
},
186+
187+
// onFailure job callback handler is fired after each time a job fails (onFailed also fires if job has reached max number of attempts).
188+
onFailure: async (id, payload) => {
189+
190+
console.log('Job "job-name-here" with id ' + id + ' had an attempt end in failure.');
191+
192+
},
193+
194+
// onFailed job callback handler is fired if job fails enough times to reach max number of attempts.
195+
onFailed: async (id, payload) => {
196+
197+
console.log('Job "job-name-here" with id ' + id + ' has failed.');
198+
199+
},
200+
201+
// onComplete job callback handler fires after job has completed processing successfully or failed entirely.
202+
onComplete: async (id, payload) => {
203+
204+
console.log('Job "job-name-here" with id ' + id + ' has completed processing.');
205+
206+
}
163207

164208
});
165209

‎tests/Queue.test.js

+497-2
Original file line numberDiff line numberDiff line change
@@ -700,13 +700,23 @@ describe('Models/Queue', function() {
700700
it('#addWorker() and removeWorker() should pass calls through to Worker class', async () => {
701701

702702
const queue = await QueueFactory();
703-
const workerOptions = { concurrency: 4 };
703+
const workerOptions = {
704+
concurrency: 4,
705+
onSuccess: async (id, payload) => {}
706+
};
704707

705708
queue.addWorker('job-name', () => {}, workerOptions);
706709

707710
// first worker is added with default options.
708711
Worker.workers['job-name'].should.be.a.Function();
709-
Worker.workers['job-name'].options.should.deepEqual(workerOptions);
712+
Worker.workers['job-name'].options.should.deepEqual({
713+
concurrency: workerOptions.concurrency,
714+
onStart: null,
715+
onSuccess: workerOptions.onSuccess,
716+
onFailure: null,
717+
onFailed: null,
718+
onComplete: null
719+
});
710720

711721
queue.removeWorker('job-name');
712722

@@ -1581,4 +1591,489 @@ describe('Models/Queue', function() {
15811591

15821592
});
15831593

1594+
////
1595+
//// JOB LIFECYCLE CALLBACK TESTING
1596+
////
1597+
1598+
it('onStart lifecycle callback fires before job begins processing.', async () => {
1599+
1600+
const queue = await QueueFactory();
1601+
queue.flushQueue();
1602+
const jobName = 'job-name';
1603+
let jobProcessed = false;
1604+
let testFailed = false;
1605+
1606+
queue.addWorker(jobName, async (id, payload) => {
1607+
1608+
// Timeout needed because onStart runs async so we need to ensure this function gets
1609+
// executed last.
1610+
await new Promise((resolve) => {
1611+
setTimeout(() => {
1612+
jobProcessed = true;
1613+
resolve();
1614+
}, 0);
1615+
});
1616+
1617+
}, {
1618+
onStart: (id, payload) => {
1619+
1620+
// If onStart runs after job has processed, fail test.
1621+
if (jobProcessed) {
1622+
testFailed = true;
1623+
throw new Error('ERROR: onStart fired after job began processing.')
1624+
}
1625+
1626+
}
1627+
});
1628+
1629+
// Create a job
1630+
queue.createJob(jobName, { random: 'this is 1st random data' }, {}, false);
1631+
1632+
jobProcessed.should.equal(false);
1633+
testFailed.should.equal(false);
1634+
await queue.start();
1635+
jobProcessed.should.equal(true);
1636+
testFailed.should.equal(false);
1637+
1638+
});
1639+
1640+
it('onSuccess, onComplete lifecycle callbacks fire after job begins processing.', async () => {
1641+
1642+
const queue = await QueueFactory();
1643+
queue.flushQueue();
1644+
const jobName = 'job-name';
1645+
let jobProcessed = false;
1646+
let testFailed = false;
1647+
let onSuccessFired = false;
1648+
let onCompleteFired = false;
1649+
1650+
queue.addWorker(jobName, async (id, payload) => {
1651+
1652+
// Simulate work
1653+
await new Promise((resolve) => {
1654+
setTimeout(() => {
1655+
jobProcessed = true;
1656+
resolve();
1657+
}, 300);
1658+
});
1659+
1660+
}, {
1661+
onSuccess: (id, payload) => {
1662+
1663+
onSuccessFired = true;
1664+
1665+
// If onSuccess runs before job has processed, fail test.
1666+
if (!jobProcessed) {
1667+
testFailed = true;
1668+
throw new Error('ERROR: onSuccess fired before job began processing.')
1669+
}
1670+
1671+
},
1672+
onComplete: (id, payload) => {
1673+
1674+
onCompleteFired = true;
1675+
1676+
// If onComplete runs before job has processed, fail test.
1677+
if (!jobProcessed) {
1678+
testFailed = true;
1679+
throw new Error('ERROR: onComplete fired before job began processing.')
1680+
}
1681+
1682+
}
1683+
});
1684+
1685+
// Create a job
1686+
queue.createJob(jobName, { random: 'this is 1st random data' }, {}, false);
1687+
1688+
jobProcessed.should.equal(false);
1689+
testFailed.should.equal(false);
1690+
onSuccessFired.should.equal(false);
1691+
onCompleteFired.should.equal(false);
1692+
await queue.start();
1693+
jobProcessed.should.equal(true);
1694+
testFailed.should.equal(false);
1695+
onSuccessFired.should.equal(true);
1696+
onCompleteFired.should.equal(true);
1697+
1698+
});
1699+
1700+
it('onFailure, onFailed lifecycle callbacks fire after job begins processing.', async () => {
1701+
1702+
const queue = await QueueFactory();
1703+
queue.flushQueue();
1704+
const jobName = 'job-name';
1705+
let jobProcessStarted = false;
1706+
let testFailed = false;
1707+
1708+
queue.addWorker(jobName, async (id, payload) => {
1709+
1710+
// Simulate work
1711+
await new Promise((resolve, reject) => {
1712+
setTimeout(() => {
1713+
jobProcessStarted = true;
1714+
reject(new Error('Job failed.'));
1715+
}, 300);
1716+
});
1717+
1718+
}, {
1719+
onFailure: (id, payload) => {
1720+
1721+
// If onFailure runs before job has processed, fail test.
1722+
if (!jobProcessStarted) {
1723+
testFailed = true;
1724+
throw new Error('ERROR: onFailure fired before job began processing.')
1725+
}
1726+
1727+
},
1728+
onFailed: (id, payload) => {
1729+
1730+
// If onFailed runs before job has processed, fail test.
1731+
if (!jobProcessStarted) {
1732+
testFailed = true;
1733+
throw new Error('ERROR: onFailed fired before job began processing.')
1734+
}
1735+
1736+
}
1737+
});
1738+
1739+
// Create a job
1740+
queue.createJob(jobName, { random: 'this is 1st random data' }, {}, false);
1741+
1742+
jobProcessStarted.should.equal(false);
1743+
testFailed.should.equal(false);
1744+
await queue.start();
1745+
jobProcessStarted.should.equal(true);
1746+
testFailed.should.equal(false);
1747+
1748+
});
1749+
1750+
it('onFailure, onFailed lifecycle callbacks work as expected.', async () => {
1751+
1752+
const queue = await QueueFactory();
1753+
queue.flushQueue();
1754+
const jobName = 'job-name';
1755+
let jobAttemptCounter = 0;
1756+
let onFailureFiredCounter = 0;
1757+
let onFailedFiredCounter = 0;
1758+
1759+
queue.addWorker(jobName, async (id, payload) => {
1760+
1761+
// Simulate work
1762+
await new Promise((resolve, reject) => {
1763+
setTimeout(() => {
1764+
jobAttemptCounter++;
1765+
reject(new Error('Job failed.'));
1766+
}, 0);
1767+
});
1768+
1769+
}, {
1770+
1771+
onFailure: (id, payload) => {
1772+
1773+
onFailureFiredCounter++;
1774+
1775+
},
1776+
onFailed: (id, payload) => {
1777+
1778+
onFailedFiredCounter++;
1779+
1780+
}
1781+
});
1782+
1783+
const attempts = 3;
1784+
1785+
// Create a job
1786+
queue.createJob(jobName, { random: 'this is 1st random data' }, {
1787+
attempts
1788+
}, false);
1789+
1790+
jobAttemptCounter.should.equal(0);
1791+
await queue.start();
1792+
onFailureFiredCounter.should.equal(attempts);
1793+
onFailedFiredCounter.should.equal(1);
1794+
jobAttemptCounter.should.equal(attempts);
1795+
1796+
});
1797+
1798+
it('onComplete fires only once on job with multiple attempts that ends in success.', async () => {
1799+
1800+
const queue = await QueueFactory();
1801+
queue.flushQueue();
1802+
const jobName = 'job-name';
1803+
let jobAttemptCounter = 0;
1804+
let onFailureFiredCounter = 0;
1805+
let onFailedFiredCounter = 0;
1806+
let onCompleteFiredCounter = 0;
1807+
const attempts = 3;
1808+
1809+
queue.addWorker(jobName, async (id, payload) => {
1810+
1811+
jobAttemptCounter++;
1812+
1813+
// Keep failing attempts until last attempt then success.
1814+
if (jobAttemptCounter < attempts) {
1815+
1816+
// Simulate work that fails
1817+
await new Promise((resolve, reject) => {
1818+
setTimeout(() => {
1819+
reject(new Error('Job failed.'));
1820+
}, 0);
1821+
});
1822+
1823+
} else {
1824+
1825+
// Simulate work that succeeds
1826+
await new Promise((resolve, reject) => {
1827+
setTimeout(() => {
1828+
resolve();
1829+
}, 0);
1830+
});
1831+
1832+
}
1833+
1834+
}, {
1835+
1836+
onFailure: (id, payload) => {
1837+
1838+
onFailureFiredCounter++;
1839+
1840+
},
1841+
onFailed: (id, payload) => {
1842+
1843+
onFailedFiredCounter++;
1844+
1845+
},
1846+
onComplete: (id, payload) => {
1847+
1848+
onCompleteFiredCounter++;
1849+
1850+
}
1851+
});
1852+
1853+
// Create a job
1854+
queue.createJob(jobName, { random: 'this is 1st random data succes' }, {
1855+
attempts
1856+
}, false);
1857+
1858+
jobAttemptCounter.should.equal(0);
1859+
await queue.start();
1860+
onFailureFiredCounter.should.equal(attempts - 1);
1861+
onFailedFiredCounter.should.equal(0);
1862+
jobAttemptCounter.should.equal(attempts);
1863+
onCompleteFiredCounter.should.equal(1);
1864+
1865+
});
1866+
1867+
it('onComplete fires only once on job with multiple attempts that ends in failure.', async () => {
1868+
1869+
const queue = await QueueFactory();
1870+
queue.flushQueue();
1871+
const jobName = 'job-name';
1872+
let jobAttemptCounter = 0;
1873+
let onFailureFiredCounter = 0;
1874+
let onFailedFiredCounter = 0;
1875+
let onCompleteFiredCounter = 0;
1876+
const attempts = 3;
1877+
1878+
queue.addWorker(jobName, async (id, payload) => {
1879+
1880+
jobAttemptCounter++;
1881+
1882+
// Simulate work that fails
1883+
await new Promise((resolve, reject) => {
1884+
setTimeout(() => {
1885+
reject(new Error('Job failed.'));
1886+
}, 0);
1887+
});
1888+
1889+
}, {
1890+
1891+
onFailure: (id, payload) => {
1892+
1893+
onFailureFiredCounter++;
1894+
1895+
},
1896+
onFailed: (id, payload) => {
1897+
1898+
onFailedFiredCounter++;
1899+
1900+
},
1901+
onComplete: (id, payload) => {
1902+
1903+
onCompleteFiredCounter++;
1904+
1905+
}
1906+
});
1907+
1908+
// Create a job
1909+
queue.createJob(jobName, { random: 'this is 1st random data' }, {
1910+
attempts
1911+
}, false);
1912+
1913+
jobAttemptCounter.should.equal(0);
1914+
await queue.start();
1915+
onFailureFiredCounter.should.equal(attempts);
1916+
onFailedFiredCounter.should.equal(1);
1917+
jobAttemptCounter.should.equal(attempts);
1918+
onCompleteFiredCounter.should.equal(1);
1919+
1920+
});
1921+
1922+
it('onStart, onSuccess, onComplete Job lifecycle callbacks do not block job processing.', async () => {
1923+
1924+
const queue = await QueueFactory();
1925+
queue.flushQueue();
1926+
const jobName = 'job-name';
1927+
let workTracker = [];
1928+
let tracker = [];
1929+
1930+
queue.addWorker(jobName, async (id, payload) => {
1931+
1932+
// Simulate work
1933+
await new Promise((resolve) => {
1934+
workTracker.push(payload.random);
1935+
tracker.push('job processed');
1936+
setTimeout(resolve, 0);
1937+
});
1938+
1939+
}, {
1940+
1941+
onStart: async (id, payload) => {
1942+
1943+
// wait a bit
1944+
await new Promise((resolve) => {
1945+
setTimeout(() => {
1946+
tracker.push('onStart completed.');
1947+
}, 1000);
1948+
});
1949+
1950+
},
1951+
onSuccess: async (id, payload) => {
1952+
1953+
// wait a bit
1954+
await new Promise((resolve) => {
1955+
setTimeout(() => {
1956+
tracker.push('onSuccess completed.');
1957+
}, 1000);
1958+
});
1959+
1960+
},
1961+
onComplete: async (id, payload) => {
1962+
1963+
// wait a bit
1964+
await new Promise((resolve) => {
1965+
setTimeout(() => {
1966+
tracker.push('onComplete completed.');
1967+
}, 1000);
1968+
});
1969+
1970+
}
1971+
});
1972+
1973+
// Create a job
1974+
queue.createJob(jobName, { random: 'this is 1st random data' }, {}, false);
1975+
queue.createJob(jobName, { random: 'this is 2nd random data' }, {}, false);
1976+
queue.createJob(jobName, { random: 'this is 3rd random data' }, {}, false);
1977+
queue.createJob(jobName, { random: 'this is 4th random data' }, {}, false);
1978+
queue.createJob(jobName, { random: 'this is 5th random data' }, {}, false);
1979+
1980+
await queue.start();
1981+
1982+
// Ensure all jobs processed.
1983+
workTracker.should.containDeep([
1984+
'this is 1st random data',
1985+
'this is 2nd random data',
1986+
'this is 4th random data',
1987+
'this is 3rd random data',
1988+
'this is 5th random data'
1989+
]);
1990+
1991+
// Since lifecycle callbacks take a second to process,
1992+
// queue should churn through all jobs well before any of the lifecycle
1993+
// callbacks complete.
1994+
const firstFive = tracker.slice(0, 5);
1995+
firstFive.should.deepEqual([
1996+
'job processed',
1997+
'job processed',
1998+
'job processed',
1999+
'job processed',
2000+
'job processed'
2001+
]);
2002+
2003+
});
2004+
2005+
it('onFailure, onFailed Job lifecycle callbacks do not block job processing.', async () => {
2006+
2007+
const queue = await QueueFactory();
2008+
queue.flushQueue();
2009+
const jobName = 'job-name';
2010+
let workTracker = [];
2011+
let tracker = [];
2012+
2013+
queue.addWorker(jobName, async (id, payload) => {
2014+
2015+
// Simulate failure
2016+
await new Promise((resolve, reject) => {
2017+
workTracker.push(payload.random);
2018+
setTimeout(() => {
2019+
tracker.push('job attempted');
2020+
reject(new Error('job failed'));
2021+
}, 0);
2022+
});
2023+
2024+
}, {
2025+
onFailure: async (id, payload) => {
2026+
2027+
// wait a bit
2028+
await new Promise((resolve) => {
2029+
setTimeout(() => {
2030+
tracker.push('onFailure completed.');
2031+
}, 1000);
2032+
});
2033+
2034+
},
2035+
onFailed: async (id, payload) => {
2036+
2037+
// wait a bit
2038+
await new Promise((resolve) => {
2039+
setTimeout(() => {
2040+
tracker.push('onFailed completed.');
2041+
}, 1000);
2042+
});
2043+
2044+
}
2045+
});
2046+
2047+
// Create a job
2048+
queue.createJob(jobName, { random: 'this is 1st random data' }, {}, false);
2049+
queue.createJob(jobName, { random: 'this is 2nd random data' }, {}, false);
2050+
queue.createJob(jobName, { random: 'this is 3rd random data' }, {}, false);
2051+
queue.createJob(jobName, { random: 'this is 4th random data' }, {}, false);
2052+
queue.createJob(jobName, { random: 'this is 5th random data' }, {}, false);
2053+
2054+
await queue.start();
2055+
2056+
// Ensure all jobs started to process (even though they are failed).
2057+
workTracker.should.containDeep([
2058+
'this is 1st random data',
2059+
'this is 2nd random data',
2060+
'this is 4th random data',
2061+
'this is 3rd random data',
2062+
'this is 5th random data'
2063+
]);
2064+
2065+
// Since lifecycle callbacks take a second to process,
2066+
// queue should churn through all jobs well before any of the lifecycle
2067+
// callbacks complete.
2068+
const firstFive = tracker.slice(0, 5);
2069+
firstFive.should.deepEqual([
2070+
'job attempted',
2071+
'job attempted',
2072+
'job attempted',
2073+
'job attempted',
2074+
'job attempted'
2075+
]);
2076+
2077+
});
2078+
15842079
});

‎tests/Worker.test.js

+165-3
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,32 @@ describe('Models/Worker', function() {
3535
worker.addWorker('test-job-one', async () => {});
3636

3737
const workerOptions = {
38-
concurrency: 3
38+
concurrency: 3,
39+
onStart: async (id, payload) => {}
3940
};
4041
worker.addWorker('test-job-two', async () => {}, workerOptions);
4142

4243
// first worker is added with default options.
4344
Worker.workers['test-job-one'].should.be.a.Function();
4445
Worker.workers['test-job-one'].options.should.deepEqual({
45-
concurrency: 1
46+
concurrency: 1,
47+
onStart: null,
48+
onSuccess: null,
49+
onFailure: null,
50+
onFailed: null,
51+
onComplete: null
4652
});
4753

4854
// second worker is added with new concurrency option.
4955
Worker.workers['test-job-two'].should.be.a.Function();
50-
Worker.workers['test-job-two'].options.should.deepEqual(workerOptions);
56+
Worker.workers['test-job-two'].options.should.deepEqual({
57+
concurrency: workerOptions.concurrency,
58+
onStart: workerOptions.onStart,
59+
onSuccess: null,
60+
onFailure: null,
61+
onFailed: null,
62+
onComplete: null
63+
});
5164

5265
});
5366

@@ -195,4 +208,153 @@ describe('Models/Worker', function() {
195208

196209
});
197210

211+
it('#executeJobLifecycleCallback() should execute a job lifecycle method correctly.', async () => {
212+
213+
let onStartCalled = false;
214+
let testPassed = false;
215+
216+
const job = {
217+
id: 'd21dca87-435c-4533-b0af-ed9844e6b827',
218+
name: 'test-job-one',
219+
payload: JSON.stringify({
220+
key: 'value'
221+
}),
222+
data: JSON.stringify({
223+
timeout: 0,
224+
attempts: 1
225+
}),
226+
priority: 0,
227+
active: false,
228+
created: new Date(),
229+
failed: null
230+
};
231+
232+
const worker = new Worker();
233+
234+
worker.addWorker('test-job-one', async () => {}, {
235+
onStart: (id, payload) => {
236+
237+
onStartCalled = true;
238+
239+
// Verify params passed correctly off job and payload JSON has been parsed.
240+
id.should.equal(job.id);
241+
payload.should.deepEqual({
242+
key: 'value'
243+
});
244+
245+
// Explicitly mark test as passed because the assertions
246+
// directly above will be caught in the try/catch statement within
247+
// executeJobLifecycleCallback() if they throw an error. While any thrown errors will be
248+
// output to console, the test will still pass so won't be caught by CI testing.
249+
testPassed = true;
250+
251+
}
252+
});
253+
254+
onStartCalled.should.equal(false);
255+
const payload = JSON.parse(job.payload); // Payload JSON is always parsed by Queue model before passing to executeJobLifecycleCallback();
256+
await worker.executeJobLifecycleCallback('onStart', job.name, job.id, payload);
257+
onStartCalled.should.equal(true);
258+
testPassed.should.equal(true);
259+
260+
});
261+
262+
it('#executeJobLifecycleCallback() should throw an error on invalid job lifecycle name.', async () => {
263+
264+
let onStartCalled = false;
265+
let testPassed = true;
266+
267+
const job = {
268+
id: 'd21dca87-435c-4533-b0af-ed9844e6b827',
269+
name: 'test-job-one',
270+
payload: JSON.stringify({
271+
key: 'value'
272+
}),
273+
data: JSON.stringify({
274+
timeout: 0,
275+
attempts: 1
276+
}),
277+
priority: 0,
278+
active: false,
279+
created: new Date(),
280+
failed: null
281+
};
282+
283+
const worker = new Worker();
284+
285+
worker.addWorker('test-job-one', async () => {}, {
286+
onStart: (id, payload) => {
287+
288+
testPassed = false;
289+
throw new Error('Should not be called.');
290+
291+
}
292+
});
293+
294+
onStartCalled.should.equal(false);
295+
const payload = JSON.parse(job.payload); // Payload JSON is always parsed by Queue model before passing to executeJobLifecycleCallback();
296+
try {
297+
await worker.executeJobLifecycleCallback('onInvalidLifecycleName', job.name, job.id, payload);
298+
} catch (error) {
299+
error.should.deepEqual(new Error('Invalid job lifecycle callback name.'));
300+
}
301+
onStartCalled.should.equal(false);
302+
testPassed.should.equal(true);
303+
304+
});
305+
306+
it('#executeJobLifecycleCallback() job lifecycle callbacks that error out should gracefully degrade to console error.', async () => {
307+
308+
let onStartCalled = false;
309+
let consoleErrorCalled = false;
310+
311+
// Cache console error.
312+
const consoleErrorCache = console.error; // eslint-disable-line no-console
313+
314+
// Overwrite console.error to make sure it gets called on job lifecycle
315+
// callback error and is passed the error object.
316+
console.error = (errorObject) => {
317+
consoleErrorCalled = true;
318+
errorObject.should.deepEqual(new Error('Something failed catastrophically!'));
319+
};
320+
321+
const job = {
322+
id: 'd21dca87-435c-4533-b0af-ed9844e6b827',
323+
name: 'test-job-one',
324+
payload: JSON.stringify({
325+
key: 'value'
326+
}),
327+
data: JSON.stringify({
328+
timeout: 0,
329+
attempts: 1
330+
}),
331+
priority: 0,
332+
active: false,
333+
created: new Date(),
334+
failed: null
335+
};
336+
337+
const worker = new Worker();
338+
339+
worker.addWorker('test-job-one', async () => {}, {
340+
onStart: (id, payload) => {
341+
342+
onStartCalled = true;
343+
throw new Error('Something failed catastrophically!');
344+
345+
}
346+
});
347+
348+
onStartCalled.should.equal(false);
349+
consoleErrorCalled.should.equal(false);
350+
const payload = JSON.parse(job.payload); // Payload JSON is always parsed by Queue model before passing to executeJobLifecycleCallback();
351+
await worker.executeJobLifecycleCallback('onStart', job.name, job.id, payload);
352+
onStartCalled.should.equal(true);
353+
consoleErrorCalled.should.equal(true);
354+
355+
// Re-apply console.error.
356+
console.error = consoleErrorCache;
357+
358+
});
359+
198360
});

0 commit comments

Comments
 (0)
Please sign in to comment.