Browse Source

RTSP: add SSRC to Transport header (#380)

pull/442/head
aler9 5 years ago
parent
commit
79a3ab316f
  1. 2
      go.mod
  2. 4
      go.sum
  3. 32
      internal/path/path.go
  4. 12
      internal/pathman/pathman.go
  5. 7
      internal/readpublisher/readpublisher.go
  6. 12
      internal/rtspconn/conn.go
  7. 2
      internal/rtspserver/server.go
  8. 27
      internal/rtspsession/session.go
  9. 12
      internal/streamproc/streamproc.go
  10. 47
      main_clientrtmp_test.go
  11. 165
      main_clientrtsp_test.go
  12. 47
      main_sourcertmp_test.go
  13. 214
      main_sourcertsp_test.go

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210514205337-76fab391dc85
github.com/aler9/gortsplib v0.0.0-20210516134729-c6ff1e0d0243
github.com/asticode/go-astits v0.0.0-00010101000000-000000000000
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9

4
go.sum

@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c @@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
github.com/aler9/gortsplib v0.0.0-20210514205337-76fab391dc85 h1:OFulQt+GaeWy+mjPspkh4OBiGpGPVK4JPWVU12wqsQ8=
github.com/aler9/gortsplib v0.0.0-20210514205337-76fab391dc85/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/gortsplib v0.0.0-20210516134729-c6ff1e0d0243 h1:q6xHohYx2qiXE0txZ68hnXg2525mkuG0yG93AXDkV6c=
github.com/aler9/gortsplib v0.0.0-20210516134729-c6ff1e0d0243/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=

32
internal/path/path.go

@ -191,12 +191,12 @@ outer: @@ -191,12 +191,12 @@ outer:
select {
case <-pa.describeTimer.C:
for _, req := range pa.describeRequests {
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("publisher of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequests = nil
@ -294,11 +294,11 @@ outer: @@ -294,11 +294,11 @@ outer:
}
for _, req := range pa.describeRequests {
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("terminated")}
}
for _, req := range pa.setupPlayRequests {
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")}
}
for c, state := range pa.readPublishers {
@ -517,7 +517,7 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) { @@ -517,7 +517,7 @@ func (pa *Path) onReadPublisherDescribe(req readpublisher.DescribeReq) {
return
}
req.Res <- readpublisher.DescribeRes{nil, "", readpublisher.ErrNoOnePublishing{pa.name}} //nolint:govet
req.Res <- readpublisher.DescribeRes{Err: readpublisher.ErrNoOnePublishing{PathName: pa.name}}
return
}
}
@ -536,7 +536,7 @@ func (pa *Path) onReadPublisherSetupPlay(req readpublisher.SetupPlayReq) { @@ -536,7 +536,7 @@ func (pa *Path) onReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
return
case sourceStateNotReady:
req.Res <- readpublisher.SetupPlayRes{nil, nil, readpublisher.ErrNoOnePublishing{pa.name}} //nolint:govet
req.Res <- readpublisher.SetupPlayRes{Err: readpublisher.ErrNoOnePublishing{PathName: pa.name}}
return
}
}
@ -558,7 +558,11 @@ func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) { @@ -558,7 +558,11 @@ func (pa *Path) onReadPublisherSetupPlayPost(req readpublisher.SetupPlayReq) {
pa.addReadPublisher(req.Author, readPublisherStatePrePlay)
}
req.Res <- readpublisher.SetupPlayRes{pa, pa.sourceTracks, nil} //nolint:govet
req.Res <- readpublisher.SetupPlayRes{
Path: pa,
Tracks: pa.sourceTracks,
TrackInfos: pa.sp.TrackInfos(),
}
}
func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) {
@ -571,18 +575,18 @@ func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) { @@ -571,18 +575,18 @@ func (pa *Path) onReadPublisherPlay(req readpublisher.PlayReq) {
func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) {
if _, ok := pa.readPublishers[req.Author]; ok {
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("already publishing or reading")} //nolint:govet
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("already publishing or reading")}
return
}
if pa.hasExternalSource() {
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("path '%s' is assigned to an external source", pa.name)} //nolint:govet
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("path '%s' is assigned to an external source", pa.name)}
return
}
if pa.source != nil {
if pa.conf.DisablePublisherOverride {
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("another client is already publishing on path '%s'", pa.name)} //nolint:govet
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("another client is already publishing on path '%s'", pa.name)}
return
}
@ -608,7 +612,7 @@ func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) { @@ -608,7 +612,7 @@ func (pa *Path) onReadPublisherAnnounce(req readpublisher.AnnounceReq) {
func (pa *Path) onReadPublisherRecord(req readpublisher.RecordReq) {
if state, ok := pa.readPublishers[req.Author]; !ok || state != readPublisherStatePreRecord {
req.Res <- readpublisher.RecordRes{SP: nil, Err: fmt.Errorf("not recording anymore")}
req.Res <- readpublisher.RecordRes{Err: fmt.Errorf("not recording anymore")}
return
}
@ -727,7 +731,7 @@ func (pa *Path) OnPathManDescribe(req readpublisher.DescribeReq) { @@ -727,7 +731,7 @@ func (pa *Path) OnPathManDescribe(req readpublisher.DescribeReq) {
select {
case pa.describeReq <- req:
case <-pa.ctx.Done():
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
req.Res <- readpublisher.DescribeRes{Err: fmt.Errorf("terminated")}
}
}
@ -736,7 +740,7 @@ func (pa *Path) OnPathManSetupPlay(req readpublisher.SetupPlayReq) { @@ -736,7 +740,7 @@ func (pa *Path) OnPathManSetupPlay(req readpublisher.SetupPlayReq) {
select {
case pa.setupPlayReq <- req:
case <-pa.ctx.Done():
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- readpublisher.SetupPlayRes{Err: fmt.Errorf("terminated")}
}
}
@ -745,7 +749,7 @@ func (pa *Path) OnPathManAnnounce(req readpublisher.AnnounceReq) { @@ -745,7 +749,7 @@ func (pa *Path) OnPathManAnnounce(req readpublisher.AnnounceReq) {
select {
case pa.announceReq <- req:
case <-pa.ctx.Done():
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- readpublisher.AnnounceRes{Err: fmt.Errorf("terminated")}
}
}

12
internal/pathman/pathman.go

@ -168,7 +168,7 @@ outer: @@ -168,7 +168,7 @@ outer:
case req := <-pm.rpDescribe:
pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil {
req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet
req.Res <- readpublisher.DescribeRes{Err: err}
continue
}
@ -181,7 +181,7 @@ outer: @@ -181,7 +181,7 @@ outer:
pathConf.ReadPass,
)
if err != nil {
req.Res <- readpublisher.DescribeRes{nil, "", err} //nolint:govet
req.Res <- readpublisher.DescribeRes{Err: err}
continue
}
@ -195,7 +195,7 @@ outer: @@ -195,7 +195,7 @@ outer:
case req := <-pm.rpSetupPlay:
pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil {
req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet
req.Res <- readpublisher.SetupPlayRes{Err: err}
continue
}
@ -208,7 +208,7 @@ outer: @@ -208,7 +208,7 @@ outer:
pathConf.ReadPass,
)
if err != nil {
req.Res <- readpublisher.SetupPlayRes{nil, nil, err} //nolint:govet
req.Res <- readpublisher.SetupPlayRes{Err: err}
continue
}
@ -222,7 +222,7 @@ outer: @@ -222,7 +222,7 @@ outer:
case req := <-pm.rpAnnounce:
pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil {
req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet
req.Res <- readpublisher.AnnounceRes{Err: err}
continue
}
@ -235,7 +235,7 @@ outer: @@ -235,7 +235,7 @@ outer:
pathConf.PublishPass,
)
if err != nil {
req.Res <- readpublisher.AnnounceRes{nil, err} //nolint:govet
req.Res <- readpublisher.AnnounceRes{Err: err}
continue
}

7
internal/readpublisher/readpublisher.go

@ -79,9 +79,10 @@ type DescribeReq struct { @@ -79,9 +79,10 @@ type DescribeReq struct {
// SetupPlayRes is a setup/play response.
type SetupPlayRes struct {
Path Path
Tracks gortsplib.Tracks
Err error
Path Path
Tracks gortsplib.Tracks
TrackInfos []streamproc.TrackInfo
Err error
}
// SetupPlayReq is a setup/play request.

12
internal/rtspconn/conn.go

@ -238,12 +238,14 @@ func (c *Conn) ValidateCredentials( @@ -238,12 +238,14 @@ func (c *Conn) ValidateCredentials(
c.log(logger.Debug, "WARN: unauthorized: %s", err)
}
return readpublisher.ErrAuthNotCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authValidator.GenerateHeader(),
return readpublisher.ErrAuthNotCritical{
Response: &base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authValidator.GenerateHeader(),
},
},
}}
}
}
// login successful, reset authFailures

2
internal/rtspserver/server.go

@ -292,7 +292,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re @@ -292,7 +292,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re
}
// OnSetup implements gortsplib.ServerHandlerOnSetup.
func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, error) {
func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) {
s.mutex.RLock()
c := s.conns[ctx.Conn]
se := s.sessions[ctx.Session]

27
internal/rtspsession/session.go

@ -158,18 +158,18 @@ func (s *Session) OnAnnounce(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnAnn @@ -158,18 +158,18 @@ func (s *Session) OnAnnounce(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnAnn
}
// OnSetup is called by rtspserver.Server.
func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, error) {
func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *uint32, error) {
if ctx.Transport.Protocol == gortsplib.StreamProtocolUDP {
if _, ok := s.protocols[gortsplib.StreamProtocolUDP]; !ok {
return &base.Response{
StatusCode: base.StatusUnsupportedTransport,
}, nil
}, nil, nil
}
} else {
if _, ok := s.protocols[gortsplib.StreamProtocolTCP]; !ok {
return &base.Response{
StatusCode: base.StatusUnsupportedTransport,
}, nil
}, nil, nil
}
}
@ -190,23 +190,23 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC @@ -190,23 +190,23 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC
if res.Err != nil {
switch terr := res.Err.(type) {
case readpublisher.ErrAuthNotCritical:
return terr.Response, nil
return terr.Response, nil, nil
case readpublisher.ErrAuthCritical:
// wait some seconds to stop brute force attacks
<-time.After(pauseAfterAuthError)
return terr.Response, errors.New(terr.Message)
return terr.Response, nil, errors.New(terr.Message)
case readpublisher.ErrNoOnePublishing:
return &base.Response{
StatusCode: base.StatusNotFound,
}, res.Err
}, nil, res.Err
default:
return &base.Response{
StatusCode: base.StatusBadRequest,
}, res.Err
}, nil, res.Err
}
}
@ -215,18 +215,27 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC @@ -215,18 +215,27 @@ func (s *Session) OnSetup(c *rtspconn.Conn, ctx *gortsplib.ServerHandlerOnSetupC
if ctx.TrackID >= len(res.Tracks) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("track %d does not exist", ctx.TrackID)
}, nil, fmt.Errorf("track %d does not exist", ctx.TrackID)
}
if s.setuppedTracks == nil {
s.setuppedTracks = make(map[int]*gortsplib.Track)
}
s.setuppedTracks[ctx.TrackID] = res.Tracks[ctx.TrackID]
var ssrc *uint32
if res.TrackInfos[ctx.TrackID].LastSSRC != 0 {
ssrc = &res.TrackInfos[ctx.TrackID].LastSSRC
}
return &base.Response{
StatusCode: base.StatusOK,
}, ssrc, nil
}
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}, nil, nil
}
// OnPlay is called by rtspserver.Server.

12
internal/streamproc/streamproc.go

@ -18,12 +18,14 @@ type TrackInfo struct { @@ -18,12 +18,14 @@ type TrackInfo struct {
LastSequenceNumber uint16
LastTimeRTP uint32
LastTimeNTP int64
LastSSRC uint32
}
type track struct {
lastSequenceNumber uint32
lastTimeRTP uint32
lastTimeNTP int64
lastSSRC uint32
}
// StreamProc is a stream processor, an intermediate layer between a source and a path.
@ -54,6 +56,7 @@ func (sp *StreamProc) TrackInfos() []TrackInfo { @@ -54,6 +56,7 @@ func (sp *StreamProc) TrackInfos() []TrackInfo {
LastSequenceNumber: uint16(atomic.LoadUint32(&track.lastSequenceNumber)),
LastTimeRTP: atomic.LoadUint32(&track.lastTimeRTP),
LastTimeNTP: atomic.LoadInt64(&track.lastTimeNTP),
LastSSRC: atomic.LoadUint32(&track.lastSSRC),
}
}
return ret
@ -64,14 +67,15 @@ func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payl @@ -64,14 +67,15 @@ func (sp *StreamProc) OnFrame(trackID int, streamType gortsplib.StreamType, payl
if streamType == gortsplib.StreamTypeRTP && len(payload) >= 8 {
track := sp.tracks[trackID]
// store last sequence number
sequenceNumber := binary.BigEndian.Uint16(payload[2 : 2+2])
sequenceNumber := binary.BigEndian.Uint16(payload[2:4])
atomic.StoreUint32(&track.lastSequenceNumber, uint32(sequenceNumber))
// store last RTP time and correspondent NTP time
timestamp := binary.BigEndian.Uint32(payload[4 : 4+4])
timestamp := binary.BigEndian.Uint32(payload[4:8])
atomic.StoreUint32(&track.lastTimeRTP, timestamp)
atomic.StoreInt64(&track.lastTimeNTP, time.Now().Unix())
ssrc := binary.BigEndian.Uint32(payload[8:12])
atomic.StoreUint32(&track.lastSSRC, ssrc)
}
sp.path.OnSPFrame(trackID, streamType, payload)

47
main_clientrtmp_test.go

@ -4,9 +4,6 @@ import ( @@ -4,9 +4,6 @@ import (
"testing"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/stretchr/testify/require"
)
@ -217,47 +214,3 @@ func TestClientRTMPAuthFail(t *testing.T) { @@ -217,47 +214,3 @@ func TestClientRTMPAuthFail(t *testing.T) {
require.NotEqual(t, 0, cnt2.wait())
})
}
func TestClientRTMPRTPInfo(t *testing.T) {
p, ok := testProgram("hlsDisable: yes\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideoaudio.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + ":1935/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
dest, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: (*dest.RTPInfo())[0].SequenceNumber,
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=1",
}).String(),
SequenceNumber: (*dest.RTPInfo())[1].SequenceNumber,
Timestamp: (*dest.RTPInfo())[1].Timestamp,
},
}, dest.RTPInfo())
}

165
main_clientrtsp_test.go

@ -603,7 +603,55 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) { @@ -603,7 +603,55 @@ func TestClientRTSPNonCompliantFrameSize(t *testing.T) {
})
}
func TestClientRTSPRTPInfo(t *testing.T) {
func TestClientRTSPAdditionalInfos(t *testing.T) {
getInfos := func() (*headers.RTPInfo, []*uint32, error) {
u, err := base.ParseURL("rtsp://" + ownDockerIP + ":8554/teststream")
if err != nil {
return nil, nil, err
}
conn, err := gortsplib.Dial(u.Scheme, u.Host)
if err != nil {
return nil, nil, err
}
defer conn.Close()
tracks, _, err := conn.Describe(u)
if err != nil {
return nil, nil, err
}
ssrcs := make([]*uint32, len(tracks))
for i, t := range tracks {
res, err := conn.Setup(headers.TransportModePlay, t, 0, 0)
if err != nil {
return nil, nil, err
}
var th headers.Transport
err = th.Read(res.Header["Transport"])
if err != nil {
return nil, nil, err
}
ssrcs[i] = th.SSRC
}
res, err := conn.Play()
if err != nil {
return nil, nil, err
}
var ri headers.RTPInfo
err = ri.Read(res.Header["RTP-Info"])
if err != nil {
return nil, nil, err
}
return &ri, ssrcs, nil
}
p, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n")
require.Equal(t, true, ok)
@ -637,26 +685,29 @@ func TestClientRTSPRTPInfo(t *testing.T) { @@ -637,26 +685,29 @@ func TestClientRTSPRTPInfo(t *testing.T) {
err = source.WriteFrame(track1.ID, gortsplib.StreamTypeRTP, buf)
require.NoError(t, err)
func() {
dest, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
}, dest.RTPInfo())
}()
rtpInfo, ssrcs, err := getInfos()
require.NoError(t, err)
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*rtpInfo)[0].Timestamp,
},
}, rtpInfo)
require.Equal(t, []*uint32{
func() *uint32 {
v := uint32(96342362)
return &v
}(),
nil,
}, ssrcs)
pkt = rtp.Packet{
Header: rtp.Header{
@ -664,7 +715,7 @@ func TestClientRTSPRTPInfo(t *testing.T) { @@ -664,7 +715,7 @@ func TestClientRTSPRTPInfo(t *testing.T) {
PayloadType: 96,
SequenceNumber: 87,
Timestamp: 756436454,
SSRC: 96342362,
SSRC: 536474323,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
@ -675,38 +726,44 @@ func TestClientRTSPRTPInfo(t *testing.T) { @@ -675,38 +726,44 @@ func TestClientRTSPRTPInfo(t *testing.T) {
err = source.WriteFrame(track2.ID, gortsplib.StreamTypeRTP, buf)
require.NoError(t, err)
func() {
dest, err := gortsplib.DialRead("rtsp://" + ownDockerIP + ":8554/teststream")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=1",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(87)
return &v
}(),
Timestamp: (*dest.RTPInfo())[1].Timestamp,
},
}, dest.RTPInfo())
}()
rtpInfo, ssrcs, err = getInfos()
require.NoError(t, err)
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(556)
return &v
}(),
Timestamp: (*rtpInfo)[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: ownDockerIP + ":8554",
Path: "/teststream/trackID=1",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(87)
return &v
}(),
Timestamp: (*rtpInfo)[1].Timestamp,
},
}, rtpInfo)
require.Equal(t, []*uint32{
func() *uint32 {
v := uint32(96342362)
return &v
}(),
func() *uint32 {
v := uint32(536474323)
return &v
}(),
}, ssrcs)
}
func TestClientRTSPRedirect(t *testing.T) {

47
main_sourcertmp_test.go

@ -4,9 +4,6 @@ import ( @@ -4,9 +4,6 @@ import (
"testing"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/stretchr/testify/require"
)
@ -59,47 +56,3 @@ func TestSourceRTMP(t *testing.T) { @@ -59,47 +56,3 @@ func TestSourceRTMP(t *testing.T) {
})
}
}
func TestSourceRTMPRTPInfo(t *testing.T) {
cnt1, err := newContainer("nginx-rtmp", "rtmpserver", []string{})
require.NoError(t, err)
defer cnt1.close()
cnt2, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://" + cnt1.ip() + "/stream/test",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
p, ok := testProgram("paths:\n" +
" proxied:\n" +
" source: rtmp://" + cnt1.ip() + "/stream/test\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
dest, err := gortsplib.DialRead("rtsp://127.0.1.2:8554/proxied")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=0",
}).String(),
SequenceNumber: (*dest.RTPInfo())[0].SequenceNumber,
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
}, dest.RTPInfo())
}

214
main_sourcertsp_test.go

@ -1,17 +1,10 @@ @@ -1,17 +1,10 @@
package main
import (
"bufio"
"net"
"os"
"strings"
"testing"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
@ -123,210 +116,3 @@ func TestSourceRTSP(t *testing.T) { @@ -123,210 +116,3 @@ func TestSourceRTSP(t *testing.T) {
})
}
}
func TestSourceRTSPRTPInfo(t *testing.T) {
l, err := net.Listen("tcp", "localhost:8555")
require.NoError(t, err)
defer l.Close()
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
conn, err := l.Accept()
require.NoError(t, err)
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
var req base.Request
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
track1, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
track2, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
require.NoError(t, err)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: gortsplib.Tracks{track1, track2}.Write(),
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
var th headers.Transport
err = th.Read(req.Header["Transport"])
require.NoError(t, err)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: gortsplib.StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
ClientPorts: th.ClientPorts,
InterleavedIDs: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
err = th.Read(req.Header["Transport"])
require.NoError(t, err)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Protocol: gortsplib.StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
ClientPorts: th.ClientPorts,
InterleavedIDs: &[2]int{2, 3},
}.Write(),
},
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
Header: base.Header{},
}.Write(bconn.Writer)
require.NoError(t, err)
pkt := rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 34254,
Timestamp: 156457686,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
buf, err := pkt.Marshal()
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 1,
StreamType: gortsplib.StreamTypeRTP,
Payload: buf,
}.Write(bconn.Writer)
require.NoError(t, err)
pkt = rtp.Packet{
Header: rtp.Header{
Version: 0x80,
PayloadType: 96,
SequenceNumber: 87,
Timestamp: 756436454,
SSRC: 96342362,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}
buf, err = pkt.Marshal()
require.NoError(t, err)
err = base.InterleavedFrame{
TrackID: 0,
StreamType: gortsplib.StreamTypeRTP,
Payload: buf,
}.Write(bconn.Writer)
require.NoError(t, err)
err = req.Read(bconn.Reader)
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
err = base.Response{
StatusCode: base.StatusOK,
}.Write(bconn.Writer)
require.NoError(t, err)
conn.Close()
}()
p1, ok := testProgram("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://localhost:8555/stream\n" +
" sourceProtocol: tcp\n")
require.Equal(t, true, ok)
defer p1.close()
time.Sleep(1000 * time.Millisecond)
dest, err := gortsplib.DialRead("rtsp://127.0.1.2:8554/proxied")
require.NoError(t, err)
defer dest.Close()
require.Equal(t, &headers.RTPInfo{
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=0",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(87)
return &v
}(),
Timestamp: (*dest.RTPInfo())[0].Timestamp,
},
&headers.RTPInfoEntry{
URL: (&base.URL{
Scheme: "rtsp",
Host: "127.0.1.2:8554",
Path: "/proxied/trackID=1",
}).String(),
SequenceNumber: func() *uint16 {
v := uint16(34254)
return &v
}(),
Timestamp: (*dest.RTPInfo())[1].Timestamp,
},
}, dest.RTPInfo())
require.Less(t, uint32(756436454), *(*dest.RTPInfo())[0].Timestamp)
require.Less(t, uint32(156457686), *(*dest.RTPInfo())[1].Timestamp)
}

Loading…
Cancel
Save