From 8dfd709683480bf87b3fa9ab92b0e2894cb70a3f Mon Sep 17 00:00:00 2001 From: abreits Date: Wed, 14 Oct 2015 14:33:16 +0200 Subject: [PATCH 1/5] amqplib: callback api definition and tests added --- amqplib/amqplib-tests.ts | 28 ++++++++ amqplib/amqplib.d.ts | 149 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) diff --git a/amqplib/amqplib-tests.ts b/amqplib/amqplib-tests.ts index 7a1f512002..cfb9adbe13 100644 --- a/amqplib/amqplib-tests.ts +++ b/amqplib/amqplib-tests.ts @@ -1,5 +1,6 @@ /// +// promise api tests import amqp = require("amqplib"); var msg = "Hello World"; @@ -19,3 +20,30 @@ amqp.connect("amqp://localhost") .then(channel => channel.consume("myQueue", newMsg => console.log("New Message: " + newMsg.content.toString()))) .ensure(() => connection.close()); }); + +// callback api tests +import amqpcb = require("amqplib/callback_api"); + +amqpcb.connect("amqp://localhost", (err, connection) => { + if(!err) { + connection.createChannel((err, channel) => { + if (!err) { + channel.assertQueue("myQueue", (err, ok) => { + channel.sendToQueue("myQueue", new Buffer(msg)); + }); + } + }); + } +}); + +amqpcb.connect("amqp://localhost", (err, connection) => { + if(!err) { + connection.createChannel((err, channel) => { + if (!err) { + channel.assertQueue("myQueue", (err, ok) => { + channel.consume("myQueue", newMsg => console.log("New Message: " + newMsg.content.toString())); + }); + } + }); + } +}); diff --git a/amqplib/amqplib.d.ts b/amqplib/amqplib.d.ts index 0c7f0720a6..c6f73feefe 100644 --- a/amqplib/amqplib.d.ts +++ b/amqplib/amqplib.d.ts @@ -1,6 +1,7 @@ // Type definitions for amqplib 0.3.x // Project: https://github.com/squaremo/amqp.node // Definitions by: Michael Nahkies +// Definitions for callback api added by: Ab Reitsma // Definitions: https://github.com/borisyankov/DefinitelyTyped /// @@ -142,3 +143,151 @@ declare module "amqplib" { function connect(url: string, socketOptions?: any): when.Promise; } + +declare module "amqplib/callback_api" { + + import events = require("events"); + + interface Connection extends events.EventEmitter { + close(callback?: (err: any) => void); + createChannel(callback: (err: any, channel: Channel) => void); + createConfirmChannel(callback: (err: any, confirmChannel: ConfirmChannel) => void); + } + + module Replies { + interface Empty { + } + interface AssertQueue { + queue: string; + messageCount: number; + consumerCount: number; + } + interface DeleteQueue { + messageCount: number; + } + interface PurgeQueue { + messageCount: number; + } + interface AssertExchange { + exchange: string; + } + interface Consume { + consumerTag: string; + } + } + + module Options { + interface AssertQueue { + exclusive?: boolean; + durable?: boolean; + autoDelete?: boolean; + arguments?: any; + messageTtl?: number; + expires?: number; + deadLetterExchange?: string; + maxLength?: number; + } + interface DeleteQueue { + ifUnused?: boolean; + ifEmpty?: boolean; + } + interface AssertExchange { + durable?: boolean; + internal?: boolean; + autoDelete?: boolean; + alternateExchange?: string; + arguments?: any; + } + interface DeleteExchange { + ifUnused?: boolean; + } + interface Publish { + expiration?: string; + userId?: string; + CC?: string | string[]; + + mandatory?: boolean; + persistent?: boolean; + deliveryMode?: boolean | number; + BCC?: string | string[]; + + contentType?: string; + contentEncoding?: string; + headers?: Object; + priority?: number; + correlationId?: string; + replyTo?: string; + messageId?: string; + timestamp?: number; + type?: string; + appId?: string; + } + interface Consume { + consumerTag?: string; + noLocal?: boolean; + noAck?: boolean; + exclusive?: boolean; + priority?: number; + arguments?: Object; + } + interface Get { + noAck?: boolean; + } + } + + interface Message { + content: Buffer; + fields: any; + properties: any; + } + + interface Channel extends events.EventEmitter { + close(callback: (err: any) => void); + + assertQueue(queue?: string, options?: Options.AssertQueue, callback?: (err:any, ok: Replies.AssertQueue) => void); + checkQueue(queue: string, callback?: (err: any, ok: Replies.AssertQueue) => void); + + deleteQueue(queue: string, options?: Options.DeleteQueue, callback?: (err:any, ok: Replies.DeleteQueue) => void); + purgeQueue(queue: string, callback?: (err:any, ok: Replies.PurgeQueue) => void); + + bindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); + unbindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); + + assertExchange(exchange: string, type: string, options?: Options.AssertExchange, callback?: (err: any, ok: Replies.AssertExchange) => void); + checkExchange(exchange: string, callback?: (err: any, ok: Replies.Empty) => void); + + deleteExchange(exchange: string, options?: Options.DeleteExchange, callback?: (err: any, ok: Replies.Empty) => void); + + bindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); + unbindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); + + publish(exchange: string, routingKey: string, content: Buffer, options?: Options.Publish): boolean; + sendToQueue(queue: string, content: Buffer, options?: Options.Publish): boolean; + + consume(queue: string, onMessage: (msg: Message) => any, options?: Options.Consume, callback?: (err: any, ok: Replies.Consume) => void); + + cancel(consumerTag: string, callback?: (err: any, ok: Replies.Empty) => void); + get(queue: string, options?: Options.Get, callback?: (err: any, ok: Message | boolean) => void); + + ack(message: Message, allUpTo?: boolean): void; + ackAll(): void; + + nack(message: Message, allUpTo?: boolean, requeue?: boolean): void; + nackAll(requeue?: boolean): void; + reject(message: Message, requeue?: boolean): void; + + prefetch(count: number, global?: boolean); + recover(callback?: (err: any, ok: Replies.Empty) => void); + } + + interface ConfirmChannel extends Channel { + publish(exchange: string, routingKey: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean; + sendToQueue(queue: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean; + + waitForConfirms(callback?: (err: any) => void); + } + + function connect(callback: (err: any, connection: Connection) => void); + function connect(url: string, callback: (err: any, connection: Connection) => void); + function connect(url: string, socketOptions: any, callback: (err: any, connection: Connection) => void); +} From 93bcd768f07af500f68c26abfba8ed2827c81eac Mon Sep 17 00:00:00 2001 From: abreits Date: Wed, 14 Oct 2015 14:58:55 +0200 Subject: [PATCH 2/5] amqplib: callback-api added (second try) --- amqplib/amqplib-tests.ts | 8 +++++-- amqplib/amqplib.d.ts | 51 ++++++++++++++++++++-------------------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/amqplib/amqplib-tests.ts b/amqplib/amqplib-tests.ts index cfb9adbe13..c22bc7ac96 100644 --- a/amqplib/amqplib-tests.ts +++ b/amqplib/amqplib-tests.ts @@ -29,7 +29,9 @@ amqpcb.connect("amqp://localhost", (err, connection) => { connection.createChannel((err, channel) => { if (!err) { channel.assertQueue("myQueue", (err, ok) => { - channel.sendToQueue("myQueue", new Buffer(msg)); + if(!err) { + channel.sendToQueue("myQueue", new Buffer(msg)); + } }); } }); @@ -41,7 +43,9 @@ amqpcb.connect("amqp://localhost", (err, connection) => { connection.createChannel((err, channel) => { if (!err) { channel.assertQueue("myQueue", (err, ok) => { - channel.consume("myQueue", newMsg => console.log("New Message: " + newMsg.content.toString())); + if(!err) { + channel.consume("myQueue", newMsg => console.log("New Message: " + newMsg.content.toString())); + } }); } }); diff --git a/amqplib/amqplib.d.ts b/amqplib/amqplib.d.ts index c6f73feefe..a6e6e7a055 100644 --- a/amqplib/amqplib.d.ts +++ b/amqplib/amqplib.d.ts @@ -1,7 +1,6 @@ // Type definitions for amqplib 0.3.x // Project: https://github.com/squaremo/amqp.node -// Definitions by: Michael Nahkies -// Definitions for callback api added by: Ab Reitsma +// Definitions by: Michael Nahkies , Ab Reitsma // Definitions: https://github.com/borisyankov/DefinitelyTyped /// @@ -149,9 +148,9 @@ declare module "amqplib/callback_api" { import events = require("events"); interface Connection extends events.EventEmitter { - close(callback?: (err: any) => void); - createChannel(callback: (err: any, channel: Channel) => void); - createConfirmChannel(callback: (err: any, confirmChannel: ConfirmChannel) => void); + close(callback?: (err: any) => void): void; + createChannel(callback: (err: any, channel: Channel) => void): void; + createConfirmChannel(callback: (err: any, confirmChannel: ConfirmChannel) => void): void; } module Replies { @@ -242,32 +241,32 @@ declare module "amqplib/callback_api" { } interface Channel extends events.EventEmitter { - close(callback: (err: any) => void); + close(callback: (err: any) => void): void; - assertQueue(queue?: string, options?: Options.AssertQueue, callback?: (err:any, ok: Replies.AssertQueue) => void); - checkQueue(queue: string, callback?: (err: any, ok: Replies.AssertQueue) => void); + assertQueue(queue?: string, options?: Options.AssertQueue, callback?: (err:any, ok: Replies.AssertQueue) => void): void; + checkQueue(queue: string, callback?: (err: any, ok: Replies.AssertQueue) => void): void; - deleteQueue(queue: string, options?: Options.DeleteQueue, callback?: (err:any, ok: Replies.DeleteQueue) => void); - purgeQueue(queue: string, callback?: (err:any, ok: Replies.PurgeQueue) => void); + deleteQueue(queue: string, options?: Options.DeleteQueue, callback?: (err:any, ok: Replies.DeleteQueue) => void): void; + purgeQueue(queue: string, callback?: (err:any, ok: Replies.PurgeQueue) => void): void; - bindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); - unbindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); + bindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void; + unbindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void; - assertExchange(exchange: string, type: string, options?: Options.AssertExchange, callback?: (err: any, ok: Replies.AssertExchange) => void); - checkExchange(exchange: string, callback?: (err: any, ok: Replies.Empty) => void); + assertExchange(exchange: string, type: string, options?: Options.AssertExchange, callback?: (err: any, ok: Replies.AssertExchange) => void): void; + checkExchange(exchange: string, callback?: (err: any, ok: Replies.Empty) => void): void; - deleteExchange(exchange: string, options?: Options.DeleteExchange, callback?: (err: any, ok: Replies.Empty) => void); + deleteExchange(exchange: string, options?: Options.DeleteExchange, callback?: (err: any, ok: Replies.Empty) => void): void; - bindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); - unbindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void); + bindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void; + unbindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void; publish(exchange: string, routingKey: string, content: Buffer, options?: Options.Publish): boolean; sendToQueue(queue: string, content: Buffer, options?: Options.Publish): boolean; - consume(queue: string, onMessage: (msg: Message) => any, options?: Options.Consume, callback?: (err: any, ok: Replies.Consume) => void); + consume(queue: string, onMessage: (msg: Message) => any, options?: Options.Consume, callback?: (err: any, ok: Replies.Consume) => void): void; - cancel(consumerTag: string, callback?: (err: any, ok: Replies.Empty) => void); - get(queue: string, options?: Options.Get, callback?: (err: any, ok: Message | boolean) => void); + cancel(consumerTag: string, callback?: (err: any, ok: Replies.Empty) => void): void; + get(queue: string, options?: Options.Get, callback?: (err: any, ok: Message | boolean) => void): void; ack(message: Message, allUpTo?: boolean): void; ackAll(): void; @@ -276,18 +275,18 @@ declare module "amqplib/callback_api" { nackAll(requeue?: boolean): void; reject(message: Message, requeue?: boolean): void; - prefetch(count: number, global?: boolean); - recover(callback?: (err: any, ok: Replies.Empty) => void); + prefetch(count: number, global?: boolean): void; + recover(callback?: (err: any, ok: Replies.Empty) => void): void; } interface ConfirmChannel extends Channel { publish(exchange: string, routingKey: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean; sendToQueue(queue: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean; - waitForConfirms(callback?: (err: any) => void); + waitForConfirms(callback?: (err: any) => void): void; } - function connect(callback: (err: any, connection: Connection) => void); - function connect(url: string, callback: (err: any, connection: Connection) => void); - function connect(url: string, socketOptions: any, callback: (err: any, connection: Connection) => void); + function connect(callback: (err: any, connection: Connection) => void): void; + function connect(url: string, callback: (err: any, connection: Connection) => void): void; + function connect(url: string, socketOptions: any, callback: (err: any, connection: Connection) => void): void; } From 9e7d9453a136e98970eeae042706680d0e974c67 Mon Sep 17 00:00:00 2001 From: abreits Date: Wed, 14 Oct 2015 15:09:10 +0200 Subject: [PATCH 3/5] amqplib: callback-api definition and tests added --- amqplib/amqplib-tests.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amqplib/amqplib-tests.ts b/amqplib/amqplib-tests.ts index c22bc7ac96..f99a3bde1a 100644 --- a/amqplib/amqplib-tests.ts +++ b/amqplib/amqplib-tests.ts @@ -28,7 +28,7 @@ amqpcb.connect("amqp://localhost", (err, connection) => { if(!err) { connection.createChannel((err, channel) => { if (!err) { - channel.assertQueue("myQueue", (err, ok) => { + channel.assertQueue("myQueue", {}, (err, ok) => { if(!err) { channel.sendToQueue("myQueue", new Buffer(msg)); } @@ -42,7 +42,7 @@ amqpcb.connect("amqp://localhost", (err, connection) => { if(!err) { connection.createChannel((err, channel) => { if (!err) { - channel.assertQueue("myQueue", (err, ok) => { + channel.assertQueue("myQueue", {}, (err, ok) => { if(!err) { channel.consume("myQueue", newMsg => console.log("New Message: " + newMsg.content.toString())); } From 8d17670522399165e2025b3c5e3886ff976a8686 Mon Sep 17 00:00:00 2001 From: abreits Date: Mon, 19 Oct 2015 13:48:09 +0200 Subject: [PATCH 4/5] amqplib: Shared declarations combined --- amqplib/amqplib.d.ts | 124 +++++++++---------------------------------- 1 file changed, 25 insertions(+), 99 deletions(-) diff --git a/amqplib/amqplib.d.ts b/amqplib/amqplib.d.ts index a6e6e7a055..bc0006753c 100644 --- a/amqplib/amqplib.d.ts +++ b/amqplib/amqplib.d.ts @@ -6,17 +6,7 @@ /// /// -declare module "amqplib" { - - import events = require("events"); - import when = require("when"); - - interface Connection extends events.EventEmitter { - close(): when.Promise; - createChannel(): when.Promise; - createConfirmChannel(): when.Promise; - } - +declare module "amqplib/shared" { module Replies { interface Empty { } @@ -25,6 +15,9 @@ declare module "amqplib" { messageCount: number; consumerCount: number; } + interface PurgeQueue { + messageCount: number; + } interface DeleteQueue { messageCount: number; } @@ -100,6 +93,22 @@ declare module "amqplib" { fields: Object; properties: Object; } +} + +declare module "amqplib" { + + import events = require("events"); + import when = require("when"); + import shared = require("amqplib/shared") + import Replies = shared.Replies; + import Options = shared.Options; + import Message = shared.Message; + + interface Connection extends events.EventEmitter { + close(): when.Promise; + createChannel(): when.Promise; + createConfirmChannel(): when.Promise; + } interface Channel extends events.EventEmitter { close(): when.Promise; @@ -108,7 +117,7 @@ declare module "amqplib" { checkQueue(queue: string): when.Promise; deleteQueue(queue: string, options?: Options.DeleteQueue): when.Promise; - purgeQueue(queue: string): when.Promise; + purgeQueue(queue: string): when.Promise; bindQueue(queue: string, source: string, pattern: string, args?: any): when.Promise; unbindQueue(queue: string, source: string, pattern: string, args?: any): when.Promise; @@ -146,6 +155,10 @@ declare module "amqplib" { declare module "amqplib/callback_api" { import events = require("events"); + import shared = require("amqplib/shared") + import Replies = shared.Replies; + import Options = shared.Options; + import Message = shared.Message; interface Connection extends events.EventEmitter { close(callback?: (err: any) => void): void; @@ -153,93 +166,6 @@ declare module "amqplib/callback_api" { createConfirmChannel(callback: (err: any, confirmChannel: ConfirmChannel) => void): void; } - module Replies { - interface Empty { - } - interface AssertQueue { - queue: string; - messageCount: number; - consumerCount: number; - } - interface DeleteQueue { - messageCount: number; - } - interface PurgeQueue { - messageCount: number; - } - interface AssertExchange { - exchange: string; - } - interface Consume { - consumerTag: string; - } - } - - module Options { - interface AssertQueue { - exclusive?: boolean; - durable?: boolean; - autoDelete?: boolean; - arguments?: any; - messageTtl?: number; - expires?: number; - deadLetterExchange?: string; - maxLength?: number; - } - interface DeleteQueue { - ifUnused?: boolean; - ifEmpty?: boolean; - } - interface AssertExchange { - durable?: boolean; - internal?: boolean; - autoDelete?: boolean; - alternateExchange?: string; - arguments?: any; - } - interface DeleteExchange { - ifUnused?: boolean; - } - interface Publish { - expiration?: string; - userId?: string; - CC?: string | string[]; - - mandatory?: boolean; - persistent?: boolean; - deliveryMode?: boolean | number; - BCC?: string | string[]; - - contentType?: string; - contentEncoding?: string; - headers?: Object; - priority?: number; - correlationId?: string; - replyTo?: string; - messageId?: string; - timestamp?: number; - type?: string; - appId?: string; - } - interface Consume { - consumerTag?: string; - noLocal?: boolean; - noAck?: boolean; - exclusive?: boolean; - priority?: number; - arguments?: Object; - } - interface Get { - noAck?: boolean; - } - } - - interface Message { - content: Buffer; - fields: any; - properties: any; - } - interface Channel extends events.EventEmitter { close(callback: (err: any) => void): void; From 754264b378fa47ccdf102dba2d90b06714b19db6 Mon Sep 17 00:00:00 2001 From: abreits Date: Tue, 20 Oct 2015 07:10:37 +0200 Subject: [PATCH 5/5] amqplib: better description for shared properties module --- amqplib/amqplib.d.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/amqplib/amqplib.d.ts b/amqplib/amqplib.d.ts index bc0006753c..f125aaa070 100644 --- a/amqplib/amqplib.d.ts +++ b/amqplib/amqplib.d.ts @@ -6,7 +6,7 @@ /// /// -declare module "amqplib/shared" { +declare module "amqplib/properties" { module Replies { interface Empty { } @@ -99,7 +99,7 @@ declare module "amqplib" { import events = require("events"); import when = require("when"); - import shared = require("amqplib/shared") + import shared = require("amqplib/properties") import Replies = shared.Replies; import Options = shared.Options; import Message = shared.Message; @@ -155,7 +155,7 @@ declare module "amqplib" { declare module "amqplib/callback_api" { import events = require("events"); - import shared = require("amqplib/shared") + import shared = require("amqplib/properties") import Replies = shared.Replies; import Options = shared.Options; import Message = shared.Message;