Browse Source

add RTP-Info to PLAY responses, allowing VLC to compute the elapsed time (#233)

pull/340/head
aler9 4 years ago
parent
commit
508d30dc07
  1. 4
      go.mod
  2. 4
      go.sum
  3. 22
      internal/client/client.go
  4. 7
      internal/clientrtmp/client.go
  5. 90
      internal/clientrtsp/client.go
  6. 115
      internal/path/path.go
  7. 104
      main_rtsp_test.go

4
go.mod

@ -5,12 +5,12 @@ go 1.15 @@ -5,12 +5,12 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210316211856-c2de28c18558
github.com/aler9/gortsplib v0.0.0-20210318191934-8936db52e4cf
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/notedit/rtmp v0.0.0
github.com/pion/rtp v1.6.2 // indirect
github.com/pion/rtp v1.6.2
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
gopkg.in/alecthomas/kingpin.v2 v2.2.6

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210316211856-c2de28c18558 h1:e1MhWMfd4vlPPk9WZHSTC/Muigq41/+A6PSrgaRFUDY=
github.com/aler9/gortsplib v0.0.0-20210316211856-c2de28c18558/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0=
github.com/aler9/gortsplib v0.0.0-20210318191934-8936db52e4cf h1:5wdYKMopoCRspHReiEqx6KWM1aqmQORumi9aWLlv7hM=
github.com/aler9/gortsplib v0.0.0-20210318191934-8936db52e4cf/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

22
internal/client/client.go

@ -91,10 +91,22 @@ type RemoveReq struct { @@ -91,10 +91,22 @@ type RemoveReq struct {
Res chan struct{}
}
// TrackStartingPoint is the starting point of a track.
type TrackStartingPoint struct {
Filled bool // used by clientrtsp to avoid mutexes
SequenceNumber uint16
Timestamp uint32
}
// PlayRes is a play response.
type PlayRes struct {
TrackStartingPoints []*TrackStartingPoint
}
// PlayReq is a play request.
type PlayReq struct {
Client Client
Res chan struct{}
Res chan PlayRes
}
// RecordReq is a record request.
@ -109,6 +121,13 @@ type PauseReq struct { @@ -109,6 +121,13 @@ type PauseReq struct {
Res chan struct{}
}
// StartingPointReq is a starting point request.
type StartingPointReq struct {
Client Client
TrackID int
SP *TrackStartingPoint
}
// Path is implemented by path.Path.
type Path interface {
Name() string
@ -117,6 +136,7 @@ type Path interface { @@ -117,6 +136,7 @@ type Path interface {
OnClientPlay(PlayReq)
OnClientRecord(RecordReq)
OnClientPause(PauseReq)
OnClientStartingPoint(StartingPointReq)
OnFrame(int, gortsplib.StreamType, []byte)
}

7
internal/clientrtmp/client.go

@ -283,7 +283,7 @@ func (c *Client) runRead() { @@ -283,7 +283,7 @@ func (c *Client) runRead() {
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
resc := make(chan struct{})
resc := make(chan client.PlayRes)
path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet
<-resc
@ -358,10 +358,8 @@ func (c *Client) runRead() { @@ -358,10 +358,8 @@ func (c *Client) runRead() {
videoPTS = nt.Timestamp
videoBuf = append(videoBuf, nt.NALU)
}
continue
}
if c.audioTrack != nil && pair.trackID == c.audioTrack.ID {
} else if c.audioTrack != nil && pair.trackID == c.audioTrack.ID {
ats, err := c.aacDecoder.Decode(pair.buf)
if err != nil {
c.log(logger.Debug, "ERR while decoding audio track: %v", err)
@ -381,7 +379,6 @@ func (c *Client) runRead() { @@ -381,7 +379,6 @@ func (c *Client) runRead() {
return err
}
}
continue
}
}
}()

90
internal/clientrtsp/client.go

@ -14,6 +14,7 @@ import ( @@ -14,6 +14,7 @@ import (
"github.com/aler9/gortsplib/pkg/auth"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/pion/rtp"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
@ -64,13 +65,18 @@ type Client struct { @@ -64,13 +65,18 @@ type Client struct {
conn *gortsplib.ServerConn
parent Parent
path client.Path
authUser string
authPass string
authValidator *auth.Validator
authFailures int
onReadCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
path client.Path
trackStartingPoints []*client.TrackStartingPoint
authUser string
authPass string
authValidator *auth.Validator
authFailures int
// read only
onReadCmd *externalcmd.Cmd
// publish only
onPublishCmd *externalcmd.Cmd
// in
terminate chan struct{}
@ -229,6 +235,11 @@ func (c *Client) run() { @@ -229,6 +235,11 @@ func (c *Client) run() {
c.path = res.Path
c.trackStartingPoints = make([]*client.TrackStartingPoint, len(ctx.Tracks))
for i := range ctx.Tracks {
c.trackStartingPoints[i] = &client.TrackStartingPoint{}
}
return &base.Response{
StatusCode: base.StatusOK,
}, nil
@ -305,14 +316,42 @@ func (c *Client) run() { @@ -305,14 +316,42 @@ func (c *Client) run() {
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), ctx.Path)
}
c.playStart()
res := c.playStart()
c.trackStartingPoints = res.TrackStartingPoints
}
h := base.Header{
"Session": base.HeaderValue{sessionID},
}
// add RTP-Info
var ri headers.RTPInfo
for id, v := range c.trackStartingPoints {
if v == nil {
continue
}
u := &base.URL{
Scheme: ctx.Req.URL.Scheme,
User: ctx.Req.URL.User,
Host: ctx.Req.URL.Host,
Path: "/" + c.path.Name(),
}
u.AddControlAttribute("trackID=" + strconv.FormatInt(int64(id), 10))
ri = append(ri, &headers.RTPInfoEntry{
URL: u,
SequenceNumber: v.SequenceNumber,
Timestamp: v.Timestamp,
})
}
if len(ri) > 0 {
h["RTP-Info"] = ri.Write()
}
return &base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Session": base.HeaderValue{sessionID},
},
Header: h,
}, nil
}
@ -361,6 +400,23 @@ func (c *Client) run() { @@ -361,6 +400,23 @@ func (c *Client) run() {
return
}
if streamType == gortsplib.StreamTypeRTP &&
!c.trackStartingPoints[trackID].Filled {
pkt := rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
return
}
sp := c.trackStartingPoints[trackID]
sp.Filled = true
sp.SequenceNumber = pkt.SequenceNumber
sp.Timestamp = pkt.Timestamp
c.path.OnClientStartingPoint(client.StartingPointReq{c, trackID, sp}) // nolint:govet
}
c.path.OnFrame(trackID, streamType, payload)
}
@ -506,10 +562,10 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, @@ -506,10 +562,10 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
return nil
}
func (c *Client) playStart() {
resc := make(chan struct{})
func (c *Client) playStart() client.PlayRes {
resc := make(chan client.PlayRes)
c.path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet
<-resc
res := <-resc
tracksLen := len(c.conn.SetuppedTracks())
@ -530,6 +586,8 @@ func (c *Client) playStart() { @@ -530,6 +586,8 @@ func (c *Client) playStart() {
Port: strconv.FormatInt(int64(c.rtspPort), 10),
})
}
return res
}
func (c *Client) playStop() {
@ -571,10 +629,10 @@ func (c *Client) recordStop() { @@ -571,10 +629,10 @@ func (c *Client) recordStop() {
}
// OnIncomingFrame implements path.Reader.
func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if _, ok := c.conn.SetuppedTracks()[trackID]; !ok {
return
}
c.conn.WriteFrame(trackID, streamType, buf)
c.conn.WriteFrame(trackID, streamType, payload)
}

115
internal/path/path.go

@ -51,6 +51,11 @@ func (*sourceRedirect) IsSource() {} @@ -51,6 +51,11 @@ func (*sourceRedirect) IsSource() {}
type extSourceSetReadyReq struct {
tracks gortsplib.Tracks
res chan struct{}
}
type extSourceSetNotReadyReq struct {
res chan struct{}
}
type clientState int
@ -91,6 +96,7 @@ type Path struct { @@ -91,6 +96,7 @@ type Path struct {
setupPlayRequests []client.SetupPlayReq
source source
sourceTracks gortsplib.Tracks
sourceTrackStartingPoints []*client.TrackStartingPoint
readers *readersMap
onDemandCmd *externalcmd.Cmd
describeTimer *time.Timer
@ -104,14 +110,15 @@ type Path struct { @@ -104,14 +110,15 @@ type Path struct {
closeTimerStarted bool
// in
extSourceSetReady chan extSourceSetReadyReq // from external source
extSourceSetNotReady chan struct{} // from external source
extSourceSetReady chan extSourceSetReadyReq // from external source
extSourceSetNotReady chan extSourceSetNotReadyReq // from external source
clientDescribe chan client.DescribeReq
clientSetupPlay chan client.SetupPlayReq
clientAnnounce chan client.AnnounceReq
clientPlay chan client.PlayReq
clientRecord chan client.RecordReq
clientPause chan client.PauseReq
clientStartingPoint chan client.StartingPointReq
clientRemove chan client.RemoveReq
terminate chan struct{}
}
@ -149,13 +156,14 @@ func New( @@ -149,13 +156,14 @@ func New(
runOnDemandCloseTimer: newEmptyTimer(),
closeTimer: newEmptyTimer(),
extSourceSetReady: make(chan extSourceSetReadyReq),
extSourceSetNotReady: make(chan struct{}),
extSourceSetNotReady: make(chan extSourceSetNotReadyReq),
clientDescribe: make(chan client.DescribeReq),
clientSetupPlay: make(chan client.SetupPlayReq),
clientAnnounce: make(chan client.AnnounceReq),
clientPlay: make(chan client.PlayReq),
clientRecord: make(chan client.RecordReq),
clientPause: make(chan client.PauseReq),
clientStartingPoint: make(chan client.StartingPointReq),
clientRemove: make(chan client.RemoveReq),
terminate: make(chan struct{}),
}
@ -238,10 +246,13 @@ outer: @@ -238,10 +246,13 @@ outer:
case req := <-pa.extSourceSetReady:
pa.sourceTracks = req.tracks
pa.sourceTrackStartingPoints = make([]*client.TrackStartingPoint, len(req.tracks))
pa.onSourceSetReady()
close(req.res)
case <-pa.extSourceSetNotReady:
case req := <-pa.extSourceSetNotReady:
pa.onSourceSetNotReady()
close(req.res)
case req := <-pa.clientDescribe:
pa.onClientDescribe(req)
@ -253,16 +264,16 @@ outer: @@ -253,16 +264,16 @@ outer:
pa.onClientAnnounce(req)
case req := <-pa.clientPlay:
pa.onClientPlay(req.Client)
close(req.Res)
pa.onClientPlay(req)
case req := <-pa.clientRecord:
pa.onClientRecord(req.Client)
close(req.Res)
pa.onClientRecord(req)
case req := <-pa.clientPause:
pa.onClientPause(req.Client)
close(req.Res)
pa.onClientPause(req)
case req := <-pa.clientStartingPoint:
pa.onClientStartingPoint(req)
case req := <-pa.clientRemove:
if _, ok := pa.clients[req.Client]; !ok {
@ -336,6 +347,7 @@ outer: @@ -336,6 +347,7 @@ outer:
close(pa.clientPlay)
close(pa.clientRecord)
close(pa.clientPause)
close(pa.clientStartingPoint)
close(pa.clientRemove)
}
@ -343,15 +355,17 @@ func (pa *Path) exhaustChannels() { @@ -343,15 +355,17 @@ func (pa *Path) exhaustChannels() {
go func() {
for {
select {
case _, ok := <-pa.extSourceSetReady:
case req, ok := <-pa.extSourceSetReady:
if !ok {
return
}
close(req.res)
case _, ok := <-pa.extSourceSetNotReady:
case req, ok := <-pa.extSourceSetNotReady:
if !ok {
return
}
close(req.res)
case req, ok := <-pa.clientDescribe:
if !ok {
@ -389,6 +403,11 @@ func (pa *Path) exhaustChannels() { @@ -389,6 +403,11 @@ func (pa *Path) exhaustChannels() {
}
close(req.Res)
case _, ok := <-pa.clientStartingPoint:
if !ok {
return
}
case req, ok := <-pa.clientRemove:
if !ok {
return
@ -655,19 +674,18 @@ func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) { @@ -655,19 +674,18 @@ func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) {
req.Res <- client.SetupPlayRes{pa, pa.sourceTracks, nil} //nolint:govet
}
func (pa *Path) onClientPlay(c client.Client) {
state, ok := pa.clients[c]
if !ok {
return
}
func (pa *Path) onClientPlay(req client.PlayReq) {
atomic.AddInt64(pa.stats.CountReaders, 1)
pa.clients[req.Client] = clientStatePlay
pa.readers.add(req.Client)
if state != clientStatePrePlay {
return
// clone slice, do not clone items
cl := make([]*client.TrackStartingPoint, len(pa.sourceTrackStartingPoints))
for k, v := range pa.sourceTrackStartingPoints {
cl[k] = v
}
atomic.AddInt64(pa.stats.CountReaders, 1)
pa.clients[c] = clientStatePlay
pa.readers.add(c)
req.Res <- client.PlayRes{cl} // nolint:govet
}
func (pa *Path) onClientAnnounce(req client.AnnounceReq) {
@ -699,40 +717,50 @@ func (pa *Path) onClientAnnounce(req client.AnnounceReq) { @@ -699,40 +717,50 @@ func (pa *Path) onClientAnnounce(req client.AnnounceReq) {
pa.source = req.Client
pa.sourceTracks = req.Tracks
pa.sourceTrackStartingPoints = make([]*client.TrackStartingPoint, len(req.Tracks))
req.Res <- client.AnnounceRes{pa, nil} //nolint:govet
}
func (pa *Path) onClientRecord(c client.Client) {
state, ok := pa.clients[c]
if !ok {
return
}
if state != clientStatePreRecord {
func (pa *Path) onClientRecord(req client.RecordReq) {
if state, ok := pa.clients[req.Client]; !ok || state != clientStatePreRecord {
close(req.Res)
return
}
atomic.AddInt64(pa.stats.CountPublishers, 1)
pa.clients[c] = clientStateRecord
pa.clients[req.Client] = clientStateRecord
pa.onSourceSetReady()
close(req.Res)
}
func (pa *Path) onClientPause(c client.Client) {
state, ok := pa.clients[c]
func (pa *Path) onClientPause(req client.PauseReq) {
state, ok := pa.clients[req.Client]
if !ok {
close(req.Res)
return
}
if state == clientStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1)
pa.clients[c] = clientStatePrePlay
pa.readers.remove(c)
pa.clients[req.Client] = clientStatePrePlay
pa.readers.remove(req.Client)
} else if state == clientStateRecord {
atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.clients[c] = clientStatePreRecord
pa.clients[req.Client] = clientStatePreRecord
pa.onSourceSetNotReady()
}
close(req.Res)
}
func (pa *Path) onClientStartingPoint(req client.StartingPointReq) {
if state, ok := pa.clients[req.Client]; !ok || state != clientStateRecord {
return
}
pa.sourceTrackStartingPoints[req.TrackID] = req.SP
}
func (pa *Path) scheduleSourceClose() {
@ -799,12 +827,16 @@ func (pa *Path) Name() string { @@ -799,12 +827,16 @@ func (pa *Path) Name() string {
// OnExtSourceSetReady is called by a external source.
func (pa *Path) OnExtSourceSetReady(tracks gortsplib.Tracks) {
pa.extSourceSetReady <- extSourceSetReadyReq{tracks}
res := make(chan struct{})
pa.extSourceSetReady <- extSourceSetReadyReq{tracks, res}
<-res
}
// OnExtSourceSetNotReady is called by a external source.
func (pa *Path) OnExtSourceSetNotReady() {
pa.extSourceSetNotReady <- struct{}{}
res := make(chan struct{})
pa.extSourceSetNotReady <- extSourceSetNotReadyReq{res}
<-res
}
// OnPathManDescribe is called by pathman.PathMan.
@ -842,7 +874,12 @@ func (pa *Path) OnClientPause(req client.PauseReq) { @@ -842,7 +874,12 @@ func (pa *Path) OnClientPause(req client.PauseReq) {
pa.clientPause <- req
}
// OnClientStartingPoint is called by clientrtsp.Client.
func (pa *Path) OnClientStartingPoint(req client.StartingPointReq) {
pa.clientStartingPoint <- req
}
// OnFrame is called by a source or by a clientrtsp.Client.
func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
pa.readers.forwardFrame(trackID, streamType, buf)
func (pa *Path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
pa.readers.forwardFrame(trackID, streamType, payload)
}

104
main_rtsp_test.go

@ -13,6 +13,7 @@ import ( @@ -13,6 +13,7 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
@ -579,6 +580,109 @@ func TestRTSPNonCompliantFrameSize(t *testing.T) { @@ -579,6 +580,109 @@ func TestRTSPNonCompliantFrameSize(t *testing.T) {
})
}
func TestRTSPRTPInfo(t *testing.T) {
p, ok := testProgram("rtmpDisable: yes\n")
require.Equal(t, true, ok)
defer p.close()
track1, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
track2, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
conf := gortsplib.ClientConf{
StreamProtocol: func() *gortsplib.StreamProtocol {
v := gortsplib.StreamProtocolTCP
return &v
}(),
}
source, err := conf.DialPublish("rtsp://"+ownDockerIP+":8554/teststream",
gortsplib.Tracks{track1, track2})
require.NoError(t, err)
defer source.Close()
pkt := rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 556,
Timestamp: 984512368,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
buf, err := pkt.Marshal()
require.NoError(t, err)
err = source.WriteFrame(track1.ID, gortsplib.StreamTypeRTP, buf)
require.NoError(t, err)
func() {
dest, err := conf.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
},
SequenceNumber: 556,
Timestamp: 984512368,
},
}, dest.RTPInfo())
}()
pkt = rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 87,
Timestamp: 756436454,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
buf, err = pkt.Marshal()
require.NoError(t, err)
err = source.WriteFrame(track2.ID, gortsplib.StreamTypeRTP, buf)
require.NoError(t, err)
func() {
dest, err := conf.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
},
SequenceNumber: 556,
Timestamp: 984512368,
},
&headers.RTPInfoEntry{
URL: &base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=1",
},
SequenceNumber: 87,
Timestamp: 756436454,
},
}, dest.RTPInfo())
}()
}
func TestRTSPRedirect(t *testing.T) {
p1, ok := testProgram("rtmpDisable: yes\n" +
"paths:\n" +

Loading…
Cancel
Save