mirror of
https://github.com/zhigang1992/mitmproxy.git
synced 2026-04-19 23:30:16 +08:00
move code from mitmproxy to netlib
This commit is contained in:
@@ -4,6 +4,8 @@ import mock
|
||||
import gc
|
||||
from os.path import normpath
|
||||
import mock_urwid
|
||||
|
||||
import netlib.tutils
|
||||
from libmproxy import console
|
||||
from libmproxy.console import common
|
||||
|
||||
@@ -60,13 +62,13 @@ class TestConsoleState:
|
||||
|
||||
def _add_response(self, state):
|
||||
f = self._add_request(state)
|
||||
f.response = tutils.tresp()
|
||||
f.response = netlib.tutils.tresp()
|
||||
state.update_flow(f)
|
||||
|
||||
def test_add_response(self):
|
||||
c = console.ConsoleState()
|
||||
f = self._add_request(c)
|
||||
f.response = tutils.tresp()
|
||||
f.response = netlib.tutils.tresp()
|
||||
c.focus = None
|
||||
c.update_flow(f)
|
||||
|
||||
|
||||
@@ -4,7 +4,9 @@ if os.name == "nt":
|
||||
raise SkipTest("Skipped on Windows.")
|
||||
import sys
|
||||
|
||||
import netlib.utils
|
||||
from netlib import odict
|
||||
|
||||
import libmproxy.console.contentview as cv
|
||||
from libmproxy import utils, flow, encoding
|
||||
import tutils
|
||||
@@ -65,10 +67,10 @@ class TestContentView:
|
||||
assert f[0].startswith("XML")
|
||||
|
||||
def test_view_urlencoded(self):
|
||||
d = utils.urlencode([("one", "two"), ("three", "four")])
|
||||
d = netlib.utils.urlencode([("one", "two"), ("three", "four")])
|
||||
v = cv.ViewURLEncoded()
|
||||
assert v([], d, 100)
|
||||
d = utils.urlencode([("adsfa", "")])
|
||||
d = netlib.utils.urlencode([("adsfa", "")])
|
||||
v = cv.ViewURLEncoded()
|
||||
assert v([], d, 100)
|
||||
|
||||
|
||||
@@ -1,17 +1,18 @@
|
||||
import os
|
||||
from cStringIO import StringIO
|
||||
|
||||
import netlib.tutils
|
||||
from netlib.http.semantics import CONTENT_MISSING
|
||||
|
||||
from libmproxy import dump, flow
|
||||
from libmproxy.protocol import http
|
||||
from libmproxy.protocol import http, http_wrappers
|
||||
from libmproxy.proxy.primitives import Log
|
||||
import tutils
|
||||
import mock
|
||||
|
||||
|
||||
def test_strfuncs():
|
||||
t = tutils.tresp()
|
||||
t = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
t.is_replay = True
|
||||
dump.str_response(t)
|
||||
|
||||
@@ -26,14 +27,14 @@ def test_strfuncs():
|
||||
|
||||
class TestDumpMaster:
|
||||
def _cycle(self, m, content):
|
||||
f = tutils.tflow(req=tutils.treq(content))
|
||||
f = tutils.tflow(req=netlib.tutils.treq(content))
|
||||
l = Log("connect")
|
||||
l.reply = mock.MagicMock()
|
||||
m.handle_log(l)
|
||||
m.handle_clientconnect(f.client_conn)
|
||||
m.handle_serverconnect(f.server_conn)
|
||||
m.handle_request(f)
|
||||
f.response = tutils.tresp(content)
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(content))
|
||||
f = m.handle_response(f)
|
||||
m.handle_clientdisconnect(f.client_conn)
|
||||
return f
|
||||
@@ -70,7 +71,7 @@ class TestDumpMaster:
|
||||
f = tutils.tflow()
|
||||
f.request.content = CONTENT_MISSING
|
||||
m.handle_request(f)
|
||||
f.response = tutils.tresp()
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
f.response.content = CONTENT_MISSING
|
||||
m.handle_response(f)
|
||||
assert "content missing" in cs.getvalue()
|
||||
|
||||
@@ -3,18 +3,20 @@ import time
|
||||
import os.path
|
||||
from cStringIO import StringIO
|
||||
import email.utils
|
||||
import mock
|
||||
|
||||
import netlib.utils
|
||||
from netlib import odict
|
||||
from netlib.http.semantics import CONTENT_MISSING
|
||||
from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED, HDR_FORM_MULTIPART
|
||||
|
||||
from libmproxy import filt, protocol, controller, utils, tnetstring, flow
|
||||
from libmproxy.protocol import http_wrappers
|
||||
from libmproxy.protocol.primitives import Error, Flow
|
||||
from libmproxy.protocol.http import decoded
|
||||
from libmproxy.proxy.config import HostMatcher
|
||||
from libmproxy.proxy import ProxyConfig
|
||||
from libmproxy.proxy.server import DummyServer
|
||||
from libmproxy.proxy.connection import ClientConnection
|
||||
import mock
|
||||
import tutils
|
||||
|
||||
|
||||
@@ -22,7 +24,7 @@ def test_app_registry():
|
||||
ar = flow.AppRegistry()
|
||||
ar.add("foo", "domain", 80)
|
||||
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.host = "domain"
|
||||
r.port = 80
|
||||
assert ar.get(r)
|
||||
@@ -30,7 +32,7 @@ def test_app_registry():
|
||||
r.port = 81
|
||||
assert not ar.get(r)
|
||||
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.host = "domain2"
|
||||
r.port = 80
|
||||
assert not ar.get(r)
|
||||
@@ -41,7 +43,7 @@ def test_app_registry():
|
||||
class TestStickyCookieState:
|
||||
def _response(self, cookie, host):
|
||||
s = flow.StickyCookieState(filt.parse(".*"))
|
||||
f = tutils.tflow(req=tutils.treq(host=host, port=80), resp=True)
|
||||
f = tutils.tflow(req=netlib.tutils.treq(host=host, port=80), resp=True)
|
||||
f.response.headers["Set-Cookie"] = [cookie]
|
||||
s.handle_response(f)
|
||||
return s, f
|
||||
@@ -383,7 +385,7 @@ class TestFlow:
|
||||
|
||||
def test_backup(self):
|
||||
f = tutils.tflow()
|
||||
f.response = tutils.tresp()
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
f.request.content = "foo"
|
||||
assert not f.modified()
|
||||
f.backup()
|
||||
@@ -516,16 +518,16 @@ class TestState:
|
||||
assert c.add_flow(newf)
|
||||
assert c.active_flow_count() == 2
|
||||
|
||||
f.response = tutils.tresp()
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
assert c.update_flow(f)
|
||||
assert c.flow_count() == 2
|
||||
assert c.active_flow_count() == 1
|
||||
|
||||
_ = tutils.tresp()
|
||||
_ = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
assert not c.update_flow(None)
|
||||
assert c.active_flow_count() == 1
|
||||
|
||||
newf.response = tutils.tresp()
|
||||
newf.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
assert c.update_flow(newf)
|
||||
assert c.active_flow_count() == 0
|
||||
|
||||
@@ -557,7 +559,7 @@ class TestState:
|
||||
c.set_limit("~s")
|
||||
assert c.limit_txt == "~s"
|
||||
assert len(c.view) == 0
|
||||
f.response = tutils.tresp()
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
c.update_flow(f)
|
||||
assert len(c.view) == 1
|
||||
c.set_limit(None)
|
||||
@@ -589,7 +591,7 @@ class TestState:
|
||||
def _add_response(self, state):
|
||||
f = tutils.tflow()
|
||||
state.add_flow(f)
|
||||
f.response = tutils.tresp()
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
state.update_flow(f)
|
||||
|
||||
def _add_error(self, state):
|
||||
@@ -807,11 +809,11 @@ class TestFlowMaster:
|
||||
fm.anticomp = True
|
||||
f = tutils.tflow(req=None)
|
||||
fm.handle_clientconnect(f.client_conn)
|
||||
f.request = tutils.treq()
|
||||
f.request = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
fm.handle_request(f)
|
||||
assert s.flow_count() == 1
|
||||
|
||||
f.response = tutils.tresp()
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
fm.handle_response(f)
|
||||
assert not fm.handle_response(None)
|
||||
assert s.flow_count() == 1
|
||||
@@ -856,7 +858,7 @@ class TestFlowMaster:
|
||||
s = flow.State()
|
||||
|
||||
f = tutils.tflow()
|
||||
f.response = tutils.tresp(f.request)
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
|
||||
pb = [f]
|
||||
|
||||
fm = flow.FlowMaster(None, s)
|
||||
@@ -910,7 +912,7 @@ class TestFlowMaster:
|
||||
def test_server_playback_kill(self):
|
||||
s = flow.State()
|
||||
f = tutils.tflow()
|
||||
f.response = tutils.tresp(f.request)
|
||||
f.response = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp(f.request))
|
||||
pb = [f]
|
||||
fm = flow.FlowMaster(None, s)
|
||||
fm.refresh_server_playback = True
|
||||
@@ -1009,7 +1011,7 @@ class TestRequest:
|
||||
assert r.get_state() == r2.get_state()
|
||||
|
||||
def test_get_url(self):
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
|
||||
assert r.url == "http://address:22/path"
|
||||
|
||||
@@ -1030,7 +1032,7 @@ class TestRequest:
|
||||
assert r.pretty_url(True) == "https://foo.com:22/path"
|
||||
|
||||
def test_path_components(self):
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.path = "/"
|
||||
assert r.get_path_components() == []
|
||||
r.path = "/foo/bar"
|
||||
@@ -1050,8 +1052,8 @@ class TestRequest:
|
||||
|
||||
def test_getset_form_urlencoded(self):
|
||||
d = odict.ODict([("one", "two"), ("three", "four")])
|
||||
r = tutils.treq(content=utils.urlencode(d.lst))
|
||||
r.headers["content-type"] = [protocol.http.HDR_FORM_URLENCODED]
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
|
||||
r.headers["content-type"] = [HDR_FORM_URLENCODED]
|
||||
assert r.get_form_urlencoded() == d
|
||||
|
||||
d = odict.ODict([("x", "y")])
|
||||
@@ -1064,7 +1066,7 @@ class TestRequest:
|
||||
def test_getset_query(self):
|
||||
h = odict.ODictCaseless()
|
||||
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.path = "/foo?x=y&a=b"
|
||||
q = r.get_query()
|
||||
assert q.lst == [("x", "y"), ("a", "b")]
|
||||
@@ -1087,7 +1089,7 @@ class TestRequest:
|
||||
|
||||
def test_anticache(self):
|
||||
h = odict.ODictCaseless()
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.headers = h
|
||||
h["if-modified-since"] = ["test"]
|
||||
h["if-none-match"] = ["test"]
|
||||
@@ -1096,7 +1098,7 @@ class TestRequest:
|
||||
assert not "if-none-match" in r.headers
|
||||
|
||||
def test_replace(self):
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.path = "path/foo"
|
||||
r.headers["Foo"] = ["fOo"]
|
||||
r.content = "afoob"
|
||||
@@ -1106,31 +1108,31 @@ class TestRequest:
|
||||
assert r.headers["boo"] == ["boo"]
|
||||
|
||||
def test_constrain_encoding(self):
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.headers["accept-encoding"] = ["gzip", "oink"]
|
||||
r.constrain_encoding()
|
||||
assert "oink" not in r.headers["accept-encoding"]
|
||||
|
||||
def test_decodeencode(self):
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.headers["content-encoding"] = ["identity"]
|
||||
r.content = "falafel"
|
||||
r.decode()
|
||||
assert not r.headers["content-encoding"]
|
||||
assert r.content == "falafel"
|
||||
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.content = "falafel"
|
||||
assert not r.decode()
|
||||
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.headers["content-encoding"] = ["identity"]
|
||||
r.content = "falafel"
|
||||
r.encode("identity")
|
||||
assert r.headers["content-encoding"] == ["identity"]
|
||||
assert r.content == "falafel"
|
||||
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.headers["content-encoding"] = ["identity"]
|
||||
r.content = "falafel"
|
||||
r.encode("gzip")
|
||||
@@ -1141,7 +1143,7 @@ class TestRequest:
|
||||
assert r.content == "falafel"
|
||||
|
||||
def test_get_decoded_content(self):
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
r.content = None
|
||||
r.headers["content-encoding"] = ["identity"]
|
||||
assert r.get_decoded_content() == None
|
||||
@@ -1153,7 +1155,7 @@ class TestRequest:
|
||||
def test_get_content_type(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Content-Type"] = ["text/plain"]
|
||||
resp = tutils.tresp()
|
||||
resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
resp.headers = h
|
||||
assert resp.headers.get_first("content-type") == "text/plain"
|
||||
|
||||
@@ -1166,7 +1168,7 @@ class TestResponse:
|
||||
assert resp2.get_state() == resp.get_state()
|
||||
|
||||
def test_refresh(self):
|
||||
r = tutils.tresp()
|
||||
r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
n = time.time()
|
||||
r.headers["date"] = [email.utils.formatdate(n)]
|
||||
pre = r.headers["date"]
|
||||
@@ -1184,7 +1186,7 @@ class TestResponse:
|
||||
r.refresh()
|
||||
|
||||
def test_refresh_cookie(self):
|
||||
r = tutils.tresp()
|
||||
r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
|
||||
# Invalid expires format, sent to us by Reddit.
|
||||
c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
|
||||
@@ -1194,7 +1196,7 @@ class TestResponse:
|
||||
assert "00:21:38" in r._refresh_cookie(c, 60)
|
||||
|
||||
def test_replace(self):
|
||||
r = tutils.tresp()
|
||||
r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
r.headers["Foo"] = ["fOo"]
|
||||
r.content = "afoob"
|
||||
assert r.replace("foo(?i)", "boo") == 3
|
||||
@@ -1202,21 +1204,21 @@ class TestResponse:
|
||||
assert r.headers["boo"] == ["boo"]
|
||||
|
||||
def test_decodeencode(self):
|
||||
r = tutils.tresp()
|
||||
r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
r.headers["content-encoding"] = ["identity"]
|
||||
r.content = "falafel"
|
||||
assert r.decode()
|
||||
assert not r.headers["content-encoding"]
|
||||
assert r.content == "falafel"
|
||||
|
||||
r = tutils.tresp()
|
||||
r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
r.headers["content-encoding"] = ["identity"]
|
||||
r.content = "falafel"
|
||||
r.encode("identity")
|
||||
assert r.headers["content-encoding"] == ["identity"]
|
||||
assert r.content == "falafel"
|
||||
|
||||
r = tutils.tresp()
|
||||
r = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
r.headers["content-encoding"] = ["identity"]
|
||||
r.content = "falafel"
|
||||
r.encode("gzip")
|
||||
@@ -1233,7 +1235,7 @@ class TestResponse:
|
||||
def test_get_content_type(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Content-Type"] = ["text/plain"]
|
||||
resp = tutils.tresp()
|
||||
resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
resp.headers = h
|
||||
assert resp.headers.get_first("content-type") == "text/plain"
|
||||
|
||||
@@ -1277,7 +1279,7 @@ class TestClientConnection:
|
||||
|
||||
|
||||
def test_decoded():
|
||||
r = tutils.treq()
|
||||
r = http_wrappers.HTTPRequest.wrap(netlib.tutils.treq())
|
||||
assert r.content == "content"
|
||||
assert not r.headers["content-encoding"]
|
||||
r.encode("gzip")
|
||||
|
||||
@@ -17,14 +17,7 @@ def mock_protocol(data='', chunked=False):
|
||||
return http1.HTTP1Protocol(rfile=rfile, wfile=wfile)
|
||||
|
||||
|
||||
|
||||
def test_HttpAuthenticationError():
|
||||
x = HttpAuthenticationError({"foo": "bar"})
|
||||
assert str(x)
|
||||
assert "foo" in x.headers
|
||||
|
||||
|
||||
# TODO: move test to netlib
|
||||
# TODO: move test to netlib http1 protocol
|
||||
# def test_stripped_chunked_encoding_no_content():
|
||||
# """
|
||||
# https://github.com/mitmproxy/mitmproxy/issues/186
|
||||
@@ -38,183 +31,6 @@ def test_HttpAuthenticationError():
|
||||
# assert "Content-Length" in r._assemble_headers()
|
||||
#
|
||||
|
||||
class TestHTTPRequest:
|
||||
def test_asterisk_form_in(self):
|
||||
f = tutils.tflow(req=None)
|
||||
protocol = mock_protocol("OPTIONS * HTTP/1.1")
|
||||
f.request = HTTPRequest.from_protocol(protocol)
|
||||
|
||||
assert f.request.form_in == "relative"
|
||||
f.request.host = f.server_conn.address.host
|
||||
f.request.port = f.server_conn.address.port
|
||||
f.request.scheme = "http"
|
||||
assert protocol.assemble(f.request) == (
|
||||
"OPTIONS * HTTP/1.1\r\n"
|
||||
"Host: address:22\r\n"
|
||||
"Content-Length: 0\r\n\r\n")
|
||||
|
||||
def test_relative_form_in(self):
|
||||
protocol = mock_protocol("GET /foo\xff HTTP/1.1")
|
||||
tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
||||
|
||||
protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
|
||||
r = HTTPRequest.from_protocol(protocol)
|
||||
assert r.headers["Upgrade"] == ["h2c"]
|
||||
|
||||
def test_expect_header(self):
|
||||
protocol = mock_protocol(
|
||||
"GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
|
||||
r = HTTPRequest.from_protocol(protocol)
|
||||
assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
|
||||
assert r.content == "foo"
|
||||
assert protocol.tcp_handler.rfile.read(3) == "bar"
|
||||
|
||||
def test_authority_form_in(self):
|
||||
protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
|
||||
tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
||||
|
||||
protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
|
||||
r = HTTPRequest.from_protocol(protocol)
|
||||
r.scheme, r.host, r.port = "http", "address", 22
|
||||
assert protocol.assemble(r) == (
|
||||
"CONNECT address:22 HTTP/1.1\r\n"
|
||||
"Host: address:22\r\n"
|
||||
"Content-Length: 0\r\n\r\n")
|
||||
assert r.pretty_url(False) == "address:22"
|
||||
|
||||
def test_absolute_form_in(self):
|
||||
protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
|
||||
tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
||||
|
||||
protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
|
||||
r = HTTPRequest.from_protocol(protocol)
|
||||
assert protocol.assemble(r) == (
|
||||
"GET http://address:22/ HTTP/1.1\r\n"
|
||||
"Host: address:22\r\n"
|
||||
"Content-Length: 0\r\n\r\n")
|
||||
|
||||
def test_http_options_relative_form_in(self):
|
||||
"""
|
||||
Exercises fix for Issue #392.
|
||||
"""
|
||||
protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
|
||||
r = HTTPRequest.from_protocol(protocol)
|
||||
r.host = 'address'
|
||||
r.port = 80
|
||||
r.scheme = "http"
|
||||
assert protocol.assemble(r) == (
|
||||
"OPTIONS /secret/resource HTTP/1.1\r\n"
|
||||
"Host: address\r\n"
|
||||
"Content-Length: 0\r\n\r\n")
|
||||
|
||||
def test_http_options_absolute_form_in(self):
|
||||
protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
|
||||
r = HTTPRequest.from_protocol(protocol)
|
||||
r.host = 'address'
|
||||
r.port = 80
|
||||
r.scheme = "http"
|
||||
assert protocol.assemble(r) == (
|
||||
"OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
|
||||
"Host: address\r\n"
|
||||
"Content-Length: 0\r\n\r\n")
|
||||
|
||||
def test_set_url(self):
|
||||
r = tutils.treq_absolute()
|
||||
r.url = "https://otheraddress:42/ORLY"
|
||||
assert r.scheme == "https"
|
||||
assert r.host == "otheraddress"
|
||||
assert r.port == 42
|
||||
assert r.path == "/ORLY"
|
||||
|
||||
def test_repr(self):
|
||||
r = tutils.treq()
|
||||
assert repr(r)
|
||||
|
||||
def test_pretty_host(self):
|
||||
r = tutils.treq()
|
||||
assert r.pretty_host(True) == "address"
|
||||
assert r.pretty_host(False) == "address"
|
||||
r.headers["host"] = ["other"]
|
||||
assert r.pretty_host(True) == "other"
|
||||
assert r.pretty_host(False) == "address"
|
||||
r.host = None
|
||||
assert r.pretty_host(True) == "other"
|
||||
assert r.pretty_host(False) is None
|
||||
del r.headers["host"]
|
||||
assert r.pretty_host(True) is None
|
||||
assert r.pretty_host(False) is None
|
||||
|
||||
# Invalid IDNA
|
||||
r.headers["host"] = [".disqus.com"]
|
||||
assert r.pretty_host(True) == ".disqus.com"
|
||||
|
||||
def test_get_form_for_urlencoded(self):
|
||||
r = tutils.treq()
|
||||
r.headers.add("content-type", "application/x-www-form-urlencoded")
|
||||
r.get_form_urlencoded = MagicMock()
|
||||
|
||||
r.get_form()
|
||||
|
||||
assert r.get_form_urlencoded.called
|
||||
|
||||
def test_get_form_for_multipart(self):
|
||||
r = tutils.treq()
|
||||
r.headers.add("content-type", "multipart/form-data")
|
||||
r.get_form_multipart = MagicMock()
|
||||
|
||||
r.get_form()
|
||||
|
||||
assert r.get_form_multipart.called
|
||||
|
||||
def test_get_cookies_none(self):
|
||||
h = odict.ODictCaseless()
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
assert len(r.get_cookies()) == 0
|
||||
|
||||
def test_get_cookies_single(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = ["cookiename=cookievalue"]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
assert len(result) == 1
|
||||
assert result['cookiename'] == ['cookievalue']
|
||||
|
||||
def test_get_cookies_double(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = [
|
||||
"cookiename=cookievalue;othercookiename=othercookievalue"
|
||||
]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
assert len(result) == 2
|
||||
assert result['cookiename'] == ['cookievalue']
|
||||
assert result['othercookiename'] == ['othercookievalue']
|
||||
|
||||
def test_get_cookies_withequalsign(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = [
|
||||
"cookiename=coo=kievalue;othercookiename=othercookievalue"
|
||||
]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
assert len(result) == 2
|
||||
assert result['cookiename'] == ['coo=kievalue']
|
||||
assert result['othercookiename'] == ['othercookievalue']
|
||||
|
||||
def test_set_cookies(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = ["cookiename=cookievalue"]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
result["cookiename"] = ["foo"]
|
||||
r.set_cookies(result)
|
||||
assert r.get_cookies()["cookiename"] == ["foo"]
|
||||
|
||||
|
||||
class TestHTTPResponse:
|
||||
def test_read_from_stringio(self):
|
||||
@@ -241,80 +57,7 @@ class TestHTTPResponse:
|
||||
HTTPResponse.from_protocol, protocol, "GET"
|
||||
)
|
||||
|
||||
def test_repr(self):
|
||||
r = tutils.tresp()
|
||||
assert "unknown content type" in repr(r)
|
||||
r.headers["content-type"] = ["foo"]
|
||||
assert "foo" in repr(r)
|
||||
assert repr(tutils.tresp(content=CONTENT_MISSING))
|
||||
|
||||
def test_get_cookies_none(self):
|
||||
h = odict.ODictCaseless()
|
||||
resp = tutils.tresp()
|
||||
resp.headers = h
|
||||
assert not resp.get_cookies()
|
||||
|
||||
def test_get_cookies_simple(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Set-Cookie"] = ["cookiename=cookievalue"]
|
||||
resp = tutils.tresp()
|
||||
resp.headers = h
|
||||
result = resp.get_cookies()
|
||||
assert len(result) == 1
|
||||
assert "cookiename" in result
|
||||
assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
|
||||
|
||||
def test_get_cookies_with_parameters(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Set-Cookie"] = [
|
||||
"cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
|
||||
resp = tutils.tresp()
|
||||
resp.headers = h
|
||||
result = resp.get_cookies()
|
||||
assert len(result) == 1
|
||||
assert "cookiename" in result
|
||||
assert result["cookiename"][0][0] == "cookievalue"
|
||||
attrs = result["cookiename"][0][1]
|
||||
assert len(attrs) == 4
|
||||
assert attrs["domain"] == ["example.com"]
|
||||
assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
|
||||
assert attrs["path"] == ["/"]
|
||||
assert attrs["httponly"] == [None]
|
||||
|
||||
def test_get_cookies_no_value(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Set-Cookie"] = [
|
||||
"cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
|
||||
]
|
||||
resp = tutils.tresp()
|
||||
resp.headers = h
|
||||
result = resp.get_cookies()
|
||||
assert len(result) == 1
|
||||
assert "cookiename" in result
|
||||
assert result["cookiename"][0][0] == ""
|
||||
assert len(result["cookiename"][0][1]) == 2
|
||||
|
||||
def test_get_cookies_twocookies(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"]
|
||||
resp = tutils.tresp()
|
||||
resp.headers = h
|
||||
result = resp.get_cookies()
|
||||
assert len(result) == 2
|
||||
assert "cookiename" in result
|
||||
assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
|
||||
assert "othercookie" in result
|
||||
assert result["othercookie"][0] == ["othervalue", odict.ODict()]
|
||||
|
||||
def test_set_cookies(self):
|
||||
resp = tutils.tresp()
|
||||
v = resp.get_cookies()
|
||||
v.add("foo", ["bar", odict.ODictCaseless()])
|
||||
resp.set_cookies(v)
|
||||
|
||||
v = resp.get_cookies()
|
||||
assert len(v) == 1
|
||||
assert v["foo"] == [["bar", odict.ODictCaseless()]]
|
||||
|
||||
|
||||
class TestHTTPFlow(object):
|
||||
|
||||
@@ -2,6 +2,7 @@ import socket
|
||||
import time
|
||||
from OpenSSL import SSL
|
||||
|
||||
import netlib.tutils
|
||||
from netlib import tcp, http, socks
|
||||
from netlib.certutils import SSLCert
|
||||
from netlib.http import authentication
|
||||
@@ -9,7 +10,7 @@ from netlib.http.semantics import CONTENT_MISSING
|
||||
from libpathod import pathoc, pathod
|
||||
|
||||
from libmproxy.proxy.config import HostMatcher
|
||||
from libmproxy.protocol import KILL, Error
|
||||
from libmproxy.protocol import KILL, Error, http_wrappers
|
||||
import tutils
|
||||
import tservers
|
||||
|
||||
@@ -783,7 +784,7 @@ class TestStreamRequest(tservers.HTTPProxTest):
|
||||
|
||||
class MasterFakeResponse(tservers.TestMaster):
|
||||
def handle_request(self, f):
|
||||
resp = tutils.tresp()
|
||||
resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
f.reply(resp)
|
||||
|
||||
|
||||
@@ -848,7 +849,7 @@ class TestTransparentResolveError(tservers.TransparentProxTest):
|
||||
|
||||
class MasterIncomplete(tservers.TestMaster):
|
||||
def handle_request(self, f):
|
||||
resp = tutils.tresp()
|
||||
resp = http_wrappers.HTTPResponse.wrap(netlib.tutils.tresp())
|
||||
resp.content = CONTENT_MISSING
|
||||
f.reply(resp)
|
||||
|
||||
|
||||
@@ -44,11 +44,6 @@ def test_pretty_json():
|
||||
assert not utils.pretty_json("moo")
|
||||
|
||||
|
||||
def test_urldecode():
|
||||
s = "one=two&three=four"
|
||||
assert len(utils.urldecode(s)) == 2
|
||||
|
||||
|
||||
def test_multipartdecode():
|
||||
boundary = 'somefancyboundary'
|
||||
headers = odict.ODict(
|
||||
@@ -116,13 +111,6 @@ def test_LRUCache():
|
||||
assert len(cache.cache) == 2
|
||||
|
||||
|
||||
def test_unparse_url():
|
||||
assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99"
|
||||
assert utils.unparse_url("http", "foo.com", 80, "") == "http://foo.com"
|
||||
assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80"
|
||||
assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com"
|
||||
|
||||
|
||||
def test_parse_size():
|
||||
assert not utils.parse_size("")
|
||||
assert utils.parse_size("1") == 1
|
||||
@@ -144,7 +132,3 @@ def test_parse_content_type():
|
||||
|
||||
def test_safe_subn():
|
||||
assert utils.safe_subn("foo", u"bar", "\xc2foo")
|
||||
|
||||
|
||||
def test_urlencode():
|
||||
assert utils.urlencode([('foo', 'bar')])
|
||||
|
||||
@@ -1,22 +1,25 @@
|
||||
from cStringIO import StringIO
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import argparse
|
||||
from contextlib import contextmanager
|
||||
import sys
|
||||
from libmproxy import flow, utils, controller
|
||||
from libmproxy.protocol import http
|
||||
from libmproxy.proxy.connection import ClientConnection, ServerConnection
|
||||
import mock_urwid
|
||||
from libmproxy.console.flowview import FlowView
|
||||
from libmproxy.console import ConsoleState
|
||||
from libmproxy.protocol.primitives import Error
|
||||
from netlib import certutils, odict
|
||||
from cStringIO import StringIO
|
||||
from contextlib import contextmanager
|
||||
from nose.plugins.skip import SkipTest
|
||||
from mock import Mock
|
||||
from time import time
|
||||
|
||||
from netlib import certutils, odict
|
||||
import netlib.tutils
|
||||
|
||||
from libmproxy import flow, utils, controller
|
||||
from libmproxy.protocol import http, http_wrappers
|
||||
from libmproxy.proxy.connection import ClientConnection, ServerConnection
|
||||
from libmproxy.console.flowview import FlowView
|
||||
from libmproxy.console import ConsoleState
|
||||
from libmproxy.protocol.primitives import Error
|
||||
|
||||
|
||||
def _SkipWindows():
|
||||
raise SkipTest("Skipped on Windows.")
|
||||
@@ -43,12 +46,17 @@ def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None):
|
||||
if server_conn is True:
|
||||
server_conn = tserver_conn()
|
||||
if req is True:
|
||||
req = treq()
|
||||
req = netlib.tutils.treq()
|
||||
if resp is True:
|
||||
resp = tresp()
|
||||
resp = netlib.tutils.tresp()
|
||||
if err is True:
|
||||
err = terr()
|
||||
|
||||
if req:
|
||||
req = http_wrappers.HTTPRequest.wrap(req)
|
||||
if resp:
|
||||
resp = http_wrappers.HTTPResponse.wrap(resp)
|
||||
|
||||
f = http.HTTPFlow(client_conn, server_conn)
|
||||
f.request = req
|
||||
f.response = resp
|
||||
@@ -83,60 +91,6 @@ def tserver_conn():
|
||||
return c
|
||||
|
||||
|
||||
def treq(content="content", scheme="http", host="address", port=22):
|
||||
"""
|
||||
@return: libmproxy.protocol.http.HTTPRequest
|
||||
"""
|
||||
headers = odict.ODictCaseless()
|
||||
headers["header"] = ["qvalue"]
|
||||
req = http.HTTPRequest(
|
||||
"relative",
|
||||
"GET",
|
||||
scheme,
|
||||
host,
|
||||
port,
|
||||
"/path",
|
||||
(1, 1),
|
||||
headers,
|
||||
content,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
return req
|
||||
|
||||
|
||||
def treq_absolute(content="content"):
|
||||
"""
|
||||
@return: libmproxy.protocol.http.HTTPRequest
|
||||
"""
|
||||
r = treq(content)
|
||||
r.form_in = r.form_out = "absolute"
|
||||
r.host = "address"
|
||||
r.port = 22
|
||||
r.scheme = "http"
|
||||
return r
|
||||
|
||||
|
||||
def tresp(content="message"):
|
||||
"""
|
||||
@return: libmproxy.protocol.http.HTTPResponse
|
||||
"""
|
||||
|
||||
headers = odict.ODictCaseless()
|
||||
headers["header_response"] = ["svalue"]
|
||||
|
||||
resp = http.HTTPResponse(
|
||||
(1, 1),
|
||||
200,
|
||||
"OK",
|
||||
headers,
|
||||
content,
|
||||
time(),
|
||||
time(),
|
||||
)
|
||||
return resp
|
||||
|
||||
|
||||
def terr(content="error"):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user