Add coding style check, reformat.

This commit is contained in:
Aldo Cortesi
2015-05-30 12:03:28 +12:00
parent 1a106f4080
commit a05a70d816
102 changed files with 1639 additions and 854 deletions

View File

@@ -1,8 +1,10 @@
import os, sys, mock
import os
import sys
import mock
if os.name == "nt":
m = mock.Mock()
m.__version__ = "1.1.1"
m.Widget = mock.Mock
m.WidgetWrap = mock.Mock
sys.modules['urwid'] = m
sys.modules['urwid.util'] = mock.Mock()
sys.modules['urwid.util'] = mock.Mock()

View File

@@ -4,14 +4,18 @@ parser = argparse.ArgumentParser()
parser.add_argument('--var', type=int)
var = 0
def start(ctx, argv):
global var
var = parser.parse_args(argv[1:]).var
def here(ctx):
global var
var += 1
return var
def errargs():
pass

View File

@@ -1,28 +1,36 @@
log = []
def clientconnect(ctx, cc):
ctx.log("XCLIENTCONNECT")
log.append("clientconnect")
def serverconnect(ctx, cc):
ctx.log("XSERVERCONNECT")
log.append("serverconnect")
def request(ctx, f):
ctx.log("XREQUEST")
log.append("request")
def response(ctx, f):
ctx.log("XRESPONSE")
log.append("response")
def responseheaders(ctx, f):
ctx.log("XRESPONSEHEADERS")
log.append("responseheaders")
def clientdisconnect(ctx, cc):
ctx.log("XCLIENTDISCONNECT")
log.append("clientdisconnect")
def error(ctx, cc):
ctx.log("XERROR")
log.append("error")

View File

@@ -29,4 +29,4 @@ def error(context, err):
@concurrent
def clientdisconnect(context, dc):
context.log("clientdisconnect")
context.log("clientdisconnect")

View File

@@ -1,5 +1,6 @@
from libmproxy.script import concurrent
@concurrent
def start(context, argv):
pass
pass

View File

@@ -2,4 +2,3 @@
def request(ctx, f):
f = ctx.duplicate_flow(f)
ctx.replay_request(f)

View File

@@ -4,4 +4,4 @@ def modify(chunks):
def responseheaders(context, flow):
flow.response.stream = modify
flow.response.stream = modify

View File

@@ -1,8 +1,13 @@
import mock, socket, os, time
import mock
import socket
import os
import time
from libmproxy import dump
from netlib import certutils, tcp
from libpathod.pathoc import Pathoc
import tutils, tservers
import tutils
import tservers
class TestApp(tservers.HTTPProxTest):
def test_basic(self):

View File

@@ -37,13 +37,24 @@ def test_parse_replace_hook():
def test_parse_server_spec():
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "")
assert cmdline.parse_server_spec("http://foo.com:88") == [False, False, "foo.com", 88]
assert cmdline.parse_server_spec("http://foo.com") == [False, False, "foo.com", 80]
assert cmdline.parse_server_spec("https://foo.com") == [True, True, "foo.com", 443]
assert cmdline.parse_server_spec_special("https2http://foo.com") == [True, False, "foo.com", 80]
assert cmdline.parse_server_spec_special("http2https://foo.com") == [False, True, "foo.com", 443]
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "foo.com")
tutils.raises("Invalid server specification", cmdline.parse_server_spec, "http://")
assert cmdline.parse_server_spec(
"http://foo.com:88") == [False, False, "foo.com", 88]
assert cmdline.parse_server_spec(
"http://foo.com") == [False, False, "foo.com", 80]
assert cmdline.parse_server_spec(
"https://foo.com") == [True, True, "foo.com", 443]
assert cmdline.parse_server_spec_special(
"https2http://foo.com") == [True, False, "foo.com", 80]
assert cmdline.parse_server_spec_special(
"http2https://foo.com") == [False, True, "foo.com", 443]
tutils.raises(
"Invalid server specification",
cmdline.parse_server_spec,
"foo.com")
tutils.raises(
"Invalid server specification",
cmdline.parse_server_spec,
"http://")
def test_parse_setheaders():
@@ -103,7 +114,7 @@ def test_common():
)
p = tutils.test_data.path("data/replace")
opts.replace_file = [("/foo/bar/%s"%p)]
opts.replace_file = [("/foo/bar/%s" % p)]
v = cmdline.get_common_options(opts)["replacements"]
assert len(v) == 1
assert v[0][2].strip() == "replacecontents"
@@ -122,5 +133,3 @@ def test_mitmdump():
def test_mitmweb():
ap = cmdline.mitmweb()
assert ap

View File

@@ -1,4 +1,7 @@
import os, sys, mock, gc
import os
import sys
import mock
import gc
from os.path import normpath
import mock_urwid
from libmproxy import console
@@ -6,6 +9,7 @@ from libmproxy.console import common
import tutils
class TestConsoleState:
def test_flow(self):
"""

View File

@@ -31,40 +31,39 @@ class TestContentView:
def test_view_auto(self):
v = cv.ViewAuto()
f = v(
odict.ODictCaseless(),
"foo",
1000
)
odict.ODictCaseless(),
"foo",
1000
)
assert f[0] == "Raw"
f = v(
odict.ODictCaseless(
[["content-type", "text/html"]],
),
"<html></html>",
1000
)
odict.ODictCaseless(
[["content-type", "text/html"]],
),
"<html></html>",
1000
)
assert f[0] == "HTML"
f = v(
odict.ODictCaseless(
[["content-type", "text/flibble"]],
),
"foo",
1000
)
odict.ODictCaseless(
[["content-type", "text/flibble"]],
),
"foo",
1000
)
assert f[0] == "Raw"
f = v(
odict.ODictCaseless(
[["content-type", "text/flibble"]],
),
"<xml></xml>",
1000
)
odict.ODictCaseless(
[["content-type", "text/flibble"]],
),
"<xml></xml>",
1000
)
assert f[0].startswith("XML")
def test_view_urlencoded(self):
d = utils.urlencode([("one", "two"), ("three", "four")])
v = cv.ViewURLEncoded()
@@ -91,7 +90,7 @@ class TestContentView:
v = cv.ViewJSON()
assert v([], "{}", 1000)
assert not v([], "{", 1000)
assert v([], "[" + ",".join(["0"]*cv.VIEW_CUTOFF) + "]", 1000)
assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000)
assert v([], "[1, 2, 3, 4, 5]", 5)
def test_view_xml(self):
@@ -145,18 +144,18 @@ class TestContentView:
def test_view_image(self):
v = cv.ViewImage()
p = tutils.test_data.path("data/image.png")
assert v([], file(p,"rb").read(), sys.maxint)
assert v([], file(p, "rb").read(), sys.maxsize)
p = tutils.test_data.path("data/image.gif")
assert v([], file(p,"rb").read(), sys.maxint)
assert v([], file(p, "rb").read(), sys.maxsize)
p = tutils.test_data.path("data/image-err1.jpg")
assert v([], file(p,"rb").read(), sys.maxint)
assert v([], file(p, "rb").read(), sys.maxsize)
p = tutils.test_data.path("data/image.ico")
assert v([], file(p,"rb").read(), sys.maxint)
assert v([], file(p, "rb").read(), sys.maxsize)
assert not v([], "flibble", sys.maxint)
assert not v([], "flibble", sys.maxsize)
def test_view_multipart(self):
view = cv.ViewMultipart()
@@ -187,71 +186,70 @@ Larry
def test_get_content_view(self):
r = cv.get_content_view(
cv.get("Raw"),
[["content-type", "application/json"]],
"[1, 2, 3]",
1000,
lambda x, l: None,
False
)
cv.get("Raw"),
[["content-type", "application/json"]],
"[1, 2, 3]",
1000,
lambda x, l: None,
False
)
assert "Raw" in r[0]
r = cv.get_content_view(
cv.get("Auto"),
[["content-type", "application/json"]],
"[1, 2, 3]",
1000,
lambda x, l: None,
False
)
cv.get("Auto"),
[["content-type", "application/json"]],
"[1, 2, 3]",
1000,
lambda x, l: None,
False
)
assert r[0] == "JSON"
r = cv.get_content_view(
cv.get("Auto"),
[["content-type", "application/json"]],
"[1, 2",
1000,
lambda x, l: None,
False
)
cv.get("Auto"),
[["content-type", "application/json"]],
"[1, 2",
1000,
lambda x, l: None,
False
)
assert "Raw" in r[0]
r = cv.get_content_view(
cv.get("AMF"),
[],
"[1, 2",
1000,
lambda x, l: None,
False
)
cv.get("AMF"),
[],
"[1, 2",
1000,
lambda x, l: None,
False
)
assert "Raw" in r[0]
r = cv.get_content_view(
cv.get("Auto"),
[
["content-type", "application/json"],
["content-encoding", "gzip"]
],
encoding.encode('gzip', "[1, 2, 3]"),
1000,
lambda x, l: None,
False
)
cv.get("Auto"),
[
["content-type", "application/json"],
["content-encoding", "gzip"]
],
encoding.encode('gzip', "[1, 2, 3]"),
1000,
lambda x, l: None,
False
)
assert "decoded gzip" in r[0]
assert "JSON" in r[0]
r = cv.get_content_view(
cv.get("XML"),
[
["content-type", "application/json"],
["content-encoding", "gzip"]
],
encoding.encode('gzip', "[1, 2, 3]"),
1000,
lambda x, l: None,
False
)
cv.get("XML"),
[
["content-type", "application/json"],
["content-encoding", "gzip"]
],
encoding.encode('gzip', "[1, 2, 3]"),
1000,
lambda x, l: None,
False
)
assert "decoded gzip" in r[0]
assert "Raw" in r[0]
@@ -261,24 +259,25 @@ if pyamf:
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf01")
assert v([], file(p,"rb").read(), sys.maxint)
assert v([], file(p, "rb").read(), sys.maxsize)
p = tutils.test_data.path("data/amf02")
assert v([], file(p,"rb").read(), sys.maxint)
assert v([], file(p, "rb").read(), sys.maxsize)
def test_view_amf_response():
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf03")
assert v([], file(p,"rb").read(), sys.maxint)
assert v([], file(p, "rb").read(), sys.maxsize)
if cv.ViewProtobuf.is_available():
def test_view_protobuf_request():
v = cv.ViewProtobuf()
p = tutils.test_data.path("data/protobuf01")
content_type, output = v([], file(p,"rb").read(), sys.maxint)
content_type, output = v([], file(p, "rb").read(), sys.maxsize)
assert content_type == "Protobuf"
assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"'
def test_get_by_shortcut():
assert cv.get_by_shortcut("h")

View File

@@ -5,10 +5,12 @@ if os.name == "nt":
import libmproxy.console.help as help
class DummyLoop:
def __init__(self):
self.widget = None
class DummyMaster:
def __init__(self):
self.loop = DummyLoop()

View File

@@ -8,5 +8,3 @@ class TestMaster:
msg = mock.MagicMock()
m.handle("type", msg)
assert msg.reply.call_count == 1

View File

@@ -116,7 +116,6 @@ class TestDumpMaster:
0, None, "", verbosity=1, rfile="test_dump.py"
)
def test_options(self):
o = dump.Options(verbosity = 2)
assert o.verbosity == 2
@@ -147,21 +146,25 @@ class TestDumpMaster:
def test_basic(self):
for i in (1, 2, 3):
assert "GET" in self._dummy_cycle(1, "~s", "", flow_detail=i)
assert "GET" in self._dummy_cycle(1, "~s", "\x00\x00\x00", flow_detail=i)
assert "GET" in self._dummy_cycle(
1,
"~s",
"\x00\x00\x00",
flow_detail=i)
assert "GET" in self._dummy_cycle(1, "~s", "ascii", flow_detail=i)
def test_write(self):
with tutils.tmpdir() as d:
p = os.path.join(d, "a")
self._dummy_cycle(1, None, "", outfile=(p,"wb"), verbosity=0)
assert len(list(flow.FlowReader(open(p,"rb")).stream())) == 1
self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0)
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 1
def test_write_append(self):
with tutils.tmpdir() as d:
p = os.path.join(d, "a.append")
self._dummy_cycle(1, None, "", outfile=(p,"wb"), verbosity=0)
self._dummy_cycle(1, None, "", outfile=(p,"ab"), verbosity=0)
assert len(list(flow.FlowReader(open(p,"rb")).stream())) == 2
self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0)
self._dummy_cycle(1, None, "", outfile=(p, "ab"), verbosity=0)
assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 2
def test_write_err(self):
tutils.raises(

View File

@@ -1,5 +1,6 @@
from libmproxy import encoding
def test_identity():
assert "string" == encoding.decode("identity", "string")
assert "string" == encoding.encode("identity", "string")
@@ -8,12 +9,25 @@ def test_identity():
def test_gzip():
assert "string" == encoding.decode("gzip", encoding.encode("gzip", "string"))
assert "string" == encoding.decode(
"gzip",
encoding.encode(
"gzip",
"string"))
assert None == encoding.decode("gzip", "bogus")
def test_deflate():
assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string"))
assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")[2:-4])
assert "string" == encoding.decode(
"deflate",
encoding.encode(
"deflate",
"string"))
assert "string" == encoding.decode(
"deflate",
encoding.encode(
"deflate",
"string")[
2:-
4])
assert None == encoding.decode("deflate", "bogus")

View File

@@ -21,7 +21,7 @@ def test_load_scripts():
f += " foo bar" # two arguments required
try:
s = script.Script(f, tmaster) # Loads the script file.
except Exception, v:
except Exception as v:
if not "ImportError" in str(v):
raise
else:

View File

@@ -5,6 +5,7 @@ from libmproxy.protocol import http
from libmproxy.protocol.primitives import Error
import tutils
class TestParsing:
def _dump(self, x):
c = cStringIO.StringIO()
@@ -99,7 +100,15 @@ class TestMatching:
headers = odict.ODictCaseless()
headers["header_response"] = ["svalue"]
f.response = http.HTTPResponse((1, 1), 200, "OK", headers, "content_response", None, None)
f.response = http.HTTPResponse(
(1,
1),
200,
"OK",
headers,
"content_response",
None,
None)
return f

View File

@@ -1,4 +1,6 @@
import Queue, time, os.path
import Queue
import time
import os.path
from cStringIO import StringIO
import email.utils
import mock
@@ -33,7 +35,6 @@ def test_app_registry():
assert ar.get(r)
class TestStickyCookieState:
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
@@ -115,7 +116,15 @@ class TestClientPlaybackState:
class TestServerPlaybackState:
def test_hash(self):
s = flow.ServerPlaybackState(None, [], False, False, None, False, None, False)
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
False,
None,
False)
r = tutils.tflow()
r2 = tutils.tflow()
@@ -131,7 +140,15 @@ class TestServerPlaybackState:
assert s._hash(r) != s._hash(r2)
def test_headers(self):
s = flow.ServerPlaybackState(["foo"], [], False, False, None, False, None, False)
s = flow.ServerPlaybackState(
["foo"],
[],
False,
False,
None,
False,
None,
False)
r = tutils.tflow(resp=True)
r.request.headers["foo"] = ["bar"]
r2 = tutils.tflow(resp=True)
@@ -152,7 +169,9 @@ class TestServerPlaybackState:
r2 = tutils.tflow(resp=True)
r2.request.headers["key"] = ["two"]
s = flow.ServerPlaybackState(None, [r, r2], False, False, None, False, None, False)
s = flow.ServerPlaybackState(
None, [
r, r2], False, False, None, False, None, False)
assert s.count() == 2
assert len(s.fmap.keys()) == 1
@@ -173,34 +192,41 @@ class TestServerPlaybackState:
r2 = tutils.tflow(resp=True)
r2.request.headers["key"] = ["two"]
s = flow.ServerPlaybackState(None, [r, r2], False, True, None, False, None, False)
s = flow.ServerPlaybackState(
None, [
r, r2], False, True, None, False, None, False)
assert s.count() == 2
s.next_flow(r)
assert s.count() == 2
def test_ignore_params(self):
s = flow.ServerPlaybackState(None, [], False, False, ["param1", "param2"], False, None, False)
s = flow.ServerPlaybackState(
None, [], False, False, [
"param1", "param2"], False, None, False)
r = tutils.tflow(resp=True)
r.request.path="/test?param1=1"
r.request.path = "/test?param1=1"
r2 = tutils.tflow(resp=True)
r2.request.path="/test"
r2.request.path = "/test"
assert s._hash(r) == s._hash(r2)
r2.request.path="/test?param1=2"
r2.request.path = "/test?param1=2"
assert s._hash(r) == s._hash(r2)
r2.request.path="/test?param2=1"
r2.request.path = "/test?param2=1"
assert s._hash(r) == s._hash(r2)
r2.request.path="/test?param3=2"
r2.request.path = "/test?param3=2"
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params(self):
s = flow.ServerPlaybackState(None, [], False, False, None, False, ["param1", "param2"], False)
s = flow.ServerPlaybackState(
None, [], False, False, None, False, [
"param1", "param2"], False)
r = tutils.tflow(resp=True)
r.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
r.request.headers[
"Content-Type"] = ["application/x-www-form-urlencoded"]
r.request.content = "paramx=x&param1=1"
r2 = tutils.tflow(resp=True)
r2.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
r2.request.headers[
"Content-Type"] = ["application/x-www-form-urlencoded"]
r2.request.content = "paramx=x&param1=1"
# same parameters
assert s._hash(r) == s._hash(r2)
@@ -208,20 +234,22 @@ class TestServerPlaybackState:
r2.request.content = "paramx=x&param1=2"
assert s._hash(r) == s._hash(r2)
# missing parameter
r2.request.content="paramx=x"
r2.request.content = "paramx=x"
assert s._hash(r) == s._hash(r2)
# ignorable parameter added
r2.request.content="paramx=x&param1=2"
r2.request.content = "paramx=x&param1=2"
assert s._hash(r) == s._hash(r2)
# not ignorable parameter changed
r2.request.content="paramx=y&param1=1"
r2.request.content = "paramx=y&param1=1"
assert not s._hash(r) == s._hash(r2)
# not ignorable parameter missing
r2.request.content="param1=1"
r2.request.content = "param1=1"
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_params_other_content_type(self):
s = flow.ServerPlaybackState(None, [], False, False, None, False, ["param1", "param2"], False)
s = flow.ServerPlaybackState(
None, [], False, False, None, False, [
"param1", "param2"], False)
r = tutils.tflow(resp=True)
r.request.headers["Content-Type"] = ["application/json"]
r.request.content = '{"param1":"1"}'
@@ -235,19 +263,31 @@ class TestServerPlaybackState:
assert not s._hash(r) == s._hash(r2)
def test_ignore_payload_wins_over_params(self):
#NOTE: parameters are mutually exclusive in options
s = flow.ServerPlaybackState(None, [], False, False, None, True, ["param1", "param2"], False)
# NOTE: parameters are mutually exclusive in options
s = flow.ServerPlaybackState(
None, [], False, False, None, True, [
"param1", "param2"], False)
r = tutils.tflow(resp=True)
r.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
r.request.headers[
"Content-Type"] = ["application/x-www-form-urlencoded"]
r.request.content = "paramx=y"
r2 = tutils.tflow(resp=True)
r2.request.headers["Content-Type"] = ["application/x-www-form-urlencoded"]
r2.request.headers[
"Content-Type"] = ["application/x-www-form-urlencoded"]
r2.request.content = "paramx=x"
# same parameters
assert s._hash(r) == s._hash(r2)
def test_ignore_content(self):
s = flow.ServerPlaybackState(None, [], False, False, None, False, None, False)
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
False,
None,
False)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
@@ -257,8 +297,16 @@ class TestServerPlaybackState:
r2.request.content = "bar"
assert not s._hash(r) == s._hash(r2)
#now ignoring content
s = flow.ServerPlaybackState(None, [], False, False, None, True, None, False)
# now ignoring content
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
True,
None,
False)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
r.request.content = "foo"
@@ -272,14 +320,22 @@ class TestServerPlaybackState:
assert s._hash(r) == s._hash(r2)
def test_ignore_host(self):
s = flow.ServerPlaybackState(None, [], False, False, None, False, None, True)
s = flow.ServerPlaybackState(
None,
[],
False,
False,
None,
False,
None,
True)
r = tutils.tflow(resp=True)
r2 = tutils.tflow(resp=True)
r.request.host="address"
r2.request.host="address"
r.request.host = "address"
r2.request.host = "address"
assert s._hash(r) == s._hash(r2)
r2.request.host="wrong_address"
r2.request.host = "wrong_address"
assert s._hash(r) == s._hash(r2)
@@ -343,12 +399,14 @@ class TestFlow:
def test_getset_state(self):
f = tutils.tflow(resp=True)
state = f.get_state()
assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state()
assert f.get_state() == protocol.http.HTTPFlow.from_state(
state).get_state()
f.response = None
f.error = Error("error")
state = f.get_state()
assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state()
assert f.get_state() == protocol.http.HTTPFlow.from_state(
state).get_state()
f2 = f.copy()
f2.id = f.id # copy creates a different uuid
@@ -430,7 +488,6 @@ class TestFlow:
assert f.response.content == "abarb"
class TestState:
def test_backup(self):
c = flow.State()
@@ -519,7 +576,7 @@ class TestState:
assert c.intercept_txt == "~q"
assert "Invalid" in c.set_intercept("~")
assert not c.set_intercept(None)
assert c.intercept_txt == None
assert c.intercept_txt is None
def _add_request(self, state):
f = tutils.tflow()
@@ -608,7 +665,13 @@ class TestSerialize:
def test_load_flows_reverse(self):
r = self._treader()
s = flow.State()
conf = ProxyConfig(mode="reverse", upstream_server=[True,True,"use-this-domain",80])
conf = ProxyConfig(
mode="reverse",
upstream_server=[
True,
True,
"use-this-domain",
80])
fm = flow.FlowMaster(DummyServer(conf), s)
fm.load_flows(r)
assert s.flows[0].request.host == "use-this-domain"
@@ -630,7 +693,6 @@ class TestSerialize:
r = flow.FlowReader(sio)
assert len(list(r.stream()))
def test_error(self):
sio = StringIO()
sio.write("bogus")
@@ -661,7 +723,8 @@ class TestFlowMaster:
assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
assert not fm.unload_scripts()
assert fm.load_script("nonexistent")
assert "ValueError" in fm.load_script(tutils.test_data.path("scripts/starterr.py"))
assert "ValueError" in fm.load_script(
tutils.test_data.path("scripts/starterr.py"))
assert len(fm.scripts) == 0
def test_getset_ignore(self):
@@ -707,14 +770,14 @@ class TestFlowMaster:
assert fm.scripts[0].ns["log"][-1] == "request"
fm.handle_response(f)
assert fm.scripts[0].ns["log"][-1] == "response"
#load second script
# load second script
assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
assert len(fm.scripts) == 2
fm.handle_clientdisconnect(f.server_conn)
assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"
#unload first script
# unload first script
fm.unload_scripts()
assert len(fm.scripts) == 0
assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
@@ -765,7 +828,16 @@ class TestFlowMaster:
f = tutils.tflow(resp=True)
pb = [tutils.tflow(resp=True), f]
fm = flow.FlowMaster(DummyServer(ProxyConfig()), s)
assert not fm.start_server_playback(pb, False, [], False, False, None, False, None, False)
assert not fm.start_server_playback(
pb,
False,
[],
False,
False,
None,
False,
None,
False)
assert not fm.start_client_playback(pb, False)
fm.client_playback.testing = True
@@ -788,16 +860,43 @@ class TestFlowMaster:
fm.refresh_server_playback = True
assert not fm.do_server_playback(tutils.tflow())
fm.start_server_playback(pb, False, [], False, False, None, False, None, False)
fm.start_server_playback(
pb,
False,
[],
False,
False,
None,
False,
None,
False)
assert fm.do_server_playback(tutils.tflow())
fm.start_server_playback(pb, False, [], True, False, None, False, None, False)
fm.start_server_playback(
pb,
False,
[],
True,
False,
None,
False,
None,
False)
r = tutils.tflow()
r.request.content = "gibble"
assert not fm.do_server_playback(r)
assert fm.do_server_playback(tutils.tflow())
fm.start_server_playback(pb, False, [], True, False, None, False, None, False)
fm.start_server_playback(
pb,
False,
[],
True,
False,
None,
False,
None,
False)
q = Queue.Queue()
fm.tick(q, 0)
assert fm.should_exit.is_set()
@@ -812,7 +911,16 @@ class TestFlowMaster:
pb = [f]
fm = flow.FlowMaster(None, s)
fm.refresh_server_playback = True
fm.start_server_playback(pb, True, [], False, False, None, False, None, False)
fm.start_server_playback(
pb,
True,
[],
False,
False,
None,
False,
None,
False)
f = tutils.tflow()
f.request.host = "nonexistent"
@@ -862,8 +970,9 @@ class TestFlowMaster:
def test_stream(self):
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
def r():
r = flow.FlowReader(open(p,"rb"))
r = flow.FlowReader(open(p, "rb"))
return list(r.stream())
s = flow.State()
@@ -884,6 +993,7 @@ class TestFlowMaster:
assert not r()[1].response
class TestRequest:
def test_simple(self):
f = tutils.tflow()
@@ -919,7 +1029,7 @@ class TestRequest:
r.host = "address"
r.port = 22
assert r.url== "https://address:22/path"
assert r.url == "https://address:22/path"
assert r.pretty_url(True) == "https://address:22/path"
r.headers["Host"] = ["foo.com"]
@@ -1062,6 +1172,7 @@ class TestRequest:
resp.headers = h
assert resp.headers.get_first("content-type") == "text/plain"
class TestResponse:
def test_simple(self):
f = tutils.tflow(resp=True)
@@ -1077,7 +1188,9 @@ class TestResponse:
assert resp.size() == len(resp.assemble())
resp.content = CONTENT_MISSING
tutils.raises("Cannot assemble flow with CONTENT_MISSING", resp.assemble)
tutils.raises(
"Cannot assemble flow with CONTENT_MISSING",
resp.assemble)
def test_refresh(self):
r = tutils.tresp()
@@ -1086,14 +1199,15 @@ class TestResponse:
pre = r.headers["date"]
r.refresh(n)
assert pre == r.headers["date"]
r.refresh(n+60)
r.refresh(n + 60)
d = email.utils.parsedate_tz(r.headers["date"][0])
d = email.utils.mktime_tz(d)
# Weird that this is not exact...
assert abs(60-(d-n)) <= 1
assert abs(60 - (d - n)) <= 1
r.headers["set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"]
r.headers[
"set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"]
r.refresh()
def test_refresh_cookie(self):
@@ -1146,7 +1260,7 @@ class TestResponse:
def test_header_size(self):
r = tutils.tresp()
result = len(r._assemble_headers())
assert result==44
assert result == 44
def test_get_content_type(self):
h = odict.ODictCaseless()
@@ -1178,7 +1292,7 @@ class TestClientConnection:
c = tutils.tclient_conn()
assert ClientConnection.from_state(c.get_state()).get_state() ==\
c.get_state()
c.get_state()
c2 = tutils.tclient_conn()
c2.address.address = (c2.address.host, 4242)
@@ -1295,7 +1409,6 @@ def test_setheaders():
h.run(f)
assert f.request.content == "foo"
h.clear()
h.add("~s", "one", "two")
h.add("~s", "one", "three")

View File

@@ -5,16 +5,17 @@ import tservers
after being fixed to check for regressions.
"""
class TestFuzzy(tservers.HTTPProxTest):
def test_idna_err(self):
req = r'get:"http://localhost:%s":i10,"\xc6"'
p = self.pathoc()
assert p.request(req%self.server.port).status_code == 400
assert p.request(req % self.server.port).status_code == 400
def test_nullbytes(self):
req = r'get:"http://localhost:%s":i19,"\x00"'
p = self.pathoc()
assert p.request(req%self.server.port).status_code == 400
assert p.request(req % self.server.port).status_code == 400
def test_invalid_ports(self):
req = 'get:"http://localhost:999999"'
@@ -24,12 +25,12 @@ class TestFuzzy(tservers.HTTPProxTest):
def test_invalid_ipv6_url(self):
req = 'get:"http://localhost:%s":i13,"["'
p = self.pathoc()
assert p.request(req%self.server.port).status_code == 400
assert p.request(req % self.server.port).status_code == 400
def test_invalid_upstream(self):
req = r"get:'http://localhost:%s/p/200:i10,\x27+\x27'"
p = self.pathoc()
assert p.request(req%self.server.port).status_code == 502
assert p.request(req % self.server.port).status_code == 502
def test_upstream_disconnect(self):
req = r'200:d0'

View File

@@ -1,4 +1,5 @@
import tutils, sys
import tutils
import sys
from libmproxy.platform import pf
@@ -6,10 +7,20 @@ class TestLookup:
def test_simple(self):
if sys.platform == "freebsd10":
p = tutils.test_data.path("data/pf02")
d = open(p,"rb").read()
d = open(p, "rb").read()
else:
p = tutils.test_data.path("data/pf01")
d = open(p,"rb").read()
d = open(p, "rb").read()
assert pf.lookup("192.168.1.111", 40000, d) == ("5.5.5.5", 80)
tutils.raises("Could not resolve original destination", pf.lookup, "192.168.1.112", 40000, d)
tutils.raises("Could not resolve original destination", pf.lookup, "192.168.1.111", 40001, d)
tutils.raises(
"Could not resolve original destination",
pf.lookup,
"192.168.1.112",
40000,
d)
tutils.raises(
"Could not resolve original destination",
pf.lookup,
"192.168.1.111",
40001,
d)

View File

@@ -61,7 +61,8 @@ class TestHTTPRequest:
assert "Host" in r.headers
def test_expect_header(self):
s = StringIO("GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
s = StringIO(
"GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
w = StringIO()
r = HTTPRequest.from_stream(s, wfile=w)
assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
@@ -84,7 +85,8 @@ class TestHTTPRequest:
tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s)
s = StringIO("GET http://address:22/ HTTP/1.1")
r = HTTPRequest.from_stream(s)
assert r.assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n"
assert r.assemble(
) == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\nContent-Length: 0\r\n\r\n"
def test_http_options_relative_form_in(self):
"""
@@ -105,10 +107,10 @@ class TestHTTPRequest:
r.host = 'address'
r.port = 80
r.scheme = "http"
assert r.assemble() == ("OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
"Host: address\r\n"
"Content-Length: 0\r\n\r\n")
assert r.assemble() == (
"OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
"Host: address\r\n"
"Content-Length: 0\r\n\r\n")
def test_assemble_unknown_form(self):
r = tutils.treq()
@@ -257,7 +259,8 @@ class TestHTTPResponse:
def test_get_cookies_with_parameters(self):
h = odict.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
h["Set-Cookie"] = [
"cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
resp = tutils.tresp()
resp.headers = h
result = resp.get_cookies()

View File

@@ -78,7 +78,6 @@ class TestProcessProxyOptions:
def test_no_transparent(self):
self.assert_err("transparent mode not supported", "-T")
@mock.patch("libmproxy.platform.resolver")
def test_modes(self, _):
self.assert_noerr("-R", "http://localhost")
@@ -96,28 +95,42 @@ class TestProcessProxyOptions:
def test_client_certs(self):
with tutils.tmpdir() as cadir:
self.assert_noerr("--client-certs", cadir)
self.assert_err("directory does not exist", "--client-certs", "nonexistent")
self.assert_err(
"directory does not exist",
"--client-certs",
"nonexistent")
def test_certs(self):
with tutils.tmpdir() as cadir:
self.assert_noerr("--cert", tutils.test_data.path("data/testkey.pem"))
self.assert_noerr(
"--cert",
tutils.test_data.path("data/testkey.pem"))
self.assert_err("does not exist", "--cert", "nonexistent")
def test_auth(self):
p = self.assert_noerr("--nonanonymous")
assert p.authenticator
p = self.assert_noerr("--htpasswd", tutils.test_data.path("data/htpasswd"))
p = self.assert_noerr(
"--htpasswd",
tutils.test_data.path("data/htpasswd"))
assert p.authenticator
self.assert_err("malformed htpasswd file", "--htpasswd", tutils.test_data.path("data/htpasswd.invalid"))
self.assert_err(
"malformed htpasswd file",
"--htpasswd",
tutils.test_data.path("data/htpasswd.invalid"))
p = self.assert_noerr("--singleuser", "test:test")
assert p.authenticator
self.assert_err("invalid single-user specification", "--singleuser", "test")
self.assert_err(
"invalid single-user specification",
"--singleuser",
"test")
class TestProxyServer:
@tutils.SkipWindows # binding to 0.0.0.0:1 works without special permissions on Windows
# binding to 0.0.0.0:1 works without special permissions on Windows
@tutils.SkipWindows
def test_err(self):
conf = ProxyConfig(
port=1
@@ -142,6 +155,12 @@ class TestConnectionHandler:
def test_fatal_error(self):
config = mock.Mock()
config.mode.get_upstream_server.side_effect = RuntimeError
c = ConnectionHandler(config, mock.MagicMock(), ("127.0.0.1", 8080), None, mock.MagicMock())
c = ConnectionHandler(
config,
mock.MagicMock(),
("127.0.0.1",
8080),
None,
mock.MagicMock())
with tutils.capture_stderr(c.handle) as output:
assert "mitmproxy has crashed" in output

View File

@@ -11,7 +11,7 @@ class TestScript:
s = flow.State()
fm = flow.FlowMaster(None, s)
sp = tutils.test_data.path("scripts/a.py")
p = script.Script("%s --var 40"%sp, fm)
p = script.Script("%s --var 40" % sp, fm)
assert "here" in p.ns
assert p.run("here") == (True, 41)
@@ -79,7 +79,9 @@ class TestScript:
def test_concurrent2(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
s = script.Script(tutils.test_data.path("scripts/concurrent_decorator.py"), fm)
s = script.Script(
tutils.test_data.path("scripts/concurrent_decorator.py"),
fm)
s.load()
m = mock.Mock()
@@ -110,8 +112,9 @@ class TestScript:
fm = flow.FlowMaster(None, s)
tutils.raises(
"decorator not supported for this method",
script.Script, tutils.test_data.path("scripts/concurrent_decorator_err.py"), fm
)
script.Script,
tutils.test_data.path("scripts/concurrent_decorator_err.py"),
fm)
def test_command_parsing():
@@ -120,5 +123,3 @@ def test_command_parsing():
absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py"))
s = script.Script(absfilepath, fm)
assert os.path.isfile(s.argv[0])

View File

@@ -1,10 +1,12 @@
import socket, time
import socket
import time
from libmproxy.proxy.config import HostMatcher
import libpathod
from netlib import tcp, http_auth, http
from libpathod import pathoc, pathod
from netlib.certutils import SSLCert
import tutils, tservers
import tutils
import tservers
from libmproxy.protocol import KILL, Error
from libmproxy.protocol.http import CONTENT_MISSING
@@ -16,9 +18,10 @@ from libmproxy.protocol.http import CONTENT_MISSING
for a 200 response.
"""
class CommonMixin:
def test_large(self):
assert len(self.pathod("200:b@50k").content) == 1024*50
assert len(self.pathod("200:b@50k").content) == 1024 * 50
@staticmethod
def wait_until_not_live(flow):
@@ -56,7 +59,8 @@ class CommonMixin:
# Port error
l.request.port = 1
# In upstream mode, we get a 502 response from the upstream proxy server.
# In upstream mode with ssl, the replay will fail as we cannot establish SSL with the upstream proxy.
# In upstream mode with ssl, the replay will fail as we cannot establish
# SSL with the upstream proxy.
rt = self.master.replay_request(l, block=True)
assert not rt
if isinstance(self, tservers.HTTPUpstreamProxTest) and not self.ssl:
@@ -68,7 +72,9 @@ class CommonMixin:
f = self.pathod("304")
assert f.status_code == 304
l = self.master.state.view[-1] # In Upstream mode with SSL, we may already have a previous CONNECT request.
# In Upstream mode with SSL, we may already have a previous CONNECT
# request.
l = self.master.state.view[-1]
assert l.client_conn.address
assert "host" in l.request.headers
assert l.response.code == 304
@@ -90,11 +96,13 @@ class CommonMixin:
log = self.server.last_log()
assert log["request"]["sni"] == "testserver.com"
class TcpMixin:
def _ignore_on(self):
assert not hasattr(self, "_ignore_backup")
self._ignore_backup = self.config.check_ignore
self.config.check_ignore = HostMatcher([".+:%s" % self.server.port] + self.config.check_ignore.patterns)
self.config.check_ignore = HostMatcher(
[".+:%s" % self.server.port] + self.config.check_ignore.patterns)
def _ignore_off(self):
assert hasattr(self, "_ignore_backup")
@@ -125,22 +133,26 @@ class TcpMixin:
# Test Non-HTTP traffic
spec = "200:i0,@100:d0" # this results in just 100 random bytes
assert self.pathod(spec).status_code == 502 # mitmproxy responds with bad gateway
# mitmproxy responds with bad gateway
assert self.pathod(spec).status_code == 502
self._ignore_on()
tutils.raises("invalid server response", self.pathod, spec) # pathoc tries to parse answer as HTTP
tutils.raises(
"invalid server response",
self.pathod,
spec) # pathoc tries to parse answer as HTTP
self._ignore_off()
def _tcpproxy_on(self):
assert not hasattr(self, "_tcpproxy_backup")
self._tcpproxy_backup = self.config.check_tcp
self.config.check_tcp = HostMatcher([".+:%s" % self.server.port] + self.config.check_tcp.patterns)
self.config.check_tcp = HostMatcher(
[".+:%s" % self.server.port] + self.config.check_tcp.patterns)
def _tcpproxy_off(self):
assert hasattr(self, "_tcpproxy_backup")
self.config.check_ignore = self._tcpproxy_backup
del self._tcpproxy_backup
def test_tcp(self):
spec = '304:h"Alternate-Protocol"="mitmproxy-will-remove-this"'
n = self.pathod(spec)
@@ -165,6 +177,7 @@ class TcpMixin:
# Make sure that TCP messages are in the event log.
assert any("mitmproxy-will-remove-this" in m for m in self.master.log)
class AppMixin:
def test_app(self):
ret = self.app("/")
@@ -188,30 +201,30 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
def test_upstream_ssl_error(self):
p = self.pathoc()
ret = p.request("get:'https://localhost:%s/'"%self.server.port)
ret = p.request("get:'https://localhost:%s/'" % self.server.port)
assert ret.status_code == 400
def test_connection_close(self):
# Add a body, so we have a content-length header, which combined with
# HTTP1.1 means the connection is kept alive.
response = '%s/p/200:b@1'%self.server.urlbase
response = '%s/p/200:b@1' % self.server.urlbase
# Lets sanity check that the connection does indeed stay open by
# issuing two requests over the same connection
p = self.pathoc()
assert p.request("get:'%s'"%response)
assert p.request("get:'%s'"%response)
assert p.request("get:'%s'" % response)
assert p.request("get:'%s'" % response)
# Now check that the connection is closed as the client specifies
p = self.pathoc()
assert p.request("get:'%s':h'Connection'='close'"%response)
assert p.request("get:'%s':h'Connection'='close'" % response)
# There's a race here, which means we can get any of a number of errors.
# Rather than introduce yet another sleep into the test suite, we just
# relax the Exception specification.
tutils.raises(Exception, p.request, "get:'%s'"%response)
tutils.raises(Exception, p.request, "get:'%s'" % response)
def test_reconnect(self):
req = "get:'%s/p/200:b@1:da'"%self.server.urlbase
req = "get:'%s/p/200:b@1:da'" % self.server.urlbase
p = self.pathoc()
assert p.request(req)
# Server has disconnected. Mitmproxy should detect this, and reconnect.
@@ -225,8 +238,8 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
return True
req = "get:'%s/p/200:b@1'"
p = self.pathoc()
assert p.request(req%self.server.urlbase)
assert p.request(req%self.server2.urlbase)
assert p.request(req % self.server.urlbase)
assert p.request(req % self.server2.urlbase)
assert switched(self.proxy.log)
def test_get_connection_err(self):
@@ -237,7 +250,7 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
def test_blank_leading_line(self):
p = self.pathoc()
req = "get:'%s/p/201':i0,'\r\n'"
assert p.request(req%self.server.urlbase).status_code == 201
assert p.request(req % self.server.urlbase).status_code == 201
def test_invalid_headers(self):
p = self.pathoc()
@@ -251,7 +264,9 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection.connect(("127.0.0.1", self.proxy.port))
spec = '301:h"Transfer-Encoding"="chunked":r:b"0\\r\\n\\r\\n"'
connection.send("GET http://localhost:%d/p/%s HTTP/1.1\r\n"%(self.server.port, spec))
connection.send(
"GET http://localhost:%d/p/%s HTTP/1.1\r\n" %
(self.server.port, spec))
connection.send("\r\n")
resp = connection.recv(50000)
connection.close()
@@ -270,13 +285,20 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
self.master.set_stream_large_bodies(None)
def test_stream_modify(self):
self.master.load_script(tutils.test_data.path("scripts/stream_modify.py"))
self.master.load_script(
tutils.test_data.path("scripts/stream_modify.py"))
d = self.pathod('200:b"foo"')
assert d.content == "bar"
self.master.unload_scripts()
class TestHTTPAuth(tservers.HTTPProxTest):
authenticator = http_auth.BasicProxyAuth(http_auth.PassManSingleUser("test", "test"), "realm")
authenticator = http_auth.BasicProxyAuth(
http_auth.PassManSingleUser(
"test",
"test"),
"realm")
def test_auth(self):
assert self.pathod("202").status_code == 407
p = self.pathoc()
@@ -284,7 +306,7 @@ class TestHTTPAuth(tservers.HTTPProxTest):
get
'http://localhost:%s/p/202'
h'%s'='%s'
"""%(
""" % (
self.server.port,
http_auth.BasicProxyAuth.AUTH_HEADER,
http.assemble_http_basic_auth("basic", "test", "test")
@@ -294,6 +316,7 @@ class TestHTTPAuth(tservers.HTTPProxTest):
class TestHTTPConnectSSLError(tservers.HTTPProxTest):
certfile = True
def test_go(self):
self.config.ssl_ports.append(self.proxy.port)
p = self.pathoc_raw()
@@ -306,6 +329,7 @@ class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin):
ssl = True
ssloptions = pathod.SSLOptions(request_client_cert=True)
clientcerts = True
def test_clientcert(self):
f = self.pathod("304")
assert f.status_code == 304
@@ -319,6 +343,7 @@ class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin):
class TestHTTPSCertfile(tservers.HTTPProxTest, CommonMixin):
ssl = True
certfile = True
def test_certfile(self):
assert self.pathod("304")
@@ -328,11 +353,12 @@ class TestHTTPSNoCommonName(tservers.HTTPProxTest):
Test what happens if we get a cert without common name back.
"""
ssl = True
ssloptions=pathod.SSLOptions(
certs = [
("*", tutils.test_data.path("data/no_common_name.pem"))
]
)
ssloptions = pathod.SSLOptions(
certs = [
("*", tutils.test_data.path("data/no_common_name.pem"))
]
)
def test_http(self):
f = self.pathod("202")
assert f.sslinfo.certchain[0].get_subject().CN == "127.0.0.1"
@@ -373,7 +399,6 @@ class TestHttps2Http(tservers.ReverseProxTest):
assert p.request("get:'/p/200'").status_code == 400
class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin):
ssl = False
@@ -413,15 +438,19 @@ class TestProxy(tservers.HTTPProxTest):
connection.connect(("127.0.0.1", self.proxy.port))
# call pathod server, wait a second to complete the request
connection.send("GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n"%self.server.port)
connection.send(
"GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" %
self.server.port)
time.sleep(1)
connection.send("\r\n")
connection.recv(50000)
connection.close()
request, response = self.master.state.view[0].request, self.master.state.view[0].response
request, response = self.master.state.view[
0].request, self.master.state.view[0].response
assert response.code == 304 # sanity test for our low level request
assert 0.95 < (request.timestamp_end - request.timestamp_start) < 1.2 #time.sleep might be a little bit shorter than a second
# time.sleep might be a little bit shorter than a second
assert 0.95 < (request.timestamp_end - request.timestamp_start) < 1.2
def test_request_timestamps_not_affected_by_client_time(self):
# test that don't include user wait time in request's timestamps
@@ -441,10 +470,14 @@ class TestProxy(tservers.HTTPProxTest):
# tests that the client_conn a tcp connection has a tcp_setup_timestamp
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection.connect(("localhost", self.proxy.port))
connection.send("GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n"%self.server.port)
connection.send(
"GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" %
self.server.port)
connection.send("\r\n")
connection.recv(5000)
connection.send("GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n"%self.server.port)
connection.send(
"GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" %
self.server.port)
connection.send("\r\n")
connection.recv(5000)
connection.close()
@@ -462,8 +495,10 @@ class TestProxy(tservers.HTTPProxTest):
f = self.master.state.view[0]
assert f.server_conn.address == ("127.0.0.1", self.server.port)
class TestProxySSL(tservers.HTTPProxTest):
ssl=True
ssl = True
def test_request_ssl_setup_timestamp_presence(self):
# tests that the ssl timestamp is present when ssl is used
f = self.pathod("304:b@10k")
@@ -479,16 +514,24 @@ class MasterRedirectRequest(tservers.TestMaster):
request = f.request
if request.path == "/p/201":
addr = f.live.c.server_conn.address
assert f.live.change_server(("127.0.0.1", self.redirect_port), ssl=False)
assert not f.live.change_server(("127.0.0.1", self.redirect_port), ssl=False)
tutils.raises("SSL handshake error", f.live.change_server, ("127.0.0.1", self.redirect_port), ssl=True)
assert f.live.change_server(
("127.0.0.1", self.redirect_port), ssl=False)
assert not f.live.change_server(
("127.0.0.1", self.redirect_port), ssl=False)
tutils.raises(
"SSL handshake error",
f.live.change_server,
("127.0.0.1",
self.redirect_port),
ssl=True)
assert f.live.change_server(addr, ssl=False)
request.url = "http://127.0.0.1:%s/p/201" % self.redirect_port
tservers.TestMaster.handle_request(self, f)
def handle_response(self, f):
f.response.content = str(f.client_conn.address.port)
f.response.headers["server-conn-id"] = [str(f.server_conn.source_address.port)]
f.response.headers[
"server-conn-id"] = [str(f.server_conn.source_address.port)]
tservers.TestMaster.handle_response(self, f)
@@ -502,37 +545,41 @@ class TestRedirectRequest(tservers.HTTPProxTest):
self.server.clear_log()
self.server2.clear_log()
r1 = p.request("get:'%s/p/200'"%self.server.urlbase)
r1 = p.request("get:'%s/p/200'" % self.server.urlbase)
assert r1.status_code == 200
assert self.server.last_log()
assert not self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
r2 = p.request("get:'%s/p/201'"%self.server.urlbase)
r2 = p.request("get:'%s/p/201'" % self.server.urlbase)
assert r2.status_code == 201
assert not self.server.last_log()
assert self.server2.last_log()
self.server.clear_log()
self.server2.clear_log()
r3 = p.request("get:'%s/p/202'"%self.server.urlbase)
r3 = p.request("get:'%s/p/202'" % self.server.urlbase)
assert r3.status_code == 202
assert self.server.last_log()
assert not self.server2.last_log()
assert r1.content == r2.content == r3.content
assert r1.headers.get_first("server-conn-id") == r3.headers.get_first("server-conn-id")
assert r1.headers.get_first(
"server-conn-id") == r3.headers.get_first("server-conn-id")
# Make sure that we actually use the same connection in this test case
class MasterStreamRequest(tservers.TestMaster):
"""
Enables the stream flag on the flow for all requests
"""
def handle_responseheaders(self, f):
f.response.stream = True
f.reply()
class TestStreamRequest(tservers.HTTPProxTest):
masterclass = MasterStreamRequest
@@ -541,7 +588,7 @@ class TestStreamRequest(tservers.HTTPProxTest):
# a request with 100k of data but without content-length
self.server.clear_log()
r1 = p.request("get:'%s/p/200:r:b@100k:d102400'"%self.server.urlbase)
r1 = p.request("get:'%s/p/200:r:b@100k:d102400'" % self.server.urlbase)
assert r1.status_code == 200
assert len(r1.content) > 100000
assert self.server.last_log()
@@ -551,13 +598,13 @@ class TestStreamRequest(tservers.HTTPProxTest):
# simple request with streaming turned on
self.server.clear_log()
r1 = p.request("get:'%s/p/200'"%self.server.urlbase)
r1 = p.request("get:'%s/p/200'" % self.server.urlbase)
assert r1.status_code == 200
assert self.server.last_log()
# now send back 100k of data, streamed but not chunked
self.server.clear_log()
r1 = p.request("get:'%s/p/200:b@100k'"%self.server.urlbase)
r1 = p.request("get:'%s/p/200:b@100k'" % self.server.urlbase)
assert r1.status_code == 200
assert self.server.last_log()
@@ -567,15 +614,27 @@ class TestStreamRequest(tservers.HTTPProxTest):
connection.connect(("127.0.0.1", self.proxy.port))
fconn = connection.makefile()
spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n7\\r\\nisatest\\r\\n0\\r\\n\\r\\n"'
connection.send("GET %s/p/%s HTTP/1.1\r\n"%(self.server.urlbase, spec))
connection.send(
"GET %s/p/%s HTTP/1.1\r\n" %
(self.server.urlbase, spec))
connection.send("\r\n")
httpversion, code, msg, headers, content = http.read_response(fconn, "GET", None, include_body=False)
httpversion, code, msg, headers, content = http.read_response(
fconn, "GET", None, include_body=False)
assert headers["Transfer-Encoding"][0] == 'chunked'
assert code == 200
chunks = list(content for _, content, _ in http.read_http_body_chunked(fconn, headers, None, "GET", 200, False))
chunks = list(
content for _,
content,
_ in http.read_http_body_chunked(
fconn,
headers,
None,
"GET",
200,
False))
assert chunks == ["this", "isatest", ""]
connection.close()
@@ -589,6 +648,7 @@ class MasterFakeResponse(tservers.TestMaster):
class TestFakeResponse(tservers.HTTPProxTest):
masterclass = MasterFakeResponse
def test_fake(self):
f = self.pathod("200")
assert "header_response" in f.headers.keys()
@@ -601,6 +661,7 @@ class MasterKillRequest(tservers.TestMaster):
class TestKillRequest(tservers.HTTPProxTest):
masterclass = MasterKillRequest
def test_kill(self):
tutils.raises("server disconnect", self.pathod, "200")
# Nothing should have hit the server
@@ -614,6 +675,7 @@ class MasterKillResponse(tservers.TestMaster):
class TestKillResponse(tservers.HTTPProxTest):
masterclass = MasterKillResponse
def test_kill(self):
tutils.raises("server disconnect", self.pathod, "200")
# The server should have seen a request
@@ -627,6 +689,7 @@ class EResolver(tservers.TResolver):
class TestTransparentResolveError(tservers.TransparentProxTest):
resolver = EResolver
def test_resolve_error(self):
assert self.pathod("304").status_code == 502
@@ -640,6 +703,7 @@ class MasterIncomplete(tservers.TestMaster):
class TestIncompleteResponse(tservers.HTTPProxTest):
masterclass = MasterIncomplete
def test_incomplete(self):
assert self.pathod("200").status_code == 502
@@ -656,10 +720,16 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin):
ssl = False
def test_order(self):
self.proxy.tmaster.replacehooks.add("~q", "foo", "bar") # replace in request
self.proxy.tmaster.replacehooks.add(
"~q",
"foo",
"bar") # replace in request
self.chain[0].tmaster.replacehooks.add("~q", "bar", "baz")
self.chain[1].tmaster.replacehooks.add("~q", "foo", "oh noes!")
self.chain[0].tmaster.replacehooks.add("~s", "baz", "ORLY") # replace in response
self.chain[0].tmaster.replacehooks.add(
"~s",
"baz",
"ORLY") # replace in response
p = self.pathoc()
req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase)
@@ -667,7 +737,10 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin):
assert req.status_code == 418
class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin):
class TestUpstreamProxySSL(
tservers.HTTPUpstreamProxTest,
CommonMixin,
TcpMixin):
ssl = True
def _host_pattern_on(self, attr):
@@ -677,7 +750,10 @@ class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin)
assert not hasattr(self, "_ignore_%s_backup" % attr)
backup = []
for proxy in self.chain:
old_matcher = getattr(proxy.tmaster.server.config, "check_%s" % attr)
old_matcher = getattr(
proxy.tmaster.server.config,
"check_%s" %
attr)
backup.append(old_matcher)
setattr(
proxy.tmaster.server.config,
@@ -721,11 +797,14 @@ class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin)
assert req.content == "content"
assert req.status_code == 418
assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT from pathoc to chain[0],
# request from pathoc to chain[0]
assert self.chain[0].tmaster.state.flow_count() == 2 # CONNECT from proxy to chain[1],
# request from proxy to chain[1]
assert self.chain[1].tmaster.state.flow_count() == 1 # request from chain[0] (regular proxy doesn't store CONNECTs)
# CONNECT from pathoc to chain[0],
assert self.proxy.tmaster.state.flow_count() == 2
# request from pathoc to chain[0]
# CONNECT from proxy to chain[1],
assert self.chain[0].tmaster.state.flow_count() == 2
# request from proxy to chain[1]
# request from chain[0] (regular proxy doesn't store CONNECTs)
assert self.chain[1].tmaster.state.flow_count() == 1
def test_closing_connect_response(self):
"""
@@ -755,6 +834,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
def kill_requests(master, attr, exclude):
k = [0] # variable scope workaround: put into array
_func = getattr(master, attr)
def handler(f):
k[0] += 1
if not (k[0] in exclude):
@@ -766,9 +846,9 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
kill_requests(self.chain[1].tmaster, "handle_request",
exclude=[
# fail first request
# fail first request
2, # allow second request
])
])
kill_requests(self.chain[0].tmaster, "handle_request",
exclude=[
@@ -776,16 +856,18 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
# fail first request
3, # reCONNECT
4, # request
])
])
p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'")
assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request
assert self.chain[0].tmaster.state.flow_count() == 4 # CONNECT, failing request,
# reCONNECT, request
assert self.chain[1].tmaster.state.flow_count() == 2 # failing request, request
# (doesn't store (repeated) CONNECTs from chain[0]
# as it is a regular proxy)
# CONNECT, failing request,
assert self.chain[0].tmaster.state.flow_count() == 4
# reCONNECT, request
# failing request, request
assert self.chain[1].tmaster.state.flow_count() == 2
# (doesn't store (repeated) CONNECTs from chain[0]
# as it is a regular proxy)
assert req.content == "content"
assert req.status_code == 418
@@ -795,18 +877,26 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
assert self.proxy.tmaster.state.flows[0].request.form_in == "authority"
assert self.proxy.tmaster.state.flows[1].request.form_in == "relative"
assert self.chain[0].tmaster.state.flows[0].request.form_in == "authority"
assert self.chain[0].tmaster.state.flows[1].request.form_in == "relative"
assert self.chain[0].tmaster.state.flows[2].request.form_in == "authority"
assert self.chain[0].tmaster.state.flows[3].request.form_in == "relative"
assert self.chain[0].tmaster.state.flows[
0].request.form_in == "authority"
assert self.chain[0].tmaster.state.flows[
1].request.form_in == "relative"
assert self.chain[0].tmaster.state.flows[
2].request.form_in == "authority"
assert self.chain[0].tmaster.state.flows[
3].request.form_in == "relative"
assert self.chain[1].tmaster.state.flows[0].request.form_in == "relative"
assert self.chain[1].tmaster.state.flows[1].request.form_in == "relative"
assert self.chain[1].tmaster.state.flows[
0].request.form_in == "relative"
assert self.chain[1].tmaster.state.flows[
1].request.form_in == "relative"
req = p.request("get:'/p/418:b\"content2\"'")
assert req.status_code == 502
assert self.proxy.tmaster.state.flow_count() == 3 # + new request
assert self.chain[0].tmaster.state.flow_count() == 6 # + new request, repeated CONNECT from chain[1]
# (both terminated)
assert self.chain[1].tmaster.state.flow_count() == 2 # nothing happened here
# + new request, repeated CONNECT from chain[1]
assert self.chain[0].tmaster.state.flow_count() == 6
# (both terminated)
# nothing happened here
assert self.chain[1].tmaster.state.flow_count() == 2

View File

@@ -48,9 +48,11 @@ def test_urldecode():
s = "one=two&three=four"
assert len(utils.urldecode(s)) == 2
def test_multipartdecode():
boundary = 'somefancyboundary'
headers = odict.ODict([('content-type', ('multipart/form-data; boundary=%s' % boundary))])
headers = odict.ODict(
[('content-type', ('multipart/form-data; boundary=%s' % boundary))])
content = "--{0}\n" \
"Content-Disposition: form-data; name=\"field1\"\n\n" \
"value1\n" \
@@ -65,6 +67,7 @@ def test_multipartdecode():
assert form[0] == ('field1', 'value1')
assert form[1] == ('field2', 'value2')
def test_pretty_duration():
assert utils.pretty_duration(0.00001) == "0ms"
assert utils.pretty_duration(0.0001) == "0ms"
@@ -79,10 +82,13 @@ def test_pretty_duration():
assert utils.pretty_duration(1.123) == "1.12s"
assert utils.pretty_duration(0.123) == "123ms"
def test_LRUCache():
cache = utils.LRUCache(2)
class Foo:
ran = False
def gen(self, x):
self.ran = True
return x

View File

@@ -1,5 +1,6 @@
from __future__ import print_function
import requests, time
import requests
import time
n = 100
url = "http://192.168.1.1/"
@@ -7,18 +8,17 @@ proxy = "http://192.168.1.115:8080/"
start = time.time()
for _ in range(n):
requests.get(url, allow_redirects=False, proxies=dict(http=proxy))
print(".", end="")
t_mitmproxy = time.time()-start
requests.get(url, allow_redirects=False, proxies=dict(http=proxy))
print(".", end="")
t_mitmproxy = time.time() - start
print("\r\nTotal time with mitmproxy: {}".format(t_mitmproxy))
start = time.time()
for _ in range(n):
requests.get(url, allow_redirects=False)
print(".", end="")
t_without = time.time()-start
requests.get(url, allow_redirects=False)
print(".", end="")
t_without = time.time() - start
print("\r\nTotal time without mitmproxy: {}".format(t_without))
print("\r\nTotal time without mitmproxy: {}".format(t_without))

View File

@@ -1,7 +1,10 @@
#!/usr/bin/env python
import sys
sys.path.insert(0, "../..")
import socket, tempfile, ssl, subprocess
import socket
import tempfile
import ssl
import subprocess
addr = socket.gethostbyname(sys.argv[1])
print ssl.get_server_certificate((addr, 443))

View File

@@ -2,12 +2,14 @@ import SocketServer
from threading import Thread
from time import sleep
class service(SocketServer.BaseRequestHandler):
def handle(self):
data = 'dummy'
print "Client connected with ", self.client_address
while True:
self.request.send("HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 7\r\n\r\ncontent")
self.request.send(
"HTTP/1.1 200 OK\r\nConnection: close\r\nContent-Length: 7\r\n\r\ncontent")
data = self.request.recv(1024)
if not len(data):
print "Connection closed by remote: ", self.client_address
@@ -17,5 +19,5 @@ class service(SocketServer.BaseRequestHandler):
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
pass
server = ThreadedTCPServer(('',1520), service)
server = ThreadedTCPServer(('', 1520), service)
server.serve_forever()

View File

@@ -2,7 +2,7 @@
# yappi (https://code.google.com/p/yappi/)
#
# Requirements:
# - Apache Bench "ab" binary
# - Apache Bench "ab" binary
# - pip install click yappi
from libmproxy.main import mitmdump
@@ -13,14 +13,17 @@ import time
import yappi
import click
class ApacheBenchThread(Thread):
def __init__(self, concurrency):
self.concurrency = concurrency
super(ApacheBenchThread, self).__init__()
def run(self):
time.sleep(2)
system("ab -n 1024 -c {} -X 127.0.0.1:8080 http://example.com/".format(self.concurrency))
class ApacheBenchThread(Thread):
def __init__(self, concurrency):
self.concurrency = concurrency
super(ApacheBenchThread, self).__init__()
def run(self):
time.sleep(2)
system(
"ab -n 1024 -c {} -X 127.0.0.1:8080 http://example.com/".format(self.concurrency))
@click.command()
@click.option('--profiler', default="yappi", type=click.Choice(['yappi']))
@@ -28,24 +31,24 @@ class ApacheBenchThread(Thread):
@click.option('--concurrency', default=1, type=click.INT)
def main(profiler, clock_type, concurrency):
outfile = "callgrind.mitmdump-{}-c{}".format(clock_type, concurrency)
a = ApacheBenchThread(concurrency)
a.start()
outfile = "callgrind.mitmdump-{}-c{}".format(clock_type, concurrency)
a = ApacheBenchThread(concurrency)
a.start()
if profiler == "yappi":
yappi.set_clock_type(clock_type)
yappi.start(builtins=True)
if profiler == "yappi":
yappi.set_clock_type(clock_type)
yappi.start(builtins=True)
print("Start mitmdump...")
mitmdump(["-k","-q","-S", "1024example"])
print("mitmdump stopped.")
print("Save profile information...")
if profiler == "yappi":
yappi.stop()
stats = yappi.get_func_stats()
stats.save(outfile, type='callgrind')
print("Done.")
print("Start mitmdump...")
mitmdump(["-k", "-q", "-S", "1024example"])
print("mitmdump stopped.")
print("Save profile information...")
if profiler == "yappi":
yappi.stop()
stats = yappi.get_func_stats()
stats.save(outfile, type='callgrind')
print("Done.")
if __name__ == '__main__':
main()
main()

View File

@@ -1,23 +1,28 @@
import os.path
import threading, Queue
import shutil, tempfile
import threading
import Queue
import shutil
import tempfile
import flask
import mock
from libmproxy.proxy.config import ProxyConfig
from libmproxy.proxy.server import ProxyServer
from libmproxy.proxy.primitives import TransparentProxyMode
import libpathod.test, libpathod.pathoc
import libpathod.test
import libpathod.pathoc
from libmproxy import flow, controller
from libmproxy.cmdline import APP_HOST, APP_PORT
import tutils
testapp = flask.Flask(__name__)
@testapp.route("/")
def hello():
return "testapp"
@testapp.route("/error")
def error():
raise ValueError("An exception...")
@@ -57,7 +62,8 @@ class ProxyThread(threading.Thread):
def __init__(self, tmaster):
threading.Thread.__init__(self)
self.tmaster = tmaster
self.name = "ProxyThread (%s:%s)" % (tmaster.server.address.host, tmaster.server.address.port)
self.name = "ProxyThread (%s:%s)" % (
tmaster.server.address.host, tmaster.server.address.port)
controller.should_exit = False
@property
@@ -87,8 +93,12 @@ class ProxTestBase(object):
@classmethod
def setupAll(cls):
cls.server = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
cls.server2 = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
cls.server = libpathod.test.Daemon(
ssl=cls.ssl,
ssloptions=cls.ssloptions)
cls.server2 = libpathod.test.Daemon(
ssl=cls.ssl,
ssloptions=cls.ssloptions)
cls.config = ProxyConfig(**cls.get_proxy_config())
@@ -151,9 +161,9 @@ class HTTPProxTest(ProxTestBase):
p = self.pathoc(sni=sni)
spec = spec.encode("string_escape")
if self.ssl:
q = "get:'/p/%s'"%spec
q = "get:'/p/%s'" % spec
else:
q = "get:'%s/p/%s'"%(self.server.urlbase, spec)
q = "get:'%s/p/%s'" % (self.server.urlbase, spec)
return p.request(q)
def app(self, page):
@@ -162,10 +172,10 @@ class HTTPProxTest(ProxTestBase):
("127.0.0.1", self.proxy.port), True, fp=None
)
p.connect((APP_HOST, APP_PORT))
return p.request("get:'%s'"%page)
return p.request("get:'%s'" % page)
else:
p = self.pathoc()
return p.request("get:'http://%s%s'"%(APP_HOST, page))
return p.request("get:'http://%s%s'" % (APP_HOST, page))
class TResolver:
@@ -188,7 +198,10 @@ class TransparentProxTest(ProxTestBase):
ports = [cls.server.port, cls.server2.port]
else:
ports = []
cls.config.mode = TransparentProxyMode(cls.resolver(cls.server.port), ports)
cls.config.mode = TransparentProxyMode(
cls.resolver(
cls.server.port),
ports)
@classmethod
def get_proxy_config(cls):
@@ -202,10 +215,10 @@ class TransparentProxTest(ProxTestBase):
"""
if self.ssl:
p = self.pathoc(sni=sni)
q = "get:'/p/%s'"%spec
q = "get:'/p/%s'" % spec
else:
p = self.pathoc()
q = "get:'/p/%s'"%spec
q = "get:'/p/%s'" % spec
return p.request(q)
def pathoc(self, sni=None):
@@ -221,6 +234,7 @@ class TransparentProxTest(ProxTestBase):
class ReverseProxTest(ProxTestBase):
ssl = None
@classmethod
def get_proxy_config(cls):
d = ProxTestBase.get_proxy_config()
@@ -249,10 +263,10 @@ class ReverseProxTest(ProxTestBase):
"""
if self.ssl:
p = self.pathoc(sni=sni)
q = "get:'/p/%s'"%spec
q = "get:'/p/%s'" % spec
else:
p = self.pathoc()
q = "get:'/p/%s'"%spec
q = "get:'/p/%s'" % spec
return p.request(q)
@@ -278,8 +292,8 @@ class ChainProxTest(ProxTestBase):
cls.chain.insert(0, proxy)
# Patch the orginal proxy to upstream mode
cls.config = cls.proxy.tmaster.config = cls.proxy.tmaster.server.config = ProxyConfig(**cls.get_proxy_config())
cls.config = cls.proxy.tmaster.config = cls.proxy.tmaster.server.config = ProxyConfig(
**cls.get_proxy_config())
@classmethod
def teardownAll(cls):
@@ -303,5 +317,6 @@ class ChainProxTest(ProxTestBase):
)
return d
class HTTPUpstreamProxTest(ChainProxTest, HTTPProxTest):
pass

View File

@@ -1,5 +1,8 @@
from cStringIO import StringIO
import os, shutil, tempfile, argparse
import os
import shutil
import tempfile
import argparse
from contextlib import contextmanager
import sys
from libmproxy import flow, utils, controller
@@ -14,8 +17,11 @@ from nose.plugins.skip import SkipTest
from mock import Mock
from time import time
def _SkipWindows():
raise SkipTest("Skipped on Windows.")
def SkipWindows(fn):
if os.name == "nt":
return _SkipWindows
@@ -83,10 +89,23 @@ def treq(content="content", scheme="http", host="address", port=22):
"""
headers = odict.ODictCaseless()
headers["header"] = ["qvalue"]
req = http.HTTPRequest("relative", "GET", scheme, host, port, "/path", (1, 1), headers, content,
None, None, None)
req = http.HTTPRequest(
"relative",
"GET",
scheme,
host,
port,
"/path",
(1,
1),
headers,
content,
None,
None,
None)
return req
def treq_absolute(content="content"):
"""
@return: libmproxy.protocol.http.HTTPRequest
@@ -107,7 +126,15 @@ def tresp(content="message"):
headers = odict.ODictCaseless()
headers["header_response"] = ["svalue"]
resp = http.HTTPResponse((1, 1), 200, "OK", headers, content, time(), time())
resp = http.HTTPResponse(
(1,
1),
200,
"OK",
headers,
content,
time(),
time())
return resp
@@ -118,10 +145,11 @@ def terr(content="error"):
err = Error(content)
return err
def tflowview(request_contents=None):
m = Mock()
cs = ConsoleState()
if request_contents == None:
if request_contents is None:
flow = tflow()
else:
flow = tflow(req=treq(request_contents))
@@ -129,9 +157,11 @@ def tflowview(request_contents=None):
fv = FlowView(m, cs, flow)
return fv
def get_body_line(last_displayed_body, line_nb):
return last_displayed_body.contents()[line_nb + 2]
@contextmanager
def tmpdir(*args, **kwargs):
orig_workdir = os.getcwd()
@@ -149,6 +179,7 @@ class MockParser(argparse.ArgumentParser):
argparse.ArgumentParser sys.exits() by default.
Make it more testable by throwing an exception instead.
"""
def error(self, message):
raise Exception(message)
@@ -169,14 +200,14 @@ def raises(exc, obj, *args, **kwargs):
:kwargs Arguments to be passed to the callable.
"""
try:
apply(obj, args, kwargs)
except Exception, v:
obj(*args, **kwargs)
except Exception as v:
if isinstance(exc, basestring):
if exc.lower() in str(v).lower():
return
else:
raise AssertionError(
"Expected %s, but caught %s"%(
"Expected %s, but caught %s" % (
repr(str(exc)), v
)
)
@@ -185,7 +216,7 @@ def raises(exc, obj, *args, **kwargs):
return
else:
raise AssertionError(
"Expected %s, but caught %s %s"%(
"Expected %s, but caught %s %s" % (
exc.__name__, v.__class__.__name__, str(v)
)
)