revamp our story for dealing with multiple types of burn chains -- unfortunately, since rust lacks higher-kinded types, we have to have a lot of type parameters to encode all the specifics of blockchain indexing

This commit is contained in:
Jude Nelson
2019-01-29 18:44:22 -05:00
parent f88e11d9aa
commit 111ec0dfa2

View File

@@ -21,6 +21,7 @@ use std::net;
use std::sync::{Arc, Mutex, LockResult, MutexGuard};
use rand::{Rng, thread_rng};
use std::path::{PathBuf};
use std::thread;
use ini::Ini;
@@ -34,18 +35,24 @@ use burnchains::bitcoin::keys::BitcoinPublicKey;
use burnchains::bitcoin::address::BitcoinAddress;
use burnchains::bitcoin::BitcoinNetworkType;
use burnchains::bitcoin::blocks::{BitcoinBlockDownloader, BitcoinBlockParser};
use burnchains::bitcoin::PeerMessage;
use burnchains::BLOCKSTACK_MAGIC_MAINNET;
use burnchains::BurnchainHeaderHash;
use burnchains::BurnchainBlock;
use burnchains::BlockChannel;
use burnchains::Error as burnchain_error;
use burnchains::MagicBytes;
use burnchains::indexer::{BurnHeaderIPC, BurnBlockIPC};
use bitcoin::BitcoinHash;
use bitcoin::blockdata::block::LoneBlockHeader;
use chainstate::burn::db::burndb::BurnDB;
use util::pipeline::PipelineStage;
use util::Error as util_error;
use dirs;
pub const USER_AGENT: &'static str = "Blockstack Core v21";
@@ -60,6 +67,7 @@ pub const BITCOIN_REGTEST_NAME: &'static str = "regtest";
// maybe change this
pub const FIRST_BLOCK_MAINNET: u64 = 373601;
pub const FIRST_BLOCK_MAINNET_HASH: BurnchainHeaderHash = BurnchainHeaderHash([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x10, 0xc0, 0x28, 0x5c, 0x41, 0x74, 0x93, 0xc7, 0x47, 0x69, 0xfe, 0x0c, 0x7c, 0x5a, 0xee, 0x84, 0xf6, 0x36, 0x7e, 0x48, 0x6a, 0xcb, 0x5a]);
// batch size for searching for a reorg
const REORG_BATCH_SIZE: u64 = 200;
@@ -80,7 +88,7 @@ pub fn network_id_to_bytes(network_id: BitcoinNetworkType) -> u32 {
}
}
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct BitcoinIndexerConfig {
// config fields
pub peer_host: String,
@@ -251,6 +259,13 @@ impl BitcoinIndexer {
})
}
pub fn dup(&self) -> BitcoinIndexer {
BitcoinIndexer {
config: self.config.clone(),
runtime: BitcoinIndexerRuntime::new(self.runtime.network_id)
}
}
/// (re)connect to our configured network peer.
/// Sets self.runtime.sock to a new socket referring to our configured
/// Bitcoin peer. If we fail to connect, this method sets the socket
@@ -394,7 +409,7 @@ impl BitcoinIndexer {
/// If the range falls of the end of the headers file, then the returned array will be
/// truncated to not include them (note that this method can return an empty list of the
/// start_block is off the end of the file).
pub fn read_headers(&self, headers_path: &String, start_block: u64, end_block: u64) -> Result<Vec<LoneBlockHeader>, btc_error> {
pub fn read_spv_headers(&self, headers_path: &String, start_block: u64, end_block: u64) -> Result<Vec<LoneBlockHeader>, btc_error> {
let mut headers = vec![];
for block_height in start_block..end_block {
let header_opt = SpvClient::read_block_header(headers_path, block_height)?;
@@ -410,15 +425,14 @@ impl BitcoinIndexer {
Ok(headers)
}
/// Search for a bitcoin reorg. Return the list of *new* header hashes and the offset into the
/// canonical headers where the reorg starts.
pub fn find_bitcoin_reorg(&mut self, headers_path: &String, db_height: u64) -> Result<(u64, Vec<BurnchainHeaderHash>), btc_error> {
/// Search for a bitcoin reorg. Return the offset into the canonical bitcoin headers where
/// the reorg starts.
pub fn find_bitcoin_reorg(&mut self, headers_path: &String, db_height: u64) -> Result<u64, btc_error> {
let mut reorg_headers_pathbuf = PathBuf::from(&headers_path);
reorg_headers_pathbuf.push(".reorg");
let reorg_headers_path = reorg_headers_pathbuf.to_str().unwrap().to_string();
let mut new_headers_reversed = vec![];
let mut new_tip = 0;
let mut found = false;
@@ -449,13 +463,13 @@ impl BitcoinIndexer {
})?;
// check for reorg
let canonical_headers = self.read_headers(&headers_path, start_block, start_block + REORG_BATCH_SIZE)
let canonical_headers = self.read_spv_headers(&headers_path, start_block, start_block + REORG_BATCH_SIZE)
.map_err(|e| {
error!("Failed to read canonical headers from {} to {}", start_block, start_block + REORG_BATCH_SIZE);
e
})?;
let reorg_headers = self.read_headers(&reorg_headers_path, start_block, start_block + REORG_BATCH_SIZE)
let reorg_headers = self.read_spv_headers(&reorg_headers_path, start_block, start_block + REORG_BATCH_SIZE)
.map_err(|e| {
error!("Failed to read reorg headers from {} to {}", start_block, start_block + REORG_BATCH_SIZE);
e
@@ -468,56 +482,18 @@ impl BitcoinIndexer {
found = true;
break;
}
// reorged
new_headers_reversed.push(reorg_headers[i as usize].clone());
}
start_block -= REORG_BATCH_SIZE;
}
let new_headers = new_headers_reversed
.iter()
.rev()
.map(|header| BurnchainHeaderHash::from_bitcoin_hash(&header.header.bitcoin_hash()))
.collect();
Ok((new_tip, new_headers))
Ok(new_tip)
}
/*
/// Download and parse bitcoin blocks into burnchain blcoks, and feed them into a channel
pub fn download_bitcoin_blocks(&mut self, headers_path: &String, start_block: u64, end_block: u64, out_channel: BlockChannel<BitcoinAddress, BitcoinPublicKey>) -> Result<(), btc_error> {
if end_block < start_block {
panic!("Invalid block range: {} - {}", start_block, end_block);
}
let canonical_end_block = SpvClient::get_headers_height(headers_path)?;
if canonical_end_block < end_block {
return Err(btc_error::MissingHeader);
}
// build up a pipeline to fetch and validate all of these blocks.
let headers = self.read_headers(headers_path, start_block, end_block)?;
let input_ipc = vec![];
for i in start_block..end_block {
input_ipc.push(IPCHeader {
header: headers[i-start_block].clone(),
height: i
});
}
let downloader_stage = BitcoinBlockDownloader::new();
let parser_stage = BitcoinBlockParser::new(self.runtime.network_id, self.config.magic_bytes.clone());
// let db_stage =
Err(btc_error::NotImplemented)
}
*/
}
impl BurnchainIndexer<BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
/// Instantiate the Bitcoin indexer, but don't connect to the peer network.
impl BurnchainIndexer<LoneBlockHeader, PeerMessage, BitcoinBlockDownloader, BitcoinBlockParser, BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
/// Instantiate the Bitcoin indexer, and connect to the peer network.
/// Instead, load our configuration state and sanity-check it.
/// Call connect() next.
///
/// Pass a directory (working_dir) that contains a "bitcoin.ini" file.
fn init(network_name: &String, working_dir: &String) -> Result<BitcoinIndexer, burnchain_error> {
@@ -536,8 +512,11 @@ impl BurnchainIndexer<BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
return Err(burnchain_error::bitcoin(btc_error::ConfigError("Unrecognized network name".to_string())));
}
let bitcoin_network_id = network_id_opt.unwrap();
BitcoinIndexer::from_file(bitcoin_network_id, &conf_path_str)
.map_err(burnchain_error::bitcoin)
let mut indexer = BitcoinIndexer::from_file(bitcoin_network_id, &conf_path_str)
.map_err(burnchain_error::bitcoin)?;
indexer.connect()?;
Ok(indexer)
}
/// Connect to the Bitcoin peer network.
@@ -565,9 +544,24 @@ impl BurnchainIndexer<BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
.map_err(burnchain_error::bitcoin)
}
/// Identify underlying reorgs and return the block height at which they occur, as well as the
/// *new* header hashes
fn find_chain_reorg(&mut self, headers_path: &String, db_height: u64) -> Result<(u64, Vec<BurnchainHeaderHash>), burnchain_error> {
/// Read downloaded headers within a range
fn read_headers(&self, headers_path: &String, start_block: u64, end_block: u64) -> Result<Vec<BurnHeaderIPC<LoneBlockHeader>>, burnchain_error> {
let headers = self.read_spv_headers(headers_path, start_block, end_block)
.map_err(burnchain_error::bitcoin)?;
let mut ret_headers : Vec<BurnHeaderIPC<LoneBlockHeader>> = vec![];
for i in 0..headers.len() {
ret_headers.push({
BurnHeaderIPC {
header: headers[i].clone(),
height: (i as u64) + start_block
}
});
}
Ok(ret_headers)
}
/// Identify underlying reorgs and return the block height at which they occur
fn find_chain_reorg(&mut self, headers_path: &String, db_height: u64) -> Result<u64, burnchain_error> {
self.find_bitcoin_reorg(headers_path, db_height)
.map_err(burnchain_error::bitcoin)
}
@@ -592,9 +586,12 @@ impl BurnchainIndexer<BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
.map_err(burnchain_error::bitcoin)
}
/// Synchronize blocks
fn sync_blocks(&mut self, headers_path: &String, start_height: u64, end_height: u64, block_channel: &BlockChannel<BitcoinAddress, BitcoinPublicKey>) -> Result<(), burnchain_error> {
Err(burnchain_error::bitcoin(btc_error::NotImplemented))
fn downloader(&self) -> BitcoinBlockDownloader {
BitcoinBlockDownloader::new(self.dup())
}
fn parser(&self) -> BitcoinBlockParser {
BitcoinBlockParser::new(self.runtime.network_id, self.config.magic_bytes)
}
}