WIP: integration tests broken due to retrieving the aggregate public key. Add retry logic to all HTTP requests

Signed-off-by: Jacinta Ferrant <jacinta@trustmachines.co>
This commit is contained in:
Jacinta Ferrant
2023-10-11 15:51:12 -07:00
parent 8d62f80533
commit 1c5fcca9ed
4 changed files with 130 additions and 63 deletions

12
Cargo.lock generated
View File

@@ -391,6 +391,17 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backoff"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
dependencies = [
"getrandom 0.2.8",
"instant",
"rand 0.8.5",
]
[[package]]
name = "backtrace"
version = "0.3.67"
@@ -3541,6 +3552,7 @@ dependencies = [
name = "stacks-signer"
version = "0.0.1"
dependencies = [
"backoff",
"bincode",
"clap 4.4.1",
"clarity",

View File

@@ -20,6 +20,7 @@ name = "stacks-signer"
path = "src/main.rs"
[dependencies]
backoff = "0.4"
bincode = "1.3.3"
clarity = { path = "../clarity" }
clap = { version = "4.1.1", features = ["derive", "env"] }

View File

@@ -37,6 +37,8 @@ pub enum RunLoopCommand {
#[derive(PartialEq, Debug)]
pub enum State {
// TODO: add a state for startup: such as Unitialized where it indicates we need to replay events/configure the signer
/// The runloop signer is uninitialized
Unitialized,
/// The runloop is idle
Idle,
/// The runloop is executing a DKG round
@@ -76,6 +78,7 @@ impl<C: Coordinatable> RunLoop<C> {
// We have an aggregate public key. Check if we need to cast our vote
// TODO: Add state to keep track of what contract calls we have made so we don't recast our vote needlessly and force blocks to include multiple of the same stx transactions
// TODO: should I use ok() here or log some error?
debug!("Let us try to retrieve some shit");
if self.stacks_client.get_aggregate_public_key().ok().is_none() {
// Note this is written under the assumption that if no consensus is reached in the contract, the pox contract will flush all votes and start over
match self.stacks_client.get_aggregate_public_key_vote() {
@@ -159,6 +162,31 @@ impl<C: Coordinatable> RunLoop<C> {
/// Helper function to check the current state, process the next command in the queue, and update state accordingly
fn process_next_command(&mut self) {
match self.state {
State::Unitialized => {
// Load the aggregate public key from the stacks client if it is set
match self.stacks_client.get_aggregate_public_key() {
Ok(key) => {
if key.is_none() {
// No aggregate public key. Check if we are the coordinator and trigger DKG accordingly
// TODO: should replay messages in case we are in the middle of a DKG round and have already sent a DKG vote through...
let (coordinator_id, _) = calculate_coordinator(&self.signing_round.public_keys);
if self.signing_round.signer_id == coordinator_id {
self.commands.push_front(RunLoopCommand::Dkg);
}
} else {
self.coordinator.set_aggregate_public_key(key);
}
self.state = State::Idle;
}
Err(e) => {
// TODO: is this a fatal error? If we fail at startup to access the stacks client to see if DKG was set...this seems pretty fatal..
panic!(
"Failed to load aggregate public key from stacks client: {:?}",
e
);
}
}
}
State::Idle => {
if let Some(command) = self.commands.pop_front() {
while !self.execute_command(&command) {
@@ -246,7 +274,7 @@ impl From<&Config> for RunLoop<FrostCoordinator<v2::Aggregator>> {
.iter()
.map(|i| i - 1) // SigningRound::new (unlike SigningRound::from) doesn't do this
.collect::<Vec<u32>>();
let mut coordinator = FrostCoordinator::new(
let coordinator = FrostCoordinator::new(
total_signers,
total_keys,
threshold,
@@ -262,36 +290,13 @@ impl From<&Config> for RunLoop<FrostCoordinator<v2::Aggregator>> {
config.signer_ids_public_keys.clone(),
);
let stacks_client = StacksClient::from(config);
let mut commands = VecDeque::new();
// Load the aggregate public key from the stacks client if it is set
match stacks_client.get_aggregate_public_key() {
Ok(key) => {
if key.is_none() {
// No aggregate public key. Check if we are the coordinator and trigger DKG accordingly
// TODO: should replay messages in case we are in the middle of a DKG round and have already sent a DKG vote through...
let (coordinator_id, _) = calculate_coordinator(&signing_round.public_keys);
if config.signer_id == coordinator_id {
commands.push_front(RunLoopCommand::Dkg);
}
} else {
coordinator.set_aggregate_public_key(key);
}
}
Err(e) => {
// TODO: is this a fatal error? If we fail at startup to access the stacks client to see if DKG was set...this seems pretty fatal..
panic!(
"Failed to load aggregate public key from stacks client: {:?}",
e
);
}
}
RunLoop {
event_timeout: config.event_timeout,
coordinator,
signing_round,
stacks_client,
commands,
state: State::Idle,
commands: VecDeque::new(),
state: State::Unitialized,
}
}
}

View File

@@ -1,3 +1,5 @@
use std::time::Duration;
use bincode::Error as BincodeError;
use blockstack_lib::{
burnchains::Txid,
@@ -104,6 +106,9 @@ pub enum ClientError {
/// Clarity error occurred
#[error("Clarity Error: {0}")]
ClarityError(#[from] ClarityError),
/// Backoff retry timeout
#[error("Backoff retry timeout occurred. Stacks node may be down.")]
RetryTimeout,
}
/// The Stacks signer client used to communicate with the stacker-db instance
@@ -188,7 +193,7 @@ impl StacksClient {
let function_name = ClarityName::try_from(function_name_str)
.map_err(|_| ClientError::InvalidClarityName(function_name_str.to_string()))?;
let (contract_addr, contract_name) = self.get_pox_contract()?;
let contract_response_hex = self.read_only_contract_call(
let contract_response_hex = self.read_only_contract_call_with_retry(
&contract_addr,
&contract_name,
&function_name,
@@ -204,7 +209,7 @@ impl StacksClient {
let function_name = ClarityName::try_from(function_name_str)
.map_err(|_| ClientError::InvalidClarityName(function_name_str.to_string()))?;
let (contract_addr, contract_name) = self.get_pox_contract()?;
let contract_response_hex = self.read_only_contract_call(
let contract_response_hex = self.read_only_contract_call_with_retry(
&contract_addr,
&contract_name,
&function_name,
@@ -240,10 +245,14 @@ impl StacksClient {
/// Helper function to retrieve the current reward cycle number from the stacks node
fn get_current_reward_cycle(&self) -> Result<u64, ClientError> {
let response = self.stacks_node_client.get(self.pox_path()).send()?;
if !response.status().is_success() {
return Err(ClientError::RequestFailure(response.status()));
}
debug!("Retrieving current reward cycle...");
let send_request = || {
self.stacks_node_client
.get(self.pox_path())
.send()
.map_err(backoff::Error::transient)
};
let response = retry_http_request(self.pox_path(), send_request)?;
let json_response = response.json::<serde_json::Value>()?;
let entry = "current_cycle";
json_response
@@ -255,16 +264,21 @@ impl StacksClient {
/// Helper function to retrieve the next possible nonce for the signer from the stacks node
///
/// # Panics
/// Always panics: this is an unimplemented stub (`todo!`). The node query
/// has not been written yet, so callers must not invoke this in production.
fn get_next_possible_nonce(&self) -> Result<u64, ClientError> {
    debug!("Retrieving the next possible nonce...");
    todo!("Get the next possible nonce from the stacks node")
}
/// Helper function to retrieve the pox contract address and name from the stacks node
fn get_pox_contract(&self) -> Result<(StacksAddress, ContractName), ClientError> {
debug!("Retrieving pox contract ID...");
// TODO: we may want to cache the pox contract inside the client itself (calling this function once on init)
let response = self.stacks_node_client.get(self.pox_path()).send()?;
if !response.status().is_success() {
return Err(ClientError::RequestFailure(response.status()));
}
let send_request = || {
self.stacks_node_client
.get(self.pox_path())
.send()
.map_err(backoff::Error::transient)
};
let response = retry_http_request(self.pox_path(), send_request)?;
let json_response = response.json::<serde_json::Value>()?;
let entry = "contract_id";
let contract_id_string = json_response
@@ -301,6 +315,7 @@ impl StacksClient {
function_name: ClarityName,
function_args: &[ClarityValue],
) -> Result<Txid, ClientError> {
debug!("Making a contract call...");
let signed_tx = self.build_signed_transaction(
contract_addr,
contract_name,
@@ -353,12 +368,15 @@ impl StacksClient {
/// Helper function to submit a transaction to the Stacks node
fn submit_tx(&self, tx: Vec<u8>) -> Result<Txid, ClientError> {
let res = self
.stacks_node_client
.post(self.transaction_path())
.header("Content-Type", "application/octet-stream")
.body(tx.clone())
.send()?;
let send_request = || {
self.stacks_node_client
.post(self.transaction_path())
.header("Content-Type", "application/octet-stream")
.body(tx.clone())
.send()
.map_err(backoff::Error::transient)
};
let res = retry_http_request(self.transaction_path(), send_request)?;
debug!("Transaction submission response: {:?}", res);
if res.status().is_success() {
// On success, the response body should be the txid as a string (no JSON blob)
@@ -373,7 +391,7 @@ impl StacksClient {
}
/// Makes a read only contract call to a stacks contract
pub fn read_only_contract_call(
pub fn read_only_contract_call_with_retry(
&self,
contract_addr: &StacksAddress,
contract_name: &ContractName,
@@ -384,15 +402,16 @@ impl StacksClient {
let body = json!({"sender": self.stacks_address.to_string(), "arguments": function_args})
.to_string();
let path = self.read_only_path(contract_addr, contract_name, function_name);
let response = self
.stacks_node_client
.post(path)
.header("Content-Type", "application/json")
.body(body)
.send()?;
if !response.status().is_success() {
return Err(ClientError::RequestFailure(response.status()));
}
let path_clone = path.clone();
let send_request = || {
self.stacks_node_client
.post(path.clone())
.header("Content-Type", "application/json")
.body(body.clone())
.send()
.map_err(backoff::Error::transient)
};
let response = retry_http_request(path_clone, send_request)?;
let response = response.json::<serde_json::Value>()?;
if !response
.get("okay")
@@ -437,6 +456,32 @@ impl StacksClient {
}
}
/// Helper function to retry a HTTP request with exponential backoff
///
/// Repeatedly invokes `request_fn` until it returns `Ok` or the backoff
/// policy gives up. Whether a failure is retried is decided by the caller:
/// `request_fn` wraps its error in `backoff::Error::transient` (retry) or
/// `backoff::Error::permanent` (abort).
///
/// `path` is used only for logging retry attempts.
///
/// # Errors
/// - `ClientError::RetryTimeout` if retries are exhausted or a permanent
///   error occurs (the underlying error is discarded in both cases).
/// - `ClientError::RequestFailure` if a response arrived but its HTTP status
///   is not in the 2xx range. Note a non-2xx response is NOT retried — it is
///   surfaced immediately after a successful connection.
fn retry_http_request<F, E>(
    path: String,
    request_fn: F,
) -> Result<reqwest::blocking::Response, ClientError>
where
    F: FnMut() -> Result<reqwest::blocking::Response, backoff::Error<E>>,
{
    // Log each failed attempt together with the delay before the next one.
    let notify = |_err, dur| {
        debug!("Failed to connect to {}. Next attempt in {:?}", path, dur);
    };
    // Exponential backoff starting at 2 ms, with per-attempt intervals capped
    // at 128 ms.
    // NOTE(review): total retry duration is bounded by the builder's default
    // max_elapsed_time — TODO confirm that default is acceptable here.
    let backoff_timer = backoff::ExponentialBackoffBuilder::new()
        .with_initial_interval(Duration::from_millis(2))
        .with_max_interval(Duration::from_millis(128))
        .build();
    // Any terminal backoff error is collapsed into RetryTimeout, dropping the
    // original error details (only the debug log above records them).
    let response = backoff::retry_notify(backoff_timer, request_fn, notify)
        .map_err(|_| ClientError::RetryTimeout)?;
    if !response.status().is_success() {
        return Err(ClientError::RequestFailure(response.status()));
    }
    Ok(response)
}
/// Helper function to determine the slot ID for the provided stacker-db writer id and the message type
fn slot_id(id: u32, message: &Message) -> u32 {
let slot_id = match message {
@@ -503,7 +548,7 @@ mod tests {
fn read_only_contract_call_200_success() {
let config = TestConfig::new();
let h = spawn(move || {
config.client.read_only_contract_call(
config.client.read_only_contract_call_with_retry(
&config.client.stacks_address,
&ContractName::try_from("contract-name").unwrap(),
&ClarityName::try_from("function-name").unwrap(),
@@ -522,7 +567,7 @@ mod tests {
fn read_only_contract_call_200_failure() {
let config = TestConfig::new();
let h = spawn(move || {
config.client.read_only_contract_call(
config.client.read_only_contract_call_with_retry(
&config.client.stacks_address,
&ContractName::try_from("contract-name").unwrap(),
&ClarityName::try_from("function-name").unwrap(),
@@ -542,7 +587,7 @@ mod tests {
let config = TestConfig::new();
// Simulate a 400 Bad Request response
let h = spawn(move || {
config.client.read_only_contract_call(
config.client.read_only_contract_call_with_retry(
&config.client.stacks_address,
&ContractName::try_from("contract-name").unwrap(),
&ClarityName::try_from("function-name").unwrap(),
@@ -564,7 +609,7 @@ mod tests {
let config = TestConfig::new();
// Simulate a 400 Bad Request response
let h = spawn(move || {
config.client.read_only_contract_call(
config.client.read_only_contract_call_with_retry(
&config.client.stacks_address,
&ContractName::try_from("contract-name").unwrap(),
&ClarityName::try_from("function-name").unwrap(),
@@ -676,7 +721,6 @@ mod tests {
)
.unwrap();
let mut tx_bytes = [0u8; 1024];
{
let mut tx_bytes_writer = BufWriter::new(&mut tx_bytes[..]);
@@ -693,10 +737,13 @@ mod tests {
.0
+ 1;
let tx_clone = tx.clone();
let tx_clone = tx.clone();
let h = spawn(move || config.client.submit_tx(tx_clone.serialize_to_vec()));
let request_bytes = write_response(config.mock_server, format!("HTTP/1.1 200 OK\n\n{}", tx.txid().to_string()).as_bytes());
let request_bytes = write_response(
config.mock_server,
format!("HTTP/1.1 200 OK\n\n{}", tx.txid()).as_bytes(),
);
let returned_txid = h.join().unwrap().unwrap();
assert_eq!(returned_txid, tx.txid());
@@ -712,16 +759,18 @@ mod tests {
#[test]
fn transaction_contract_call_should_succeed() {
let config = TestConfig::new();
let h = spawn(move || config
.client
.transaction_contract_call(
let h = spawn(move || {
config.client.transaction_contract_call(
&config.client.stacks_address,
ContractName::try_from("contract-name").unwrap(),
ClarityName::try_from("function-name").unwrap(),
&[],
)
});
write_response(
config.mock_server,
b"HTTP/1.1 200 OK\n\n4e99f99bc4a05437abb8c7d0c306618f45b203196498e2ebe287f10497124958",
);
write_response(config.mock_server, format!("HTTP/1.1 200 OK\n\n4e99f99bc4a05437abb8c7d0c306618f45b203196498e2ebe287f10497124958").as_bytes());
assert!(h.join().unwrap().is_ok());
}
}