TaskQueue.js
5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/**
* Copyright (c) 2015-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
* @providesModule TaskQueue
* @flow
*/
'use strict';
const infoLog = require('infoLog');
const invariant = require('fbjs/lib/invariant');
// A synchronous task: `processNext` calls `run()` directly. `name` is used in
// debug logging and is prepended to the message of any error thrown by `run`.
type SimpleTask = {
name: string,
run: () => void,
};
// An asynchronous task: `processNext` calls `gen()` and chains on the returned
// promise. While that promise is pending, previously queued tasks are held
// back (see `TaskQueue._genPromise`). `name` is used in debug logging and in
// the message of errors raised while resolving the promise.
type PromiseTask = {
name: string,
gen: () => Promise<any>,
};
// Anything `TaskQueue.enqueue` accepts. A bare function is treated as an
// anonymous task and is simply invoked by `processNext`.
export type Task = Function | SimpleTask | PromiseTask;
// When true, the `DEBUG && infoLog(...)` statements in this file emit queue
// tracing through `infoLog`.
const DEBUG = false;
/**
* TaskQueue - A system for queueing and executing a mix of simple callbacks and
* trees of dependent tasks based on Promises. No tasks are executed unless
* `processNext` is called.
*
* `enqueue` takes a Task object with either a simple `run` callback, or a
* `gen` function that returns a `Promise` and puts it in the queue. If a gen
* function is supplied, then the promise it returns will block execution of
* tasks already in the queue until it resolves. This can be used to make sure
* the first task is fully resolved (including asynchronous dependencies that
* also schedule more tasks via `enqueue`) before starting on the next task.
* The `onMoreTasks` constructor argument is used to inform the owner that an
* async task has resolved and that the queue should be processed again.
*
* Note: Tasks are only actually executed with explicit calls to `processNext`.
*/
class TaskQueue {
/**
* TaskQueue instances are self contained and independent, so multiple tasks
* of varying semantics and priority can operate together.
*
* `onMoreTasks` is invoked when `PromiseTask`s resolve if there are more
* tasks to process.
*/
constructor({onMoreTasks}: {onMoreTasks: () => void}) {
this._onMoreTasks = onMoreTasks;
this._queueStack = [{tasks: [], popable: false}];
}
/**
* Add a task to the queue. It is recommended to name your tasks for easier
* async debugging. Tasks will not be executed until `processNext` is called
* explicitly.
*/
enqueue(task: Task): void {
this._getCurrentQueue().push(task);
}
enqueueTasks(tasks: Array<Task>): void {
tasks.forEach((task) => this.enqueue(task));
}
cancelTasks(tasksToCancel: Array<Task>): void {
// search through all tasks and remove them.
this._queueStack = this._queueStack
.map((queue) => ({
...queue,
tasks: queue.tasks.filter((task) => tasksToCancel.indexOf(task) === -1),
}))
.filter((queue, idx) => (queue.tasks.length > 0 || idx === 0));
}
/**
* Check to see if `processNext` should be called.
*
* @returns {boolean} Returns true if there are tasks that are ready to be
* processed with `processNext`, or returns false if there are no more tasks
* to be processed right now, although there may be tasks in the queue that
* are blocked by earlier `PromiseTask`s that haven't resolved yet.
* `onMoreTasks` will be called after each `PromiseTask` resolves if there are
* tasks ready to run at that point.
*/
hasTasksToProcess(): bool {
return this._getCurrentQueue().length > 0;
}
/**
* Executes the next task in the queue.
*/
processNext(): void {
const queue = this._getCurrentQueue();
if (queue.length) {
const task = queue.shift();
try {
if (task.gen) {
DEBUG && infoLog('genPromise for task ' + task.name);
this._genPromise((task: any)); // Rather than annoying tagged union
} else if (task.run) {
DEBUG && infoLog('run task ' + task.name);
task.run();
} else {
invariant(
typeof task === 'function',
'Expected Function, SimpleTask, or PromiseTask, but got:\n' +
JSON.stringify(task, null, 2)
);
DEBUG && infoLog('run anonymous task');
task();
}
} catch (e) {
e.message = 'TaskQueue: Error with task ' + (task.name || '') + ': ' +
e.message;
throw e;
}
}
}
_queueStack: Array<{tasks: Array<Task>, popable: bool}>;
_onMoreTasks: () => void;
_getCurrentQueue(): Array<Task> {
const stackIdx = this._queueStack.length - 1;
const queue = this._queueStack[stackIdx];
if (queue.popable &&
queue.tasks.length === 0 &&
this._queueStack.length > 1) {
this._queueStack.pop();
DEBUG && infoLog('popped queue: ', {stackIdx, queueStackSize: this._queueStack.length});
return this._getCurrentQueue();
} else {
return queue.tasks;
}
}
_genPromise(task: PromiseTask) {
// Each async task pushes it's own queue onto the queue stack. This
// effectively defers execution of previously queued tasks until the promise
// resolves, at which point we allow the new queue to be popped, which
// happens once it is fully processed.
this._queueStack.push({tasks: [], popable: false});
const stackIdx = this._queueStack.length - 1;
DEBUG && infoLog('push new queue: ', {stackIdx});
DEBUG && infoLog('exec gen task ' + task.name);
task.gen()
.then(() => {
DEBUG && infoLog(
'onThen for gen task ' + task.name,
{stackIdx, queueStackSize: this._queueStack.length},
);
this._queueStack[stackIdx].popable = true;
this.hasTasksToProcess() && this._onMoreTasks();
})
.catch((ex) => {
ex.message = `TaskQueue: Error resolving Promise in task ${task.name}: ${ex.message}`;
throw ex;
})
.done();
}
}
module.exports = TaskQueue;