commands: support *args for commands

Use this to simplify meta-commands in console, and to create a console_choose
command that prompts the user for a choice, and then executes a command with
variable substitution.
This commit is contained in:
Aldo Cortesi
2017-04-30 21:20:32 +12:00
parent bcbe87bb09
commit 3cd93567f5
6 changed files with 122 additions and 47 deletions

View File

@@ -1,3 +1,6 @@
"""
This module manges and invokes typed commands.
"""
import inspect
import typing
import shlex
@@ -17,10 +20,10 @@ Cuts = typing.Sequence[
def typename(t: type, ret: bool) -> str:
"""
Translates a type to an explanatory string. Ifl ret is True, we're
Translates a type to an explanatory string. If ret is True, we're
looking at a return type, else we're looking at a parameter type.
"""
if t in (str, int, bool):
if issubclass(t, (str, int, bool)):
return t.__name__
elif t == typing.Sequence[flow.Flow]:
return "[flow]" if ret else "flowspec"
@@ -44,11 +47,20 @@ class Command:
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
self.has_positional = False
for i in sig.parameters.values():
# This is the kind for *args paramters
if i.kind == i.VAR_POSITIONAL:
self.has_positional = True
self.paramtypes = [v.annotation for v in sig.parameters.values()]
self.returntype = sig.return_annotation
def paramnames(self) -> typing.Sequence[str]:
return [typename(i, False) for i in self.paramtypes]
v = [typename(i, False) for i in self.paramtypes]
if self.has_positional:
v[-1] = "*" + v[-1][1:-1]
return v
def retname(self) -> str:
return typename(self.returntype, True) if self.returntype else ""
@@ -64,17 +76,31 @@ class Command:
"""
Call the command with a set of arguments. At this point, all argumets are strings.
"""
if len(self.paramtypes) != len(args):
if not self.has_positional and (len(self.paramtypes) != len(args)):
raise exceptions.CommandError("Usage: %s" % self.signature_help())
remainder = [] # type: typing.Sequence[str]
if self.has_positional:
remainder = args[len(self.paramtypes) - 1:]
args = args[:len(self.paramtypes) - 1]
pargs = []
for i in range(len(args)):
pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
if typecheck.check_command_type(args[i], self.paramtypes[i]):
pargs.append(args[i])
else:
pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
if remainder:
if typecheck.check_command_type(remainder, self.paramtypes[-1]):
pargs.extend(remainder)
else:
raise exceptions.CommandError("Invalid value type.")
with self.manager.master.handlecontext():
ret = self.func(*pargs)
if not typecheck.check_command_return_type(ret, self.returntype):
if not typecheck.check_command_type(ret, self.returntype):
raise exceptions.CommandError("Command returned unexpected data")
return ret
@@ -126,7 +152,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
if argtype == str:
if issubclass(argtype, str):
return spec
elif argtype == bool:
if spec == "true":
@@ -137,7 +163,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
raise exceptions.CommandError(
"Booleans are 'true' or 'false', got %s" % spec
)
elif argtype == int:
elif issubclass(argtype, int):
try:
return int(spec)
except ValueError as e:
@@ -153,6 +179,8 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"Command requires one flow, specification matched %s." % len(flows)
)
return flows[0]
elif argtype == typing.Sequence[str]:
return [i.strip() for i in spec.split(",")]
else:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)

View File

@@ -2,7 +2,6 @@ import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import master
from mitmproxy.addons import view
import mitmproxy.tools.console.master # noqa
@@ -185,7 +184,9 @@ class FlowListWalker(urwid.ListWalker):
class FlowListBox(urwid.ListBox):
def __init__(self, master: master.ConsoleMaster) -> None:
def __init__(
self, master: "mitmproxy.tools.console.master.ConsoleMaster"
) -> None:
self.master = master # type: "mitmproxy.tools.console.master.ConsoleMaster"
super().__init__(FlowListWalker(master))

View File

@@ -9,9 +9,11 @@ import subprocess
import sys
import tempfile
import traceback
import typing
import urwid
from mitmproxy import ctx
from mitmproxy import addons
from mitmproxy import command
from mitmproxy import master
@@ -84,12 +86,31 @@ class ConsoleAddon:
self.master = master
self.started = False
@command.command("console.choose")
def console_choose(
self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str]
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
command, then invoke another command with all occurances of {choice}
replaced by the choice the user made.
"""
choices = ctx.master.commands.call_args(choicecmd, [])
def callback(opt):
repl = " ".join(cmd)
repl = repl.replace("{choice}", opt)
self.master.commands.call(repl)
self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback))
ctx.log.info(choices)
@command.command("console.command")
def console_command(self, partial: str) -> None:
def console_command(self, *partial: typing.Sequence[str]) -> None:
"""
Prompt the user to edit a command with a (possilby empty) starting value.
"""
signals.status_prompt_command.send(partial=partial)
signals.status_prompt_command.send(partial=" ".join(partial) + " ") # type: ignore
@command.command("console.view.commands")
def view_commands(self) -> None:
@@ -146,16 +167,21 @@ def default_keymap(km):
km.add("O", "console.view.options")
km.add("Q", "console.exit")
km.add("q", "console.view.pop")
km.add("i", "console.command 'set intercept='")
km.add("W", "console.command 'set save_stream_file='")
km.add("i", "console.command set intercept=")
km.add("W", "console.command set save_stream_file=")
km.add("A", "flow.resume @all", context="flowlist")
km.add("a", "flow.resume @focus", context="flowlist")
km.add("b", "console.command 'cut.save s.content|@focus '", context="flowlist")
km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist")
km.add("d", "view.remove @focus", context="flowlist")
km.add("D", "view.duplicate @focus", context="flowlist")
km.add("e", "set console_eventlog=toggle", context="flowlist")
km.add("E", "console.command 'export.file curl @focus '", context="flowlist")
km.add(
"E",
"console.choose Format export.formats "
"console.command export.file {choice} @focus ''",
context="flowlist"
)
km.add("f", "console.command 'set view_filter='", context="flowlist")
km.add("F", "set console_focus_follow=toggle", context="flowlist")
km.add("g", "view.go 0", context="flowlist")

View File

@@ -1,7 +1,7 @@
import typing
def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
"""
Check if the provided value is an instance of typeinfo. Returns True if the
types match, False otherwise. This function supports only those types
@@ -17,7 +17,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
if not isinstance(value, (tuple, list)):
return False
for v in value:
if not check_command_return_type(v, T):
if not check_command_type(v, T):
return False
elif typename.startswith("typing.Union"):
try:
@@ -26,7 +26,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
# Python 3.5.x
types = typeinfo.__union_params__ # type: ignore
for T in types:
checks = [check_command_return_type(value, T) for T in types]
checks = [check_command_type(value, T) for T in types]
if not any(checks):
return False
elif value is None and typeinfo is None:

View File

@@ -1,9 +1,6 @@
import typing
from mitmproxy import command
from mitmproxy import flow
from mitmproxy import master
from mitmproxy import options
from mitmproxy import proxy
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
@@ -19,24 +16,41 @@ class TAddon:
def cmd2(self, foo: str) -> str:
return 99
def cmd3(self, foo: int) -> int:
return foo
def empty(self) -> None:
pass
def varargs(self, one: str, *var: typing.Sequence[str]) -> typing.Sequence[str]:
return list(var)
class TestCommand:
def test_varargs(self):
with taddons.context() as tctx:
cm = command.CommandManager(tctx.master)
a = TAddon()
c = command.Command(cm, "varargs", a.varargs)
assert c.signature_help() == "varargs str *str -> [str]"
assert c.call(["one", "two", "three"]) == ["two", "three"]
with pytest.raises(exceptions.CommandError):
c.call(["one", "two", 3])
def test_call(self):
o = options.Options()
m = master.Master(o, proxy.DummyServer(o))
cm = command.CommandManager(m)
with taddons.context() as tctx:
cm = command.CommandManager(tctx.master)
a = TAddon()
c = command.Command(cm, "cmd.path", a.cmd1)
assert c.call(["foo"]) == "ret foo"
assert c.signature_help() == "cmd.path str -> str"
a = TAddon()
c = command.Command(cm, "cmd.path", a.cmd1)
assert c.call(["foo"]) == "ret foo"
assert c.signature_help() == "cmd.path str -> str"
c = command.Command(cm, "cmd.two", a.cmd2)
with pytest.raises(exceptions.CommandError):
c.call(["foo"])
c = command.Command(cm, "cmd.two", a.cmd2)
with pytest.raises(exceptions.CommandError):
c.call(["foo"])
c = command.Command(cm, "cmd.three", a.cmd3)
assert c.call(["1"]) == 1
def test_simple():
@@ -74,14 +88,12 @@ def test_typename():
class DummyConsole:
def load(self, l):
l.add_command("view.resolve", self.resolve)
l.add_command("cut", self.cut)
@command.command("view.resolve")
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
n = int(spec)
return [tflow.tflow(resp=True)] * n
@command.command("cut")
def cut(self, spec: str) -> command.Cuts:
return [["test"]]
@@ -115,6 +127,13 @@ def test_parsearg():
tctx.master.commands, "foo", command.Cuts
) == [["test"]]
assert command.parsearg(
tctx.master.commands, "foo", typing.Sequence[str]
) == ["foo"]
assert command.parsearg(
tctx.master.commands, "foo, bar", typing.Sequence[str]
) == ["foo", "bar"]
class TDec:
@command.command("cmd1")

View File

@@ -88,25 +88,26 @@ def test_check_any():
typecheck.check_option_type("foo", None, typing.Any)
def test_check_command_return_type():
assert(typecheck.check_command_return_type("foo", str))
assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
assert(typecheck.check_command_return_type(None, None))
assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
assert(typecheck.check_command_return_type([["foo", b"bar"]], command.Cuts))
assert(not typecheck.check_command_return_type(["foo", b"bar"], command.Cuts))
assert(not typecheck.check_command_return_type([["foo", 22]], command.Cuts))
def test_check_command_type():
assert(typecheck.check_command_type("foo", str))
assert(typecheck.check_command_type(["foo"], typing.Sequence[str]))
assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str]))
assert(typecheck.check_command_type(None, None))
assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
typecheck.check_command_return_type([10], m)
typecheck.check_command_type([10], m)
# Python 3.5 only defines __union_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Union"
m.__union_params__ = (int,)
assert not typecheck.check_command_return_type([22], m)
assert not typecheck.check_command_type([22], m)