mirror of
https://github.com/zhigang1992/graphql-engine.git
synced 2026-05-25 18:32:23 +08:00
committed by
Vamshi Surabhi
parent
e952495235
commit
2aa66ca6f5
@@ -39,18 +39,26 @@ class GQLWsClient():
|
||||
self.ws_queue.queue.clear()
|
||||
self.ws_id_query_queues = dict()
|
||||
self.ws_active_query_ids = set()
|
||||
self._ws = websocket.WebSocketApp(self.ws_url.geturl(), on_message=self._on_message, on_close=self._on_close)
|
||||
|
||||
self.connected_event = threading.Event()
|
||||
self.init_done = False
|
||||
self.is_closing = False
|
||||
self.remote_closed = False
|
||||
|
||||
self._ws = websocket.WebSocketApp(self.ws_url.geturl(),
|
||||
on_open=self._on_open, on_message=self._on_message, on_close=self._on_close)
|
||||
self.wst = threading.Thread(target=self._ws.run_forever)
|
||||
self.wst.daemon = True
|
||||
self.wst.start()
|
||||
self.remote_closed = False
|
||||
self.connected = False
|
||||
self.init_done = False
|
||||
|
||||
def recreate_conn(self):
|
||||
self.teardown()
|
||||
self.create_conn()
|
||||
|
||||
def wait_for_connection(self, timeout=10):
|
||||
assert not self.is_closing
|
||||
assert self.connected_event.wait(timeout=timeout)
|
||||
|
||||
def get_ws_event(self, timeout):
|
||||
return self.ws_queue.get(timeout=timeout)
|
||||
|
||||
@@ -61,9 +69,7 @@ class GQLWsClient():
|
||||
return self.ws_id_query_queues[query_id].get(timeout=timeout)
|
||||
|
||||
def send(self, frame):
|
||||
if not self.connected:
|
||||
self.recreate_conn()
|
||||
time.sleep(1)
|
||||
self.wait_for_connection()
|
||||
if frame.get('type') == 'stop':
|
||||
self.ws_active_query_ids.discard( frame.get('id') )
|
||||
elif frame.get('type') == 'start' and 'id' in frame:
|
||||
@@ -118,7 +124,8 @@ class GQLWsClient():
|
||||
yield self.get_ws_query_event(query_id, timeout)
|
||||
|
||||
def _on_open(self):
|
||||
self.connected = True
|
||||
if not self.is_closing:
|
||||
self.connected_event.set()
|
||||
|
||||
def _on_message(self, message):
|
||||
json_msg = json.loads(message)
|
||||
@@ -131,18 +138,16 @@ class GQLWsClient():
|
||||
self.ws_id_query_queues[json_msg['id']] = queue.Queue(maxsize=-1)
|
||||
#Put event in the correponding query_queue
|
||||
self.ws_id_query_queues[query_id].put(json_msg)
|
||||
elif json_msg['type'] == 'ka':
|
||||
self.connected = True
|
||||
else:
|
||||
elif json_msg['type'] != 'ka':
|
||||
#Put event in the main queue
|
||||
self.ws_queue.put(json_msg)
|
||||
|
||||
def _on_close(self):
|
||||
self.remote_closed = True
|
||||
self.connected = False
|
||||
self.init_done = False
|
||||
|
||||
def teardown(self):
|
||||
self.is_closing = True
|
||||
if not self.remote_closed:
|
||||
self._ws.close()
|
||||
self.wst.join()
|
||||
|
||||
@@ -206,7 +206,6 @@ def validate_gql_ws_q(hge_ctx, endpoint, query, headers, exp_http_response, retr
|
||||
#Got query complete before payload. Retry once more
|
||||
print("Got query complete before getting query response payload. Retrying")
|
||||
ws_client.recreate_conn()
|
||||
time.sleep(3)
|
||||
return validate_gql_ws_q(hge_ctx, query, headers, exp_http_response, False)
|
||||
else:
|
||||
assert resp['type'] in ['data', 'error'], resp
|
||||
|
||||
Reference in New Issue
Block a user