refactor queue.js so retryCount is per task (#988)

* refactor queue.js so retryCount is per task

* Add tests for queue; allow custom backoff

* add sinon types

* add test for two tasks having separate retry count

* added a couple of more tests

* added more asertions

* tests about taskName

* handle comments

* use callCountMap for testing concurrency
This commit is contained in:
Qinmao Zhang
2018-11-05 13:57:08 -08:00
committed by GitHub
parent 0fddfb3d02
commit eee904b553
4 changed files with 224 additions and 16 deletions

View File

@@ -106,6 +106,7 @@
"@types/glob": "^7.1.1",
"@types/mocha": "^5.2.5",
"@types/node": "^10.12.0",
"@types/sinon": "^5.0.5",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"coveralls": "^3.0.1",

View File

@@ -29,14 +29,15 @@ class Queue {
this.max = 0;
this.avg = 0;
this.retries = options.retries || 0;
this.backoff = 200;
this.backoff = typeof options.backoff == "number" ? options.backoff : 200;
this.retryCounts = {};
this.closed = false;
this.finished = false;
}
taskName(task) {
return typeof task === "string" ? task : "index " + this.tasks.indexOf(task);
taskName(cursorIndex) {
const task = this.tasks[cursorIndex];
return typeof task === "string" ? task : "index " + cursorIndex;
}
wait() {
@@ -73,13 +74,13 @@ class Queue {
return;
}
const task = this.tasks[this.cursor];
this.cursor++;
this.active++;
this.handle(task);
this.handle(this.cursor - 1);
}
handle(task) {
handle(cursorIndex) {
const task = this.tasks[cursorIndex];
const t0 = Date.now();
const self = this;
this.handler(task)
@@ -100,13 +101,13 @@ class Queue {
})
.catch(function(err) {
if (self.retries > 0) {
self.retryCounts[task] = self.retryCounts[task] || 0;
if (self.retryCounts[task] < self.retries) {
self.retryCounts[task]++;
self.retryCounts[cursorIndex] = self.retryCounts[cursorIndex] || 0;
if (self.retryCounts[cursorIndex] < self.retries) {
self.retryCounts[cursorIndex]++;
self.retried++;
return _backoff(self.retryCounts[task], self.backoff).then(function() {
logger.debug("[" + self.name + "] Retrying task", self.taskName(task));
return self.handle(task);
return _backoff(self.retryCounts[cursorIndex], self.backoff).then(function() {
logger.debug("[" + self.name + "] Retrying task", self.taskName(cursorIndex));
return self.handle(cursorIndex);
});
}
}
@@ -114,15 +115,15 @@ class Queue {
self.errored++;
self.complete++;
self.active--;
if (self.retryCounts[task] > 0) {
if (self.retryCounts[cursorIndex] > 0) {
logger.debug(
"[" + self.name + "] Retries exhausted for task",
self.taskName(task),
self.taskName(cursorIndex),
":",
err
);
} else {
logger.debug("[" + self.name + "] Error on task", self.taskName(task), ":", err);
logger.debug("[" + self.name + "] Error on task", self.taskName(cursorIndex), ":", err);
}
self._finish(err);
});

206
src/test/queue.spec.ts Normal file
View File

@@ -0,0 +1,206 @@
import * as chai from "chai";
import * as sinon from "sinon";
const { expect } = chai;
import Queue = require("../queue");
const TEST_ERROR = new Error("foobar");
describe("Queue", () => {
it("should ignore non-number backoff", () => {
const q = new Queue({
backoff: "not a number",
});
expect(q.backoff).to.equal(200);
});
it("should return the task as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
handler,
});
const stringTask = "test task";
q.add(stringTask);
expect(q.taskName(0)).to.equal(stringTask);
});
it("should return the index as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
handler,
});
q.add(2);
expect(q.taskName(0)).to.equal("index 0");
});
it("should handle function tasks", () => {
const task = sinon.stub().resolves();
const q = new Queue({});
q.add(task);
q.close();
return q.wait().then(() => {
expect(task.callCount).to.equal(1);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(1);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(0);
});
});
it("should handle tasks", () => {
const handler = sinon.stub().resolves();
const q = new Queue({
handler,
});
q.add(4);
q.close();
return q.wait().then(() => {
expect(handler.callCount).to.equal(1);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(1);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(0);
});
});
it("should not retry", () => {
const handler = sinon.stub().rejects(TEST_ERROR);
const q = new Queue({
handler,
retries: 0,
});
q.add(4);
q.close();
return q
.wait()
.then(() => {
throw new Error("handler should have rejected");
})
.catch((err: Error) => {
expect(err).to.equal(TEST_ERROR);
})
.then(() => {
expect(handler.callCount).to.equal(1);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(0);
expect(q.errored).to.equal(1);
expect(q.retried).to.equal(0);
});
});
it("should retry the number of retries, plus one", () => {
const handler = sinon.stub().rejects(TEST_ERROR);
const q = new Queue({
backoff: 0,
handler,
retries: 3,
});
q.add(4);
q.close();
return q
.wait()
.then(() => {
throw new Error("handler should have rejected");
})
.catch((err: Error) => {
expect(err).to.equal(TEST_ERROR);
})
.then(() => {
expect(handler.callCount).to.equal(4);
expect(q.complete).to.equal(1);
expect(q.success).to.equal(0);
expect(q.errored).to.equal(1);
expect(q.retried).to.equal(3);
});
});
it("should handle tasks in concurrency", () => {
const callCountMap = new Map<any, number>();
const handler = (task: any) => {
let count = callCountMap.get(task);
if (!count) {
count = 0;
}
count += 1;
callCountMap.set(task, count);
if (count > 2) {
return Promise.resolve();
}
return Promise.reject();
};
const q = new Queue({
backoff: 0,
concurrency: 2,
handler,
retries: 2,
});
q.add("1");
q.add("2");
q.add("3");
q.close();
return q
.wait()
.catch((err: Error) => {
throw new Error("handler should have passed ");
})
.then(() => {
expect(q.complete).to.equal(3);
expect(q.success).to.equal(3);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(6);
});
});
it("should retry the number of retries for mutiple identical tasks", () => {
const handler = sinon
.stub()
.rejects(TEST_ERROR)
.onCall(2)
.resolves(0)
.onCall(5)
.resolves(0)
.onCall(8)
.resolves(0);
const q = new Queue({
backoff: 0,
concurrency: 1, // this makes sure only one task is running at a time, so not flaky
handler,
retries: 2,
});
q.add(5);
q.add(5);
q.add(5);
q.close();
return q
.wait()
.catch((err: Error) => {
throw new Error("handler should have passed");
})
.then(() => {
expect(handler.callCount).to.equal(9);
expect(q.complete).to.equal(3);
expect(q.success).to.equal(3);
expect(q.errored).to.equal(0);
expect(q.retried).to.equal(6);
});
});
});

View File

@@ -7,7 +7,7 @@
"outDir": "lib",
"removeComments": true,
"sourceMap": true,
"target": "ES5"
"target": "ES6"
},
"include": [
"src/**/*"