WIP: fleshed out indexer implementation to check for burn chain reorgs, fetch headers, fetch blocks, parse blocks, and (soon) feed them to the burn db

This commit is contained in:
Jude Nelson
2019-01-27 18:18:59 -05:00
parent b61781f258
commit 1a462cab7a

View File

@@ -17,7 +17,6 @@
along with Blockstack. If not, see <http://www.gnu.org/licenses/>.
*/
use std::env;
use std::net;
use std::sync::{Arc, Mutex, LockResult, MutexGuard};
use rand::{Rng, thread_rng};
@@ -25,7 +24,6 @@ use std::path::{PathBuf};
use ini::Ini;
use burnchains::PublicKey;
use burnchains::indexer::*;
use burnchains::bitcoin::spv::*;
use burnchains::bitcoin::rpc::BitcoinRPC;
@@ -33,11 +31,20 @@ use burnchains::bitcoin::Error as btc_error;
use burnchains::bitcoin::messages::BitcoinMessageHandler;
use burnchains::bitcoin::keys::BitcoinPublicKey;
use burnchains::BurnchainTransaction;
use burnchains::bitcoin::address::{BitcoinAddressType, BitcoinAddress};
use bitcoin::network::constants as bitcoin_constants;
use burnchains::bitcoin::address::BitcoinAddress;
use burnchains::bitcoin::BitcoinNetworkType;
use burnchains::bitcoin::blocks::{BitcoinBlockDownloader, BitcoinBlockParser};
use burnchains::BLOCKSTACK_MAGIC_MAINNET;
use burnchains::BurnchainHeaderHash;
use burnchains::Error as burnchain_error;
use burnchains::MagicBytes;
use bitcoin::BitcoinHash;
use bitcoin::blockdata::block::LoneBlockHeader;
use chainstate::burn::db::burndb::BurnDB;
use dirs;
@@ -54,6 +61,9 @@ pub const BITCOIN_REGTEST_NAME: &'static str = "regtest";
// maybe change this
pub const FIRST_BLOCK_MAINNET: u64 = 373601;
// batch size for searching for a reorg
const REORG_BATCH_SIZE: u64 = 200;
pub fn network_id_to_name(network_id: BitcoinNetworkType) -> &'static str {
match network_id {
BitcoinNetworkType::mainnet => BITCOIN_MAINNET_NAME,
@@ -62,7 +72,7 @@ pub fn network_id_to_name(network_id: BitcoinNetworkType) -> &'static str {
}
}
pub fn network_id_to_magic(network_id: BitcoinNetworkType) -> u32 {
pub fn network_id_to_bytes(network_id: BitcoinNetworkType) -> u32 {
match network_id {
BitcoinNetworkType::mainnet => BITCOIN_MAINNET,
BitcoinNetworkType::testnet => BITCOIN_TESTNET,
@@ -76,11 +86,13 @@ pub struct BitcoinIndexerConfig {
pub peer_host: String,
pub peer_port: u16,
pub rpc_port: u16,
pub rpc_ssl: bool,
pub username: Option<String>,
pub password: Option<String>,
pub timeout: u32,
pub spv_headers_path: String,
pub first_block: u64
pub first_block: u64,
pub magic_bytes: MagicBytes
}
pub struct BitcoinIndexerRuntime {
@@ -100,25 +112,27 @@ pub struct BitcoinIndexer {
impl BitcoinIndexerConfig {
fn default() -> BitcoinIndexerConfig {
let mut spv_headers_path = dirs::home_dir().unwrap();
spv_headers_path.push(".blockstack-core");
spv_headers_path.push(".stacks");
spv_headers_path.push("bitcoin-spv-headers.dat");
return BitcoinIndexerConfig {
BitcoinIndexerConfig {
peer_host: "bitcoin.blockstack.com".to_string(),
peer_port: 8332,
rpc_port: 8333,
rpc_ssl: false,
username: Some("blockstack".to_string()),
password: Some("blockstacksystem".to_string()),
timeout: 30,
spv_headers_path: spv_headers_path.to_str().unwrap().to_string(),
first_block: FIRST_BLOCK_MAINNET
};
first_block: FIRST_BLOCK_MAINNET,
magic_bytes: BLOCKSTACK_MAGIC_MAINNET.clone()
}
}
fn from_file(path: &str) -> Result<BitcoinIndexerConfig, &'static str> {
fn from_file(path: &String) -> Result<BitcoinIndexerConfig, btc_error> {
let conf_path = PathBuf::from(path);
if !conf_path.is_file() {
return Err("Failed to load BitcoinIndexerConfig file: No such file or directory");
return Err(btc_error::ConfigError("Failed to load BitcoinIndexerConfig file: No such file or directory".to_string()));
}
let default_config = BitcoinIndexerConfig::default();
@@ -127,7 +141,7 @@ impl BitcoinIndexerConfig {
// got data!
let bitcoin_section_opt = ini_file.section(Some("bitcoin").to_owned());
if None == bitcoin_section_opt {
return Err("No [bitcoin] section in config file");
return Err(btc_error::ConfigError("No [bitcoin] section in config file".to_string()));
}
let bitcoin_section = bitcoin_section_opt.unwrap();
@@ -138,18 +152,18 @@ impl BitcoinIndexerConfig {
let peer_port = bitcoin_section.get("p2p_port")
.unwrap_or(&format!("{}", default_config.peer_port))
.trim().parse().map_err(|_e| "Invalid bitcoin:p2p_port value")?;
.trim().parse().map_err(|_e| btc_error::ConfigError("Invalid bitcoin:p2p_port value".to_string()))?;
if peer_port <= 1024 || peer_port >= 65535 {
return Err("Invalid p2p_port");
return Err(btc_error::ConfigError("Invalid p2p_port".to_string()));
}
let rpc_port = bitcoin_section.get("port")
.unwrap_or(&format!("{}", default_config.rpc_port))
.trim().parse().map_err(|_e| "Invalid bitcoin:port value")?;
.trim().parse().map_err(|_e| btc_error::ConfigError("Invalid bitcoin:port value".to_string()))?;
if rpc_port <= 1024 || rpc_port >= 65535 {
return Err("Invalid rpc_port");
return Err(btc_error::ConfigError("Invalid rpc_port".to_string()));
}
let username = bitcoin_section.get("user").and_then(|s| Some(s.clone()));
@@ -157,29 +171,56 @@ impl BitcoinIndexerConfig {
let timeout = bitcoin_section.get("timeout")
.unwrap_or(&format!("{}", default_config.timeout))
.trim().parse().map_err(|_e| "Invalid bitcoin:timeout value")?;
.trim().parse().map_err(|_e| btc_error::ConfigError("Invalid bitcoin:timeout value".to_string()))?;
let spv_headers_path = bitcoin_section.get("spv_path")
.unwrap_or(&default_config.spv_headers_path);
let first_block = bitcoin_section.get("first_block")
.unwrap_or(&format!("{}", FIRST_BLOCK_MAINNET))
.trim().parse().map_err(|_e| "Invalid bitcoin:first_block value")?;
.unwrap_or(&format!("{}", FIRST_BLOCK_MAINNET))
.trim().parse().map_err(|_e| btc_error::ConfigError("Invalid bitcoin:first_block value".to_string()))?;
let rpc_ssl_str = bitcoin_section.get("ssl")
.unwrap_or(&format!("{}", default_config.rpc_ssl))
.clone();
let rpc_ssl = rpc_ssl_str == "1" || rpc_ssl_str == "true";
let blockstack_section_opt = ini_file.section(Some("blockstack").to_owned());
if None == blockstack_section_opt {
return Err(btc_error::ConfigError("No [blockstack] section in config file".to_string()));
}
let blockstack_section = blockstack_section_opt.unwrap();
// defaults
let blockstack_magic_str = blockstack_section.get("network_id")
.unwrap_or(&"id".to_string())
.clone();
if blockstack_magic_str.len() != 2 {
return Err(btc_error::ConfigError("Invalid blockstack:network_id value: must be two bytes".to_string()));
}
let blockstack_magic = MagicBytes([blockstack_magic_str.as_bytes()[0] as u8, blockstack_magic_str.as_bytes()[1] as u8]);
let cfg = BitcoinIndexerConfig {
peer_host: peer_host.to_string(),
peer_port: peer_port,
rpc_port: rpc_port,
rpc_ssl: rpc_ssl,
username: username,
password: password,
timeout: timeout,
spv_headers_path: spv_headers_path.to_string(),
first_block: first_block
first_block: first_block,
magic_bytes: blockstack_magic
};
return Ok(cfg);
Ok(cfg)
},
Err(_) => {
return Err("Failed to parse BitcoinConfigIndexer config file");
Err(btc_error::ConfigError("Failed to parse BitcoinConfigIndexer config file".to_string()))
}
}
}
@@ -187,58 +228,58 @@ impl BitcoinIndexerConfig {
impl BitcoinIndexerRuntime {
pub fn default(network_id: BitcoinNetworkType) -> BitcoinIndexerRuntime {
pub fn new(network_id: BitcoinNetworkType) -> BitcoinIndexerRuntime {
let mut rng = thread_rng();
return BitcoinIndexerRuntime {
BitcoinIndexerRuntime {
sock: Arc::new(Mutex::new(None)),
services: 0,
user_agent: USER_AGENT.to_owned(),
version_nonce: rng.gen(),
network_id: network_id
};
}
}
}
impl BitcoinIndexer {
pub fn new() -> BitcoinIndexer {
let default_config = BitcoinIndexerConfig::default();
return BitcoinIndexer {
config: default_config,
runtime: BitcoinIndexerRuntime::default(BitcoinNetworkType::mainnet)
};
pub fn from_file(network_id: BitcoinNetworkType, config_file: &String) -> Result<BitcoinIndexer, btc_error> {
let config = BitcoinIndexerConfig::from_file(config_file)?;
let runtime = BitcoinIndexerRuntime::new(network_id);
Ok(BitcoinIndexer {
config: config,
runtime: runtime
})
}
/// (re)connect to our configured network peer.
/// Sets self.runtime.sock to a new socket referring to our configured
/// Bitcoin peer. If we fail to connect, this method sets the socket
/// to None.
fn reconnect_peer(&mut self) -> Result<(), &'static str> {
fn reconnect_peer(&mut self) -> Result<(), btc_error> {
match net::TcpStream::connect((self.config.peer_host.as_str(), self.config.peer_port)) {
Ok(s) => {
self.runtime.sock = Arc::new(Mutex::new(Some(s)));
return Ok(());
Ok(())
},
Err(_e) => {
self.runtime.sock = Arc::new(Mutex::new(None));
return Err("Failed to connect to remote peer");
Err(btc_error::ConnectionError)
}
}
}
/// Get a locked handle to the internal socket
pub fn socket_locked(&mut self) -> LockResult<MutexGuard<Option<net::TcpStream>>> {
return self.runtime.sock.lock();
self.runtime.sock.lock()
}
/// Open an RPC connection to bitcoind
pub fn get_bitcoin_client(&self) -> BitcoinRPC {
let client = BitcoinRPC::new(
format!("http://{}:{}", self.config.peer_host.as_str(), self.config.rpc_port),
BitcoinRPC::new(
format!("{}://{}:{}", if self.config.rpc_ssl { "https" } else { "http" }, self.config.peer_host.as_str(), self.config.rpc_port),
self.config.username.clone(),
self.config.password.clone()
);
return client;
)
}
/// Carry on a conversation with the bitcoin peer.
@@ -292,8 +333,8 @@ impl BitcoinIndexer {
let msg_result = self.recv_message();
match msg_result {
Ok(msg) => {
// got a message, so handle it!
let handled = self.handle_message(&msg, Some(message_handler));
// got a message; go consume it
let handled = self.handle_message(msg, Some(message_handler));
match handled {
Ok(do_continue) => {
keep_going = do_continue;
@@ -301,8 +342,8 @@ impl BitcoinIndexer {
debug!("Message handler indicates to stop");
}
}
Err(btc_error::UnhandledMessage) => {
debug!("Unhandled message {:?}", msg);
Err(btc_error::UnhandledMessage(m)) => {
debug!("Unhandled message {:?}", m);
}
Err(btc_error::ConnectionBroken) => {
debug!("Re-establish peer connection");
@@ -323,11 +364,155 @@ impl BitcoinIndexer {
}
}
}
return Ok(());
Ok(())
}
}
pub fn get_bitcoin_blockchain_height(&self) -> Result<u64, btc_error> {
let bitcoin_client = self.get_bitcoin_client();
bitcoin_client.getblockcount()
}
/// Synchronize on-disk headers from Bitcoin up to the given block height.
/// Returns the number of headers fetched.
pub fn sync_all_headers(&mut self, last_block: u64) -> Result<u64, btc_error> {
debug!("Sync all headers for blocks {} - {}", 0, last_block);
let mut spv_client = SpvClient::new(&self.config.spv_headers_path, 0, last_block, self.runtime.network_id);
spv_client.run(self)
.and_then(|_r| Ok(last_block - 1))
}
/// Synchronize the last N headers from bitcoin to a specific file.
/// Returns the number of headers fetched.
pub fn sync_last_headers(&mut self, path: &String, start_block: u64, last_block: u64) -> Result<u64, btc_error> {
debug!("Sync all headers for blocks {} - {}", 0, last_block);
let mut spv_client = SpvClient::new(&path, start_block, last_block, self.runtime.network_id);
spv_client.run(self)
.and_then(|r| Ok(last_block - 1 - start_block))
}
/// Get a range of block headers from a file.
/// If the range falls of the end of the headers file, then the returned array will be
/// truncated to not include them (note that this method can return an empty list of the
/// start_block is off the end of the file).
pub fn read_headers(&self, headers_path: &String, start_block: u64, end_block: u64) -> Result<Vec<LoneBlockHeader>, btc_error> {
let mut headers = vec![];
for block_height in start_block..end_block {
let header_opt = SpvClient::read_block_header(headers_path, block_height)?;
match header_opt {
Some(header) => {
headers.push(header.clone());
},
None => {
break;
}
}
}
Ok(headers)
}
/// Search for a bitcoin reorg. Return the list of *new* header hashes and the offset into the
/// canonical headers where the reorg starts.
pub fn find_bitcoin_reorg(&mut self, headers_path: &String, db_height: u64) -> Result<(u64, Vec<BurnchainHeaderHash>), btc_error> {
let mut reorg_headers_pathbuf = PathBuf::from(&headers_path);
reorg_headers_pathbuf.push(".reorg");
let reorg_headers_path = reorg_headers_pathbuf.to_str().unwrap().to_string();
let mut new_headers_reversed = vec![];
let mut new_tip = 0;
let mut found = false;
// what's the last header we have from the canonical history?
let canonical_end_block = SpvClient::get_headers_height(&headers_path)?;
if canonical_end_block < db_height {
// should never happen
panic!("Headers is at block {}, but database is at block {}", canonical_end_block, db_height);
}
let mut start_block =
if db_height < REORG_BATCH_SIZE {
0
}
else {
db_height - REORG_BATCH_SIZE
};
while start_block > 0 && !found {
debug!("Search for reorg'ed headers from {} - {}", start_block, start_block + REORG_BATCH_SIZE);
// get new headers
let mut spv_client = SpvClient::new(&reorg_headers_path, start_block, start_block + REORG_BATCH_SIZE, self.runtime.network_id);
spv_client.run(self)
.map_err(|e| {
error!("Failed to fetch headers from {} - {}", start_block, REORG_BATCH_SIZE);
e
})?;
// check for reorg
let canonical_headers = self.read_headers(&headers_path, start_block, start_block + REORG_BATCH_SIZE)
.map_err(|e| {
error!("Failed to read canonical headers from {} to {}", start_block, start_block + REORG_BATCH_SIZE);
e
})?;
let reorg_headers = self.read_headers(&reorg_headers_path, start_block, start_block + REORG_BATCH_SIZE)
.map_err(|e| {
error!("Failed to read reorg headers from {} to {}", start_block, start_block + REORG_BATCH_SIZE);
e
})?;
for i in (start_block..(start_block + REORG_BATCH_SIZE)).rev() {
if canonical_headers[i as usize] == reorg_headers[i as usize] {
// shared history
new_tip = i + 1;
found = true;
break;
}
// reorged
new_headers_reversed.push(reorg_headers[i as usize].clone());
}
start_block -= REORG_BATCH_SIZE;
}
let new_headers = new_headers_reversed
.iter()
.rev()
.map(|header| BurnchainHeaderHash::from_bitcoin_hash(&header.header.bitcoin_hash()))
.collect();
Ok((new_tip, new_headers))
}
/*
/// Download and parse bitcoin blocks into burnchain blcoks, and feed them into a channel
pub fn download_bitcoin_blocks(&mut self, headers_path: &String, start_block: u64, end_block: u64, out_channel: BlockChannel<BitcoinAddress, BitcoinPublicKey>) -> Result<(), btc_error> {
if end_block < start_block {
panic!("Invalid block range: {} - {}", start_block, end_block);
}
let canonical_end_block = SpvClient::get_headers_height(headers_path)?;
if canonical_end_block < end_block {
return Err(btc_error::MissingHeader);
}
// build up a pipeline to fetch and validate all of these blocks.
let headers = self.read_headers(headers_path, start_block, end_block)?;
let input_ipc = vec![];
for i in start_block..end_block {
input_ipc.push(IPCHeader {
header: headers[i-start_block].clone(),
height: i
});
}
let downloader_stage = BitcoinBlockDownloader::new();
let parser_stage = BitcoinBlockParser::new(self.runtime.network_id, self.config.magic_bytes.clone());
// let db_stage =
Err(btc_error::NotImplemented)
}
*/
}
impl BurnchainIndexer<BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
/// Instantiate the Bitcoin indexer, but don't connect to the peer network.
@@ -335,27 +520,11 @@ impl BurnchainIndexer<BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
/// Call connect() next.
///
/// Pass a directory (working_dir) that contains a "bitcoin.ini" file.
fn setup(&mut self, working_dir: &str) -> Result<(), &'static str> {
let mut conf_path = PathBuf::from(working_dir);
conf_path.push("bitcoin.ini");
fn init(network_name: &String, working_dir: &String) -> Result<BitcoinIndexer, burnchain_error> {
let mut conf_path = PathBuf::from(working_dir);
conf_path.push("bitcoin.ini");
let conf_path_str = conf_path.to_str().unwrap().to_string();
match BitcoinIndexerConfig::from_file(conf_path.to_str().unwrap()) {
Ok(cfg) => {
self.config = cfg;
return Ok(());
},
Err(e) => {
return Err(e);
}
};
}
/// Connect to the Bitcoin peer network.
/// Use the peer host and peer port given in the config file,
/// and loaded in on setup. Don't call this before setup().
///
/// Pass "mainnet", "testnet", or "regtest" as the network name
fn connect(&mut self, network_name: &str) -> Result<(), &'static str> {
let network_id_opt = match network_name.as_ref() {
"mainnet" => Some(BitcoinNetworkType::mainnet),
"testnet" => Some(BitcoinNetworkType::testnet),
@@ -363,65 +532,69 @@ impl BurnchainIndexer<BitcoinAddress, BitcoinPublicKey> for BitcoinIndexer {
_ => None
};
if None == network_id_opt {
return Err("Unrecognized network name");
if network_id_opt.is_none() {
return Err(burnchain_error::bitcoin(btc_error::ConfigError("Unrecognized network name".to_string())));
}
let bitcoin_network_id = network_id_opt.unwrap();
BitcoinIndexer::from_file(bitcoin_network_id, &conf_path_str)
.map_err(burnchain_error::bitcoin)
}
/// Connect to the Bitcoin peer network.
/// Use the peer host and peer port given in the config file,
/// and loaded in on setup. Don't call this before init().
fn connect(&mut self) -> Result<(), burnchain_error> {
self.reconnect_peer()
.map_err(burnchain_error::bitcoin)
}
/// Get the location on disk where we keep headers
fn get_headers_path(&self) -> String {
self.config.spv_headers_path.clone()
}
/// Get the number of headers we have
fn get_headers_height(&self, headers_path: &String) -> Result<u64, burnchain_error> {
SpvClient::get_headers_height(headers_path)
.map_err(burnchain_error::bitcoin)
}
/// Get the height of the blockchain
fn get_blockchain_height(&self) -> Result<u64, burnchain_error> {
self.get_bitcoin_blockchain_height()
.map_err(burnchain_error::bitcoin)
}
/// Identify underlying reorgs and return the block height at which they occur, as well as the
/// *new* header hashes
fn find_chain_reorg(&mut self, headers_path: &String, db_height: u64) -> Result<(u64, Vec<BurnchainHeaderHash>), burnchain_error> {
self.find_bitcoin_reorg(headers_path, db_height)
.map_err(burnchain_error::bitcoin)
}
/// Download and store all headers between two block heights
fn sync_headers(&mut self, headers_path: &String, start_height: u64, end_height: u64) -> Result<(), burnchain_error> {
self.sync_last_headers(headers_path, start_height, end_height)
.map_err(burnchain_error::bitcoin)
.and_then(|_num_fetched| Ok(()))
}
/// Drop headers after a given height
fn drop_headers(&mut self, headers_path: &String, new_height: u64) -> Result<(), burnchain_error> {
let canonical_end_block = SpvClient::get_headers_height(headers_path)
.map_err(burnchain_error::bitcoin)?;
if canonical_end_block < new_height {
return Err(burnchain_error::bitcoin(btc_error::BlockchainHeight));
}
let network_id = network_id_opt.unwrap();
self.runtime = BitcoinIndexerRuntime::default(network_id);
return self.reconnect_peer();
SpvClient::drop_headers(headers_path, new_height)
.map_err(burnchain_error::bitcoin)
}
fn get_block_hash(&mut self, block_height: u64) -> Result<String, &'static str> {
return Err("not implemented");
}
fn get_block_txs(&mut self, block_hash: &str) -> Result<Box<Vec<BurnchainTransaction<BitcoinAddress, BitcoinPublicKey>>>, &'static str> {
return Err("not implemented");
/// Synchronize blocks
fn sync_blocks(&mut self, headers_path: &String, start_height: u64, end_height: u64, block_channel: &BlockChannel<BitcoinAddress, BitcoinPublicKey>) -> Result<(), burnchain_error> {
Err(burnchain_error::bitcoin(btc_error::NotImplemented))
}
}
/// Synchronize all block headers.
/// Returns the number of *new* headers fetched
pub fn sync_block_headers(indexer: &mut BitcoinIndexer, end_block: Option<u64>) -> Result<u64, btc_error> {
// how many blocks are there?
let last_block = match end_block {
Some(block_height) => {
block_height
}
None => {
let bitcoin_client = indexer.get_bitcoin_client();
let block_count = bitcoin_client.getblockcount()?;
block_count
}
};
let first_block = match SpvClient::get_headers_height(&indexer.config.spv_headers_path) {
Ok(block_height) => {
block_height
}
Err(btc_error::FilesystemError(ref e)) => {
// headers path doesn't exist
0
}
Err(e) => {
// some other error
debug!("Unable to find first block height: {:?}", e);
return Err(e);
}
};
if first_block >= last_block {
debug!("Fetched 0 headers -- all caught up");
return Ok(0);
}
debug!("Sync headers for blocks {} - {}", first_block, last_block);
let mut spv_client = SpvClient::new(&indexer.config.spv_headers_path, first_block, last_block, indexer.runtime.network_id);
let spv_res = spv_client.run(indexer)
.and_then(|_r| Ok(last_block - 1 - first_block));
return spv_res;
}