Skip to content

Commit 8d4f764

Browse files
committed
Added initial package files.
1 parent 3b0cdcb commit 8d4f764

File tree

8 files changed

+1179
-1
lines changed

8 files changed

+1179
-1
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/node_modules
22
/coverage
3-
/.idea
3+
/.idea
4+
/reactNativeQueue.realm*

Models/Queue.js

+254
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/**
2+
*
3+
* Queue Model
4+
*
5+
* Job Realm Schema defined in ../config/Database
6+
*
7+
*/
8+
9+
import Database from '../config/Database';
10+
import uuid from 'react-native-uuid';
11+
import Worker from './Worker';
12+
import promiseReflect from 'promise-reflect';
13+
14+
15+
export class Queue {
16+
17+
constructor() {
18+
this.realm = null;
19+
this.worker = new Worker();
20+
this.status = 'inactive';
21+
}
22+
23+
async init() {
24+
if (this.realm === null) {
25+
this.realm = await Database.getRealmInstance();
26+
}
27+
}
28+
29+
addWorker(jobName, worker, options = {}) {
30+
this.worker.addWorker(jobName, worker, options);
31+
}
32+
33+
removeWorker(jobName) {
34+
this.worker.removeWorker(jobName);
35+
}
36+
37+
createJob(name, payload = {}, options = {}, startQueue = true) {
38+
39+
if (!name) {
40+
throw new Error('Job name must be supplied.');
41+
}
42+
43+
this.realm.write(() => {
44+
45+
this.realm.create('Job', {
46+
id: uuid.v4(),
47+
name,
48+
payload: JSON.stringify(payload),
49+
data: JSON.stringify({
50+
timeout: (options.timeout > 0) ? options.timeout : 0,
51+
attempts: options.attempts || 1
52+
}),
53+
priority: options.priority || 0,
54+
active: false,
55+
created: new Date(),
56+
failed: null
57+
});
58+
59+
});
60+
61+
// Start queue on job creation if it isn't running by default.
62+
if (startQueue && this.status == 'inactive') {
63+
this.start();
64+
}
65+
66+
}
67+
68+
async start() {
69+
70+
// If queue is already running, don't fire up concurrent loop.
71+
if (this.status == 'active') {
72+
return;
73+
}
74+
75+
this.status = 'active';
76+
77+
let concurrentJobs = await this.getConcurrentJobs();
78+
79+
while (this.status == 'active' && concurrentJobs.length) {
80+
81+
// Loop over jobs and process them concurrently.
82+
const processingJobs = concurrentJobs.map( job => {
83+
return this.processJob(job);
84+
});
85+
86+
// Promise Reflect ensures all processingJobs resolve so
87+
// we don't break await early if one of the jobs fails.
88+
await Promise.all(processingJobs.map(promiseReflect));
89+
90+
// Get next batch of jobs.
91+
concurrentJobs = await this.getConcurrentJobs();
92+
93+
}
94+
95+
this.status = 'inactive';
96+
97+
}
98+
99+
stop() {
100+
this.status = 'inactive';
101+
}
102+
103+
async getJobs(sync = false) {
104+
105+
if (sync) {
106+
107+
let jobs = null;
108+
this.realm.write(() => {
109+
110+
jobs = this.realm.objects('Job');
111+
112+
});
113+
114+
return jobs;
115+
116+
} else {
117+
return await this.realm.objects('Job');
118+
}
119+
120+
}
121+
122+
async getConcurrentJobs() {
123+
124+
let concurrentJobs = [];
125+
126+
this.realm.write(() => {
127+
128+
// Get next job from queue.
129+
let nextJob = null;
130+
131+
let jobs = this.realm.objects('Job')
132+
.filtered('active == FALSE AND failed == null')
133+
.sorted([['priority', true], ['created', false]]);
134+
135+
if (jobs.length) {
136+
nextJob = jobs[0];
137+
}
138+
139+
// If next job exists, get concurrent related jobs appropriately.
140+
if (nextJob) {
141+
142+
const concurrency = this.worker.getConcurrency(nextJob.name);
143+
144+
const allRelatedJobs = this.realm.objects('Job')
145+
.filtered('name == "'+ nextJob.name +'" AND active == FALSE AND failed == null')
146+
.sorted([['priority', true], ['created', false]]);
147+
148+
let jobsToMarkActive = allRelatedJobs.slice(0, concurrency);
149+
150+
// Grab concurrent job ids to reselect jobs as marking these jobs as active will remove
151+
// them from initial selection when write transaction exits.
152+
// See: https://stackoverflow.com/questions/47359368/does-realm-support-select-for-update-style-read-locking/47363356#comment81772710_47363356
153+
const concurrentJobIds = jobsToMarkActive.map( job => job.id);
154+
155+
// Mark concurrent jobs as active
156+
jobsToMarkActive = jobsToMarkActive.map( job => {
157+
job.active = true;
158+
});
159+
160+
// Reselect now-active concurrent jobs by id.
161+
const query = concurrentJobIds.map( jobId => 'id == "' + jobId + '"').join(' OR ');
162+
const reselectedJobs = this.realm.objects('Job')
163+
.filtered(query)
164+
.sorted([['priority', true], ['created', false]]);
165+
166+
concurrentJobs = reselectedJobs.slice(0, concurrency);
167+
168+
}
169+
170+
});
171+
172+
return concurrentJobs;
173+
174+
}
175+
176+
async processJob(job) {
177+
178+
try {
179+
180+
await this.worker.executeJob(job);
181+
182+
// On job completion, remove job
183+
this.realm.write(() => {
184+
185+
this.realm.delete(job);
186+
187+
});
188+
189+
} catch (error) {
190+
191+
// Handle job failure logic, including retries.
192+
this.realm.write(() => {
193+
194+
// Increment failed attempts number
195+
let jobData = JSON.parse(job.data);
196+
197+
if (!jobData.failedAttempts) {
198+
jobData.failedAttempts = 1;
199+
} else {
200+
jobData.failedAttempts++;
201+
}
202+
203+
job.data = JSON.stringify(jobData);
204+
205+
// Reset active status
206+
job.active = false;
207+
208+
// Mark job as failed if too many attempts
209+
if (jobData.failedAttempts >= jobData.attempts) {
210+
job.failed = new Date();
211+
}
212+
213+
});
214+
215+
}
216+
217+
}
218+
219+
flushQueue(jobName = null) {
220+
221+
if (jobName) {
222+
223+
this.realm.write(() => {
224+
225+
let jobs = this.realm.objects('Job')
226+
.filtered('name == "' + jobName + '"');
227+
228+
if (jobs.length) {
229+
this.realm.delete(jobs);
230+
}
231+
232+
});
233+
234+
} else {
235+
this.realm.write(() => {
236+
237+
this.realm.deleteAll();
238+
239+
});
240+
}
241+
242+
}
243+
244+
245+
}
246+
247+
export default async function queueFactory() {
248+
249+
const queue = new Queue();
250+
await queue.init();
251+
252+
return queue;
253+
254+
}

Models/Worker.js

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Created by mayor on 11/16/17.
3+
*/
4+
5+
6+
export default class Worker {
7+
8+
static workers = {};
9+
10+
addWorker(jobName, worker, options = {}) {
11+
12+
// Attach options to worker
13+
worker.options = {
14+
concurrency: options.concurrency || 1
15+
};
16+
17+
Worker.workers[jobName] = worker;
18+
}
19+
20+
removeWorker(jobName) {
21+
delete Worker.workers[jobName];
22+
}
23+
24+
getConcurrency(jobName) {
25+
26+
// If no worker assigned to job name, throw error.
27+
if (!Worker.workers[jobName]) {
28+
throw new Error('Job ' + jobName + ' does not have a worker assigned to it.');
29+
}
30+
31+
return Worker.workers[jobName].options.concurrency;
32+
33+
}
34+
35+
async executeJob(job) {
36+
37+
// If no worker assigned to job name, throw error.
38+
if (!Worker.workers[job.name]) {
39+
throw new Error('Job ' + job.name + ' does not have a worker assigned to it.');
40+
}
41+
42+
// Timeout Logic
43+
const jobTimeout = JSON.parse(job.data).timeout;
44+
45+
if (jobTimeout > 0) {
46+
47+
let timeoutPromise = new Promise((resolve, reject) => {
48+
49+
setTimeout(() => {
50+
reject(new Error('TIMEOUT: Job id: ' + job.id + ' timed out in ' + jobTimeout + 'ms.'));
51+
}, jobTimeout);
52+
53+
});
54+
55+
await Promise.race([timeoutPromise, Worker.workers[job.name](job.id, JSON.parse(job.payload))]);
56+
57+
} else {
58+
await Worker.workers[job.name](job.id, JSON.parse(job.payload));
59+
}
60+
61+
}
62+
63+
};

config/Database.js

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Realm database bootstrap
3+
*/
4+
5+
import { Config } from './config';
6+
import Realm from 'realm';
7+
8+
const JobSchema = {
9+
name: 'Job',
10+
primaryKey: 'id',
11+
properties: {
12+
id: 'string', // UUID.
13+
name: 'string', // Job name to be matched with handler function.
14+
payload: 'string', // Job payload stored as JSON.
15+
data: 'string', // Store arbitrary data like "failed attempts" as JSON.
16+
priority: 'int', // -5 to 5 to indicate low to high priority.
17+
active: { type: 'bool', default: false},
18+
created: 'date', // Job creation timestamp.
19+
failed: 'date?' // Job failure timestamp (null until failure).
20+
}
21+
};
22+
23+
export default class Database {
24+
25+
static realmInstance = null; // Use a singleton connection to realm for performance.
26+
27+
static async getRealmInstance(options = {}) {
28+
29+
// Connect to realm if database singleton instance has not already been created.
30+
if (Database.realmInstance === null) {
31+
32+
Database.realmInstance = await Realm.open({
33+
path: options.realmPath || Config.REALM_PATH,
34+
schemaVersion: Config.REALM_SCHEMA_VERSION,
35+
schema: [JobSchema]
36+
37+
// Look up shouldCompactOnLaunch to auto-vacuum https://github.com/realm/realm-js/pull/1209/files
38+
39+
});
40+
41+
}
42+
43+
return Database.realmInstance;
44+
45+
}
46+
47+
};

config/config.js

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/**
2+
*
3+
* Config constants
4+
*
5+
*/
6+
7+
export const Config = {
8+
REALM_PATH: 'reactNativeQueue.realm',
9+
REALM_SCHEMA_VERSION: 0 // Must be incremented if data model updates.
10+
};

0 commit comments

Comments
 (0)