fix script tests

This commit is contained in:
Maximilian Hils
2014-08-18 01:47:39 +02:00
parent 5b7e19a77e
commit 94fbf066f7
3 changed files with 26 additions and 17 deletions

View File

@@ -17,6 +17,7 @@ class ClientConnection(tcp.BaseHandler, stateobject.SimpleStateObject):
self.rfile = None
self.address = None
self.clientcert = None
self.ssl_established = None
self.timestamp_start = utils.timestamp()
self.timestamp_end = None
@@ -30,6 +31,7 @@ class ClientConnection(tcp.BaseHandler, stateobject.SimpleStateObject):
)
_stateobject_attributes = dict(
ssl_established=bool,
timestamp_start=float,
timestamp_end=float,
timestamp_ssl_setup=float

View File

@@ -120,7 +120,7 @@ def _handle_concurrent_reply(fn, o, *args, **kwargs):
def run():
fn(*args, **kwargs)
o.reply()
o.reply() # If the script did not call .reply(), we have to do it now.
threading.Thread(target=run, name="ScriptThread").start()

View File

@@ -73,7 +73,7 @@ class TestScript:
r2.reply()
# Two instantiations
assert m.call_count == 2
assert m.call_count == 0 # No calls yet.
assert (time.time() - t_start) < 0.09
def test_concurrent2(self):
@@ -81,22 +81,29 @@ class TestScript:
fm = flow.FlowMaster(None, s)
s = script.Script(tutils.test_data.path("scripts/concurrent_decorator.py"), fm)
s.load()
f = tutils.tflow_full()
f.error = tutils.terr(f.request)
f.reply = f.request.reply
m = mock.Mock()
with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
t_start = time.time()
s.run("clientconnect", f)
s.run("serverconnect", f)
s.run("response", f)
s.run("error", f)
s.run("clientdisconnect", f)
while (time.time() - t_start) < 1 and m.call_count <= 5:
if m.call_count == 5:
return
time.sleep(0.001)
assert False
class Dummy:
def __init__(self):
self.response = self
self.error = self
self.reply = m
t_start = time.time()
for hook in ("clientconnect",
"serverconnect",
"response",
"error",
"clientconnect"):
d = Dummy()
assert s.run(hook, d)[0]
d.reply()
while (time.time() - t_start) < 5 and m.call_count <= 5:
if m.call_count == 5:
return
time.sleep(0.001)
assert False
def test_concurrent_err(self):
s = flow.State()