Browse Source

Support reading with RTMP (#218)

pull/235/head
aler9 5 years ago
parent
commit
186a91800a
  1. 14
      README.md
  2. 7
      go.mod
  3. 10
      go.sum
  4. 45
      internal/client/client.go
  5. 12
      internal/clientman/clientman.go
  6. 487
      internal/clientrtmp/client.go
  7. 30
      internal/clientrtsp/client.go
  8. 143
      internal/path/path.go
  9. 4
      internal/path/readersmap.go
  10. 82
      internal/pathman/pathman.go
  11. 52
      internal/rtmputils/conn.go
  12. 13
      internal/rtmputils/connpair.go
  13. 53
      internal/rtmputils/metadata.go
  14. 10
      internal/serverrtmp/server.go
  15. 70
      internal/sourcertmp/source.go
  16. 4
      main.go
  17. 287
      main_test.go

14
README.md

@ -9,14 +9,14 @@
[![Release](https://img.shields.io/github/v/release/aler9/rtsp-simple-server)](https://github.com/aler9/rtsp-simple-server/releases) [![Release](https://img.shields.io/github/v/release/aler9/rtsp-simple-server)](https://github.com/aler9/rtsp-simple-server/releases)
[![Docker Hub](https://img.shields.io/badge/docker-aler9/rtsp--simple--server-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server) [![Docker Hub](https://img.shields.io/badge/docker-aler9/rtsp--simple--server-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server)
_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP/RTMP server and proxy, a software that allows users to publish, read and proxy live video and audio streams. RTSP is a specification that describes how to perform these operations with the help of a server, that is contacted by both publishers and readers and relays the publisher's streams to the readers. _rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP / RTMP server and proxy, a software that allows users to publish, read and proxy live video and audio streams. RTSP is a specification that describes how to perform these operations with the help of a server, that is contacted by both publishers and readers and relays the publisher's streams to the readers.
Features: Features:
* Publish live streams with RTSP (UDP or TCP mode) or RTMP * Publish live streams with RTSP (UDP or TCP mode) or RTMP
* Read live streams with RTSP * Read live streams with RTSP or RTMP
* Pull and serve streams from other RTSP / RTMP servers or cameras, always or on-demand (RTSP proxy) * Pull and serve streams from other RTSP / RTMP servers or cameras, always or on-demand (RTSP proxy)
* Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM) * Each stream can have multiple video and audio tracks, encoded with any codec (including H264, H265, VP8, VP9, MPEG2, MP3, AAC, Opus, PCM, JPEG)
* Serve multiple streams at once in separate paths * Serve multiple streams at once in separate paths
* Encrypt streams with TLS (RTSPS) * Encrypt streams with TLS (RTSPS)
* Authenticate readers and publishers * Authenticate readers and publishers
@ -37,7 +37,7 @@ Features:
* [Authentication](#authentication) * [Authentication](#authentication)
* [Encrypt the configuration](#encrypt-the-configuration) * [Encrypt the configuration](#encrypt-the-configuration)
* [RTSP proxy mode](#rtsp-proxy-mode) * [RTSP proxy mode](#rtsp-proxy-mode)
* [RTMP server](#rtmp-server) * [RTMP protocol](#rtmp-protocol)
* [Publish a webcam](#publish-a-webcam) * [Publish a webcam](#publish-a-webcam)
* [Publish a Raspberry Pi Camera](#publish-a-raspberry-pi-camera) * [Publish a Raspberry Pi Camera](#publish-a-raspberry-pi-camera)
* [Convert streams to HLS](#convert-streams-to-hls) * [Convert streams to HLS](#convert-streams-to-hls)
@ -280,15 +280,15 @@ paths:
sourceOnDemand: yes sourceOnDemand: yes
``` ```
### RTMP server ### RTMP protocol
RTMP is a protocol that is used to read and publish streams, but is less versatile and less efficient than RTSP (doesn't support UDP, encryption, most RTSP codecs, feedback mechanism). If there is need of receiving streams from a software that supports only RTMP (for instance, OBS Studio and DJI drones), it's possible to turn on a RTMP listener: RTMP is a protocol that is used to read and publish streams, but is less versatile and less efficient than RTSP (doesn't support UDP, encryption, most RTSP codecs, feedback mechanism). If there is need of publishing or reading streams from a software that supports only RTMP (for instance, OBS Studio and DJI drones), it's possible to turn on a RTMP listener:
```yml ```yml
rtmpEnable: yes rtmpEnable: yes
``` ```
Streams can then be published with the RTMP protocol, for instance with _FFmpeg_: Streams can then be published or read with the RTMP protocol, for instance with _FFmpeg_:
``` ```
ffmpeg -re -stream_loop -1 -i file.ts -c copy -f flv rtmp://localhost/mystream ffmpeg -re -stream_loop -1 -i file.ts -c copy -f flv rtmp://localhost/mystream

7
go.mod

@ -5,14 +5,17 @@ go 1.15
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210306084624-260af6e04194 github.com/aler9/gortsplib v0.0.0-20210310150132-830e3079e366
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/notedit/rtmp v0.0.2 github.com/notedit/rtmp v0.0.0
github.com/pion/rtp v1.6.2 // indirect
github.com/pion/sdp/v3 v3.0.2 github.com/pion/sdp/v3 v3.0.2
github.com/stretchr/testify v1.6.1 github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.2.8 gopkg.in/yaml.v2 v2.2.8
) )
replace github.com/notedit/rtmp => github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d

10
go.sum

@ -2,8 +2,10 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210306084624-260af6e04194 h1:LMhKYN1HfEvVPV5YL6ZLR6eQgXbz2H4bdb1eQK7lrQA= github.com/aler9/gortsplib v0.0.0-20210310150132-830e3079e366 h1:68edOFG2H1ntH5FhGZQJ72aq61wjiC9xRuvNI8/6haU=
github.com/aler9/gortsplib v0.0.0-20210306084624-260af6e04194/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= github.com/aler9/gortsplib v0.0.0-20210310150132-830e3079e366/go.mod h1:aj4kDzanb3JZ46sFywWShcsnqqXTLE/3PNjwDhQZGM0=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d h1:LAX8pNvYpGgFpKdbPpEZWjNkHbmyvjMrT3vO7s7aaKU=
github.com/aler9/rtmp v0.0.0-20210309202041-2d7177b7300d/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@ -12,14 +14,14 @@ github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/notedit/rtmp v0.0.2 h1:5+to4yezKATiJgnrcETu9LbV5G/QsWkOV9Ts2M/p33w=
github.com/notedit/rtmp v0.0.2/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM= github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM=
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0= github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk= github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g= github.com/pion/sdp/v3 v3.0.2 h1:UNnSPVaMM+Pdu/mR9UvAyyo6zkdYbKeuOooCwZvTl/g=
github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk= github.com/pion/sdp/v3 v3.0.2/go.mod h1:bNiSknmJE0HYBprTHXKPQ3+JjacTv5uap92ueJZKsRk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

45
internal/client/client.go

@ -1,6 +1,8 @@
package client package client
import ( import (
"fmt"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
@ -8,17 +10,14 @@ import (
"github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/conf"
) )
// Client can be // ErrNoOnePublishing is a "no one is publishing" error.
// *clientrtsp.Client type ErrNoOnePublishing struct {
// *clientrtmp.Client PathName string
type Client interface { }
IsClient()
IsSource() // Error implements the error interface.
Close() func (e ErrNoOnePublishing) Error() string {
Authenticate([]headers.AuthMethod, return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
string, []interface{},
string, string, interface{}) error
OnReaderFrame(int, gortsplib.StreamType, []byte)
} }
// ErrAuthNotCritical is a non-critical authentication error. // ErrAuthNotCritical is a non-critical authentication error.
@ -52,22 +51,22 @@ type DescribeRes struct {
type DescribeReq struct { type DescribeReq struct {
Client Client Client Client
PathName string PathName string
Req *base.Request Data *base.Request
Res chan DescribeRes Res chan DescribeRes
} }
// SetupPlayRes is a setup/play response. // SetupPlayRes is a setup/play response.
type SetupPlayRes struct { type SetupPlayRes struct {
Path Path Path Path
Err error Tracks gortsplib.Tracks
Err error
} }
// SetupPlayReq is a setup/play request. // SetupPlayReq is a setup/play request.
type SetupPlayReq struct { type SetupPlayReq struct {
Client Client Client Client
PathName string PathName string
TrackID int Data interface{}
Req *base.Request
Res chan SetupPlayRes Res chan SetupPlayRes
} }
@ -82,7 +81,7 @@ type AnnounceReq struct {
Client Client Client Client
PathName string PathName string
Tracks gortsplib.Tracks Tracks gortsplib.Tracks
Req interface{} Data interface{}
Res chan AnnounceRes Res chan AnnounceRes
} }
@ -113,7 +112,6 @@ type PauseReq struct {
// Path is implemented by path.Path. // Path is implemented by path.Path.
type Path interface { type Path interface {
Name() string Name() string
SourceTrackCount() int
Conf() *conf.PathConf Conf() *conf.PathConf
OnClientRemove(RemoveReq) OnClientRemove(RemoveReq)
OnClientPlay(PlayReq) OnClientPlay(PlayReq)
@ -121,3 +119,14 @@ type Path interface {
OnClientPause(PauseReq) OnClientPause(PauseReq)
OnFrame(int, gortsplib.StreamType, []byte) OnFrame(int, gortsplib.StreamType, []byte)
} }
// Client is implemented by all client*.
type Client interface {
IsClient()
IsSource()
Close()
Authenticate([]headers.AuthMethod,
string, []interface{},
string, string, interface{}) error
OnIncomingFrame(int, gortsplib.StreamType, []byte)
}

12
internal/clientman/clientman.go

@ -34,6 +34,8 @@ type Parent interface {
type ClientManager struct { type ClientManager struct {
rtspPort int rtspPort int
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration
readBufferCount int
runOnConnect string runOnConnect string
runOnConnectRestart bool runOnConnectRestart bool
protocols map[base.StreamProtocol]struct{} protocols map[base.StreamProtocol]struct{}
@ -59,6 +61,8 @@ type ClientManager struct {
func New( func New(
rtspPort int, rtspPort int,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration,
readBufferCount int,
runOnConnect string, runOnConnect string,
runOnConnectRestart bool, runOnConnectRestart bool,
protocols map[base.StreamProtocol]struct{}, protocols map[base.StreamProtocol]struct{},
@ -72,6 +76,8 @@ func New(
cm := &ClientManager{ cm := &ClientManager{
rtspPort: rtspPort, rtspPort: rtspPort,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
runOnConnect: runOnConnect, runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart, runOnConnectRestart: runOnConnectRestart,
protocols: protocols, protocols: protocols,
@ -120,11 +126,11 @@ func (cm *ClientManager) run() {
return make(chan *gortsplib.ServerConn) return make(chan *gortsplib.ServerConn)
}() }()
rtmpAccept := func() chan rtmputils.ConnPair { rtmpAccept := func() chan *rtmputils.Conn {
if cm.serverRTMP != nil { if cm.serverRTMP != nil {
return cm.serverRTMP.Accept() return cm.serverRTMP.Accept()
} }
return make(chan rtmputils.ConnPair) return make(chan *rtmputils.Conn)
}() }()
outer: outer:
@ -162,6 +168,8 @@ outer:
c := clientrtmp.New( c := clientrtmp.New(
cm.rtspPort, cm.rtspPort,
cm.readTimeout, cm.readTimeout,
cm.writeTimeout,
cm.readBufferCount,
cm.runOnConnect, cm.runOnConnect,
cm.runOnConnectRestart, cm.runOnConnectRestart,
&cm.wg, &cm.wg,

487
internal/clientrtmp/client.go

@ -14,6 +14,7 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers" "github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtpaac" "github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264" "github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av" "github.com/notedit/rtmp/av"
@ -47,10 +48,19 @@ func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
return false return false
} }
func pathNameAndQuery(inURL *url.URL) (string, url.Values) {
// remove trailing slashes inserted by OBS and some other clients
tmp := strings.TrimSuffix(inURL.String(), "/")
ur, _ := url.Parse(tmp)
pathName := strings.TrimPrefix(ur.Path, "/")
return pathName, ur.Query()
}
// Parent is implemented by clientman.ClientMan. // Parent is implemented by clientman.ClientMan.
type Parent interface { type Parent interface {
Log(logger.Level, string, ...interface{}) Log(logger.Level, string, ...interface{})
OnClientClose(client.Client) OnClientClose(client.Client)
OnClientSetupPlay(client.SetupPlayReq)
OnClientAnnounce(client.AnnounceReq) OnClientAnnounce(client.AnnounceReq)
} }
@ -58,14 +68,21 @@ type Parent interface {
type Client struct { type Client struct {
rtspPort int rtspPort int
readTimeout time.Duration readTimeout time.Duration
writeTimeout time.Duration
readBufferCount int
runOnConnect string runOnConnect string
runOnConnectRestart bool runOnConnectRestart bool
stats *stats.Stats stats *stats.Stats
wg *sync.WaitGroup wg *sync.WaitGroup
conn rtmputils.ConnPair conn *rtmputils.Conn
parent Parent parent Parent
path client.Path // read mode only
h264Decoder *rtph264.Decoder
videoTrack *gortsplib.Track
aacDecoder *rtpaac.Decoder
audioTrack *gortsplib.Track
ringBuffer *ringbuffer.RingBuffer
// in // in
terminate chan struct{} terminate chan struct{}
@ -75,16 +92,20 @@ type Client struct {
func New( func New(
rtspPort int, rtspPort int,
readTimeout time.Duration, readTimeout time.Duration,
writeTimeout time.Duration,
readBufferCount int,
runOnConnect string, runOnConnect string,
runOnConnectRestart bool, runOnConnectRestart bool,
wg *sync.WaitGroup, wg *sync.WaitGroup,
stats *stats.Stats, stats *stats.Stats,
conn rtmputils.ConnPair, conn *rtmputils.Conn,
parent Parent) *Client { parent Parent) *Client {
c := &Client{ c := &Client{
rtspPort: rtspPort, rtspPort: rtspPort,
readTimeout: readTimeout, readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
runOnConnect: runOnConnect, runOnConnect: runOnConnect,
runOnConnectRestart: runOnConnectRestart, runOnConnectRestart: runOnConnectRestart,
wg: wg, wg: wg,
@ -116,11 +137,11 @@ func (c *Client) IsClient() {}
func (c *Client) IsSource() {} func (c *Client) IsSource() {}
func (c *Client) log(level logger.Level, format string, args ...interface{}) { func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NConn.RemoteAddr().String()}, args...)...) c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
} }
func (c *Client) ip() net.IP { func (c *Client) ip() net.IP {
return c.conn.NConn.RemoteAddr().(*net.TCPAddr).IP return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
} }
func (c *Client) run() { func (c *Client) run() {
@ -135,9 +156,44 @@ func (c *Client) run() {
defer onConnectCmd.Close() defer onConnectCmd.Close()
} }
if !c.conn.RConn.Publishing { if c.conn.IsPublishing() {
c.conn.NConn.Close() c.runPublish()
c.log(logger.Info, "ERR: client is not publishing") } else {
c.runRead()
}
}
func (c *Client) runRead() {
var path client.Path
var tracks gortsplib.Tracks
err := func() error {
pathName, query := pathNameAndQuery(c.conn.URL())
resc := make(chan client.SetupPlayRes)
c.parent.OnClientSetupPlay(client.SetupPlayReq{c, pathName, query, resc}) //nolint:govet
res := <-resc
if res.Err != nil {
switch res.Err.(type) {
case client.ErrAuthCritical:
// wait some seconds to stop brute force attacks
select {
case <-time.After(pauseAfterAuthError):
case <-c.terminate:
}
}
return res.Err
}
path = res.Path
tracks = res.Tracks
return nil
}()
if err != nil {
c.log(logger.Info, "ERR: %s", err)
c.conn.NetConn().Close()
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
<-c.terminate <-c.terminate
@ -145,122 +201,285 @@ func (c *Client) run() {
} }
var videoTrack *gortsplib.Track var videoTrack *gortsplib.Track
var h264SPS []byte
var h264PPS []byte
var audioTrack *gortsplib.Track var audioTrack *gortsplib.Track
var err error var aacConfig []byte
var tracks gortsplib.Tracks
var h264Encoder *rtph264.Encoder
var aacEncoder *rtpaac.Encoder
metadataDone := make(chan struct{}) err = func() error {
go func() { for i, t := range tracks {
defer close(metadataDone) if t.IsH264() {
err = func() error { if videoTrack != nil {
videoTrack, audioTrack, err = rtmputils.Metadata(c.conn, c.readTimeout) return fmt.Errorf("can't read track %d with RTMP: too many tracks", i+1)
if err != nil { }
return err videoTrack = t
}
if videoTrack != nil {
var err error var err error
h264Encoder, err = rtph264.NewEncoder(96) h264SPS, h264PPS, err = t.ExtractDataH264()
if err != nil { if err != nil {
return err return err
} }
tracks = append(tracks, videoTrack)
}
if audioTrack != nil { } else if t.IsAAC() {
clockRate, _ := audioTrack.ClockRate() if audioTrack != nil {
return fmt.Errorf("can't read track %d with RTMP: too many tracks", i+1)
}
audioTrack = t
var err error var err error
aacEncoder, err = rtpaac.NewEncoder(96, clockRate) aacConfig, err = t.ExtractDataAAC()
if err != nil { if err != nil {
return err return err
} }
tracks = append(tracks, audioTrack)
} }
}
for i, t := range tracks { if videoTrack == nil && audioTrack == nil {
t.ID = i return fmt.Errorf("unable to find a video or audio track")
}
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
rtmputils.WriteMetadata(c.conn, videoTrack, audioTrack)
if videoTrack != nil {
codec := h264.Codec{
SPS: map[int][]byte{
0: h264SPS,
},
PPS: map[int][]byte{
0: h264PPS,
},
}
b := make([]byte, 128)
var n int
codec.ToConfig(b, &n)
b = b[:n]
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
c.conn.WritePacket(av.Packet{
Type: av.H264DecoderConfig,
Data: b,
})
c.h264Decoder = rtph264.NewDecoder()
c.videoTrack = videoTrack
}
if audioTrack != nil {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
c.conn.WritePacket(av.Packet{
Type: av.AACDecoderConfig,
Data: aacConfig,
})
c.aacDecoder = rtpaac.NewDecoder(48000)
c.audioTrack = audioTrack
}
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
resc := make(chan struct{})
path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is reading from path '%s'", path.Name())
return nil
}()
if err != nil {
c.conn.NetConn().Close()
c.log(logger.Info, "ERR: %v", err)
res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
path = nil
c.parent.OnClientClose(c)
<-c.terminate
}
writerDone := make(chan error)
go func() {
writerDone <- func() error {
videoInitialized := false
var videoStartDTS time.Time
var videoBuf [][]byte
var videoPTS time.Duration
for {
data, ok := c.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
now := time.Now()
switch tdata := data.(type) {
case *rtph264.NALUAndTimestamp:
if !videoInitialized {
videoInitialized = true
videoStartDTS = now
videoPTS = tdata.Timestamp
}
// aggregate NALUs by PTS
if tdata.Timestamp != videoPTS {
pkt := av.Packet{
Type: av.H264,
Data: h264.FillNALUsAVCC(videoBuf),
Time: now.Sub(videoStartDTS),
}
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WritePacket(pkt)
if err != nil {
return err
}
videoBuf = nil
}
videoPTS = tdata.Timestamp
videoBuf = append(videoBuf, tdata.NALU)
case *rtpaac.AUAndTimestamp:
pkt := av.Packet{
Type: av.AAC,
Data: tdata.AU,
Time: tdata.Timestamp,
}
c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout))
err := c.conn.WritePacket(pkt)
if err != nil {
return err
}
}
} }
return nil
}() }()
}() }()
select { select {
case <-metadataDone: case err := <-writerDone:
case <-c.terminate: c.conn.NetConn().Close()
c.conn.NConn.Close()
<-metadataDone
}
if err != nil { if err != io.EOF {
c.conn.NConn.Close() c.log(logger.Info, "ERR: %s", err)
c.log(logger.Info, "ERR: %s", err) }
res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
path = nil
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
<-c.terminate <-c.terminate
return
case <-c.terminate:
res := make(chan struct{})
path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.ringBuffer.Close()
c.conn.NetConn().Close()
<-writerDone
path = nil
} }
}
err = func() error { func (c *Client) runPublish() {
// remove trailing slash, that is inserted by OBS var videoTrack *gortsplib.Track
tmp := strings.TrimSuffix(c.conn.RConn.URL.String(), "/") var audioTrack *gortsplib.Track
ur, _ := url.Parse(tmp) var err error
pathName := strings.TrimPrefix(ur.Path, "/") var tracks gortsplib.Tracks
var h264Encoder *rtph264.Encoder
var aacEncoder *rtpaac.Encoder
var path client.Path
resc := make(chan client.AnnounceRes) setupDone := make(chan struct{})
c.parent.OnClientAnnounce(client.AnnounceReq{c, pathName, tracks, ur.Query(), resc}) //nolint:govet go func() {
res := <-resc defer close(setupDone)
err = func() error {
c.conn.NetConn().SetReadDeadline(time.Now().Add(c.readTimeout))
videoTrack, audioTrack, err = rtmputils.ReadMetadata(c.conn)
if err != nil {
return err
}
if res.Err != nil { if videoTrack != nil {
switch res.Err.(type) { h264Encoder = rtph264.NewEncoder(96, nil, nil, nil)
case client.ErrAuthNotCritical: tracks = append(tracks, videoTrack)
return res.Err }
case client.ErrAuthCritical: if audioTrack != nil {
// wait some seconds to stop brute force attacks clockRate, _ := audioTrack.ClockRate()
select { aacEncoder = rtpaac.NewEncoder(96, clockRate, nil, nil, nil)
case <-time.After(pauseAfterAuthError): tracks = append(tracks, audioTrack)
case <-c.terminate: }
}
return res.Err
default: for i, t := range tracks {
t.ID = i
}
pathName, query := pathNameAndQuery(c.conn.URL())
resc := make(chan client.AnnounceRes)
c.parent.OnClientAnnounce(client.AnnounceReq{c, pathName, tracks, query, resc}) //nolint:govet
res := <-resc
if res.Err != nil {
switch res.Err.(type) {
case client.ErrAuthCritical:
// wait some seconds to stop brute force attacks
select {
case <-time.After(pauseAfterAuthError):
case <-c.terminate:
}
}
return res.Err return res.Err
} }
}
c.path = res.Path resc2 := make(chan struct{})
return nil res.Path.OnClientRecord(client.RecordReq{c, resc2}) //nolint:govet
<-resc2
path = res.Path
c.log(logger.Info, "is publishing to path '%s', %d %s",
path.Name(),
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}())
return nil
}()
}() }()
select {
case <-setupDone:
case <-c.terminate:
c.conn.NetConn().Close()
<-setupDone
}
if err != nil { if err != nil {
c.conn.NetConn().Close()
c.log(logger.Info, "ERR: %s", err) c.log(logger.Info, "ERR: %s", err)
c.conn.NConn.Close()
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
<-c.terminate <-c.terminate
return return
} }
resc := make(chan struct{})
c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(),
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}())
var onPublishCmd *externalcmd.Cmd var onPublishCmd *externalcmd.Cmd
if c.path.Conf().RunOnPublish != "" { if path.Conf().RunOnPublish != "" {
onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish, onPublishCmd = externalcmd.New(path.Conf().RunOnPublish,
c.path.Conf().RunOnPublishRestart, externalcmd.Environment{ path.Conf().RunOnPublishRestart, externalcmd.Environment{
Path: c.path.Name(), Path: path.Name(),
Port: strconv.FormatInt(int64(c.rtspPort), 10), Port: strconv.FormatInt(int64(c.rtspPort), 10),
}) })
} }
@ -269,17 +488,17 @@ func (c *Client) run() {
if path.Conf().RunOnPublish != "" { if path.Conf().RunOnPublish != "" {
onPublishCmd.Close() onPublishCmd.Close()
} }
}(c.path) }(path)
readerDone := make(chan error) readerDone := make(chan error)
go func() { go func() {
readerDone <- func() error { readerDone <- func() error {
rtcpSenders := rtmputils.NewRTCPSenderSet(tracks, c.path.OnFrame) rtcpSenders := rtmputils.NewRTCPSenderSet(tracks, path.OnFrame)
defer rtcpSenders.Close() defer rtcpSenders.Close()
for { for {
c.conn.NConn.SetReadDeadline(time.Now().Add(c.readTimeout)) c.conn.NetConn().SetReadDeadline(time.Now().Add(c.readTimeout))
pkt, err := c.conn.RConn.ReadPacket() pkt, err := c.conn.ReadPacket()
if err != nil { if err != nil {
return err return err
} }
@ -296,15 +515,20 @@ func (c *Client) run() {
return fmt.Errorf("invalid NALU format (%d)", typ) return fmt.Errorf("invalid NALU format (%d)", typ)
} }
// encode into RTP/H264 format for _, nalu := range nalus {
frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus) frames, err := h264Encoder.Encode(&rtph264.NALUAndTimestamp{
if err != nil { Timestamp: pkt.Time + pkt.CTime,
return err NALU: nalu,
} })
if err != nil {
for _, f := range frames { return err
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f) }
c.path.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f)
for _, frame := range frames {
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(),
gortsplib.StreamTypeRTP, frame)
path.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, frame)
}
} }
case av.AAC: case av.AAC:
@ -312,15 +536,17 @@ func (c *Client) run() {
return fmt.Errorf("ERR: received an AAC frame, but track is not set up") return fmt.Errorf("ERR: received an AAC frame, but track is not set up")
} }
frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data) frame, err := aacEncoder.Encode(&rtpaac.AUAndTimestamp{
Timestamp: pkt.Time,
AU: pkt.Data,
})
if err != nil { if err != nil {
return err return err
} }
for _, f := range frames { rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(),
rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f) gortsplib.StreamTypeRTP, frame)
c.path.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f) path.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, frame)
}
default: default:
return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type) return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type)
@ -331,32 +557,28 @@ func (c *Client) run() {
select { select {
case err := <-readerDone: case err := <-readerDone:
c.conn.NConn.Close() c.conn.NetConn().Close()
if err != io.EOF { if err != io.EOF {
c.log(logger.Info, "ERR: %s", err) c.log(logger.Info, "ERR: %s", err)
} }
if c.path != nil { res := make(chan struct{})
res := make(chan struct{}) path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet <-res
<-res path = nil
c.path = nil
}
c.parent.OnClientClose(c) c.parent.OnClientClose(c)
<-c.terminate <-c.terminate
case <-c.terminate: case <-c.terminate:
c.conn.NConn.Close() c.conn.NetConn().Close()
<-readerDone <-readerDone
if c.path != nil { res := make(chan struct{})
res := make(chan struct{}) path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet <-res
<-res path = nil
c.path = nil
}
} }
} }
@ -391,6 +613,39 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod,
return nil return nil
} }
// OnReaderFrame implements path.Reader. // OnIncomingFrame implements path.Reader.
func (c *Client) OnReaderFrame(trackID int, streamType gortsplib.StreamType, buf []byte) { func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
if streamType == gortsplib.StreamTypeRTP {
if c.videoTrack != nil {
if trackID == c.videoTrack.ID {
nts, err := c.h264Decoder.Decode(buf)
if err != nil {
if err != rtph264.ErrMorePacketsNeeded {
c.log(logger.Debug, "ERR while decoding video track: %v", err)
}
return
}
for _, nt := range nts {
c.ringBuffer.Push(nt)
}
return
}
}
if c.audioTrack != nil {
if trackID == c.audioTrack.ID {
ats, err := c.aacDecoder.Decode(buf)
if err != nil {
c.log(logger.Debug, "ERR while decoding audio track: %v", err)
return
}
for _, at := range ats {
c.ringBuffer.Push(at)
}
return
}
}
}
} }

30
internal/clientrtsp/client.go

@ -27,16 +27,6 @@ const (
pauseAfterAuthError = 2 * time.Second pauseAfterAuthError = 2 * time.Second
) )
// ErrNoOnePublishing is a "no one is publishing" error.
type ErrNoOnePublishing struct {
PathName string
}
// Error implements the error interface.
func (e ErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
}
func ipEqualOrInRange(ip net.IP, ips []interface{}) bool { func ipEqualOrInRange(ip net.IP, ips []interface{}) bool {
for _, item := range ips { for _, item := range ips {
switch titem := item.(type) { switch titem := item.(type) {
@ -116,9 +106,9 @@ func New(
atomic.AddInt64(c.stats.CountClients, 1) atomic.AddInt64(c.stats.CountClients, 1)
c.log(logger.Info, "connected (%s)", func() string { c.log(logger.Info, "connected (%s)", func() string {
if isTLS { if isTLS {
return "encrypted" return "RTSP/TLS"
} }
return "plain" return "RTSP/TCP"
}()) }())
c.wg.Add(1) c.wg.Add(1)
@ -194,7 +184,7 @@ func (c *Client) run() {
} }
return terr.Response, errTerminated return terr.Response, errTerminated
case ErrNoOnePublishing: case client.ErrNoOnePublishing:
return &base.Response{ return &base.Response{
StatusCode: base.StatusNotFound, StatusCode: base.StatusNotFound,
}, res.Err }, res.Err
@ -308,7 +298,7 @@ func (c *Client) run() {
} }
resc := make(chan client.SetupPlayRes) resc := make(chan client.SetupPlayRes)
c.parent.OnClientSetupPlay(client.SetupPlayReq{c, reqPath, trackID, req, resc}) //nolint:govet c.parent.OnClientSetupPlay(client.SetupPlayReq{c, reqPath, req, resc}) //nolint:govet
res := <-resc res := <-resc
if res.Err != nil { if res.Err != nil {
@ -324,7 +314,7 @@ func (c *Client) run() {
} }
return terr.Response, errTerminated return terr.Response, errTerminated
case ErrNoOnePublishing: case client.ErrNoOnePublishing:
return &base.Response{ return &base.Response{
StatusCode: base.StatusNotFound, StatusCode: base.StatusNotFound,
}, res.Err }, res.Err
@ -338,6 +328,12 @@ func (c *Client) run() {
c.path = res.Path c.path = res.Path
if trackID >= len(res.Tracks) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, fmt.Errorf("track %d does not exist", trackID)
}
default: // record default: // record
reqPathAndQuery, ok := req.URL.RTSPPathAndQuery() reqPathAndQuery, ok := req.URL.RTSPPathAndQuery()
if !ok { if !ok {
@ -651,8 +647,8 @@ func (c *Client) recordStop() {
} }
} }
// OnReaderFrame implements path.Reader. // OnIncomingFrame implements path.Reader.
func (c *Client) OnReaderFrame(trackID int, streamType gortsplib.StreamType, buf []byte) { func (c *Client) OnIncomingFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
if !c.conn.HasSetuppedTrack(trackID) { if !c.conn.HasSetuppedTrack(trackID) {
return return
} }

143
internal/path/path.go

@ -12,7 +12,6 @@ import (
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/client" "github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
"github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd" "github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
@ -34,18 +33,12 @@ type Parent interface {
OnPathClientClose(client.Client) OnPathClientClose(client.Client)
} }
// source can be // source is implemented by all sources (client* and source*).
// * client.Client
// * sourcertsp.Source
// * sourcertmp.Source
// * sourceRedirect
type source interface { type source interface {
IsSource() IsSource()
} }
// a sourceExternal can be // sourceExternal is implemented by all source*.
// * sourcertsp.Source
// * sourcertmp.Source
type sourceExternal interface { type sourceExternal interface {
IsSource() IsSource()
IsSourceExternal() IsSourceExternal()
@ -93,8 +86,7 @@ type Path struct {
describeRequests []client.DescribeReq describeRequests []client.DescribeReq
setupPlayRequests []client.SetupPlayReq setupPlayRequests []client.SetupPlayReq
source source source source
sourceTrackCount int sourceTracks gortsplib.Tracks
sourceSdp []byte
readers *readersMap readers *readersMap
onDemandCmd *externalcmd.Cmd onDemandCmd *externalcmd.Cmd
describeTimer *time.Timer describeTimer *time.Timer
@ -108,15 +100,15 @@ type Path struct {
closeTimerStarted bool closeTimerStarted bool
// in // in
sourceSetReady chan struct{} // from source sourceSetReady chan struct{} // from source
sourceSetNotReady chan struct{} // from source sourceSetNotReady chan struct{} // from source
clientDescribe chan client.DescribeReq // from program clientDescribe chan client.DescribeReq
clientAnnounce chan client.AnnounceReq // from program clientSetupPlay chan client.SetupPlayReq
clientSetupPlay chan client.SetupPlayReq // from program clientAnnounce chan client.AnnounceReq
clientPlay chan client.PlayReq // from client clientPlay chan client.PlayReq
clientRecord chan client.RecordReq // from client clientRecord chan client.RecordReq
clientPause chan client.PauseReq // from client clientPause chan client.PauseReq
clientRemove chan client.RemoveReq // from client clientRemove chan client.RemoveReq
terminate chan struct{} terminate chan struct{}
} }
@ -155,8 +147,8 @@ func New(
sourceSetReady: make(chan struct{}), sourceSetReady: make(chan struct{}),
sourceSetNotReady: make(chan struct{}), sourceSetNotReady: make(chan struct{}),
clientDescribe: make(chan client.DescribeReq), clientDescribe: make(chan client.DescribeReq),
clientAnnounce: make(chan client.AnnounceReq),
clientSetupPlay: make(chan client.SetupPlayReq), clientSetupPlay: make(chan client.SetupPlayReq),
clientAnnounce: make(chan client.AnnounceReq),
clientPlay: make(chan client.PlayReq), clientPlay: make(chan client.PlayReq),
clientRecord: make(chan client.RecordReq), clientRecord: make(chan client.RecordReq),
clientPause: make(chan client.PauseReq), clientPause: make(chan client.PauseReq),
@ -208,7 +200,7 @@ outer:
pa.describeRequests = nil pa.describeRequests = nil
for _, req := range pa.setupPlayRequests { for _, req := range pa.setupPlayRequests {
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
} }
pa.setupPlayRequests = nil pa.setupPlayRequests = nil
@ -252,18 +244,13 @@ outer:
case req := <-pa.clientSetupPlay: case req := <-pa.clientSetupPlay:
pa.onClientSetupPlay(req) pa.onClientSetupPlay(req)
case req := <-pa.clientAnnounce:
pa.onClientAnnounce(req)
case req := <-pa.clientPlay: case req := <-pa.clientPlay:
pa.onClientPlay(req.Client) pa.onClientPlay(req.Client)
close(req.Res) close(req.Res)
case req := <-pa.clientAnnounce:
err := pa.onClientAnnounce(req.Client, req.Tracks)
if err != nil {
req.Res <- client.AnnounceRes{nil, err} //nolint:govet
continue
}
req.Res <- client.AnnounceRes{pa, nil} //nolint:govet
case req := <-pa.clientRecord: case req := <-pa.clientRecord:
pa.onClientRecord(req.Client) pa.onClientRecord(req.Client)
close(req.Res) close(req.Res)
@ -318,7 +305,7 @@ outer:
} }
for _, req := range pa.setupPlayRequests { for _, req := range pa.setupPlayRequests {
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
} }
for c, state := range pa.clients { for c, state := range pa.clients {
@ -339,8 +326,8 @@ outer:
close(pa.sourceSetReady) close(pa.sourceSetReady)
close(pa.sourceSetNotReady) close(pa.sourceSetNotReady)
close(pa.clientDescribe) close(pa.clientDescribe)
close(pa.clientAnnounce)
close(pa.clientSetupPlay) close(pa.clientSetupPlay)
close(pa.clientAnnounce)
close(pa.clientPlay) close(pa.clientPlay)
close(pa.clientRecord) close(pa.clientRecord)
close(pa.clientPause) close(pa.clientPause)
@ -367,17 +354,17 @@ func (pa *Path) exhaustChannels() {
} }
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientAnnounce: case req, ok := <-pa.clientSetupPlay:
if !ok { if !ok {
return return
} }
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientSetupPlay: case req, ok := <-pa.clientAnnounce:
if !ok { if !ok {
return return
} }
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientPlay: case req, ok := <-pa.clientPlay:
if !ok { if !ok {
@ -508,7 +495,7 @@ func (pa *Path) onSourceSetReady() {
pa.sourceState = sourceStateReady pa.sourceState = sourceStateReady
for _, req := range pa.describeRequests { for _, req := range pa.describeRequests {
req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet
} }
pa.describeRequests = nil pa.describeRequests = nil
@ -594,7 +581,7 @@ func (pa *Path) onClientDescribe(req client.DescribeReq) {
switch pa.sourceState { switch pa.sourceState {
case sourceStateReady: case sourceStateReady:
req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet req.Res <- client.DescribeRes{pa.sourceTracks.Write(), "", nil} //nolint:govet
return return
case sourceStateWaitingDescribe: case sourceStateWaitingDescribe:
@ -606,9 +593,9 @@ func (pa *Path) onClientDescribe(req client.DescribeReq) {
fallbackURL := func() string { fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") { if strings.HasPrefix(pa.conf.Fallback, "/") {
ur := base.URL{ ur := base.URL{
Scheme: req.Req.URL.Scheme, Scheme: req.Data.URL.Scheme,
User: req.Req.URL.User, User: req.Data.URL.User,
Host: req.Req.URL.Host, Host: req.Data.URL.Host,
Path: pa.conf.Fallback, Path: pa.conf.Fallback,
} }
return ur.String() return ur.String()
@ -619,17 +606,31 @@ func (pa *Path) onClientDescribe(req client.DescribeReq) {
return return
} }
req.Res <- client.DescribeRes{nil, "", clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet req.Res <- client.DescribeRes{nil, "", client.ErrNoOnePublishing{pa.name}} //nolint:govet
return return
} }
} }
func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) { func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) {
if req.TrackID >= pa.sourceTrackCount { pa.fixedPublisherStart()
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("track %d does not exist", req.TrackID)} //nolint:govet pa.scheduleClose()
switch pa.sourceState {
case sourceStateReady:
pa.onClientSetupPlayPost(req)
return
case sourceStateWaitingDescribe:
pa.setupPlayRequests = append(pa.setupPlayRequests, req)
return
case sourceStateNotReady:
req.Res <- client.SetupPlayRes{nil, nil, client.ErrNoOnePublishing{pa.name}} //nolint:govet
return return
} }
}
func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) {
if _, ok := pa.clients[req.Client]; !ok { if _, ok := pa.clients[req.Client]; !ok {
// prevent on-demand source from closing // prevent on-demand source from closing
if pa.sourceCloseTimerStarted { if pa.sourceCloseTimerStarted {
@ -646,26 +647,7 @@ func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) {
pa.addClient(req.Client, clientStatePrePlay) pa.addClient(req.Client, clientStatePrePlay)
} }
req.Res <- client.SetupPlayRes{pa, nil} //nolint:govet req.Res <- client.SetupPlayRes{pa, pa.sourceTracks, nil} //nolint:govet
}
func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) {
pa.fixedPublisherStart()
pa.scheduleClose()
switch pa.sourceState {
case sourceStateReady:
pa.onClientSetupPlayPost(req)
return
case sourceStateWaitingDescribe:
pa.setupPlayRequests = append(pa.setupPlayRequests, req)
return
case sourceStateNotReady:
req.Res <- client.SetupPlayRes{nil, clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet
return
}
} }
func (pa *Path) onClientPlay(c client.Client) { func (pa *Path) onClientPlay(c client.Client) {
@ -680,17 +662,18 @@ func (pa *Path) onClientPlay(c client.Client) {
atomic.AddInt64(pa.stats.CountReaders, 1) atomic.AddInt64(pa.stats.CountReaders, 1)
pa.clients[c] = clientStatePlay pa.clients[c] = clientStatePlay
pa.readers.add(c) pa.readers.add(c)
} }
func (pa *Path) onClientAnnounce(c client.Client, tracks gortsplib.Tracks) error { func (pa *Path) onClientAnnounce(req client.AnnounceReq) {
if _, ok := pa.clients[c]; ok { if _, ok := pa.clients[req.Client]; ok {
return fmt.Errorf("already subscribed") req.Res <- client.AnnounceRes{nil, fmt.Errorf("already publishing or reading")} //nolint:govet
return
} }
if pa.hasExternalSource() { if pa.hasExternalSource() {
return fmt.Errorf("path '%s' is assigned to an external source", pa.name) req.Res <- client.AnnounceRes{nil, fmt.Errorf("path '%s' is assigned to an external source", pa.name)} //nolint:govet
return
} }
if pa.source != nil { if pa.source != nil {
@ -707,12 +690,11 @@ func (pa *Path) onClientAnnounce(c client.Client, tracks gortsplib.Tracks) error
} }
} }
pa.addClient(c, clientStatePreRecord) pa.addClient(req.Client, clientStatePreRecord)
pa.source = c pa.source = req.Client
pa.sourceTrackCount = len(tracks) pa.sourceTracks = req.Tracks
pa.sourceSdp = tracks.Write() req.Res <- client.AnnounceRes{pa, nil} //nolint:govet
return nil
} }
func (pa *Path) onClientRecord(c client.Client) { func (pa *Path) onClientRecord(c client.Client) {
@ -727,7 +709,6 @@ func (pa *Path) onClientRecord(c client.Client) {
atomic.AddInt64(pa.stats.CountPublishers, 1) atomic.AddInt64(pa.stats.CountPublishers, 1)
pa.clients[c] = clientStateRecord pa.clients[c] = clientStateRecord
pa.onSourceSetReady() pa.onSourceSetReady()
} }
@ -740,13 +721,11 @@ func (pa *Path) onClientPause(c client.Client) {
if state == clientStatePlay { if state == clientStatePlay {
atomic.AddInt64(pa.stats.CountReaders, -1) atomic.AddInt64(pa.stats.CountReaders, -1)
pa.clients[c] = clientStatePrePlay pa.clients[c] = clientStatePrePlay
pa.readers.remove(c) pa.readers.remove(c)
} else if state == clientStateRecord { } else if state == clientStateRecord {
atomic.AddInt64(pa.stats.CountPublishers, -1) atomic.AddInt64(pa.stats.CountPublishers, -1)
pa.clients[c] = clientStatePreRecord pa.clients[c] = clientStatePreRecord
pa.onSourceSetNotReady() pa.onSourceSetNotReady()
} }
} }
@ -813,15 +792,9 @@ func (pa *Path) Name() string {
return pa.name return pa.name
} }
// SourceTrackCount returns the number of tracks of the source this path.
func (pa *Path) SourceTrackCount() int {
return pa.sourceTrackCount
}
// OnSourceSetReady is called by a source. // OnSourceSetReady is called by a source.
func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) { func (pa *Path) OnSourceSetReady(tracks gortsplib.Tracks) {
pa.sourceSdp = tracks.Write() pa.sourceTracks = tracks
pa.sourceTrackCount = len(tracks)
pa.sourceSetReady <- struct{}{} pa.sourceSetReady <- struct{}{}
} }

4
internal/path/readersmap.go

@ -7,7 +7,7 @@ import (
) )
type reader interface { type reader interface {
OnReaderFrame(int, gortsplib.StreamType, []byte) OnIncomingFrame(int, gortsplib.StreamType, []byte)
} }
type readersMap struct { type readersMap struct {
@ -40,6 +40,6 @@ func (m *readersMap) forwardFrame(trackID int, streamType gortsplib.StreamType,
defer m.mutex.RUnlock() defer m.mutex.RUnlock()
for c := range m.ma { for c := range m.ma {
c.OnReaderFrame(trackID, streamType, buf) c.OnIncomingFrame(trackID, streamType, buf)
} }
} }

82
internal/pathman/pathman.go

@ -38,8 +38,8 @@ type PathManager struct {
confReload chan map[string]*conf.PathConf confReload chan map[string]*conf.PathConf
pathClose chan *path.Path pathClose chan *path.Path
clientDescribe chan client.DescribeReq clientDescribe chan client.DescribeReq
clientAnnounce chan client.AnnounceReq
clientSetupPlay chan client.SetupPlayReq clientSetupPlay chan client.SetupPlayReq
clientAnnounce chan client.AnnounceReq
terminate chan struct{} terminate chan struct{}
// out // out
@ -73,8 +73,8 @@ func New(
confReload: make(chan map[string]*conf.PathConf), confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path.Path), pathClose: make(chan *path.Path),
clientDescribe: make(chan client.DescribeReq), clientDescribe: make(chan client.DescribeReq),
clientAnnounce: make(chan client.AnnounceReq),
clientSetupPlay: make(chan client.SetupPlayReq), clientSetupPlay: make(chan client.SetupPlayReq),
clientAnnounce: make(chan client.AnnounceReq),
terminate: make(chan struct{}), terminate: make(chan struct{}),
clientClose: make(chan client.Client), clientClose: make(chan client.Client),
done: make(chan struct{}), done: make(chan struct{}),
@ -155,9 +155,13 @@ outer:
continue continue
} }
err = req.Client.Authenticate(pm.authMethods, req.PathName, err = req.Client.Authenticate(
pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass, pm.authMethods,
req.Req) req.PathName,
pathConf.ReadIpsParsed,
pathConf.ReadUser,
pathConf.ReadPass,
req.Data)
if err != nil { if err != nil {
req.Res <- client.DescribeRes{nil, "", err} //nolint:govet req.Res <- client.DescribeRes{nil, "", err} //nolint:govet
continue continue
@ -165,70 +169,62 @@ outer:
// create path if it doesn't exist // create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok { if _, ok := pm.paths[req.PathName]; !ok {
pm.paths[req.PathName] = pm.createPath(pathName, pathConf, req.PathName) pm.createPath(pathName, pathConf, req.PathName)
} }
pm.paths[req.PathName].OnPathManDescribe(req) pm.paths[req.PathName].OnPathManDescribe(req)
case req := <-pm.clientAnnounce: case req := <-pm.clientSetupPlay:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- client.AnnounceRes{nil, err} //nolint:govet req.Res <- client.SetupPlayRes{nil, nil, err} //nolint:govet
continue continue
} }
err = req.Client.Authenticate(pm.authMethods, req.PathName, err = req.Client.Authenticate(
pathConf.PublishIpsParsed, pathConf.PublishUser, pm.authMethods,
pathConf.PublishPass, req.Req) req.PathName,
pathConf.ReadIpsParsed,
pathConf.ReadUser,
pathConf.ReadPass,
req.Data)
if err != nil { if err != nil {
req.Res <- client.AnnounceRes{nil, err} //nolint:govet req.Res <- client.SetupPlayRes{nil, nil, err} //nolint:govet
continue continue
} }
// create path if it doesn't exist // create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok { if _, ok := pm.paths[req.PathName]; !ok {
pm.paths[req.PathName] = pm.createPath(pathName, pathConf, req.PathName) pm.createPath(pathName, pathConf, req.PathName)
} }
pm.paths[req.PathName].OnPathManAnnounce(req) pm.paths[req.PathName].OnPathManSetupPlay(req)
case req := <-pm.clientSetupPlay: case req := <-pm.clientAnnounce:
pathName, pathConf, err := pm.findPathConf(req.PathName) pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil { if err != nil {
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet req.Res <- client.AnnounceRes{nil, err} //nolint:govet
continue continue
} }
err = req.Client.Authenticate( err = req.Client.Authenticate(
pm.authMethods, pm.authMethods,
req.PathName, req.PathName,
pathConf.ReadIpsParsed, pathConf.PublishIpsParsed,
pathConf.ReadUser, pathConf.PublishUser,
pathConf.ReadPass, pathConf.PublishPass,
req.Req) req.Data)
if err != nil { if err != nil {
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet req.Res <- client.AnnounceRes{nil, err} //nolint:govet
continue continue
} }
// create path if it doesn't exist // create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok { if _, ok := pm.paths[req.PathName]; !ok {
pa := path.New( pm.createPath(pathName, pathConf, req.PathName)
pm.rtspPort,
pm.readTimeout,
pm.writeTimeout,
pm.readBufferCount,
pm.readBufferSize,
pathName,
pathConf,
req.PathName,
&pm.wg,
pm.stats,
pm)
pm.paths[req.PathName] = pa
} }
pm.paths[req.PathName].OnPathManSetupPlay(req) pm.paths[req.PathName].OnPathManAnnounce(req)
case <-pm.terminate: case <-pm.terminate:
break outer break outer
@ -254,17 +250,17 @@ outer:
} }
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.clientAnnounce: case req, ok := <-pm.clientSetupPlay:
if !ok { if !ok {
return return
} }
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- client.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.clientSetupPlay: case req, ok := <-pm.clientAnnounce:
if !ok { if !ok {
return return
} }
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
} }
} }
}() }()
@ -278,12 +274,12 @@ outer:
close(pm.clientClose) close(pm.clientClose)
close(pm.pathClose) close(pm.pathClose)
close(pm.clientDescribe) close(pm.clientDescribe)
close(pm.clientAnnounce)
close(pm.clientSetupPlay) close(pm.clientSetupPlay)
close(pm.clientAnnounce)
} }
func (pm *PathManager) createPath(confName string, conf *conf.PathConf, name string) *path.Path { func (pm *PathManager) createPath(confName string, conf *conf.PathConf, name string) {
return path.New( pm.paths[name] = path.New(
pm.rtspPort, pm.rtspPort,
pm.readTimeout, pm.readTimeout,
pm.writeTimeout, pm.writeTimeout,
@ -300,7 +296,7 @@ func (pm *PathManager) createPath(confName string, conf *conf.PathConf, name str
func (pm *PathManager) createPaths() { func (pm *PathManager) createPaths() {
for pathName, pathConf := range pm.pathConfs { for pathName, pathConf := range pm.pathConfs {
if _, ok := pm.paths[pathName]; !ok && pathConf.Regexp == nil { if _, ok := pm.paths[pathName]; !ok && pathConf.Regexp == nil {
pm.paths[pathName] = pm.createPath(pathName, pathConf, pathName) pm.createPath(pathName, pathConf, pathName)
} }
} }
} }

52
internal/rtmputils/conn.go

@ -0,0 +1,52 @@
package rtmputils
import (
"net"
"net/url"
"github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/format/rtmp"
)
// Conn contains a RTMP connection and a net connection.
type Conn struct {
rconn *rtmp.Conn
nconn net.Conn
}
// NewConn allocates a Conn.
func NewConn(rconn *rtmp.Conn, nconn net.Conn) *Conn {
return &Conn{
rconn: rconn,
nconn: nconn,
}
}
// NetConn returns the underlying net.Conn.
func (c *Conn) NetConn() net.Conn {
return c.nconn
}
// IsPublishing returns whether the connection is publishing.
func (c *Conn) IsPublishing() bool {
return c.rconn.Publishing
}
// URL returns the URL requested by the connection.
func (c *Conn) URL() *url.URL {
return c.rconn.URL
}
// ReadPacket reads a packet.
func (c *Conn) ReadPacket() (av.Packet, error) {
return c.rconn.ReadPacket()
}
// WritePacket writes a packet.
func (c *Conn) WritePacket(pkt av.Packet) error {
err := c.rconn.WritePacket(pkt)
if err != nil {
return err
}
return c.rconn.FlushWrite()
}

13
internal/rtmputils/connpair.go

@ -1,13 +0,0 @@
package rtmputils
import (
"net"
"github.com/notedit/rtmp/format/rtmp"
)
// ConnPair contains a RTMP connection and a net connection.
type ConnPair struct {
RConn *rtmp.Conn
NConn net.Conn
}

53
internal/rtmputils/metadata.go

@ -2,13 +2,11 @@ package rtmputils
import ( import (
"fmt" "fmt"
"time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/notedit/rtmp/av" "github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/codec/h264" "github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/flv/flvio" "github.com/notedit/rtmp/format/flv/flvio"
"github.com/notedit/rtmp/format/rtmp"
) )
const ( const (
@ -16,8 +14,8 @@ const (
codecAAC = 10 codecAAC = 10
) )
func readMetadata(rconn *rtmp.Conn) (flvio.AMFMap, error) { func readMetadata(conn *Conn) (flvio.AMFMap, error) {
pkt, err := rconn.ReadPacket() pkt, err := conn.ReadPacket()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -43,16 +41,12 @@ func readMetadata(rconn *rtmp.Conn) (flvio.AMFMap, error) {
return ma, nil return ma, nil
} }
// Metadata extracts track informations from a RTMP connection that is publishing. // ReadMetadata extracts track informations from a RTMP connection that is publishing.
func Metadata(conn ConnPair, readTimeout time.Duration) ( func ReadMetadata(conn *Conn) (*gortsplib.Track, *gortsplib.Track, error) {
*gortsplib.Track, *gortsplib.Track, error) {
var videoTrack *gortsplib.Track var videoTrack *gortsplib.Track
var audioTrack *gortsplib.Track var audioTrack *gortsplib.Track
// configuration must be completed within readTimeout md, err := readMetadata(conn)
conn.NConn.SetReadDeadline(time.Now().Add(readTimeout))
md, err := readMetadata(conn.RConn)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -121,7 +115,7 @@ func Metadata(conn ConnPair, readTimeout time.Duration) (
for { for {
var pkt av.Packet var pkt av.Packet
pkt, err = conn.RConn.ReadPacket() pkt, err = conn.ReadPacket()
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -165,3 +159,38 @@ func Metadata(conn ConnPair, readTimeout time.Duration) (
} }
} }
} }
// WriteMetadata writes track informations to a RTMP connection that is reading.
func WriteMetadata(conn *Conn, videoTrack *gortsplib.Track, audioTrack *gortsplib.Track) error {
return conn.WritePacket(av.Packet{
Type: av.Metadata,
Data: flvio.FillAMF0ValMalloc(flvio.AMFMap{
{
K: "videodatarate",
V: float64(0),
},
{
K: "videocodecid",
V: func() float64 {
if videoTrack != nil {
return codecH264
}
return 0
}(),
},
{
K: "audiodatarate",
V: float64(0),
},
{
K: "audiocodecid",
V: func() float64 {
if audioTrack != nil {
return codecAAC
}
return 0
}(),
},
}),
})
}

10
internal/serverrtmp/server.go

@ -22,7 +22,7 @@ type Server struct {
srv *rtmp.Server srv *rtmp.Server
wg sync.WaitGroup wg sync.WaitGroup
accept chan rtmputils.ConnPair accept chan *rtmputils.Conn
} }
// New allocates a Server. // New allocates a Server.
@ -39,7 +39,7 @@ func New(
s := &Server{ s := &Server{
l: l, l: l,
accept: make(chan rtmputils.ConnPair), accept: make(chan *rtmputils.Conn),
} }
s.srv = rtmp.NewServer() s.srv = rtmp.NewServer()
@ -57,7 +57,7 @@ func New(
func (s *Server) Close() { func (s *Server) Close() {
go func() { go func() {
for co := range s.accept { for co := range s.accept {
co.NConn.Close() co.NetConn().Close()
} }
}() }()
s.l.Close() s.l.Close()
@ -83,10 +83,10 @@ func (s *Server) run() {
} }
func (s *Server) innerHandleConn(rconn *rtmp.Conn, nconn net.Conn) { func (s *Server) innerHandleConn(rconn *rtmp.Conn, nconn net.Conn) {
s.accept <- rtmputils.ConnPair{rconn, nconn} //nolint:govet s.accept <- rtmputils.NewConn(rconn, nconn)
} }
// Accept returns a channel to accept incoming connections. // Accept returns a channel to accept incoming connections.
func (s *Server) Accept() chan rtmputils.ConnPair { func (s *Server) Accept() chan *rtmputils.Conn {
return s.accept return s.accept
} }

70
internal/sourcertmp/source.go

@ -112,7 +112,7 @@ func (s *Source) run() {
func (s *Source) runInner() bool { func (s *Source) runInner() bool {
s.log(logger.Info, "connecting") s.log(logger.Info, "connecting")
var conn rtmputils.ConnPair var conn *rtmputils.Conn
var err error var err error
dialDone := make(chan struct{}, 1) dialDone := make(chan struct{}, 1)
go func() { go func() {
@ -120,7 +120,7 @@ func (s *Source) runInner() bool {
var rconn *rtmp.Conn var rconn *rtmp.Conn
var nconn net.Conn var nconn net.Conn
rconn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading) rconn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading)
conn = rtmputils.ConnPair{rconn, nconn} //nolint:govet conn = rtmputils.NewConn(rconn, nconn)
}() }()
select { select {
@ -139,14 +139,14 @@ func (s *Source) runInner() bool {
metadataDone := make(chan struct{}) metadataDone := make(chan struct{})
go func() { go func() {
defer close(metadataDone) defer close(metadataDone)
videoTrack, audioTrack, err = rtmputils.Metadata( conn.NetConn().SetReadDeadline(time.Now().Add(s.readTimeout))
conn, s.readTimeout) //nolint:govet videoTrack, audioTrack, err = rtmputils.ReadMetadata(conn)
}() }()
select { select {
case <-metadataDone: case <-metadataDone:
case <-s.terminate: case <-s.terminate:
conn.NConn.Close() conn.NetConn().Close()
<-metadataDone <-metadataDone
return false return false
} }
@ -160,26 +160,14 @@ func (s *Source) runInner() bool {
var h264Encoder *rtph264.Encoder var h264Encoder *rtph264.Encoder
if videoTrack != nil { if videoTrack != nil {
var err error h264Encoder = rtph264.NewEncoder(96, nil, nil, nil)
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
conn.NConn.Close()
s.log(logger.Info, "ERR: %s", err)
return true
}
tracks = append(tracks, videoTrack) tracks = append(tracks, videoTrack)
} }
var aacEncoder *rtpaac.Encoder var aacEncoder *rtpaac.Encoder
if audioTrack != nil { if audioTrack != nil {
clockRate, _ := audioTrack.ClockRate() clockRate, _ := audioTrack.ClockRate()
var err error aacEncoder = rtpaac.NewEncoder(96, clockRate, nil, nil, nil)
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
if err != nil {
conn.NConn.Close()
s.log(logger.Info, "ERR: %s", err)
return true
}
tracks = append(tracks, audioTrack) tracks = append(tracks, audioTrack)
} }
@ -198,8 +186,8 @@ func (s *Source) runInner() bool {
defer rtcpSenders.Close() defer rtcpSenders.Close()
for { for {
conn.NConn.SetReadDeadline(time.Now().Add(s.readTimeout)) conn.NetConn().SetReadDeadline(time.Now().Add(s.readTimeout))
pkt, err := conn.RConn.ReadPacket() pkt, err := conn.ReadPacket()
if err != nil { if err != nil {
return err return err
} }
@ -216,15 +204,21 @@ func (s *Source) runInner() bool {
return fmt.Errorf("invalid NALU format (%d)", typ) return fmt.Errorf("invalid NALU format (%d)", typ)
} }
// encode into RTP/H264 format for _, nalu := range nalus {
frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus) // encode into RTP/H264 format
if err != nil { frames, err := h264Encoder.Encode(&rtph264.NALUAndTimestamp{
return err Timestamp: pkt.Time + pkt.CTime,
} NALU: nalu,
})
for _, f := range frames { if err != nil {
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f) return err
s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f) }
for _, frame := range frames {
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(),
gortsplib.StreamTypeRTP, frame)
s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, frame)
}
} }
case av.AAC: case av.AAC:
@ -232,15 +226,17 @@ func (s *Source) runInner() bool {
return fmt.Errorf("ERR: received an AAC frame, but track is not set up") return fmt.Errorf("ERR: received an AAC frame, but track is not set up")
} }
frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data) frame, err := aacEncoder.Encode(&rtpaac.AUAndTimestamp{
Timestamp: pkt.Time + pkt.CTime,
AU: pkt.Data,
})
if err != nil { if err != nil {
return err return err
} }
for _, f := range frames { rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(),
rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f) gortsplib.StreamTypeRTP, frame)
s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f) s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, frame)
}
default: default:
return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type) return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type)
@ -252,12 +248,12 @@ func (s *Source) runInner() bool {
for { for {
select { select {
case err := <-readerDone: case err := <-readerDone:
conn.NConn.Close() conn.NetConn().Close()
s.log(logger.Info, "ERR: %s", err) s.log(logger.Info, "ERR: %s", err)
return true return true
case <-s.terminate: case <-s.terminate:
conn.NConn.Close() conn.NetConn().Close()
<-readerDone <-readerDone
return false return false
} }

4
main.go

@ -257,6 +257,8 @@ func (p *program) createResources(initial bool) error {
p.clientMan = clientman.New( p.clientMan = clientman.New(
p.conf.RTSPPort, p.conf.RTSPPort,
p.conf.ReadTimeout, p.conf.ReadTimeout,
p.conf.WriteTimeout,
p.conf.ReadBufferCount,
p.conf.RunOnConnect, p.conf.RunOnConnect,
p.conf.RunOnConnectRestart, p.conf.RunOnConnectRestart,
p.conf.ProtocolsParsed, p.conf.ProtocolsParsed,
@ -350,6 +352,8 @@ func (p *program) closeResources(newConf *conf.Conf) {
closePathMan || closePathMan ||
newConf.RTSPPort != p.conf.RTSPPort || newConf.RTSPPort != p.conf.RTSPPort ||
newConf.ReadTimeout != p.conf.ReadTimeout || newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.WriteTimeout != p.conf.WriteTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
newConf.RunOnConnect != p.conf.RunOnConnect || newConf.RunOnConnect != p.conf.RunOnConnect ||
newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart || newConf.RunOnConnectRestart != p.conf.RunOnConnectRestart ||
!reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) { !reflect.DeepEqual(newConf.ProtocolsParsed, p.conf.ProtocolsParsed) {

287
main_test.go

@ -185,7 +185,7 @@ y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD
-----END RSA PRIVATE KEY----- -----END RSA PRIVATE KEY-----
`) `)
func TestPublishRead(t *testing.T) { func TestRTSPPublishRead(t *testing.T) {
for _, ca := range []struct { for _, ca := range []struct {
encrypted bool encrypted bool
publisherSoft string publisherSoft string
@ -315,7 +315,7 @@ func TestPublishRead(t *testing.T) {
} }
} }
func TestAutomaticProtocol(t *testing.T) { func TestRTSPAutomaticProtocol(t *testing.T) {
for _, source := range []string{ for _, source := range []string{
"ffmpeg", "ffmpeg",
} { } {
@ -351,7 +351,7 @@ func TestAutomaticProtocol(t *testing.T) {
} }
} }
func TestPublisherOverride(t *testing.T) { func TestRTSPPublisherOverride(t *testing.T) {
p, ok := testProgram("") p, ok := testProgram("")
require.Equal(t, true, ok) require.Equal(t, true, ok)
defer p.close() defer p.close()
@ -445,7 +445,7 @@ func TestPublisherOverride(t *testing.T) {
defer conn2.Close() defer conn2.Close()
} }
func TestPath(t *testing.T) { func TestRTSPPath(t *testing.T) {
for _, ca := range []struct { for _, ca := range []struct {
name string name string
path string path string
@ -490,7 +490,7 @@ func TestPath(t *testing.T) {
} }
} }
func TestAuth(t *testing.T) { func TestRTSPAuth(t *testing.T) {
t.Run("publish", func(t *testing.T) { t.Run("publish", func(t *testing.T) {
p, ok := testProgram("paths:\n" + p, ok := testProgram("paths:\n" +
" all:\n" + " all:\n" +
@ -607,43 +607,9 @@ func TestAuth(t *testing.T) {
defer cnt2.close() defer cnt2.close()
require.Equal(t, 0, cnt2.wait()) require.Equal(t, 0, cnt2.wait())
}) })
t.Run("rtmp", func(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser\n" +
" publishPass: testpass\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + "/test1/test2?user=testuser&pass=testpass",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/test1/test2",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
})
} }
func TestAuthFail(t *testing.T) { func TestRTSPAuthFail(t *testing.T) {
for _, ca := range []struct { for _, ca := range []struct {
name string name string
user string user string
@ -757,7 +723,7 @@ func TestAuthFail(t *testing.T) {
} }
} }
func TestAuthIpFail(t *testing.T) { func TestRTSPAuthIpFail(t *testing.T) {
p, ok := testProgram("paths:\n" + p, ok := testProgram("paths:\n" +
" all:\n" + " all:\n" +
" publishIps: [127.0.0.1/32]\n") " publishIps: [127.0.0.1/32]\n")
@ -778,12 +744,207 @@ func TestAuthIpFail(t *testing.T) {
require.NotEqual(t, 0, cnt1.wait()) require.NotEqual(t, 0, cnt1.wait())
} }
func TestRTMPPublish(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + ":1935/test1/test2",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/test1/test2",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
func TestRTMPRead(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"rtsp://" + ownDockerIP + ":8554/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + ":1935/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
func TestRTMPAuth(t *testing.T) {
t.Run("publish", func(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser\n" +
" publishPass: testpass\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
})
t.Run("read", func(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser\n" +
" readPass: testpass\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + "/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
})
}
func TestRTMPAuthFail(t *testing.T) {
t.Run("publish", func(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n" +
"paths:\n" +
" all:\n" +
" publishUser: testuser2\n" +
" publishPass: testpass\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.NotEqual(t, 0, cnt2.wait())
})
t.Run("read", func(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n" +
"paths:\n" +
" all:\n" +
" readUser: testuser2\n" +
" readPass: testpass\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + "/teststream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "rtmp://" + ownDockerIP + "/teststream?user=testuser&pass=testpass",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.NotEqual(t, 0, cnt2.wait())
})
}
func TestSource(t *testing.T) { func TestSource(t *testing.T) {
for _, source := range []string{ for _, source := range []string{
"rtsp_udp", //"rtsp_udp",
"rtsp_tcp", //"rtsp_tcp",
"rtsps", //"rtsps",
"rtmp_videoaudio", //"rtmp_videoaudio",
"rtmp_video", "rtmp_video",
} { } {
t.Run(source, func(t *testing.T) { t.Run(source, func(t *testing.T) {
@ -910,7 +1071,7 @@ func TestSource(t *testing.T) {
} }
} }
func TestRedirect(t *testing.T) { func TestRTSPRedirect(t *testing.T) {
p1, ok := testProgram("paths:\n" + p1, ok := testProgram("paths:\n" +
" path1:\n" + " path1:\n" +
" source: redirect\n" + " source: redirect\n" +
@ -945,7 +1106,7 @@ func TestRedirect(t *testing.T) {
require.Equal(t, 0, cnt2.wait()) require.Equal(t, 0, cnt2.wait())
} }
func TestFallback(t *testing.T) { func TestRTSPFallback(t *testing.T) {
for _, ca := range []string{ for _, ca := range []string{
"absolute", "absolute",
"relative", "relative",
@ -993,37 +1154,7 @@ func TestFallback(t *testing.T) {
} }
} }
func TestRTMP(t *testing.T) { func TestRTSPRunOnDemand(t *testing.T) {
p, ok := testProgram("rtmpEnable: yes\n")
require.Equal(t, true, ok)
defer p.close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-c", "copy",
"-f", "flv",
"rtmp://" + ownDockerIP + "/test1/test2",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/test1/test2",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
func TestRunOnDemand(t *testing.T) {
doneFile := filepath.Join(os.TempDir(), "ondemand_done") doneFile := filepath.Join(os.TempDir(), "ondemand_done")
onDemandFile, err := writeTempFile([]byte(fmt.Sprintf(`#!/bin/sh onDemandFile, err := writeTempFile([]byte(fmt.Sprintf(`#!/bin/sh
trap 'touch %s; [ -z "$(jobs -p)" ] || kill $(jobs -p)' INT trap 'touch %s; [ -z "$(jobs -p)" ] || kill $(jobs -p)' INT

Loading…
Cancel
Save