Merge branch 'master' of github.com:mitmproxy/mitmproxy

This commit is contained in:
Maximilian Hils
2014-03-08 15:47:27 +01:00
15 changed files with 384 additions and 227 deletions

View File

@@ -1,5 +1,6 @@
import flask
import os.path
import os.path, os
import proxy
mapp = flask.Flask(__name__)
mapp.debug = True
@@ -16,14 +17,12 @@ def index():
@mapp.route("/cert/pem")
def certs_pem():
capath = master().server.config.cacert
p = os.path.splitext(capath)[0] + "-cert.pem"
p = os.path.join(master().server.config.confdir, proxy.CONF_BASENAME + "-ca-cert.pem")
return flask.Response(open(p, "rb").read(), mimetype='application/x-x509-ca-cert')
@mapp.route("/cert/p12")
def certs_p12():
capath = master().server.config.cacert
p = os.path.splitext(capath)[0] + "-cert.p12"
p = os.path.join(master().server.config.confdir, proxy.CONF_BASENAME + "-ca-cert.p12")
return flask.Response(open(p, "rb").read(), mimetype='application/x-pkcs12')

View File

@@ -387,4 +387,4 @@ def common_options(parser):
help="Allow access to users specified in an Apache htpasswd file."
)
proxy.certificate_option_group(parser)
proxy.ssl_option_group(parser)

View File

@@ -77,7 +77,6 @@ class PathEdit(urwid.Edit, _PathCompleter):
class ActionBar(common.WWrap):
def __init__(self):
self.message("")
self.expire = None
def selectable(self):
return True

View File

@@ -68,7 +68,8 @@ def _mkhelp():
("space", "next flow"),
("|", "run script on this flow"),
("/", "search in response body (case sensitive)"),
("n", "repeat previous search"),
("n", "repeat search forward"),
("N", "repeat search backwards"),
]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
return text
@@ -255,7 +256,7 @@ class FlowView(common.WWrap):
)
return f
def search_wrapped_around(self, last_find_line, last_search_index):
def search_wrapped_around(self, last_find_line, last_search_index, backwards):
"""
returns true if search wrapped around the bottom.
"""
@@ -265,15 +266,39 @@ class FlowView(common.WWrap):
current_search_index = self.state.get_flow_setting(self.flow,
"last_search_index")
if current_find_line <= last_find_line:
return True
elif current_find_line == last_find_line:
if current_search_index <= last_search_index:
return True
if not backwards:
message = "search hit BOTTOM, continuing at TOP"
if current_find_line <= last_find_line:
return True, message
elif current_find_line == last_find_line:
if current_search_index <= last_search_index:
return True, message
else:
message = "search hit TOP, continuing at BOTTOM"
if current_find_line >= last_find_line:
return True, message
elif current_find_line == last_find_line:
if current_search_index >= last_search_index:
return True, message
return False
return False, ""
def search(self, search_string):
def search_again(self, backwards=False):
"""
runs the previous search again, forwards or backwards.
"""
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
if last_search_string:
message = self.search(last_search_string, backwards)
if message:
self.master.statusbar.message(message)
else:
message = "no previous searches have been made"
self.master.statusbar.message(message)
return message
def search(self, search_string, backwards=False):
"""
similar to view_response or view_request, but instead of just
displaying the conn, it highlights a word that the user is
@@ -301,7 +326,7 @@ class FlowView(common.WWrap):
# generate the body, highlight the words and get focus
headers, msg, body = self.conn_text_raw(text)
try:
body, focus_position = self.search_highlight_text(body, search_string)
body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards)
except SearchError:
return "Search not supported in this view."
@@ -318,8 +343,10 @@ class FlowView(common.WWrap):
self.last_displayed_body = list_box
if self.search_wrapped_around(last_find_line, last_search_index):
return "search hit BOTTOM, continuing at TOP"
wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards)
if wrapped:
return wrapped_message
def search_get_start(self, search_string):
start_line = 0
@@ -344,57 +371,94 @@ class FlowView(common.WWrap):
return (start_line, start_index)
def search_highlight_text(self, text_objects, search_string, looping = False):
def search_get_range(self, len_text_objects, start_line, backwards):
if not backwards:
loop_range = xrange(start_line, len_text_objects)
else:
loop_range = xrange(start_line, -1, -1)
return loop_range
def search_find(self, text, search_string, start_index, backwards):
if backwards == False:
find_index = text.find(search_string, start_index)
else:
if start_index != 0:
start_index -= len(search_string)
else:
start_index = None
find_index = text.rfind(search_string, 0, start_index)
return find_index
def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False):
start_line, start_index = self.search_get_start(search_string)
i = start_line
found = False
text_objects = copy.deepcopy(text_objects)
for text_object in text_objects[start_line:]:
if i != start_line:
start_index = 0
loop_range = self.search_get_range(len(text_objects), start_line, backwards)
for i in loop_range:
text_object = text_objects[i]
try:
text, style = text_object.get_text()
except AttributeError:
raise SearchError()
find_index = text.find(search_string, start_index)
if find_index != -1:
before = text[:find_index]
after = text[find_index+len(search_string):]
new_text = urwid.Text(
[
before,
(self.highlight_color, search_string),
after,
]
)
if i != start_line:
start_index = 0
find_index = self.search_find(text, search_string, start_index, backwards)
if find_index != -1:
new_text = self.search_highlight_object(text, find_index, search_string)
text_objects[i] = new_text
found = True
self.state.add_flow_setting(self.flow, "last_search_index",
find_index)
self.state.add_flow_setting(self.flow, "last_find_line", i)
text_objects[i] = new_text
found = True
break
i += 1
# handle search WRAP
if found:
focus_pos = i
else :
# loop from the beginning, but not forever.
if (start_line == 0 and start_index == 0) or looping:
if looping:
focus_pos = None
else:
self.state.add_flow_setting(self.flow, "last_search_index", 0)
self.state.add_flow_setting(self.flow, "last_find_line", 0)
text_objects, focus_pos = self.search_highlight_text(text_objects, search_string, True)
if not backwards:
self.state.add_flow_setting(self.flow, "last_search_index", 0)
self.state.add_flow_setting(self.flow, "last_find_line", 0)
else:
self.state.add_flow_setting(self.flow, "last_search_index", None)
self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1)
text_objects, focus_pos = self.search_highlight_text(text_objects,
search_string, looping=True, backwards=backwards)
return text_objects, focus_pos
def search_highlight_object(self, text_object, find_index, search_string):
"""
just a little abstraction
"""
before = text_object[:find_index]
after = text_object[find_index+len(search_string):]
new_text = urwid.Text(
[
before,
(self.highlight_color, search_string),
after,
]
)
return new_text
def view_request(self):
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
body = self.conn_text(self.flow.request)
@@ -761,13 +825,9 @@ class FlowView(common.WWrap):
None,
self.search)
elif key == "n":
last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
if last_search_string:
message = self.search(last_search_string)
if message:
self.master.statusbar.message(message)
else:
self.master.statusbar.message("no previous searches have been made")
self.search_again(backwards=False)
elif key == "N":
self.search_again(backwards=True)
else:
return key

View File

@@ -35,7 +35,6 @@ class TemporaryServerChangeMixin(object):
This mixin allows safe modification of the target server,
without any need to expose the ConnectionHandler to the Flow.
"""
def change_server(self, address, ssl):
if address == self.c.server_conn.address():
return
@@ -98,4 +97,6 @@ def handle_messages(conntype, connection_handler):
def handle_error(conntype, connection_handler, error):
return _handler(conntype, connection_handler).handle_error(error)
return _handler(conntype, connection_handler).handle_error(error)

View File

@@ -32,12 +32,11 @@ class TCPHandler(ProtocolHandler):
if d == "": # connection closed
break
data.write(d)
"""
OpenSSL Connections have an internal buffer that might contain data altough everything is read
from the socket. Thankfully, connection.pending() returns the amount of bytes in this buffer,
so we can read it completely at once.
"""
# OpenSSL Connections have an internal buffer that might
# contain data altough everything is read from the socket.
# Thankfully, connection.pending() returns the amount of
# bytes in this buffer, so we can read it completely at
# once.
if src.ssl_established:
data.write(rfile.read(src.connection.pending()))
else: # no data left, but not closed yet
@@ -57,4 +56,4 @@ class TCPHandler(ProtocolHandler):
self.c.log("%s %s\r\n%s" % (direction, dst_str,data))
dst.wfile.write(data)
dst.wfile.flush()
dst.wfile.flush()

View File

@@ -4,6 +4,10 @@ from netlib import tcp, http, certutils, http_auth
import utils, version, platform, controller, stateobject
TRANSPARENT_SSL_PORTS = [443, 8443]
CONF_BASENAME = "mitmproxy"
CONF_DIR = "~/.mitmproxy"
CA_CERT_NAME = "mitmproxy-ca.pem"
class AddressPriority(object):
@@ -37,10 +41,12 @@ class Log:
class ProxyConfig:
def __init__(self, certfile=None, cacert=None, clientcerts=None, no_upstream_cert=False, body_size_limit=None,
reverse_proxy=None, forward_proxy=None, transparent_proxy=None, authenticator=None):
self.certfile = certfile
self.cacert = cacert
def __init__(self, confdir=CONF_DIR, clientcerts=None,
no_upstream_cert=False, body_size_limit=None, reverse_proxy=None,
forward_proxy=None, transparent_proxy=None, authenticator=None,
ciphers=None, certs=None
):
self.ciphers = ciphers
self.clientcerts = clientcerts
self.no_upstream_cert = no_upstream_cert
self.body_size_limit = body_size_limit
@@ -48,7 +54,9 @@ class ProxyConfig:
self.forward_proxy = forward_proxy
self.transparent_proxy = transparent_proxy
self.authenticator = authenticator
self.certstore = certutils.CertStore()
self.confdir = os.path.expanduser(confdir)
self.certstore = certutils.CertStore.from_store(self.confdir, CONF_BASENAME)
class ClientConnection(tcp.BaseHandler, stateobject.SimpleStateObject):
@@ -380,9 +388,12 @@ class ConnectionHandler:
if client:
if self.client_conn.ssl_established:
raise ProxyError(502, "SSL to Client already established.")
dummycert = self.find_cert()
self.client_conn.convert_to_ssl(dummycert, self.config.certfile or self.config.cacert,
handle_sni=self.handle_sni)
cert, key = self.find_cert()
self.client_conn.convert_to_ssl(
cert, key,
handle_sni = self.handle_sni,
cipher_list = self.config.ciphers
)
def server_reconnect(self, no_ssl=False):
address = self.server_conn.address
@@ -410,22 +421,18 @@ class ConnectionHandler:
self.channel.tell("log", Log(msg))
def find_cert(self):
if self.config.certfile:
with open(self.config.certfile, "rb") as f:
return certutils.SSLCert.from_pem(f.read())
else:
host = self.server_conn.address.host
sans = []
if not self.config.no_upstream_cert or not self.server_conn.ssl_established:
upstream_cert = self.server_conn.cert
if upstream_cert.cn:
host = upstream_cert.cn.decode("utf8").encode("idna")
sans = upstream_cert.altnames
host = self.server_conn.address.host
sans = []
if not self.config.no_upstream_cert or not self.server_conn.ssl_established:
upstream_cert = self.server_conn.cert
if upstream_cert.cn:
host = upstream_cert.cn.decode("utf8").encode("idna")
sans = upstream_cert.altnames
ret = self.config.certstore.get_cert(host, sans, self.config.cacert)
if not ret:
raise ProxyError(502, "Unable to generate dummy cert.")
return ret
ret = self.config.certstore.get_cert(host, sans)
if not ret:
raise ProxyError(502, "Unable to generate dummy cert.")
return ret
def handle_sni(self, connection):
"""
@@ -441,9 +448,9 @@ class ConnectionHandler:
self.server_reconnect() # reconnect to upstream server with SNI
# Now, change client context to reflect changed certificate:
new_context = SSL.Context(SSL.TLSv1_METHOD)
new_context.use_privatekey_file(self.config.certfile or self.config.cacert)
dummycert = self.find_cert()
new_context.use_certificate(dummycert.x509)
cert, key = self.find_cert()
new_context.use_privatekey_file(key)
new_context.use_certificate(cert.X509)
connection.set_context(new_context)
# An unhandled exception in this method will core dump PyOpenSSL, so
# make dang sure it doesn't happen.
@@ -458,7 +465,6 @@ class ProxyServerError(Exception):
class ProxyServer(tcp.TCPServer):
allow_reuse_address = True
bound = True
def __init__(self, config, port, host='', server_version=version.NAMEVERSION):
"""
Raises ProxyServerError if there's a startup problem.
@@ -498,30 +504,29 @@ class DummyServer:
# Command-line utils
def certificate_option_group(parser):
def ssl_option_group(parser):
group = parser.add_argument_group("SSL")
group.add_argument(
"--cert", action="store",
type=str, dest="cert", default=None,
help="User-created SSL certificate file."
"--cert", dest='certs', default=[], type=str,
metavar = "SPEC", action="append",
help='Add an SSL certificate. SPEC is of the form "[domain=]path". '\
'The domain may include a wildcard, and is equal to "*" if not specified. '\
'The file at path is a certificate in PEM format. If a private key is included in the PEM, '\
'it is used, else the default key in the conf dir is used. Can be passed multiple times.'
)
group.add_argument(
"--client-certs", action="store",
type=str, dest="clientcerts", default=None,
help="Client certificate directory."
)
group.add_argument(
"--ciphers", action="store",
type=str, dest="ciphers", default=None,
help="SSL cipher specification."
)
def process_proxy_options(parser, options):
if options.cert:
options.cert = os.path.expanduser(options.cert)
if not os.path.exists(options.cert):
return parser.error("Manually created certificate does not exist: %s" % options.cert)
cacert = os.path.join(options.confdir, "mitmproxy-ca.pem")
cacert = os.path.expanduser(cacert)
if not os.path.exists(cacert):
certutils.dummy_ca(cacert)
body_size_limit = utils.parse_size(options.body_size_limit)
if options.reverse_proxy and options.transparent_proxy:
return parser.error("Can't set both reverse proxy and transparent proxy.")
@@ -574,14 +579,24 @@ def process_proxy_options(parser, options):
else:
authenticator = http_auth.NullProxyAuth(None)
certs = []
for i in options.certs:
parts = i.split("=", 1)
if len(parts) == 1:
parts = ["*", parts[0]]
parts[1] = os.path.expanduser(parts[1])
if not os.path.exists(parts[1]):
parser.error("Certificate file does not exist: %s"%parts[1])
certs.append(parts)
return ProxyConfig(
certfile=options.cert,
cacert=cacert,
clientcerts=options.clientcerts,
body_size_limit=body_size_limit,
no_upstream_cert=options.no_upstream_cert,
reverse_proxy=rp,
forward_proxy=fp,
transparent_proxy=trans,
authenticator=authenticator
authenticator=authenticator,
ciphers=options.ciphers,
certs = certs,
)

View File

@@ -102,7 +102,7 @@ setup(
"netlib>=%s"%version.VERSION,
"urwid>=1.1",
"pyasn1>0.1.2",
"pyopenssl>=0.13",
"pyopenssl>=0.14",
"Pillow>=2.3.0",
"lxml",
"flask"

View File

@@ -9,11 +9,8 @@ class TestApp(tservers.HTTPProxTest):
assert self.app("/").status_code == 200
def test_cert(self):
path = tutils.test_data.path("data/confdir/") + "mitmproxy-ca-cert."
with tutils.tmpdir() as d:
for ext in ["pem", "p12"]:
resp = self.app("/cert/%s" % ext)
assert resp.status_code == 200
with open(path + ext, "rb") as f:
assert resp.content == f.read()
assert resp.content

View File

@@ -120,7 +120,7 @@ class TestContentView:
def test_view_css(self):
v = cv.ViewCSS()
with open('./test/data/1.css', 'r') as fp:
with open(tutils.test_data.path('data/1.css'), 'r') as fp:
fixture_1 = fp.read()
result = v([], 'a', 100)
@@ -276,100 +276,3 @@ if cv.ViewProtobuf.is_available():
def test_get_by_shortcut():
assert cv.get_by_shortcut("h")
def test_search_highlights():
# Default text in requests is content. We will search for nt once, and
# expect the first bit to be highlighted. We will do it again and expect the
# second to be.
f = tutils.tflowview()
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('content', [(None, 2), (f.highlight_color, 2)])
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('content', [(None, 5), (f.highlight_color, 2)])
def test_search_returns_useful_messages():
f = tutils.tflowview()
# original string is content. this string should not be in there.
response = f.search("oranges and other fruit.")
assert response == "no matches for 'oranges and other fruit.'"
def test_search_highlights_clears_prev():
f = tutils.tflowview(request_contents="this is string\nstring is string")
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# search again, it should not be highlighted again.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() != ('this is string', [(None, 8), (f.highlight_color, 6)])
def test_search_highlights_multi_line():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# should highlight the first line.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# should highlight second line, first appearance of string.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
# should highlight third line, second appearance of string.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('string is string', [(None, 10), (f.highlight_color, 6)])
def test_search_loops():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# get to the end.
f.search("string")
f.search("string")
f.search("string")
# should highlight the first line.
message = f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
assert message == "search hit BOTTOM, continuing at TOP"
def test_search_focuses():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# should highlight the first line.
f.search("string")
# should be focusing on the 2nd text line.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert f.last_displayed_body.focus == text_object
def test_search_does_not_crash_on_bad():
"""
this used to crash, kept for reference.
"""
f = tutils.tflowview(request_contents="this is string\nstring is string\n"+("A" * cv.VIEW_CUTOFF)+"AFTERCUTOFF")
f.search("AFTERCUTOFF")
# pretend F
f.state.add_flow_setting(
f.flow,
(f.state.view_flow_mode, "fullcontents"),
True
)
f.master.refresh_flow(f.flow)
# text changed, now this string will exist. can happen when user presses F
# for full text view
f.search("AFTERCUTOFF")

176
test/test_console_search.py Normal file
View File

@@ -0,0 +1,176 @@
import sys
import libmproxy.console.contentview as cv
from libmproxy import utils, flow, encoding
import tutils
def test_search_highlights():
# Default text in requests is content. We will search for nt once, and
# expect the first bit to be highlighted. We will do it again and expect the
# second to be.
f = tutils.tflowview()
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('content', [(None, 2), (f.highlight_color, 2)])
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('content', [(None, 5), (f.highlight_color, 2)])
def test_search_returns_useful_messages():
f = tutils.tflowview()
# original string is content. this string should not be in there.
test_string = "oranges and other fruit."
response = f.search(test_string)
assert response == "no matches for '%s'" % test_string
def test_search_highlights_clears_prev():
f = tutils.tflowview(request_contents="this is string\nstring is string")
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# search again, it should not be highlighted again.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() != ('this is string', [(None, 8), (f.highlight_color, 6)])
def test_search_highlights_multi_line(flow=None):
f = flow if flow else tutils.tflowview(request_contents="this is string\nstring is string")
# should highlight the first line.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# should highlight second line, first appearance of string.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
# should highlight third line, second appearance of string.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('string is string', [(None, 10), (f.highlight_color, 6)])
def test_search_loops():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# get to the end.
f.search("string")
f.search("string")
f.search("string")
# should highlight the first line.
message = f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
assert message == "search hit BOTTOM, continuing at TOP"
def test_search_focuses():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# should highlight the first line.
f.search("string")
# should be focusing on the 2nd text line.
f.search("string")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert f.last_displayed_body.focus == text_object
def test_search_does_not_crash_on_bad():
"""
this used to crash, kept for reference.
"""
f = tutils.tflowview(request_contents="this is string\nstring is string\n"+("A" * cv.VIEW_CUTOFF)+"AFTERCUTOFF")
f.search("AFTERCUTOFF")
# pretend F
f.state.add_flow_setting(
f.flow,
(f.state.view_flow_mode, "fullcontents"),
True
)
f.master.refresh_flow(f.flow)
# text changed, now this string will exist. can happen when user presses F
# for full text view
f.search("AFTERCUTOFF")
def test_search_backwards():
f = tutils.tflowview(request_contents="content, content")
first_match = ('content, content', [(None, 2), (f.highlight_color, 2)])
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == first_match
f.search("nt")
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('content, content', [(None, 5), (f.highlight_color, 2)])
f.search_again(backwards=True)
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == first_match
def test_search_back_multiline():
f = tutils.tflowview(request_contents="this is string\nstring is string")
# shared assertions. highlight and pointers should now be on the third
# 'string' appearance
test_search_highlights_multi_line(f)
# should highlight second line, first appearance of string.
f.search_again(backwards=True)
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
# should highlight the first line again.
f.search_again(backwards=True)
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
def test_search_back_multi_multi_line():
"""
same as above for some bugs the above won't catch.
"""
f = tutils.tflowview(request_contents="this is string\nthis is string\nthis is string")
f.search("string")
f.search_again()
f.search_again()
# should be on second line
f.search_again(backwards=True)
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# first line now
f.search_again(backwards=True)
text_object = tutils.get_body_line(f.last_displayed_body, 0)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
def test_search_backwards_wraps():
"""
when searching past line 0, it should loop.
"""
f = tutils.tflowview(request_contents="this is string\nthis is string\nthis is string")
# should be on second line
f.search("string")
f.search_again()
text_object = tutils.get_body_line(f.last_displayed_body, 1)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
# should be on third now.
f.search_again(backwards=True)
message = f.search_again(backwards=True)
text_object = tutils.get_body_line(f.last_displayed_body, 2)
assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
assert message == "search hit TOP, continuing at BOTTOM"

View File

@@ -88,7 +88,6 @@ class TestHTTPResponse:
class TestInvalidRequests(tservers.HTTPProxTest):
ssl = True
def test_double_connect(self):
p = self.pathoc()
r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
@@ -117,9 +116,7 @@ class TestProxyChaining(tservers.HTTPChainProxyTest):
class TestProxyChainingSSL(tservers.HTTPChainProxyTest):
ssl = True
def test_simple(self):
p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'")
assert req.content == "content"

View File

@@ -70,10 +70,6 @@ class TestProcessProxyOptions:
def test_simple(self):
assert self.p()
def test_cert(self):
self.assert_noerr("--cert", tutils.test_data.path("data/testkey.pem"))
self.assert_err("does not exist", "--cert", "nonexistent")
def test_confdir(self):
with tutils.tmpdir() as confdir:
self.assert_noerr("--confdir", confdir)
@@ -90,11 +86,16 @@ class TestProcessProxyOptions:
self.assert_err("invalid reverse proxy", "-P", "reverse")
self.assert_noerr("-P", "http://localhost")
def test_certs(self):
def test_client_certs(self):
with tutils.tmpdir() as confdir:
self.assert_noerr("--client-certs", confdir)
self.assert_err("directory does not exist", "--client-certs", "nonexistent")
def test_certs(self):
with tutils.tmpdir() as confdir:
self.assert_noerr("--cert", tutils.test_data.path("data/testkey.pem"))
self.assert_err("does not exist", "--cert", "nonexistent")
def test_auth(self):
p = self.assert_noerr("--nonanonymous")
assert p.authenticator

View File

@@ -206,13 +206,21 @@ class TestHTTPSCertfile(tservers.HTTPProxTest, CommonMixin):
def test_certfile(self):
assert self.pathod("304")
class TestHTTPSNoCommonName(tservers.HTTPProxTest, CommonMixin):
class TestHTTPSNoCommonName(tservers.HTTPProxTest):
"""
Test what happens if we get a cert without common name back.
"""
ssl = True
ssloptions=pathod.SSLOptions(certfile=tutils.test_data.path("data/no_common_name.pem"),
keyfile=tutils.test_data.path("data/no_common_name.pem"))
ssloptions=pathod.SSLOptions(
certs = [
("*", tutils.test_data.path("data/no_common_name.pem"))
]
)
def test_http(self):
f = self.pathod("202")
assert f.sslinfo.certchain[0].get_subject().CN == "127.0.0.1"
class TestReverse(tservers.ReverseProxTest, CommonMixin):
reverse = True
@@ -370,7 +378,6 @@ class TestTransparentResolveError(tservers.TransparentProxTest):
assert self.pathod("304").status_code == 502
class MasterIncomplete(tservers.TestMaster):
def handle_request(self, m):
resp = tutils.tresp()
@@ -383,5 +390,3 @@ class TestIncompleteResponse(tservers.HTTPProxTest):
def test_incomplete(self):
assert self.pathod("200").status_code == 502

View File

@@ -1,4 +1,6 @@
import os.path
import threading, Queue
import shutil, tempfile
import flask
import libpathod.test, libpathod.pathoc
from libmproxy import proxy, flow, controller
@@ -28,7 +30,6 @@ class TestMaster(flow.FlowMaster):
self.apps.add(testapp, "testapp", 80)
self.apps.add(errapp, "errapp", 80)
self.clear_log()
self.start_app(APP_HOST, APP_PORT, False)
def handle_request(self, m):
flow.FlowMaster.handle_request(self, m)
@@ -73,25 +74,31 @@ class ProxTestBase(object):
ssl = None
ssloptions = False
clientcerts = False
certfile = None
no_upstream_cert = False
authenticator = None
masterclass = TestMaster
externalapp = False
@classmethod
def setupAll(cls):
cls.server = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
cls.server2 = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
pconf = cls.get_proxy_config()
cls.confdir = os.path.join(tempfile.gettempdir(), "mitmproxy")
config = proxy.ProxyConfig(
no_upstream_cert = cls.no_upstream_cert,
cacert = tutils.test_data.path("data/confdir/mitmproxy-ca.pem"),
confdir = cls.confdir,
authenticator = cls.authenticator,
**pconf
)
tmaster = cls.masterclass(config)
tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp)
cls.proxy = ProxyThread(tmaster)
cls.proxy.start()
@classmethod
def tearDownAll(cls):
shutil.rmtree(cls.confdir)
@property
def master(cls):
return cls.proxy.tmaster
@@ -126,8 +133,6 @@ class ProxTestBase(object):
d = dict()
if cls.clientcerts:
d["clientcerts"] = tutils.test_data.path("data/clientcert")
if cls.certfile:
d["certfile"] =tutils.test_data.path("data/testkey.pem")
return d
@@ -252,7 +257,6 @@ class ChainProxTest(ProxTestBase):
"""
n = 2
chain_config = [lambda: proxy.ProxyConfig(
cacert = tutils.test_data.path("data/confdir/mitmproxy-ca.pem"),
)] * n
@classmethod
def setupAll(cls):
@@ -265,6 +269,7 @@ class ChainProxTest(ProxTestBase):
cls.chain[-1].port
)
tmaster = cls.masterclass(config)
tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp)
cls.chain.append(ProxyThread(tmaster))
cls.chain[-1].start()