diff --git a/trunk/3rdparty/srs-bench/pcap/main.go b/trunk/3rdparty/srs-bench/pcap/main.go index 9409578be4..2464e51acb 100644 --- a/trunk/3rdparty/srs-bench/pcap/main.go +++ b/trunk/3rdparty/srs-bench/pcap/main.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "os" + "strings" "time" "github.com/google/gopacket" @@ -62,9 +63,19 @@ func doMain(ctx context.Context) error { } defer f.Close() - r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions) - if err != nil { - return errors.Wrapf(err, "new reader") + var source *gopacket.PacketSource + if strings.HasSuffix(filename, ".pcap") { + r, err := pcapgo.NewReader(f) + if err != nil { + return errors.Wrapf(err, "new reader") + } + source = gopacket.NewPacketSource(r, r.LinkType()) + } else { + r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions) + if err != nil { + return errors.Wrapf(err, "new reader") + } + source = gopacket.NewPacketSource(r, r.LinkType()) } // TODO: FIXME: Should start a goroutine to consume bytes from conn. @@ -76,7 +87,6 @@ func doMain(ctx context.Context) error { var packetNumber uint64 var previousTime *time.Time - source := gopacket.NewPacketSource(r, r.LinkType()) for packet := range source.Packets() { packetNumber++ @@ -90,7 +100,7 @@ func doMain(ctx context.Context) error { if len(payload) == 0 { continue } - if tcp.DstPort != 1935 { + if tcp.DstPort != 1935 && tcp.DstPort != 19350 { continue } diff --git a/trunk/3rdparty/srs-bench/tcpproxy/main.go b/trunk/3rdparty/srs-bench/tcpproxy/main.go new file mode 100644 index 0000000000..09dcfbf642 --- /dev/null +++ b/trunk/3rdparty/srs-bench/tcpproxy/main.go @@ -0,0 +1,159 @@ +package main + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "fmt" + "net" + "sync" + "time" +) + +func main() { + if err := doMain(); err != nil { + panic(err) + } +} + +func doMain() error { + hashID := buildHashID() + + listener, err := net.Listen("tcp", ":1935") + if err != nil { + return err + } + trace(hashID, "Listen at %v", listener.Addr()) + + for { + client, err := listener.Accept() + if err != nil { + return err + } + + backend, err := net.Dial("tcp", "localhost:19350") + if err != nil { + return err + } + + go serve(client, backend) + } + return nil +} + +func serve(client, backend net.Conn) { + defer client.Close() + defer backend.Close() + hashID := buildHashID() + if err := doServe(hashID, client, backend); err != nil { + trace(hashID, "Serve error %v", err) + } +} + +func doServe(hashID string, client, backend net.Conn) error { + var wg sync.WaitGroup + var r0 error + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if c, ok := client.(*net.TCPConn); ok { + c.SetNoDelay(true) + } + if c, ok := backend.(*net.TCPConn); ok { + c.SetNoDelay(true) + } + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + for { + buf := make([]byte, 128*1024) + nn, err := client.Read(buf) + if err != nil { + trace(hashID, "Read from client error %v", err) + r0 = err + return + } + if nn == 0 { + trace(hashID, "Read from client EOF") + return + } + + _, err = backend.Write(buf[:nn]) + if err != nil { + trace(hashID, "Write to RTMP backend error %v", err) + r0 = err + return + } + + trace(hashID, "Copy %v bytes to RTMP backend", nn) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + for { + buf := make([]byte, 128*1024) + nn, err := backend.Read(buf) + if err != nil { + trace(hashID, "Read from RTMP backend error %v", err) + r0 = err + return + } + if nn == 0 { + trace(hashID, "Read from RTMP backend EOF") + return + } + + _, err = client.Write(buf[:nn]) + if err != nil { + trace(hashID, "Write to client error %v", err) + r0 = err + return + } + + trace(hashID, "Copy %v bytes to RTMP client", nn) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + + defer client.Close() + defer backend.Close() + + <-ctx.Done() + trace(hashID, "Context is done, close the connections") + }() + + trace(hashID, "Start proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr()) + wg.Wait() + trace(hashID, "Finish proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr()) + + return r0 +} + +func trace(id, msg string, a ...interface{}) { + fmt.Println(fmt.Sprintf("[%v][%v] %v", + time.Now().Format("2006-01-02 15:04:05.000"), id, + fmt.Sprintf(msg, a...), + )) +} + +func buildHashID() string { + randomData := make([]byte, 16) + if _, err := rand.Read(randomData); err != nil { + return "" + } + + hash := sha256.Sum256(randomData) + return hex.EncodeToString(hash[:])[:6] +} diff --git a/trunk/doc/CHANGELOG.md b/trunk/doc/CHANGELOG.md index 373dcccc64..7607a48bf6 100644 --- a/trunk/doc/CHANGELOG.md +++ b/trunk/doc/CHANGELOG.md @@ -7,6 +7,7 @@ The changelog for SRS. ## SRS 6.0 Changelog +* v6.0, 2024-03-19, Merge [#3958](https://github.com/ossrs/srs/pull/3958): Add a TCP proxy for debugging. v6.0.117 (#3958) * v6.0, 2024-03-20, Merge [#3964](https://github.com/ossrs/srs/pull/3964): WebRTC: Add support for A/V only WHEP/WHEP player. v6.0.116 (#3964) * v6.0, 2024-03-19, Merge [#3990](https://github.com/ossrs/srs/pull/3990): System: Disable feature that obtains versions and check features status. v6.0.115 (#3990) * v6.0, 2024-03-18, Merge [#3973](https://github.com/ossrs/srs/pull/3973): Typo: Fix some typo for #3973 #3976 #3982. v6.0.114 (#3973) diff --git a/trunk/src/core/srs_core_version6.hpp b/trunk/src/core/srs_core_version6.hpp index 5cb79c0389..d79d5849d4 100644 --- a/trunk/src/core/srs_core_version6.hpp +++ b/trunk/src/core/srs_core_version6.hpp @@ -9,6 +9,6 @@ #define VERSION_MAJOR 6 #define VERSION_MINOR 0 -#define VERSION_REVISION 116 +#define VERSION_REVISION 117 #endif