Ready-to-use SRT / WebRTC / RTSP / RTMP / LL-HLS media server and media proxy that lets you read, publish, proxy, record, and play back video and audio streams.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

192 lines
4.1 KiB

// Package rtsp contains the RTSP static source.
package rtsp
import (
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/defs"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/protocols/tls"
)
// createRangeHeader builds the Range header described by the path
// configuration.
//
// The header value depends on cnf.RTSPRangeType:
//   - conf.RTSPRangeTypeClock: cnf.RTSPRangeStart is parsed with the time
//     layout "20060102T150405Z" and used as the start of a UTC range;
//   - conf.RTSPRangeTypeNPT: cnf.RTSPRangeStart is parsed with
//     time.ParseDuration and used as the start of a NPT range;
//   - conf.RTSPRangeTypeSMPTE: cnf.RTSPRangeStart is parsed with
//     time.ParseDuration and used as the start time of a SMPTE range.
//
// For any other range type the function returns (nil, nil). A parsing failure
// is returned unchanged together with a nil header.
func createRangeHeader(cnf *conf.Path) (*headers.Range, error) {
	rangeType := cnf.RTSPRangeType

	if rangeType == conf.RTSPRangeTypeClock {
		utcStart, parseErr := time.Parse("20060102T150405Z", cnf.RTSPRangeStart)
		if parseErr != nil {
			return nil, parseErr
		}

		utcRange := &headers.RangeUTC{Start: utcStart}
		return &headers.Range{Value: utcRange}, nil
	}

	if rangeType == conf.RTSPRangeTypeNPT {
		offset, parseErr := time.ParseDuration(cnf.RTSPRangeStart)
		if parseErr != nil {
			return nil, parseErr
		}

		nptRange := &headers.RangeNPT{Start: offset}
		return &headers.Range{Value: nptRange}, nil
	}

	if rangeType == conf.RTSPRangeTypeSMPTE {
		offset, parseErr := time.ParseDuration(cnf.RTSPRangeStart)
		if parseErr != nil {
			return nil, parseErr
		}

		smpteRange := &headers.RangeSMPTE{
			Start: headers.RangeSMPTETime{Time: offset},
		}
		return &headers.Range{Value: smpteRange}, nil
	}

	// No recognized range type: no Range header is produced.
	return nil, nil
}
// Source is an RTSP static source.
//
// Its fields are read by Run to configure the RTSP client and by Log to
// forward log messages.
type Source struct {
// ReadTimeout is converted to a time.Duration and used as the read timeout
// of the RTSP client created by Run.
ReadTimeout conf.StringDuration
// WriteTimeout is converted to a time.Duration and used as the write timeout
// of the RTSP client created by Run.
WriteTimeout conf.StringDuration
// WriteQueueSize is passed unchanged to the RTSP client created by Run.
WriteQueueSize int
// Parent receives log messages and the ready / not-ready notifications
// issued by this source.
Parent defs.StaticSourceParent
}
// Log implements StaticSource.
//
// The message is handed to the parent with the format string prefixed by
// "[RTSP source] "; level and args are passed through unchanged.
func (s *Source) Log(level logger.Level, format string, args ...interface{}) {
	const prefix = "[RTSP source] "

	s.Parent.Log(level, prefix+format, args...)
}
// Run implements StaticSource.
//
// It parses params.Conf.Source, starts a gortsplib client against the scheme
// and host of that URL and then, in a separate goroutine, describes the remote
// stream, sets up all of its medias, marks the path as ready, forwards every
// incoming RTP packet to the path stream and starts playback.
//
// Run returns:
//   - the URL parsing error or the client start error, if either occurs;
//   - the result of the read goroutine, as soon as it terminates;
//   - nil when params.Context is done, after the client has been closed and
//     the read goroutine has delivered its result.
//
// Values received on params.ReloadConf are consumed and ignored.
func (s *Source) Run(params defs.StaticSourceRunParams) error {
	s.Log(logger.Debug, "connecting")

	// Packet-loss and decode warnings are emitted through a limited logger
	// wrapping this source.
	decodeErrLogger := logger.NewLimitedLogger(s)

	c := &gortsplib.Client{
		Transport:      params.Conf.RTSPTransport.Transport,
		TLSConfig:      tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
		ReadTimeout:    time.Duration(s.ReadTimeout),
		WriteTimeout:   time.Duration(s.WriteTimeout),
		WriteQueueSize: s.WriteQueueSize,
		AnyPortEnable:  params.Conf.RTSPAnyPort,
		OnRequest: func(req *base.Request) {
			s.Log(logger.Debug, "[c->s] %v", req)
		},
		OnResponse: func(res *base.Response) {
			s.Log(logger.Debug, "[s->c] %v", res)
		},
		// In the three callbacks below the error is passed as an argument of
		// a constant format string: the error text is never used as the
		// format itself, so a '%' inside the message cannot be mistaken for
		// a formatting verb.
		OnTransportSwitch: func(err error) {
			s.Log(logger.Warn, "%v", err)
		},
		OnPacketLost: func(err error) {
			decodeErrLogger.Log(logger.Warn, "%v", err)
		},
		OnDecodeError: func(err error) {
			decodeErrLogger.Log(logger.Warn, "%v", err)
		},
	}

	u, err := url.Parse(params.Conf.Source)
	if err != nil {
		return err
	}

	err = c.Start(u.Scheme, u.Host)
	if err != nil {
		return err
	}
	// Close the client on every return path that follows a successful Start.
	defer c.Close()

	// readErr carries the single result of the read goroutine.
	readErr := make(chan error)

	go func() {
		readErr <- func() error {
			desc, _, err := c.Describe(u)
			if err != nil {
				return err
			}

			err = c.SetupAll(desc.BaseURL, desc.Medias)
			if err != nil {
				return err
			}

			res := s.Parent.SetReady(defs.PathSourceStaticSetReadyReq{
				Desc:               desc,
				GenerateRTPPackets: false,
			})
			if res.Err != nil {
				return res.Err
			}

			// Undo SetReady when this function returns, whatever the reason.
			defer s.Parent.SetNotReady(defs.PathSourceStaticSetNotReadyReq{})

			for _, medi := range desc.Medias {
				for _, forma := range medi.Formats {
					// Per-iteration copies, so that each callback captures
					// its own media and format even with pre-1.22 loop
					// variable semantics.
					cmedi := medi
					cforma := forma

					c.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) {
						// Packets for which no PTS is available are dropped.
						pts, ok := c.PacketPTS(cmedi, pkt)
						if !ok {
							return
						}

						res.Stream.WriteRTPPacket(cmedi, cforma, pkt, time.Now(), pts)
					})
				}
			}

			rangeHeader, err := createRangeHeader(params.Conf)
			if err != nil {
				return err
			}

			_, err = c.Play(rangeHeader)
			if err != nil {
				return err
			}

			return c.Wait()
		}()
	}()

	for {
		select {
		case err := <-readErr:
			return err

		case <-params.ReloadConf:
			// Configuration reloads are consumed and ignored.

		case <-params.Context.Done():
			// Close the client, then block until the read goroutine has
			// delivered its result, so that it does not outlive Run.
			c.Close()
			<-readErr
			return nil
		}
	}
}
// APISourceDescribe implements StaticSource.
//
// The returned description has Type "rtspSource" and an empty ID.
func (*Source) APISourceDescribe() defs.APIPathSourceOrReader {
	var description defs.APIPathSourceOrReader

	description.Type = "rtspSource"
	description.ID = ""

	return description
}