mirror of
https://github.com/alexgo-io/stacks-puppet-node.git
synced 2026-04-28 11:46:07 +08:00
Added support for load-balancing:
-- checks if no. of zero confirmation tx on server are > MAX_PENDING_TX -- if yes then switches to another server -- goes in a round robin fashion -- for profile activations, load-balancer works slightly differently
This commit is contained in:
@@ -16,6 +16,22 @@ from config_local import MAIN_SERVER, LOAD_SERVERS
|
||||
from multiprocessing.pool import ThreadPool
|
||||
from commontools import log
|
||||
|
||||
#-----------------------------------
|
||||
def pending_transactions(server):
|
||||
|
||||
namecoind = NamecoindServer(server, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD)
|
||||
|
||||
reply = namecoind.namecoind.listtransactions("",10000)
|
||||
|
||||
counter = 0
|
||||
|
||||
for i in reply:
|
||||
if i['confirmations'] == 0:
|
||||
counter += 1
|
||||
|
||||
|
||||
return counter
|
||||
|
||||
#-----------------------------------
|
||||
def check_address(address):
|
||||
|
||||
|
||||
@@ -18,38 +18,17 @@ log = logging.getLogger()
|
||||
from pymongo import MongoClient
|
||||
client = MongoClient()
|
||||
local_db = client['temp_db']
|
||||
expiring_users = local_db.users
|
||||
|
||||
from time import sleep
|
||||
|
||||
#-----------------------------------
|
||||
def send_update():
|
||||
|
||||
for i in expiring_users.find():
|
||||
key = i['name']
|
||||
try:
|
||||
value = json.loads(i['value'])
|
||||
|
||||
value['message'] = value['message'].replace('This OneName username','This username')
|
||||
except:
|
||||
value = i['value']
|
||||
|
||||
print key
|
||||
print value
|
||||
print '-' * 5
|
||||
|
||||
try:
|
||||
update_name(key,value)
|
||||
except Exception as e:
|
||||
print e
|
||||
sleep(5)
|
||||
|
||||
#-----------------------------------
|
||||
if __name__ == '__main__':
|
||||
|
||||
key = 'u/onename'
|
||||
key = 'u/ajackson'
|
||||
log.debug(get_server(key))
|
||||
#value = json.loads('{"next":"u/awright"}')
|
||||
#update_name(key,value)
|
||||
|
||||
#expiring_users =
|
||||
#send_update()
|
||||
@@ -8,9 +8,10 @@
|
||||
import os
|
||||
import json
|
||||
|
||||
from config import MONGODB_URI, LOAD_BALANCER, OLD_DB
|
||||
from config import MONGODB_URI, OLD_DB
|
||||
|
||||
from coinrpc import namecoind
|
||||
from coinrpc import namecoind, NamecoindServer
|
||||
from config import NAMECOIND_SERVER, NAMECOIND_PORT, NAMECOIND_USER, NAMECOIND_PASSWD
|
||||
|
||||
from blockdata.register import process_user
|
||||
|
||||
@@ -40,11 +41,41 @@ old_client = MongoClient(OLD_DB)
|
||||
old_db = old_client.get_default_database()
|
||||
old_users = old_db.user
|
||||
|
||||
local_client = MongoClient()
|
||||
local_db = local_client['namecoin']
|
||||
local_db = MongoClient()['namecoin']
|
||||
register_queue = local_db.queue
|
||||
|
||||
from config_local import problem_users, banned_users
|
||||
#
|
||||
|
||||
load_servers = ['named3','named4','named6','named7','named8']
|
||||
|
||||
current_server = 0
|
||||
|
||||
MAX_PENDING_TX = 50
|
||||
|
||||
from blockdata.namecoind_cluster import pending_transactions
|
||||
|
||||
#-----------------------------------
|
||||
def load_balance():
|
||||
|
||||
global current_server
|
||||
|
||||
print "current server: %s" %load_servers[current_server]
|
||||
|
||||
while(1):
|
||||
if pending_transactions(load_servers[current_server]) > MAX_PENDING_TX:
|
||||
|
||||
if current_server == len(load_servers) - 1:
|
||||
current_server = 0
|
||||
else:
|
||||
current_server += 1
|
||||
|
||||
print "load balancing: switching to %s" %load_servers[current_server]
|
||||
sleep(30)
|
||||
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
#-----------------------------------
|
||||
def process_profile(username,profile):
|
||||
|
||||
@@ -52,14 +83,13 @@ def process_profile(username,profile):
|
||||
return
|
||||
|
||||
try:
|
||||
process_user(username,profile)
|
||||
process_user(username,profile,load_servers[current_server])
|
||||
except Exception as e:
|
||||
print e
|
||||
|
||||
#-----------------------------------
|
||||
def profile_on_blockchain(username,DB_profile):
|
||||
|
||||
sleep(2)
|
||||
try:
|
||||
block_profile = namecoind.get_full_profile('u/' + username)
|
||||
except:
|
||||
@@ -106,6 +136,12 @@ def register_users():
|
||||
|
||||
counter += 1
|
||||
|
||||
check_queue = register_queue.find_one({"key":'u/' + user['username']})
|
||||
|
||||
if check_queue is not None:
|
||||
print "Already in queue"
|
||||
continue
|
||||
|
||||
if 'dispatched' in new_user and new_user['dispatched'] is False:
|
||||
|
||||
if datetime.datetime.utcnow() - new_user['created_at'] > datetime.timedelta(minutes=5):
|
||||
@@ -114,12 +150,11 @@ def register_users():
|
||||
process_profile(user['username'],user['profile'])
|
||||
new_user['dispatched'] = True
|
||||
registrations.save(new_user)
|
||||
sleep(20)
|
||||
else:
|
||||
print "New user (within 15 mins): " + user['username']
|
||||
|
||||
elif 'dispatched' in new_user and new_user['dispatched'] is True:
|
||||
|
||||
|
||||
try:
|
||||
block_profile = namecoind.get_full_profile('u/' + user['username'])
|
||||
except:
|
||||
@@ -138,6 +173,9 @@ def register_users():
|
||||
print "Random: " + user['username']
|
||||
#registrations.remove(new_user)
|
||||
|
||||
if counter % 5 == 0:
|
||||
load_balance()
|
||||
|
||||
print counter
|
||||
|
||||
#-----------------------------------
|
||||
@@ -192,8 +230,6 @@ def check_transfer():
|
||||
def update_users():
|
||||
|
||||
for new_user in updates.find():
|
||||
|
||||
sleep(1)
|
||||
|
||||
user_id = new_user['user_id']
|
||||
user = users.find_one({"_id":user_id})
|
||||
@@ -275,14 +311,22 @@ def cleanup_db():
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
print "----------"
|
||||
|
||||
#-----------------------------------
|
||||
def get_pending_state():
|
||||
|
||||
print "Pending registrations: %s" %registrations.count()
|
||||
print "Pending updates: %s" %updates.count()
|
||||
print "Pending transfers: %s" %transfer.count()
|
||||
|
||||
#-----------------------------------
|
||||
if __name__ == '__main__':
|
||||
|
||||
#check_transfer()
|
||||
update_users()
|
||||
register_users()
|
||||
#update_users()
|
||||
#register_users()
|
||||
|
||||
cleanup_db()
|
||||
#cleanup_db()
|
||||
|
||||
get_pending_state()
|
||||
@@ -1,7 +1,6 @@
|
||||
while true; do
|
||||
python register_daemon.py;
|
||||
python -m blockdata.activate;
|
||||
echo "sleeping";
|
||||
echo "-------------------------";
|
||||
sleep 1200;
|
||||
sleep 100;
|
||||
done
|
||||
@@ -15,7 +15,9 @@ from encrypt import bip38_decrypt
|
||||
from coinkit import BitcoinKeypair, NamecoinKeypair
|
||||
|
||||
from commontools import log
|
||||
from coinrpc import bitcoind
|
||||
from coinrpc.bitcoind_server import BitcoindServer
|
||||
from config import BITCOIND_SERVER, BITCOIND_PORT, BITCOIND_USER, BITCOIND_PASSWD, BITCOIND_USE_HTTPS, BITCOIND_WALLET_PASSPHRASE
|
||||
bitcoind = BitcoindServer(BITCOIND_SERVER, BITCOIND_PORT, BITCOIND_USER, BITCOIND_PASSWD, BITCOIND_USE_HTTPS, BITCOIND_WALLET_PASSPHRASE)
|
||||
|
||||
#-----------------------------------
|
||||
from pymongo import MongoClient
|
||||
|
||||
Reference in New Issue
Block a user