further simplify argument parsing, adjust tests

This commit is contained in:
Maximilian Hils
2013-08-17 13:24:49 +02:00
parent f6f66c0daa
commit e6a1e959a0
8 changed files with 123 additions and 195 deletions

View File

@@ -97,7 +97,7 @@ def parse_setheader(s):
"""
return _parse_hook(s)
def common_options(parser):
def add_common_arguments(parser):
parser.add_argument(
"--version",
action='version', version=version.NAMEVERSION

View File

@@ -1,7 +1,7 @@
import mailcap, mimetypes, tempfile, os, subprocess, glob, time, shlex, stat
import os.path, sys, weakref
import urwid
from .. import controller, utils, flow
from .. import controller, utils, flow, cmdline
import flowlist, flowview, help, common, grideditor, palettes, contentview, flowdetailview
EVENTLOG_SIZE = 500
@@ -317,50 +317,35 @@ class ConsoleState(flow.State):
self.set_focus(self.focus)
return ret
class Options(object):
attributes = [
"app",
"app_domain",
"app_ip",
"anticache",
"anticomp",
"client_replay",
"debug",
"eventlog",
"keepserving",
"kill",
"intercept",
"no_server",
"refresh_server_playback",
"rfile",
"script",
"showhost",
"replacements",
"rheaders",
"setheaders",
"server_replay",
"stickycookie",
"stickyauth",
"verbosity",
"wfile",
"nopop",
"palette",
]
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
for i in self.attributes:
if not hasattr(self, i):
setattr(self, i, None)
#begin nocover
class ConsoleMaster(flow.FlowMaster):
palette = []
@classmethod
def add_arguments(cls, parser):
cmdline.add_common_arguments(parser)
parser.add_argument(
"--debug", default=False,
action="store_true", dest="debug"
)
parser.add_argument(
"--palette", type=str, default="dark",
action="store", dest="palette",
help="Select color palette: " + ", ".join(palettes.palettes.keys())
)
group = parser.add_argument_group(
"Filters",
"See help in mitmproxy for filter expression syntax."
)
group.add_argument(
"-i", "--intercept", type=str, default=None,
action="store", dest="intercept",
help = "Intercept filter expression."
)
def __init__(self, server, options):
flow.FlowMaster.__init__(self, server, ConsoleState())
self.looptime = 0

View File

@@ -1,44 +1,9 @@
import sys, os
import sys, os, argparse
import netlib.utils
import flow, filt, utils
import flow, filt, utils, cmdline
class DumpError(Exception): pass
class Options(object):
attributes = [
"app",
"app_domain",
"app_ip",
"anticache",
"anticomp",
"client_replay",
"eventlog",
"keepserving",
"kill",
"no_server",
"nopop",
"refresh_server_playback",
"replacements",
"rfile",
"rheaders",
"setheaders",
"server_replay",
"scripts",
"showhost",
"stickycookie",
"stickyauth",
"verbosity",
"wfile",
]
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
for i in self.attributes:
if not hasattr(self, i):
setattr(self, i, None)
def str_response(resp):
r = "%s %s"%(resp.code, resp.msg)
if resp.is_replay():
@@ -58,6 +23,18 @@ def str_request(req, showhost):
class DumpMaster(flow.FlowMaster):
@classmethod
def add_arguments(cls,parser):
cmdline.add_common_arguments(parser)
parser.add_argument(
"--keepserving",
action="store_true", dest="keepserving", default=False,
help="Continue serving after client playback or file read. We exit by default."
)
parser.add_argument(
'filt', nargs=argparse.REMAINDER
)
def __init__(self, server, options, outfile=sys.stdout):
flow.FlowMaster.__init__(self, server, flow.State())
self.outfile = outfile

View File

@@ -4,29 +4,16 @@ from libmproxy import proxy, cmdline, version, dump
if __name__ == '__main__':
parser = argparse.ArgumentParser(usage = "%(prog)s [options] [filter]")
cmdline.common_options(parser)
# Add arguments unique to mitmdump
parser.add_argument(
"--keepserving",
action="store_true", dest="keepserving", default=False,
help="Continue serving after client playback or file read. We exit by default."
)
parser.add_argument(
'filt', nargs=argparse.REMAINDER
)
dump.DumpMaster.add_arguments(parser)
options = parser.parse_args()
server = proxy.get_server(parser,options)
m = dump.DumpMaster(server, options)
def cleankill(*args, **kwargs):
m.shutdown()
signal.signal(signal.SIGTERM, cleankill)
try:
m = dump.DumpMaster(server, options)
def cleankill(*args, **kwargs):
m.shutdown()
signal.signal(signal.SIGTERM, cleankill)
m.run()
except dump.DumpError, e:
print >> sys.stderr, "mitmdump:", e
sys.exit(1)
except KeyboardInterrupt:
pass

View File

@@ -4,27 +4,7 @@ from libmproxy import proxy, cmdline, version, console
if __name__ == '__main__':
parser = argparse.ArgumentParser(usage = "%(prog)s [options]")
cmdline.common_options(parser)
# Add arguments unique to mitmproxy
parser.add_argument(
"--debug", default=False
action="store_true", dest="debug"
)
parser.add_argument(
"--palette", type=str, default="dark",
action="store", dest="palette",
help="Select color palette: " + ", ".join(console.palettes.palettes.keys())
)
group = parser.add_argument_group(
"Filters",
"See help in mitmproxy for filter expression syntax."
)
group.add_argument(
"-i", "--intercept", type=str, default=None,
action="store", dest="intercept",
help = "Intercept filter expression."
)
console.ConsoleMaster.add_arguments(parser)
options = parser.parse_args()
server = proxy.get_server(parser,options)

View File

@@ -4,6 +4,15 @@ import tutils
import os.path
class MockParser(argparse.ArgumentParser):
"""
argparse.ArgumentParser sys.exits() by default.
Make it more testable by throwing an exception instead.
"""
def error(self, message):
raise Exception(message)
def test_parse_replace_hook():
x = cmdline.parse_replace_hook("/foo/bar/voing")
assert x == ("foo", "bar", "voing")
@@ -14,11 +23,6 @@ def test_parse_replace_hook():
x = cmdline.parse_replace_hook("/bar/voing")
assert x == (".*", "bar", "voing")
tutils.raises(
cmdline.ParseException,
cmdline.parse_replace_hook,
"/foo"
)
tutils.raises(
"replacement regex",
cmdline.parse_replace_hook,
@@ -47,66 +51,54 @@ def test_shlex():
"""
absfilepath = os.path.normcase(os.path.abspath(__file__))
parser = argparse.ArgumentParser()
cmdline.common_options(parser)
parser = MockParser()
cmdline.add_common_arguments(parser)
opts = parser.parse_args(args=["-s",absfilepath])
assert os.path.isfile(opts.scripts[0][0])
def test_common():
parser = argparse.ArgumentParser()
cmdline.common_options(parser)
parser = MockParser()
cmdline.add_common_arguments(parser)
opts = parser.parse_args(args=[])
assert cmdline.get_common_options(opts)
opts = parser.parse_args(args=["-t","foo","-u","foo"])
opts.stickycookie_filt = "foo"
opts.stickyauth_filt = "foo"
v = cmdline.get_common_options(opts)
assert v["stickycookie"] == "foo"
assert v["stickyauth"] == "foo"
assert opts.stickycookie == "foo"
assert opts.stickyauth == "foo"
opts.setheader = ["/foo/bar/voing"]
v = cmdline.get_common_options(opts)
assert v["setheaders"] == [("foo", "bar", "voing")]
opts = parser.parse_args(args=["--setheader","/foo/bar/voing"])
assert opts.setheaders == [("foo", "bar", "voing")]
opts.setheader = ["//"]
tutils.raises(
"empty clause",
cmdline.get_common_options,
opts
parser.parse_args,
["--setheader","//"]
)
opts.setheader = []
opts.replace = ["/foo/bar/voing"]
v = cmdline.get_common_options(opts)
assert v["replacements"] == [("foo", "bar", "voing")]
opts = parser.parse_args(args=["--replace","/foo/bar/voing"])
assert opts.replacements == [("foo", "bar", "voing")]
opts.replace = ["//"]
tutils.raises(
"empty clause",
cmdline.get_common_options,
opts
parser.parse_args,
["--replace","//"]
)
opts.replace = []
opts.replace_file = [("/foo/bar/nonexistent")]
tutils.raises(
"could not read replace file",
cmdline.get_common_options,
opts
parser.parse_args,
["--replace-from-file","/foo/bar/nonexistent"]
)
opts.replace_file = [("/~/bar/nonexistent")]
tutils.raises(
"filter pattern",
cmdline.get_common_options,
opts
parser.parse_args,
["--replace-from-file","/~/bar/nonexistent"]
)
p = tutils.test_data.path("data/replace")
opts.replace_file = [("/foo/bar/%s"%p)]
v = cmdline.get_common_options(opts)["replacements"]
assert len(v) == 1
assert v[0][2].strip() == "replacecontents"
opts = parser.parse_args(args=["--replace-from-file",("/foo/bar/%s"%p)])
assert len(opts.replacements) == 1
assert opts.replacements[0][2].strip() == "replacecontents"

View File

@@ -1,4 +1,4 @@
import os
import os, argparse
from cStringIO import StringIO
from libmproxy import dump, flow, proxy
import tutils
@@ -17,6 +17,15 @@ def test_strfuncs():
assert "replay" in dump.str_request(t, False)
assert "replay" in dump.str_request(t, True)
parser = argparse.ArgumentParser()
dump.DumpMaster.add_arguments(parser)
def _options(**opts):
o = parser.parse_args([])
for k,v in opts.items():
if not hasattr(o, k):
raise RuntimeError("Cannot set unknown option property")
setattr(o, k, v)
return o
class TestDumpMaster:
def _cycle(self, m, content):
@@ -37,10 +46,10 @@ class TestDumpMaster:
m.handle_clientdisconnect(cd)
return f
def _dummy_cycle(self, n, filt, content, **options):
def _dummy_cycle(self, n, content, **options):
cs = StringIO()
o = dump.Options(**options)
m = dump.DumpMaster(None, o, filt, outfile=cs)
o = _options(**options)
m = dump.DumpMaster(None, o, outfile=cs)
for i in range(n):
self._cycle(m, content)
m.shutdown()
@@ -56,8 +65,8 @@ class TestDumpMaster:
def test_error(self):
cs = StringIO()
o = dump.Options(verbosity=1)
m = dump.DumpMaster(None, o, None, outfile=cs)
o = _options(verbosity=1)
m = dump.DumpMaster(None, o, outfile=cs)
f = tutils.tflow_err()
m.handle_request(f.request)
assert m.handle_error(f.error)
@@ -66,90 +75,89 @@ class TestDumpMaster:
def test_replay(self):
cs = StringIO()
o = dump.Options(server_replay="nonexistent", kill=True)
tutils.raises(dump.DumpError, dump.DumpMaster, None, o, None, outfile=cs)
o = _options(server_replay="nonexistent", kill=True)
tutils.raises(dump.DumpError, dump.DumpMaster, None, o, outfile=cs)
with tutils.tmpdir() as t:
p = os.path.join(t, "rep")
self._flowfile(p)
o = dump.Options(server_replay=p, kill=True)
m = dump.DumpMaster(None, o, None, outfile=cs)
o = _options(server_replay=p, kill=True)
m = dump.DumpMaster(None, o, outfile=cs)
self._cycle(m, "content")
self._cycle(m, "content")
o = dump.Options(server_replay=p, kill=False)
m = dump.DumpMaster(None, o, None, outfile=cs)
o = _options(server_replay=p, kill=False)
m = dump.DumpMaster(None, o, outfile=cs)
self._cycle(m, "nonexistent")
o = dump.Options(client_replay=p, kill=False)
m = dump.DumpMaster(None, o, None, outfile=cs)
o = _options(client_replay=p, kill=False)
m = dump.DumpMaster(None, o, outfile=cs)
def test_read(self):
with tutils.tmpdir() as t:
p = os.path.join(t, "read")
self._flowfile(p)
assert "GET" in self._dummy_cycle(0, None, "", verbosity=1, rfile=p)
assert "GET" in self._dummy_cycle(0, "", verbosity=1, rfile=p)
tutils.raises(
dump.DumpError, self._dummy_cycle,
0, None, "", verbosity=1, rfile="/nonexistent"
0, "", verbosity=1, rfile="/nonexistent"
)
# We now just ignore errors
self._dummy_cycle(0, None, "", verbosity=1, rfile=tutils.test_data.path("test_dump.py"))
self._dummy_cycle(0, "", verbosity=1, rfile=tutils.test_data.path("test_dump.py"))
def test_options(self):
o = dump.Options(verbosity = 2)
o = _options(verbosity = 2)
assert o.verbosity == 2
def test_filter(self):
assert not "GET" in self._dummy_cycle(1, "~u foo", "", verbosity=1)
assert not "GET" in self._dummy_cycle(1, "", filt=["~u","foo"], verbosity=1)
def test_app(self):
o = dump.Options(app=True)
o = _options(app=True)
s = mock.MagicMock()
m = dump.DumpMaster(s, o, None)
m = dump.DumpMaster(s, o)
assert s.apps.add.call_count == 2
def test_replacements(self):
o = dump.Options(replacements=[(".*", "content", "foo")])
m = dump.DumpMaster(None, o, None)
o = _options(replacements=[(".*", "content", "foo")])
m = dump.DumpMaster(None, o)
f = self._cycle(m, "content")
assert f.request.content == "foo"
def test_setheader(self):
o = dump.Options(setheaders=[(".*", "one", "two")])
m = dump.DumpMaster(None, o, None)
o = _options(setheaders=[(".*", "one", "two")])
m = dump.DumpMaster(None, o)
f = self._cycle(m, "content")
assert f.request.headers["one"] == ["two"]
def test_basic(self):
for i in (1, 2, 3):
assert "GET" in self._dummy_cycle(1, "~s", "", verbosity=i, eventlog=True)
assert "GET" in self._dummy_cycle(1, "~s", "\x00\x00\x00", verbosity=i)
assert "GET" in self._dummy_cycle(1, "~s", "ascii", verbosity=i)
assert "GET" in self._dummy_cycle(1, "", filt=["~s"], verbosity=i, eventlog=True)
assert "GET" in self._dummy_cycle(1, "\x00\x00\x00", filt=["~s"], verbosity=i)
assert "GET" in self._dummy_cycle(1, "ascii", filt=["~s"], verbosity=i)
def test_write(self):
with tutils.tmpdir() as d:
p = os.path.join(d, "a")
self._dummy_cycle(1, None, "", wfile=p, verbosity=0)
self._dummy_cycle(1, "", wfile=p, verbosity=0)
assert len(list(flow.FlowReader(open(p,"rb")).stream())) == 1
def test_write_err(self):
tutils.raises(
tutils.raises(
dump.DumpError,
self._dummy_cycle,
1,
None,
"",
wfile = "nonexistentdir/foo"
)
def test_script(self):
ret = self._dummy_cycle(
1, None, "",
1, "",
scripts=[[tutils.test_data.path("scripts/all.py")]], verbosity=0, eventlog=True
)
assert "XCLIENTCONNECT" in ret
@@ -158,16 +166,15 @@ class TestDumpMaster:
assert "XCLIENTDISCONNECT" in ret
tutils.raises(
dump.DumpError,
self._dummy_cycle, 1, None, "", scripts=[["nonexistent"]]
self._dummy_cycle, 1, "", scripts=[["nonexistent"]]
)
tutils.raises(
dump.DumpError,
self._dummy_cycle, 1, None, "", scripts=[["starterr.py"]]
self._dummy_cycle, 1, "", scripts=[[tutils.test_data.path("scripts/starterr.py")]]
)
def test_stickycookie(self):
self._dummy_cycle(1, None, "", stickycookie = ".*")
self._dummy_cycle(1, "", stickycookie = ".*")
def test_stickyauth(self):
self._dummy_cycle(1, None, "", stickyauth = ".*")
self._dummy_cycle(1, "", stickyauth = ".*")

View File

@@ -74,7 +74,7 @@ class MockParser:
class TestProcessProxyOptions:
def p(self, *args):
parser = argparse.ArgumentParser()
cmdline.common_options(parser)
cmdline.add_common_arguments(parser)
opts = parser.parse_args(args=args)
m = MockParser()
return m, proxy.process_proxy_options(m, opts)
@@ -136,7 +136,7 @@ class TestProxyServer:
@tutils.SkipWindows # binding to 0.0.0.0:1 works without special permissions on Windows
def test_err(self):
parser = argparse.ArgumentParser()
cmdline.common_options(parser)
cmdline.add_common_arguments(parser)
opts = parser.parse_args(args=[])
tutils.raises("error starting proxy server", proxy.ProxyServer, opts, 1)