refactor: put MemPoolSyncData into the mempool module, and remove transaction streaming code

This commit is contained in:
Jude Nelson
2023-10-02 15:50:30 -04:00
parent 2bd674f879
commit 6c98cd6056

View File

@@ -18,6 +18,7 @@ use std::cmp::{self, Ordering};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fs;
use std::hash::Hasher;
use std::io;
use std::io::{Read, Write};
use std::ops::Deref;
use std::ops::DerefMut;
@@ -50,7 +51,7 @@ use crate::core::StacksEpochId;
use crate::core::FIRST_BURNCHAIN_CONSENSUS_HASH;
use crate::core::FIRST_STACKS_BLOCK_HASH;
use crate::monitoring::increment_stx_mempool_gc;
use crate::net::stream::TxStreamData;
use crate::net::Error as net_error;
use crate::util_lib::db::query_int;
use crate::util_lib::db::query_row_columns;
use crate::util_lib::db::query_rows;
@@ -69,8 +70,6 @@ use stacks_common::util::hash::to_hex;
use stacks_common::util::hash::Sha512Trunc256Sum;
use std::time::Instant;
use crate::net::MemPoolSyncData;
use crate::util_lib::bloom::{BloomCounter, BloomFilter, BloomNodeHasher};
use crate::clarity_vm::clarity::ClarityConnection;
@@ -90,6 +89,9 @@ use crate::monitoring;
use crate::types::chainstate::{BlockHeaderHash, StacksAddress, StacksBlockId};
use crate::util_lib::db::table_exists;
use stacks_common::codec::{read_next, write_next, MAX_MESSAGE_LEN};
use stacks_common::util::retry::{BoundReader, RetryReader};
// maximum number of confirmations a transaction can have before it's garbage-collected
pub const MEMPOOL_MAX_TRANSACTION_AGE: u64 = 256;
pub const MAXIMUM_MEMPOOL_TX_CHAINING: u64 = 25;
@@ -150,6 +152,167 @@ impl StacksMessageCodec for TxTag {
}
}
define_u8_enum!(MemPoolSyncDataID {
BloomFilter = 0x01,
TxTags = 0x02
});
#[derive(Debug, Clone, PartialEq)]
pub enum MemPoolSyncData {
BloomFilter(BloomFilter<BloomNodeHasher>),
TxTags([u8; 32], Vec<TxTag>),
}
impl StacksMessageCodec for MemPoolSyncData {
fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), codec_error> {
match *self {
MemPoolSyncData::BloomFilter(ref bloom_filter) => {
write_next(fd, &MemPoolSyncDataID::BloomFilter.to_u8())?;
write_next(fd, bloom_filter)?;
}
MemPoolSyncData::TxTags(ref seed, ref tags) => {
write_next(fd, &MemPoolSyncDataID::TxTags.to_u8())?;
write_next(fd, seed)?;
write_next(fd, tags)?;
}
}
Ok(())
}
fn consensus_deserialize<R: Read>(fd: &mut R) -> Result<MemPoolSyncData, codec_error> {
let data_id: u8 = read_next(fd)?;
match MemPoolSyncDataID::from_u8(data_id).ok_or(codec_error::DeserializeError(format!(
"Unrecognized MemPoolSyncDataID {}",
&data_id
)))? {
MemPoolSyncDataID::BloomFilter => {
let bloom_filter: BloomFilter<BloomNodeHasher> = read_next(fd)?;
Ok(MemPoolSyncData::BloomFilter(bloom_filter))
}
MemPoolSyncDataID::TxTags => {
let seed: [u8; 32] = read_next(fd)?;
let txtags: Vec<TxTag> = read_next(fd)?;
Ok(MemPoolSyncData::TxTags(seed, txtags))
}
}
}
}
/// Read the trailing page ID from a transaction stream
fn parse_mempool_query_page_id<R: Read>(
pos: usize,
retry_reader: &mut RetryReader<'_, R>,
) -> Result<Option<Txid>, net_error> {
// possibly end-of-transactions, in which case, the last 32 bytes should be
// a page ID. Expect end-of-stream after this.
retry_reader.set_position(pos);
let next_page: Txid = match read_next(retry_reader) {
Ok(txid) => txid,
Err(e) => match e {
codec_error::ReadError(ref ioe) => match ioe.kind() {
io::ErrorKind::UnexpectedEof => {
if pos == retry_reader.position() {
// this is fine -- the node didn't get another page
return Ok(None);
} else {
// partial data -- corrupt stream
test_debug!("Unexpected EOF: {} != {}", pos, retry_reader.position());
return Err(e.into());
}
}
_ => {
return Err(e.into());
}
},
e => {
return Err(e.into());
}
},
};
test_debug!("Read page_id {:?}", &next_page);
Ok(Some(next_page))
}
/// Decode a transaction stream, returned from /v2/mempool/query.
/// The wire format is a list of transactions (no SIP-003 length prefix), followed by an
/// optional 32-byte page ID. Obtain both the transactions and page ID, if it exists.
pub fn decode_tx_stream<R: Read>(
fd: &mut R,
) -> Result<(Vec<StacksTransaction>, Option<Txid>), net_error> {
// The wire format is `tx, tx, tx, tx, .., tx, txid`.
// The last 32 bytes are the page ID for the next mempool query.
// NOTE: there will be no length prefix on this.
let mut txs: Vec<StacksTransaction> = vec![];
let mut bound_reader = BoundReader::from_reader(fd, MAX_MESSAGE_LEN as u64);
let mut retry_reader = RetryReader::new(&mut bound_reader);
let mut page_id = None;
let mut expect_eof = false;
loop {
let pos = retry_reader.position();
let next_msg: Result<StacksTransaction, _> = read_next(&mut retry_reader);
match next_msg {
Ok(tx) => {
if expect_eof {
// this should have failed
test_debug!("Expected EOF; got transaction {}", tx.txid());
return Err(net_error::ExpectedEndOfStream);
}
test_debug!("Read transaction {}", tx.txid());
txs.push(tx);
Ok(())
}
Err(e) => match e {
codec_error::ReadError(ref ioe) => match ioe.kind() {
io::ErrorKind::UnexpectedEof => {
if expect_eof {
if pos != retry_reader.position() {
// read partial data. The stream is corrupt.
test_debug!(
"Expected EOF; stream advanced from {} to {}",
pos,
retry_reader.position()
);
return Err(net_error::ExpectedEndOfStream);
}
} else {
// couldn't read a full transaction. This is possibly a page ID, whose
// 32 bytes decode to the prefix of a well-formed transaction.
test_debug!("Try to read page ID trailer after ReadError");
page_id = parse_mempool_query_page_id(pos, &mut retry_reader)?;
}
break;
}
_ => Err(e),
},
codec_error::DeserializeError(_msg) => {
if expect_eof {
// this should have failed due to EOF
test_debug!("Expected EOF; got DeserializeError '{}'", &_msg);
return Err(net_error::ExpectedEndOfStream);
}
// failed to parse a transaction. This is possibly a page ID.
test_debug!("Try to read page ID trailer after ReadError");
page_id = parse_mempool_query_page_id(pos, &mut retry_reader)?;
// do one more pass to make sure we're actually end-of-stream.
// otherwise, the stream itself was corrupt, since any 32 bytes is a valid
// txid and the presence of more bytes means that we simply got a bad tx
// that we couldn't decode.
expect_eof = true;
Ok(())
}
_ => Err(e),
},
}?;
}
Ok((txs, page_id))
}
pub struct MemPoolAdmitter {
cur_block: BlockHeaderHash,
cur_consensus_hash: ConsensusHash,
@@ -1212,6 +1375,21 @@ impl MemPoolDB {
})
}
pub fn reopen(&self, readwrite: bool) -> Result<DBConn, db_error> {
if let Err(e) = fs::metadata(&self.path) {
return Err(db_error::IOError(e));
}
let open_flags = if readwrite {
OpenFlags::SQLITE_OPEN_READ_WRITE
} else {
OpenFlags::SQLITE_OPEN_READ_ONLY
};
let conn = sqlite_open(&self.path, open_flags, true)?;
Ok(conn)
}
/// Open the mempool db within the chainstate directory.
/// The chainstate must be instantiated already.
pub fn open(
@@ -2445,6 +2623,24 @@ impl MemPoolDB {
query_row(&self.conn(), sql, args)
}
pub fn find_next_missing_transactions(
&self,
data: &MemPoolSyncData,
height: u64,
last_randomized_txid: &Txid,
max_txs: u64,
max_run: u64,
) -> Result<(Vec<StacksTransaction>, Option<Txid>, u64), db_error> {
Self::static_find_next_missing_transactions(
self.conn(),
data,
height,
last_randomized_txid,
max_txs,
max_run,
)
}
/// Get the next batch of transactions from our mempool that are *not* represented in the given
/// MemPoolSyncData. Transactions are ordered lexicographically by randomized_txids.hashed_txid, since this allows us
/// to use the txid as a cursor while ensuring that each node returns txids in a deterministic random order
@@ -2452,8 +2648,8 @@ impl MemPoolDB {
/// a requesting node will still have a good chance of getting something useful).
/// Also, return the next value to pass for `last_randomized_txid` to load the next page.
/// Also, return the number of rows considered.
pub fn find_next_missing_transactions(
&self,
pub fn static_find_next_missing_transactions(
conn: &DBConn,
data: &MemPoolSyncData,
height: u64,
last_randomized_txid: &Txid,
@@ -2483,7 +2679,7 @@ impl MemPoolDB {
}
}
let mut stmt = self.conn().prepare(sql)?;
let mut stmt = conn.prepare(sql)?;
let mut rows = stmt.query(args)?;
let mut num_rows_visited = 0;
let mut next_page = None;
@@ -2528,130 +2724,4 @@ impl MemPoolDB {
Ok((ret, next_page, num_rows_visited))
}
/// Stream transaction data.
/// Send back one transaction at a time.
pub fn stream_txs<W: Write>(
&self,
fd: &mut W,
query: &mut TxStreamData,
count: u64,
) -> Result<u64, ChainstateError> {
let mut num_written = 0;
while num_written < count {
// write out bufferred tx
let start = query.tx_buf_ptr;
let end = cmp::min(query.tx_buf.len(), ((start as u64) + count) as usize);
fd.write_all(&query.tx_buf[start..end])
.map_err(ChainstateError::WriteError)?;
let nw = end.saturating_sub(start) as u64;
query.tx_buf_ptr = end;
num_written += nw;
if query.tx_buf_ptr >= query.tx_buf.len() {
if query.corked {
// we're done
test_debug!(
"Finished streaming txs; last page was {:?}",
&query.last_randomized_txid
);
break;
}
if query.num_txs >= query.max_txs {
// no more space in this stream
debug!(
"No more space in this query after {:?}. Corking tx stream.",
&query.last_randomized_txid
);
// send the next page ID
query.tx_buf_ptr = 0;
query.tx_buf.clear();
query.corked = true;
query
.last_randomized_txid
.consensus_serialize(&mut query.tx_buf)
.map_err(ChainstateError::CodecError)?;
continue;
}
// load next
let remaining = query.max_txs.saturating_sub(query.num_txs);
let (next_txs, next_last_randomized_txid_opt, num_rows_visited) = self
.find_next_missing_transactions(
&query.tx_query,
query.height,
&query.last_randomized_txid,
1,
remaining,
)?;
debug!(
"Streaming mempool propagation stepped";
"rows_visited" => num_rows_visited,
"last_rand_txid" => %query.last_randomized_txid,
"num_txs" => query.num_txs,
"max_txs" => query.max_txs
);
query.num_txs += num_rows_visited;
if next_txs.len() > 0 {
query.tx_buf_ptr = 0;
query.tx_buf.clear();
for next_tx in next_txs.iter() {
next_tx
.consensus_serialize(&mut query.tx_buf)
.map_err(ChainstateError::CodecError)?;
}
if let Some(next_last_randomized_txid) = next_last_randomized_txid_opt {
query.last_randomized_txid = next_last_randomized_txid;
} else {
test_debug!(
"No more txs after {}",
&next_txs
.last()
.map(|tx| tx.txid())
.unwrap_or(Txid([0u8; 32]))
);
break;
}
} else if let Some(next_txid) = next_last_randomized_txid_opt {
test_debug!(
"No rows returned for {}; cork tx stream with next page {}",
&query.last_randomized_txid,
&next_txid
);
// no rows found
query.last_randomized_txid = next_txid;
// send the next page ID
query.tx_buf_ptr = 0;
query.tx_buf.clear();
query.corked = true;
query
.last_randomized_txid
.consensus_serialize(&mut query.tx_buf)
.map_err(ChainstateError::CodecError)?;
} else if next_last_randomized_txid_opt.is_none() {
// no more transactions
test_debug!(
"No more txs to send after {:?}; corking stream",
&query.last_randomized_txid
);
query.tx_buf_ptr = 0;
query.tx_buf.clear();
query.corked = true;
}
}
}
Ok(num_written)
}
}