From 83484b1e82966e3818b2dea1869db72523b15e38 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sun, 30 Jul 2023 23:28:54 +0200 Subject: [PATCH] update gohlslib (#2125) --- go.mod | 6 ++-- go.sum | 12 +++---- internal/core/hls_manager.go | 12 +++---- internal/core/hls_muxer.go | 34 +++++++++---------- internal/core/hls_source.go | 13 ++++++-- internal/core/mpegts_buffered_reader.go | 43 ------------------------- internal/core/udp_source.go | 11 +++---- 7 files changed, 48 insertions(+), 83 deletions(-) delete mode 100644 internal/core/mpegts_buffered_reader.go diff --git a/go.mod b/go.mod index d098d00b..88d2ffb5 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ require ( code.cloudfoundry.org/bytefmt v0.0.0 github.com/abema/go-mp4 v0.11.0 github.com/alecthomas/kong v0.8.0 - github.com/asticode/go-astits v1.11.0 - github.com/bluenviron/gohlslib v0.3.0 + github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87 + github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072 github.com/bluenviron/gortsplib/v3 v3.9.0 - github.com/bluenviron/mediacommon v0.7.0 + github.com/bluenviron/mediacommon v0.7.1-0.20230730144331-10b74a4f6eda github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.1 github.com/google/uuid v1.3.0 diff --git a/go.sum b/go.sum index c01ba971..2601f663 100644 --- a/go.sum +++ b/go.sum @@ -8,14 +8,14 @@ github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9T github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U= github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= -github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2ZWAng= -github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= -github.com/bluenviron/gohlslib v0.3.0 h1:ze8cCKszGC2LAWp0B+qIXZIlCZocB7a3BKeBo9E8Sr0= -github.com/bluenviron/gohlslib v0.3.0/go.mod h1:aO69Vu0mMUxWrLmgS6g/S3Y3sfAhyg2SXaMEL7yNlWc= +github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87 h1:SCAqalLhgKGDghGz03yYVWr8TavHluP/i7IwshKU9yA= +github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= +github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072 h1:pAbC7frXsTMxP7Ck3E50hl7oFeSeD2dgc2lWjmHXztQ= +github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072/go.mod h1:rK4b161qErs82QqvBEl84vpi2xhdZBUT0yubXuytZ7E= github.com/bluenviron/gortsplib/v3 v3.9.0 h1:aAHV6MhsDtgBF6yKaNBBCdvtSpLB8ne4kyUfLQlN7nM= github.com/bluenviron/gortsplib/v3 v3.9.0/go.mod h1:5h3Zu7jkzwDknYrf+89q2saab//oioKgM9mgvBEX3pg= -github.com/bluenviron/mediacommon v0.7.0 h1:dJWLLL9oDbAqfK8KuNfnDUQwNbeMAtGeRjZc9Vo95js= -github.com/bluenviron/mediacommon v0.7.0/go.mod h1:wuLJdxcITiSPgY1MvQqrX+qPlKmNfeV9wNvXth5M98I= +github.com/bluenviron/mediacommon v0.7.1-0.20230730144331-10b74a4f6eda h1:+ungCWRNDjsy/CVL1l/UjAj4vYL4+NIJQoJJWbR3Xw8= +github.com/bluenviron/mediacommon v0.7.1-0.20230730144331-10b74a4f6eda/go.mod h1:tfk0qGPhqnOxVCrElu8ct3LKQn6Cj4Tpu3zbbJBTKj4= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= diff --git a/internal/core/hls_manager.go b/internal/core/hls_manager.go index 0be20123..cd8111b1 100644 --- a/internal/core/hls_manager.go +++ b/internal/core/hls_manager.go @@ -57,7 +57,7 @@ type hlsManager struct { chPathReady chan *path chPathNotReady chan *path chHandleRequest chan hlsMuxerHandleRequestReq - chMuxerClose chan *hlsMuxer + chCloseMuxer chan *hlsMuxer chAPIMuxerList chan hlsManagerAPIMuxersListReq chAPIMuxerGet chan hlsManagerAPIMuxersGetReq } @@ -104,7 +104,7 @@ func newHLSManager( chPathReady: make(chan *path), chPathNotReady: make(chan *path), chHandleRequest: make(chan hlsMuxerHandleRequestReq), - chMuxerClose: make(chan *hlsMuxer), + chCloseMuxer: make(chan *hlsMuxer), chAPIMuxerList: make(chan hlsManagerAPIMuxersListReq), chAPIMuxerGet: make(chan hlsManagerAPIMuxersGetReq), } @@ -182,7 +182,7 @@ outer: r.processRequest(&req) } - case c := <-m.chMuxerClose: + case c := <-m.chCloseMuxer: if c2, ok := m.muxers[c.PathName()]; !ok || c2 != c { continue } @@ -250,10 +250,10 @@ func (m *hlsManager) createMuxer(pathName string, remoteAddr string) *hlsMuxer { return r } -// muxerClose is called by hlsMuxer. -func (m *hlsManager) muxerClose(c *hlsMuxer) { +// closeMuxer is called by hlsMuxer. +func (m *hlsManager) closeMuxer(c *hlsMuxer) { select { - case m.chMuxerClose <- c: + case m.chCloseMuxer <- c: case <-m.ctx.Done(): } } diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index f488ea6b..077e0b54 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -54,7 +54,7 @@ type hlsMuxerHandleRequestReq struct { type hlsMuxerParent interface { logger.Writer - muxerClose(*hlsMuxer) + closeMuxer(*hlsMuxer) } type hlsMuxer struct { @@ -228,7 +228,7 @@ func (m *hlsMuxer) run() { m.clearQueuedRequests() - m.parent.muxerClose(m) + m.parent.closeMuxer(m) m.Log(logger.Info, "destroyed (%v)", err) } @@ -340,8 +340,8 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*media.Media, *gohls videoMedia := stream.Medias().FindFormat(&videoFormatH265) if videoFormatH265 != nil { - videoStartPTSFilled := false - var videoStartPTS time.Duration + startPTSFilled := false + var startPTS time.Duration stream.AddReader(m, videoMedia, videoFormatH265, func(unit formatprocessor.Unit) { m.ringBuffer.Push(func() error { @@ -351,12 +351,12 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*media.Media, *gohls return nil } - if !videoStartPTSFilled { - videoStartPTSFilled = true - videoStartPTS = tunit.PTS + if !startPTSFilled { + startPTSFilled = true + startPTS = tunit.PTS } - pts := tunit.PTS - videoStartPTS + pts := tunit.PTS - startPTS err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) if err != nil { return fmt.Errorf("muxer error: %v", err) @@ -381,8 +381,8 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*media.Media, *gohls videoMedia = stream.Medias().FindFormat(&videoFormatH264) if videoFormatH264 != nil { - videoStartPTSFilled := false - var videoStartPTS time.Duration + startPTSFilled := false + var startPTS time.Duration stream.AddReader(m, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) { m.ringBuffer.Push(func() error { @@ -392,12 +392,12 @@ func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*media.Media, *gohls return nil } - if !videoStartPTSFilled { - videoStartPTSFilled = true - videoStartPTS = tunit.PTS + if !startPTSFilled { + startPTSFilled = true + startPTS = tunit.PTS } - pts := tunit.PTS - videoStartPTS + pts := tunit.PTS - startPTS err := m.muxer.WriteH26x(tunit.NTP, pts, tunit.AU) if err != nil { return fmt.Errorf("muxer error: %v", err) @@ -440,8 +440,8 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*media.Media, *gohls audioStartPTSFilled = true audioStartPTS = tunit.PTS } - pts := tunit.PTS - audioStartPTS + pts := tunit.PTS - audioStartPTS err := m.muxer.WriteMPEG4Audio( tunit.NTP, pts, @@ -483,8 +483,8 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*media.Media, *gohls audioStartPTSFilled = true audioStartPTS = tunit.PTS } - pts := tunit.PTS - audioStartPTS + pts := tunit.PTS - audioStartPTS err := m.muxer.WriteMPEG4Audio( tunit.NTP, pts, @@ -519,8 +519,8 @@ func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*media.Media, *gohls audioStartPTSFilled = true audioStartPTS = tunit.PTS } - pts := tunit.PTS - audioStartPTS + pts := tunit.PTS - audioStartPTS err := m.muxer.WriteOpus( tunit.NTP, pts, diff --git a/internal/core/hls_source.go b/internal/core/hls_source.go index 38929563..9aafbca3 100644 --- a/internal/core/hls_source.go +++ b/internal/core/hls_source.go @@ -55,8 +55,17 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan TLSClientConfig: tlsConfigForFingerprint(cnf.SourceFingerprint), }, }, - Log: func(level gohlslib.LogLevel, format string, args ...interface{}) { - s.Log(logger.Level(level), format, args...) + OnDownloadPrimaryPlaylist: func(u string) { + s.Log(logger.Debug, "downloading primary playlist %u", u) + }, + OnDownloadStreamPlaylist: func(u string) { + s.Log(logger.Debug, "downloading stream playlist %u", u) + }, + OnDownloadSegment: func(u string) { + s.Log(logger.Debug, "downloading segment %u", u) + }, + OnDecodeError: func(err error) { + s.Log(logger.Warn, err.Error()) }, } diff --git a/internal/core/mpegts_buffered_reader.go b/internal/core/mpegts_buffered_reader.go deleted file mode 100644 index 46e420d0..00000000 --- a/internal/core/mpegts_buffered_reader.go +++ /dev/null @@ -1,43 +0,0 @@ -package core - -import ( - "fmt" - "io" -) - -// mpegtsBufferedReader is a buffered reader optimized for MPEG-TS. -type mpegtsBufferedReader struct { - r io.Reader - midbuf []byte - midbufpos int -} - -func newMPEGTSBufferedReader(r io.Reader) *mpegtsBufferedReader { - return &mpegtsBufferedReader{ - r: r, - midbuf: make([]byte, 0, 1500), - } -} - -// Read implements io.Reader. -func (r *mpegtsBufferedReader) Read(p []byte) (int, error) { - if r.midbufpos < len(r.midbuf) { - n := copy(p, r.midbuf[r.midbufpos:]) - r.midbufpos += n - return n, nil - } - - mn, err := r.r.Read(r.midbuf[:cap(r.midbuf)]) - if err != nil { - return 0, err - } - - if (mn % 188) != 0 { - return 0, fmt.Errorf("received packet with size %d not multiple of 188", mn) - } - - r.midbuf = r.midbuf[:mn] - n := copy(p, r.midbuf) - r.midbufpos = n - return n, nil -} diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index 3780fb02..b28ff112 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -47,17 +47,17 @@ func joinMulticastGroupOnAtLeastOneInterface(p *ipv4.PacketConn, listenIP net.IP } type packetConnReader struct { - pc net.PacketConn + net.PacketConn } func newPacketConnReader(pc net.PacketConn) *packetConnReader { return &packetConnReader{ - pc: pc, + PacketConn: pc, } } func (r *packetConnReader) Read(p []byte) (int, error) { - n, _, err := r.pc.ReadFrom(p) + n, _, err := r.PacketConn.ReadFrom(p) return n, err } @@ -116,7 +116,6 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, _ chan *conf.Pa } readerErr := make(chan error) - go func() { readerErr <- s.runReader(pc) }() @@ -134,7 +133,7 @@ func (s *udpSource) run(ctx context.Context, cnf *conf.PathConf, _ chan *conf.Pa func (s *udpSource) runReader(pc net.PacketConn) error { pc.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) - r, err := mpegts.NewReader(newMPEGTSBufferedReader(newPacketConnReader(pc))) + r, err := mpegts.NewReader(mpegts.NewBufferedReader(newPacketConnReader(pc))) if err != nil { return err } @@ -150,7 +149,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { return td.Decode(t) } - for _, track := range r.Tracks() { + for _, track := range r.Tracks() { //nolint:dupl var medi *media.Media switch tcodec := track.Codec.(type) {