From b721b0a8b01592720d144ebb24a4405dd8f462e3 Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Mon, 6 Dec 2021 16:47:45 -0500 Subject: [PATCH] refactor: implement StreamCursor variants and and Streamer trait instead of a "god struct" BlockStreamData; switch &String to &str; refactor tests with new APIs --- src/chainstate/stacks/db/blocks.rs | 698 +++++++++++++++++------------ 1 file changed, 416 insertions(+), 282 deletions(-) diff --git a/src/chainstate/stacks/db/blocks.rs b/src/chainstate/stacks/db/blocks.rs index 36d4f488a..5906d6dd4 100644 --- a/src/chainstate/stacks/db/blocks.rs +++ b/src/chainstate/stacks/db/blocks.rs @@ -374,135 +374,7 @@ impl StagingMicroblock { } } -impl BlockStreamData { - pub fn new_block(index_block_hash: StacksBlockId) -> BlockStreamData { - BlockStreamData { - index_block_hash: index_block_hash, - rowid: None, - offset: 0, - total_bytes: 0, - - is_microblock: false, - microblock_hash: BlockHeaderHash([0u8; 32]), - parent_index_block_hash: StacksBlockId([0u8; 32]), - seq: 0, - unconfirmed: false, - num_items_buf: [0u8; 4], - num_items_ptr: 0, - - is_headers: false, - num_headers: 0, - } - } - - pub fn new_microblock_confirmed( - chainstate: &StacksChainState, - tail_index_microblock_hash: StacksBlockId, - ) -> Result { - // look up parent - let mblock_info = StacksChainState::load_staging_microblock_info_indexed( - &chainstate.db(), - &tail_index_microblock_hash, - )? - .ok_or(Error::NoSuchBlockError)?; - - let parent_index_block_hash = StacksBlockHeader::make_index_block_hash( - &mblock_info.consensus_hash, - &mblock_info.anchored_block_hash, - ); - - // need to send out the consensus_serialize()'ed array length before sending microblocks. - // this is exactly what seq tells us, though. - let num_items_buf = ((mblock_info.sequence as u32) + 1).to_be_bytes(); - - Ok(BlockStreamData { - index_block_hash: StacksBlockId([0u8; 32]), - rowid: None, - offset: 0, - total_bytes: 0, - - is_microblock: true, - microblock_hash: mblock_info.microblock_hash, - parent_index_block_hash: parent_index_block_hash, - seq: mblock_info.sequence, - unconfirmed: false, - num_items_buf: num_items_buf, - num_items_ptr: 0, - - is_headers: false, - num_headers: 0, - }) - } - - pub fn new_microblock_unconfirmed( - chainstate: &StacksChainState, - anchored_index_block_hash: StacksBlockId, - seq: u16, - ) -> Result { - let mblock_info = StacksChainState::load_next_descendant_microblock( - &chainstate.db(), - &anchored_index_block_hash, - seq, - )? - .ok_or(Error::NoSuchBlockError)?; - - Ok(BlockStreamData { - index_block_hash: anchored_index_block_hash.clone(), - rowid: None, - offset: 0, - total_bytes: 0, - - is_microblock: true, - microblock_hash: mblock_info.block_hash(), - parent_index_block_hash: anchored_index_block_hash, - seq: seq, - unconfirmed: true, - num_items_buf: [0u8; 4], - num_items_ptr: 4, // stops us from trying to send a length prefix - - is_headers: false, - num_headers: 0, - }) - } - - pub fn new_headers( - chainstate: &StacksChainState, - tip: &StacksBlockId, - num_headers_requested: u32, - ) -> Result { - let header_info = StacksChainState::load_staging_block_info(chainstate.db(), tip)? - .ok_or(Error::NoSuchBlockError)?; - - let num_headers = if header_info.height < (num_headers_requested as u64) { - header_info.height as u32 - } else { - num_headers_requested - }; - - test_debug!("Request for {} headers from {}", num_headers, tip); - - // need to send out the consensus_serialize()'ed array length before sending headers. - let num_items_buf = num_headers.to_be_bytes(); - Ok(BlockStreamData { - index_block_hash: tip.clone(), - rowid: None, - offset: 0, - total_bytes: 0, - - is_microblock: false, - microblock_hash: BlockHeaderHash([0u8; 32]), - parent_index_block_hash: StacksBlockId([0u8; 32]), - seq: 0, - unconfirmed: false, - - num_items_buf: num_items_buf, - num_items_ptr: 0, - - is_headers: true, - num_headers: num_headers, - }) - } - +impl MicroblockStreamData { fn stream_count(&mut self, fd: &mut W, count: u64) -> Result { let mut num_written = 0; while self.num_items_ptr < self.num_items_buf.len() && num_written < count { @@ -545,6 +417,146 @@ impl BlockStreamData { } Ok(num_written) } +} + +impl StreamCursor { + pub fn new_block(index_block_hash: StacksBlockId) -> StreamCursor { + StreamCursor::Block(BlockStreamData { + index_block_hash: index_block_hash, + offset: 0, + total_bytes: 0, + }) + } + + pub fn new_microblock_confirmed( + chainstate: &StacksChainState, + tail_index_microblock_hash: StacksBlockId, + ) -> Result { + // look up parent + let mblock_info = StacksChainState::load_staging_microblock_info_indexed( + &chainstate.db(), + &tail_index_microblock_hash, + )? + .ok_or(Error::NoSuchBlockError)?; + + let parent_index_block_hash = StacksBlockHeader::make_index_block_hash( + &mblock_info.consensus_hash, + &mblock_info.anchored_block_hash, + ); + + // need to send out the consensus_serialize()'ed array length before sending microblocks. + // this is exactly what seq tells us, though. + let num_items_buf = ((mblock_info.sequence as u32) + 1).to_be_bytes(); + + Ok(StreamCursor::Microblocks(MicroblockStreamData { + index_block_hash: StacksBlockId([0u8; 32]), + rowid: None, + offset: 0, + total_bytes: 0, + microblock_hash: mblock_info.microblock_hash, + parent_index_block_hash: parent_index_block_hash, + seq: mblock_info.sequence, + unconfirmed: false, + num_items_buf: num_items_buf, + num_items_ptr: 0, + })) + } + + pub fn new_microblock_unconfirmed( + chainstate: &StacksChainState, + anchored_index_block_hash: StacksBlockId, + seq: u16, + ) -> Result { + let mblock_info = StacksChainState::load_next_descendant_microblock( + &chainstate.db(), + &anchored_index_block_hash, + seq, + )? + .ok_or(Error::NoSuchBlockError)?; + + Ok(StreamCursor::Microblocks(MicroblockStreamData { + index_block_hash: anchored_index_block_hash.clone(), + rowid: None, + offset: 0, + total_bytes: 0, + microblock_hash: mblock_info.block_hash(), + parent_index_block_hash: anchored_index_block_hash, + seq: seq, + unconfirmed: true, + num_items_buf: [0u8; 4], + num_items_ptr: 4, // stops us from trying to send a length prefix + })) + } + + pub fn new_headers( + chainstate: &StacksChainState, + tip: &StacksBlockId, + num_headers_requested: u32, + ) -> Result { + let header_info = StacksChainState::load_staging_block_info(chainstate.db(), tip)? + .ok_or(Error::NoSuchBlockError)?; + + let num_headers = if header_info.height < (num_headers_requested as u64) { + header_info.height as u32 + } else { + num_headers_requested + }; + + test_debug!("Request for {} headers from {}", num_headers, tip); + + Ok(StreamCursor::Headers(HeaderStreamData { + index_block_hash: tip.clone(), + offset: 0, + total_bytes: 0, + num_headers: num_headers, + header_bytes: None, + end_of_stream: false, + corked: false + })) + } + + fn stream_one_byte(fd: &mut W, b: u8) -> Result { + loop { + match fd.write(&[b]) { + Ok(0) => { + // done (disconnected) + return Ok(0); + } + Ok(n) => { + return Ok(n as u64); + } + Err(e) => { + if e.kind() == io::ErrorKind::Interrupted { + // EINTR; try again + continue; + } else if e.kind() == io::ErrorKind::WouldBlock + || (cfg!(windows) && e.kind() == io::ErrorKind::TimedOut) + { + // blocked + return Ok(0); + } else { + return Err(Error::WriteError(e)); + } + } + } + } + } + + pub fn get_offset(&self) -> u64 { + match self { + StreamCursor::Block(ref stream) => stream.offset(), + StreamCursor::Microblocks(ref stream) => stream.offset(), + StreamCursor::Headers(ref stream) => stream.offset() + } + } + + pub fn add_more_bytes(&mut self, nw: u64) { + match self { + StreamCursor::Block(ref mut stream) => stream.add_bytes(nw), + StreamCursor::Microblocks(ref mut stream) => stream.add_bytes(nw), + StreamCursor::Headers(ref mut stream) => stream.add_bytes(nw) + } + } pub fn stream_to( &mut self, @@ -552,30 +564,93 @@ impl BlockStreamData { fd: &mut W, count: u64, ) -> Result { - if self.is_microblock { - let mut num_written = 0; - if !self.unconfirmed { - // Confirmed microblocks are represented as a consensus-encoded vector of - // microblocks, in reverse sequence order. - // Write 4-byte length prefix first - num_written += self.stream_count(fd, count)?; - StacksChainState::stream_microblocks_confirmed(&chainstate, fd, self, count) - .and_then(|bytes_sent| Ok(bytes_sent + num_written)) - } else { - StacksChainState::stream_microblocks_unconfirmed(&chainstate, fd, self, count) - .and_then(|bytes_sent| Ok(bytes_sent + num_written)) + match self { + StreamCursor::Microblocks(ref mut stream) => { + let mut num_written = 0; + if !stream.unconfirmed { + // Confirmed microblocks are represented as a consensus-encoded vector of + // microblocks, in reverse sequence order. + // Write 4-byte length prefix first + num_written += stream.stream_count(fd, count)?; + StacksChainState::stream_microblocks_confirmed(&chainstate, fd, stream, count) + .and_then(|bytes_sent| Ok(bytes_sent + num_written)) + } else { + StacksChainState::stream_microblocks_unconfirmed(&chainstate, fd, stream, count) + .and_then(|bytes_sent| Ok(bytes_sent + num_written)) + } + }, + StreamCursor::Headers(ref mut stream) => { + let mut num_written = 0; + if stream.total_bytes == 0 { + test_debug!("Opening header stream"); + let byte_written = StreamCursor::stream_one_byte(fd, '[' as u8)?; + num_written += byte_written; + stream.total_bytes += byte_written; + } + if stream.total_bytes > 0 { + let mut sent = chainstate + .stream_headers(fd, stream, count)?; + + if stream.end_of_stream && !stream.corked { + // end of stream; cork it + test_debug!("Corking header stream"); + let byte_written = StreamCursor::stream_one_byte(fd, ']' as u8)?; + if byte_written > 0 { + sent += byte_written; + stream.total_bytes += byte_written; + stream.corked = true; + } + } + num_written += sent; + } + Ok(num_written) + }, + StreamCursor::Block(ref mut stream) => { + chainstate.stream_block(fd, stream, count) } - } else if self.is_headers { - let num_written = self.stream_count(fd, count)?; - chainstate - .stream_headers(fd, self, count) - .and_then(|bytes_sent| Ok(bytes_sent + num_written)) - } else { - chainstate.stream_block(fd, self, count) } } } +impl Streamer for StreamCursor { + fn offset(&self) -> u64 { + self.get_offset() + } + fn add_bytes(&mut self, nw: u64) { + self.add_more_bytes(nw) + } +} + +impl Streamer for HeaderStreamData { + fn offset(&self) -> u64 { + self.offset + } + fn add_bytes(&mut self, nw: u64) { + self.offset += nw; + self.total_bytes += nw; + } +} + +impl Streamer for BlockStreamData { + fn offset(&self) -> u64 { + self.offset + } + fn add_bytes(&mut self, nw: u64) { + self.offset += nw; + self.total_bytes += nw; + } +} + +impl Streamer for MicroblockStreamData { + fn offset(&self) -> u64 { + self.offset + } + fn add_bytes(&mut self, nw: u64) { + self.offset += nw; + self.total_bytes += nw; + } +} + impl StacksChainState { fn get_index_block_pathbuf(blocks_dir: &str, index_block_hash: &StacksBlockId) -> PathBuf { let block_hash_bytes = index_block_hash.as_bytes(); @@ -636,7 +711,7 @@ impl StacksChainState { } pub fn atomic_file_store( - path: &String, + path: &str, delete_on_error: bool, mut writer: F, ) -> Result<(), Error> @@ -679,14 +754,14 @@ impl StacksChainState { Ok(()) } - pub fn atomic_file_write(path: &String, bytes: &Vec) -> Result<(), Error> { + pub fn atomic_file_write(path: &str, bytes: &Vec) -> Result<(), Error> { StacksChainState::atomic_file_store(path, false, |ref mut fd| { fd.write_all(bytes) .map_err(|e| Error::DBError(db_error::IOError(e))) }) } - pub fn get_file_size(path: &String) -> Result { + pub fn get_file_size(path: &str) -> Result { let sz = match fs::metadata(path) { Ok(md) => md.len(), Err(e) => { @@ -701,7 +776,7 @@ impl StacksChainState { Ok(sz) } - pub fn consensus_load(path: &String) -> Result { + pub fn consensus_load(path: &str) -> Result { let mut fd = fs::OpenOptions::new() .read(true) .write(false) @@ -721,7 +796,7 @@ impl StacksChainState { /// Do we have a stored a block in the chunk store? pub fn has_block_indexed( - blocks_dir: &String, + blocks_dir: &str, index_block_hash: &StacksBlockId, ) -> Result { let block_path = StacksChainState::get_index_block_path(blocks_dir, index_block_hash)?; @@ -740,7 +815,7 @@ impl StacksChainState { /// Have we processed and stored a particular block? pub fn has_stored_block( blocks_db: &DBConn, - blocks_dir: &String, + blocks_dir: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result { @@ -764,7 +839,7 @@ impl StacksChainState { /// Store a block to the chunk store, named by its hash pub fn store_block( - blocks_dir: &String, + blocks_dir: &str, consensus_hash: &ConsensusHash, block: &StacksBlock, ) -> Result<(), Error> { @@ -785,7 +860,7 @@ impl StacksChainState { /// Store an empty block to the chunk store, named by its hash. #[cfg(test)] fn store_empty_block( - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result<(), Error> { @@ -842,7 +917,7 @@ impl StacksChainState { /// Free up all state for an invalid block fn free_block_state( - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, block_header: &StacksBlockHeader, ) -> () { @@ -873,7 +948,7 @@ impl StacksChainState { #[cfg(test)] pub fn list_microblocks( blocks_conn: &DBConn, - blocks_dir: &String, + blocks_dir: &str, ) -> Result)>, Error> { let mut blocks = StacksChainState::list_blocks(blocks_conn)?; let mut ret = vec![]; @@ -900,7 +975,7 @@ impl StacksChainState { /// Returns Ok(none) if this block was found, but is known to be invalid /// Returns Err(...) on not found or I/O error pub fn load_block_bytes( - blocks_dir: &String, + blocks_dir: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result>, Error> { @@ -938,7 +1013,7 @@ impl StacksChainState { /// Returns Ok(None) if this block was found, but is known to be invalid /// Returns Err(...) on not found or I/O error pub fn load_block( - blocks_dir: &String, + blocks_dir: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result, Error> { @@ -953,7 +1028,7 @@ impl StacksChainState { Ok(Some(block)) } - fn inner_load_block_header(block_path: &String) -> Result, Error> { + fn inner_load_block_header(block_path: &str) -> Result, Error> { let sz = StacksChainState::get_file_size(block_path)?; if sz == 0 { debug!("Zero-sized block {}", &block_path); @@ -969,7 +1044,7 @@ impl StacksChainState { /// Returns Ok(None) if this block was found, but is known to be invalid /// Returns Err(...) on not found or I/O error pub fn load_block_header( - blocks_dir: &String, + blocks_dir: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result, Error> { @@ -989,7 +1064,7 @@ impl StacksChainState { /// Returns Ok(None) if this block was found, but is known to be invalid /// Returns Err(...) on not found or I/O error pub fn load_block_header_indexed( - blocks_dir: &String, + blocks_dir: &str, index_block_hash: &StacksBlockId, ) -> Result, Error> { let block_path = StacksChainState::get_index_block_path(blocks_dir, index_block_hash)?; @@ -1011,7 +1086,7 @@ impl StacksChainState { /// Query should be structured to return rows of BLOBs fn load_block_data_blobs

( conn: &DBConn, - sql_query: &String, + sql_query: &str, sql_args: P, ) -> Result>, Error> where @@ -1093,7 +1168,7 @@ impl StacksChainState { /// Load up a preprocessed (queued) but still unprocessed block. pub fn load_staging_block( block_conn: &DBConn, - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result, Error> { @@ -1146,7 +1221,7 @@ impl StacksChainState { #[cfg(test)] fn load_staging_block_data( block_conn: &DBConn, - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result, Error> { @@ -1206,7 +1281,7 @@ impl StacksChainState { /// Load up a block's microblock public key hash, staging or not fn load_block_pubkey_hash( block_conn: &DBConn, - block_path: &String, + block_path: &str, consensus_hash: &ConsensusHash, block_hash: &BlockHeaderHash, ) -> Result, Error> { @@ -1581,7 +1656,7 @@ impl StacksChainState { /// Doesn't matter if it's staging or not. pub fn load_parent_block_header( sort_ic: &SortitionDBConn, - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, anchored_block_hash: &BlockHeaderHash, ) -> Result, Error> { @@ -1625,7 +1700,7 @@ impl StacksChainState { /// chain. fn store_staging_block<'a>( tx: &mut DBTx<'a>, - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, block: &StacksBlock, parent_consensus_hash: &ConsensusHash, @@ -2133,7 +2208,7 @@ impl StacksChainState { /// The blocks database will eventually delete all orphaned data. fn delete_orphaned_epoch_data<'a>( tx: &mut DBTx<'a>, - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, anchored_block_hash: &BlockHeaderHash, ) -> Result<(), Error> { @@ -2195,7 +2270,7 @@ impl StacksChainState { fn set_block_processed<'a, 'b>( tx: &mut DBTx<'a>, mut sort_tx_opt: Option<&mut SortitionHandleTx<'b>>, - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, anchored_block_hash: &BlockHeaderHash, accept: bool, @@ -2326,7 +2401,7 @@ impl StacksChainState { #[cfg(test)] fn set_block_orphaned<'a>( tx: &mut DBTx<'a>, - blocks_path: &String, + blocks_path: &str, consensus_hash: &ConsensusHash, anchored_block_hash: &BlockHeaderHash, ) -> Result<(), Error> { @@ -2724,10 +2799,10 @@ impl StacksChainState { Ok(microblock_info) } - /// Write data to the fd - fn write_stream_data( + /// Write header data to the fd + fn write_stream_data( fd: &mut W, - stream: &mut BlockStreamData, + stream: &mut S, input: &mut R, count: u64, ) -> Result { @@ -2735,21 +2810,20 @@ impl StacksChainState { let nr = input.read(&mut buf).map_err(Error::ReadError)?; fd.write_all(&buf[0..nr]).map_err(Error::WriteError)?; - stream.offset += nr as u64; - stream.total_bytes += nr as u64; + stream.add_bytes(nr as u64); Ok(nr as u64) } - /// Stream data from one Read to one Write - fn stream_data( + /// Stream header data from one Read to one Write + fn stream_data( fd: &mut W, - stream: &mut BlockStreamData, + stream: &mut S, input: &mut R, count: u64, ) -> Result { input - .seek(SeekFrom::Start(stream.offset)) + .seek(SeekFrom::Start(stream.offset())) .map_err(Error::ReadError)?; StacksChainState::write_stream_data(fd, stream, input, count) @@ -2759,54 +2833,75 @@ impl StacksChainState { /// If this method returns 0, it's because we're EOF on the header and should begin the next. fn stream_one_header( blocks_conn: &DBConn, - block_path: &String, + block_path: &str, fd: &mut W, - stream: &mut BlockStreamData, + stream: &mut HeaderStreamData, count: u64, ) -> Result { - let header = - StacksChainState::load_block_header_indexed(block_path, &stream.index_block_hash)? - .ok_or(Error::NoSuchBlockError)?; + if stream.header_bytes.is_none() && stream.num_headers > 0 { + let header = + StacksChainState::load_block_header_indexed(block_path, &stream.index_block_hash)? + .ok_or(Error::NoSuchBlockError)?; - let header_info = - StacksChainState::load_staging_block_info(blocks_conn, &stream.index_block_hash)? - .ok_or(Error::NoSuchBlockError)?; + let header_info = + StacksChainState::load_staging_block_info(blocks_conn, &stream.index_block_hash)? + .ok_or(Error::NoSuchBlockError)?; - let parent_index_block_hash = StacksBlockHeader::make_index_block_hash( - &header_info.parent_consensus_hash, - &header_info.parent_anchored_block_hash, - ); + let parent_index_block_hash = StacksBlockHeader::make_index_block_hash( + &header_info.parent_consensus_hash, + &header_info.parent_anchored_block_hash, + ); - let mut header_bytes = vec![]; - let extended_header = ExtendedStacksHeader { - consensus_hash: header_info.consensus_hash, - header: header, - parent_block_id: parent_index_block_hash, - }; - extended_header - .consensus_serialize(&mut header_bytes) - .expect("BUG: could not reserialize block header"); + let mut header_bytes = vec![]; + let extended_header = ExtendedStacksHeader { + consensus_hash: header_info.consensus_hash, + header: header, + parent_block_id: parent_index_block_hash, + }; - if stream.offset >= (header_bytes.len() as u64) { - // EOF - return Ok(0); + serde_json::to_writer(&mut header_bytes, &extended_header) + .map_err(|e| Error::NetError(net_error::SerializeError(format!("Failed to send as JSON: {:?}", &e))))?; + + if stream.num_headers > 1 { + header_bytes.push(',' as u8); + } + + test_debug!("header_bytes: {}", String::from_utf8(header_bytes.clone()).unwrap()); + + stream.header_bytes = Some(header_bytes); + stream.offset = 0; } - let num_bytes = StacksChainState::write_stream_data( - fd, - stream, - &mut &header_bytes[(stream.offset as usize)..], - count, - )?; - test_debug!( - "Stream header hash={} offset={} total_bytes={}, num_bytes={} num_headers={}", - &stream.index_block_hash, - stream.offset, - stream.total_bytes, - num_bytes, - stream.num_headers - ); - Ok(num_bytes) + if stream.header_bytes.is_some() { + let header_bytes = stream.header_bytes.take().expect("Do not have header bytes and did not set them"); + let res = (|| { + if stream.offset >= (header_bytes.len() as u64) { + // EOF + return Ok(0); + } + + let num_bytes = StacksChainState::write_stream_data( + fd, + stream, + &mut &header_bytes[(stream.offset as usize)..], + count, + )?; + test_debug!( + "Stream header hash={} offset={} total_bytes={}, num_bytes={} num_headers={}", + &stream.index_block_hash, + stream.offset, + stream.total_bytes, + num_bytes, + stream.num_headers + ); + Ok(num_bytes) + })(); + stream.header_bytes = Some(header_bytes); + res + } + else { + Ok(0) + } } /// Stream multiple headers from disk, moving in reverse order from the chain tip back. @@ -2815,7 +2910,7 @@ impl StacksChainState { fn stream_headers( &self, fd: &mut W, - stream: &mut BlockStreamData, + stream: &mut HeaderStreamData, count: u64, ) -> Result { let mut to_write = count; @@ -2834,6 +2929,8 @@ impl StacksChainState { "No more header to stream after {}", &stream.index_block_hash ); + stream.header_bytes = None; + stream.end_of_stream = true; break; } Err(e) => { @@ -2848,6 +2945,8 @@ impl StacksChainState { "No more header to stream after {}", &stream.index_block_hash ); + stream.header_bytes = None; + stream.end_of_stream = true; break; } @@ -2863,6 +2962,8 @@ impl StacksChainState { "Out of headers to stream after block {}", &stream.index_block_hash ); + stream.header_bytes = None; + stream.end_of_stream = true; break; } }; @@ -2873,11 +2974,12 @@ impl StacksChainState { ); stream.index_block_hash = parent_index_block_hash; - stream.offset = 0; stream.num_headers = stream .num_headers .checked_sub(1) .expect("BUG: streamed more headers than called for"); + + stream.header_bytes = None; } else { to_write = to_write .checked_sub(nw) @@ -2890,7 +2992,8 @@ impl StacksChainState { ); } debug!( - "Streamed headers: {} - {} = {}", + "Streamed headers ({} remaining): {} - {} = {}", + stream.num_headers, count, to_write, count - to_write @@ -2903,7 +3006,7 @@ impl StacksChainState { fn stream_one_microblock( blocks_conn: &DBConn, fd: &mut W, - stream: &mut BlockStreamData, + stream: &mut MicroblockStreamData, count: u64, ) -> Result { let rowid = match stream.rowid { @@ -2961,7 +3064,7 @@ impl StacksChainState { fn stream_microblocks_confirmed( chainstate: &StacksChainState, fd: &mut W, - stream: &mut BlockStreamData, + stream: &mut MicroblockStreamData, count: u64, ) -> Result { let mut to_write = count; @@ -3026,7 +3129,7 @@ impl StacksChainState { /// Stream block data from the chunk store. fn stream_data_from_chunk_store( - blocks_path: &String, + blocks_path: &str, fd: &mut W, stream: &mut BlockStreamData, count: u64, @@ -3073,7 +3176,7 @@ impl StacksChainState { pub fn stream_microblocks_unconfirmed( chainstate: &StacksChainState, fd: &mut W, - stream: &mut BlockStreamData, + stream: &mut MicroblockStreamData, count: u64, ) -> Result { let mut to_write = count; @@ -3878,7 +3981,7 @@ impl StacksChainState { /// Returns true if an orphan block was processed fn process_next_orphaned_staging_block<'a>( blocks_tx: &mut DBTx<'a>, - blocks_path: &String, + blocks_path: &str, ) -> Result { test_debug!("Find next orphaned block"); @@ -3975,7 +4078,7 @@ impl StacksChainState { /// Returns None if not. fn find_next_staging_block<'a>( blocks_tx: &mut StacksDBTx<'a>, - blocks_path: &String, + blocks_path: &str, sort_tx: &mut SortitionHandleTx, ) -> Result, StagingBlock)>, Error> { test_debug!("Find next staging block"); @@ -5925,6 +6028,8 @@ pub mod test { use super::*; + use serde_json; + pub fn make_empty_coinbase_block(mblock_key: &StacksPrivateKey) -> StacksBlock { let privk = StacksPrivateKey::from_hex( "59e4d5e18351d6027a37920efe53c2f1cbadc50dca7d77169b7291dff936ed6d01", @@ -8879,47 +8984,70 @@ pub mod test { fn stream_one_header_to_vec( blocks_conn: &DBConn, - blocks_path: &String, - stream: &mut BlockStreamData, + blocks_path: &str, + stream: &mut StreamCursor, count: u64, ) -> Result, chainstate_error> { - let mut bytes = vec![]; - StacksChainState::stream_one_header(blocks_conn, blocks_path, &mut bytes, stream, count) - .map(|nr| { - assert_eq!(bytes.len(), nr as usize); - bytes - }) + if let StreamCursor::Headers(ref mut stream) = stream { + let mut bytes = vec![]; + StacksChainState::stream_one_header(blocks_conn, blocks_path, &mut bytes, stream, count) + .map(|nr| { + assert_eq!(bytes.len(), nr as usize); + + // truncate trailing ',' if it exists + let len = bytes.len(); + if len > 0 { + if bytes[len-1] == ',' as u8 { + let _ = bytes.pop(); + } + } + bytes + }) + } + else { + panic!("not a header stream"); + } } fn stream_one_staging_microblock_to_vec( blocks_conn: &DBConn, - stream: &mut BlockStreamData, + stream: &mut StreamCursor, count: u64, ) -> Result, chainstate_error> { - let mut bytes = vec![]; - StacksChainState::stream_one_microblock(blocks_conn, &mut bytes, stream, count).map(|nr| { - assert_eq!(bytes.len(), nr as usize); - bytes - }) + if let StreamCursor::Microblocks(ref mut stream) = stream { + let mut bytes = vec![]; + StacksChainState::stream_one_microblock(blocks_conn, &mut bytes, stream, count).map(|nr| { + assert_eq!(bytes.len(), nr as usize); + bytes + }) + } + else { + panic!("not a microblock stream"); + } } fn stream_chunk_to_vec( - blocks_path: &String, - stream: &mut BlockStreamData, + blocks_path: &str, + stream: &mut StreamCursor, count: u64, ) -> Result, chainstate_error> { - let mut bytes = vec![]; - StacksChainState::stream_data_from_chunk_store(blocks_path, &mut bytes, stream, count).map( - |nr| { - assert_eq!(bytes.len(), nr as usize); - bytes - }, - ) + if let StreamCursor::Block(ref mut stream) = stream { + let mut bytes = vec![]; + StacksChainState::stream_data_from_chunk_store(blocks_path, &mut bytes, stream, count).map( + |nr| { + assert_eq!(bytes.len(), nr as usize); + bytes + }, + ) + } + else { + panic!("not a block stream"); + } } fn stream_headers_to_vec( chainstate: &mut StacksChainState, - stream: &mut BlockStreamData, + stream: &mut StreamCursor, count: u64, ) -> Result, chainstate_error> { let mut bytes = vec![]; @@ -8931,7 +9059,7 @@ pub mod test { fn stream_unconfirmed_microblocks_to_vec( chainstate: &mut StacksChainState, - stream: &mut BlockStreamData, + stream: &mut StreamCursor, count: u64, ) -> Result, chainstate_error> { let mut bytes = vec![]; @@ -8943,7 +9071,7 @@ pub mod test { fn stream_confirmed_microblocks_to_vec( chainstate: &mut StacksChainState, - stream: &mut BlockStreamData, + stream: &mut StreamCursor, count: u64, ) -> Result, chainstate_error> { let mut bytes = vec![]; @@ -9025,11 +9153,11 @@ pub mod test { StacksBlockHeader::make_index_block_hash(&consensus_hash, &block.block_hash()); // can't stream a non-existant block - let mut stream = BlockStreamData::new_block(index_block_header.clone()); + let mut stream = StreamCursor::new_block(index_block_header.clone()); assert!(stream_chunk_to_vec(&chainstate.blocks_path, &mut stream, 123).is_err()); // stream unmodified - let stream_2 = BlockStreamData::new_block(index_block_header.clone()); + let stream_2 = StreamCursor::new_block(index_block_header.clone()); assert_eq!(stream, stream_2); // store block to staging @@ -9066,7 +9194,7 @@ pub mod test { set_block_processed(&mut chainstate, &consensus_hash, &block.block_hash(), true); // can still stream it - let mut stream = BlockStreamData::new_block(index_block_header.clone()); + let mut stream = StreamCursor::new_block(index_block_header.clone()); // stream from chunk store let mut all_block_bytes = vec![]; @@ -9180,12 +9308,12 @@ pub mod test { } // can't stream a non-existant header - assert!(BlockStreamData::new_headers(&chainstate, &StacksBlockId([0x11; 32]), 1).is_err()); + assert!(StreamCursor::new_headers(&chainstate, &StacksBlockId([0x11; 32]), 1).is_err()); // stream back individual headers for i in 0..blocks.len() { let mut stream = - BlockStreamData::new_headers(&chainstate, &blocks_index_hashes[i], 1).unwrap(); + StreamCursor::new_headers(&chainstate, &blocks_index_hashes[i], 1).unwrap(); let mut next_header_bytes = vec![]; loop { // torture test @@ -9202,8 +9330,7 @@ pub mod test { next_header_bytes.append(&mut next_bytes); } test_debug!("Got {} total bytes", next_header_bytes.len()); - let header = - ExtendedStacksHeader::consensus_deserialize(&mut &next_header_bytes[..]).unwrap(); + let header : ExtendedStacksHeader = serde_json::from_reader(&mut &next_header_bytes[..]).unwrap(); assert_eq!(header.consensus_hash, ConsensusHash([(i + 1) as u8; 20])); assert_eq!(header.header, blocks[i].header); @@ -9237,12 +9364,14 @@ pub mod test { // get them all -- ask for more than there is let mut stream = - BlockStreamData::new_headers(&chainstate, blocks_index_hashes.last().unwrap(), 4096) + StreamCursor::new_headers(&chainstate, blocks_index_hashes.last().unwrap(), 4096) .unwrap(); let header_bytes = stream_headers_to_vec(&mut chainstate, &mut stream, 1024 * 1024).unwrap(); + + eprintln!("headers: {}", String::from_utf8(header_bytes.clone()).unwrap()); let headers: Vec = - Vec::consensus_deserialize(&mut &header_bytes[..]).unwrap(); + serde_json::from_reader(&mut &header_bytes[..]).unwrap(); assert_eq!(headers.len(), block_expected_headers.len()); for ((i, h), eh) in headers @@ -9257,7 +9386,7 @@ pub mod test { } } - let mut stream = BlockStreamData::new_headers( + let mut stream = StreamCursor::new_headers( &chainstate, blocks_fork_index_hashes.last().unwrap(), 4096, @@ -9266,7 +9395,7 @@ pub mod test { let header_bytes = stream_headers_to_vec(&mut chainstate, &mut stream, 1024 * 1024).unwrap(); let fork_headers: Vec = - Vec::consensus_deserialize(&mut &header_bytes[..]).unwrap(); + serde_json::from_reader(&mut &header_bytes[..]).unwrap(); assert_eq!(fork_headers.len(), block_fork_expected_headers.len()); for ((i, h), eh) in fork_headers @@ -9291,7 +9420,7 @@ pub mod test { // ask for only a few let mut stream = - BlockStreamData::new_headers(&chainstate, blocks_index_hashes.last().unwrap(), 10) + StreamCursor::new_headers(&chainstate, blocks_index_hashes.last().unwrap(), 10) .unwrap(); let mut header_bytes = vec![]; loop { @@ -9302,8 +9431,11 @@ pub mod test { } header_bytes.append(&mut next_bytes); } + + eprintln!("header bytes: {}", String::from_utf8(header_bytes.clone()).unwrap()); + let headers: Vec = - Vec::consensus_deserialize(&mut &header_bytes[..]).unwrap(); + serde_json::from_reader(&mut &header_bytes[..]).unwrap(); assert_eq!(headers.len(), 10); for (i, hdr) in headers.iter().enumerate() { @@ -9313,7 +9445,7 @@ pub mod test { // ask for only a few let mut stream = - BlockStreamData::new_headers(&chainstate, blocks_fork_index_hashes.last().unwrap(), 10) + StreamCursor::new_headers(&chainstate, blocks_fork_index_hashes.last().unwrap(), 10) .unwrap(); let mut header_bytes = vec![]; loop { @@ -9325,7 +9457,7 @@ pub mod test { header_bytes.append(&mut next_bytes); } let headers: Vec = - Vec::consensus_deserialize(&mut &header_bytes[..]).unwrap(); + serde_json::from_reader(&mut &header_bytes[..]).unwrap(); assert_eq!(headers.len(), 10); for (i, hdr) in headers.iter().enumerate() { @@ -9353,15 +9485,17 @@ pub mod test { StacksBlockHeader::make_index_block_hash(&consensus_hash, &block.block_hash()); // can't stream a non-existant microblock - let mut stream = BlockStreamData::new_block(index_block_header.clone()); - assert!(StacksChainState::stream_one_microblock( - &chainstate.db(), - &mut vec![], - &mut stream, - 123 - ) - .is_err()); - assert!(stream.rowid.is_none()); + if let Err(super::Error::NoSuchBlockError) = StreamCursor::new_microblock_confirmed(&chainstate, index_block_header.clone()) { + } + else { + panic!("Opened nonexistant microblock"); + } + + if let Err(super::Error::NoSuchBlockError) = StreamCursor::new_microblock_unconfirmed(&chainstate, index_block_header.clone(), 0) { + } + else { + panic!("Opened nonexistant microblock"); + } // store microblocks to staging and stream them back for (i, mblock) in mblocks.iter().enumerate() { @@ -9376,7 +9510,7 @@ pub mod test { let mut staging_mblocks = vec![]; for j in 0..(i + 1) { let mut next_mblock_bytes = vec![]; - let mut stream = BlockStreamData::new_microblock_unconfirmed( + let mut stream = StreamCursor::new_microblock_unconfirmed( &chainstate, index_block_header.clone(), j as u16, @@ -9414,7 +9548,7 @@ pub mod test { for k in 0..(i + 1) { test_debug!("start at seq {}", k); let mut staging_mblock_bytes = vec![]; - let mut stream = BlockStreamData::new_microblock_unconfirmed( + let mut stream = StreamCursor::new_microblock_unconfirmed( &chainstate, index_block_header.clone(), k as u16, @@ -9526,7 +9660,7 @@ pub mod test { // verify that we can stream everything let microblock_index_header = StacksBlockHeader::make_index_block_hash(&consensus_hash, &mblocks[i].block_hash()); - let mut stream = BlockStreamData::new_microblock_confirmed( + let mut stream = StreamCursor::new_microblock_confirmed( &chainstate, microblock_index_header.clone(), )