Browse Source

implement RTSPS (#77)

pull/169/head
aler9 4 years ago
parent
commit
ccb8b92bfb
  1. 74
      README.md
  2. 2
      go.mod
  3. 4
      go.sum
  4. 10
      internal/client/client.go
  5. 28
      internal/clientman/clientman.go
  6. 68
      internal/conf/conf.go
  7. 6
      internal/servertcp/server.go
  8. 96
      internal/servertls/server.go
  9. 2
      internal/serverudp/server.go
  10. 4
      internal/stats/stats.go
  11. 193
      main.go
  12. 247
      main_test.go
  13. 28
      rtsp-simple-server.yml
  14. 33
      testimages/gstreamer/Dockerfile
  15. 95
      testimages/gstreamer/exitafterframe.c
  16. 2
      testimages/gstreamer/start.sh

74
README.md

@ -12,6 +12,7 @@ Features: @@ -12,6 +12,7 @@ Features:
* Read and publish live streams with UDP and TCP
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MP3, AAC, Opus, PCM)
* Serve multiple streams at once in separate paths
* Encrypt streams with TLS (RTSPS)
* Pull and serve streams from other RTSP or RTMP servers, always or on-demand (RTSP proxy)
* Authenticate readers and publishers separately
* Redirect to other RTSP servers (load balancing)
@ -27,10 +28,11 @@ Features: @@ -27,10 +28,11 @@ Features:
* [Basic usage](#basic-usage)
* [Advanced usage and FAQs](#advanced-usage-and-faqs)
* [Configuration](#configuration)
* [Encryption](#encryption)
* [RTSP proxy mode](#rtsp-proxy-mode)
* [Publish a webcam](#publish-a-webcam)
* [Publish a Raspberry Pi Camera](#publish-a-raspberry-pi-camera)
* [Generate HLS](#generate-hls)
* [Convert streams to HLS](#convert-streams-to-hls)
* [Remuxing, re-encoding, compression](#remuxing-re-encoding-compression)
* [On-demand publishing](#on-demand-publishing)
* [Redirect to another server](#redirect-to-another-server)
@ -104,25 +106,67 @@ docker run --rm -it -e RTSP_PROTOCOLS=tcp -p 8554:8554 aler9/rtsp-simple-server @@ -104,25 +106,67 @@ docker run --rm -it -e RTSP_PROTOCOLS=tcp -p 8554:8554 aler9/rtsp-simple-server
### Configuration
To see or change the configuration, edit the `rtsp-simple-server.yml` file, that is:
All the configuration parameters are listed and commented in the [configuration file](rtsp-simple-server.yml).
* included the release bundle
* available in the root folder of the Docker image (`/rtsp-simple-server.yml`)
* also [available here](rtsp-simple-server.yml).
There are two ways to change the configuration:
Every configuration parameter can be overridden by environment variables, in the format `RTSP_PARAMNAME`, where `PARAMNAME` is the uppercase name of a parameter. For instance, the `rtspPort` parameter can be overridden in the following way:
* By editing the `rtsp-simple-server.yml` file, that is
* included into the release bundle
* available in the root folder of the Docker image (`/rtsp-simple-server.yml`)
* also [available here](rtsp-simple-server.yml).
* By overriding configuration parameters with environment variables, in the format `RTSP_PARAMNAME`, where `PARAMNAME` is the uppercase name of a parameter. For instance, the `rtspPort` parameter can be overridden in the following way:
```
RTSP_RTSPPORT=8555 ./rtsp-simple-server
```
Parameters in maps can be overridden by using underscores, in the following way:
```
RTSP_PATHS_TEST_SOURCE=rtsp://myurl ./rtsp-simple-server
```
This method is particularly useful when running rtsp-simple-server with Docker; any configuration parameter can be changed by passing environment variables with the `-e` flag:
```
docker run --rm -it --network=host -e RTSP_PATHS_TEST_SOURCE=rtsp://myurl aler9/rtsp-simple-server
```
The configuration can be changed dinamically when the server is running (hot reloading) by writing to the configuration file. Changes are detected and applied without disconnecting existing clients, whenever it's possible.
### Encryption
Incoming and outgoing streams can be encrypted with TLS (obtaining the RTSPS protocol). A TLS certificate must be installed on the server; if the server is installed on a machine that is publicly accessible from the internet, a certificate can be requested from a Certificate authority by using tools like [Certbot](https://certbot.eff.org/); otherwise, a self-signed certificate can be generated with openSSL:
```
openssl genrsa -out server.key 2048
openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
```
RTSP_RTSPPORT=8555 ./rtsp-simple-server
Edit `rtsp-simple-server.yml`, and set the `protocols`, `encrypt`, `serverKey` and `serverCert` parameters:
```yml
protocols: [tcp]
encryption: optional
serverKey: server.key
serverCert: server.crt
```
Parameters in maps can be overridden by using underscores, in the following way:
Streams can then be published and read with the `rtsps` scheme and the 8555 port:
```
RTSP_PATHS_TEST_SOURCE=rtsp://myurl ./rtsp-simple-server
ffmpeg -i rtsps://...
```
The configuration can be changed dinamically when the server is running (hot reloading) by writing to the configuration file. Changes are detected and applied without disconnecting existing clients, whenever it's possible.
If the client is _GStreamer_ and the server certificate is self signed, remember to disable the certificate validation:
```
gst-launch-1.0 rtspsrc location=rtsps://... tls-validation-flags=0
```
If the client is _VLC_, encryption can't be deployed, since _VLC_ doesn't support it.
### RTSP proxy mode
@ -210,9 +254,9 @@ paths: @@ -210,9 +254,9 @@ paths:
After starting the server, the webcam is available on `rtsp://localhost:8554/cam`.
### Generate HLS
### Convert streams to HLS
Edit `rtsp-simple-server.yml` and replace everything inside section `paths` with the following content:
HLS is a media format that allows live streams be embedded in web pages, inside standard `<video>` HTML tags. To generate HLS whenever someone publishes a stream, edit `rtsp-simple-server.yml` and replace everything inside section `paths` with the following content:
```yml
paths:
@ -221,9 +265,9 @@ paths: @@ -221,9 +265,9 @@ paths:
runOnPublishRestart: yes
```
Every time someone publishes a stream, the server will produce HLS segments, that can be served by a web server.
The resulting files (`stream.m3u8` and a lot of `.ts` segments) can be served by a web server.
The example above makes the assumption that the incoming stream is encoded with H264 and AAC, since they are the only codecs supported by HLS; if the incoming stream is encoded with different codecs, it must be converted:
The example above makes the assumption that published streams are encoded with H264 and AAC, since they are the only codecs supported by HLS; if streams make use of different codecs, they must be converted:
```yml
paths:
@ -307,7 +351,7 @@ paths: @@ -307,7 +351,7 @@ paths:
readPass: userpassword
```
WARNING: RTSP is a plain protocol, and the credentials can be intercepted and read by malicious users (even if hashed, since the only supported hash method is md5, which is broken). If you need a secure channel, use RTSP inside a VPN.
**WARNING**: enable encryption or use a VPN to ensure that no one is intercepting and reading the credentials.
### Start on boot with systemd

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c
github.com/aler9/gortsplib v0.0.0-20201213221155-9cd36cdd6860
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c h1:NEqO8o8hNAEeYB7OZnkT2k9QVLcBlOZXBLZYQeE1aRg=
github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/aler9/gortsplib v0.0.0-20201213221155-9cd36cdd6860 h1:fL41RJvlXWaIk/SS/rNDf9F/26Aq6y2F7quilWolMfg=
github.com/aler9/gortsplib v0.0.0-20201213221155-9cd36cdd6860/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

10
internal/client/client.go

@ -128,6 +128,7 @@ type Client struct { @@ -128,6 +128,7 @@ type Client struct {
// New allocates a Client.
func New(
isTLS bool,
rtspPort int,
readTimeout time.Duration,
runOnConnect string,
@ -159,7 +160,12 @@ func New( @@ -159,7 +160,12 @@ func New(
}
atomic.AddInt64(c.stats.CountClients, 1)
c.log(logger.Info, "connected")
c.log(logger.Info, "connected (%s)", func() string {
if isTLS {
return "encrypted"
}
return "plain"
}())
c.wg.Add(1)
go c.run()
@ -811,7 +817,7 @@ func (c *Client) run() { @@ -811,7 +817,7 @@ func (c *Client) run() {
select {
case err := <-readDone:
c.conn.Close()
if err != io.EOF && err != errTerminated {
if err != io.EOF && err != gortsplib.ErrServerTeardown && err != errTerminated {
c.log(logger.Info, "ERR: %s", err)
}

28
internal/clientman/clientman.go

@ -11,6 +11,7 @@ import ( @@ -11,6 +11,7 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/servertcp"
"github.com/aler9/rtsp-simple-server/internal/servertls"
"github.com/aler9/rtsp-simple-server/internal/serverudp"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -32,6 +33,7 @@ type ClientManager struct { @@ -32,6 +33,7 @@ type ClientManager struct {
serverUDPRtcp *serverudp.Server
pathMan *pathman.PathManager
serverTCP *servertcp.Server
serverTLS *servertls.Server
parent Parent
clients map[*client.Client]struct{}
@ -57,6 +59,7 @@ func New( @@ -57,6 +59,7 @@ func New(
serverUDPRtcp *serverudp.Server,
pathMan *pathman.PathManager,
serverTCP *servertcp.Server,
serverTLS *servertls.Server,
parent Parent) *ClientManager {
cm := &ClientManager{
@ -70,6 +73,7 @@ func New( @@ -70,6 +73,7 @@ func New(
serverUDPRtcp: serverUDPRtcp,
pathMan: pathMan,
serverTCP: serverTCP,
serverTLS: serverTLS,
parent: parent,
clients: make(map[*client.Client]struct{}),
clientClose: make(chan *client.Client),
@ -95,11 +99,31 @@ func (cm *ClientManager) Log(level logger.Level, format string, args ...interfac @@ -95,11 +99,31 @@ func (cm *ClientManager) Log(level logger.Level, format string, args ...interfac
func (cm *ClientManager) run() {
defer close(cm.done)
tcpAccept := func() chan *gortsplib.ServerConn {
if cm.serverTCP != nil {
return cm.serverTCP.Accept()
}
return make(chan *gortsplib.ServerConn)
}()
tlsAccept := func() chan *gortsplib.ServerConn {
if cm.serverTLS != nil {
return cm.serverTLS.Accept()
}
return make(chan *gortsplib.ServerConn)
}()
outer:
for {
select {
case conn := <-cm.serverTCP.Accept():
c := client.New(cm.rtspPort, cm.readTimeout,
case conn := <-tcpAccept:
c := client.New(false, cm.rtspPort, cm.readTimeout,
cm.runOnConnect, cm.runOnConnectRestart, cm.protocols, &cm.wg,
cm.stats, cm.serverUDPRtp, cm.serverUDPRtcp, conn, cm)
cm.clients[c] = struct{}{}
case conn := <-tlsAccept:
c := client.New(true, cm.rtspPort, cm.readTimeout,
cm.runOnConnect, cm.runOnConnectRestart, cm.protocols, &cm.wg,
cm.stats, cm.serverUDPRtp, cm.serverUDPRtcp, conn, cm)
cm.clients[c] = struct{}{}

68
internal/conf/conf.go

@ -13,6 +13,16 @@ import ( @@ -13,6 +13,16 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
)
// Encryption is an encryption policy.
type Encryption int
// encryption policies.
const (
EncryptionNo Encryption = iota
EncryptionOptional
EncryptionYes
)
// Conf is the main program configuration.
type Conf struct {
LogLevel string `yaml:"logLevel"`
@ -22,26 +32,34 @@ type Conf struct { @@ -22,26 +32,34 @@ type Conf struct {
LogFile string `yaml:"logFile"`
Protocols []string `yaml:"protocols"`
ProtocolsParsed map[gortsplib.StreamProtocol]struct{} `yaml:"-" json:"-"`
Encryption string `yaml:"encryption"`
EncryptionParsed Encryption `yaml:"-" json:"-"`
RtspPort int `yaml:"rtspPort"`
RtspsPort int `yaml:"rtspsPort"`
RtpPort int `yaml:"rtpPort"`
RtcpPort int `yaml:"rtcpPort"`
RunOnConnect string `yaml:"runOnConnect"`
RunOnConnectRestart bool `yaml:"runOnConnectRestart"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
ServerKey string `yaml:"serverKey"`
ServerCert string `yaml:"serverCert"`
AuthMethods []string `yaml:"authMethods"`
AuthMethodsParsed []headers.AuthMethod `yaml:"-" json:"-"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
Metrics bool `yaml:"metrics"`
Pprof bool `yaml:"pprof"`
RunOnConnect string `yaml:"runOnConnect"`
RunOnConnectRestart bool `yaml:"runOnConnectRestart"`
Paths map[string]*PathConf `yaml:"paths"`
}
func (conf *Conf) fillAndCheck() error {
if conf.LogLevel == "" {
conf.LogLevel = "info"
}
switch conf.LogLevel {
case "warn":
conf.LogLevelParsed = logger.Warn
case "", "info":
case "info":
conf.LogLevelParsed = logger.Info
case "debug":
@ -54,7 +72,6 @@ func (conf *Conf) fillAndCheck() error { @@ -54,7 +72,6 @@ func (conf *Conf) fillAndCheck() error {
if len(conf.LogDestinations) == 0 {
conf.LogDestinations = []string{"stdout"}
}
conf.LogDestinationsParsed = make(map[logger.Destination]struct{})
for _, dest := range conf.LogDestinations {
switch dest {
@ -96,9 +113,33 @@ func (conf *Conf) fillAndCheck() error { @@ -96,9 +113,33 @@ func (conf *Conf) fillAndCheck() error {
return fmt.Errorf("no protocols provided")
}
if conf.Encryption == "" {
conf.Encryption = "no"
}
switch conf.Encryption {
case "no":
conf.EncryptionParsed = EncryptionNo
case "optional":
conf.EncryptionParsed = EncryptionOptional
case "yes":
conf.EncryptionParsed = EncryptionYes
if _, ok := conf.ProtocolsParsed[gortsplib.StreamProtocolUDP]; ok {
return fmt.Errorf("encryption can't be used with the UDP stream protocol")
}
default:
return fmt.Errorf("unsupported encryption value: '%s'", conf.Encryption)
}
if conf.RtspPort == 0 {
conf.RtspPort = 8554
}
if conf.RtspsPort == 0 {
conf.RtspsPort = 8555
}
if conf.RtpPort == 0 {
conf.RtpPort = 8000
}
@ -112,11 +153,11 @@ func (conf *Conf) fillAndCheck() error { @@ -112,11 +153,11 @@ func (conf *Conf) fillAndCheck() error {
return fmt.Errorf("rtcp and rtp ports must be consecutive")
}
if conf.ReadTimeout == 0 {
conf.ReadTimeout = 10 * time.Second
if conf.ServerKey == "" {
conf.ServerKey = "server.key"
}
if conf.WriteTimeout == 0 {
conf.WriteTimeout = 10 * time.Second
if conf.ServerCert == "" {
conf.ServerCert = "server.crt"
}
if len(conf.AuthMethods) == 0 {
@ -135,6 +176,13 @@ func (conf *Conf) fillAndCheck() error { @@ -135,6 +176,13 @@ func (conf *Conf) fillAndCheck() error {
}
}
if conf.ReadTimeout == 0 {
conf.ReadTimeout = 10 * time.Second
}
if conf.WriteTimeout == 0 {
conf.WriteTimeout = 10 * time.Second
}
if len(conf.Paths) == 0 {
conf.Paths = map[string]*PathConf{
"all": {},

6
internal/servertcp/server.go

@ -14,7 +14,7 @@ type Parent interface { @@ -14,7 +14,7 @@ type Parent interface {
Log(logger.Level, string, ...interface{})
}
// Server is a RTSP TCP server.
// Server is a TCP/RTSP listener.
type Server struct {
parent Parent
@ -49,7 +49,7 @@ func New(port int, @@ -49,7 +49,7 @@ func New(port int,
done: make(chan struct{}),
}
parent.Log(logger.Info, "[TCP server] opened on :%d", port)
parent.Log(logger.Info, "[TCP/RTSP server] opened on :%d", port)
go s.run()
return s, nil
@ -82,6 +82,6 @@ func (s *Server) run() { @@ -82,6 +82,6 @@ func (s *Server) run() {
}
// Accept returns a channel to accept incoming connections.
func (s *Server) Accept() <-chan *gortsplib.ServerConn {
func (s *Server) Accept() chan *gortsplib.ServerConn {
return s.accept
}

96
internal/servertls/server.go

@ -0,0 +1,96 @@ @@ -0,0 +1,96 @@
package servertls
import (
"crypto/tls"
"strconv"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
// Parent is implemented by program.
type Parent interface {
Log(logger.Level, string, ...interface{})
}
// Server is a TCP/TLS/RTSPS listener.
type Server struct {
parent Parent
srv *gortsplib.Server
// out
accept chan *gortsplib.ServerConn
done chan struct{}
}
// New allocates a Server.
func New(port int,
readTimeout time.Duration,
writeTimeout time.Duration,
serverKey string,
serverCert string,
parent Parent) (*Server, error) {
cert, err := tls.LoadX509KeyPair(serverCert, serverKey)
if err != nil {
return nil, err
}
conf := gortsplib.ServerConf{
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
ReadBufferCount: 1,
}
srv, err := conf.Serve(":" + strconv.FormatInt(int64(port), 10))
if err != nil {
return nil, err
}
s := &Server{
parent: parent,
srv: srv,
accept: make(chan *gortsplib.ServerConn),
done: make(chan struct{}),
}
parent.Log(logger.Info, "[TCP/TLS/RTSPS server] opened on :%d", port)
go s.run()
return s, nil
}
// Close closes a Server.
func (s *Server) Close() {
go func() {
for co := range s.accept {
co.Close()
}
}()
s.srv.Close()
<-s.done
}
func (s *Server) run() {
defer close(s.done)
for {
conn, err := s.srv.Accept()
if err != nil {
break
}
s.accept <- conn
}
close(s.accept)
}
// Accept returns a channel to accept incoming connections.
func (s *Server) Accept() chan *gortsplib.ServerConn {
return s.accept
}

2
internal/serverudp/server.go

@ -49,7 +49,7 @@ func (p *publisherAddr) fill(ip net.IP, port int) { @@ -49,7 +49,7 @@ func (p *publisherAddr) fill(ip net.IP, port int) {
}
}
// Server is a RTSP UDP server.
// Server is a UDP server that can be used to send and receive both RTP and RTCP.
type Server struct {
writeTimeout time.Duration
streamType gortsplib.StreamType

4
internal/stats/stats.go

@ -30,3 +30,7 @@ func New() *Stats { @@ -30,3 +30,7 @@ func New() *Stats {
CountSourcesRtmpRunning: ptrInt64(),
}
}
// Close closes a stats.
func (s *Stats) Close() {
}

193
main.go

@ -17,6 +17,7 @@ import ( @@ -17,6 +17,7 @@ import (
"github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/pprof"
"github.com/aler9/rtsp-simple-server/internal/servertcp"
"github.com/aler9/rtsp-simple-server/internal/servertls"
"github.com/aler9/rtsp-simple-server/internal/serverudp"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -34,6 +35,7 @@ type program struct { @@ -34,6 +35,7 @@ type program struct {
serverUDPRtp *serverudp.Server
serverUDPRtcp *serverudp.Server
serverTCP *servertcp.Server
serverTLS *servertls.Server
pathMan *pathman.PathManager
clientMan *clientman.ClientManager
confWatcher *confwatcher.ConfWatcher
@ -69,10 +71,10 @@ func newProgram(args []string) (*program, bool) { @@ -69,10 +71,10 @@ func newProgram(args []string) (*program, bool) {
return nil, false
}
err = p.createDynamicResources(true)
err = p.createResources(true)
if err != nil {
p.Log(logger.Info, "ERR: %s", err)
p.closeAllResources()
p.closeResources(nil)
return nil, false
}
@ -80,7 +82,7 @@ func newProgram(args []string) (*program, bool) { @@ -80,7 +82,7 @@ func newProgram(args []string) (*program, bool) {
p.confWatcher, err = confwatcher.New(p.confPath)
if err != nil {
p.Log(logger.Info, "ERR: %s", err)
p.closeAllResources()
p.closeResources(nil)
return nil, false
}
}
@ -129,10 +131,14 @@ outer: @@ -129,10 +131,14 @@ outer:
}
}
p.closeAllResources()
p.closeResources(nil)
if p.confWatcher != nil {
p.confWatcher.Close()
}
}
func (p *program) createDynamicResources(initial bool) error {
func (p *program) createResources(initial bool) error {
var err error
if p.stats == nil {
@ -149,7 +155,6 @@ func (p *program) createDynamicResources(initial bool) error { @@ -149,7 +155,6 @@ func (p *program) createDynamicResources(initial bool) error {
if initial {
p.Log(logger.Info, "rtsp-simple-server %s", version)
if !p.confFound {
p.Log(logger.Warn, "configuration file not found, using the default one")
}
@ -192,10 +197,22 @@ func (p *program) createDynamicResources(initial bool) error { @@ -192,10 +197,22 @@ func (p *program) createDynamicResources(initial bool) error {
}
if p.serverTCP == nil {
p.serverTCP, err = servertcp.New(p.conf.RtspPort, p.conf.ReadTimeout,
p.conf.WriteTimeout, p)
if err != nil {
return err
if p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional {
p.serverTCP, err = servertcp.New(p.conf.RtspPort, p.conf.ReadTimeout,
p.conf.WriteTimeout, p)
if err != nil {
return err
}
}
}
if p.serverTLS == nil {
if p.conf.EncryptionParsed == conf.EncryptionYes || p.conf.EncryptionParsed == conf.EncryptionOptional {
p.serverTLS, err = servertls.New(p.conf.RtspsPort, p.conf.ReadTimeout,
p.conf.WriteTimeout, p.conf.ServerKey, p.conf.ServerCert, p)
if err != nil {
return err
}
}
}
@ -209,129 +226,113 @@ func (p *program) createDynamicResources(initial bool) error { @@ -209,129 +226,113 @@ func (p *program) createDynamicResources(initial bool) error {
p.clientMan = clientman.New(p.conf.RtspPort, p.conf.ReadTimeout,
p.conf.RunOnConnect, p.conf.RunOnConnectRestart,
p.conf.ProtocolsParsed, p.stats, p.serverUDPRtp, p.serverUDPRtcp,
p.pathMan, p.serverTCP, p)
p.pathMan, p.serverTCP, p.serverTLS, p)
}
return nil
}
func (p *program) closeAllResources() {
if p.confWatcher != nil {
p.confWatcher.Close()
}
if p.clientMan != nil {
p.clientMan.Close()
}
if p.pathMan != nil {
p.pathMan.Close()
}
if p.serverTCP != nil {
p.serverTCP.Close()
}
if p.serverUDPRtcp != nil {
p.serverUDPRtcp.Close()
}
if p.serverUDPRtp != nil {
p.serverUDPRtp.Close()
}
if p.metrics != nil {
p.metrics.Close()
}
if p.pprof != nil {
p.pprof.Close()
}
if p.logger != nil {
p.logger.Close()
}
}
func (p *program) reloadConf() error {
p.Log(logger.Info, "reloading configuration")
conf, _, err := conf.Load(p.confPath)
if err != nil {
return err
}
func (p *program) closeResources(newConf *conf.Conf) {
closeLogger := false
if !reflect.DeepEqual(conf.LogDestinationsParsed, p.conf.LogDestinationsParsed) ||
conf.LogFile != p.conf.LogFile {
if newConf == nil ||
!reflect.DeepEqual(newConf.LogDestinationsParsed, p.conf.LogDestinationsParsed) ||
newConf.LogFile != p.conf.LogFile {
closeLogger = true
}
closeMetrics := false
if conf.Metrics != p.conf.Metrics {
if newConf == nil ||
newConf.Metrics != p.conf.Metrics {
closeMetrics = true
}
closePprof := false
if conf.Pprof != p.conf.Pprof {
if newConf == nil ||
newConf.Pprof != p.conf.Pprof {
closePprof = true
}
closeServerUDPRtp := false
if !reflect.DeepEqual(conf.ProtocolsParsed, p.conf.ProtocolsParsed) ||
conf.WriteTimeout != p.conf.WriteTimeout ||
conf.RtpPort != p.conf.RtpPort {
if newConf == nil ||
!reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.RtpPort != p.conf.RtpPort {
closeServerUDPRtp = true
}
closeServerUDPRtcp := false
if !reflect.DeepEqual(conf.ProtocolsParsed, p.conf.ProtocolsParsed) ||
conf.WriteTimeout != p.conf.WriteTimeout ||
conf.RtcpPort != p.conf.RtcpPort {
if newConf == nil ||
!reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.RtcpPort != p.conf.RtcpPort {
closeServerUDPRtcp = true
}
closeServerTCP := false
if conf.RtspPort != p.conf.RtspPort ||
conf.ReadTimeout != p.conf.ReadTimeout ||
conf.WriteTimeout != p.conf.WriteTimeout {
if newConf == nil ||
newConf.EncryptionParsed != p.conf.EncryptionParsed ||
newConf.RtspPort != p.conf.RtspPort ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout {
closeServerTCP = true
}
closeServerTLS := false
if newConf == nil ||
newConf.EncryptionParsed != p.conf.EncryptionParsed ||
newConf.RtspsPort != p.conf.RtspsPort ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout {
closeServerTLS = true
}
closePathMan := false
if conf.RtspPort != p.conf.RtspPort ||
conf.ReadTimeout != p.conf.ReadTimeout ||
conf.WriteTimeout != p.conf.WriteTimeout ||
!reflect.DeepEqual(conf.AuthMethodsParsed, p.conf.AuthMethodsParsed) {
if newConf == nil ||
newConf.RtspPort != p.conf.RtspPort ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
!reflect.DeepEqual(newConf.AuthMethodsParsed, p.conf.AuthMethodsParsed) {
closePathMan = true
} else if !reflect.DeepEqual(conf.Paths, p.conf.Paths) {
p.pathMan.OnProgramConfReload(conf.Paths)
} else if !reflect.DeepEqual(newConf.Paths, p.conf.Paths) {
p.pathMan.OnProgramConfReload(newConf.Paths)
}
closeClientMan := false
if closeServerUDPRtp ||
if newConf == nil ||
closeServerUDPRtp ||
closeServerUDPRtcp ||
closeServerTCP ||
closeServerTLS ||
closePathMan ||
conf.RtspPort != p.conf.RtspPort ||
conf.ReadTimeout != p.conf.ReadTimeout ||
conf.RunOnConnect != p.conf.RunOnConnect ||
conf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
!reflect.DeepEqual(conf.ProtocolsParsed, p.conf.ProtocolsParsed) {
newConf.RtspPort != p.conf.RtspPort ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
!reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) {
closeClientMan = true
}
if closeClientMan {
closeStats := false
if newConf == nil {
closeStats = true
}
if closeClientMan && p.clientMan != nil {
p.clientMan.Close()
p.clientMan = nil
}
if closePathMan {
if closePathMan && p.pathMan != nil {
p.pathMan.Close()
p.pathMan = nil
}
if closeServerTCP {
if closeServerTLS && p.serverTLS != nil {
p.serverTLS.Close()
p.serverTLS = nil
}
if closeServerTCP && p.serverTCP != nil {
p.serverTCP.Close()
p.serverTCP = nil
}
@ -356,13 +357,28 @@ func (p *program) reloadConf() error { @@ -356,13 +357,28 @@ func (p *program) reloadConf() error {
p.metrics = nil
}
if closeLogger {
if closeLogger && p.logger != nil {
p.logger.Close()
p.logger = nil
}
p.conf = conf
return p.createDynamicResources(false)
if closeStats && p.stats != nil {
p.stats.Close()
}
}
func (p *program) reloadConf() error {
p.Log(logger.Info, "reloading configuration")
newConf, _, err := conf.Load(p.confPath)
if err != nil {
return err
}
p.closeResources(newConf)
p.conf = newConf
return p.createResources(false)
}
func main() {
@ -370,6 +386,5 @@ func main() { @@ -370,6 +386,5 @@ func main() {
if !ok {
os.Exit(1)
}
<-p.done
}

247
main_test.go

@ -102,21 +102,33 @@ func (c *container) ip() string { @@ -102,21 +102,33 @@ func (c *container) ip() string {
return string(out[:len(out)-1])
}
func writeTempFile(byts []byte) (string, error) {
tmpf, err := ioutil.TempFile(os.TempDir(), "rtsp-")
if err != nil {
return "", err
}
defer tmpf.Close()
_, err = tmpf.Write(byts)
if err != nil {
return "", err
}
return tmpf.Name(), nil
}
func testProgram(conf string) (*program, bool) {
if conf == "" {
return newProgram([]string{})
}
tmpf, err := ioutil.TempFile(os.TempDir(), "rtsp-")
tmpf, err := writeTempFile([]byte(conf))
if err != nil {
return nil, false
}
defer os.Remove(tmpf.Name())
defer os.Remove(tmpf)
tmpf.WriteString(conf)
tmpf.Close()
return newProgram([]string{tmpf.Name()})
return newProgram([]string{tmpf})
}
func TestEnvironment(t *testing.T) {
@ -233,43 +245,144 @@ func TestEnvironmentNoFile(t *testing.T) { @@ -233,43 +245,144 @@ func TestEnvironmentNoFile(t *testing.T) {
}, pa)
}
func TestPublish(t *testing.T) {
var serverCert = []byte(`-----BEGIN CERTIFICATE-----
MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy
MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj
zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv
NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp
OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I
qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e
nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud
DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a
u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj
3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO
xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu
tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI
XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7
7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd
XQxaORfgM//NzX9LhUPk
-----END CERTIFICATE-----
`)
var serverKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/
KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y
1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY
cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3
6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE
CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC
kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT
kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP
bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S
Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj
5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb
agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ
M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3
ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz
ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl
+jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX
4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp
xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj
7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf
3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a
r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO
y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD
94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK
6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1
+3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs=
-----END RSA PRIVATE KEY-----
`)
func TestPublishRead(t *testing.T) {
for _, conf := range []struct {
publishSoft string
publishProto string
encrypted bool
publisherSoft string
publisherProto string
readerSoft string
readerProto string
}{
{"ffmpeg", "udp"},
{"ffmpeg", "tcp"},
{"gstreamer", "udp"},
{"gstreamer", "tcp"},
{false, "ffmpeg", "udp", "ffmpeg", "udp"},
{false, "ffmpeg", "udp", "ffmpeg", "tcp"},
{false, "ffmpeg", "udp", "gstreamer", "udp"},
{false, "ffmpeg", "udp", "gstreamer", "tcp"},
{false, "ffmpeg", "udp", "vlc", "udp"},
{false, "ffmpeg", "udp", "vlc", "tcp"},
{false, "ffmpeg", "tcp", "ffmpeg", "udp"},
{false, "gstreamer", "udp", "ffmpeg", "udp"},
{false, "gstreamer", "tcp", "ffmpeg", "udp"},
{true, "ffmpeg", "tcp", "ffmpeg", "tcp"},
{true, "ffmpeg", "tcp", "gstreamer", "tcp"},
{true, "gstreamer", "tcp", "ffmpeg", "tcp"},
} {
t.Run(conf.publishSoft+"_"+conf.publishProto, func(t *testing.T) {
p, ok := testProgram("readTimeout: 20s")
require.Equal(t, true, ok)
defer p.close()
encryptedStr := func() string {
if conf.encrypted {
return "encrypted"
}
return "plain"
}()
t.Run(encryptedStr+"_"+conf.publisherSoft+"_"+conf.publisherProto+"_"+
conf.readerSoft+"_"+conf.readerProto, func(t *testing.T) {
var proto string
var port string
if !conf.encrypted {
proto = "rtsp"
port = "8554"
p, ok := testProgram("readTimeout: 20s")
require.Equal(t, true, ok)
defer p.close()
} else {
proto = "rtsps"
port = "8555"
serverCertFpath, err := writeTempFile(serverCert)
require.NoError(t, err)
defer os.Remove(serverCertFpath)
serverKeyFpath, err := writeTempFile(serverKey)
require.NoError(t, err)
defer os.Remove(serverKeyFpath)
p, ok := testProgram("readTimeout: 20s\n" +
"protocols: [tcp]\n" +
"encryption: yes\n" +
"serverCert: " + serverCertFpath + "\n" +
"serverKey: " + serverKeyFpath + "\n")
require.Equal(t, true, ok)
defer p.close()
}
time.Sleep(1 * time.Second)
switch conf.publishSoft {
switch conf.publisherSoft {
case "ffmpeg":
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", conf.publishProto,
"rtsp://" + ownDockerIP + ":8554/teststream",
"-rtsp_transport", conf.publisherProto,
proto + "://" + ownDockerIP + ":" + port + "/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
default:
case "gstreamer":
cnt1, err := newContainer("gstreamer", "source", []string{
"filesrc location=emptyvideo.ts ! tsdemux ! queue ! video/x-h264 ! h264parse config-interval=1 ! rtspclientsink " +
"location=rtsp://" + ownDockerIP + ":8554/teststream protocols=" + conf.publishProto + " latency=0 timeout=0 rtx-time=0",
"location=" + proto + "://" + ownDockerIP + ":" + port + "/teststream " +
"protocols=" + conf.publisherProto + " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0",
})
require.NoError(t, err)
defer cnt1.close()
@ -277,77 +390,37 @@ func TestPublish(t *testing.T) { @@ -277,77 +390,37 @@ func TestPublish(t *testing.T) {
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
})
}
}
func TestRead(t *testing.T) {
for _, conf := range []struct {
readSoft string
readProto string
}{
{"ffmpeg", "udp"},
{"ffmpeg", "tcp"},
{"vlc", "udp"},
{"vlc", "tcp"},
} {
t.Run(conf.readSoft+"_"+conf.readProto, func(t *testing.T) {
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
time.Sleep(1 * time.Second)
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://" + ownDockerIP + ":8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
switch conf.readSoft {
switch conf.readerSoft {
case "ffmpeg":
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", conf.readProto,
"-i", "rtsp://" + ownDockerIP + ":8554/teststream",
"-rtsp_transport", conf.readerProto,
"-i", proto + "://" + ownDockerIP + ":" + port + "/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
case "gstreamer":
cnt2, err := newContainer("gstreamer", "read", []string{
"rtspsrc location=" + proto + "://" + ownDockerIP + ":" + port + "/teststream protocols=tcp tls-validation-flags=0 latency=0 " +
"! application/x-rtp,media=video ! decodebin ! exitafterframe ! fakesink",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
default:
case "vlc":
args := []string{}
if conf.readProto == "tcp" {
if conf.readerProto == "tcp" {
args = append(args, "--rtsp-tcp")
}
args = append(args, "rtsp://"+ownDockerIP+":8554/teststream")
args = append(args, proto+"://"+ownDockerIP+":"+port+"/teststream")
cnt2, err := newContainer("vlc", "dest", args)
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
})
@ -364,7 +437,7 @@ func TestTCPOnly(t *testing.T) { @@ -364,7 +437,7 @@ func TestTCPOnly(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "tcp",
@ -396,7 +469,7 @@ func TestPathWithSlash(t *testing.T) { @@ -396,7 +469,7 @@ func TestPathWithSlash(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
@ -428,7 +501,7 @@ func TestPathWithQuery(t *testing.T) { @@ -428,7 +501,7 @@ func TestPathWithQuery(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
@ -465,7 +538,7 @@ func TestAuth(t *testing.T) { @@ -465,7 +538,7 @@ func TestAuth(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
@ -507,7 +580,7 @@ func TestAuth(t *testing.T) { @@ -507,7 +580,7 @@ func TestAuth(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
@ -556,7 +629,7 @@ func TestAuthIpFail(t *testing.T) { @@ -556,7 +629,7 @@ func TestAuthIpFail(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
@ -586,7 +659,7 @@ func TestSourceRtsp(t *testing.T) { @@ -586,7 +659,7 @@ func TestSourceRtsp(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
@ -636,7 +709,7 @@ func TestSourceRtmp(t *testing.T) { @@ -636,7 +709,7 @@ func TestSourceRtmp(t *testing.T) {
cnt2, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + cnt1.ip() + "/stream/test",
@ -682,7 +755,7 @@ func TestRedirect(t *testing.T) { @@ -682,7 +755,7 @@ func TestRedirect(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
@ -719,7 +792,7 @@ func TestFallback(t *testing.T) { @@ -719,7 +792,7 @@ func TestFallback(t *testing.T) {
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",

28
rtsp-simple-server.yml

@ -1,26 +1,38 @@ @@ -1,26 +1,38 @@
# sets the verbosity of the program; available values are "warn", "info", "debug"
# sets the verbosity of the program; available values are "warn", "info", "debug".
logLevel: info
# destinations of log messages; available values are "stdout", "file" and "syslog".
logDestinations: [stdout]
# if "file" is in logDestinations, this is the file which will receive the logs.
logFile: rtsp-simple-server.log
# supported stream protocols (the handshake is always performed with TCP).
# supported stream protocols.
# UDP is the most performant, but can cause problems if there's a NAT between
# server and clients, and doesn't support encryption.
# TCP is the most versatile, and does support encryption.
# The handshake is always performed with TCP.
protocols: [udp, tcp]
# port of the TCP RTSP listener.
# encrypt handshake and TCP streams with TLS (RTSPS).
# available values are "no", "yes", "optional".
encryption: no
# port of the TCP/RTSP listener. This is used only if encryption is "no" or "optional".
rtspPort: 8554
# port of the UDP RTP listener (used only if udp is in protocols).
# port of the TCP/TLS/RTSPS listener. This is used only if encryption is "yes" or "optional".
rtspsPort: 8555
# port of the UDP/RTP listener. This is used only if "udp" is in protocols.
rtpPort: 8000
# port of the UDP RTCP listener (used only if udp is in protocols).
# port of the UDP/RTCP listener. This is used only if "udp" is in protocols.
rtcpPort: 8001
# path to the server key. This is used only if encryption is "yes" or "optional".
serverKey: server.key
# path to the server certificate. This is used only if encryption is "yes" or "optional".
serverCert: server.crt
# authentication methods.
authMethods: [basic, digest]
# timeout of read operations.
readTimeout: 10s
# timeout of write operations.
writeTimeout: 10s
# supported authentication methods (both are insecure, use RTSP inside a VPN
# to enforce security).
authMethods: [basic, digest]
# enable Prometheus-compatible metrics on port 9998.
metrics: no

33
testimages/gstreamer/Dockerfile

@ -1,12 +1,43 @@ @@ -1,12 +1,43 @@
FROM amd64/ubuntu:20.04
######################################
FROM ubuntu:20.04 AS exitafterframe
RUN apt update && apt install -y --no-install-recommends \
pkg-config \
gcc \
libgstreamer-plugins-base1.0-dev \
&& rm -rf /var/lib/apt/lists/*
COPY exitafterframe.c /s/
RUN cd /s \
&& gcc \
exitafterframe.c \
-o libexitafterframe.so \
-Ofast \
-s \
-Werror \
-Wall \
-Wextra \
-Wno-unused-parameter \
-fPIC \
-shared \
-Wl,--no-undefined \
$(pkg-config --cflags --libs gstreamer-1.0) \
&& mv libexitafterframe.so /usr/lib/x86_64-linux-gnu/gstreamer-1.0/ \
&& rm -rf /s
######################################
FROM ubuntu:20.04
RUN apt update && apt install -y --no-install-recommends \
gstreamer1.0-tools \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
gstreamer1.0-rtsp \
gstreamer1.0-libav \
&& rm -rf /var/lib/apt/lists/*
COPY --from=exitafterframe /usr/lib/x86_64-linux-gnu/gstreamer-1.0/libexitafterframe.so /usr/lib/x86_64-linux-gnu/gstreamer-1.0/
COPY emptyvideo.ts /
COPY start.sh /

95
testimages/gstreamer/exitafterframe.c

@ -0,0 +1,95 @@ @@ -0,0 +1,95 @@
#include <gst/gst.h>
#include <gst/video/video.h>
GType gst_exitafterframe_get_type ();
#define GST_TYPE_EXITAFTERFRAME (gst_exitafterframe_get_type())
#define GST_EXITAFTERFRAME(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_EXITAFTERFRAME,GstExitAfterFrame))
typedef struct
{
GstElement element;
GstPad *srcpad;
GstPad *sinkpad;
} GstExitAfterFrame;
typedef struct
{
GstElementClass parent_class;
} GstExitAfterFrameClass;
#define gst_exitafterframe_parent_class parent_class
G_DEFINE_TYPE (GstExitAfterFrame, gst_exitafterframe, GST_TYPE_ELEMENT);
static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE(
"sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS("video/x-raw")
);
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE(
"src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS("video/x-raw")
);
static GstFlowReturn
gst_exitafterframe_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GstExitAfterFrame *filter = GST_EXITAFTERFRAME (parent);
exit(0);
return gst_pad_push (filter->srcpad, buf);
}
static void
gst_exitafterframe_class_init(GstExitAfterFrameClass* klass) {
GstElementClass* element_class = (GstElementClass*)klass;
gst_element_class_set_details_simple(
element_class,
"Plugin",
"FIXME:Generic",
"FIXME:Generic Template Element",
"AUTHOR_NAME AUTHOR_EMAIL"
);
gst_element_class_add_pad_template(element_class,
gst_static_pad_template_get(&src_factory));
gst_element_class_add_pad_template(element_class,
gst_static_pad_template_get(&sink_factory));
}
static void
gst_exitafterframe_init (GstExitAfterFrame* filter)
{
GstElement* element = GST_ELEMENT(filter);
g_print("[lasermqtt] init()\n");
filter->sinkpad = gst_pad_new_from_static_template(&sink_factory, "sink");
gst_pad_set_chain_function(filter->sinkpad, gst_exitafterframe_chain);
GST_PAD_SET_PROXY_CAPS(filter->sinkpad);
gst_element_add_pad(element, filter->sinkpad);
filter->srcpad = gst_pad_new_from_static_template(&src_factory, "src");
GST_PAD_SET_PROXY_CAPS(filter->srcpad);
gst_element_add_pad(element, filter->srcpad);
}
static gboolean
plugin_init (GstPlugin * plugin)
{
return gst_element_register (plugin, "exitafterframe", GST_RANK_NONE,
GST_TYPE_EXITAFTERFRAME);
}
#define PACKAGE "exitafterframe"
GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR, exitafterframe,
"exitafterframe", plugin_init, "1.0", "LGPL", "exitafterframe",
"http://example.com")

2
testimages/gstreamer/start.sh

@ -1,3 +1,3 @@ @@ -1,3 +1,3 @@
#!/bin/sh -e
exec gst-launch-1.0 $@
exec gst-launch-1.0 $@ 2>&1

Loading…
Cancel
Save