mirror of
https://github.com/zhigang1992/mitmproxy.git
synced 2026-04-24 04:14:57 +08:00
addons.proxyauth: out with the old, in with the new
- Strip out old auth mechanisms, and enable addon - Disable web app auth for now - this should just use the Tornado auth stuff
This commit is contained in:
@@ -99,12 +99,11 @@ HTTP Events
|
||||
:header-rows: 0
|
||||
|
||||
* - .. py:function:: http_connect(flow)
|
||||
|
||||
- Called when we receive an HTTP CONNECT request. Setting a non 2xx
|
||||
response on the flow will return the response to the client abort the
|
||||
connection. CONNECT requests and responses do not generate the usual
|
||||
HTTP handler events. CONNECT requests are only valid in regular and
|
||||
upstream proxy modes.
|
||||
response on the flow will return the response to the client abort the
|
||||
connection. CONNECT requests and responses do not generate the usual
|
||||
HTTP handler events. CONNECT requests are only valid in regular and
|
||||
upstream proxy modes.
|
||||
|
||||
*flow*
|
||||
A ``models.HTTPFlow`` object. The flow is guaranteed to have
|
||||
|
||||
@@ -3,6 +3,7 @@ from mitmproxy.addons import anticomp
|
||||
from mitmproxy.addons import clientplayback
|
||||
from mitmproxy.addons import streamfile
|
||||
from mitmproxy.addons import onboarding
|
||||
from mitmproxy.addons import proxyauth
|
||||
from mitmproxy.addons import replace
|
||||
from mitmproxy.addons import script
|
||||
from mitmproxy.addons import setheaders
|
||||
@@ -16,6 +17,7 @@ from mitmproxy.addons import upstream_auth
|
||||
def default_addons():
|
||||
return [
|
||||
onboarding.Onboarding(),
|
||||
proxyauth.ProxyAuth(),
|
||||
anticache.AntiCache(),
|
||||
anticomp.AntiComp(),
|
||||
stickyauth.StickyAuth(),
|
||||
|
||||
@@ -10,6 +10,13 @@ import mitmproxy.net.http
|
||||
REALM = "mitmproxy"
|
||||
|
||||
|
||||
def mkauth(username, password, scheme="basic"):
|
||||
v = binascii.b2a_base64(
|
||||
(username + ":" + password).encode("utf8")
|
||||
).decode("ascii")
|
||||
return scheme + " " + v
|
||||
|
||||
|
||||
def parse_http_basic_auth(s):
|
||||
words = s.split()
|
||||
if len(words) != 2:
|
||||
|
||||
@@ -1,176 +0,0 @@
|
||||
import argparse
|
||||
import binascii
|
||||
|
||||
|
||||
def parse_http_basic_auth(s):
|
||||
words = s.split()
|
||||
if len(words) != 2:
|
||||
return None
|
||||
scheme = words[0]
|
||||
try:
|
||||
user = binascii.a2b_base64(words[1]).decode("utf8", "replace")
|
||||
except binascii.Error:
|
||||
return None
|
||||
parts = user.split(':')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
return scheme, parts[0], parts[1]
|
||||
|
||||
|
||||
def assemble_http_basic_auth(scheme, username, password):
|
||||
v = binascii.b2a_base64((username + ":" + password).encode("utf8")).decode("ascii")
|
||||
return scheme + " " + v
|
||||
|
||||
|
||||
class NullProxyAuth:
|
||||
|
||||
"""
|
||||
No proxy auth at all (returns empty challange headers)
|
||||
"""
|
||||
|
||||
def __init__(self, password_manager):
|
||||
self.password_manager = password_manager
|
||||
|
||||
def clean(self, headers_):
|
||||
"""
|
||||
Clean up authentication headers, so they're not passed upstream.
|
||||
"""
|
||||
|
||||
def authenticate(self, headers_):
|
||||
"""
|
||||
Tests that the user is allowed to use the proxy
|
||||
"""
|
||||
return True
|
||||
|
||||
def auth_challenge_headers(self):
|
||||
"""
|
||||
Returns a dictionary containing the headers require to challenge the user
|
||||
"""
|
||||
return {}
|
||||
|
||||
|
||||
class BasicAuth(NullProxyAuth):
|
||||
CHALLENGE_HEADER = None
|
||||
AUTH_HEADER = None
|
||||
|
||||
def __init__(self, password_manager, realm):
|
||||
NullProxyAuth.__init__(self, password_manager)
|
||||
self.realm = realm
|
||||
|
||||
def clean(self, headers):
|
||||
del headers[self.AUTH_HEADER]
|
||||
|
||||
def authenticate(self, headers):
|
||||
auth_value = headers.get(self.AUTH_HEADER)
|
||||
if not auth_value:
|
||||
return False
|
||||
parts = parse_http_basic_auth(auth_value)
|
||||
if not parts:
|
||||
return False
|
||||
scheme, username, password = parts
|
||||
if scheme.lower() != 'basic':
|
||||
return False
|
||||
if not self.password_manager.test(username, password):
|
||||
return False
|
||||
self.username = username
|
||||
return True
|
||||
|
||||
def auth_challenge_headers(self):
|
||||
return {self.CHALLENGE_HEADER: 'Basic realm="%s"' % self.realm}
|
||||
|
||||
|
||||
class BasicWebsiteAuth(BasicAuth):
|
||||
CHALLENGE_HEADER = 'WWW-Authenticate'
|
||||
AUTH_HEADER = 'Authorization'
|
||||
|
||||
|
||||
class BasicProxyAuth(BasicAuth):
|
||||
CHALLENGE_HEADER = 'Proxy-Authenticate'
|
||||
AUTH_HEADER = 'Proxy-Authorization'
|
||||
|
||||
|
||||
class PassMan:
|
||||
|
||||
def test(self, username_, password_token_):
|
||||
return False
|
||||
|
||||
|
||||
class PassManNonAnon(PassMan):
|
||||
|
||||
"""
|
||||
Ensure the user specifies a username, accept any password.
|
||||
"""
|
||||
|
||||
def test(self, username, password_token_):
|
||||
if username:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class PassManHtpasswd(PassMan):
|
||||
|
||||
"""
|
||||
Read usernames and passwords from an htpasswd file
|
||||
"""
|
||||
|
||||
def __init__(self, path):
|
||||
"""
|
||||
Raises ValueError if htpasswd file is invalid.
|
||||
"""
|
||||
import passlib.apache
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(path)
|
||||
|
||||
def test(self, username, password_token):
|
||||
return bool(self.htpasswd.check_password(username, password_token))
|
||||
|
||||
|
||||
class PassManSingleUser(PassMan):
|
||||
|
||||
def __init__(self, username, password):
|
||||
self.username, self.password = username, password
|
||||
|
||||
def test(self, username, password_token):
|
||||
return self.username == username and self.password == password_token
|
||||
|
||||
|
||||
class AuthAction(argparse.Action):
|
||||
|
||||
"""
|
||||
Helper class to allow seamless integration int argparse. Example usage:
|
||||
parser.add_argument(
|
||||
"--nonanonymous",
|
||||
action=NonanonymousAuthAction, nargs=0,
|
||||
help="Allow access to any user long as a credentials are specified."
|
||||
)
|
||||
"""
|
||||
|
||||
def __call__(self, parser, namespace, values, option_string=None):
|
||||
passman = self.getPasswordManager(values)
|
||||
authenticator = BasicProxyAuth(passman, "mitmproxy")
|
||||
setattr(namespace, self.dest, authenticator)
|
||||
|
||||
def getPasswordManager(self, s): # pragma: no cover
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class SingleuserAuthAction(AuthAction):
|
||||
|
||||
def getPasswordManager(self, s):
|
||||
if len(s.split(':')) != 2:
|
||||
raise argparse.ArgumentTypeError(
|
||||
"Invalid single-user specification. Please use the format username:password"
|
||||
)
|
||||
username, password = s.split(':')
|
||||
return PassManSingleUser(username, password)
|
||||
|
||||
|
||||
class NonanonymousAuthAction(AuthAction):
|
||||
|
||||
def getPasswordManager(self, s):
|
||||
return PassManNonAnon()
|
||||
|
||||
|
||||
class HtpasswdAuthAction(AuthAction):
|
||||
|
||||
def getPasswordManager(self, s):
|
||||
return PassManHtpasswd(s)
|
||||
@@ -9,7 +9,6 @@ from mitmproxy import exceptions
|
||||
from mitmproxy import options as moptions
|
||||
from mitmproxy import certs
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net.http import authentication
|
||||
from mitmproxy.net.http import url
|
||||
|
||||
CONF_BASENAME = "mitmproxy"
|
||||
@@ -58,7 +57,6 @@ class ProxyConfig:
|
||||
def __init__(self, options: moptions.Options) -> None:
|
||||
self.options = options
|
||||
|
||||
self.authenticator = None
|
||||
self.check_ignore = None
|
||||
self.check_tcp = None
|
||||
self.certstore = None
|
||||
@@ -124,49 +122,3 @@ class ProxyConfig:
|
||||
self.upstream_server = None
|
||||
if options.upstream_server:
|
||||
self.upstream_server = parse_server_spec(options.upstream_server)
|
||||
|
||||
self.authenticator = authentication.NullProxyAuth(None)
|
||||
needsauth = any(
|
||||
[
|
||||
options.auth_nonanonymous,
|
||||
options.auth_singleuser,
|
||||
options.auth_htpasswd
|
||||
]
|
||||
)
|
||||
if needsauth:
|
||||
if options.mode == "transparent":
|
||||
raise exceptions.OptionsError(
|
||||
"Proxy Authentication not supported in transparent mode."
|
||||
)
|
||||
elif options.mode == "socks5":
|
||||
raise exceptions.OptionsError(
|
||||
"Proxy Authentication not supported in SOCKS mode. "
|
||||
"https://github.com/mitmproxy/mitmproxy/issues/738"
|
||||
)
|
||||
elif options.auth_singleuser:
|
||||
parts = options.auth_singleuser.split(':')
|
||||
if len(parts) != 2:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid single-user specification. "
|
||||
"Please use the format username:password"
|
||||
)
|
||||
password_manager = authentication.PassManSingleUser(*parts)
|
||||
elif options.auth_nonanonymous:
|
||||
password_manager = authentication.PassManNonAnon()
|
||||
elif options.auth_htpasswd:
|
||||
try:
|
||||
password_manager = authentication.PassManHtpasswd(
|
||||
options.auth_htpasswd
|
||||
)
|
||||
except ValueError as v:
|
||||
raise exceptions.OptionsError(str(v))
|
||||
if options.mode == "reverse":
|
||||
self.authenticator = authentication.BasicWebsiteAuth(
|
||||
password_manager,
|
||||
self.upstream_server.address
|
||||
)
|
||||
else:
|
||||
self.authenticator = authentication.BasicProxyAuth(
|
||||
password_manager,
|
||||
"mitmproxy"
|
||||
)
|
||||
|
||||
@@ -8,7 +8,6 @@ from mitmproxy import http
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.proxy.protocol import base
|
||||
from mitmproxy.proxy.protocol import websockets as pwebsockets
|
||||
import mitmproxy.net.http
|
||||
from mitmproxy.net import tcp
|
||||
from mitmproxy.net import websockets
|
||||
|
||||
@@ -124,7 +123,6 @@ class HTTPMode(enum.Enum):
|
||||
upstream = 3
|
||||
|
||||
|
||||
FIRSTLINES = set(["absolute", "relative", "authority"])
|
||||
# At this point, we see only a subset of the proxy modes
|
||||
MODE_REQUEST_FORMS = {
|
||||
HTTPMode.regular: ("authority", "absolute"),
|
||||
@@ -270,13 +268,6 @@ class HttpLayer(base.Layer):
|
||||
|
||||
self.log("request", "debug", [repr(request)])
|
||||
|
||||
# Handle Proxy Authentication Proxy Authentication conceptually does
|
||||
# not work in transparent mode. We catch this misconfiguration on
|
||||
# startup. Here, we sort out requests after a successful CONNECT
|
||||
# request (which do not need to be validated anymore)
|
||||
if not self.connect_request and not self.authenticate(request):
|
||||
return False
|
||||
|
||||
# update host header in reverse proxy mode
|
||||
if self.config.options.mode == "reverse":
|
||||
f.request.headers["Host"] = self.config.upstream_server.address.host
|
||||
@@ -455,27 +446,3 @@ class HttpLayer(base.Layer):
|
||||
self.connect()
|
||||
if tls:
|
||||
raise exceptions.HttpProtocolException("Cannot change scheme in upstream proxy mode.")
|
||||
|
||||
def authenticate(self, request) -> bool:
|
||||
if self.config.authenticator:
|
||||
if self.config.authenticator.authenticate(request.headers):
|
||||
self.config.authenticator.clean(request.headers)
|
||||
else:
|
||||
if self.mode == HTTPMode.transparent:
|
||||
self.send_response(http.make_error_response(
|
||||
401,
|
||||
"Authentication Required",
|
||||
mitmproxy.net.http.Headers(
|
||||
**self.config.authenticator.auth_challenge_headers()
|
||||
)
|
||||
))
|
||||
else:
|
||||
self.send_response(http.make_error_response(
|
||||
407,
|
||||
"Proxy Authentication Required",
|
||||
mitmproxy.net.http.Headers(
|
||||
**self.config.authenticator.auth_challenge_headers()
|
||||
)
|
||||
))
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -12,7 +12,6 @@ from mitmproxy.addons import intercept
|
||||
from mitmproxy import options
|
||||
from mitmproxy import master
|
||||
from mitmproxy.tools.web import app
|
||||
from mitmproxy.net.http import authentication
|
||||
|
||||
|
||||
class Stop(Exception):
|
||||
@@ -52,7 +51,7 @@ class Options(options.Options):
|
||||
wdebug: bool = False,
|
||||
wport: int = 8081,
|
||||
wiface: str = "127.0.0.1",
|
||||
wauthenticator: Optional[authentication.PassMan] = None,
|
||||
# wauthenticator: Optional[authentication.PassMan] = None,
|
||||
wsingleuser: Optional[str] = None,
|
||||
whtpasswd: Optional[str] = None,
|
||||
**kwargs
|
||||
@@ -60,29 +59,30 @@ class Options(options.Options):
|
||||
self.wdebug = wdebug
|
||||
self.wport = wport
|
||||
self.wiface = wiface
|
||||
self.wauthenticator = wauthenticator
|
||||
self.wsingleuser = wsingleuser
|
||||
self.whtpasswd = whtpasswd
|
||||
# self.wauthenticator = wauthenticator
|
||||
# self.wsingleuser = wsingleuser
|
||||
# self.whtpasswd = whtpasswd
|
||||
self.intercept = intercept
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# TODO: This doesn't belong here.
|
||||
def process_web_options(self, parser):
|
||||
if self.wsingleuser or self.whtpasswd:
|
||||
if self.wsingleuser:
|
||||
if len(self.wsingleuser.split(':')) != 2:
|
||||
return parser.error(
|
||||
"Invalid single-user specification. Please use the format username:password"
|
||||
)
|
||||
username, password = self.wsingleuser.split(':')
|
||||
self.wauthenticator = authentication.PassManSingleUser(username, password)
|
||||
elif self.whtpasswd:
|
||||
try:
|
||||
self.wauthenticator = authentication.PassManHtpasswd(self.whtpasswd)
|
||||
except ValueError as v:
|
||||
return parser.error(v.message)
|
||||
else:
|
||||
self.wauthenticator = None
|
||||
# if self.wsingleuser or self.whtpasswd:
|
||||
# if self.wsingleuser:
|
||||
# if len(self.wsingleuser.split(':')) != 2:
|
||||
# return parser.error(
|
||||
# "Invalid single-user specification. Please use the format username:password"
|
||||
# )
|
||||
# username, password = self.wsingleuser.split(':')
|
||||
# # self.wauthenticator = authentication.PassManSingleUser(username, password)
|
||||
# elif self.whtpasswd:
|
||||
# try:
|
||||
# self.wauthenticator = authentication.PassManHtpasswd(self.whtpasswd)
|
||||
# except ValueError as v:
|
||||
# return parser.error(v.message)
|
||||
# else:
|
||||
# self.wauthenticator = None
|
||||
pass
|
||||
|
||||
|
||||
class WebMaster(master.Master):
|
||||
@@ -98,7 +98,7 @@ class WebMaster(master.Master):
|
||||
self.addons.add(*addons.default_addons())
|
||||
self.addons.add(self.view, intercept.Intercept())
|
||||
self.app = app.Application(
|
||||
self, self.options.wdebug, self.options.wauthenticator
|
||||
self, self.options.wdebug, False
|
||||
)
|
||||
# This line is just for type hinting
|
||||
self.options = self.options # type: Options
|
||||
|
||||
@@ -7,16 +7,9 @@ from mitmproxy.test import tutils
|
||||
from mitmproxy.addons import proxyauth
|
||||
|
||||
|
||||
def mkauth(username, password, scheme="basic"):
|
||||
v = binascii.b2a_base64(
|
||||
(username + ":" + password).encode("utf8")
|
||||
).decode("ascii")
|
||||
return scheme + " " + v
|
||||
|
||||
|
||||
def test_parse_http_basic_auth():
|
||||
assert proxyauth.parse_http_basic_auth(
|
||||
mkauth("test", "test")
|
||||
proxyauth.mkauth("test", "test")
|
||||
) == ("basic", "test", "test")
|
||||
assert not proxyauth.parse_http_basic_auth("")
|
||||
assert not proxyauth.parse_http_basic_auth("foo bar")
|
||||
@@ -92,19 +85,23 @@ def test_check():
|
||||
ctx.configure(up, auth_nonanonymous=True)
|
||||
f = tflow.tflow()
|
||||
assert not up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
|
||||
f.request.headers["Proxy-Authorization"] = "invalid"
|
||||
assert not up.check(f)
|
||||
|
||||
f.request.headers["Proxy-Authorization"] = mkauth(
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test", scheme = "unknown"
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:test")
|
||||
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:foo")
|
||||
assert not up.check(f)
|
||||
@@ -116,9 +113,13 @@ def test_check():
|
||||
"mitmproxy/net/data/htpasswd"
|
||||
)
|
||||
)
|
||||
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = mkauth("test", "foo")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "foo"
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
|
||||
@@ -133,7 +134,9 @@ def test_authenticate():
|
||||
assert f.response.status_code == 407
|
||||
|
||||
f = tflow.tflow()
|
||||
f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
up.authenticate(f)
|
||||
assert not f.response
|
||||
assert not f.request.headers.get("Proxy-Authorization")
|
||||
@@ -146,7 +149,9 @@ def test_authenticate():
|
||||
|
||||
f = tflow.tflow()
|
||||
f.mode = "transparent"
|
||||
f.request.headers["Authorization"] = mkauth("test", "test")
|
||||
f.request.headers["Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
up.authenticate(f)
|
||||
assert not f.response
|
||||
assert not f.request.headers.get("Authorization")
|
||||
|
||||
@@ -1,122 +0,0 @@
|
||||
import binascii
|
||||
|
||||
from mitmproxy.test import tutils
|
||||
from mitmproxy.net.http import authentication, Headers
|
||||
|
||||
|
||||
def test_parse_http_basic_auth():
|
||||
vals = ("basic", "foo", "bar")
|
||||
assert authentication.parse_http_basic_auth(
|
||||
authentication.assemble_http_basic_auth(*vals)
|
||||
) == vals
|
||||
assert not authentication.parse_http_basic_auth("")
|
||||
assert not authentication.parse_http_basic_auth("foo bar")
|
||||
v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
|
||||
assert not authentication.parse_http_basic_auth(v)
|
||||
|
||||
|
||||
class TestPassManNonAnon:
|
||||
|
||||
def test_simple(self):
|
||||
p = authentication.PassManNonAnon()
|
||||
assert not p.test("", "")
|
||||
assert p.test("user", "")
|
||||
|
||||
|
||||
class TestPassManHtpasswd:
|
||||
|
||||
def test_file_errors(self):
|
||||
tutils.raises(
|
||||
"malformed htpasswd file",
|
||||
authentication.PassManHtpasswd,
|
||||
tutils.test_data.path("mitmproxy/net/data/server.crt"))
|
||||
|
||||
def test_simple(self):
|
||||
pm = authentication.PassManHtpasswd(tutils.test_data.path("mitmproxy/net/data/htpasswd"))
|
||||
|
||||
vals = ("basic", "test", "test")
|
||||
authentication.assemble_http_basic_auth(*vals)
|
||||
assert pm.test("test", "test")
|
||||
assert not pm.test("test", "foo")
|
||||
assert not pm.test("foo", "test")
|
||||
assert not pm.test("test", "")
|
||||
assert not pm.test("", "")
|
||||
|
||||
|
||||
class TestPassManSingleUser:
|
||||
|
||||
def test_simple(self):
|
||||
pm = authentication.PassManSingleUser("test", "test")
|
||||
assert pm.test("test", "test")
|
||||
assert not pm.test("test", "foo")
|
||||
assert not pm.test("foo", "test")
|
||||
|
||||
|
||||
class TestNullProxyAuth:
|
||||
|
||||
def test_simple(self):
|
||||
na = authentication.NullProxyAuth(authentication.PassManNonAnon())
|
||||
assert not na.auth_challenge_headers()
|
||||
assert na.authenticate("foo")
|
||||
na.clean({})
|
||||
|
||||
|
||||
class TestBasicProxyAuth:
|
||||
|
||||
def test_simple(self):
|
||||
ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
|
||||
headers = Headers()
|
||||
assert ba.auth_challenge_headers()
|
||||
assert not ba.authenticate(headers)
|
||||
|
||||
def test_authenticate_clean(self):
|
||||
ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
|
||||
|
||||
headers = Headers()
|
||||
vals = ("basic", "foo", "bar")
|
||||
headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
|
||||
assert ba.authenticate(headers)
|
||||
|
||||
ba.clean(headers)
|
||||
assert ba.AUTH_HEADER not in headers
|
||||
|
||||
headers[ba.AUTH_HEADER] = ""
|
||||
assert not ba.authenticate(headers)
|
||||
|
||||
headers[ba.AUTH_HEADER] = "foo"
|
||||
assert not ba.authenticate(headers)
|
||||
|
||||
vals = ("foo", "foo", "bar")
|
||||
headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
|
||||
assert not ba.authenticate(headers)
|
||||
|
||||
ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
|
||||
vals = ("basic", "foo", "bar")
|
||||
headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
|
||||
assert not ba.authenticate(headers)
|
||||
|
||||
|
||||
class Bunch:
|
||||
pass
|
||||
|
||||
|
||||
class TestAuthAction:
|
||||
|
||||
def test_nonanonymous(self):
|
||||
m = Bunch()
|
||||
aa = authentication.NonanonymousAuthAction(None, "authenticator")
|
||||
aa(None, m, None, None)
|
||||
assert m.authenticator
|
||||
|
||||
def test_singleuser(self):
|
||||
m = Bunch()
|
||||
aa = authentication.SingleuserAuthAction(None, "authenticator")
|
||||
aa(None, m, "foo:bar", None)
|
||||
assert m.authenticator
|
||||
tutils.raises("invalid", aa, None, m, "foo", None)
|
||||
|
||||
def test_httppasswd(self):
|
||||
m = Bunch()
|
||||
aa = authentication.HtpasswdAuthAction(None, "authenticator")
|
||||
aa(None, m, tutils.test_data.path("mitmproxy/net/data/htpasswd"), None)
|
||||
assert m.authenticator
|
||||
@@ -67,7 +67,8 @@ class TestBasic(tservers.HTTPProxyTest, SequenceTester):
|
||||
da
|
||||
"""
|
||||
)
|
||||
assert e.called[-1] == "requestheaders"
|
||||
assert "requestheaders" in e.called
|
||||
assert "responseheaders" not in e.called
|
||||
|
||||
def test_connect(self):
|
||||
e = Eventer()
|
||||
|
||||
@@ -113,13 +113,6 @@ class TestProcessProxyOptions:
|
||||
self.assert_err("expected one argument", "--upstream-auth")
|
||||
self.assert_err("mutually exclusive", "-R", "http://localhost", "-T")
|
||||
|
||||
def test_socks_auth(self):
|
||||
self.assert_err(
|
||||
"Proxy Authentication not supported in SOCKS mode.",
|
||||
"--socks",
|
||||
"--nonanonymous"
|
||||
)
|
||||
|
||||
def test_client_certs(self):
|
||||
with tutils.tmpdir() as cadir:
|
||||
self.assert_noerr("--client-certs", cadir)
|
||||
@@ -137,26 +130,6 @@ class TestProcessProxyOptions:
|
||||
tutils.test_data.path("mitmproxy/data/testkey.pem"))
|
||||
self.assert_err("does not exist", "--cert", "nonexistent")
|
||||
|
||||
def test_auth(self):
|
||||
p = self.assert_noerr("--nonanonymous")
|
||||
assert p.authenticator
|
||||
|
||||
p = self.assert_noerr(
|
||||
"--htpasswd",
|
||||
tutils.test_data.path("mitmproxy/data/htpasswd"))
|
||||
assert p.authenticator
|
||||
self.assert_err(
|
||||
"malformed htpasswd file",
|
||||
"--htpasswd",
|
||||
tutils.test_data.path("mitmproxy/data/htpasswd.invalid"))
|
||||
|
||||
p = self.assert_noerr("--singleuser", "test:test")
|
||||
assert p.authenticator
|
||||
self.assert_err(
|
||||
"invalid single-user specification",
|
||||
"--singleuser",
|
||||
"test")
|
||||
|
||||
def test_insecure(self):
|
||||
p = self.assert_noerr("--insecure")
|
||||
assert p.openssl_verification_mode_server == SSL.VERIFY_NONE
|
||||
|
||||
@@ -6,6 +6,7 @@ from mitmproxy.test import tutils
|
||||
from mitmproxy import controller
|
||||
from mitmproxy import options
|
||||
from mitmproxy.addons import script
|
||||
from mitmproxy.addons import proxyauth
|
||||
from mitmproxy import http
|
||||
from mitmproxy.proxy.config import HostMatcher, parse_server_spec
|
||||
import mitmproxy.net.http
|
||||
@@ -13,7 +14,6 @@ from mitmproxy.net import tcp
|
||||
from mitmproxy.net import socks
|
||||
from mitmproxy import certs
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.net.http import authentication
|
||||
from mitmproxy.net.http import http1
|
||||
from mitmproxy.net.tcp import Address
|
||||
from pathod import pathoc
|
||||
@@ -285,6 +285,7 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
|
||||
|
||||
class TestHTTPAuth(tservers.HTTPProxyTest):
|
||||
def test_auth(self):
|
||||
self.master.addons.add(proxyauth.ProxyAuth())
|
||||
self.master.options.auth_singleuser = "test:test"
|
||||
assert self.pathod("202").status_code == 407
|
||||
p = self.pathoc()
|
||||
@@ -295,14 +296,15 @@ class TestHTTPAuth(tservers.HTTPProxyTest):
|
||||
h'%s'='%s'
|
||||
""" % (
|
||||
self.server.port,
|
||||
mitmproxy.net.http.authentication.BasicProxyAuth.AUTH_HEADER,
|
||||
authentication.assemble_http_basic_auth("basic", "test", "test")
|
||||
"Proxy-Authorization",
|
||||
proxyauth.mkauth("test", "test")
|
||||
))
|
||||
assert ret.status_code == 202
|
||||
|
||||
|
||||
class TestHTTPReverseAuth(tservers.ReverseProxyTest):
|
||||
def test_auth(self):
|
||||
self.master.addons.add(proxyauth.ProxyAuth())
|
||||
self.master.options.auth_singleuser = "test:test"
|
||||
assert self.pathod("202").status_code == 401
|
||||
p = self.pathoc()
|
||||
@@ -312,8 +314,8 @@ class TestHTTPReverseAuth(tservers.ReverseProxyTest):
|
||||
'/p/202'
|
||||
h'%s'='%s'
|
||||
""" % (
|
||||
mitmproxy.net.http.authentication.BasicWebsiteAuth.AUTH_HEADER,
|
||||
authentication.assemble_http_basic_auth("basic", "test", "test")
|
||||
"Authorization",
|
||||
proxyauth.mkauth("test", "test")
|
||||
))
|
||||
assert ret.status_code == 202
|
||||
|
||||
|
||||
Reference in New Issue
Block a user