Housekeeping and cleanups

- No output to stdout on load in examples - they muck up the test suite.
- Use the odict module directly, rather than aliasing it. The small convenience
this gives to scripters is not worth it.
- Move the cookie tests from the flow test module to the protocol_http test
module.
This commit is contained in:
Aldo Cortesi
2015-04-14 11:58:10 +12:00
parent f37efecd0a
commit bea0bd236a
12 changed files with 98 additions and 94 deletions

View File

@@ -122,7 +122,7 @@ The main classes you will deal with in writing mitmproxy scripts are:
<td> A handle for interacting with mitmproxy's from within scripts.</td>
</tr>
<tr>
<th>libmproxy.flow.ODict</th>
<th>libmproxy.odict.ODict</th>
<td>A dictionary-like object for managing sets of key/value data. There
is also a variant called CaselessODict that ignores key case for some

View File

@@ -4,7 +4,6 @@
from libmproxy import filt
def start(context, argv):
print argv
if len(argv) != 2:
raise ValueError("Usage: -s 'filt.py FILTER'")
context.filter = filt.parse(argv[1])
@@ -12,4 +11,4 @@ def start(context, argv):
def response(context, flow):
if flow.match(context.filter):
print("Flow matches filter:")
print(flow)
print(flow)

View File

@@ -12,6 +12,7 @@ import traceback
import urwid
import netlib.utils
from netlib import odict
from . import common
from .. import utils, encoding, flow
@@ -519,7 +520,7 @@ def get_content_view(viewmode, hdrItems, content, limit, logfunc, is_request):
return "No content", ""
msg = []
hdrs = flow.ODictCaseless([list(i) for i in hdrItems])
hdrs = odict.ODictCaseless([list(i) for i in hdrItems])
enc = hdrs.get_first("content-encoding")
if enc and enc != "identity":

View File

@@ -1,6 +1,7 @@
from __future__ import absolute_import
import os, sys, copy
import urwid
from netlib import odict
from . import common, grideditor, contentview, signals, searchable, tabs
from . import flowdetailview
from .. import utils, flow, controller
@@ -275,11 +276,11 @@ class FlowView(tabs.Tabs):
signals.flow_change.send(self, flow = self.flow)
def set_headers(self, lst, conn):
conn.headers = flow.ODictCaseless(lst)
conn.headers = odict.ODictCaseless(lst)
signals.flow_change.send(self, flow = self.flow)
def set_query(self, lst, conn):
conn.set_query(flow.ODict(lst))
conn.set_query(odict.ODict(lst))
signals.flow_change.send(self, flow = self.flow)
def set_path_components(self, lst, conn):
@@ -287,7 +288,7 @@ class FlowView(tabs.Tabs):
signals.flow_change.send(self, flow = self.flow)
def set_form(self, lst, conn):
conn.set_form_urlencoded(flow.ODict(lst))
conn.set_form_urlencoded(odict.ODict(lst))
signals.flow_change.send(self, flow = self.flow)
def edit_form(self, conn):
@@ -311,7 +312,7 @@ class FlowView(tabs.Tabs):
if not self.flow.response:
self.flow.response = HTTPResponse(
self.flow.request.httpversion,
200, "OK", flow.ODictCaseless(), ""
200, "OK", odict.ODictCaseless(), ""
)
self.flow.response.reply = controller.DummyReply()
message = self.flow.response

View File

@@ -17,9 +17,6 @@ from .proxy.config import HostMatcher
from .proxy.connection import ClientConnection, ServerConnection
import urlparse
ODict = odict.ODict
ODictCaseless = odict.ODictCaseless
class AppRegistry:
def __init__(self):

View File

@@ -8,7 +8,7 @@ from email.utils import parsedate_tz, formatdate, mktime_tz
import threading
from netlib import http, tcp, http_status
import netlib.utils
from netlib.odict import ODict, ODictCaseless
from netlib import odict
from .tcp import TCPHandler
from .primitives import KILL, ProtocolHandler, Flow, Error
from ..proxy.connection import ServerConnection
@@ -45,7 +45,7 @@ def send_connect_request(conn, host, port, update_state=True):
port,
None,
(1, 1),
ODictCaseless(),
odict.ODictCaseless(),
""
)
conn.send(upstream_request.assemble())
@@ -100,7 +100,7 @@ class HTTPMessage(stateobject.StateObject):
timestamp_end=None):
self.httpversion = httpversion
self.headers = headers
"""@type: ODictCaseless"""
"""@type: odict.ODictCaseless"""
self.content = content
self.timestamp_start = timestamp_start
@@ -108,7 +108,7 @@ class HTTPMessage(stateobject.StateObject):
_stateobject_attributes = dict(
httpversion=tuple,
headers=ODictCaseless,
headers=odict.ODictCaseless,
content=str,
timestamp_start=float,
timestamp_end=float
@@ -242,7 +242,7 @@ class HTTPRequest(HTTPMessage):
httpversion: HTTP version tuple, e.g. (1,1)
headers: ODictCaseless object
headers: odict.ODictCaseless object
content: Content of the request, None, or CONTENT_MISSING if there
is content associated, but not present. CONTENT_MISSING evaluates
@@ -280,7 +280,7 @@ class HTTPRequest(HTTPMessage):
timestamp_end=None,
form_out=None
):
assert isinstance(headers, ODictCaseless) or not headers
assert isinstance(headers, odict.ODictCaseless) or not headers
HTTPMessage.__init__(
self,
httpversion,
@@ -521,7 +521,7 @@ class HTTPRequest(HTTPMessage):
return self.get_form_urlencoded()
elif self.headers.in_any("content-type", HDR_FORM_MULTIPART, True):
return self.get_form_multipart()
return ODict([])
return odict.ODict([])
def get_form_urlencoded(self):
"""
@@ -530,13 +530,13 @@ class HTTPRequest(HTTPMessage):
indicates non-form data.
"""
if self.content and self.headers.in_any("content-type", HDR_FORM_URLENCODED, True):
return ODict(utils.urldecode(self.content))
return ODict([])
return odict.ODict(utils.urldecode(self.content))
return odict.ODict([])
def get_form_multipart(self):
if self.content and self.headers.in_any("content-type", HDR_FORM_MULTIPART, True):
return ODict(utils.multipartdecode(self.headers, self.content))
return ODict([])
return odict.ODict(utils.multipartdecode(self.headers, self.content))
return odict.ODict([])
def set_form_urlencoded(self, odict):
"""
@@ -577,8 +577,8 @@ class HTTPRequest(HTTPMessage):
"""
_, _, _, _, query, _ = urlparse.urlparse(self.url)
if query:
return ODict(utils.urldecode(query))
return ODict([])
return odict.ODict(utils.urldecode(query))
return odict.ODict([])
def set_query(self, odict):
"""
@@ -697,7 +697,7 @@ class HTTPResponse(HTTPMessage):
def __init__(self, httpversion, code, msg, headers, content, timestamp_start=None,
timestamp_end=None):
assert isinstance(headers, ODictCaseless) or headers is None
assert isinstance(headers, odict.ODictCaseless) or headers is None
HTTPMessage.__init__(
self,
httpversion,

View File

@@ -2,8 +2,9 @@ import os
from nose.plugins.skip import SkipTest
if os.name == "nt":
raise SkipTest("Skipped on Windows.")
import sys
from netlib import odict
import libmproxy.console.contentview as cv
from libmproxy import utils, flow, encoding
import tutils
@@ -30,14 +31,14 @@ class TestContentView:
def test_view_auto(self):
v = cv.ViewAuto()
f = v(
flow.ODictCaseless(),
odict.ODictCaseless(),
"foo",
1000
)
assert f[0] == "Raw"
f = v(
flow.ODictCaseless(
odict.ODictCaseless(
[["content-type", "text/html"]],
),
"<html></html>",
@@ -46,7 +47,7 @@ class TestContentView:
assert f[0] == "HTML"
f = v(
flow.ODictCaseless(
odict.ODictCaseless(
[["content-type", "text/flibble"]],
),
"foo",
@@ -55,7 +56,7 @@ class TestContentView:
assert f[0] == "Raw"
f = v(
flow.ODictCaseless(
odict.ODictCaseless(
[["content-type", "text/flibble"]],
),
"<xml></xml>",
@@ -166,20 +167,20 @@ Content-Disposition: form-data; name="submit-name"
Larry
--AaB03x
""".strip()
h = flow.ODictCaseless(
h = odict.ODictCaseless(
[("Content-Type", "multipart/form-data; boundary=AaB03x")]
)
assert view(h, v, 1000)
h = flow.ODictCaseless()
h = odict.ODictCaseless()
assert not view(h, v, 1000)
h = flow.ODictCaseless(
h = odict.ODictCaseless(
[("Content-Type", "multipart/form-data")]
)
assert not view(h, v, 1000)
h = flow.ODictCaseless(
h = odict.ODictCaseless(
[("Content-Type", "unparseable")]
)
assert not view(h, v, 1000)
@@ -281,4 +282,3 @@ if cv.ViewProtobuf.is_available():
def test_get_by_shortcut():
assert cv.get_by_shortcut("h")

View File

@@ -1,4 +1,5 @@
import cStringIO
from netlib import odict
from libmproxy import filt, flow
from libmproxy.protocol import http
from libmproxy.protocol.primitives import Error
@@ -74,7 +75,7 @@ class TestParsing:
class TestMatching:
def req(self):
headers = flow.ODictCaseless()
headers = odict.ODictCaseless()
headers["header"] = ["qvalue"]
req = http.HTTPRequest(
"absolute",
@@ -96,7 +97,7 @@ class TestMatching:
def resp(self):
f = self.req()
headers = flow.ODictCaseless()
headers = odict.ODictCaseless()
headers["header_response"] = ["svalue"]
f.response = http.HTTPResponse((1, 1), 200, "OK", headers, "content_response", None, None)
@@ -253,4 +254,3 @@ class TestMatching:
assert self.q("! ~c 201", s)
assert self.q("!~c 201 !~c 202", s)
assert not self.q("!~c 201 !~c 200", s)

View File

@@ -2,6 +2,7 @@ import Queue, time, os.path
from cStringIO import StringIO
import email.utils
import mock
from netlib import odict
from libmproxy import filt, protocol, controller, utils, tnetstring, flow
from libmproxy.protocol.primitives import Error, Flow
from libmproxy.protocol.http import decoded, CONTENT_MISSING
@@ -931,7 +932,7 @@ class TestRequest:
assert r.get_path_components() == []
r.path = "/foo/bar"
assert r.get_path_components() == ["foo", "bar"]
q = flow.ODict()
q = odict.ODict()
q["test"] = ["123"]
r.set_query(q)
assert r.get_path_components() == ["foo", "bar"]
@@ -945,12 +946,12 @@ class TestRequest:
assert "%2F" in r.path
def test_getset_form_urlencoded(self):
d = flow.ODict([("one", "two"), ("three", "four")])
d = odict.ODict([("one", "two"), ("three", "four")])
r = tutils.treq(content=utils.urlencode(d.lst))
r.headers["content-type"] = [protocol.http.HDR_FORM_URLENCODED]
assert r.get_form_urlencoded() == d
d = flow.ODict([("x", "y")])
d = odict.ODict([("x", "y")])
r.set_form_urlencoded(d)
assert r.get_form_urlencoded() == d
@@ -958,7 +959,7 @@ class TestRequest:
assert not r.get_form_urlencoded()
def test_getset_query(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
r = tutils.treq()
r.path = "/foo?x=y&a=b"
@@ -975,14 +976,14 @@ class TestRequest:
r.path = "/foo?x=y&a=b"
assert r.get_query()
r.set_query(flow.ODict([]))
r.set_query(odict.ODict([]))
assert not r.get_query()
qv = flow.ODict([("a", "b"), ("c", "d")])
qv = odict.ODict([("a", "b"), ("c", "d")])
r.set_query(qv)
assert r.get_query() == qv
def test_anticache(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
r = tutils.treq()
r.headers = h
h["if-modified-since"] = ["test"]
@@ -1046,43 +1047,8 @@ class TestRequest:
r.encode("gzip")
assert r.get_decoded_content() == "falafel"
def test_get_cookies_none(self):
h = flow.ODictCaseless()
r = tutils.treq()
r.headers = h
assert r.get_cookies() is None
def test_get_cookies_single(self):
h = flow.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result)==1
assert result['cookiename']==('cookievalue',{})
def test_get_cookies_double(self):
h = flow.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue;othercookiename=othercookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result)==2
assert result['cookiename']==('cookievalue',{})
assert result['othercookiename']==('othercookievalue',{})
def test_get_cookies_withequalsign(self):
h = flow.ODictCaseless()
h["Cookie"] = ["cookiename=coo=kievalue;othercookiename=othercookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result)==2
assert result['cookiename']==('coo=kievalue',{})
assert result['othercookiename']==('othercookievalue',{})
def test_header_size(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
h["headername"] = ["headervalue"]
r = tutils.treq()
r.headers = h
@@ -1090,7 +1056,7 @@ class TestRequest:
assert len(raw) == 62
def test_get_content_type(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
resp = tutils.tresp()
resp.headers = h
@@ -1183,13 +1149,13 @@ class TestResponse:
assert result==44
def test_get_cookies_none(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
resp = tutils.tresp()
resp.headers = h
assert not resp.get_cookies()
def test_get_cookies_simple(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue"]
resp = tutils.tresp()
resp.headers = h
@@ -1199,7 +1165,7 @@ class TestResponse:
assert result["cookiename"] == ("cookievalue", {})
def test_get_cookies_with_parameters(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
resp = tutils.tresp()
resp.headers = h
@@ -1214,7 +1180,7 @@ class TestResponse:
assert result["cookiename"][1]["httponly"]==""
def test_get_cookies_no_value(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
h["Set-Cookie"] = ["cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"]
resp = tutils.tresp()
resp.headers = h
@@ -1225,7 +1191,7 @@ class TestResponse:
assert len(result["cookiename"][1])==2
def test_get_cookies_twocookies(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue","othercookie=othervalue"]
resp = tutils.tresp()
resp.headers = h
@@ -1237,7 +1203,7 @@ class TestResponse:
assert result["othercookie"] == ("othervalue", {})
def test_get_content_type(self):
h = flow.ODictCaseless()
h = odict.ODictCaseless()
h["Content-Type"] = ["text/plain"]
resp = tutils.tresp()
resp.headers = h

View File

@@ -1,6 +1,10 @@
from mock import MagicMock
from libmproxy.protocol.http import *
from cStringIO import StringIO
from mock import MagicMock
from libmproxy.protocol.http import *
from netlib import odict
import tutils, tservers
@@ -131,6 +135,41 @@ class TestHTTPRequest:
assert r.get_form_multipart.called
def test_get_cookies_none(self):
h = odict.ODictCaseless()
r = tutils.treq()
r.headers = h
assert r.get_cookies() is None
def test_get_cookies_single(self):
h = odict.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result)==1
assert result['cookiename']==('cookievalue',{})
def test_get_cookies_double(self):
h = odict.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue;othercookiename=othercookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result)==2
assert result['cookiename']==('cookievalue',{})
assert result['othercookiename']==('othercookievalue',{})
def test_get_cookies_withequalsign(self):
h = odict.ODictCaseless()
h["Cookie"] = ["cookiename=coo=kievalue;othercookiename=othercookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result)==2
assert result['cookiename']==('coo=kievalue',{})
assert result['othercookiename']==('othercookievalue',{})

View File

@@ -1,5 +1,6 @@
import json
from libmproxy import utils, flow
from netlib import odict
import tutils
utils.CERT_SLEEP_TIME = 0
@@ -54,7 +55,7 @@ def test_urldecode():
def test_multipartdecode():
boundary = 'somefancyboundary'
headers = flow.ODict([('content-type', ('multipart/form-data; boundary=%s' % boundary))])
headers = odict.ODict([('content-type', ('multipart/form-data; boundary=%s' % boundary))])
content = "--{0}\n" \
"Content-Disposition: form-data; name=\"field1\"\n\n" \
"value1\n" \

View File

@@ -9,7 +9,7 @@ import mock_urwid
from libmproxy.console.flowview import FlowView
from libmproxy.console import ConsoleState
from libmproxy.protocol.primitives import Error
from netlib import certutils
from netlib import certutils, odict
from nose.plugins.skip import SkipTest
from mock import Mock
from time import time
@@ -81,7 +81,7 @@ def treq(content="content", scheme="http", host="address", port=22):
"""
@return: libmproxy.protocol.http.HTTPRequest
"""
headers = flow.ODictCaseless()
headers = odict.ODictCaseless()
headers["header"] = ["qvalue"]
req = http.HTTPRequest("relative", "GET", scheme, host, port, "/path", (1, 1), headers, content,
None, None, None)
@@ -104,7 +104,7 @@ def tresp(content="message"):
@return: libmproxy.protocol.http.HTTPResponse
"""
headers = flow.ODictCaseless()
headers = odict.ODictCaseless()
headers["header_response"] = ["svalue"]
resp = http.HTTPResponse((1, 1), 200, "OK", headers, content, time(), time())