Merge pull request #1701 from cortesi/addontest2

Test suite cleanups
This commit is contained in:
Aldo Cortesi
2016-11-02 11:15:27 +13:00
committed by GitHub
45 changed files with 294 additions and 273 deletions

View File

@@ -1,21 +1,24 @@
from mitmproxy import exceptions
from mitmproxy import ctx
from mitmproxy import io
from mitmproxy import flow
import typing
class ClientPlayback:
def __init__(self):
self.flows = None
self.current = None
self.keepserving = None
self.current_thread = None
self.keepserving = False
self.has_replayed = False
def count(self):
def count(self) -> int:
if self.flows:
return len(self.flows)
return 0
def load(self, flows):
def load(self, flows: typing.Sequence[flow.Flow]):
self.flows = flows
def configure(self, options, updated):
@@ -32,11 +35,11 @@ class ClientPlayback:
self.keepserving = options.keepserving
def tick(self):
if self.current and not self.current.is_alive():
self.current = None
if self.flows and not self.current:
self.current = ctx.master.replay_request(self.flows.pop(0))
if self.current_thread and not self.current_thread.is_alive():
self.current_thread = None
if self.flows and not self.current_thread:
self.current_thread = ctx.master.replay_request(self.flows.pop(0))
self.has_replayed = True
if self.has_replayed:
if not self.flows and not self.current and not self.keepserving:
if not self.flows and not self.current_thread and not self.keepserving:
ctx.master.shutdown()

View File

@@ -7,7 +7,7 @@ from mitmproxy import stateobject
from mitmproxy import connections
from mitmproxy import version
from typing import Optional, Dict # noqa
import typing # noqa
class Error(stateobject.StateObject):
@@ -26,7 +26,7 @@ class Error(stateobject.StateObject):
timestamp: Seconds since the epoch
"""
def __init__(self, msg, timestamp=None):
def __init__(self, msg: str, timestamp=None) -> None:
"""
@type msg: str
@type timestamp: float
@@ -70,20 +70,20 @@ class Flow(stateobject.StateObject):
type: str,
client_conn: connections.ClientConnection,
server_conn: connections.ServerConnection,
live=None
):
live: bool=None
) -> None:
self.type = type
self.id = str(uuid.uuid4())
self.client_conn = client_conn
self.server_conn = server_conn
self.live = live
self.error = None # type: Optional[Error]
self.error = None # type: typing.Optional[Error]
self.intercepted = False # type: bool
self._backup = None # type: Optional[Flow]
self.reply = None # type: Optional[controller.Reply]
self._backup = None # type: typing.Optional[Flow]
self.reply = None # type: typing.Optional[controller.Reply]
self.marked = False # type: bool
self.metadata = dict() # type: Dict[str, str]
self.metadata = dict() # type: typing.Dict[str, str]
_stateobject_attributes = dict(
id=str,

View File

@@ -19,6 +19,14 @@ def treader(bytes):
return tcp.Reader(fp)
@contextmanager
def chdir(dir):
orig_dir = os.getcwd()
os.chdir(dir)
yield
os.chdir(orig_dir)
@contextmanager
def tmpdir(*args, **kwargs):
orig_workdir = os.getcwd()
@@ -89,7 +97,7 @@ class RaisesContext:
return True
test_data = data.Data(__name__).push("../../test/mitmproxy/net")
test_data = data.Data(__name__).push("../../test/")
def treq(**kwargs):

View File

@@ -6,6 +6,7 @@ import inspect
class Data:
def __init__(self, name):
self.name = name
m = importlib.import_module(name)
dirname = os.path.dirname(inspect.getsourcefile(m))
self.dirname = os.path.abspath(dirname)
@@ -14,8 +15,10 @@ class Data:
"""
Change the data object to a path relative to the module.
"""
self.dirname = os.path.join(self.dirname, subpath)
return self
dirname = os.path.join(self.dirname, subpath)
ret = Data(self.name)
ret.dirname = dirname
return ret
def path(self, path):
"""

View File

@@ -216,7 +216,7 @@ class TokValueFile(Token):
os.path.abspath(os.path.join(settings.staticdir, s))
)
uf = settings.unconstrained_file_access
if not uf and not s.startswith(settings.staticdir):
if not uf and not s.startswith(os.path.normpath(settings.staticdir)):
raise exceptions.FileAccessDenied(
"File access outside of configured directory"
)

View File

@@ -10,7 +10,7 @@ testpaths = test
addopts = --capture=no --color=yes
[coverage:run]
branch = True
branch = False
omit = *contrib*, *tnetstring*, *platform*, *main.py
[coverage:report]

View File

@@ -1,38 +1,65 @@
from mitmproxy.test import tflow
import os
import mock
from mitmproxy.addons import clientplayback
from mitmproxy import options
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy import io
from mitmproxy import exceptions
from .. import mastertest
from mitmproxy.addons import clientplayback
from mitmproxy.test import taddons
def tdump(path, flows):
w = io.FlowWriter(open(path, "wb"))
for i in flows:
w.add(i)
class MockThread():
def is_alive(self):
return False
class TestClientPlayback:
def test_playback(self):
cp = clientplayback.ClientPlayback()
cp.configure(options.Options(), [])
assert cp.count() == 0
f = tflow.tflow(resp=True)
cp.load([f])
assert cp.count() == 1
RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread"
with mock.patch(RP) as rp:
assert not cp.current
with mastertest.mockctx():
with taddons.context():
assert cp.count() == 0
f = tflow.tflow(resp=True)
cp.load([f])
assert cp.count() == 1
RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread"
with mock.patch(RP) as rp:
assert not cp.current_thread
cp.tick()
rp.assert_called()
assert cp.current
rp.assert_called()
assert cp.current_thread
cp.keepserving = False
cp.flows = None
cp.current = None
with mock.patch("mitmproxy.master.Master.shutdown") as sd:
with mastertest.mockctx():
cp.keepserving = False
cp.flows = None
cp.current_thread = None
with mock.patch("mitmproxy.master.Master.shutdown") as sd:
cp.tick()
sd.assert_called()
sd.assert_called()
cp.current_thread = MockThread()
with mock.patch("mitmproxy.master.Master.shutdown") as sd:
cp.tick()
assert cp.current_thread is None
def test_configure(self):
cp = clientplayback.ClientPlayback()
cp.configure(
options.Options(), []
)
with taddons.context() as tctx:
with tutils.tmpdir() as td:
path = os.path.join(td, "flows")
tdump(path, [tflow.tflow()])
tctx.configure(cp, client_replay=[path])
tctx.configure(cp, client_replay=[])
tctx.configure(cp)
tutils.raises(
exceptions.OptionsError,
tctx.configure,
cp,
client_replay=["nonexistent"]
)

View File

@@ -1,6 +1,7 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .. import tutils, mastertest
from .. import mastertest
import os.path

View File

@@ -1,6 +1,7 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .. import tutils, mastertest, tservers
from .. import mastertest, tservers
from mitmproxy.addons import replace
from mitmproxy import master
from mitmproxy import options

View File

@@ -4,6 +4,7 @@ import sys
import time
from mitmproxy.test import tflow
from mitmproxy.test import tutils
import re
from mitmproxy import exceptions
from mitmproxy import options
@@ -11,7 +12,8 @@ from mitmproxy import proxy
from mitmproxy.addons import script
from mitmproxy import master
from .. import tutils, mastertest
from .. import mastertest
from .. import tutils as ttutils
class TestParseCommand:
@@ -32,25 +34,31 @@ class TestParseCommand:
def test_parse_args(self):
with tutils.chdir(tutils.test_data.dirname):
assert script.parse_command("data/addonscripts/recorder.py") == ("data/addonscripts/recorder.py", [])
assert script.parse_command("data/addonscripts/recorder.py foo bar") == ("data/addonscripts/recorder.py", ["foo", "bar"])
assert script.parse_command("data/addonscripts/recorder.py 'foo bar'") == ("data/addonscripts/recorder.py", ["foo bar"])
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py"
) == ("mitmproxy/data/addonscripts/recorder.py", [])
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py foo bar"
) == ("mitmproxy/data/addonscripts/recorder.py", ["foo", "bar"])
assert script.parse_command(
"mitmproxy/data/addonscripts/recorder.py 'foo bar'"
) == ("mitmproxy/data/addonscripts/recorder.py", ["foo bar"])
@tutils.skip_not_windows
@ttutils.skip_not_windows
def test_parse_windows(self):
with tutils.chdir(tutils.test_data.dirname):
assert script.parse_command(
"data\\addonscripts\\recorder.py"
) == ("data\\addonscripts\\recorder.py", [])
"mitmproxy/data\\addonscripts\\recorder.py"
) == ("mitmproxy/data\\addonscripts\\recorder.py", [])
assert script.parse_command(
"data\\addonscripts\\recorder.py 'foo \\ bar'"
) == ("data\\addonscripts\\recorder.py", ['foo \\ bar'])
"mitmproxy/data\\addonscripts\\recorder.py 'foo \\ bar'"
) == ("mitmproxy/data\\addonscripts\\recorder.py", ['foo \\ bar'])
def test_load_script():
ns = script.load_script(
tutils.test_data.path(
"data/addonscripts/recorder.py"
"mitmproxy/data/addonscripts/recorder.py"
), []
)
assert ns.start
@@ -62,7 +70,7 @@ class TestScript(mastertest.MasterTest):
m = master.Master(o, proxy.DummyServer())
sc = script.Script(
tutils.test_data.path(
"data/addonscripts/recorder.py"
"mitmproxy/data/addonscripts/recorder.py"
)
)
m.addons.add(sc)
@@ -100,7 +108,7 @@ class TestScript(mastertest.MasterTest):
o = options.Options()
m = mastertest.RecordingMaster(o, proxy.DummyServer())
sc = script.Script(
tutils.test_data.path("data/addonscripts/error.py")
tutils.test_data.path("mitmproxy/data/addonscripts/error.py")
)
m.addons.add(sc)
f = tflow.tflow(resp=True)
@@ -116,7 +124,7 @@ class TestScript(mastertest.MasterTest):
m = master.Master(o, proxy.DummyServer())
sc = script.Script(
tutils.test_data.path(
"data/addonscripts/addon.py"
"mitmproxy/data/addonscripts/addon.py"
)
)
m.addons.add(sc)
@@ -154,7 +162,7 @@ class TestScriptLoader(mastertest.MasterTest):
with m.handlecontext():
sc = sl.run_once(
tutils.test_data.path(
"data/addonscripts/recorder.py"
"mitmproxy/data/addonscripts/recorder.py"
), [f]
)
evts = [i[1] for i in sc.ns.call_log]
@@ -176,7 +184,7 @@ class TestScriptLoader(mastertest.MasterTest):
assert len(m.addons) == 1
o.update(
scripts = [
tutils.test_data.path("data/addonscripts/recorder.py")
tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py")
]
)
assert len(m.addons) == 2
@@ -190,7 +198,7 @@ class TestScriptLoader(mastertest.MasterTest):
tutils.raises(exceptions.OptionsError, m.addons.add, o, sc)
def test_order(self):
rec = tutils.test_data.path("data/addonscripts/recorder.py")
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py")
o = options.Options(
scripts = [

View File

@@ -1,6 +1,7 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .. import tutils, mastertest
from .. import mastertest
from mitmproxy.addons import setheaders
from mitmproxy import options

View File

@@ -1,6 +1,7 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .. import tutils, mastertest
from .. import mastertest
from mitmproxy.addons import stickycookie
from mitmproxy import master
from mitmproxy import options

View File

@@ -1,11 +1,11 @@
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.addons import view
from mitmproxy import flowfilter
from mitmproxy import options
from mitmproxy.test import taddons
from .. import tutils
class Options(options.Options):
def __init__(

View File

@@ -1,18 +1,17 @@
import os
from os.path import normpath
from mitmproxy.tools.console import pathedit
from mitmproxy.test import tutils
from mock import patch
from .. import tutils
class TestPathCompleter:
def test_lookup_construction(self):
c = pathedit._PathCompleter()
cd = tutils.test_data.path("completion")
cd = tutils.test_data.path("mitmproxy/completion")
ca = os.path.join(cd, "a")
assert c.complete(ca).endswith(normpath("/completion/aaa"))
assert c.complete(ca).endswith(normpath("/completion/aab"))
@@ -60,7 +59,7 @@ class TestPathEdit:
with patch('urwid.widget.Edit.get_edit_text') as get_text, \
patch('urwid.widget.Edit.set_edit_text') as set_text:
cd = tutils.test_data.path("completion")
cd = tutils.test_data.path("mitmproxy/completion")
get_text.return_value = os.path.join(cd, "a")
# Pressing tab should set completed path

View File

@@ -29,10 +29,10 @@ class TestPassManHtpasswd:
tutils.raises(
"malformed htpasswd file",
authentication.PassManHtpasswd,
tutils.test_data.path("data/server.crt"))
tutils.test_data.path("mitmproxy/net/data/server.crt"))
def test_simple(self):
pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
pm = authentication.PassManHtpasswd(tutils.test_data.path("mitmproxy/net/data/htpasswd"))
vals = ("basic", "test", "test")
authentication.assemble_http_basic_auth(*vals)
@@ -118,5 +118,5 @@ class TestAuthAction:
def test_httppasswd(self):
m = Bunch()
aa = authentication.HtpasswdAuthAction(None, "authenticator")
aa(None, m, tutils.test_data.path("data/htpasswd"), None)
aa(None, m, tutils.test_data.path("mitmproxy/net/data/htpasswd"), None)
assert m.authenticator

View File

@@ -164,7 +164,7 @@ class TestServerSSL(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
cipher_list="AES256-SHA",
chain_file=tutils.test_data.path("data/server.crt")
chain_file=tutils.test_data.path("mitmproxy/net/data/server.crt")
)
def test_echo(self):
@@ -203,8 +203,8 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
cert=tutils.test_data.path("data/verificationcerts/self-signed.crt"),
key=tutils.test_data.path("data/verificationcerts/self-signed.key")
cert=tutils.test_data.path("mitmproxy/net/data/verificationcerts/self-signed.crt"),
key=tutils.test_data.path("mitmproxy/net/data/verificationcerts/self-signed.key")
)
def test_mode_default_should_pass(self):
@@ -241,7 +241,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
c.convert_to_ssl(
sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
)
assert c.ssl_verification_error
@@ -255,8 +255,8 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
cert=tutils.test_data.path("data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("data/verificationcerts/trusted-leaf.key")
cert=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.key")
)
def test_should_fail_without_sni(self):
@@ -265,7 +265,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
with tutils.raises(exceptions.TlsException):
c.convert_to_ssl(
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
)
def test_should_fail(self):
@@ -275,7 +275,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
c.convert_to_ssl(
sni="mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
)
assert c.ssl_verification_error
@@ -284,8 +284,8 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
cert=tutils.test_data.path("data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("data/verificationcerts/trusted-leaf.key")
cert=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.key")
)
def test_mode_strict_w_pemfile_should_pass(self):
@@ -294,7 +294,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
c.convert_to_ssl(
sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
)
assert c.ssl_verification_error is None
@@ -310,7 +310,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
c.convert_to_ssl(
sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_path=tutils.test_data.path("data/verificationcerts/")
ca_path=tutils.test_data.path("mitmproxy/net/data/verificationcerts/")
)
assert c.ssl_verification_error is None
@@ -342,7 +342,7 @@ class TestSSLClientCert(tservers.ServerTestBase):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(
cert=tutils.test_data.path("data/clientcert/client.pem"))
cert=tutils.test_data.path("mitmproxy/net/data/clientcert/client.pem"))
assert c.rfile.readline().strip() == b"1"
def test_clientcert_err(self):
@@ -351,7 +351,7 @@ class TestSSLClientCert(tservers.ServerTestBase):
tutils.raises(
exceptions.TlsException,
c.convert_to_ssl,
cert=tutils.test_data.path("data/clientcert/make")
cert=tutils.test_data.path("mitmproxy/net/data/clientcert/make")
)
@@ -570,7 +570,7 @@ class TestDHParams(tservers.ServerTestBase):
handler = HangHandler
ssl = dict(
dhparams=certs.CertStore.load_dhparam(
tutils.test_data.path("data/dhparam.pem"),
tutils.test_data.path("mitmproxy/net/data/dhparam.pem"),
),
cipher_list="DHE-RSA-AES256-SHA"
)

View File

@@ -50,10 +50,10 @@ class _TServer(tcp.TCPServer):
if self.ssl is not None:
cert = self.ssl.get(
"cert",
tutils.test_data.path("data/server.crt"))
tutils.test_data.path("mitmproxy/net/data/server.crt"))
raw_key = self.ssl.get(
"key",
tutils.test_data.path("data/server.key"))
tutils.test_data.path("mitmproxy/net/data/server.key"))
key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM,
open(raw_key, "rb").read())

View File

@@ -1,12 +1,16 @@
from mitmproxy.test import tflow
from test.mitmproxy import tutils, mastertest
from mitmproxy.test import tutils
from mitmproxy import controller
from mitmproxy.addons import script
from mitmproxy import options
from mitmproxy import proxy
from mitmproxy import master
import time
from test.mitmproxy import mastertest
from test.mitmproxy import tutils as ttutils
class Thing:
def __init__(self):
@@ -15,12 +19,12 @@ class Thing:
class TestConcurrent(mastertest.MasterTest):
@tutils.skip_appveyor
@ttutils.skip_appveyor
def test_concurrent(self):
m = master.Master(options.Options(), proxy.DummyServer())
sc = script.Script(
tutils.test_data.path(
"data/addonscripts/concurrent_decorator.py"
"mitmproxy/data/addonscripts/concurrent_decorator.py"
)
)
m.addons.add(sc)
@@ -37,7 +41,7 @@ class TestConcurrent(mastertest.MasterTest):
m = mastertest.RecordingMaster(options.Options(), proxy.DummyServer())
sc = script.Script(
tutils.test_data.path(
"data/addonscripts/concurrent_decorator_err.py"
"mitmproxy/data/addonscripts/concurrent_decorator_err.py"
)
)
with m.handlecontext():

View File

@@ -143,13 +143,13 @@ class TestDummyCert:
class TestSSLCert:
def test_simple(self):
with open(tutils.test_data.path("data/text_cert"), "rb") as f:
with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f:
d = f.read()
c1 = certs.SSLCert.from_pem(d)
assert c1.cn == b"google.com"
assert len(c1.altnames) == 436
with open(tutils.test_data.path("data/text_cert_2"), "rb") as f:
with open(tutils.test_data.path("mitmproxy/net/data/text_cert_2"), "rb") as f:
d = f.read()
c2 = certs.SSLCert.from_pem(d)
assert c2.cn == b"www.inode.co.nz"
@@ -168,14 +168,14 @@ class TestSSLCert:
assert c1 != c2
def test_err_broken_sans(self):
with open(tutils.test_data.path("data/text_cert_weird1"), "rb") as f:
with open(tutils.test_data.path("mitmproxy/net/data/text_cert_weird1"), "rb") as f:
d = f.read()
c = certs.SSLCert.from_pem(d)
# This breaks unless we ignore a decoding error.
assert c.altnames is not None
def test_der(self):
with open(tutils.test_data.path("data/dercert"), "rb") as f:
with open(tutils.test_data.path("mitmproxy/net/data/dercert"), "rb") as f:
d = f.read()
s = certs.SSLCert.from_der(d)
assert s.cn

View File

@@ -1,6 +1,6 @@
import argparse
from mitmproxy.tools import cmdline
from . import tutils
from mitmproxy.test import tutils
def test_parse_replace_hook():
@@ -91,7 +91,7 @@ def test_common():
opts
)
p = tutils.test_data.path("data/replace")
p = tutils.test_data.path("mitmproxy/data/replace")
opts.replace_file = [("/foo/bar/%s" % p)]
v = cmdline.get_common_options(opts)["replacements"]
assert len(v) == 1

View File

@@ -5,8 +5,7 @@ from mitmproxy.net.http import url
from mitmproxy.types import multidict
import mitmproxy.contentviews as cv
from . import tutils
import mitmproxy.test.tutils
from mitmproxy.test import tutils
try:
import pyamf
@@ -117,7 +116,7 @@ class TestContentView:
def test_view_css(self):
v = cv.ViewCSS()
with open(tutils.test_data.path('data/1.css'), 'r') as fp:
with open(tutils.test_data.path('mitmproxy/data/1.css'), 'r') as fp:
fixture_1 = fp.read()
result = v('a')
@@ -140,16 +139,16 @@ class TestContentView:
def test_view_image(self):
v = cv.ViewImage()
p = tutils.test_data.path("data/image.png")
p = tutils.test_data.path("mitmproxy/data/image.png")
assert v(open(p, "rb").read())
p = tutils.test_data.path("data/image.gif")
p = tutils.test_data.path("mitmproxy/data/image.gif")
assert v(open(p, "rb").read())
p = tutils.test_data.path("data/image-err1.jpg")
p = tutils.test_data.path("mitmproxy/data/image-err1.jpg")
assert v(open(p, "rb").read())
p = tutils.test_data.path("data/image.ico")
p = tutils.test_data.path("mitmproxy/data/image.ico")
assert v(open(p, "rb").read())
assert not v(b"flibble")
@@ -232,7 +231,7 @@ def test_get_content_view():
def test_get_message_content_view():
r = mitmproxy.test.tutils.treq()
r = tutils.treq()
desc, lines, err = cv.get_message_content_view("raw", r)
assert desc == "Raw"
@@ -253,22 +252,22 @@ if pyamf:
def test_view_amf_request():
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf01")
p = tutils.test_data.path("mitmproxy/data/amf01")
assert v(open(p, "rb").read())
p = tutils.test_data.path("data/amf02")
p = tutils.test_data.path("mitmproxy/data/amf02")
assert v(open(p, "rb").read())
def test_view_amf_response():
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf03")
p = tutils.test_data.path("mitmproxy/data/amf03")
assert v(open(p, "rb").read())
if cv.ViewProtobuf.is_available():
def test_view_protobuf_request():
v = cv.ViewProtobuf()
p = tutils.test_data.path("data/protobuf01")
p = tutils.test_data.path("mitmproxy/data/protobuf01")
content_type, output = v(open(p, "rb").read())
assert content_type == "Protobuf"
assert output.next()[0][1] == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"'

View File

@@ -1,4 +1,3 @@
from test.mitmproxy import tutils
from threading import Thread, Event
from mock import Mock
@@ -9,7 +8,7 @@ import queue
from mitmproxy.exceptions import Kill, ControlException
from mitmproxy import proxy
from mitmproxy import master
from mitmproxy.test.tutils import raises
from mitmproxy.test import tutils
class TMsg:
@@ -81,7 +80,7 @@ class TestChannel:
done = Event()
done.set()
channel = controller.Channel(q, done)
with raises(Kill):
with tutils.raises(Kill):
channel.ask("test", Mock(name="test_ask_shutdown"))

View File

@@ -6,7 +6,8 @@ import mitmproxy.io
from mitmproxy.tools import dump
from mitmproxy import exceptions
from mitmproxy import proxy
from . import tutils, mastertest
from mitmproxy.test import tutils
from . import mastertest
class TestDumpMaster(mastertest.MasterTest):
@@ -156,7 +157,7 @@ class TestDumpMaster(mastertest.MasterTest):
ret = self.dummy_cycle(
self.mkmaster(
None,
scripts=[tutils.test_data.path("data/scripts/all.py")],
scripts=[tutils.test_data.path("mitmproxy/data/scripts/all.py")],
verbosity=2
),
1, b"",

View File

@@ -8,16 +8,15 @@ from mitmproxy import options
from mitmproxy import contentviews
from mitmproxy import proxy
from mitmproxy.addons import script
from mitmproxy.utils import data
from mitmproxy import master
from mitmproxy.test import tutils as netutils
from mitmproxy.test import tutils
from mitmproxy.net.http import Headers
from mitmproxy.net.http import cookies
from . import tutils, mastertest
from . import mastertest
example_dir = data.Data(__name__).push("../../examples")
example_dir = tutils.test_data.push("../examples")
class ScriptError(Exception):
@@ -42,7 +41,7 @@ def tscript(cmd, args=""):
class TestScripts(mastertest.MasterTest):
def test_add_header(self):
m, _ = tscript("add_header.py")
f = tflow.tflow(resp=netutils.tresp())
f = tflow.tflow(resp=tutils.tresp())
m.response(f)
assert f.response.headers["newheader"] == "foo"
@@ -58,7 +57,7 @@ class TestScripts(mastertest.MasterTest):
tscript("iframe_injector.py")
m, sc = tscript("iframe_injector.py", "http://example.org/evil_iframe")
f = tflow.tflow(resp=netutils.tresp(content=b"<html>mitmproxy</html>"))
f = tflow.tflow(resp=tutils.tresp(content=b"<html>mitmproxy</html>"))
m.response(f)
content = f.response.content
assert b'iframe' in content and b'evil_iframe' in content
@@ -67,7 +66,7 @@ class TestScripts(mastertest.MasterTest):
m, sc = tscript("modify_form.py")
form_header = Headers(content_type="application/x-www-form-urlencoded")
f = tflow.tflow(req=netutils.treq(headers=form_header))
f = tflow.tflow(req=tutils.treq(headers=form_header))
m.request(f)
assert f.request.urlencoded_form[b"mitmproxy"] == b"rocks"
@@ -78,7 +77,7 @@ class TestScripts(mastertest.MasterTest):
def test_modify_querystring(self):
m, sc = tscript("modify_querystring.py")
f = tflow.tflow(req=netutils.treq(path="/search?q=term"))
f = tflow.tflow(req=tutils.treq(path="/search?q=term"))
m.request(f)
assert f.request.query["mitmproxy"] == "rocks"
@@ -89,13 +88,13 @@ class TestScripts(mastertest.MasterTest):
def test_arguments(self):
m, sc = tscript("arguments.py", "mitmproxy rocks")
f = tflow.tflow(resp=netutils.tresp(content=b"I <3 mitmproxy"))
f = tflow.tflow(resp=tutils.tresp(content=b"I <3 mitmproxy"))
m.response(f)
assert f.response.content == b"I <3 rocks"
def test_redirect_requests(self):
m, sc = tscript("redirect_requests.py")
f = tflow.tflow(req=netutils.treq(host="example.org"))
f = tflow.tflow(req=tutils.treq(host="example.org"))
m.request(f)
assert f.request.host == "mitmproxy.org"
@@ -110,8 +109,8 @@ class TestHARDump:
# Create a dummy flow for testing
return tflow.tflow(
req=netutils.treq(method=b'GET', **times),
resp=netutils.tresp(content=resp_content, **times)
req=tutils.treq(method=b'GET', **times),
resp=tutils.tresp(content=resp_content, **times)
)
def test_no_file_arg(self):

View File

@@ -2,7 +2,7 @@ from mitmproxy.test import tflow
import mock
import io
import mitmproxy.test.tutils
from mitmproxy.test import tutils
from mitmproxy.net.http import Headers
import mitmproxy.io
from mitmproxy import flowfilter, options
@@ -14,7 +14,7 @@ from mitmproxy import connections
from mitmproxy.proxy import ProxyConfig
from mitmproxy.proxy.server import DummyServer
from mitmproxy import master
from . import tutils, tservers
from . import tservers
class TestHTTPFlow:

View File

@@ -1,10 +1,9 @@
from mitmproxy.test import tflow
import re
import mitmproxy.test.tutils
from mitmproxy.net.http import Headers
from mitmproxy import export # heh
from . import tutils
from mitmproxy.test import tutils
def clean_blanks(s):
@@ -21,15 +20,15 @@ def python_equals(testdata, text):
def req_get():
return mitmproxy.test.tutils.treq(method=b'GET', content=b'', path=b"/path?a=foo&a=bar&b=baz")
return tutils.treq(method=b'GET', content=b'', path=b"/path?a=foo&a=bar&b=baz")
def req_post():
return mitmproxy.test.tutils.treq(method=b'POST', headers=())
return tutils.treq(method=b'POST', headers=())
def req_patch():
return mitmproxy.test.tutils.treq(method=b'PATCH', path=b"/path?query=param")
return tutils.treq(method=b'PATCH', path=b"/path?query=param")
class TestExportCurlCommand:
@@ -52,53 +51,53 @@ class TestExportCurlCommand:
class TestExportPythonCode:
def test_get(self):
flow = tflow.tflow(req=req_get())
python_equals("data/test_flow_export/python_get.py", export.python_code(flow))
python_equals("mitmproxy/data/test_flow_export/python_get.py", export.python_code(flow))
def test_post(self):
flow = tflow.tflow(req=req_post())
python_equals("data/test_flow_export/python_post.py", export.python_code(flow))
python_equals("mitmproxy/data/test_flow_export/python_post.py", export.python_code(flow))
def test_post_json(self):
p = req_post()
p.content = b'{"name": "example", "email": "example@example.com"}'
p.headers = Headers(content_type="application/json")
flow = tflow.tflow(req=p)
python_equals("data/test_flow_export/python_post_json.py", export.python_code(flow))
python_equals("mitmproxy/data/test_flow_export/python_post_json.py", export.python_code(flow))
def test_patch(self):
flow = tflow.tflow(req=req_patch())
python_equals("data/test_flow_export/python_patch.py", export.python_code(flow))
python_equals("mitmproxy/data/test_flow_export/python_patch.py", export.python_code(flow))
class TestExportLocustCode:
def test_get(self):
flow = tflow.tflow(req=req_get())
python_equals("data/test_flow_export/locust_get.py", export.locust_code(flow))
python_equals("mitmproxy/data/test_flow_export/locust_get.py", export.locust_code(flow))
def test_post(self):
p = req_post()
p.content = b'content'
p.headers = ''
flow = tflow.tflow(req=p)
python_equals("data/test_flow_export/locust_post.py", export.locust_code(flow))
python_equals("mitmproxy/data/test_flow_export/locust_post.py", export.locust_code(flow))
def test_patch(self):
flow = tflow.tflow(req=req_patch())
python_equals("data/test_flow_export/locust_patch.py", export.locust_code(flow))
python_equals("mitmproxy/data/test_flow_export/locust_patch.py", export.locust_code(flow))
class TestExportLocustTask:
def test_get(self):
flow = tflow.tflow(req=req_get())
python_equals("data/test_flow_export/locust_task_get.py", export.locust_task(flow))
python_equals("mitmproxy/data/test_flow_export/locust_task_get.py", export.locust_task(flow))
def test_post(self):
flow = tflow.tflow(req=req_post())
python_equals("data/test_flow_export/locust_task_post.py", export.locust_task(flow))
python_equals("mitmproxy/data/test_flow_export/locust_task_post.py", export.locust_task(flow))
def test_patch(self):
flow = tflow.tflow(req=req_patch())
python_equals("data/test_flow_export/locust_task_patch.py", export.locust_task(flow))
python_equals("mitmproxy/data/test_flow_export/locust_task_patch.py", export.locust_task(flow))
class TestURL:

View File

@@ -1,10 +1,10 @@
from mitmproxy import io
from mitmproxy import exceptions
from . import tutils
from mitmproxy.test import tutils
def test_load():
with open(tutils.test_data.path("data/dumpfile-011"), "rb") as f:
with open(tutils.test_data.path("mitmproxy/data/dumpfile-011"), "rb") as f:
flow_reader = io.FlowReader(f)
flows = list(flow_reader.stream())
assert len(flows) == 1
@@ -12,7 +12,7 @@ def test_load():
def test_cannot_convert():
with open(tutils.test_data.path("data/dumpfile-010"), "rb") as f:
with open(tutils.test_data.path("mitmproxy/data/dumpfile-010"), "rb") as f:
flow_reader = io.FlowReader(f)
with tutils.raises(exceptions.FlowReadException):
list(flow_reader.stream())

View File

@@ -3,8 +3,7 @@ from mitmproxy.test import tflow
from mock import patch
from mitmproxy import flowfilter
from . import tutils
from . import tutils as ttutils
class TestParsing:
@@ -382,10 +381,10 @@ class TestMatchingTCPFlow:
class TestMatchingDummyFlow:
def flow(self):
return tutils.tdummyflow()
return ttutils.tdummyflow()
def err(self):
return tutils.tdummyflow(err=True)
return ttutils.tdummyflow(err=True)
def q(self, q, o):
return flowfilter.parse(q)(o)

View File

@@ -1,16 +1,16 @@
import sys
from mitmproxy.platform import pf
from . import tutils
from mitmproxy.test import tutils
class TestLookup:
def test_simple(self):
if sys.platform == "freebsd10":
p = tutils.test_data.path("data/pf02")
p = tutils.test_data.path("mitmproxy/data/pf02")
d = open(p, "rb").read()
else:
p = tutils.test_data.path("data/pf01")
p = tutils.test_data.path("mitmproxy/data/pf01")
d = open(p, "rb").read()
assert pf.lookup("192.168.1.111", 40000, d) == ("5.5.5.5", 80)
tutils.raises(

View File

@@ -1,6 +1,7 @@
from mitmproxy.test import tflow
import os
import mock
import argparse
from OpenSSL import SSL
from mitmproxy.tools import cmdline
@@ -12,7 +13,9 @@ from mitmproxy.proxy import config
from mitmproxy import exceptions
from pathod import test
from mitmproxy.net.http import http1
from . import tutils
from mitmproxy.test import tutils
from . import tutils as ttutils
class TestServerConnection:
@@ -55,10 +58,21 @@ class TestServerConnection:
assert "foo" in repr(sc)
class MockParser(argparse.ArgumentParser):
"""
argparse.ArgumentParser sys.exits() by default.
Make it more testable by throwing an exception instead.
"""
def error(self, message):
raise Exception(message)
class TestProcessProxyOptions:
def p(self, *args):
parser = tutils.MockParser()
parser = MockParser()
cmdline.common_options(parser)
args = parser.parse_args(args=args)
opts = cmdline.get_common_options(args)
@@ -115,7 +129,7 @@ class TestProcessProxyOptions:
self.assert_noerr("--client-certs", cadir)
self.assert_noerr(
"--client-certs",
os.path.join(tutils.test_data.path("data/clientcert"), "client.pem"))
os.path.join(tutils.test_data.path("mitmproxy/data/clientcert"), "client.pem"))
self.assert_err(
"path does not exist",
"--client-certs",
@@ -124,7 +138,7 @@ class TestProcessProxyOptions:
def test_certs(self):
self.assert_noerr(
"--cert",
tutils.test_data.path("data/testkey.pem"))
tutils.test_data.path("mitmproxy/data/testkey.pem"))
self.assert_err("does not exist", "--cert", "nonexistent")
def test_auth(self):
@@ -133,12 +147,12 @@ class TestProcessProxyOptions:
p = self.assert_noerr(
"--htpasswd",
tutils.test_data.path("data/htpasswd"))
tutils.test_data.path("mitmproxy/data/htpasswd"))
assert p.authenticator
self.assert_err(
"malformed htpasswd file",
"--htpasswd",
tutils.test_data.path("data/htpasswd.invalid"))
tutils.test_data.path("mitmproxy/data/htpasswd.invalid"))
p = self.assert_noerr("--singleuser", "test:test")
assert p.authenticator
@@ -165,7 +179,7 @@ class TestProcessProxyOptions:
class TestProxyServer:
# binding to 0.0.0.0:1 works without special permissions on Windows
@tutils.skip_windows
@ttutils.skip_windows
def test_err(self):
conf = ProxyConfig(
options.Options(listen_port=1),
@@ -205,5 +219,5 @@ class TestConnectionHandler:
config,
channel
)
with tutils.capture_stderr(c.handle) as output:
with ttutils.capture_stderr(c.handle) as output:
assert "mitmproxy has crashed" in output

View File

@@ -1,4 +1,4 @@
from . import tutils
from mitmproxy.test import tutils
import base64
from mitmproxy.proxy import config

View File

@@ -2,7 +2,7 @@ import os
import socket
import time
import mitmproxy.test.tutils
from mitmproxy.test import tutils
from mitmproxy import controller
from mitmproxy import options
from mitmproxy.addons import script
@@ -16,11 +16,12 @@ from mitmproxy import exceptions
from mitmproxy.net.http import authentication
from mitmproxy.net.http import http1
from mitmproxy.net.tcp import Address
from mitmproxy.test.tutils import raises
from pathod import pathoc
from pathod import pathod
from . import tutils, tservers
from . import tutils as ttutils
from . import tservers
"""
Note that the choice of response code in these tests matters more than you
@@ -159,7 +160,7 @@ class TcpMixin:
# mitmproxy responds with bad gateway
assert self.pathod(spec).status_code == 502
self._ignore_on()
with raises(exceptions.HttpException):
with tutils.raises(exceptions.HttpException):
self.pathod(spec) # pathoc tries to parse answer as HTTP
self._ignore_off()
@@ -238,7 +239,7 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
# There's a race here, which means we can get any of a number of errors.
# Rather than introduce yet another sleep into the test suite, we just
# relax the Exception specification.
with raises(Exception):
with tutils.raises(Exception):
p.request("get:'%s'" % response)
def test_reconnect(self):
@@ -277,7 +278,7 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
def test_stream_modify(self):
s = script.Script(
tutils.test_data.path("data/addonscripts/stream_modify.py")
tutils.test_data.path("mitmproxy/data/addonscripts/stream_modify.py")
)
self.master.addons.add(s)
d = self.pathod('200:b"foo"')
@@ -327,7 +328,7 @@ class TestHTTPS(tservers.HTTPProxyTest, CommonMixin, TcpMixin):
def test_clientcert_file(self):
try:
self.config.clientcerts = os.path.join(
tutils.test_data.path("data/clientcert"), "client.pem")
tutils.test_data.path("mitmproxy/data/clientcert"), "client.pem")
f = self.pathod("304")
assert f.status_code == 304
assert self.server.last_log()["request"]["clientcert"]["keyinfo"]
@@ -336,7 +337,7 @@ class TestHTTPS(tservers.HTTPProxyTest, CommonMixin, TcpMixin):
def test_clientcert_dir(self):
try:
self.config.clientcerts = tutils.test_data.path("data/clientcert")
self.config.clientcerts = tutils.test_data.path("mitmproxy/data/clientcert")
f = self.pathod("304")
assert f.status_code == 304
assert self.server.last_log()["request"]["clientcert"]["keyinfo"]
@@ -375,7 +376,7 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
ssloptions = pathod.SSLOptions(
cn=b"example.mitmproxy.org",
certs=[
("example.mitmproxy.org", tutils.test_data.path("data/servercert/trusted-leaf.pem"))
("example.mitmproxy.org", tutils.test_data.path("mitmproxy/data/servercert/trusted-leaf.pem"))
]
)
@@ -388,7 +389,7 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
self.config.options.update(
ssl_insecure=False,
ssl_verify_upstream_trusted_cadir=tutils.test_data.path(
"data/servercert/"
"mitmproxy/data/servercert/"
),
ssl_verify_upstream_trusted_ca=None,
)
@@ -399,7 +400,7 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
ssl_insecure=False,
ssl_verify_upstream_trusted_cadir=None,
ssl_verify_upstream_trusted_ca=tutils.test_data.path(
"data/servercert/trusted-root.pem"
"mitmproxy/data/servercert/trusted-root.pem"
),
)
assert self._request().status_code == 242
@@ -414,7 +415,7 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxyTest):
ssloptions = pathod.SSLOptions(
cn=b"example.mitmproxy.org",
certs=[
("example.mitmproxy.org", tutils.test_data.path("data/servercert/self-signed.pem"))
("example.mitmproxy.org", tutils.test_data.path("mitmproxy/data/servercert/self-signed.pem"))
])
def _request(self):
@@ -426,7 +427,7 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxyTest):
def get_options(cls):
opts = super().get_options()
opts.ssl_verify_upstream_trusted_ca = tutils.test_data.path(
"data/servercert/trusted-root.pem"
"mitmproxy/data/servercert/trusted-root.pem"
)
return opts
@@ -453,7 +454,7 @@ class TestHTTPSNoCommonName(tservers.HTTPProxyTest):
ssl = True
ssloptions = pathod.SSLOptions(
certs=[
(b"*", tutils.test_data.path("data/no_common_name.pem"))
(b"*", tutils.test_data.path("mitmproxy/data/no_common_name.pem"))
]
)
@@ -563,7 +564,7 @@ class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
def test_tcp_stream_modify(self):
s = script.Script(
tutils.test_data.path("data/addonscripts/tcp_stream_modify.py")
tutils.test_data.path("mitmproxy/data/addonscripts/tcp_stream_modify.py")
)
self.master.addons.add(s)
self._tcpproxy_on()
@@ -594,7 +595,7 @@ class TestProxy(tservers.HTTPProxyTest):
assert "host" in f.request.headers
assert f.response.status_code == 304
@tutils.skip_appveyor
@ttutils.skip_appveyor
def test_response_timestamps(self):
# test that we notice at least 1 sec delay between timestamps
# in response object
@@ -605,7 +606,7 @@ class TestProxy(tservers.HTTPProxyTest):
# timestamp_start might fire a bit late, so we play safe and only require 300ms.
assert 0.3 <= response.timestamp_end - response.timestamp_start
@tutils.skip_appveyor
@ttutils.skip_appveyor
def test_request_timestamps(self):
# test that we notice a delay between timestamps in request object
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -833,7 +834,7 @@ class TestKillRequest(tservers.HTTPProxyTest):
masterclass = MasterKillRequest
def test_kill(self):
with raises(exceptions.HttpReadDisconnect):
with tutils.raises(exceptions.HttpReadDisconnect):
self.pathod("200")
# Nothing should have hit the server
assert not self.server.last_log()
@@ -850,7 +851,7 @@ class TestKillResponse(tservers.HTTPProxyTest):
masterclass = MasterKillResponse
def test_kill(self):
with raises(exceptions.HttpReadDisconnect):
with tutils.raises(exceptions.HttpReadDisconnect):
self.pathod("200")
# The server should have seen a request
assert self.server.last_log()
@@ -1042,7 +1043,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxyTest):
class AddUpstreamCertsToClientChainMixin:
ssl = True
servercert = tutils.test_data.path("data/servercert/trusted-root.pem")
servercert = tutils.test_data.path("mitmproxy/data/servercert/trusted-root.pem")
ssloptions = pathod.SSLOptions(
cn=b"example.mitmproxy.org",
certs=[

View File

@@ -1,4 +1,3 @@
import argparse
import sys
from contextlib import contextmanager
from unittest.case import SkipTest
@@ -6,12 +5,9 @@ from unittest.case import SkipTest
import io
import mitmproxy.test.tutils
import os
import shutil
import tempfile
from mitmproxy import controller
from mitmproxy import flow
import mitmproxy.test.tflow
from mitmproxy.utils import data
def _skip_windows(*args):
@@ -64,46 +60,9 @@ def tdummyflow(client_conn=True, server_conn=True, err=None):
return f
def get_body_line(last_displayed_body, line_nb):
return last_displayed_body.contents()[line_nb + 2]
@contextmanager
def chdir(dir):
orig_dir = os.getcwd()
os.chdir(dir)
yield
os.chdir(orig_dir)
@contextmanager
def tmpdir(*args, **kwargs):
temp_workdir = tempfile.mkdtemp(*args, **kwargs)
with chdir(temp_workdir):
yield temp_workdir
shutil.rmtree(temp_workdir)
class MockParser(argparse.ArgumentParser):
"""
argparse.ArgumentParser sys.exits() by default.
Make it more testable by throwing an exception instead.
"""
def error(self, message):
raise Exception(message)
raises = mitmproxy.test.tutils.raises
@contextmanager
def capture_stderr(command, *args, **kwargs):
out, sys.stderr = sys.stderr, io.StringIO()
command(*args, **kwargs)
yield sys.stderr.getvalue()
sys.stderr = out
test_data = data.Data(__name__)

View File

@@ -2,7 +2,7 @@ import os
from pathod import language
from pathod.language import base, exceptions
from . import tutils
from mitmproxy.test import tutils
def parse_request(s):

View File

@@ -1,7 +1,7 @@
import os
from pathod.language import generators
from . import tutils
from mitmproxy.test import tutils
def test_randomgenerator():

View File

@@ -2,7 +2,8 @@ import io
from pathod import language
from pathod.language import http, base
from . import tutils
from mitmproxy.test import tutils
from . import tservers
def parse_request(s):
@@ -302,8 +303,8 @@ def test_shortcuts():
assert next(language.parse_pathod(
"400:l'foo'")).headers[0].key.val == b"Location"
assert b"Android" in tutils.render(parse_request("get:/:ua"))
assert b"User-Agent" in tutils.render(parse_request("get:/:ua"))
assert b"Android" in tservers.render(parse_request("get:/:ua"))
assert b"User-Agent" in tservers.render(parse_request("get:/:ua"))
def test_user_agent():

View File

@@ -7,7 +7,7 @@ from pathod import language
from pathod.language import http2
from pathod.protocols.http2 import HTTP2StateProtocol
from . import tutils
from mitmproxy.test import tutils
def parse_request(s):

View File

@@ -2,7 +2,8 @@ from pathod import language
from pathod.language import websockets
import mitmproxy.net.websockets
from . import tutils
from mitmproxy.test import tutils
from . import tservers
def parse_request(s):
@@ -62,7 +63,7 @@ class TestWebsocketFrame:
def test_flags(self):
wf = parse_request("wf:fin:mask:rsv1:rsv2:rsv3")
frm = mitmproxy.net.websockets.Frame.from_bytes(tutils.render(wf))
frm = mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf))
assert frm.header.fin
assert frm.header.mask
assert frm.header.rsv1
@@ -70,7 +71,7 @@ class TestWebsocketFrame:
assert frm.header.rsv3
wf = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3")
frm = mitmproxy.net.websockets.Frame.from_bytes(tutils.render(wf))
frm = mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf))
assert not frm.header.fin
assert not frm.header.mask
assert not frm.header.rsv1
@@ -80,7 +81,7 @@ class TestWebsocketFrame:
def fr(self, spec, **kwargs):
settings = language.base.Settings(**kwargs)
wf = parse_request(spec)
return mitmproxy.net.websockets.Frame.from_bytes(tutils.render(wf, settings))
return mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf, settings))
def test_construction(self):
assert self.fr("wf:c1").header.opcode == 1

View File

@@ -4,13 +4,13 @@ from mock import Mock
from mitmproxy.net import http
from mitmproxy.net import tcp
from mitmproxy.net.http import http1
from mitmproxy.test.tutils import raises
from mitmproxy import exceptions
from pathod import pathoc, language
from pathod.protocols.http2 import HTTP2StateProtocol
from . import tutils
from mitmproxy.test import tutils
from . import tservers
def test_response():
@@ -18,7 +18,7 @@ def test_response():
assert repr(r)
class PathocTestDaemon(tutils.DaemonTests):
class PathocTestDaemon(tservers.DaemonTests):
def tval(self, requests, timeout=None, showssl=False, **kwargs):
s = io.StringIO()
c = pathoc.Pathoc(
@@ -64,7 +64,7 @@ class TestDaemonSSL(PathocTestDaemon):
def test_clientcert(self):
self.tval(
["get:/p/200"],
clientcert=tutils.test_data.path("data/clientcert/client.pem"),
clientcert=tutils.test_data.path("pathod/data/clientcert/client.pem"),
)
log = self.d.log()
assert log[0]["request"]["clientcert"]["keyinfo"]
@@ -171,12 +171,12 @@ class TestDaemon(PathocTestDaemon):
to = ("foobar", 80)
c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
c.rfile, c.wfile = io.BytesIO(), io.BytesIO()
with raises("connect failed"):
with tutils.raises("connect failed"):
c.http_connect(to)
c.rfile = io.BytesIO(
b"HTTP/1.1 500 OK\r\n"
)
with raises("connect failed"):
with tutils.raises("connect failed"):
c.http_connect(to)
c.rfile = io.BytesIO(
b"HTTP/1.1 200 OK\r\n"

View File

@@ -3,7 +3,7 @@ import mock
from pathod import pathoc_cmdline as cmdline
from . import tutils
from mitmproxy.test import tutils
@mock.patch("argparse.ArgumentParser.error")
@@ -52,7 +52,7 @@ def test_pathoc(perror):
[
"pathoc",
"foo.com:8888",
tutils.test_data.path("data/request")
tutils.test_data.path("pathod/data/request")
]
)
assert len(list(a.requests)) == 1

View File

@@ -3,8 +3,9 @@ import io
from pathod import pathod
from mitmproxy.net import tcp
from mitmproxy import exceptions
from mitmproxy.test import tutils
from . import tutils
from . import tservers
class TestPathod:
@@ -24,7 +25,7 @@ class TestPathod:
assert len(p.get_log()) <= p.LOGBUF
class TestTimeout(tutils.DaemonTests):
class TestTimeout(tservers.DaemonTests):
timeout = 0.01
def test_timeout(self):
@@ -36,7 +37,7 @@ class TestTimeout(tutils.DaemonTests):
assert self.d.last_log()["type"] == "timeout"
class TestNotAfterConnect(tutils.DaemonTests):
class TestNotAfterConnect(tservers.DaemonTests):
ssl = False
ssloptions = dict(
not_after_connect=True
@@ -50,10 +51,10 @@ class TestNotAfterConnect(tutils.DaemonTests):
assert r[0].status_code == 202
class TestCustomCert(tutils.DaemonTests):
class TestCustomCert(tservers.DaemonTests):
ssl = True
ssloptions = dict(
certs=[(b"*", tutils.test_data.path("data/testkey.pem"))],
certs=[(b"*", tutils.test_data.path("pathod/data/testkey.pem"))],
)
def test_connect(self):
@@ -64,7 +65,7 @@ class TestCustomCert(tutils.DaemonTests):
assert "test.com" in str(r.sslinfo.certchain[0].get_subject())
class TestSSLCN(tutils.DaemonTests):
class TestSSLCN(tservers.DaemonTests):
ssl = True
ssloptions = dict(
cn=b"foo.com"
@@ -78,7 +79,7 @@ class TestSSLCN(tutils.DaemonTests):
assert r.sslinfo.certchain[0].get_subject().CN == "foo.com"
class TestNohang(tutils.DaemonTests):
class TestNohang(tservers.DaemonTests):
nohang = True
def test_nohang(self):
@@ -88,14 +89,14 @@ class TestNohang(tutils.DaemonTests):
assert "Pauses have been disabled" in l["response"]["msg"]
class TestHexdump(tutils.DaemonTests):
class TestHexdump(tservers.DaemonTests):
hexdump = True
def test_hexdump(self):
assert self.get(r"200:b'\xf0'")
class TestNocraft(tutils.DaemonTests):
class TestNocraft(tservers.DaemonTests):
nocraft = True
def test_nocraft(self):
@@ -104,7 +105,7 @@ class TestNocraft(tutils.DaemonTests):
assert b"Crafting disabled" in r.content
class CommonTests(tutils.DaemonTests):
class CommonTests(tservers.DaemonTests):
def test_binarydata(self):
assert self.get(r"200:b'\xf0'")
@@ -252,7 +253,7 @@ class TestDaemonSSL(CommonTests):
assert self.d.last_log()["cipher"][1] > 0
class TestHTTP2(tutils.DaemonTests):
class TestHTTP2(tservers.DaemonTests):
ssl = True
nohang = True

View File

@@ -2,7 +2,7 @@ import mock
from pathod import pathod_cmdline as cmdline
from . import tutils
from mitmproxy.test import tutils
def test_parse_anchor_spec():
@@ -18,7 +18,7 @@ def test_pathod(perror):
[
"pathod",
"--cert",
tutils.test_data.path("data/testkey.pem")
tutils.test_data.path("pathod/data/testkey.pem")
]
)
assert a.ssl_certs
@@ -46,7 +46,7 @@ def test_pathod(perror):
[
"pathod",
"-a",
"foo=" + tutils.test_data.path("data/response")
"foo=" + tutils.test_data.path("pathod/data/response")
]
)
assert a.anchors

View File

@@ -2,7 +2,7 @@ import logging
import requests
from pathod import test
from . import tutils
from mitmproxy.test import tutils
import requests.packages.urllib3
@@ -34,8 +34,8 @@ class TestDaemonManual:
def test_startstop_ssl_explicit(self):
ssloptions = dict(
certfile=tutils.test_data.path("data/testkey.pem"),
cacert=tutils.test_data.path("data/testkey.pem"),
certfile=tutils.test_data.path("pathod/data/testkey.pem"),
cacert=tutils.test_data.path("pathod/data/testkey.pem"),
ssl_after_connect=False
)
d = test.Daemon(ssl=ssloptions)

View File

@@ -1,6 +1,6 @@
from pathod import utils
from . import tutils
from mitmproxy.test import tutils
def test_membool():

View File

@@ -5,7 +5,6 @@ import requests
import io
import urllib
from mitmproxy.utils import data
from mitmproxy.net import tcp
from mitmproxy.test import tutils
@@ -40,7 +39,7 @@ class DaemonTests:
opts["confdir"] = cls.confdir
so = pathod.SSLOptions(**opts)
cls.d = test.Daemon(
staticdir=test_data.path("data"),
staticdir=tutils.test_data.path("pathod/data"),
anchors=[
(re.compile("/anchor/.*"), "202:da")
],
@@ -139,13 +138,6 @@ class DaemonTests:
return ret, logfp.getvalue()
tmpdir = tutils.tmpdir
raises = tutils.raises
test_data = data.Data(__name__)
def render(r, settings=language.Settings()):
r = r.resolve(settings)
s = io.BytesIO()