Browse Source

speed up tests

pull/1364/head
aler9 3 years ago
parent
commit
535cbe41e8
  1. 91
      internal/core/formatprocessor_generic_test.go
  2. 450
      internal/core/formatprocessor_h264_test.go
  3. 31
      internal/core/rtsp_server_test.go
  4. 603
      internal/core/rtsp_source_test.go
  5. 2
      internal/core/streamformat.go

91
internal/core/formatprocessor_generic_test.go

@ -0,0 +1,91 @@ @@ -0,0 +1,91 @@
package core
import (
"testing"
"github.com/aler9/gortsplib/v2"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/media"
"github.com/aler9/gortsplib/v2/pkg/url"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestFormatProcessorRemovePadding(t *testing.T) {
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
forma := &format.Generic{
PayloadTyp: 96,
RTPMap: "private/90000",
}
forma.Init()
medi := &media.Media{
Type: media.TypeApplication,
Formats: []format.Format{forma},
}
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:8554/stream",
media.Medias{medi})
require.NoError(t, err)
defer source.Close()
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/stream")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
require.NoError(t, err)
packetRecv := make(chan struct{})
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, pkt)
close(packetRecv)
})
_, err = c.Play(nil)
require.NoError(t, err)
source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
PaddingSize: 20,
})
<-packetRecv
}

450
internal/core/formatprocessor_h264_test.go

@ -0,0 +1,450 @@ @@ -0,0 +1,450 @@
package core
import (
"bytes"
"net"
"testing"
"github.com/aler9/gortsplib/v2"
"github.com/aler9/gortsplib/v2/pkg/base"
"github.com/aler9/gortsplib/v2/pkg/codecs/h265"
"github.com/aler9/gortsplib/v2/pkg/conn"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/headers"
"github.com/aler9/gortsplib/v2/pkg/media"
"github.com/aler9/gortsplib/v2/pkg/url"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestFormatProcessorDynamicParams(t *testing.T) {
checkTrack := func(t *testing.T, forma format.Format) {
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/stream")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, _, _, err := c.Describe(u)
require.NoError(t, err)
forma1 := medias[0].Formats[0]
require.Equal(t, forma, forma1)
}
for _, ca := range []string{"h264", "h265"} {
t.Run(ca, func(t *testing.T) {
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
formah264 := &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}
formah265 := &format.H265{
PayloadTyp: 96,
}
var forma format.Format
if ca == "h264" {
forma = formah264
} else {
forma = formah265
}
medi := &media.Media{
Type: media.TypeVideo,
Formats: []format.Format{forma},
}
source := gortsplib.Client{}
err := source.StartRecording(
"rtsp://localhost:8554/stream",
media.Medias{medi})
require.NoError(t, err)
defer source.Close()
if ca == "h264" {
enc := formah264.CreateEncoder()
pkts, err := enc.Encode([][]byte{{7, 1, 2, 3}}, 0) // SPS
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{8}}, 0) // PPS
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
SPS: []byte{7, 1, 2, 3},
PPS: []byte{8},
})
pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}, 0) // SPS
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{8, 1}}, 0) // PPS
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
SPS: []byte{7, 4, 5, 6},
PPS: []byte{8, 1},
})
} else {
enc := formah265.CreateEncoder()
pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}}, 0)
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}}, 0)
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}}, 0)
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H265{
PayloadTyp: 96,
VPS: []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3},
SPS: []byte{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6},
PPS: []byte{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9},
})
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 10, 11, 12}}, 0)
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 13, 14, 15}}, 0)
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 16, 17, 18}}, 0)
require.NoError(t, err)
source.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H265{
PayloadTyp: 96,
VPS: []byte{byte(h265.NALUType_VPS_NUT) << 1, 10, 11, 12},
SPS: []byte{byte(h265.NALUType_SPS_NUT) << 1, 13, 14, 15},
PPS: []byte{byte(h265.NALUType_PPS_NUT) << 1, 16, 17, 18},
})
}
})
}
}
func TestFormatProcessorOversizedPackets(t *testing.T) {
for _, ca := range []string{"h264", "h265"} {
t.Run(ca, func(t *testing.T) {
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
nconn, err := net.Dial("tcp", "localhost:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
var forma format.Format
if ca == "h264" {
forma = &format.H264{
PayloadTyp: 96,
SPS: []byte{0x01, 0x02, 0x03, 0x04},
PPS: []byte{0x01, 0x02, 0x03, 0x04},
PacketizationMode: 1,
}
} else {
forma = &format.H265{
PayloadTyp: 96,
VPS: []byte{byte(h265.NALUType_VPS_NUT) << 1, 10, 11, 12},
SPS: []byte{byte(h265.NALUType_SPS_NUT) << 1, 13, 14, 15},
PPS: []byte{byte(h265.NALUType_PPS_NUT) << 1, 16, 17, 18},
}
}
medi := &media.Media{
Type: media.TypeVideo,
Formats: []format.Format{forma},
}
medias := media.Medias{medi}
medias.SetControls()
res, err := writeReqReadRes(conn, base.Request{
Method: base.Announce,
URL: mustParseURL("rtsp://localhost:8554/stream"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: mustMarshalSDP(medias.Marshal(false)),
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var sx headers.Session
inTH := &headers.Transport{
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModeRecord
return &v
}(),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: &[2]int{0, 1},
}
res, err = writeReqReadRes(conn, base.Request{
Method: base.Setup,
URL: mustParseURL("rtsp://localhost:8554/stream/" + medias[0].Control),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": inTH.Marshal(),
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
err = sx.Unmarshal(res.Header["Session"])
require.NoError(t, err)
res, err = writeReqReadRes(conn, base.Request{
Method: base.Record,
URL: mustParseURL("rtsp://localhost:8554/stream"),
Header: base.Header{
"CSeq": base.HeaderValue{"3"},
"Session": base.HeaderValue{sx.Session},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/stream")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
require.NoError(t, err)
packetRecv := make(chan struct{})
i := 0
var expected []*rtp.Packet
if ca == "h264" {
expected = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
append([]byte{0x1c, 0x80}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364)...),
[]byte{0x01, 0x02}...,
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
[]byte{0x1c, 0x40, 0x03, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 136)...,
),
},
}
} else {
expected = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
append([]byte{0x63, 0x02, 0x80, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 363)...),
[]byte{0x01, 0x02, 0x03}...,
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
[]byte{0x63, 0x02, 0x40, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 135)...,
),
},
}
}
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
require.Equal(t, expected[i], pkt)
i++
if i >= len(expected) {
close(packetRecv)
}
})
_, err = c.Play(nil)
require.NoError(t, err)
var tosend []*rtp.Packet
if ca == "h264" {
tosend = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: append([]byte{0x1c, 0b10000000}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4)...),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04},
},
}
} else {
tosend = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4),
},
}
}
for _, pkt := range tosend {
byts, _ := pkt.Marshal()
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: byts,
}, make([]byte, 2048))
require.NoError(t, err)
}
<-packetRecv
})
}
}

31
internal/core/rtsp_server_test.go

@ -4,12 +4,43 @@ import ( @@ -4,12 +4,43 @@ import (
"testing"
"github.com/aler9/gortsplib/v2"
"github.com/aler9/gortsplib/v2/pkg/base"
"github.com/aler9/gortsplib/v2/pkg/conn"
"github.com/aler9/gortsplib/v2/pkg/media"
"github.com/aler9/gortsplib/v2/pkg/sdp"
"github.com/aler9/gortsplib/v2/pkg/url"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func mustParseURL(s string) *url.URL {
u, err := url.Parse(s)
if err != nil {
panic(err)
}
return u
}
func mustMarshalSDP(sdp *sdp.SessionDescription) []byte {
byts, err := sdp.Marshal()
if err != nil {
panic(err)
}
return byts
}
func writeReqReadRes(
conn *conn.Conn,
req base.Request,
) (*base.Response, error) {
err := conn.WriteRequest(&req)
if err != nil {
return nil, err
}
return conn.ReadResponse()
}
func TestRTSPServerAuth(t *testing.T) {
for _, ca := range []string{
"internal",

603
internal/core/rtsp_source_test.go

@ -1,21 +1,14 @@ @@ -1,21 +1,14 @@
package core
import (
"bytes"
"crypto/tls"
"net"
"os"
"strings"
"testing"
"time"
"github.com/aler9/gortsplib/v2"
"github.com/aler9/gortsplib/v2/pkg/auth"
"github.com/aler9/gortsplib/v2/pkg/base"
"github.com/aler9/gortsplib/v2/pkg/codecs/h265"
"github.com/aler9/gortsplib/v2/pkg/conn"
"github.com/aler9/gortsplib/v2/pkg/format"
"github.com/aler9/gortsplib/v2/pkg/headers"
"github.com/aler9/gortsplib/v2/pkg/media"
"github.com/aler9/gortsplib/v2/pkg/url"
"github.com/pion/rtp"
@ -236,599 +229,3 @@ func TestRTSPSourceNoPassword(t *testing.T) { @@ -236,599 +229,3 @@ func TestRTSPSourceNoPassword(t *testing.T) {
<-done
}
func TestRTSPSourceDynamicH264Params(t *testing.T) {
checkTrack := func(t *testing.T, forma format.Format) {
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, _, _, err := c.Describe(u)
require.NoError(t, err)
forma1 := medias[0].Formats[0]
require.Equal(t, forma, forma1)
}
t.Run("h264", func(t *testing.T) {
forma := &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}
medi := &media.Media{
Type: media.TypeVideo,
Formats: []format.Format{forma},
}
stream := gortsplib.NewServerStream(media.Medias{medi})
defer stream.Close()
s := gortsplib.Server{
Handler: &testServer{
onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "127.0.0.1:8555",
}
err := s.Start()
require.NoError(t, err)
defer s.Wait()
defer s.Close()
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://127.0.0.1:8555/teststream\n")
require.Equal(t, true, ok)
defer p.Close()
time.Sleep(1 * time.Second)
enc := forma.CreateEncoder()
pkts, err := enc.Encode([][]byte{{7, 1, 2, 3}}, 0) // SPS
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{8}}, 0) // PPS
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
SPS: []byte{7, 1, 2, 3},
PPS: []byte{8},
})
pkts, err = enc.Encode([][]byte{{7, 4, 5, 6}}, 0) // SPS
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{8, 1}}, 0) // PPS
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
SPS: []byte{7, 4, 5, 6},
PPS: []byte{8, 1},
})
})
t.Run("h265", func(t *testing.T) {
forma := &format.H265{
PayloadTyp: 96,
}
medi := &media.Media{
Type: media.TypeVideo,
Formats: []format.Format{forma},
}
stream := gortsplib.NewServerStream(media.Medias{medi})
defer stream.Close()
s := gortsplib.Server{
Handler: &testServer{
onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "127.0.0.1:8555",
}
err := s.Start()
require.NoError(t, err)
defer s.Wait()
defer s.Close()
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://127.0.0.1:8555/teststream\n")
require.Equal(t, true, ok)
defer p.Close()
time.Sleep(1 * time.Second)
enc := forma.CreateEncoder()
pkts, err := enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3}}, 0)
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6}}, 0)
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9}}, 0)
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H265{
PayloadTyp: 96,
VPS: []byte{byte(h265.NALUType_VPS_NUT) << 1, 1, 2, 3},
SPS: []byte{byte(h265.NALUType_SPS_NUT) << 1, 4, 5, 6},
PPS: []byte{byte(h265.NALUType_PPS_NUT) << 1, 7, 8, 9},
})
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_VPS_NUT) << 1, 10, 11, 12}}, 0)
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_SPS_NUT) << 1, 13, 14, 15}}, 0)
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
pkts, err = enc.Encode([][]byte{{byte(h265.NALUType_PPS_NUT) << 1, 16, 17, 18}}, 0)
require.NoError(t, err)
stream.WritePacketRTP(medi, pkts[0])
checkTrack(t, &format.H265{
PayloadTyp: 96,
VPS: []byte{byte(h265.NALUType_VPS_NUT) << 1, 10, 11, 12},
SPS: []byte{byte(h265.NALUType_SPS_NUT) << 1, 13, 14, 15},
PPS: []byte{byte(h265.NALUType_PPS_NUT) << 1, 16, 17, 18},
})
})
}
func TestRTSPSourceRemovePadding(t *testing.T) {
medi := testMediaH264
stream := gortsplib.NewServerStream(media.Medias{medi})
defer stream.Close()
s := gortsplib.Server{
Handler: &testServer{
onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "127.0.0.1:8555",
}
err := s.Start()
require.NoError(t, err)
defer s.Wait()
defer s.Close()
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://127.0.0.1:8555/teststream\n")
require.Equal(t, true, ok)
defer p.Close()
time.Sleep(1 * time.Second)
packetRecv := make(chan struct{})
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
require.NoError(t, err)
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
}, pkt)
close(packetRecv)
})
_, err = c.Play(nil)
require.NoError(t, err)
stream.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
PaddingSize: 20,
})
<-packetRecv
}
func TestRTSPSourceOversizedPackets(t *testing.T) {
for _, ca := range []string{"h264", "h265"} {
t.Run(ca, func(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:8555")
require.NoError(t, err)
defer l.Close()
connected := make(chan struct{})
serverDone := make(chan struct{})
defer func() { <-serverDone }()
go func() {
defer close(serverDone)
nconn, err := l.Accept()
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
req, err := conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Options, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Public": base.HeaderValue{strings.Join([]string{
string(base.Describe),
string(base.Setup),
string(base.Play),
}, ", ")},
},
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Describe, req.Method)
medias := media.Medias{testMediaH264}
byts, _ := medias.Marshal(false).Marshal()
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: byts,
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Setup, req.Method)
var inTH headers.Transport
err = inTH.Unmarshal(req.Header["Transport"])
require.NoError(t, err)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
Header: base.Header{
"Transport": headers.Transport{
Delivery: func() *headers.TransportDelivery {
v := headers.TransportDeliveryUnicast
return &v
}(),
Protocol: headers.TransportProtocolTCP,
InterleavedIDs: inTH.InterleavedIDs,
}.Marshal(),
},
})
require.NoError(t, err)
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Play, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
<-connected
var tosend []*rtp.Packet
if ca == "h264" {
tosend = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: append([]byte{0x1c, 0b10000000}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4)...),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x1c, 0b01000000, 0x01, 0x02, 0x03, 0x04},
},
}
} else {
tosend = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
Padding: true,
},
Payload: bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 2000/4),
},
}
}
for _, pkt := range tosend {
byts, _ = pkt.Marshal()
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 0,
Payload: byts,
}, make([]byte, 2048))
require.NoError(t, err)
}
req, err = conn.ReadRequest()
require.NoError(t, err)
require.Equal(t, base.Teardown, req.Method)
err = conn.WriteResponse(&base.Response{
StatusCode: base.StatusOK,
})
require.NoError(t, err)
}()
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"webrtcDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://127.0.0.1:8555/teststream\n" +
" sourceProtocol: tcp\n")
require.Equal(t, true, ok)
defer p.Close()
time.Sleep(1 * time.Second)
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
require.NoError(t, err)
packetRecv := make(chan struct{})
i := 0
var expected []*rtp.Packet
if ca == "h264" {
expected = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
append([]byte{0x1c, 0x80}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364)...),
[]byte{0x01, 0x02}...,
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
[]byte{0x1c, 0x40, 0x03, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 136)...,
),
},
}
} else {
expected = []*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
append([]byte{0x1c, 0x81, 0x02, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 363)...),
[]byte{0x01, 0x02, 0x03}...,
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 125,
Timestamp: 45343,
SSRC: 563423,
CSRC: []uint32{},
},
Payload: append(
[]byte{0x1c, 0x41, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 135)...,
),
},
}
}
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
require.Equal(t, expected[i], pkt)
i++
if i >= len(expected) {
close(packetRecv)
}
})
_, err = c.Play(nil)
require.NoError(t, err)
close(connected)
<-packetRecv
})
}
}

2
internal/core/streamformat.go

@ -57,7 +57,7 @@ func (sf *streamFormat) writeData(s *stream, medi *media.Media, data data) error @@ -57,7 +57,7 @@ func (sf *streamFormat) writeData(s *stream, medi *media.Media, data data) error
s.rtspStream.WritePacketRTPWithNTP(medi, pkt, data.getNTP())
}
// forward data to non-RTSP readers
// forward decoded frames to non-RTSP readers
for _, cb := range sf.nonRTSPReaders {
cb(data)
}

Loading…
Cancel
Save