mirror of
https://github.com/zhigang1992/mitmproxy.git
synced 2026-01-12 17:32:27 +08:00
Move export to addon, kill Python and Locust export
Also add a "raw" export format. The Python and Locust exports are hard to maintain, their tests are extremely brittle, they didn't have full test coverage, and are by my guess very rarely used. I feel the Locust export should certainly be an externally maintained addon. The Python/requests export can come back if someone cares enough, and it can be structured in a way we can maintain.
This commit is contained in:
@@ -7,6 +7,7 @@ from mitmproxy.addons import core_option_validation
|
||||
from mitmproxy.addons import core
|
||||
from mitmproxy.addons import cut
|
||||
from mitmproxy.addons import disable_h2c
|
||||
from mitmproxy.addons import export
|
||||
from mitmproxy.addons import onboarding
|
||||
from mitmproxy.addons import proxyauth
|
||||
from mitmproxy.addons import replace
|
||||
@@ -31,6 +32,7 @@ def default_addons():
|
||||
clientplayback.ClientPlayback(),
|
||||
cut.Cut(),
|
||||
disable_h2c.DisableH2C(),
|
||||
export.Export(),
|
||||
onboarding.Onboarding(),
|
||||
proxyauth.ProxyAuth(),
|
||||
replace.Replace(),
|
||||
|
||||
75
mitmproxy/addons/export.py
Normal file
75
mitmproxy/addons/export.py
Normal file
@@ -0,0 +1,75 @@
|
||||
import typing
|
||||
|
||||
from mitmproxy import command
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.utils import strutils
|
||||
from mitmproxy.net.http.http1 import assemble
|
||||
|
||||
import pyperclip
|
||||
|
||||
|
||||
def curl_command(f: flow.Flow) -> str:
|
||||
if not hasattr(f, "request"):
|
||||
raise exceptions.CommandError("Can't export flow with no request.")
|
||||
data = "curl "
|
||||
request = f.request.copy() # type: ignore
|
||||
request.decode(strict=False)
|
||||
for k, v in request.headers.items(multi=True):
|
||||
data += "-H '%s:%s' " % (k, v)
|
||||
if request.method != "GET":
|
||||
data += "-X %s " % request.method
|
||||
data += "'%s'" % request.url
|
||||
if request.content:
|
||||
data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
|
||||
request.content,
|
||||
escape_single_quotes=True
|
||||
)
|
||||
return data
|
||||
|
||||
|
||||
def raw(f: flow.Flow) -> bytes:
|
||||
if not hasattr(f, "request"):
|
||||
raise exceptions.CommandError("Can't export flow with no request.")
|
||||
return assemble.assemble_request(f.request) # type: ignore
|
||||
|
||||
|
||||
formats = dict(
|
||||
curl = curl_command,
|
||||
raw = raw,
|
||||
)
|
||||
|
||||
|
||||
class Export():
|
||||
@command.command("export.formats")
|
||||
def formats(self) -> typing.Sequence[str]:
|
||||
"""
|
||||
Return a list of the supported export formats.
|
||||
"""
|
||||
return list(sorted(formats.keys()))
|
||||
|
||||
@command.command("export.file")
|
||||
def file(self, fmt: str, f: flow.Flow, path: str) -> None:
|
||||
"""
|
||||
Export a flow to path.
|
||||
"""
|
||||
if fmt not in formats:
|
||||
raise exceptions.CommandError("No such export format: %s" % fmt)
|
||||
func = formats[fmt] # type: typing.Any
|
||||
v = func(f)
|
||||
with open(path, "wb") as fp:
|
||||
if isinstance(v, bytes):
|
||||
fp.write(v)
|
||||
else:
|
||||
fp.write(v.encode("utf-8"))
|
||||
|
||||
@command.command("export.clip")
|
||||
def clip(self, fmt: str, f: flow.Flow) -> None:
|
||||
"""
|
||||
Export a flow to the system clipboard.
|
||||
"""
|
||||
if fmt not in formats:
|
||||
raise exceptions.CommandError("No such export format: %s" % fmt)
|
||||
func = formats[fmt] # type: typing.Any
|
||||
v = strutils.always_str(func(f))
|
||||
pyperclip.copy(v)
|
||||
@@ -24,6 +24,8 @@ def typename(t: type, ret: bool) -> str:
|
||||
return t.__name__
|
||||
elif t == typing.Sequence[flow.Flow]:
|
||||
return "[flow]" if ret else "flowspec"
|
||||
elif t == typing.Sequence[str]:
|
||||
return "[str]"
|
||||
elif t == Cuts:
|
||||
return "[cuts]" if ret else "cutspec"
|
||||
elif t == flow.Flow:
|
||||
|
||||
@@ -1,183 +0,0 @@
|
||||
import io
|
||||
import json
|
||||
import pprint
|
||||
import re
|
||||
import textwrap
|
||||
from typing import Any
|
||||
|
||||
from mitmproxy import http
|
||||
from mitmproxy.utils import strutils
|
||||
|
||||
|
||||
def curl_command(flow: http.HTTPFlow) -> str:
|
||||
data = "curl "
|
||||
|
||||
request = flow.request.copy()
|
||||
request.decode(strict=False)
|
||||
|
||||
for k, v in request.headers.items(multi=True):
|
||||
data += "-H '%s:%s' " % (k, v)
|
||||
|
||||
if request.method != "GET":
|
||||
data += "-X %s " % request.method
|
||||
|
||||
data += "'%s'" % request.url
|
||||
|
||||
if request.content:
|
||||
data += " --data-binary '%s'" % strutils.bytes_to_escaped_str(
|
||||
request.content,
|
||||
escape_single_quotes=True
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def python_arg(arg: str, val: Any) -> str:
|
||||
if not val:
|
||||
return ""
|
||||
if arg:
|
||||
arg += "="
|
||||
arg_str = "{}{},\n".format(
|
||||
arg,
|
||||
pprint.pformat(val, 79 - len(arg))
|
||||
)
|
||||
return textwrap.indent(arg_str, " " * 4)
|
||||
|
||||
|
||||
def python_code(flow: http.HTTPFlow):
|
||||
code = io.StringIO()
|
||||
|
||||
def writearg(arg, val):
|
||||
code.write(python_arg(arg, val))
|
||||
|
||||
code.write("import requests\n")
|
||||
code.write("\n")
|
||||
if flow.request.method.lower() in ("get", "post", "put", "head", "delete", "patch"):
|
||||
code.write("response = requests.{}(\n".format(flow.request.method.lower()))
|
||||
else:
|
||||
code.write("response = requests.request(\n")
|
||||
writearg("", flow.request.method)
|
||||
url_without_query = flow.request.url.split("?", 1)[0]
|
||||
writearg("", url_without_query)
|
||||
|
||||
writearg("params", list(flow.request.query.fields))
|
||||
|
||||
headers = flow.request.headers.copy()
|
||||
# requests adds those by default.
|
||||
for x in (":authority", "host", "content-length"):
|
||||
headers.pop(x, None)
|
||||
writearg("headers", dict(headers))
|
||||
try:
|
||||
if "json" not in flow.request.headers.get("content-type", ""):
|
||||
raise ValueError()
|
||||
writearg("json", json.loads(flow.request.text))
|
||||
except ValueError:
|
||||
writearg("data", flow.request.content)
|
||||
|
||||
code.seek(code.tell() - 2) # remove last comma
|
||||
code.write("\n)\n")
|
||||
code.write("\n")
|
||||
code.write("print(response.text)")
|
||||
|
||||
return code.getvalue()
|
||||
|
||||
|
||||
def locust_code(flow):
|
||||
code = textwrap.dedent("""
|
||||
from locust import HttpLocust, TaskSet, task
|
||||
|
||||
class UserBehavior(TaskSet):
|
||||
def on_start(self):
|
||||
''' on_start is called when a Locust start before any task is scheduled '''
|
||||
self.{name}()
|
||||
|
||||
@task()
|
||||
def {name}(self):
|
||||
url = self.locust.host + '{path}'
|
||||
{headers}{params}{data}
|
||||
self.response = self.client.request(
|
||||
method='{method}',
|
||||
url=url,{args}
|
||||
)
|
||||
|
||||
### Additional tasks can go here ###
|
||||
|
||||
|
||||
class WebsiteUser(HttpLocust):
|
||||
task_set = UserBehavior
|
||||
min_wait = 1000
|
||||
max_wait = 3000
|
||||
""").strip()
|
||||
|
||||
name = re.sub('\W|^(?=\d)', '_', flow.request.path.strip("/").split("?", 1)[0])
|
||||
if not name:
|
||||
new_name = "_".join([str(flow.request.host), str(flow.request.timestamp_start)])
|
||||
name = re.sub('\W|^(?=\d)', '_', new_name)
|
||||
|
||||
path_without_query = flow.request.path.split("?")[0]
|
||||
|
||||
args = ""
|
||||
headers = ""
|
||||
|
||||
def conv(x):
|
||||
return strutils.bytes_to_escaped_str(x, escape_single_quotes=True)
|
||||
|
||||
if flow.request.headers:
|
||||
lines = [
|
||||
(conv(k), conv(v)) for k, v in flow.request.headers.fields
|
||||
if conv(k).lower() not in [":authority", "host", "cookie"]
|
||||
]
|
||||
lines = [" '%s': '%s',\n" % (k, v) for k, v in lines]
|
||||
headers += "\n headers = {\n%s }\n" % "".join(lines)
|
||||
args += "\n headers=headers,"
|
||||
|
||||
params = ""
|
||||
if flow.request.query:
|
||||
lines = [
|
||||
" %s: %s,\n" % (repr(k), repr(v))
|
||||
for k, v in
|
||||
flow.request.query.collect()
|
||||
]
|
||||
params = "\n params = {\n%s }\n" % "".join(lines)
|
||||
args += "\n params=params,"
|
||||
|
||||
data = ""
|
||||
if flow.request.content:
|
||||
data = "\n data = '''%s'''\n" % conv(flow.request.content)
|
||||
args += "\n data=data,"
|
||||
|
||||
code = code.format(
|
||||
name=name,
|
||||
path=path_without_query,
|
||||
headers=headers,
|
||||
params=params,
|
||||
data=data,
|
||||
method=flow.request.method,
|
||||
args=args,
|
||||
)
|
||||
|
||||
return code
|
||||
|
||||
|
||||
def locust_task(flow):
|
||||
code = locust_code(flow)
|
||||
start_task = len(code.split('@task')[0]) - 4
|
||||
end_task = -19 - len(code.split('### Additional')[1])
|
||||
task_code = code[start_task:end_task]
|
||||
|
||||
return task_code
|
||||
|
||||
|
||||
def url(flow):
|
||||
return flow.request.url
|
||||
|
||||
|
||||
EXPORTERS = [
|
||||
("content", "c", None),
|
||||
("headers+content", "h", None),
|
||||
("url", "u", url),
|
||||
("as curl command", "r", curl_command),
|
||||
("as python code", "p", python_code),
|
||||
("as locust code", "l", locust_code),
|
||||
("as locust task", "t", locust_task),
|
||||
]
|
||||
@@ -1,6 +1,8 @@
|
||||
import typing
|
||||
import urwid
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.tools.console import signals
|
||||
|
||||
|
||||
@@ -23,5 +25,14 @@ class CommandExecutor:
|
||||
except exceptions.CommandError as v:
|
||||
signals.status_message.send(message=str(v))
|
||||
else:
|
||||
if type(ret) == str:
|
||||
signals.status_message.send(message=ret)
|
||||
if ret:
|
||||
if type(ret) == typing.Sequence[flow.Flow]:
|
||||
signals.status_message.send(
|
||||
message="Command returned %s flows" % len(ret)
|
||||
)
|
||||
elif len(str(ret)) < 50:
|
||||
signals.status_message.send(message=str(ret))
|
||||
else:
|
||||
signals.status_message.send(
|
||||
message="Command returned too much data to display."
|
||||
)
|
||||
|
||||
@@ -9,7 +9,6 @@ import urwid.util
|
||||
import mitmproxy.net
|
||||
from functools import lru_cache
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy import export
|
||||
from mitmproxy.utils import human
|
||||
|
||||
try:
|
||||
@@ -306,28 +305,6 @@ def ask_save_body(scope, flow):
|
||||
signals.status_message.send(message="No content.")
|
||||
|
||||
|
||||
def export_to_clip_or_file(key, scope, flow, writer):
|
||||
"""
|
||||
Export selected flow to clipboard or a file.
|
||||
|
||||
key: _c_ontent, _h_eaders+content, _u_rl,
|
||||
cu_r_l_command, _p_ython_code,
|
||||
_l_ocust_code, locust_t_ask
|
||||
scope: None, _a_ll, re_q_uest, re_s_ponse
|
||||
writer: copy_to_clipboard_or_prompt, ask_save_path
|
||||
"""
|
||||
|
||||
for _, exp_key, exporter in export.EXPORTERS:
|
||||
if key == exp_key:
|
||||
if exporter is None: # 'c' & 'h'
|
||||
if scope is None:
|
||||
ask_scope_and_callback(flow, handle_flow_data, key, writer)
|
||||
else:
|
||||
handle_flow_data(scope, flow, key, writer)
|
||||
else: # other keys
|
||||
writer(exporter(flow))
|
||||
|
||||
|
||||
@lru_cache(maxsize=800)
|
||||
def raw_format_flow(f, flow):
|
||||
f = dict(f)
|
||||
|
||||
@@ -2,8 +2,8 @@ import urwid
|
||||
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import signals
|
||||
from mitmproxy.tools.console import master
|
||||
from mitmproxy.addons import view
|
||||
from mitmproxy import export
|
||||
import mitmproxy.tools.console.master # noqa
|
||||
|
||||
|
||||
@@ -140,25 +140,7 @@ class FlowItem(urwid.WidgetWrap):
|
||||
|
||||
def keypress(self, xxx_todo_changeme, key):
|
||||
(maxcol,) = xxx_todo_changeme
|
||||
key = common.shortcuts(key)
|
||||
if key == "E":
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Export to file",
|
||||
keys = [(e[0], e[1]) for e in export.EXPORTERS],
|
||||
callback = common.export_to_clip_or_file,
|
||||
args = (None, self.flow, common.ask_save_path)
|
||||
)
|
||||
# elif key == "C":
|
||||
# signals.status_prompt_onekey.send(
|
||||
# self,
|
||||
# prompt = "Export to clipboard",
|
||||
# keys = [(e[0], e[1]) for e in export.EXPORTERS],
|
||||
# callback = common.export_to_clip_or_file,
|
||||
# args = (None, self.flow, common.copy_to_clipboard_or_prompt)
|
||||
# )
|
||||
else:
|
||||
return key
|
||||
return common.shortcuts(key)
|
||||
|
||||
|
||||
class FlowListWalker(urwid.ListWalker):
|
||||
@@ -203,9 +185,7 @@ class FlowListWalker(urwid.ListWalker):
|
||||
|
||||
class FlowListBox(urwid.ListBox):
|
||||
|
||||
def __init__(
|
||||
self, master: "mitmproxy.tools.console.master.ConsoleMaster"
|
||||
) -> None:
|
||||
def __init__(self, master: master.ConsoleMaster) -> None:
|
||||
self.master = master # type: "mitmproxy.tools.console.master.ConsoleMaster"
|
||||
super().__init__(FlowListWalker(master))
|
||||
|
||||
|
||||
@@ -8,7 +8,6 @@ import urwid
|
||||
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import export
|
||||
from mitmproxy import http
|
||||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.net.http import status_codes
|
||||
@@ -619,29 +618,31 @@ class FlowView(tabs.Tabs):
|
||||
)
|
||||
)
|
||||
elif key == "E":
|
||||
if self.tab_offset == TAB_REQ:
|
||||
scope = "q"
|
||||
else:
|
||||
scope = "s"
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Export to file",
|
||||
keys = [(e[0], e[1]) for e in export.EXPORTERS],
|
||||
callback = common.export_to_clip_or_file,
|
||||
args = (scope, self.flow, common.ask_save_path)
|
||||
)
|
||||
pass
|
||||
# if self.tab_offset == TAB_REQ:
|
||||
# scope = "q"
|
||||
# else:
|
||||
# scope = "s"
|
||||
# signals.status_prompt_onekey.send(
|
||||
# self,
|
||||
# prompt = "Export to file",
|
||||
# keys = [(e[0], e[1]) for e in export.EXPORTERS],
|
||||
# callback = common.export_to_clip_or_file,
|
||||
# args = (scope, self.flow, common.ask_save_path)
|
||||
# )
|
||||
elif key == "C":
|
||||
if self.tab_offset == TAB_REQ:
|
||||
scope = "q"
|
||||
else:
|
||||
scope = "s"
|
||||
signals.status_prompt_onekey.send(
|
||||
self,
|
||||
prompt = "Export to clipboard",
|
||||
keys = [(e[0], e[1]) for e in export.EXPORTERS],
|
||||
callback = common.export_to_clip_or_file,
|
||||
args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
|
||||
)
|
||||
pass
|
||||
# if self.tab_offset == TAB_REQ:
|
||||
# scope = "q"
|
||||
# else:
|
||||
# scope = "s"
|
||||
# signals.status_prompt_onekey.send(
|
||||
# self,
|
||||
# prompt = "Export to clipboard",
|
||||
# keys = [(e[0], e[1]) for e in export.EXPORTERS],
|
||||
# callback = common.export_to_clip_or_file,
|
||||
# args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
|
||||
# )
|
||||
elif key == "x":
|
||||
conn.content = None
|
||||
signals.flow_change.send(self, flow=self.flow)
|
||||
|
||||
@@ -152,14 +152,15 @@ def default_keymap(km):
|
||||
km.add("A", "flow.resume @all", context="flowlist")
|
||||
km.add("a", "flow.resume @focus", context="flowlist")
|
||||
km.add("b", "console.command 'cut.save s.content|@focus '", context="flowlist")
|
||||
km.add("C", "console.command 'cut.clip '", context="flowlist")
|
||||
km.add("d", "view.remove @focus", context="flowlist")
|
||||
km.add("D", "view.duplicate @focus", context="flowlist")
|
||||
km.add("e", "set console_eventlog=toggle", context="flowlist")
|
||||
km.add("E", "console.command 'export.file curl @focus '", context="flowlist")
|
||||
km.add("f", "console.command 'set view_filter='", context="flowlist")
|
||||
km.add("F", "set console_focus_follow=toggle", context="flowlist")
|
||||
km.add("g", "view.go 0", context="flowlist")
|
||||
km.add("G", "view.go -1", context="flowlist")
|
||||
km.add("l", "console.command 'cut.clip '", context="flowlist")
|
||||
km.add("m", "flow.mark.toggle @focus", context="flowlist")
|
||||
km.add("r", "replay.client @focus", context="flowlist")
|
||||
km.add("S", "console.command 'replay.server '")
|
||||
|
||||
@@ -35,7 +35,6 @@ exclude =
|
||||
mitmproxy/proxy/server.py
|
||||
mitmproxy/tools/
|
||||
mitmproxy/controller.py
|
||||
mitmproxy/export.py
|
||||
mitmproxy/flow.py
|
||||
mitmproxy/io/compat.py
|
||||
mitmproxy/master.py
|
||||
@@ -54,7 +53,6 @@ exclude =
|
||||
mitmproxy/controller.py
|
||||
mitmproxy/ctx.py
|
||||
mitmproxy/exceptions.py
|
||||
mitmproxy/export.py
|
||||
mitmproxy/flow.py
|
||||
mitmproxy/io/io.py
|
||||
mitmproxy/io/compat.py
|
||||
|
||||
109
test/mitmproxy/addons/test_export.py
Normal file
109
test/mitmproxy/addons/test_export.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import pytest
|
||||
import os
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.addons import export # heh
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
from mitmproxy.test import taddons
|
||||
from unittest import mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def get_request():
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(
|
||||
method=b'GET',
|
||||
content=b'',
|
||||
path=b"/path?a=foo&a=bar&b=baz"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def post_request():
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(
|
||||
method=b'POST',
|
||||
headers=(),
|
||||
content=bytes(range(256))
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_request():
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(method=b'PATCH', path=b"/path?query=param")
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tcp_flow():
|
||||
return tflow.ttcpflow()
|
||||
|
||||
|
||||
class TestExportCurlCommand:
|
||||
def test_get(self, get_request):
|
||||
result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'"""
|
||||
assert export.curl_command(get_request) == result
|
||||
|
||||
def test_post(self, post_request):
|
||||
result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format(
|
||||
str(bytes(range(256)))[2:-1]
|
||||
)
|
||||
assert export.curl_command(post_request) == result
|
||||
|
||||
def test_patch(self, patch_request):
|
||||
result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
|
||||
assert export.curl_command(patch_request) == result
|
||||
|
||||
def test_tcp(self, tcp_flow):
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
export.curl_command(tcp_flow)
|
||||
|
||||
|
||||
class TestRaw:
|
||||
def test_get(self, get_request):
|
||||
assert b"header: qvalue" in export.raw(get_request)
|
||||
|
||||
def test_tcp(self, tcp_flow):
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
export.raw(tcp_flow)
|
||||
|
||||
|
||||
def qr(f):
|
||||
with open(f, "rb") as fp:
|
||||
return fp.read()
|
||||
|
||||
|
||||
def test_export(tmpdir):
|
||||
f = str(tmpdir.join("path"))
|
||||
e = export.Export()
|
||||
with taddons.context():
|
||||
assert e.formats() == ["curl", "raw"]
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
e.file("nonexistent", tflow.tflow(resp=True), f)
|
||||
|
||||
e.file("raw", tflow.tflow(resp=True), f)
|
||||
assert qr(f)
|
||||
os.unlink(f)
|
||||
|
||||
e.file("curl", tflow.tflow(resp=True), f)
|
||||
assert qr(f)
|
||||
os.unlink(f)
|
||||
|
||||
|
||||
def test_clip(tmpdir):
|
||||
e = export.Export()
|
||||
with taddons.context():
|
||||
with pytest.raises(exceptions.CommandError):
|
||||
e.clip("nonexistent", tflow.tflow(resp=True))
|
||||
|
||||
with mock.patch('pyperclip.copy') as pc:
|
||||
e.clip("raw", tflow.tflow(resp=True))
|
||||
assert pc.called
|
||||
|
||||
with mock.patch('pyperclip.copy') as pc:
|
||||
e.clip("curl", tflow.tflow(resp=True))
|
||||
assert pc.called
|
||||
@@ -1,35 +0,0 @@
|
||||
from locust import HttpLocust, TaskSet, task
|
||||
|
||||
class UserBehavior(TaskSet):
|
||||
def on_start(self):
|
||||
''' on_start is called when a Locust start before any task is scheduled '''
|
||||
self.path()
|
||||
|
||||
@task()
|
||||
def path(self):
|
||||
url = self.locust.host + '/path'
|
||||
|
||||
headers = {
|
||||
'header': 'qvalue',
|
||||
'content-length': '7',
|
||||
}
|
||||
|
||||
params = {
|
||||
'a': ['foo', 'bar'],
|
||||
'b': 'baz',
|
||||
}
|
||||
|
||||
self.response = self.client.request(
|
||||
method='GET',
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
)
|
||||
|
||||
### Additional tasks can go here ###
|
||||
|
||||
|
||||
class WebsiteUser(HttpLocust):
|
||||
task_set = UserBehavior
|
||||
min_wait = 1000
|
||||
max_wait = 3000
|
||||
@@ -1,37 +0,0 @@
|
||||
from locust import HttpLocust, TaskSet, task
|
||||
|
||||
class UserBehavior(TaskSet):
|
||||
def on_start(self):
|
||||
''' on_start is called when a Locust start before any task is scheduled '''
|
||||
self.path()
|
||||
|
||||
@task()
|
||||
def path(self):
|
||||
url = self.locust.host + '/path'
|
||||
|
||||
headers = {
|
||||
'header': 'qvalue',
|
||||
'content-length': '7',
|
||||
}
|
||||
|
||||
params = {
|
||||
'query': 'param',
|
||||
}
|
||||
|
||||
data = '''content'''
|
||||
|
||||
self.response = self.client.request(
|
||||
method='PATCH',
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
data=data,
|
||||
)
|
||||
|
||||
### Additional tasks can go here ###
|
||||
|
||||
|
||||
class WebsiteUser(HttpLocust):
|
||||
task_set = UserBehavior
|
||||
min_wait = 1000
|
||||
max_wait = 3000
|
||||
@@ -1,26 +0,0 @@
|
||||
from locust import HttpLocust, TaskSet, task
|
||||
|
||||
class UserBehavior(TaskSet):
|
||||
def on_start(self):
|
||||
''' on_start is called when a Locust start before any task is scheduled '''
|
||||
self.path()
|
||||
|
||||
@task()
|
||||
def path(self):
|
||||
url = self.locust.host + '/path'
|
||||
|
||||
data = '''content'''
|
||||
|
||||
self.response = self.client.request(
|
||||
method='POST',
|
||||
url=url,
|
||||
data=data,
|
||||
)
|
||||
|
||||
### Additional tasks can go here ###
|
||||
|
||||
|
||||
class WebsiteUser(HttpLocust):
|
||||
task_set = UserBehavior
|
||||
min_wait = 1000
|
||||
max_wait = 3000
|
||||
@@ -1,20 +0,0 @@
|
||||
@task()
|
||||
def path(self):
|
||||
url = self.locust.host + '/path'
|
||||
|
||||
headers = {
|
||||
'header': 'qvalue',
|
||||
'content-length': '7',
|
||||
}
|
||||
|
||||
params = {
|
||||
'a': ['foo', 'bar'],
|
||||
'b': 'baz',
|
||||
}
|
||||
|
||||
self.response = self.client.request(
|
||||
method='GET',
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
)
|
||||
@@ -1,22 +0,0 @@
|
||||
@task()
|
||||
def path(self):
|
||||
url = self.locust.host + '/path'
|
||||
|
||||
headers = {
|
||||
'header': 'qvalue',
|
||||
'content-length': '7',
|
||||
}
|
||||
|
||||
params = {
|
||||
'query': 'param',
|
||||
}
|
||||
|
||||
data = '''content'''
|
||||
|
||||
self.response = self.client.request(
|
||||
method='PATCH',
|
||||
url=url,
|
||||
headers=headers,
|
||||
params=params,
|
||||
data=data,
|
||||
)
|
||||
@@ -1,11 +0,0 @@
|
||||
@task()
|
||||
def path(self):
|
||||
url = self.locust.host + '/path'
|
||||
|
||||
data = '''\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'''
|
||||
|
||||
self.response = self.client.request(
|
||||
method='POST',
|
||||
url=url,
|
||||
data=data,
|
||||
)
|
||||
@@ -1,9 +0,0 @@
|
||||
import requests
|
||||
|
||||
response = requests.get(
|
||||
'http://address:22/path',
|
||||
params=[('a', 'foo'), ('a', 'bar'), ('b', 'baz')],
|
||||
headers={'header': 'qvalue'}
|
||||
)
|
||||
|
||||
print(response.text)
|
||||
@@ -1,10 +0,0 @@
|
||||
import requests
|
||||
|
||||
response = requests.patch(
|
||||
'http://address:22/path',
|
||||
params=[('query', 'param')],
|
||||
headers={'header': 'qvalue'},
|
||||
data=b'content'
|
||||
)
|
||||
|
||||
print(response.text)
|
||||
@@ -1,17 +0,0 @@
|
||||
import requests
|
||||
|
||||
response = requests.post(
|
||||
'http://address:22/path',
|
||||
data=(b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f\x10\x11\x12\x13'
|
||||
b'\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567'
|
||||
b'89:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f'
|
||||
b'\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f'
|
||||
b'\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f'
|
||||
b'\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf'
|
||||
b'\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf'
|
||||
b'\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf'
|
||||
b'\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf'
|
||||
b'\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef'
|
||||
b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff')
|
||||
)
|
||||
print(response.text)
|
||||
@@ -1,9 +0,0 @@
|
||||
import requests
|
||||
|
||||
response = requests.post(
|
||||
'http://address:22/path',
|
||||
headers={'content-type': 'application/json'},
|
||||
json={'email': 'example@example.com', 'name': 'example'}
|
||||
)
|
||||
|
||||
print(response.text)
|
||||
@@ -70,6 +70,7 @@ def test_typename():
|
||||
assert command.typename(command.Cuts, True) == "[cuts]"
|
||||
|
||||
assert command.typename(flow.Flow, False) == "flow"
|
||||
assert command.typename(typing.Sequence[str], False) == "[str]"
|
||||
|
||||
|
||||
class DummyConsole:
|
||||
|
||||
@@ -1,133 +0,0 @@
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from mitmproxy import export # heh
|
||||
from mitmproxy.net.http import Headers
|
||||
from mitmproxy.test import tflow
|
||||
from mitmproxy.test import tutils
|
||||
|
||||
|
||||
def clean_blanks(s):
|
||||
return re.sub(r"^\s+", "", s, flags=re.MULTILINE)
|
||||
|
||||
|
||||
def python_equals(testdata, text):
|
||||
"""
|
||||
Compare two bits of Python code, disregarding non-significant differences
|
||||
like whitespace on blank lines and trailing space.
|
||||
"""
|
||||
d = open(tutils.test_data.path(testdata)).read()
|
||||
assert clean_blanks(text).rstrip() == clean_blanks(d).rstrip()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def get_request():
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(
|
||||
method=b'GET',
|
||||
content=b'',
|
||||
path=b"/path?a=foo&a=bar&b=baz"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def post_request():
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(
|
||||
method=b'POST',
|
||||
headers=(),
|
||||
content=bytes(range(256))
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patch_request():
|
||||
return tflow.tflow(
|
||||
req=tutils.treq(method=b'PATCH', path=b"/path?query=param")
|
||||
)
|
||||
|
||||
|
||||
class TExport:
|
||||
def test_get(self, get_request):
|
||||
raise NotImplementedError()
|
||||
|
||||
def test_post(self, post_request):
|
||||
raise NotImplementedError()
|
||||
|
||||
def test_patch(self, patch_request):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class TestExportCurlCommand(TExport):
|
||||
def test_get(self, get_request):
|
||||
result = """curl -H 'header:qvalue' -H 'content-length:7' 'http://address:22/path?a=foo&a=bar&b=baz'"""
|
||||
assert export.curl_command(get_request) == result
|
||||
|
||||
def test_post(self, post_request):
|
||||
result = "curl -X POST 'http://address:22/path' --data-binary '{}'".format(
|
||||
str(bytes(range(256)))[2:-1]
|
||||
)
|
||||
assert export.curl_command(post_request) == result
|
||||
|
||||
def test_patch(self, patch_request):
|
||||
result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'"""
|
||||
assert export.curl_command(patch_request) == result
|
||||
|
||||
|
||||
class TestExportPythonCode(TExport):
|
||||
def test_get(self, get_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/python_get.py",
|
||||
export.python_code(get_request))
|
||||
|
||||
def test_post(self, post_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/python_post.py",
|
||||
export.python_code(post_request))
|
||||
|
||||
def test_post_json(self, post_request):
|
||||
post_request.request.content = b'{"name": "example", "email": "example@example.com"}'
|
||||
post_request.request.headers = Headers(content_type="application/json")
|
||||
python_equals("mitmproxy/data/test_flow_export/python_post_json.py",
|
||||
export.python_code(post_request))
|
||||
|
||||
def test_patch(self, patch_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/python_patch.py",
|
||||
export.python_code(patch_request))
|
||||
|
||||
|
||||
class TestExportLocustCode(TExport):
|
||||
def test_get(self, get_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/locust_get.py",
|
||||
export.locust_code(get_request))
|
||||
|
||||
def test_post(self, post_request):
|
||||
post_request.request.content = b'content'
|
||||
post_request.request.headers.clear()
|
||||
python_equals("mitmproxy/data/test_flow_export/locust_post.py",
|
||||
export.locust_code(post_request))
|
||||
|
||||
def test_patch(self, patch_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/locust_patch.py",
|
||||
export.locust_code(patch_request))
|
||||
|
||||
|
||||
class TestExportLocustTask(TExport):
|
||||
def test_get(self, get_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/locust_task_get.py",
|
||||
export.locust_task(get_request))
|
||||
|
||||
def test_post(self, post_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/locust_task_post.py",
|
||||
export.locust_task(post_request))
|
||||
|
||||
def test_patch(self, patch_request):
|
||||
python_equals("mitmproxy/data/test_flow_export/locust_task_patch.py",
|
||||
export.locust_task(patch_request))
|
||||
|
||||
|
||||
class TestURL:
|
||||
def test_url(self):
|
||||
flow = tflow.tflow()
|
||||
assert export.url(flow) == "http://address:22/path"
|
||||
Reference in New Issue
Block a user