From 6c98cd60562df3b6d29853207fcaba8b14bf5bbd Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Mon, 2 Oct 2023 15:50:30 -0400 Subject: [PATCH] refactor: put MemPoolSyncData into the mempool module, and remove transaction streaming code --- stackslib/src/core/mempool.rs | 334 ++++++++++++++++++++-------------- 1 file changed, 202 insertions(+), 132 deletions(-) diff --git a/stackslib/src/core/mempool.rs b/stackslib/src/core/mempool.rs index c6b155f1a..8b04f6de0 100644 --- a/stackslib/src/core/mempool.rs +++ b/stackslib/src/core/mempool.rs @@ -18,6 +18,7 @@ use std::cmp::{self, Ordering}; use std::collections::{HashMap, HashSet, VecDeque}; use std::fs; use std::hash::Hasher; +use std::io; use std::io::{Read, Write}; use std::ops::Deref; use std::ops::DerefMut; @@ -50,7 +51,7 @@ use crate::core::StacksEpochId; use crate::core::FIRST_BURNCHAIN_CONSENSUS_HASH; use crate::core::FIRST_STACKS_BLOCK_HASH; use crate::monitoring::increment_stx_mempool_gc; -use crate::net::stream::TxStreamData; +use crate::net::Error as net_error; use crate::util_lib::db::query_int; use crate::util_lib::db::query_row_columns; use crate::util_lib::db::query_rows; @@ -69,8 +70,6 @@ use stacks_common::util::hash::to_hex; use stacks_common::util::hash::Sha512Trunc256Sum; use std::time::Instant; -use crate::net::MemPoolSyncData; - use crate::util_lib::bloom::{BloomCounter, BloomFilter, BloomNodeHasher}; use crate::clarity_vm::clarity::ClarityConnection; @@ -90,6 +89,9 @@ use crate::monitoring; use crate::types::chainstate::{BlockHeaderHash, StacksAddress, StacksBlockId}; use crate::util_lib::db::table_exists; +use stacks_common::codec::{read_next, write_next, MAX_MESSAGE_LEN}; +use stacks_common::util::retry::{BoundReader, RetryReader}; + // maximum number of confirmations a transaction can have before it's garbage-collected pub const MEMPOOL_MAX_TRANSACTION_AGE: u64 = 256; pub const MAXIMUM_MEMPOOL_TX_CHAINING: u64 = 25; @@ -150,6 +152,167 @@ impl StacksMessageCodec for TxTag { } } +define_u8_enum!(MemPoolSyncDataID { + BloomFilter = 0x01, + TxTags = 0x02 +}); + +#[derive(Debug, Clone, PartialEq)] +pub enum MemPoolSyncData { + BloomFilter(BloomFilter), + TxTags([u8; 32], Vec), +} + +impl StacksMessageCodec for MemPoolSyncData { + fn consensus_serialize(&self, fd: &mut W) -> Result<(), codec_error> { + match *self { + MemPoolSyncData::BloomFilter(ref bloom_filter) => { + write_next(fd, &MemPoolSyncDataID::BloomFilter.to_u8())?; + write_next(fd, bloom_filter)?; + } + MemPoolSyncData::TxTags(ref seed, ref tags) => { + write_next(fd, &MemPoolSyncDataID::TxTags.to_u8())?; + write_next(fd, seed)?; + write_next(fd, tags)?; + } + } + Ok(()) + } + + fn consensus_deserialize(fd: &mut R) -> Result { + let data_id: u8 = read_next(fd)?; + match MemPoolSyncDataID::from_u8(data_id).ok_or(codec_error::DeserializeError(format!( + "Unrecognized MemPoolSyncDataID {}", + &data_id + )))? { + MemPoolSyncDataID::BloomFilter => { + let bloom_filter: BloomFilter = read_next(fd)?; + Ok(MemPoolSyncData::BloomFilter(bloom_filter)) + } + MemPoolSyncDataID::TxTags => { + let seed: [u8; 32] = read_next(fd)?; + let txtags: Vec = read_next(fd)?; + Ok(MemPoolSyncData::TxTags(seed, txtags)) + } + } + } +} + +/// Read the trailing page ID from a transaction stream +fn parse_mempool_query_page_id( + pos: usize, + retry_reader: &mut RetryReader<'_, R>, +) -> Result, net_error> { + // possibly end-of-transactions, in which case, the last 32 bytes should be + // a page ID. Expect end-of-stream after this. + retry_reader.set_position(pos); + let next_page: Txid = match read_next(retry_reader) { + Ok(txid) => txid, + Err(e) => match e { + codec_error::ReadError(ref ioe) => match ioe.kind() { + io::ErrorKind::UnexpectedEof => { + if pos == retry_reader.position() { + // this is fine -- the node didn't get another page + return Ok(None); + } else { + // partial data -- corrupt stream + test_debug!("Unexpected EOF: {} != {}", pos, retry_reader.position()); + return Err(e.into()); + } + } + _ => { + return Err(e.into()); + } + }, + e => { + return Err(e.into()); + } + }, + }; + + test_debug!("Read page_id {:?}", &next_page); + Ok(Some(next_page)) +} + +/// Decode a transaction stream, returned from /v2/mempool/query. +/// The wire format is a list of transactions (no SIP-003 length prefix), followed by an +/// optional 32-byte page ID. Obtain both the transactions and page ID, if it exists. +pub fn decode_tx_stream( + fd: &mut R, +) -> Result<(Vec, Option), net_error> { + // The wire format is `tx, tx, tx, tx, .., tx, txid`. + // The last 32 bytes are the page ID for the next mempool query. + // NOTE: there will be no length prefix on this. + let mut txs: Vec = vec![]; + let mut bound_reader = BoundReader::from_reader(fd, MAX_MESSAGE_LEN as u64); + let mut retry_reader = RetryReader::new(&mut bound_reader); + let mut page_id = None; + let mut expect_eof = false; + + loop { + let pos = retry_reader.position(); + let next_msg: Result = read_next(&mut retry_reader); + match next_msg { + Ok(tx) => { + if expect_eof { + // this should have failed + test_debug!("Expected EOF; got transaction {}", tx.txid()); + return Err(net_error::ExpectedEndOfStream); + } + + test_debug!("Read transaction {}", tx.txid()); + txs.push(tx); + Ok(()) + } + Err(e) => match e { + codec_error::ReadError(ref ioe) => match ioe.kind() { + io::ErrorKind::UnexpectedEof => { + if expect_eof { + if pos != retry_reader.position() { + // read partial data. The stream is corrupt. + test_debug!( + "Expected EOF; stream advanced from {} to {}", + pos, + retry_reader.position() + ); + return Err(net_error::ExpectedEndOfStream); + } + } else { + // couldn't read a full transaction. This is possibly a page ID, whose + // 32 bytes decode to the prefix of a well-formed transaction. + test_debug!("Try to read page ID trailer after ReadError"); + page_id = parse_mempool_query_page_id(pos, &mut retry_reader)?; + } + break; + } + _ => Err(e), + }, + codec_error::DeserializeError(_msg) => { + if expect_eof { + // this should have failed due to EOF + test_debug!("Expected EOF; got DeserializeError '{}'", &_msg); + return Err(net_error::ExpectedEndOfStream); + } + + // failed to parse a transaction. This is possibly a page ID. + test_debug!("Try to read page ID trailer after ReadError"); + page_id = parse_mempool_query_page_id(pos, &mut retry_reader)?; + + // do one more pass to make sure we're actually end-of-stream. + // otherwise, the stream itself was corrupt, since any 32 bytes is a valid + // txid and the presence of more bytes means that we simply got a bad tx + // that we couldn't decode. + expect_eof = true; + Ok(()) + } + _ => Err(e), + }, + }?; + } + + Ok((txs, page_id)) +} + pub struct MemPoolAdmitter { cur_block: BlockHeaderHash, cur_consensus_hash: ConsensusHash, @@ -1212,6 +1375,21 @@ impl MemPoolDB { }) } + pub fn reopen(&self, readwrite: bool) -> Result { + if let Err(e) = fs::metadata(&self.path) { + return Err(db_error::IOError(e)); + } + + let open_flags = if readwrite { + OpenFlags::SQLITE_OPEN_READ_WRITE + } else { + OpenFlags::SQLITE_OPEN_READ_ONLY + }; + + let conn = sqlite_open(&self.path, open_flags, true)?; + Ok(conn) + } + /// Open the mempool db within the chainstate directory. /// The chainstate must be instantiated already. pub fn open( @@ -2445,6 +2623,24 @@ impl MemPoolDB { query_row(&self.conn(), sql, args) } + pub fn find_next_missing_transactions( + &self, + data: &MemPoolSyncData, + height: u64, + last_randomized_txid: &Txid, + max_txs: u64, + max_run: u64, + ) -> Result<(Vec, Option, u64), db_error> { + Self::static_find_next_missing_transactions( + self.conn(), + data, + height, + last_randomized_txid, + max_txs, + max_run, + ) + } + /// Get the next batch of transactions from our mempool that are *not* represented in the given /// MemPoolSyncData. Transactions are ordered lexicographically by randomized_txids.hashed_txid, since this allows us /// to use the txid as a cursor while ensuring that each node returns txids in a deterministic random order @@ -2452,8 +2648,8 @@ impl MemPoolDB { /// a requesting node will still have a good chance of getting something useful). /// Also, return the next value to pass for `last_randomized_txid` to load the next page. /// Also, return the number of rows considered. - pub fn find_next_missing_transactions( - &self, + pub fn static_find_next_missing_transactions( + conn: &DBConn, data: &MemPoolSyncData, height: u64, last_randomized_txid: &Txid, @@ -2483,7 +2679,7 @@ impl MemPoolDB { } } - let mut stmt = self.conn().prepare(sql)?; + let mut stmt = conn.prepare(sql)?; let mut rows = stmt.query(args)?; let mut num_rows_visited = 0; let mut next_page = None; @@ -2528,130 +2724,4 @@ impl MemPoolDB { Ok((ret, next_page, num_rows_visited)) } - - /// Stream transaction data. - /// Send back one transaction at a time. - pub fn stream_txs( - &self, - fd: &mut W, - query: &mut TxStreamData, - count: u64, - ) -> Result { - let mut num_written = 0; - while num_written < count { - // write out bufferred tx - let start = query.tx_buf_ptr; - let end = cmp::min(query.tx_buf.len(), ((start as u64) + count) as usize); - fd.write_all(&query.tx_buf[start..end]) - .map_err(ChainstateError::WriteError)?; - - let nw = end.saturating_sub(start) as u64; - - query.tx_buf_ptr = end; - num_written += nw; - - if query.tx_buf_ptr >= query.tx_buf.len() { - if query.corked { - // we're done - test_debug!( - "Finished streaming txs; last page was {:?}", - &query.last_randomized_txid - ); - break; - } - - if query.num_txs >= query.max_txs { - // no more space in this stream - debug!( - "No more space in this query after {:?}. Corking tx stream.", - &query.last_randomized_txid - ); - - // send the next page ID - query.tx_buf_ptr = 0; - query.tx_buf.clear(); - query.corked = true; - - query - .last_randomized_txid - .consensus_serialize(&mut query.tx_buf) - .map_err(ChainstateError::CodecError)?; - continue; - } - - // load next - let remaining = query.max_txs.saturating_sub(query.num_txs); - let (next_txs, next_last_randomized_txid_opt, num_rows_visited) = self - .find_next_missing_transactions( - &query.tx_query, - query.height, - &query.last_randomized_txid, - 1, - remaining, - )?; - - debug!( - "Streaming mempool propagation stepped"; - "rows_visited" => num_rows_visited, - "last_rand_txid" => %query.last_randomized_txid, - "num_txs" => query.num_txs, - "max_txs" => query.max_txs - ); - - query.num_txs += num_rows_visited; - if next_txs.len() > 0 { - query.tx_buf_ptr = 0; - query.tx_buf.clear(); - - for next_tx in next_txs.iter() { - next_tx - .consensus_serialize(&mut query.tx_buf) - .map_err(ChainstateError::CodecError)?; - } - if let Some(next_last_randomized_txid) = next_last_randomized_txid_opt { - query.last_randomized_txid = next_last_randomized_txid; - } else { - test_debug!( - "No more txs after {}", - &next_txs - .last() - .map(|tx| tx.txid()) - .unwrap_or(Txid([0u8; 32])) - ); - break; - } - } else if let Some(next_txid) = next_last_randomized_txid_opt { - test_debug!( - "No rows returned for {}; cork tx stream with next page {}", - &query.last_randomized_txid, - &next_txid - ); - - // no rows found - query.last_randomized_txid = next_txid; - - // send the next page ID - query.tx_buf_ptr = 0; - query.tx_buf.clear(); - query.corked = true; - - query - .last_randomized_txid - .consensus_serialize(&mut query.tx_buf) - .map_err(ChainstateError::CodecError)?; - } else if next_last_randomized_txid_opt.is_none() { - // no more transactions - test_debug!( - "No more txs to send after {:?}; corking stream", - &query.last_randomized_txid - ); - - query.tx_buf_ptr = 0; - query.tx_buf.clear(); - query.corked = true; - } - } - } - Ok(num_written) - } }