From 890ff5753f87fd04ed612f8c1e53437d51073c07 Mon Sep 17 00:00:00 2001 From: lanrenwo Date: Tue, 1 Nov 2022 14:17:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=AE=A1=E8=AE=A1=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E5=BC=82=E6=AD=A5=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/handler/payload.go | 4 +- server/handler/payload_access_audit.go | 124 +++++++++++-------------- server/sessdata/session.go | 14 --- 3 files changed, 55 insertions(+), 87 deletions(-) diff --git a/server/handler/payload.go b/server/handler/payload.go index 626ae6e..e0e8258 100644 --- a/server/handler/payload.go +++ b/server/handler/payload.go @@ -16,9 +16,7 @@ func payloadIn(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { return false } if base.Cfg.AuditInterval >= 0 { - cSess.IpAuditPool.JobQueue <- func() { - logAudit(cSess, pl) - } + auditPayload.Add(cSess.Username, pl) } } diff --git a/server/handler/payload_access_audit.go b/server/handler/payload_access_audit.go index 4697c87..37de256 100644 --- a/server/handler/payload_access_audit.go +++ b/server/handler/payload_access_audit.go @@ -9,6 +9,7 @@ import ( "github.com/bjdgyc/anylink/dbdata" "github.com/bjdgyc/anylink/pkg/utils" "github.com/bjdgyc/anylink/sessdata" + "github.com/ivpusic/grpool" "github.com/songgao/water/waterutil" ) @@ -19,33 +20,49 @@ const ( acc_proto_http ) +var ( + logAuditSink *LogSink + auditPayload *AuditPayload +) + +// 审计日志 +type AuditPayload struct { + Pool *grpool.Pool + IpAuditMap utils.IMaps +} + // 保存批量的审计日志 type LogBatch struct { Logs []dbdata.AccessAudit } -// 日志池 +// 批量日志池 type LogSink struct { logChan chan dbdata.AccessAudit autoCommitChan chan *LogBatch // 超时通知 } -var logAuditSink *LogSink - -// 写入日志通道 -func logAuditWrite(aa dbdata.AccessAudit) { - logAuditSink.logChan <- aa +func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) { + bPlData := getByteFull() + copy(*bPlData, pl.Data) + p.Pool.JobQueue <- func() { + logAudit(userName, bPlData) + } } -// 批量写入数据表 +// 开启批量写入数据功能 func logAuditBatch() { if base.Cfg.AuditInterval < 0 { return } logAuditSink = &LogSink{ - logChan: make(chan dbdata.AccessAudit, 1000), + logChan: make(chan dbdata.AccessAudit, 5000), autoCommitChan: make(chan *LogBatch, 10), } + auditPayload = &AuditPayload{ + Pool: grpool.NewPool(10, 500), + IpAuditMap: utils.NewMap("cmap", 0), + } var ( limit = 100 // 超过上限批量写入数据表 logAudit dbdata.AccessAudit @@ -53,6 +70,7 @@ func logAuditBatch() { commitTimer *time.Timer // 超时自动提交 timeOutBatch *LogBatch ) + for { select { case logAudit = <-logAuditSink.logChan: @@ -85,8 +103,11 @@ func logAuditBatch() { } // 解析IP包的数据 -func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { - ipProto := waterutil.IPv4Protocol(pl.Data) +func logAudit(userName string, bPlData *[]byte) { + defer putByte(bPlData) + + plData := *bPlData + ipProto := waterutil.IPv4Protocol(plData) // 访问协议 var accessProto uint8 // 只统计 tcp和udp 的访问 @@ -99,9 +120,9 @@ func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { return } - ipSrc := waterutil.IPv4Source(pl.Data) - ipDst := waterutil.IPv4Destination(pl.Data) - ipPort := waterutil.IPv4DestinationPort(pl.Data) + ipSrc := waterutil.IPv4Source(plData) + ipDst := waterutil.IPv4Destination(plData) + ipPort := waterutil.IPv4DestinationPort(plData) b := getByte51() key := *b @@ -114,79 +135,42 @@ func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { info := "" nu := utils.NowSec().Unix() if ipProto == waterutil.TCP { - plData := waterutil.IPv4Payload(pl.Data) - if len(plData) < 14 { + tcpPlData := waterutil.IPv4Payload(plData) + // 24 (ACK PSH) + if len(tcpPlData) < 14 || tcpPlData[13] != 24 { return } - flags := plData[13] - switch flags { - case flags & 0x20: - // URG - return - case flags & 0x14: - // RST ACK - return - case flags & 0x12: - // SYN ACK - return - case flags & 0x11: - // Client FIN - return - case flags & 0x10: - // ACK - return - case flags & 0x08: - // PSH - return - case flags & 0x04: - // RST - return - case flags & 0x02: - // SYN - return - case flags & 0x01: - // FIN - return - case flags & 0x18: - // PSH ACK - accessProto, info = onTCP(plData) - // HTTPS or HTTP - if accessProto != acc_proto_tcp { - // 提前存储只含ip数据的key, 避免即记录域名又记录一笔IP数据的记录 - ipKey := make([]byte, 51) - copy(ipKey, key) - ipS := utils.BytesToString(ipKey) - cSess.IpAuditMap.Set(ipS, nu) + accessProto, info = onTCP(tcpPlData) + // HTTPS or HTTP + if accessProto != acc_proto_tcp { + // 提前存储只含ip数据的key, 避免即记录域名又记录一笔IP数据的记录 + ipKey := make([]byte, 51) + copy(ipKey, key) + ipS := utils.BytesToString(ipKey) + auditPayload.IpAuditMap.Set(ipS, nu) - key[34] = byte(accessProto) - // 存储含域名的key - if info != "" { - md5Sum := md5.Sum([]byte(info)) - copy(key[35:51], md5Sum[:]) - } + key[34] = byte(accessProto) + // 存储含域名的key + if info != "" { + md5Sum := md5.Sum([]byte(info)) + copy(key[35:51], md5Sum[:]) } - case flags & 0x19: - // URG - return - case flags & 0xC2: - // SYN-ECE-CWR - return } } s := utils.BytesToString(key) // 判断已经存在,并且没有过期 - v, ok := cSess.IpAuditMap.Get(s) + v, ok := auditPayload.IpAuditMap.Get(s) if ok && nu-v.(int64) < int64(base.Cfg.AuditInterval) { // 回收byte对象 putByte51(b) return } - cSess.IpAuditMap.Set(s, nu) + auditPayload.IpAuditMap.Set(s, nu) audit := dbdata.AccessAudit{ - Username: cSess.Username, + Username: userName, Protocol: uint8(ipProto), Src: ipSrc.String(), Dst: ipDst.String(), @@ -195,5 +179,5 @@ func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { AccessProto: accessProto, Info: info, } - logAuditWrite(audit) + logAuditSink.logChan <- audit } diff --git a/server/sessdata/session.go b/server/sessdata/session.go index 56d3ffc..2da2ef4 100644 --- a/server/sessdata/session.go +++ b/server/sessdata/session.go @@ -12,9 +12,7 @@ import ( "github.com/bjdgyc/anylink/base" "github.com/bjdgyc/anylink/dbdata" - "github.com/bjdgyc/anylink/pkg/utils" mapset "github.com/deckarep/golang-set" - "github.com/ivpusic/grpool" atomic2 "go.uber.org/atomic" ) @@ -52,8 +50,6 @@ type ConnSession struct { PayloadIn chan *Payload PayloadOutCstp chan *Payload // Cstp的数据 PayloadOutDtls chan *Payload // Dtls的数据 - IpAuditMap utils.IMaps // 审计的ip数据 - IpAuditPool *grpool.Pool // 审计的IP包解析池 // dSess *DtlsSession dSess *atomic.Value } @@ -206,12 +202,6 @@ func (s *Session) NewConn() *ConnSession { dSess: &atomic.Value{}, } - // ip 审计 - if base.Cfg.AuditInterval >= 0 { - cSess.IpAuditMap = utils.NewMap("cmap", 0) - cSess.IpAuditPool = grpool.NewPool(1, 600) - } - dSess := &DtlsSession{ isActive: -1, } @@ -244,10 +234,6 @@ func (cs *ConnSession) Close() { cs.Sess.LastLogin = time.Now() cs.Sess.CSess = nil - if cs.IpAuditPool != nil { - cs.IpAuditPool.Release() - } - dSess := cs.GetDtlsSession() if dSess != nil { dSess.Close()