mirror of
https://github.com/zhigang1992/cow.git
synced 2026-01-12 17:12:57 +08:00
New load balance strategy: latency.
Select proxy with lowest connection latency.
This commit is contained in:
@@ -24,6 +24,7 @@ type LoadBalanceMode byte
|
||||
const (
|
||||
loadBalanceBackup LoadBalanceMode = iota
|
||||
loadBalanceHash
|
||||
loadBalanceLatency
|
||||
)
|
||||
|
||||
// allow the same tunnel ports as polipo
|
||||
@@ -446,6 +447,8 @@ func (p configParser) ParseLoadBalance(val string) {
|
||||
config.LoadBalance = loadBalanceBackup
|
||||
case "hash":
|
||||
config.LoadBalance = loadBalanceHash
|
||||
case "latency":
|
||||
config.LoadBalance = loadBalanceLatency
|
||||
default:
|
||||
Fatalf("invalid loadBalance mode: %s\n", val)
|
||||
}
|
||||
|
||||
@@ -33,8 +33,9 @@ listen = http://127.0.0.1:7777
|
||||
|
||||
# 指定多个二级代理时使用的负载均衡策略,可选策略如下
|
||||
#
|
||||
# backup: 默认策略,优先使用第一个指定的二级代理,其他仅作备份使用
|
||||
# hash: 根据请求的 host name,优先使用 hash 到的某一个二级代理
|
||||
# backup: 默认策略,优先使用第一个指定的二级代理,其他仅作备份使用
|
||||
# hash: 根据请求的 host name,优先使用 hash 到的某一个二级代理
|
||||
# latency: 优先选择连接延迟最低的二级代理
|
||||
#
|
||||
# 一个二级代理连接失败后会依次尝试其他二级代理
|
||||
# 失败的二级代理会以一定的概率再次尝试使用,因此恢复后会重新启用
|
||||
|
||||
@@ -88,8 +88,8 @@ func estimateTimeout() {
|
||||
}
|
||||
return
|
||||
onErr:
|
||||
dialTimeout += 2
|
||||
readTimeout += 2
|
||||
dialTimeout += 2 * time.Second
|
||||
readTimeout += 2 * time.Second
|
||||
}
|
||||
|
||||
func runEstimateTimeout() {
|
||||
|
||||
193
parent_proxy.go
193
parent_proxy.go
@@ -9,12 +9,16 @@ import (
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Interface that all types of parent proxies should support.
|
||||
type ParentProxy interface {
|
||||
connect(*URL) (net.Conn, error)
|
||||
getServer() string // for use in updating server latency
|
||||
genConfig() string // for upgrading config
|
||||
}
|
||||
|
||||
@@ -52,6 +56,10 @@ func initParentPool() {
|
||||
case loadBalanceHash:
|
||||
debug.Println("hash parent pool", len(backPool.parent))
|
||||
parentProxy = &hashParentPool{*backPool}
|
||||
case loadBalanceLatency:
|
||||
debug.Println("latency parent pool", len(backPool.parent))
|
||||
go updateParentProxyLatency()
|
||||
parentProxy = newLatencyParentPool(backPool.parent)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,6 +157,157 @@ func connectInOrder(url *URL, pp []ParentWithFail, start int) (srvconn net.Conn,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type ParentWithLatency struct {
|
||||
ParentProxy
|
||||
latency time.Duration
|
||||
}
|
||||
|
||||
type latencyParentPool struct {
|
||||
parent []ParentWithLatency
|
||||
}
|
||||
|
||||
func newLatencyParentPool(parent []ParentWithFail) *latencyParentPool {
|
||||
lp := &latencyParentPool{}
|
||||
for _, p := range parent {
|
||||
lp.add(p.ParentProxy)
|
||||
}
|
||||
return lp
|
||||
}
|
||||
|
||||
func (pp *latencyParentPool) empty() bool {
|
||||
return len(pp.parent) == 0
|
||||
}
|
||||
|
||||
func (pp *latencyParentPool) add(parent ParentProxy) {
|
||||
pp.parent = append(pp.parent, ParentWithLatency{parent, 0})
|
||||
}
|
||||
|
||||
// Sort interface.
|
||||
func (pp *latencyParentPool) Len() int {
|
||||
return len(pp.parent)
|
||||
}
|
||||
|
||||
func (pp *latencyParentPool) Swap(i, j int) {
|
||||
p := pp.parent
|
||||
p[i], p[j] = p[j], p[i]
|
||||
}
|
||||
|
||||
func (pp *latencyParentPool) Less(i, j int) bool {
|
||||
p := pp.parent
|
||||
return p[i].latency < p[j].latency
|
||||
}
|
||||
|
||||
const latencyMax = time.Hour
|
||||
|
||||
var latencyMutex sync.RWMutex
|
||||
|
||||
func (pp *latencyParentPool) connect(url *URL) (srvconn net.Conn, err error) {
|
||||
var lp []ParentWithLatency
|
||||
// Read slice first.
|
||||
latencyMutex.RLock()
|
||||
lp = pp.parent
|
||||
latencyMutex.RUnlock()
|
||||
|
||||
var skipped []int
|
||||
nproxy := len(lp)
|
||||
if nproxy == 0 {
|
||||
return nil, errors.New("no parent proxy")
|
||||
}
|
||||
|
||||
for i := 0; i < nproxy; i++ {
|
||||
parent := lp[i]
|
||||
if parent.latency >= latencyMax {
|
||||
skipped = append(skipped, i)
|
||||
continue
|
||||
}
|
||||
if srvconn, err = parent.connect(url); err == nil {
|
||||
debug.Println("lowest latency proxy", parent.getServer())
|
||||
return
|
||||
}
|
||||
parent.latency = latencyMax
|
||||
}
|
||||
// last resort, try skipped one, not likely to succeed
|
||||
for _, skippedId := range skipped {
|
||||
if srvconn, err = lp[skippedId].connect(url); err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (parent *ParentWithLatency) updateLatency(wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
proxy := parent.ParentProxy
|
||||
server := proxy.getServer()
|
||||
|
||||
host, port, err := net.SplitHostPort(server)
|
||||
if err != nil {
|
||||
panic("split host port parent server error" + err.Error())
|
||||
}
|
||||
|
||||
// Resolve host name first, so latency does not include resolve time.
|
||||
ip, err := net.LookupHost(host)
|
||||
if err != nil {
|
||||
parent.latency = latencyMax
|
||||
return
|
||||
}
|
||||
ipPort := net.JoinHostPort(ip[0], port)
|
||||
|
||||
const N = 3
|
||||
var total time.Duration
|
||||
for i := 0; i < N; i++ {
|
||||
now := time.Now()
|
||||
cn, err := net.DialTimeout("tcp", ipPort, dialTimeout)
|
||||
if err != nil {
|
||||
debug.Println("latency update dial:", err)
|
||||
total += time.Minute // 1 minute as penalty
|
||||
continue
|
||||
}
|
||||
total += time.Now().Sub(now)
|
||||
cn.Close()
|
||||
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
parent.latency = total / N
|
||||
debug.Println("latency", server, parent.latency)
|
||||
}
|
||||
|
||||
func (pp *latencyParentPool) updateLatency() {
|
||||
// Create a copy, update latency for the copy.
|
||||
var cp latencyParentPool
|
||||
cp.parent = append(cp.parent, pp.parent...)
|
||||
|
||||
// cp.parent is value instead of pointer, if we use `_, p := range cp.parent`,
|
||||
// the value in cp.parent will not be updated.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(cp.parent))
|
||||
for i, _ := range cp.parent {
|
||||
cp.parent[i].updateLatency(&wg)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Sort according to latency.
|
||||
sort.Stable(&cp)
|
||||
debug.Println("lantency lowest proxy", cp.parent[0].getServer())
|
||||
|
||||
// Update parent slice.
|
||||
latencyMutex.Lock()
|
||||
pp.parent = cp.parent
|
||||
latencyMutex.Unlock()
|
||||
}
|
||||
|
||||
func updateParentProxyLatency() {
|
||||
lp, ok := parentProxy.(*latencyParentPool)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
lp.updateLatency()
|
||||
time.Sleep(60 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// http parent proxy
|
||||
type httpParent struct {
|
||||
server string
|
||||
@@ -169,6 +328,10 @@ func newHttpParent(server string) *httpParent {
|
||||
return &httpParent{server: server}
|
||||
}
|
||||
|
||||
func (hp *httpParent) getServer() string {
|
||||
return hp.server
|
||||
}
|
||||
|
||||
func (hp *httpParent) genConfig() string {
|
||||
if hp.userPasswd != "" {
|
||||
return fmt.Sprintf("proxy = http://%s@%s", hp.userPasswd, hp.server)
|
||||
@@ -223,12 +386,16 @@ func newShadowsocksParent(server string) *shadowsocksParent {
|
||||
return &shadowsocksParent{server: server}
|
||||
}
|
||||
|
||||
func (sp *shadowsocksParent) getServer() string {
|
||||
return sp.server
|
||||
}
|
||||
|
||||
func (sp *shadowsocksParent) genConfig() string {
|
||||
if sp.method == "" {
|
||||
return fmt.Sprintf("proxy = ss://table:%s@%s", sp.passwd, sp.server)
|
||||
} else {
|
||||
return fmt.Sprintf("proxy = ss://%s:%s@%s", sp.method, sp.passwd, sp.server)
|
||||
method := sp.method
|
||||
if method == "" {
|
||||
method = "table"
|
||||
}
|
||||
return fmt.Sprintf("proxy = ss://%s:%s@%s", method, sp.passwd, sp.server)
|
||||
}
|
||||
|
||||
func (sp *shadowsocksParent) initCipher(method, passwd string) {
|
||||
@@ -255,6 +422,8 @@ func (sp *shadowsocksParent) connect(url *URL) (net.Conn, error) {
|
||||
// cow parent proxy
|
||||
type cowParent struct {
|
||||
server string
|
||||
method string
|
||||
passwd string
|
||||
cipher *ss.Cipher
|
||||
}
|
||||
|
||||
@@ -272,11 +441,19 @@ func newCowParent(srv, method, passwd string) *cowParent {
|
||||
if err != nil {
|
||||
Fatal("create cow cipher:", err)
|
||||
}
|
||||
return &cowParent{srv, cipher}
|
||||
return &cowParent{srv, method, passwd, cipher}
|
||||
}
|
||||
|
||||
func (cp *cowParent) getServer() string {
|
||||
return cp.server
|
||||
}
|
||||
|
||||
func (cp *cowParent) genConfig() string {
|
||||
return "" // no upgrading need
|
||||
method := cp.method
|
||||
if method == "" {
|
||||
method = "table"
|
||||
}
|
||||
return fmt.Sprintf("proxy = cow://%s:%s@%s", method, cp.passwd, cp.server)
|
||||
}
|
||||
|
||||
func (cp *cowParent) connect(url *URL) (net.Conn, error) {
|
||||
@@ -332,6 +509,10 @@ func newSocksParent(server string) *socksParent {
|
||||
return &socksParent{server}
|
||||
}
|
||||
|
||||
func (sp *socksParent) getServer() string {
|
||||
return sp.server
|
||||
}
|
||||
|
||||
func (sp *socksParent) genConfig() string {
|
||||
return fmt.Sprintf("proxy = socks5://%s", sp.server)
|
||||
}
|
||||
|
||||
20
proxy.go
20
proxy.go
@@ -192,11 +192,11 @@ func newCowProxy(method, passwd, addr string) *cowProxy {
|
||||
}
|
||||
|
||||
func (cp *cowProxy) genConfig() string {
|
||||
if cp.method == "" {
|
||||
return fmt.Sprintf("listen = cow://table:%s@%s", cp.passwd, cp.addr)
|
||||
} else {
|
||||
return fmt.Sprintf("listen = cow://%s:%s@%s", cp.method, cp.passwd, cp.addr)
|
||||
method := cp.method
|
||||
if method == "" {
|
||||
method = "table"
|
||||
}
|
||||
return fmt.Sprintf("listen = cow://%s:%s@%s", method, cp.passwd, cp.addr)
|
||||
}
|
||||
|
||||
func (cp *cowProxy) Addr() string {
|
||||
@@ -1333,12 +1333,14 @@ func sendBodyChunked(w io.Writer, r *bufio.Reader, rdSize int) (err error) {
|
||||
errl.Println("chunk size invalid:", err)
|
||||
return
|
||||
}
|
||||
if debug {
|
||||
// To debug getting malformed response status line with "0\r\n".
|
||||
if c, ok := w.(*clientConn); ok {
|
||||
debug.Printf("cli(%s) chunk size %d %#v\n", c.RemoteAddr(), size, string(s))
|
||||
/*
|
||||
if debug {
|
||||
// To debug getting malformed response status line with "0\r\n".
|
||||
if c, ok := w.(*clientConn); ok {
|
||||
debug.Printf("cli(%s) chunk size %d %#v\n", c.RemoteAddr(), size, string(s))
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
if size == 0 {
|
||||
r.Skip(len(s))
|
||||
if err = skipCRLF(r); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user