fix(highland): correct the definition of consume

The record variable should contain the type of Highland.Nil for it also takes
the marker as a sign of the end of stream.

See http://highlandjs.org/#consume
This commit is contained in:
Alvis HT Tang
2018-02-22 19:12:00 +00:00
parent aeb265279f
commit 81db85e6ee
2 changed files with 49 additions and 54 deletions

View File

@@ -1,5 +1,3 @@
// Note: try to maintain the ordering and separators
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -49,18 +47,18 @@ var anyArrStream: Highland.Stream<any[]>;
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
interface Foo {
foo(): string;
foo(): string;
}
interface Bar {
bar(): string;
bar(): string;
}
interface StrFooArrMap {
[key:string]: Foo[];
[key: string]: Foo[];
}
interface StrBarArrMap {
[key:string]: Bar[];
[key: string]: Bar[];
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -128,9 +126,9 @@ fooStream = streamRedirect.to;
fooStream = _<Foo>();
fooStream = _(fooArr);
fooStream = _<Foo>((push, next) => {
push(null, foo);
push(err);
next();
push(null, foo);
push(err);
next();
});
fooStream = _(fooStream);
@@ -173,16 +171,23 @@ fooArrStream = fooStream.collect();
fooStream = fooStream.compact();
barStream = fooStream.consume((err: Error, x: Foo, push: (err: Error, value?: Bar) => void, next: () => void) => {
push(err);
push(null, bar);
next();
});
barStream = fooStream.consume(
(
err: Error,
x: Foo | Highland.Nil,
push: (err: Error, value?: Bar | Highland.Nil) => void,
next: () => void
) => {
push(err);
push(null, bar);
next();
}
);
barStream = fooStream.consume<Bar>((err, x, push, next) => {
push(err);
push(null, bar);
next();
push(err);
push(null, bar);
next();
});
fooStream = fooStream.debounce(num);
@@ -191,30 +196,32 @@ fooStream = fooStream.doto((x: Foo) => {});
fooStream = fooStream.drop(2);
fooStream = fooStream.errors((err: Error, push: (e: Error, x?: Foo) => void) => {
push(err);
push(null, x);
push(null, foo);
});
fooStream = fooStream.errors(
(err: Error, push: (e: Error, x?: Foo) => void) => {
push(err);
push(null, x);
push(null, foo);
}
);
fooStream = fooStream.errors((err, push) => {
push(err);
push(null, x);
push(null, foo);
push(err);
push(null, x);
push(null, foo);
});
fooStream = fooStream.filter((x: Foo) => {
return bool;
return bool;
});
fooStream = fooStream.find((x: Foo) => {
return bool;
return bool;
});
fooStream = fooStream.findWhere(obj);
strFooArrMapStream = fooStream.group((x: Foo) => {
return str;
return str;
});
strFooArrMapStream = fooStream.group(str);
@@ -227,7 +234,7 @@ fooStream = fooStream.last();
fooStream = fooStream.latest();
barStream = fooStream.map((x: Foo) => {
return bar;
return bar;
});
barStream = fooStream.pluck<Bar>(str);
@@ -235,26 +242,24 @@ barStream = fooStream.pluck<Bar>(str);
fooStream = fooStream.ratelimit(3, 1000);
barStream = fooStream.reduce(bar, (memo: Bar, x: Foo) => {
return memo;
return memo;
});
barStream = fooStream.reduce1(bar, (memo: Bar, x: Foo) => {
return memo;
return memo;
});
fooStream = fooStream.reject((x: Foo) => {
return bool;
return bool;
});
barStream = fooStream.scan(bar, (memo: Bar, x: Foo) => {
return memo;
return memo;
});
//missing scan1
fooStream = fooStream.stopOnError((e: Error) => {
});
fooStream = fooStream.stopOnError((e: Error) => {});
fooStream = fooStream.take(num);
@@ -264,7 +269,6 @@ fooStream = fooStream.throttle(num);
fooStream = fooStream.where(obj);
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// HIGHER-ORDER STREAMS
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@@ -274,15 +278,15 @@ fooStream = fooStream.concat(fooStream);
fooStream = fooStream.concat(fooArr);
fooStream = fooStream.flatFilter((x: Foo) => {
return boolStream;
return boolStream;
});
barStream = fooStream.flatMap((x: Foo) => {
return barStream;
return barStream;
});
barStream = fooStream.flatMap((x: Foo) => {
return bar;
return bar;
});
barStream = fooStream.flatten<Bar>();
@@ -315,24 +319,16 @@ fooStream.apply(func);
fooStream.done(() => {});
fooStream.each((x: Foo) => {
});
fooStream.each((x: Foo) => {});
fooStream = fooStream.pipe(fooStream);
barStream = fooStream.pipe(barStream);
fooStream.pull((err: Error, x: Foo) => {
fooStream.pull((err: Error, x: Foo) => {});
});
fooStream.pull((err, x) => {});
fooStream.pull((err, x) => {
});
fooStream.toArray((arr: Foo[]) => {
});
fooStream.toArray((arr: Foo[]) => {});
fooStream.toCallback((err: Error, x: Foo) => {});
fooStream.toCallback((err: Error) => {});
@@ -360,7 +356,6 @@ f = _.wrapCallback(func, num);
f = _.wrapCallback(func, strArr);
f = _.wrapCallback(func, fn);
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
// OBJECTS
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

View File

@@ -563,7 +563,7 @@ declare namespace Highland {
* @param {Function} f - the function to handle errors and values
* @api public
*/
consume<U>(f: (err: Error, x: R, push: (err: Error | null, value?: U | Highland.Nil) => void, next: () => void) => void): Stream<U>;
consume<U>(f: (err: Error, x: R | Highland.Nil, push: (err: Error | null, value?: U | Highland.Nil) => void, next: () => void) => void): Stream<U>;
/**
* Holds off pushing data events downstream until there has been no more