feat: naka miner acts as signer set coordinator during block signing

* Replaces msg_id u32 with an enum for message identification
* Adds an additional slot for miner messages
* Adds a sync channel for listening to StackerDB events
* Adds a StackerDBs method for pushing a chunk locally and emitting event
* Uses a new message type to store DKG results, to be read by miners to instantiate coordinator
* Uses a test signing channel for nakamoto integration tests
* Currently builds with a branch of wsts
This commit is contained in:
Aaron Blankstein
2024-03-14 09:41:49 -05:00
parent 49eb61a705
commit 207cb690fe
30 changed files with 1904 additions and 625 deletions

View File

@@ -36,6 +36,7 @@ use stacks_common::codec::{
StacksMessageCodec,
};
pub use stacks_common::consts::SIGNER_SLOTS_PER_USER;
use stacks_common::types::chainstate::StacksPublicKey;
use stacks_common::util::hash::Sha512Trunc256Sum;
use tiny_http::{
Method as HttpMethod, Request as HttpRequest, Response as HttpResponse, Server as HttpServer,
@@ -64,8 +65,12 @@ pub struct BlockProposalSigners {
/// Event enum for newly-arrived signer subscribed events
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum SignerEvent {
/// The miner proposed blocks for signers to observe and sign
ProposedBlocks(Vec<BlockProposalSigners>),
/// The miner sent proposed blocks or messages for signers to observe and sign
ProposedBlocks(
Vec<BlockProposalSigners>,
Vec<SignerMessage>,
Option<StacksPublicKey>,
),
/// The signer messages for other signers and miners to observe
/// The u32 is the signer set to which the message belongs (either 0 or 1)
SignerMessages(u32, Vec<SignerMessage>),
@@ -255,7 +260,7 @@ impl EventReceiver for SignerEventReceiver {
/// Errors are recoverable -- the caller should call this method again even if it returns an
/// error.
fn next_event(&mut self) -> Result<SignerEvent, EventError> {
self.with_server(|event_receiver, http_server, is_mainnet| {
self.with_server(|event_receiver, http_server, _is_mainnet| {
// were we asked to terminate?
if event_receiver.is_stopped() {
return Err(EventError::Terminated);
@@ -278,21 +283,22 @@ impl EventReceiver for SignerEventReceiver {
)));
}
if request.url() == "/stackerdb_chunks" {
process_stackerdb_event(event_receiver.local_addr, request, is_mainnet)
process_stackerdb_event(event_receiver.local_addr, request)
.map_err(|e| {
error!("Error processing stackerdb_chunks message"; "err" => ?e);
e
})
} else if request.url() == "/proposal_response" {
process_proposal_response(request)
} else {
let url = request.url().to_string();
info!(
debug!(
"[{:?}] next_event got request with unexpected url {}, return OK so other side doesn't keep sending this",
event_receiver.local_addr,
request.url()
);
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
}
ack_dispatcher(request);
Err(EventError::UnrecognizedEvent(url))
}
})?
@@ -348,20 +354,22 @@ impl EventReceiver for SignerEventReceiver {
}
}
fn ack_dispatcher(request: HttpRequest) {
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
};
}
/// Process a stackerdb event from the node
fn process_stackerdb_event(
local_addr: Option<SocketAddr>,
mut request: HttpRequest,
is_mainnet: bool,
) -> Result<SignerEvent, EventError> {
debug!("Got stackerdb_chunks event");
let mut body = String::new();
if let Err(e) = request.as_reader().read_to_string(&mut body) {
error!("Failed to read body: {:?}", &e);
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
};
ack_dispatcher(request);
return Err(EventError::MalformedRequest(format!(
"Failed to read body: {:?}",
&e
@@ -371,47 +379,86 @@ fn process_stackerdb_event(
let event: StackerDBChunksEvent = serde_json::from_slice(body.as_bytes())
.map_err(|e| EventError::Deserialize(format!("Could not decode body to JSON: {:?}", &e)))?;
let signer_event = if event.contract_id == boot_code_id(MINERS_NAME, is_mainnet) {
let blocks: Vec<BlockProposalSigners> = event
.modified_slots
.iter()
.filter_map(|chunk| read_next::<BlockProposalSigners, _>(&mut &chunk.data[..]).ok())
.collect();
SignerEvent::ProposedBlocks(blocks)
} else if event.contract_id.name.to_string().starts_with(SIGNERS_NAME)
&& event.contract_id.issuer.1 == [0u8; 20]
{
let Some((signer_set, _)) =
get_signers_db_signer_set_message_id(event.contract_id.name.as_str())
else {
return Err(EventError::UnrecognizedStackerDBContract(event.contract_id));
};
// signer-XXX-YYY boot contract
let signer_messages: Vec<SignerMessage> = event
.modified_slots
.iter()
.filter_map(|chunk| read_next::<SignerMessage, _>(&mut &chunk.data[..]).ok())
.collect();
SignerEvent::SignerMessages(signer_set, signer_messages)
} else {
info!(
"[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this",
local_addr,
event.contract_id
);
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
let event_contract_id = event.contract_id.clone();
let signer_event = match SignerEvent::try_from(event) {
Err(e) => {
info!(
"[{:?}] next_event got event from an unexpected contract id {}, return OK so other side doesn't keep sending this",
local_addr,
event_contract_id
);
ack_dispatcher(request);
return Err(e.into());
}
return Err(EventError::UnrecognizedStackerDBContract(event.contract_id));
Ok(x) => x,
};
if let Err(e) = request.respond(HttpResponse::empty(200u16)) {
error!("Failed to respond to request: {:?}", &e);
}
ack_dispatcher(request);
Ok(signer_event)
}
impl TryFrom<StackerDBChunksEvent> for SignerEvent {
type Error = EventError;
fn try_from(event: StackerDBChunksEvent) -> Result<Self, Self::Error> {
let signer_event = if event.contract_id.name.as_str() == MINERS_NAME
&& event.contract_id.issuer.1 == [0; 20]
{
let mut blocks = vec![];
let mut messages = vec![];
let mut miner_pk = None;
for chunk in event.modified_slots {
miner_pk = Some(chunk.recover_pk().map_err(|e| {
EventError::MalformedRequest(format!(
"Failed to recover PK from StackerDB chunk: {e}"
))
})?);
if chunk.slot_id % 2 == 0 {
// block
let Ok(block) =
BlockProposalSigners::consensus_deserialize(&mut chunk.data.as_slice())
else {
continue;
};
blocks.push(block);
} else if chunk.slot_id % 2 == 1 {
// message
let Ok(msg) = SignerMessage::consensus_deserialize(&mut chunk.data.as_slice())
else {
continue;
};
messages.push(msg);
} else {
return Err(EventError::UnrecognizedEvent(
"Unrecognized slot_id for miners contract".into(),
));
};
}
SignerEvent::ProposedBlocks(blocks, messages, miner_pk)
} else if event.contract_id.name.starts_with(SIGNERS_NAME)
&& event.contract_id.issuer.1 == [0u8; 20]
{
let Some((signer_set, _)) =
get_signers_db_signer_set_message_id(event.contract_id.name.as_str())
else {
return Err(EventError::UnrecognizedStackerDBContract(event.contract_id));
};
// signer-XXX-YYY boot contract
let signer_messages: Vec<SignerMessage> = event
.modified_slots
.iter()
.filter_map(|chunk| read_next::<SignerMessage, _>(&mut &chunk.data[..]).ok())
.collect();
SignerEvent::SignerMessages(signer_set, signer_messages)
} else {
return Err(EventError::UnrecognizedStackerDBContract(event.contract_id));
};
Ok(signer_event)
}
}
/// Process a proposal response from the node
fn process_proposal_response(mut request: HttpRequest) -> Result<SignerEvent, EventError> {
debug!("Got proposal_response event");
@@ -438,7 +485,7 @@ fn process_proposal_response(mut request: HttpRequest) -> Result<SignerEvent, Ev
Ok(SignerEvent::BlockValidationResponse(event))
}
fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> {
pub fn get_signers_db_signer_set_message_id(name: &str) -> Option<(u32, u32)> {
// Splitting the string by '-'
let parts: Vec<&str> = name.split('-').collect();
if parts.len() != 3 {

View File

@@ -49,7 +49,7 @@ pub use crate::events::{
SignerStopSignaler,
};
pub use crate::messages::{
BlockRejection, BlockResponse, RejectCode, SignerMessage, BLOCK_MSG_ID, TRANSACTIONS_MSG_ID,
BlockRejection, BlockResponse, MessageSlotID, RejectCode, SignerMessage,
};
pub use crate::runloop::{RunningSigner, Signer, SignerRunLoop};
pub use crate::session::{SignerSession, StackerDBSession};

View File

@@ -14,12 +14,23 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! Messages in the signer-miner interaction have a multi-level hierarchy.
//! Signers send messages to each other through Packet messages. These messages,
//! as well as `BlockResponse`, `Transactions`, and `DkgResults` messages are stored
//! StackerDBs based on the `MessageSlotID` for the particular message type. This is a
//! shared identifier space between the four message kinds and their subtypes.
//!
//! These four message kinds are differentiated with a `SignerMessageTypePrefix`
//! and the `SignerMessage` enum.
use std::fmt::{Debug, Display};
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners;
use blockstack_lib::chainstate::nakamoto::NakamotoBlock;
use blockstack_lib::chainstate::stacks::events::StackerDBChunksEvent;
use blockstack_lib::chainstate::stacks::{StacksTransaction, ThresholdSignature};
@@ -54,28 +65,67 @@ use wsts::state_machine::{signer, SignError};
use crate::http::{decode_http_body, decode_http_request};
use crate::EventError;
// The slot IDS for each message type
const DKG_BEGIN_MSG_ID: u32 = 0;
const DKG_PRIVATE_BEGIN_MSG_ID: u32 = 1;
const DKG_END_BEGIN_MSG_ID: u32 = 2;
const DKG_END_MSG_ID: u32 = 3;
const DKG_PUBLIC_SHARES_MSG_ID: u32 = 4;
const DKG_PRIVATE_SHARES_MSG_ID: u32 = 5;
const NONCE_REQUEST_MSG_ID: u32 = 6;
const NONCE_RESPONSE_MSG_ID: u32 = 7;
const SIGNATURE_SHARE_REQUEST_MSG_ID: u32 = 8;
const SIGNATURE_SHARE_RESPONSE_MSG_ID: u32 = 9;
/// The slot ID for the block response for miners to observe
pub const BLOCK_MSG_ID: u32 = 10;
/// The slot ID for the transactions list for miners and signers to observe
pub const TRANSACTIONS_MSG_ID: u32 = 11;
define_u8_enum!(
/// Enum representing the stackerdb message identifier: this is
/// the contract index in the signers contracts (i.e., X in signers-0-X)
MessageSlotID {
/// DkgBegin message
DkgBegin = 0,
/// DkgPrivateBegin
DkgPrivateBegin = 1,
/// DkgEndBegin
DkgEndBegin = 2,
/// DkgEnd
DkgEnd = 3,
/// DkgPublicshares
DkgPublicShares = 4,
/// DkgPrivateShares
DkgPrivateShares = 5,
/// NonceRequest
NonceRequest = 6,
/// NonceResponse
NonceResponse = 7,
/// SignatureShareRequest
SignatureShareRequest = 8,
/// SignatureShareResponse
SignatureShareResponse = 9,
/// Block proposal responses for miners to observe
BlockResponse = 10,
/// Transactions list for miners and signers to observe
Transactions = 11,
/// DKG Results
DkgResults = 12
});
define_u8_enum!(SignerMessageTypePrefix {
BlockResponse = 0,
Packet = 1,
Transactions = 2
Transactions = 2,
DkgResults = 3
});
impl MessageSlotID {
/// Return the StackerDB contract corresponding to messages of this type
pub fn stacker_db_contract(
&self,
mainnet: bool,
reward_cycle: u64,
) -> QualifiedContractIdentifier {
NakamotoSigners::make_signers_db_contract_id(reward_cycle, self.to_u32(), mainnet)
}
/// Return the u32 identifier for the message slot (used to index the contract that stores it)
pub fn to_u32(&self) -> u32 {
self.to_u8().into()
}
}
impl Display for MessageSlotID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}({})", self, self.to_u8())
}
}
impl TryFrom<u8> for SignerMessageTypePrefix {
type Error = CodecError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
@@ -91,6 +141,7 @@ impl From<&SignerMessage> for SignerMessageTypePrefix {
SignerMessage::Packet(_) => SignerMessageTypePrefix::Packet,
SignerMessage::BlockResponse(_) => SignerMessageTypePrefix::BlockResponse,
SignerMessage::Transactions(_) => SignerMessageTypePrefix::Transactions,
SignerMessage::DkgResults { .. } => SignerMessageTypePrefix::DkgResults,
}
}
}
@@ -168,7 +219,7 @@ impl From<&RejectCode> for RejectCodeTypePrefix {
}
/// The messages being sent through the stacker db contracts
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub enum SignerMessage {
/// The signed/validated Nakamoto block for miners to observe
BlockResponse(BlockResponse),
@@ -176,30 +227,121 @@ pub enum SignerMessage {
Packet(Packet),
/// The list of transactions for miners and signers to observe that this signer cares about
Transactions(Vec<StacksTransaction>),
/// The results of a successful DKG
DkgResults {
/// The aggregate key from the DKG round
aggregate_key: Point,
/// The polynomial commits used to construct the aggregate key
party_polynomials: Vec<(u32, PolyCommitment)>,
},
}
impl Debug for SignerMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::BlockResponse(b) => Debug::fmt(b, f),
Self::Packet(p) => Debug::fmt(p, f),
Self::Transactions(t) => f.debug_tuple("Transactions").field(t).finish(),
Self::DkgResults {
aggregate_key,
party_polynomials,
} => {
let party_polynomials: Vec<_> = party_polynomials
.iter()
.map(|(ix, commit)| (ix, commit.to_string()))
.collect();
f.debug_struct("DkgResults")
.field("aggregate_key", &aggregate_key.to_string())
.field("party_polynomials", &party_polynomials)
.finish()
}
}
}
}
impl SignerMessage {
/// Helper function to determine the slot ID for the provided stacker-db writer id
pub fn msg_id(&self) -> u32 {
pub fn msg_id(&self) -> MessageSlotID {
match self {
Self::Packet(packet) => match packet.msg {
Message::DkgBegin(_) => DKG_BEGIN_MSG_ID,
Message::DkgPrivateBegin(_) => DKG_PRIVATE_BEGIN_MSG_ID,
Message::DkgEndBegin(_) => DKG_END_BEGIN_MSG_ID,
Message::DkgEnd(_) => DKG_END_MSG_ID,
Message::DkgPublicShares(_) => DKG_PUBLIC_SHARES_MSG_ID,
Message::DkgPrivateShares(_) => DKG_PRIVATE_SHARES_MSG_ID,
Message::NonceRequest(_) => NONCE_REQUEST_MSG_ID,
Message::NonceResponse(_) => NONCE_RESPONSE_MSG_ID,
Message::SignatureShareRequest(_) => SIGNATURE_SHARE_REQUEST_MSG_ID,
Message::SignatureShareResponse(_) => SIGNATURE_SHARE_RESPONSE_MSG_ID,
Message::DkgBegin(_) => MessageSlotID::DkgBegin,
Message::DkgPrivateBegin(_) => MessageSlotID::DkgPrivateBegin,
Message::DkgEndBegin(_) => MessageSlotID::DkgEndBegin,
Message::DkgEnd(_) => MessageSlotID::DkgEnd,
Message::DkgPublicShares(_) => MessageSlotID::DkgPublicShares,
Message::DkgPrivateShares(_) => MessageSlotID::DkgPrivateShares,
Message::NonceRequest(_) => MessageSlotID::NonceRequest,
Message::NonceResponse(_) => MessageSlotID::NonceResponse,
Message::SignatureShareRequest(_) => MessageSlotID::SignatureShareRequest,
Message::SignatureShareResponse(_) => MessageSlotID::SignatureShareResponse,
},
Self::BlockResponse(_) => BLOCK_MSG_ID,
Self::Transactions(_) => TRANSACTIONS_MSG_ID,
Self::BlockResponse(_) => MessageSlotID::BlockResponse,
Self::Transactions(_) => MessageSlotID::Transactions,
Self::DkgResults { .. } => MessageSlotID::DkgResults,
}
}
}
impl SignerMessage {
/// Provide an interface for consensus serializing a DkgResults message
/// without constructing the DkgResults struct (this eliminates a clone)
pub fn serialize_dkg_result<'a, W: Write, I>(
fd: &mut W,
aggregate_key: &Point,
party_polynomials: I,
write_prefix: bool,
) -> Result<(), CodecError>
where
I: ExactSizeIterator + Iterator<Item = (&'a u32, &'a PolyCommitment)>,
{
if write_prefix {
SignerMessageTypePrefix::DkgResults
.to_u8()
.consensus_serialize(fd)?;
}
fd.write_all(&aggregate_key.compress().data)
.map_err(CodecError::WriteError)?;
let polynomials_len: u32 = party_polynomials
.len()
.try_into()
.map_err(|_| CodecError::ArrayTooLong)?;
polynomials_len.consensus_serialize(fd)?;
for (party_id, polynomial) in party_polynomials {
party_id.consensus_serialize(fd)?;
fd.write_all(&polynomial.id.id.to_bytes())
.map_err(CodecError::WriteError)?;
fd.write_all(&polynomial.id.kG.compress().data)
.map_err(CodecError::WriteError)?;
fd.write_all(&polynomial.id.kca.to_bytes())
.map_err(CodecError::WriteError)?;
let commit_len: u32 = polynomial
.poly
.len()
.try_into()
.map_err(|_| CodecError::ArrayTooLong)?;
commit_len.consensus_serialize(fd)?;
for poly in polynomial.poly.iter() {
fd.write_all(&poly.compress().data)
.map_err(CodecError::WriteError)?;
}
}
Ok(())
}
fn deserialize_point<R: Read>(fd: &mut R) -> Result<Point, CodecError> {
let mut bytes = [0; 33];
fd.read_exact(&mut bytes).map_err(CodecError::ReadError)?;
Point::try_from(&Compressed::from(bytes))
.map_err(|e| CodecError::DeserializeError(e.to_string()))
}
fn deserialize_scalar<R: Read>(fd: &mut R) -> Result<Scalar, CodecError> {
let mut bytes = [0; 32];
fd.read_exact(&mut bytes).map_err(CodecError::ReadError)?;
Ok(Scalar::from(bytes))
}
}
impl StacksMessageCodec for SignerMessage {
fn consensus_serialize<W: Write>(&self, fd: &mut W) -> Result<(), CodecError> {
write_next(fd, &(SignerMessageTypePrefix::from(self) as u8))?;
@@ -213,6 +355,17 @@ impl StacksMessageCodec for SignerMessage {
SignerMessage::Transactions(transactions) => {
write_next(fd, transactions)?;
}
SignerMessage::DkgResults {
aggregate_key,
party_polynomials,
} => {
Self::serialize_dkg_result(
fd,
aggregate_key,
party_polynomials.iter().map(|(a, b)| (a, b)),
false,
)?;
}
};
Ok(())
}
@@ -233,6 +386,46 @@ impl StacksMessageCodec for SignerMessage {
let transactions = read_next::<Vec<StacksTransaction>, _>(fd)?;
SignerMessage::Transactions(transactions)
}
SignerMessageTypePrefix::DkgResults => {
let aggregate_key = Self::deserialize_point(fd)?;
let party_polynomial_len = u32::consensus_deserialize(fd)?;
let mut party_polynomials = Vec::with_capacity(
party_polynomial_len
.try_into()
.expect("FATAL: u32 could not fit in usize"),
);
for _ in 0..party_polynomial_len {
let party_id = u32::consensus_deserialize(fd)?;
let polynomial_id_id = Self::deserialize_scalar(fd)?;
let polynomial_id_kg = Self::deserialize_point(fd)?;
let polynomial_id_kca = Self::deserialize_scalar(fd)?;
let commit_len = u32::consensus_deserialize(fd)?;
let mut polynomial_poly = Vec::with_capacity(
commit_len
.try_into()
.expect("FATAL: u32 could not fit in usize"),
);
for _ in 0..commit_len {
let poly = Self::deserialize_point(fd)?;
polynomial_poly.push(poly);
}
let polynomial_id = ID {
id: polynomial_id_id,
kG: polynomial_id_kg,
kca: polynomial_id_kca,
};
let polynomial = PolyCommitment {
id: polynomial_id,
poly: polynomial_poly,
};
party_polynomials.push((party_id, polynomial));
}
Self::DkgResults {
aggregate_key,
party_polynomials,
}
}
};
Ok(message)
}
@@ -1103,7 +1296,6 @@ impl From<BlockValidateReject> for SignerMessage {
#[cfg(test)]
mod test {
use blockstack_lib::chainstate::stacks::{
TransactionAnchorMode, TransactionAuth, TransactionPayload, TransactionPostConditionMode,
TransactionSmartContract, TransactionVersion,
@@ -1116,6 +1308,18 @@ mod test {
use wsts::common::Signature;
use super::{StacksMessageCodecExtensions, *};
#[test]
fn signer_slots_count_is_sane() {
let slot_identifiers_len = MessageSlotID::ALL.len();
assert!(
SIGNER_SLOTS_PER_USER as usize >= slot_identifiers_len,
"stacks_common::SIGNER_SLOTS_PER_USER ({}) must be >= slot identifiers ({})",
SIGNER_SLOTS_PER_USER,
slot_identifiers_len,
);
}
#[test]
fn serde_reject_code() {
let code = RejectCode::ValidationFailed(ValidateRejectCode::InvalidBlock);