From 4e306ce17c847e7518caf90b133b7b0bbd23395e Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Mon, 28 Aug 2023 11:05:34 -0400 Subject: [PATCH] feat: new libsigner implementation. Stacker signers and sBTC signers would use this crate to implement a Stacks event receiver and a runloop. --- libsigner/Cargo.toml | 46 +++++ libsigner/src/error.rs | 131 ++++++++++++++ libsigner/src/events.rs | 268 ++++++++++++++++++++++++++++ libsigner/src/http.rs | 241 +++++++++++++++++++++++++ libsigner/src/libsigner.rs | 47 +++++ libsigner/src/runloop.rs | 244 +++++++++++++++++++++++++ libsigner/src/session.rs | 216 ++++++++++++++++++++++ libsigner/src/tests/http.rs | 347 ++++++++++++++++++++++++++++++++++++ libsigner/src/tests/mod.rs | 142 +++++++++++++++ 9 files changed, 1682 insertions(+) create mode 100644 libsigner/Cargo.toml create mode 100644 libsigner/src/error.rs create mode 100644 libsigner/src/events.rs create mode 100644 libsigner/src/http.rs create mode 100644 libsigner/src/libsigner.rs create mode 100644 libsigner/src/runloop.rs create mode 100644 libsigner/src/session.rs create mode 100644 libsigner/src/tests/http.rs create mode 100644 libsigner/src/tests/mod.rs diff --git a/libsigner/Cargo.toml b/libsigner/Cargo.toml new file mode 100644 index 000000000..7d29b8956 --- /dev/null +++ b/libsigner/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "libsigner" +version = "0.0.1" +authors = [ "Jude Nelson " ] +license = "GPLv3" +homepage = "https://github.com/blockstack/stacks-blockchain" +repository = "https://github.com/blockstack/stacks-blockchain" +description = "Library for Stacks signer daemons" +keywords = [ "stacks", "stx", "bitcoin", "crypto", "blockstack", "decentralized", "dapps", "blockchain" ] +readme = "README.md" +resolver = "2" +edition = "2021" + +[lib] +name = "libsigner" +path = "./src/libsigner.rs" + +[[bin]] +name = "stackschat" +path = "demos/stackschat/main.rs" + +[dependencies] +serde = "1" +serde_derive = "1" +serde_stacker = "0.1" +stacks-common = { path = "../stacks-common" } +clarity = { path = "../clarity" } +libstackerdb = { path = "../libstackerdb" } +slog = { version = "2.5.2", features = [ "max_level_trace" ] } +slog-term = "2.6.0" +slog-json = { version = "2.3.0", optional = true } +libc = "0.2" + +[dependencies.serde_json] +version = "1.0" +features = ["arbitrary_precision", "unbounded_depth"] + +[dependencies.secp256k1] +version = "0.24.3" +features = ["serde", "recovery"] + +[target.'cfg(all(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64"), not(target_env = "msvc")))'.dependencies] +sha2 = { version = "0.10", features = ["asm"] } + +[target.'cfg(any(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")), target_env = "msvc"))'.dependencies] +sha2 = { version = "0.10" } diff --git a/libsigner/src/error.rs b/libsigner/src/error.rs new file mode 100644 index 000000000..fb2933695 --- /dev/null +++ b/libsigner/src/error.rs @@ -0,0 +1,131 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::error; +use std::fmt; +use std::io; + +/// Errors originating from doing an RPC request to the Stacks node +#[derive(Debug)] +pub enum RPCError { + IO(io::Error), + Deserialize(String), + NotConnected, + MalformedRequest(String), + MalformedResponse(String), + HttpError(u32), +} + +impl fmt::Display for RPCError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + RPCError::IO(ref s) => fmt::Display::fmt(s, f), + RPCError::Deserialize(ref s) => fmt::Display::fmt(s, f), + RPCError::HttpError(ref s) => { + write!(f, "HTTP code {}", s) + } + RPCError::MalformedRequest(ref s) => { + write!(f, "Malformed request: {}", s) + } + RPCError::MalformedResponse(ref s) => { + write!(f, "Malformed response: {}", s) + } + RPCError::NotConnected => { + write!(f, "Not connected") + } + } + } +} + +impl error::Error for RPCError { + fn cause(&self) -> Option<&dyn error::Error> { + match *self { + RPCError::IO(ref s) => Some(s), + RPCError::Deserialize(..) => None, + RPCError::HttpError(..) => None, + RPCError::MalformedRequest(..) => None, + RPCError::MalformedResponse(..) => None, + RPCError::NotConnected => None, + } + } +} + +impl From for RPCError { + fn from(e: io::Error) -> RPCError { + RPCError::IO(e) + } +} + +/// Errors originating from receiving event data from the Stacks node +#[derive(Debug)] +pub enum EventError { + IO(io::Error), + Deserialize(String), + MalformedRequest(String), + NotBound, + Terminated, + AlreadyRunning, + FailedToStart, + UnrecognizedEvent(String), +} + +impl fmt::Display for EventError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + EventError::IO(ref s) => fmt::Display::fmt(s, f), + EventError::Deserialize(ref s) => fmt::Display::fmt(s, f), + EventError::MalformedRequest(ref s) => { + write!(f, "Malformed request: {}", s) + } + EventError::NotBound => { + write!(f, "Not bound to a port yet") + } + EventError::Terminated => { + write!(f, "Listener is terminated") + } + EventError::AlreadyRunning => { + write!(f, "Thread already running") + } + EventError::FailedToStart => { + write!(f, "Failed to start thread") + } + EventError::UnrecognizedEvent(ref ev) => { + write!(f, "Unrecognized event '{}'", &ev) + } + } + } +} + +impl error::Error for EventError { + fn cause(&self) -> Option<&dyn error::Error> { + match *self { + EventError::IO(ref s) => Some(s), + EventError::Deserialize(..) => None, + EventError::MalformedRequest(..) => None, + EventError::NotBound => None, + EventError::Terminated => None, + EventError::AlreadyRunning => None, + EventError::FailedToStart => None, + EventError::UnrecognizedEvent(..) => None, + } + } +} + +impl From for EventError { + fn from(e: io::Error) -> EventError { + EventError::IO(e) + } +} diff --git a/libsigner/src/events.rs b/libsigner/src/events.rs new file mode 100644 index 000000000..3bc01e110 --- /dev/null +++ b/libsigner/src/events.rs @@ -0,0 +1,268 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::sync::mpsc::Sender; + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use std::net::SocketAddr; +use std::net::TcpListener; +use std::net::TcpStream; + +use std::io::Read; + +use clarity::vm::types::QualifiedContractIdentifier; +use libstackerdb::StackerDBChunkData; + +use serde::{Deserialize, Serialize}; + +use crate::http::{decode_http_body, decode_http_request}; +use crate::EventError; + +/// Event structure for newly-arrived StackerDB data +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct StackerDBChunksEvent { + pub contract_id: QualifiedContractIdentifier, + pub modified_slots: Vec, +} + +/// Trait to implement a stop-signaler for the event receiver thread. +/// The caller calls `send()` and the event receiver loop (which lives in a separate thread) will +/// terminate. +pub trait EventStopSignaler { + fn send(&mut self); +} + +/// Trait to implement to handle StackerDB events sent by the Stacks node +pub trait EventReceiver { + /// The implementation of ST will ensure that a call to ST::send() will cause + /// the call to `is_stopped()` below to return true. + type ST: EventStopSignaler + Send + Sync; + + /// Open a server socket to the given socket address. + fn bind(&mut self, listener: SocketAddr) -> Result; + /// Return the next event + fn next_event(&mut self) -> Result; + /// Add a downstream event consumer + fn add_consumer(&mut self, event_out: Sender); + /// Forward the event to downstream consumers + fn forward_event(&mut self, ev: StackerDBChunksEvent) -> bool; + /// Determine if the receiver should hang up + fn is_stopped(&self) -> bool; + /// Get a stop signal instance that, when sent, will cause this receiver to stop accepting new + /// events. Called after `bind()`. + fn get_stop_signaler(&mut self) -> Result; + + /// Main loop for the receiver. + /// Typically, this is started in a separate thread. + fn main_loop(&mut self) { + loop { + if self.is_stopped() { + info!("Event receiver stopped"); + break; + } + let next_event = match self.next_event() { + Ok(event) => event, + Err(EventError::UnrecognizedEvent(..)) => { + // got an event that we don't care about (not a problem) + continue; + } + Err(EventError::Terminated) => { + // we're done + info!("Caught termination signal"); + break; + } + Err(e) => { + warn!("Failed to receive next event: {:?}", &e); + continue; + } + }; + if !self.forward_event(next_event) { + info!("Failed to forward event"); + break; + } + } + info!("Event receiver main loop exit"); + } +} + +pub struct StackerDBEventReceiver { + /// contracts we're listening for + pub stackerdb_contract_ids: Vec, + /// Address we bind to + local_addr: Option, + /// server socket that listens for HTTP POSTs from the node + sock: Option, + /// channel into which to write newly-discovered data + out_channels: Vec>, + /// inter-thread stop variable -- if set to true, then the `main_loop` will exit + stop_signal: Arc, +} + +impl StackerDBEventReceiver { + /// Make a new StackerDB event receiver, and return both the receiver and the read end of a + /// channel into which node-received data can be obtained. + pub fn new(contract_ids: Vec) -> StackerDBEventReceiver { + let stackerdb_receiver = StackerDBEventReceiver { + stackerdb_contract_ids: contract_ids, + sock: None, + local_addr: None, + out_channels: vec![], + stop_signal: Arc::new(AtomicBool::new(false)), + }; + stackerdb_receiver + } + + /// Do something with the socket + pub fn with_socket(&mut self, todo: F) -> Result + where + F: FnOnce(&mut StackerDBEventReceiver, &mut TcpListener) -> R, + { + let mut sock = if let Some(s) = self.sock.take() { + s + } else { + return Err(EventError::NotBound); + }; + + let res = todo(self, &mut sock); + + self.sock = Some(sock); + Ok(res) + } +} + +/// Stop signaler implementation +pub struct StackerDBStopSignaler { + stop_signal: Arc, + local_addr: SocketAddr, +} + +impl StackerDBStopSignaler { + pub fn new(sig: Arc, local_addr: SocketAddr) -> StackerDBStopSignaler { + StackerDBStopSignaler { + stop_signal: sig, + local_addr, + } + } +} + +impl EventStopSignaler for StackerDBStopSignaler { + fn send(&mut self) { + self.stop_signal.store(true, Ordering::SeqCst); + + // wake up the thread so the atomicbool can be checked + let _ = TcpStream::connect(&self.local_addr); + } +} + +impl EventReceiver for StackerDBEventReceiver { + type ST = StackerDBStopSignaler; + + /// Start listening on the given socket address. + /// Returns the address that was bound. + /// Errors out if bind(2) fails + fn bind(&mut self, listener: SocketAddr) -> Result { + let srv = TcpListener::bind(listener)?; + let bound_addr = srv.local_addr()?; + self.sock = Some(srv); + self.local_addr = Some(bound_addr.clone()); + Ok(bound_addr) + } + + /// Wait for the node to post something, and then return it. + /// Errors are recoverable -- the caller should call this method again even if it returns an + /// error. + fn next_event(&mut self) -> Result { + self.with_socket(|event_receiver, server_sock| { + let (mut node_sock, _) = server_sock.accept()?; + + // were we asked to terminate? + if event_receiver.is_stopped() { + return Err(EventError::Terminated); + } + + let mut buf = vec![]; + node_sock.read_to_end(&mut buf)?; + + let (verb, path, headers, body_offset) = decode_http_request(&buf)?; + if verb != "POST" { + return Err(EventError::MalformedRequest(format!( + "Unrecognized verb '{}'", + &verb + ))); + } + if path != "/stackerdb_chunks" { + return Err(EventError::UnrecognizedEvent(path)); + } + + let body = decode_http_body(&headers, &buf[body_offset..])?; + let event: StackerDBChunksEvent = serde_json::from_slice(&body).map_err(|e| { + EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)) + })?; + Ok(event) + })? + } + + /// Determine if the receiver is hung up + fn is_stopped(&self) -> bool { + self.stop_signal.load(Ordering::SeqCst) + } + + /// Forward an event + /// Return true on success; false on error. + /// Returning false terminates the event receiver. + fn forward_event(&mut self, ev: StackerDBChunksEvent) -> bool { + if self.out_channels.len() == 0 { + // nothing to do + error!("No channels connected to event receiver"); + return false; + } else if self.out_channels.len() == 1 { + // avoid a clone + if let Err(e) = self.out_channels[0].send(ev) { + error!("Failed to send to signer runloop: {:?}", &e); + return false; + } + return true; + } else { + for (i, out_channel) in self.out_channels.iter().enumerate() { + if let Err(e) = out_channel.send(ev.clone()) { + error!("Failed to send to signer runloop #{}: {:?}", i, &e); + return false; + } + } + return true; + } + } + + /// Add an event consumer. A received event will be forwarded to this Sender. + fn add_consumer(&mut self, out_channel: Sender) { + self.out_channels.push(out_channel); + } + + /// Get a stopped signaler. The caller can then use it to terminate the event receiver loop, + /// even if it's in a different thread. + fn get_stop_signaler(&mut self) -> Result { + if let Some(local_addr) = self.local_addr.as_ref() { + Ok(StackerDBStopSignaler::new( + self.stop_signal.clone(), + local_addr.clone(), + )) + } else { + Err(EventError::NotBound) + } + } +} diff --git a/libsigner/src/http.rs b/libsigner/src/http.rs new file mode 100644 index 000000000..d70b16a5c --- /dev/null +++ b/libsigner/src/http.rs @@ -0,0 +1,241 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::collections::HashMap; +use std::io; +use std::io::{Read, Write}; + +use std::net::SocketAddr; + +use stacks_common::codec::MAX_MESSAGE_LEN; +use stacks_common::deps_common::httparse; +use stacks_common::util::chunked_encoding::*; + +use crate::error::EventError; +use crate::error::RPCError; + +pub const MAX_HTTP_HEADERS: usize = 32; +pub const MAX_HTTP_HEADER_LEN: usize = 4096; + +/// Decode the HTTP request payload into its headers and body. +/// Returns (verb, path, table of headers, body_offset) on success +pub fn decode_http_request( + payload: &[u8], +) -> Result<(String, String, HashMap, usize), EventError> { + // realistically, there won't be more than 32 headers + let mut headers_buf = [httparse::EMPTY_HEADER; MAX_HTTP_HEADERS]; + let mut req = httparse::Request::new(&mut headers_buf); + let (verb, path, headers, body_offset) = + if let Ok(httparse::Status::Complete(body_offset)) = req.parse(payload) { + // version must be valid + match req + .version + .ok_or(EventError::MalformedRequest("No HTTP version".to_string()))? + { + 0 => {} + 1 => {} + _ => { + return Err(EventError::MalformedRequest( + "Invalid HTTP version".to_string(), + )); + } + }; + + let verb = req + .method + .ok_or(EventError::MalformedRequest("No HTTP method".to_string()))? + .to_string(); + let path = req + .path + .ok_or(EventError::MalformedRequest("No HTTP path".to_string()))? + .to_string(); + + let mut headers: HashMap = HashMap::new(); + for i in 0..req.headers.len() { + let value = String::from_utf8(req.headers[i].value.to_vec()).map_err(|_e| { + EventError::MalformedRequest("Invalid HTTP header value: not utf-8".to_string()) + })?; + if !value.is_ascii() { + return Err(EventError::MalformedRequest(format!( + "Invalid HTTP request: header value is not ASCII-US" + ))); + } + if value.len() > MAX_HTTP_HEADER_LEN { + return Err(EventError::MalformedRequest(format!( + "Invalid HTTP request: header value is too big" + ))); + } + + let key = req.headers[i].name.to_string().to_lowercase(); + if headers.get(&key).is_some() { + return Err(EventError::MalformedRequest(format!( + "Invalid HTTP request: duplicate header \"{}\"", + key + ))); + } + headers.insert(key, value); + } + (verb, path, headers, body_offset) + } else { + return Err(EventError::Deserialize( + "Failed to decode HTTP headers".to_string(), + )); + }; + + Ok((verb, path, headers, body_offset)) +} + +/// Decode the HTTP response payload into its headers and body. +/// Return the offset into payload where the body starts, and a table of headers. +/// +/// If the payload contains a status code other than 200, then RPCERror::HttpError(..) will be +/// returned with the status code. +/// If the payload is missing necessary data, then RPCError::MalformedResponse(..) will be +/// returned, with a human-readable reason string. +/// If the payload does not contain a full HTTP header list, then RPCError::Deserialize(..) will be +/// returned. This can happen if there are more than MAX_HTTP_HEADERS in the payload, for example. +pub fn decode_http_response(payload: &[u8]) -> Result<(HashMap, usize), RPCError> { + // realistically, there won't be more than 32 headers + let mut headers_buf = [httparse::EMPTY_HEADER; MAX_HTTP_HEADERS]; + let mut resp = httparse::Response::new(&mut headers_buf); + + // consume respuest + let (headers, body_offset) = + if let Ok(httparse::Status::Complete(body_offset)) = resp.parse(payload) { + if let Some(code) = resp.code { + if code != 200 { + return Err(RPCError::HttpError(code.into())); + } + } else { + return Err(RPCError::MalformedResponse( + "No HTTP status code returned".to_string(), + )); + } + if let Some(version) = resp.version { + if version != 0 && version != 1 { + return Err(RPCError::MalformedResponse(format!( + "Unrecognized HTTP code {}", + version + ))); + } + } else { + return Err(RPCError::MalformedResponse( + "No HTTP version given".to_string(), + )); + } + let mut headers: HashMap = HashMap::new(); + for i in 0..resp.headers.len() { + let value = String::from_utf8(resp.headers[i].value.to_vec()).map_err(|_e| { + RPCError::MalformedResponse("Invalid HTTP header value: not utf-8".to_string()) + })?; + if !value.is_ascii() { + return Err(RPCError::MalformedResponse( + "Invalid HTTP response: header value is not ASCII-US".to_string(), + )); + } + if value.len() > MAX_HTTP_HEADER_LEN { + return Err(RPCError::MalformedResponse(format!( + "Invalid HTTP response: header value is too big" + ))); + } + + let key = resp.headers[i].name.to_string().to_lowercase(); + if headers.contains_key(&key) { + return Err(RPCError::MalformedResponse(format!( + "Invalid HTTP respuest: duplicate header \"{}\"", + key + ))); + } + headers.insert(key, value); + } + (headers, body_offset) + } else { + return Err(RPCError::Deserialize( + "Failed to decode HTTP headers".to_string(), + )); + }; + + Ok((headers, body_offset)) +} + +/// Decode an HTTP body, given the headers. +pub fn decode_http_body(headers: &HashMap, mut buf: &[u8]) -> io::Result> { + let chunked = if let Some(val) = headers.get("transfer-encoding") { + val == "chunked" + } else { + false + }; + + let body = if chunked { + // chunked encoding + let ptr = &mut buf; + let mut fd = HttpChunkedTransferReader::from_reader(ptr, MAX_MESSAGE_LEN.into()); + let mut decoded_body = vec![]; + fd.read_to_end(&mut decoded_body)?; + decoded_body + } else { + // body is just as-is + buf.to_vec() + }; + + Ok(body) +} + +/// Run an HTTP request, synchronously, through the given read/write handle +/// Return the HTTP reply, decoded if it was chunked +pub fn run_http_request( + sock: &mut S, + host: &SocketAddr, + verb: &str, + path: &str, + content_type: Option<&str>, + payload: &[u8], +) -> Result, RPCError> { + let content_length_hdr = if payload.len() > 0 { + format!("Content-Length: {}\r\n", payload.len()) + } else { + "".to_string() + }; + + let req_txt = if let Some(content_type) = content_type { + format!( + "{} {} HTTP/1.0\r\nHost: {}\r\nConnection: close\r\nContent-Type: {}\r\n{}User-Agent: libsigner/0.1\r\nAccept: */*\r\n\r\n", + verb, path, format!("{}", host), content_type, content_length_hdr + ) + } else { + format!( + "{} {} HTTP/1.0\r\nHost: {}\r\nConnection: close\r\n{}User-Agent: libsigner/0.1\r\nAccept: */*\r\n\r\n", + verb, path, format!("{}", host), content_length_hdr + ) + }; + debug!("HTTP request\n{}", &req_txt); + + sock.write_all(req_txt.as_bytes())?; + sock.write_all(payload)?; + + let mut buf = vec![]; + + sock.read_to_end(&mut buf)?; + + let (headers, body_offset) = decode_http_response(&buf)?; + if body_offset >= buf.len() { + // no body + debug!("No HTTP body"); + return Ok(vec![]); + } + + decode_http_body(&headers, &buf[body_offset..]).map_err(|e| e.into()) +} diff --git a/libsigner/src/libsigner.rs b/libsigner/src/libsigner.rs new file mode 100644 index 000000000..11f6ba614 --- /dev/null +++ b/libsigner/src/libsigner.rs @@ -0,0 +1,47 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#![allow(unused_imports)] +#![allow(dead_code)] +#[macro_use(o, slog_log, slog_trace, slog_debug, slog_info, slog_warn, slog_error)] +extern crate slog; + +extern crate serde; +extern crate serde_json; +#[macro_use] +extern crate stacks_common; +extern crate clarity; +extern crate libc; + +#[cfg(test)] +mod tests; + +mod error; +mod events; +mod http; +mod runloop; +mod session; + +pub use crate::session::{SignerSession, StackerDBSession}; + +pub use crate::error::{EventError, RPCError}; + +pub use crate::runloop::{RunningSigner, Signer, SignerRunLoop}; + +pub use crate::events::{ + EventReceiver, EventStopSignaler, StackerDBChunksEvent, StackerDBEventReceiver, + StackerDBStopSignaler, +}; diff --git a/libsigner/src/runloop.rs b/libsigner/src/runloop.rs new file mode 100644 index 000000000..895938c4d --- /dev/null +++ b/libsigner/src/runloop.rs @@ -0,0 +1,244 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +#![allow(unused_imports)] +#![allow(dead_code)] + +use std::marker::PhantomData; +use std::net::SocketAddr; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; +use std::thread; +use std::thread::JoinHandle; +use std::time::Duration; + +use crate::events::{EventReceiver, EventStopSignaler, StackerDBChunksEvent}; + +use crate::error::EventError; + +use stacks_common::deps_common::ctrlc as termination; +use stacks_common::deps_common::ctrlc::SignalId; + +use libc; + +/// Some libcs, like musl, have a very small stack size. +/// Make sure it's big enough. +const THREAD_STACK_SIZE: usize = 128 * 1024 * 1024; // 128 MB + +/// stderr fileno +const STDERR: i32 = 2; + +/// Trait describing the needful components of a top-level runloop. +/// This is where the signer business logic would go. +/// Implement this, and you get all the multithreaded setup for free. +pub trait SignerRunLoop { + /// Hint to set how long to wait for new events + fn set_event_timeout(&mut self, timeout: Duration); + /// Getter for the event poll timeout + fn get_event_timeout(&self) -> Duration; + /// Run one pass of the event loop, given new StackerDB events discovered since the last pass. + /// Returns Some(R) if this is the final pass -- the runloop evaluated to R + /// Returns None to keep running. + fn run_one_pass(&mut self, events: Option) -> Option; + + /// This is the main loop body for the signer. It continuously receives events from + /// `event_recv`, polling for up to `self.get_event_timeout()` units of time. Once it has + /// polled for events, they are fed into `run_one_pass()`. This continues until either + /// `run_one_pass()` returns `false`, or the event receiver hangs up. At this point, this + /// method calls the `event_stop_signaler.send()` to terminate the receiver. + /// + /// This would run in a separate thread from the event receiver. + fn main_loop( + &mut self, + event_recv: Receiver, + mut event_stop_signaler: EVST, + ) -> Option { + loop { + let poll_timeout = self.get_event_timeout(); + let next_event_opt = match event_recv.recv_timeout(poll_timeout) { + Ok(event) => Some(event), + Err(RecvTimeoutError::Timeout) => None, + Err(RecvTimeoutError::Disconnected) => { + info!("Event receiver disconnected"); + return None; + } + }; + if let Some(final_state) = self.run_one_pass(next_event_opt) { + // finished! + info!("Runloop exit; signaling event-receiver to stop"); + event_stop_signaler.send(); + return Some(final_state); + } + } + } +} + +/// The top-level signer implementation +pub struct Signer + Send + Sync, EV: EventReceiver + Send> { + /// the runloop itself + signer_loop: Option, + /// the event receiver to use + event_receiver: Option, + /// marker to permit the R type + _phantom: PhantomData, +} + +/// The running signer implementation +pub struct RunningSigner { + /// join handle for signer runloop + signer_join: JoinHandle>, + /// join handle for event receiver + event_join: JoinHandle<()>, + /// kill signal for the event receiver + stop_signal: EV::ST, +} + +impl RunningSigner { + /// Stop the signer, and get the final state + pub fn stop(mut self) -> Option { + // kill event receiver + self.stop_signal.send(); + + debug!("Try join event loop..."); + // wait for event receiver join + let _ = self.event_join.join().map_err(|thread_panic| { + error!("Event thread panicked with: '{:?}'", &thread_panic); + thread_panic + }); + info!("Event receiver thread joined"); + + // wait for runloop to join + debug!("Try join signer loop..."); + let result_opt = self + .signer_join + .join() + .map_err(|thread_panic| { + error!("Event thread panicked with: '{:?}'", &thread_panic); + thread_panic + }) + .unwrap_or(None); + + info!("Signer thread joined"); + result_opt + } +} + +/// Write to stderr in an async-safe manner. +/// See signal-safety(7) +#[warn(unused)] +fn async_safe_write_stderr(msg: &str) { + #[cfg(windows)] + unsafe { + // write(2) inexplicably has a different ABI only on Windows. + libc::write( + STDERR, + msg.as_ptr() as *const libc::c_void, + msg.len() as u32, + ); + } + #[cfg(not(windows))] + unsafe { + libc::write(STDERR, msg.as_ptr() as *const libc::c_void, msg.len()); + } +} + +/// This is a termination handler for handling receipt of a terminating UNIX signal, like SIGINT, +/// SIGQUIT, SIGTERM, or SIGBUS. You'd call this as part of the startup code for the signer daemon. +/// DO NOT call this from within the library! +pub fn set_runloop_signal_handler(mut stop_signaler: ST) { + termination::set_handler(move |sig_id| match sig_id { + SignalId::Bus => { + let msg = "Caught SIGBUS; crashing immediately and dumping core\n"; + async_safe_write_stderr(msg); + unsafe { + libc::abort(); + } + } + _ => { + let msg = format!("Graceful termination request received (signal `{}`), will complete the ongoing runloop cycles and terminate\n", sig_id); + async_safe_write_stderr(&msg); + stop_signaler.send(); + } + }).expect("FATAL: failed to set signal handler"); +} + +impl< + R: Send + 'static, + SL: SignerRunLoop + Send + Sync + 'static, + EV: EventReceiver + Send + 'static, + > Signer +{ + pub fn new(runloop: SL, event_receiver: EV) -> Signer { + Signer { + signer_loop: Some(runloop), + event_receiver: Some(event_receiver), + _phantom: PhantomData, + } + } + + /// This is a helper function to spawn both the runloop and event receiver in their own + /// threads. Advanced signers may not need this method, and instead opt to run the receiver + /// and runloop directly. However, this method is present to help signer developers to get + /// their implementations off the ground. + /// + /// The given `bind_addr` is the server address this event receiver needs to listen on, so the + /// stacks node can POST events to it. + /// + /// On success, this method consumes the Signer and returns a RunningSigner with the relevant + /// inter-thread communication primitives for the caller to shut down the system. + pub fn spawn(&mut self, bind_addr: SocketAddr) -> Result, EventError> { + let mut event_receiver = self + .event_receiver + .take() + .ok_or(EventError::AlreadyRunning)?; + let mut signer_loop = self.signer_loop.take().ok_or(EventError::AlreadyRunning)?; + + let (event_send, event_recv) = channel(); + event_receiver.add_consumer(event_send); + + event_receiver.bind(bind_addr)?; + let stop_signaler = event_receiver.get_stop_signaler()?; + let mut ret_stop_signaler = event_receiver.get_stop_signaler()?; + + // start a thread for the event receiver + let event_thread = thread::Builder::new() + .name("event_receiver".to_string()) + .stack_size(THREAD_STACK_SIZE) + .spawn(move || event_receiver.main_loop()) + .map_err(|e| { + error!("EventReceiver failed to start: {:?}", &e); + EventError::FailedToStart + })?; + + // start receiving events and doing stuff with them + let runloop_thread = thread::Builder::new() + .name("signer_runloop".to_string()) + .stack_size(THREAD_STACK_SIZE) + .spawn(move || signer_loop.main_loop(event_recv, stop_signaler)) + .map_err(|e| { + error!("SignerRunLoop failed to start: {:?}", &e); + ret_stop_signaler.send(); + EventError::FailedToStart + })?; + + let running_signer = RunningSigner { + signer_join: runloop_thread, + event_join: event_thread, + stop_signal: ret_stop_signaler, + }; + + Ok(running_signer) + } +} diff --git a/libsigner/src/session.rs b/libsigner/src/session.rs new file mode 100644 index 000000000..fb9d5c0e3 --- /dev/null +++ b/libsigner/src/session.rs @@ -0,0 +1,216 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::net::SocketAddr; +use std::net::TcpStream; +use std::str; + +use libstackerdb::{ + stackerdb_get_chunk_path, stackerdb_get_metadata_path, stackerdb_post_chunk_path, SlotMetadata, + StackerDBChunkAckData, StackerDBChunkData, +}; + +use clarity::vm::types::QualifiedContractIdentifier; + +use crate::error::RPCError; +use crate::http::run_http_request; + +use serde_json; + +pub trait SignerSession { + fn connect( + &mut self, + host: SocketAddr, + stackerdb_contract_id: QualifiedContractIdentifier, + ) -> Result<(), RPCError>; + fn list_chunks(&mut self) -> Result, RPCError>; + fn get_chunks( + &mut self, + slots_and_versions: &[(u32, u32)], + ) -> Result>>, RPCError>; + fn get_latest_chunks(&mut self, slot_ids: &[u32]) -> Result>>, RPCError>; + fn put_chunk(&mut self, chunk: StackerDBChunkData) -> Result; + + /// Get a single chunk with the given version + /// Returns Ok(Some(..)) if the chunk exists + /// Returns Ok(None) if the chunk with the given version does not exist + /// Returns Err(..) on transport error + fn get_chunk(&mut self, slot_id: u32, version: u32) -> Result>, RPCError> { + Ok(self.get_chunks(&[(slot_id, version)])?[0].clone()) + } + + /// Get a single latest chunk. + /// Returns Ok(Some(..)) if the slot exists + /// Returns Ok(None) if not + /// Returns Err(..) on transport error + fn get_latest_chunk(&mut self, slot_id: u32) -> Result>, RPCError> { + Ok(self.get_latest_chunks(&[(slot_id)])?[0].clone()) + } +} + +/// signer session for a stackerdb instance +pub struct StackerDBSession { + /// host we're talking to + pub host: SocketAddr, + /// contract we're talking to + pub stackerdb_contract_id: QualifiedContractIdentifier, + /// connection to the replica + sock: Option, +} + +impl StackerDBSession { + /// instantiate but don't connect + pub fn new( + host: SocketAddr, + stackerdb_contract_id: QualifiedContractIdentifier, + ) -> StackerDBSession { + StackerDBSession { + host, + stackerdb_contract_id, + sock: None, + } + } + + /// connect or reconnect to the node + fn connect_or_reconnect(&mut self) -> Result<(), RPCError> { + debug!("connect to {}", &self.host); + self.sock = Some(TcpStream::connect(self.host.clone())?); + Ok(()) + } + + /// Do something with the connected socket + fn with_socket(&mut self, todo: F) -> Result + where + F: FnOnce(&mut StackerDBSession, &mut TcpStream) -> R, + { + if self.sock.is_none() { + self.connect_or_reconnect()?; + } + + let mut sock = if let Some(s) = self.sock.take() { + s + } else { + return Err(RPCError::NotConnected); + }; + + let res = todo(self, &mut sock); + + self.sock = Some(sock); + Ok(res) + } + + /// send an HTTP RPC request and receive a reply. + /// Return the HTTP reply, decoded if it was chunked + fn rpc_request( + &mut self, + verb: &str, + path: &str, + content_type: Option<&str>, + payload: &[u8], + ) -> Result, RPCError> { + self.with_socket(|session, sock| { + run_http_request(sock, &session.host, verb, path, content_type, payload) + })? + } +} + +impl SignerSession for StackerDBSession { + /// connect to the replica + fn connect( + &mut self, + host: SocketAddr, + stackerdb_contract_id: QualifiedContractIdentifier, + ) -> Result<(), RPCError> { + self.host = host; + self.stackerdb_contract_id = stackerdb_contract_id; + self.connect_or_reconnect() + } + + /// query the replica for a list of chunks + fn list_chunks(&mut self) -> Result, RPCError> { + let bytes = self.rpc_request( + "GET", + &stackerdb_get_metadata_path(self.stackerdb_contract_id.clone()), + None, + &[], + )?; + let metadata: Vec = serde_json::from_slice(&bytes) + .map_err(|e| RPCError::Deserialize(format!("{:?}", &e)))?; + Ok(metadata) + } + + /// query the replica for zero or more chunks + fn get_chunks( + &mut self, + slots_and_versions: &[(u32, u32)], + ) -> Result>>, RPCError> { + let mut payloads = vec![]; + for (slot_id, slot_version) in slots_and_versions.iter() { + let path = stackerdb_get_chunk_path( + self.stackerdb_contract_id.clone(), + *slot_id, + Some(*slot_version), + ); + let chunk = match self.rpc_request("GET", &path, None, &[]) { + Ok(body_bytes) => Some(body_bytes), + Err(RPCError::HttpError(code)) => { + if code != 404 { + return Err(RPCError::HttpError(code)); + } + None + } + Err(e) => { + return Err(e); + } + }; + payloads.push(chunk); + } + Ok(payloads) + } + + /// query the replica for zero or more latest chunks + fn get_latest_chunks(&mut self, slot_ids: &[u32]) -> Result>>, RPCError> { + let mut payloads = vec![]; + for slot_id in slot_ids.iter() { + let path = stackerdb_get_chunk_path(self.stackerdb_contract_id.clone(), *slot_id, None); + let chunk = match self.rpc_request("GET", &path, None, &[]) { + Ok(body_bytes) => Some(body_bytes), + Err(RPCError::HttpError(code)) => { + if code != 404 { + return Err(RPCError::HttpError(code)); + } + None + } + Err(e) => { + return Err(e); + } + }; + payloads.push(chunk); + } + Ok(payloads) + } + + /// upload a chunk + fn put_chunk(&mut self, chunk: StackerDBChunkData) -> Result { + let body = + serde_json::to_vec(&chunk).map_err(|e| RPCError::Deserialize(format!("{:?}", &e)))?; + let path = stackerdb_post_chunk_path(self.stackerdb_contract_id.clone()); + let resp_bytes = self.rpc_request("POST", &path, Some("application/json"), &body)?; + let ack: StackerDBChunkAckData = serde_json::from_slice(&resp_bytes) + .map_err(|e| RPCError::Deserialize(format!("{:?}", &e)))?; + Ok(ack) + } +} diff --git a/libsigner/src/tests/http.rs b/libsigner/src/tests/http.rs new file mode 100644 index 000000000..8a1981b15 --- /dev/null +++ b/libsigner/src/tests/http.rs @@ -0,0 +1,347 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::collections::HashMap; +use std::io; +use std::io::Read; +use std::io::Write; +use std::str; + +use crate::error::EventError; +use crate::error::RPCError; +use crate::http::{decode_http_body, decode_http_request, decode_http_response, run_http_request}; + +use stacks_common::util::chunked_encoding::*; + +#[test] +fn test_decode_http_request_ok() { + let tests = vec![ + ("GET /foo HTTP/1.1\r\nHost: localhost:6270\r\n\r\n", + ("GET", "/foo", vec![("host", "localhost:6270")])), + ("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nFoo: Bar\r\n\r\n", + ("POST", "asdf", vec![("host", "core.blockstack.org"), ("foo", "Bar")])), + ("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nFoo: Bar\r\n\r\n", + ("POST", "asdf", vec![("host", "core.blockstack.org"), ("foo", "Bar")])), + ("GET /foo HTTP/1.1\r\nConnection: close\r\nHost: localhost:6270\r\n\r\n", + ("GET", "/foo", vec![("connection", "close"), ("host", "localhost:6270")])), + ("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nConnection: close\r\nFoo: Bar\r\n\r\n", + ("POST", "asdf", vec![("host", "core.blockstack.org"), ("connection", "close"), ("foo", "Bar")])), + ("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nFoo: Bar\r\nConnection: close\r\n\r\n", + ("POST", "asdf", vec![("host", "core.blockstack.org"), ("foo", "Bar"), ("connection", "close")])), + ("GET /foo HTTP/1.1\r\nhOsT: localhost:6270\r\n\r\n", + ("GET", "/foo", vec![("host", "localhost:6270")])), + ("GET /foo HTTP/1.1\r\ncOnNeCtIoN: cLoSe\r\nhOsT: localhost:6270\r\n\r\n", + ("GET", "/foo", vec![("connection", "cLoSe"), ("host", "localhost:6270")])), + ("POST asdf HTTP/1.1\r\nhOsT: core.blockstack.org\r\nCOnNeCtIoN: kEeP-aLiVE\r\nFoo: Bar\r\n\r\n", + ("POST", "asdf", vec![("host", "core.blockstack.org"), ("connection", "kEeP-aLiVE"), ("foo", "Bar")])) + ]; + + for (data, (expected_verb, expected_path, headers_list)) in tests.iter() { + let mut expected_headers = HashMap::new(); + for (key, val) in headers_list.iter() { + expected_headers.insert(key.to_string(), val.to_string()); + } + + let (verb, path, headers, _) = decode_http_request(data.as_bytes()).unwrap(); + assert_eq!(verb, expected_verb.to_string()); + assert_eq!(path, expected_path.to_string()); + assert_eq!(headers, expected_headers); + } +} + +#[test] +fn test_decode_http_request_err() { + let tests = vec![ + ( + "GET /foo HTTP/1.1\r\n", + EventError::Deserialize("".to_string()), + ), + ( + "GET /foo HTTP/\r\n\r\n", + EventError::Deserialize("".to_string()), + ), + ( + "GET /foo HTTP/1.1\r\nHost:", + EventError::Deserialize("".to_string()), + ), + ( + "GET /foo HTTP/1.1\r\nHost: foo:80\r\nHost: bar:80\r\n\r\n", + EventError::MalformedRequest("".to_string()), + ), + ( + "GET /foo HTTP/1.1\r\nHost: localhost:6270\r\nfoo: \u{2764}\r\n\r\n", + EventError::MalformedRequest("".to_string()), + ), + ]; + + for (data, expected_err_type) in tests.iter() { + let err = decode_http_request(data.as_bytes()).unwrap_err(); + match (err, expected_err_type) { + (EventError::Deserialize(..), EventError::Deserialize(..)) => {} + (EventError::MalformedRequest(..), EventError::MalformedRequest(..)) => {} + (x, y) => { + error!("expected error mismatch: {:?} != {:?}", &y, &x); + panic!(); + } + } + } +} + +#[test] +fn test_decode_http_response_ok() { + let tests = vec![ + ("HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: 123\r\nX-Request-ID: 0\r\n\r\n", + vec![("content-type", "application/octet-stream"), ("content-length", "123"), ("x-request-id", "0")]), + ("HTTP/1.1 200 Ok\r\nContent-Type: application/octet-stream\r\nTransfer-encoding: chunked\r\nX-Request-ID: 0\r\n\r\n", + vec![("content-type", "application/octet-stream"), ("transfer-encoding", "chunked"), ("x-request-id", "0")]), + ("HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: 123\r\nConnection: close\r\nX-Request-ID: 0\r\n\r\n", + vec![("content-type", "application/octet-stream"), ("content-length", "123"), ("connection", "close"), ("x-request-id", "0")]), + ("HTTP/1.1 200 Ok\r\nConnection: close\r\nContent-Type: application/octet-stream\r\nTransfer-encoding: chunked\r\nX-Request-ID: 0\r\n\r\n", + vec![("connection", "close"), ("content-type", "application/octet-stream"), ("transfer-encoding", "chunked"), ("x-request-id", "0")]) + ]; + + for (data, header_list) in tests.iter() { + let mut expected_headers = HashMap::new(); + for (key, val) in header_list.iter() { + expected_headers.insert(key.to_string(), val.to_string()); + } + + let (headers, _) = decode_http_response(data.as_bytes()).unwrap(); + assert_eq!(headers, expected_headers); + } +} + +#[test] +fn test_decode_http_response_err() { + let tests = vec![ + ("HTTP/1.1 400 Bad Request\r\nContent-Type: application/json\r\nContent-Length: 456\r\nFoo: Bar\r\nX-Request-ID: 0\r\n\r\n", + RPCError::HttpError(400)), + ("HTTP/1.1 200", + RPCError::Deserialize("".to_string())), + ("HTTP/1.1 200 OK\r\nfoo: \u{2764}\r\n\r\n", + RPCError::MalformedResponse("".to_string())), + ("HTTP/1.1 200 OK\r\nfoo: bar\r\nfoo: bar\r\n\r\n", + RPCError::MalformedResponse("".to_string())), + ]; + + for (data, expected_err_type) in tests.iter() { + let err_type = decode_http_response(data.as_bytes()).unwrap_err(); + match (err_type, expected_err_type) { + (RPCError::HttpError(x), RPCError::HttpError(y)) => assert_eq!(x, *y), + (RPCError::Deserialize(_), RPCError::Deserialize(_)) => {} + (RPCError::MalformedResponse(_), RPCError::MalformedResponse(_)) => {} + (x, y) => { + error!("expected error mismatch: {:?} != {:?}", &y, &x); + panic!(); + } + } + } +} + +#[test] +fn test_decode_http_body() { + let tests = vec![ + (true, ""), + (true, "this is the song that never ends"), + (false, ""), + (false, "this is the song that never ends"), + ]; + for (chunked, expected_body) in tests.iter() { + let (headers, encoded_body) = if *chunked { + let mut hdrs = HashMap::new(); + hdrs.insert("transfer-encoding".to_string(), "chunked".to_string()); + + let mut state = HttpChunkedTransferWriterState::new(5); + let mut buf = vec![]; + let mut body_bytes = expected_body.as_bytes().to_vec(); + let mut fd = HttpChunkedTransferWriter::from_writer_state(&mut buf, &mut state); + fd.write_all(&mut body_bytes).unwrap(); + fd.flush().unwrap(); + (hdrs, buf) + } else { + let hdrs = HashMap::new(); + let body = expected_body.as_bytes().to_vec(); + (hdrs, body) + }; + + let body = decode_http_body(&headers, &mut &encoded_body).unwrap(); + assert_eq!(&body[..], expected_body.as_bytes()); + } +} + +/// Mock HTTP socket for testing `run_http_request()`. +/// Implements Read and Write. +/// On Read, returns a given pre-set reply. +/// Buffers all written data on Write +struct MockHTTPSocket { + request: Vec, + reply: Vec, + ptr: usize, +} + +impl MockHTTPSocket { + fn new(reply: String) -> MockHTTPSocket { + MockHTTPSocket { + request: vec![], + reply: reply.as_bytes().to_vec(), + ptr: 0, + } + } +} + +impl Read for MockHTTPSocket { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let mut nr = 0; + while nr < buf.len() && self.ptr < self.reply.len() { + buf[nr] = self.reply[self.ptr]; + self.ptr += 1; + nr += 1; + } + Ok(nr) + } +} + +impl Write for MockHTTPSocket { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.request.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +#[test] +fn test_run_http_request_with_body() { + let tests = vec![ + ("GET", "/test-no-content-type-and-no-body", None, vec![]), + ( + "GET", + "/test-content-type-and-no-body", + Some("application/octet-stream"), + vec![], + ), + ( + "GET", + "/test-no-content-type-and-body", + None, + "hello world".as_bytes().to_vec(), + ), + ( + "GET", + "/test-content-type-and-body", + Some("application/octet-stream"), + "hello world".as_bytes().to_vec(), + ), + ]; + + for (verb, path, content_type, payload) in tests.into_iter() { + // test with chunking + let mut state = HttpChunkedTransferWriterState::new(5); + let mut buf = vec![]; + let mut body_bytes = "this is the song that never ends".as_bytes().to_vec(); + let mut fd = HttpChunkedTransferWriter::from_writer_state(&mut buf, &mut state); + fd.write_all(&mut body_bytes).unwrap(); + fd.flush().unwrap(); + + let mut msock_chunked = MockHTTPSocket::new(format!("HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n{}", str::from_utf8(&buf).unwrap())); + + // test without chunking + let mut msock_plain = MockHTTPSocket::new(format!( + "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\n\r\n{}", + str::from_utf8(&body_bytes).unwrap() + )); + + let result_chunked = run_http_request( + &mut msock_chunked, + &"127.0.0.1:20443".parse().unwrap(), + verb, + path, + content_type, + &payload, + ) + .unwrap(); + assert_eq!(result_chunked, body_bytes); + + let result_plain = run_http_request( + &mut msock_plain, + &"127.0.0.1:20443".parse().unwrap(), + verb, + path, + content_type, + &payload, + ) + .unwrap(); + assert_eq!(result_plain, body_bytes); + } +} + +#[test] +fn test_run_http_request_no_body() { + let tests = vec![ + ("GET", "/test-no-content-type-and-no-body", None, vec![]), + ( + "GET", + "/test-content-type-and-no-body", + Some("application/octet-stream"), + vec![], + ), + ( + "GET", + "/test-no-content-type-and-body", + None, + "hello world".as_bytes().to_vec(), + ), + ( + "GET", + "/test-content-type-and-body", + Some("application/octet-stream"), + "hello world".as_bytes().to_vec(), + ), + ]; + + for (verb, path, content_type, payload) in tests.into_iter() { + // test with chunking + let mut msock_chunked = MockHTTPSocket::new("HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n".to_string()); + + // test without chunking + let mut msock_plain = MockHTTPSocket::new( + "HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\n\r\n".to_string(), + ); + + let result_chunked = run_http_request( + &mut msock_chunked, + &"127.0.0.1:20443".parse().unwrap(), + verb, + path, + content_type, + &payload, + ) + .unwrap(); + let result_plain = run_http_request( + &mut msock_plain, + &"127.0.0.1:20443".parse().unwrap(), + verb, + path, + content_type, + &payload, + ) + .unwrap(); + + assert_eq!(result_chunked.len(), 0); + assert_eq!(result_plain.len(), 0); + } +} diff --git a/libsigner/src/tests/mod.rs b/libsigner/src/tests/mod.rs new file mode 100644 index 000000000..78c0b4d4e --- /dev/null +++ b/libsigner/src/tests/mod.rs @@ -0,0 +1,142 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +mod http; + +use std::io::Write; +use std::mem; +use std::net::{SocketAddr, TcpStream, ToSocketAddrs}; +use std::thread; +use std::time::Duration; + +use serde_json; + +use crate::{Signer, SignerRunLoop, StackerDBChunksEvent, StackerDBEventReceiver}; + +use clarity::vm::types::QualifiedContractIdentifier; + +use stacks_common::util::secp256k1::Secp256k1PrivateKey; +use stacks_common::util::sleep_ms; + +use libstackerdb::StackerDBChunkData; + +/// Simple runloop implementation. It receives `max_events` events and returns `events` from the +/// last call to `run_one_pass` as its final state. +struct SimpleRunLoop { + poll_timeout: Duration, + events: Vec, + max_events: usize, +} + +impl SimpleRunLoop { + pub fn new(max_events: usize) -> SimpleRunLoop { + SimpleRunLoop { + poll_timeout: Duration::from_millis(100), + events: vec![], + max_events, + } + } +} + +impl SignerRunLoop> for SimpleRunLoop { + fn set_event_timeout(&mut self, timeout: Duration) { + self.poll_timeout = timeout; + } + + fn get_event_timeout(&self) -> Duration { + self.poll_timeout.clone() + } + + fn run_one_pass( + &mut self, + event: Option, + ) -> Option> { + debug!("Got event: {:?}", &event); + if let Some(event) = event { + self.events.push(event); + } + + if self.events.len() >= self.max_events { + return Some(mem::replace(&mut self.events, vec![])); + } else { + return None; + } + } +} + +/// Set up a simple event listener thread and signer runloop thread, and verify that a mocked node +/// can feed the event listener events, which in turn get fed into the signer runloop for +/// processing. Verify that the event stop signaler can be used to terminate both the event loop +/// and the signer runloop. +#[test] +fn test_simple_signer() { + let ev = StackerDBEventReceiver::new(vec![QualifiedContractIdentifier::parse( + "ST2DS4MSWSGJ3W9FBC6BVT0Y92S345HY8N3T6AV7R.hello-world", + ) + .unwrap()]); + let mut signer = Signer::new(SimpleRunLoop::new(5), ev); + let endpoint: SocketAddr = "127.0.0.1:30000".parse().unwrap(); + let thread_endpoint = endpoint.clone(); + + let mut chunks = vec![]; + for i in 0..5 { + let privk = Secp256k1PrivateKey::new(); + let mut chunk = StackerDBChunkData::new(i as u32, 1, "hello world".as_bytes().to_vec()); + chunk.sign(&privk).unwrap(); + + let chunk_event = StackerDBChunksEvent { + contract_id: QualifiedContractIdentifier::parse( + "ST2DS4MSWSGJ3W9FBC6BVT0Y92S345HY8N3T6AV7R.hello-world", + ) + .unwrap(), + modified_slots: vec![chunk], + }; + chunks.push(chunk_event); + } + + let thread_chunks = chunks.clone(); + + // simulate a node that's trying to push data + let mock_stacks_node = thread::spawn(move || { + let mut num_sent = 0; + while num_sent < thread_chunks.len() { + let mut sock = match TcpStream::connect(&thread_endpoint) { + Ok(sock) => sock, + Err(..) => { + sleep_ms(100); + continue; + } + }; + + let body = serde_json::to_string(&thread_chunks[num_sent]).unwrap(); + let req = format!("POST /stackerdb_chunks HTTP/1.0\r\nConnection: close\r\nContent-Length: {}\r\n\r\n{}", &body.len(), body); + debug!("Send:\n{}", &req); + + sock.write_all(req.as_bytes()).unwrap(); + sock.flush().unwrap(); + + num_sent += 1; + } + }); + + let running_signer = signer.spawn(endpoint).unwrap(); + sleep_ms(5000); + let accepted_events = running_signer.stop().unwrap(); + + // runloop got the event that the mocked stacks node sent + assert_eq!(accepted_events, chunks); + mock_stacks_node.join().unwrap(); +}