diff --git a/core/plug.go b/core/plug.go index af0efd6..796c592 100644 --- a/core/plug.go +++ b/core/plug.go @@ -8,6 +8,7 @@ import ( mysql "github.com/40t/go-sniffer/plugSrc/mysql/build" redis "github.com/40t/go-sniffer/plugSrc/redis/build" hp "github.com/40t/go-sniffer/plugSrc/http/build" + mongodb "github.com/40t/go-sniffer/plugSrc/mongodb/build" "path/filepath" "fmt" "path" @@ -78,12 +79,12 @@ func (p *Plug) LoadInternalPlugList() { //Mysql list["mysql"] = mysql.NewInstance() - //TODO Mongodb - - //TODO ARP + //Mongodb + list["mongodb"] = mongodb.NewInstance() //Redis list["redis"] = redis.NewInstance() + //Http list["http"] = hp.NewInstance() diff --git a/plugSrc/mongodb/build/const.go b/plugSrc/mongodb/build/const.go new file mode 100644 index 0000000..860f90e --- /dev/null +++ b/plugSrc/mongodb/build/const.go @@ -0,0 +1,17 @@ +package build + +const ( + OP_REPLY = 1 //Reply to a client request. responseTo is set. + OP_UPDATE = 2001 //Update document. + OP_INSERT = 2002 //Insert new document. + RESERVED = 2003 //Formerly used for OP_GET_BY_OID. + + OP_QUERY = 2004 //Query a collection. + OP_GET_MORE = 2005 //Get more data from a query. See Cursors. + OP_DELETE = 2006 //Delete documents. + OP_KILL_CURSORS = 2007 //Notify database that the client has finished with the cursor. + + OP_COMMAND = 2010 //Cluster internal protocol representing a command request. + OP_COMMANDREPLY = 2011 //Cluster internal protocol representing a reply to an OP_COMMAND. + OP_MSG = 2013 //Send a message using the format introduced in MongoDB 3.6. +) \ No newline at end of file diff --git a/plugSrc/mongodb/build/entry.go b/plugSrc/mongodb/build/entry.go new file mode 100644 index 0000000..f167d42 --- /dev/null +++ b/plugSrc/mongodb/build/entry.go @@ -0,0 +1,286 @@ +package build + +import ( + "bytes" + "encoding/binary" + "fmt" + "github.com/google/gopacket" + "io" + "strconv" +) + +const ( + Port = 27017 + Version = "0.1" + CmdPort = "-p" +) + +type Mongodb struct { + port int + version string + source map[string]*stream +} + +type stream struct { + packets chan *packet +} + +type packet struct { + + isClientFlow bool //客户端->服务器端流 + + messageLength int //总消息大小 + requestID int //此消息的标识符 + responseTo int //从原始请求的requestID + opCode int //请求类型 + + payload io.Reader +} + +var mongodbInstance *Mongodb + +func NewInstance() *Mongodb { + if mongodbInstance == nil { + mongodbInstance = &Mongodb{ + port :Port, + version:Version, + source: make(map[string]*stream), + } + } + return mongodbInstance +} + +func (m *Mongodb) SetFlag(flg []string) { + c := len(flg) + if c == 0 { + return + } + if c >> 1 != 1 { + panic("Mysql参数数量不正确!") + } + for i:=0;i 65535 { + panic("参数不正确: 端口范围(0-65535)") + } + break + default: + panic("参数不正确") + } + } +} + +func (m *Mongodb) BPFFilter() string { + return "tcp and port "+strconv.Itoa(m.port); +} + +func (m *Mongodb) Version() string { + return m.version +} + +func (m *Mongodb) ResolveStream(net, transport gopacket.Flow, buf io.Reader) { + + //uuid + uuid := fmt.Sprintf("%v:%v", net.FastHash(), transport.FastHash()) + + //resolve packet + if _, ok := m.source[uuid]; !ok { + + var newStream = stream { + packets:make(chan *packet, 100), + } + + m.source[uuid] = &newStream + go newStream.resolve() + } + + //read bi-directional packet + //server -> client || client -> server + for { + + newPacket := m.newPacket(net, transport, buf) + if newPacket == nil { + return + } + + m.source[uuid].packets <- newPacket + } +} + +func (m *Mongodb) newPacket(net, transport gopacket.Flow, r io.Reader) *packet { + + //read packet + var packet *packet + var err error + packet, err = readStream(r) + + //stream close + if err == io.EOF { + fmt.Println(net, transport, " 关闭") + return nil + } else if err != nil { + fmt.Println("流解析错误", net, transport, ":", err) + return nil + } + + //set flow direction + if transport.Src().String() == strconv.Itoa(m.port) { + packet.isClientFlow = false + }else{ + packet.isClientFlow = true + } + + return packet +} + +func (stm *stream) resolve() { + for { + select { + case packet := <- stm.packets: + if packet.isClientFlow { + stm.resolveClientPacket(packet) + } else { + stm.resolveServerPacket(packet) + } + } + } +} + +func (stm *stream) resolveServerPacket(pk *packet) { + return +} + +func (stm *stream) resolveClientPacket(pk *packet) { + + var msg string + switch pk.opCode { + + case OP_UPDATE: + zero := ReadInt32(pk.payload) + fullCollectionName := ReadString(pk.payload) + flags := ReadInt32(pk.payload) + selector := ReadBson2Json(pk.payload) + update := ReadBson2Json(pk.payload) + _ = zero + _ = flags + + msg = fmt.Sprintf(" [更新] [集合:%s] 语句: %v %v", + fullCollectionName, + selector, + update, + ) + + case OP_INSERT: + flags := ReadInt32(pk.payload) + fullCollectionName := ReadString(pk.payload) + command := ReadBson2Json(pk.payload) + _ = flags + + msg = fmt.Sprintf(" [插入] [集合:%s] 语句: %v", + fullCollectionName, + command, + ) + + case OP_QUERY: + flags := ReadInt32(pk.payload) + fullCollectionName := ReadString(pk.payload) + numberToSkip := ReadInt32(pk.payload) + numberToReturn := ReadInt32(pk.payload) + _ = flags + _ = numberToSkip + _ = numberToReturn + + command := ReadBson2Json(pk.payload) + selector := ReadBson2Json(pk.payload) + + msg = fmt.Sprintf(" [查询] [集合:%s] 语句: %v %v", + fullCollectionName, + command, + selector, + ) + + case OP_COMMAND: + database := ReadString(pk.payload) + commandName := ReadString(pk.payload) + metaData := ReadBson2Json(pk.payload) + commandArgs := ReadBson2Json(pk.payload) + inputDocs := ReadBson2Json(pk.payload) + + msg = fmt.Sprintf(" [命令] [数据库:%s] [命令名:%s] %v %v %v", + database, + commandName, + metaData, + commandArgs, + inputDocs, + ) + + case OP_GET_MORE: + zero := ReadInt32(pk.payload) + fullCollectionName := ReadString(pk.payload) + numberToReturn := ReadInt32(pk.payload) + cursorId := ReadInt64(pk.payload) + _ = zero + + msg = fmt.Sprintf(" [查询更多] [集合:%s] [回复数量:%v] [游标:%v]", + fullCollectionName, + numberToReturn, + cursorId, + ) + + case OP_DELETE: + zero := ReadInt32(pk.payload) + fullCollectionName := ReadString(pk.payload) + flags := ReadInt32(pk.payload) + selector := ReadBson2Json(pk.payload) + _ = zero + _ = flags + + msg = fmt.Sprintf(" [删除] [集合:%s] 语句: %v", + fullCollectionName, + selector, + ) + + case OP_MSG: + return + default: + return + } + + fmt.Println(GetNowStr(true) + msg) +} + +func readStream(r io.Reader) (*packet, error) { + + var buf bytes.Buffer + p := &packet{} + + //header + header := make([]byte, 16) + if _, err := io.ReadFull(r, header); err != nil { + return nil,err + } + + // message length + payloadLen := binary.LittleEndian.Uint32(header[0:4]) - 16 + p.messageLength = int(payloadLen) + + // opCode + p.opCode = int(binary.LittleEndian.Uint32(header[12:])) + + if p.messageLength != 0 { + io.CopyN(&buf, r, int64(payloadLen)) + } + + p.payload = bytes.NewReader(buf.Bytes()) + + return p, nil +} diff --git a/plugSrc/mongodb/build/util.go b/plugSrc/mongodb/build/util.go new file mode 100644 index 0000000..e4ee299 --- /dev/null +++ b/plugSrc/mongodb/build/util.go @@ -0,0 +1,84 @@ +package build + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "gopkg.in/mgo.v2/bson" + "io" + "time" +) + +func GetNowStr(isClient bool) string { + var msg string + if isClient { + msg = time.Now().Format("2006-01-02 15:04:05")+"| cli -> ser |" + }else{ + msg = time.Now().Format("2006-01-02 15:04:05")+"| ser -> cli |" + } + return msg +} + +func ReadInt32(r io.Reader) (n int32) { + binary.Read(r, binary.LittleEndian, &n) + return +} + +func ReadInt64(r io.Reader) int64 { + var n int64 + binary.Read(r, binary.LittleEndian, &n) + return n +} + +func ReadString(r io.Reader) string { + + var result []byte + var b = make([]byte, 1) + for { + + _, err := r.Read(b) + + if err != nil { + panic(err) + } + + if b[0] == '\x00' { + break + } + + result = append(result, b[0]) + } + + return string(result) +} + +func ReadBson2Json(r io.Reader) (string) { + + //read len + docLen := ReadInt32(r) + if docLen == 0 { + return "" + } + + //document []byte + docBytes := make([]byte, int(docLen)) + binary.LittleEndian.PutUint32(docBytes, uint32(docLen)) + if _, err := io.ReadFull(r, docBytes[4:]); err != nil { + panic(err) + } + + //resolve document + var bsn bson.M + err := bson.Unmarshal(docBytes, &bsn) + if err != nil { + panic(err) + } + + //format to Json + jsonStr, err := json.Marshal(bsn) + if err != nil { + return fmt.Sprintf("{\"error\":%s}", err.Error()) + } + return string(jsonStr) +} +