package main import ( "bytes" "errors" "fmt" "io" "net" "strings" "sync" "time" "github.com/cyfdecyf/bufio" "github.com/cyfdecyf/leakybuf" ss "github.com/shadowsocks/shadowsocks-go/shadowsocks" ) // As I'm using ReadSlice to read line, it's possible to get // bufio.ErrBufferFull while reading line, so set it to a large value to // prevent such problems. // // For limits about URL and HTTP header size, refer to: // http://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url // "de facto limit of 2000 characters" // http://www.mnot.net/blog/2011/07/11/what_proxies_must_do // "URIs should be allowed at least 8000 octets, and HTTP headers should have // 4000 as an absolute minimum". // In practice, there are sites using cookies larger than 4096 bytes, // e.g. www.fitbit.com. So set http buffer size to 8192 to be safe. const httpBufSize = 8192 // Hold at most 4MB memory as buffer for parsing http request/response and // holding post data. var httpBuf = leakybuf.NewLeakyBuf(512, httpBufSize) // If no keep-alive header in response, use this as the keep-alive value. const defaultServerConnTimeout = 15 * time.Second // Close client connection if no new requests received in some time. // (On OS X, the default soft limit of open file descriptor is 256, which is // very conservative and easy to cause problem if we are not careful to limit // open fds.) const clientConnTimeout = 15 * time.Second const fullKeepAliveHeader = "Keep-Alive: timeout=15\r\n" // If client closed connection for HTTP CONNECT method in less then 1 second, // consider it as an ssl error. This is only effective for Chrome which will // drop connection immediately upon SSL error. const sslLeastDuration = time.Second // Some code are learnt from the http package // encapulate actual error for an retry error type RetryError struct { error } func isErrRetry(err error) bool { if err == nil { return false } _, ok := err.(RetryError) return ok } var zeroTime time.Time type directConn struct { net.Conn } func (dc directConn) String() string { return "direct connection" } type serverConnState byte const ( svConnected serverConnState = iota svSendRecvResponse svStopped ) type serverConn struct { net.Conn bufRd *bufio.Reader buf []byte // buffer for the buffered reader hostPort string state serverConnState willCloseOn time.Time siteInfo *VisitCnt visited bool } type clientConn struct { net.Conn // connection to the proxy client bufRd *bufio.Reader buf []byte // buffer for the buffered reader proxy Proxy } var ( errPageSent = errors.New("error page has sent") errClientTimeout = errors.New("read client request timeout") errAuthRequired = errors.New("authentication requried") ) type Proxy interface { Serve(*sync.WaitGroup, <-chan struct{}) Addr() string genConfig() string // for upgrading config } var listenProxy []Proxy func addListenProxy(p Proxy) { listenProxy = append(listenProxy, p) } type httpProxy struct { addr string // listen address, contains port port string // for use when generating PAC addrInPAC string // proxy server address to use in PAC } func newHttpProxy(addr, addrInPAC string) *httpProxy { _, port, err := net.SplitHostPort(addr) if err != nil { panic("proxy addr" + err.Error()) } return &httpProxy{addr, port, addrInPAC} } func (proxy *httpProxy) genConfig() string { if proxy.addrInPAC != "" { return fmt.Sprintf("listen = http://%s %s", proxy.addr, proxy.addrInPAC) } else { return fmt.Sprintf("listen = http://%s", proxy.addr) } } func (proxy *httpProxy) Addr() string { return proxy.addr } func (hp *httpProxy) Serve(wg *sync.WaitGroup, quit <-chan struct{}) { defer func() { wg.Done() }() ln, err := net.Listen("tcp", hp.addr) if err != nil { fmt.Println("listen http failed:", err) return } var exit bool go func() { <-quit exit = true ln.Close() }() host, _, _ := net.SplitHostPort(hp.addr) var pacURL string if host == "" || host == "0.0.0.0" { pacURL = fmt.Sprintf("http://:%s/pac", hp.port) } else if hp.addrInPAC == "" { pacURL = fmt.Sprintf("http://%s/pac", hp.addr) } else { pacURL = fmt.Sprintf("http://%s/pac", hp.addrInPAC) } info.Printf("COW %s listen http %s, PAC url %s\n", version, hp.addr, pacURL) for { conn, err := ln.Accept() if err != nil && !exit { errl.Printf("http proxy(%s) accept %v\n", ln.Addr(), err) if isErrTooManyOpenFd(err) { connPool.CloseAll() } time.Sleep(time.Millisecond) continue } if exit { debug.Println("exiting the http listner") break } c := newClientConn(conn, hp) go c.serve() } } type cowProxy struct { addr string method string passwd string cipher *ss.Cipher } func newCowProxy(method, passwd, addr string) *cowProxy { cipher, err := ss.NewCipher(method, passwd) if err != nil { Fatal("can't initialize cow proxy server", err) } return &cowProxy{addr, method, passwd, cipher} } func (cp *cowProxy) genConfig() string { method := cp.method if method == "" { method = "table" } return fmt.Sprintf("listen = cow://%s:%s@%s", method, cp.passwd, cp.addr) } func (cp *cowProxy) Addr() string { return cp.addr } func (cp *cowProxy) Serve(wg *sync.WaitGroup, quit <-chan struct{}) { defer func() { wg.Done() }() ln, err := net.Listen("tcp", cp.addr) if err != nil { fmt.Println("listen cow failed:", err) return } info.Printf("COW %s cow proxy address %s\n", version, cp.addr) var exit bool go func() { <-quit exit = true ln.Close() }() for { conn, err := ln.Accept() if err != nil && !exit { errl.Printf("cow proxy(%s) accept %v\n", ln.Addr(), err) if isErrTooManyOpenFd(err) { connPool.CloseAll() } time.Sleep(time.Millisecond) continue } if exit { debug.Println("exiting cow listner") break } ssConn := ss.NewConn(conn, cp.cipher.Copy()) c := newClientConn(ssConn, cp) go c.serve() } } func newClientConn(cli net.Conn, proxy Proxy) *clientConn { buf := httpBuf.Get() c := &clientConn{ Conn: cli, buf: buf, bufRd: bufio.NewReaderFromBuf(cli, buf), proxy: proxy, } if debug { debug.Printf("cli(%s) connected, total %d clients\n", cli.RemoteAddr(), incCliCnt()) } return c } func (c *clientConn) releaseBuf() { if c.bufRd != nil { // debug.Println("release client buffer") httpBuf.Put(c.buf) c.buf = nil c.bufRd = nil } } func (c *clientConn) Close() { c.releaseBuf() if debug { debug.Printf("cli(%s) closed, total %d clients\n", c.RemoteAddr(), decCliCnt()) } c.Conn.Close() } func (c *clientConn) setReadTimeout(msg string) { // Always keep connections alive for cow conn from client for more reuse. // For other client connections, set read timeout so we can close the // connection after a period of idle to reduce number of open connections. if _, ok := c.Conn.(*ss.Conn); !ok { // make actual timeout a little longer than keep-alive value sent to client setConnReadTimeout(c.Conn, clientConnTimeout+2*time.Second, msg) } } func (c *clientConn) unsetReadTimeout(msg string) { if _, ok := c.Conn.(*ss.Conn); !ok { unsetConnReadTimeout(c.Conn, msg) } } // Listen address as key, not including port part. var selfListenAddr map[string]bool // Called in main, so no need to protect concurrent initialization. func initSelfListenAddr() { selfListenAddr = make(map[string]bool) // Add empty host to self listen addr, in case there's no Host header. selfListenAddr[""] = true for _, proxy := range listenProxy { addr := proxy.Addr() // Handle wildcard address. if addr[0] == ':' || strings.HasPrefix(addr, "0.0.0.0") { for _, ad := range hostAddr() { selfListenAddr[ad] = true } selfListenAddr["localhost"] = true continue } host, _, err := net.SplitHostPort(addr) if err != nil { panic("listen addr invalid: " + addr) } selfListenAddr[host] = true if host == "127.0.0.1" { selfListenAddr["localhost"] = true } else if host == "localhost" { selfListenAddr["127.0.0.1"] = true } } } func isSelfRequest(r *Request) bool { if r.URL.HostPort != "" { return false } // Maxthon sometimes sends requests without host in request line, // in that case, get host information from Host header. // But if client PAC setting is using cow server's DNS name, we can't // decide if the request is for cow itself (need reverse lookup). // So if request path seems like getting PAC, simply return true. if r.URL.Path == "/pac" || strings.HasPrefix(r.URL.Path, "/pac?") { return true } r.URL.ParseHostPort(r.Header.Host) if selfListenAddr[r.URL.Host] { return true } debug.Printf("fixed request with no host in request line %s\n", r) return false } func (c *clientConn) serveSelfURL(r *Request) (err error) { if _, ok := c.proxy.(*httpProxy); !ok { goto end } if r.Method != "GET" { goto end } if r.URL.Path == "/pac" || strings.HasPrefix(r.URL.Path, "/pac?") { sendPAC(c) // PAC header contains connection close, send non nil error to close // client connection. return errPageSent } end: sendErrorPage(c, "404 not found", "Page not found", genErrMsg(r, nil, "Serving request to COW proxy.")) errl.Printf("cli(%s) page not found, serving request to cow %s\n%s", c.RemoteAddr(), r, r.Verbose()) return errPageSent } func (c *clientConn) shouldRetry(r *Request, sv *serverConn, re error) bool { if !isErrRetry(re) { return false } err, _ := re.(RetryError) if !r.responseNotSent() { if debug { debug.Printf("cli(%s) has sent some response, can't retry %v\n", c.RemoteAddr(), r) } return false } if r.partial { if debug { debug.Printf("cli(%s) partial request, can't retry %v\n", c.RemoteAddr(), r) } sendErrorPage(c, "502 partial request", err.Error(), genErrMsg(r, sv, "Request is too large to hold in buffer, can't retry. "+ "Refresh to retry may work.")) return false } else if r.raw == nil { msg := "Please report issue to the developer: Non partial request with buffer released" errl.Println(msg, r) panic(msg) } if r.tooManyRetry() { if sv.maybeFake() { // Sometimes GFW reset will got EOF error leading to retry too many times. // In that case, consider the url as temp blocked and try parent proxy. siteStat.TempBlocked(r.URL) r.tryCnt = 0 return true } debug.Printf("cli(%s) can't retry %v tryCnt=%d\n", c.RemoteAddr(), r, r.tryCnt) sendErrorPage(c, "502 retry failed", "Can't finish HTTP request", genErrMsg(r, sv, "Has tried several times.")) return false } return true } func dbgPrintRq(c *clientConn, r *Request) { if r.Trailer { errl.Printf("cli(%s) request %s has Trailer header\n%s", c.RemoteAddr(), r, r.Verbose()) } if dbgRq { if verbose { dbgRq.Printf("cli(%s) request %s\n%s", c.RemoteAddr(), r, r.Verbose()) } else { dbgRq.Printf("cli(%s) request %s\n", c.RemoteAddr(), r) } } } type SinkWriter struct{} func (s SinkWriter) Write(p []byte) (int, error) { return len(p), nil } func (c *clientConn) serve() { var r Request var rp Response var sv *serverConn var err error var authed bool // For cow proxy server, authentication is done by matching password. if _, ok := c.proxy.(*cowProxy); ok { authed = true } defer func() { r.releaseBuf() c.Close() }() // Refer to implementation.md for the design choices on parsing the request // and response. for { if c.bufRd == nil || c.buf == nil { panic("client read buffer nil") } if err = parseRequest(c, &r); err != nil { debug.Printf("cli(%s) parse request %v\n", c.RemoteAddr(), err) if err == io.EOF || isErrConnReset(err) { return } if err != errClientTimeout { sendErrorPage(c, "404 Bad request", "Bad request", err.Error()) return } sendErrorPage(c, statusRequestTimeout, statusRequestTimeout, "Your browser didn't send a complete request in time.") return } dbgPrintRq(c, &r) // PAC may leak frequently visited sites information. But if cow // requires authentication for PAC, some clients may not be able // handle it. (e.g. Proxy SwitchySharp extension on Chrome.) if isSelfRequest(&r) { if err = c.serveSelfURL(&r); err != nil { return } continue } if auth.required && !authed { if err = Authenticate(c, &r); err != nil { errl.Printf("cli(%s) %v\n", c.RemoteAddr(), err) // Request may have body. To make things simple, close // connection so we don't need to skip request body before // reading the next request. return } authed = true } if r.isConnect && !config.TunnelAllowedPort[r.URL.Port] { sendErrorPage(c, statusForbidden, "Forbidden tunnel port", genErrMsg(&r, nil, "Please contact proxy admin.")) return } if r.ExpectContinue { sendErrorPage(c, statusExpectFailed, "Expect header not supported", "Please contact COW's developer if you see this.") // Client may have sent request body at this point. Simply close // connection so we don't need to handle this case. // NOTE: sendErrorPage tells client the connection will keep alive, but // actually it will close here. return } retry: r.tryOnce() if bool(debug) && r.isRetry() { debug.Printf("cli(%s) retry request tryCnt=%d %v\n", c.RemoteAddr(), r.tryCnt, &r) } if sv, err = c.getServerConn(&r); err != nil { if debug { debug.Printf("cli(%s) failed to get server conn %v\n", c.RemoteAddr(), &r) } // Failed connection will send error page back to the client. // For CONNECT, the client read buffer is released in copyClient2Server, // so can't go back to getRequest. if err == errPageSent && !r.isConnect { if r.hasBody() { // skip request body debug.Printf("cli(%s) skip request body %v\n", c.RemoteAddr(), &r) sendBody(SinkWriter{}, c.bufRd, int(r.ContLen), r.Chunking) } continue } return } if r.isConnect { // server connection will be closed in doConnect err = sv.doConnect(&r, c) if c.shouldRetry(&r, sv, err) { goto retry } // debug.Printf("doConnect %s to %s done\n", c.RemoteAddr(), r.URL.HostPort) return } if err = sv.doRequest(c, &r, &rp); err != nil { // For client I/O error, we can actually put server connection to // pool. But let's make thing simple for now. sv.Close() if c.shouldRetry(&r, sv, err) { goto retry } else if err == errPageSent && (!r.hasBody() || r.hasSent()) { // Can only continue if request has no body, or request body // has been read. continue } return } // Put server connection to pool, so other clients can use it. _, isCowConn := sv.Conn.(cowConn) if rp.ConnectionKeepAlive || isCowConn { if debug { debug.Printf("cli(%s) connPool put %s", c.RemoteAddr(), sv.hostPort) } // If the server connection is not going to be used soon, // release buffer before putting back to pool can save memory. sv.releaseBuf() connPool.Put(sv) } else { if debug { debug.Printf("cli(%s) server %s close conn\n", c.RemoteAddr(), sv.hostPort) } sv.Close() } if !r.ConnectionKeepAlive { if debug { debug.Printf("cli(%s) close connection\n", c.RemoteAddr()) } return } } } func genErrMsg(r *Request, sv *serverConn, what string) string { if sv == nil { return fmt.Sprintf("

HTTP Request %v

%s

", r, what) } return fmt.Sprintf("

HTTP Request %v

%s

Using %s.

", r, what, sv.Conn) } func (c *clientConn) handleBlockedRequest(r *Request, err error) error { siteStat.TempBlocked(r.URL) return RetryError{err} } func (c *clientConn) handleServerReadError(r *Request, sv *serverConn, err error, msg string) error { if debug { debug.Printf("cli(%s) server read error %s %T %v %v\n", c.RemoteAddr(), msg, err, err, r) } if err == io.EOF { return RetryError{err} } if sv.maybeFake() && maybeBlocked(err) { return c.handleBlockedRequest(r, err) } if r.responseNotSent() { sendErrorPage(c, "502 read error", err.Error(), genErrMsg(r, sv, msg)) return errPageSent } errl.Printf("cli(%s) unhandled server read error %s %v %s\n", c.RemoteAddr(), msg, err, r) return err } func (c *clientConn) handleServerWriteError(r *Request, sv *serverConn, err error, msg string) error { // This function is only called in doRequest, no response is sent to client. // So if visiting blocked site, can always retry request. if sv.maybeFake() && isErrConnReset(err) { siteStat.TempBlocked(r.URL) } return RetryError{err} } func dbgPrintRep(c *clientConn, r *Request, rp *Response) { if rp.Trailer { errl.Printf("cli(%s) response %s has Trailer header\n%s", c.RemoteAddr(), rp, rp.Verbose()) } if dbgRep { if verbose { dbgRep.Printf("cli(%s) response %s %s\n%s", c.RemoteAddr(), r, rp, rp.Verbose()) } else { dbgRep.Printf("cli(%s) response %s %s\n", c.RemoteAddr(), r, rp) } } } func (c *clientConn) readResponse(sv *serverConn, r *Request, rp *Response) (err error) { sv.initBuf() defer func() { rp.releaseBuf() }() /* if r.partial { return RetryError{errors.New("debug retry for partial request")} } */ /* // force retry for debugging if r.tryCnt == 1 { return RetryError{errors.New("debug retry in readResponse")} } */ if err = parseResponse(sv, r, rp); err != nil { return c.handleServerReadError(r, sv, err, "parse response") } dbgPrintRep(c, r, rp) // After have received the first reponses from the server, we consider // ther server as real instead of fake one caused by wrong DNS reply. So // don't time out later. sv.state = svSendRecvResponse r.state = rsRecvBody r.releaseBuf() if _, err = c.Write(rp.rawResponse()); err != nil { return err } rp.releaseBuf() if rp.hasBody(r.Method) { if err = sendBody(c, sv.bufRd, int(rp.ContLen), rp.Chunking); err != nil { if debug { debug.Printf("cli(%s) send body %v\n", c.RemoteAddr(), err) } // Non persistent connection will return nil upon successful response reading if err == io.EOF { // For persistent connection, EOF from server is error. // Response header has been read, server using persistent // connection indicates the end of response and proxy should // not got EOF while reading response. // The client connection will be closed to indicate this error. // Proxy can't send error page here because response header has // been sent. return fmt.Errorf("read response body unexpected EOF %v", rp) } else if isErrOpRead(err) { return c.handleServerReadError(r, sv, err, "read response body") } // errl.Printf("cli(%s) sendBody error %T %v %v", err, err, r) return err } } r.state = rsDone /* if debug { debug.Printf("[Finished] %v request %s %s\n", c.RemoteAddr(), r.Method, r.URL) } */ if rp.ConnectionKeepAlive { if rp.KeepAlive == time.Duration(0) { sv.willCloseOn = time.Now().Add(defaultServerConnTimeout) } else { // debug.Printf("cli(%s) server %s keep-alive %v\n", c.RemoteAddr(), sv.hostPort, rp.KeepAlive) sv.willCloseOn = time.Now().Add(rp.KeepAlive) } } return } func (c *clientConn) getServerConn(r *Request) (*serverConn, error) { siteInfo := siteStat.GetVisitCnt(r.URL) // For CONNECT method, always create new connection. if r.isConnect { return c.createServerConn(r, siteInfo) } sv := connPool.Get(r.URL.HostPort, siteInfo.AsDirect()) if sv != nil { // For websites like feedly, the site itself is not blocked, but the // content it loads may result reset. So we should reset server // connection state to just connected. sv.state = svConnected if debug { debug.Printf("cli(%s) connPool get %s\n", c.RemoteAddr(), r.URL.HostPort) } return sv, nil } if debug { debug.Printf("cli(%s) connPool no conn %s", c.RemoteAddr(), r.URL.HostPort) } return c.createServerConn(r, siteInfo) } func connectDirect2(url *URL, siteInfo *VisitCnt, recursive bool) (net.Conn, error) { var c net.Conn var err error if siteInfo.AlwaysDirect() { c, err = net.Dial("tcp", url.HostPort) } else { to := dialTimeout if siteInfo.OnceBlocked() && to >= defaultDialTimeout { // If once blocked, decrease timeout to switch to parent proxy faster. to = minDialTimeout } else if siteInfo.AsDirect() { // If usually can be accessed directly, increase timeout to avoid // problems when network condition is bad. to = maxTimeout } c, err = net.DialTimeout("tcp", url.HostPort, to) } if err != nil { debug.Printf("error direct connect to: %s %v\n", url.HostPort, err) if isErrTooManyOpenFd(err) && !recursive { return connectDirect2(url, siteInfo, true) } return nil, err } // debug.Println("directly connected to", url.HostPort) return directConn{c}, nil } func connectDirect(url *URL, siteInfo *VisitCnt) (net.Conn, error) { return connectDirect2(url, siteInfo, false) } func isErrTimeout(err error) bool { if ne, ok := err.(net.Error); ok { return ne.Timeout() } return false } func isHttpErrCode(err error) bool { if config.HttpErrorCode <= 0 { return false } if err == CustomHttpErr { return true } return false } func maybeBlocked(err error) bool { if parentProxy.empty() { return false } return isErrTimeout(err) || isErrConnReset(err) || isHttpErrCode(err) } // Connect to requested server according to whether it's visit count. // If direct connection fails, try parent proxies. func (c *clientConn) connect(r *Request, siteInfo *VisitCnt) (srvconn net.Conn, err error) { var errMsg string if config.AlwaysProxy { if srvconn, err = parentProxy.connect(r.URL); err == nil { return } errMsg = genErrMsg(r, nil, "Parent proxy connection failed, always use parent proxy.") goto fail } if siteInfo.AsBlocked() && !parentProxy.empty() { // In case of connection error to socks server, fallback to direct connection if srvconn, err = parentProxy.connect(r.URL); err == nil { return } if siteInfo.AlwaysBlocked() { errMsg = genErrMsg(r, nil, "Parent proxy connection failed, always blocked site.") goto fail } if siteInfo.AsTempBlocked() { errMsg = genErrMsg(r, nil, "Parent proxy connection failed, temporarily blocked site.") goto fail } if srvconn, err = connectDirect(r.URL, siteInfo); err == nil { return } errMsg = genErrMsg(r, nil, "Parent proxy and direct connection failed, maybe blocked site.") } else { // In case of error on direction connection, try parent server if srvconn, err = connectDirect(r.URL, siteInfo); err == nil { return } if parentProxy.empty() { errMsg = genErrMsg(r, nil, "Direct connection failed, no parent proxy.") goto fail } if siteInfo.AlwaysDirect() { errMsg = genErrMsg(r, nil, "Direct connection failed, always direct site.") goto fail } // net.Dial does two things: DNS lookup and TCP connection. // GFW may cause failure here: make it time out or reset connection. // debug.Printf("type of err %T %v\n", err, err) // RST during TCP handshake is valid and would return as connection // refused error. My observation is that GFW does not use RST to stop // TCP handshake. // To simplify things and avoid error in my observation, always try // parent proxy in case of Dial error. var socksErr error if srvconn, socksErr = parentProxy.connect(r.URL); socksErr == nil { c.handleBlockedRequest(r, err) if debug { debug.Printf("cli(%s) direct connection failed, use parent proxy for %v\n", c.RemoteAddr(), r) } return srvconn, nil } errMsg = genErrMsg(r, nil, "Direct and parent proxy connection failed, maybe blocked site.") } fail: sendErrorPage(c, "504 Connection failed", err.Error(), errMsg) return nil, errPageSent } func (c *clientConn) createServerConn(r *Request, siteInfo *VisitCnt) (*serverConn, error) { srvconn, err := c.connect(r, siteInfo) if err != nil { return nil, err } sv := newServerConn(srvconn, r.URL.HostPort, siteInfo) if debug { debug.Printf("cli(%s) connected to %s %d concurrent connections\n", c.RemoteAddr(), sv.hostPort, incSrvConnCnt(sv.hostPort)) } return sv, nil } // Should call initBuf before reading http response from server. This allows // us not init buf for connect method which does not need to parse http // respnose. func newServerConn(c net.Conn, hostPort string, siteInfo *VisitCnt) *serverConn { sv := &serverConn{ Conn: c, hostPort: hostPort, siteInfo: siteInfo, } return sv } func (sv *serverConn) isDirect() bool { _, ok := sv.Conn.(directConn) return ok } func (sv *serverConn) updateVisit() { if sv.visited { return } sv.visited = true if sv.isDirect() { sv.siteInfo.DirectVisit() } else { sv.siteInfo.BlockedVisit() } } func (sv *serverConn) initBuf() { if sv.bufRd == nil { sv.buf = httpBuf.Get() sv.bufRd = bufio.NewReaderFromBuf(sv, sv.buf) } } func (sv *serverConn) releaseBuf() { if sv.bufRd != nil { // debug.Println("release server buffer") httpBuf.Put(sv.buf) sv.buf = nil sv.bufRd = nil } } func (sv *serverConn) Close() error { sv.releaseBuf() if debug { debug.Printf("close connection to %s remains %d concurrent connections\n", sv.hostPort, decSrvConnCnt(sv.hostPort)) } return sv.Conn.Close() } func (sv *serverConn) maybeFake() bool { return sv.state == svConnected && sv.isDirect() && !sv.siteInfo.AlwaysDirect() } func setConnReadTimeout(cn net.Conn, d time.Duration, msg string) { if err := cn.SetReadDeadline(time.Now().Add(d)); err != nil { errl.Println("set readtimeout:", msg, err) } } func unsetConnReadTimeout(cn net.Conn, msg string) { if err := cn.SetReadDeadline(zeroTime); err != nil { // It's possible that conn has been closed, so use debug log. debug.Println("unset readtimeout:", msg, err) } } func (sv *serverConn) setReadTimeout(msg string) { to := readTimeout if sv.siteInfo.OnceBlocked() && to > defaultReadTimeout { to = minReadTimeout } else if sv.siteInfo.AsDirect() { to = maxTimeout } setConnReadTimeout(sv.Conn, to, msg) } func (sv *serverConn) unsetReadTimeout(msg string) { unsetConnReadTimeout(sv.Conn, msg) } func (sv *serverConn) maybeSSLErr(cliStart time.Time) bool { // If client closes connection very soon, maybe there's SSL error, maybe // not (e.g. user stopped request). // COW can't tell which is the case, so this detection is not reliable. return sv.state > svConnected && time.Now().Sub(cliStart) < sslLeastDuration } func (sv *serverConn) mayBeClosed() bool { if _, ok := sv.Conn.(cowConn); ok { debug.Println("cow parent would keep alive") return false } return time.Now().After(sv.willCloseOn) } // Use smaller buffer for connection method as the buffer will be hold for a // very long time. const connectBufSize = 4096 // Hold at most 2M memory for connection buffer. This can support 256 // concurrent connect method. var connectBuf = leakybuf.NewLeakyBuf(512, connectBufSize) func copyServer2Client(sv *serverConn, c *clientConn, r *Request) (err error) { buf := connectBuf.Get() defer func() { connectBuf.Put(buf) }() /* // force retry for debugging if r.tryCnt == 1 && sv.maybeFake() { time.Sleep(1) return RetryError{errors.New("debug retry in copyServer2Client")} } */ total := 0 const directThreshold = 8192 readTimeoutSet := false for { // debug.Println("srv->cli") if sv.maybeFake() { sv.setReadTimeout("srv->cli") readTimeoutSet = true } else if readTimeoutSet { sv.unsetReadTimeout("srv->cli") readTimeoutSet = false } var n int if n, err = sv.Read(buf); err != nil { if sv.maybeFake() && maybeBlocked(err) { siteStat.TempBlocked(r.URL) debug.Printf("srv->cli blocked site %s detected, err: %v retry\n", r.URL.HostPort, err) return RetryError{err} } // Expected error besides EOF: "use of closed network connection", // this is to make blocking read return. // debug.Printf("copyServer2Client read data: %v\n", err) return } total += n if _, err = c.Write(buf[0:n]); err != nil { // debug.Printf("copyServer2Client write data: %v\n", err) return } // debug.Printf("srv(%s)->cli(%s) sent %d bytes data\n", r.URL.HostPort, c.RemoteAddr(), total) // set state to rsRecvBody to indicate the request has partial response sent to client r.state = rsRecvBody sv.state = svSendRecvResponse if total > directThreshold { sv.updateVisit() } } } type serverWriter struct { rq *Request sv *serverConn } func newServerWriter(r *Request, sv *serverConn) *serverWriter { return &serverWriter{r, sv} } // Write to server, store written data in request buffer if necessary. // We have to save request body in order to retry request. // FIXME: too tighly coupled with Request. func (sw *serverWriter) Write(p []byte) (int, error) { if sw.rq.raw == nil { // buffer released } else if sw.rq.raw.Len() >= 2*httpBufSize { // Avoid using too much memory to hold request body. If a request is // not buffered completely, COW can't retry and can release memory // immediately. debug.Println(sw.rq, "request body too large, not buffering any more") sw.rq.releaseBuf() sw.rq.partial = true } else if sw.rq.responseNotSent() { sw.rq.raw.Write(p) } else { // has sent response, happens when saving data for CONNECT method sw.rq.releaseBuf() } return sw.sv.Write(p) } func copyClient2Server(c *clientConn, sv *serverConn, r *Request, srvStopped notification, done chan struct{}) (err error) { // sv.maybeFake may change during execution in this function. // So need a variable to record the whether timeout is set. deadlineIsSet := false defer func() { if deadlineIsSet { // May need to retry, unset timeout here to avoid read client // timeout on retry. Note c.Conn maybe closed when calling this. unsetConnReadTimeout(c.Conn, "cli->srv after err") } close(done) }() var n int if r.isRetry() { if debug { debug.Printf("cli(%s)->srv(%s) retry request %d bytes of buffered body\n", c.RemoteAddr(), r.URL.HostPort, len(r.rawBody())) } if _, err = sv.Write(r.rawBody()); err != nil { debug.Println("cli->srv send to server error") return } } w := newServerWriter(r, sv) if c.bufRd != nil { n = c.bufRd.Buffered() if n > 0 { buffered, _ := c.bufRd.Peek(n) // should not return error if _, err = w.Write(buffered); err != nil { // debug.Printf("cli->srv write buffered err: %v\n", err) return } } if debug { debug.Printf("cli(%s)->srv(%s) released read buffer\n", c.RemoteAddr(), r.URL.HostPort) } c.releaseBuf() } var start time.Time if config.DetectSSLErr { start = time.Now() } buf := connectBuf.Get() defer func() { connectBuf.Put(buf) }() for { // debug.Println("cli->srv") if sv.maybeFake() { setConnReadTimeout(c.Conn, time.Second, "cli->srv") deadlineIsSet = true } else if deadlineIsSet { // maybeFake may trun to false after timeout, but timeout should be unset unsetConnReadTimeout(c.Conn, "cli->srv before read") deadlineIsSet = false } if n, err = c.Read(buf); err != nil { if config.DetectSSLErr && sv.maybeFake() && (isErrConnReset(err) || err == io.EOF) && sv.maybeSSLErr(start) { debug.Println("client connection closed very soon, taken as SSL error:", r) siteStat.TempBlocked(r.URL) } else if isErrTimeout(err) && !srvStopped.hasNotified() { // debug.Printf("cli(%s)->srv(%s) timeout\n", c.RemoteAddr(), r.URL.HostPort) continue } // debug.Printf("cli->srv read err: %v\n", err) return } // copyServer2Client will detect write to closed server. Just store client content for retry. if _, err = w.Write(buf[:n]); err != nil { // XXX is it enough to only do block detection in copyServer2Client? /* if sv.maybeFake() && isErrConnReset(err) { siteStat.TempBlocked(r.URL) errl.Printf("copyClient2Server blocked site %d detected, retry\n", r.URL.HostPort) return RetryError{err} } */ // debug.Printf("cli->srv write err: %v\n", err) return } // debug.Printf("cli(%s)->srv(%s) sent %d bytes data\n", c.RemoteAddr(), r.URL.HostPort, n) } } var connEstablished = []byte("HTTP/1.1 200 Tunnel established\r\n\r\n") // Do HTTP CONNECT func (sv *serverConn) doConnect(r *Request, c *clientConn) (err error) { r.state = rsCreated _, isHttpConn := sv.Conn.(httpConn) _, isCowConn := sv.Conn.(cowConn) if isHttpConn || isCowConn { if debug { debug.Printf("cli(%s) send CONNECT request to parent\n", c.RemoteAddr()) } if err = sv.sendHTTPProxyRequestHeader(r, c); err != nil { debug.Printf("cli(%s) error send CONNECT request to parent: %v\n", c.RemoteAddr(), err) return err } } else if !r.isRetry() { // debug.Printf("send connection confirmation to %s->%s\n", c.RemoteAddr(), r.URL.HostPort) if _, err = c.Write(connEstablished); err != nil { debug.Printf("cli(%s) error send 200 Connecion established: %v\n", c.RemoteAddr(), err) return err } } var cli2srvErr error done := make(chan struct{}) srvStopped := newNotification() go func() { // debug.Printf("doConnect: cli(%s)->srv(%s)\n", c.RemoteAddr(), r.URL.HostPort) cli2srvErr = copyClient2Server(c, sv, r, srvStopped, done) // Close sv to force read from server in copyServer2Client return. // Note: there's no other code closing the server connection for CONNECT. sv.Close() }() // debug.Printf("doConnect: srv(%s)->cli(%s)\n", r.URL.HostPort, c.RemoteAddr()) err = copyServer2Client(sv, c, r) if isErrRetry(err) { srvStopped.notify() <-done // debug.Printf("doConnect: cli(%s)->srv(%s) stopped\n", c.RemoteAddr(), r.URL.HostPort) } else { // close client connection to force read from client in copyClient2Server return c.Conn.Close() } if isErrRetry(cli2srvErr) { return cli2srvErr } return } func (sv *serverConn) sendHTTPProxyRequestHeader(r *Request, c *clientConn) (err error) { if _, err = sv.Write(r.proxyRequestLine()); err != nil { return c.handleServerWriteError(r, sv, err, "send proxy request line to http parent") } if hc, ok := sv.Conn.(httpConn); ok && hc.parent.authHeader != nil { // Add authorization header for parent http proxy if _, err = sv.Write(hc.parent.authHeader); err != nil { return c.handleServerWriteError(r, sv, err, "send proxy authorization header to http parent") } } // When retry, body is in raw buffer. if _, err = sv.Write(r.rawHeaderBody()); err != nil { return c.handleServerWriteError(r, sv, err, "send proxy request header to http parent") } /* if bool(dbgRq) && verbose { debug.Printf("request to http proxy:\n%s%s", r.proxyRequestLine(), r.rawHeaderBody()) } */ return } func (sv *serverConn) sendRequestHeader(r *Request, c *clientConn) (err error) { // Send request to the server switch sv.Conn.(type) { case httpConn, cowConn: return sv.sendHTTPProxyRequestHeader(r, c) } /* if bool(debug) && verbose { debug.Printf("request to server\n%s", r.rawRequest()) } */ if _, err = sv.Write(r.rawRequest()); err != nil { err = c.handleServerWriteError(r, sv, err, "send request to server") } return } func (sv *serverConn) sendRequestBody(r *Request, c *clientConn) (err error) { // Send request body. If this is retry, r.raw contains request body and is // sent while sending raw request. if !r.hasBody() || r.isRetry() { return } err = sendBody(newServerWriter(r, sv), c.bufRd, int(r.ContLen), r.Chunking) if err != nil { errl.Printf("cli(%s) send request body error %v %s\n", c.RemoteAddr(), err, r) if isErrOpWrite(err) { err = c.handleServerWriteError(r, sv, err, "send request body") } return } if debug { debug.Printf("cli(%s) request body sent %s\n", c.RemoteAddr(), r) } return } // Do HTTP request other that CONNECT func (sv *serverConn) doRequest(c *clientConn, r *Request, rp *Response) (err error) { r.state = rsCreated if err = sv.sendRequestHeader(r, c); err != nil { return } if err = sv.sendRequestBody(r, c); err != nil { return } r.state = rsSent if err = c.readResponse(sv, r, rp); err == nil { sv.updateVisit() } return err } // Send response body if header specifies content length func sendBodyWithContLen(w io.Writer, r *bufio.Reader, contLen int) (err error) { // debug.Println("Sending body with content length", contLen) if contLen == 0 { return } if err = copyN(w, r, contLen, httpBufSize); err != nil { debug.Println("sendBodyWithContLen error:", err) } return } // Use this function until we find Trailer headers actually in use. func skipTrailer(r *bufio.Reader) error { // It's possible to get trailer headers, but the body will always end with // a line with just CRLF. for { s, err := r.ReadSlice('\n') if err != nil { errl.Println("skip trailer:", err) return err } if len(s) == 2 && s[0] == '\r' && s[1] == '\n' { return nil } errl.Printf("skip trailer: %#v", string(s)) if len(s) == 1 || len(s) == 2 { return fmt.Errorf("malformed chunk body end: %#v", string(s)) } } } func skipCRLF(r *bufio.Reader) (err error) { var buf [2]byte if _, err = io.ReadFull(r, buf[:]); err != nil { errl.Println("skip chunk body end:", err) return } if buf[0] != '\r' || buf[1] != '\n' { return fmt.Errorf("malformed chunk body end: %#v", string(buf[:])) } return } // Send response body if header specifies chunked encoding. rdSize specifies // the size of each read on Reader, it should be set to be the buffer size of // the Reader, this parameter is added for testing. func sendBodyChunked(w io.Writer, r *bufio.Reader, rdSize int) (err error) { // debug.Println("Sending chunked body") for { var s []byte // Read chunk size line, ignore chunk extension if any. if s, err = r.PeekSlice('\n'); err != nil { errl.Println("peek chunk size:", err) return } smid := bytes.IndexByte(s, ';') if smid == -1 { smid = len(s) } else { // use error log to find usage of chunk extension errl.Printf("got chunk extension: %s\n", s) } var size int64 if size, err = ParseIntFromBytes(TrimSpace(s[:smid]), 16); err != nil { errl.Println("chunk size invalid:", err) return } /* if debug { // To debug getting malformed response status line with "0\r\n". if c, ok := w.(*clientConn); ok { debug.Printf("cli(%s) chunk size %d %#v\n", c.RemoteAddr(), size, string(s)) } } */ if size == 0 { r.Skip(len(s)) if err = skipCRLF(r); err != nil { return } if _, err = w.Write([]byte(chunkEnd)); err != nil { debug.Println("send chunk ending:", err) } return } // RFC 2616 19.3 only suggest tolerating single LF for // headers, not for chunked encoding. So assume the server will send // CRLF. If not, the following parse int may find errors. total := len(s) + int(size) + 2 // total data size for this chunk, including ending CRLF // PeekSlice will not advance reader, so we can just copy total sized data. if err = copyN(w, r, total, rdSize); err != nil { debug.Println("copy chunked data:", err) return } } } const chunkEnd = "0\r\n\r\n" func sendBodySplitIntoChunk(w io.Writer, r *bufio.Reader) (err error) { // debug.Printf("sendBodySplitIntoChunk called\n") var b []byte for { b, err = r.ReadNext() // debug.Println("split into chunk n =", n, "err =", err) if err != nil { if err == io.EOF { // EOF is expected here as the server is closing connection. // debug.Println("end chunked encoding") _, err = w.Write([]byte(chunkEnd)) if err != nil { debug.Println("write chunk end 0", err) } return } debug.Println("read error in sendBodySplitIntoChunk", err) return } chunkSize := []byte(fmt.Sprintf("%x\r\n", len(b))) if _, err = w.Write(chunkSize); err != nil { debug.Printf("write chunk size %v\n", err) return } if _, err = w.Write(b); err != nil { debug.Println("write chunk data:", err) return } if _, err = w.Write([]byte(CRLF)); err != nil { debug.Println("write chunk ending CRLF:", err) return } } } // Send message body. func sendBody(w io.Writer, bufRd *bufio.Reader, contLen int, chunk bool) (err error) { // chunked encoding has precedence over content length // COW does not sanitize response header, but can correctly handle it if chunk { err = sendBodyChunked(w, bufRd, httpBufSize) } else if contLen >= 0 { // It's possible to have content length 0 if server response has no // body. err = sendBodyWithContLen(w, bufRd, int(contLen)) } else { // Must be reading server response here, because sendBody is called in // reading response iff chunked or content length > 0. err = sendBodySplitIntoChunk(w, bufRd) } return }