clean up imports; add pipeline processor trait impls to the block downloader and block parser so we can chain them together

This commit is contained in:
Jude Nelson
2019-01-27 18:18:31 -05:00
parent 90a2af22e1
commit b61781f258

View File

@@ -17,56 +17,49 @@
along with Blockstack. If not, see <http://www.gnu.org/licenses/>.
*/
use std::fs;
use std::cmp;
use std::sync::Arc;
use std::sync::mpsc::{SyncSender, Receiver, sync_channel};
use std::thread;
use std::thread::JoinHandle;
use std::ops::Deref;
use bitcoin::blockdata::block::{LoneBlockHeader, BlockHeader, Block};
use bitcoin::blockdata::transaction::{Transaction, TxIn, TxOut};
use bitcoin::blockdata::block::{LoneBlockHeader, Block};
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::blockdata::opcodes::All as btc_opcodes;
use bitcoin::blockdata::opcodes::Class;
use bitcoin::blockdata::script::{Script, Instruction, Instructions};
use bitcoin::blockdata::script::{Script, Instruction};
use bitcoin::network::encodable::{ConsensusEncodable, ConsensusDecodable, VarInt};
use bitcoin::network::serialize::{RawEncoder, RawDecoder, serialize, deserialize, BitcoinHash};
use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::message as btc_message;
use bitcoin::util::hash::{Sha256dHash, bitcoin_merkle_root};
use bitcoin::util::uint::Uint256;
use bitcoin::util::hash::bitcoin_merkle_root;
use burnchains::bitcoin::indexer::BitcoinIndexer;
use burnchains::bitcoin::Error as btc_error;
use burnchains::bitcoin::spv::SpvClient;
use burnchains::bitcoin::messages::BitcoinMessageHandler;
use burnchains::bitcoin::network::PeerMessage;
use burnchains::bitcoin::PeerMessage;
use burnchains::bitcoin::bits;
use burnchains::bitcoin::keys::BitcoinPublicKey;
use burnchains::bitcoin::address::{BitcoinAddressType, BitcoinAddress};
use burnchains::bitcoin::address::BitcoinAddress;
use burnchains::bitcoin::BitcoinNetworkType;
use burnchains::{
BurnchainBlock,
BurnchainTxInput,
BurnchainTxOutput,
BurnchainTransaction,
PublicKey,
BurnchainTransaction,
Txid,
BlockHash,
BurnchainHeaderHash,
MagicBytes,
Hash160,
MAGIC_BYTES_LENGTH
};
use util::pipeline::PipelineProcessor;
use util::hash::to_hex;
// IPC messages between threads
#[derive(Debug, Clone, PartialEq)]
pub struct IPCHeader {
pub height: u64,
pub header: LoneBlockHeader
}
#[derive(Debug, Clone, PartialEq)]
pub struct IPCBlock {
pub height: u64,
pub header: LoneBlockHeader,
@@ -74,62 +67,41 @@ pub struct IPCBlock {
}
pub struct BitcoinBlockDownloader {
headers_path: String,
cur_request: Option<Arc<IPCHeader>>,
network_id: BitcoinNetworkType,
pub chan_in: Option<Receiver<Arc<IPCHeader>>>,
pub chan_out: Option<SyncSender<Arc<IPCBlock>>>,
pub thread: Option<JoinHandle<()>>
cur_request: IPCHeader,
cur_block: Option<IPCBlock>
}
pub struct BitcoinBlockParser {
network_id: BitcoinNetworkType,
magic_bytes: MagicBytes,
pub chan_in: Option<Receiver<Arc<IPCBlock>>>,
pub chan_out: Option<SyncSender<Arc<BurnchainBlock<BitcoinAddress, BitcoinPublicKey>>>>,
pub thread: Option<JoinHandle<()>>
magic_bytes: MagicBytes
}
impl BitcoinBlockDownloader {
pub fn new(headers_path: &str, start_block: u64, end_block: u64, network_id: BitcoinNetworkType) -> BitcoinBlockDownloader {
pub fn new(block_height: u64, block_header: &LoneBlockHeader) -> BitcoinBlockDownloader {
BitcoinBlockDownloader {
headers_path: headers_path.to_owned(),
cur_request: None,
network_id: network_id,
chan_in: None,
chan_out: None,
thread: None
cur_request: IPCHeader {
height: block_height,
header: block_header.clone()
},
cur_block: None
}
}
// TODO: connection methods and thread start
// TODO: receive a block and send it off
/// Go get all the blocks.
/// keep trying forever.
pub fn run(&mut self, indexer: &mut BitcoinIndexer) -> Result<(), btc_error> {
return indexer.peer_communicate(self);
pub fn run(&mut self, indexer: &mut BitcoinIndexer) -> Result<IPCBlock, btc_error> {
indexer.peer_communicate(self)?;
assert!(self.cur_block.is_some());
let ipc_block = self.cur_block.take().unwrap();
Ok(ipc_block)
}
}
/// Ask for the next block height to download.
/// Remember which request we're on.
fn request_next_block(&mut self, indexer: &mut BitcoinIndexer) -> Result<bool, btc_error> {
let ipc_header =
match self.chan_in {
Some(ref chan) => {
chan.recv()
.map_err(|_e| btc_error::PipelineError)
},
None => Err(btc_error::PipelineError)
}?;
let res = indexer.send_getblocks(&vec![ipc_header.header.header.bitcoin_hash()])
.and_then(|_r| Ok(true));
self.cur_request = Some(ipc_header);
res
impl PipelineProcessor<IPCHeader, IPCBlock, BitcoinIndexer> for BitcoinBlockDownloader {
fn process(&mut self, ipc_header: &IPCHeader, indexer: &mut BitcoinIndexer) -> Result<IPCBlock, String> {
self.cur_request = (*ipc_header).clone();
self.cur_block = None;
self.run(indexer)
.map_err(|e| format!("Failed to download {} ({})", ipc_header.height, to_hex(ipc_header.header.header.bitcoin_hash().as_bytes())))
}
}
@@ -138,62 +110,56 @@ impl BitcoinMessageHandler for BitcoinBlockDownloader {
/// Trait message handler
/// initiate the conversation with the bitcoin peer
fn begin_session(&mut self, indexer: &mut BitcoinIndexer) -> Result<bool, btc_error> {
// sanity check
fs::metadata(&self.headers_path)
.map_err(btc_error::FilesystemError)?;
// ask for the block
self.request_next_block(indexer)
let block_hash = self.cur_request.header.header.bitcoin_hash().clone();
indexer.send_getblocks(&vec![block_hash])
.and_then(|_r| Ok(true))
}
/// Trait message handler
/// Take headers, validate them, and ask for more
fn handle_message(&mut self, indexer: &mut BitcoinIndexer, msg: &PeerMessage) -> Result<bool, btc_error> {
/// Wait for a block to arrive that matches self.cur_request
fn handle_message(&mut self, indexer: &mut BitcoinIndexer, msg: PeerMessage) -> Result<bool, btc_error> {
// send to our consumer thread for parsing
let mut ask_next = false;
if self.cur_block.is_some() {
debug!("Already have a block");
return Ok(false);
}
let height;
let header;
let block_hash;
match msg.deref() {
btc_message::NetworkMessage::Block(ref block) => {
match self.cur_request {
Some(ref ipc_header) => {
debug!("Got block {}: {}", ipc_header.height, block.bitcoin_hash());
// forward block to parser
let ipc_block = Arc::new(IPCBlock {
height: ipc_header.height,
header: ipc_header.header.clone(),
block: msg.clone()
});
// send off to parser
match self.chan_out {
Some(ref chan) => {
chan.send(ipc_block)
.map_err(|_e| btc_error::PipelineError)?;
}
None => {}
};
// get the next-requested block
ask_next = true;
},
None => {
debug!("No outstanding block request");
return Ok(false);
}
btc_message::NetworkMessage::Block(block) => {
// make sure this block matches
if !BitcoinBlockParser::check_block(&block, &self.cur_request.header) {
debug!("Requested block {}, got block {}", &to_hex(self.cur_request.header.header.bitcoin_hash().as_bytes()), &to_hex(block.bitcoin_hash().as_bytes()));
// try again
indexer.send_getblocks(&vec![self.cur_request.header.header.bitcoin_hash()])?;
return Ok(true);
}
// got valid data!
height = self.cur_request.height;
header = self.cur_request.header.clone();
block_hash = self.cur_request.header.header.bitcoin_hash();
},
_ => {
return Err(btc_error::UnhandledMessage);
return Err(btc_error::UnhandledMessage(msg.clone()));
}
}
if ask_next {
self.request_next_block(indexer);
Ok(true)
}
else {
Ok(false)
}
debug!("Got block {}: {}", height, &to_hex(block_hash.as_bytes()));
// store response. we're done.
let ipc_block = IPCBlock {
height: height,
header: header,
block: msg
};
self.cur_block = Some(ipc_block);
Ok(false)
}
}
@@ -202,13 +168,29 @@ impl BitcoinBlockParser {
pub fn new(network_id: BitcoinNetworkType, magic_bytes: MagicBytes) -> BitcoinBlockParser {
BitcoinBlockParser {
network_id: network_id,
magic_bytes: magic_bytes.clone(),
chan_in: None,
chan_out: None,
thread: None
magic_bytes: magic_bytes.clone()
}
}
/// Verify that a block matches a header
pub fn check_block(block: &Block, header: &LoneBlockHeader) -> bool {
if header.header.bitcoin_hash() != block.bitcoin_hash() {
return false;
}
// block transactions must match header merkle root
let tx_merkle_root = bitcoin_merkle_root(block.txdata
.iter()
.map(|ref tx| { tx.txid() })
.collect());
if block.header.merkle_root != tx_merkle_root {
return false;
}
true
}
/// Parse the data output to get a byte payload
fn parse_data(&self, data_output: &Script) -> Option<(u8, Vec<u8>)> {
if !data_output.is_op_return() {
@@ -362,7 +344,7 @@ impl BitcoinBlockParser {
BurnchainBlock {
block_height: block_height,
block_hash: BlockHash::from_vec_be(&block.bitcoin_hash().as_bytes().to_vec()).unwrap(), // block hashes are little-endian in Blockstack, and this *should* panic if it fails
block_hash: BurnchainHeaderHash::from_bitcoin_hash(&block.bitcoin_hash()),
txs: accepted_txs
}
}
@@ -374,28 +356,33 @@ impl BitcoinBlockParser {
/// (in which case, we should re-start the conversation with the peer and try again).
pub fn process_block(&self, block: &Block, header: &LoneBlockHeader, height: u64) -> Option<BurnchainBlock<BitcoinAddress, BitcoinPublicKey>> {
// block header contents must match
if header.header.bitcoin_hash() != block.bitcoin_hash() {
if !BitcoinBlockParser::check_block(block, header) {
error!("Expected block {} does not match received block {}", header.header.bitcoin_hash(), block.bitcoin_hash());
return None;
}
// block transactions must match header merkle root
let tx_merkle_root = bitcoin_merkle_root(block.txdata
.iter()
.map(|ref tx| { tx.txid() })
.collect());
if block.header.merkle_root != tx_merkle_root {
error!("Expected block {} merkle root {}, got {}", block.bitcoin_hash(), block.header.merkle_root, tx_merkle_root);
return None;
}
// parse it
let burn_block = self.parse_block(&block, height);
Some(burn_block)
}
}
impl PipelineProcessor<IPCBlock, BurnchainBlock<BitcoinAddress, BitcoinPublicKey>, Option<u64>> for BitcoinBlockParser {
fn process(&mut self, ipc_block: &IPCBlock, ignored: &mut Option<u64>) -> Result<BurnchainBlock<BitcoinAddress, BitcoinPublicKey>, String> {
match ipc_block.block.deref() {
btc_message::NetworkMessage::Block(ref block) => {
let burn_block_opt = self.process_block(&block, &ipc_block.header, ipc_block.height);
assert!(burn_block_opt.is_some()); // we shouldn't get here if the burn block was invalid
Ok(burn_block_opt.unwrap())
},
_ => {
panic!("Did not receive a Block message"); // should never happen
}
}
}
}
#[cfg(test)]
mod tests {
@@ -412,12 +399,9 @@ mod tests {
BurnchainTxInput,
BurnchainTxOutput,
BurnchainTransaction,
PublicKey,
Txid,
BlockHash,
BurnchainHeaderHash,
MagicBytes,
Hash160,
MAGIC_BYTES_LENGTH,
BurnchainInputType,
};
@@ -470,11 +454,11 @@ mod tests {
Txid(ret)
}
fn to_block_hash(inp: &Vec<u8>) -> BlockHash {
fn to_block_hash(inp: &Vec<u8>) -> BurnchainHeaderHash {
let mut ret = [0; 32];
let bytes = &inp[..inp.len()];
ret.copy_from_slice(bytes);
BlockHash(ret)
BurnchainHeaderHash(ret)
}
#[test]