mirror of
				https://github.com/bjdgyc/anylink.git
				synced 2025-11-01 00:59:34 +08:00 
			
		
		
		
	| @@ -16,9 +16,7 @@ func payloadIn(cSess *sessdata.ConnSession, pl *sessdata.Payload) bool { | ||||
| 			return false | ||||
| 		} | ||||
| 		if base.Cfg.AuditInterval >= 0 { | ||||
| 			cSess.IpAuditPool.JobQueue <- func() { | ||||
| 				logAudit(cSess, pl) | ||||
| 			} | ||||
| 			auditPayload.Add(cSess.Username, pl) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -9,6 +9,7 @@ import ( | ||||
| 	"github.com/bjdgyc/anylink/dbdata" | ||||
| 	"github.com/bjdgyc/anylink/pkg/utils" | ||||
| 	"github.com/bjdgyc/anylink/sessdata" | ||||
| 	"github.com/ivpusic/grpool" | ||||
| 	"github.com/songgao/water/waterutil" | ||||
| ) | ||||
|  | ||||
| @@ -19,74 +20,88 @@ const ( | ||||
| 	acc_proto_http | ||||
| ) | ||||
|  | ||||
| // 保存批量的审计日志 | ||||
| var ( | ||||
| 	auditPayload *AuditPayload | ||||
| 	logBatch     *LogBatch | ||||
| ) | ||||
|  | ||||
| // 分析审计日志 | ||||
| type AuditPayload struct { | ||||
| 	Pool       *grpool.Pool | ||||
| 	IpAuditMap utils.IMaps | ||||
| } | ||||
|  | ||||
| // 保存审计日志 | ||||
| type LogBatch struct { | ||||
| 	Logs    []dbdata.AccessAudit | ||||
| 	LogChan chan dbdata.AccessAudit | ||||
| } | ||||
|  | ||||
| // 日志池 | ||||
| type LogSink struct { | ||||
| 	logChan        chan dbdata.AccessAudit | ||||
| 	autoCommitChan chan *LogBatch // 超时通知 | ||||
| // 异步写入pool | ||||
| func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) { | ||||
| 	bPlData := getByteFull() | ||||
| 	copy(*bPlData, pl.Data) | ||||
| 	p.Pool.JobQueue <- func() { | ||||
| 		logAudit(userName, bPlData) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| var logAuditSink *LogSink | ||||
|  | ||||
| // 写入日志通道 | ||||
| func logAuditWrite(aa dbdata.AccessAudit) { | ||||
| 	logAuditSink.logChan <- aa | ||||
| // 数据落盘 | ||||
| func (l *LogBatch) Write() { | ||||
| 	if len(l.Logs) == 0 { | ||||
| 		return | ||||
| 	} | ||||
| 	_ = dbdata.AddBatch(l.Logs) | ||||
| 	l.Reset() | ||||
| } | ||||
|  | ||||
| // 批量写入数据表 | ||||
| // 清空数据 | ||||
| func (l *LogBatch) Reset() { | ||||
| 	l.Logs = []dbdata.AccessAudit{} | ||||
| } | ||||
|  | ||||
| // 开启批量写入数据功能 | ||||
| func logAuditBatch() { | ||||
| 	if base.Cfg.AuditInterval < 0 { | ||||
| 		return | ||||
| 	} | ||||
| 	logAuditSink = &LogSink{ | ||||
| 		logChan:        make(chan dbdata.AccessAudit, 1000), | ||||
| 		autoCommitChan: make(chan *LogBatch, 10), | ||||
| 	auditPayload = &AuditPayload{ | ||||
| 		Pool:       grpool.NewPool(10, 500), | ||||
| 		IpAuditMap: utils.NewMap("cmap", 0), | ||||
| 	} | ||||
| 	logBatch = &LogBatch{ | ||||
| 		LogChan: make(chan dbdata.AccessAudit, 5000), | ||||
| 	} | ||||
| 	var ( | ||||
| 		limit       = 100 // 超过上限批量写入数据表 | ||||
| 		logAudit     dbdata.AccessAudit | ||||
| 		logBatch     *LogBatch | ||||
| 		commitTimer  *time.Timer // 超时自动提交 | ||||
| 		timeOutBatch *LogBatch | ||||
| 		outTime     = time.NewTimer(time.Second) | ||||
| 		accessAudit = dbdata.AccessAudit{} | ||||
| 	) | ||||
|  | ||||
| 	for { | ||||
| 		// 重置超时 时间 | ||||
| 		outTime.Reset(time.Second * 1) | ||||
| 		select { | ||||
| 		case logAudit = <-logAuditSink.logChan: | ||||
| 			if logBatch == nil { | ||||
| 				logBatch = &LogBatch{} | ||||
| 				commitTimer = time.AfterFunc( | ||||
| 					1*time.Second, func(logBatch *LogBatch) func() { | ||||
| 						return func() { | ||||
| 							logAuditSink.autoCommitChan <- logBatch | ||||
| 						} | ||||
| 					}(logBatch), | ||||
| 				) | ||||
| 			} | ||||
| 			logBatch.Logs = append(logBatch.Logs, logAudit) | ||||
| 		case accessAudit = <-logBatch.LogChan: | ||||
| 			logBatch.Logs = append(logBatch.Logs, accessAudit) | ||||
| 			if len(logBatch.Logs) >= limit { | ||||
| 				commitTimer.Stop() | ||||
| 				_ = dbdata.AddBatch(logBatch.Logs) | ||||
| 				logBatch = nil | ||||
| 				if !outTime.Stop() { | ||||
| 					<-outTime.C | ||||
| 				} | ||||
| 		case timeOutBatch = <-logAuditSink.autoCommitChan: | ||||
| 			if timeOutBatch != logBatch { | ||||
| 				continue | ||||
| 				logBatch.Write() | ||||
| 			} | ||||
| 			if logBatch != nil { | ||||
| 				_ = dbdata.AddBatch(logBatch.Logs) | ||||
| 			} | ||||
| 			logBatch = nil | ||||
| 		case <-outTime.C: | ||||
| 			logBatch.Write() | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // 解析IP包的数据 | ||||
| func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { | ||||
| 	ipProto := waterutil.IPv4Protocol(pl.Data) | ||||
| func logAudit(userName string, bPlData *[]byte) { | ||||
| 	defer putByte(bPlData) | ||||
|  | ||||
| 	plData := *bPlData | ||||
| 	ipProto := waterutil.IPv4Protocol(plData) | ||||
| 	// 访问协议 | ||||
| 	var accessProto uint8 | ||||
| 	// 只统计 tcp和udp 的访问 | ||||
| @@ -99,9 +114,9 @@ func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	ipSrc := waterutil.IPv4Source(pl.Data) | ||||
| 	ipDst := waterutil.IPv4Destination(pl.Data) | ||||
| 	ipPort := waterutil.IPv4DestinationPort(pl.Data) | ||||
| 	ipSrc := waterutil.IPv4Source(plData) | ||||
| 	ipDst := waterutil.IPv4Destination(plData) | ||||
| 	ipPort := waterutil.IPv4DestinationPort(plData) | ||||
|  | ||||
| 	b := getByte51() | ||||
| 	key := *b | ||||
| @@ -114,49 +129,19 @@ func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { | ||||
| 	info := "" | ||||
| 	nu := utils.NowSec().Unix() | ||||
| 	if ipProto == waterutil.TCP { | ||||
| 		plData := waterutil.IPv4Payload(pl.Data) | ||||
| 		if len(plData) < 14 { | ||||
| 		tcpPlData := waterutil.IPv4Payload(plData) | ||||
| 		// 24 (ACK PSH) | ||||
| 		if len(tcpPlData) < 14 || tcpPlData[13] != 24 { | ||||
| 			return | ||||
| 		} | ||||
| 		flags := plData[13] | ||||
| 		switch flags { | ||||
| 		case flags & 0x20: | ||||
| 			// URG | ||||
| 			return | ||||
| 		case flags & 0x14: | ||||
| 			// RST ACK | ||||
| 			return | ||||
| 		case flags & 0x12: | ||||
| 			// SYN ACK | ||||
| 			return | ||||
| 		case flags & 0x11: | ||||
| 			// Client FIN | ||||
| 			return | ||||
| 		case flags & 0x10: | ||||
| 			// ACK | ||||
| 			return | ||||
| 		case flags & 0x08: | ||||
| 			// PSH | ||||
| 			return | ||||
| 		case flags & 0x04: | ||||
| 			// RST | ||||
| 			return | ||||
| 		case flags & 0x02: | ||||
| 			// SYN | ||||
| 			return | ||||
| 		case flags & 0x01: | ||||
| 			// FIN | ||||
| 			return | ||||
| 		case flags & 0x18: | ||||
| 			// PSH ACK | ||||
| 			accessProto, info = onTCP(plData) | ||||
| 		accessProto, info = onTCP(tcpPlData) | ||||
| 		// HTTPS or HTTP | ||||
| 		if accessProto != acc_proto_tcp { | ||||
| 			// 提前存储只含ip数据的key, 避免即记录域名又记录一笔IP数据的记录 | ||||
| 			ipKey := make([]byte, 51) | ||||
| 			copy(ipKey, key) | ||||
| 			ipS := utils.BytesToString(ipKey) | ||||
| 				cSess.IpAuditMap.Set(ipS, nu) | ||||
| 			auditPayload.IpAuditMap.Set(ipS, nu) | ||||
|  | ||||
| 			key[34] = byte(accessProto) | ||||
| 			// 存储含域名的key | ||||
| @@ -165,28 +150,21 @@ func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { | ||||
| 				copy(key[35:51], md5Sum[:]) | ||||
| 			} | ||||
| 		} | ||||
| 		case flags & 0x19: | ||||
| 			// URG | ||||
| 			return | ||||
| 		case flags & 0xC2: | ||||
| 			// SYN-ECE-CWR | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| 	s := utils.BytesToString(key) | ||||
|  | ||||
| 	// 判断已经存在,并且没有过期 | ||||
| 	v, ok := cSess.IpAuditMap.Get(s) | ||||
| 	v, ok := auditPayload.IpAuditMap.Get(s) | ||||
| 	if ok && nu-v.(int64) < int64(base.Cfg.AuditInterval) { | ||||
| 		// 回收byte对象 | ||||
| 		putByte51(b) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	cSess.IpAuditMap.Set(s, nu) | ||||
| 	auditPayload.IpAuditMap.Set(s, nu) | ||||
|  | ||||
| 	audit := dbdata.AccessAudit{ | ||||
| 		Username:    cSess.Username, | ||||
| 		Username:    userName, | ||||
| 		Protocol:    uint8(ipProto), | ||||
| 		Src:         ipSrc.String(), | ||||
| 		Dst:         ipDst.String(), | ||||
| @@ -195,5 +173,5 @@ func logAudit(cSess *sessdata.ConnSession, pl *sessdata.Payload) { | ||||
| 		AccessProto: accessProto, | ||||
| 		Info:        info, | ||||
| 	} | ||||
| 	logAuditWrite(audit) | ||||
| 	logBatch.LogChan <- audit | ||||
| } | ||||
|   | ||||
| @@ -12,9 +12,7 @@ import ( | ||||
|  | ||||
| 	"github.com/bjdgyc/anylink/base" | ||||
| 	"github.com/bjdgyc/anylink/dbdata" | ||||
| 	"github.com/bjdgyc/anylink/pkg/utils" | ||||
| 	mapset "github.com/deckarep/golang-set" | ||||
| 	"github.com/ivpusic/grpool" | ||||
| 	atomic2 "go.uber.org/atomic" | ||||
| ) | ||||
|  | ||||
| @@ -52,8 +50,6 @@ type ConnSession struct { | ||||
| 	PayloadIn           chan *Payload | ||||
| 	PayloadOutCstp      chan *Payload // Cstp的数据 | ||||
| 	PayloadOutDtls      chan *Payload // Dtls的数据 | ||||
| 	IpAuditMap          utils.IMaps   // 审计的ip数据 | ||||
| 	IpAuditPool         *grpool.Pool  // 审计的IP包解析池 | ||||
| 	// dSess *DtlsSession | ||||
| 	dSess *atomic.Value | ||||
| } | ||||
| @@ -206,12 +202,6 @@ func (s *Session) NewConn() *ConnSession { | ||||
| 		dSess:          &atomic.Value{}, | ||||
| 	} | ||||
|  | ||||
| 	// ip 审计 | ||||
| 	if base.Cfg.AuditInterval >= 0 { | ||||
| 		cSess.IpAuditMap = utils.NewMap("cmap", 0) | ||||
| 		cSess.IpAuditPool = grpool.NewPool(1, 600) | ||||
| 	} | ||||
|  | ||||
| 	dSess := &DtlsSession{ | ||||
| 		isActive: -1, | ||||
| 	} | ||||
| @@ -244,10 +234,6 @@ func (cs *ConnSession) Close() { | ||||
| 		cs.Sess.LastLogin = time.Now() | ||||
| 		cs.Sess.CSess = nil | ||||
|  | ||||
| 		if cs.IpAuditPool != nil { | ||||
| 			cs.IpAuditPool.Release() | ||||
| 		} | ||||
|  | ||||
| 		dSess := cs.GetDtlsSession() | ||||
| 		if dSess != nil { | ||||
| 			dSess.Close() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user