Browse Source

print track codecs into logs

pull/1057/head
aler9 3 years ago
parent
commit
cb8aec8844
  1. 8
      internal/core/hls_muxer.go
  2. 2
      internal/core/hls_source.go
  3. 4
      internal/core/path.go
  4. 1
      internal/core/publisher.go
  5. 1
      internal/core/reader.go
  6. 26
      internal/core/rtmp_conn.go
  7. 2
      internal/core/rtmp_source.go
  8. 55
      internal/core/rtsp_session.go
  9. 2
      internal/core/rtsp_source.go
  10. 27
      internal/core/source.go

8
internal/core/hls_muxer.go

@ -349,6 +349,9 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -349,6 +349,9 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.path.readerPlay(pathReaderPlayReq{author: m})
m.log(logger.Info, "is converting into HLS, %s",
sourceTrackInfo(res.stream.tracks()))
writerDone := make(chan error)
go func() {
writerDone <- func() error {
@ -541,11 +544,6 @@ func (m *hlsMuxer) apiHLSMuxersList(req hlsServerAPIMuxersListSubReq) { @@ -541,11 +544,6 @@ func (m *hlsMuxer) apiHLSMuxersList(req hlsServerAPIMuxersListSubReq) {
}
}
// onReaderAccepted implements reader.
func (m *hlsMuxer) onReaderAccepted() {
m.log(logger.Info, "is converting into HLS")
}
// onReaderData implements reader.
func (m *hlsMuxer) onReaderData(data *data) {
m.ringBuffer.Push(data)

2
internal/core/hls_source.go

@ -72,7 +72,7 @@ func (s *hlsSource) run(ctx context.Context) error { @@ -72,7 +72,7 @@ func (s *hlsSource) run(ctx context.Context) error {
return res.err
}
s.Log(logger.Info, "ready")
s.Log(logger.Info, "proxying %s", sourceTrackInfo(tracks))
stream = res.stream
return nil

4
internal/core/path.go

@ -826,8 +826,6 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) { @@ -826,8 +826,6 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
return
}
req.author.onPublisherAccepted(len(req.tracks))
if pa.hasOnDemandPublisher() {
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherReadyTimer = newEmptyTimer()
@ -933,8 +931,6 @@ func (pa *path) handleReaderPlay(req pathReaderPlayReq) { @@ -933,8 +931,6 @@ func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
pa.stream.readerAdd(req.author)
req.author.onReaderAccepted()
close(req.res)
}

1
internal/core/publisher.go

@ -4,5 +4,4 @@ package core @@ -4,5 +4,4 @@ package core
type publisher interface {
source
close()
onPublisherAccepted(tracksLen int)
}

1
internal/core/reader.go

@ -3,7 +3,6 @@ package core @@ -3,7 +3,6 @@ package core
// reader is an entity that can read a stream.
type reader interface {
close()
onReaderAccepted()
onReaderData(*data)
apiReaderDescribe() interface{}
}

26
internal/core/rtmp_conn.go

@ -305,6 +305,10 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -305,6 +305,10 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
author: c,
})
c.log(logger.Info, "is reading from path '%s', %s",
c.path.Name(),
sourceTrackInfo(res.stream.tracks()))
if c.path.Conf().RunOnRead != "" {
c.log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
@ -545,6 +549,10 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -545,6 +549,10 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
return rres.err
}
c.log(logger.Info, "is publishing to path '%s', %s",
c.path.Name(),
sourceTrackInfo(tracks))
for {
c.nconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout)))
msg, err := c.conn.ReadMessage()
@ -673,11 +681,6 @@ func (c *rtmpConn) authenticate( @@ -673,11 +681,6 @@ func (c *rtmpConn) authenticate(
return nil
}
// onReaderAccepted implements reader.
func (c *rtmpConn) onReaderAccepted() {
c.log(logger.Info, "is reading from path '%s'", c.path.Name())
}
// onReaderData implements reader.
func (c *rtmpConn) onReaderData(data *data) {
c.ringBuffer.Push(data)
@ -698,16 +701,3 @@ func (c *rtmpConn) apiSourceDescribe() interface{} { @@ -698,16 +701,3 @@ func (c *rtmpConn) apiSourceDescribe() interface{} {
ID string `json:"id"`
}{"rtmpConn", c.id}
}
// onPublisherAccepted implements publisher.
func (c *rtmpConn) onPublisherAccepted(tracksLen int) {
c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}())
}

2
internal/core/rtmp_source.go

@ -113,7 +113,7 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -113,7 +113,7 @@ func (s *rtmpSource) run(ctx context.Context) error {
return res.err
}
s.Log(logger.Info, "ready")
s.Log(logger.Info, "proxying %s", sourceTrackInfo(tracks))
defer func() {
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})

55
internal/core/rtsp_session.go

@ -40,11 +40,11 @@ type rtspSession struct { @@ -40,11 +40,11 @@ type rtspSession struct {
created time.Time
path *path
stream *stream
state gortsplib.ServerSessionState
stateMutex sync.Mutex
onReadCmd *externalcmd.Cmd // read
announcedTracks gortsplib.Tracks // publish
stream *stream // publish
}
func newRTSPSession(
@ -109,13 +109,14 @@ func (s *rtspSession) onClose(err error) { @@ -109,13 +109,14 @@ func (s *rtspSession) onClose(err error) {
switch s.ss.State() {
case gortsplib.ServerSessionStatePrePlay, gortsplib.ServerSessionStatePlay:
s.path.readerRemove(pathReaderRemoveReq{author: s})
s.path = nil
case gortsplib.ServerSessionStatePreRecord, gortsplib.ServerSessionStateRecord:
s.path.publisherRemove(pathPublisherRemoveReq{author: s})
s.path = nil
}
s.path = nil
s.stream = nil
s.log(logger.Info, "destroyed (%v)", err)
}
@ -218,6 +219,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -218,6 +219,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
}
s.path = res.path
s.stream = res.stream
if ctx.TrackID >= len(res.stream.tracks()) {
return &base.Response{
@ -247,6 +249,18 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -247,6 +249,18 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
if s.ss.State() == gortsplib.ServerSessionStatePrePlay {
s.path.readerPlay(pathReaderPlayReq{author: s})
tracks := make(gortsplib.Tracks, len(s.ss.SetuppedTracks()))
n := 0
for id := range s.ss.SetuppedTracks() {
tracks[n] = s.stream.tracks()[id]
n++
}
s.log(logger.Info, "is reading from path '%s', with %s, %s",
s.path.Name(),
s.ss.SetuppedTransport(),
sourceTrackInfo(tracks))
if s.path.Conf().RunOnRead != "" {
s.log(logger.Info, "runOnRead command started")
s.onReadCmd = externalcmd.NewCmd(
@ -283,6 +297,11 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -283,6 +297,11 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}, res.err
}
s.log(logger.Info, "is publishing to path '%s', with %s, %s",
s.path.Name(),
s.ss.SetuppedTransport(),
sourceTrackInfo(s.announcedTracks))
s.stream = res.stream
s.stateMutex.Lock()
@ -322,22 +341,6 @@ func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res @@ -322,22 +341,6 @@ func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
}, nil
}
// onReaderAccepted implements reader.
func (s *rtspSession) onReaderAccepted() {
tracksLen := len(s.ss.SetuppedTracks())
s.log(logger.Info, "is reading from path '%s', %d %s with %s",
s.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}(),
s.ss.SetuppedTransport())
}
// onReaderData implements reader.
func (s *rtspSession) onReaderData(data *data) {
// packets are routed to the session by gortsplib.ServerStream.
@ -373,20 +376,6 @@ func (s *rtspSession) apiSourceDescribe() interface{} { @@ -373,20 +376,6 @@ func (s *rtspSession) apiSourceDescribe() interface{} {
}{typ, s.id}
}
// onPublisherAccepted implements publisher.
func (s *rtspSession) onPublisherAccepted(tracksLen int) {
s.log(logger.Info, "is publishing to path '%s', %d %s with %s",
s.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}(),
s.ss.SetuppedTransport())
}
// onPacketRTP is called by rtspServer.
func (s *rtspSession) onPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx) {
if ctx.H264NALUs != nil {

2
internal/core/rtsp_source.go

@ -133,7 +133,7 @@ func (s *rtspSource) run(ctx context.Context) error { @@ -133,7 +133,7 @@ func (s *rtspSource) run(ctx context.Context) error {
return res.err
}
s.Log(logger.Info, "ready")
s.Log(logger.Info, "proxying %s", sourceTrackInfo(tracks))
defer func() {
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})

27
internal/core/source.go

@ -1,5 +1,13 @@ @@ -1,5 +1,13 @@
package core
import (
"fmt"
"reflect"
"strings"
"github.com/aler9/gortsplib"
)
// source is an entity that can provide a stream.
// it can be:
// - a publisher
@ -8,3 +16,22 @@ package core @@ -8,3 +16,22 @@ package core
type source interface {
apiSourceDescribe() interface{}
}
func sourceTrackInfo(tracks gortsplib.Tracks) string {
trackCodecs := make([]string, len(tracks))
for i, t := range tracks {
n := reflect.TypeOf(t).Elem().Name()
n = n[len("Track"):]
trackCodecs[i] = n
}
return fmt.Sprintf("%d %s (%s)",
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}(),
strings.Join(trackCodecs, ", "))
}

Loading…
Cancel
Save