复用pl对象,减少copy性能损耗

This commit is contained in:
lanrenwo 2022-11-25 15:08:05 +08:00
parent e909ca552c
commit 9e53ec289c
4 changed files with 37 additions and 20 deletions

View File

@ -175,7 +175,7 @@ func allTapWrite(ifce LinkDriver, cSess *sessdata.ConnSession) {
return return
} }
putPayload(pl) putPayloadInBefore(cSess, pl)
} }
} }

View File

@ -108,7 +108,7 @@ func tunWrite(ifce *water.Interface, cSess *sessdata.ConnSession) {
return return
} }
putPayload(pl) putPayloadInBefore(cSess, pl)
} }
} }

View File

@ -15,9 +15,6 @@ func payloadIn(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool {
// 校验不通过直接丢弃 // 校验不通过直接丢弃
return false return false
} }
if base.Cfg.AuditInterval >= 0 {
auditPayload.Add(cSess.Username, pl)
}
} }
closed := false closed := false
@ -30,6 +27,15 @@ func payloadIn(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool {
return closed return closed
} }
func putPayloadInBefore(cSess *sessdata.ConnSession, pl *sessdata.Payload) {
// 异步审计日志
if base.Cfg.AuditInterval >= 0 {
auditPayload.Add(cSess.Username, pl)
return
}
putPayload(pl)
}
func payloadOut(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { func payloadOut(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool {
dSess := cSess.GetDtlsSession() dSess := cSess.GetDtlsSession()
if dSess == nil { if dSess == nil {

View File

@ -39,10 +39,13 @@ type LogBatch struct {
// 异步写入pool // 异步写入pool
func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) { func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) {
bPlData := getByteFull() select {
copy(*bPlData, pl.Data) case p.Pool.JobQueue <- func() {
p.Pool.JobQueue <- func() { logAudit(userName, pl)
logAudit(userName, bPlData) }:
default:
putPayload(pl)
base.Error("AccessAudit: AuditPayload channel is full")
} }
} }
@ -66,11 +69,11 @@ func logAuditBatch() {
return return
} }
auditPayload = &AuditPayload{ auditPayload = &AuditPayload{
Pool: grpool.NewPool(10, 500), Pool: grpool.NewPool(10, 10240),
IpAuditMap: utils.NewMap("cmap", 0), IpAuditMap: utils.NewMap("cmap", 0),
} }
logBatch = &LogBatch{ logBatch = &LogBatch{
LogChan: make(chan dbdata.AccessAudit, 5000), LogChan: make(chan dbdata.AccessAudit, 10240),
} }
var ( var (
limit = 100 // 超过上限批量写入数据表 limit = 100 // 超过上限批量写入数据表
@ -97,11 +100,13 @@ func logAuditBatch() {
} }
// 解析IP包的数据 // 解析IP包的数据
func logAudit(userName string, bPlData *[]byte) { func logAudit(userName string, pl *sessdata.Payload) {
defer putByte(bPlData) defer putPayload(pl)
plData := *bPlData if !(pl.LType == sessdata.LTypeIPData && pl.PType == 0x00) {
ipProto := waterutil.IPv4Protocol(plData) return
}
ipProto := waterutil.IPv4Protocol(pl.Data)
// 访问协议 // 访问协议
var accessProto uint8 var accessProto uint8
// 只统计 tcp和udp 的访问 // 只统计 tcp和udp 的访问
@ -114,9 +119,9 @@ func logAudit(userName string, bPlData *[]byte) {
return return
} }
ipSrc := waterutil.IPv4Source(plData) ipSrc := waterutil.IPv4Source(pl.Data)
ipDst := waterutil.IPv4Destination(plData) ipDst := waterutil.IPv4Destination(pl.Data)
ipPort := waterutil.IPv4DestinationPort(plData) ipPort := waterutil.IPv4DestinationPort(pl.Data)
b := getByte51() b := getByte51()
key := *b key := *b
@ -129,7 +134,7 @@ func logAudit(userName string, bPlData *[]byte) {
info := "" info := ""
nu := utils.NowSec().Unix() nu := utils.NowSec().Unix()
if ipProto == waterutil.TCP { if ipProto == waterutil.TCP {
tcpPlData := waterutil.IPv4Payload(plData) tcpPlData := waterutil.IPv4Payload(pl.Data)
// 24 (ACK PSH) // 24 (ACK PSH)
if len(tcpPlData) < 14 || tcpPlData[13] != 24 { if len(tcpPlData) < 14 || tcpPlData[13] != 24 {
return return
@ -173,5 +178,11 @@ func logAudit(userName string, bPlData *[]byte) {
AccessProto: accessProto, AccessProto: accessProto,
Info: info, Info: info,
} }
logBatch.LogChan <- audit
select {
case logBatch.LogChan <- audit:
default:
base.Error("AccessAudit: LogChan channel is full")
return
}
} }