diff --git a/packager/react-packager/src/lib/BatchProcessor.js b/packager/react-packager/src/lib/BatchProcessor.js new file mode 100644 index 000000000..0464534d7 --- /dev/null +++ b/packager/react-packager/src/lib/BatchProcessor.js @@ -0,0 +1,104 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +'use strict'; + +const invariant = require('fbjs/lib/invariant'); + +type ProcessBatch = ( + batch: Array, + callback: (error?: Error, orderedResults?: Array) => mixed, +) => mixed; + +type BatchProcessorOptions = { + maximumDelayMs: number, + maximumItems: number, + concurrency: number, +}; + +/** + * We batch items together trying to minimize their processing, for example as + * network queries. For that we wait a small moment before processing a batch. + * We limit also the number of items we try to process in a single batch so that + * if we have many items pending in a short amount of time, we can start + * processing right away. + */ +class BatchProcessor { + + _options: BatchProcessorOptions; + _processBatch: ProcessBatch; + _queue: Array<{ + item: TItem, + callback: (error?: Error, result?: TResult) => mixed, + }>; + _timeoutHandle: ?number; + _currentProcessCount: number; + + constructor( + options: BatchProcessorOptions, + processBatch: ProcessBatch, + ) { + this._options = options; + this._processBatch = processBatch; + this._queue = []; + this._timeoutHandle = null; + this._currentProcessCount = 0; + (this: any)._processQueue = this._processQueue.bind(this); + } + + _processQueue() { + this._timeoutHandle = null; + while ( + this._queue.length > 0 && + this._currentProcessCount < this._options.concurrency + ) { + this._currentProcessCount++; + const jobs = this._queue.splice(0, this._options.maximumItems); + const items = jobs.map(job => job.item); + this._processBatch(items, (error, results) => { + invariant( + results == null || results.length === items.length, + 'Not enough results returned.', + ); + for (let i = 0; i < items.length; ++i) { + jobs[i].callback(error, results && results[i]); + } + this._currentProcessCount--; + this._processQueueOnceReady(); + }); + } + } + + _processQueueOnceReady() { + if (this._queue.length >= this._options.maximumItems) { + clearTimeout(this._timeoutHandle); + process.nextTick(this._processQueue); + return; + } + if (this._timeoutHandle == null) { + this._timeoutHandle = setTimeout( + this._processQueue, + this._options.maximumDelayMs, + ); + } + } + + queue( + item: TItem, + callback: (error?: Error, result?: TResult) => mixed, + ) { + this._queue.push({item, callback}); + this._processQueueOnceReady(); + } + +} + +module.exports = BatchProcessor; diff --git a/packager/react-packager/src/lib/GlobalTransformCache.js b/packager/react-packager/src/lib/GlobalTransformCache.js index 472e42e3e..5775a36db 100644 --- a/packager/react-packager/src/lib/GlobalTransformCache.js +++ b/packager/react-packager/src/lib/GlobalTransformCache.js @@ -11,9 +11,10 @@ 'use strict'; +const BatchProcessor = require('./BatchProcessor'); + const crypto = require('crypto'); const imurmurhash = require('imurmurhash'); -const invariant = require('fbjs/lib/invariant'); const jsonStableStringify = require('json-stable-stringify'); const path = require('path'); const request = require('request'); @@ -42,92 +43,6 @@ type FetchProps = { type FetchCallback = (error?: Error, result?: ?CachedResult) => mixed; type FetchURICallback = (error?: Error, resultURI?: ?string) => mixed; -type ProcessBatch = ( - batch: Array, - callback: (error?: Error, orderedResults?: Array) => mixed, -) => mixed; -type BatchProcessorOptions = { - maximumDelayMs: number, - maximumItems: number, - concurrency: number, -}; - -/** - * We batch keys together trying to make a smaller amount of queries. For that - * we wait a small moment before starting to fetch. We limit also the number of - * keys we try to fetch at once, so if we already have that many keys pending, - * we can start fetching right away. - */ -class BatchProcessor { - - _options: BatchProcessorOptions; - _processBatch: ProcessBatch; - _queue: Array<{ - item: TItem, - callback: (error?: Error, result?: TResult) => mixed, - }>; - _timeoutHandle: ?number; - _currentProcessCount: number; - - constructor( - options: BatchProcessorOptions, - processBatch: ProcessBatch, - ) { - this._options = options; - this._processBatch = processBatch; - this._queue = []; - this._timeoutHandle = null; - this._currentProcessCount = 0; - (this: any)._processQueue = this._processQueue.bind(this); - } - - _processQueue() { - this._timeoutHandle = null; - while ( - this._queue.length > 0 && - this._currentProcessCount < this._options.concurrency - ) { - this._currentProcessCount++; - const jobs = this._queue.splice(0, this._options.maximumItems); - const items = jobs.map(job => job.item); - this._processBatch(items, (error, results) => { - invariant( - results == null || results.length === items.length, - 'Not enough results returned.', - ); - for (let i = 0; i < items.length; ++i) { - jobs[i].callback(error, results && results[i]); - } - this._currentProcessCount--; - this._processQueueOnceReady(); - }); - } - } - - _processQueueOnceReady() { - if (this._queue.length >= this._options.maximumItems) { - clearTimeout(this._timeoutHandle); - process.nextTick(this._processQueue); - return; - } - if (this._timeoutHandle == null) { - this._timeoutHandle = setTimeout( - this._processQueue, - this._options.maximumDelayMs, - ); - } - } - - queue( - item: TItem, - callback: (error?: Error, result?: TResult) => mixed, - ) { - this._queue.push({item, callback}); - this._processQueueOnceReady(); - } - -} - type URI = string; /**