add locking so that the full indexer and the updater don't step on one another

This commit is contained in:
Aaron Blankstein
2017-08-22 16:31:41 -04:00
parent 69a1139c7a
commit 119728cfb0
2 changed files with 65 additions and 10 deletions

View File

@@ -65,6 +65,7 @@ SEARCH_API_ENDPOINT_ENABLED = True
SEARCH_BLOCKCHAIN_DATA_FILE = "/var/blockstack-search/blockchain_data.json"
SEARCH_PROFILE_DATA_FILE = "/var/blockstack-search/profile_data.json"
SEARCH_LAST_INDEX_DATA_FILE = "/var/blockstack-search/last_indexed.json"
SEARCH_LOCKFILE = "/var/blockstack-search/indexer_lockfile.json"
SEARCH_BULK_INSERT_LIMIT = 1000
SEARCH_DEFAULT_LIMIT = 50
SEARCH_LUCENE_ENABLED = False

View File

@@ -23,12 +23,14 @@ This file is part of Blockstack.
along with Blockstack. If not, see <http://www.gnu.org/licenses/>.
"""
import sys, os
import sys, os, time
import tempfile
import json
from datetime import datetime
from api.config import (
SEARCH_BLOCKCHAIN_DATA_FILE, SEARCH_PROFILE_DATA_FILE,
SEARCH_LAST_INDEX_DATA_FILE)
SEARCH_LAST_INDEX_DATA_FILE, SEARCH_LOCKFILE)
from .utils import validUsername
from .utils import get_json, config_log
@@ -45,16 +47,10 @@ def fetch_namespace():
Fetch all names in a namespace that should be indexed.
Data is saved in data/ directory
"""
info_resp = proxy.getinfo()
last_block_processed = info_resp['last_block_processed']
resp = proxy.get_all_names()
with open(SEARCH_BLOCKCHAIN_DATA_FILE, 'w') as fout:
fout.write(json.dumps(resp))
with open(SEARCH_LAST_INDEX_DATA_FILE, 'w') as fout:
fout.write(json.dumps(last_block_processed))
def print_status_bar(filled, total):
pct = float(filled) / total
@@ -67,7 +63,10 @@ def update_profiles():
if not os.path.exists(SEARCH_LAST_INDEX_DATA_FILE):
return {'error' : 'No last index, you need to rebuild the whole index.'}
with open(SEARCH_LAST_INDEX_DATA_FILE, 'r') as fin:
last_block_processed = json.load(fin)
search_indexer_info = json.load(fin)
last_block_processed = search_indexer_info['last_block_height']
last_full_index = search_indexer_info['last_full_index']
info_resp = proxy.getinfo()
try:
@@ -113,10 +112,19 @@ def update_profiles():
if profile['fqu'] in names_updated:
all_profiles[ix] = updated_profiles[profile['fqu']]
if not obtain_lockfile():
return {'error' : 'Could not obtain lockfile, abandoning my update.'}
with open(SEARCH_LAST_INDEX_DATA_FILE, 'r') as fin:
search_indexer_info = json.load(fin)
if search_indexer_info['last_full_index'] != last_full_index:
return {'error' : 'Full re-index written during our update. Abandoning'}
with open(SEARCH_PROFILE_DATA_FILE, 'w') as fout:
json.dump(all_profiles, fout)
with open(SEARCH_LAST_INDEX_DATA_FILE, 'w') as fout:
json.dump(new_block_height, fout)
search_indexer_info['last_block_height'] = new_block_height
json.dump(search_indexer_info, fout)
return {'status' : True, 'message' : 'Indexed {} profiles'.format(len(names_updated))}
@@ -132,6 +140,9 @@ def fetch_profiles(max_to_fetch = None, just_test_set = False):
with open(SEARCH_BLOCKCHAIN_DATA_FILE, 'r') as fin:
all_names = json.load(file)
info_resp = proxy.getinfo()
last_block_processed = info_resp['last_block_processed']
all_profiles = []
if max_to_fetch == None:
@@ -158,8 +169,51 @@ def fetch_profiles(max_to_fetch = None, just_test_set = False):
except:
pass
attempts = 0
while not obtain_lockfile():
attempts += 1
time.sleep(5)
if attempts > 10:
print "ERROR! Could not obtain lockfile"
return
with open(SEARCH_PROFILE_DATA_FILE, 'w') as fout:
json.dump(all_profiles, fout)
with open(SEARCH_LAST_INDEX_DATA_FILE, 'w') as fout:
search_index_data = {
'last_block_height' : last_block_processed,
'last_full_index' : datetime.now().isoformat()
}
json.dump(search_index_data, fout)
def obtain_lockfile():
if os.path.exists(SEARCH_LOCKFILE):
with open(SEARCH_LOCKFILE, 'r') as fin:
pid = json.load(fin)
try:
os.kill(pid, 0)
return False # lockfile exists, pid still running.
except:
pass
# lockfile stale. unlink it
os.unlink(SEARCH_LOCKFILE)
fd, path = tempfile.mkstemp(prefix=".indexer.lock.", dir=os.path.dirname(SEARCH_LOCKFILE))
try:
with os.fdopen(fd, 'w') as fout:
json.dump(os.getpid(), fout)
os.link( path, SEARCH_LOCKFILE )
os.unlink( path )
except:
import traceback as tb; tb.print_exc()
return False
# make sure we got it
with open(SEARCH_LOCKFILE, 'r') as fin:
pid = json.load(fin)
if pid == os.getpid():
return True
print "Wrong pid : {} != {}".format(pid, os.getpid())
return False
if __name__ == "__main__":