diff --git a/core/dispatch.go b/core/dispatch.go index 16ea044..b655e5d 100644 --- a/core/dispatch.go +++ b/core/dispatch.go @@ -2,25 +2,26 @@ package core import ( "fmt" - "github.com/google/gopacket/pcap" "log" + "time" + "github.com/google/gopacket" "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" "github.com/google/gopacket/tcpassembly" "github.com/google/gopacket/tcpassembly/tcpreader" - "time" ) type Dispatch struct { - device string + device string payload []byte - Plug *Plug + Plug *Plug } func NewDispatch(plug *Plug, cmd *Cmd) *Dispatch { - return &Dispatch { - Plug: plug, - device:cmd.Device, + return &Dispatch{ + Plug: plug, + device: cmd.Device, } } @@ -40,16 +41,16 @@ func (d *Dispatch) Capture() { } // Capture - src := gopacket.NewPacketSource(handle, handle.LinkType()) - packets := src.Packets() + src := gopacket.NewPacketSource(handle, handle.LinkType()) + packets := src.Packets() // get packet chan // Set up assembly streamFactory := &ProtocolStreamFactory{ - dispatch:d, + dispatch: d, } streamPool := NewStreamPool(streamFactory) - assembler := NewAssembler(streamPool) - ticker := time.Tick(time.Minute) + assembler := NewAssembler(streamPool) + ticker := time.Tick(time.Minute) // Loop until ctrl+z for { @@ -84,7 +85,7 @@ type ProtocolStream struct { func (m *ProtocolStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream { //init stream struct - stm := &ProtocolStream { + stm := &ProtocolStream{ net: net, transport: transport, r: tcpreader.NewReaderStream(), @@ -97,4 +98,4 @@ func (m *ProtocolStreamFactory) New(net, transport gopacket.Flow) tcpassembly.St go m.dispatch.Plug.ResolveStream(net, transport, &(stm.r)) return &(stm.r) -} \ No newline at end of file +} diff --git a/go-sniffer b/go-sniffer new file mode 100755 index 0000000..f61cc39 Binary files /dev/null and b/go-sniffer differ diff --git a/kafka.pcap b/kafka.pcap new file mode 100644 index 0000000..7f501f4 Binary files /dev/null and b/kafka.pcap differ diff --git a/plugSrc/kafka/build/const.go b/plugSrc/kafka/build/const.go index 490bb2f..fe775de 100644 --- a/plugSrc/kafka/build/const.go +++ b/plugSrc/kafka/build/const.go @@ -1,14 +1,14 @@ package build const ( - ProduceRequest = 0 - FetchRequest = 1 - OffsetRequest = 2 - MetadataRequest = 3 + ProduceRequest = 0 + FetchRequest = 1 + OffsetRequest = 2 + MetadataRequest = 3 //Non-user facing control APIs = 4-7 OffsetCommitRequest = 8 OffsetFetchRequest = 9 - GroupCoordinatorRequest = 10 + GroupCoordinatorRequest = 10 JoinGroupRequest = 11 HeartbeatRequest = 12 LeaveGroupRequest = 13 @@ -19,6 +19,27 @@ const ( CreateTopicsReqKind = 19 ) +const () + +var RquestNameMap = map[int16]string{ + 0: "ProduceRequest", + 1: "FetchRequest", + 2: "OffsetRequest", + 3: "MetadataRequest", + //Non-user facing control APIs = 4-7 + 8: "OffsetCommitRequest", + 9: "OffsetFetchRequest", + 10: "GroupCoordinatorRequest", + 11: "JoinGroupRequest", + 12: "HeartbeatRequest", + 13: "LeaveGroupRequest", + 14: "SyncGroupRequest", + 15: "DescribeGroupsRequest", + 16: "ListGroupsRequest", + 18: "APIVersionsReqKind", + 19: "CreateTopicsReqKind", +} + const ( ApiV0 = 0 ApiV1 = 1 diff --git a/plugSrc/kafka/build/entry.go b/plugSrc/kafka/build/entry.go index 28bf315..9d5fd0c 100644 --- a/plugSrc/kafka/build/entry.go +++ b/plugSrc/kafka/build/entry.go @@ -2,15 +2,17 @@ package build import ( "bytes" + "encoding/json" "fmt" - "github.com/google/gopacket" "io" "strconv" "sync" + + "github.com/google/gopacket" ) const ( - Port = 9092 + Port = 9092 Version = "0.1" CmdPort = "-p" ) @@ -22,18 +24,18 @@ type Kafka struct { } type stream struct { - packets chan *packet + packets chan *packet + correlationMap map[int32]requestHeader } type packet struct { - - isClientFlow bool //客户端->服务器端流 - messageSize int32 + isClientFlow bool //客户端->服务器端流 + messageSize int32 requestHeader responseHeader - payload io.Reader + payload io.Reader } type requestHeader struct { @@ -51,50 +53,52 @@ type messageSet struct { offset int64 messageSize int32 } + func newMessageSet(r io.Reader) messageSet { messageSet := messageSet{} - messageSet.offset = ReadInt64(r) + _, messageSet.offset = ReadInt64(r) messageSet.messageSize = ReadInt32(r) return messageSet } type message struct { - crc int32 - magicByte int8 - attributes int8 - key []byte - value []byte + crc int32 + magicByte int8 + attributes int8 + key []byte + value []byte } var kafkaInstance *Kafka var once sync.Once + func NewInstance() *Kafka { once.Do(func() { kafkaInstance = &Kafka{ - port :Port, - version:Version, - source: make(map[string]*stream), + port: Port, + version: Version, + source: make(map[string]*stream), } }) return kafkaInstance } -func (m *Kafka) SetFlag(flg []string) { +func (m *Kafka) SetFlag(flg []string) { c := len(flg) if c == 0 { return } - if c >> 1 != 1 { + if c>>1 != 1 { panic("Mongodb参数数量不正确!") } - for i:=0;i RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]] - RequiredAcks => int16 - Timeout => int32 - Partition => int32 - MessageSetSize => int32 - - */ +Produce request Protocol +v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later) +ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]] + RequiredAcks => int16 + Timeout => int32 + Partition => int32 + MessageSetSize => int32 + +*/ type ProduceReq struct { TransactionalID string RequiredAcks int16 @@ -44,9 +45,8 @@ type ProduceReqPartition struct { Messages []*Message } -func ReadProduceRequest(r io.Reader, version int16) string { - - var msg string +func ReadProduceRequest(r io.Reader, version int16) *ProduceReq { + // version == 1 produceReq := ProduceReq{} @@ -55,25 +55,161 @@ func ReadProduceRequest(r io.Reader, version int16) string { fmt.Println(produceReq.TransactionalID) } - produceReq.RequiredAcks = ReadInt16(r) - produceReq.Timeout = time.Duration(ReadInt32(r)) * time.Millisecond + produceReq.RequiredAcks = ReadInt16(r) + produceReq.Timeout = time.Duration(ReadInt32(r)) * time.Millisecond l := ReadInt32(r) - req := ProduceReq{} - req.Topics = make([]ProduceReqTopic, l) + produceReq.Topics = make([]ProduceReqTopic, l) - for ti := range req.Topics { - var topic = &req.Topics[ti] - topic.Name,_ = ReadString(r) - fmt.Println("msg") - fmt.Println(topic.Name) + for ti := range produceReq.Topics { + var topic = &produceReq.Topics[ti] + topic.Name, _ = ReadString(r) l := ReadInt32(r) topic.Partitions = make([]ProduceReqPartition, l) + for idx := 0; idx < int(l); idx++ { + topic.Partitions[idx].ID = ReadInt32(r) + _ = ReadInt32(r) // partitions size + topic.Partitions[idx].Messages = ReadMessages(r, version) + } } - return msg + return &produceReq +} + +type ProduceRspPartitions struct { + PartitionID int32 + Error int16 + Offset int64 +} + +type ProduceRspTopic struct { + TopicName string + Partitions []ProduceRspPartitions + ThrottleTime int32 +} + +type ProduceRsp struct { + Topics []ProduceRspTopic } +func ReadProduceResponse(r io.Reader, version int16) *ProduceRsp { + // version == 1 + produceRsp := ProduceRsp{} + l := ReadInt32(r) + produceRsp.Topics = make([]ProduceRspTopic, 0) + for i := 0; i < int(l); i++ { + topic := ProduceRspTopic{} + topic.TopicName, _ = ReadString(r) + pl := ReadInt32(r) + topic.Partitions = make([]ProduceRspPartitions, 0) + for j := 0; j < int(pl); j++ { + pt := ProduceRspPartitions{} + pt.PartitionID = ReadInt32(r) + pt.Error = ReadInt16(r) + _, pt.Offset = ReadInt64(r) + topic.Partitions = append(topic.Partitions, pt) + } + produceRsp.Topics = append(produceRsp.Topics, topic) + } + return &produceRsp +} + +type MetadataReq struct { + TopicNames []string +} + +func ReadMetadataRequest(r io.Reader, version int16) *MetadataReq { + // version == 0 + metadataReq := MetadataReq{} + + l := ReadInt32(r) + for i := 0; i < int(l); i++ { + topicName, _ := ReadString(r) + metadataReq.TopicNames = append(metadataReq.TopicNames, topicName) + } + + return &metadataReq +} +type Broker struct { + NodeID int32 + Host string + Port int32 +} + +type PartitionMetada struct { + ErrorCode int16 + PartitionIndex int32 + LeaderID int32 + ReplicaNodes []int32 + IsrNodes []int32 +} + +type TopicMetadata struct { + ErrorCode int16 + Name string + Partitions []PartitionMetada +} + +type MetadataRsp struct { + Brokers []Broker + Topics []TopicMetadata +} + +func ReadMetadataResponse(r io.Reader, version int16) *MetadataRsp { + // version == 0 + metadataRsp := MetadataRsp{} + + // read brokers + metadataRsp.Brokers = make([]Broker, 0) + l := ReadInt32(r) + for i := 0; i < int(l); i++ { + broker := Broker{} + broker.NodeID = ReadInt32(r) + broker.Host, _ = ReadString(r) + broker.Port = ReadInt32(r) + metadataRsp.Brokers = append(metadataRsp.Brokers, broker) + } + + // read topics + metadataRsp.Topics = make([]TopicMetadata, 0) + l = ReadInt32(r) + for i := 0; i < int(l); i++ { + topicMetadata := TopicMetadata{} + topicMetadata.ErrorCode = ReadInt16(r) + topicMetadata.Name, _ = ReadString(r) + pl := ReadInt32(r) + topicMetadata.Partitions = make([]PartitionMetada, 0) + for j := 0; j < int(pl); j++ { + pm := PartitionMetada{} + pm.ErrorCode = ReadInt16(r) + pm.PartitionIndex = ReadInt32(r) + pm.LeaderID = ReadInt32(r) + + pm.ReplicaNodes = make([]int32, 0) + replicaLen := ReadInt32(r) + for ri := 0; ri < int(replicaLen); ri++ { + pm.ReplicaNodes = append(pm.ReplicaNodes, ReadInt32(r)) + } + + pm.IsrNodes = make([]int32, 0) + isrLen := ReadInt32(r) + for ri := 0; ri < int(isrLen); ri++ { + pm.IsrNodes = append(pm.IsrNodes, ReadInt32(r)) + } + topicMetadata.Partitions = append(topicMetadata.Partitions, pm) + } + metadataRsp.Topics = append(metadataRsp.Topics, topicMetadata) + } + + return &metadataRsp +} + +type Action struct { + Request string + Direction string + ApiVersion int16 + Message interface{} +} diff --git a/plugSrc/kafka/build/util.go b/plugSrc/kafka/build/util.go index a613956..000b983 100644 --- a/plugSrc/kafka/build/util.go +++ b/plugSrc/kafka/build/util.go @@ -2,7 +2,9 @@ package build import ( "encoding/binary" + "fmt" "io" + "strconv" "time" ) @@ -12,7 +14,7 @@ func GetNowStr(isClient bool) string { msg += time.Now().Format(layout) if isClient { msg += "| cli -> ser |" - }else{ + } else { msg += "| ser -> cli |" } return msg @@ -31,6 +33,11 @@ func ReadOnce() { } +func ReadByte(r io.Reader) (n byte) { + binary.Read(r, binary.BigEndian, &n) + return +} + func ReadInt16(r io.Reader) (n int16) { binary.Read(r, binary.BigEndian, &n) return @@ -41,18 +48,23 @@ func ReadInt32(r io.Reader) (n int32) { return } -func ReadInt64(r io.Reader) (n int64) { +func ReadUint32(r io.Reader) (n uint32) { binary.Read(r, binary.BigEndian, &n) return } +func ReadInt64(r io.Reader) (err error, n int64) { + err = binary.Read(r, binary.BigEndian, &n) + return +} + func ReadString(r io.Reader) (string, int) { l := int(ReadInt16(r)) //-1 => null if l == -1 { - return " ",1 + return " ", 1 } str := make([]byte, l) @@ -62,6 +74,7 @@ func ReadString(r io.Reader) (string, int) { return string(str), l } + // //func TryReadInt16(r io.Reader) (n int16, err error) { // @@ -76,19 +89,70 @@ func ReadString(r io.Reader) (string, int) { func ReadBytes(r io.Reader) []byte { l := int(ReadInt32(r)) + result := make([]byte, 0) + + if l <= 0 { + return result + } - var result []byte var b = make([]byte, l) - for i:=0;i