From 17c22577c991eb4179bbb65b9237262bdb40f2d9 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 31 Jan 2021 16:44:32 +0100 Subject: [PATCH] split sourcertmp and rtmpinfo --- internal/rtmpinfo/rtmpinfo.go | 133 ++++++++++++++++++++++ internal/sourcertmp/source.go | 202 ++++++++-------------------------- 2 files changed, 181 insertions(+), 154 deletions(-) create mode 100644 internal/rtmpinfo/rtmpinfo.go diff --git a/internal/rtmpinfo/rtmpinfo.go b/internal/rtmpinfo/rtmpinfo.go new file mode 100644 index 00000000..e0a4f796 --- /dev/null +++ b/internal/rtmpinfo/rtmpinfo.go @@ -0,0 +1,133 @@ +package rtmpinfo + +import ( + "fmt" + "net" + "time" + + "github.com/aler9/gortsplib" + "github.com/notedit/rtmp/av" + "github.com/notedit/rtmp/codec/h264" + "github.com/notedit/rtmp/format/flv/flvio" + "github.com/notedit/rtmp/format/rtmp" +) + +const ( + codecH264 = 7 + codecAAC = 10 +) + +func readMetadata(rconn *rtmp.Conn) (flvio.AMFMap, error) { + pkt, err := rconn.ReadPacket() + if err != nil { + return nil, err + } + + if pkt.Type != av.Metadata { + return nil, fmt.Errorf("first packet must be metadata") + } + + arr, err := flvio.ParseAMFVals(pkt.Data, false) + if err != nil { + return nil, err + } + + if len(arr) != 1 { + return nil, fmt.Errorf("invalid metadata") + } + + ma, ok := arr[0].(flvio.AMFMap) + if !ok { + return nil, fmt.Errorf("invalid metadata") + } + + return ma, nil +} + +// Info extracts track informations from a RTMP connection. +func Info(rconn *rtmp.Conn, nconn net.Conn, readTimeout time.Duration) ( + *gortsplib.Track, *gortsplib.Track, error) { + var videoTrack *gortsplib.Track + var audioTrack *gortsplib.Track + + // configuration must be completed within readTimeout + nconn.SetReadDeadline(time.Now().Add(readTimeout)) + + md, err := readMetadata(rconn) + if err != nil { + return nil, nil, err + } + + hasVideo := false + if v, ok := md.GetFloat64("videocodecid"); ok { + switch v { + case codecH264: + hasVideo = true + case 0: + default: + return nil, nil, fmt.Errorf("unsupported video codec %v", v) + } + + } + + hasAudio := false + if v, ok := md.GetFloat64("audiocodecid"); ok { + switch v { + case codecAAC: + hasAudio = true + case 0: + default: + return nil, nil, fmt.Errorf("unsupported audio codec %v", v) + } + } + + if !hasVideo && !hasAudio { + return nil, nil, fmt.Errorf("stream has no tracks") + } + + for { + var pkt av.Packet + pkt, err = rconn.ReadPacket() + if err != nil { + return nil, nil, err + } + + switch pkt.Type { + case av.H264DecoderConfig: + if !hasVideo { + return nil, nil, fmt.Errorf("unexpected video packet") + } + if videoTrack != nil { + return nil, nil, fmt.Errorf("video track setupped twice") + } + + codec, err := h264.FromDecoderConfig(pkt.Data) + if err != nil { + return nil, nil, err + } + + videoTrack, err = gortsplib.NewTrackH264(96, codec.SPS[0], codec.PPS[0]) + if err != nil { + return nil, nil, err + } + + case av.AACDecoderConfig: + if !hasAudio { + return nil, nil, fmt.Errorf("unexpected audio packet") + } + if audioTrack != nil { + return nil, nil, fmt.Errorf("audio track setupped twice") + } + + audioTrack, err = gortsplib.NewTrackAAC(96, pkt.Data) + if err != nil { + return nil, nil, err + } + } + + if (!hasVideo || videoTrack != nil) && + (!hasAudio || audioTrack != nil) { + return videoTrack, audioTrack, nil + } + } +} diff --git a/internal/sourcertmp/source.go b/internal/sourcertmp/source.go index 5bee9c78..03d9aca8 100644 --- a/internal/sourcertmp/source.go +++ b/internal/sourcertmp/source.go @@ -13,18 +13,15 @@ import ( "github.com/aler9/gortsplib/pkg/rtph264" "github.com/notedit/rtmp/av" "github.com/notedit/rtmp/codec/h264" - "github.com/notedit/rtmp/format/flv/flvio" "github.com/notedit/rtmp/format/rtmp" "github.com/aler9/rtsp-simple-server/internal/logger" + "github.com/aler9/rtsp-simple-server/internal/rtmpinfo" "github.com/aler9/rtsp-simple-server/internal/stats" ) const ( retryPause = 5 * time.Second - - codecH264 = 7 - codecAAC = 10 ) // Parent is implemeneted by path.Path. @@ -113,49 +110,22 @@ func (s *Source) run() { } } -func readMetadata(conn *rtmp.Conn) (flvio.AMFMap, error) { - pkt, err := conn.ReadPacket() - if err != nil { - return nil, err - } - - if pkt.Type != av.Metadata { - return nil, fmt.Errorf("first packet must be metadata") - } - - arr, err := flvio.ParseAMFVals(pkt.Data, false) - if err != nil { - return nil, err - } - - if len(arr) != 1 { - return nil, fmt.Errorf("invalid metadata") - } - - ma, ok := arr[0].(flvio.AMFMap) - if !ok { - return nil, fmt.Errorf("invalid metadata") - } - - return ma, nil -} - func (s *Source) runInner() bool { s.log(logger.Info, "connecting") - var conn *rtmp.Conn + var rconn *rtmp.Conn var nconn net.Conn var err error dialDone := make(chan struct{}, 1) go func() { defer close(dialDone) - conn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading) + rconn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading) }() select { + case <-dialDone: case <-s.terminate: return false - case <-dialDone: } if err != nil { @@ -163,133 +133,57 @@ func (s *Source) runInner() bool { return true } - var tracks gortsplib.Tracks - var videoTrack *gortsplib.Track - var videoRTCPSender *rtcpsender.RTCPSender - var h264Encoder *rtph264.Encoder - var audioTrack *gortsplib.Track - var audioRTCPSender *rtcpsender.RTCPSender - var aacEncoder *rtpaac.Encoder - - // configuration must be completed within readTimeout - nconn.SetReadDeadline(time.Now().Add(s.readTimeout)) - - confDone := make(chan error) + confDone := make(chan struct{}) go func() { - confDone <- func() error { - md, err := readMetadata(conn) - if err != nil { - return err - } - - hasVideo := false - if v, ok := md.GetFloat64("videocodecid"); ok { - switch v { - case codecH264: - hasVideo = true - case 0: - default: - return fmt.Errorf("unsupported video codec %v", v) - } - - } - - hasAudio := false - if v, ok := md.GetFloat64("audiocodecid"); ok { - switch v { - case codecAAC: - hasAudio = true - case 0: - default: - return fmt.Errorf("unsupported audio codec %v", v) - } - } - - if !hasVideo && !hasAudio { - return fmt.Errorf("stream has no tracks") - } - - for { - var pkt av.Packet - pkt, err = conn.ReadPacket() - if err != nil { - return err - } - - switch pkt.Type { - case av.H264DecoderConfig: - if !hasVideo { - return fmt.Errorf("unexpected video packet") - } - if videoTrack != nil { - return fmt.Errorf("video track setupped twice") - } - - codec, err := h264.FromDecoderConfig(pkt.Data) - if err != nil { - return err - } - - videoTrack, err = gortsplib.NewTrackH264(96, codec.SPS[0], codec.PPS[0]) - if err != nil { - return err - } - - clockRate, _ := videoTrack.ClockRate() - videoRTCPSender = rtcpsender.New(clockRate) - - h264Encoder, err = rtph264.NewEncoder(96) - if err != nil { - return err - } - - tracks = append(tracks, videoTrack) - - case av.AACDecoderConfig: - if !hasAudio { - return fmt.Errorf("unexpected audio packet") - } - if audioTrack != nil { - return fmt.Errorf("audio track setupped twice") - } + defer close(confDone) + videoTrack, audioTrack, err = rtmpinfo.Info(rconn, nconn, s.readTimeout) + }() - audioTrack, err = gortsplib.NewTrackAAC(96, pkt.Data) - if err != nil { - return err - } + select { + case <-confDone: + case <-s.terminate: + nconn.Close() + <-confDone + return false + } - clockRate, _ := audioTrack.ClockRate() - audioRTCPSender = rtcpsender.New(clockRate) + if err != nil { + s.log(logger.Info, "ERR: %s", err) + return true + } - aacEncoder, err = rtpaac.NewEncoder(96, clockRate) - if err != nil { - return err - } + var tracks gortsplib.Tracks + var videoRTCPSender *rtcpsender.RTCPSender + var h264Encoder *rtph264.Encoder + var audioRTCPSender *rtcpsender.RTCPSender + var aacEncoder *rtpaac.Encoder - tracks = append(tracks, audioTrack) - } + if videoTrack != nil { + clockRate, _ := videoTrack.ClockRate() + h264Encoder, err = rtph264.NewEncoder(96) + if err != nil { + nconn.Close() + s.log(logger.Info, "ERR: %s", err) + return true + } - if (!hasVideo || videoTrack != nil) && - (!hasAudio || audioTrack != nil) { - return nil - } - } - }() - }() + videoRTCPSender = rtcpsender.New(clockRate) + tracks = append(tracks, videoTrack) + } - select { - case err := <-confDone: + if audioTrack != nil { + clockRate, _ := audioTrack.ClockRate() + aacEncoder, err = rtpaac.NewEncoder(96, clockRate) if err != nil { + nconn.Close() s.log(logger.Info, "ERR: %s", err) return true } - case <-s.terminate: - nconn.Close() - <-confDone - return false + audioRTCPSender = rtcpsender.New(clockRate) + tracks = append(tracks, audioTrack) } for i, t := range tracks { @@ -338,7 +232,7 @@ func (s *Source) runInner() bool { readerDone <- func() error { for { nconn.SetReadDeadline(time.Now().Add(s.readTimeout)) - pkt, err := conn.ReadPacket() + pkt, err := rconn.ReadPacket() if err != nil { return err } @@ -390,21 +284,21 @@ func (s *Source) runInner() bool { for { select { - case <-s.terminate: + case err := <-readerDone: nconn.Close() - <-readerDone + s.log(logger.Info, "ERR: %s", err) close(rtcpTerminate) <-rtcpDone - return false + return true - case err := <-readerDone: + case <-s.terminate: nconn.Close() - s.log(logger.Info, "ERR: %s", err) + <-readerDone close(rtcpTerminate) <-rtcpDone - return true + return false } } }