Skip to content

Commit

Permalink
Add --parallel-limit <#processes> to oao all and oao run-script
Browse files Browse the repository at this point in the history
…, to limit concurrency when running things in parallel (#69)
  • Loading branch information
guigrpa committed Mar 14, 2018
1 parent 00dd14b commit cbd5715
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 23 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
* `oao run-script`: add the possibility to generate more than one job per subpackage (e.g. `oao run-script test:*`).
* Add `--parallel-limit <#processes>` to `oao all` and `oao run-script`, to limit concurrency when running things in parallel (#69).
* `oao run-script`: add the possibility to generate more than one job per subpackage (e.g. `oao run-script test:*`) (#70).

## 1.4.1 (2018-3-12)

Expand Down
3 changes: 0 additions & 3 deletions ROADMAP.md
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
* Add a limit to parallel jobs
* Make parallel jobs know which are their input requirements

*Nothing else identified right now*
1 change: 1 addition & 0 deletions src/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Options = {
tree?: boolean,
parallel?: boolean,
parallelLogs?: boolean,
parallelLimit?: number,
ignoreErrors?: boolean,
relativeTime?: boolean,
};
Expand Down
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ createCommand('all <command>', 'Run a given command on all sub-packages')
'--no-parallel-logs',
'use chronological logging, even in parallel mode'
)
.option('--parallel-limit <#processes>', 'max number of processes to launch')
.option(
'--ignore-errors',
'do not stop even if there are errors in some packages'
Expand All @@ -262,6 +263,7 @@ createCommand('run-script <command>', 'Run a given script on all sub-packages')
'--no-parallel-logs',
'use chronological logging, even in parallel mode'
)
.option('--parallel-limit <#processes>', 'max number of processes to launch')
.option(
'--ignore-errors',
'do not stop even if there are errors in some packages'
Expand Down
1 change: 1 addition & 0 deletions src/runScript.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Options = {
tree?: boolean,
parallel?: boolean,
parallelLogs?: boolean,
parallelLimit?: number,
ignoreErrors?: boolean,
relativeTime?: boolean,
};
Expand Down
7 changes: 6 additions & 1 deletion src/utils/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ const shortenName = (name: string, maxLen: number): string => {

const isObject = (o: any) => !!o && o.constructor === Object;

export { shortenName, isObject };
const delay = ms =>
new Promise(resolve => {
setTimeout(resolve, ms);
});

export { shortenName, isObject, delay };
83 changes: 65 additions & 18 deletions src/utils/multiRun.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// @flow

/* eslint-disable no-constant-condition */

import { removeAllListeners, addListener } from 'storyboard';
import parallelConsoleListener from 'storyboard-listener-console-parallel';
import { readAllSpecs } from './readSpecs';
import { exec } from './shell';
import { shortenName } from './helpers';
import { shortenName, delay } from './helpers';
import calcGraph from './calcGraph';

type Options = {
Expand All @@ -13,22 +15,25 @@ type Options = {
tree?: boolean,
parallel?: boolean,
parallelLogs?: boolean,
parallelLimit?: number,
ignoreErrors?: boolean,
relativeTime?: boolean,
};

type Job = {
cmd: string,
cwd: string,
bareLogs: ?boolean,
storySrc: string,
bareLogs: boolean,
storySrc: ?string,
status: 'idle' | 'running' | 'done',
pkgName: string,
preconditions: Array<string>, // jobs for these subpackages should have finished
promise: Promise<any>,
promise?: Promise<any>,
};
type JobCreator = (specs: Object) => Array<string>;

const DELAY_MAIN_LOOP = 20; // [ms]

// ------------------------------------------------
// Main
// ------------------------------------------------
Expand All @@ -39,6 +44,7 @@ const multiRun = async (
tree,
parallel,
parallelLogs,
parallelLimit,
ignoreErrors,
relativeTime,
}: Options,
Expand Down Expand Up @@ -76,7 +82,7 @@ const multiRun = async (
if (!parallel) {
await runSerially(allJobs, { ignoreErrors });
} else {
await runInParallel(allJobs, { ignoreErrors, parallelLogs });
await runInParallel(allJobs, { ignoreErrors, parallelLogs, parallelLimit });
}
};

Expand All @@ -85,28 +91,42 @@ const multiRun = async (
// ------------------------------------------------
const runSerially = async (allJobs, { ignoreErrors }) => {
for (let i = 0; i < allJobs.length; i++) {
const { cmd, cwd, bareLogs, storySrc } = allJobs[i];
let promise = exec(cmd, { cwd, bareLogs, storySrc });
if (ignoreErrors) promise = promise.catch(() => {});
await promise;
await executeJob(allJobs[i], { ignoreErrors });
}
};

const runInParallel = async (allJobs, { ignoreErrors, parallelLogs }) => {
const allPromises = [];
for (let i = 0; i < allJobs.length; i++) {
const { cmd, cwd, bareLogs, storySrc } = allJobs[i];
let promise = exec(cmd, { cwd, bareLogs, storySrc });
if (ignoreErrors) promise = promise.catch(() => {});
allPromises.push(promise);
const runInParallel = async (
allJobs,
{ ignoreErrors, parallelLogs, parallelLimit }
) => {
const maxConcurrency = parallelLimit || Infinity;
while (true) {
const runningJobCount = getRunningJobs(allJobs).length;
if (runningJobCount >= maxConcurrency) {
await delay(DELAY_MAIN_LOOP);
continue;
}
const job = getNextJob(allJobs);
if (job) {
executeJob(job, { ignoreErrors });
} else if (getIdleJobs(allJobs).length === 0) {
break;
} else {
// We may still have pending jobs, but cannot run yet (they depend on
// others). Wait a bit...
await delay(DELAY_MAIN_LOOP);
}
}

// If parallel logs are enabled, we have to manually exit.
// If parallel logs are enabled, we have to manually exit (`process.exit`).
// We should also show the error again, since the parallel console
// most probably swallowed it or only showed the final part.
if (parallelLogs) {
const pendingPromises = allJobs
.filter(o => o.status !== 'done')
.map(job => job.promise);
try {
await Promise.all(allPromises);
await Promise.all(pendingPromises);
} catch (err) {
if (err.stderr) {
console.error(err.message); // eslint-disable-line
Expand All @@ -120,6 +140,33 @@ const runInParallel = async (allJobs, { ignoreErrors, parallelLogs }) => {
}
};

// ------------------------------------------------
// Helpers
// ------------------------------------------------
/* eslint-disable no-param-reassign */
const executeJob = (job, { ignoreErrors }) => {
job.promise = _executeJob(job, { ignoreErrors });
};

const _executeJob = async (job, { ignoreErrors }) => {
const { cmd, cwd, bareLogs, storySrc } = job;
const promise = exec(cmd, { cwd, bareLogs, storySrc });
job.status = 'running';
try {
await promise;
job.status = 'done';
} catch (err) {
job.status = 'done';
if (!ignoreErrors) throw err;
}
};
/* eslint-enable no-param-reassign */

const getNextJob = jobs =>
jobs.find(job => job.status !== 'done' && job.status !== 'running');
const getRunningJobs = jobs => jobs.filter(job => job.status === 'running');
const getIdleJobs = jobs => jobs.filter(job => job.status === 'idle');

// ------------------------------------------------
// Public
// ------------------------------------------------
Expand Down

0 comments on commit cbd5715

Please sign in to comment.