From edea3583d44ee6bc4633c6b309852dea9f8f77f5 Mon Sep 17 00:00:00 2001 From: Chen Yufei Date: Wed, 9 Jul 2014 03:49:59 +0800 Subject: [PATCH] New load balance strategy: latency. Select proxy with lowest connection latency. --- config.go | 3 + doc/sample-config/rc | 5 +- estimate_timeout.go | 4 +- parent_proxy.go | 193 +++++++++++++++++++++++++++++++++++++++++-- proxy.go | 20 +++-- 5 files changed, 206 insertions(+), 19 deletions(-) diff --git a/config.go b/config.go index 47242cc..f964278 100644 --- a/config.go +++ b/config.go @@ -24,6 +24,7 @@ type LoadBalanceMode byte const ( loadBalanceBackup LoadBalanceMode = iota loadBalanceHash + loadBalanceLatency ) // allow the same tunnel ports as polipo @@ -446,6 +447,8 @@ func (p configParser) ParseLoadBalance(val string) { config.LoadBalance = loadBalanceBackup case "hash": config.LoadBalance = loadBalanceHash + case "latency": + config.LoadBalance = loadBalanceLatency default: Fatalf("invalid loadBalance mode: %s\n", val) } diff --git a/doc/sample-config/rc b/doc/sample-config/rc index f00578c..4fcbc3c 100644 --- a/doc/sample-config/rc +++ b/doc/sample-config/rc @@ -33,8 +33,9 @@ listen = http://127.0.0.1:7777 # 指定多个二级代理时使用的负载均衡策略,可选策略如下 # -# backup: 默认策略,优先使用第一个指定的二级代理,其他仅作备份使用 -# hash: 根据请求的 host name,优先使用 hash 到的某一个二级代理 +# backup: 默认策略,优先使用第一个指定的二级代理,其他仅作备份使用 +# hash: 根据请求的 host name,优先使用 hash 到的某一个二级代理 +# latency: 优先选择连接延迟最低的二级代理 # # 一个二级代理连接失败后会依次尝试其他二级代理 # 失败的二级代理会以一定的概率再次尝试使用,因此恢复后会重新启用 diff --git a/estimate_timeout.go b/estimate_timeout.go index 82340a4..6a975c6 100644 --- a/estimate_timeout.go +++ b/estimate_timeout.go @@ -88,8 +88,8 @@ func estimateTimeout() { } return onErr: - dialTimeout += 2 - readTimeout += 2 + dialTimeout += 2 * time.Second + readTimeout += 2 * time.Second } func runEstimateTimeout() { diff --git a/parent_proxy.go b/parent_proxy.go index 5d2c09b..f78b0b2 100644 --- a/parent_proxy.go +++ b/parent_proxy.go @@ -9,12 +9,16 @@ import ( "io" "math/rand" "net" + "sort" "strconv" + "sync" + "time" ) // Interface that all types of parent proxies should support. type ParentProxy interface { connect(*URL) (net.Conn, error) + getServer() string // for use in updating server latency genConfig() string // for upgrading config } @@ -52,6 +56,10 @@ func initParentPool() { case loadBalanceHash: debug.Println("hash parent pool", len(backPool.parent)) parentProxy = &hashParentPool{*backPool} + case loadBalanceLatency: + debug.Println("latency parent pool", len(backPool.parent)) + go updateParentProxyLatency() + parentProxy = newLatencyParentPool(backPool.parent) } } @@ -149,6 +157,157 @@ func connectInOrder(url *URL, pp []ParentWithFail, start int) (srvconn net.Conn, return nil, err } +type ParentWithLatency struct { + ParentProxy + latency time.Duration +} + +type latencyParentPool struct { + parent []ParentWithLatency +} + +func newLatencyParentPool(parent []ParentWithFail) *latencyParentPool { + lp := &latencyParentPool{} + for _, p := range parent { + lp.add(p.ParentProxy) + } + return lp +} + +func (pp *latencyParentPool) empty() bool { + return len(pp.parent) == 0 +} + +func (pp *latencyParentPool) add(parent ParentProxy) { + pp.parent = append(pp.parent, ParentWithLatency{parent, 0}) +} + +// Sort interface. +func (pp *latencyParentPool) Len() int { + return len(pp.parent) +} + +func (pp *latencyParentPool) Swap(i, j int) { + p := pp.parent + p[i], p[j] = p[j], p[i] +} + +func (pp *latencyParentPool) Less(i, j int) bool { + p := pp.parent + return p[i].latency < p[j].latency +} + +const latencyMax = time.Hour + +var latencyMutex sync.RWMutex + +func (pp *latencyParentPool) connect(url *URL) (srvconn net.Conn, err error) { + var lp []ParentWithLatency + // Read slice first. + latencyMutex.RLock() + lp = pp.parent + latencyMutex.RUnlock() + + var skipped []int + nproxy := len(lp) + if nproxy == 0 { + return nil, errors.New("no parent proxy") + } + + for i := 0; i < nproxy; i++ { + parent := lp[i] + if parent.latency >= latencyMax { + skipped = append(skipped, i) + continue + } + if srvconn, err = parent.connect(url); err == nil { + debug.Println("lowest latency proxy", parent.getServer()) + return + } + parent.latency = latencyMax + } + // last resort, try skipped one, not likely to succeed + for _, skippedId := range skipped { + if srvconn, err = lp[skippedId].connect(url); err == nil { + return + } + } + return nil, err +} + +func (parent *ParentWithLatency) updateLatency(wg *sync.WaitGroup) { + defer wg.Done() + proxy := parent.ParentProxy + server := proxy.getServer() + + host, port, err := net.SplitHostPort(server) + if err != nil { + panic("split host port parent server error" + err.Error()) + } + + // Resolve host name first, so latency does not include resolve time. + ip, err := net.LookupHost(host) + if err != nil { + parent.latency = latencyMax + return + } + ipPort := net.JoinHostPort(ip[0], port) + + const N = 3 + var total time.Duration + for i := 0; i < N; i++ { + now := time.Now() + cn, err := net.DialTimeout("tcp", ipPort, dialTimeout) + if err != nil { + debug.Println("latency update dial:", err) + total += time.Minute // 1 minute as penalty + continue + } + total += time.Now().Sub(now) + cn.Close() + + time.Sleep(5 * time.Millisecond) + } + parent.latency = total / N + debug.Println("latency", server, parent.latency) +} + +func (pp *latencyParentPool) updateLatency() { + // Create a copy, update latency for the copy. + var cp latencyParentPool + cp.parent = append(cp.parent, pp.parent...) + + // cp.parent is value instead of pointer, if we use `_, p := range cp.parent`, + // the value in cp.parent will not be updated. + var wg sync.WaitGroup + wg.Add(len(cp.parent)) + for i, _ := range cp.parent { + cp.parent[i].updateLatency(&wg) + } + wg.Wait() + + // Sort according to latency. + sort.Stable(&cp) + debug.Println("lantency lowest proxy", cp.parent[0].getServer()) + + // Update parent slice. + latencyMutex.Lock() + pp.parent = cp.parent + latencyMutex.Unlock() +} + +func updateParentProxyLatency() { + lp, ok := parentProxy.(*latencyParentPool) + if !ok { + return + } + + for { + lp.updateLatency() + time.Sleep(60 * time.Second) + } +} + // http parent proxy type httpParent struct { server string @@ -169,6 +328,10 @@ func newHttpParent(server string) *httpParent { return &httpParent{server: server} } +func (hp *httpParent) getServer() string { + return hp.server +} + func (hp *httpParent) genConfig() string { if hp.userPasswd != "" { return fmt.Sprintf("proxy = http://%s@%s", hp.userPasswd, hp.server) @@ -223,12 +386,16 @@ func newShadowsocksParent(server string) *shadowsocksParent { return &shadowsocksParent{server: server} } +func (sp *shadowsocksParent) getServer() string { + return sp.server +} + func (sp *shadowsocksParent) genConfig() string { - if sp.method == "" { - return fmt.Sprintf("proxy = ss://table:%s@%s", sp.passwd, sp.server) - } else { - return fmt.Sprintf("proxy = ss://%s:%s@%s", sp.method, sp.passwd, sp.server) + method := sp.method + if method == "" { + method = "table" } + return fmt.Sprintf("proxy = ss://%s:%s@%s", method, sp.passwd, sp.server) } func (sp *shadowsocksParent) initCipher(method, passwd string) { @@ -255,6 +422,8 @@ func (sp *shadowsocksParent) connect(url *URL) (net.Conn, error) { // cow parent proxy type cowParent struct { server string + method string + passwd string cipher *ss.Cipher } @@ -272,11 +441,19 @@ func newCowParent(srv, method, passwd string) *cowParent { if err != nil { Fatal("create cow cipher:", err) } - return &cowParent{srv, cipher} + return &cowParent{srv, method, passwd, cipher} +} + +func (cp *cowParent) getServer() string { + return cp.server } func (cp *cowParent) genConfig() string { - return "" // no upgrading need + method := cp.method + if method == "" { + method = "table" + } + return fmt.Sprintf("proxy = cow://%s:%s@%s", method, cp.passwd, cp.server) } func (cp *cowParent) connect(url *URL) (net.Conn, error) { @@ -332,6 +509,10 @@ func newSocksParent(server string) *socksParent { return &socksParent{server} } +func (sp *socksParent) getServer() string { + return sp.server +} + func (sp *socksParent) genConfig() string { return fmt.Sprintf("proxy = socks5://%s", sp.server) } diff --git a/proxy.go b/proxy.go index d766585..b1dde91 100644 --- a/proxy.go +++ b/proxy.go @@ -192,11 +192,11 @@ func newCowProxy(method, passwd, addr string) *cowProxy { } func (cp *cowProxy) genConfig() string { - if cp.method == "" { - return fmt.Sprintf("listen = cow://table:%s@%s", cp.passwd, cp.addr) - } else { - return fmt.Sprintf("listen = cow://%s:%s@%s", cp.method, cp.passwd, cp.addr) + method := cp.method + if method == "" { + method = "table" } + return fmt.Sprintf("listen = cow://%s:%s@%s", method, cp.passwd, cp.addr) } func (cp *cowProxy) Addr() string { @@ -1333,12 +1333,14 @@ func sendBodyChunked(w io.Writer, r *bufio.Reader, rdSize int) (err error) { errl.Println("chunk size invalid:", err) return } - if debug { - // To debug getting malformed response status line with "0\r\n". - if c, ok := w.(*clientConn); ok { - debug.Printf("cli(%s) chunk size %d %#v\n", c.RemoteAddr(), size, string(s)) + /* + if debug { + // To debug getting malformed response status line with "0\r\n". + if c, ok := w.(*clientConn); ok { + debug.Printf("cli(%s) chunk size %d %#v\n", c.RemoteAddr(), size, string(s)) + } } - } + */ if size == 0 { r.Skip(len(s)) if err = skipCRLF(r); err != nil {