Update kafka node (#18267)

* updated types for kafka-node

* fixed typos

* fixed linter errors

* fixed typo
This commit is contained in:
Bill Kim
2017-07-20 15:53:43 -07:00
committed by Wesley Wigham
parent e046dbc3bc
commit 2b0f00ff27
2 changed files with 29 additions and 17 deletions

View File

@@ -6,26 +6,27 @@
// # Classes
export class Client {
constructor(connectionString: string, clientId?: string, options?: ZKOptions, noBatchOptions?: AckBatchOptions, sslOptions?: any);
close(callback?: () => void): void;
topicExists(topics: string[], callback: (err?: TopicsNotExistError | any) => any): void;
close(cb?: () => void): void;
topicExists(topics: string[], cb: (error?: TopicsNotExistError | any) => any): void;
refreshMetadata(topics: string[], cb?: (error?: any) => any): void;
sendOffsetCommitV2Request(group: string, generationId: number, memberId: string, commits: OffsetCommitRequest[], cb: () => void): void;
sendOffsetCommitV2Request(group: string, generationId: number, memberId: string, commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
}
export class KafkaClient extends Client {
constructor(options?: KafkaClientOptions);
on(eventName: "ready", cb: () => any): this;
on(eventName: "error", cb: (error: any) => any): this;
connect(): void;
}
export class Producer {
constructor(client: Client, options?: ProducerOptions, customPartitioner?: any);
constructor(client: Client, options?: ProducerOptions, customPartitioner?: CustomPartitioner);
on(eventName: "ready", cb: () => any): void;
on(eventName: "error", cb: (error: any) => any): void;
send(payloads: ProduceRequest[], cb: (error: any, data: any) => any): void;
createTopics(topics: string[], async: true, cb: (error?: any, data?: any) => any): void;
createTopics(topics: string[], async: false, cb: () => any): void;
createTopics(topics: string[], cb: (error?: any, data?: any) => any): void;
close(): void;
createTopics(topics: string[], async: boolean, cb: (error: any, data: any) => any): void;
createTopics(topics: string[], cb: (error: any, data: any) => any): void;
close(cb?: () => any): void;
}
export class HighLevelProducer extends Producer {
@@ -54,14 +55,16 @@ export class HighLevelConsumer {
client: Client;
on(eventName: "message", cb: (message: Message) => any): void;
on(eventName: "error" | "offsetOutOfRange", cb: (error: any) => any): void;
on(eventName: "rebalancing" | "rebalanced", cb: () => any): void;
addTopics(topics: string[] | Topic[], cb?: (error: any, added: string[] | Topic[]) => any): void;
removeTopics(topics: string | string[], cb: (error: any, removed: number) => any): void;
commit(cb: (error: any, data: any) => any): void;
commit(force: boolean, cb: (error: any, data: any) => any): void;
sendOffsetCommitRequest(commits: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
setOffset(topic: string, partition: number, offset: number): void;
pause(): void;
resume(): void;
close(force: boolean, cb: () => any): void;
close(force: boolean, cb: (error: any) => any): void;
close(cb: () => any): void;
}
@@ -69,11 +72,13 @@ export class ConsumerGroup extends HighLevelConsumer {
constructor(options: ConsumerGroupOptions, topics: string[] | string);
generationId: number;
memberId: string;
client: KafkaClient & Client;
}
export class Offset {
constructor(client: Client);
on(eventName: string, cb: (error?: any) => any): void;
on(eventName: "ready" | "connect", cb: () => any): void;
on(eventName: "error", cb: (error: any) => any): void;
fetch(payloads: OffsetRequest[], cb: (error: any, data: any) => any): void;
commit(groupId: string, payloads: OffsetCommitRequest[], cb: (error: any, data: any) => any): void;
fetchCommits(groupId: string, payloads: OffsetFetchRequest[], cb: (error: any, data: any) => any): void;
@@ -106,13 +111,13 @@ export interface KafkaClientOptions {
kafkaHost?: string;
connectTimeout?: number;
requestTimeout?: number;
authConnect?: boolean;
connectRetryOptions?: ConnectRetryOptions;
autoConnect?: boolean;
connectRetryOptions?: RetryOptions;
sslOptions?: any;
clientId?: string;
}
export interface ConnectRetryOptions {
export interface RetryOptions {
retries?: number;
factor?: number;
minTimeout?: number;
@@ -153,17 +158,21 @@ export interface ConsumerOptions {
export interface HighLevelConsumerOptions extends ConsumerOptions {
id?: string;
maxNumSegments?: number;
maxTickMessages?: number;
rebalanceRetry?: RetryOptions;
}
export interface CustomPartitionAssignmentProtocol {
name: string;
version: number;
userData: {};
assign(topicPattern: any, groupMembers: any, callback: (error: any, result: any) => void): void;
assign(topicPattern: any, groupMembers: any, cb: (error: any, result: any) => void): void;
}
export interface ConsumerGroupOptions {
host: string;
kafkaHost?: string;
host?: string;
zk?: ZKOptions;
batch?: AckBatchOptions;
ssl?: boolean;
@@ -172,13 +181,14 @@ export interface ConsumerGroupOptions {
sessionTimeout?: number;
protocol?: Array<"roundrobin" | "range" | CustomPartitionAssignmentProtocol>;
fromOffset?: "earliest" | "latest" | "none";
outOfRangeOffset?: "earliest" | "latest" | "none";
migrateHLC?: boolean;
migrateRolling?: boolean;
autoCommit?: boolean;
autoCommitIntervalMs?: number;
fetchMaxWaitMs?: number;
paused?: boolean;
maxNumSegments?: number;
maxTickMessages?: number;
fetchMinBytes?: number;
fetchMaxBytes?: number;
retries?: number;
@@ -217,3 +227,5 @@ export interface OffsetFetchRequest {
export class TopicsNotExistError extends Error {
topics: string | string[];
}
export type CustomPartitioner = (partitions: number[], key: any) => number;

View File

@@ -28,7 +28,7 @@ const optionsKafkaClient = new kafka.KafkaClient({
kafkaHost: 'localhost:2181',
connectTimeout: 1000,
requestTimeout: 1000,
authConnect: true,
autoConnect: true,
sslOptions: {},
clientId: "client id",
connectRetryOptions: {