Add sidecar scripts for fee estimation and stacks block delay detection

This commit is contained in:
AshtonStephens
2023-11-21 12:50:23 -05:00
parent 36995663b0
commit e585d258cc
4 changed files with 424 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
{
"toml_file_location": "Path/To/miner.toml",
"polling_delay_seconds": 60
}

View File

@@ -0,0 +1,6 @@
{
"polling_delay_seconds": 60,
"max_stacks_delay_seconds": 1500,
"recovery_delay_seconds": 660,
"shell_command": ["echo", "command not specified"]
}

View File

@@ -0,0 +1,188 @@
"""
Script to continuously update the `satoshis_per_byte` value in a TOML file with the
mean fee estimate from a list of API endpoints.
Usage:
$ COMMAND /path/to/miner.toml polling_delay_seconds
Args:
toml_file_location (str): The path to the TOML file to update.
polling_delay_seconds (int): The frequency in seconds to check for fee updates.
"""
import toml
import json
import requests
import time
from backoff_utils import strategies
from backoff_utils import apply_backoff
from sys import argv
# Fee estimation API URLS and their corresponding fee extraction functions.
# At least one of these needs to be working in order for the script to function.
FEE_ESTIMATIONS = [
# Bitcoiner Live API
(
'https://bitcoiner.live/api/fees/estimates/latest',
lambda response_json: response_json["estimates"]["30"]["sat_per_vbyte"],
),
# Mempool Space API
(
'https://mempool.space/api/v1/fees/recommended',
lambda response_json: response_json["halfHourFee"],
),
# Blockchain.info API
(
'https://api.blockchain.info/mempool/fees',
lambda response_json: response_json["regular"],
),
]
def calculate_fee_estimate():
"""
Calculates the mean fee estimate from a list of API URLs
and their corresponding fee extraction functions.
Args:
FEE_ESTIMATIONS (list): A list of tuples, where each tuple
contains the URL of an API endpoint and a function that extracts
the fee estimate from the JSON response.
Returns:
int: The mean fee estimate in sat/Byte.
Raises:
None
"""
# Gather all API estimated fees in sat/Byte
estimated_fees = []
for api_url, unpack_fee_estimate in FEE_ESTIMATIONS:
try:
json_response = json.loads(get_from_api(api_url))
estimated_fee = unpack_fee_estimate(json_response)
estimated_fees.append(estimated_fee)
except Exception as e:
pass
# Calculate the mean fee estimate
mean_fee = int(sum(estimated_fees) / len(estimated_fees))
return mean_fee
@apply_backoff(
strategy=strategies.Exponential,
catch_exceptions=(RuntimeError,),
max_tries=3,
max_delay=60,
)
def get_from_api(api_url: str) -> str:
"""
Sends a GET request to the specified API URL and returns the string response.
Args:
api_url (str): The URL of the API endpoint to call.
Returns:
dict: The string response data.
Raises:
RuntimeError: If the API call fails.
"""
try:
# Make a GET request to the API endpoint
response = requests.get(api_url)
# Check if the request was successful
if response.status_code == 200:
# Parse the response and return the data
return response.text
except Exception as e:
# If an exception occurs, raise a RuntimeError
raise RuntimeError("Failed to unpack JSON.")
# If the code reaches this point, it means the API call failed.
raise RuntimeError("Failed to get response.")
def update_config_fee(toml_file_location: str, polling_delay_seconds: int):
"""
Updates the `satoshis_per_byte` value in the specified TOML file
with the mean fee estimate from a list of API endpoints.
Args:
toml_file_location (str): The path to the TOML file to update.
Raises:
IOError: If the TOML file cannot be read or written.
RuntimeError: If the fee estimation process fails.
"""
while True:
# Calculate mean fee estimate from the list of APIs
fee_estimate = calculate_fee_estimate()
# Read toml file data
with open(toml_file_location, 'r') as toml_file:
toml_data = toml.load(toml_file)
# Update satoshis_per_byte data
toml_data["burnchain"]["satoshis_per_byte"] = fee_estimate
# Update toml file with configuration changes
with open(toml_file_location, 'w') as toml_file:
toml.dump(toml_data, toml_file)
time.sleep()
def read_config(config_location: str):
"""
Reads and returns the contents of a configuration file.
"""
with open(config_location, "r") as config_file:
return json.load(config_file)
def main():
"""
Continuously updates the `satoshis_per_byte` value in the specified
TOML file with the mean fee estimate from a list of API endpoints.
Usage:
$ {argv[0]} /path/to/miner.toml polling_delay
"""
try:
configuration = {}
if len(argv) == 1:
configuration = read_config("./config/fee-estimate.json")
elif "-c" in argv:
# Load configuration from specified file
config_location = argv[argv.index("-c") + 1]
configuration = read_config(config_location)
else:
# Load configuration from command-line arguments
configuration = {
"toml_file_location": argv[1],
"polling_delay_seconds": int(argv[2]),
}
update_config_fee(**configuration)
# Print usage if there are errors.
except Exception as e:
print(f"Failed to run {argv[0]}")
print(f"\n\t$ COMMAND /path/to/miner.toml polling_delay_seconds")
print("\t\tOR")
print(f"\t$ COMMAND -c /path/to/config_file.json\n")
print(f"Error: {e}")
# Execute main.
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,226 @@
"""
Monitors the time difference between Stacks blocks and Bitcoin blocks, triggering an event
when the time difference exceeds a specified threshold.
This script continuously checks the time difference between the latest Stacks
block and the latest Bitcoin block. If the time difference exceeds a user-defined
threshold, the script executes a user-defined shell command. The script utilizes
exponential backoff with retries and a maximum delay to handle temporary API outages.
Usage:
$ COMMAND polling_delay_seconds max_stacks_delay_seconds recovery_delay_seconds shell_command...
OR
$ COMMAND -c /path/to/config_file
Options:
polling_delay_seconds: The time interval between checking the time difference,
in seconds.
max_stacks_delay_seconds: The maximum acceptable time difference between
Stacks and Bitcoin blocks, in seconds.
recovery_delay_seconds: The delay after executing the shell command before
resuming monitoring, in seconds.
shell_command: The shell command to execute when the time difference exceeds
the threshold.
Alternatively, you can provide a configuration file using the -c option.
The configuration file should be a JSON file with the following fields:
```json
{
"polling_delay_seconds": <int>,
"max_stacks_delay_seconds": <int>,
"recovery_delay_seconds": <int>,
"shell_command": <list[str]>
}
```
Example:
```json
{
"polling_delay_seconds": 60,
"max_stacks_delay_seconds": 60,
"recovery_delay_seconds": 60,
"shell_command": ["echo", "hello, world!"],
}
```
"""
import toml
import json
import requests
import time
from backoff_utils import strategies
from backoff_utils import apply_backoff
from datetime import datetime
from sys import argv
import subprocess
# Stacks API endpoints.
API_URL_LATEST_STACKS_BLOCK = "https://api.mainnet.hiro.so/extended/v1/block?limit=1"
API_URL_LATEST_STACKS_TRANSACTION = "https://api.mainnet.hiro.so/extended/v1/tx/{transaction_id}"
# Bitcoin API endpoints.
API_URL_LATEST_BTC_BLOCK_HASH = "https://mempool.space/api/blocks/tip/hash"
API_URL_BTC_BLOCK_FROM_HASH = "https://mempool.space/api/block/{block_hash}"
@apply_backoff(
strategy=strategies.Exponential,
catch_exceptions=(RuntimeError,),
max_tries=3,
max_delay=60,
)
def get_from_api(api_url: str) -> dict:
"""
Sends a GET request to the specified API URL and returns the string response.
Args:
api_url (str): The URL of the API endpoint to call.
Returns:
dict: The string response data.
Raises:
RuntimeError: If the API call fails or the response cannot be parsed as JSON.
"""
try:
# Make a GET request to the API endpoint
response = requests.get(api_url)
# Check if the request was successful
if response.status_code == 200:
# Parse the response and return the data
return response.text
except Exception as e:
# If an exception occurs, raise a RuntimeError
raise RuntimeError("Failed to unpack JSON.")
# If the code reaches this point, it means the API call failed.
raise RuntimeError("Failed to get response.")
def get_latest_bitcoin_block_timestamp() -> int:
"""
Retrieves the timestamp of the latest Bitcoin block.
Returns:
int: The timestamp of the latest Bitcoin block.
"""
latest_btc_block_hash = get_from_api(API_URL_LATEST_BTC_BLOCK_HASH)
json_response = json.loads(get_from_api(
API_URL_BTC_BLOCK_FROM_HASH.format(block_hash=latest_btc_block_hash)))
return json_response["timestamp"]
def get_latest_stacks_block_timestamp() -> int:
"""
Retrieves the timestamp of the latest Stacks block.
Returns:
int: The timestamp of the latest Stacks block.
"""
latest_stacks_block_json = json.loads(get_from_api(API_URL_LATEST_STACKS_BLOCK))
return latest_stacks_block_json["results"][0]["burn_block_time"]
def stacks_block_delay_event_listener(
polling_delay_seconds: int,
max_stacks_delay_seconds: int,
recovery_delay_seconds: int,
shell_command: list[str],
):
"""
Continuously monitors the time between Stacks blocks and Bitcoin blocks.
If the time difference exceeds a specified threshold, the script executes
a user-defined shell command. The script utilizes exponential backoff with
retries and a maximum delay to handle temporary API outages.
Args:
polling_delay_seconds (int): The time interval between checking the
time difference, in seconds (default: 60).
max_stacks_delay_seconds (int): The maximum acceptable time difference
between Stacks and Bitcoin blocks, in seconds (default: 60).
recovery_delay_seconds (int): The delay after executing the shell
command before resuming monitoring, in seconds (default: 60).
shell_command (list[str]): The shell command to execute when the time
difference exceeds the threshold (default: ["echo", "hello"]).
"""
while True:
# Continuously retrieve the timestamps of the latest Stacks and Bitcoin blocks.
latest_stacks_block_timestamp = get_latest_stacks_block_timestamp()
latest_bitcoin_block_timestamp = get_latest_bitcoin_block_timestamp()
# Calculate the time difference between the latest Stacks and Bitcoin blocks.
stacks_block_delay = datetime.fromtimestamp(latest_bitcoin_block_timestamp) - \
datetime.fromtimestamp(latest_stacks_block_timestamp)
# If the time difference exceeds the specified threshold execute the shell command.
if stacks_block_delay.seconds > max_stacks_delay_seconds:
print(f"Delay between stacks and bitcoin block: {stacks_block_delay}")
print(f"$ {' '.join(shell_command)}")
subprocess.run(shell_command, shell=True)
time.sleep(recovery_delay_seconds) # Wait for the recovery period before resuming monitoring.
# If the time difference is within the acceptable range wait for the polling interval.
else:
time.sleep(polling_delay_seconds)
def read_config(config_location: str):
"""
Reads and returns the contents of a configuration file.
"""
with open(config_location, "r") as config_file:
return json.load(config_file)
def main():
"""
Continuously monitors the time between Stacks blocks and Bitcoin blocks,
triggering an event when thresholds are exceeded.
If the time difference exceeds a specified threshold, the script executes
a user-defined shell command. It utilizes exponential backoff with
retries and a maximum delay to handle temporary API outages.
"""
try:
configuration = {}
if len(argv) == 1:
configuration = read_config("./config/stacks-block-delay-event-trigger.json")
elif "-c" in argv:
# Load configuration from specified file
config_location = argv[argv.index("-c") + 1]
configuration = read_config(config_location)
else:
# Load configuration from command-line arguments
configuration = {
"polling_delay_seconds": int(argv[1]),
"max_stacks_delay_seconds": int(argv[2]),
"recovery_delay_seconds": int(argv[3]),
"shell_command": argv[4:],
}
stacks_block_delay_event_listener(**configuration)
# Print usage if there are errors.
except Exception as e:
print(f"Failed to run {argv[0]}")
print(f"\n\t$ COMMAND polling_delay_seconds max_stacks_delay_seconds recovery_delay_seconds shell_command...")
print("\t\tOR")
print(f"\t$ COMMAND -c /path/to/config_file.json\n")
print(f"Error: {e}")
# Execute main.
if __name__ == "__main__":
main()