From 455b8beff7a2886a285e81642af84aae722230ae Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Tue, 27 Dec 2022 18:01:58 +0100 Subject: [PATCH] simplify code --- internal/core/hls_muxer.go | 116 ++++++++-------- internal/core/rtmp_conn.go | 271 +++++++++++++++++++------------------ 2 files changed, 198 insertions(+), 189 deletions(-) diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 53e9d86c..f45a9cf8 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -277,15 +277,66 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) if videoMedia != nil { medias = append(medias, videoMedia) + videoStartPTSFilled := false + var videoStartPTS time.Duration + res.stream.readerAdd(m, videoMedia, videoFormat, func(dat data) { - m.ringBuffer.Push(dat) + m.ringBuffer.Push(func() error { + tdata := dat.(*dataH264) + + if tdata.nalus == nil { + return nil + } + + if !videoStartPTSFilled { + videoStartPTSFilled = true + videoStartPTS = tdata.pts + } + pts := tdata.pts - videoStartPTS + + err := m.muxer.WriteH264(tdata.ntp, pts, tdata.nalus) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } + + return nil + }) }) } + if audioMedia != nil { medias = append(medias, audioMedia) + audioStartPTSFilled := false + var audioStartPTS time.Duration + res.stream.readerAdd(m, audioMedia, audioFormat, func(dat data) { - m.ringBuffer.Push(dat) + m.ringBuffer.Push(func() error { + tdata := dat.(*dataMPEG4Audio) + + if tdata.aus == nil { + return nil + } + + if !audioStartPTSFilled { + audioStartPTSFilled = true + audioStartPTS = tdata.pts + } + pts := tdata.pts - audioStartPTS + + for i, au := range tdata.aus { + err := m.muxer.WriteAAC( + tdata.ntp, + pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/time.Duration(audioFormat.ClockRate()), + au) + if err != nil { + return fmt.Errorf("muxer error: %v", err) + } + } + + return nil + }) }) } @@ -296,12 +347,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) writerDone := make(chan error) go func() { - writerDone <- m.runWriter( - videoMedia, - videoFormat, - audioMedia, - audioFormat, - ) + writerDone <- m.runWriter() }() closeCheckTicker := time.NewTicker(closeCheckPeriod) @@ -328,62 +374,16 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) } } -func (m *hlsMuxer) runWriter( - videoMedia *media.Media, - videoFormat *format.H264, - audioMedia *media.Media, - audioFormat *format.MPEG4Audio, -) error { - videoStartPTSFilled := false - var videoStartPTS time.Duration - audioStartPTSFilled := false - var audioStartPTS time.Duration - +func (m *hlsMuxer) runWriter() error { for { item, ok := m.ringBuffer.Pull() if !ok { return fmt.Errorf("terminated") } - data := item.(data) - switch tdata := data.(type) { - case *dataH264: - if tdata.nalus == nil { - continue - } - - if !videoStartPTSFilled { - videoStartPTSFilled = true - videoStartPTS = tdata.pts - } - pts := tdata.pts - videoStartPTS - - err := m.muxer.WriteH264(tdata.ntp, pts, tdata.nalus) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } - - case *dataMPEG4Audio: - if tdata.aus == nil { - continue - } - - if !audioStartPTSFilled { - audioStartPTSFilled = true - audioStartPTS = tdata.pts - } - pts := tdata.pts - audioStartPTS - - for i, au := range tdata.aus { - err := m.muxer.WriteAAC( - tdata.ntp, - pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* - time.Second/time.Duration(audioFormat.ClockRate()), - au) - if err != nil { - return fmt.Errorf("muxer error: %v", err) - } - } + err := item.(func() error)() + if err != nil { + return err } } } diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index cdcd5e64..21e7445d 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -253,6 +253,8 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { var videoFormat *format.H264 videoMedia := res.stream.medias().FindFormat(&videoFormat) + videoFirstIDRFound := false + var videoStartDTS time.Duration var audioFormat *format.MPEG4Audio audioMedia := res.stream.medias().FindFormat(&audioFormat) @@ -271,15 +273,148 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { if videoMedia != nil { medias = append(medias, videoMedia) + videoStartPTSFilled := false + var videoStartPTS time.Duration + var videoDTSExtractor *h264.DTSExtractor + res.stream.readerAdd(c, videoMedia, videoFormat, func(dat data) { - ringBuffer.Push(dat) + ringBuffer.Push(func() error { + tdata := dat.(*dataH264) + + if tdata.nalus == nil { + return nil + } + + if !videoStartPTSFilled { + videoStartPTSFilled = true + videoStartPTS = tdata.pts + } + pts := tdata.pts - videoStartPTS + + idrPresent := false + nonIDRPresent := false + + for _, nalu := range tdata.nalus { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeIDR: + idrPresent = true + + case h264.NALUTypeNonIDR: + nonIDRPresent = true + } + } + + var dts time.Duration + + // wait until we receive an IDR + if !videoFirstIDRFound { + if !idrPresent { + return nil + } + + videoFirstIDRFound = true + videoDTSExtractor = h264.NewDTSExtractor() + + var err error + dts, err = videoDTSExtractor.Extract(tdata.nalus, pts) + if err != nil { + return err + } + + videoStartDTS = dts + dts = 0 + pts -= videoStartDTS + } else { + if !idrPresent && !nonIDRPresent { + return nil + } + + var err error + dts, err = videoDTSExtractor.Extract(tdata.nalus, pts) + if err != nil { + return err + } + + dts -= videoStartDTS + pts -= videoStartDTS + } + + avcc, err := h264.AVCCMarshal(tdata.nalus) + if err != nil { + return err + } + + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = c.conn.WriteMessage(&message.MsgVideo{ + ChunkStreamID: message.MsgVideoChunkStreamID, + MessageStreamID: 0x1000000, + IsKeyFrame: idrPresent, + H264Type: flvio.AVC_NALU, + Payload: avcc, + DTS: dts, + PTSDelta: pts - dts, + }) + if err != nil { + return err + } + + return nil + }) }) } + if audioMedia != nil { medias = append(medias, audioMedia) + audioStartPTSFilled := false + var audioStartPTS time.Duration + res.stream.readerAdd(c, audioMedia, audioFormat, func(dat data) { - ringBuffer.Push(dat) + ringBuffer.Push(func() error { + tdata := dat.(*dataMPEG4Audio) + + if tdata.aus == nil { + return nil + } + + if !audioStartPTSFilled { + audioStartPTSFilled = true + audioStartPTS = tdata.pts + } + pts := tdata.pts - audioStartPTS + + if videoFormat != nil { + if !videoFirstIDRFound { + return nil + } + + pts -= videoStartDTS + if pts < 0 { + return nil + } + } + + for i, au := range tdata.aus { + c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err := c.conn.WriteMessage(&message.MsgAudio{ + ChunkStreamID: message.MsgAudioChunkStreamID, + MessageStreamID: 0x1000000, + Rate: flvio.SOUND_44Khz, + Depth: flvio.SOUND_16BIT, + Channels: flvio.SOUND_STEREO, + AACType: flvio.AAC_RAW, + Payload: au, + DTS: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* + time.Second/time.Duration(audioFormat.ClockRate()), + }) + if err != nil { + return err + } + } + + return nil + }) }) } @@ -312,141 +447,15 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { // disable read deadline c.nconn.SetReadDeadline(time.Time{}) - videoStartPTSFilled := false - var videoStartPTS time.Duration - audioStartPTSFilled := false - var audioStartPTS time.Duration - - videoFirstIDRFound := false - var videoStartDTS time.Duration - var videoDTSExtractor *h264.DTSExtractor - for { item, ok := ringBuffer.Pull() if !ok { return fmt.Errorf("terminated") } - data := item.(data) - - switch tdata := data.(type) { - case *dataH264: - if tdata.nalus == nil { - continue - } - - if !videoStartPTSFilled { - videoStartPTSFilled = true - videoStartPTS = tdata.pts - } - pts := tdata.pts - videoStartPTS - - idrPresent := false - nonIDRPresent := false - - for _, nalu := range tdata.nalus { - typ := h264.NALUType(nalu[0] & 0x1F) - switch typ { - case h264.NALUTypeIDR: - idrPresent = true - - case h264.NALUTypeNonIDR: - nonIDRPresent = true - } - } - - var dts time.Duration - - // wait until we receive an IDR - if !videoFirstIDRFound { - if !idrPresent { - continue - } - - videoFirstIDRFound = true - videoDTSExtractor = h264.NewDTSExtractor() - - var err error - dts, err = videoDTSExtractor.Extract(tdata.nalus, pts) - if err != nil { - return err - } - - videoStartDTS = dts - dts = 0 - pts -= videoStartDTS - } else { - if !idrPresent && !nonIDRPresent { - continue - } - - var err error - dts, err = videoDTSExtractor.Extract(tdata.nalus, pts) - if err != nil { - return err - } - - dts -= videoStartDTS - pts -= videoStartDTS - } - avcc, err := h264.AVCCMarshal(tdata.nalus) - if err != nil { - return err - } - - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err = c.conn.WriteMessage(&message.MsgVideo{ - ChunkStreamID: message.MsgVideoChunkStreamID, - MessageStreamID: 0x1000000, - IsKeyFrame: idrPresent, - H264Type: flvio.AVC_NALU, - Payload: avcc, - DTS: dts, - PTSDelta: pts - dts, - }) - if err != nil { - return err - } - - case *dataMPEG4Audio: - if tdata.aus == nil { - continue - } - - if !audioStartPTSFilled { - audioStartPTSFilled = true - audioStartPTS = tdata.pts - } - pts := tdata.pts - audioStartPTS - - if videoFormat != nil { - if !videoFirstIDRFound { - continue - } - - pts -= videoStartDTS - if pts < 0 { - continue - } - } - - for i, au := range tdata.aus { - c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) - err := c.conn.WriteMessage(&message.MsgAudio{ - ChunkStreamID: message.MsgAudioChunkStreamID, - MessageStreamID: 0x1000000, - Rate: flvio.SOUND_44Khz, - Depth: flvio.SOUND_16BIT, - Channels: flvio.SOUND_STEREO, - AACType: flvio.AAC_RAW, - Payload: au, - DTS: pts + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit* - time.Second/time.Duration(audioFormat.ClockRate()), - }) - if err != nil { - return err - } - } + err := item.(func() error)() + if err != nil { + return err } } }