From 4fd0d45e540902eeecd69670677b023ce9b59a6b Mon Sep 17 00:00:00 2001 From: Jude Nelson Date: Mon, 29 Jan 2018 19:04:59 -0500 Subject: [PATCH] add queue implementation --- blockstack/lib/queue.py | 219 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 blockstack/lib/queue.py diff --git a/blockstack/lib/queue.py b/blockstack/lib/queue.py new file mode 100644 index 000000000..d1ae1007b --- /dev/null +++ b/blockstack/lib/queue.py @@ -0,0 +1,219 @@ +# -*- coding: utf-8 -*- +""" + Blockstack + ~~~~~ + copyright: (c) 2014-2015 by Halfmoon Labs, Inc. + copyright: (c) 2016-2018 by Blockstack.org + + This file is part of Blockstack. + + Blockstack is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Blockstack is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + You should have received a copy of the GNU General Public License + along with Blockstack. If not, see . +""" + +import sqlite3 +import traceback +import os +import json +import base64 +import time +import random +import threading + +from util import db_query_execute + +QUEUE_SQL = """ +CREATE TABLE IF NOT EXISTS queue( name STRING NOT NULL, + queue_id STRING NOT NULL, + data NOT NULL ); +""" + +from virtualchain import get_logger + +log = get_logger() + +DB_SERIALIZE_LOCK = threading.Lock() + +def queuedb_create(path): + """ + Create a sqlite3 db at the given path. + Create all the tables and indexes we need. + Raises if the table already exists + """ + + global QUEUE_SQL, ERROR_SQL + + lines = [l + ";" for l in QUEUE_SQL.split(";")] + con = sqlite3.connect( path, isolation_level=None ) + + for line in lines: + con.execute(line) + + con.commit() + con.row_factory = queuedb_row_factory + return con + + +def queuedb_open(path): + """ + Open a connection to the given database. + hack: At most one queue-open can happen globally. + """ + with DB_SERIALIZE_LOCK: + return queuedb_create( path ) + + +def queuedb_row_factory(cursor, row): + """ + Dict row factory + """ + d = {} + for idx, col in enumerate(cursor.description): + d[col[0]] = row[idx] + + return d + + +def queuedb_query_execute(cur, query, values): + """ + Execute a query. If it fails, abort the program. + Gracefully handle lock contention by timing out. + """ + return db_query_execute(cur, query, values) + + +def queuedb_find(path, queue_id, name, offset=None, limit=None): + """ + Find a record by name and queue ID. + Return the rows on success (empty list if not found) + Raise on error + """ + return queuedb_findall(path, queue_id, name=name, offset=offset, limit=limit) + + +def queuedb_findall(path, queue_id, name=None, offset=None, limit=None): + """ + Get all queued entries for a queue and a name. + If name is None, then find all queue entries + + Return the rows on success (empty list if not found) + Raise on error + """ + sql = "SELECT * FROM queue WHERE queue_id = ? ORDER BY rowid ASC" + args = (queue_id,) + + if name: + sql += ' AND name = ?' + args += (name,) + + if limit: + sql += ' LIMIT ?' + args += (limit,) + + if offset: + sql += ' OFFSET ?' + args += (offset,) + + sql += ';' + + db = queuedb_open(path) + if db is None: + raise Exception("Failed to open %s" % path) + + cur = db.cursor() + rows = queuedb_query_execute(cur, sql, args) + + count = 0 + ret = [] + for row in rows: + dat = {} + dat.update(row) + ret.append(dat) + + db.close() + return ret + + +def queuedb_append(path, queue_id, name, data): + """ + Append an element to the back of the queue. + Return True on success + Raise on error + """ + sql = "INSERT INTO queue VALUES (?,?,?);" + args = (name, queue_id, data) + + db = queuedb_open(path) + if db is None: + raise Exception("Failed to open %s" % path) + + cur = db.cursor() + res = queuedb_query_execute(cur, sql, args) + + db.commit() + db.close() + return True + + +def queuedb_remove(path, entry, cur=None): + """ + Remove an element from a queue. + Return True on success + Raise on error + """ + sql = "DELETE FROM queue WHERE queue_id = ? AND name = ?;" + args = (entry['queue_id'], entry['name']) + + cursor = None + if cur: + cursor = cur + else: + db = queuedb_open(path) + if db is None: + raise Exception("Failed to open %s" % path) + + cursor = db.cursor() + + res = queuedb_query_execute(cursor, sql, args) + + if cur is None: + db.commit() + db.close() + + return True + + +def queuedb_removeall(path, entries): + """ + Remove all entries from a queue + """ + db = queuedb_open(path) + if db is None: + raise Exception("Failed to open %s" % path) + + cursor = db.cursor() + queuedb_query_execute(cursor, 'BEGIN', ()) + + for entry in entries: + queuedb_remove(path, entry, cur=cursor) + + queuedb_query_execute(cursor, 'END', ()) + db.close() + + return True + + +def queuedb_peek(path, queue_id, name): + """ + Find the oldest instance of a named record in a queue. + """ + return queuedb_find( queue_id, name, limit=1, path=path )