python3++

This commit is contained in:
Maximilian Hils
2015-09-20 19:40:09 +02:00
parent 3f1ca556d1
commit 693cdfc6d7
7 changed files with 36 additions and 28 deletions

1
.gitignore vendored
View File

@@ -13,3 +13,4 @@ _cffi__*
.eggs/
netlib.egg-info/
pathod/
.cache/

View File

@@ -22,6 +22,7 @@ matrix:
- nosetests --with-cov --cov-report term-missing test/test_encoding.py
- nosetests --with-cov --cov-report term-missing test/test_odict.py
- nosetests --with-cov --cov-report term-missing test/test_certutils.py
- nosetests --with-cov --cov-report term-missing test/test_socks.py
- python: pypy
- python: pypy
env: OPENSSL=1.0.2

View File

@@ -3,7 +3,7 @@ import os
import ssl
import time
import datetime
import itertools
from six.moves import filter
import ipaddress
import sys
@@ -396,12 +396,12 @@ class SSLCert(object):
@property
def notbefore(self):
t = self.x509.get_notBefore()
return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ")
return datetime.datetime.strptime(t.decode("ascii"), "%Y%m%d%H%M%SZ")
@property
def notafter(self):
t = self.x509.get_notAfter()
return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ")
return datetime.datetime.strptime(t.decode("ascii"), "%Y%m%d%H%M%SZ")
@property
def has_expired(self):

View File

@@ -1,7 +1,7 @@
from __future__ import (absolute_import, print_function, division)
import socket
import struct
import array
import ipaddress
from . import tcp, utils
@@ -133,19 +133,23 @@ class Message(object):
def from_file(cls, f):
ver, msg, rsv, atyp = struct.unpack("!BBBB", f.safe_read(4))
if rsv != 0x00:
raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE,
"Socks Request: Invalid reserved byte: %s" % rsv)
raise SocksError(
REP.GENERAL_SOCKS_SERVER_FAILURE,
"Socks Request: Invalid reserved byte: %s" % rsv
)
if atyp == ATYP.IPV4_ADDRESS:
# We use tnoa here as ntop is not commonly available on Windows.
host = socket.inet_ntoa(f.safe_read(4))
host = ipaddress.IPv4Address(f.safe_read(4)).compressed
use_ipv6 = False
elif atyp == ATYP.IPV6_ADDRESS:
host = socket.inet_ntop(socket.AF_INET6, f.safe_read(16))
host = ipaddress.IPv6Address(f.safe_read(16)).compressed
use_ipv6 = True
elif atyp == ATYP.DOMAINNAME:
length, = struct.unpack("!B", f.safe_read(1))
host = f.safe_read(length)
if not utils.is_valid_host(host):
raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE, "Invalid hostname: %s" % host)
host = host.decode("idna")
use_ipv6 = False
else:
raise SocksError(REP.ADDRESS_TYPE_NOT_SUPPORTED,
@@ -158,12 +162,12 @@ class Message(object):
def to_file(self, f):
f.write(struct.pack("!BBBB", self.ver, self.msg, 0x00, self.atyp))
if self.atyp == ATYP.IPV4_ADDRESS:
f.write(socket.inet_aton(self.addr.host))
f.write(ipaddress.IPv4Address(self.addr.host).packed)
elif self.atyp == ATYP.IPV6_ADDRESS:
f.write(socket.inet_pton(socket.AF_INET6, self.addr.host))
f.write(ipaddress.IPv6Address(self.addr.host).packed)
elif self.atyp == ATYP.DOMAINNAME:
f.write(struct.pack("!B", len(self.addr.host)))
f.write(self.addr.host)
f.write(self.addr.host.encode("idna"))
else:
raise SocksError(
REP.ADDRESS_TYPE_NOT_SUPPORTED,

View File

@@ -141,6 +141,12 @@ _label_valid = re.compile(b"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
def is_valid_host(host):
"""
Checks if a hostname is valid.
Args:
host (bytes): The hostname
"""
try:
host.decode("idna")
except ValueError:

View File

@@ -100,10 +100,10 @@ class TestDummyCert:
r = certutils.dummy_cert(
ca.default_privatekey,
ca.default_ca,
"foo.com",
["one.com", "two.com", "*.three.com"]
b"foo.com",
[b"one.com", b"two.com", b"*.three.com"]
)
assert r.cn == "foo.com"
assert r.cn == b"foo.com"
class TestSSLCert:
@@ -112,13 +112,13 @@ class TestSSLCert:
with open(tutils.test_data.path("data/text_cert"), "rb") as f:
d = f.read()
c1 = certutils.SSLCert.from_pem(d)
assert c1.cn == "google.com"
assert c1.cn == b"google.com"
assert len(c1.altnames) == 436
with open(tutils.test_data.path("data/text_cert_2"), "rb") as f:
d = f.read()
c2 = certutils.SSLCert.from_pem(d)
assert c2.cn == "www.inode.co.nz"
assert c2.cn == b"www.inode.co.nz"
assert len(c2.altnames) == 2
assert c2.digest("sha1")
assert c2.notbefore

View File

@@ -1,6 +1,6 @@
import ipaddress
from io import BytesIO
import socket
from nose.plugins.skip import SkipTest
from netlib import socks, tcp, tutils
@@ -33,7 +33,7 @@ def test_client_greeting_assert_socks5():
else:
assert False
raw = tutils.treader(b"GET / HTTP/1.1" + " " * 100)
raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100)
msg = socks.ClientGreeting.from_file(raw)
try:
msg.assert_socks5()
@@ -64,7 +64,7 @@ def test_server_greeting():
def test_server_greeting_assert_socks5():
raw = tutils.treader(b"HTTP/1.1 200 OK" + " " * 100)
raw = tutils.treader(b"HTTP/1.1 200 OK" + b" " * 100)
msg = socks.ServerGreeting.from_file(raw)
try:
msg.assert_socks5()
@@ -74,7 +74,7 @@ def test_server_greeting_assert_socks5():
else:
assert False
raw = tutils.treader(b"GET / HTTP/1.1" + " " * 100)
raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100)
msg = socks.ServerGreeting.from_file(raw)
try:
msg.assert_socks5()
@@ -97,7 +97,7 @@ def test_message():
assert msg.ver == 5
assert msg.msg == 0x01
assert msg.atyp == 0x03
assert msg.addr == (b"example.com", 0xDEAD)
assert msg.addr == ("example.com", 0xDEAD)
def test_message_assert_socks5():
@@ -116,20 +116,16 @@ def test_message_ipv4():
msg.to_file(out)
assert out.getvalue() == raw.getvalue()[:-2]
assert msg.addr == (b"127.0.0.1", 0xDEAD)
assert msg.addr == ("127.0.0.1", 0xDEAD)
def test_message_ipv6():
if not hasattr(socket, "inet_ntop"):
raise SkipTest("Skipped because inet_ntop is not available")
# Test ATYP=0x04 (IPV6)
ipv6_addr = "2001:db8:85a3:8d3:1319:8a2e:370:7344"
raw = tutils.treader(
b"\x05\x01\x00\x04" +
socket.inet_pton(
socket.AF_INET6,
ipv6_addr) +
ipaddress.IPv6Address(ipv6_addr).packed +
b"\xDE\xAD\xBE\xEF")
out = BytesIO()
msg = socks.Message.from_file(raw)