Browse Source

add intermediate layer between path and static sources

pull/1088/head
aler9 3 years ago
parent
commit
6afbdf1015
  1. 72
      internal/core/hls_source.go
  2. 52
      internal/core/path.go
  3. 74
      internal/core/rtmp_source.go
  4. 71
      internal/core/rtsp_source.go
  5. 20
      internal/core/source.go
  6. 11
      internal/core/source_redirect.go
  7. 165
      internal/core/source_static.go

72
internal/core/hls_source.go

@ -2,7 +2,6 @@ package core @@ -2,7 +2,6 @@ package core
import (
"context"
"sync"
"time"
"github.com/aler9/gortsplib"
@ -14,10 +13,6 @@ import ( @@ -14,10 +13,6 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
)
const (
hlsSourceRetryPause = 5 * time.Second
)
type hlsSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
@ -27,80 +22,27 @@ type hlsSourceParent interface { @@ -27,80 +22,27 @@ type hlsSourceParent interface {
type hlsSource struct {
ur string
fingerprint string
wg *sync.WaitGroup
parent hlsSourceParent
ctx context.Context
ctxCancel func()
}
func newHLSSource(
parentCtx context.Context,
ur string,
fingerprint string,
wg *sync.WaitGroup,
parent hlsSourceParent,
) *hlsSource {
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &hlsSource{
return &hlsSource{
ur: ur,
fingerprint: fingerprint,
wg: wg,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
}
s.Log(logger.Info, "started")
s.wg.Add(1)
go s.run()
return s
}
func (s *hlsSource) close() {
s.Log(logger.Info, "stopped")
s.ctxCancel()
}
func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.log(level, "[hls source] "+format, args...)
}
func (s *hlsSource) run() {
defer s.wg.Done()
outer:
for {
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerErr := make(chan error)
go func() {
innerErr <- s.runInner(innerCtx)
}()
select {
case err := <-innerErr:
innerCtxCancel()
s.Log(logger.Info, "ERR: %v", err)
case <-s.ctx.Done():
innerCtxCancel()
<-innerErr
}
select {
case <-time.After(hlsSourceRetryPause):
case <-s.ctx.Done():
break outer
}
}
s.ctxCancel()
}
func (s *hlsSource) runInner(innerCtx context.Context) error {
// run implements sourceStaticImpl.
func (s *hlsSource) run(ctx context.Context) error {
var stream *stream
var videoTrackID int
var audioTrackID int
@ -109,7 +51,7 @@ func (s *hlsSource) runInner(innerCtx context.Context) error { @@ -109,7 +51,7 @@ func (s *hlsSource) runInner(innerCtx context.Context) error {
defer func() {
if stream != nil {
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
}
}()
@ -137,7 +79,6 @@ func (s *hlsSource) runInner(innerCtx context.Context) error { @@ -137,7 +79,6 @@ func (s *hlsSource) runInner(innerCtx context.Context) error {
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
source: s,
tracks: tracks,
})
if res.err != nil {
@ -145,7 +86,6 @@ func (s *hlsSource) runInner(innerCtx context.Context) error { @@ -145,7 +86,6 @@ func (s *hlsSource) runInner(innerCtx context.Context) error {
}
s.Log(logger.Info, "ready")
stream = res.stream
return nil
@ -216,14 +156,14 @@ func (s *hlsSource) runInner(innerCtx context.Context) error { @@ -216,14 +156,14 @@ func (s *hlsSource) runInner(innerCtx context.Context) error {
case err := <-c.Wait():
return err
case <-innerCtx.Done():
case <-ctx.Done():
c.Close()
<-c.Wait()
return nil
}
}
// onSourceAPIDescribe implements source.
// onSourceAPIDescribe implements sourceStaticImpl.
func (*hlsSource) onSourceAPIDescribe() interface{} {
return struct {
Type string `json:"type"`

52
internal/core/path.go

@ -92,13 +92,13 @@ type pathSourceStaticSetReadyRes struct { @@ -92,13 +92,13 @@ type pathSourceStaticSetReadyRes struct {
}
type pathSourceStaticSetReadyReq struct {
source sourceStatic
source source
tracks gortsplib.Tracks
res chan pathSourceStaticSetReadyRes
}
type pathSourceStaticSetNotReadyReq struct {
source sourceStatic
source source
res chan struct{}
}
@ -539,7 +539,7 @@ func (pa *path) run() { @@ -539,7 +539,7 @@ func (pa *path) run() {
}
if pa.source != nil {
if source, ok := pa.source.(sourceStatic); ok {
if source, ok := pa.source.(*sourceStatic); ok {
source.close()
pa.sourceStaticWg.Wait()
} else if source, ok := pa.source.(publisher); ok {
@ -605,7 +605,7 @@ func (pa *path) onDemandStaticSourceStop() { @@ -605,7 +605,7 @@ func (pa *path) onDemandStaticSourceStop() {
pa.onDemandStaticSourceState = pathOnDemandStateInitial
pa.source.(sourceStatic).close()
pa.source.(*sourceStatic).close()
pa.source = nil
}
@ -696,39 +696,17 @@ func (pa *path) sourceSetNotReady() { @@ -696,39 +696,17 @@ func (pa *path) sourceSetNotReady() {
}
func (pa *path) staticSourceCreate() {
switch {
case strings.HasPrefix(pa.conf.Source, "rtsp://") ||
strings.HasPrefix(pa.conf.Source, "rtsps://"):
pa.source = newRTSPSource(
pa.ctx,
pa.conf.Source,
pa.conf.SourceProtocol,
pa.conf.SourceAnyPortEnable,
pa.conf.SourceFingerprint,
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
&pa.sourceStaticWg,
pa)
case strings.HasPrefix(pa.conf.Source, "rtmp://"):
pa.source = newRTMPSource(
pa.ctx,
pa.conf.Source,
pa.readTimeout,
pa.writeTimeout,
&pa.sourceStaticWg,
pa)
case strings.HasPrefix(pa.conf.Source, "http://") ||
strings.HasPrefix(pa.conf.Source, "https://"):
pa.source = newHLSSource(
pa.ctx,
pa.conf.Source,
pa.conf.SourceFingerprint,
&pa.sourceStaticWg,
pa)
}
pa.source = newSourceStatic(
pa.ctx,
pa.conf.Source,
pa.conf.SourceProtocol,
pa.conf.SourceAnyPortEnable,
pa.conf.SourceFingerprint,
pa.readTimeout,
pa.writeTimeout,
pa.readBufferCount,
&pa.sourceStaticWg,
pa)
}
func (pa *path) doReaderRemove(r reader) {

74
internal/core/rtmp_source.go

@ -5,7 +5,6 @@ import ( @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"net/url"
"sync"
"time"
"github.com/aler9/gortsplib"
@ -20,10 +19,6 @@ import ( @@ -20,10 +19,6 @@ import (
"github.com/aler9/rtsp-simple-server/internal/rtmp/message"
)
const (
rtmpSourceRetryPause = 5 * time.Second
)
type rtmpSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
@ -34,83 +29,29 @@ type rtmpSource struct { @@ -34,83 +29,29 @@ type rtmpSource struct {
ur string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
wg *sync.WaitGroup
parent rtmpSourceParent
ctx context.Context
ctxCancel func()
}
func newRTMPSource(
parentCtx context.Context,
ur string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
wg *sync.WaitGroup,
parent rtmpSourceParent,
) *rtmpSource {
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &rtmpSource{
return &rtmpSource{
ur: ur,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
wg: wg,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
}
s.Log(logger.Info, "started")
s.wg.Add(1)
go s.run()
return s
}
// Close closes a Source.
func (s *rtmpSource) close() {
s.Log(logger.Info, "stopped")
s.ctxCancel()
}
func (s *rtmpSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.log(level, "[rtmp source] "+format, args...)
}
func (s *rtmpSource) run() {
defer s.wg.Done()
outer:
for {
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerErr := make(chan error)
go func() {
innerErr <- s.runInner(innerCtx)
}()
select {
case err := <-innerErr:
innerCtxCancel()
s.Log(logger.Info, "ERR: %v", err)
case <-s.ctx.Done():
innerCtxCancel()
<-innerErr
}
select {
case <-time.After(rtmpSourceRetryPause):
case <-s.ctx.Done():
break outer
}
}
s.ctxCancel()
}
func (s *rtmpSource) runInner(innerCtx context.Context) error {
// run implements sourceStaticImpl.
func (s *rtmpSource) run(ctx context.Context) error {
s.Log(logger.Debug, "connecting")
u, err := url.Parse(s.ur)
@ -124,7 +65,7 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error { @@ -124,7 +65,7 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error {
u.Host = net.JoinHostPort(u.Host, "1935")
}
ctx2, cancel2 := context.WithTimeout(innerCtx, time.Duration(s.readTimeout))
ctx2, cancel2 := context.WithTimeout(ctx, time.Duration(s.readTimeout))
defer cancel2()
var d net.Dialer
@ -179,7 +120,6 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error { @@ -179,7 +120,6 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error {
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
source: s,
tracks: tracks,
})
if res.err != nil {
@ -189,7 +129,7 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error { @@ -189,7 +129,7 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error {
s.Log(logger.Info, "ready")
defer func() {
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
for {
@ -267,14 +207,14 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error { @@ -267,14 +207,14 @@ func (s *rtmpSource) runInner(innerCtx context.Context) error {
nconn.Close()
return err
case <-innerCtx.Done():
case <-ctx.Done():
nconn.Close()
<-readDone
return nil
}
}
// onSourceAPIDescribe implements source.
// onSourceAPIDescribe implements sourceStaticImpl.
func (*rtmpSource) onSourceAPIDescribe() interface{} {
return struct {
Type string `json:"type"`

71
internal/core/rtsp_source.go

@ -7,7 +7,6 @@ import ( @@ -7,7 +7,6 @@ import (
"encoding/hex"
"fmt"
"strings"
"sync"
"time"
"github.com/aler9/gortsplib"
@ -18,10 +17,6 @@ import ( @@ -18,10 +17,6 @@ import (
"github.com/aler9/rtsp-simple-server/internal/logger"
)
const (
rtspSourceRetryPause = 5 * time.Second
)
type rtspSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
@ -36,15 +31,10 @@ type rtspSource struct { @@ -36,15 +31,10 @@ type rtspSource struct {
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
wg *sync.WaitGroup
parent rtspSourceParent
ctx context.Context
ctxCancel func()
}
func newRTSPSource(
parentCtx context.Context,
ur string,
proto conf.SourceProtocol,
anyPortEnable bool,
@ -52,12 +42,9 @@ func newRTSPSource( @@ -52,12 +42,9 @@ func newRTSPSource(
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
wg *sync.WaitGroup,
parent rtspSourceParent,
) *rtspSource {
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &rtspSource{
return &rtspSource{
ur: ur,
proto: proto,
anyPortEnable: anyPortEnable,
@ -65,61 +52,16 @@ func newRTSPSource( @@ -65,61 +52,16 @@ func newRTSPSource(
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
wg: wg,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
}
s.Log(logger.Info, "started")
s.wg.Add(1)
go s.run()
return s
}
func (s *rtspSource) close() {
s.Log(logger.Info, "stopped")
s.ctxCancel()
}
func (s *rtspSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.log(level, "[rtsp source] "+format, args...)
}
func (s *rtspSource) run() {
defer s.wg.Done()
outer:
for {
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerErr := make(chan error)
go func() {
innerErr <- s.runInner(innerCtx)
}()
select {
case err := <-innerErr:
innerCtxCancel()
s.Log(logger.Info, "ERR: %v", err)
case <-s.ctx.Done():
innerCtxCancel()
<-innerErr
}
select {
case <-time.After(rtspSourceRetryPause):
case <-s.ctx.Done():
break outer
}
}
s.ctxCancel()
}
func (s *rtspSource) runInner(innerCtx context.Context) error {
// run implements sourceStaticImpl.
func (s *rtspSource) run(ctx context.Context) error {
s.Log(logger.Debug, "connecting")
var tlsConfig *tls.Config
@ -184,7 +126,6 @@ func (s *rtspSource) runInner(innerCtx context.Context) error { @@ -184,7 +126,6 @@ func (s *rtspSource) runInner(innerCtx context.Context) error {
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
source: s,
tracks: c.Tracks(),
})
if res.err != nil {
@ -194,7 +135,7 @@ func (s *rtspSource) runInner(innerCtx context.Context) error { @@ -194,7 +135,7 @@ func (s *rtspSource) runInner(innerCtx context.Context) error {
s.Log(logger.Info, "ready")
defer func() {
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{})
}()
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
@ -228,14 +169,14 @@ func (s *rtspSource) runInner(innerCtx context.Context) error { @@ -228,14 +169,14 @@ func (s *rtspSource) runInner(innerCtx context.Context) error {
case err := <-readErr:
return err
case <-innerCtx.Done():
case <-ctx.Done():
c.Close()
<-readErr
return nil
}
}
// onSourceAPIDescribe implements source.
// onSourceAPIDescribe implements sourceStaticImpl.
func (*rtspSource) onSourceAPIDescribe() interface{} {
return struct {
Type string `json:"type"`

20
internal/core/source.go

@ -3,24 +3,8 @@ package core @@ -3,24 +3,8 @@ package core
// source is an entity that can provide a stream.
// it can be:
// - a publisher
// - a static source
// - a redirect source
// - sourceStatic
// - sourceRedirect
type source interface {
onSourceAPIDescribe() interface{}
}
// sourceStatic is an entity that can provide a static stream.
type sourceStatic interface {
source
close()
}
// sourceRedirect is a source that redirects to another one.
type sourceRedirect struct{}
// onSourceAPIDescribe implements source.
func (*sourceRedirect) onSourceAPIDescribe() interface{} {
return struct {
Type string `json:"type"`
}{"redirect"}
}

11
internal/core/source_redirect.go

@ -0,0 +1,11 @@ @@ -0,0 +1,11 @@
package core
// sourceRedirect is a source that redirects to another one.
type sourceRedirect struct{}
// onSourceAPIDescribe implements source.
func (*sourceRedirect) onSourceAPIDescribe() interface{} {
return struct {
Type string `json:"type"`
}{"redirect"}
}

165
internal/core/source_static.go

@ -0,0 +1,165 @@ @@ -0,0 +1,165 @@
package core
import (
"context"
"strings"
"sync"
"time"
"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/logger"
)
const (
sourceStaticRetryPause = 5 * time.Second
)
type sourceStaticImpl interface {
Log(logger.Level, string, ...interface{})
run(context.Context) error
onSourceAPIDescribe() interface{}
}
type sourceStaticParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
}
// sourceStatic is a static source.
type sourceStatic struct {
ur string
protocol conf.SourceProtocol
anyPortEnable bool
fingerprint string
readTimeout conf.StringDuration
writeTimeout conf.StringDuration
readBufferCount int
wg *sync.WaitGroup
parent sourceStaticParent
impl sourceStaticImpl
ctx context.Context
ctxCancel func()
}
func newSourceStatic(
parentCtx context.Context,
ur string,
protocol conf.SourceProtocol,
anyPortEnable bool,
fingerprint string,
readTimeout conf.StringDuration,
writeTimeout conf.StringDuration,
readBufferCount int,
wg *sync.WaitGroup,
parent sourceStaticParent,
) *sourceStatic {
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &sourceStatic{
ur: ur,
protocol: protocol,
anyPortEnable: anyPortEnable,
fingerprint: fingerprint,
readTimeout: readTimeout,
writeTimeout: writeTimeout,
readBufferCount: readBufferCount,
wg: wg,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
}
switch {
case strings.HasPrefix(s.ur, "rtsp://") ||
strings.HasPrefix(s.ur, "rtsps://"):
s.impl = newRTSPSource(
s.ur,
s.protocol,
s.anyPortEnable,
s.fingerprint,
s.readTimeout,
s.writeTimeout,
s.readBufferCount,
s)
case strings.HasPrefix(s.ur, "rtmp://"):
s.impl = newRTMPSource(
s.ur,
s.readTimeout,
s.writeTimeout,
s)
case strings.HasPrefix(s.ur, "http://") ||
strings.HasPrefix(s.ur, "https://"):
s.impl = newHLSSource(
s.ur,
s.fingerprint,
s)
}
s.impl.Log(logger.Info, "started")
s.wg.Add(1)
go s.run()
return s
}
func (s *sourceStatic) close() {
s.impl.Log(logger.Info, "stopped")
s.ctxCancel()
}
func (s *sourceStatic) log(level logger.Level, format string, args ...interface{}) {
s.parent.log(level, format, args...)
}
func (s *sourceStatic) run() {
defer s.wg.Done()
outer:
for {
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
innerErr := make(chan error)
go func() {
innerErr <- s.impl.run(innerCtx)
}()
select {
case err := <-innerErr:
innerCtxCancel()
s.impl.Log(logger.Info, "ERR: %v", err)
case <-s.ctx.Done():
innerCtxCancel()
<-innerErr
}
select {
case <-time.After(sourceStaticRetryPause):
case <-s.ctx.Done():
break outer
}
}
s.ctxCancel()
}
// onSourceAPIDescribe implements source.
func (s *sourceStatic) onSourceAPIDescribe() interface{} {
return s.impl.onSourceAPIDescribe()
}
// onSourceStaticSetReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.source = s
return s.parent.onSourceStaticSetReady(req)
}
// onSourceStaticSetNotReady is called by a sourceStaticImpl.
func (s *sourceStatic) onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.source = s
s.parent.onSourceStaticSetNotReady(req)
}
Loading…
Cancel
Save