diff --git a/bull/bull-tests.ts.tscparams b/bull/bull-tests.ts.tscparams new file mode 100644 index 0000000000..6641df12d4 --- /dev/null +++ b/bull/bull-tests.ts.tscparams @@ -0,0 +1 @@ +--target es5 --noImplicitAny --module commonjs diff --git a/bull/bull-tests.tsx b/bull/bull-tests.tsx new file mode 100644 index 0000000000..bd25efc0c9 --- /dev/null +++ b/bull/bull-tests.tsx @@ -0,0 +1,102 @@ +/** + * Created by Bruno Grieder + */ + +/// + + +import * as Queue from "bull" + +var videoQueue = Queue( 'video transcoding', 6379, '127.0.0.1' ); +var audioQueue = Queue( 'audio transcoding', 6379, '127.0.0.1' ); +var imageQueue = Queue( 'image transcoding', 6379, '127.0.0.1' ); + +videoQueue.process( ( job: Queue.Job, done: Queue.DoneCallback ) => { + + // job.data contains the custom data passed when the job was created + // job.jobId contains id of this job. + + // transcode video asynchronously and report progress + job.progress( 42 ); + + // call done when finished + done(); + + // or give a error if error + done( Error( 'error transcoding' ) ); + + // or pass it a result + done( null, { framerate: 29.5 /* etc... */ } ); + + // If the job throws an unhandled exception it is also handled correctly + throw (Error( 'some unexpected error' )); +} ); + +audioQueue.process( ( job: Queue.Job, done: Queue.DoneCallback ) => { + // transcode audio asynchronously and report progress + job.progress( 42 ); + + // call done when finished + done(); + + // or give a error if error + done( Error( 'error transcoding' ) ); + + // or pass it a result + done( null, { samplerate: 48000 /* etc... */ } ); + + // If the job throws an unhandled exception it is also handled correctly + throw (Error( 'some unexpected error' )); +} ); + +imageQueue.process( ( job: Queue.Job, done: Queue.DoneCallback ) => { + // transcode image asynchronously and report progress + job.progress( 42 ); + + // call done when finished + done(); + + // or give a error if error + done( Error( 'error transcoding' ) ); + + // or pass it a result + done( null, { width: 1280, height: 720 /* etc... */ } ); + + // If the job throws an unhandled exception it is also handled correctly + throw (Error( 'some unexpected error' )); +} ); + +videoQueue.add( { video: 'http://example.com/video1.mov' } ); +audioQueue.add( { audio: 'http://example.com/audio1.mp3' } ); +imageQueue.add( { image: 'http://example.com/image1.tiff' } ); + + +////////////////////////////////////////////////////////////////////////////////// +// +// Using Promises +// +////////////////////////////////////////////////////////////////////////////////// + +const fetchVideo = ( url: string ): Promise => { return null } +const transcodeVideo = ( data: any ): Promise => { return null } + +interface VideoJob extends Queue.Job { + data: {url: string} +} + + +videoQueue.process( ( job: VideoJob ) => { // don't forget to remove the done callback! + // Simply return a promise + return fetchVideo( job.data.url ).then( transcodeVideo ); + + // Handles promise rejection + return Promise.reject( new Error( 'error transcoding' ) ); + + // Passes the value the promise is resolved with to the "completed" event + return Promise.resolve( { framerate: 29.5 /* etc... */ } ); + + // If the job throws an unhandled exception it is also handled correctly + throw new Error( 'some unexpected error' ); + // same as + return Promise.reject( new Error( 'some unexpected error' ) ); +} ); diff --git a/bull/bull.d.ts b/bull/bull.d.ts new file mode 100644 index 0000000000..b867c11235 --- /dev/null +++ b/bull/bull.d.ts @@ -0,0 +1,311 @@ +// Type definitions for bull 0.7.0 +// Project: https://github.com/OptimalBits/bull +// Definitions by: Bruno Grieder +// Definitions: https://github.com/borisyankov/DefinitelyTyped + +/// +/// + + +declare module "bull" { + + import * as Redis from "redis"; + + /** + * This is the Queue constructor. + * It creates a new Queue that is persisted in Redis. + * Everytime the same queue is instantiated it tries to process all the old jobs that may exist from a previous unfinished session. + */ + function Bull(queueName: string, redisPort: number, redisHost: string, redisOpt?: Redis.ClientOpts): Bull.Queue; + + module Bull { + + export interface DoneCallback { + (error?: Error, value?: any): void + } + + export interface Job { + + id: string + + /** + * The custom data passed when the job was created + */ + data: Object; + + /** + * Report progress on a job + */ + progress(value: any): Promise; + + /** + * Removes a Job from the queue from all the lists where it may be included. + * @returns {Promise} A promise that resolves when the job is removed. + */ + remove(): Promise; + + /** + * Rerun a Job that has failed. + * @returns {Promise} A promise that resolves when the job is scheduled for retry. + */ + retry(): Promise; + } + + export interface Backoff { + + /** + * Backoff type, which can be either `fixed` or `exponential` + */ + type: string + + /** + * Backoff delay, in milliseconds + */ + delay: number; + } + + export interface AddOptions { + /** + * An amount of miliseconds to wait until this job can be processed. + * Note that for accurate delays, both server and clients should have their clocks synchronized + */ + delay?: number; + + /** + * A number of attempts to retry if the job fails [optional] + */ + attempts?: number; + + /** + * Backoff setting for automatic retries if the job fails + */ + backoff?: number | Backoff + + /** + * A boolean which, if true, adds the job to the right + * of the queue instead of the left (default false) + */ + lifo?: boolean; + + /** + * The number of milliseconds after which the job should be fail with a timeout error + */ + timeout?: number; + } + + export interface Queue { + + /** + * Defines a processing function for the jobs placed into a given Queue. + * + * The callback is called everytime a job is placed in the queue. + * It is passed an instance of the job as first argument. + * + * The done callback can be called with an Error instance, to signal that the job did not complete successfully, + * or with a result as second argument as second argument (e.g.: done(null, result);) when the job is successful. + * Errors will be passed as a second argument to the "failed" event; + * results, as a second argument to the "completed" event. + * + * concurrency: Bull will then call you handler in parallel respecting this max number. + */ + process(concurrency: number, callback: (job: Job, done: DoneCallback) => void): void; + + /** + * Defines a processing function for the jobs placed into a given Queue. + * + * The callback is called everytime a job is placed in the queue. + * It is passed an instance of the job as first argument. + * + * The done callback can be called with an Error instance, to signal that the job did not complete successfully, + * or with a result as second argument as second argument (e.g.: done(null, result);) when the job is successful. + * Errors will be passed as a second argument to the "failed" event; + * results, as a second argument to the "completed" event. + */ + process(callback: (job: Job, done: DoneCallback) => void): void; + + /** + * Defines a processing function for the jobs placed into a given Queue. + * + * The callback is called everytime a job is placed in the queue. + * It is passed an instance of the job as first argument. + * + * A promise must be returned to signal job completion. + * If the promise is rejected, the error will be passed as a second argument to the "failed" event. + * If it is resolved, its value will be the "completed" event's second argument. + * + * concurrency: Bull will then call you handler in parallel respecting this max number. + */ + process(concurrency: number, callback: (job: Job) => void): Promise; + + /** + * Defines a processing function for the jobs placed into a given Queue. + * + * The callback is called everytime a job is placed in the queue. + * It is passed an instance of the job as first argument. + * + * A promise must be returned to signal job completion. + * If the promise is rejected, the error will be passed as a second argument to the "failed" event. + * If it is resolved, its value will be the "completed" event's second argument. + */ + process(callback: (job: Job) => void): Promise; + + // process(callback: (job: Job, done?: DoneCallback) => void): Promise; + + /** + * Creates a new job and adds it to the queue. + * If the queue is empty the job will be executed directly, + * otherwise it will be placed in the queue and executed as soon as possible. + */ + add(data: Object, opts?: AddOptions): Promise; + + /** + * Returns a promise that resolves when the queue is paused. + * The pause is global, meaning that all workers in all queue instances for a given queue will be paused. + * A paused queue will not process new jobs until resumed, + * but current jobs being processed will continue until they are finalized. + * + * Pausing a queue that is already paused does nothing. + */ + pause(): Promise; + + /** + * Returns a promise that resolves when the queue is resumed after being paused. + * The resume is global, meaning that all workers in all queue instances for a given queue will be resumed. + * + * Resuming a queue that is not paused does nothing. + */ + resume(): Promise; + + /** + * Returns a promise that returns the number of jobs in the queue, waiting or paused. + * Since there may be other processes adding or processing jobs, this value may be true only for a very small amount of time. + */ + count(): Promise; + + /** + * Empties a queue deleting all the input lists and associated jobs. + */ + empty(): Promise; + + /** + * Closes the underlying redis client. Use this to perform a graceful shutdown. + * + * `close` can be called from anywhere, with one caveat: + * if called from within a job handler the queue won't close until after the job has been processed + */ + close(): Promise; + + /** + * Returns a promise that will return the job instance associated with the jobId parameter. + * If the specified job cannot be located, the promise callback parameter will be set to null. + */ + getJob(jobId: string): Promise; + + /** + * Tells the queue remove all jobs created outside of a grace period in milliseconds. + * You can clean the jobs with the following states: completed, waiting, active, delayed, and failed. + */ + clean(gracePeriod: number, jobsState?: string): Promise; + + /** + * Listens to queue events + * 'ready', 'error', 'activ', 'progress', 'completed', 'failed', 'paused', 'resumed', 'cleaned' + */ + on(eventName: string, callback: EventCallback): void; + } + + interface EventCallback { + (...args: any[]): void + } + + interface ReadyEventCallback extends EventCallback { + (): void; + } + + interface ErrorEventCallback extends EventCallback { + (error: Error): void; + } + + interface JobPromise { + /** + * Abort this job + */ + cancel(): void + } + + interface ActiveEventCallback extends EventCallback { + (job: Job, jobPromise: JobPromise): void; + } + + interface ProgressEventCallback extends EventCallback { + (job: Job, progress: any): void; + } + + interface CompletedEventCallback extends EventCallback { + (job: Job, result: Object): void; + } + + interface FailedEventCallback extends EventCallback { + (job: Job, error: Error): void; + } + + interface PausedEventCallback extends EventCallback { + (): void; + } + + interface ResumedEventCallback extends EventCallback { + (job?: Job): void; + } + + /** + * @see clean() for details + */ + interface CleanedEventCallback extends EventCallback { + (jobs: Job[], type: string): void; + } + } + + export = Bull; +} + +declare module "bull/lib/priority-queue" { + + import * as Bull from "bull"; + import * as Redis from "redis"; + + /** + * This is the Queue constructor of priority queue. + * + * It works same a normal queue, with same function and parameters. + * The only difference is that the Queue#add() allow an options opts.priority + * that could take ["low", "normal", "medium", "hight", "critical"]. If no options provider, "normal" will be taken. + * + * The priority queue will process more often highter priority jobs than lower. + */ + function PQueue(queueName: string, redisPort: number, redisHost: string, redisOpt?: Redis.ClientOpts): PQueue.PriorityQueue; + + module PQueue { + + export interface AddOptions extends Bull.AddOptions { + + /** + * "low", "normal", "medium", "high", "critical" + */ + priority?: string; + } + + + export interface PriorityQueue extends Bull.Queue { + + /** + * Creates a new job and adds it to the queue. + * If the queue is empty the job will be executed directly, + * otherwise it will be placed in the queue and executed as soon as possible. + */ + add(data: Object, opts?: PQueue.AddOptions): Promise; + + } + } + + export = PQueue; +}