mirror of
				https://github.com/bjdgyc/anylink.git
				synced 2025-11-04 19:16:22 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			192 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			192 lines
		
	
	
		
			4.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package handler
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"encoding/binary"
 | 
						|
	"net"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/bjdgyc/anylink/base"
 | 
						|
	"github.com/bjdgyc/anylink/dbdata"
 | 
						|
	"github.com/bjdgyc/anylink/pkg/utils"
 | 
						|
	"github.com/bjdgyc/anylink/sessdata"
 | 
						|
)
 | 
						|
 | 
						|
func LinkCstp(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
 | 
						|
	base.Debug("LinkCstp connect ip:", cSess.IpAddr, "user:", cSess.Username, "rip:", conn.RemoteAddr())
 | 
						|
	defer func() {
 | 
						|
		base.Debug("LinkCstp return", cSess.Username, cSess.IpAddr)
 | 
						|
		_ = conn.Close()
 | 
						|
		cSess.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	var (
 | 
						|
		err       error
 | 
						|
		n         int
 | 
						|
		dataLen   uint16
 | 
						|
		dead      = time.Second * time.Duration(cSess.CstpDpd+5)
 | 
						|
		idle      = time.Second * time.Duration(base.Cfg.IdleTimeout)
 | 
						|
		checkIdle = base.Cfg.IdleTimeout > 0
 | 
						|
		lastTime  time.Time
 | 
						|
	)
 | 
						|
 | 
						|
	go cstpWrite(conn, bufRW, cSess)
 | 
						|
 | 
						|
	for {
 | 
						|
 | 
						|
		// 设置超时限制
 | 
						|
		err = conn.SetReadDeadline(utils.NowSec().Add(dead))
 | 
						|
		if err != nil {
 | 
						|
			base.Error("SetDeadline: ", cSess.Username, err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		// hdata := make([]byte, BufferSize)
 | 
						|
		pl := getPayload()
 | 
						|
		n, err = bufRW.Read(pl.Data)
 | 
						|
		if err != nil {
 | 
						|
			base.Error("read hdata: ", cSess.Username, err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// 限流设置
 | 
						|
		err = cSess.RateLimit(n, true)
 | 
						|
		if err != nil {
 | 
						|
			base.Error(err)
 | 
						|
		}
 | 
						|
 | 
						|
		switch pl.Data[6] {
 | 
						|
		case 0x07: // KEEPALIVE
 | 
						|
			// do nothing
 | 
						|
			// base.Debug("recv keepalive", cSess.IpAddr)
 | 
						|
			// 判断超时时间
 | 
						|
			if checkIdle {
 | 
						|
				lastTime = cSess.LastDataTime.Load()
 | 
						|
				if lastTime.Before(utils.NowSec().Add(-idle)) {
 | 
						|
					base.Warn("IdleTimeout", cSess.Username, cSess.IpAddr, "lastTime", lastTime)
 | 
						|
					sessdata.CloseSess(cSess.Sess.Token, dbdata.UserIdleTimeout)
 | 
						|
					return
 | 
						|
				}
 | 
						|
			}
 | 
						|
		case 0x05: // DISCONNECT
 | 
						|
			cSess.UserLogoutCode = dbdata.UserLogoutClient
 | 
						|
			base.Debug("DISCONNECT", cSess.Username, cSess.IpAddr)
 | 
						|
			sessdata.CloseSess(cSess.Sess.Token, dbdata.UserLogoutClient)
 | 
						|
			return
 | 
						|
		case 0x03: // DPD-REQ
 | 
						|
			// base.Debug("recv DPD-REQ", cSess.IpAddr)
 | 
						|
			pl.PType = 0x04
 | 
						|
			pl.Data = pl.Data[:n]
 | 
						|
			if payloadOutCstp(cSess, pl) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		case 0x04:
 | 
						|
		// log.Println("recv DPD-RESP")
 | 
						|
		case 0x08: // decompress
 | 
						|
			if cSess.CstpPickCmp == nil {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			dst := getByteFull()
 | 
						|
			nn, err := cSess.CstpPickCmp.Uncompress(pl.Data[8:], *dst)
 | 
						|
			if err != nil {
 | 
						|
				putByte(dst)
 | 
						|
				base.Error("cstp decompress error", err, nn)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			binary.BigEndian.PutUint16(pl.Data[4:6], uint16(nn))
 | 
						|
			pl.Data = append(pl.Data[:8], (*dst)[:nn]...)
 | 
						|
			putByte(dst)
 | 
						|
			fallthrough
 | 
						|
		case 0x00: // DATA
 | 
						|
			// 获取数据长度
 | 
						|
			dataLen = binary.BigEndian.Uint16(pl.Data[4:6]) // 4,5
 | 
						|
			// 修复 cstp 数据长度溢出报错
 | 
						|
			if 8+dataLen > BufferSize {
 | 
						|
				base.Error("recv error dataLen", cSess.Username, dataLen)
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			// 去除数据头
 | 
						|
			copy(pl.Data, pl.Data[8:8+dataLen])
 | 
						|
			// 更新切片长度
 | 
						|
			pl.Data = pl.Data[:dataLen]
 | 
						|
			// pl.Data = append(pl.Data[:0], pl.Data[8:8+dataLen]...)
 | 
						|
			if payloadIn(cSess, pl) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
			// 只记录返回正确的数据时间
 | 
						|
			cSess.LastDataTime.Store(utils.NowSec())
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func cstpWrite(conn net.Conn, bufRW *bufio.ReadWriter, cSess *sessdata.ConnSession) {
 | 
						|
	defer func() {
 | 
						|
		base.Debug("cstpWrite return", cSess.Username, cSess.IpAddr)
 | 
						|
		_ = conn.Close()
 | 
						|
		cSess.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	var (
 | 
						|
		err error
 | 
						|
		n   int
 | 
						|
		pl  *sessdata.Payload
 | 
						|
	)
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case pl = <-cSess.PayloadOutCstp:
 | 
						|
		case <-cSess.CloseChan:
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		if pl.LType != sessdata.LTypeIPData {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if pl.PType == 0x00 {
 | 
						|
			isCompress := false
 | 
						|
			if cSess.CstpPickCmp != nil && len(pl.Data) > base.Cfg.NoCompressLimit {
 | 
						|
				dst := getByteFull()
 | 
						|
				size, err := cSess.CstpPickCmp.Compress(pl.Data, (*dst)[8:])
 | 
						|
				if err == nil && size < len(pl.Data) {
 | 
						|
					copy((*dst)[:8], plHeader)
 | 
						|
					binary.BigEndian.PutUint16((*dst)[4:6], uint16(size))
 | 
						|
					(*dst)[6] = 0x08
 | 
						|
					pl.Data = append(pl.Data[:0], (*dst)[:size+8]...)
 | 
						|
					isCompress = true
 | 
						|
				}
 | 
						|
				putByte(dst)
 | 
						|
			}
 | 
						|
			if !isCompress {
 | 
						|
				// 获取数据长度
 | 
						|
				l := len(pl.Data)
 | 
						|
				// 先扩容 +8
 | 
						|
				pl.Data = pl.Data[:l+8]
 | 
						|
				// 数据后移
 | 
						|
				copy(pl.Data[8:], pl.Data)
 | 
						|
				// 添加头信息
 | 
						|
				copy(pl.Data[:8], plHeader)
 | 
						|
				// 更新头长度
 | 
						|
				binary.BigEndian.PutUint16(pl.Data[4:6], uint16(l))
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			// pl.Data = append(pl.Data[:0], plHeader...)
 | 
						|
			// 设置头类型
 | 
						|
			pl.Data[6] = pl.PType
 | 
						|
		}
 | 
						|
 | 
						|
		n, err = conn.Write(pl.Data)
 | 
						|
		if err != nil {
 | 
						|
			base.Error("write err", cSess.Username, err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		putPayload(pl)
 | 
						|
 | 
						|
		// 限流设置
 | 
						|
		err = cSess.RateLimit(n, false)
 | 
						|
		if err != nil {
 | 
						|
			base.Error(err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |