Browse Source

simplify code

pull/1341/head
aler9 3 years ago
parent
commit
455b8beff7
  1. 114
      internal/core/hls_muxer.go
  2. 131
      internal/core/rtmp_conn.go

114
internal/core/hls_muxer.go

@ -277,15 +277,66 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
if videoMedia != nil { if videoMedia != nil {
medias = append(medias, videoMedia) medias = append(medias, videoMedia)
videoStartPTSFilled := false
var videoStartPTS time.Duration
res.stream.readerAdd(m, videoMedia, videoFormat, func(dat data) { res.stream.readerAdd(m, videoMedia, videoFormat, func(dat data) {
m.ringBuffer.Push(dat) m.ringBuffer.Push(func() error {
tdata := dat.(*dataH264)
if tdata.nalus == nil {
return nil
}
if !videoStartPTSFilled {
videoStartPTSFilled = true
videoStartPTS = tdata.pts
}
pts := tdata.pts - videoStartPTS
err := m.muxer.WriteH264(tdata.ntp, pts, tdata.nalus)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
return nil
})
}) })
} }
if audioMedia != nil { if audioMedia != nil {
medias = append(medias, audioMedia) medias = append(medias, audioMedia)
audioStartPTSFilled := false
var audioStartPTS time.Duration
res.stream.readerAdd(m, audioMedia, audioFormat, func(dat data) { res.stream.readerAdd(m, audioMedia, audioFormat, func(dat data) {
m.ringBuffer.Push(dat) m.ringBuffer.Push(func() error {
tdata := dat.(*dataMPEG4Audio)
if tdata.aus == nil {
return nil
}
if !audioStartPTSFilled {
audioStartPTSFilled = true
audioStartPTS = tdata.pts
}
pts := tdata.pts - audioStartPTS
for i, au := range tdata.aus {
err := m.muxer.WriteAAC(
tdata.ntp,
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormat.ClockRate()),
au)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
}
return nil
})
}) })
} }
@ -296,12 +347,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
writerDone := make(chan error) writerDone := make(chan error)
go func() { go func() {
writerDone <- m.runWriter( writerDone <- m.runWriter()
videoMedia,
videoFormat,
audioMedia,
audioFormat,
)
}() }()
closeCheckTicker := time.NewTicker(closeCheckPeriod) closeCheckTicker := time.NewTicker(closeCheckPeriod)
@ -328,62 +374,16 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
} }
} }
func (m *hlsMuxer) runWriter( func (m *hlsMuxer) runWriter() error {
videoMedia *media.Media,
videoFormat *format.H264,
audioMedia *media.Media,
audioFormat *format.MPEG4Audio,
) error {
videoStartPTSFilled := false
var videoStartPTS time.Duration
audioStartPTSFilled := false
var audioStartPTS time.Duration
for { for {
item, ok := m.ringBuffer.Pull() item, ok := m.ringBuffer.Pull()
if !ok { if !ok {
return fmt.Errorf("terminated") return fmt.Errorf("terminated")
} }
data := item.(data)
switch tdata := data.(type) { err := item.(func() error)()
case *dataH264:
if tdata.nalus == nil {
continue
}
if !videoStartPTSFilled {
videoStartPTSFilled = true
videoStartPTS = tdata.pts
}
pts := tdata.pts - videoStartPTS
err := m.muxer.WriteH264(tdata.ntp, pts, tdata.nalus)
if err != nil { if err != nil {
return fmt.Errorf("muxer error: %v", err) return err
}
case *dataMPEG4Audio:
if tdata.aus == nil {
continue
}
if !audioStartPTSFilled {
audioStartPTSFilled = true
audioStartPTS = tdata.pts
}
pts := tdata.pts - audioStartPTS
for i, au := range tdata.aus {
err := m.muxer.WriteAAC(
tdata.ntp,
pts+time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/time.Duration(audioFormat.ClockRate()),
au)
if err != nil {
return fmt.Errorf("muxer error: %v", err)
}
}
} }
} }
} }

131
internal/core/rtmp_conn.go

@ -253,6 +253,8 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
var videoFormat *format.H264 var videoFormat *format.H264
videoMedia := res.stream.medias().FindFormat(&videoFormat) videoMedia := res.stream.medias().FindFormat(&videoFormat)
videoFirstIDRFound := false
var videoStartDTS time.Duration
var audioFormat *format.MPEG4Audio var audioFormat *format.MPEG4Audio
audioMedia := res.stream.medias().FindFormat(&audioFormat) audioMedia := res.stream.medias().FindFormat(&audioFormat)
@ -271,67 +273,16 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
if videoMedia != nil { if videoMedia != nil {
medias = append(medias, videoMedia) medias = append(medias, videoMedia)
res.stream.readerAdd(c, videoMedia, videoFormat, func(dat data) {
ringBuffer.Push(dat)
})
}
if audioMedia != nil {
medias = append(medias, audioMedia)
res.stream.readerAdd(c, audioMedia, audioFormat, func(dat data) {
ringBuffer.Push(dat)
})
}
defer res.stream.readerRemove(c)
c.log(logger.Info, "is reading from path '%s', %s",
path.Name(), sourceMediaInfo(medias))
if path.Conf().RunOnRead != "" {
c.log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
c.externalCmdPool,
path.Conf().RunOnRead,
path.Conf().RunOnReadRestart,
path.externalCmdEnv(),
func(co int) {
c.log(logger.Info, "runOnRead command exited with code %d", co)
})
defer func() {
onReadCmd.Close()
c.log(logger.Info, "runOnRead command stopped")
}()
}
err := c.conn.WriteTracks(videoFormat, audioFormat)
if err != nil {
return err
}
// disable read deadline
c.nconn.SetReadDeadline(time.Time{})
videoStartPTSFilled := false videoStartPTSFilled := false
var videoStartPTS time.Duration var videoStartPTS time.Duration
audioStartPTSFilled := false
var audioStartPTS time.Duration
videoFirstIDRFound := false
var videoStartDTS time.Duration
var videoDTSExtractor *h264.DTSExtractor var videoDTSExtractor *h264.DTSExtractor
for { res.stream.readerAdd(c, videoMedia, videoFormat, func(dat data) {
item, ok := ringBuffer.Pull() ringBuffer.Push(func() error {
if !ok { tdata := dat.(*dataH264)
return fmt.Errorf("terminated")
}
data := item.(data)
switch tdata := data.(type) {
case *dataH264:
if tdata.nalus == nil { if tdata.nalus == nil {
continue return nil
} }
if !videoStartPTSFilled { if !videoStartPTSFilled {
@ -359,7 +310,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
// wait until we receive an IDR // wait until we receive an IDR
if !videoFirstIDRFound { if !videoFirstIDRFound {
if !idrPresent { if !idrPresent {
continue return nil
} }
videoFirstIDRFound = true videoFirstIDRFound = true
@ -376,7 +327,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
pts -= videoStartDTS pts -= videoStartDTS
} else { } else {
if !idrPresent && !nonIDRPresent { if !idrPresent && !nonIDRPresent {
continue return nil
} }
var err error var err error
@ -408,9 +359,23 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return err return err
} }
case *dataMPEG4Audio: return nil
})
})
}
if audioMedia != nil {
medias = append(medias, audioMedia)
audioStartPTSFilled := false
var audioStartPTS time.Duration
res.stream.readerAdd(c, audioMedia, audioFormat, func(dat data) {
ringBuffer.Push(func() error {
tdata := dat.(*dataMPEG4Audio)
if tdata.aus == nil { if tdata.aus == nil {
continue return nil
} }
if !audioStartPTSFilled { if !audioStartPTSFilled {
@ -421,12 +386,12 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
if videoFormat != nil { if videoFormat != nil {
if !videoFirstIDRFound { if !videoFirstIDRFound {
continue return nil
} }
pts -= videoStartDTS pts -= videoStartDTS
if pts < 0 { if pts < 0 {
continue return nil
} }
} }
@ -447,6 +412,50 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return err return err
} }
} }
return nil
})
})
}
defer res.stream.readerRemove(c)
c.log(logger.Info, "is reading from path '%s', %s",
path.Name(), sourceMediaInfo(medias))
if path.Conf().RunOnRead != "" {
c.log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
c.externalCmdPool,
path.Conf().RunOnRead,
path.Conf().RunOnReadRestart,
path.externalCmdEnv(),
func(co int) {
c.log(logger.Info, "runOnRead command exited with code %d", co)
})
defer func() {
onReadCmd.Close()
c.log(logger.Info, "runOnRead command stopped")
}()
}
err := c.conn.WriteTracks(videoFormat, audioFormat)
if err != nil {
return err
}
// disable read deadline
c.nconn.SetReadDeadline(time.Time{})
for {
item, ok := ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
err := item.(func() error)()
if err != nil {
return err
} }
} }
} }

Loading…
Cancel
Save