Convert queue.js to TypeScript (#1000)

* Convert queue.js to TypeScript

* Update src/queue.ts

Co-Authored-By: mbleigh <mbleigh@mbleigh.com>

* Update src/queue.ts

Co-Authored-By: mbleigh <mbleigh@mbleigh.com>

* Update src/queue.ts

Co-Authored-By: mbleigh <mbleigh@mbleigh.com>

* Update src/deploy/hosting/uploader.js

Co-Authored-By: mbleigh <mbleigh@mbleigh.com>

* Address comments.
This commit is contained in:
Michael Bleigh
2018-11-06 08:20:44 -08:00
committed by Bryan Kendall
parent 645483aaf2
commit 16c511425f
14 changed files with 237 additions and 217 deletions

View File

@@ -1,4 +1,5 @@
{
"trailingComma": "es5",
"printWidth": 100
"printWidth": 100,
"arrowParens": "always"
}

View File

@@ -22,13 +22,13 @@ if (previews.emulators) {
VALID_TARGETS = ["functions", "hosting", "database", "firestore"];
}
var filterOnlyEmulators = only => {
var filterOnlyEmulators = (only) => {
if (!only) {
return [];
}
return _.intersection(
VALID_EMULATORS,
only.split(",").map(opt => {
only.split(",").map((opt) => {
return opt.split(":")[0];
})
);
@@ -46,7 +46,7 @@ module.exports = new Command("serve")
"--except <targets>",
"serve all except specified targets (valid targets are: " + VALID_TARGETS.join(", ") + ")"
)
.before(options => {
.before((options) => {
if (filterOnlyEmulators(options.only).length > 0) {
return Promise.resolve();
}
@@ -55,7 +55,7 @@ module.exports = new Command("serve")
.then(() => checkDupHostingKeys(options))
.then(() => getProjectNumber(options));
})
.action(options => {
.action((options) => {
options.targets = filterOnlyEmulators(options.only);
if (options.targets.length > 0) {
return serve(options);

View File

@@ -12,7 +12,7 @@ const hashcache = require("./hashcache");
const detectProjectRoot = require("../../detectProjectRoot");
const api = require("../../api");
const logger = require("../../logger");
const Queue = require("../../queue");
const { Queue } = require("../../queue");
const MIN_UPLOAD_TIMEOUT = 30000; // 30s
const MAX_UPLOAD_TIMEOUT = 7200000; // 2h

View File

@@ -5,14 +5,14 @@ const request = require("request");
const emulatorConstants = require("./constants");
const utils = require("../utils");
module.exports = name => {
module.exports = (name) => {
return new Promise((resolve, reject) => {
utils.logLabeledBullet(name, "downloading emulator...");
let emulator = emulatorConstants.emulators[name];
fs.ensureDirSync(emulator.cacheDir);
let req = request.get(emulator.remoteUrl);
let writeStream = fs.createWriteStream(emulator.localPath);
req.on("error", err => reject(err));
req.on("error", (err) => reject(err));
req.on("end", () => {
writeStream.close();
fs.chmodSync(emulator.localPath, 0o755);

View File

@@ -15,7 +15,7 @@ function _list(nextPageToken, projects) {
auth: true,
origin: api.firebaseApiOrigin,
})
.then(response => {
.then((response) => {
projects = projects.concat(response.body.results);
if (response.body.nextPageToken) {
return _list(response.body.nextPageToken, projects);
@@ -26,10 +26,10 @@ function _list(nextPageToken, projects) {
exports.listProjects = () => _list();
exports.getProject = projectId =>
exports.getProject = (projectId) =>
api
.request("GET", `/${API_VERSION}/projects/${projectId}`, {
auth: true,
origin: api.firebaseApiOrigin,
})
.then(response => response.body);
.then((response) => response.body);

View File

@@ -43,7 +43,7 @@ function _getProject(options) {
// Load all projects and prompt the user to choose.
return firebaseApi.listProjects().then(function(projects) {
var choices = projects.filter(project => !!project).map(project => {
var choices = projects.filter((project) => !!project).map((project) => {
return {
name: project.projectId,
label: project.projectId + " (" + project.displayName + ")",
@@ -82,7 +82,7 @@ function _getProject(options) {
}
var id = prompt.listLabelToValue(label, choices);
const project = projects.find(p => p.projectId === id);
const project = projects.find((p) => p.projectId === id);
return {
id: id,
label: label,

View File

@@ -1,168 +0,0 @@
"use strict";
const logger = require("./logger");
function _backoff(retryNumber, delay) {
return new Promise(function(resolve) {
setTimeout(resolve, delay * Math.pow(2, retryNumber));
});
}
class Queue {
constructor(options) {
this.name = options.name || "queue";
this.concurrency = options.concurrency || 1;
this.handler =
options.handler ||
function(task) {
return task();
};
this.cursor = 0;
this.active = 0;
this.complete = 0;
this.success = 0;
this.errored = 0;
this.retried = 0;
this.tasks = [];
this.waits = [];
this.min = 9999999999;
this.max = 0;
this.avg = 0;
this.retries = options.retries || 0;
this.backoff = typeof options.backoff == "number" ? options.backoff : 200;
this.retryCounts = {};
this.closed = false;
this.finished = false;
}
taskName(cursorIndex) {
const task = this.tasks[cursorIndex];
return typeof task === "string" ? task : "index " + cursorIndex;
}
wait() {
const self = this;
const p = new Promise(function(resolve, reject) {
self.waits.push({ resolve: resolve, reject: reject });
});
return p;
}
add(task) {
if (!this.startTime) {
this.startTime = Date.now();
}
if (this.closed) {
throw new Error("Cannot add a task to a closed queue.");
}
this.tasks.push(task);
this.process();
}
close() {
this.closed = true;
this._finishIfIdle();
}
process() {
if (
this._finishIfIdle() ||
this.active >= this.concurrency ||
this.cursor === this.tasks.length
) {
return;
}
this.cursor++;
this.active++;
this.handle(this.cursor - 1);
}
handle(cursorIndex) {
const task = this.tasks[cursorIndex];
const t0 = Date.now();
const self = this;
this.handler(task)
.then(function() {
const dt = Date.now() - t0;
if (dt < self.min) {
self.min = dt;
}
if (dt > self.max) {
self.max = dt;
}
self.avg = (self.avg * self.complete + dt) / (self.complete + 1);
self.success++;
self.complete++;
self.active--;
self.process();
})
.catch(function(err) {
if (self.retries > 0) {
self.retryCounts[cursorIndex] = self.retryCounts[cursorIndex] || 0;
if (self.retryCounts[cursorIndex] < self.retries) {
self.retryCounts[cursorIndex]++;
self.retried++;
return _backoff(self.retryCounts[cursorIndex], self.backoff).then(function() {
logger.debug("[" + self.name + "] Retrying task", self.taskName(cursorIndex));
return self.handle(cursorIndex);
});
}
}
self.errored++;
self.complete++;
self.active--;
if (self.retryCounts[cursorIndex] > 0) {
logger.debug(
"[" + self.name + "] Retries exhausted for task",
self.taskName(cursorIndex),
":",
err
);
} else {
logger.debug("[" + self.name + "] Error on task", self.taskName(cursorIndex), ":", err);
}
self._finish(err);
});
}
stats() {
return {
max: this.max,
min: this.min,
avg: this.avg,
active: this.active,
complete: this.complete,
success: this.success,
errored: this.errored,
retried: this.retried,
total: this.tasks.length,
elapsed: Date.now() - this.startTime,
};
}
_finishIfIdle() {
if (this.closed && this.cursor == this.tasks.length && this.active === 0) {
this._finish();
return true;
}
return false;
}
_finish(err) {
var self = this;
this.waits.forEach(function(p) {
if (err) {
return p.reject(err);
}
self.finished = true;
return p.resolve();
});
}
}
module.exports = Queue;

200
src/queue.ts Normal file
View File

@@ -0,0 +1,200 @@
import * as logger from "./logger";
function _backoff(retryNumber: number, delay: number): Promise<void> {
return new Promise((resolve: () => void) => {
setTimeout(resolve, delay * Math.pow(2, retryNumber));
});
}
function DEFAULT_HANDLER(task: any): Promise<any> {
return (task as () => Promise<any>)();
}
export interface QueueOptions<T> {
name?: string;
concurrency?: number;
handler?: (task: T) => Promise<any>;
retries?: number;
backoff?: number;
}
export interface QueueStats {
max: number;
min: number;
avg: number;
active: number;
complete: number;
success: number;
errored: number;
retried: number;
total: number;
elapsed: number;
}
export class Queue<T> {
public name: string = "queue";
public concurrency: number = 200;
public handler: (task: T) => Promise<any> = DEFAULT_HANDLER;
public cursor: number = 0;
public active: number = 0;
public complete: number = 0;
public success: number = 0;
public errored: number = 0;
public retried: number = 0;
public tasks: T[] = [];
public waits: Array<{ resolve: () => void; reject: (err: Error) => void }> = [];
public min: number = 9999999999;
public max: number = 0;
public avg: number = 0;
public retries: number = 0;
public backoff: number = 200;
public retryCounts: { [index: number]: number } = {};
public closed: boolean = false;
public finished: boolean = false;
public startTime: number = 0;
constructor(options: QueueOptions<T>) {
if (options.name) {
this.name = options.name;
}
if (options.handler) {
this.handler = options.handler;
}
if (typeof options.concurrency === "number") {
this.concurrency = options.concurrency;
}
if (typeof options.retries === "number") {
this.retries = options.retries;
}
if (typeof options.backoff === "number") {
this.backoff = options.backoff;
}
if (typeof options.backoff === "number") {
this.backoff = options.backoff;
}
}
public wait(): Promise<void> {
const p = new Promise<void>((resolve, reject) => {
this.waits.push({ resolve, reject });
});
return p;
}
public add(task: T): void {
if (this.closed) {
throw new Error("Cannot add a task to a closed queue.");
}
if (!this.startTime) {
this.startTime = Date.now();
}
this.tasks.push(task);
this.process();
}
public close(): boolean {
this.closed = true;
return this._finishIfIdle();
}
public process(): void {
if (
this._finishIfIdle() ||
this.active >= this.concurrency ||
this.cursor === this.tasks.length
) {
return;
}
this.cursor++;
this.active++;
this.handle(this.cursor - 1);
}
public async handle(cursorIndex: number): Promise<void> {
const task = this.tasks[cursorIndex];
const tname = this.taskName(cursorIndex);
const t0 = Date.now();
try {
await this.handler(task);
const dt = Date.now() - t0;
if (dt < this.min) {
this.min = dt;
}
if (dt > this.max) {
this.max = dt;
}
this.avg = (this.avg * this.complete + dt) / (this.complete + 1);
this.success++;
this.complete++;
this.active--;
this.process();
} catch (err) {
if (this.retries > 0) {
this.retryCounts[cursorIndex] = this.retryCounts[cursorIndex] || 0;
if (this.retryCounts[cursorIndex] < this.retries) {
this.retryCounts[cursorIndex]++;
this.retried++;
await _backoff(this.retryCounts[cursorIndex], this.backoff);
logger.debug(`[${this.name}] Retrying task`, tname);
return this.handle(cursorIndex);
}
}
this.errored++;
this.complete++;
this.active--;
if (this.retryCounts[cursorIndex] > 0) {
logger.debug(`[${this.name}] Retries exhausted for task ${tname}:`, err);
} else {
logger.debug(`[${this.name}] Error on task ${tname}:`, err);
}
this._finish(err);
}
}
public stats(): QueueStats {
return {
max: this.max,
min: this.min,
avg: this.avg,
active: this.active,
complete: this.complete,
success: this.success,
errored: this.errored,
retried: this.retried,
total: this.tasks.length,
elapsed: Date.now() - this.startTime,
};
}
public taskName(cursorIndex: number): string {
const task = this.tasks[cursorIndex];
return typeof task === "string" ? task : `index ${cursorIndex}`;
}
private _finishIfIdle(): boolean {
if (this.closed && this.cursor === this.tasks.length && this.active === 0) {
this._finish(null);
return true;
}
return false;
}
private _finish(err: Error | null): void {
this.waits.forEach((p) => {
if (err) {
return p.reject(err);
}
this.finished = true;
return p.resolve();
});
}
}
export default Queue;

View File

@@ -5,7 +5,7 @@ module.exports = function(options) {
return Promise.resolve();
}
return getInstanceId(options).then(instance => {
return getInstanceId(options).then((instance) => {
options.instance = instance;
});
};

View File

@@ -30,7 +30,7 @@ module.exports = function(options, permissions) {
origin: api.resourceManagerOrigin,
});
})
.then(response => {
.then((response) => {
const allowedPermissions = (response.body.permissions || []).sort();
const missingPermissions = _.difference(requiredPermissions, allowedPermissions);

View File

@@ -14,19 +14,19 @@ function _fatal(emulator, errorMsg) {
}
function _runBinary(emulator, command) {
return new Promise(resolve => {
return new Promise((resolve) => {
emulator.stdout = fs.createWriteStream(emulator.name + "-debug.log");
emulator.instance = childProcess.spawn(command.binary, command.args, {
stdio: ["inherit", "pipe", "pipe"],
});
emulator.instance.stdout.on("data", data => {
emulator.instance.stdout.on("data", (data) => {
console.log(data.toString());
emulator.stdout.write(data.toString());
});
emulator.instance.stderr.on("data", data => {
emulator.instance.stderr.on("data", (data) => {
utils.logWarning(emulator.name + ": " + data.toString());
});
emulator.instance.on("error", err => {
emulator.instance.on("error", (err) => {
if (err.path == "java" && err.code == "ENOENT") {
_fatal(
emulator,

View File

@@ -3,18 +3,11 @@ import * as sinon from "sinon";
const { expect } = chai;
import Queue = require("../queue");
import Queue from "../queue";
const TEST_ERROR = new Error("foobar");
describe("Queue", () => {
it("should ignore non-number backoff", () => {
const q = new Queue({
backoff: "not a number",
});
expect(q.backoff).to.equal(200);
});
it("should return the task as the task name", () => {
const handler = sinon.stub().resolves();
const q = new Queue({

View File

@@ -7,9 +7,7 @@
"outDir": "lib",
"removeComments": true,
"sourceMap": true,
"target": "ES6"
"target": "es2015"
},
"include": [
"src/**/*"
]
"include": ["src/**/*"]
}

View File

@@ -1,20 +1,16 @@
{
"defaultSeverity": "error",
"extends": [
"tslint:recommended",
"tslint-no-unused-expression-chai"
],
"jsRules": {},
"rules": {
"prettier": true
},
"rulesDirectory": [
"tslint-plugin-prettier"
],
"linterOptions": {
"exclude": [
"**/node_modules/**",
"**/*.js"
]
}
"defaultSeverity": "error",
"extends": ["tslint:recommended", "tslint-no-unused-expression-chai"],
"jsRules": {},
"rules": {
"prettier": true,
"interface-name": false,
"object-literal-sort-keys": false,
"arrow-parens": true,
"trailing-comma": [true, { "functions": "never" }]
},
"rulesDirectory": ["tslint-plugin-prettier"],
"linterOptions": {
"exclude": ["**/node_modules/**", "**/*.js"]
}
}