feat: new libsigner implementation. Stacker signers and sBTC signers would use this crate to implement a Stacks event receiver and a runloop.

This commit is contained in:
Jude Nelson
2023-08-28 11:05:34 -04:00
parent ea67dc0fad
commit 4e306ce17c
9 changed files with 1682 additions and 0 deletions

46
libsigner/Cargo.toml Normal file
View File

@@ -0,0 +1,46 @@
[package]
name = "libsigner"
version = "0.0.1"
authors = [ "Jude Nelson <jude@stacks.org>" ]
license = "GPLv3"
homepage = "https://github.com/blockstack/stacks-blockchain"
repository = "https://github.com/blockstack/stacks-blockchain"
description = "Library for Stacks signer daemons"
keywords = [ "stacks", "stx", "bitcoin", "crypto", "blockstack", "decentralized", "dapps", "blockchain" ]
readme = "README.md"
resolver = "2"
edition = "2021"
[lib]
name = "libsigner"
path = "./src/libsigner.rs"
[[bin]]
name = "stackschat"
path = "demos/stackschat/main.rs"
[dependencies]
serde = "1"
serde_derive = "1"
serde_stacker = "0.1"
stacks-common = { path = "../stacks-common" }
clarity = { path = "../clarity" }
libstackerdb = { path = "../libstackerdb" }
slog = { version = "2.5.2", features = [ "max_level_trace" ] }
slog-term = "2.6.0"
slog-json = { version = "2.3.0", optional = true }
libc = "0.2"
[dependencies.serde_json]
version = "1.0"
features = ["arbitrary_precision", "unbounded_depth"]
[dependencies.secp256k1]
version = "0.24.3"
features = ["serde", "recovery"]
[target.'cfg(all(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64"), not(target_env = "msvc")))'.dependencies]
sha2 = { version = "0.10", features = ["asm"] }
[target.'cfg(any(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")), target_env = "msvc"))'.dependencies]
sha2 = { version = "0.10" }

131
libsigner/src/error.rs Normal file
View File

@@ -0,0 +1,131 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::error;
use std::fmt;
use std::io;
/// Errors originating from doing an RPC request to the Stacks node
#[derive(Debug)]
pub enum RPCError {
IO(io::Error),
Deserialize(String),
NotConnected,
MalformedRequest(String),
MalformedResponse(String),
HttpError(u32),
}
impl fmt::Display for RPCError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
RPCError::IO(ref s) => fmt::Display::fmt(s, f),
RPCError::Deserialize(ref s) => fmt::Display::fmt(s, f),
RPCError::HttpError(ref s) => {
write!(f, "HTTP code {}", s)
}
RPCError::MalformedRequest(ref s) => {
write!(f, "Malformed request: {}", s)
}
RPCError::MalformedResponse(ref s) => {
write!(f, "Malformed response: {}", s)
}
RPCError::NotConnected => {
write!(f, "Not connected")
}
}
}
}
impl error::Error for RPCError {
fn cause(&self) -> Option<&dyn error::Error> {
match *self {
RPCError::IO(ref s) => Some(s),
RPCError::Deserialize(..) => None,
RPCError::HttpError(..) => None,
RPCError::MalformedRequest(..) => None,
RPCError::MalformedResponse(..) => None,
RPCError::NotConnected => None,
}
}
}
impl From<io::Error> for RPCError {
fn from(e: io::Error) -> RPCError {
RPCError::IO(e)
}
}
/// Errors originating from receiving event data from the Stacks node
#[derive(Debug)]
pub enum EventError {
IO(io::Error),
Deserialize(String),
MalformedRequest(String),
NotBound,
Terminated,
AlreadyRunning,
FailedToStart,
UnrecognizedEvent(String),
}
impl fmt::Display for EventError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
EventError::IO(ref s) => fmt::Display::fmt(s, f),
EventError::Deserialize(ref s) => fmt::Display::fmt(s, f),
EventError::MalformedRequest(ref s) => {
write!(f, "Malformed request: {}", s)
}
EventError::NotBound => {
write!(f, "Not bound to a port yet")
}
EventError::Terminated => {
write!(f, "Listener is terminated")
}
EventError::AlreadyRunning => {
write!(f, "Thread already running")
}
EventError::FailedToStart => {
write!(f, "Failed to start thread")
}
EventError::UnrecognizedEvent(ref ev) => {
write!(f, "Unrecognized event '{}'", &ev)
}
}
}
}
impl error::Error for EventError {
fn cause(&self) -> Option<&dyn error::Error> {
match *self {
EventError::IO(ref s) => Some(s),
EventError::Deserialize(..) => None,
EventError::MalformedRequest(..) => None,
EventError::NotBound => None,
EventError::Terminated => None,
EventError::AlreadyRunning => None,
EventError::FailedToStart => None,
EventError::UnrecognizedEvent(..) => None,
}
}
}
impl From<io::Error> for EventError {
fn from(e: io::Error) -> EventError {
EventError::IO(e)
}
}

268
libsigner/src/events.rs Normal file
View File

@@ -0,0 +1,268 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::sync::mpsc::Sender;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::net::SocketAddr;
use std::net::TcpListener;
use std::net::TcpStream;
use std::io::Read;
use clarity::vm::types::QualifiedContractIdentifier;
use libstackerdb::StackerDBChunkData;
use serde::{Deserialize, Serialize};
use crate::http::{decode_http_body, decode_http_request};
use crate::EventError;
/// Event structure for newly-arrived StackerDB data
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StackerDBChunksEvent {
pub contract_id: QualifiedContractIdentifier,
pub modified_slots: Vec<StackerDBChunkData>,
}
/// Trait to implement a stop-signaler for the event receiver thread.
/// The caller calls `send()` and the event receiver loop (which lives in a separate thread) will
/// terminate.
pub trait EventStopSignaler {
fn send(&mut self);
}
/// Trait to implement to handle StackerDB events sent by the Stacks node
pub trait EventReceiver {
/// The implementation of ST will ensure that a call to ST::send() will cause
/// the call to `is_stopped()` below to return true.
type ST: EventStopSignaler + Send + Sync;
/// Open a server socket to the given socket address.
fn bind(&mut self, listener: SocketAddr) -> Result<SocketAddr, EventError>;
/// Return the next event
fn next_event(&mut self) -> Result<StackerDBChunksEvent, EventError>;
/// Add a downstream event consumer
fn add_consumer(&mut self, event_out: Sender<StackerDBChunksEvent>);
/// Forward the event to downstream consumers
fn forward_event(&mut self, ev: StackerDBChunksEvent) -> bool;
/// Determine if the receiver should hang up
fn is_stopped(&self) -> bool;
/// Get a stop signal instance that, when sent, will cause this receiver to stop accepting new
/// events. Called after `bind()`.
fn get_stop_signaler(&mut self) -> Result<Self::ST, EventError>;
/// Main loop for the receiver.
/// Typically, this is started in a separate thread.
fn main_loop(&mut self) {
loop {
if self.is_stopped() {
info!("Event receiver stopped");
break;
}
let next_event = match self.next_event() {
Ok(event) => event,
Err(EventError::UnrecognizedEvent(..)) => {
// got an event that we don't care about (not a problem)
continue;
}
Err(EventError::Terminated) => {
// we're done
info!("Caught termination signal");
break;
}
Err(e) => {
warn!("Failed to receive next event: {:?}", &e);
continue;
}
};
if !self.forward_event(next_event) {
info!("Failed to forward event");
break;
}
}
info!("Event receiver main loop exit");
}
}
pub struct StackerDBEventReceiver {
/// contracts we're listening for
pub stackerdb_contract_ids: Vec<QualifiedContractIdentifier>,
/// Address we bind to
local_addr: Option<SocketAddr>,
/// server socket that listens for HTTP POSTs from the node
sock: Option<TcpListener>,
/// channel into which to write newly-discovered data
out_channels: Vec<Sender<StackerDBChunksEvent>>,
/// inter-thread stop variable -- if set to true, then the `main_loop` will exit
stop_signal: Arc<AtomicBool>,
}
impl StackerDBEventReceiver {
/// Make a new StackerDB event receiver, and return both the receiver and the read end of a
/// channel into which node-received data can be obtained.
pub fn new(contract_ids: Vec<QualifiedContractIdentifier>) -> StackerDBEventReceiver {
let stackerdb_receiver = StackerDBEventReceiver {
stackerdb_contract_ids: contract_ids,
sock: None,
local_addr: None,
out_channels: vec![],
stop_signal: Arc::new(AtomicBool::new(false)),
};
stackerdb_receiver
}
/// Do something with the socket
pub fn with_socket<F, R>(&mut self, todo: F) -> Result<R, EventError>
where
F: FnOnce(&mut StackerDBEventReceiver, &mut TcpListener) -> R,
{
let mut sock = if let Some(s) = self.sock.take() {
s
} else {
return Err(EventError::NotBound);
};
let res = todo(self, &mut sock);
self.sock = Some(sock);
Ok(res)
}
}
/// Stop signaler implementation
pub struct StackerDBStopSignaler {
stop_signal: Arc<AtomicBool>,
local_addr: SocketAddr,
}
impl StackerDBStopSignaler {
pub fn new(sig: Arc<AtomicBool>, local_addr: SocketAddr) -> StackerDBStopSignaler {
StackerDBStopSignaler {
stop_signal: sig,
local_addr,
}
}
}
impl EventStopSignaler for StackerDBStopSignaler {
fn send(&mut self) {
self.stop_signal.store(true, Ordering::SeqCst);
// wake up the thread so the atomicbool can be checked
let _ = TcpStream::connect(&self.local_addr);
}
}
impl EventReceiver for StackerDBEventReceiver {
type ST = StackerDBStopSignaler;
/// Start listening on the given socket address.
/// Returns the address that was bound.
/// Errors out if bind(2) fails
fn bind(&mut self, listener: SocketAddr) -> Result<SocketAddr, EventError> {
let srv = TcpListener::bind(listener)?;
let bound_addr = srv.local_addr()?;
self.sock = Some(srv);
self.local_addr = Some(bound_addr.clone());
Ok(bound_addr)
}
/// Wait for the node to post something, and then return it.
/// Errors are recoverable -- the caller should call this method again even if it returns an
/// error.
fn next_event(&mut self) -> Result<StackerDBChunksEvent, EventError> {
self.with_socket(|event_receiver, server_sock| {
let (mut node_sock, _) = server_sock.accept()?;
// were we asked to terminate?
if event_receiver.is_stopped() {
return Err(EventError::Terminated);
}
let mut buf = vec![];
node_sock.read_to_end(&mut buf)?;
let (verb, path, headers, body_offset) = decode_http_request(&buf)?;
if verb != "POST" {
return Err(EventError::MalformedRequest(format!(
"Unrecognized verb '{}'",
&verb
)));
}
if path != "/stackerdb_chunks" {
return Err(EventError::UnrecognizedEvent(path));
}
let body = decode_http_body(&headers, &buf[body_offset..])?;
let event: StackerDBChunksEvent = serde_json::from_slice(&body).map_err(|e| {
EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e))
})?;
Ok(event)
})?
}
/// Determine if the receiver is hung up
fn is_stopped(&self) -> bool {
self.stop_signal.load(Ordering::SeqCst)
}
/// Forward an event
/// Return true on success; false on error.
/// Returning false terminates the event receiver.
fn forward_event(&mut self, ev: StackerDBChunksEvent) -> bool {
if self.out_channels.len() == 0 {
// nothing to do
error!("No channels connected to event receiver");
return false;
} else if self.out_channels.len() == 1 {
// avoid a clone
if let Err(e) = self.out_channels[0].send(ev) {
error!("Failed to send to signer runloop: {:?}", &e);
return false;
}
return true;
} else {
for (i, out_channel) in self.out_channels.iter().enumerate() {
if let Err(e) = out_channel.send(ev.clone()) {
error!("Failed to send to signer runloop #{}: {:?}", i, &e);
return false;
}
}
return true;
}
}
/// Add an event consumer. A received event will be forwarded to this Sender.
fn add_consumer(&mut self, out_channel: Sender<StackerDBChunksEvent>) {
self.out_channels.push(out_channel);
}
/// Get a stopped signaler. The caller can then use it to terminate the event receiver loop,
/// even if it's in a different thread.
fn get_stop_signaler(&mut self) -> Result<StackerDBStopSignaler, EventError> {
if let Some(local_addr) = self.local_addr.as_ref() {
Ok(StackerDBStopSignaler::new(
self.stop_signal.clone(),
local_addr.clone(),
))
} else {
Err(EventError::NotBound)
}
}
}

241
libsigner/src/http.rs Normal file
View File

@@ -0,0 +1,241 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::io;
use std::io::{Read, Write};
use std::net::SocketAddr;
use stacks_common::codec::MAX_MESSAGE_LEN;
use stacks_common::deps_common::httparse;
use stacks_common::util::chunked_encoding::*;
use crate::error::EventError;
use crate::error::RPCError;
pub const MAX_HTTP_HEADERS: usize = 32;
pub const MAX_HTTP_HEADER_LEN: usize = 4096;
/// Decode the HTTP request payload into its headers and body.
/// Returns (verb, path, table of headers, body_offset) on success
pub fn decode_http_request(
payload: &[u8],
) -> Result<(String, String, HashMap<String, String>, usize), EventError> {
// realistically, there won't be more than 32 headers
let mut headers_buf = [httparse::EMPTY_HEADER; MAX_HTTP_HEADERS];
let mut req = httparse::Request::new(&mut headers_buf);
let (verb, path, headers, body_offset) =
if let Ok(httparse::Status::Complete(body_offset)) = req.parse(payload) {
// version must be valid
match req
.version
.ok_or(EventError::MalformedRequest("No HTTP version".to_string()))?
{
0 => {}
1 => {}
_ => {
return Err(EventError::MalformedRequest(
"Invalid HTTP version".to_string(),
));
}
};
let verb = req
.method
.ok_or(EventError::MalformedRequest("No HTTP method".to_string()))?
.to_string();
let path = req
.path
.ok_or(EventError::MalformedRequest("No HTTP path".to_string()))?
.to_string();
let mut headers: HashMap<String, String> = HashMap::new();
for i in 0..req.headers.len() {
let value = String::from_utf8(req.headers[i].value.to_vec()).map_err(|_e| {
EventError::MalformedRequest("Invalid HTTP header value: not utf-8".to_string())
})?;
if !value.is_ascii() {
return Err(EventError::MalformedRequest(format!(
"Invalid HTTP request: header value is not ASCII-US"
)));
}
if value.len() > MAX_HTTP_HEADER_LEN {
return Err(EventError::MalformedRequest(format!(
"Invalid HTTP request: header value is too big"
)));
}
let key = req.headers[i].name.to_string().to_lowercase();
if headers.get(&key).is_some() {
return Err(EventError::MalformedRequest(format!(
"Invalid HTTP request: duplicate header \"{}\"",
key
)));
}
headers.insert(key, value);
}
(verb, path, headers, body_offset)
} else {
return Err(EventError::Deserialize(
"Failed to decode HTTP headers".to_string(),
));
};
Ok((verb, path, headers, body_offset))
}
/// Decode the HTTP response payload into its headers and body.
/// Return the offset into payload where the body starts, and a table of headers.
///
/// If the payload contains a status code other than 200, then RPCERror::HttpError(..) will be
/// returned with the status code.
/// If the payload is missing necessary data, then RPCError::MalformedResponse(..) will be
/// returned, with a human-readable reason string.
/// If the payload does not contain a full HTTP header list, then RPCError::Deserialize(..) will be
/// returned. This can happen if there are more than MAX_HTTP_HEADERS in the payload, for example.
pub fn decode_http_response(payload: &[u8]) -> Result<(HashMap<String, String>, usize), RPCError> {
// realistically, there won't be more than 32 headers
let mut headers_buf = [httparse::EMPTY_HEADER; MAX_HTTP_HEADERS];
let mut resp = httparse::Response::new(&mut headers_buf);
// consume respuest
let (headers, body_offset) =
if let Ok(httparse::Status::Complete(body_offset)) = resp.parse(payload) {
if let Some(code) = resp.code {
if code != 200 {
return Err(RPCError::HttpError(code.into()));
}
} else {
return Err(RPCError::MalformedResponse(
"No HTTP status code returned".to_string(),
));
}
if let Some(version) = resp.version {
if version != 0 && version != 1 {
return Err(RPCError::MalformedResponse(format!(
"Unrecognized HTTP code {}",
version
)));
}
} else {
return Err(RPCError::MalformedResponse(
"No HTTP version given".to_string(),
));
}
let mut headers: HashMap<String, String> = HashMap::new();
for i in 0..resp.headers.len() {
let value = String::from_utf8(resp.headers[i].value.to_vec()).map_err(|_e| {
RPCError::MalformedResponse("Invalid HTTP header value: not utf-8".to_string())
})?;
if !value.is_ascii() {
return Err(RPCError::MalformedResponse(
"Invalid HTTP response: header value is not ASCII-US".to_string(),
));
}
if value.len() > MAX_HTTP_HEADER_LEN {
return Err(RPCError::MalformedResponse(format!(
"Invalid HTTP response: header value is too big"
)));
}
let key = resp.headers[i].name.to_string().to_lowercase();
if headers.contains_key(&key) {
return Err(RPCError::MalformedResponse(format!(
"Invalid HTTP respuest: duplicate header \"{}\"",
key
)));
}
headers.insert(key, value);
}
(headers, body_offset)
} else {
return Err(RPCError::Deserialize(
"Failed to decode HTTP headers".to_string(),
));
};
Ok((headers, body_offset))
}
/// Decode an HTTP body, given the headers.
pub fn decode_http_body(headers: &HashMap<String, String>, mut buf: &[u8]) -> io::Result<Vec<u8>> {
let chunked = if let Some(val) = headers.get("transfer-encoding") {
val == "chunked"
} else {
false
};
let body = if chunked {
// chunked encoding
let ptr = &mut buf;
let mut fd = HttpChunkedTransferReader::from_reader(ptr, MAX_MESSAGE_LEN.into());
let mut decoded_body = vec![];
fd.read_to_end(&mut decoded_body)?;
decoded_body
} else {
// body is just as-is
buf.to_vec()
};
Ok(body)
}
/// Run an HTTP request, synchronously, through the given read/write handle
/// Return the HTTP reply, decoded if it was chunked
pub fn run_http_request<S: Read + Write>(
sock: &mut S,
host: &SocketAddr,
verb: &str,
path: &str,
content_type: Option<&str>,
payload: &[u8],
) -> Result<Vec<u8>, RPCError> {
let content_length_hdr = if payload.len() > 0 {
format!("Content-Length: {}\r\n", payload.len())
} else {
"".to_string()
};
let req_txt = if let Some(content_type) = content_type {
format!(
"{} {} HTTP/1.0\r\nHost: {}\r\nConnection: close\r\nContent-Type: {}\r\n{}User-Agent: libsigner/0.1\r\nAccept: */*\r\n\r\n",
verb, path, format!("{}", host), content_type, content_length_hdr
)
} else {
format!(
"{} {} HTTP/1.0\r\nHost: {}\r\nConnection: close\r\n{}User-Agent: libsigner/0.1\r\nAccept: */*\r\n\r\n",
verb, path, format!("{}", host), content_length_hdr
)
};
debug!("HTTP request\n{}", &req_txt);
sock.write_all(req_txt.as_bytes())?;
sock.write_all(payload)?;
let mut buf = vec![];
sock.read_to_end(&mut buf)?;
let (headers, body_offset) = decode_http_response(&buf)?;
if body_offset >= buf.len() {
// no body
debug!("No HTTP body");
return Ok(vec![]);
}
decode_http_body(&headers, &buf[body_offset..]).map_err(|e| e.into())
}

View File

@@ -0,0 +1,47 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#![allow(unused_imports)]
#![allow(dead_code)]
#[macro_use(o, slog_log, slog_trace, slog_debug, slog_info, slog_warn, slog_error)]
extern crate slog;
extern crate serde;
extern crate serde_json;
#[macro_use]
extern crate stacks_common;
extern crate clarity;
extern crate libc;
#[cfg(test)]
mod tests;
mod error;
mod events;
mod http;
mod runloop;
mod session;
pub use crate::session::{SignerSession, StackerDBSession};
pub use crate::error::{EventError, RPCError};
pub use crate::runloop::{RunningSigner, Signer, SignerRunLoop};
pub use crate::events::{
EventReceiver, EventStopSignaler, StackerDBChunksEvent, StackerDBEventReceiver,
StackerDBStopSignaler,
};

244
libsigner/src/runloop.rs Normal file
View File

@@ -0,0 +1,244 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#![allow(unused_imports)]
#![allow(dead_code)]
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use crate::events::{EventReceiver, EventStopSignaler, StackerDBChunksEvent};
use crate::error::EventError;
use stacks_common::deps_common::ctrlc as termination;
use stacks_common::deps_common::ctrlc::SignalId;
use libc;
/// Some libcs, like musl, have a very small stack size.
/// Make sure it's big enough.
const THREAD_STACK_SIZE: usize = 128 * 1024 * 1024; // 128 MB
/// stderr fileno
const STDERR: i32 = 2;
/// Trait describing the needful components of a top-level runloop.
/// This is where the signer business logic would go.
/// Implement this, and you get all the multithreaded setup for free.
pub trait SignerRunLoop<R> {
/// Hint to set how long to wait for new events
fn set_event_timeout(&mut self, timeout: Duration);
/// Getter for the event poll timeout
fn get_event_timeout(&self) -> Duration;
/// Run one pass of the event loop, given new StackerDB events discovered since the last pass.
/// Returns Some(R) if this is the final pass -- the runloop evaluated to R
/// Returns None to keep running.
fn run_one_pass(&mut self, events: Option<StackerDBChunksEvent>) -> Option<R>;
/// This is the main loop body for the signer. It continuously receives events from
/// `event_recv`, polling for up to `self.get_event_timeout()` units of time. Once it has
/// polled for events, they are fed into `run_one_pass()`. This continues until either
/// `run_one_pass()` returns `false`, or the event receiver hangs up. At this point, this
/// method calls the `event_stop_signaler.send()` to terminate the receiver.
///
/// This would run in a separate thread from the event receiver.
fn main_loop<EVST: EventStopSignaler>(
&mut self,
event_recv: Receiver<StackerDBChunksEvent>,
mut event_stop_signaler: EVST,
) -> Option<R> {
loop {
let poll_timeout = self.get_event_timeout();
let next_event_opt = match event_recv.recv_timeout(poll_timeout) {
Ok(event) => Some(event),
Err(RecvTimeoutError::Timeout) => None,
Err(RecvTimeoutError::Disconnected) => {
info!("Event receiver disconnected");
return None;
}
};
if let Some(final_state) = self.run_one_pass(next_event_opt) {
// finished!
info!("Runloop exit; signaling event-receiver to stop");
event_stop_signaler.send();
return Some(final_state);
}
}
}
}
/// The top-level signer implementation
pub struct Signer<R, SL: SignerRunLoop<R> + Send + Sync, EV: EventReceiver + Send> {
/// the runloop itself
signer_loop: Option<SL>,
/// the event receiver to use
event_receiver: Option<EV>,
/// marker to permit the R type
_phantom: PhantomData<R>,
}
/// The running signer implementation
pub struct RunningSigner<EV: EventReceiver, R> {
/// join handle for signer runloop
signer_join: JoinHandle<Option<R>>,
/// join handle for event receiver
event_join: JoinHandle<()>,
/// kill signal for the event receiver
stop_signal: EV::ST,
}
impl<EV: EventReceiver, R> RunningSigner<EV, R> {
/// Stop the signer, and get the final state
pub fn stop(mut self) -> Option<R> {
// kill event receiver
self.stop_signal.send();
debug!("Try join event loop...");
// wait for event receiver join
let _ = self.event_join.join().map_err(|thread_panic| {
error!("Event thread panicked with: '{:?}'", &thread_panic);
thread_panic
});
info!("Event receiver thread joined");
// wait for runloop to join
debug!("Try join signer loop...");
let result_opt = self
.signer_join
.join()
.map_err(|thread_panic| {
error!("Event thread panicked with: '{:?}'", &thread_panic);
thread_panic
})
.unwrap_or(None);
info!("Signer thread joined");
result_opt
}
}
/// Write to stderr in an async-safe manner.
/// See signal-safety(7)
#[warn(unused)]
fn async_safe_write_stderr(msg: &str) {
#[cfg(windows)]
unsafe {
// write(2) inexplicably has a different ABI only on Windows.
libc::write(
STDERR,
msg.as_ptr() as *const libc::c_void,
msg.len() as u32,
);
}
#[cfg(not(windows))]
unsafe {
libc::write(STDERR, msg.as_ptr() as *const libc::c_void, msg.len());
}
}
/// This is a termination handler for handling receipt of a terminating UNIX signal, like SIGINT,
/// SIGQUIT, SIGTERM, or SIGBUS. You'd call this as part of the startup code for the signer daemon.
/// DO NOT call this from within the library!
pub fn set_runloop_signal_handler<ST: EventStopSignaler + Send + 'static>(mut stop_signaler: ST) {
termination::set_handler(move |sig_id| match sig_id {
SignalId::Bus => {
let msg = "Caught SIGBUS; crashing immediately and dumping core\n";
async_safe_write_stderr(msg);
unsafe {
libc::abort();
}
}
_ => {
let msg = format!("Graceful termination request received (signal `{}`), will complete the ongoing runloop cycles and terminate\n", sig_id);
async_safe_write_stderr(&msg);
stop_signaler.send();
}
}).expect("FATAL: failed to set signal handler");
}
impl<
R: Send + 'static,
SL: SignerRunLoop<R> + Send + Sync + 'static,
EV: EventReceiver + Send + 'static,
> Signer<R, SL, EV>
{
pub fn new(runloop: SL, event_receiver: EV) -> Signer<R, SL, EV> {
Signer {
signer_loop: Some(runloop),
event_receiver: Some(event_receiver),
_phantom: PhantomData,
}
}
/// This is a helper function to spawn both the runloop and event receiver in their own
/// threads. Advanced signers may not need this method, and instead opt to run the receiver
/// and runloop directly. However, this method is present to help signer developers to get
/// their implementations off the ground.
///
/// The given `bind_addr` is the server address this event receiver needs to listen on, so the
/// stacks node can POST events to it.
///
/// On success, this method consumes the Signer and returns a RunningSigner with the relevant
/// inter-thread communication primitives for the caller to shut down the system.
pub fn spawn(&mut self, bind_addr: SocketAddr) -> Result<RunningSigner<EV, R>, EventError> {
let mut event_receiver = self
.event_receiver
.take()
.ok_or(EventError::AlreadyRunning)?;
let mut signer_loop = self.signer_loop.take().ok_or(EventError::AlreadyRunning)?;
let (event_send, event_recv) = channel();
event_receiver.add_consumer(event_send);
event_receiver.bind(bind_addr)?;
let stop_signaler = event_receiver.get_stop_signaler()?;
let mut ret_stop_signaler = event_receiver.get_stop_signaler()?;
// start a thread for the event receiver
let event_thread = thread::Builder::new()
.name("event_receiver".to_string())
.stack_size(THREAD_STACK_SIZE)
.spawn(move || event_receiver.main_loop())
.map_err(|e| {
error!("EventReceiver failed to start: {:?}", &e);
EventError::FailedToStart
})?;
// start receiving events and doing stuff with them
let runloop_thread = thread::Builder::new()
.name("signer_runloop".to_string())
.stack_size(THREAD_STACK_SIZE)
.spawn(move || signer_loop.main_loop(event_recv, stop_signaler))
.map_err(|e| {
error!("SignerRunLoop failed to start: {:?}", &e);
ret_stop_signaler.send();
EventError::FailedToStart
})?;
let running_signer = RunningSigner {
signer_join: runloop_thread,
event_join: event_thread,
stop_signal: ret_stop_signaler,
};
Ok(running_signer)
}
}

216
libsigner/src/session.rs Normal file
View File

@@ -0,0 +1,216 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::net::SocketAddr;
use std::net::TcpStream;
use std::str;
use libstackerdb::{
stackerdb_get_chunk_path, stackerdb_get_metadata_path, stackerdb_post_chunk_path, SlotMetadata,
StackerDBChunkAckData, StackerDBChunkData,
};
use clarity::vm::types::QualifiedContractIdentifier;
use crate::error::RPCError;
use crate::http::run_http_request;
use serde_json;
pub trait SignerSession {
fn connect(
&mut self,
host: SocketAddr,
stackerdb_contract_id: QualifiedContractIdentifier,
) -> Result<(), RPCError>;
fn list_chunks(&mut self) -> Result<Vec<SlotMetadata>, RPCError>;
fn get_chunks(
&mut self,
slots_and_versions: &[(u32, u32)],
) -> Result<Vec<Option<Vec<u8>>>, RPCError>;
fn get_latest_chunks(&mut self, slot_ids: &[u32]) -> Result<Vec<Option<Vec<u8>>>, RPCError>;
fn put_chunk(&mut self, chunk: StackerDBChunkData) -> Result<StackerDBChunkAckData, RPCError>;
/// Get a single chunk with the given version
/// Returns Ok(Some(..)) if the chunk exists
/// Returns Ok(None) if the chunk with the given version does not exist
/// Returns Err(..) on transport error
fn get_chunk(&mut self, slot_id: u32, version: u32) -> Result<Option<Vec<u8>>, RPCError> {
Ok(self.get_chunks(&[(slot_id, version)])?[0].clone())
}
/// Get a single latest chunk.
/// Returns Ok(Some(..)) if the slot exists
/// Returns Ok(None) if not
/// Returns Err(..) on transport error
fn get_latest_chunk(&mut self, slot_id: u32) -> Result<Option<Vec<u8>>, RPCError> {
Ok(self.get_latest_chunks(&[(slot_id)])?[0].clone())
}
}
/// signer session for a stackerdb instance
pub struct StackerDBSession {
/// host we're talking to
pub host: SocketAddr,
/// contract we're talking to
pub stackerdb_contract_id: QualifiedContractIdentifier,
/// connection to the replica
sock: Option<TcpStream>,
}
impl StackerDBSession {
/// instantiate but don't connect
pub fn new(
host: SocketAddr,
stackerdb_contract_id: QualifiedContractIdentifier,
) -> StackerDBSession {
StackerDBSession {
host,
stackerdb_contract_id,
sock: None,
}
}
/// connect or reconnect to the node
fn connect_or_reconnect(&mut self) -> Result<(), RPCError> {
debug!("connect to {}", &self.host);
self.sock = Some(TcpStream::connect(self.host.clone())?);
Ok(())
}
/// Do something with the connected socket
fn with_socket<F, R>(&mut self, todo: F) -> Result<R, RPCError>
where
F: FnOnce(&mut StackerDBSession, &mut TcpStream) -> R,
{
if self.sock.is_none() {
self.connect_or_reconnect()?;
}
let mut sock = if let Some(s) = self.sock.take() {
s
} else {
return Err(RPCError::NotConnected);
};
let res = todo(self, &mut sock);
self.sock = Some(sock);
Ok(res)
}
/// send an HTTP RPC request and receive a reply.
/// Return the HTTP reply, decoded if it was chunked
fn rpc_request(
&mut self,
verb: &str,
path: &str,
content_type: Option<&str>,
payload: &[u8],
) -> Result<Vec<u8>, RPCError> {
self.with_socket(|session, sock| {
run_http_request(sock, &session.host, verb, path, content_type, payload)
})?
}
}
impl SignerSession for StackerDBSession {
/// connect to the replica
fn connect(
&mut self,
host: SocketAddr,
stackerdb_contract_id: QualifiedContractIdentifier,
) -> Result<(), RPCError> {
self.host = host;
self.stackerdb_contract_id = stackerdb_contract_id;
self.connect_or_reconnect()
}
/// query the replica for a list of chunks
fn list_chunks(&mut self) -> Result<Vec<SlotMetadata>, RPCError> {
let bytes = self.rpc_request(
"GET",
&stackerdb_get_metadata_path(self.stackerdb_contract_id.clone()),
None,
&[],
)?;
let metadata: Vec<SlotMetadata> = serde_json::from_slice(&bytes)
.map_err(|e| RPCError::Deserialize(format!("{:?}", &e)))?;
Ok(metadata)
}
/// query the replica for zero or more chunks
fn get_chunks(
&mut self,
slots_and_versions: &[(u32, u32)],
) -> Result<Vec<Option<Vec<u8>>>, RPCError> {
let mut payloads = vec![];
for (slot_id, slot_version) in slots_and_versions.iter() {
let path = stackerdb_get_chunk_path(
self.stackerdb_contract_id.clone(),
*slot_id,
Some(*slot_version),
);
let chunk = match self.rpc_request("GET", &path, None, &[]) {
Ok(body_bytes) => Some(body_bytes),
Err(RPCError::HttpError(code)) => {
if code != 404 {
return Err(RPCError::HttpError(code));
}
None
}
Err(e) => {
return Err(e);
}
};
payloads.push(chunk);
}
Ok(payloads)
}
/// query the replica for zero or more latest chunks
fn get_latest_chunks(&mut self, slot_ids: &[u32]) -> Result<Vec<Option<Vec<u8>>>, RPCError> {
let mut payloads = vec![];
for slot_id in slot_ids.iter() {
let path = stackerdb_get_chunk_path(self.stackerdb_contract_id.clone(), *slot_id, None);
let chunk = match self.rpc_request("GET", &path, None, &[]) {
Ok(body_bytes) => Some(body_bytes),
Err(RPCError::HttpError(code)) => {
if code != 404 {
return Err(RPCError::HttpError(code));
}
None
}
Err(e) => {
return Err(e);
}
};
payloads.push(chunk);
}
Ok(payloads)
}
/// upload a chunk
fn put_chunk(&mut self, chunk: StackerDBChunkData) -> Result<StackerDBChunkAckData, RPCError> {
let body =
serde_json::to_vec(&chunk).map_err(|e| RPCError::Deserialize(format!("{:?}", &e)))?;
let path = stackerdb_post_chunk_path(self.stackerdb_contract_id.clone());
let resp_bytes = self.rpc_request("POST", &path, Some("application/json"), &body)?;
let ack: StackerDBChunkAckData = serde_json::from_slice(&resp_bytes)
.map_err(|e| RPCError::Deserialize(format!("{:?}", &e)))?;
Ok(ack)
}
}

347
libsigner/src/tests/http.rs Normal file
View File

@@ -0,0 +1,347 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::io;
use std::io::Read;
use std::io::Write;
use std::str;
use crate::error::EventError;
use crate::error::RPCError;
use crate::http::{decode_http_body, decode_http_request, decode_http_response, run_http_request};
use stacks_common::util::chunked_encoding::*;
#[test]
fn test_decode_http_request_ok() {
let tests = vec![
("GET /foo HTTP/1.1\r\nHost: localhost:6270\r\n\r\n",
("GET", "/foo", vec![("host", "localhost:6270")])),
("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nFoo: Bar\r\n\r\n",
("POST", "asdf", vec![("host", "core.blockstack.org"), ("foo", "Bar")])),
("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nFoo: Bar\r\n\r\n",
("POST", "asdf", vec![("host", "core.blockstack.org"), ("foo", "Bar")])),
("GET /foo HTTP/1.1\r\nConnection: close\r\nHost: localhost:6270\r\n\r\n",
("GET", "/foo", vec![("connection", "close"), ("host", "localhost:6270")])),
("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nConnection: close\r\nFoo: Bar\r\n\r\n",
("POST", "asdf", vec![("host", "core.blockstack.org"), ("connection", "close"), ("foo", "Bar")])),
("POST asdf HTTP/1.1\r\nHost: core.blockstack.org\r\nFoo: Bar\r\nConnection: close\r\n\r\n",
("POST", "asdf", vec![("host", "core.blockstack.org"), ("foo", "Bar"), ("connection", "close")])),
("GET /foo HTTP/1.1\r\nhOsT: localhost:6270\r\n\r\n",
("GET", "/foo", vec![("host", "localhost:6270")])),
("GET /foo HTTP/1.1\r\ncOnNeCtIoN: cLoSe\r\nhOsT: localhost:6270\r\n\r\n",
("GET", "/foo", vec![("connection", "cLoSe"), ("host", "localhost:6270")])),
("POST asdf HTTP/1.1\r\nhOsT: core.blockstack.org\r\nCOnNeCtIoN: kEeP-aLiVE\r\nFoo: Bar\r\n\r\n",
("POST", "asdf", vec![("host", "core.blockstack.org"), ("connection", "kEeP-aLiVE"), ("foo", "Bar")]))
];
for (data, (expected_verb, expected_path, headers_list)) in tests.iter() {
let mut expected_headers = HashMap::new();
for (key, val) in headers_list.iter() {
expected_headers.insert(key.to_string(), val.to_string());
}
let (verb, path, headers, _) = decode_http_request(data.as_bytes()).unwrap();
assert_eq!(verb, expected_verb.to_string());
assert_eq!(path, expected_path.to_string());
assert_eq!(headers, expected_headers);
}
}
#[test]
fn test_decode_http_request_err() {
let tests = vec![
(
"GET /foo HTTP/1.1\r\n",
EventError::Deserialize("".to_string()),
),
(
"GET /foo HTTP/\r\n\r\n",
EventError::Deserialize("".to_string()),
),
(
"GET /foo HTTP/1.1\r\nHost:",
EventError::Deserialize("".to_string()),
),
(
"GET /foo HTTP/1.1\r\nHost: foo:80\r\nHost: bar:80\r\n\r\n",
EventError::MalformedRequest("".to_string()),
),
(
"GET /foo HTTP/1.1\r\nHost: localhost:6270\r\nfoo: \u{2764}\r\n\r\n",
EventError::MalformedRequest("".to_string()),
),
];
for (data, expected_err_type) in tests.iter() {
let err = decode_http_request(data.as_bytes()).unwrap_err();
match (err, expected_err_type) {
(EventError::Deserialize(..), EventError::Deserialize(..)) => {}
(EventError::MalformedRequest(..), EventError::MalformedRequest(..)) => {}
(x, y) => {
error!("expected error mismatch: {:?} != {:?}", &y, &x);
panic!();
}
}
}
}
#[test]
fn test_decode_http_response_ok() {
let tests = vec![
("HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: 123\r\nX-Request-ID: 0\r\n\r\n",
vec![("content-type", "application/octet-stream"), ("content-length", "123"), ("x-request-id", "0")]),
("HTTP/1.1 200 Ok\r\nContent-Type: application/octet-stream\r\nTransfer-encoding: chunked\r\nX-Request-ID: 0\r\n\r\n",
vec![("content-type", "application/octet-stream"), ("transfer-encoding", "chunked"), ("x-request-id", "0")]),
("HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: 123\r\nConnection: close\r\nX-Request-ID: 0\r\n\r\n",
vec![("content-type", "application/octet-stream"), ("content-length", "123"), ("connection", "close"), ("x-request-id", "0")]),
("HTTP/1.1 200 Ok\r\nConnection: close\r\nContent-Type: application/octet-stream\r\nTransfer-encoding: chunked\r\nX-Request-ID: 0\r\n\r\n",
vec![("connection", "close"), ("content-type", "application/octet-stream"), ("transfer-encoding", "chunked"), ("x-request-id", "0")])
];
for (data, header_list) in tests.iter() {
let mut expected_headers = HashMap::new();
for (key, val) in header_list.iter() {
expected_headers.insert(key.to_string(), val.to_string());
}
let (headers, _) = decode_http_response(data.as_bytes()).unwrap();
assert_eq!(headers, expected_headers);
}
}
#[test]
fn test_decode_http_response_err() {
let tests = vec![
("HTTP/1.1 400 Bad Request\r\nContent-Type: application/json\r\nContent-Length: 456\r\nFoo: Bar\r\nX-Request-ID: 0\r\n\r\n",
RPCError::HttpError(400)),
("HTTP/1.1 200",
RPCError::Deserialize("".to_string())),
("HTTP/1.1 200 OK\r\nfoo: \u{2764}\r\n\r\n",
RPCError::MalformedResponse("".to_string())),
("HTTP/1.1 200 OK\r\nfoo: bar\r\nfoo: bar\r\n\r\n",
RPCError::MalformedResponse("".to_string())),
];
for (data, expected_err_type) in tests.iter() {
let err_type = decode_http_response(data.as_bytes()).unwrap_err();
match (err_type, expected_err_type) {
(RPCError::HttpError(x), RPCError::HttpError(y)) => assert_eq!(x, *y),
(RPCError::Deserialize(_), RPCError::Deserialize(_)) => {}
(RPCError::MalformedResponse(_), RPCError::MalformedResponse(_)) => {}
(x, y) => {
error!("expected error mismatch: {:?} != {:?}", &y, &x);
panic!();
}
}
}
}
#[test]
fn test_decode_http_body() {
let tests = vec![
(true, ""),
(true, "this is the song that never ends"),
(false, ""),
(false, "this is the song that never ends"),
];
for (chunked, expected_body) in tests.iter() {
let (headers, encoded_body) = if *chunked {
let mut hdrs = HashMap::new();
hdrs.insert("transfer-encoding".to_string(), "chunked".to_string());
let mut state = HttpChunkedTransferWriterState::new(5);
let mut buf = vec![];
let mut body_bytes = expected_body.as_bytes().to_vec();
let mut fd = HttpChunkedTransferWriter::from_writer_state(&mut buf, &mut state);
fd.write_all(&mut body_bytes).unwrap();
fd.flush().unwrap();
(hdrs, buf)
} else {
let hdrs = HashMap::new();
let body = expected_body.as_bytes().to_vec();
(hdrs, body)
};
let body = decode_http_body(&headers, &mut &encoded_body).unwrap();
assert_eq!(&body[..], expected_body.as_bytes());
}
}
/// Mock HTTP socket for testing `run_http_request()`.
/// Implements Read and Write.
/// On Read, returns a given pre-set reply.
/// Buffers all written data on Write
struct MockHTTPSocket {
request: Vec<u8>,
reply: Vec<u8>,
ptr: usize,
}
impl MockHTTPSocket {
fn new(reply: String) -> MockHTTPSocket {
MockHTTPSocket {
request: vec![],
reply: reply.as_bytes().to_vec(),
ptr: 0,
}
}
}
impl Read for MockHTTPSocket {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut nr = 0;
while nr < buf.len() && self.ptr < self.reply.len() {
buf[nr] = self.reply[self.ptr];
self.ptr += 1;
nr += 1;
}
Ok(nr)
}
}
impl Write for MockHTTPSocket {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.request.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
#[test]
fn test_run_http_request_with_body() {
let tests = vec![
("GET", "/test-no-content-type-and-no-body", None, vec![]),
(
"GET",
"/test-content-type-and-no-body",
Some("application/octet-stream"),
vec![],
),
(
"GET",
"/test-no-content-type-and-body",
None,
"hello world".as_bytes().to_vec(),
),
(
"GET",
"/test-content-type-and-body",
Some("application/octet-stream"),
"hello world".as_bytes().to_vec(),
),
];
for (verb, path, content_type, payload) in tests.into_iter() {
// test with chunking
let mut state = HttpChunkedTransferWriterState::new(5);
let mut buf = vec![];
let mut body_bytes = "this is the song that never ends".as_bytes().to_vec();
let mut fd = HttpChunkedTransferWriter::from_writer_state(&mut buf, &mut state);
fd.write_all(&mut body_bytes).unwrap();
fd.flush().unwrap();
let mut msock_chunked = MockHTTPSocket::new(format!("HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n{}", str::from_utf8(&buf).unwrap()));
// test without chunking
let mut msock_plain = MockHTTPSocket::new(format!(
"HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\n\r\n{}",
str::from_utf8(&body_bytes).unwrap()
));
let result_chunked = run_http_request(
&mut msock_chunked,
&"127.0.0.1:20443".parse().unwrap(),
verb,
path,
content_type,
&payload,
)
.unwrap();
assert_eq!(result_chunked, body_bytes);
let result_plain = run_http_request(
&mut msock_plain,
&"127.0.0.1:20443".parse().unwrap(),
verb,
path,
content_type,
&payload,
)
.unwrap();
assert_eq!(result_plain, body_bytes);
}
}
#[test]
fn test_run_http_request_no_body() {
let tests = vec![
("GET", "/test-no-content-type-and-no-body", None, vec![]),
(
"GET",
"/test-content-type-and-no-body",
Some("application/octet-stream"),
vec![],
),
(
"GET",
"/test-no-content-type-and-body",
None,
"hello world".as_bytes().to_vec(),
),
(
"GET",
"/test-content-type-and-body",
Some("application/octet-stream"),
"hello world".as_bytes().to_vec(),
),
];
for (verb, path, content_type, payload) in tests.into_iter() {
// test with chunking
let mut msock_chunked = MockHTTPSocket::new("HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n".to_string());
// test without chunking
let mut msock_plain = MockHTTPSocket::new(
"HTTP/1.1 200 OK\r\nConnection: close\r\nContent-type: text/plain\r\n\r\n".to_string(),
);
let result_chunked = run_http_request(
&mut msock_chunked,
&"127.0.0.1:20443".parse().unwrap(),
verb,
path,
content_type,
&payload,
)
.unwrap();
let result_plain = run_http_request(
&mut msock_plain,
&"127.0.0.1:20443".parse().unwrap(),
verb,
path,
content_type,
&payload,
)
.unwrap();
assert_eq!(result_chunked.len(), 0);
assert_eq!(result_plain.len(), 0);
}
}

142
libsigner/src/tests/mod.rs Normal file
View File

@@ -0,0 +1,142 @@
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
mod http;
use std::io::Write;
use std::mem;
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::thread;
use std::time::Duration;
use serde_json;
use crate::{Signer, SignerRunLoop, StackerDBChunksEvent, StackerDBEventReceiver};
use clarity::vm::types::QualifiedContractIdentifier;
use stacks_common::util::secp256k1::Secp256k1PrivateKey;
use stacks_common::util::sleep_ms;
use libstackerdb::StackerDBChunkData;
/// Simple runloop implementation. It receives `max_events` events and returns `events` from the
/// last call to `run_one_pass` as its final state.
struct SimpleRunLoop {
poll_timeout: Duration,
events: Vec<StackerDBChunksEvent>,
max_events: usize,
}
impl SimpleRunLoop {
pub fn new(max_events: usize) -> SimpleRunLoop {
SimpleRunLoop {
poll_timeout: Duration::from_millis(100),
events: vec![],
max_events,
}
}
}
impl SignerRunLoop<Vec<StackerDBChunksEvent>> for SimpleRunLoop {
fn set_event_timeout(&mut self, timeout: Duration) {
self.poll_timeout = timeout;
}
fn get_event_timeout(&self) -> Duration {
self.poll_timeout.clone()
}
fn run_one_pass(
&mut self,
event: Option<StackerDBChunksEvent>,
) -> Option<Vec<StackerDBChunksEvent>> {
debug!("Got event: {:?}", &event);
if let Some(event) = event {
self.events.push(event);
}
if self.events.len() >= self.max_events {
return Some(mem::replace(&mut self.events, vec![]));
} else {
return None;
}
}
}
/// Set up a simple event listener thread and signer runloop thread, and verify that a mocked node
/// can feed the event listener events, which in turn get fed into the signer runloop for
/// processing. Verify that the event stop signaler can be used to terminate both the event loop
/// and the signer runloop.
#[test]
fn test_simple_signer() {
let ev = StackerDBEventReceiver::new(vec![QualifiedContractIdentifier::parse(
"ST2DS4MSWSGJ3W9FBC6BVT0Y92S345HY8N3T6AV7R.hello-world",
)
.unwrap()]);
let mut signer = Signer::new(SimpleRunLoop::new(5), ev);
let endpoint: SocketAddr = "127.0.0.1:30000".parse().unwrap();
let thread_endpoint = endpoint.clone();
let mut chunks = vec![];
for i in 0..5 {
let privk = Secp256k1PrivateKey::new();
let mut chunk = StackerDBChunkData::new(i as u32, 1, "hello world".as_bytes().to_vec());
chunk.sign(&privk).unwrap();
let chunk_event = StackerDBChunksEvent {
contract_id: QualifiedContractIdentifier::parse(
"ST2DS4MSWSGJ3W9FBC6BVT0Y92S345HY8N3T6AV7R.hello-world",
)
.unwrap(),
modified_slots: vec![chunk],
};
chunks.push(chunk_event);
}
let thread_chunks = chunks.clone();
// simulate a node that's trying to push data
let mock_stacks_node = thread::spawn(move || {
let mut num_sent = 0;
while num_sent < thread_chunks.len() {
let mut sock = match TcpStream::connect(&thread_endpoint) {
Ok(sock) => sock,
Err(..) => {
sleep_ms(100);
continue;
}
};
let body = serde_json::to_string(&thread_chunks[num_sent]).unwrap();
let req = format!("POST /stackerdb_chunks HTTP/1.0\r\nConnection: close\r\nContent-Length: {}\r\n\r\n{}", &body.len(), body);
debug!("Send:\n{}", &req);
sock.write_all(req.as_bytes()).unwrap();
sock.flush().unwrap();
num_sent += 1;
}
});
let running_signer = signer.spawn(endpoint).unwrap();
sleep_ms(5000);
let accepted_events = running_signer.stop().unwrap();
// runloop got the event that the mocked stacks node sent
assert_eq!(accepted_events, chunks);
mock_stacks_node.join().unwrap();
}