Optimize the batch-write code

lanrenwo 2022-11-01 21:31:47 +08:00
parent 890ff5753f
commit 2d375869df
1 changed file with 25 additions and 31 deletions


@@ -39,9 +39,9 @@ type LogBatch struct {
 // batch log pool
 type LogSink struct {
 	logChan chan dbdata.AccessAudit
-	autoCommitChan chan *LogBatch // timeout notification
 }
 
+// write into the pool asynchronously
 func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) {
 	bPlData := getByteFull()
 	copy(*bPlData, pl.Data)
@@ -50,6 +50,11 @@ func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) {
 	}
 }
 
+// persist the data
+func (p *AuditPayload) Write(logs []dbdata.AccessAudit) {
+	_ = dbdata.AddBatch(logs)
+}
+
 // enable batch writing of data
 func logAuditBatch() {
 	if base.Cfg.AuditInterval < 0 {
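
The added Write method gives every flush path one place that hands the whole buffered slice to dbdata.AddBatch, so the records reach the database in a single batched operation rather than one insert per record. AddBatch itself lives in AnyLink's dbdata package and is not part of this diff; the sketch below only illustrates the general idea of turning a slice of records into one multi-row insert, with an invented table and column layout and a trimmed-down stand-in struct.

package main

import (
	"fmt"
	"strings"
)

// AccessAudit is a trimmed-down stand-in for dbdata.AccessAudit;
// the real struct has more fields.
type AccessAudit struct {
	Username string
	DstIP    string
}

// buildBatchInsert shows the kind of statement a batch helper such as
// dbdata.AddBatch ends up issuing: one INSERT carrying a value tuple per
// record, instead of one round trip per record. The table and column
// names here are invented for the sketch.
func buildBatchInsert(logs []AccessAudit) (string, []interface{}) {
	var sb strings.Builder
	sb.WriteString("INSERT INTO access_audit (username, dst_ip) VALUES ")
	args := make([]interface{}, 0, len(logs)*2)
	for i, l := range logs {
		if i > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString("(?, ?)")
		args = append(args, l.Username, l.DstIP)
	}
	return sb.String(), args
}

func main() {
	logs := []AccessAudit{
		{Username: "alice", DstIP: "10.0.0.1"},
		{Username: "bob", DstIP: "10.0.0.2"},
	}
	query, args := buildBatchInsert(logs)
	fmt.Println(query)
	fmt.Println(args)
}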
@@ -57,7 +62,6 @@ func logAuditBatch() {
 	}
 	logAuditSink = &LogSink{
 		logChan: make(chan dbdata.AccessAudit, 5000),
-		autoCommitChan: make(chan *LogBatch, 10),
 	}
 	auditPayload = &AuditPayload{
 		Pool: grpool.NewPool(10, 500),
@@ -65,39 +69,29 @@ func logAuditBatch() {
 	}
 	var (
 		limit = 100 // batch-write to the data table once this limit is exceeded
-		logAudit dbdata.AccessAudit
-		logBatch *LogBatch
-		commitTimer *time.Timer // auto-commit on timeout
-		timeOutBatch *LogBatch
+		outTime = time.NewTimer(time.Second)
+		logBatch = &LogBatch{}
+		accessAudit = dbdata.AccessAudit{}
 	)
 
 	for {
+		// reset the timeout
+		outTime.Reset(time.Second * 1)
 		select {
-		case logAudit = <-logAuditSink.logChan:
-			if logBatch == nil {
-				logBatch = &LogBatch{}
-				commitTimer = time.AfterFunc(
-					1*time.Second, func(logBatch *LogBatch) func() {
-						return func() {
-							logAuditSink.autoCommitChan <- logBatch
-						}
-					}(logBatch),
-				)
-			}
-			logBatch.Logs = append(logBatch.Logs, logAudit)
+		case accessAudit = <-logAuditSink.logChan:
+			logBatch.Logs = append(logBatch.Logs, accessAudit)
 			if len(logBatch.Logs) >= limit {
-				commitTimer.Stop()
-				_ = dbdata.AddBatch(logBatch.Logs)
-				logBatch = nil
-			}
-		case timeOutBatch = <-logAuditSink.autoCommitChan:
-			if timeOutBatch != logBatch {
-				continue
-			}
-			if logBatch != nil {
-				_ = dbdata.AddBatch(logBatch.Logs)
+				if !outTime.Stop() {
+					<-outTime.C
+				}
+				auditPayload.Write(logBatch.Logs)
+				logBatch.Logs = []dbdata.AccessAudit{}
 			}
-			logBatch = nil
+		case <-outTime.C:
+			if len(logBatch.Logs) > 0 {
+				auditPayload.Write(logBatch.Logs)
+				logBatch.Logs = []dbdata.AccessAudit{}
+			}
 		}
 	}
 }
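
Taken together, the rewrite drops the per-batch time.AfterFunc callback and the autoCommitChan hand-off in favour of one reusable time.Timer: the timer is Reset at the top of every loop iteration, and when a size-triggered flush happens first, Stop plus a drain of the timer channel keeps the next Reset clean. A self-contained sketch of that pattern, with placeholder names (record, flush, deadline) standing in for dbdata.AccessAudit and the real database write, might look like this:

package main

import (
	"fmt"
	"time"
)

// record stands in for dbdata.AccessAudit.
type record struct{ user string }

// flush stands in for auditPayload.Write / dbdata.AddBatch: it receives the
// whole buffered slice at once. Here it only prints.
func flush(logs []record) {
	fmt.Printf("writing %d records in one batch\n", len(logs))
}

func main() {
	const limit = 100                  // flush once this many records are buffered
	logChan := make(chan record, 5000) // same role as logAuditSink.logChan
	outTime := time.NewTimer(time.Second)
	batch := make([]record, 0, limit)

	// Demo traffic: 250 records, then silence so the timeout path also runs.
	go func() {
		for i := 0; i < 250; i++ {
			logChan <- record{user: fmt.Sprintf("user-%d", i)}
		}
	}()
	deadline := time.After(3 * time.Second) // demo-only exit condition

	for {
		// Reset the shared timer at the top of each iteration, as the
		// commit does with outTime.Reset(time.Second * 1).
		outTime.Reset(time.Second)
		select {
		case r := <-logChan:
			batch = append(batch, r)
			if len(batch) >= limit {
				// Stop the timer before an early flush; if it already
				// fired, drain its channel so the next Reset starts clean.
				if !outTime.Stop() {
					<-outTime.C
				}
				flush(batch)
				// Start a fresh slice, as the commit does with
				// logBatch.Logs = []dbdata.AccessAudit{}.
				batch = make([]record, 0, limit)
			}
		case <-outTime.C:
			// Timer fired: flush whatever has accumulated, if anything.
			if len(batch) > 0 {
				flush(batch)
				batch = make([]record, 0, limit)
			}
		case <-deadline:
			// Demo-only exit so the example terminates on its own.
			if len(batch) > 0 {
				flush(batch)
			}
			return
		}
	}
}

The deadline case and the record/flush names are not part of AnyLink; they only let the sketch run and stop by itself while keeping the limit-or-timeout flush logic identical to the committed loop.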