diff --git a/server/handler/link_tap.go b/server/handler/link_tap.go index 2e3062b..67972ff 100644 --- a/server/handler/link_tap.go +++ b/server/handler/link_tap.go @@ -175,7 +175,7 @@ func allTapWrite(ifce LinkDriver, cSess *sessdata.ConnSession) { return } - putPayload(pl) + putPayloadInBefore(cSess, pl) } } diff --git a/server/handler/link_tun.go b/server/handler/link_tun.go index 5ab2fef..0ba7c76 100644 --- a/server/handler/link_tun.go +++ b/server/handler/link_tun.go @@ -108,7 +108,7 @@ func tunWrite(ifce *water.Interface, cSess *sessdata.ConnSession) { return } - putPayload(pl) + putPayloadInBefore(cSess, pl) } } diff --git a/server/handler/payload.go b/server/handler/payload.go index e0e8258..e5ed545 100644 --- a/server/handler/payload.go +++ b/server/handler/payload.go @@ -15,9 +15,6 @@ func payloadIn(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { // 校验不通过直接丢弃 return false } - if base.Cfg.AuditInterval >= 0 { - auditPayload.Add(cSess.Username, pl) - } } closed := false @@ -30,6 +27,15 @@ func payloadIn(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { return closed } +func putPayloadInBefore(cSess *sessdata.ConnSession, pl *sessdata.Payload) { + // 异步审计日志 + if base.Cfg.AuditInterval >= 0 { + auditPayload.Add(cSess.Username, pl) + return + } + putPayload(pl) +} + func payloadOut(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { dSess := cSess.GetDtlsSession() if dSess == nil { diff --git a/server/handler/payload_access_audit.go b/server/handler/payload_access_audit.go index aee27a5..4384352 100644 --- a/server/handler/payload_access_audit.go +++ b/server/handler/payload_access_audit.go @@ -39,10 +39,13 @@ type LogBatch struct { // 异步写入pool func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) { - bPlData := getByteFull() - copy(*bPlData, pl.Data) - p.Pool.JobQueue <- func() { - logAudit(userName, bPlData) + select { + case p.Pool.JobQueue <- func() { + logAudit(userName, pl) + }: + default: + putPayload(pl) + base.Error("AccessAudit: AuditPayload channel is full") } } @@ -66,11 +69,11 @@ func logAuditBatch() { return } auditPayload = &AuditPayload{ - Pool: grpool.NewPool(10, 500), + Pool: grpool.NewPool(10, 10240), IpAuditMap: utils.NewMap("cmap", 0), } logBatch = &LogBatch{ - LogChan: make(chan dbdata.AccessAudit, 5000), + LogChan: make(chan dbdata.AccessAudit, 10240), } var ( limit = 100 // 超过上限批量写入数据表 @@ -97,11 +100,13 @@ func logAuditBatch() { } // 解析IP包的数据 -func logAudit(userName string, bPlData *[]byte) { - defer putByte(bPlData) +func logAudit(userName string, pl *sessdata.Payload) { + defer putPayload(pl) - plData := *bPlData - ipProto := waterutil.IPv4Protocol(plData) + if !(pl.LType == sessdata.LTypeIPData && pl.PType == 0x00) { + return + } + ipProto := waterutil.IPv4Protocol(pl.Data) // 访问协议 var accessProto uint8 // 只统计 tcp和udp 的访问 @@ -114,9 +119,9 @@ func logAudit(userName string, bPlData *[]byte) { return } - ipSrc := waterutil.IPv4Source(plData) - ipDst := waterutil.IPv4Destination(plData) - ipPort := waterutil.IPv4DestinationPort(plData) + ipSrc := waterutil.IPv4Source(pl.Data) + ipDst := waterutil.IPv4Destination(pl.Data) + ipPort := waterutil.IPv4DestinationPort(pl.Data) b := getByte51() key := *b @@ -129,7 +134,7 @@ func logAudit(userName string, bPlData *[]byte) { info := "" nu := utils.NowSec().Unix() if ipProto == waterutil.TCP { - tcpPlData := waterutil.IPv4Payload(plData) + tcpPlData := waterutil.IPv4Payload(pl.Data) // 24 (ACK PSH) if len(tcpPlData) < 14 || tcpPlData[13] != 24 { return @@ -173,5 +178,11 @@ func logAudit(userName string, bPlData *[]byte) { AccessProto: accessProto, Info: info, } - logBatch.LogChan <- audit + + select { + case logBatch.LogChan <- audit: + default: + base.Error("AccessAudit: LogChan channel is full") + return + } }