From f70a646902db51083894ac511634036e2dab3509 Mon Sep 17 00:00:00 2001 From: hebo Date: Thu, 5 Dec 2019 20:51:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=8Eproxy=E8=8E=B7=E5=8F=96client=20ip?= =?UTF-8?q?=E5=9C=B0=E5=9D=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Godeps/Godeps.json | 4 + capture/network.go | 37 +++- session-dealer/controller.go | 6 +- session-dealer/mysql/session.go | 8 +- .../github.com/pires/go-proxyproto/.gitignore | 11 + vendor/github.com/pires/go-proxyproto/LICENSE | 201 ++++++++++++++++++ .../github.com/pires/go-proxyproto/README.md | 69 ++++++ .../pires/go-proxyproto/addr_proto.go | 71 +++++++ vendor/github.com/pires/go-proxyproto/go.mod | 3 + .../github.com/pires/go-proxyproto/header.go | 149 +++++++++++++ .../pires/go-proxyproto/protocol.go | 130 +++++++++++ vendor/github.com/pires/go-proxyproto/v1.go | 116 ++++++++++ vendor/github.com/pires/go-proxyproto/v2.go | 195 +++++++++++++++++ .../pires/go-proxyproto/version_cmd.go | 39 ++++ 14 files changed, 1024 insertions(+), 15 deletions(-) create mode 100644 vendor/github.com/pires/go-proxyproto/.gitignore create mode 100644 vendor/github.com/pires/go-proxyproto/LICENSE create mode 100644 vendor/github.com/pires/go-proxyproto/README.md create mode 100644 vendor/github.com/pires/go-proxyproto/addr_proto.go create mode 100644 vendor/github.com/pires/go-proxyproto/go.mod create mode 100644 vendor/github.com/pires/go-proxyproto/header.go create mode 100644 vendor/github.com/pires/go-proxyproto/protocol.go create mode 100644 vendor/github.com/pires/go-proxyproto/v1.go create mode 100644 vendor/github.com/pires/go-proxyproto/v2.go create mode 100644 vendor/github.com/pires/go-proxyproto/version_cmd.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 8fa4383..d874684 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -94,6 +94,10 @@ "Comment": "v2.1.0-beta-226-g360ef5c", "Rev": "360ef5cee790475e9d75cb349e1b47b0a7a4efee" }, + { + "ImportPath": "github.com/pires/go-proxyproto", + "Rev": "83d61d90241fdbce96130ef0cc4ce5e495a95bdf" + }, { "ImportPath": "github.com/rcrowley/go-metrics", "Rev": "e181e095bae94582363434144c61a9653aff6e50" diff --git a/capture/network.go b/capture/network.go index d3521ec..fcc5603 100644 --- a/capture/network.go +++ b/capture/network.go @@ -1,17 +1,21 @@ package capture import ( + "bufio" + "bytes" "flag" "fmt" + "math/rand" + "time" + "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "github.com/zr-hebo/sniffer-agent/communicator" "golang.org/x/net/bpf" - "math/rand" - "time" "github.com/google/gopacket/pcapgo" + proto "github.com/pires/go-proxyproto" log "github.com/sirupsen/logrus" "github.com/zr-hebo/sniffer-agent/model" sd "github.com/zr-hebo/sniffer-agent/session-dealer" @@ -133,12 +137,24 @@ func (nc *networkCard) listenNormal() { m := packet.Metadata() m.CaptureInfo = ci - // send FIN tcp packet to avoid not complete session cannot be released tcpPkt := packet.TransportLayer().(*layers.TCP) + // send FIN tcp packet to avoid not complete session cannot be released // deal FIN packet + if tcpPkt.FIN { + nc.parseTCPPackage(packet, nil) + continue + } + // deal auth packet - if tcpPkt.FIN || sd.IsAuthPacket(tcpPkt.Payload) { - nc.parseTCPPackage(packet) + if sd.IsAuthPacket(tcpPkt.Payload) { + reader := bufio.NewReader(bytes.NewReader(data)) + header, _ := proto.Read(reader) + var clientIP *string + if header != nil { + clientIPContent := header.SourceAddress.String() + clientIP = &clientIPContent + } + nc.parseTCPPackage(packet, clientIP) continue } @@ -151,14 +167,14 @@ func (nc *networkCard) listenNormal() { } aliveCounter = 0 - nc.parseTCPPackage(packet) + nc.parseTCPPackage(packet, nil) } }() return } -func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) { +func (nc *networkCard) parseTCPPackage(packet gopacket.Packet, clientIP *string) { var err error defer func() { if err != nil { @@ -183,14 +199,13 @@ func (nc *networkCard) parseTCPPackage(packet gopacket.Packet) { return } - // get IP from ip layer srcIP := ipInfo.SrcIP.String() dstIP := ipInfo.DstIP.String() srcPort := int(tcpPkt.SrcPort) dstPort := int(tcpPkt.DstPort) if dstPort == nc.listenPort { // deal mysql server response - err = readToServerPackage(&srcIP, srcPort, tcpPkt, nc.receiver) + err = readToServerPackage(clientIP, &srcIP, srcPort, tcpPkt, nc.receiver) if err != nil { return } @@ -240,7 +255,7 @@ func readFromServerPackage( } func readToServerPackage( - srcIP *string, srcPort int, tcpPkt *layers.TCP, receiver chan model.QueryPiece) (err error) { + clientIP, srcIP *string, srcPort int, tcpPkt *layers.TCP, receiver chan model.QueryPiece) (err error) { defer func() { if err != nil { log.Error("read package send from client to mysql server failed <-- %s", err.Error()) @@ -267,7 +282,7 @@ func readToServerPackage( sessionKey := spliceSessionKey(srcIP, srcPort) session := sessionPool[*sessionKey] if session == nil { - session = sd.NewSession(sessionKey, srcIP, srcPort, localIPAddr, snifferPort, receiver) + session = sd.NewSession(sessionKey, clientIP, srcIP, srcPort, localIPAddr, snifferPort, receiver) sessionPool[*sessionKey] = session } diff --git a/session-dealer/controller.go b/session-dealer/controller.go index cbd8541..585796d 100644 --- a/session-dealer/controller.go +++ b/session-dealer/controller.go @@ -5,13 +5,13 @@ import ( "github.com/zr-hebo/sniffer-agent/session-dealer/mysql" ) -func NewSession(sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int, +func NewSession(sessionKey, clientAlias, clientIP *string, clientPort int, serverIP *string, serverPort int, receiver chan model.QueryPiece) (session ConnSession) { switch serviceType { case ServiceTypeMysql: - session = mysql.NewMysqlSession(sessionKey, clientIP, clientPort, serverIP, serverPort, receiver) + session = mysql.NewMysqlSession(sessionKey, clientAlias, clientIP, clientPort, serverIP, serverPort, receiver) default: - session = mysql.NewMysqlSession(sessionKey, clientIP, clientPort, serverIP, serverPort, receiver) + session = mysql.NewMysqlSession(sessionKey, clientAlias, clientIP, clientPort, serverIP, serverPort, receiver) } return } diff --git a/session-dealer/mysql/session.go b/session-dealer/mysql/session.go index 876d4fe..1d70aa2 100644 --- a/session-dealer/mysql/session.go +++ b/session-dealer/mysql/session.go @@ -15,6 +15,7 @@ type MysqlSession struct { connectionID *string visitUser *string visitDB *string + clientAlias *string clientHost *string clientPort int serverIP *string @@ -46,10 +47,11 @@ type prepareInfo struct { } func NewMysqlSession( - sessionKey *string, clientIP *string, clientPort int, serverIP *string, serverPort int, + sessionKey, clientAlias, clientIP *string, clientPort int, serverIP *string, serverPort int, receiver chan model.QueryPiece) (ms *MysqlSession) { ms = &MysqlSession{ connectionID: sessionKey, + clientAlias: clientAlias, clientHost: clientIP, clientPort: clientPort, serverIP: serverIP, @@ -333,6 +335,10 @@ func filterQueryPieceBySQL(mqp *model.PooledMysqlQueryPiece, querySQL []byte) (* } func (ms *MysqlSession) composeQueryPiece() (mqp *model.PooledMysqlQueryPiece) { + clientIP := ms.clientAlias + if clientIP == nil || len(*clientIP) < 1 { + clientIP = ms.clientHost + } return model.NewPooledMysqlQueryPiece( ms.connectionID, ms.clientHost, ms.visitUser, ms.visitDB, ms.clientHost, ms.serverIP, ms.clientPort, ms.serverPort, communicator.GetMysqlCapturePacketRate(), ms.stmtBeginTime) diff --git a/vendor/github.com/pires/go-proxyproto/.gitignore b/vendor/github.com/pires/go-proxyproto/.gitignore new file mode 100644 index 0000000..a2d2c30 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/.gitignore @@ -0,0 +1,11 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +.idea +bin +pkg + +*.out diff --git a/vendor/github.com/pires/go-proxyproto/LICENSE b/vendor/github.com/pires/go-proxyproto/LICENSE new file mode 100644 index 0000000..8dada3e --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/pires/go-proxyproto/README.md b/vendor/github.com/pires/go-proxyproto/README.md new file mode 100644 index 0000000..9648fd2 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/README.md @@ -0,0 +1,69 @@ +# go-proxyproto + +[![Actions Status](https://github.com/pires/go-proxyproto/workflows/test/badge.svg)](https://github.com/pires/go-proxyproto/actions) +[![Coverage Status](https://coveralls.io/repos/github/pires/go-proxyproto/badge.svg?branch=master)](https://coveralls.io/github/pires/go-proxyproto?branch=master) +[![Go Report Card](https://goreportcard.com/badge/github.com/pires/go-proxyproto)](https://goreportcard.com/report/github.com/pires/go-proxyproto) +[![](https://godoc.org/github.com/pires/go-proxyproto?status.svg)](http://godoc.org/github.com/pires/go-proxyproto) + + +A Go library implementation of the [PROXY protocol, versions 1 and 2](http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt), +which provides, as per specification: +> (...) a convenient way to safely transport connection +> information such as a client's address across multiple layers of NAT or TCP +> proxies. It is designed to require little changes to existing components and +> to limit the performance impact caused by the processing of the transported +> information. + +This library is to be used in one of or both proxy clients and proxy servers that need to support said protocol. +Both protocol versions, 1 (text-based) and 2 (binary-based) are supported. + +## Installation + +```shell +$ go get -u github.com/pires/go-proxyproto +``` + +## Usage + +### Client (TODO) + +### Server + +```go +package main + +import ( + "log" + "net" + + proxyproto "github.com/pires/go-proxyproto" +) + +func main() { + // Create a listener + addr := "localhost:9876" + list, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalf("couldn't listen to %q: %q\n", addr, err.Error()) + } + + // Wrap listener in a proxyproto listener + proxyListener := &proxyproto.Listener{Listener: list} + defer proxyListener.Close() + + // Wait for a connection and accept it + conn, err := proxyListener.Accept() + defer conn.Close() + + // Print connection details + if conn.LocalAddr() == nil { + log.Fatal("couldn't retrieve local address") + } + log.Printf("local address: %q", conn.LocalAddr().String()) + + if conn.RemoteAddr() == nil { + log.Fatal("couldn't retrieve remote address") + } + log.Printf("remote address: %q", conn.RemoteAddr().String()) +} +``` \ No newline at end of file diff --git a/vendor/github.com/pires/go-proxyproto/addr_proto.go b/vendor/github.com/pires/go-proxyproto/addr_proto.go new file mode 100644 index 0000000..56b9155 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/addr_proto.go @@ -0,0 +1,71 @@ +package proxyproto + +// AddressFamilyAndProtocol represents address family and transport protocol. +type AddressFamilyAndProtocol byte + +const ( + UNSPEC = '\x00' + TCPv4 = '\x11' + UDPv4 = '\x12' + TCPv6 = '\x21' + UDPv6 = '\x22' + UnixStream = '\x31' + UnixDatagram = '\x32' +) + +var supportedTransportProtocol = map[AddressFamilyAndProtocol]bool{ + TCPv4: true, + UDPv4: true, + TCPv6: true, + UDPv6: true, + UnixStream: true, + UnixDatagram: true, +} + +// IsIPv4 returns true if the address family is IPv4 (AF_INET4), false otherwise. +func (ap AddressFamilyAndProtocol) IsIPv4() bool { + return 0x10 == ap&0xF0 +} + +// IsIPv6 returns true if the address family is IPv6 (AF_INET6), false otherwise. +func (ap AddressFamilyAndProtocol) IsIPv6() bool { + return 0x20 == ap&0xF0 +} + +// IsUnix returns true if the address family is UNIX (AF_UNIX), false otherwise. +func (ap AddressFamilyAndProtocol) IsUnix() bool { + return 0x30 == ap&0xF0 +} + +// IsStream returns true if the transport protocol is TCP or STREAM (SOCK_STREAM), false otherwise. +func (ap AddressFamilyAndProtocol) IsStream() bool { + return 0x01 == ap&0x0F +} + +// IsDatagram returns true if the transport protocol is UDP or DGRAM (SOCK_DGRAM), false otherwise. +func (ap AddressFamilyAndProtocol) IsDatagram() bool { + return 0x02 == ap&0x0F +} + +// IsUnspec returns true if the transport protocol or address family is unspecified, false otherwise. +func (ap AddressFamilyAndProtocol) IsUnspec() bool { + return (0x00 == ap&0xF0) || (0x00 == ap&0x0F) +} + +func (ap AddressFamilyAndProtocol) toByte() byte { + if ap.IsIPv4() && ap.IsStream() { + return TCPv4 + } else if ap.IsIPv4() && ap.IsDatagram() { + return UDPv4 + } else if ap.IsIPv6() && ap.IsStream() { + return TCPv6 + } else if ap.IsIPv6() && ap.IsDatagram() { + return UDPv6 + } else if ap.IsUnix() && ap.IsStream() { + return UnixStream + } else if ap.IsUnix() && ap.IsDatagram() { + return UnixDatagram + } + + return UNSPEC +} diff --git a/vendor/github.com/pires/go-proxyproto/go.mod b/vendor/github.com/pires/go-proxyproto/go.mod new file mode 100644 index 0000000..0700cb1 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/go.mod @@ -0,0 +1,3 @@ +module github.com/pires/go-proxyproto + +go 1.13 diff --git a/vendor/github.com/pires/go-proxyproto/header.go b/vendor/github.com/pires/go-proxyproto/header.go new file mode 100644 index 0000000..e2aeee3 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/header.go @@ -0,0 +1,149 @@ +// Package proxyproto implements Proxy Protocol (v1 and v2) parser and writer, as per specification: +// http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt +package proxyproto + +import ( + "bufio" + "bytes" + "errors" + "io" + "net" + "time" +) + +var ( + // Protocol + SIGV1 = []byte{'\x50', '\x52', '\x4F', '\x58', '\x59'} + SIGV2 = []byte{'\x0D', '\x0A', '\x0D', '\x0A', '\x00', '\x0D', '\x0A', '\x51', '\x55', '\x49', '\x54', '\x0A'} + + ErrCantReadProtocolVersionAndCommand = errors.New("Can't read proxy protocol version and command") + ErrCantReadAddressFamilyAndProtocol = errors.New("Can't read address family or protocol") + ErrCantReadLength = errors.New("Can't read length") + ErrCantResolveSourceUnixAddress = errors.New("Can't resolve source Unix address") + ErrCantResolveDestinationUnixAddress = errors.New("Can't resolve destination Unix address") + ErrNoProxyProtocol = errors.New("Proxy protocol signature not present") + ErrUnknownProxyProtocolVersion = errors.New("Unknown proxy protocol version") + ErrUnsupportedProtocolVersionAndCommand = errors.New("Unsupported proxy protocol version and command") + ErrUnsupportedAddressFamilyAndProtocol = errors.New("Unsupported address family and protocol") + ErrInvalidLength = errors.New("Invalid length") + ErrInvalidAddress = errors.New("Invalid address") + ErrInvalidPortNumber = errors.New("Invalid port number") +) + +// Header is the placeholder for proxy protocol header. +type Header struct { + Version byte + Command ProtocolVersionAndCommand + TransportProtocol AddressFamilyAndProtocol + SourceAddress net.IP + DestinationAddress net.IP + SourcePort uint16 + DestinationPort uint16 +} + +// RemoteAddr returns the address of the remote endpoint of the connection. +func (header *Header) RemoteAddr() net.Addr { + return &net.TCPAddr{ + IP: header.SourceAddress, + Port: int(header.SourcePort), + } +} + +// LocalAddr returns the address of the local endpoint of the connection. +func (header *Header) LocalAddr() net.Addr { + return &net.TCPAddr{ + IP: header.DestinationAddress, + Port: int(header.DestinationPort), + } +} + +// EqualTo returns true if headers are equivalent, false otherwise. +// Deprecated: use EqualsTo instead. This method will eventually be removed. +func (header *Header) EqualTo(otherHeader *Header) bool { + return header.EqualsTo(otherHeader) +} + +// EqualsTo returns true if headers are equivalent, false otherwise. +func (header *Header) EqualsTo(otherHeader *Header) bool { + if otherHeader == nil { + return false + } + if header.Command.IsLocal() { + return true + } + return header.Version == otherHeader.Version && + header.TransportProtocol == otherHeader.TransportProtocol && + header.SourceAddress.String() == otherHeader.SourceAddress.String() && + header.DestinationAddress.String() == otherHeader.DestinationAddress.String() && + header.SourcePort == otherHeader.SourcePort && + header.DestinationPort == otherHeader.DestinationPort +} + +// WriteTo renders a proxy protocol header in a format and writes it to an io.Writer. +func (header *Header) WriteTo(w io.Writer) (int64, error) { + buf, err := header.Format() + if err != nil { + return 0, err + } + + return bytes.NewBuffer(buf).WriteTo(w) +} + +// Format renders a proxy protocol header in a format to write over the wire. +func (header *Header) Format() ([]byte, error) { + switch header.Version { + case 1: + return header.formatVersion1() + case 2: + return header.formatVersion2() + default: + return nil, ErrUnknownProxyProtocolVersion + } +} + +// Read identifies the proxy protocol version and reads the remaining of +// the header, accordingly. +// +// If proxy protocol header signature is not present, the reader buffer remains untouched +// and is safe for reading outside of this code. +// +// If proxy protocol header signature is present but an error is raised while processing +// the remaining header, assume the reader buffer to be in a corrupt state. +// Also, this operation will block until enough bytes are available for peeking. +func Read(reader *bufio.Reader) (*Header, error) { + // In order to improve speed for small non-PROXYed packets, take a peek at the first byte alone. + if b1, err := reader.Peek(1); err == nil && (bytes.Equal(b1[:1], SIGV1[:1]) || bytes.Equal(b1[:1], SIGV2[:1])) { + if signature, err := reader.Peek(5); err == nil && bytes.Equal(signature[:5], SIGV1) { + return parseVersion1(reader) + } else if signature, err := reader.Peek(12); err == nil && bytes.Equal(signature[:12], SIGV2) { + return parseVersion2(reader) + } + } + + return nil, ErrNoProxyProtocol +} + +// ReadTimeout acts as Read but takes a timeout. If that timeout is reached, it's assumed +// there's no proxy protocol header. +func ReadTimeout(reader *bufio.Reader, timeout time.Duration) (*Header, error) { + type header struct { + h *Header + e error + } + read := make(chan *header, 1) + + go func() { + h := &header{} + h.h, h.e = Read(reader) + read <- h + }() + + timer := time.NewTimer(timeout) + select { + case result := <-read: + timer.Stop() + return result.h, result.e + case <-timer.C: + return nil, ErrNoProxyProtocol + } +} diff --git a/vendor/github.com/pires/go-proxyproto/protocol.go b/vendor/github.com/pires/go-proxyproto/protocol.go new file mode 100644 index 0000000..d577970 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/protocol.go @@ -0,0 +1,130 @@ +package proxyproto + +import ( + "bufio" + "net" + "sync" + "time" +) + +// Listener is used to wrap an underlying listener, +// whose connections may be using the HAProxy Proxy Protocol. +// If the connection is using the protocol, the RemoteAddr() will return +// the correct client address. +type Listener struct { + Listener net.Listener +} + +// Conn is used to wrap and underlying connection which +// may be speaking the Proxy Protocol. If it is, the RemoteAddr() will +// return the address of the client instead of the proxy address. +type Conn struct { + bufReader *bufio.Reader + conn net.Conn + header *Header + once sync.Once +} + +// Accept waits for and returns the next connection to the listener. +func (p *Listener) Accept() (net.Conn, error) { + // Get the underlying connection + conn, err := p.Listener.Accept() + if err != nil { + return nil, err + } + return NewConn(conn), nil +} + +// Close closes the underlying listener. +func (p *Listener) Close() error { + return p.Listener.Close() +} + +// Addr returns the underlying listener's network address. +func (p *Listener) Addr() net.Addr { + return p.Listener.Addr() +} + +// NewConn is used to wrap a net.Conn that may be speaking +// the proxy protocol into a proxyproto.Conn +func NewConn(conn net.Conn) *Conn { + pConn := &Conn{ + bufReader: bufio.NewReader(conn), + conn: conn, + } + return pConn +} + +// Read is check for the proxy protocol header when doing +// the initial scan. If there is an error parsing the header, +// it is returned and the socket is closed. +func (p *Conn) Read(b []byte) (int, error) { + var err error + p.once.Do(func() { + err = p.readHeader() + }) + if err != nil { + return 0, err + } + return p.bufReader.Read(b) +} + +// Write wraps original conn.Write +func (p *Conn) Write(b []byte) (int, error) { + return p.conn.Write(b) +} + +// Close wraps original conn.Close +func (p *Conn) Close() error { + return p.conn.Close() +} + +// LocalAddr returns the address of the server if the proxy +// protocol is being used, otherwise just returns the address of +// the socket server. +func (p *Conn) LocalAddr() net.Addr { + p.once.Do(func() { p.readHeader() }) + if p.header == nil || p.header.Command.IsLocal() { + return p.conn.LocalAddr() + } + + return p.header.LocalAddr() +} + +// RemoteAddr returns the address of the client if the proxy +// protocol is being used, otherwise just returns the address of +// the socket peer. +func (p *Conn) RemoteAddr() net.Addr { + p.once.Do(func() { p.readHeader() }) + if p.header == nil || p.header.Command.IsLocal() { + return p.conn.RemoteAddr() + } + + return p.header.RemoteAddr() +} + +// SetDeadline wraps original conn.SetDeadline +func (p *Conn) SetDeadline(t time.Time) error { + return p.conn.SetDeadline(t) +} + +// SetReadDeadline wraps original conn.SetReadDeadline +func (p *Conn) SetReadDeadline(t time.Time) error { + return p.conn.SetReadDeadline(t) +} + +// SetWriteDeadline wraps original conn.SetWriteDeadline +func (p *Conn) SetWriteDeadline(t time.Time) error { + return p.conn.SetWriteDeadline(t) +} + +func (p *Conn) readHeader() (err error) { + p.header, err = Read(p.bufReader) + // For the purpose of this wrapper shamefully stolen from armon/go-proxyproto + // let's act as if there was no error when PROXY protocol is not present. + if err == ErrNoProxyProtocol { + err = nil + } + + return +} diff --git a/vendor/github.com/pires/go-proxyproto/v1.go b/vendor/github.com/pires/go-proxyproto/v1.go new file mode 100644 index 0000000..ca9c104 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/v1.go @@ -0,0 +1,116 @@ +package proxyproto + +import ( + "bufio" + "bytes" + "net" + "strconv" + "strings" +) + +const ( + CRLF = "\r\n" + SEPARATOR = " " +) + +func initVersion1() *Header { + header := new(Header) + header.Version = 1 + // Command doesn't exist in v1 + header.Command = PROXY + return header +} + +func parseVersion1(reader *bufio.Reader) (*Header, error) { + // Make sure we have a v1 header + line, err := reader.ReadString('\n') + if !strings.HasSuffix(line, CRLF) { + return nil, ErrCantReadProtocolVersionAndCommand + } + tokens := strings.Split(line[:len(line)-2], SEPARATOR) + if len(tokens) < 6 { + return nil, ErrCantReadProtocolVersionAndCommand + } + + header := initVersion1() + + // Read address family and protocol + switch tokens[1] { + case "TCP4": + header.TransportProtocol = TCPv4 + case "TCP6": + header.TransportProtocol = TCPv6 + default: + header.TransportProtocol = UNSPEC + } + + // Read addresses and ports + header.SourceAddress, err = parseV1IPAddress(header.TransportProtocol, tokens[2]) + if err != nil { + return nil, err + } + header.DestinationAddress, err = parseV1IPAddress(header.TransportProtocol, tokens[3]) + if err != nil { + return nil, err + } + header.SourcePort, err = parseV1PortNumber(tokens[4]) + if err != nil { + return nil, err + } + header.DestinationPort, err = parseV1PortNumber(tokens[5]) + if err != nil { + return nil, err + } + return header, nil +} + +func (header *Header) formatVersion1() ([]byte, error) { + // As of version 1, only "TCP4" ( \x54 \x43 \x50 \x34 ) for TCP over IPv4, + // and "TCP6" ( \x54 \x43 \x50 \x36 ) for TCP over IPv6 are allowed. + proto := "UNKNOWN" + if header.TransportProtocol == TCPv4 { + proto = "TCP4" + } else if header.TransportProtocol == TCPv6 { + proto = "TCP6" + } + + var buf bytes.Buffer + buf.Write(SIGV1) + buf.WriteString(SEPARATOR) + buf.WriteString(proto) + buf.WriteString(SEPARATOR) + buf.WriteString(header.SourceAddress.String()) + buf.WriteString(SEPARATOR) + buf.WriteString(header.DestinationAddress.String()) + buf.WriteString(SEPARATOR) + buf.WriteString(strconv.Itoa(int(header.SourcePort))) + buf.WriteString(SEPARATOR) + buf.WriteString(strconv.Itoa(int(header.DestinationPort))) + buf.WriteString(CRLF) + + return buf.Bytes(), nil +} + +func parseV1PortNumber(portStr string) (port uint16, err error) { + _port, _err := strconv.Atoi(portStr) + if _err == nil { + if _port < 0 || _port > 65535 { + err = ErrInvalidPortNumber + } else { + port = uint16(_port) + } + } else { + err = ErrInvalidPortNumber + } + + return +} + +func parseV1IPAddress(protocol AddressFamilyAndProtocol, addrStr string) (addr net.IP, err error) { + addr = net.ParseIP(addrStr) + tryV4 := addr.To4() + if (protocol == TCPv4 && tryV4 == nil) || (protocol == TCPv6 && tryV4 != nil) { + err = ErrInvalidAddress + } + return +} diff --git a/vendor/github.com/pires/go-proxyproto/v2.go b/vendor/github.com/pires/go-proxyproto/v2.go new file mode 100644 index 0000000..e069e02 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/v2.go @@ -0,0 +1,195 @@ +package proxyproto + +import ( + "bufio" + "bytes" + "encoding/binary" + "io" +) + +var ( + lengthV4 = uint16(12) + lengthV6 = uint16(36) + lengthUnix = uint16(218) + + lengthV4Bytes = func() []byte { + a := make([]byte, 2) + binary.BigEndian.PutUint16(a, lengthV4) + return a + }() + lengthV6Bytes = func() []byte { + a := make([]byte, 2) + binary.BigEndian.PutUint16(a, lengthV6) + return a + }() + lengthUnixBytes = func() []byte { + a := make([]byte, 2) + binary.BigEndian.PutUint16(a, lengthUnix) + return a + }() +) + +type _ports struct { + SrcPort uint16 + DstPort uint16 +} + +type _addr4 struct { + Src [4]byte + Dst [4]byte + SrcPort uint16 + DstPort uint16 +} + +type _addr6 struct { + Src [16]byte + Dst [16]byte + _ports +} + +type _addrUnix struct { + Src [108]byte + Dst [108]byte +} + +func parseVersion2(reader *bufio.Reader) (header *Header, err error) { + // Skip first 12 bytes (signature) + for i := 0; i < 12; i++ { + if _, err = reader.ReadByte(); err != nil { + return nil, ErrCantReadProtocolVersionAndCommand + } + } + + header = new(Header) + header.Version = 2 + + // Read the 13th byte, protocol version and command + b13, err := reader.ReadByte() + if err != nil { + return nil, ErrCantReadProtocolVersionAndCommand + } + header.Command = ProtocolVersionAndCommand(b13) + if _, ok := supportedCommand[header.Command]; !ok { + return nil, ErrUnsupportedProtocolVersionAndCommand + } + + // Read the 14th byte, address family and protocol + b14, err := reader.ReadByte() + if err != nil { + return nil, ErrCantReadAddressFamilyAndProtocol + } + header.TransportProtocol = AddressFamilyAndProtocol(b14) + if _, ok := supportedTransportProtocol[header.TransportProtocol]; !ok { + return nil, ErrUnsupportedAddressFamilyAndProtocol + } + + // Make sure there are bytes available as specified in length + var length uint16 + if err := binary.Read(io.LimitReader(reader, 2), binary.BigEndian, &length); err != nil { + return nil, ErrCantReadLength + } + if !header.validateLength(length) { + return nil, ErrInvalidLength + } + + if _, err := reader.Peek(int(length)); err != nil { + return nil, ErrInvalidLength + } + + // Length-limited reader for payload section + payloadReader := io.LimitReader(reader, int64(length)) + + // Read addresses and ports + if header.TransportProtocol.IsIPv4() { + var addr _addr4 + if err := binary.Read(payloadReader, binary.BigEndian, &addr); err != nil { + return nil, ErrInvalidAddress + } + header.SourceAddress = addr.Src[:] + header.DestinationAddress = addr.Dst[:] + header.SourcePort = addr.SrcPort + header.DestinationPort = addr.DstPort + } else if header.TransportProtocol.IsIPv6() { + var addr _addr6 + if err := binary.Read(payloadReader, binary.BigEndian, &addr); err != nil { + return nil, ErrInvalidAddress + } + header.SourceAddress = addr.Src[:] + header.DestinationAddress = addr.Dst[:] + header.SourcePort = addr.SrcPort + header.DestinationPort = addr.DstPort + } + // TODO fully support Unix addresses + // else if header.TransportProtocol.IsUnix() { + // var addr _addrUnix + // if err := binary.Read(payloadReader, binary.BigEndian, &addr); err != nil { + // return nil, ErrInvalidAddress + // } + // + //if header.SourceAddress, err = net.ResolveUnixAddr("unix", string(addr.Src[:])); err != nil { + // return nil, ErrCantResolveSourceUnixAddress + //} + //if header.DestinationAddress, err = net.ResolveUnixAddr("unix", string(addr.Dst[:])); err != nil { + // return nil, ErrCantResolveDestinationUnixAddress + //} + //} + + // TODO add encapsulated TLV support + + // Drain the remaining padding + payloadReader.Read(make([]byte, length)) + + return header, nil +} + +func (header *Header) formatVersion2() ([]byte, error) { + var buf bytes.Buffer + buf.Write(SIGV2) + buf.WriteByte(header.Command.toByte()) + buf.WriteByte(header.TransportProtocol.toByte()) + // TODO add encapsulated TLV length + var addrSrc, addrDst []byte + if header.TransportProtocol.IsIPv4() { + buf.Write(lengthV4Bytes) + addrSrc = header.SourceAddress.To4() + addrDst = header.DestinationAddress.To4() + } else if header.TransportProtocol.IsIPv6() { + buf.Write(lengthV6Bytes) + addrSrc = header.SourceAddress.To16() + addrDst = header.DestinationAddress.To16() + } else if header.TransportProtocol.IsUnix() { + buf.Write(lengthUnixBytes) + // TODO is below right? + addrSrc = []byte(header.SourceAddress.String()) + addrDst = []byte(header.DestinationAddress.String()) + } + buf.Write(addrSrc) + buf.Write(addrDst) + + portSrcBytes := func() []byte { + a := make([]byte, 2) + binary.BigEndian.PutUint16(a, header.SourcePort) + return a + }() + buf.Write(portSrcBytes) + + portDstBytes := func() []byte { + a := make([]byte, 2) + binary.BigEndian.PutUint16(a, header.DestinationPort) + return a + }() + buf.Write(portDstBytes) + + return buf.Bytes(), nil +} + +func (header *Header) validateLength(length uint16) bool { + if header.TransportProtocol.IsIPv4() { + return length >= lengthV4 + } else if header.TransportProtocol.IsIPv6() { + return length >= lengthV6 + } else if header.TransportProtocol.IsUnix() { + return length >= lengthUnix + } + return false +} diff --git a/vendor/github.com/pires/go-proxyproto/version_cmd.go b/vendor/github.com/pires/go-proxyproto/version_cmd.go new file mode 100644 index 0000000..2ee1a05 --- /dev/null +++ b/vendor/github.com/pires/go-proxyproto/version_cmd.go @@ -0,0 +1,39 @@ +package proxyproto + +// ProtocolVersionAndCommand represents proxy protocol version and command. +type ProtocolVersionAndCommand byte + +const ( + LOCAL = '\x20' + PROXY = '\x21' +) + +var supportedCommand = map[ProtocolVersionAndCommand]bool{ + LOCAL: true, + PROXY: true, +} + +// IsLocal returns true if the protocol version is \x2 and command is LOCAL, false otherwise. +func (pvc ProtocolVersionAndCommand) IsLocal() bool { + return 0x20 == pvc&0xF0 && 0x00 == pvc&0x0F +} + +// IsProxy returns true if the protocol version is \x2 and command is PROXY, false otherwise. +func (pvc ProtocolVersionAndCommand) IsProxy() bool { + return 0x20 == pvc&0xF0 && 0x01 == pvc&0x0F +} + +// IsUnspec returns true if the protocol version or command is unspecified, false otherwise. +func (pvc ProtocolVersionAndCommand) IsUnspec() bool { + return !(pvc.IsLocal() || pvc.IsProxy()) +} + +func (pvc ProtocolVersionAndCommand) toByte() byte { + if pvc.IsLocal() { + return LOCAL + } else if pvc.IsProxy() { + return PROXY + } + + return LOCAL +}