mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-05-25 18:21:36 +08:00
this module is defunct and will not be used
This commit is contained in:
@@ -21,15 +21,18 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{SyncSender, Receiver};
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::thread;
|
||||
|
||||
use std::sync::mpsc::RecvError as recv_error;
|
||||
|
||||
use util::Error as util_error;
|
||||
|
||||
pub trait PipelineProcessor<I, O, C>
|
||||
pub trait PipelineProcessor<I, O>
|
||||
where
|
||||
I: Sync + Send,
|
||||
O: Sync + Send
|
||||
{
|
||||
fn process(&mut self, item: &I, context: &mut C) -> Result<O, String>;
|
||||
fn process(&mut self, item: &I) -> Result<O, String>;
|
||||
}
|
||||
|
||||
pub struct PipelineStage<I, O>
|
||||
@@ -41,7 +44,7 @@ where
|
||||
chan_out: Option<SyncSender<Arc<O>>>,
|
||||
|
||||
source: Option<Vec<I>>,
|
||||
sink: bool
|
||||
term: bool,
|
||||
}
|
||||
|
||||
impl<I, O> PipelineStage<I, O>
|
||||
@@ -54,7 +57,7 @@ where
|
||||
chan_in: None,
|
||||
chan_out: None,
|
||||
source: None,
|
||||
sink: false
|
||||
term: false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,16 +66,25 @@ where
|
||||
chan_in: None,
|
||||
chan_out: None,
|
||||
source: Some(inputs.into_iter().rev().collect()),
|
||||
sink: false
|
||||
term: false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_sink() -> PipelineStage<I, O> {
|
||||
pub fn new_terminus(input_channel: Receiver<Arc<I>>) -> PipelineStage<I, O> {
|
||||
PipelineStage {
|
||||
chan_in: None,
|
||||
chan_in: Some(input_channel),
|
||||
chan_out: None,
|
||||
source: None,
|
||||
sink: true
|
||||
term: true
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_sink(out_channel: SyncSender<Arc<O>>) -> PipelineStage<I, O> {
|
||||
PipelineStage {
|
||||
chan_in: None,
|
||||
chan_out: Some(out_channel),
|
||||
source: None,
|
||||
term: false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,7 +94,7 @@ where
|
||||
match self.chan_in {
|
||||
Some(ref mut channel) => {
|
||||
channel.recv()
|
||||
.map_err(|e| util_error::RecvError(e))
|
||||
.map_err(|e| util_error::ChannelNotConnected)
|
||||
},
|
||||
None => {
|
||||
Err(util_error::ChannelNotConnected)
|
||||
@@ -104,7 +116,7 @@ where
|
||||
}
|
||||
|
||||
fn send(&mut self, output: O) -> Result<(), util_error> {
|
||||
if !self.sink {
|
||||
if !self.term {
|
||||
match self.chan_out {
|
||||
Some(ref mut channel) => {
|
||||
channel.send(Arc::new(output))
|
||||
@@ -119,16 +131,27 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next<P, C>(&mut self, mut p: P, ctx: &mut C) -> Result<(), util_error>
|
||||
// returns true if this method can be called again
|
||||
// returns false if the sender was disconnected
|
||||
// returns Err otherwise -- i.e. if the processor fails
|
||||
pub fn next<P>(&mut self, processor: &mut P) -> Result<bool, util_error>
|
||||
where
|
||||
P: PipelineProcessor<I, O, C>
|
||||
P: PipelineProcessor<I, O>
|
||||
{
|
||||
let arc_i = self.recv()?;
|
||||
let out = p.process(&arc_i, ctx)
|
||||
.map_err(|se| util_error::ProcessError(se))?;
|
||||
match self.recv() {
|
||||
Ok(arc_i) => {
|
||||
let out = processor.process(&arc_i)
|
||||
.map_err(|se| util_error::ProcessError(se))?;
|
||||
|
||||
self.send(out)?;
|
||||
Ok(())
|
||||
match self.send(out) {
|
||||
Ok(()) => Ok(true),
|
||||
Err(util_error::ChannelNotConnected) => Ok(false),
|
||||
Err(_e) => Err(_e)
|
||||
}
|
||||
},
|
||||
Err(util_error::ChannelNotConnected) | Err(util_error::ChannelSourceDrained) => Ok(false),
|
||||
Err(_e) => Err(_e)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connect<A, T, B>(stage1: &mut PipelineStage<A, T>, stage2: &mut PipelineStage<T, B>, bufsize: usize)
|
||||
@@ -141,4 +164,28 @@ where
|
||||
stage1.chan_out = Some(sender);
|
||||
stage2.chan_in = Some(receiver);
|
||||
}
|
||||
|
||||
/*
|
||||
pub fn thread<'a, P>(stage: &'a mut PipelineStage<I, O>, processor: &'a mut P) -> thread::JoinHandle<Result<(), util_error>>
|
||||
where
|
||||
P: PipelineProcessor<I, O> + Send,
|
||||
{
|
||||
thread::spawn(|| {
|
||||
while true {
|
||||
match stage.next(processor) {
|
||||
Ok(true) => {
|
||||
continue;
|
||||
}
|
||||
Ok(false) => {
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user