mirror of
				https://github.com/bjdgyc/anylink.git
				synced 2025-11-04 19:16:22 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			199 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			199 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package handler
 | 
						||
 | 
						||
import (
 | 
						||
	"crypto/md5"
 | 
						||
	"encoding/binary"
 | 
						||
	"runtime/debug"
 | 
						||
	"time"
 | 
						||
 | 
						||
	"github.com/bjdgyc/anylink/base"
 | 
						||
	"github.com/bjdgyc/anylink/dbdata"
 | 
						||
	"github.com/bjdgyc/anylink/pkg/utils"
 | 
						||
	"github.com/bjdgyc/anylink/sessdata"
 | 
						||
	"github.com/ivpusic/grpool"
 | 
						||
	"github.com/songgao/water/waterutil"
 | 
						||
)
 | 
						||
 | 
						||
// Access-protocol codes. logAudit stores one of them in
// AccessAudit.AccessProto and mixes it into byte 34 of its de-duplication
// key. Numbering starts at 1.
const (
	acc_proto_udp   = iota + 1 // UDP datagram
	acc_proto_tcp              // TCP segment with no further classification
	acc_proto_https            // presumably TCP classified as HTTPS by onTCP — TODO confirm
	acc_proto_http             // presumably TCP classified as HTTP by onTCP — TODO confirm
)
 | 
						||
 | 
						||
var (
 | 
						||
	auditPayload *AuditPayload
 | 
						||
	logBatch     *LogBatch
 | 
						||
)
 | 
						||
 | 
						||
// 分析审计日志
 | 
						||
type AuditPayload struct {
 | 
						||
	Pool       *grpool.Pool
 | 
						||
	IpAuditMap utils.IMaps
 | 
						||
}
 | 
						||
 | 
						||
// 保存审计日志
 | 
						||
type LogBatch struct {
 | 
						||
	Logs    []dbdata.AccessAudit
 | 
						||
	LogChan chan dbdata.AccessAudit
 | 
						||
}
 | 
						||
 | 
						||
// 异步写入pool
 | 
						||
func (p *AuditPayload) Add(userName string, pl *sessdata.Payload) {
 | 
						||
	select {
 | 
						||
	case p.Pool.JobQueue <- func() {
 | 
						||
		logAudit(userName, pl)
 | 
						||
	}:
 | 
						||
	default:
 | 
						||
		putPayload(pl)
 | 
						||
		base.Error("AccessAudit: AuditPayload channel is full")
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// 数据落盘
 | 
						||
func (l *LogBatch) Write() {
 | 
						||
	if len(l.Logs) == 0 {
 | 
						||
		return
 | 
						||
	}
 | 
						||
	_ = dbdata.AddBatch(l.Logs)
 | 
						||
	l.Reset()
 | 
						||
}
 | 
						||
 | 
						||
// 清空数据
 | 
						||
func (l *LogBatch) Reset() {
 | 
						||
	l.Logs = []dbdata.AccessAudit{}
 | 
						||
}
 | 
						||
 | 
						||
// 开启批量写入数据功能
 | 
						||
func logAuditBatch() {
 | 
						||
	if base.Cfg.AuditInterval < 0 {
 | 
						||
		return
 | 
						||
	}
 | 
						||
	auditPayload = &AuditPayload{
 | 
						||
		Pool:       grpool.NewPool(10, 10240),
 | 
						||
		IpAuditMap: utils.NewMap("cmap", 0),
 | 
						||
	}
 | 
						||
	logBatch = &LogBatch{
 | 
						||
		LogChan: make(chan dbdata.AccessAudit, 10240),
 | 
						||
	}
 | 
						||
	var (
 | 
						||
		limit       = 100 // 超过上限批量写入数据表
 | 
						||
		outTime     = time.NewTimer(time.Second)
 | 
						||
		accessAudit = dbdata.AccessAudit{}
 | 
						||
	)
 | 
						||
 | 
						||
	for {
 | 
						||
		// 重置超时 时间
 | 
						||
		outTime.Reset(time.Second * 1)
 | 
						||
		select {
 | 
						||
		case accessAudit = <-logBatch.LogChan:
 | 
						||
			logBatch.Logs = append(logBatch.Logs, accessAudit)
 | 
						||
			if len(logBatch.Logs) >= limit {
 | 
						||
				if !outTime.Stop() {
 | 
						||
					<-outTime.C
 | 
						||
				}
 | 
						||
				logBatch.Write()
 | 
						||
			}
 | 
						||
		case <-outTime.C:
 | 
						||
			logBatch.Write()
 | 
						||
		}
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// 解析IP包的数据
 | 
						||
func logAudit(userName string, pl *sessdata.Payload) {
 | 
						||
	defer func() {
 | 
						||
		if err := recover(); err != nil {
 | 
						||
			base.Error("logAudit is panic: ", err, "\n", string(debug.Stack()), "\n", pl.Data)
 | 
						||
		}
 | 
						||
		putPayload(pl)
 | 
						||
	}()
 | 
						||
 | 
						||
	if !(pl.LType == sessdata.LTypeIPData && pl.PType == 0x00) {
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	ipProto := waterutil.IPv4Protocol(pl.Data)
 | 
						||
	// 访问协议
 | 
						||
	var accessProto uint8
 | 
						||
	// 只统计 tcp和udp 的访问
 | 
						||
	switch ipProto {
 | 
						||
	case waterutil.TCP:
 | 
						||
		accessProto = acc_proto_tcp
 | 
						||
	case waterutil.UDP:
 | 
						||
		accessProto = acc_proto_udp
 | 
						||
	default:
 | 
						||
		return
 | 
						||
	}
 | 
						||
	// IP报文只包含头部信息时, 则打印LOG,并退出
 | 
						||
	ipPl := waterutil.IPv4Payload(pl.Data)
 | 
						||
	if len(ipPl) < 4 {
 | 
						||
		base.Error("ipPl len < 4", ipPl, pl.Data)
 | 
						||
		return
 | 
						||
	}
 | 
						||
	ipPort := (uint16(ipPl[2]) << 8) | uint16(ipPl[3])
 | 
						||
	ipSrc := waterutil.IPv4Source(pl.Data)
 | 
						||
	ipDst := waterutil.IPv4Destination(pl.Data)
 | 
						||
	b := getByte51()
 | 
						||
	key := *b
 | 
						||
	copy(key[:16], ipSrc)
 | 
						||
	copy(key[16:32], ipDst)
 | 
						||
	binary.BigEndian.PutUint16(key[32:34], ipPort)
 | 
						||
	key[34] = byte(accessProto)
 | 
						||
	copy(key[35:51], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
 | 
						||
 | 
						||
	info := ""
 | 
						||
	nu := utils.NowSec().Unix()
 | 
						||
	if ipProto == waterutil.TCP {
 | 
						||
		tcpPlData := waterutil.IPv4Payload(pl.Data)
 | 
						||
		// 24 (ACK PSH)
 | 
						||
		if len(tcpPlData) < 14 || tcpPlData[13] != 24 {
 | 
						||
			return
 | 
						||
		}
 | 
						||
		accessProto, info = onTCP(tcpPlData)
 | 
						||
		// HTTPS or HTTP
 | 
						||
		if accessProto != acc_proto_tcp {
 | 
						||
			// 提前存储只含ip数据的key, 避免即记录域名又记录一笔IP数据的记录
 | 
						||
			ipKey := make([]byte, 51)
 | 
						||
			copy(ipKey, key)
 | 
						||
			ipS := utils.BytesToString(ipKey)
 | 
						||
			auditPayload.IpAuditMap.Set(ipS, nu)
 | 
						||
 | 
						||
			key[34] = byte(accessProto)
 | 
						||
			// 存储含域名的key
 | 
						||
			if info != "" {
 | 
						||
				md5Sum := md5.Sum([]byte(info))
 | 
						||
				copy(key[35:51], md5Sum[:])
 | 
						||
			}
 | 
						||
		}
 | 
						||
	}
 | 
						||
	s := utils.BytesToString(key)
 | 
						||
 | 
						||
	// 判断已经存在,并且没有过期
 | 
						||
	v, ok := auditPayload.IpAuditMap.Get(s)
 | 
						||
	if ok && nu-v.(int64) < int64(base.Cfg.AuditInterval) {
 | 
						||
		// 回收byte对象
 | 
						||
		putByte51(b)
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	auditPayload.IpAuditMap.Set(s, nu)
 | 
						||
 | 
						||
	audit := dbdata.AccessAudit{
 | 
						||
		Username:    userName,
 | 
						||
		Protocol:    uint8(ipProto),
 | 
						||
		Src:         ipSrc.String(),
 | 
						||
		Dst:         ipDst.String(),
 | 
						||
		DstPort:     ipPort,
 | 
						||
		CreatedAt:   utils.NowSec(),
 | 
						||
		AccessProto: accessProto,
 | 
						||
		Info:        info,
 | 
						||
	}
 | 
						||
	select {
 | 
						||
	case logBatch.LogChan <- audit:
 | 
						||
	default:
 | 
						||
		base.Error("AccessAudit: LogChan channel is full")
 | 
						||
		return
 | 
						||
	}
 | 
						||
}
 |