Adds retries to the queue for delta uploads. (#282)

This commit is contained in:
Michael Bleigh
2018-07-06 13:28:00 -07:00
committed by GitHub
parent 44d3c24483
commit 484e784777
3 changed files with 58 additions and 18 deletions

View File

@@ -26,19 +26,13 @@ module.exports = new Command("hosting:disable")
return Promise.resolve();
}
return api.request(
"POST",
"/v1/projects/" + encodeURIComponent(options.project) + "/releases",
{
auth: true,
data: {
hosting: {
disabled: true,
},
},
origin: api.deployOrigin,
}
);
return api.request("POST", "/v1beta1/sites/" + options.instance + "/releases", {
auth: true,
data: {
type: "SITE_DISABLE",
},
origin: api.hostingApiOrigin,
});
})
.then(function() {
if (options.confirm) {

View File

@@ -37,18 +37,23 @@ class Uploader {
this.gzipLevel = options.gzipLevel || 9;
this.hashQueue = new Queue({
name: "hashQueue",
concurrency: options.hashConcurrency || 50,
handler: this.hashHandler.bind(this),
});
this.populateBatchSize = options.populateBatchSize || 1000;
this.populateBatch = {};
this.populateQueue = new Queue({
name: "populateQueue",
concurrency: options.populateConcurrency || 10,
handler: this.populateHandler.bind(this),
retries: 3,
});
this.uploadQueue = new Queue({
name: "uploadQueue",
concurrency: options.uploadConcurrency || 200,
handler: this.uploadHandler.bind(this),
retries: 5,
});
this.public = options.public || this.cwd;
this.files = options.files;
@@ -168,7 +173,7 @@ class Uploader {
queuePopulate() {
const pop = this.populateBatch;
this.populateQueue.add(pop);
this.populateQueue.add(pop, "batch" + (this.populateQueue.tasks.length + 1));
this.populateBatch = {};
this.populateQueue.process();
}

View File

@@ -2,8 +2,15 @@
const logger = require("./logger");
function _backoff(retryNumber, delay) {
return new Promise(function(resolve) {
setTimeout(resolve, delay * Math.pow(2, retryNumber));
});
}
class Queue {
constructor(options) {
this.name = options.name || "queue";
this.concurrency = options.concurrency || 1;
this.handler =
options.handler ||
@@ -15,15 +22,23 @@ class Queue {
this.complete = 0;
this.success = 0;
this.errored = 0;
this.retried = 0;
this.tasks = [];
this.waits = [];
this.min = 9999999999;
this.max = 0;
this.avg = 0;
this.retries = options.retries || 0;
this.backoff = 200;
this.retryCounts = {};
this.closed = false;
this.finished = false;
}
taskName(task) {
return typeof task === "string" ? task : "index " + this.tasks.indexOf(task);
}
wait() {
const self = this;
const p = new Promise(function(resolve, reject) {
@@ -55,15 +70,19 @@ class Queue {
this.active >= this.concurrency ||
this.cursor === this.tasks.length
) {
return Promise.resolve();
return;
}
const t0 = Date.now();
const task = this.tasks[this.cursor];
this.cursor++;
this.active++;
this.handle(task);
}
handle(task) {
const t0 = Date.now();
const self = this;
return this.handler(task)
this.handler(task)
.then(function() {
const dt = Date.now() - t0;
if (dt < self.min) {
@@ -80,10 +99,31 @@ class Queue {
self.process();
})
.catch(function(err) {
if (self.retries > 0) {
self.retryCounts[task] = self.retryCounts[task] || 0;
if (self.retryCounts[task] < self.retries) {
self.retryCounts[task]++;
self.retried++;
return _backoff(self.retryCounts[task], self.backoff).then(function() {
logger.debug("[" + self.name + "] Retrying task", self.taskName(task));
return self.handle(task);
});
}
}
self.errored++;
self.complete++;
self.active--;
logger.debug("[queue] Error on task", task, ":", err);
if (self.retryCounts[task] > 0) {
logger.debug(
"[" + self.name + "] Retries exhausted for task",
self.taskName(task),
":",
err
);
} else {
logger.debug("[" + self.name + "] Error on task", self.taskName(task), ":", err);
}
self._finish(err);
});
}
@@ -97,6 +137,7 @@ class Queue {
complete: this.complete,
success: this.success,
errored: this.errored,
retried: this.retried,
total: this.tasks.length,
elapsed: Date.now() - this.startTime,
};