Merge branch 'develop' into feat/drop-problematic-transactions

This commit is contained in:
Jude Nelson
2022-07-27 01:30:25 +00:00
committed by GitHub
10 changed files with 2593 additions and 126 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -77,8 +77,10 @@ pub enum Error {
NoncontiguousHeader,
/// Missing header
MissingHeader,
/// Invalid target
/// Invalid header proof-of-work (i.e. due to a bad timestamp or a bad `bits` field)
InvalidPoW,
/// Chainwork would decrease by including a given header
InvalidChainWork,
/// Wrong number of bytes for constructing an address
InvalidByteSequence,
/// Configuration error
@@ -107,6 +109,7 @@ impl fmt::Display for Error {
Error::NoncontiguousHeader => write!(f, "Non-contiguous header"),
Error::MissingHeader => write!(f, "Missing header"),
Error::InvalidPoW => write!(f, "Invalid proof of work"),
Error::InvalidChainWork => write!(f, "Chain difficulty cannot decrease"),
Error::InvalidByteSequence => write!(f, "Invalid sequence of bytes"),
Error::ConfigError(ref e_str) => fmt::Display::fmt(e_str, f),
Error::BlockchainHeight => write!(f, "Value is beyond the end of the blockchain"),
@@ -133,6 +136,7 @@ impl error::Error for Error {
Error::NoncontiguousHeader => None,
Error::MissingHeader => None,
Error::InvalidPoW => None,
Error::InvalidChainWork => None,
Error::InvalidByteSequence => None,
Error::ConfigError(ref _e_str) => None,
Error::BlockchainHeight => None,

View File

@@ -39,13 +39,14 @@ use crate::burnchains::bitcoin::Error as btc_error;
use crate::burnchains::bitcoin::PeerMessage;
use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
use rusqlite::OptionalExtension;
use rusqlite::Row;
use rusqlite::Transaction;
use rusqlite::{Connection, OpenFlags, NO_PARAMS};
use crate::util_lib::db::{
query_row, query_rows, sqlite_open, tx_begin_immediate, tx_busy_handler, u64_to_sql, DBConn,
DBTx, Error as db_error, FromColumn, FromRow,
query_int, query_row, query_rows, sqlite_open, tx_begin_immediate, tx_busy_handler, u64_to_sql,
DBConn, DBTx, Error as db_error, FromColumn, FromRow,
};
use stacks_common::util::get_epoch_time_secs;
use stacks_common::util::hash::{hex_bytes, to_hex};
@@ -67,7 +68,7 @@ const BITCOIN_GENESIS_BLOCK_HASH_REGTEST: &'static str =
pub const BLOCK_DIFFICULTY_CHUNK_SIZE: u64 = 2016;
const BLOCK_DIFFICULTY_INTERVAL: u32 = 14 * 24 * 60 * 60; // two weeks, in seconds
pub const SPV_DB_VERSION: &'static str = "1";
pub const SPV_DB_VERSION: &'static str = "2";
const SPV_INITIAL_SCHEMA: &[&'static str] = &[
r#"
@@ -84,6 +85,17 @@ const SPV_INITIAL_SCHEMA: &[&'static str] = &[
"CREATE TABLE db_config(version TEXT NOT NULL);",
];
// store the running chain work totals for each difficulty interval.
// unlike the `headers` table, this table will never be deleted from, since we use it to determine
// whether or not newly-arrived headers represent a better chain than the best-known chain. The
// only way to _replace_ a row is to find a header difficulty interval with a _higher_ work score.
const SPV_SCHEMA_2: &[&'static str] = &[r#"
CREATE TABLE chain_work(
interval INTEGER PRIMARY KEY,
work TEXT NOT NULL -- 32-byte (256-bit) integer
);
"#];
pub struct SpvClient {
pub headers_path: String,
pub start_block_height: u64,
@@ -130,7 +142,7 @@ impl SpvClient {
readwrite: bool,
reverse_order: bool,
) -> Result<SpvClient, btc_error> {
let conn = SpvClient::db_open(headers_path, readwrite)?;
let conn = SpvClient::db_open(headers_path, readwrite, true)?;
let mut client = SpvClient {
headers_path: headers_path.to_owned(),
start_block_height: start_block,
@@ -143,7 +155,35 @@ impl SpvClient {
};
if readwrite {
client.init_block_headers()?;
client.init_block_headers(true)?;
}
Ok(client)
}
#[cfg(test)]
pub fn new_without_migration(
headers_path: &str,
start_block: u64,
end_block: Option<u64>,
network_id: BitcoinNetworkType,
readwrite: bool,
reverse_order: bool,
) -> Result<SpvClient, btc_error> {
let conn = SpvClient::db_open(headers_path, readwrite, false)?;
let mut client = SpvClient {
headers_path: headers_path.to_owned(),
start_block_height: start_block,
end_block_height: end_block,
cur_block_height: start_block,
network_id: network_id,
readwrite: readwrite,
reverse_order: reverse_order,
headers_db: conn,
};
if readwrite {
client.init_block_headers(false)?;
}
Ok(client)
@@ -153,6 +193,11 @@ impl SpvClient {
&self.headers_db
}
#[cfg(test)]
pub fn conn_mut(&mut self) -> &mut DBConn {
&mut self.headers_db
}
pub fn tx_begin<'a>(&'a mut self) -> Result<DBTx<'a>, btc_error> {
if !self.readwrite {
return Err(db_error::ReadOnly.into());
@@ -169,6 +214,9 @@ impl SpvClient {
for row_text in SPV_INITIAL_SCHEMA {
tx.execute_batch(row_text).map_err(db_error::SqliteError)?;
}
for row_text in SPV_SCHEMA_2 {
tx.execute_batch(row_text).map_err(db_error::SqliteError)?;
}
tx.execute(
"INSERT INTO db_config (version) VALUES (?1)",
@@ -180,7 +228,57 @@ impl SpvClient {
Ok(())
}
fn db_open(headers_path: &str, readwrite: bool) -> Result<DBConn, btc_error> {
fn db_get_version(conn: &DBConn) -> Result<String, btc_error> {
let version_str = conn
.query_row("SELECT MAX(version) FROM db_config", NO_PARAMS, |row| {
let version: String = row.get_unwrap(0);
Ok(version)
})
.optional()
.map_err(db_error::SqliteError)?
.unwrap_or("0".to_string());
Ok(version_str)
}
fn db_set_version(tx: &Transaction, version: &str) -> Result<(), btc_error> {
tx.execute("UPDATE db_config SET version = ?1", &[version])
.map_err(db_error::SqliteError)
.map_err(|e| e.into())
.and_then(|_| Ok(()))
}
#[cfg(test)]
pub fn test_db_migrate(conn: &mut DBConn) -> Result<(), btc_error> {
SpvClient::db_migrate(conn)
}
fn db_migrate(conn: &mut DBConn) -> Result<(), btc_error> {
let version = SpvClient::db_get_version(conn)?;
while version != SPV_DB_VERSION {
let version = SpvClient::db_get_version(conn)?;
match version.as_str() {
"1" => {
debug!("Migrate SPV DB from schema 1 to 2");
let tx = tx_begin_immediate(conn)?;
for row_text in SPV_SCHEMA_2 {
tx.execute_batch(row_text).map_err(db_error::SqliteError)?;
}
SpvClient::db_set_version(&tx, "2")?;
tx.commit().map_err(db_error::SqliteError)?;
}
SPV_DB_VERSION => {
break;
}
_ => {
panic!("Unrecognized SPV version {}", &version);
}
}
}
Ok(())
}
fn db_open(headers_path: &str, readwrite: bool, migrate: bool) -> Result<DBConn, btc_error> {
let mut create_flag = false;
let open_flags = if fs::metadata(headers_path).is_err() {
// need to create
@@ -205,6 +303,9 @@ impl SpvClient {
if create_flag {
SpvClient::db_instantiate(&mut conn)?;
}
if readwrite && migrate {
SpvClient::db_migrate(&mut conn)?;
}
Ok(conn)
}
@@ -229,6 +330,157 @@ impl SpvClient {
indexer.peer_communicate(self, true)
}
/// Calculate the total work over a given interval of headers.
fn get_interval_work(interval_headers: &Vec<LoneBlockHeader>) -> Uint256 {
let mut work = Uint256::from_u64(0);
for hdr in interval_headers.iter() {
work = work + hdr.header.work();
}
work
}
/// Find the highest interval for which we have a chain work score.
/// The interval corresponds to blocks (interval - 1) * 2016 ... interval * 2016
pub fn find_highest_work_score_interval(&self) -> Result<u64, btc_error> {
let max_interval_opt: Option<i64> = self
.conn()
.query_row(
"SELECT interval FROM chain_work ORDER BY interval DESC LIMIT 1",
NO_PARAMS,
|row| row.get(0),
)
.optional()
.map_err(db_error::SqliteError)?;
Ok(max_interval_opt.map(|x| x as u64).unwrap_or(0))
}
/// Find the total work score for an interval, if it has been calculated
pub fn find_interval_work(&self, interval: u64) -> Result<Option<Uint256>, btc_error> {
let work_hex: Option<String> = self
.conn()
.query_row(
"SELECT work FROM chain_work WHERE interval = ?1",
&[&u64_to_sql(interval)?],
|row| row.get(0),
)
.optional()
.map_err(db_error::SqliteError)?;
Ok(work_hex.map(|x| Uint256::from_hex_be(&x).expect("FATAL: work is not a uint256")))
}
/// Store an interval's running total work.
/// The interval must not yet have an interval work score, or must be less than or equal to the
/// currently-stored interval.
pub fn store_interval_work(&mut self, interval: u64, work: Uint256) -> Result<(), btc_error> {
if let Some(cur_work) = self.find_interval_work(interval)? {
if cur_work > work {
error!(
"Tried to store work {} to interval {}, which has work {} already",
work, interval, cur_work
);
return Err(btc_error::InvalidChainWork);
}
}
let tx = self.tx_begin()?;
let args: &[&dyn ToSql] = &[&u64_to_sql(interval)?, &work.to_hex_be()];
tx.execute(
"INSERT OR REPLACE INTO chain_work (interval,work) VALUES (?1,?2)",
args,
)
.map_err(db_error::SqliteError)?;
tx.commit().map_err(db_error::SqliteError)?;
Ok(())
}
/// Update the total chain work table up to a given interval (even if partial).
/// This method is idempotent.
/// Returns the total work.
pub fn update_chain_work(&mut self) -> Result<Uint256, btc_error> {
let highest_interval = self.find_highest_work_score_interval()?;
let mut work_so_far = if highest_interval > 0 {
self.find_interval_work(highest_interval - 1)?
.expect("FATAL: no work score for highest known interval")
} else {
Uint256::from_u64(0)
};
let last_interval = self.get_headers_height()? / BLOCK_DIFFICULTY_CHUNK_SIZE + 1;
test_debug!(
"Highest work-calculation interval is {} (height {}), work {}; update to {}",
highest_interval,
highest_interval * BLOCK_DIFFICULTY_CHUNK_SIZE,
work_so_far,
last_interval
);
for interval in (highest_interval + 1)..(last_interval + 1) {
let mut partial = false;
let interval_headers = self.read_block_headers(
(interval - 1) * BLOCK_DIFFICULTY_CHUNK_SIZE,
interval * BLOCK_DIFFICULTY_CHUNK_SIZE,
)?;
let interval_work = SpvClient::get_interval_work(&interval_headers);
work_so_far = work_so_far + interval_work;
if interval_headers.len() == BLOCK_DIFFICULTY_CHUNK_SIZE as usize {
self.store_interval_work(interval - 1, work_so_far)?;
} else {
partial = true;
}
test_debug!(
"Chain work in {} interval {} ({}-{}) is {}, total is {}",
if partial { "partial" } else { "full" },
interval - 1,
(interval - 1) * BLOCK_DIFFICULTY_CHUNK_SIZE,
(interval - 1) * BLOCK_DIFFICULTY_CHUNK_SIZE + (interval_headers.len() as u64),
interval_work,
work_so_far
);
if partial {
break;
}
}
Ok(work_so_far)
}
/// Get the total chain work.
/// You will have needed to call update_chain_work() prior to this after inserting new headers.
pub fn get_chain_work(&self) -> Result<Uint256, btc_error> {
let highest_full_interval = self.find_highest_work_score_interval()?;
let highest_interval_work = if highest_full_interval == 0 {
Uint256::from_u64(0)
} else {
self.find_interval_work(highest_full_interval)?
.expect("FATAL: have interval but no work")
};
let partial_interval = if highest_full_interval == 0 {
0
} else {
highest_full_interval + 1
};
let partial_interval_headers = self.read_block_headers(
partial_interval * BLOCK_DIFFICULTY_CHUNK_SIZE,
(partial_interval + 1) * BLOCK_DIFFICULTY_CHUNK_SIZE,
)?;
assert!(
partial_interval_headers.len() < BLOCK_DIFFICULTY_CHUNK_SIZE as usize,
"interval {} is not partial",
partial_interval
);
let partial_interval_work = SpvClient::get_interval_work(&partial_interval_headers);
debug!("Chain work: highest work-calculated interval is {} with total work {} partial {} ({} headers)", &highest_full_interval, &highest_interval_work, &partial_interval_work, partial_interval_headers.len());
Ok(highest_interval_work + partial_interval_work)
}
/// Validate a headers message we requested
/// * must have at least one header
/// * headers must be contiguous
@@ -292,6 +544,24 @@ impl SpvClient {
Some(res) => res.header,
};
// each header's timestamp must exceed the median of the past 11 blocks
if block_height > 11 {
let past_11_headers =
self.read_block_headers(block_height - 11, block_height)?;
let mut past_timestamps: Vec<u32> =
past_11_headers.iter().map(|hdr| hdr.header.time).collect();
past_timestamps.sort();
if header_i.time <= past_timestamps[5] {
error!(
"Block {} timestamp {} <= {} (median of {:?})",
block_height, header_i.time, past_timestamps[5], &past_timestamps
);
return Err(btc_error::InvalidPoW);
}
}
// header difficulty must not change in a difficulty interval
let (bits, difficulty) =
match self.get_target(block_height, &header_i, &headers, i)? {
Some(x) => x,
@@ -307,7 +577,7 @@ impl SpvClient {
return Err(btc_error::InvalidPoW);
}
let header_hash = header_i.bitcoin_hash().into_le();
if difficulty <= header_hash {
if difficulty < header_hash {
error!(
"block {} hash {} has less work than difficulty {} in {}",
block_height,
@@ -429,8 +699,9 @@ impl SpvClient {
.and_then(|_x| Ok(()))
}
/// Initialize the block headers file with the genesis block hash
fn init_block_headers(&mut self) -> Result<(), btc_error> {
/// Initialize the block headers file with the genesis block hash.
/// Optionally sip migration for testing.
fn init_block_headers(&mut self, migrate: bool) -> Result<(), btc_error> {
assert!(self.readwrite, "SPV header DB is open read-only");
let (genesis_block, genesis_block_hash_str) = match self.network_id {
BitcoinNetworkType::Mainnet => (
@@ -464,6 +735,10 @@ impl SpvClient {
tx.commit().map_err(db_error::SqliteError)?;
debug!("Initialized block headers at {}", self.headers_path);
if migrate {
self.update_chain_work()?;
}
return Ok(());
}
@@ -471,7 +746,7 @@ impl SpvClient {
/// -- validate them
/// -- store them
/// Can error if there has been a reorg, or if the headers don't correspond to headers we asked
/// for.
/// for, or if the new chain has less total work than the old chain.
fn handle_headers(
&mut self,
insert_height: u64,
@@ -482,9 +757,11 @@ impl SpvClient {
let num_headers = block_headers.len();
let first_header_hash = block_headers[0].header.bitcoin_hash();
let last_header_hash = block_headers[block_headers.len() - 1].header.bitcoin_hash();
let total_work_before = self.update_chain_work()?;
if !self.reverse_order {
// fetching headers in ascending order
// fetching headers in ascending order, so verify that the first item in
// `block_headers` connects to a parent in the DB (if it has one)
self.insert_block_headers_after(insert_height, block_headers)
.map_err(|e| {
error!("Failed to insert block headers: {:?}", &e);
@@ -505,7 +782,8 @@ impl SpvClient {
e
})?;
} else {
// fetching headers in descending order
// fetching headers in descending order, so verify that the last item in
// `block_headers` connects to a child in the DB (if it has one)
self.insert_block_headers_before(insert_height, block_headers)
.map_err(|e| {
error!("Failed to insert block headers: {:?}", &e);
@@ -530,6 +808,15 @@ impl SpvClient {
}
if num_headers > 0 {
let total_work_after = self.update_chain_work()?;
if total_work_after < total_work_before {
error!(
"New headers represent less work than the old headers ({} < {})",
total_work_before, total_work_after
);
return Err(btc_error::InvalidChainWork);
}
debug!(
"Handled {} Headers: {}-{}",
num_headers, first_header_hash, last_header_hash
@@ -564,6 +851,15 @@ impl SpvClient {
Ok(())
}
#[cfg(test)]
pub fn test_write_block_headers(
&mut self,
height: u64,
headers: Vec<LoneBlockHeader>,
) -> Result<(), btc_error> {
self.write_block_headers(height, headers)
}
/// Insert block headers into the headers DB.
/// Verify that the first header's parent exists and connects with this header chain, and verify that
/// the headers are themselves contiguous.
@@ -665,23 +961,6 @@ impl SpvClient {
}
}
match self.read_block_header(start_height)? {
Some(parent_header) => {
// contiguous?
if block_headers[0].header.prev_blockhash != parent_header.header.bitcoin_hash() {
warn!("Received discontiguous headers at height {}: we have parent {:?} ({}), but were given {:?} ({})",
start_height, &parent_header.header, parent_header.header.bitcoin_hash(), &block_headers[0].header, &block_headers[0].header.bitcoin_hash());
return Err(btc_error::NoncontiguousHeader);
}
}
None => {
debug!(
"No header for parent block {}, so will not validate continuity",
start_height - 1
);
}
}
// store them
self.write_block_headers(start_height + 1, block_headers)
}
@@ -707,8 +986,44 @@ impl SpvClient {
Ok(())
}
/// Determine the (bits, target) between two headers
pub fn get_target_between_headers(
first_header: &LoneBlockHeader,
last_header: &LoneBlockHeader,
) -> (u32, Uint256) {
let max_target = Uint256([
0x0000000000000000,
0x0000000000000000,
0x0000000000000000,
0x00000000ffff0000,
]);
// find actual timespan as being clamped between +/- 4x of the target timespan
let mut actual_timespan = (last_header.header.time - first_header.header.time) as u64;
let target_timespan = BLOCK_DIFFICULTY_INTERVAL as u64;
if actual_timespan < (target_timespan / 4) {
actual_timespan = target_timespan / 4;
}
if actual_timespan > (target_timespan * 4) {
actual_timespan = target_timespan * 4;
}
let last_target = last_header.header.target();
let new_target =
(last_target * Uint256::from_u64(actual_timespan)) / Uint256::from_u64(target_timespan);
let target = cmp::min(new_target, max_target);
let bits = BlockHeader::compact_target_from_u256(&target);
let target = BlockHeader::compact_target_to_u256(bits);
(bits, target)
}
/// Determine the target difficult over a given difficulty adjustment interval
/// the `interval` parameter is the difficulty interval -- a 2016-block interval.
/// * On mainnet, `headers_in_range` can be empty. If it's not empty, then the 0th element is
/// treated as the parent of `current_header`. On testnet, `headers_in_range` must be a range
/// of headers in the given `interval`.
/// Returns (new bits, new target)
pub fn get_target(
&self,
@@ -758,7 +1073,7 @@ impl SpvClient {
if current_header_height % BLOCK_DIFFICULTY_CHUNK_SIZE != 0
&& self.network_id == BitcoinNetworkType::Testnet
{
// In Testnet mode, if the new block's timestamp is more than 2* 10 minutes
// In Testnet mode, if the new block's timestamp is more than 2 * 60 * 10 minutes
// then allow mining of a min-difficulty block.
if current_header.time > parent_header.time + 10 * 60 * 2 {
return Ok(Some((max_target_bits, max_target)));
@@ -775,34 +1090,20 @@ impl SpvClient {
let first_header =
match self.read_block_header((interval - 1) * BLOCK_DIFFICULTY_CHUNK_SIZE)? {
Some(res) => res.header,
Some(res) => res,
None => return Ok(None),
};
let last_header =
match self.read_block_header(interval * BLOCK_DIFFICULTY_CHUNK_SIZE - 1)? {
Some(res) => res.header,
Some(res) => res,
None => return Ok(None),
};
// find actual timespan as being clamped between +/- 4x of the target timespan
let mut actual_timespan = (last_header.time - first_header.time) as u64;
let target_timespan = BLOCK_DIFFICULTY_INTERVAL as u64;
if actual_timespan < (target_timespan / 4) {
actual_timespan = target_timespan / 4;
}
if actual_timespan > (target_timespan * 4) {
actual_timespan = target_timespan * 4;
}
let last_target = last_header.target();
let new_target =
last_target * Uint256::from_u64(actual_timespan) / Uint256::from_u64(target_timespan);
let target = cmp::min(new_target, max_target);
let bits = BlockHeader::compact_target_from_u256(&target);
Ok(Some((bits, target)))
Ok(Some(SpvClient::get_target_between_headers(
&first_header,
&last_header,
)))
}
/// Ask for the next batch of headers (note that this will return the maximal size of headers)
@@ -1260,15 +1561,16 @@ mod test {
assert_eq!(spv_client.read_block_headers(0, 10).unwrap(), all_headers);
// should fail
if let Err(btc_error::NoncontiguousHeader) =
spv_client.insert_block_headers_before(2, headers.clone())
{
} else {
assert!(false);
}
// should succeed, since we only check that the last header connects
// to its child, if the child is stored at all
spv_client
.insert_block_headers_before(1, headers.clone())
.unwrap();
spv_client
.insert_block_headers_before(2, headers.clone())
.unwrap();
// should fail
// should fail now, since there's a child to check
if let Err(btc_error::NoncontiguousHeader) =
spv_client.insert_block_headers_before(1, headers.clone())
{

View File

@@ -980,27 +980,24 @@ impl Burnchain {
if sync_height + 1 < orig_header_height {
// a reorg happened
warn!(
"Dropping headers higher than {} due to burnchain reorg",
"Dropped headers higher than {} due to burnchain reorg",
sync_height
);
indexer.drop_headers(sync_height)?;
}
// get latest headers.
debug!("Sync headers from {}", sync_height);
let highest_header = indexer.get_highest_header_height()?;
let end_block = indexer.sync_headers(sync_height, None)?;
let mut start_block = match sync_height {
0 => 0,
_ => sync_height,
};
debug!("Sync headers from {}", highest_header);
let end_block = indexer.sync_headers(highest_header, None)?;
let mut start_block = sync_height;
if db_height < start_block {
start_block = db_height;
}
debug!(
"Sync'ed headers from {} to {}. DB at {}",
start_block, end_block, db_height
highest_header, end_block, db_height
);
if start_block == db_height && db_height == end_block {
// all caught up
@@ -1218,22 +1215,22 @@ impl Burnchain {
let db_height = burn_chain_tip.block_height;
// handle reorgs
// handle reorgs (which also updates our best-known chain work and headers DB)
let (sync_height, did_reorg) = Burnchain::sync_reorg(indexer)?;
if did_reorg {
// a reorg happened
warn!(
"Dropping headers higher than {} due to burnchain reorg",
"Dropped headers higher than {} due to burnchain reorg",
sync_height
);
indexer.drop_headers(sync_height)?;
}
// get latest headers.
debug!("Sync headers from {}", sync_height);
// fetch all headers, no matter what
let mut end_block = indexer.sync_headers(sync_height, None)?;
// fetch all new headers
let highest_header_height = indexer.get_highest_header_height()?;
let mut end_block = indexer.sync_headers(highest_header_height, None)?;
if did_reorg && sync_height > 0 {
// a reorg happened, and the last header fetched
// is on a smaller fork than the one we just
@@ -1258,7 +1255,7 @@ impl Burnchain {
debug!(
"Sync'ed headers from {} to {}. DB at {}",
sync_height, end_block, db_height
highest_header_height, end_block, db_height
);
if let Some(target_block_height) = target_block_height_opt {