Browse Source

hls: new option hlsAlwaysRemux to always remux streams into HLS, not only when requested

pull/483/head
aler9 5 years ago
parent
commit
1146d5cf1b
  1. 1
      internal/conf/conf.go
  2. 27
      internal/core/core.go
  3. 140
      internal/core/hls_remuxer.go
  4. 79
      internal/core/hls_server.go
  5. 80
      internal/core/path.go
  6. 6
      internal/core/path_manager.go
  7. 4
      internal/core/read_publisher.go
  8. 62
      internal/core/rtmp_conn.go
  9. 14
      internal/core/rtmp_source.go
  10. 6
      internal/core/rtsp_conn.go
  11. 117
      internal/core/rtsp_session.go
  12. 12
      internal/core/rtsp_source.go
  13. 33
      internal/hls/muxer.go
  14. 3
      rtsp-simple-server.yml

1
internal/conf/conf.go

@ -98,6 +98,7 @@ type Conf struct { @@ -98,6 +98,7 @@ type Conf struct {
// hls
HLSDisable bool `yaml:"hlsDisable"`
HLSAddress string `yaml:"hlsAddress"`
HLSAlwaysRemux bool `yaml:"hlsAlwaysRemux"`
HLSSegmentCount int `yaml:"hlsSegmentCount"`
HLSSegmentDuration time.Duration `yaml:"hlsSegmentDuration"`
HLSAllowOrigin string `yaml:"hlsAllowOrigin"`

27
internal/core/core.go

@ -35,6 +35,9 @@ type Core struct { @@ -35,6 +35,9 @@ type Core struct {
hlsServer *hlsServer
confWatcher *confwatcher.ConfWatcher
// in
pathSourceReady chan *path
// out
done chan struct{}
}
@ -62,10 +65,11 @@ func New(args []string) (*Core, bool) { @@ -62,10 +65,11 @@ func New(args []string) (*Core, bool) {
ctx, ctxCancel := context.WithCancel(context.Background())
p := &Core{
ctx: ctx,
ctxCancel: ctxCancel,
confPath: *argConfPath,
done: make(chan struct{}),
ctx: ctx,
ctxCancel: ctxCancel,
confPath: *argConfPath,
pathSourceReady: make(chan *path),
done: make(chan struct{}),
}
var err error
@ -136,6 +140,11 @@ outer: @@ -136,6 +140,11 @@ outer:
break outer
}
case pa := <-p.pathSourceReady:
if p.hlsServer != nil {
p.hlsServer.OnPathSourceReady(pa)
}
case <-p.ctx.Done():
break outer
}
@ -306,6 +315,7 @@ func (p *Core) createResources(initial bool) error { @@ -306,6 +315,7 @@ func (p *Core) createResources(initial bool) error {
p.hlsServer, err = newHLSServer(
p.ctx,
p.conf.HLSAddress,
p.conf.HLSAlwaysRemux,
p.conf.HLSSegmentCount,
p.conf.HLSSegmentDuration,
p.conf.HLSAllowOrigin,
@ -426,6 +436,7 @@ func (p *Core) closeResources(newConf *conf.Conf) { @@ -426,6 +436,7 @@ func (p *Core) closeResources(newConf *conf.Conf) {
if newConf == nil ||
newConf.HLSDisable != p.conf.HLSDisable ||
newConf.HLSAddress != p.conf.HLSAddress ||
newConf.HLSAlwaysRemux != p.conf.HLSAlwaysRemux ||
newConf.HLSSegmentCount != p.conf.HLSSegmentCount ||
newConf.HLSSegmentDuration != p.conf.HLSSegmentDuration ||
newConf.HLSAllowOrigin != p.conf.HLSAllowOrigin ||
@ -493,3 +504,11 @@ func (p *Core) reloadConf() error { @@ -493,3 +504,11 @@ func (p *Core) reloadConf() error {
p.conf = newConf
return p.createResources(false)
}
// OnPathSourceReady is called by pathManager.
func (p *Core) OnPathSourceReady(pa *path) {
select {
case p.pathSourceReady <- pa:
case <-p.done:
}
}

140
internal/core/hls_remuxer.go

@ -99,6 +99,7 @@ type hlsRemuxerParent interface { @@ -99,6 +99,7 @@ type hlsRemuxerParent interface {
}
type hlsRemuxer struct {
hlsAlwaysRemux bool
hlsSegmentCount int
hlsSegmentDuration time.Duration
readBufferCount int
@ -121,6 +122,7 @@ type hlsRemuxer struct { @@ -121,6 +122,7 @@ type hlsRemuxer struct {
func newHLSRemuxer(
parentCtx context.Context,
hlsAlwaysRemux bool,
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
readBufferCount int,
@ -131,7 +133,8 @@ func newHLSRemuxer( @@ -131,7 +133,8 @@ func newHLSRemuxer(
parent hlsRemuxerParent) *hlsRemuxer {
ctx, ctxCancel := context.WithCancel(parentCtx)
c := &hlsRemuxer{
r := &hlsRemuxer{
hlsAlwaysRemux: hlsAlwaysRemux,
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
readBufferCount: readBufferCount,
@ -149,75 +152,78 @@ func newHLSRemuxer( @@ -149,75 +152,78 @@ func newHLSRemuxer(
request: make(chan hlsRemuxerRequest),
}
c.log(logger.Info, "opened")
r.log(logger.Info, "created")
c.wg.Add(1)
go c.run()
r.wg.Add(1)
go r.run()
return c
return r
}
// ParentClose closes a Remuxer.
func (c *hlsRemuxer) ParentClose() {
c.log(logger.Info, "closed")
func (r *hlsRemuxer) ParentClose() {
r.log(logger.Info, "destroyed")
}
func (c *hlsRemuxer) Close() {
c.ctxCancel()
func (r *hlsRemuxer) Close() {
r.ctxCancel()
}
// IsReadPublisher implements readPublisher.
func (c *hlsRemuxer) IsReadPublisher() {}
func (r *hlsRemuxer) IsReadPublisher() {}
// IsSource implements source.
func (c *hlsRemuxer) IsSource() {}
func (r *hlsRemuxer) IsSource() {}
func (c *hlsRemuxer) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[remuxer %s] "+format, append([]interface{}{c.pathName}, args...)...)
func (r *hlsRemuxer) log(level logger.Level, format string, args ...interface{}) {
r.parent.Log(level, "[remuxer %s] "+format, append([]interface{}{r.pathName}, args...)...)
}
// PathName returns the path name of the readPublisher
func (c *hlsRemuxer) PathName() string {
return c.pathName
func (r *hlsRemuxer) PathName() string {
return r.pathName
}
func (c *hlsRemuxer) run() {
defer c.wg.Done()
func (r *hlsRemuxer) run() {
defer r.wg.Done()
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
runErr := make(chan error)
go func() {
runErr <- c.runInner(innerCtx)
runErr <- r.runInner(innerCtx)
}()
select {
case err := <-runErr:
innerCtxCancel()
if err != nil {
c.log(logger.Info, "ERR: %s", err)
r.log(logger.Info, "ERR: %s", err)
}
case <-c.ctx.Done():
case <-r.ctx.Done():
innerCtxCancel()
<-runErr
}
c.ctxCancel()
r.ctxCancel()
if c.path != nil {
if r.path != nil {
res := make(chan struct{})
c.path.OnReadPublisherRemove(readPublisherRemoveReq{c, res}) //nolint:govet
r.path.OnReadPublisherRemove(readPublisherRemoveReq{
Author: r,
Res: res,
})
<-res
}
c.parent.OnRemuxerClose(c)
r.parent.OnRemuxerClose(r)
}
func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
func (r *hlsRemuxer) runInner(innerCtx context.Context) error {
pres := make(chan readPublisherSetupPlayRes)
c.pathMan.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
Author: c,
PathName: c.pathName,
r.pathMan.OnReadPublisherSetupPlay(readPublisherSetupPlayReq{
Author: r,
PathName: r.pathName,
IP: nil,
ValidateCredentials: nil,
Res: pres,
@ -228,7 +234,7 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error { @@ -228,7 +234,7 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
return res.Err
}
c.path = res.Path
r.path = res.Path
var videoTrack *gortsplib.Track
videoTrackID := -1
var h264SPS []byte
@ -283,42 +289,43 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error { @@ -283,42 +289,43 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
}
var err error
c.muxer, err = hls.NewMuxer(
c.hlsSegmentCount,
c.hlsSegmentDuration,
r.muxer, err = hls.NewMuxer(
r.hlsSegmentCount,
r.hlsSegmentDuration,
videoTrack,
audioTrack,
)
if err != nil {
return err
}
defer c.muxer.Close()
defer r.muxer.Close()
// start request handler only after muxer has been inizialized
requestHandlerTerminate := make(chan struct{})
requestHandlerDone := make(chan struct{})
go c.runRequestHandler(requestHandlerTerminate, requestHandlerDone)
go r.runRequestHandler(requestHandlerTerminate, requestHandlerDone)
defer func() {
close(requestHandlerTerminate)
<-requestHandlerDone
}()
c.ringBuffer = ringbuffer.New(uint64(c.readBufferCount))
r.ringBuffer = ringbuffer.New(uint64(r.readBufferCount))
resc := make(chan readPublisherPlayRes)
c.path.OnReadPublisherPlay(readPublisherPlayReq{c, resc}) //nolint:govet
r.path.OnReadPublisherPlay(readPublisherPlayReq{
Author: r,
Res: resc,
})
<-resc
c.log(logger.Info, "is remuxing into HLS")
writerDone := make(chan error)
go func() {
writerDone <- func() error {
var videoBuf [][]byte
for {
data, ok := c.ringBuffer.Pull()
data, ok := r.ringBuffer.Pull()
if !ok {
return fmt.Errorf("terminated")
}
@ -328,14 +335,14 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error { @@ -328,14 +335,14 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
var pkt rtp.Packet
err := pkt.Unmarshal(pair.buf)
if err != nil {
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
r.log(logger.Warn, "unable to decode RTP packet: %v", err)
continue
}
nalus, pts, err := h264Decoder.DecodeRTP(&pkt)
if err != nil {
if err != rtph264.ErrMorePacketsNeeded && err != rtph264.ErrNonStartingPacketAndNoPrevious {
c.log(logger.Warn, "unable to decode video track: %v", err)
r.log(logger.Warn, "unable to decode video track: %v", err)
}
continue
}
@ -360,7 +367,7 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error { @@ -360,7 +367,7 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
// RTP marker means that all the NALUs with the same PTS have been received.
// send them together.
if pkt.Marker {
err := c.muxer.WriteH264(pts, videoBuf)
err := r.muxer.WriteH264(pts, videoBuf)
if err != nil {
return err
}
@ -372,19 +379,19 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error { @@ -372,19 +379,19 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
var pkt rtp.Packet
err := pkt.Unmarshal(pair.buf)
if err != nil {
c.log(logger.Warn, "unable to decode RTP packet: %v", err)
r.log(logger.Warn, "unable to decode RTP packet: %v", err)
continue
}
aus, pts, err := aacDecoder.DecodeRTP(&pkt)
if err != nil {
if err != rtpaac.ErrMorePacketsNeeded {
c.log(logger.Warn, "unable to decode audio track: %v", err)
r.log(logger.Warn, "unable to decode audio track: %v", err)
}
continue
}
err = c.muxer.WriteAAC(pts, aus)
err = r.muxer.WriteAAC(pts, aus)
if err != nil {
return err
}
@ -399,9 +406,9 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error { @@ -399,9 +406,9 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
for {
select {
case <-closeCheckTicker.C:
t := time.Unix(atomic.LoadInt64(c.lastRequestTime), 0)
if time.Since(t) >= closeAfterInactivity {
c.ringBuffer.Close()
t := time.Unix(atomic.LoadInt64(r.lastRequestTime), 0)
if !r.hlsAlwaysRemux && time.Since(t) >= closeAfterInactivity {
r.ringBuffer.Close()
<-writerDone
return nil
}
@ -410,14 +417,14 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error { @@ -410,14 +417,14 @@ func (c *hlsRemuxer) runInner(innerCtx context.Context) error {
return err
case <-innerCtx.Done():
c.ringBuffer.Close()
r.ringBuffer.Close()
<-writerDone
return nil
}
}
}
func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct{}) {
func (r *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct{}) {
defer close(done)
for {
@ -425,18 +432,18 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct @@ -425,18 +432,18 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct
case <-terminate:
return
case preq := <-c.request:
case preq := <-r.request:
req := preq
atomic.StoreInt64(c.lastRequestTime, time.Now().Unix())
atomic.StoreInt64(r.lastRequestTime, time.Now().Unix())
conf := c.path.Conf()
conf := r.path.Conf()
if conf.ReadIPsParsed != nil {
tmp, _, _ := net.SplitHostPort(req.Req.RemoteAddr)
ip := net.ParseIP(tmp)
if !ipEqualOrInRange(ip, conf.ReadIPsParsed) {
c.log(logger.Info, "ERR: ip '%s' not allowed", ip)
r.log(logger.Info, "ERR: ip '%s' not allowed", ip)
req.W.WriteHeader(http.StatusUnauthorized)
req.Res <- nil
continue
@ -455,7 +462,7 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct @@ -455,7 +462,7 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct
switch {
case req.File == "stream.m3u8":
r := c.muxer.Playlist()
r := r.muxer.Playlist()
if r == nil {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
@ -466,7 +473,7 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct @@ -466,7 +473,7 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct
req.Res <- r
case strings.HasSuffix(req.File, ".ts"):
r := c.muxer.TSFile(req.File)
r := r.muxer.TSFile(req.File)
if r == nil {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
@ -488,18 +495,27 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct @@ -488,18 +495,27 @@ func (c *hlsRemuxer) runRequestHandler(terminate chan struct{}, done chan struct
}
// OnRequest is called by hlsserver.Server.
func (c *hlsRemuxer) OnRequest(req hlsRemuxerRequest) {
func (r *hlsRemuxer) OnRequest(req hlsRemuxerRequest) {
select {
case c.request <- req:
case <-c.ctx.Done():
case r.request <- req:
case <-r.ctx.Done():
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
}
}
// OnFrame implements path.Reader.
func (c *hlsRemuxer) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
// OnReaderAccepted implements readPublisher.
func (r *hlsRemuxer) OnReaderAccepted() {
r.log(logger.Info, "is remuxing into HLS")
}
// OnPublisherAccepted implements readPublisher.
func (r *hlsRemuxer) OnPublisherAccepted(tracksLen int) {
}
// OnFrame implements readPublisher.
func (r *hlsRemuxer) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(hlsRemuxerTrackIDPayloadPair{trackID, payload})
r.ringBuffer.Push(hlsRemuxerTrackIDPayloadPair{trackID, payload})
}
}

79
internal/core/hls_server.go

@ -18,6 +18,7 @@ type hlsServerParent interface { @@ -18,6 +18,7 @@ type hlsServerParent interface {
}
type hlsServer struct {
hlsAlwaysRemux bool
hlsSegmentCount int
hlsSegmentDuration time.Duration
hlsAllowOrigin string
@ -26,20 +27,22 @@ type hlsServer struct { @@ -26,20 +27,22 @@ type hlsServer struct {
pathMan *pathManager
parent hlsServerParent
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
ln net.Listener
converters map[string]*hlsRemuxer
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
ln net.Listener
remuxers map[string]*hlsRemuxer
// in
request chan hlsRemuxerRequest
connClose chan *hlsRemuxer
pathSourceReady chan *path
request chan hlsRemuxerRequest
connClose chan *hlsRemuxer
}
func newHLSServer(
parentCtx context.Context,
address string,
hlsAlwaysRemux bool,
hlsSegmentCount int,
hlsSegmentDuration time.Duration,
hlsAllowOrigin string,
@ -56,6 +59,7 @@ func newHLSServer( @@ -56,6 +59,7 @@ func newHLSServer(
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &hlsServer{
hlsAlwaysRemux: hlsAlwaysRemux,
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
hlsAllowOrigin: hlsAllowOrigin,
@ -66,7 +70,8 @@ func newHLSServer( @@ -66,7 +70,8 @@ func newHLSServer(
ctx: ctx,
ctxCancel: ctxCancel,
ln: ln,
converters: make(map[string]*hlsRemuxer),
remuxers: make(map[string]*hlsRemuxer),
pathSourceReady: make(chan *path),
request: make(chan hlsRemuxerRequest),
connClose: make(chan *hlsRemuxer),
}
@ -98,25 +103,17 @@ func (s *hlsServer) run() { @@ -98,25 +103,17 @@ func (s *hlsServer) run() {
outer:
for {
select {
case req := <-s.request:
c, ok := s.converters[req.Dir]
if !ok {
c = newHLSRemuxer(
s.ctx,
s.hlsSegmentCount,
s.hlsSegmentDuration,
s.readBufferCount,
&s.wg,
s.stats,
req.Dir,
s.pathMan,
s)
s.converters[req.Dir] = c
case pa := <-s.pathSourceReady:
if s.hlsAlwaysRemux {
s.createRemuxer(pa.Name())
}
c.OnRequest(req)
case req := <-s.request:
r := s.createRemuxer(req.Dir)
r.OnRequest(req)
case c := <-s.connClose:
if c2, ok := s.converters[c.PathName()]; !ok || c2 != c {
if c2, ok := s.remuxers[c.PathName()]; !ok || c2 != c {
continue
}
s.doRemuxerClose(c)
@ -128,7 +125,7 @@ outer: @@ -128,7 +125,7 @@ outer:
s.ctxCancel()
for _, c := range s.converters {
for _, c := range s.remuxers {
s.doRemuxerClose(c)
}
@ -189,7 +186,7 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -189,7 +186,7 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
buf := make([]byte, 4096)
for {
n, err := res.Read(buf)
if n == 0 {
if err != nil {
return
}
@ -206,14 +203,42 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -206,14 +203,42 @@ func (s *hlsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func (s *hlsServer) createRemuxer(pathName string) *hlsRemuxer {
r, ok := s.remuxers[pathName]
if !ok {
r = newHLSRemuxer(
s.ctx,
s.hlsAlwaysRemux,
s.hlsSegmentCount,
s.hlsSegmentDuration,
s.readBufferCount,
&s.wg,
s.stats,
pathName,
s.pathMan,
s)
s.remuxers[pathName] = r
}
return r
}
func (s *hlsServer) doRemuxerClose(c *hlsRemuxer) {
delete(s.converters, c.PathName())
delete(s.remuxers, c.PathName())
c.ParentClose()
}
// OnRemuxerClose is called by hlsRemuxer.
func (s *hlsServer) OnRemuxerClose(c *hlsRemuxer) {
select {
case s.connClose <- c:
case <-s.ctx.Done():
}
}
// OnPathSourceReady is called by core.
func (s *hlsServer) OnPathSourceReady(pa *path) {
select {
case s.pathSourceReady <- pa:
case <-s.ctx.Done():
}
}

80
internal/core/path.go

@ -25,6 +25,7 @@ func newEmptyTimer() *time.Timer { @@ -25,6 +25,7 @@ func newEmptyTimer() *time.Timer {
type pathParent interface {
Log(logger.Level, string, ...interface{})
OnPathSourceReady(*path)
OnPathClose(*path)
}
@ -54,34 +55,30 @@ const ( @@ -54,34 +55,30 @@ const (
sourceStateReady
)
type reader interface {
OnFrame(int, gortsplib.StreamType, []byte)
}
type readersMap struct {
type pathReadersMap struct {
mutex sync.RWMutex
ma map[reader]struct{}
ma map[readPublisher]struct{}
}
func newReadersMap() *readersMap {
return &readersMap{
ma: make(map[reader]struct{}),
func newPathReadersMap() *pathReadersMap {
return &pathReadersMap{
ma: make(map[readPublisher]struct{}),
}
}
func (m *readersMap) add(reader reader) {
func (m *pathReadersMap) add(r readPublisher) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.ma[reader] = struct{}{}
m.ma[r] = struct{}{}
}
func (m *readersMap) remove(reader reader) {
func (m *pathReadersMap) remove(r readPublisher) {
m.mutex.Lock()
defer m.mutex.Unlock()
delete(m.ma, reader)
delete(m.ma, r)
}
func (m *readersMap) forwardFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
func (m *pathReadersMap) forwardFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
m.mutex.RLock()
defer m.mutex.RUnlock()
@ -110,8 +107,9 @@ type path struct { @@ -110,8 +107,9 @@ type path struct {
setupPlayRequests []readPublisherSetupPlayReq
source source
sourceStream *gortsplib.ServerStream
nonRTSPReaders *readersMap
nonRTSPReaders *pathReadersMap
onDemandCmd *externalcmd.Cmd
onPublishCmd *externalcmd.Cmd
describeTimer *time.Timer
sourceCloseTimer *time.Timer
sourceCloseTimerStarted bool
@ -164,7 +162,7 @@ func newPath( @@ -164,7 +162,7 @@ func newPath(
ctx: ctx,
ctxCancel: ctxCancel,
readPublishers: make(map[readPublisher]readPublisherState),
nonRTSPReaders: newReadersMap(),
nonRTSPReaders: newPathReadersMap(),
describeTimer: newEmptyTimer(),
sourceCloseTimer: newEmptyTimer(),
runOnDemandCloseTimer: newEmptyTimer(),
@ -462,11 +460,18 @@ func (pa *path) onSourceSetReady() { @@ -462,11 +460,18 @@ func (pa *path) onSourceSetReady() {
pa.scheduleSourceClose()
pa.scheduleRunOnDemandClose()
pa.scheduleClose()
pa.parent.OnPathSourceReady(pa)
}
func (pa *path) onSourceSetNotReady() {
pa.sourceState = sourceStateNotReady
if pa.onPublishCmd != nil {
pa.onPublishCmd.Close()
pa.onPublishCmd = nil
}
// close all readPublishers that are reading or waiting to read
for c, state := range pa.readPublishers {
if c != pa.source && state != readPublisherStatePreRemove {
@ -553,7 +558,7 @@ func (pa *path) onReadPublisherDescribe(req readPublisherDescribeReq) { @@ -553,7 +558,7 @@ func (pa *path) onReadPublisherDescribe(req readPublisherDescribeReq) {
}
return pa.conf.Fallback
}()
req.Res <- readPublisherDescribeRes{nil, fallbackURL, nil} //nolint:govet
req.Res <- readPublisherDescribeRes{Redirect: fallbackURL}
return
}
@ -612,6 +617,8 @@ func (pa *path) onReadPublisherPlay(req readPublisherPlayReq) { @@ -612,6 +617,8 @@ func (pa *path) onReadPublisherPlay(req readPublisherPlayReq) {
pa.nonRTSPReaders.add(req.Author)
}
req.Author.OnReaderAccepted()
req.Res <- readPublisherPlayRes{}
}
@ -649,7 +656,7 @@ func (pa *path) onReadPublisherAnnounce(req readPublisherAnnounceReq) { @@ -649,7 +656,7 @@ func (pa *path) onReadPublisherAnnounce(req readPublisherAnnounceReq) {
pa.source = req.Author
pa.sourceStream = gortsplib.NewServerStream(req.Tracks)
req.Res <- readPublisherAnnounceRes{pa, nil} //nolint:govet
req.Res <- readPublisherAnnounceRes{Path: pa}
}
func (pa *path) onReadPublisherRecord(req readPublisherRecordReq) {
@ -660,8 +667,19 @@ func (pa *path) onReadPublisherRecord(req readPublisherRecordReq) { @@ -660,8 +667,19 @@ func (pa *path) onReadPublisherRecord(req readPublisherRecordReq) {
atomic.AddInt64(pa.stats.CountPublishers, 1)
pa.readPublishers[req.Author] = readPublisherStateRecord
req.Author.OnPublisherAccepted(len(pa.sourceStream.Tracks()))
pa.onSourceSetReady()
if pa.conf.RunOnPublish != "" {
_, port, _ := net.SplitHostPort(pa.rtspAddress)
pa.onPublishCmd = externalcmd.New(pa.conf.RunOnPublish, pa.conf.RunOnPublishRestart, externalcmd.Environment{
Path: pa.name,
Port: port,
})
}
req.Res <- readPublisherRecordRes{}
}
@ -751,8 +769,8 @@ func (pa *path) Name() string { @@ -751,8 +769,8 @@ func (pa *path) Name() string {
return pa.name
}
// OnsourceExternalSetReady is called by an external source.
func (pa *path) OnsourceExternalSetReady(req sourceExtSetReadyReq) {
// OnSourceExternalSetReady is called by an external source.
func (pa *path) OnSourceExternalSetReady(req sourceExtSetReadyReq) {
select {
case pa.extSourceSetReady <- req:
case <-pa.ctx.Done():
@ -760,8 +778,8 @@ func (pa *path) OnsourceExternalSetReady(req sourceExtSetReadyReq) { @@ -760,8 +778,8 @@ func (pa *path) OnsourceExternalSetReady(req sourceExtSetReadyReq) {
}
}
// OnsourceExternalSetNotReady is called by an external source.
func (pa *path) OnsourceExternalSetNotReady(req sourceExtSetNotReadyReq) {
// OnSourceExternalSetNotReady is called by an external source.
func (pa *path) OnSourceExternalSetNotReady(req sourceExtSetNotReadyReq) {
select {
case pa.extSourceSetNotReady <- req:
case <-pa.ctx.Done():
@ -769,7 +787,7 @@ func (pa *path) OnsourceExternalSetNotReady(req sourceExtSetNotReadyReq) { @@ -769,7 +787,7 @@ func (pa *path) OnsourceExternalSetNotReady(req sourceExtSetNotReadyReq) {
}
}
// OnPathManDescribe is called by pathman.PathMan.
// OnPathManDescribe is called by pathManager.
func (pa *path) OnPathManDescribe(req readPublisherDescribeReq) {
select {
case pa.describeReq <- req:
@ -778,7 +796,7 @@ func (pa *path) OnPathManDescribe(req readPublisherDescribeReq) { @@ -778,7 +796,7 @@ func (pa *path) OnPathManDescribe(req readPublisherDescribeReq) {
}
}
// OnPathManSetupPlay is called by pathman.PathMan.
// OnPathManSetupPlay is called by pathManager.
func (pa *path) OnPathManSetupPlay(req readPublisherSetupPlayReq) {
select {
case pa.setupPlayReq <- req:
@ -787,7 +805,7 @@ func (pa *path) OnPathManSetupPlay(req readPublisherSetupPlayReq) { @@ -787,7 +805,7 @@ func (pa *path) OnPathManSetupPlay(req readPublisherSetupPlayReq) {
}
}
// OnPathManAnnounce is called by pathman.PathMan.
// OnPathManAnnounce is called by pathManager.
func (pa *path) OnPathManAnnounce(req readPublisherAnnounceReq) {
select {
case pa.announceReq <- req:
@ -796,7 +814,7 @@ func (pa *path) OnPathManAnnounce(req readPublisherAnnounceReq) { @@ -796,7 +814,7 @@ func (pa *path) OnPathManAnnounce(req readPublisherAnnounceReq) {
}
}
// OnReadPublisherPlay is called by a readPublisher
// OnReadPublisherPlay is called by a readPublisher.
func (pa *path) OnReadPublisherPlay(req readPublisherPlayReq) {
select {
case pa.playReq <- req:
@ -805,7 +823,7 @@ func (pa *path) OnReadPublisherPlay(req readPublisherPlayReq) { @@ -805,7 +823,7 @@ func (pa *path) OnReadPublisherPlay(req readPublisherPlayReq) {
}
}
// OnReadPublisherRecord is called by a readPublisher
// OnReadPublisherRecord is called by a readPublisher.
func (pa *path) OnReadPublisherRecord(req readPublisherRecordReq) {
select {
case pa.recordReq <- req:
@ -814,7 +832,7 @@ func (pa *path) OnReadPublisherRecord(req readPublisherRecordReq) { @@ -814,7 +832,7 @@ func (pa *path) OnReadPublisherRecord(req readPublisherRecordReq) {
}
}
// OnReadPublisherPause is called by a readPublisher
// OnReadPublisherPause is called by a readPublisher.
func (pa *path) OnReadPublisherPause(req readPublisherPauseReq) {
select {
case pa.pauseReq <- req:
@ -823,7 +841,7 @@ func (pa *path) OnReadPublisherPause(req readPublisherPauseReq) { @@ -823,7 +841,7 @@ func (pa *path) OnReadPublisherPause(req readPublisherPauseReq) {
}
}
// OnReadPublisherRemove is called by a readPublisher
// OnReadPublisherRemove is called by a readPublisher.
func (pa *path) OnReadPublisherRemove(req readPublisherRemoveReq) {
select {
case pa.removeReq <- req:
@ -832,8 +850,8 @@ func (pa *path) OnReadPublisherRemove(req readPublisherRemoveReq) { @@ -832,8 +850,8 @@ func (pa *path) OnReadPublisherRemove(req readPublisherRemoveReq) {
}
}
// OnFrame is called by a readpublisher
func (pa *path) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
// OnFrame is called by a source.
func (pa *path) OnSourceFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
pa.sourceStream.WriteFrame(trackID, streamType, payload)
pa.nonRTSPReaders.forwardFrame(trackID, streamType, payload)

6
internal/core/path_manager.go

@ -16,6 +16,7 @@ import ( @@ -16,6 +16,7 @@ import (
type pathManagerParent interface {
Log(logger.Level, string, ...interface{})
OnPathSourceReady(*path)
}
type pathManager struct {
@ -282,6 +283,11 @@ func (pm *pathManager) OnConfReload(pathConfs map[string]*conf.PathConf) { @@ -282,6 +283,11 @@ func (pm *pathManager) OnConfReload(pathConfs map[string]*conf.PathConf) {
}
}
// OnPathSourceReady is called by path.
func (pm *pathManager) OnPathSourceReady(pa *path) {
pm.parent.OnPathSourceReady(pa)
}
// OnPathClose is called by path.
func (pm *pathManager) OnPathClose(pa *path) {
select {

4
internal/core/read_publisher.go

@ -18,7 +18,7 @@ type readPublisherPath interface { @@ -18,7 +18,7 @@ type readPublisherPath interface {
OnReadPublisherPlay(readPublisherPlayReq)
OnReadPublisherRecord(readPublisherRecordReq)
OnReadPublisherPause(readPublisherPauseReq)
OnFrame(int, gortsplib.StreamType, []byte)
OnSourceFrame(int, gortsplib.StreamType, []byte)
}
type readPublisherErrNoOnePublishing struct {
@ -53,6 +53,8 @@ type readPublisher interface { @@ -53,6 +53,8 @@ type readPublisher interface {
IsReadPublisher()
IsSource()
Close()
OnReaderAccepted()
OnPublisherAccepted(tracksLen int)
OnFrame(int, gortsplib.StreamType, []byte)
}

62
internal/core/rtmp_conn.go

@ -175,7 +175,10 @@ func (c *rtmpConn) run() { @@ -175,7 +175,10 @@ func (c *rtmpConn) run() {
if c.path != nil {
res := make(chan struct{})
c.path.OnReadPublisherRemove(readPublisherRemoveReq{c, res}) //nolint:govet
c.path.OnReadPublisherRemove(readPublisherRemoveReq{
Author: c,
Res: res,
})
<-res
}
@ -272,11 +275,12 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -272,11 +275,12 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
}()
pres := make(chan readPublisherPlayRes)
c.path.OnReadPublisherPlay(readPublisherPlayReq{c, pres}) //nolint:govet
c.path.OnReadPublisherPlay(readPublisherPlayReq{
Author: c,
Res: pres,
})
<-pres
c.log(logger.Info, "is reading from path '%s'", c.path.Name())
// disable read deadline
c.conn.NetConn().SetReadDeadline(time.Time{})
@ -436,38 +440,12 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { @@ -436,38 +440,12 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
return rres.Err
}
c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(),
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}())
var onPublishCmd *externalcmd.Cmd
if c.path.Conf().RunOnPublish != "" {
_, port, _ := net.SplitHostPort(c.rtspAddress)
onPublishCmd = externalcmd.New(c.path.Conf().RunOnPublish,
c.path.Conf().RunOnPublishRestart, externalcmd.Environment{
Path: c.path.Name(),
Port: port,
})
}
defer func(path readPublisherPath) {
if path.Conf().RunOnPublish != "" {
onPublishCmd.Close()
}
}(c.path)
rtcpSenders := rtcpsenderset.New(tracks, c.path.OnFrame)
rtcpSenders := rtcpsenderset.New(tracks, c.path.OnSourceFrame)
defer rtcpSenders.Close()
onFrame := func(trackID int, payload []byte) {
rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
c.path.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
c.path.OnSourceFrame(trackID, gortsplib.StreamTypeRTP, payload)
}
for {
@ -549,7 +527,25 @@ func (c *rtmpConn) validateCredentials( @@ -549,7 +527,25 @@ func (c *rtmpConn) validateCredentials(
return nil
}
// OnFrame implements path.Reader.
// OnReaderAccepted implements readPublisher.
func (c *rtmpConn) OnReaderAccepted() {
c.log(logger.Info, "is reading from path '%s'", c.path.Name())
}
// OnPublisherAccepted implements readPublisher.
func (c *rtmpConn) OnPublisherAccepted(tracksLen int) {
c.log(logger.Info, "is publishing to path '%s', %d %s",
c.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}())
}
// OnFrame implements readPublisher.
func (c *rtmpConn) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
if streamType == gortsplib.StreamTypeRTP {
c.ringBuffer.Push(rtmpConnTrackIDPayloadPair{trackID, payload})

14
internal/core/rtmp_source.go

@ -24,9 +24,9 @@ const ( @@ -24,9 +24,9 @@ const (
type rtmpSourceParent interface {
Log(logger.Level, string, ...interface{})
OnsourceExternalSetReady(req sourceExtSetReadyReq)
OnsourceExternalSetNotReady(req sourceExtSetNotReadyReq)
OnFrame(int, gortsplib.StreamType, []byte)
OnSourceExternalSetReady(req sourceExtSetReadyReq)
OnSourceExternalSetNotReady(req sourceExtSetNotReadyReq)
OnSourceFrame(int, gortsplib.StreamType, []byte)
}
type rtmpSource struct {
@ -169,7 +169,7 @@ func (s *rtmpSource) runInner() bool { @@ -169,7 +169,7 @@ func (s *rtmpSource) runInner() bool {
s.log(logger.Info, "ready")
cres := make(chan sourceExtSetReadyRes)
s.parent.OnsourceExternalSetReady(sourceExtSetReadyReq{
s.parent.OnSourceExternalSetReady(sourceExtSetReadyReq{
Tracks: tracks,
Res: cres,
})
@ -177,18 +177,18 @@ func (s *rtmpSource) runInner() bool { @@ -177,18 +177,18 @@ func (s *rtmpSource) runInner() bool {
defer func() {
res := make(chan struct{})
s.parent.OnsourceExternalSetNotReady(sourceExtSetNotReadyReq{
s.parent.OnSourceExternalSetNotReady(sourceExtSetNotReadyReq{
Res: res,
})
<-res
}()
rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnFrame)
rtcpSenders := rtcpsenderset.New(tracks, s.parent.OnSourceFrame)
defer rtcpSenders.Close()
onFrame := func(trackID int, payload []byte) {
rtcpSenders.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
s.parent.OnFrame(trackID, gortsplib.StreamTypeRTP, payload)
s.parent.OnSourceFrame(trackID, gortsplib.StreamTypeRTP, payload)
}
for {

6
internal/core/rtsp_conn.go

@ -114,17 +114,17 @@ func (c *rtspConn) ip() net.IP { @@ -114,17 +114,17 @@ func (c *rtspConn) ip() net.IP {
return c.conn.NetConn().RemoteAddr().(*net.TCPAddr).IP
}
// OnRequest is called by rtspserver.Server.
// OnRequest is called by rtspServer.
func (c *rtspConn) OnRequest(req *base.Request) {
c.log(logger.Debug, "[c->s] %v", req)
}
// OnResponse is called by rtspserver.Server.
// OnResponse is called by rtspServer.
func (c *rtspConn) OnResponse(res *base.Response) {
c.log(logger.Debug, "[s->c] %v", res)
}
// OnDescribe is called by rtspserver.Server.
// OnDescribe is called by rtspServer.
func (c *rtspConn) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
resc := make(chan readPublisherDescribeRes)
c.pathMan.OnReadPublisherDescribe(readPublisherDescribeReq{

117
internal/core/rtsp_session.go

@ -39,7 +39,6 @@ type rtspSession struct { @@ -39,7 +39,6 @@ type rtspSession struct {
path readPublisherPath
setuppedTracks map[int]*gortsplib.Track // read
onReadCmd *externalcmd.Cmd // read
onPublishCmd *externalcmd.Cmd // publish
}
func newRTSPSession(
@ -66,21 +65,18 @@ func newRTSPSession( @@ -66,21 +65,18 @@ func newRTSPSession(
// ParentClose closes a Session.
func (s *rtspSession) ParentClose() {
switch s.ss.State() {
case gortsplib.ServerSessionStatePlay:
if s.ss.State() == gortsplib.ServerSessionStatePlay {
if s.onReadCmd != nil {
s.onReadCmd.Close()
}
case gortsplib.ServerSessionStateRecord:
if s.onPublishCmd != nil {
s.onPublishCmd.Close()
}
}
if s.path != nil {
res := make(chan struct{})
s.path.OnReadPublisherRemove(readPublisherRemoveReq{s, res}) //nolint:govet
s.path.OnReadPublisherRemove(readPublisherRemoveReq{
Author: s,
Res: res,
})
<-res
s.path = nil
}
@ -118,7 +114,7 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{} @@ -118,7 +114,7 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{}
s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.visualID}, args...)...)
}
// OnAnnounce is called by rtspserver.Server.
// OnAnnounce is called by rtspServer.
func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
resc := make(chan readPublisherAnnounceRes)
s.pathMan.OnReadPublisherAnnounce(readPublisherAnnounceReq{
@ -127,7 +123,7 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno @@ -127,7 +123,7 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
Tracks: ctx.Tracks,
IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
ValidateCredentials: func(authMethods []headers.AuthMethod, pathUser string, pathPass string) error {
return c.ValidateCredentials(authMethods, pathUser, pathPass, ctx.Path, ctx.Req)
return c.validateCredentials(authMethods, pathUser, pathPass, ctx.Path, ctx.Req)
},
Res: resc,
})
@ -158,7 +154,7 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno @@ -158,7 +154,7 @@ func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
}, nil
}
// OnSetup is called by rtspserver.Server.
// OnSetup is called by rtspServer.
func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
if ctx.Transport.Protocol == base.StreamProtocolUDP {
if _, ok := s.protocols[conf.ProtocolUDP]; !ok {
@ -188,7 +184,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -188,7 +184,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
PathName: ctx.Path,
IP: ctx.Conn.NetConn().RemoteAddr().(*net.TCPAddr).IP,
ValidateCredentials: func(authMethods []headers.AuthMethod, pathUser string, pathPass string) error {
return c.ValidateCredentials(authMethods, pathUser, pathPass, ctx.Path, ctx.Req)
return c.validateCredentials(authMethods, pathUser, pathPass, ctx.Path, ctx.Req)
},
Res: resc,
})
@ -241,7 +237,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -241,7 +237,7 @@ func (s *rtspSession) OnSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
}
}
// OnPlay is called by rtspserver.Server.
// OnPlay is called by rtspServer.
func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
h := make(base.Header)
@ -253,22 +249,12 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -253,22 +249,12 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
}
resc := make(chan readPublisherPlayRes)
s.path.OnReadPublisherPlay(readPublisherPlayReq{s, resc}) //nolint:govet
s.path.OnReadPublisherPlay(readPublisherPlayReq{
Author: s,
Res: resc,
})
<-resc
tracksLen := len(s.ss.SetuppedTracks())
s.log(logger.Info, "is reading from path '%s', %d %s with %s",
s.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}(),
s.displayedProtocol())
if s.path.Conf().RunOnRead != "" {
_, port, _ := net.SplitHostPort(s.rtspAddress)
s.onReadCmd = externalcmd.New(s.path.Conf().RunOnRead, s.path.Conf().RunOnReadRestart, externalcmd.Environment{
@ -284,7 +270,7 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -284,7 +270,7 @@ func (s *rtspSession) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
}, nil
}
// OnRecord is called by rtspserver.Server.
// OnRecord is called by rtspServer.
func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
if ctx.Path != s.path.Name() {
return &base.Response{
@ -302,33 +288,12 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -302,33 +288,12 @@ func (s *rtspSession) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
}, res.Err
}
tracksLen := len(s.ss.AnnouncedTracks())
s.log(logger.Info, "is publishing to path '%s', %d %s with %s",
s.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}(),
s.displayedProtocol())
if s.path.Conf().RunOnPublish != "" {
_, port, _ := net.SplitHostPort(s.rtspAddress)
s.onPublishCmd = externalcmd.New(s.path.Conf().RunOnPublish, s.path.Conf().RunOnPublishRestart, externalcmd.Environment{
Path: s.path.Name(),
Port: port,
})
}
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
// OnPause is called by rtspserver.Server.
// OnPause is called by rtspServer.
func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
switch s.ss.State() {
case gortsplib.ServerSessionStatePlay:
@ -337,16 +302,18 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res @@ -337,16 +302,18 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
}
res := make(chan struct{})
s.path.OnReadPublisherPause(readPublisherPauseReq{s, res}) //nolint:govet
s.path.OnReadPublisherPause(readPublisherPauseReq{
Author: s,
Res: res,
})
<-res
case gortsplib.ServerSessionStateRecord:
if s.onPublishCmd != nil {
s.onPublishCmd.Close()
}
res := make(chan struct{})
s.path.OnReadPublisherPause(readPublisherPauseReq{s, res}) //nolint:govet
s.path.OnReadPublisherPause(readPublisherPauseReq{
Author: s,
Res: res,
})
<-res
}
@ -355,16 +322,46 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res @@ -355,16 +322,46 @@ func (s *rtspSession) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
}, nil
}
// OnFrame implements path.Reader.
// OnReaderAccepted implements readPublisher.
func (s *rtspSession) OnReaderAccepted() {
tracksLen := len(s.ss.SetuppedTracks())
s.log(logger.Info, "is reading from path '%s', %d %s with %s",
s.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}(),
s.displayedProtocol())
}
// OnPublisherAccepted implements readPublisher.
func (s *rtspSession) OnPublisherAccepted(tracksLen int) {
s.log(logger.Info, "is publishing to path '%s', %d %s with %s",
s.path.Name(),
tracksLen,
func() string {
if tracksLen == 1 {
return "track"
}
return "tracks"
}(),
s.displayedProtocol())
}
// OnFrame implements readPublisher.
func (s *rtspSession) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) {
s.ss.WriteFrame(trackID, streamType, payload)
}
// OnIncomingFrame is called by rtspserver.Server.
// OnIncomingFrame is called by rtspServer.
func (s *rtspSession) OnIncomingFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
if s.ss.State() != gortsplib.ServerSessionStateRecord {
return
}
s.path.OnFrame(ctx.TrackID, ctx.StreamType, ctx.Payload)
s.path.OnSourceFrame(ctx.TrackID, ctx.StreamType, ctx.Payload)
}

12
internal/core/rtsp_source.go

@ -23,9 +23,9 @@ const ( @@ -23,9 +23,9 @@ const (
type rtspSourceParent interface {
Log(logger.Level, string, ...interface{})
OnsourceExternalSetReady(req sourceExtSetReadyReq)
OnsourceExternalSetNotReady(req sourceExtSetNotReadyReq)
OnFrame(int, gortsplib.StreamType, []byte)
OnSourceExternalSetReady(req sourceExtSetReadyReq)
OnSourceExternalSetNotReady(req sourceExtSetNotReadyReq)
OnSourceFrame(int, gortsplib.StreamType, []byte)
}
type rtspSource struct {
@ -188,7 +188,7 @@ func (s *rtspSource) runInner() bool { @@ -188,7 +188,7 @@ func (s *rtspSource) runInner() bool {
s.log(logger.Info, "ready")
cres := make(chan sourceExtSetReadyRes)
s.parent.OnsourceExternalSetReady(sourceExtSetReadyReq{
s.parent.OnSourceExternalSetReady(sourceExtSetReadyReq{
Tracks: conn.Tracks(),
Res: cres,
})
@ -196,7 +196,7 @@ func (s *rtspSource) runInner() bool { @@ -196,7 +196,7 @@ func (s *rtspSource) runInner() bool {
defer func() {
res := make(chan struct{})
s.parent.OnsourceExternalSetNotReady(sourceExtSetNotReadyReq{
s.parent.OnSourceExternalSetNotReady(sourceExtSetNotReadyReq{
Res: res,
})
<-res
@ -205,7 +205,7 @@ func (s *rtspSource) runInner() bool { @@ -205,7 +205,7 @@ func (s *rtspSource) runInner() bool {
readErr := make(chan error)
go func() {
readErr <- conn.ReadFrames(func(trackID int, streamType gortsplib.StreamType, payload []byte) {
s.parent.OnFrame(trackID, streamType, payload)
s.parent.OnSourceFrame(trackID, streamType, payload)
})
}()

33
internal/hls/muxer.go

@ -104,22 +104,21 @@ func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error { @@ -104,22 +104,21 @@ func (m *Muxer) WriteH264(pts time.Duration, nalus [][]byte) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if idrPresent {
if m.tsCurrent.firstPacketWritten &&
m.tsCurrent.duration() >= m.hlsSegmentDuration {
if m.tsCurrent != nil {
m.tsCurrent.close()
}
if idrPresent &&
m.tsCurrent.firstPacketWritten &&
m.tsCurrent.duration() >= m.hlsSegmentDuration {
if m.tsCurrent != nil {
m.tsCurrent.close()
}
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil)
m.tsCurrent = newTSFile(m.videoTrack != nil, m.audioTrack != nil)
m.tsByName[m.tsCurrent.name] = m.tsCurrent
m.tsQueue = append(m.tsQueue, m.tsCurrent)
if len(m.tsQueue) > m.hlsSegmentCount {
delete(m.tsByName, m.tsQueue[0].name)
m.tsQueue = m.tsQueue[1:]
m.tsDeleteCount++
}
m.tsByName[m.tsCurrent.name] = m.tsCurrent
m.tsQueue = append(m.tsQueue, m.tsCurrent)
if len(m.tsQueue) > m.hlsSegmentCount {
delete(m.tsByName, m.tsQueue[0].name)
m.tsQueue = m.tsQueue[1:]
m.tsDeleteCount++
}
}
@ -142,9 +141,9 @@ func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error { @@ -142,9 +141,9 @@ func (m *Muxer) WriteAAC(pts time.Duration, aus [][]byte) error {
defer m.mutex.Unlock()
if m.videoTrack == nil {
if m.tsCurrent.firstPacketWritten &&
m.tsCurrent.duration() >= m.hlsSegmentDuration &&
m.audioAUCount >= segmentMinAUCount {
if m.audioAUCount >= segmentMinAUCount &&
m.tsCurrent.firstPacketWritten &&
m.tsCurrent.duration() >= m.hlsSegmentDuration {
if m.tsCurrent != nil {
m.tsCurrent.close()

3
rtsp-simple-server.yml

@ -93,6 +93,9 @@ rtmpAddress: :1935 @@ -93,6 +93,9 @@ rtmpAddress: :1935
hlsDisable: no
# address of the HLS listener.
hlsAddress: :8888
# whether to always start the HLS remuxer; otherwise, it is started only
# after an HLS stream is requested.
hlsAlwaysRemux: no
# number of HLS segments to generate.
# increasing segments allows more buffering,
# decreasing segments decrease latency.

Loading…
Cancel
Save