diff --git a/server/handler/payload_access_audit.go b/server/handler/payload_access_audit.go index 37de256..267dcfe 100644 --- a/server/handler/payload_access_audit.go +++ b/server/handler/payload_access_audit.go @@ -38,10 +38,10 @@ type LogBatch struct { // 批量日志池 type LogSink struct { - logChan chan dbdata.AccessAudit - autoCommitChan chan *LogBatch // 超时通知 + logChan chan dbdata.AccessAudit } +// 异步写入pool func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) { bPlData := getByteFull() copy(*bPlData, pl.Data) @@ -50,54 +50,48 @@ func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) { } } +// 数据落盘 +func (p *AuditPayload) Write(logs []dbdata.AccessAudit) { + _ = dbdata.AddBatch(logs) +} + // 开启批量写入数据功能 func logAuditBatch() { if base.Cfg.AuditInterval < 0 { return } logAuditSink = &LogSink{ - logChan: make(chan dbdata.AccessAudit, 5000), - autoCommitChan: make(chan *LogBatch, 10), + logChan: make(chan dbdata.AccessAudit, 5000), } auditPayload = &AuditPayload{ Pool: grpool.NewPool(10, 500), IpAuditMap: utils.NewMap("cmap", 0), } var ( - limit = 100 // 超过上限批量写入数据表 - logAudit dbdata.AccessAudit - logBatch *LogBatch - commitTimer *time.Timer // 超时自动提交 - timeOutBatch *LogBatch + limit = 100 // 超过上限批量写入数据表 + outTime = time.NewTimer(time.Second) + logBatch = &LogBatch{} + accessAudit = dbdata.AccessAudit{} ) for { + // 重置超时 时间 + outTime.Reset(time.Second * 1) select { - case logAudit = <-logAuditSink.logChan: - if logBatch == nil { - logBatch = &LogBatch{} - commitTimer = time.AfterFunc( - 1*time.Second, func(logBatch *LogBatch) func() { - return func() { - logAuditSink.autoCommitChan <- logBatch - } - }(logBatch), - ) - } - logBatch.Logs = append(logBatch.Logs, logAudit) + case accessAudit = <-logAuditSink.logChan: + logBatch.Logs = append(logBatch.Logs, accessAudit) if len(logBatch.Logs) >= limit { - commitTimer.Stop() - _ = dbdata.AddBatch(logBatch.Logs) - logBatch = nil + if !outTime.Stop() { + <-outTime.C + } + auditPayload.Write(logBatch.Logs) + logBatch.Logs = []dbdata.AccessAudit{} } - case timeOutBatch = <-logAuditSink.autoCommitChan: - if timeOutBatch != logBatch { - continue + case <-outTime.C: + if len(logBatch.Logs) > 0 { + auditPayload.Write(logBatch.Logs) + logBatch.Logs = []dbdata.AccessAudit{} } - if logBatch != nil { - _ = dbdata.AddBatch(logBatch.Logs) - } - logBatch = nil } } }