Browse Source

add a RTMP server that allows to publish legacy streams (#132)

pull/235/head
aler9 5 years ago
parent
commit
3bcead121d
  1. 7
      README.md
  2. 122
      internal/client/client.go
  3. 59
      internal/clientman/clientman.go
  4. 316
      internal/clientrtmp/client.go
  5. 198
      internal/clientrtsp/client.go
  6. 119
      internal/path/path.go
  7. 3
      internal/path/readersmap.go
  8. 70
      internal/pathman/pathman.go
  9. 13
      internal/rtmputils/connpair.go
  10. 13
      internal/rtmputils/metadata.go
  11. 79
      internal/rtmputils/rtcpsenderset.go
  12. 27
      internal/serverrtmp/server.go
  13. 99
      internal/sourcertmp/source.go
  14. 22
      main.go
  15. 19
      main_test.go

7
README.md

@ -5,7 +5,7 @@ @@ -5,7 +5,7 @@
[![Lint](https://github.com/aler9/rtsp-simple-server/workflows/lint/badge.svg)](https://github.com/aler9/rtsp-simple-server/actions)
[![Docker Hub](https://img.shields.io/badge/docker-aler9%2Frtsp--simple--server-blue)](https://hub.docker.com/r/aler9/rtsp-simple-server)
_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server and RTSP proxy, a software that allows multiple users to publish, read and proxy live video and audio streams over time. RTSP is a standard protocol that describes how to perform these operations with the help of a server, that is contacted by both publishers and readers and relays the publisher's streams to the readers.
_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP/RTMP server and RTSP/RTMP proxy, a software that allows multiple users to publish, read and proxy live video and audio streams over time. RTSP is a standard protocol that describes how to perform these operations with the help of a server, that is contacted by both publishers and readers and relays the publisher's streams to the readers.
Features:
@ -33,6 +33,7 @@ Features: @@ -33,6 +33,7 @@ Features:
* [Authentication](#authentication)
* [Encrypt the configuration](#encrypt-the-configuration)
* [RTSP proxy mode](#rtsp-proxy-mode)
* [RTMP server](#rtmp-server)
* [Publish a webcam](#publish-a-webcam)
* [Publish a Raspberry Pi Camera](#publish-a-raspberry-pi-camera)
* [Convert streams to HLS](#convert-streams-to-hls)
@ -275,6 +276,10 @@ paths: @@ -275,6 +276,10 @@ paths:
sourceOnDemand: yes
```
### RTMP server
asdasd
### Publish a webcam
Edit `rtsp-simple-server.yml` and replace everything inside section `paths` with the following content:

122
internal/client/client.go

@ -0,0 +1,122 @@ @@ -0,0 +1,122 @@
package client
import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/rtsp-simple-server/internal/conf"
)
// Client can be
// *clientrtsp.Client
// *clientrtmp.Client
type Client interface {
IsClient()
IsSource()
Close()
Authenticate([]headers.AuthMethod, []interface{}, string, string,
*base.Request, *base.URL) error
OnReaderFrame(int, gortsplib.StreamType, []byte)
}
// ErrAuthNotCritical is a non-critical authentication error.
type ErrAuthNotCritical struct {
*base.Response
}
// Error implements the error interface.
func (ErrAuthNotCritical) Error() string {
return "non-critical authentication error"
}
// ErrAuthCritical is a critical authentication error.
type ErrAuthCritical struct {
*base.Response
}
// Error implements the error interface.
func (ErrAuthCritical) Error() string {
return "critical authentication error"
}
// DescribeRes is a client describe response.
type DescribeRes struct {
SDP []byte
Redirect string
Err error
}
// DescribeReq is a client describe request.
type DescribeReq struct {
Client Client
PathName string
Req *base.Request
Res chan DescribeRes
}
// SetupPlayRes is a setup/play response.
type SetupPlayRes struct {
Path Path
Err error
}
// SetupPlayReq is a setup/play request.
type SetupPlayReq struct {
Client Client
PathName string
TrackID int
Req *base.Request
Res chan SetupPlayRes
}
// AnnounceRes is a client announce response.
type AnnounceRes struct {
Path Path
Err error
}
// AnnounceReq is a client announce request.
type AnnounceReq struct {
Client Client
PathName string
Tracks gortsplib.Tracks
Req *base.Request
Res chan AnnounceRes
}
// RemoveReq is a remove request.
type RemoveReq struct {
Client Client
Res chan struct{}
}
// PlayReq is a play request.
type PlayReq struct {
Client Client
Res chan struct{}
}
// RecordReq is a record request.
type RecordReq struct {
Client Client
Res chan struct{}
}
// PauseReq is a pause request.
type PauseReq struct {
Client Client
Res chan struct{}
}
// Path is implemented by path.Path.
type Path interface {
Name() string
SourceTrackCount() int
Conf() *conf.PathConf
OnClientRemove(RemoveReq)
OnClientPlay(PlayReq)
OnClientRecord(RecordReq)
OnClientPause(PauseReq)
OnFrame(int, gortsplib.StreamType, []byte)
}

59
internal/clientman/clientman.go

@ -7,13 +7,24 @@ import ( @@ -7,13 +7,24 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/clientrtmp"
"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/pathman"
"github.com/aler9/rtsp-simple-server/internal/rtmputils"
"github.com/aler9/rtsp-simple-server/internal/serverrtmp"
"github.com/aler9/rtsp-simple-server/internal/serverrtsp"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
// PathManager is implemented by pathman.PathManager.
type PathManager interface {
ClientClose() chan client.Client
OnClientDescribe(client.DescribeReq)
OnClientAnnounce(client.AnnounceReq)
OnClientSetupPlay(client.SetupPlayReq)
}
// Parent is implemented by program.
type Parent interface {
Log(logger.Level, string, ...interface{})
@ -27,16 +38,17 @@ type ClientManager struct { @@ -27,16 +38,17 @@ type ClientManager struct {
runOnConnectRestart bool
protocols map[base.StreamProtocol]struct{}
stats *stats.Stats
pathMan *pathman.PathManager
pathMan PathManager
serverPlain *serverrtsp.Server
serverTLS *serverrtsp.Server
serverRTMP *serverrtmp.Server
parent Parent
clients map[*clientrtsp.Client]struct{}
clients map[client.Client]struct{}
wg sync.WaitGroup
// in
clientClose chan *clientrtsp.Client
clientClose chan client.Client
terminate chan struct{}
// out
@ -51,9 +63,10 @@ func New( @@ -51,9 +63,10 @@ func New(
runOnConnectRestart bool,
protocols map[base.StreamProtocol]struct{},
stats *stats.Stats,
pathMan *pathman.PathManager,
pathMan PathManager,
serverPlain *serverrtsp.Server,
serverTLS *serverrtsp.Server,
serverRTMP *serverrtmp.Server,
parent Parent) *ClientManager {
cm := &ClientManager{
@ -66,14 +79,16 @@ func New( @@ -66,14 +79,16 @@ func New(
pathMan: pathMan,
serverPlain: serverPlain,
serverTLS: serverTLS,
serverRTMP: serverRTMP,
parent: parent,
clients: make(map[*clientrtsp.Client]struct{}),
clientClose: make(chan *clientrtsp.Client),
clients: make(map[client.Client]struct{}),
clientClose: make(chan client.Client),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
go cm.run()
return cm
}
@ -105,11 +120,19 @@ func (cm *ClientManager) run() { @@ -105,11 +120,19 @@ func (cm *ClientManager) run() {
return make(chan *gortsplib.ServerConn)
}()
rtmpAccept := func() chan rtmputils.ConnPair {
if cm.serverRTMP != nil {
return cm.serverRTMP.Accept()
}
return make(chan rtmputils.ConnPair)
}()
outer:
for {
select {
case conn := <-tcpAccept:
c := clientrtsp.New(false,
c := clientrtsp.New(
false,
cm.rtspPort,
cm.readTimeout,
cm.runOnConnect,
@ -122,7 +145,8 @@ outer: @@ -122,7 +145,8 @@ outer:
cm.clients[c] = struct{}{}
case conn := <-tlsAccept:
c := clientrtsp.New(true,
c := clientrtsp.New(
true,
cm.rtspPort,
cm.readTimeout,
cm.runOnConnect,
@ -134,6 +158,15 @@ outer: @@ -134,6 +158,15 @@ outer:
cm)
cm.clients[c] = struct{}{}
case conn := <-rtmpAccept:
c := clientrtmp.New(
cm.readTimeout,
&cm.wg,
cm.stats,
conn,
cm)
cm.clients[c] = struct{}{}
case c := <-cm.pathMan.ClientClose():
if _, ok := cm.clients[c]; !ok {
continue
@ -167,21 +200,21 @@ outer: @@ -167,21 +200,21 @@ outer:
}
// OnClientClose is called by clientrtsp.Client.
func (cm *ClientManager) OnClientClose(c *clientrtsp.Client) {
func (cm *ClientManager) OnClientClose(c client.Client) {
cm.clientClose <- c
}
// OnClientDescribe is called by clientrtsp.Client.
func (cm *ClientManager) OnClientDescribe(req clientrtsp.DescribeReq) {
func (cm *ClientManager) OnClientDescribe(req client.DescribeReq) {
cm.pathMan.OnClientDescribe(req)
}
// OnClientAnnounce is called by clientrtsp.Client.
func (cm *ClientManager) OnClientAnnounce(req clientrtsp.AnnounceReq) {
func (cm *ClientManager) OnClientAnnounce(req client.AnnounceReq) {
cm.pathMan.OnClientAnnounce(req)
}
// OnClientSetupPlay is called by clientrtsp.Client.
func (cm *ClientManager) OnClientSetupPlay(req clientrtsp.SetupPlayReq) {
func (cm *ClientManager) OnClientSetupPlay(req client.SetupPlayReq) {
cm.pathMan.OnClientSetupPlay(req)
}

316
internal/clientrtmp/client.go

@ -0,0 +1,316 @@ @@ -0,0 +1,316 @@
package clientrtmp
import (
"fmt"
"io"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/codec/h264"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmputils"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
const (
pauseAfterAuthError = 2 * time.Second
)
// Parent is implemented by clientman.ClientMan.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnClientClose(client.Client)
OnClientAnnounce(client.AnnounceReq)
}
// Client is a RTMP client.
type Client struct {
readTimeout time.Duration
stats *stats.Stats
wg *sync.WaitGroup
conn rtmputils.ConnPair
parent Parent
path client.Path
// in
terminate chan struct{}
}
// New allocates a Client.
func New(
readTimeout time.Duration,
wg *sync.WaitGroup,
stats *stats.Stats,
conn rtmputils.ConnPair,
parent Parent) *Client {
c := &Client{
readTimeout: readTimeout,
wg: wg,
stats: stats,
conn: conn,
parent: parent,
terminate: make(chan struct{}),
}
atomic.AddInt64(c.stats.CountClients, 1)
c.log(logger.Info, "connected (RTMP)")
c.wg.Add(1)
go c.run()
return c
}
// Close closes a Client.
func (c *Client) Close() {
atomic.AddInt64(c.stats.CountClients, -1)
close(c.terminate)
}
// IsClient implements client.Client.
func (c *Client) IsClient() {}
// IsSource implements path.source.
func (c *Client) IsSource() {}
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NConn.RemoteAddr().String()}, args...)...)
}
func (c *Client) run() {
defer c.wg.Done()
defer c.log(logger.Info, "disconnected")
if !c.conn.RConn.Publishing {
c.conn.NConn.Close()
c.log(logger.Info, "ERR: client is not publishing")
return
}
var videoTrack *gortsplib.Track
var audioTrack *gortsplib.Track
var err error
var tracks gortsplib.Tracks
var h264Encoder *rtph264.Encoder
var aacEncoder *rtpaac.Encoder
metadataDone := make(chan struct{})
go func() {
defer close(metadataDone)
err = func() error {
videoTrack, audioTrack, err = rtmputils.Metadata(c.conn, c.readTimeout)
if err != nil {
return err
}
if videoTrack != nil {
var err error
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
return err
}
tracks = append(tracks, videoTrack)
}
if audioTrack != nil {
clockRate, _ := audioTrack.ClockRate()
var err error
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
if err != nil {
return err
}
tracks = append(tracks, audioTrack)
}
for i, t := range tracks {
t.ID = i
}
return nil
}()
}()
select {
case <-metadataDone:
case <-c.terminate:
c.conn.NConn.Close()
<-metadataDone
}
if err != nil {
c.conn.NConn.Close()
c.log(logger.Info, "ERR: %s", err)
c.parent.OnClientClose(c)
<-c.terminate
return
}
err = func() error {
// remove trailing slash, that is inserted by OBS
tmp := strings.TrimSuffix(c.conn.RConn.URL.String(), "/")
ur, _ := url.Parse(tmp)
pathName := strings.TrimPrefix(ur.Path, "/")
resc := make(chan client.AnnounceRes)
c.parent.OnClientAnnounce(client.AnnounceReq{c, pathName, tracks, nil, resc}) //nolint:govet
res := <-resc
if res.Err != nil {
switch res.Err.(type) {
case client.ErrAuthNotCritical:
return err
case client.ErrAuthCritical:
// wait some seconds to stop brute force attacks
select {
case <-time.After(pauseAfterAuthError):
case <-c.terminate:
}
return err
default:
return err
}
}
c.path = res.Path
return nil
}()
if err != nil {
c.log(logger.Info, "ERR: %s", err)
c.conn.NConn.Close()
c.parent.OnClientClose(c)
<-c.terminate
return
}
func() {
resc := make(chan struct{})
c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(),
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}())
}()
readerDone := make(chan error)
go func() {
readerDone <- func() error {
rtcpSenders := rtmputils.NewRTCPSenderSet(tracks, c.path.OnFrame)
defer rtcpSenders.Close()
for {
c.conn.NConn.SetReadDeadline(time.Now().Add(c.readTimeout))
pkt, err := c.conn.RConn.ReadPacket()
if err != nil {
return err
}
switch pkt.Type {
case av.H264:
if videoTrack == nil {
return fmt.Errorf("ERR: received an H264 frame, but track is not set up")
}
// decode from AVCC format
nalus, typ := h264.SplitNALUs(pkt.Data)
if typ != h264.NALU_AVCC {
return fmt.Errorf("invalid NALU format (%d)", typ)
}
// encode into RTP/H264 format
frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus)
if err != nil {
return err
}
for _, f := range frames {
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f)
c.path.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f)
}
case av.AAC:
if audioTrack == nil {
return fmt.Errorf("ERR: received an AAC frame, but track is not set up")
}
frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data)
if err != nil {
return err
}
for _, f := range frames {
rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f)
c.path.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f)
}
default:
return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type)
}
}
}()
}()
select {
case err := <-readerDone:
c.conn.NConn.Close()
if err != io.EOF {
c.log(logger.Info, "ERR: %s", err)
}
if c.path != nil {
res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.path = nil
}
c.parent.OnClientClose(c)
<-c.terminate
case <-c.terminate:
c.conn.NConn.Close()
<-readerDone
if c.path != nil {
res := make(chan struct{})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.path = nil
}
}
}
// Authenticate performs an authentication.
func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{},
user string, pass string, req *base.Request, altURL *base.URL) error {
// TODO
return nil
}
// OnReaderFrame implements path.Reader.
func (c *Client) OnReaderFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
}

198
internal/clientrtsp/client.go

@ -16,7 +16,7 @@ import ( @@ -16,7 +16,7 @@ import (
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/stats"
@ -37,97 +37,16 @@ func (e ErrNoOnePublishing) Error() string { @@ -37,97 +37,16 @@ func (e ErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
}
// DescribeRes is a client describe response.
type DescribeRes struct {
SDP []byte
Redirect string
Err error
}
// DescribeReq is a client describe request.
type DescribeReq struct {
Client *Client
PathName string
Req *base.Request
Res chan DescribeRes
}
// AnnounceRes is a client announce response.
type AnnounceRes struct {
Path Path
Err error
}
// AnnounceReq is a client announce request.
type AnnounceReq struct {
Client *Client
PathName string
Tracks gortsplib.Tracks
Req *base.Request
Res chan AnnounceRes
}
// SetupPlayRes is a setup/play response.
type SetupPlayRes struct {
Path Path
Err error
}
// SetupPlayReq is a setup/play request.
type SetupPlayReq struct {
Client *Client
PathName string
TrackID int
Req *base.Request
Res chan SetupPlayRes
}
// RemoveReq is a remove request.
type RemoveReq struct {
Client *Client
Res chan struct{}
}
// PlayReq is a play request.
type PlayReq struct {
Client *Client
Res chan struct{}
}
// RecordReq is a record request.
type RecordReq struct {
Client *Client
Res chan struct{}
}
// PauseReq is a pause request.
type PauseReq struct {
Client *Client
Res chan struct{}
}
// Path is implemented by path.Path.
type Path interface {
Name() string
SourceTrackCount() int
Conf() *conf.PathConf
OnClientRemove(RemoveReq)
OnClientPlay(PlayReq)
OnClientRecord(RecordReq)
OnClientPause(PauseReq)
OnFrame(int, gortsplib.StreamType, []byte)
}
// Parent is implemented by clientman.ClientMan.
type Parent interface {
Log(logger.Level, string, ...interface{})
OnClientClose(*Client)
OnClientDescribe(DescribeReq)
OnClientAnnounce(AnnounceReq)
OnClientSetupPlay(SetupPlayReq)
OnClientClose(client.Client)
OnClientDescribe(client.DescribeReq)
OnClientAnnounce(client.AnnounceReq)
OnClientSetupPlay(client.SetupPlayReq)
}
// Client is a RTSP
// Client is a RTSP client.
type Client struct {
rtspPort int
readTimeout time.Duration
@ -139,7 +58,7 @@ type Client struct { @@ -139,7 +58,7 @@ type Client struct {
conn *gortsplib.ServerConn
parent Parent
path Path
path client.Path
authUser string
authPass string
authValidator *auth.Validator
@ -187,6 +106,7 @@ func New( @@ -187,6 +106,7 @@ func New(
c.wg.Add(1)
go c.run()
return c
}
@ -196,6 +116,9 @@ func (c *Client) Close() { @@ -196,6 +116,9 @@ func (c *Client) Close() {
close(c.terminate)
}
// IsClient implements client.Client.
func (c *Client) IsClient() {}
// IsSource implements path.source.
func (c *Client) IsSource() {}
@ -237,16 +160,16 @@ func (c *Client) run() { @@ -237,16 +160,16 @@ func (c *Client) run() {
}, fmt.Errorf("invalid path (%s)", req.URL)
}
resc := make(chan DescribeRes)
c.parent.OnClientDescribe(DescribeReq{c, reqPath, req, resc})
resc := make(chan client.DescribeRes)
c.parent.OnClientDescribe(client.DescribeReq{c, reqPath, req, resc}) //nolint:govet
res := <-resc
if res.Err != nil {
switch terr := res.Err.(type) {
case errAuthNotCritical:
case client.ErrAuthNotCritical:
return terr.Response, nil
case errAuthCritical:
case client.ErrAuthCritical:
// wait some seconds to stop brute force attacks
select {
case <-time.After(pauseAfterAuthError):
@ -293,16 +216,16 @@ func (c *Client) run() { @@ -293,16 +216,16 @@ func (c *Client) run() {
}, fmt.Errorf("invalid path (%s)", req.URL)
}
resc := make(chan AnnounceRes)
c.parent.OnClientAnnounce(AnnounceReq{c, reqPath, tracks, req, resc})
resc := make(chan client.AnnounceRes)
c.parent.OnClientAnnounce(client.AnnounceReq{c, reqPath, tracks, req, resc}) //nolint:govet
res := <-resc
if res.Err != nil {
switch terr := res.Err.(type) {
case errAuthNotCritical:
case client.ErrAuthNotCritical:
return terr.Response, nil
case errAuthCritical:
case client.ErrAuthCritical:
// wait some seconds to stop brute force attacks
select {
case <-time.After(pauseAfterAuthError):
@ -363,16 +286,16 @@ func (c *Client) run() { @@ -363,16 +286,16 @@ func (c *Client) run() {
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
}
resc := make(chan SetupPlayRes)
c.parent.OnClientSetupPlay(SetupPlayReq{c, reqPath, trackID, req, resc})
resc := make(chan client.SetupPlayRes)
c.parent.OnClientSetupPlay(client.SetupPlayReq{c, reqPath, trackID, req, resc}) //nolint:govet
res := <-resc
if res.Err != nil {
switch terr := res.Err.(type) {
case errAuthNotCritical:
case client.ErrAuthNotCritical:
return terr.Response, nil
case errAuthCritical:
case client.ErrAuthCritical:
// wait some seconds to stop brute force attacks
select {
case <-time.After(pauseAfterAuthError):
@ -436,7 +359,7 @@ func (c *Client) run() { @@ -436,7 +359,7 @@ func (c *Client) run() {
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
}
c.startPlay()
c.playStart()
}
return &base.Response{
@ -464,7 +387,7 @@ func (c *Client) run() { @@ -464,7 +387,7 @@ func (c *Client) run() {
}, fmt.Errorf("path has changed, was '%s', now is '%s'", c.path.Name(), reqPath)
}
c.startRecord()
c.recordStart()
return &base.Response{
StatusCode: base.StatusOK,
@ -477,15 +400,15 @@ func (c *Client) run() { @@ -477,15 +400,15 @@ func (c *Client) run() {
onPause := func(req *base.Request) (*base.Response, error) {
switch c.conn.State() {
case gortsplib.ServerConnStatePlay:
c.stopPlay()
c.playStop()
res := make(chan struct{})
c.path.OnClientPause(PauseReq{c, res})
c.path.OnClientPause(client.PauseReq{c, res}) //nolint:govet
<-res
case gortsplib.ServerConnStateRecord:
c.stopRecord()
c.recordStop()
res := make(chan struct{})
c.path.OnClientPause(PauseReq{c, res})
c.path.OnClientPause(client.PauseReq{c, res}) //nolint:govet
<-res
}
@ -520,21 +443,22 @@ func (c *Client) run() { @@ -520,21 +443,22 @@ func (c *Client) run() {
select {
case err := <-readDone:
c.conn.Close()
if err != io.EOF && err != gortsplib.ErrServerTeardown && err != errTerminated {
c.log(logger.Info, "ERR: %s", err)
}
switch c.conn.State() {
case gortsplib.ServerConnStatePlay:
c.stopPlay()
c.playStop()
case gortsplib.ServerConnStateRecord:
c.stopRecord()
c.recordStop()
}
if c.path != nil {
res := make(chan struct{})
c.path.OnClientRemove(RemoveReq{c, res})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.path = nil
}
@ -548,37 +472,21 @@ func (c *Client) run() { @@ -548,37 +472,21 @@ func (c *Client) run() {
switch c.conn.State() {
case gortsplib.ServerConnStatePlay:
c.stopPlay()
c.playStop()
case gortsplib.ServerConnStateRecord:
c.stopRecord()
c.recordStop()
}
if c.path != nil {
res := make(chan struct{})
c.path.OnClientRemove(RemoveReq{c, res})
c.path.OnClientRemove(client.RemoveReq{c, res}) //nolint:govet
<-res
c.path = nil
}
}
}
type errAuthNotCritical struct {
*base.Response
}
func (errAuthNotCritical) Error() string {
return "auth not critical"
}
type errAuthCritical struct {
*base.Response
}
func (errAuthCritical) Error() string {
return "auth critical"
}
// Authenticate performs an authentication.
func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{},
user string, pass string, req *base.Request, altURL *base.URL) error {
@ -589,7 +497,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ @@ -589,7 +497,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
if !ipEqualOrInRange(ip, ips) {
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
return errAuthCritical{&base.Response{
return client.ErrAuthCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized,
}}
}
@ -618,7 +526,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ @@ -618,7 +526,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
if c.authFailures > 3 {
c.log(logger.Info, "ERR: unauthorized: %s", err)
return errAuthCritical{&base.Response{
return client.ErrAuthCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authValidator.GenerateHeader(),
@ -630,7 +538,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ @@ -630,7 +538,7 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
c.log(logger.Debug, "WARN: unauthorized: %s", err)
}
return errAuthNotCritical{&base.Response{
return client.ErrAuthNotCritical{&base.Response{ //nolint:govet
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authValidator.GenerateHeader(),
@ -645,12 +553,13 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{ @@ -645,12 +553,13 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{
return nil
}
func (c *Client) startPlay() {
res := make(chan struct{})
c.path.OnClientPlay(PlayReq{c, res})
<-res
func (c *Client) playStart() {
resc := make(chan struct{})
c.path.OnClientPlay(client.PlayReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is reading from path '%s', %d %s with %s", c.path.Name(),
c.log(logger.Info, "is reading from path '%s', %d %s with %s",
c.path.Name(),
c.conn.SetuppedTracksLen(),
func() string {
if c.conn.SetuppedTracksLen() == 1 {
@ -668,18 +577,19 @@ func (c *Client) startPlay() { @@ -668,18 +577,19 @@ func (c *Client) startPlay() {
}
}
func (c *Client) stopPlay() {
func (c *Client) playStop() {
if c.path.Conf().RunOnRead != "" {
c.onReadCmd.Close()
}
}
func (c *Client) startRecord() {
res := make(chan struct{})
c.path.OnClientRecord(RecordReq{c, res})
<-res
func (c *Client) recordStart() {
resc := make(chan struct{})
c.path.OnClientRecord(client.RecordReq{c, resc}) //nolint:govet
<-resc
c.log(logger.Info, "is publishing to path '%s', %d %s with %s", c.path.Name(),
c.log(logger.Info, "is publishing to path '%s', %d %s with %s",
c.path.Name(),
c.conn.SetuppedTracksLen(),
func() string {
if c.conn.SetuppedTracksLen() == 1 {
@ -697,14 +607,14 @@ func (c *Client) startRecord() { @@ -697,14 +607,14 @@ func (c *Client) startRecord() {
}
}
func (c *Client) stopRecord() {
func (c *Client) recordStop() {
if c.path.Conf().RunOnPublish != "" {
c.onPublishCmd.Close()
}
}
// OnReaderFrame implements path.Reader.
func (c *Client) OnReaderFrame(trackID int, streamType base.StreamType, buf []byte) {
func (c *Client) OnReaderFrame(trackID int, streamType gortsplib.StreamType, buf []byte) {
if !c.conn.HasSetuppedTrack(trackID) {
return
}

119
internal/path/path.go

@ -10,6 +10,7 @@ import ( @@ -10,6 +10,7 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
@ -29,11 +30,11 @@ func newEmptyTimer() *time.Timer { @@ -29,11 +30,11 @@ func newEmptyTimer() *time.Timer {
type Parent interface {
Log(logger.Level, string, ...interface{})
OnPathClose(*Path)
OnPathClientClose(*clientrtsp.Client)
OnPathClientClose(client.Client)
}
// a source can be
// * clientrtsp.Client
// source can be
// * client.Client
// * sourcertsp.Source
// * sourcertmp.Source
// * sourceRedirect
@ -85,10 +86,10 @@ type Path struct { @@ -85,10 +86,10 @@ type Path struct {
stats *stats.Stats
parent Parent
clients map[*clientrtsp.Client]clientState
clients map[client.Client]clientState
clientsWg sync.WaitGroup
describeRequests []clientrtsp.DescribeReq
setupPlayRequests []clientrtsp.SetupPlayReq
describeRequests []client.DescribeReq
setupPlayRequests []client.SetupPlayReq
source source
sourceTrackCount int
sourceSdp []byte
@ -105,15 +106,15 @@ type Path struct { @@ -105,15 +106,15 @@ type Path struct {
closeTimerStarted bool
// in
sourceSetReady chan struct{} // from source
sourceSetNotReady chan struct{} // from source
clientDescribe chan clientrtsp.DescribeReq // from program
clientAnnounce chan clientrtsp.AnnounceReq // from program
clientSetupPlay chan clientrtsp.SetupPlayReq // from program
clientPlay chan clientrtsp.PlayReq // from client
clientRecord chan clientrtsp.RecordReq // from client
clientPause chan clientrtsp.PauseReq // from client
clientRemove chan clientrtsp.RemoveReq // from client
sourceSetReady chan struct{} // from source
sourceSetNotReady chan struct{} // from source
clientDescribe chan client.DescribeReq // from program
clientAnnounce chan client.AnnounceReq // from program
clientSetupPlay chan client.SetupPlayReq // from program
clientPlay chan client.PlayReq // from client
clientRecord chan client.RecordReq // from client
clientPause chan client.PauseReq // from client
clientRemove chan client.RemoveReq // from client
terminate chan struct{}
}
@ -141,7 +142,7 @@ func New( @@ -141,7 +142,7 @@ func New(
wg: wg,
stats: stats,
parent: parent,
clients: make(map[*clientrtsp.Client]clientState),
clients: make(map[client.Client]clientState),
readers: newReadersMap(),
describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(),
@ -149,13 +150,13 @@ func New( @@ -149,13 +150,13 @@ func New(
closeTimer: newEmptyTimer(),
sourceSetReady: make(chan struct{}),
sourceSetNotReady: make(chan struct{}),
clientDescribe: make(chan clientrtsp.DescribeReq),
clientAnnounce: make(chan clientrtsp.AnnounceReq),
clientSetupPlay: make(chan clientrtsp.SetupPlayReq),
clientPlay: make(chan clientrtsp.PlayReq),
clientRecord: make(chan clientrtsp.RecordReq),
clientPause: make(chan clientrtsp.PauseReq),
clientRemove: make(chan clientrtsp.RemoveReq),
clientDescribe: make(chan client.DescribeReq),
clientAnnounce: make(chan client.AnnounceReq),
clientSetupPlay: make(chan client.SetupPlayReq),
clientPlay: make(chan client.PlayReq),
clientRecord: make(chan client.RecordReq),
clientPause: make(chan client.PauseReq),
clientRemove: make(chan client.RemoveReq),
terminate: make(chan struct{}),
}
@ -198,12 +199,12 @@ outer: @@ -198,12 +199,12 @@ outer:
select {
case <-pa.describeTimer.C:
for _, req := range pa.describeRequests {
req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("publisher of path '%s' has timed out", pa.name)} //nolint:govet
}
pa.setupPlayRequests = nil
@ -254,10 +255,10 @@ outer: @@ -254,10 +255,10 @@ outer:
case req := <-pa.clientAnnounce:
err := pa.onClientAnnounce(req.Client, req.Tracks)
if err != nil {
req.Res <- clientrtsp.AnnounceRes{nil, err} //nolint:govet
req.Res <- client.AnnounceRes{nil, err} //nolint:govet
continue
}
req.Res <- clientrtsp.AnnounceRes{pa, nil} //nolint:govet
req.Res <- client.AnnounceRes{pa, nil} //nolint:govet
case req := <-pa.clientRecord:
pa.onClientRecord(req.Client)
@ -309,11 +310,11 @@ outer: @@ -309,11 +310,11 @@ outer:
}
for _, req := range pa.describeRequests {
req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
}
for _, req := range pa.setupPlayRequests {
req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
}
for c, state := range pa.clients {
@ -360,19 +361,19 @@ func (pa *Path) exhaustChannels() { @@ -360,19 +361,19 @@ func (pa *Path) exhaustChannels() {
if !ok {
return
}
req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientAnnounce:
if !ok {
return
}
req.Res <- clientrtsp.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientSetupPlay:
if !ok {
return
}
req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.clientPlay:
if !ok {
@ -457,16 +458,12 @@ func (pa *Path) hasClientsNotSources() bool { @@ -457,16 +458,12 @@ func (pa *Path) hasClientsNotSources() bool {
return false
}
func (pa *Path) addClient(c *clientrtsp.Client, state clientState) {
if _, ok := pa.clients[c]; ok {
panic("client already added")
}
func (pa *Path) addClient(c client.Client, state clientState) {
pa.clients[c] = state
pa.clientsWg.Add(1)
}
func (pa *Path) removeClient(c *clientrtsp.Client) {
func (pa *Path) removeClient(c client.Client) {
state := pa.clients[c]
pa.clients[c] = clientStatePreRemove
@ -506,7 +503,7 @@ func (pa *Path) onSourceSetReady() { @@ -506,7 +503,7 @@ func (pa *Path) onSourceSetReady() {
pa.sourceState = sourceStateReady
for _, req := range pa.describeRequests {
req.Res <- clientrtsp.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet
req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet
}
pa.describeRequests = nil
@ -576,9 +573,9 @@ func (pa *Path) fixedPublisherStart() { @@ -576,9 +573,9 @@ func (pa *Path) fixedPublisherStart() {
}
}
func (pa *Path) onClientDescribe(req clientrtsp.DescribeReq) {
func (pa *Path) onClientDescribe(req client.DescribeReq) {
if _, ok := pa.clients[req.Client]; ok {
req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("already subscribed")} //nolint:govet
return
}
@ -586,13 +583,13 @@ func (pa *Path) onClientDescribe(req clientrtsp.DescribeReq) { @@ -586,13 +583,13 @@ func (pa *Path) onClientDescribe(req clientrtsp.DescribeReq) {
pa.scheduleClose()
if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- clientrtsp.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet
req.Res <- client.DescribeRes{nil, pa.conf.SourceRedirect, nil} //nolint:govet
return
}
switch pa.sourceState {
case sourceStateReady:
req.Res <- clientrtsp.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet
req.Res <- client.DescribeRes{pa.sourceSdp, "", nil} //nolint:govet
return
case sourceStateWaitingDescribe:
@ -601,18 +598,18 @@ func (pa *Path) onClientDescribe(req clientrtsp.DescribeReq) { @@ -601,18 +598,18 @@ func (pa *Path) onClientDescribe(req clientrtsp.DescribeReq) {
case sourceStateNotReady:
if pa.conf.Fallback != "" {
req.Res <- clientrtsp.DescribeRes{nil, pa.conf.Fallback, nil} //nolint:govet
req.Res <- client.DescribeRes{nil, pa.conf.Fallback, nil} //nolint:govet
return
}
req.Res <- clientrtsp.DescribeRes{nil, "", clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet
req.Res <- client.DescribeRes{nil, "", clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet
return
}
}
func (pa *Path) onClientSetupPlayPost(req clientrtsp.SetupPlayReq) {
func (pa *Path) onClientSetupPlayPost(req client.SetupPlayReq) {
if req.TrackID >= pa.sourceTrackCount {
req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("track %d does not exist", req.TrackID)} //nolint:govet
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("track %d does not exist", req.TrackID)} //nolint:govet
return
}
@ -632,10 +629,10 @@ func (pa *Path) onClientSetupPlayPost(req clientrtsp.SetupPlayReq) { @@ -632,10 +629,10 @@ func (pa *Path) onClientSetupPlayPost(req clientrtsp.SetupPlayReq) {
pa.addClient(req.Client, clientStatePrePlay)
}
req.Res <- clientrtsp.SetupPlayRes{pa, nil} //nolint:govet
req.Res <- client.SetupPlayRes{pa, nil} //nolint:govet
}
func (pa *Path) onClientSetupPlay(req clientrtsp.SetupPlayReq) {
func (pa *Path) onClientSetupPlay(req client.SetupPlayReq) {
pa.fixedPublisherStart()
pa.scheduleClose()
@ -649,12 +646,12 @@ func (pa *Path) onClientSetupPlay(req clientrtsp.SetupPlayReq) { @@ -649,12 +646,12 @@ func (pa *Path) onClientSetupPlay(req clientrtsp.SetupPlayReq) {
return
case sourceStateNotReady:
req.Res <- clientrtsp.SetupPlayRes{nil, clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet
req.Res <- client.SetupPlayRes{nil, clientrtsp.ErrNoOnePublishing{pa.name}} //nolint:govet
return
}
}
func (pa *Path) onClientPlay(c *clientrtsp.Client) {
func (pa *Path) onClientPlay(c client.Client) {
state, ok := pa.clients[c]
if !ok {
return
@ -670,7 +667,7 @@ func (pa *Path) onClientPlay(c *clientrtsp.Client) { @@ -670,7 +667,7 @@ func (pa *Path) onClientPlay(c *clientrtsp.Client) {
pa.readers.add(c)
}
func (pa *Path) onClientAnnounce(c *clientrtsp.Client, tracks gortsplib.Tracks) error {
func (pa *Path) onClientAnnounce(c client.Client, tracks gortsplib.Tracks) error {
if _, ok := pa.clients[c]; ok {
return fmt.Errorf("already subscribed")
}
@ -687,7 +684,7 @@ func (pa *Path) onClientAnnounce(c *clientrtsp.Client, tracks gortsplib.Tracks) @@ -687,7 +684,7 @@ func (pa *Path) onClientAnnounce(c *clientrtsp.Client, tracks gortsplib.Tracks)
return nil
}
func (pa *Path) onClientRecord(c *clientrtsp.Client) {
func (pa *Path) onClientRecord(c client.Client) {
state, ok := pa.clients[c]
if !ok {
return
@ -703,7 +700,7 @@ func (pa *Path) onClientRecord(c *clientrtsp.Client) { @@ -703,7 +700,7 @@ func (pa *Path) onClientRecord(c *clientrtsp.Client) {
pa.onSourceSetReady()
}
func (pa *Path) onClientPause(c *clientrtsp.Client) {
func (pa *Path) onClientPause(c client.Client) {
state, ok := pa.clients[c]
if !ok {
return
@ -803,37 +800,37 @@ func (pa *Path) OnSourceSetNotReady() { @@ -803,37 +800,37 @@ func (pa *Path) OnSourceSetNotReady() {
}
// OnPathManDescribe is called by pathman.PathMan.
func (pa *Path) OnPathManDescribe(req clientrtsp.DescribeReq) {
func (pa *Path) OnPathManDescribe(req client.DescribeReq) {
pa.clientDescribe <- req
}
// OnPathManSetupPlay is called by pathman.PathMan.
func (pa *Path) OnPathManSetupPlay(req clientrtsp.SetupPlayReq) {
func (pa *Path) OnPathManSetupPlay(req client.SetupPlayReq) {
pa.clientSetupPlay <- req
}
// OnPathManAnnounce is called by pathman.PathMan.
func (pa *Path) OnPathManAnnounce(req clientrtsp.AnnounceReq) {
func (pa *Path) OnPathManAnnounce(req client.AnnounceReq) {
pa.clientAnnounce <- req
}
// OnClientRemove is called by clientrtsp.Client.
func (pa *Path) OnClientRemove(req clientrtsp.RemoveReq) {
func (pa *Path) OnClientRemove(req client.RemoveReq) {
pa.clientRemove <- req
}
// OnClientPlay is called by clientrtsp.Client.
func (pa *Path) OnClientPlay(req clientrtsp.PlayReq) {
func (pa *Path) OnClientPlay(req client.PlayReq) {
pa.clientPlay <- req
}
// OnClientRecord is called by clientrtsp.Client.
func (pa *Path) OnClientRecord(req clientrtsp.RecordReq) {
func (pa *Path) OnClientRecord(req client.RecordReq) {
pa.clientRecord <- req
}
// OnClientPause is called by clientrtsp.Client.
func (pa *Path) OnClientPause(req clientrtsp.PauseReq) {
func (pa *Path) OnClientPause(req client.PauseReq) {
pa.clientPause <- req
}

3
internal/path/readersmap.go

@ -4,11 +4,10 @@ import ( @@ -4,11 +4,10 @@ import (
"sync"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
)
type reader interface {
OnReaderFrame(int, base.StreamType, []byte)
OnReaderFrame(int, gortsplib.StreamType, []byte)
}
type readersMap struct {

70
internal/pathman/pathman.go

@ -8,7 +8,7 @@ import ( @@ -8,7 +8,7 @@ import (
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/rtsp-simple-server/internal/clientrtsp"
"github.com/aler9/rtsp-simple-server/internal/client"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/path"
@ -37,13 +37,13 @@ type PathManager struct { @@ -37,13 +37,13 @@ type PathManager struct {
// in
confReload chan map[string]*conf.PathConf
pathClose chan *path.Path
clientDescribe chan clientrtsp.DescribeReq
clientAnnounce chan clientrtsp.AnnounceReq
clientSetupPlay chan clientrtsp.SetupPlayReq
clientDescribe chan client.DescribeReq
clientAnnounce chan client.AnnounceReq
clientSetupPlay chan client.SetupPlayReq
terminate chan struct{}
// out
clientClose chan *clientrtsp.Client
clientClose chan client.Client
done chan struct{}
}
@ -70,11 +70,11 @@ func New( @@ -70,11 +70,11 @@ func New(
paths: make(map[string]*path.Path),
confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path.Path),
clientDescribe: make(chan clientrtsp.DescribeReq),
clientAnnounce: make(chan clientrtsp.AnnounceReq),
clientSetupPlay: make(chan clientrtsp.SetupPlayReq),
clientDescribe: make(chan client.DescribeReq),
clientAnnounce: make(chan client.AnnounceReq),
clientSetupPlay: make(chan client.SetupPlayReq),
terminate: make(chan struct{}),
clientClose: make(chan *clientrtsp.Client),
clientClose: make(chan client.Client),
done: make(chan struct{}),
}
@ -149,14 +149,14 @@ outer: @@ -149,14 +149,14 @@ outer:
case req := <-pm.clientDescribe:
pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil {
req.Res <- clientrtsp.DescribeRes{nil, "", err} //nolint:govet
req.Res <- client.DescribeRes{nil, "", err} //nolint:govet
continue
}
err = req.Client.Authenticate(pm.authMethods, pathConf.ReadIpsParsed,
pathConf.ReadUser, pathConf.ReadPass, req.Req, nil)
if err != nil {
req.Res <- clientrtsp.DescribeRes{nil, "", err} //nolint:govet
req.Res <- client.DescribeRes{nil, "", err} //nolint:govet
continue
}
@ -181,7 +181,7 @@ outer: @@ -181,7 +181,7 @@ outer:
case req := <-pm.clientAnnounce:
pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil {
req.Res <- clientrtsp.AnnounceRes{nil, err} //nolint:govet
req.Res <- client.AnnounceRes{nil, err} //nolint:govet
continue
}
@ -189,7 +189,7 @@ outer: @@ -189,7 +189,7 @@ outer:
pathConf.PublishIpsParsed, pathConf.PublishUser,
pathConf.PublishPass, req.Req, nil)
if err != nil {
req.Res <- clientrtsp.AnnounceRes{nil, err} //nolint:govet
req.Res <- client.AnnounceRes{nil, err} //nolint:govet
continue
}
@ -214,23 +214,29 @@ outer: @@ -214,23 +214,29 @@ outer:
case req := <-pm.clientSetupPlay:
pathName, pathConf, err := pm.findPathConf(req.PathName)
if err != nil {
req.Res <- clientrtsp.SetupPlayRes{nil, err} //nolint:govet
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet
continue
}
// VLC strips the control attribute
// provide an alternative URL without the control attribute
altURL := &base.URL{
Scheme: req.Req.URL.Scheme,
Host: req.Req.URL.Host,
Path: "/" + req.PathName + "/",
}
altURL := func() *base.URL {
if req.Req == nil {
return nil
}
return &base.URL{
Scheme: req.Req.URL.Scheme,
Host: req.Req.URL.Host,
Path: "/" + req.PathName + "/",
}
}()
err = req.Client.Authenticate(pm.authMethods,
pathConf.ReadIpsParsed, pathConf.ReadUser, pathConf.ReadPass,
req.Req, altURL)
if err != nil {
req.Res <- clientrtsp.SetupPlayRes{nil, err} //nolint:govet
req.Res <- client.SetupPlayRes{nil, err} //nolint:govet
continue
}
@ -274,19 +280,19 @@ outer: @@ -274,19 +280,19 @@ outer:
if !ok {
return
}
req.Res <- clientrtsp.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.clientAnnounce:
if !ok {
return
}
req.Res <- clientrtsp.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.clientSetupPlay:
if !ok {
return
}
req.Res <- clientrtsp.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
req.Res <- client.SetupPlayRes{nil, fmt.Errorf("terminated")} //nolint:govet
}
}
}()
@ -355,26 +361,26 @@ func (pm *PathManager) OnPathClose(pa *path.Path) { @@ -355,26 +361,26 @@ func (pm *PathManager) OnPathClose(pa *path.Path) {
}
// OnPathClientClose is called by path.Path.
func (pm *PathManager) OnPathClientClose(c *clientrtsp.Client) {
func (pm *PathManager) OnPathClientClose(c client.Client) {
pm.clientClose <- c
}
// OnClientDescribe is called by clientrtsp.Client.
func (pm *PathManager) OnClientDescribe(req clientrtsp.DescribeReq) {
// OnClientDescribe is called by clientman.ClientMan.
func (pm *PathManager) OnClientDescribe(req client.DescribeReq) {
pm.clientDescribe <- req
}
// OnClientAnnounce is called by clientrtsp.Client.
func (pm *PathManager) OnClientAnnounce(req clientrtsp.AnnounceReq) {
// OnClientAnnounce is called by clientman.ClientMan.
func (pm *PathManager) OnClientAnnounce(req client.AnnounceReq) {
pm.clientAnnounce <- req
}
// OnClientSetupPlay is called by clientrtsp.Client.
func (pm *PathManager) OnClientSetupPlay(req clientrtsp.SetupPlayReq) {
// OnClientSetupPlay is called by clientman.ClientMan.
func (pm *PathManager) OnClientSetupPlay(req client.SetupPlayReq) {
pm.clientSetupPlay <- req
}
// ClientClose is called by clientrtsp.Client.
func (pm *PathManager) ClientClose() chan *clientrtsp.Client {
// ClientClose is called by clientman.ClientMan.
func (pm *PathManager) ClientClose() chan client.Client {
return pm.clientClose
}

13
internal/rtmputils/connpair.go

@ -0,0 +1,13 @@ @@ -0,0 +1,13 @@
package rtmputils
import (
"net"
"github.com/notedit/rtmp/format/rtmp"
)
// ConnPair contains a RTMP connection and a net connection.
type ConnPair struct {
RConn *rtmp.Conn
NConn net.Conn
}

13
internal/rtmpinfo/rtmpinfo.go → internal/rtmputils/metadata.go

@ -1,8 +1,7 @@ @@ -1,8 +1,7 @@
package rtmpinfo
package rtmputils
import (
"fmt"
"net"
"time"
"github.com/aler9/gortsplib"
@ -44,16 +43,16 @@ func readMetadata(rconn *rtmp.Conn) (flvio.AMFMap, error) { @@ -44,16 +43,16 @@ func readMetadata(rconn *rtmp.Conn) (flvio.AMFMap, error) {
return ma, nil
}
// Info extracts track informations from a RTMP connection.
func Info(rconn *rtmp.Conn, nconn net.Conn, readTimeout time.Duration) (
// Metadata extracts track informations from a RTMP connection that is publishing.
func Metadata(conn ConnPair, readTimeout time.Duration) (
*gortsplib.Track, *gortsplib.Track, error) {
var videoTrack *gortsplib.Track
var audioTrack *gortsplib.Track
// configuration must be completed within readTimeout
nconn.SetReadDeadline(time.Now().Add(readTimeout))
conn.NConn.SetReadDeadline(time.Now().Add(readTimeout))
md, err := readMetadata(rconn)
md, err := readMetadata(conn.RConn)
if err != nil {
return nil, nil, err
}
@ -87,7 +86,7 @@ func Info(rconn *rtmp.Conn, nconn net.Conn, readTimeout time.Duration) ( @@ -87,7 +86,7 @@ func Info(rconn *rtmp.Conn, nconn net.Conn, readTimeout time.Duration) (
for {
var pkt av.Packet
pkt, err = rconn.ReadPacket()
pkt, err = conn.RConn.ReadPacket()
if err != nil {
return nil, nil, err
}

79
internal/rtmputils/rtcpsenderset.go

@ -0,0 +1,79 @@ @@ -0,0 +1,79 @@
package rtmputils
import (
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/rtcpsender"
)
// RTCPSenderSet is a set of RTCP senders.
type RTCPSenderSet struct {
onFrame func(int, gortsplib.StreamType, []byte)
senders []*rtcpsender.RTCPSender
// in
terminate chan struct{}
// out
done chan struct{}
}
// NewRTCPSenderSet allocates a RTCPSenderSet.
func NewRTCPSenderSet(
tracks gortsplib.Tracks,
onFrame func(int, gortsplib.StreamType, []byte),
) *RTCPSenderSet {
s := &RTCPSenderSet{
onFrame: onFrame,
terminate: make(chan struct{}),
done: make(chan struct{}),
}
s.senders = make([]*rtcpsender.RTCPSender, len(tracks))
for i, t := range tracks {
clockRate, _ := t.ClockRate()
s.senders[i] = rtcpsender.New(clockRate)
}
go s.run()
return s
}
// Close closes a RTCPSenderSet.
func (s *RTCPSenderSet) Close() {
close(s.terminate)
<-s.done
}
func (s *RTCPSenderSet) run() {
defer close(s.done)
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
now := time.Now()
for i, sender := range s.senders {
r := sender.Report(now)
if r != nil {
s.onFrame(i, gortsplib.StreamTypeRTCP, r)
}
}
case <-s.terminate:
return
}
}
}
// ProcessFrame sends a frame to the senders.
func (s *RTCPSenderSet) ProcessFrame(trackID int, t time.Time,
streamType gortsplib.StreamType, f []byte) {
s.senders[trackID].ProcessFrame(t, streamType, f)
}

27
internal/serverrtmp/server.go

@ -1,7 +1,6 @@ @@ -1,7 +1,6 @@
package serverrtmp
import (
"fmt"
"net"
"strconv"
"sync"
@ -9,6 +8,7 @@ import ( @@ -9,6 +8,7 @@ import (
"github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmputils"
)
// Parent is implemented by program.
@ -21,6 +21,8 @@ type Server struct { @@ -21,6 +21,8 @@ type Server struct {
l net.Listener
srv *rtmp.Server
wg sync.WaitGroup
accept chan rtmputils.ConnPair
}
// New allocates a Server.
@ -36,7 +38,8 @@ func New( @@ -36,7 +38,8 @@ func New(
}
s := &Server{
l: l,
l: l,
accept: make(chan rtmputils.ConnPair),
}
s.srv = rtmp.NewServer()
@ -52,8 +55,14 @@ func New( @@ -52,8 +55,14 @@ func New(
// Close closes a Server.
func (s *Server) Close() {
go func() {
for co := range s.accept {
co.NConn.Close()
}
}()
s.l.Close()
s.wg.Wait()
close(s.accept)
}
func (s *Server) run() {
@ -74,14 +83,10 @@ func (s *Server) run() { @@ -74,14 +83,10 @@ func (s *Server) run() {
}
func (s *Server) innerHandleConn(rconn *rtmp.Conn, nconn net.Conn) {
defer nconn.Close()
err := func() error {
if !rconn.Publishing {
return fmt.Errorf("not publishing")
}
s.accept <- rtmputils.ConnPair{rconn, nconn} //nolint:govet
}
return nil
}()
fmt.Println("ERR", err)
// Accept returns a channel to accept incoming connections.
func (s *Server) Accept() chan rtmputils.ConnPair {
return s.accept
}

99
internal/sourcertmp/source.go

@ -8,7 +8,6 @@ import ( @@ -8,7 +8,6 @@ import (
"time"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/rtcpsender"
"github.com/aler9/gortsplib/pkg/rtpaac"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av"
@ -16,7 +15,7 @@ import ( @@ -16,7 +15,7 @@ import (
"github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/logger"
"github.com/aler9/rtsp-simple-server/internal/rtmpinfo"
"github.com/aler9/rtsp-simple-server/internal/rtmputils"
"github.com/aler9/rtsp-simple-server/internal/stats"
)
@ -113,13 +112,15 @@ func (s *Source) run() { @@ -113,13 +112,15 @@ func (s *Source) run() {
func (s *Source) runInner() bool {
s.log(logger.Info, "connecting")
var rconn *rtmp.Conn
var nconn net.Conn
var conn rtmputils.ConnPair
var err error
dialDone := make(chan struct{}, 1)
go func() {
defer close(dialDone)
var rconn *rtmp.Conn
var nconn net.Conn
rconn, nconn, err = rtmp.NewClient().Dial(s.ur, rtmp.PrepareReading)
conn = rtmputils.ConnPair{rconn, nconn} //nolint:govet
}()
select {
@ -135,17 +136,18 @@ func (s *Source) runInner() bool { @@ -135,17 +136,18 @@ func (s *Source) runInner() bool {
var videoTrack *gortsplib.Track
var audioTrack *gortsplib.Track
confDone := make(chan struct{})
metadataDone := make(chan struct{})
go func() {
defer close(confDone)
videoTrack, audioTrack, err = rtmpinfo.Info(rconn, nconn, s.readTimeout)
defer close(metadataDone)
videoTrack, audioTrack, err = rtmputils.Metadata(
conn, s.readTimeout) //nolint:govet
}()
select {
case <-confDone:
case <-metadataDone:
case <-s.terminate:
nconn.Close()
<-confDone
conn.NConn.Close()
<-metadataDone
return false
}
@ -155,34 +157,29 @@ func (s *Source) runInner() bool { @@ -155,34 +157,29 @@ func (s *Source) runInner() bool {
}
var tracks gortsplib.Tracks
var videoRTCPSender *rtcpsender.RTCPSender
var h264Encoder *rtph264.Encoder
var audioRTCPSender *rtcpsender.RTCPSender
var aacEncoder *rtpaac.Encoder
var h264Encoder *rtph264.Encoder
if videoTrack != nil {
clockRate, _ := videoTrack.ClockRate()
var err error
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
nconn.Close()
conn.NConn.Close()
s.log(logger.Info, "ERR: %s", err)
return true
}
videoRTCPSender = rtcpsender.New(clockRate)
tracks = append(tracks, videoTrack)
}
var aacEncoder *rtpaac.Encoder
if audioTrack != nil {
clockRate, _ := audioTrack.ClockRate()
var err error
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
if err != nil {
nconn.Close()
conn.NConn.Close()
s.log(logger.Info, "ERR: %s", err)
return true
}
audioRTCPSender = rtcpsender.New(clockRate)
tracks = append(tracks, audioTrack)
}
@ -194,45 +191,15 @@ func (s *Source) runInner() bool { @@ -194,45 +191,15 @@ func (s *Source) runInner() bool {
s.parent.OnSourceSetReady(tracks)
defer s.parent.OnSourceSetNotReady()
rtcpTerminate := make(chan struct{})
rtcpDone := make(chan struct{})
go func() {
close(rtcpDone)
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
now := time.Now()
if videoRTCPSender != nil {
r := videoRTCPSender.Report(now)
if r != nil {
s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTCP, r)
}
}
if audioRTCPSender != nil {
r := audioRTCPSender.Report(now)
if r != nil {
s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTCP, r)
}
}
case <-rtcpTerminate:
return
}
}
}()
readerDone := make(chan error)
go func() {
readerDone <- func() error {
rtcpSenders := rtmputils.NewRTCPSenderSet(tracks, s.parent.OnFrame)
defer rtcpSenders.Close()
for {
nconn.SetReadDeadline(time.Now().Add(s.readTimeout))
pkt, err := rconn.ReadPacket()
conn.NConn.SetReadDeadline(time.Now().Add(s.readTimeout))
pkt, err := conn.RConn.ReadPacket()
if err != nil {
return err
}
@ -240,7 +207,7 @@ func (s *Source) runInner() bool { @@ -240,7 +207,7 @@ func (s *Source) runInner() bool {
switch pkt.Type {
case av.H264:
if videoTrack == nil {
return fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up")
return fmt.Errorf("ERR: received an H264 frame, but track is not set up")
}
// decode from AVCC format
@ -256,13 +223,13 @@ func (s *Source) runInner() bool { @@ -256,13 +223,13 @@ func (s *Source) runInner() bool {
}
for _, f := range frames {
videoRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f)
rtcpSenders.ProcessFrame(videoTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f)
s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f)
}
case av.AAC:
if audioTrack == nil {
return fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up")
return fmt.Errorf("ERR: received an AAC frame, but track is not set up")
}
frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data)
@ -271,12 +238,12 @@ func (s *Source) runInner() bool { @@ -271,12 +238,12 @@ func (s *Source) runInner() bool {
}
for _, f := range frames {
audioRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f)
rtcpSenders.ProcessFrame(audioTrack.ID, time.Now(), gortsplib.StreamTypeRTP, f)
s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f)
}
default:
return fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type)
return fmt.Errorf("ERR: unexpected packet: %v", pkt.Type)
}
}
}()
@ -285,19 +252,13 @@ func (s *Source) runInner() bool { @@ -285,19 +252,13 @@ func (s *Source) runInner() bool {
for {
select {
case err := <-readerDone:
nconn.Close()
conn.NConn.Close()
s.log(logger.Info, "ERR: %s", err)
close(rtcpTerminate)
<-rtcpDone
return true
case <-s.terminate:
nconn.Close()
conn.NConn.Close()
<-readerDone
close(rtcpTerminate)
<-rtcpDone
return false
}
}

22
main.go

@ -211,7 +211,8 @@ func (p *program) createResources(initial bool) error { @@ -211,7 +211,8 @@ func (p *program) createResources(initial bool) error {
}
}
if p.conf.EncryptionParsed == conf.EncryptionNo || p.conf.EncryptionParsed == conf.EncryptionOptional {
if p.conf.EncryptionParsed == conf.EncryptionNo ||
p.conf.EncryptionParsed == conf.EncryptionOptional {
if p.serverPlain == nil {
conf := gortsplib.ServerConf{
ReadTimeout: p.conf.ReadTimeout,
@ -232,7 +233,8 @@ func (p *program) createResources(initial bool) error { @@ -232,7 +233,8 @@ func (p *program) createResources(initial bool) error {
}
}
if p.conf.EncryptionParsed == conf.EncryptionStrict || p.conf.EncryptionParsed == conf.EncryptionOptional {
if p.conf.EncryptionParsed == conf.EncryptionStrict ||
p.conf.EncryptionParsed == conf.EncryptionOptional {
if p.serverTLS == nil {
cert, err := tls.LoadX509KeyPair(p.conf.ServerCert, p.conf.ServerKey)
if err != nil {
@ -292,6 +294,7 @@ func (p *program) createResources(initial bool) error { @@ -292,6 +294,7 @@ func (p *program) createResources(initial bool) error {
p.pathMan,
p.serverPlain,
p.serverTLS,
p.serverRTMP,
p)
}
@ -362,6 +365,15 @@ func (p *program) closeResources(newConf *conf.Conf) { @@ -362,6 +365,15 @@ func (p *program) closeResources(newConf *conf.Conf) {
closeServerTLS = true
}
closeServerRTMP := false
if newConf == nil ||
newConf.EncryptionParsed != p.conf.EncryptionParsed ||
newConf.ListenIP != p.conf.ListenIP ||
newConf.RTMPPort != p.conf.RTMPPort ||
newConf.ReadTimeout != p.conf.ReadTimeout {
closeServerRTMP = true
}
closePathMan := false
if newConf == nil ||
newConf.RTSPPort != p.conf.RTSPPort ||
@ -378,6 +390,7 @@ func (p *program) closeResources(newConf *conf.Conf) { @@ -378,6 +390,7 @@ func (p *program) closeResources(newConf *conf.Conf) {
if newConf == nil ||
closeServerPlain ||
closeServerTLS ||
closeServerRTMP ||
closePathMan ||
newConf.RTSPPort != p.conf.RTSPPort ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
@ -402,6 +415,11 @@ func (p *program) closeResources(newConf *conf.Conf) { @@ -402,6 +415,11 @@ func (p *program) closeResources(newConf *conf.Conf) {
p.pathMan = nil
}
if closeServerRTMP && p.serverRTMP != nil {
p.serverRTMP.Close()
p.serverRTMP = nil
}
if closeServerTLS && p.serverTLS != nil {
p.serverTLS.Close()
p.serverTLS = nil

19
main_test.go

@ -561,7 +561,6 @@ func TestAuthFail(t *testing.T) { @@ -561,7 +561,6 @@ func TestAuthFail(t *testing.T) {
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 1, cnt2.wait())
})
}
@ -618,7 +617,6 @@ func TestAuthFail(t *testing.T) { @@ -618,7 +617,6 @@ func TestAuthFail(t *testing.T) {
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 1, cnt2.wait())
})
}
@ -642,7 +640,6 @@ func TestAuthIpFail(t *testing.T) { @@ -642,7 +640,6 @@ func TestAuthIpFail(t *testing.T) {
})
require.NoError(t, err)
defer cnt1.close()
require.NotEqual(t, 0, cnt1.wait())
}
@ -810,7 +807,6 @@ func TestRedirect(t *testing.T) { @@ -810,7 +807,6 @@ func TestRedirect(t *testing.T) {
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
@ -845,7 +841,6 @@ func TestFallback(t *testing.T) { @@ -845,7 +841,6 @@ func TestFallback(t *testing.T) {
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
@ -867,6 +862,17 @@ func TestRTMP(t *testing.T) { @@ -867,6 +862,17 @@ func TestRTMP(t *testing.T) {
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-rtsp_transport", "udp",
"-i", "rtsp://" + ownDockerIP + ":8554/test1/test2",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
func TestRunOnDemand(t *testing.T) {
@ -1069,7 +1075,6 @@ func TestHotReloading(t *testing.T) { @@ -1069,7 +1075,6 @@ func TestHotReloading(t *testing.T) {
})
require.NoError(t, err)
defer cnt1.close()
require.Equal(t, 0, cnt1.wait())
}()
@ -1093,7 +1098,6 @@ func TestHotReloading(t *testing.T) { @@ -1093,7 +1098,6 @@ func TestHotReloading(t *testing.T) {
})
require.NoError(t, err)
defer cnt1.close()
require.Equal(t, 1, cnt1.wait())
}()
@ -1106,7 +1110,6 @@ func TestHotReloading(t *testing.T) { @@ -1106,7 +1110,6 @@ func TestHotReloading(t *testing.T) {
})
require.NoError(t, err)
defer cnt1.close()
require.Equal(t, 0, cnt1.wait())
}()
}

Loading…
Cancel
Save