diff --git a/stackslib/src/core/tests/mod.rs b/stackslib/src/core/tests/mod.rs index ea713481c..c4ce530ed 100644 --- a/stackslib/src/core/tests/mod.rs +++ b/stackslib/src/core/tests/mod.rs @@ -40,15 +40,14 @@ use crate::chainstate::stacks::{ C32_ADDRESS_VERSION_MAINNET_SINGLESIG, C32_ADDRESS_VERSION_TESTNET_SINGLESIG, }; use crate::core::mempool::db_get_all_nonces; +use crate::core::mempool::decode_tx_stream; +use crate::core::mempool::MemPoolSyncData; use crate::core::mempool::MemPoolWalkSettings; use crate::core::mempool::TxTag; use crate::core::mempool::{BLOOM_COUNTER_DEPTH, BLOOM_COUNTER_ERROR_RATE, MAX_BLOOM_COUNTER_TXS}; use crate::core::FIRST_BURNCHAIN_CONSENSUS_HASH; use crate::core::FIRST_STACKS_BLOCK_HASH; -use crate::net::stream::StreamCursor; use crate::net::Error as NetError; -use crate::net::HttpResponseType; -use crate::net::MemPoolSyncData; use crate::util_lib::bloom::test::setup_bloom_counter; use crate::util_lib::bloom::*; use crate::util_lib::db::{tx_begin_immediate, DBConn, FromRow}; @@ -2447,352 +2446,6 @@ fn test_find_next_missing_transactions() { assert!(next_page_opt.is_none()); } -#[test] -fn test_stream_txs() { - let mut chainstate = instantiate_chainstate(false, 0x80000000, function_name!()); - let chainstate_path = chainstate_path(function_name!()); - let mut mempool = MemPoolDB::open_test(false, 0x80000000, &chainstate_path).unwrap(); - - let addr = StacksAddress { - version: 1, - bytes: Hash160([0xff; 20]), - }; - let mut txs = vec![]; - let block_height = 10; - let mut total_len = 0; - - let mut mempool_tx = mempool.tx_begin().unwrap(); - for i in 0..10 { - let pk = StacksPrivateKey::new(); - let mut tx = StacksTransaction { - version: TransactionVersion::Testnet, - chain_id: 0x80000000, - auth: TransactionAuth::from_p2pkh(&pk).unwrap(), - anchor_mode: TransactionAnchorMode::Any, - post_condition_mode: TransactionPostConditionMode::Allow, - post_conditions: vec![], - payload: TransactionPayload::TokenTransfer( - addr.to_account_principal(), - 123, - TokenTransferMemo([0u8; 34]), - ), - }; - tx.set_tx_fee(1000); - tx.set_origin_nonce(0); - - let txid = tx.txid(); - let tx_bytes = tx.serialize_to_vec(); - let origin_addr = tx.origin_address(); - let origin_nonce = tx.get_origin_nonce(); - let sponsor_addr = tx.sponsor_address().unwrap_or(origin_addr.clone()); - let sponsor_nonce = tx.get_sponsor_nonce().unwrap_or(origin_nonce); - let tx_fee = tx.get_tx_fee(); - - total_len += tx_bytes.len(); - - // should succeed - MemPoolDB::try_add_tx( - &mut mempool_tx, - &mut chainstate, - &ConsensusHash([0x1 + (block_height as u8); 20]), - &BlockHeaderHash([0x2 + (block_height as u8); 32]), - txid.clone(), - tx_bytes, - tx_fee, - block_height as u64, - &origin_addr, - origin_nonce, - &sponsor_addr, - sponsor_nonce, - None, - ) - .unwrap(); - - eprintln!("Added {} {}", i, &txid); - txs.push(tx); - } - mempool_tx.commit().unwrap(); - - let mut buf = vec![]; - let stream = StreamCursor::new_tx_stream( - MemPoolSyncData::TxTags([0u8; 32], vec![]), - MAX_BLOOM_COUNTER_TXS.into(), - block_height, - Some(Txid([0u8; 32])), - ); - let mut tx_stream_data = if let StreamCursor::MempoolTxs(stream_data) = stream { - stream_data - } else { - unreachable!(); - }; - - loop { - let nw = match mempool.stream_txs(&mut buf, &mut tx_stream_data, 10) { - Ok(nw) => nw, - Err(e) => { - error!("Failed to stream_to: {:?}", &e); - panic!(); - } - }; - if nw == 0 { - break; - } - } - - eprintln!("Read {} bytes of tx data", buf.len()); - - // buf decodes to the list of txs we have - let mut decoded_txs = vec![]; - let mut ptr = &buf[..]; - loop { - let tx: StacksTransaction = match read_next::(&mut ptr) { - Ok(tx) => tx, - Err(e) => match e { - codec_error::ReadError(ref ioe) => match ioe.kind() { - io::ErrorKind::UnexpectedEof => { - eprintln!("out of transactions"); - break; - } - _ => { - panic!("IO error: {:?}", &e); - } - }, - _ => { - panic!("other error: {:?}", &e); - } - }, - }; - decoded_txs.push(tx); - } - - let mut tx_set = HashSet::new(); - for tx in txs.iter() { - tx_set.insert(tx.txid()); - } - - // the order won't be preserved - assert_eq!(tx_set.len(), decoded_txs.len()); - for tx in decoded_txs { - assert!(tx_set.contains(&tx.txid())); - } - - // verify that we can stream through pagination, with an empty tx tags - let mut page_id = Txid([0u8; 32]); - let mut decoded_txs = vec![]; - loop { - let stream = StreamCursor::new_tx_stream( - MemPoolSyncData::TxTags([0u8; 32], vec![]), - 1, - block_height, - Some(page_id), - ); - - let mut tx_stream_data = if let StreamCursor::MempoolTxs(stream_data) = stream { - stream_data - } else { - unreachable!(); - }; - - let mut buf = vec![]; - loop { - let nw = match mempool.stream_txs(&mut buf, &mut tx_stream_data, 10) { - Ok(nw) => nw, - Err(e) => { - error!("Failed to stream_to: {:?}", &e); - panic!(); - } - }; - if nw == 0 { - break; - } - } - - // buf decodes to the list of txs we have, plus page ids - let mut ptr = &buf[..]; - test_debug!("Decode {}", to_hex(ptr)); - let (mut next_txs, next_page) = HttpResponseType::decode_tx_stream(&mut ptr, None).unwrap(); - - decoded_txs.append(&mut next_txs); - - // for fun, use a page ID that is actually a well-formed prefix of a transaction - if let Some(ref tx) = decoded_txs.last() { - let mut evil_buf = tx.serialize_to_vec(); - let mut evil_page_id = [0u8; 32]; - evil_page_id.copy_from_slice(&evil_buf[0..32]); - evil_buf.extend_from_slice(&evil_page_id); - - test_debug!("Decode evil buf {}", &to_hex(&evil_buf)); - - let (evil_next_txs, evil_next_page) = - HttpResponseType::decode_tx_stream(&mut &evil_buf[..], None).unwrap(); - - // should still work - assert_eq!(evil_next_txs.len(), 1); - assert_eq!(evil_next_txs[0].txid(), tx.txid()); - assert_eq!(evil_next_page.unwrap().0[0..32], evil_buf[0..32]); - } - - if let Some(next_page) = next_page { - page_id = next_page; - } else { - break; - } - } - - // make sure we got them all - let mut tx_set = HashSet::new(); - for tx in txs.iter() { - tx_set.insert(tx.txid()); - } - - // the order won't be preserved - assert_eq!(tx_set.len(), decoded_txs.len()); - for tx in decoded_txs { - assert!(tx_set.contains(&tx.txid())); - } - - // verify that we can stream through pagination, with a full bloom filter - let mut page_id = Txid([0u8; 32]); - let all_txs_tags: Vec<_> = txs - .iter() - .map(|tx| TxTag::from(&[0u8; 32], &tx.txid())) - .collect(); - loop { - let stream = StreamCursor::new_tx_stream( - MemPoolSyncData::TxTags([0u8; 32], all_txs_tags.clone()), - 1, - block_height, - Some(page_id), - ); - - let mut tx_stream_data = if let StreamCursor::MempoolTxs(stream_data) = stream { - stream_data - } else { - unreachable!(); - }; - - let mut buf = vec![]; - loop { - let nw = match mempool.stream_txs(&mut buf, &mut tx_stream_data, 10) { - Ok(nw) => nw, - Err(e) => { - error!("Failed to stream_to: {:?}", &e); - panic!(); - } - }; - if nw == 0 { - break; - } - } - - // buf decodes to an empty list of txs, plus page ID - let mut ptr = &buf[..]; - test_debug!("Decode {}", to_hex(ptr)); - let (next_txs, next_page) = HttpResponseType::decode_tx_stream(&mut ptr, None).unwrap(); - - assert_eq!(next_txs.len(), 0); - - if let Some(next_page) = next_page { - page_id = next_page; - } else { - break; - } - } -} - -#[test] -fn test_decode_tx_stream() { - let addr = StacksAddress { - version: 1, - bytes: Hash160([0xff; 20]), - }; - let mut txs = vec![]; - for _i in 0..10 { - let pk = StacksPrivateKey::new(); - let mut tx = StacksTransaction { - version: TransactionVersion::Testnet, - chain_id: 0x80000000, - auth: TransactionAuth::from_p2pkh(&pk).unwrap(), - anchor_mode: TransactionAnchorMode::Any, - post_condition_mode: TransactionPostConditionMode::Allow, - post_conditions: vec![], - payload: TransactionPayload::TokenTransfer( - addr.to_account_principal(), - 123, - TokenTransferMemo([0u8; 34]), - ), - }; - tx.set_tx_fee(1000); - tx.set_origin_nonce(0); - txs.push(tx); - } - - // valid empty tx stream - let empty_stream = [0x11u8; 32]; - let (next_txs, next_page) = - HttpResponseType::decode_tx_stream(&mut empty_stream.as_ref(), None).unwrap(); - assert_eq!(next_txs.len(), 0); - assert_eq!(next_page, Some(Txid([0x11; 32]))); - - // valid tx stream with a page id at the end - let mut tx_stream: Vec = vec![]; - for tx in txs.iter() { - tx.consensus_serialize(&mut tx_stream).unwrap(); - } - tx_stream.extend_from_slice(&[0x22; 32]); - - let (next_txs, next_page) = - HttpResponseType::decode_tx_stream(&mut &tx_stream[..], None).unwrap(); - assert_eq!(next_txs, txs); - assert_eq!(next_page, Some(Txid([0x22; 32]))); - - // valid tx stream with _no_ page id at the end - let mut partial_stream: Vec = vec![]; - txs[0].consensus_serialize(&mut partial_stream).unwrap(); - let (next_txs, next_page) = - HttpResponseType::decode_tx_stream(&mut &partial_stream[..], None).unwrap(); - assert_eq!(next_txs.len(), 1); - assert_eq!(next_txs[0], txs[0]); - assert!(next_page.is_none()); - - // garbage tx stream - let garbage_stream = [0xff; 256]; - let err = HttpResponseType::decode_tx_stream(&mut garbage_stream.as_ref(), None); - match err { - Err(NetError::ExpectedEndOfStream) => {} - x => { - error!("did not fail: {:?}", &x); - panic!(); - } - } - - // tx stream that is too short - let short_stream = [0x33u8; 33]; - let err = HttpResponseType::decode_tx_stream(&mut short_stream.as_ref(), None); - match err { - Err(NetError::ExpectedEndOfStream) => {} - x => { - error!("did not fail: {:?}", &x); - panic!(); - } - } - - // tx stream has a tx, a page ID, and then another tx - let mut interrupted_stream = vec![]; - txs[0].consensus_serialize(&mut interrupted_stream).unwrap(); - interrupted_stream.extend_from_slice(&[0x00u8; 32]); - txs[1].consensus_serialize(&mut interrupted_stream).unwrap(); - - let err = HttpResponseType::decode_tx_stream(&mut &interrupted_stream[..], None); - match err { - Err(NetError::ExpectedEndOfStream) => {} - x => { - error!("did not fail: {:?}", &x); - panic!(); - } - } -} - #[test] fn test_drop_and_blacklist_txs_by_time() { let mut chainstate = instantiate_chainstate(false, 0x80000000, function_name!());