Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that allows to read, publish, proxy, record and playback video and audio streams.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

308 lines
6.4 KiB

// Package rtmp contains a RTMP server.
package rtmp
import (
"context"
"crypto/tls"
"fmt"
"net"
"sort"
"sync"
"github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/restrictnetwork"
)
type serverAPIConnsListRes struct {
data *defs.APIRTMPConnList
err error
}
type serverAPIConnsListReq struct {
res chan serverAPIConnsListRes
}
type serverAPIConnsGetRes struct {
data *defs.APIRTMPConn
err error
}
type serverAPIConnsGetReq struct {
uuid uuid.UUID
res chan serverAPIConnsGetRes
}
type serverAPIConnsKickRes struct {
err error
}
type serverAPIConnsKickReq struct {
uuid uuid.UUID
res chan serverAPIConnsKickRes
}
type serverParent interface {
logger.Writer
}
// Server is a RTMP server.
type Server struct {
Address string
ReadTimeout conf.StringDuration
WriteTimeout conf.StringDuration
WriteQueueSize int
IsTLS bool
ServerCert string
ServerKey string
RTSPAddress string
RunOnConnect string
RunOnConnectRestart bool
RunOnDisconnect string
ExternalCmdPool *externalcmd.Pool
PathManager defs.PathManager
Parent serverParent
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
ln net.Listener
conns map[*conn]struct{}
// in
chNewConn chan net.Conn
chAcceptErr chan error
chCloseConn chan *conn
chAPIConnsList chan serverAPIConnsListReq
chAPIConnsGet chan serverAPIConnsGetReq
chAPIConnsKick chan serverAPIConnsKickReq
}
// Initialize initializes the server.
func (s *Server) Initialize() error {
ln, err := func() (net.Listener, error) {
if !s.IsTLS {
return net.Listen(restrictnetwork.Restrict("tcp", s.Address))
}
cert, err := tls.LoadX509KeyPair(s.ServerCert, s.ServerKey)
if err != nil {
return nil, err
}
network, address := restrictnetwork.Restrict("tcp", s.Address)
return tls.Listen(network, address, &tls.Config{Certificates: []tls.Certificate{cert}})
}()
if err != nil {
return err
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
s.ln = ln
s.conns = make(map[*conn]struct{})
s.chNewConn = make(chan net.Conn)
s.chAcceptErr = make(chan error)
s.chCloseConn = make(chan *conn)
s.chAPIConnsList = make(chan serverAPIConnsListReq)
s.chAPIConnsGet = make(chan serverAPIConnsGetReq)
s.chAPIConnsKick = make(chan serverAPIConnsKickReq)
s.Log(logger.Info, "listener opened on %s", s.Address)
l := &listener{
ln: s.ln,
wg: &s.wg,
parent: s,
}
l.initialize()
s.wg.Add(1)
go s.run()
return nil
}
// Log implements logger.Writer.
func (s *Server) Log(level logger.Level, format string, args ...interface{}) {
label := func() string {
if s.IsTLS {
return "RTMPS"
}
return "RTMP"
}()
s.Parent.Log(level, "[%s] "+format, append([]interface{}{label}, args...)...)
}
// Close closes the server.
func (s *Server) Close() {
s.Log(logger.Info, "listener is closing")
s.ctxCancel()
s.wg.Wait()
}
func (s *Server) run() {
defer s.wg.Done()
outer:
for {
select {
case err := <-s.chAcceptErr:
s.Log(logger.Error, "%s", err)
break outer
case nconn := <-s.chNewConn:
c := &conn{
parentCtx: s.ctx,
isTLS: s.IsTLS,
rtspAddress: s.RTSPAddress,
readTimeout: s.ReadTimeout,
writeTimeout: s.WriteTimeout,
writeQueueSize: s.WriteQueueSize,
runOnConnect: s.RunOnConnect,
runOnConnectRestart: s.RunOnConnectRestart,
runOnDisconnect: s.RunOnDisconnect,
wg: &s.wg,
nconn: nconn,
externalCmdPool: s.ExternalCmdPool,
pathManager: s.PathManager,
parent: s,
}
c.initialize()
s.conns[c] = struct{}{}
case c := <-s.chCloseConn:
delete(s.conns, c)
case req := <-s.chAPIConnsList:
data := &defs.APIRTMPConnList{
Items: []*defs.APIRTMPConn{},
}
for c := range s.conns {
data.Items = append(data.Items, c.apiItem())
}
sort.Slice(data.Items, func(i, j int) bool {
return data.Items[i].Created.Before(data.Items[j].Created)
})
req.res <- serverAPIConnsListRes{data: data}
case req := <-s.chAPIConnsGet:
c := s.findConnByUUID(req.uuid)
if c == nil {
req.res <- serverAPIConnsGetRes{err: fmt.Errorf("connection not found")}
continue
}
req.res <- serverAPIConnsGetRes{data: c.apiItem()}
case req := <-s.chAPIConnsKick:
c := s.findConnByUUID(req.uuid)
if c == nil {
req.res <- serverAPIConnsKickRes{err: fmt.Errorf("connection not found")}
continue
}
delete(s.conns, c)
c.Close()
req.res <- serverAPIConnsKickRes{}
case <-s.ctx.Done():
break outer
}
}
s.ctxCancel()
s.ln.Close()
}
func (s *Server) findConnByUUID(uuid uuid.UUID) *conn {
for c := range s.conns {
if c.uuid == uuid {
return c
}
}
return nil
}
// newConn is called by rtmpListener.
func (s *Server) newConn(conn net.Conn) {
select {
case s.chNewConn <- conn:
case <-s.ctx.Done():
conn.Close()
}
}
// acceptError is called by rtmpListener.
func (s *Server) acceptError(err error) {
select {
case s.chAcceptErr <- err:
case <-s.ctx.Done():
}
}
// closeConn is called by conn.
func (s *Server) closeConn(c *conn) {
select {
case s.chCloseConn <- c:
case <-s.ctx.Done():
}
}
// APIConnsList is called by api.
func (s *Server) APIConnsList() (*defs.APIRTMPConnList, error) {
req := serverAPIConnsListReq{
res: make(chan serverAPIConnsListRes),
}
select {
case s.chAPIConnsList <- req:
res := <-req.res
return res.data, res.err
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
// APIConnsGet is called by api.
func (s *Server) APIConnsGet(uuid uuid.UUID) (*defs.APIRTMPConn, error) {
req := serverAPIConnsGetReq{
uuid: uuid,
res: make(chan serverAPIConnsGetRes),
}
select {
case s.chAPIConnsGet <- req:
res := <-req.res
return res.data, res.err
case <-s.ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
// APIConnsKick is called by api.
func (s *Server) APIConnsKick(uuid uuid.UUID) error {
req := serverAPIConnsKickReq{
uuid: uuid,
res: make(chan serverAPIConnsKickRes),
}
select {
case s.chAPIConnsKick <- req:
res := <-req.res
return res.err
case <-s.ctx.Done():
return fmt.Errorf("terminated")
}
}