From 583ca4d635723a883549ad383563a28f6654e937 Mon Sep 17 00:00:00 2001 From: bjdgyc Date: Thu, 29 Jul 2021 19:02:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96payload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/handler/link_cstp.go | 56 ++++++++++++++++++++----------------- server/handler/link_tun.go | 21 +++++++------- server/handler/payload.go | 35 ++++++----------------- server/handler/pool.go | 27 +++++++++++++----- server/sessdata/protocol.go | 2 +- 5 files changed, 69 insertions(+), 72 deletions(-) diff --git a/server/handler/link_cstp.go b/server/handler/link_cstp.go index 09083e0..5e842bf 100644 --- a/server/handler/link_cstp.go +++ b/server/handler/link_cstp.go @@ -20,6 +20,7 @@ func LinkCstp(conn net.Conn, cSess *sessdata.ConnSession) { err error n int dataLen uint16 + data []byte dead = time.Duration(cSess.CstpDpd+5) * time.Second ) @@ -34,9 +35,9 @@ func LinkCstp(conn net.Conn, cSess *sessdata.ConnSession) { return } // hdata := make([]byte, BufferSize) - hb := getByteFull() - hdata := *hb - n, err = conn.Read(hdata) + pl := getPayload() + data = *pl.Data + n, err = conn.Read(data) if err != nil { base.Error("read hdata: ", err) return @@ -48,7 +49,7 @@ func LinkCstp(conn net.Conn, cSess *sessdata.ConnSession) { base.Error(err) } - switch hdata[6] { + switch data[6] { case 0x07: // KEEPALIVE // do nothing // base.Debug("recv keepalive", cSess.IpAddr) @@ -57,19 +58,20 @@ func LinkCstp(conn net.Conn, cSess *sessdata.ConnSession) { return case 0x03: // DPD-REQ // base.Debug("recv DPD-REQ", cSess.IpAddr) - if payloadOutCstp(cSess, sessdata.LTypeIPData, 0x04, nil) { + pl.PType = 0x04 + if payloadOutCstp(cSess, pl) { return } case 0x04: // log.Println("recv DPD-RESP") case 0x00: // DATA - dataLen = binary.BigEndian.Uint16(hdata[4:6]) // 4,5 - if payloadIn(cSess, sessdata.LTypeIPData, 0x00, hdata[8:8+dataLen]) { + dataLen = binary.BigEndian.Uint16(data[4:6]) // 4,5 + copy(data, data[8:8+dataLen]) + *pl.Data = data[:dataLen] + if payloadIn(cSess, pl) { return } } - - putByte(hb) } } @@ -81,40 +83,42 @@ func cstpWrite(conn net.Conn, cSess *sessdata.ConnSession) { }() var ( - err error - n int - // header []byte - payload *sessdata.Payload + err error + n int + data []byte + pl *sessdata.Payload ) for { select { - case payload = <-cSess.PayloadOutCstp: + case pl = <-cSess.PayloadOutCstp: case <-cSess.CloseChan: return } - if payload.LType != sessdata.LTypeIPData { + if pl.LType != sessdata.LTypeIPData { continue } - h := []byte{'S', 'T', 'F', 0x01, 0x00, 0x00, payload.PType, 0x00} - hb := getByteZero() - header := *hb - header = append(header, h...) - if payload.PType == 0x00 { - data := *payload.Data - binary.BigEndian.PutUint16(header[4:6], uint16(len(data))) - header = append(header, data...) + data = *pl.Data + if pl.PType == 0x00 { + l := len(data) + data = data[:l+8] + copy(data[8:], data) + copy(data[:8], plHeader) + binary.BigEndian.PutUint16(data[4:6], uint16(l)) + } else { + data = append(data[:0], plHeader...) + data[6] = pl.PType } - n, err = conn.Write(header) + *pl.Data = data + n, err = conn.Write(*pl.Data) if err != nil { base.Error("write err", err) return } - putByte(hb) - putPayload(payload) + putPayload(pl) // 限流设置 err = cSess.RateLimit(n, false) diff --git a/server/handler/link_tun.go b/server/handler/link_tun.go index 4b88e88..444075e 100644 --- a/server/handler/link_tun.go +++ b/server/handler/link_tun.go @@ -69,24 +69,24 @@ func tunWrite(ifce *water.Interface, cSess *sessdata.ConnSession) { }() var ( - err error - payload *sessdata.Payload + err error + pl *sessdata.Payload ) for { select { - case payload = <-cSess.PayloadIn: + case pl = <-cSess.PayloadIn: case <-cSess.CloseChan: return } - _, err = ifce.Write(*payload.Data) + _, err = ifce.Write(*pl.Data) if err != nil { base.Error("tun Write err", err) return } - putPayload(payload) + putPayload(pl) } } @@ -102,14 +102,15 @@ func tunRead(ifce *water.Interface, cSess *sessdata.ConnSession) { for { // data := make([]byte, BufferSize) - hb := getByteFull() - data := *hb - n, err = ifce.Read(data) + pl := getPayload() + n, err = ifce.Read(*pl.Data) if err != nil { base.Error("tun Read err", n, err) return } + *pl.Data = (*pl.Data)[:n] + // data = data[:n] // ip_src := waterutil.IPv4Source(data) // ip_dst := waterutil.IPv4Destination(data) @@ -118,10 +119,8 @@ func tunRead(ifce *water.Interface, cSess *sessdata.ConnSession) { // packet := gopacket.NewPacket(data, layers.LayerTypeIPv4, gopacket.Default) // fmt.Println("read:", packet) - if payloadOut(cSess, sessdata.LTypeIPData, 0x00, data[:n]) { + if payloadOut(cSess, pl) { return } - - putByte(hb) } } diff --git a/server/handler/payload.go b/server/handler/payload.go index c9b2ee3..67d22d4 100644 --- a/server/handler/payload.go +++ b/server/handler/payload.go @@ -6,18 +6,9 @@ import ( "github.com/songgao/water/waterutil" ) -func payloadIn(cSess *sessdata.ConnSession, lType sessdata.LType, pType byte, data []byte) bool { - pl := getPayload() - pl.LType = lType - pl.PType = pType - *pl.Data = append(*pl.Data, data...) - - return payloadInData(cSess, pl) -} - -func payloadInData(cSess *sessdata.ConnSession, payload *sessdata.Payload) bool { +func payloadIn(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { // 进行Acl规则判断 - check := checkLinkAcl(cSess.Group, payload) + check := checkLinkAcl(cSess.Group, pl) if !check { // 校验不通过直接丢弃 return false @@ -25,7 +16,7 @@ func payloadInData(cSess *sessdata.ConnSession, payload *sessdata.Payload) bool closed := false select { - case cSess.PayloadIn <- payload: + case cSess.PayloadIn <- pl: case <-cSess.CloseChan: closed = true } @@ -33,21 +24,16 @@ func payloadInData(cSess *sessdata.ConnSession, payload *sessdata.Payload) bool return closed } -func payloadOut(cSess *sessdata.ConnSession, lType sessdata.LType, pType byte, data []byte) bool { +func payloadOut(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { dSess := cSess.GetDtlsSession() if dSess == nil { - return payloadOutCstp(cSess, lType, pType, data) + return payloadOutCstp(cSess, pl) } else { - return payloadOutDtls(cSess, dSess, lType, pType, data) + return payloadOutDtls(cSess, dSess, pl) } } -func payloadOutCstp(cSess *sessdata.ConnSession, lType sessdata.LType, pType byte, data []byte) bool { - pl := getPayload() - pl.LType = lType - pl.PType = pType - *pl.Data = append(*pl.Data, data...) - +func payloadOutCstp(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { closed := false select { @@ -59,12 +45,7 @@ func payloadOutCstp(cSess *sessdata.ConnSession, lType sessdata.LType, pType byt return closed } -func payloadOutDtls(cSess *sessdata.ConnSession, dSess *sessdata.DtlsSession, lType sessdata.LType, pType byte, data []byte) bool { - pl := getPayload() - pl.LType = lType - pl.PType = pType - *pl.Data = append(*pl.Data, data...) - +func payloadOutDtls(cSess *sessdata.ConnSession, dSess *sessdata.DtlsSession, pl *sessdata.Payload) bool { select { case cSess.PayloadOutDtls <- pl: case <-dSess.CloseChan: diff --git a/server/handler/pool.go b/server/handler/pool.go index 629c715..561f2df 100644 --- a/server/handler/pool.go +++ b/server/handler/pool.go @@ -3,14 +3,21 @@ package handler import ( "sync" + "github.com/bjdgyc/anylink/base" "github.com/bjdgyc/anylink/sessdata" ) -var plPool = sync.Pool{ +// 不允许直接修改 +// [6] => PType +var plHeader = []byte{'S', 'T', 'F', 0x01, 0x00, 0x00, 0x00, 0x00} + +var plPool = &sync.Pool{ New: func() interface{} { - b := make([]byte, 0, BufferSize) + b := make([]byte, BufferSize) pl := sessdata.Payload{ - Data: &b, + LType: sessdata.LTypeIPData, + PType: 0x00, + Data: &b, } // fmt.Println("plPool-init", len(pl.Data), cap(pl.Data)) return &pl @@ -23,13 +30,19 @@ func getPayload() *sessdata.Payload { } func putPayload(pl *sessdata.Payload) { - pl.LType = 0 - pl.PType = 0 - *pl.Data = (*pl.Data)[:0] + // 错误数据丢弃 + if cap(*pl.Data) != BufferSize { + base.Warn("payload cap is err", cap(*pl.Data)) + return + } + + pl.LType = sessdata.LTypeIPData + pl.PType = 0x00 + *pl.Data = (*pl.Data)[:BufferSize] plPool.Put(pl) } -var bytePool = sync.Pool{ +var bytePool = &sync.Pool{ New: func() interface{} { b := make([]byte, 0, BufferSize) // fmt.Println("bytePool-init") diff --git a/server/sessdata/protocol.go b/server/sessdata/protocol.go index f2d2377..512dad0 100644 --- a/server/sessdata/protocol.go +++ b/server/sessdata/protocol.go @@ -8,8 +8,8 @@ const ( ) type Payload struct { - PType byte // payload types LType LType // LinkType + PType byte // payload types Data *[]byte }