Browse Source

add RTP-Info also in case of RTMP sources (#233)

pull/340/head
aler9 4 years ago
parent
commit
37c24f9edd
  1. 2
      go.mod
  2. 4
      go.sum
  3. 49
      internal/client/client.go
  4. 20
      internal/clientrtmp/client.go
  5. 56
      internal/clientrtsp/client.go
  6. 147
      internal/path/path.go
  7. 13
      internal/rtcpsenderset/rtcpsenderset.go
  8. 43
      internal/source/source.go
  9. 47
      internal/sourcertmp/source.go
  10. 42
      internal/sourcertsp/source.go
  11. 54
      internal/streamproc/streamproc.go
  12. 47
      main_clientrtmp_test.go
  13. 13
      main_clientrtsp_test.go
  14. 47
      main_sourcertmp_test.go
  15. 46
      main_sourcertsp_test.go

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210321185707-9c9a65124379
github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210321185707-9c9a65124379 h1:DKQVTX1VxD/ebCJ3Z/I2nOoaZJxYtrHDXAcTTANBdoI=
github.com/aler9/gortsplib v0.0.0-20210321185707-9c9a65124379/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027 h1:cJkd74/wqKvjAUmvIoBElY12m+R2I0Pzk0UR14xyT0c=
github.com/aler9/gortsplib v0.0.0-20210322195141-1a50a3a5a027/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

49
internal/client/client.go

@ -8,6 +8,7 @@ import ( @@ -8,6 +8,7 @@ import (
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/source"
)
// ErrNoOnePublishing is a "no one is publishing" error.
@ -40,14 +41,26 @@ func (ErrAuthCritical) Error() string { @@ -40,14 +41,26 @@ func (ErrAuthCritical) Error() string {
return "critical authentication error"
}
// DescribeRes is a client describe response.
// Path is implemented by path.Path.
type Path interface {
Name() string
Conf() *conf.PathConf
OnClientRemove(RemoveReq)
OnClientPlay(PlayReq)
OnClientRecord(RecordReq)
OnClientPause(PauseReq)
OnSetStartingPoint(source.SetStartingPointReq)
OnFrame(int, gortsplib.StreamType, []byte)
}
// DescribeRes is a describe response.
type DescribeRes struct {
SDP []byte
Redirect string
Err error
}
// DescribeReq is a client describe request.
// DescribeReq is a describe request.
type DescribeReq struct {
Client Client
PathName string
@ -70,13 +83,13 @@ type SetupPlayReq struct { @@ -70,13 +83,13 @@ type SetupPlayReq struct {
Res chan SetupPlayRes
}
// AnnounceRes is a client announce response.
// AnnounceRes is a announce response.
type AnnounceRes struct {
Path Path
Err error
}
// AnnounceReq is a client announce request.
// AnnounceReq is a announce request.
type AnnounceReq struct {
Client Client
PathName string
@ -91,16 +104,9 @@ type RemoveReq struct { @@ -91,16 +104,9 @@ type RemoveReq struct {
Res chan struct{}
}
// TrackStartingPoint is the starting point of a track.
type TrackStartingPoint struct {
Filled bool // used by clientrtsp to avoid mutexes
SequenceNumber uint16
Timestamp uint32
}
// PlayRes is a play response.
type PlayRes struct {
TrackStartingPoints []*TrackStartingPoint
TrackStartingPoints []source.TrackStartingPoint
}
// PlayReq is a play request.
@ -121,25 +127,6 @@ type PauseReq struct { @@ -121,25 +127,6 @@ type PauseReq struct {
Res chan struct{}
}
// StartingPointReq is a starting point request.
type StartingPointReq struct {
Client Client
TrackID int
SP *TrackStartingPoint
}
// Path is implemented by path.Path.
type Path interface {
Name() string
Conf() *conf.PathConf
OnClientRemove(RemoveReq)
OnClientPlay(PlayReq)
OnClientRecord(RecordReq)
OnClientPause(PauseReq)
OnClientStartingPoint(StartingPointReq)
OnFrame(int, gortsplib.StreamType, []byte)
}
// Client is implemented by all client*.
type Client interface {
IsClient()

20
internal/clientrtmp/client.go

@ -23,8 +23,11 @@ import ( @@ -23,8 +23,11 @@ import (
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtcpsenderset"
"github.com/aler9/rtsp-simple-server/internal/rtmputils"
"github.com/aler9/rtsp-simple-server/internal/source"
"github.com/aler9/rtsp-simple-server/internal/stats"
"github.com/aler9/rtsp-simple-server/internal/streamproc"
)
const (
@ -512,12 +515,19 @@ func (c *Client) runPublish() { @@ -512,12 +515,19 @@ func (c *Client) runPublish() {
}
}(path)
sp := streamproc.New(c, path, make([]source.TrackStartingPoint, len(tracks)))
readerDone := make(chan error)
go func() {
readerDone <- func() error {
rtcpSenders := rtmputils.NewRTCPSenderSet(tracks, path.OnFrame)
rtcpSenders := rtcpsenderset.New(tracks, path.OnFrame)
defer rtcpSenders.Close()
onFrame := func(trackID int, payload []byte) {
rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
sp.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
}
for {
c.conn.NetConn().SetReadDeadline(time.Now().Add(c.readTimeout))
pkt, err := c.conn.ReadPacket()
@ -547,9 +557,7 @@ func (c *Client) runPublish() { @@ -547,9 +557,7 @@ func (c *Client) runPublish() {
}
for _, frame := range frames {
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(),
gortsplib.StreamTypeRTP, frame)
path.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, frame)
onFrame(videoTrack.ID, frame)
}
}
@ -566,9 +574,7 @@ func (c *Client) runPublish() { @@ -566,9 +574,7 @@ func (c *Client) runPublish() {
return err
}
rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(),
gortsplib.StreamTypeRTP, frame)
path.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, frame)
onFrame(audioTrack.ID, frame)
default:
return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type)

56
internal/clientrtsp/client.go

@ -15,12 +15,13 @@ import ( @@ -15,12 +15,13 @@ import (
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/liberrors"
"github.com/pion/rtp"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/source"
"github.com/aler9/rtsp-simple-server/internal/stats"
"github.com/aler9/rtsp-simple-server/internal/streamproc"
)
const (
@ -66,17 +67,18 @@ type Client struct { @@ -66,17 +67,18 @@ type Client struct {
conn *gortsplib.ServerConn
parent Parent
path client.Path
trackStartingPoints []*client.TrackStartingPoint
authUser string
authPass string
authValidator *auth.Validator
authFailures int
path client.Path
authUser string
authPass string
authValidator *auth.Validator
authFailures int
// read only
onReadCmd *externalcmd.Cmd
trackStartingPoints []source.TrackStartingPoint
onReadCmd *externalcmd.Cmd
// publish only
sp *streamproc.StreamProc
onPublishCmd *externalcmd.Cmd
// in
@ -236,11 +238,6 @@ func (c *Client) run() { @@ -236,11 +238,6 @@ func (c *Client) run() {
c.path = res.Path
c.trackStartingPoints = make([]*client.TrackStartingPoint, len(ctx.Tracks))
for i := range ctx.Tracks {
c.trackStartingPoints[i] = &client.TrackStartingPoint{}
}
return &base.Response{
StatusCode: base.StatusOK,
}, nil
@ -327,8 +324,12 @@ func (c *Client) run() { @@ -327,8 +324,12 @@ func (c *Client) run() {
// add RTP-Info
var ri headers.RTPInfo
for id, v := range c.trackStartingPoints {
if v == nil {
for trackID, v := range c.trackStartingPoints {
if !v.Filled {
continue
}
if _, ok := c.conn.SetuppedTracks()[trackID]; !ok {
continue
}
@ -338,7 +339,7 @@ func (c *Client) run() { @@ -338,7 +339,7 @@ func (c *Client) run() {
Host: ctx.Req.URL.Host,
Path: "/" + c.path.Name(),
}
u.AddControlAttribute("trackID=" + strconv.FormatInt(int64(id), 10))
u.AddControlAttribute("trackID=" + strconv.FormatInt(int64(trackID), 10))
ri = append(ri, &headers.RTPInfoEntry{
URL: u,
@ -401,24 +402,7 @@ func (c *Client) run() { @@ -401,24 +402,7 @@ func (c *Client) run() {
return
}
if streamType == gortsplib.StreamTypeRTP &&
!c.trackStartingPoints[trackID].Filled {
pkt := rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
return
}
sp := c.trackStartingPoints[trackID]
sp.Filled = true
sp.SequenceNumber = pkt.SequenceNumber
sp.Timestamp = pkt.Timestamp
c.path.OnClientStartingPoint(client.StartingPointReq{c, trackID, sp}) // nolint:govet
}
c.path.OnFrame(trackID, streamType, payload)
c.sp.OnFrame(trackID, streamType, payload)
}
readDone := c.conn.Read(gortsplib.ServerConnReadHandlers{
@ -604,7 +588,7 @@ func (c *Client) recordStart() { @@ -604,7 +588,7 @@ func (c *Client) recordStart() {
c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet
<-resc
tracksLen := len(c.conn.SetuppedTracks())
tracksLen := len(c.conn.AnnouncedTracks())
c.log(logger.Info, "is publishing to path '%s', %d %s with %s",
c.path.Name(),
@ -623,6 +607,8 @@ func (c *Client) recordStart() { @@ -623,6 +607,8 @@ func (c *Client) recordStart() {
Port: strconv.FormatInt(int64(c.rtspPort), 10),
})
}
c.sp = streamproc.New(c, c.path, make([]source.TrackStartingPoint, len(c.conn.AnnouncedTracks())))
}
func (c *Client) recordStop() {

147
internal/path/path.go

@ -15,6 +15,7 @@ import ( @@ -15,6 +15,7 @@ import (
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/source"
"github.com/aler9/rtsp-simple-server/internal/sourcertmp"
"github.com/aler9/rtsp-simple-server/internal/sourcertsp"
"github.com/aler9/rtsp-simple-server/internal/stats"
@ -33,32 +34,10 @@ type Parent interface { @@ -33,32 +34,10 @@ type Parent interface {
OnPathClientClose(client.Client)
}
// source is implemented by all sources (client* and source*).
type source interface {
IsSource()
}
// extSource is implemented by all external sources.
type extSource interface {
IsSource()
IsExtSource()
Close()
}
type sourceRedirect struct{}
func (*sourceRedirect) IsSource() {}
type extSourceSetReadyReq struct {
tracks gortsplib.Tracks
startingPoints []*client.TrackStartingPoint
res chan struct{}
}
type extSourceSetNotReadyReq struct {
res chan struct{}
}
type clientState int
const (
@ -95,9 +74,9 @@ type Path struct { @@ -95,9 +74,9 @@ type Path struct {
clientsWg sync.WaitGroup
describeRequests []client.DescribeReq
setupPlayRequests []client.SetupPlayReq
source source
source source.Source
sourceTracks gortsplib.Tracks
sourceTrackStartingPoints []*client.TrackStartingPoint
sourceTrackStartingPoints []source.TrackStartingPoint
readers *readersMap
onDemandCmd *externalcmd.Cmd
describeTimer *time.Timer
@ -111,15 +90,15 @@ type Path struct { @@ -111,15 +90,15 @@ type Path struct {
closeTimerStarted bool
// in
extSourceSetReady chan extSourceSetReadyReq // from external source
extSourceSetNotReady chan extSourceSetNotReadyReq // from external source
setStartingPoint chan source.SetStartingPointReq
extSourceSetReady chan source.ExtSetReadyReq
extSourceSetNotReady chan source.ExtSetNotReadyReq
clientDescribe chan client.DescribeReq
clientSetupPlay chan client.SetupPlayReq
clientAnnounce chan client.AnnounceReq
clientPlay chan client.PlayReq
clientRecord chan client.RecordReq
clientPause chan client.PauseReq
clientStartingPoint chan client.StartingPointReq
clientRemove chan client.RemoveReq
terminate chan struct{}
}
@ -156,15 +135,15 @@ func New( @@ -156,15 +135,15 @@ func New(
sourceCloseTimer: newEmptyTimer(),
runOnDemandCloseTimer: newEmptyTimer(),
closeTimer: newEmptyTimer(),
extSourceSetReady: make(chan extSourceSetReadyReq),
extSourceSetNotReady: make(chan extSourceSetNotReadyReq),
setStartingPoint: make(chan source.SetStartingPointReq),
extSourceSetReady: make(chan source.ExtSetReadyReq),
extSourceSetNotReady: make(chan source.ExtSetNotReadyReq),
clientDescribe: make(chan client.DescribeReq),
clientSetupPlay: make(chan client.SetupPlayReq),
clientAnnounce: make(chan client.AnnounceReq),
clientPlay: make(chan client.PlayReq),
clientRecord: make(chan client.RecordReq),
clientPause: make(chan client.PauseReq),
clientStartingPoint: make(chan client.StartingPointReq),
clientRemove: make(chan client.RemoveReq),
terminate: make(chan struct{}),
}
@ -226,7 +205,7 @@ outer: @@ -226,7 +205,7 @@ outer:
case <-pa.sourceCloseTimer.C:
pa.sourceCloseTimerStarted = false
pa.source.(extSource).Close()
pa.source.(source.ExtSource).Close()
pa.source = nil
pa.scheduleClose()
@ -245,15 +224,25 @@ outer: @@ -245,15 +224,25 @@ outer:
<-pa.terminate
break outer
case req := <-pa.setStartingPoint:
pa.onSetStartingPoint(req)
case req := <-pa.extSourceSetReady:
pa.sourceTracks = req.tracks
pa.sourceTrackStartingPoints = req.startingPoints
pa.sourceTracks = req.Tracks
// clone
cl := make([]source.TrackStartingPoint, len(req.StartingPoints))
for k, v := range req.StartingPoints {
cl[k] = v
}
pa.sourceTrackStartingPoints = cl
pa.onSourceSetReady()
close(req.res)
close(req.Res)
case req := <-pa.extSourceSetNotReady:
pa.onSourceSetNotReady()
close(req.res)
close(req.Res)
case req := <-pa.clientDescribe:
pa.onClientDescribe(req)
@ -273,9 +262,6 @@ outer: @@ -273,9 +262,6 @@ outer:
case req := <-pa.clientPause:
pa.onClientPause(req)
case req := <-pa.clientStartingPoint:
pa.onClientStartingPoint(req)
case req := <-pa.clientRemove:
if _, ok := pa.clients[req.Client]; !ok {
close(req.Res)
@ -306,7 +292,7 @@ outer: @@ -306,7 +292,7 @@ outer:
onInitCmd.Close()
}
if source, ok := pa.source.(extSource); ok {
if source, ok := pa.source.(source.ExtSource); ok {
source.Close()
}
pa.sourceWg.Wait()
@ -339,6 +325,7 @@ outer: @@ -339,6 +325,7 @@ outer:
}
pa.clientsWg.Wait()
close(pa.setStartingPoint)
close(pa.extSourceSetReady)
close(pa.extSourceSetNotReady)
close(pa.clientDescribe)
@ -347,7 +334,6 @@ outer: @@ -347,7 +334,6 @@ outer:
close(pa.clientPlay)
close(pa.clientRecord)
close(pa.clientPause)
close(pa.clientStartingPoint)
close(pa.clientRemove)
}
@ -355,17 +341,22 @@ func (pa *Path) exhaustChannels() { @@ -355,17 +341,22 @@ func (pa *Path) exhaustChannels() {
go func() {
for {
select {
case _, ok := <-pa.setStartingPoint:
if !ok {
return
}
case req, ok := <-pa.extSourceSetReady:
if !ok {
return
}
close(req.res)
close(req.Res)
case req, ok := <-pa.extSourceSetNotReady:
if !ok {
return
}
close(req.res)
close(req.Res)
case req, ok := <-pa.clientDescribe:
if !ok {
@ -403,11 +394,6 @@ func (pa *Path) exhaustChannels() { @@ -403,11 +394,6 @@ func (pa *Path) exhaustChannels() {
}
close(req.Res)
case _, ok := <-pa.clientStartingPoint:
if !ok {
return
}
case req, ok := <-pa.clientRemove:
if !ok {
return
@ -584,6 +570,14 @@ func (pa *Path) fixedPublisherStart() { @@ -584,6 +570,14 @@ func (pa *Path) fixedPublisherStart() {
}
}
func (pa *Path) onSetStartingPoint(req source.SetStartingPointReq) {
if req.Source != pa.source {
return
}
pa.sourceTrackStartingPoints[req.TrackID] = req.StartingPoint
}
func (pa *Path) onClientDescribe(req client.DescribeReq) {
if _, ok := pa.clients[req.Client]; ok {
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet
@ -674,8 +668,8 @@ func (pa *Path) onClientPlay(req client.PlayReq) { @@ -674,8 +668,8 @@ func (pa *Path) onClientPlay(req client.PlayReq) {
pa.clients[req.Client] = clientStatePlay
pa.readers.add(req.Client)
// clone slice, do not clone items
cl := make([]*client.TrackStartingPoint, len(pa.sourceTrackStartingPoints))
// clone
cl := make([]source.TrackStartingPoint, len(pa.sourceTrackStartingPoints))
for k, v := range pa.sourceTrackStartingPoints {
cl[k] = v
}
@ -712,7 +706,7 @@ func (pa *Path) onClientAnnounce(req client.AnnounceReq) { @@ -712,7 +706,7 @@ func (pa *Path) onClientAnnounce(req client.AnnounceReq) {
pa.source = req.Client
pa.sourceTracks = req.Tracks
pa.sourceTrackStartingPoints = make([]*client.TrackStartingPoint, len(req.Tracks))
pa.sourceTrackStartingPoints = make([]source.TrackStartingPoint, len(req.Tracks))
req.Res <- client.AnnounceRes{pa, nil} //nolint:govet
}
@ -750,14 +744,6 @@ func (pa *Path) onClientPause(req client.PauseReq) { @@ -750,14 +744,6 @@ func (pa *Path) onClientPause(req client.PauseReq) {
close(req.Res)
}
func (pa *Path) onClientStartingPoint(req client.StartingPointReq) {
if state, ok := pa.clients[req.Client]; !ok || state != clientStateRecord {
return
}
pa.sourceTrackStartingPoints[req.TrackID] = req.SP
}
func (pa *Path) scheduleSourceClose() {
if !pa.hasExternalSource() || !pa.conf.SourceOnDemand || pa.source == nil {
return
@ -820,19 +806,24 @@ func (pa *Path) Name() string { @@ -820,19 +806,24 @@ func (pa *Path) Name() string {
return pa.name
}
// OnExtSourceSetReady is called by a external source.
func (pa *Path) OnExtSourceSetReady(tracks gortsplib.Tracks,
startingPoints []*client.TrackStartingPoint) {
res := make(chan struct{})
pa.extSourceSetReady <- extSourceSetReadyReq{tracks, startingPoints, res}
<-res
// OnSetStartingPoint is called by a source.
func (pa *Path) OnSetStartingPoint(req source.SetStartingPointReq) {
pa.setStartingPoint <- req
}
// OnExtSourceSetNotReady is called by a external source.
func (pa *Path) OnExtSourceSetNotReady() {
res := make(chan struct{})
pa.extSourceSetNotReady <- extSourceSetNotReadyReq{res}
<-res
// OnFrame is called by a source.
func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
pa.readers.forwardFrame(trackID, streamType, payload)
}
// OnExtSourceSetReady is called by an external source.
func (pa *Path) OnExtSourceSetReady(req source.ExtSetReadyReq) {
pa.extSourceSetReady <- req
}
// OnExtSourceSetNotReady is called by an external source.
func (pa *Path) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) {
pa.extSourceSetNotReady <- req
}
// OnPathManDescribe is called by pathman.PathMan.
@ -850,32 +841,22 @@ func (pa *Path) OnPathManAnnounce(req client.AnnounceReq) { @@ -850,32 +841,22 @@ func (pa *Path) OnPathManAnnounce(req client.AnnounceReq) {
pa.clientAnnounce <- req
}
// OnClientRemove is called by clientrtsp.Client.
// OnClientRemove is called by a client.
func (pa *Path) OnClientRemove(req client.RemoveReq) {
pa.clientRemove <- req
}
// OnClientPlay is called by clientrtsp.Client.
// OnClientPlay is called by a client.
func (pa *Path) OnClientPlay(req client.PlayReq) {
pa.clientPlay <- req
}
// OnClientRecord is called by clientrtsp.Client.
// OnClientRecord is called by a client.
func (pa *Path) OnClientRecord(req client.RecordReq) {
pa.clientRecord <- req
}
// OnClientPause is called by clientrtsp.Client.
// OnClientPause is called by a client.
func (pa *Path) OnClientPause(req client.PauseReq) {
pa.clientPause <- req
}
// OnClientStartingPoint is called by clientrtsp.Client.
func (pa *Path) OnClientStartingPoint(req client.StartingPointReq) {
pa.clientStartingPoint <- req
}
// OnFrame is called by a source or by a clientrtsp.Client.
func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
pa.readers.forwardFrame(trackID, streamType, payload)
}

13
internal/rtmputils/rtcpsenderset.go → internal/rtcpsenderset/rtcpsenderset.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package rtmputils
package rtcpsenderset
import (
"time"
@ -19,8 +19,8 @@ type RTCPSenderSet struct { @@ -19,8 +19,8 @@ type RTCPSenderSet struct {
done chan struct{}
}
// NewRTCPSenderSet allocates a RTCPSenderSet.
func NewRTCPSenderSet(
// New allocates a RTCPSenderSet.
func New(
tracks gortsplib.Tracks,
onFrame func(int, gortsplib.StreamType, []byte),
) *RTCPSenderSet {
@ -72,8 +72,7 @@ func (s *RTCPSenderSet) run() { @@ -72,8 +72,7 @@ func (s *RTCPSenderSet) run() {
}
}
// ProcessFrame sends a frame to the senders.
func (s *RTCPSenderSet) ProcessFrame(trackID int, t time.Time,
streamType gortsplib.StreamType, f []byte) {
s.senders[trackID].ProcessFrame(t, streamType, f)
// OnFrame sends a frame to the senders.
func (s *RTCPSenderSet) OnFrame(trackID int, streamType gortsplib.StreamType, f []byte) {
s.senders[trackID].ProcessFrame(time.Now(), streamType, f)
}

43
internal/source/source.go

@ -0,0 +1,43 @@ @@ -0,0 +1,43 @@
package source
import (
"github.com/aler9/gortsplib"
)
// TrackStartingPoint is the starting point of a track.
type TrackStartingPoint struct {
Filled bool // used to avoid mutexes
SequenceNumber uint16
Timestamp uint32
}
// Source is implemented by all sources (clients and external sources).
type Source interface {
IsSource()
}
// ExtSource is implemented by all external sources.
type ExtSource interface {
IsSource()
IsExtSource()
Close()
}
// SetStartingPointReq is a set starting point request.
type SetStartingPointReq struct {
Source Source
TrackID int
StartingPoint TrackStartingPoint
}
// ExtSetReadyReq is a set ready request.
type ExtSetReadyReq struct {
Tracks gortsplib.Tracks
StartingPoints []TrackStartingPoint
Res chan struct{}
}
// ExtSetNotReadyReq is a set not ready request.
type ExtSetNotReadyReq struct {
Res chan struct{}
}

47
internal/sourcertmp/source.go

@ -14,10 +14,12 @@ import ( @@ -14,10 +14,12 @@ import (
"github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtcpsenderset"
"github.com/aler9/rtsp-simple-server/internal/rtmputils"
"github.com/aler9/rtsp-simple-server/internal/source"
"github.com/aler9/rtsp-simple-server/internal/stats"
"github.com/aler9/rtsp-simple-server/internal/streamproc"
)
const (
@ -27,12 +29,13 @@ const ( @@ -27,12 +29,13 @@ const (
// Parent is implemeneted by path.Path.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnExtSourceSetReady(gortsplib.Tracks, []*client.TrackStartingPoint)
OnExtSourceSetNotReady()
OnExtSourceSetReady(req source.ExtSetReadyReq)
OnExtSourceSetNotReady(req source.ExtSetNotReadyReq)
OnSetStartingPoint(source.SetStartingPointReq)
OnFrame(int, gortsplib.StreamType, []byte)
}
// Source is a RTMP source.
// Source is a RTMP external source.
type Source struct {
ur string
readTimeout time.Duration
@ -74,7 +77,7 @@ func (s *Source) Close() { @@ -74,7 +77,7 @@ func (s *Source) Close() {
close(s.terminate)
}
// IsSource implements path.source.
// IsSource implements source.Source.
func (s *Source) IsSource() {}
// IsExtSource implements path.extSource.
@ -174,16 +177,34 @@ func (s *Source) runInner() bool { @@ -174,16 +177,34 @@ func (s *Source) runInner() bool {
}
s.log(logger.Info, "ready")
s.parent.OnExtSourceSetReady(tracks,
make([]*client.TrackStartingPoint, len(tracks)))
defer s.parent.OnExtSourceSetNotReady()
res := make(chan struct{})
s.parent.OnExtSourceSetReady(source.ExtSetReadyReq{
Tracks: tracks,
StartingPoints: make([]source.TrackStartingPoint, len(tracks)),
Res: res,
})
<-res
defer func() {
res := make(chan struct{})
s.parent.OnExtSourceSetNotReady(source.ExtSetNotReadyReq{
Res: res,
})
<-res
}()
readerDone := make(chan error)
go func() {
readerDone <- func() error {
rtcpSenders := rtmputils.NewRTCPSenderSet(tracks, s.parent.OnFrame)
rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnFrame)
defer rtcpSenders.Close()
sp := streamproc.New(s, s.parent, make([]source.TrackStartingPoint, len(tracks)))
onFrame := func(trackID int, payload []byte) {
rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
sp.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, payload)
}
for {
conn.NetConn().SetReadDeadline(time.Now().Add(s.readTimeout))
pkt, err := conn.ReadPacket()
@ -214,9 +235,7 @@ func (s *Source) runInner() bool { @@ -214,9 +235,7 @@ func (s *Source) runInner() bool {
}
for _, frame := range frames {
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(),
gortsplib.StreamTypeRTP, frame)
s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, frame)
onFrame(videoTrack.ID, frame)
}
}
@ -233,9 +252,7 @@ func (s *Source) runInner() bool { @@ -233,9 +252,7 @@ func (s *Source) runInner() bool {
return err
}
rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(),
gortsplib.StreamTypeRTP, frame)
s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, frame)
onFrame(audioTrack.ID, frame)
default:
return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type)

42
internal/sourcertsp/source.go

@ -8,9 +8,10 @@ import ( @@ -8,9 +8,10 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/source"
"github.com/aler9/rtsp-simple-server/internal/stats"
"github.com/aler9/rtsp-simple-server/internal/streamproc"
)
const (
@ -20,12 +21,13 @@ const ( @@ -20,12 +21,13 @@ const (
// Parent is implemented by path.Path.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnExtSourceSetReady(gortsplib.Tracks, []*client.TrackStartingPoint)
OnExtSourceSetNotReady()
OnExtSourceSetReady(req source.ExtSetReadyReq)
OnExtSourceSetNotReady(req source.ExtSetNotReadyReq)
OnSetStartingPoint(source.SetStartingPointReq)
OnFrame(int, gortsplib.StreamType, []byte)
}
// Source is a RTSP source.
// Source is a RTSP external source.
type Source struct {
ur string
proto *gortsplib.StreamProtocol
@ -79,7 +81,7 @@ func (s *Source) Close() { @@ -79,7 +81,7 @@ func (s *Source) Close() {
close(s.terminate)
}
// IsSource implements path.source.
// IsSource implements source.Source.
func (s *Source) IsSource() {}
// IsExtSource implements path.extSource.
@ -148,7 +150,7 @@ func (s *Source) runInner() bool { @@ -148,7 +150,7 @@ func (s *Source) runInner() bool {
return true
}
startingPoints := make([]*client.TrackStartingPoint, len(conn.Tracks()))
trackStartingPoints := make([]source.TrackStartingPoint, len(conn.Tracks()))
if conn.RTPInfo() != nil {
for _, info := range *conn.RTPInfo() {
@ -179,20 +181,32 @@ func (s *Source) runInner() bool { @@ -179,20 +181,32 @@ func (s *Source) runInner() bool {
continue
}
startingPoints[trackID] = &client.TrackStartingPoint{
Filled: true,
SequenceNumber: info.SequenceNumber,
Timestamp: info.Timestamp,
}
trackStartingPoints[trackID].Filled = true
trackStartingPoints[trackID].SequenceNumber = info.SequenceNumber
trackStartingPoints[trackID].Timestamp = info.Timestamp
}
}
s.log(logger.Info, "ready")
s.parent.OnExtSourceSetReady(conn.Tracks(), startingPoints)
defer s.parent.OnExtSourceSetNotReady()
res := make(chan struct{})
s.parent.OnExtSourceSetReady(source.ExtSetReadyReq{
Tracks: conn.Tracks(),
StartingPoints: trackStartingPoints,
Res: res,
})
<-res
defer func() {
res := make(chan struct{})
s.parent.OnExtSourceSetNotReady(source.ExtSetNotReadyReq{
Res: res,
})
<-res
}()
sp := streamproc.New(s, s.parent, trackStartingPoints)
done := conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) {
s.parent.OnFrame(trackID, streamType, payload)
sp.OnFrame(trackID, streamType, payload)
})
for {

54
internal/streamproc/streamproc.go

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
package streamproc
import (
"github.com/aler9/gortsplib"
"github.com/pion/rtp"
"github.com/aler9/rtsp-simple-server/internal/source"
)
// Path is implemented by path.path.
type Path interface {
OnSetStartingPoint(source.SetStartingPointReq)
OnFrame(int, gortsplib.StreamType, []byte)
}
// StreamProc is a stream processor, an intermediate layer between a source and a path.
type StreamProc struct {
source source.Source
path Path
startingPoints []source.TrackStartingPoint
}
// New allocates a StreamProc.
func New(source source.Source, path Path, startingPoints []source.TrackStartingPoint) *StreamProc {
return &StreamProc{
source: source,
path: path,
startingPoints: startingPoints,
}
}
// OnFrame processes a frame.
func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP &&
!sp.startingPoints[trackID].Filled {
pkt := rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
return
}
sp.startingPoints[trackID].Filled = true
sp.startingPoints[trackID].SequenceNumber = pkt.SequenceNumber
sp.startingPoints[trackID].Timestamp = pkt.Timestamp
sp.path.OnSetStartingPoint(source.SetStartingPointReq{
Source: sp.source,
TrackID: trackID,
StartingPoint: sp.startingPoints[trackID],
})
}
sp.path.OnFrame(trackID, streamType, payload)
}

47
main_clientrtmp_test.go

@ -4,6 +4,9 @@ import ( @@ -4,6 +4,9 @@ import (
"testing"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/stretchr/testify/require"
)
@ -208,3 +211,47 @@ func TestClientRTMPAuthFail(t *testing.T) { @@ -208,3 +211,47 @@ func TestClientRTMPAuthFail(t *testing.T) {
require.NotEqual(t, 0, cnt2.wait())
})
}
func TestClientRTMPRTPInfo(t *testing.T) {
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideoaudio.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + ":1935/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
dest, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
},
SequenceNumber: (*dest.RTPInfo())[0].SequenceNumber,
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=1",
},
SequenceNumber: (*dest.RTPInfo())[1].SequenceNumber,
Timestamp: (*dest.RTPInfo())[1].Timestamp,
},
}, dest.RTPInfo())
}

13
main_clientrtsp_test.go

@ -590,14 +590,7 @@ func TestClientRTSPRTPInfo(t *testing.T) { @@ -590,14 +590,7 @@ func TestClientRTSPRTPInfo(t *testing.T) {
track2, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
conf := gortsplib.ClientConf{
StreamProtocol: func() *gortsplib.StreamProtocol {
v := gortsplib.StreamProtocolTCP
return &v
}(),
}
source, err := conf.DialPublish("rtsp://"+ownDockerIP+":8554/teststream",
source, err := gortsplib.DialPublish("rtsp://"+ownDockerIP+":8554/teststream",
gortsplib.Tracks{track1, track2})
require.NoError(t, err)
defer source.Close()
@ -620,7 +613,7 @@ func TestClientRTSPRTPInfo(t *testing.T) { @@ -620,7 +613,7 @@ func TestClientRTSPRTPInfo(t *testing.T) {
require.NoError(t, err)
func() {
dest, err := conf.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
dest, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()
@ -655,7 +648,7 @@ func TestClientRTSPRTPInfo(t *testing.T) { @@ -655,7 +648,7 @@ func TestClientRTSPRTPInfo(t *testing.T) {
require.NoError(t, err)
func() {
dest, err := conf.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
dest, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()

47
main_sourcertmp_test.go

@ -4,6 +4,9 @@ import ( @@ -4,6 +4,9 @@ import (
"testing"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/stretchr/testify/require"
)
@ -55,3 +58,47 @@ func TestSourceRTMP(t *testing.T) { @@ -55,3 +58,47 @@ func TestSourceRTMP(t *testing.T) {
})
}
}
func TestSourceRTMPRTPInfo(t *testing.T) {
cnt1, err := newContainer("nginx-rtmp", "rtmpserver", []string{})
require.NoError(t, err)
defer cnt1.close()
cnt2, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://" + cnt1.ip() + "/stream/test",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
p, ok := testProgram("paths:\n" +
" proxied:\n" +
" source: rtmp://" + cnt1.ip() + "/stream/test\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
dest, err := gortsplib.DialRead("rtsp://127.0.1.2:8554/proxied")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=0",
},
SequenceNumber: (*dest.RTPInfo())[0].SequenceNumber,
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
}, dest.RTPInfo())
}

46
main_sourcertsp_test.go

@ -11,6 +11,7 @@ import ( @@ -11,6 +11,7 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
@ -219,6 +220,7 @@ func TestSourceRTSPRTPInfo(t *testing.T) { @@ -219,6 +220,7 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
// provide a partial RTP-Info with only one track
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
@ -233,6 +235,28 @@ func TestSourceRTSPRTPInfo(t *testing.T) { @@ -233,6 +235,28 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
}.Write(bconn.Writer)
require.NoError(t, err)
// send a packet to fill the missing RTP-Info track
pkt := rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 87,
Timestamp: 756436454,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
buf, err := pkt.Marshal()
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: gortsplib.StreamTypeRTP,
Payload: buf,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
@ -253,16 +277,9 @@ func TestSourceRTSPRTPInfo(t *testing.T) { @@ -253,16 +277,9 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
require.Equal(t, true, ok)
defer p1.close()
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
conf := gortsplib.ClientConf{
StreamProtocol: func() *gortsplib.StreamProtocol {
v := gortsplib.StreamProtocolTCP
return &v
}(),
}
dest, err := conf.DialRead("rtsp://" + ownDockerIP + ":8554/proxied")
dest, err := gortsplib.DialRead("rtsp://127.0.1.2:8554/proxied")
require.NoError(t, err)
defer dest.Close()
@ -270,7 +287,16 @@ func TestSourceRTSPRTPInfo(t *testing.T) { @@ -270,7 +287,16 @@ func TestSourceRTSPRTPInfo(t *testing.T) {
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=0",
},
SequenceNumber: 87,
Timestamp: 756436454,
},
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=1",
},
SequenceNumber: 34254,

Loading…
Cancel
Save