fix #593, fix #656, coverage++

This commit is contained in:
Maximilian Hils
2015-07-03 02:47:12 +02:00
parent 9bffd9cf03
commit 4c831992aa
3 changed files with 83 additions and 53 deletions

View File

@@ -2,7 +2,7 @@ import socket
import time
from libmproxy.proxy.config import HostMatcher
import libpathod
from netlib import tcp, http_auth, http
from netlib import tcp, http_auth, http, socks
from libpathod import pathoc, pathod
from netlib.certutils import SSLCert
import tutils
@@ -237,6 +237,7 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
for i in l:
if "serverdisconnect" in i:
return True
req = "get:'%s/p/200:b@1'"
p = self.pathoc()
assert p.request(req % self.server.urlbase)
@@ -355,8 +356,8 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxTest):
"""
ssl = True
ssloptions = pathod.SSLOptions(
cn = "trusted-cert",
certs = [
cn="trusted-cert",
certs=[
("trusted-cert", tutils.test_data.path("data/trusted-server.crt"))
])
@@ -364,7 +365,7 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxTest):
self.config.openssl_verification_mode_server = SSL.VERIFY_PEER
self.config.openssl_trusted_cadir_server = tutils.test_data.path(
"data/trusted-cadir/")
self.pathoc()
def test_verification_w_pemfile(self):
@@ -381,9 +382,9 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxTest):
"""
ssl = True
ssloptions = pathod.SSLOptions(
cn = "untrusted-cert",
certs = [
("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt"))
cn="untrusted-cert",
certs=[
("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt"))
])
def test_default_verification_w_bad_cert(self):
@@ -414,7 +415,7 @@ class TestHTTPSNoCommonName(tservers.HTTPProxTest):
"""
ssl = True
ssloptions = pathod.SSLOptions(
certs = [
certs=[
("*", tutils.test_data.path("data/no_common_name.pem"))
]
)
@@ -428,6 +429,43 @@ class TestReverse(tservers.ReverseProxTest, CommonMixin, TcpMixin):
reverse = True
class TestSocks5(tservers.SocksModeTest):
def test_simple(self):
p = self.pathoc()
p.socks_connect(("localhost", self.server.port))
f = p.request("get:/p/200")
assert f.status_code == 200
def test_with_authentication_only(self):
p = self.pathoc()
f = p.request("get:/p/200")
assert f.status_code == 502
assert "SOCKS5 mode failure" in f.content
def test_no_connect(self):
"""
mitmproxy doesn't support UDP or BIND SOCKS CMDs
"""
p = self.pathoc()
socks.ClientGreeting(
socks.VERSION.SOCKS5,
[socks.METHOD.NO_AUTHENTICATION_REQUIRED]
).to_file(p.wfile)
socks.Message(
socks.VERSION.SOCKS5,
socks.CMD.BIND,
socks.ATYP.DOMAINNAME,
("example.com", 8080)
).to_file(p.wfile)
p.wfile.flush()
p.rfile.read(2) # read server greeting
f = p.request("get:/p/200") # the request doesn't matter, error response from handshake will be read anyway.
assert f.status_code == 502
assert "SOCKS5 mode failure" in f.content
class TestSpoof(tservers.SpoofModeTest):
def test_http(self):
alist = (
@@ -444,7 +482,7 @@ class TestSpoof(tservers.SpoofModeTest):
assert l.server_conn.address
assert l.server_conn.address.host == a[0]
assert l.server_conn.address.port == a[1]
def test_http_without_host(self):
p = self.pathoc()
f = p.request("get:/p/304:r")
@@ -468,7 +506,7 @@ class TestSSLSpoof(tservers.SSLSpoofModeTest):
assert l.server_conn.address
assert l.server_conn.address.host == a[0]
assert l.server_conn.address.port == a[1]
def test_https_without_sni(self):
a = ("localhost", self.server.port)
self.config.mode.sslport = a[1]
@@ -556,7 +594,7 @@ class TestProxy(tservers.HTTPProxTest):
connection.close()
request, response = self.master.state.view[
0].request, self.master.state.view[0].response
0].request, self.master.state.view[0].response
assert response.code == 304 # sanity test for our low level request
# time.sleep might be a little bit shorter than a second
assert 0.95 < (request.timestamp_end - request.timestamp_start) < 1.2
@@ -718,7 +756,6 @@ class TestStreamRequest(tservers.HTTPProxTest):
assert self.server.last_log()
def test_stream_chunked(self):
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection.connect(("127.0.0.1", self.proxy.port))
fconn = connection.makefile()
@@ -736,8 +773,8 @@ class TestStreamRequest(tservers.HTTPProxTest):
chunks = list(
content for _,
content,
_ in http.read_http_body_chunked(
content,
_ in http.read_http_body_chunked(
fconn,
headers,
None,
@@ -839,9 +876,9 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin):
class TestUpstreamProxySSL(
tservers.HTTPUpstreamProxTest,
CommonMixin,
TcpMixin):
tservers.HTTPUpstreamProxTest,
CommonMixin,
TcpMixin):
ssl = True
def _host_pattern_on(self, attr):
@@ -911,10 +948,12 @@ class TestUpstreamProxySSL(
"""
https://github.com/mitmproxy/mitmproxy/issues/313
"""
def handle_request(f):
f.request.httpversion = (1, 0)
del f.request.headers["Content-Length"]
f.reply()
_handle_request = self.chain[0].tmaster.handle_request
self.chain[0].tmaster.handle_request = handle_request
try:
@@ -932,6 +971,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
If we have a disconnect on a secure connection that's transparently proxified to
an upstream http proxy, we need to send the CONNECT request again.
"""
def kill_requests(master, attr, exclude):
k = [0] # variable scope workaround: put into array
_func = getattr(master, attr)
@@ -943,21 +983,22 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
f.error = Error("terminated")
f.reply(KILL)
return _func(f)
setattr(master, attr, handler)
kill_requests(self.chain[1].tmaster, "handle_request",
exclude=[
# fail first request
# fail first request
2, # allow second request
])
])
kill_requests(self.chain[0].tmaster, "handle_request",
exclude=[
1, # CONNECT
# fail first request
# fail first request
3, # reCONNECT
4, # request
])
])
p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'")
@@ -979,18 +1020,18 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
assert self.proxy.tmaster.state.flows[1].request.form_in == "relative"
assert self.chain[0].tmaster.state.flows[
0].request.form_in == "authority"
0].request.form_in == "authority"
assert self.chain[0].tmaster.state.flows[
1].request.form_in == "relative"
1].request.form_in == "relative"
assert self.chain[0].tmaster.state.flows[
2].request.form_in == "authority"
2].request.form_in == "authority"
assert self.chain[0].tmaster.state.flows[
3].request.form_in == "relative"
3].request.form_in == "relative"
assert self.chain[1].tmaster.state.flows[
0].request.form_in == "relative"
0].request.form_in == "relative"
assert self.chain[1].tmaster.state.flows[
1].request.form_in == "relative"
1].request.form_in == "relative"
req = p.request("get:'/p/418:b\"content2\"'")