Browse Source

support RTMPS (#1089)

pull/1128/head
Alessandro Ros 3 years ago committed by aler9
parent
commit
820ba067f2
  1. 54
      README.md
  2. 12
      internal/conf/conf.go
  3. 6
      internal/conf/path.go
  4. 38
      internal/core/api.go
  5. 61
      internal/core/core.go
  6. 6
      internal/core/core_test.go
  7. 1
      internal/core/path.go
  8. 19
      internal/core/rtmp_conn.go
  9. 35
      internal/core/rtmp_server.go
  10. 385
      internal/core/rtmp_server_test.go
  11. 33
      internal/core/rtmp_source.go
  12. 171
      internal/core/rtmp_source_test.go
  13. 30
      internal/core/rtsp_server.go
  14. 2
      internal/core/rtsp_source_test.go
  15. 4
      internal/core/source_static.go
  16. 21
      rtsp-simple-server.yml
  17. 8
      testimages/nginx-rtmp/Dockerfile
  18. 23
      testimages/nginx-rtmp/nginx.conf

54
README.md

@ -5,11 +5,11 @@ @@ -5,11 +5,11 @@
_rtsp-simple-server_ is a ready-to-use and zero-dependency server and proxy that allows users to publish, read and proxy live video and audio streams through various protocols:
|protocol|description|publish|read|proxy|
|--------|-----------|-------|----|-----|
|RTSP|fastest way to publish and read streams|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|RTMP|allows to interact with legacy software|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|Low-Latency HLS|allows to embed streams into a web page|:x:|:heavy_check_mark:|:heavy_check_mark:|
|protocol|description|variants|publish|read|proxy|
|--------|-----------|--------|-------|----|-----|
|RTSP|fastest way to publish and read streams|RTSP, RTSPS|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|RTMP|allows to interact with legacy software|RTMP, RTMPS|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|HLS|allows to embed streams into a web page|Low-Latency HLS, standard HLS|:x:|:heavy_check_mark:|:heavy_check_mark:|
Features:
@ -64,7 +64,7 @@ Features: @@ -64,7 +64,7 @@ Features:
* [Read from the server](#read-from-the-server)
* [From VLC and Ubuntu](#from-vlc-and-ubuntu)
* [RTSP protocol](#rtsp-protocol)
* [RTSP general usage](#rtsp-general-usage)
* [General usage](#general-usage)
* [TCP transport](#tcp-transport)
* [UDP-multicast transport](#udp-multicast-transport)
* [Encryption](#encryption)
@ -72,9 +72,10 @@ Features: @@ -72,9 +72,10 @@ Features:
* [Fallback stream](#fallback-stream)
* [Corrupted frames](#corrupted-frames)
* [RTMP protocol](#rtmp-protocol)
* [RTMP general usage](#rtmp-general-usage)
* [General usage](#general-usage-1)
* [Encryption](#encryption-1)
* [HLS protocol](#hls-protocol)
* [HLS general usage](#hls-general-usage)
* [General usage](#general-usage-2)
* [Embedding](#embedding)
* [Low-Latency variant](#low-latency-variant)
* [Decreasing latency](#decreasing-latency)
@ -574,7 +575,7 @@ make -j$(nproc) @@ -574,7 +575,7 @@ make -j$(nproc)
sudo make install
```
Videos can then be published with `VideoWriter`:
Videos can be published with `VideoWriter`:
```python
import cv2
@ -627,7 +628,7 @@ vlc rtsp://localhost:8554/mystream @@ -627,7 +628,7 @@ vlc rtsp://localhost:8554/mystream
## RTSP protocol
### RTSP general usage
### General usage
RTSP is a standardized protocol that allows to publish and read streams; in particular, it supports different underlying transport protocols, that are chosen by clients during the handshake with the server:
@ -709,7 +710,7 @@ serverKey: server.key @@ -709,7 +710,7 @@ serverKey: server.key
serverCert: server.crt
```
Streams can then be published and read with the `rtsps` scheme and the `8322` port:
Streams can be published and read with the `rtsps` scheme and the `8322` port:
```
ffmpeg -i rtsps://ip:8322/...
@ -771,7 +772,7 @@ In some scenarios, when reading RTSP from the server, decoded frames can be corr @@ -771,7 +772,7 @@ In some scenarios, when reading RTSP from the server, decoded frames can be corr
## RTMP protocol
### RTMP general usage
### General usage
RTMP is a protocol that allows to read and publish streams, but is less versatile and less efficient than RTSP (doesn't support UDP, encryption, doesn't support most RTSP codecs, doesn't support feedback mechanism). It is used when there's need of publishing or reading streams from a software that supports only RTMP (for instance, OBS Studio and DJI drones).
@ -795,9 +796,34 @@ Credentials can be provided by appending to the URL the `user` and `pass` parame @@ -795,9 +796,34 @@ Credentials can be provided by appending to the URL the `user` and `pass` parame
ffmpeg -re -stream_loop -1 -i file.ts -c copy -f flv rtmp://localhost:8554/mystream?user=myuser&pass=mypass
```
### Encryption
RTMP connections can be encrypted with TLS, obtaining the RTMPS protocol. A TLS certificate is needed and can be generated with OpenSSL:
```
openssl genrsa -out server.key 2048
openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
```
Edit `rtsp-simple-server.yml`, and set the `rtmpEncryption`, `rtmpServerKey` and `rtmpServerCert` parameters:
```yml
rtmpEncryption: optional
rtmpServerKey: server.key
rtmpServerCert: server.crt
```
Streams can be published and read with the `rtmps` scheme and the `1937` port:
```
rtmps://localhost:1937/...
```
Please be aware that RTMPS is currently unsupported by _VLC_, _FFmpeg_ and _GStreamer_. However, you can use a proxy like [stunnel](https://www.stunnel.org/) or [nginx](https://nginx.org/) to allow RTMP clients to access RTMPS resources.
## HLS protocol
### HLS general usage
### General usage
HLS is a protocol that allows to embed live streams into web pages. It works by splitting streams into segments, and by serving these segments with the HTTP protocol. Every stream published to the server can be accessed by visiting:
@ -849,7 +875,7 @@ hlsServerKey: server.key @@ -849,7 +875,7 @@ hlsServerKey: server.key
hlsServerCert: server.crt
```
Every stream published to the server can then be read with LL-HLS by visiting:
Every stream published to the server can be read with LL-HLS by visiting:
```
https://localhost:8888/mystream

12
internal/conf/conf.go

@ -200,8 +200,12 @@ type Conf struct { @@ -200,8 +200,12 @@ type Conf struct {
AuthMethods AuthMethods `json:"authMethods"`
// RTMP
RTMPDisable bool `json:"rtmpDisable"`
RTMPAddress string `json:"rtmpAddress"`
RTMPDisable bool `json:"rtmpDisable"`
RTMPAddress string `json:"rtmpAddress"`
RTMPEncryption Encryption `json:"rtmpEncryption"`
RTMPSAddress string `json:"rtmpsAddress"`
RTMPServerKey string `json:"rtmpServerKey"`
RTMPServerCert string `json:"rtmpServerCert"`
// HLS
HLSDisable bool `json:"hlsDisable"`
@ -354,6 +358,10 @@ func (conf *Conf) CheckAndFillMissing() error { @@ -354,6 +358,10 @@ func (conf *Conf) CheckAndFillMissing() error {
conf.RTMPAddress = ":1935"
}
if conf.RTMPSAddress == "" {
conf.RTMPSAddress = ":1936"
}
if conf.HLSAddress == "" {
conf.HLSAddress = ":8888"
}

6
internal/conf/path.go

@ -118,7 +118,8 @@ func (pconf *PathConf) checkAndFillMissing(conf *Conf, name string) error { @@ -118,7 +118,8 @@ func (pconf *PathConf) checkAndFillMissing(conf *Conf, name string) error {
return fmt.Errorf("'%s' is not a valid RTSP URL", pconf.Source)
}
case strings.HasPrefix(pconf.Source, "rtmp://"):
case strings.HasPrefix(pconf.Source, "rtmp://") ||
strings.HasPrefix(pconf.Source, "rtmps://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a RTMP source; use another path")
}
@ -127,9 +128,6 @@ func (pconf *PathConf) checkAndFillMissing(conf *Conf, name string) error { @@ -127,9 +128,6 @@ func (pconf *PathConf) checkAndFillMissing(conf *Conf, name string) error {
if err != nil {
return fmt.Errorf("'%s' is not a valid RTMP URL", pconf.Source)
}
if u.Scheme != "rtmp" {
return fmt.Errorf("'%s' is not a valid RTMP URL", pconf.Source)
}
if u.User != nil {
pass, _ := u.User.Password()

38
internal/core/api.go

@ -76,8 +76,12 @@ func loadConfData(ctx *gin.Context) (interface{}, error) { @@ -76,8 +76,12 @@ func loadConfData(ctx *gin.Context) (interface{}, error) {
AuthMethods *conf.AuthMethods `json:"authMethods"`
// RTMP
RTMPDisable *bool `json:"rtmpDisable"`
RTMPAddress *string `json:"rtmpAddress"`
RTMPDisable *bool `json:"rtmpDisable"`
RTMPAddress *string `json:"rtmpAddress"`
RTMPEncryption *conf.Encryption `json:"rtmpEncryption"`
RTMPSAddress *string `json:"rtmpsAddress"`
RTMPServerKey *string `json:"rtmpServerKey"`
RTMPServerCert *string `json:"rtmpServerCert"`
// HLS
HLSDisable *bool `json:"hlsDisable"`
@ -181,6 +185,7 @@ type api struct { @@ -181,6 +185,7 @@ type api struct {
rtspServer apiRTSPServer
rtspsServer apiRTSPServer
rtmpServer apiRTMPServer
rtmpsServer apiRTMPServer
hlsServer apiHLSServer
parent apiParent
@ -195,6 +200,7 @@ func newAPI( @@ -195,6 +200,7 @@ func newAPI(
rtspServer apiRTSPServer,
rtspsServer apiRTSPServer,
rtmpServer apiRTMPServer,
rtmpsServer apiRTMPServer,
hlsServer apiHLSServer,
parent apiParent,
) (*api, error) {
@ -209,6 +215,7 @@ func newAPI( @@ -209,6 +215,7 @@ func newAPI(
rtspServer: rtspServer,
rtspsServer: rtspsServer,
rtmpServer: rtmpServer,
rtmpsServer: rtmpsServer,
hlsServer: hlsServer,
parent: parent,
}
@ -241,6 +248,11 @@ func newAPI( @@ -241,6 +248,11 @@ func newAPI(
group.POST("/v1/rtmpconns/kick/:id", a.onRTMPConnsKick)
}
if !interfaceIsEmpty(a.rtmpsServer) {
group.GET("/v1/rtmpsconns/list", a.onRTMPSConnsList)
group.POST("/v1/rtmpsconns/kick/:id", a.onRTMPSConnsKick)
}
if !interfaceIsEmpty(a.hlsServer) {
group.GET("/v1/hlsmuxers/list", a.onHLSMuxersList)
}
@ -516,6 +528,28 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { @@ -516,6 +528,28 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
ctx.Status(http.StatusOK)
}
func (a *api) onRTMPSConnsList(ctx *gin.Context) {
res := a.rtmpsServer.apiConnsList(rtmpServerAPIConnsListReq{})
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.data)
}
func (a *api) onRTMPSConnsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtmpsServer.apiConnsKick(rtmpServerAPIConnsKickReq{id: id})
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
ctx.Status(http.StatusOK)
}
func (a *api) onHLSMuxersList(ctx *gin.Context) {
res := a.hlsServer.apiHLSMuxersList(hlsServerAPIMuxersListReq{})
if res.err != nil {

61
internal/core/core.go

@ -35,6 +35,7 @@ type Core struct { @@ -35,6 +35,7 @@ type Core struct {
rtspServer *rtspServer
rtspsServer *rtspServer
rtmpServer *rtmpServer
rtmpsServer *rtmpServer
hlsServer *hlsServer
api *api
confWatcher *confwatcher.ConfWatcher
@ -304,7 +305,9 @@ func (p *Core) createResources(initial bool) error { @@ -304,7 +305,9 @@ func (p *Core) createResources(initial bool) error {
}
}
if !p.conf.RTMPDisable {
if !p.conf.RTMPDisable &&
(p.conf.RTMPEncryption == conf.EncryptionNo ||
p.conf.RTMPEncryption == conf.EncryptionOptional) {
if p.rtmpServer == nil {
p.rtmpServer, err = newRTMPServer(
p.ctx,
@ -313,6 +316,36 @@ func (p *Core) createResources(initial bool) error { @@ -313,6 +316,36 @@ func (p *Core) createResources(initial bool) error {
p.conf.ReadTimeout,
p.conf.WriteTimeout,
p.conf.ReadBufferCount,
false,
"",
"",
p.conf.RTSPAddress,
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
p.externalCmdPool,
p.metrics,
p.pathManager,
p)
if err != nil {
return err
}
}
}
if !p.conf.RTMPDisable &&
(p.conf.RTMPEncryption == conf.EncryptionStrict ||
p.conf.RTMPEncryption == conf.EncryptionOptional) {
if p.rtmpsServer == nil {
p.rtmpsServer, err = newRTMPServer(
p.ctx,
p.conf.ExternalAuthenticationURL,
p.conf.RTMPSAddress,
p.conf.ReadTimeout,
p.conf.WriteTimeout,
p.conf.ReadBufferCount,
true,
p.conf.RTMPServerCert,
p.conf.RTMPServerKey,
p.conf.RTSPAddress,
p.conf.RunOnConnect,
p.conf.RunOnConnectRestart,
@ -362,6 +395,7 @@ func (p *Core) createResources(initial bool) error { @@ -362,6 +395,7 @@ func (p *Core) createResources(initial bool) error {
p.rtspServer,
p.rtspsServer,
p.rtmpServer,
p.rtmpsServer,
p.hlsServer,
p)
if err != nil {
@ -463,6 +497,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -463,6 +497,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeRTMPServer := false
if newConf == nil ||
newConf.RTMPDisable != p.conf.RTMPDisable ||
newConf.RTMPEncryption != p.conf.RTMPEncryption ||
newConf.RTMPAddress != p.conf.RTMPAddress ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
@ -476,6 +511,25 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -476,6 +511,25 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
closeRTMPServer = true
}
closeRTMPSServer := false
if newConf == nil ||
newConf.RTMPDisable != p.conf.RTMPDisable ||
newConf.RTMPEncryption != p.conf.RTMPEncryption ||
newConf.RTMPSAddress != p.conf.RTMPSAddress ||
newConf.ExternalAuthenticationURL != p.conf.ExternalAuthenticationURL ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
newConf.RTMPServerCert != p.conf.RTMPServerCert ||
newConf.RTMPServerKey != p.conf.RTMPServerKey ||
newConf.RTSPAddress != p.conf.RTSPAddress ||
newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
closeMetrics ||
closePathManager {
closeRTMPSServer = true
}
closeHLSServer := false
if newConf == nil ||
newConf.HLSDisable != p.conf.HLSDisable ||
@ -544,6 +598,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -544,6 +598,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
p.hlsServer = nil
}
if closeRTMPSServer && p.rtmpsServer != nil {
p.rtmpsServer.close()
p.rtmpsServer = nil
}
if closeRTMPServer && p.rtmpServer != nil {
p.rtmpServer.close()
p.rtmpServer = nil

6
internal/core/core_test.go

@ -119,12 +119,6 @@ func (c *container) wait() int { @@ -119,12 +119,6 @@ func (c *container) wait() int {
return int(code)
}
func (c *container) ip() string {
out, _ := exec.Command("docker", "inspect", "rtsp-simple-server-test-"+c.name,
"-f", "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}").Output()
return string(out[:len(out)-1])
}
func writeTempFile(byts []byte) (string, error) {
tmpf, err := ioutil.TempFile(os.TempDir(), "rtsp-")
if err != nil {

1
internal/core/path.go

@ -334,6 +334,7 @@ func (pa *path) hasStaticSource() bool { @@ -334,6 +334,7 @@ func (pa *path) hasStaticSource() bool {
return strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://") ||
strings.HasPrefix(pa.conf.Source, "rtmp://") ||
strings.HasPrefix(pa.conf.Source, "rtmps://") ||
strings.HasPrefix(pa.conf.Source, "http://") ||
strings.HasPrefix(pa.conf.Source, "https://") ||
pa.conf.Source == "rpiCamera"

19
internal/core/rtmp_conn.go

@ -288,14 +288,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -288,14 +288,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
return fmt.Errorf("the stream doesn't contain an H264 track or an AAC track")
}
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err := c.conn.WriteTracks(videoTrack, audioTrack)
if err != nil {
return err
}
c.ringBuffer, _ = ringbuffer.New(uint64(c.readBufferCount))
go func() {
<-ctx.Done()
c.ringBuffer.Close()
@ -309,6 +302,12 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -309,6 +302,12 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
c.path.Name(),
sourceTrackInfo(res.stream.tracks()))
c.nconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err := c.conn.WriteTracks(videoTrack, audioTrack)
if err != nil {
return err
}
if c.path.Conf().RunOnRead != "" {
c.log(logger.Info, "runOnRead command started")
onReadCmd := externalcmd.NewCmd(
@ -515,9 +514,6 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -515,9 +514,6 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
c.state = rtmpConnStatePublish
c.stateMutex.Unlock()
// disable write deadline
c.nconn.SetWriteDeadline(time.Time{})
rres := c.path.publisherRecord(pathPublisherRecordReq{
author: c,
tracks: tracks,
@ -531,6 +527,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -531,6 +527,9 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
c.path.Name(),
sourceTrackInfo(tracks))
// disable write deadline
c.nconn.SetWriteDeadline(time.Time{})
for {
c.nconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout)))
msg, err := c.conn.ReadMessage()

35
internal/core/rtmp_server.go

@ -3,6 +3,7 @@ package core @@ -3,6 +3,7 @@ package core
import (
"context"
"crypto/rand"
"crypto/tls"
"fmt"
"net"
"strconv"
@ -51,6 +52,7 @@ type rtmpServer struct { @@ -51,6 +52,7 @@ type rtmpServer struct {
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
isTLS bool
rtspAddress string
runOnConnect string
runOnConnectRestart bool
@ -62,7 +64,7 @@ type rtmpServer struct { @@ -62,7 +64,7 @@ type rtmpServer struct {
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
l net.Listener
ln net.Listener
conns map[*rtmpConn]struct{}
// in
@ -78,6 +80,9 @@ func newRTMPServer( @@ -78,6 +80,9 @@ func newRTMPServer(
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
isTLS bool,
serverCert string,
serverKey string,
rtspAddress string,
runOnConnect string,
runOnConnectRestart bool,
@ -86,7 +91,18 @@ func newRTMPServer( @@ -86,7 +91,18 @@ func newRTMPServer(
pathManager *pathManager,
parent rtmpServerParent,
) (*rtmpServer, error) {
l, err := net.Listen("tcp", address)
ln, err := func() (net.Listener, error) {
if !isTLS {
return net.Listen("tcp", address)
}
cert, err := tls.LoadX509KeyPair(serverCert, serverKey)
if err != nil {
return nil, err
}
return tls.Listen("tcp", address, &tls.Config{Certificates: []tls.Certificate{cert}})
}()
if err != nil {
return nil, err
}
@ -101,13 +117,14 @@ func newRTMPServer( @@ -101,13 +117,14 @@ func newRTMPServer(
rtspAddress: rtspAddress,
runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart,
isTLS: isTLS,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
l: l,
ln: ln,
conns: make(map[*rtmpConn]struct{}),
chConnClose: make(chan *rtmpConn),
chAPIConnsList: make(chan rtmpServerAPIConnsListReq),
@ -127,7 +144,13 @@ func newRTMPServer( @@ -127,7 +144,13 @@ func newRTMPServer(
}
func (s *rtmpServer) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[RTMP] "+format, append([]interface{}{}, args...)...)
label := func() string {
if s.isTLS {
return "RTMPS"
}
return "RTMP"
}()
s.parent.Log(level, "[%s] "+format, append([]interface{}{label}, args...)...)
}
func (s *rtmpServer) close() {
@ -146,7 +169,7 @@ func (s *rtmpServer) run() { @@ -146,7 +169,7 @@ func (s *rtmpServer) run() {
defer s.wg.Done()
err := func() error {
for {
conn, err := s.l.Accept()
conn, err := s.ln.Accept()
if err != nil {
return err
}
@ -246,7 +269,7 @@ outer: @@ -246,7 +269,7 @@ outer:
s.ctxCancel()
s.l.Close()
s.ln.Close()
if s.metrics != nil {
s.metrics.rtmpServerSet(s)

385
internal/core/rtmp_server_test.go

@ -1,85 +1,143 @@ @@ -1,85 +1,143 @@
package core
package core //nolint:dupl
import (
"io"
"crypto/tls"
"net"
"net/url"
"os"
"testing"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/stretchr/testify/require"
"github.com/aler9/rtsp-simple-server/internal/rtmp"
"github.com/aler9/rtsp-simple-server/internal/rtmp/message"
)
func TestRTMPServerPublish(t *testing.T) {
for _, source := range []string{
"videoaudio",
"video",
} {
t.Run(source, func(t *testing.T) {
p, ok := newInstance("hlsDisable: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.close()
func TestRTMPServerPublishRead(t *testing.T) {
for _, ca := range []string{"plain", "tls"} {
t.Run(ca, func(t *testing.T) {
var port string
if ca == "plain" {
port = "1935"
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "empty" + source + ".mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost:1935/test1/test2",
})
p, ok := newInstance("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.close()
} else {
port = "1936"
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := newInstance("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"rtmpEncryption: \"yes\"\n" +
"rtmpServerCert: " + serverCertFpath + "\n" +
"rtmpServerKey: " + serverKeyFpath + "\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.close()
}
u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mystream")
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
nconn1, err := func() (net.Conn, error) {
if ca == "plain" {
return net.Dial("tcp", u.Host)
}
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
}()
require.NoError(t, err)
defer nconn1.Close()
conn1 := rtmp.NewConn(nconn1)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://localhost:8554/test1/test2",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
err = conn1.InitializeClient(u, true)
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
})
}
}
func TestRTMPServerRead(t *testing.T) {
p, ok := newInstance("hlsDisable: yes\n" +
"paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "rtsp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
videoTrack := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x08, 0x06, 0x07, 0x08},
}
time.Sleep(1 * time.Second)
audioTrack := &gortsplib.TrackMPEG4Audio{
PayloadType: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://localhost:1935/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
err = conn1.WriteTracks(videoTrack, audioTrack)
require.NoError(t, err)
nconn2, err := func() (net.Conn, error) {
if ca == "plain" {
return net.Dial("tcp", u.Host)
}
return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true})
}()
require.NoError(t, err)
defer nconn2.Close()
conn2 := rtmp.NewConn(nconn2)
err = conn2.InitializeClient(u, false)
require.NoError(t, err)
videoTrack1, audioTrack2, err := conn2.ReadTracks()
require.NoError(t, err)
require.Equal(t, videoTrack, videoTrack1)
require.Equal(t, audioTrack, audioTrack2)
err = conn1.WriteMessage(&message.MsgVideo{
ChunkStreamID: 6,
MessageStreamID: 0x1000000,
IsKeyFrame: true,
H264Type: flvio.AVC_NALU,
Payload: []byte{0x00, 0x00, 0x00, 0x04, 0x05, 0x02, 0x03, 0x04},
})
require.NoError(t, err)
msg1, err := conn2.ReadMessage()
require.NoError(t, err)
require.Equal(t, &message.MsgVideo{
ChunkStreamID: 6,
MessageStreamID: 1,
IsKeyFrame: true,
H264Type: flvio.AVC_NALU,
Payload: []byte{
0x00, 0x00, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28,
0xd9, 0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00,
0x00, 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00,
0xf0, 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x00, 0x00,
0x04, 0x08, 0x06, 0x07, 0x08, 0x00, 0x00, 0x00,
0x04, 0x05, 0x02, 0x03, 0x04,
},
}, msg1)
})
}
}
func TestRTMPServerAuth(t *testing.T) {
@ -115,18 +173,33 @@ func TestRTMPServerAuth(t *testing.T) { @@ -115,18 +173,33 @@ func TestRTMPServerAuth(t *testing.T) {
require.NoError(t, err)
}
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://127.0.0.1/teststream?user=testpublisher&pass=testpass&param=value",
})
u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testpublisher&pass=testpass&param=value")
require.NoError(t, err)
nconn1, err := net.Dial("tcp", u1.Host)
require.NoError(t, err)
defer nconn1.Close()
conn1 := rtmp.NewConn(nconn1)
err = conn1.InitializeClient(u1, true)
require.NoError(t, err)
videoTrack := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0,
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00,
0x00, 0x03, 0x00, 0x3d, 0x08,
},
PPS: []byte{
0x68, 0xee, 0x3c, 0x80,
},
}
err = conn1.WriteTracks(videoTrack, nil)
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
if ca == "external" {
a.close()
@ -135,25 +208,25 @@ func TestRTMPServerAuth(t *testing.T) { @@ -135,25 +208,25 @@ func TestRTMPServerAuth(t *testing.T) {
defer a.close()
}
u, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testreader&pass=testpass&param=value")
u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testreader&pass=testpass&param=value")
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
nconn2, err := net.Dial("tcp", u2.Host)
require.NoError(t, err)
defer nconn.Close()
conn := rtmp.NewConn(nconn)
defer nconn2.Close()
conn2 := rtmp.NewConn(nconn2)
err = conn.InitializeClient(u, false)
err = conn2.InitializeClient(u2, false)
require.NoError(t, err)
_, _, err = conn.ReadTracks()
_, _, err = conn2.ReadTracks()
require.NoError(t, err)
})
}
}
func TestRTMPServerAuthFail(t *testing.T) {
t.Run("publish", func(t *testing.T) {
t.Run("publish", func(t *testing.T) { //nolint:dupl
p, ok := newInstance("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
@ -163,17 +236,47 @@ func TestRTMPServerAuthFail(t *testing.T) { @@ -163,17 +236,47 @@ func TestRTMPServerAuthFail(t *testing.T) {
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost/teststream?user=testuser&pass=testpass",
})
u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser&pass=testpass")
require.NoError(t, err)
nconn1, err := net.Dial("tcp", u1.Host)
require.NoError(t, err)
defer nconn1.Close()
conn1 := rtmp.NewConn(nconn1)
err = conn1.InitializeClient(u1, true)
require.NoError(t, err)
defer cnt1.close()
require.NotEqual(t, 0, cnt1.wait())
videoTrack := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0,
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00,
0x00, 0x03, 0x00, 0x3d, 0x08,
},
PPS: []byte{
0x68, 0xee, 0x3c, 0x80,
},
}
err = conn1.WriteTracks(videoTrack, nil)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream")
require.NoError(t, err)
nconn2, err := net.Dial("tcp", u2.Host)
require.NoError(t, err)
defer nconn2.Close()
conn2 := rtmp.NewConn(nconn2)
err = conn2.InitializeClient(u2, false)
require.NoError(t, err)
_, _, err = conn2.ReadTracks()
require.EqualError(t, err, "EOF")
})
t.Run("publish_external", func(t *testing.T) {
@ -187,20 +290,50 @@ func TestRTMPServerAuthFail(t *testing.T) { @@ -187,20 +290,50 @@ func TestRTMPServerAuthFail(t *testing.T) {
require.NoError(t, err)
defer a.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost/teststream?user=testuser2&pass=testpass&param=value",
})
u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser1&pass=testpass")
require.NoError(t, err)
nconn1, err := net.Dial("tcp", u1.Host)
require.NoError(t, err)
defer nconn1.Close()
conn1 := rtmp.NewConn(nconn1)
err = conn1.InitializeClient(u1, true)
require.NoError(t, err)
defer cnt1.close()
require.NotEqual(t, 0, cnt1.wait())
videoTrack := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0,
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00,
0x00, 0x03, 0x00, 0x3d, 0x08,
},
PPS: []byte{
0x68, 0xee, 0x3c, 0x80,
},
}
err = conn1.WriteTracks(videoTrack, nil)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream")
require.NoError(t, err)
nconn2, err := net.Dial("tcp", u2.Host)
require.NoError(t, err)
defer nconn2.Close()
conn2 := rtmp.NewConn(nconn2)
err = conn2.InitializeClient(u2, false)
require.NoError(t, err)
_, _, err = conn2.ReadTracks()
require.EqualError(t, err, "EOF")
})
t.Run("read", func(t *testing.T) {
t.Run("read", func(t *testing.T) { //nolint:dupl
p, ok := newInstance("rtspDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
@ -210,36 +343,46 @@ func TestRTMPServerAuthFail(t *testing.T) { @@ -210,36 +343,46 @@ func TestRTMPServerAuthFail(t *testing.T) {
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "flv",
"rtmp://localhost/teststream",
})
u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream")
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
nconn1, err := net.Dial("tcp", u1.Host)
require.NoError(t, err)
defer nconn1.Close()
conn1 := rtmp.NewConn(nconn1)
u, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser&pass=testpass")
err = conn1.InitializeClient(u1, true)
require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host)
videoTrack := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{
0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0,
0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00,
0x00, 0x03, 0x00, 0x3d, 0x08,
},
PPS: []byte{
0x68, 0xee, 0x3c, 0x80,
},
}
err = conn1.WriteTracks(videoTrack, nil)
require.NoError(t, err)
defer nconn.Close()
conn := rtmp.NewConn(nconn)
err = conn.InitializeClient(u, false)
time.Sleep(500 * time.Millisecond)
u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser1&pass=testpass")
require.NoError(t, err)
for i := 0; i < 3; i++ {
_, err := conn.ReadMessage()
require.NoError(t, err)
}
nconn2, err := net.Dial("tcp", u2.Host)
require.NoError(t, err)
defer nconn2.Close()
conn2 := rtmp.NewConn(nconn2)
err = conn2.InitializeClient(u2, false)
require.NoError(t, err)
_, err = conn.ReadMessage()
require.Equal(t, err, io.EOF)
_, _, err = conn2.ReadTracks()
require.EqualError(t, err, "EOF")
})
}

33
internal/core/rtmp_source.go

@ -2,9 +2,13 @@ package core @@ -2,9 +2,13 @@ package core
import (
"context"
"crypto/sha256"
"crypto/tls"
"encoding/hex"
"fmt"
"net"
"net/url"
"strings"
"time"
"github.com/aler9/gortsplib"
@ -25,6 +29,7 @@ type rtmpSourceParent interface { @@ -25,6 +29,7 @@ type rtmpSourceParent interface {
type rtmpSource struct {
ur string
fingerprint string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
parent rtmpSourceParent
@ -32,12 +37,14 @@ type rtmpSource struct { @@ -32,12 +37,14 @@ type rtmpSource struct {
func newRTMPSource(
ur string,
fingerprint string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
parent rtmpSourceParent,
) *rtmpSource {
return &rtmpSource{
ur: ur,
fingerprint: fingerprint,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
parent: parent,
@ -66,8 +73,30 @@ func (s *rtmpSource) run(ctx context.Context) error { @@ -66,8 +73,30 @@ func (s *rtmpSource) run(ctx context.Context) error {
ctx2, cancel2 := context.WithTimeout(ctx, time.Duration(s.readTimeout))
defer cancel2()
var d net.Dialer
nconn, err := d.DialContext(ctx2, "tcp", u.Host)
nconn, err := func() (net.Conn, error) {
if u.Scheme == "rtmp" {
return (&net.Dialer{}).DialContext(ctx2, "tcp", u.Host)
}
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
VerifyConnection: func(cs tls.ConnectionState) error {
h := sha256.New()
h.Write(cs.PeerCertificates[0].Raw)
hstr := hex.EncodeToString(h.Sum(nil))
fingerprintLower := strings.ToLower(s.fingerprint)
if hstr != fingerprintLower {
return fmt.Errorf("server fingerprint do not match: expected %s, got %s",
fingerprintLower, hstr)
}
return nil
},
}
return (&tls.Dialer{Config: tlsConfig}).DialContext(ctx2, "tcp", u.Host)
}()
if err != nil {
return err
}

171
internal/core/rtmp_source_test.go

@ -1,56 +1,147 @@ @@ -1,56 +1,147 @@
package core
import (
"crypto/tls"
"net"
"os"
"testing"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/aler9/gortsplib/pkg/url"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/stretchr/testify/require"
"github.com/aler9/rtsp-simple-server/internal/rtmp"
"github.com/aler9/rtsp-simple-server/internal/rtmp/message"
)
func TestRTMPSource(t *testing.T) {
for _, source := range []string{
"videoaudio",
"video",
for _, ca := range []string{
"plain",
"tls",
} {
t.Run(source, func(t *testing.T) {
cnt1, err := newContainer("nginx-rtmp", "rtmpserver", []string{})
t.Run(ca, func(t *testing.T) {
ln, err := func() (net.Listener, error) {
if ca == "plain" {
return net.Listen("tcp", "127.0.0.1:1937")
}
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
var cert tls.Certificate
cert, err = tls.LoadX509KeyPair(serverCertFpath, serverKeyFpath)
require.NoError(t, err)
return tls.Listen("tcp", "127.0.0.1:1937", &tls.Config{Certificates: []tls.Certificate{cert}})
}()
require.NoError(t, err)
defer cnt1.close()
cnt2, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "empty" + source + ".mkv",
"-c", "copy",
"-f", "flv",
"rtmp://" + cnt1.ip() + "/stream/test",
})
defer ln.Close()
connected := make(chan struct{})
received := make(chan struct{})
done := make(chan struct{})
go func() {
nconn, err := ln.Accept()
require.NoError(t, err)
defer nconn.Close()
conn := rtmp.NewConn(nconn)
_, _, err = conn.InitializeServer()
require.NoError(t, err)
videoTrack := &gortsplib.TrackH264{
PayloadType: 96,
SPS: []byte{ // 1920x1080 baseline
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
PPS: []byte{0x08, 0x06, 0x07, 0x08},
}
audioTrack := &gortsplib.TrackMPEG4Audio{
PayloadType: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
err = conn.WriteTracks(videoTrack, audioTrack)
require.NoError(t, err)
<-connected
err = conn.WriteMessage(&message.MsgVideo{
ChunkStreamID: 6,
MessageStreamID: 0x1000000,
IsKeyFrame: true,
H264Type: flvio.AVC_NALU,
Payload: []byte{0x00, 0x00, 0x00, 0x04, 0x05, 0x02, 0x03, 0x04},
})
require.NoError(t, err)
<-done
}()
if ca == "plain" {
p, ok := newInstance("paths:\n" +
" proxied:\n" +
" source: rtmp://localhost:1937/teststream\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
} else {
p, ok := newInstance("paths:\n" +
" proxied:\n" +
" source: rtmps://localhost:1937/teststream\n" +
" sourceFingerprint: 33949E05FFFB5FF3E8AA16F8213A6251B4D9363804BA53233C4DA9A46D6F2739\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
}
c := gortsplib.Client{
OnPacketRTP: func(ctx *gortsplib.ClientOnPacketRTPCtx) {
require.Equal(t, []byte{
0x18, 0x0, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9,
0x0, 0x78, 0x2, 0x27, 0xe5, 0x84, 0x0, 0x0,
0x3, 0x0, 0x4, 0x0, 0x0, 0x3, 0x0, 0xf0,
0x3c, 0x60, 0xc9, 0x20, 0x0, 0x4, 0x8, 0x6,
0x7, 0x8, 0x0, 0x4, 0x5, 0x2, 0x3, 0x4,
}, ctx.Packet.Payload)
close(received)
},
}
u, err := url.Parse("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
p, ok := newInstance("hlsDisable: yes\n" +
"rtmpDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtmp://localhost/stream/test\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
cnt3, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://localhost:8554/proxied",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer cnt3.close()
require.Equal(t, 0, cnt3.wait())
defer c.Close()
tracks, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAndPlay(tracks, baseURL)
require.NoError(t, err)
close(connected)
<-received
close(done)
})
}
}

30
internal/core/rtsp_server.go

@ -49,6 +49,22 @@ type rtspServerParent interface { @@ -49,6 +49,22 @@ type rtspServerParent interface {
Log(logger.Level, string, ...interface{})
}
func printAddresses(srv *gortsplib.Server) string {
var ret []string
ret = append(ret, fmt.Sprintf("%s (TCP)", srv.RTSPAddress))
if srv.UDPRTPAddress != "" {
ret = append(ret, fmt.Sprintf("%s (UDP/RTP)", srv.UDPRTPAddress))
}
if srv.UDPRTCPAddress != "" {
ret = append(ret, fmt.Sprintf("%s (UDP/RTCP)", srv.UDPRTCPAddress))
}
return strings.Join(ret, ", ")
}
type rtspServer struct {
externalAuthenticationURL string
authMethods []headers.AuthMethod
@ -152,19 +168,7 @@ func newRTSPServer( @@ -152,19 +168,7 @@ func newRTSPServer(
return nil, err
}
var temp []string
temp = append(temp, fmt.Sprintf("%s (TCP)", address))
if s.srv.UDPRTPAddress != "" {
temp = append(temp, fmt.Sprintf("%s (UDP/RTP)", s.srv.UDPRTPAddress))
}
if s.srv.UDPRTCPAddress != "" {
temp = append(temp, fmt.Sprintf("%s (UDP/RTCP)", s.srv.UDPRTCPAddress))
}
s.log(logger.Info, "listener opened on "+strings.Join(temp, ", "))
s.log(logger.Info, "listener opened on %s", printAddresses(s.srv))
if s.metrics != nil {
if !isTLS {

2
internal/core/rtsp_source_test.go

@ -145,8 +145,6 @@ func TestRTSPSource(t *testing.T) { @@ -145,8 +145,6 @@ func TestRTSPSource(t *testing.T) {
defer p.close()
}
time.Sleep(1 * time.Second)
received := make(chan struct{})
c := gortsplib.Client{

4
internal/core/source_static.go

@ -67,9 +67,11 @@ func newSourceStatic( @@ -67,9 +67,11 @@ func newSourceStatic(
readBufferCount,
s)
case strings.HasPrefix(conf.Source, "rtmp://"):
case strings.HasPrefix(conf.Source, "rtmp://") ||
strings.HasPrefix(conf.Source, "rtmps://"):
s.impl = newRTMPSource(
conf.Source,
conf.SourceFingerprint,
readTimeout,
writeTimeout,
s)

21
rtsp-simple-server.yml

@ -67,7 +67,7 @@ rtspDisable: no @@ -67,7 +67,7 @@ rtspDisable: no
# TCP is the most versatile, and does support encryption.
# The handshake is always performed with TCP.
protocols: [udp, multicast, tcp]
# Encrypt handshake and TCP streams with TLS (RTSPS).
# Encrypt handshakes and TCP streams with TLS (RTSPS).
# Available values are "no", "strict", "optional".
encryption: "no"
# Address of the TCP/RTSP listener. This is needed only when encryption is "no" or "optional".
@ -99,8 +99,20 @@ authMethods: [basic, digest] @@ -99,8 +99,20 @@ authMethods: [basic, digest]
# Disable support for the RTMP protocol.
rtmpDisable: no
# Address of the RTMP listener.
# Address of the RTMP listener. This is needed only when encryption is "no" or "optional".
rtmpAddress: :1935
# Encrypt connections with TLS (RTMPS).
# Available values are "no", "strict", "optional".
rtmpEncryption: "no"
# Address of the RTMPS listener. This is needed only when encryption is "strict" or "optional".
rtmpsAddress: :1936
# Path to the server key. This is needed only when encryption is "strict" or "optional".
# This can be generated with:
# openssl genrsa -out server.key 2048
# openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
rtmpServerKey: server.key
# Path to the server certificate. This is needed only when encryption is "strict" or "optional".
rtmpServerCert: server.crt
###############################################
# HLS parameters
@ -169,7 +181,8 @@ paths: @@ -169,7 +181,8 @@ paths:
# * publisher -> the stream is published by a RTSP or RTMP client
# * rtsp://existing-url -> the stream is pulled from another RTSP server / camera
# * rtsps://existing-url -> the stream is pulled from another RTSP server / camera with RTSPS
# * rtmp://existing-url -> the stream is pulled from another RTMP server
# * rtmp://existing-url -> the stream is pulled from another RTMP server / camera
# * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS
# * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server
# * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server with HTTPS
# * redirect -> the stream is provided by another path or server
@ -185,7 +198,7 @@ paths: @@ -185,7 +198,7 @@ paths:
# and must be used only when interacting with sources that require it.
sourceAnyPortEnable: no
# If the source is a RTSPS or HTTPS URL, and the source certificate is self-signed
# If the source is a RTSPS, RTMPS or HTTPS URL, and the source certificate is self-signed
# or invalid, you can provide the fingerprint of the certificate in order to
# validate it anyway. It can be obtained by running:
# openssl s_client -connect source_ip:source_port </dev/null 2>/dev/null | sed -n '/BEGIN/,/END/p' > server.crt

8
testimages/nginx-rtmp/Dockerfile

@ -1,8 +0,0 @@ @@ -1,8 +0,0 @@
FROM amd64/alpine:3.14
RUN apk add --no-cache \
nginx-mod-rtmp
COPY nginx.conf /etc/nginx/
ENTRYPOINT [ "nginx", "-g", "daemon off;" ]

23
testimages/nginx-rtmp/nginx.conf

@ -1,23 +0,0 @@ @@ -1,23 +0,0 @@
pid /run/nginx.pid;
worker_processes auto;
pcre_jit on;
error_log /dev/null;
include /etc/nginx/modules/*.conf;
events {
worker_connections 20000;
}
rtmp {
server {
listen 1935;
access_log /dev/null;
application stream {
live on;
}
}
}
Loading…
Cancel
Save