Merge pull request #6286 from abreits/master

amqplib: callback api definition and tests added
This commit is contained in:
Masahiro Wakame
2015-10-21 23:19:39 +09:00
2 changed files with 119 additions and 13 deletions

View File

@@ -1,5 +1,6 @@
/// <reference path="amqplib.d.ts" />
// promise api tests
import amqp = require("amqplib");
var msg = "Hello World";
@@ -19,3 +20,34 @@ amqp.connect("amqp://localhost")
.then(channel => channel.consume("myQueue", newMsg => console.log("New Message: " + newMsg.content.toString())))
.ensure(() => connection.close());
});
// callback api tests
import amqpcb = require("amqplib/callback_api");
amqpcb.connect("amqp://localhost", (err, connection) => {
if(!err) {
connection.createChannel((err, channel) => {
if (!err) {
channel.assertQueue("myQueue", {}, (err, ok) => {
if(!err) {
channel.sendToQueue("myQueue", new Buffer(msg));
}
});
}
});
}
});
amqpcb.connect("amqp://localhost", (err, connection) => {
if(!err) {
connection.createChannel((err, channel) => {
if (!err) {
channel.assertQueue("myQueue", {}, (err, ok) => {
if(!err) {
channel.consume("myQueue", newMsg => console.log("New Message: " + newMsg.content.toString()));
}
});
}
});
}
});

100
amqplib/amqplib.d.ts vendored
View File

@@ -1,22 +1,12 @@
// Type definitions for amqplib 0.3.x
// Project: https://github.com/squaremo/amqp.node
// Definitions by: Michael Nahkies <https://github.com/mnahkies>
// Definitions by: Michael Nahkies <https://github.com/mnahkies>, Ab Reitsma <https://github.com/abreits>
// Definitions: https://github.com/borisyankov/DefinitelyTyped
/// <reference path="../when/when.d.ts" />
/// <reference path="../node/node.d.ts" />
declare module "amqplib" {
import events = require("events");
import when = require("when");
interface Connection extends events.EventEmitter {
close(): when.Promise<void>;
createChannel(): when.Promise<Channel>;
createConfirmChannel(): when.Promise<Channel>;
}
declare module "amqplib/properties" {
module Replies {
interface Empty {
}
@@ -25,6 +15,9 @@ declare module "amqplib" {
messageCount: number;
consumerCount: number;
}
interface PurgeQueue {
messageCount: number;
}
interface DeleteQueue {
messageCount: number;
}
@@ -100,6 +93,22 @@ declare module "amqplib" {
fields: Object;
properties: Object;
}
}
declare module "amqplib" {
import events = require("events");
import when = require("when");
import shared = require("amqplib/properties")
import Replies = shared.Replies;
import Options = shared.Options;
import Message = shared.Message;
interface Connection extends events.EventEmitter {
close(): when.Promise<void>;
createChannel(): when.Promise<Channel>;
createConfirmChannel(): when.Promise<Channel>;
}
interface Channel extends events.EventEmitter {
close(): when.Promise<void>;
@@ -108,7 +117,7 @@ declare module "amqplib" {
checkQueue(queue: string): when.Promise<Replies.AssertQueue>;
deleteQueue(queue: string, options?: Options.DeleteQueue): when.Promise<Replies.DeleteQueue>;
purgeQueue(queue: string): when.Promise<Replies.DeleteQueue>;
purgeQueue(queue: string): when.Promise<Replies.PurgeQueue>;
bindQueue(queue: string, source: string, pattern: string, args?: any): when.Promise<Replies.Empty>;
unbindQueue(queue: string, source: string, pattern: string, args?: any): when.Promise<Replies.Empty>;
@@ -142,3 +151,68 @@ declare module "amqplib" {
function connect(url: string, socketOptions?: any): when.Promise<Connection>;
}
declare module "amqplib/callback_api" {
import events = require("events");
import shared = require("amqplib/properties")
import Replies = shared.Replies;
import Options = shared.Options;
import Message = shared.Message;
interface Connection extends events.EventEmitter {
close(callback?: (err: any) => void): void;
createChannel(callback: (err: any, channel: Channel) => void): void;
createConfirmChannel(callback: (err: any, confirmChannel: ConfirmChannel) => void): void;
}
interface Channel extends events.EventEmitter {
close(callback: (err: any) => void): void;
assertQueue(queue?: string, options?: Options.AssertQueue, callback?: (err:any, ok: Replies.AssertQueue) => void): void;
checkQueue(queue: string, callback?: (err: any, ok: Replies.AssertQueue) => void): void;
deleteQueue(queue: string, options?: Options.DeleteQueue, callback?: (err:any, ok: Replies.DeleteQueue) => void): void;
purgeQueue(queue: string, callback?: (err:any, ok: Replies.PurgeQueue) => void): void;
bindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void;
unbindQueue(queue: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void;
assertExchange(exchange: string, type: string, options?: Options.AssertExchange, callback?: (err: any, ok: Replies.AssertExchange) => void): void;
checkExchange(exchange: string, callback?: (err: any, ok: Replies.Empty) => void): void;
deleteExchange(exchange: string, options?: Options.DeleteExchange, callback?: (err: any, ok: Replies.Empty) => void): void;
bindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void;
unbindExchange(destination: string, source: string, pattern: string, args?: any, callback?: (err: any, ok: Replies.Empty) => void): void;
publish(exchange: string, routingKey: string, content: Buffer, options?: Options.Publish): boolean;
sendToQueue(queue: string, content: Buffer, options?: Options.Publish): boolean;
consume(queue: string, onMessage: (msg: Message) => any, options?: Options.Consume, callback?: (err: any, ok: Replies.Consume) => void): void;
cancel(consumerTag: string, callback?: (err: any, ok: Replies.Empty) => void): void;
get(queue: string, options?: Options.Get, callback?: (err: any, ok: Message | boolean) => void): void;
ack(message: Message, allUpTo?: boolean): void;
ackAll(): void;
nack(message: Message, allUpTo?: boolean, requeue?: boolean): void;
nackAll(requeue?: boolean): void;
reject(message: Message, requeue?: boolean): void;
prefetch(count: number, global?: boolean): void;
recover(callback?: (err: any, ok: Replies.Empty) => void): void;
}
interface ConfirmChannel extends Channel {
publish(exchange: string, routingKey: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean;
sendToQueue(queue: string, content: Buffer, options?: Options.Publish, callback?: (err: any, ok: Replies.Empty) => void): boolean;
waitForConfirms(callback?: (err: any) => void): void;
}
function connect(callback: (err: any, connection: Connection) => void): void;
function connect(url: string, callback: (err: any, connection: Connection) => void): void;
function connect(url: string, socketOptions: any, callback: (err: any, connection: Connection) => void): void;
}