Merge pull request #18169 from amiram/kafka-node-new-client

Kafka node new client
This commit is contained in:
Wesley Wigham
2017-07-18 14:20:04 -07:00
committed by GitHub
4 changed files with 319 additions and 193 deletions

View File

@@ -1,99 +1,128 @@
// Type definitions for kafka-node 1.3.3
// Type definitions for kafka-node 2.0
// Project: https://github.com/SOHU-Co/kafka-node/
// Definitions by: Daniel Imrie-Situnayake <https://github.com/dansitu/>, Bill <https://github.com/bkim54>, Michael Haan <https://github.com/sfrooster>
// Definitions by: Daniel Imrie-Situnayake <https://github.com/dansitu/>, Bill <https://github.com/bkim54>, Michael Haan <https://github.com/sfrooster>, Amiram Korach <https://github.com/amiram>
// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped
// # Classes
export declare class Client {
export class Client {
constructor(connectionString: string, clientId?: string, options?: ZKOptions, noBatchOptions?: AckBatchOptions, sslOptions?: any);
close(callback?: Function): void;
topicExists(topics: Array<string>, callback: Function): void;
refreshMetadata(topics: Array<string>, cb?: (error: any, data: any) => any): void;
close(cb: (error: any) => any): void;
sendOffsetCommitV2Request(group: string, generationId: number, memberId: string, commits: Array<OffsetCommitRequest>, cb: (error: any, data: any) => any): void;
close(callback?: () => void): void;
topicExists(topics: string[], callback: (err?: TopicsNotExistError | any) => any): void;
refreshMetadata(topics: string[], cb?: (error?: any) => any): void;
sendOffsetCommitV2Request(group: string, generationId: number, memberId: string, commits: OffsetCommitRequest[], cb: () => void): void;
}
export declare class Producer {
constructor(client: Client, options?: any, customPartitioner?: any);
on(eventName: string, cb: () => any): void;
on(eventName: string, cb: (error: any) => any): void;
send(payloads: Array<ProduceRequest>, cb: (error: any, data: any) => any): void;
createTopics(topics: Array<string>, async: boolean, cb?: (error: any, data: any) => any): void;
close(cb: (error: any) => any): void;
export class KafkaClient extends Client {
constructor(options?: KafkaClientOptions);
connect(): void;
}
export declare class HighLevelProducer {
constructor(client: Client, options?: any, customPartitioner?: any);
on(eventName: string, cb: () => any): void;
on(eventName: string, cb: (error: any) => any): void;
send(payloads: Array<ProduceRequest>, cb: (error: any, data: any) => any): void;
createTopics(topics: Array<string>, async: boolean, cb?: (error: any, data: any) => any): void;
close(cb: (error: any) => any): void;
export class Producer {
constructor(client: Client, options?: ProducerOptions, customPartitioner?: any);
on(eventName: "ready", cb: () => any): void;
on(eventName: "error", cb: (error: any) => any): void;
send(payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;
createTopics(topics: string[], async: true, cb: (error?: any, data?: any) => any): void;
createTopics(topics: string[], async: false, cb: () => any): void;
createTopics(topics: string[], cb: (error?: any, data?: any) => any): void;
close(): void;
}
export declare class Consumer {
constructor(client: Client, fetchRequests: Array<OffsetFetchRequest>, options: ConsumerOptions);
export class HighLevelProducer extends Producer {
}
export class Consumer {
constructor(client: Client, fetchRequests: Array<OffsetFetchRequest | string>, options: ConsumerOptions);
client: Client;
on(eventName: string, cb: (message: string) => any): void;
on(eventName: string, cb: (error: any) => any): void;
addTopics(topics: Array<string>, cb: (error: any, added: boolean) => any): void;
addTopics(topics: Array<Topic>, cb: (error: any, added: boolean) => any, fromOffset: boolean): void;
removeTopics(topics: Array<string>, cb: (error: any, removed: boolean) => any): void;
on(eventName: "message", cb: (message: Message) => any): void;
on(eventName: "error" | "offsetOutOfRange", cb: (error: any) => any): void;
addTopics(topics: string[] | Topic[], cb: (error: any, added: string[] | Topic[]) => any, fromOffset?: boolean): void;
removeTopics(topics: string | string[], cb: (error: any, removed: number) => any): void;
commit(cb: (error: any, data: any) => any): void;
commit(force: boolean, cb: (error: any, data: any) => any): void;
setOffset(topic: string, partition: number, offset: number): void;
pause(): void;
resume(): void;
pauseTopics(topics: Array<any> /* Array<string|Topic> */): void;
resumeTopics(topics: Array<any> /* Array<string|Topic> */): void;
pauseTopics(topics: any[] /* Array<string|Topic> */): void;
resumeTopics(topics: any[] /* Array<string|Topic> */): void;
close(force: boolean, cb: () => any): void;
close(cb: () => any): void;
}
export declare class HighLevelConsumer {
constructor(client: Client, payloads: Array<Topic>, options: ConsumerOptions);
export class HighLevelConsumer {
constructor(client: Client, payloads: Topic[], options: HighLevelConsumerOptions);
client: Client;
on(eventName: string, cb: (message: string) => any): void;
on(eventName: string, cb: (error: any) => any): void;
addTopics(topics: Array<string>, cb: (error: any, added: boolean) => any): void;
addTopics(topics: Array<Topic>, cb: (error: any, added: boolean) => any, fromOffset: boolean): void;
removeTopics(topics: Array<string>, cb: (error: any, removed: boolean) => any): void;
on(eventName: "message", cb: (message: Message) => any): void;
on(eventName: "error" | "offsetOutOfRange", cb: (error: any) => any): void;
addTopics(topics: string[] | Topic[], cb?: (error: any, added: string[] | Topic[]) => any): void;
removeTopics(topics: string | string[], cb: (error: any, removed: number) => any): void;
commit(cb: (error: any, data: any) => any): void;
commit(force: boolean, cb: (error: any, data: any) => any): void;
setOffset(topic: string, partition: number, offset: number): void;
pause(): void;
resume(): void;
pauseTopics(topics: Array<any> /* Array<string|Topic> */): void;
resumeTopics(topics: Array<any> /* Array<string|Topic> */): void;
close(force: boolean, cb: () => any): void;
close(cb: () => any): void;
}
export declare class ConsumerGroup extends HighLevelConsumer {
constructor(options: ConsumerGroupOptions, topics: string[]);
on(eventName: string, cb: (message: string) => any): void;
on(eventName: string, cb: (error: any) => any): void;
close(force: boolean, cb: (error: any) => any): void;
export class ConsumerGroup extends HighLevelConsumer {
constructor(options: ConsumerGroupOptions, topics: string[] | string);
generationId: number;
memberId: string;
}
export declare class Offset {
export class Offset {
constructor(client: Client);
on(eventName: string, cb: () => any): void;
fetch(payloads: Array<OffsetRequest>, cb: (error: any, data: any) => any): void;
commit(groupId: string, payloads: Array<OffsetCommitRequest>, cb: (error: any, data: any) => any): void;
fetchCommits(groupId: string, payloads: Array<OffsetFetchRequest>, cb: (error: any, data: any) => any): void;
fetchLatestOffsets(topics: Array<string>, cb: (error: any, data: any) => any): void;
fetchEarliestOffsets(topics: Array<string>, cb: (error: any, data: any) => any): void;
on(eventName: string, cb: (error: any) => any): void;
on(eventName: string, cb: (error?: any) => any): void;
fetch(payloads: OffsetRequest[], cb: (error: any, data: any) => any): void;
commit(groupId: string, payloads: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
fetchCommits(groupId: string, payloads: OffsetFetchRequest[], cb: (error: any, data: any) => any): void;
fetchLatestOffsets(topics: string[], cb: (error: any, data: any) => any): void;
fetchEarliestOffsets(topics: string[], cb: (error: any, data: any) => any): void;
}
export declare class KeyedMessage {
export class KeyedMessage {
constructor(key: string, message: string);
}
// # Interfaces
export interface Message {
topic: string;
value: string;
offset?: number;
partition?: number;
highWaterOffset?: number;
key?: number;
}
export interface ProducerOptions {
requireAcks?: number;
ackTimeoutMs?: number;
partitionerType?: number;
}
export interface KafkaClientOptions {
kafkaHost?: string;
connectTimeout?: number;
requestTimeout?: number;
authConnect?: boolean;
connectRetryOptions?: ConnectRetryOptions;
sslOptions?: any;
clientId?: string;
}
export interface ConnectRetryOptions {
retries?: number;
factor?: number;
minTimeout?: number;
maxTimeout?: number;
randomize?: boolean;
}
export interface AckBatchOptions {
noAckBatchSize: number | null,
noAckBatchAge: number | null
noAckBatchSize: number | null;
noAckBatchAge: number | null;
}
export interface ZKOptions {
@@ -104,7 +133,7 @@ export interface ZKOptions {
export interface ProduceRequest {
topic: string;
messages: any; // Array<string> | Array<KeyedMessage> | string | KeyedMessage
messages: any; // string[] | Array<KeyedMessage> | string | KeyedMessage
key?: any;
partition?: number;
attributes?: number;
@@ -112,7 +141,6 @@ export interface ProduceRequest {
export interface ConsumerOptions {
groupId?: string;
id?: string;
autoCommit?: boolean;
autoCommitIntervalMs?: number;
fetchMaxWaitMs?: number;
@@ -120,13 +148,18 @@ export interface ConsumerOptions {
fetchMaxBytes?: number;
fromOffset?: boolean;
encoding?: string;
keyEncoding?: string;
}
export interface HighLevelConsumerOptions extends ConsumerOptions {
id?: string;
}
export interface CustomPartitionAssignmentProtocol {
name: string;
version: number;
userData: {};
assign: (topicPattern: any, groupMembers: any, callback: (error: any, result: any) => void) => void;
assign(topicPattern: any, groupMembers: any, callback: (error: any, result: any) => void): void;
}
export interface ConsumerGroupOptions {
@@ -134,7 +167,7 @@ export interface ConsumerGroupOptions {
zk?: ZKOptions;
batch?: AckBatchOptions;
ssl?: boolean;
id: string;
id?: string;
groupId: string;
sessionTimeout?: number;
protocol?: Array<"roundrobin" | "range" | CustomPartitionAssignmentProtocol>;
@@ -180,3 +213,7 @@ export interface OffsetFetchRequest {
partition?: number;
offset?: number;
}
export class TopicsNotExistError extends Error {
topics: string | string[];
}

View File

@@ -1,181 +1,267 @@
import kafka = require('kafka-node');
var basicClient = new kafka.Client('localhost:2181/', 'sendMessage');
const basicClient = new kafka.Client('localhost:2181/', 'sendMessage');
var optionsClient = new kafka.Client('localhost:2181/', 'sendMessage', {
sessionTimeout: 30000,
spinDelay: 1000,
retries: 0
const optionsClient = new kafka.Client('localhost:2181/', 'sendMessage', {
sessionTimeout: 30000,
spinDelay: 1000,
retries: 0
}, {
noAckBatchSize: 1000,
noAckBatchAge: 1000 * 10
}, {
rejectUnauthorized: false
});
optionsClient.topicExists(['topic'], (error: any) => {
});
optionsClient.refreshMetadata(['topic'], (error: any) => {
});
optionsClient.close();
optionsClient.close(function(){});
var producer = new kafka.Producer(basicClient);
producer.on('error', function(error: Error){});
producer.on('ready', function(){
var messages = [{
topic: 'topicName',
messages: ['message body'],
partition: 0,
attributes: 2
}, {
topic: 'topicName',
messages: ['message body'],
partition: 0
}, {
topic: 'topicName',
messages: ['message body'],
attributes: 0
}, {
topic: 'topicName',
messages: ['message body']
}, {
topic: 'topicName',
messages: [new kafka.KeyedMessage('key', 'message')]
}];
producer.send(messages, function(err: Error){});
producer.send(messages, function(err: Error, data: Object){});
producer.createTopics(['t'], true, function (err: Error, data: Object) {});
producer.createTopics(['t'], false, function (err, data) {});
// producer.createTopics(['t'], function (err: Error, data: Object) {}); // Omitting middle argument is not possible in TS
optionsClient.sendOffsetCommitV2Request('group', 0, 'memberId', [], () => {
});
optionsClient.close(() => {
});
var highLevelProducer = new kafka.HighLevelProducer(basicClient);
highLevelProducer.on('error', function(error: Error){});
highLevelProducer.on('ready', function(){
var messages = [{
topic: 'topicName',
messages: ['message body'],
attributes: 2
}, {
topic: 'topicName',
messages: ['message body'],
partition: 0
}, {
topic: 'topicName',
messages: ['message body'],
attributes: 0
}, {
topic: 'topicName',
messages: ['message body']
}, {
topic: 'topicName',
messages: [new kafka.KeyedMessage('key', 'message')]
}];
producer.send(messages, function(err: Error){});
producer.send(messages, function(err: Error, data: Object){});
producer.createTopics(['t'], true, function (err: Error, data: Object) {});
producer.createTopics(['t'], false, function (err, data) {});
// producer.createTopics(['t'], function (err: Error, data: Object) {}); // Omitting middle argument is not possible in TS
const basicKafkaClient = new kafka.KafkaClient();
const optionsKafkaClient = new kafka.KafkaClient({
kafkaHost: 'localhost:2181',
connectTimeout: 1000,
requestTimeout: 1000,
authConnect: true,
sslOptions: {},
clientId: "client id",
connectRetryOptions: {
retries: 5,
factor: 0,
minTimeout: 1000,
maxTimeout: 1000,
randomize: true
}
});
var fetchRequests = [{ topic: 'awesome' }];
var consumer = new kafka.Consumer(basicClient, fetchRequests, {
groupId: 'abcde',
autoCommit: true
optionsKafkaClient.connect();
const optionsProducer = new kafka.Producer(basicClient, {
requireAcks: 0,
ackTimeoutMs: 0,
partitionerType: 0
});
consumer.on('error', function(error: Error){});
consumer.on('message', function(message){});
consumer.addTopics(['t1', 't2'], function (err, added) {});
consumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {}, true);
const producer = new kafka.Producer(basicClient);
producer.on('error', (error: Error) => {
});
producer.on('ready', () => {
const messages = [{
topic: 'topicName',
messages: ['message body'],
partition: 0,
attributes: 2
}, {
topic: 'topicName',
messages: ['message body'],
partition: 0
}, {
topic: 'topicName',
messages: ['message body'],
attributes: 0
}, {
topic: 'topicName',
messages: ['message body']
}, {
topic: 'topicName',
messages: [new kafka.KeyedMessage('key', 'message')]
}];
consumer.removeTopics(['t1', 't2'], function (err, removed) {});
producer.send(messages, (err: Error) => {
});
producer.send(messages, (err: Error, data: any) => {
});
consumer.commit(function (err, data) {});
producer.createTopics(['t'], true, (err: Error, data: any) => {
});
producer.createTopics(['t'], (err: Error, data: any) => {
});
producer.createTopics(['t'], false, () => {
});
producer.close();
});
const highLevelProducer = new kafka.HighLevelProducer(basicClient);
highLevelProducer.on('error', (error: Error) => {
});
highLevelProducer.on('ready', () => {
const messages = [{
topic: 'topicName',
messages: ['message body'],
attributes: 2
}, {
topic: 'topicName',
messages: ['message body'],
partition: 0
}, {
topic: 'topicName',
messages: ['message body'],
attributes: 0
}, {
topic: 'topicName',
messages: ['message body']
}, {
topic: 'topicName',
messages: [new kafka.KeyedMessage('key', 'message')]
}];
highLevelProducer.send(messages, (err: Error) => {
});
highLevelProducer.send(messages, (err: Error, data: any) => {
});
producer.createTopics(['t'], true, (err: Error, data: any) => {
});
producer.createTopics(['t'], (err: Error, data: any) => {
});
producer.createTopics(['t'], false, () => {
});
producer.close();
});
const fetchRequests = [{topic: 'awesome'}];
const consumer = new kafka.Consumer(basicClient, fetchRequests, {
groupId: 'abcde',
autoCommit: true
});
consumer.on('error', (error: Error) => {
});
consumer.on('offsetOutOfRange', (error: Error) => {
});
consumer.on('message', (message: kafka.Message) => {
const topic = message.topic;
const value = message.value;
const offset = message.offset;
const partition = message.partition;
const highWaterOffset = message.highWaterOffset;
const key = message.key;
});
consumer.addTopics(['t1', 't2'], (err: any, added: any) => {
});
consumer.addTopics([{topic: 't1', offset: 10}], (err: any, added: any) => {
}, true);
consumer.removeTopics(['t1', 't2'], (err: any, removed: number) => {
});
consumer.removeTopics('t2', (err: any, removed: number) => {
});
consumer.commit((err: any, data: any) => {
});
consumer.commit(true, (err: any, data: any) => {
});
consumer.setOffset('topic', 0, 0);
consumer.pause();
consumer.resume();
consumer.pauseTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
'topic1',
{topic: 'topic2', partition: 0}
]);
consumer.resumeTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
'topic1',
{topic: 'topic2', partition: 0}
]);
consumer.close(true, function () {});
var fetchRequests = [{ topic: 'awesome' }];
var hlConsumer = new kafka.HighLevelConsumer(basicClient, fetchRequests, {
groupId: 'abcde',
autoCommit: true
consumer.close(true, () => {
});
consumer.close(() => {
});
hlConsumer.on('error', function(error: Error){});
hlConsumer.on('message', function(message){});
hlConsumer.addTopics(['t1', 't2'], function (err, added) {});
hlConsumer.addTopics([{ topic: 't1', offset: 10 }], function (err, added) {}, true);
const fetchRequests1 = [{topic: 'awesome'}];
const hlConsumer = new kafka.HighLevelConsumer(basicClient, fetchRequests1, {
groupId: 'abcde',
autoCommit: true
});
hlConsumer.removeTopics(['t1', 't2'], function (err, removed) {});
hlConsumer.on('error', (error: Error) => {
});
hlConsumer.on('offsetOutOfRange', (error: Error) => {
});
hlConsumer.on('message', (message: kafka.Message) => {
const topic = message.topic;
const value = message.value;
const offset = message.offset;
const partition = message.partition;
const highWaterOffset = message.highWaterOffset;
const key = message.key;
});
hlConsumer.addTopics(['t1', 't2'], (err: any, added: any) => {
});
hlConsumer.addTopics([{topic: 't1', offset: 10}], (err: any, added: any) => {
});
hlConsumer.commit(function (err, data) {});
hlConsumer.removeTopics(['t1', 't2'], (err: any, removed: number) => {
});
hlConsumer.removeTopics('t2', (err: any, removed: number) => {
});
hlConsumer.commit((err: any, data: any) => {
});
hlConsumer.commit(true, (err: any, data: any) => {
});
hlConsumer.setOffset('topic', 0, 0);
hlConsumer.pause();
hlConsumer.resume();
hlConsumer.pauseTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
]);
hlConsumer.resumeTopics([
'topic1',
{ topic: 'topic2', partition: 0 }
]);
hlConsumer.close(true, function () {});
hlConsumer.close(true, () => {
});
hlConsumer.close(() => {
});
var ackBatchOptions = {'noAckBatchSize': 1024, 'noAckBatchAge': 10};
var cgOptions: kafka.ConsumerGroupOptions = {
host: 'localhost:2181/',
batch: ackBatchOptions,
groupId: 'groupID',
id: 'consumerID',
sessionTimeout: 15000,
protocol: ["roundrobin"],
fromOffset: "latest",
migrateHLC: false,
migrateRolling: true
const ackBatchOptions = {noAckBatchSize: 1024, noAckBatchAge: 10};
const cgOptions: kafka.ConsumerGroupOptions = {
host: 'localhost:2181/',
batch: ackBatchOptions,
groupId: 'groupID',
id: 'consumerID',
sessionTimeout: 15000,
protocol: ["roundrobin"],
fromOffset: "latest",
migrateHLC: false,
migrateRolling: true
};
var consumerGroup = new kafka.ConsumerGroup( cgOptions, ['topic1']);
consumerGroup.on('error', (err) => {});
consumerGroup.on('message', (msg) => {});
consumerGroup.close(true, () => {});
const consumerGroup = new kafka.ConsumerGroup(cgOptions, ['topic1']);
consumerGroup.on('error', (err) => {
});
consumerGroup.on('message', (msg) => {
});
consumerGroup.close(true, () => {
});
var offset = new kafka.Offset(basicClient);
const offset = new kafka.Offset(basicClient);
offset.on('ready', function(){});
offset.on('ready', () => {
});
offset.fetch([
{ topic: 't', partition: 0, time: Date.now(), maxNum: 1 },
{ topic: 't' }
], function (err, data) { });
{topic: 't', partition: 0, time: Date.now(), maxNum: 1},
{topic: 't'}
], (err: any, data: any) => {
});
offset.commit('groupId', [
{ topic: 't', partition: 0, offset: 10 }
], function (err, data) { });
{topic: 't', partition: 0, offset: 10}
], (err, data) => {
});
offset.fetchCommits('groupId', [
{ topic: 't', partition: 0 }
], function (err, data) {});
{topic: 't', partition: 0}
], (err, data) => {
});
offset.fetchLatestOffsets(['t'], (err, offsets) => {})
offset.fetchEarliestOffsets(['t'], (err, offsets) => {})
offset.fetchLatestOffsets(['t'], (err, offsets) => {
});
offset.fetchEarliestOffsets(['t'], (err, offsets) => {
});

View File

@@ -6,7 +6,7 @@
],
"noImplicitAny": true,
"noImplicitThis": true,
"strictNullChecks": false,
"strictNullChecks": true,
"baseUrl": "../",
"typeRoots": [
"../"
@@ -19,4 +19,4 @@
"index.d.ts",
"kafka-node-tests.ts"
]
}
}

View File

@@ -0,0 +1,3 @@
{
"extends": "dtslint/dt.json"
}