mirror of
https://github.com/zhigang1992/mitmproxy.git
synced 2026-03-28 01:44:17 +08:00
addons.script: convert to test.taddons
This commit is contained in:
@@ -1,18 +1,17 @@
|
||||
import traceback
|
||||
|
||||
import sys
|
||||
import time
|
||||
import re
|
||||
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
import re
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import options
|
||||
from mitmproxy import proxy
|
||||
from mitmproxy.addons import script
|
||||
from mitmproxy import master
|
||||
|
||||
from .. import mastertest
|
||||
from .. import tutils as ttutils
|
||||
|
||||
|
||||
@@ -64,73 +63,68 @@ def test_load_script():
|
||||
assert ns.start
|
||||
|
||||
|
||||
class TestScript(mastertest.MasterTest):
|
||||
class TestScript:
|
||||
def test_simple(self):
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer())
|
||||
sc = script.Script(
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/recorder.py"
|
||||
with taddons.context() as tctx:
|
||||
sc = script.Script(
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/recorder.py"
|
||||
)
|
||||
)
|
||||
)
|
||||
m.addons.add(sc)
|
||||
assert sc.ns.call_log == [
|
||||
("solo", "start", (), {}),
|
||||
("solo", "configure", (o, o.keys()), {})
|
||||
]
|
||||
sc.load_script()
|
||||
assert sc.ns.call_log == [
|
||||
("solo", "start", (), {}),
|
||||
]
|
||||
|
||||
sc.ns.call_log = []
|
||||
f = tflow.tflow(resp=True)
|
||||
m.request(f)
|
||||
sc.ns.call_log = []
|
||||
f = tflow.tflow(resp=True)
|
||||
sc.request(f)
|
||||
|
||||
recf = sc.ns.call_log[0]
|
||||
assert recf[1] == "request"
|
||||
recf = sc.ns.call_log[0]
|
||||
assert recf[1] == "request"
|
||||
|
||||
def test_reload(self):
|
||||
o = options.Options()
|
||||
m = mastertest.RecordingMaster(o, proxy.DummyServer())
|
||||
with tutils.tmpdir():
|
||||
with open("foo.py", "w"):
|
||||
pass
|
||||
sc = script.Script("foo.py")
|
||||
m.addons.add(sc)
|
||||
|
||||
for _ in range(100):
|
||||
with open("foo.py", "a") as f:
|
||||
f.write(".")
|
||||
m.addons.invoke_with_context(sc, "tick")
|
||||
time.sleep(0.1)
|
||||
if m.event_log:
|
||||
return
|
||||
raise AssertionError("Change event not detected.")
|
||||
with taddons.context() as tctx:
|
||||
with tutils.tmpdir():
|
||||
with open("foo.py", "w"):
|
||||
pass
|
||||
sc = script.Script("foo.py")
|
||||
tctx.configure(sc)
|
||||
for _ in range(100):
|
||||
with open("foo.py", "a") as f:
|
||||
f.write(".")
|
||||
sc.tick()
|
||||
time.sleep(0.1)
|
||||
if tctx.master.event_log:
|
||||
return
|
||||
raise AssertionError("Change event not detected.")
|
||||
|
||||
def test_exception(self):
|
||||
o = options.Options()
|
||||
m = mastertest.RecordingMaster(o, proxy.DummyServer())
|
||||
sc = script.Script(
|
||||
tutils.test_data.path("mitmproxy/data/addonscripts/error.py")
|
||||
)
|
||||
m.addons.add(sc)
|
||||
f = tflow.tflow(resp=True)
|
||||
m.request(f)
|
||||
assert m.event_log[0][0] == "error"
|
||||
assert len(m.event_log[0][1].splitlines()) == 6
|
||||
assert re.search('addonscripts/error.py", line \d+, in request', m.event_log[0][1])
|
||||
assert re.search('addonscripts/error.py", line \d+, in mkerr', m.event_log[0][1])
|
||||
assert m.event_log[0][1].endswith("ValueError: Error!\n")
|
||||
with taddons.context() as tctx:
|
||||
sc = script.Script(
|
||||
tutils.test_data.path("mitmproxy/data/addonscripts/error.py")
|
||||
)
|
||||
sc.start()
|
||||
f = tflow.tflow(resp=True)
|
||||
sc.request(f)
|
||||
assert tctx.master.event_log[0][0] == "error"
|
||||
assert len(tctx.master.event_log[0][1].splitlines()) == 6
|
||||
assert re.search('addonscripts/error.py", line \d+, in request', tctx.master.event_log[0][1])
|
||||
assert re.search('addonscripts/error.py", line \d+, in mkerr', tctx.master.event_log[0][1])
|
||||
assert tctx.master.event_log[0][1].endswith("ValueError: Error!\n")
|
||||
|
||||
def test_addon(self):
|
||||
o = options.Options()
|
||||
m = master.Master(o, proxy.DummyServer())
|
||||
sc = script.Script(
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/addon.py"
|
||||
with taddons.context() as tctx:
|
||||
sc = script.Script(
|
||||
tutils.test_data.path(
|
||||
"mitmproxy/data/addonscripts/addon.py"
|
||||
)
|
||||
)
|
||||
)
|
||||
m.addons.add(sc)
|
||||
assert sc.ns.event_log == [
|
||||
'scriptstart', 'addonstart', 'addonconfigure'
|
||||
]
|
||||
sc.start()
|
||||
tctx.configure(sc)
|
||||
assert sc.ns.event_log == [
|
||||
'scriptstart', 'addonstart', 'addonconfigure'
|
||||
]
|
||||
|
||||
|
||||
class TestCutTraceback:
|
||||
@@ -151,7 +145,7 @@ class TestCutTraceback:
|
||||
assert len(traceback.extract_tb(tb_cut2)) == len(traceback.extract_tb(tb))
|
||||
|
||||
|
||||
class TestScriptLoader(mastertest.MasterTest):
|
||||
class TestScriptLoader:
|
||||
def test_run_once(self):
|
||||
o = options.Options(scripts=[])
|
||||
m = master.Master(o, proxy.DummyServer())
|
||||
@@ -199,44 +193,48 @@ class TestScriptLoader(mastertest.MasterTest):
|
||||
|
||||
def test_order(self):
|
||||
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py")
|
||||
|
||||
o = options.Options(
|
||||
scripts = [
|
||||
"%s %s" % (rec, "a"),
|
||||
"%s %s" % (rec, "b"),
|
||||
"%s %s" % (rec, "c"),
|
||||
]
|
||||
)
|
||||
m = mastertest.RecordingMaster(o, proxy.DummyServer())
|
||||
sc = script.ScriptLoader()
|
||||
m.addons.add(sc)
|
||||
with taddons.context() as tctx:
|
||||
tctx.master.addons.add(sc)
|
||||
tctx.configure(
|
||||
sc,
|
||||
scripts = [
|
||||
"%s %s" % (rec, "a"),
|
||||
"%s %s" % (rec, "b"),
|
||||
"%s %s" % (rec, "c"),
|
||||
]
|
||||
)
|
||||
debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]
|
||||
assert debug == [
|
||||
('debug', 'a start'), ('debug', 'a configure'),
|
||||
('debug', 'b start'), ('debug', 'b configure'),
|
||||
('debug', 'c start'), ('debug', 'c configure')
|
||||
]
|
||||
tctx.master.event_log = []
|
||||
tctx.configure(
|
||||
sc,
|
||||
scripts = [
|
||||
"%s %s" % (rec, "c"),
|
||||
"%s %s" % (rec, "a"),
|
||||
"%s %s" % (rec, "b"),
|
||||
]
|
||||
)
|
||||
debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]
|
||||
# No events, only order has changed
|
||||
assert debug == []
|
||||
|
||||
debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"]
|
||||
assert debug == [
|
||||
('debug', 'a start'), ('debug', 'a configure'),
|
||||
('debug', 'b start'), ('debug', 'b configure'),
|
||||
('debug', 'c start'), ('debug', 'c configure')
|
||||
]
|
||||
m.event_log[:] = []
|
||||
|
||||
o.scripts = [
|
||||
"%s %s" % (rec, "c"),
|
||||
"%s %s" % (rec, "a"),
|
||||
"%s %s" % (rec, "b"),
|
||||
]
|
||||
debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"]
|
||||
# No events, only order has changed
|
||||
assert debug == []
|
||||
|
||||
o.scripts = [
|
||||
"%s %s" % (rec, "x"),
|
||||
"%s %s" % (rec, "a"),
|
||||
]
|
||||
debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"]
|
||||
assert debug == [
|
||||
('debug', 'c done'),
|
||||
('debug', 'b done'),
|
||||
('debug', 'x start'),
|
||||
('debug', 'x configure'),
|
||||
('debug', 'a configure'),
|
||||
]
|
||||
tctx.master.event_log = []
|
||||
tctx.configure(
|
||||
sc,
|
||||
scripts = [
|
||||
"%s %s" % (rec, "x"),
|
||||
"%s %s" % (rec, "a"),
|
||||
]
|
||||
)
|
||||
debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]
|
||||
assert debug == [
|
||||
('debug', 'c done'),
|
||||
('debug', 'b done'),
|
||||
('debug', 'x start'),
|
||||
('debug', 'x configure'),
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user