Browse Source

use contexts anywhere is possible

pull/442/head
aler9 5 years ago
parent
commit
e558b245e7
  1. 2
      go.mod
  2. 4
      go.sum
  3. 49
      internal/hlsconverter/converter.go
  4. 84
      internal/hlsserver/server.go
  5. 164
      internal/path/path.go
  6. 109
      internal/pathman/pathman.go
  7. 23
      internal/rtmpconn/conn.go
  8. 74
      internal/rtmpserver/server.go
  9. 18
      internal/rtmpsource/source.go
  10. 54
      internal/rtspserver/server.go
  11. 28
      internal/rtspsource/source.go
  12. 18
      main.go

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210510211300-0d6385640fad
github.com/aler9/gortsplib v0.0.0-20210511094934-fa2830eb2251
github.com/asticode/go-astits v0.0.0-00010101000000-000000000000
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9

4
go.sum

@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c @@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
github.com/aler9/gortsplib v0.0.0-20210510211300-0d6385640fad h1:u6DNZKoG3gm5pMz+K1duYS5pqbrq6iuiDB1kJc4N0d4=
github.com/aler9/gortsplib v0.0.0-20210510211300-0d6385640fad/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/gortsplib v0.0.0-20210511094934-fa2830eb2251 h1:pG9MnD5MOIpqcOBccQ5L4ZAhJGBcN7LKO87ilADzIYI=
github.com/aler9/gortsplib v0.0.0-20210511094934-fa2830eb2251/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=

49
internal/hlsconverter/converter.go

@ -136,6 +136,8 @@ type Converter struct { @@ -136,6 +136,8 @@ type Converter struct {
pathMan PathMan
parent Parent
ctx context.Context
ctxCancel func()
path readpublisher.Path
ringBuffer *ringbuffer.RingBuffer
tsQueue []*tsFile
@ -145,9 +147,7 @@ type Converter struct { @@ -145,9 +147,7 @@ type Converter struct {
lastRequestTime int64
// in
request chan Request
terminate chan struct{}
parentTerminate chan struct{}
request chan Request
}
// New allocates a Converter.
@ -161,6 +161,8 @@ func New( @@ -161,6 +161,8 @@ func New(
pathMan PathMan,
parent Parent) *Converter {
ctx, ctxCancel := context.WithCancel(context.Background())
c := &Converter{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
@ -170,11 +172,11 @@ func New( @@ -170,11 +172,11 @@ func New(
pathName: pathName,
pathMan: pathMan,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
lastRequestTime: time.Now().Unix(),
tsByName: make(map[string]*tsFile),
request: make(chan Request),
terminate: make(chan struct{}, 1),
parentTerminate: make(chan struct{}),
}
c.log(logger.Info, "opened")
@ -188,15 +190,11 @@ func New( @@ -188,15 +190,11 @@ func New(
// ParentClose closes a Converter.
func (c *Converter) ParentClose() {
c.log(logger.Info, "closed")
close(c.parentTerminate)
}
// Close closes a Converter.
func (c *Converter) Close() {
select {
case c.terminate <- struct{}{}:
default:
}
c.ctxCancel()
}
// IsReadPublisher implements readpublisher.ReadPublisher.
@ -217,28 +215,23 @@ func (c *Converter) PathName() string { @@ -217,28 +215,23 @@ func (c *Converter) PathName() string {
func (c *Converter) run() {
defer c.wg.Done()
ctx, cancel := context.WithCancel(context.Background())
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
runErr := make(chan error)
go func() {
runErr <- c.runInner(ctx)
runErr <- c.runInner(innerCtx)
}()
select {
case err := <-runErr:
cancel()
innerCtxCancel()
c.log(logger.Info, "ERR: %s", err)
case <-c.terminate:
cancel()
case <-c.ctx.Done():
innerCtxCancel()
<-runErr
}
go func() {
for req := range c.request {
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
}
}()
c.ctxCancel()
if c.path != nil {
res := make(chan struct{})
@ -247,12 +240,9 @@ func (c *Converter) run() { @@ -247,12 +240,9 @@ func (c *Converter) run() {
}
c.parent.OnConverterClose(c)
<-c.parentTerminate
close(c.request)
}
func (c *Converter) runInner(ctx context.Context) error {
func (c *Converter) runInner(innerCtx context.Context) error {
pres := make(chan readpublisher.SetupPlayRes)
c.pathMan.OnReadPublisherSetupPlay(readpublisher.SetupPlayReq{
Author: c,
@ -504,7 +494,7 @@ func (c *Converter) runInner(ctx context.Context) error { @@ -504,7 +494,7 @@ func (c *Converter) runInner(ctx context.Context) error {
case err := <-writerDone:
return err
case <-ctx.Done():
case <-innerCtx.Done():
return nil
}
}
@ -598,7 +588,12 @@ func (c *Converter) runRequestHandler(terminate chan struct{}, done chan struct{ @@ -598,7 +588,12 @@ func (c *Converter) runRequestHandler(terminate chan struct{}, done chan struct{
// OnRequest is called by hlsserver.Server.
func (c *Converter) OnRequest(req Request) {
c.request <- req
select {
case c.request <- req:
case <-c.ctx.Done():
req.W.WriteHeader(http.StatusNotFound)
req.Res <- nil
}
}
// OnFrame implements path.Reader.

84
internal/hlsserver/server.go

@ -29,17 +29,15 @@ type Server struct { @@ -29,17 +29,15 @@ type Server struct {
pathMan *pathman.PathManager
parent Parent
ln net.Listener
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
ln net.Listener
converters map[string]*hlsconverter.Converter
// in
request chan hlsconverter.Request
connClose chan *hlsconverter.Converter
terminate chan struct{}
// out
done chan struct{}
}
// New allocates a Server.
@ -58,6 +56,8 @@ func New( @@ -58,6 +56,8 @@ func New(
return nil, err
}
ctx, ctxCancel := context.WithCancel(context.Background())
s := &Server{
hlsSegmentCount: hlsSegmentCount,
hlsSegmentDuration: hlsSegmentDuration,
@ -65,16 +65,17 @@ func New( @@ -65,16 +65,17 @@ func New(
stats: stats,
pathMan: pathMan,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
ln: ln,
converters: make(map[string]*hlsconverter.Converter),
request: make(chan hlsconverter.Request),
connClose: make(chan *hlsconverter.Converter),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
s.Log(logger.Info, "listener opened on "+address)
s.wg.Add(1)
go s.run()
return s, nil
@ -87,12 +88,12 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { @@ -87,12 +88,12 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) {
// Close closes all the server resources.
func (s *Server) Close() {
close(s.terminate)
<-s.done
s.ctxCancel()
s.wg.Wait()
}
func (s *Server) run() {
defer close(s.done)
defer s.wg.Done()
hs := &http.Server{Handler: s}
go hs.Serve(s.ln)
@ -122,36 +123,18 @@ outer: @@ -122,36 +123,18 @@ outer:
}
s.doConverterClose(c)
case <-s.terminate:
case <-s.ctx.Done():
break outer
}
}
go func() {
for {
select {
case req, ok := <-s.request:
if !ok {
return
}
req.Res <- nil
case _, ok := <-s.connClose:
if !ok {
return
}
}
}
}()
s.ctxCancel()
for _, c := range s.converters {
s.doConverterClose(c)
}
hs.Shutdown(context.Background())
close(s.request)
close(s.connClose)
}
// ServeHTTP implements http.Handler.
@ -174,30 +157,36 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -174,30 +157,36 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
cres := make(chan io.Reader)
s.request <- hlsconverter.Request{
hreq := hlsconverter.Request{
Path: parts[0],
Subpath: parts[1],
Req: r,
W: w,
Res: cres,
}
res := <-cres
if res != nil {
buf := make([]byte, 4096)
for {
n, err := res.Read(buf)
if err != nil {
return
}
_, err = w.Write(buf[:n])
if err != nil {
return
}
select {
case s.request <- hreq:
res := <-cres
if res != nil {
buf := make([]byte, 4096)
for {
n, err := res.Read(buf)
if err != nil {
return
}
w.(http.Flusher).Flush()
_, err = w.Write(buf[:n])
if err != nil {
return
}
w.(http.Flusher).Flush()
}
}
case <-s.ctx.Done():
}
}
@ -208,5 +197,8 @@ func (s *Server) doConverterClose(c *hlsconverter.Converter) { @@ -208,5 +197,8 @@ func (s *Server) doConverterClose(c *hlsconverter.Converter) {
// OnConverterClose is called by hlsconverter.Converter.
func (s *Server) OnConverterClose(c *hlsconverter.Converter) {
s.connClose <- c
select {
case s.connClose <- c:
case <-s.ctx.Done():
}
}

164
internal/path/path.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package path
import (
"context"
"fmt"
"net"
"strings"
@ -70,8 +71,9 @@ type Path struct { @@ -70,8 +71,9 @@ type Path struct {
stats *stats.Stats
parent Parent
ctx context.Context
ctxCancel func()
readPublishers map[readpublisher.ReadPublisher]readPublisherState
readPublishersWg sync.WaitGroup
describeRequests []readpublisher.DescribeReq
setupPlayRequests []readpublisher.SetupPlayReq
source source.Source
@ -99,7 +101,6 @@ type Path struct { @@ -99,7 +101,6 @@ type Path struct {
recordReq chan readpublisher.RecordReq
pauseReq chan readpublisher.PauseReq
removeReq chan readpublisher.RemoveReq
terminate chan struct{}
}
// New allocates a Path.
@ -116,6 +117,8 @@ func New( @@ -116,6 +117,8 @@ func New(
stats *stats.Stats,
parent Parent) *Path {
ctx, ctxCancel := context.WithCancel(context.Background())
pa := &Path{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
@ -128,6 +131,8 @@ func New( @@ -128,6 +131,8 @@ func New(
wg: wg,
stats: stats,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
readPublishers: make(map[readpublisher.ReadPublisher]readPublisherState),
readers: newReadersMap(),
describeTimer: newEmptyTimer(),
@ -143,7 +148,6 @@ func New( @@ -143,7 +148,6 @@ func New(
recordReq: make(chan readpublisher.RecordReq),
pauseReq: make(chan readpublisher.PauseReq),
removeReq: make(chan readpublisher.RemoveReq),
terminate: make(chan struct{}),
}
pa.wg.Add(1)
@ -153,7 +157,7 @@ func New( @@ -153,7 +157,7 @@ func New(
// Close closes a path.
func (pa *Path) Close() {
close(pa.terminate)
pa.ctxCancel()
}
// Log is the main logging function.
@ -218,9 +222,6 @@ outer: @@ -218,9 +222,6 @@ outer:
pa.scheduleClose()
case <-pa.closeTimer.C:
pa.exhaustChannels()
pa.parent.OnPathClose(pa)
<-pa.terminate
break outer
case req := <-pa.extSourceSetReady:
@ -262,15 +263,15 @@ outer: @@ -262,15 +263,15 @@ outer:
}
delete(pa.readPublishers, req.Author)
pa.readPublishersWg.Done()
close(req.Res)
case <-pa.terminate:
pa.exhaustChannels()
case <-pa.ctx.Done():
break outer
}
}
pa.ctxCancel()
pa.describeTimer.Stop()
pa.sourceCloseTimer.Stop()
pa.runOnDemandCloseTimer.Stop()
@ -312,86 +313,8 @@ outer: @@ -312,86 +313,8 @@ outer:
c.Close()
}
}
pa.readPublishersWg.Wait()
close(pa.extSourceSetReady)
close(pa.extSourceSetNotReady)
close(pa.describeReq)
close(pa.setupPlayReq)
close(pa.announceReq)
close(pa.playReq)
close(pa.recordReq)
close(pa.pauseReq)
close(pa.removeReq)
}
func (pa *Path) exhaustChannels() {
go func() {
for {
select {
case req, ok := <-pa.extSourceSetReady:
if !ok {
return
}
close(req.Res)
case req, ok := <-pa.extSourceSetNotReady:
if !ok {
return
}
close(req.Res)
case req, ok := <-pa.describeReq:
if !ok {
return
}
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.setupPlayReq:
if !ok {
return
}
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.announceReq:
if !ok {
return
}
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pa.playReq:
if !ok {
return
}
close(req.Res)
case req, ok := <-pa.recordReq:
if !ok {
return
}
close(req.Res)
case req, ok := <-pa.pauseReq:
if !ok {
return
}
close(req.Res)
case req, ok := <-pa.removeReq:
if !ok {
return
}
if _, ok := pa.readPublishers[req.Author]; !ok {
close(req.Res)
continue
}
pa.readPublishersWg.Done()
close(req.Res)
}
}
}()
pa.parent.OnPathClose(pa)
}
func (pa *Path) hasExternalSource() bool {
@ -446,7 +369,6 @@ func (pa *Path) hasReadPublishersNotSources() bool { @@ -446,7 +369,6 @@ func (pa *Path) hasReadPublishersNotSources() bool {
func (pa *Path) addReadPublisher(c readpublisher.ReadPublisher, state readPublisherState) {
pa.readPublishers[c] = state
pa.readPublishersWg.Add(1)
}
func (pa *Path) removeReadPublisher(c readpublisher.ReadPublisher) {
@ -781,47 +703,83 @@ func (pa *Path) Name() string { @@ -781,47 +703,83 @@ func (pa *Path) Name() string {
// OnExtSourceSetReady is called by an external source.
func (pa *Path) OnExtSourceSetReady(req source.ExtSetReadyReq) {
pa.extSourceSetReady <- req
select {
case pa.extSourceSetReady <- req:
case <-pa.ctx.Done():
close(req.Res)
}
}
// OnExtSourceSetNotReady is called by an external source.
func (pa *Path) OnExtSourceSetNotReady(req source.ExtSetNotReadyReq) {
pa.extSourceSetNotReady <- req
select {
case pa.extSourceSetNotReady <- req:
case <-pa.ctx.Done():
close(req.Res)
}
}
// OnPathManDescribe is called by pathman.PathMan.
func (pa *Path) OnPathManDescribe(req readpublisher.DescribeReq) {
pa.describeReq <- req
select {
case pa.describeReq <- req:
case <-pa.ctx.Done():
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
}
}
// OnPathManSetupPlay is called by pathman.PathMan.
func (pa *Path) OnPathManSetupPlay(req readpublisher.SetupPlayReq) {
pa.setupPlayReq <- req
select {
case pa.setupPlayReq <- req:
case <-pa.ctx.Done():
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
}
}
// OnPathManAnnounce is called by pathman.PathMan.
func (pa *Path) OnPathManAnnounce(req readpublisher.AnnounceReq) {
pa.announceReq <- req
}
// OnReadPublisherRemove is called by a readpublisher.
func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) {
pa.removeReq <- req
select {
case pa.announceReq <- req:
case <-pa.ctx.Done():
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
}
}
// OnReadPublisherPlay is called by a readpublisher.
func (pa *Path) OnReadPublisherPlay(req readpublisher.PlayReq) {
pa.playReq <- req
select {
case pa.playReq <- req:
case <-pa.ctx.Done():
close(req.Res)
}
}
// OnReadPublisherRecord is called by a readpublisher.
func (pa *Path) OnReadPublisherRecord(req readpublisher.RecordReq) {
pa.recordReq <- req
select {
case pa.recordReq <- req:
case <-pa.ctx.Done():
close(req.Res)
}
}
// OnReadPublisherPause is called by a readpublisher.
func (pa *Path) OnReadPublisherPause(req readpublisher.PauseReq) {
pa.pauseReq <- req
select {
case pa.pauseReq <- req:
case <-pa.ctx.Done():
close(req.Res)
}
}
// OnReadPublisherRemove is called by a readpublisher.
func (pa *Path) OnReadPublisherRemove(req readpublisher.RemoveReq) {
select {
case pa.removeReq <- req:
case <-pa.ctx.Done():
close(req.Res)
}
}
// OnSPFrame is called by streamproc.StreamProc.

109
internal/pathman/pathman.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package pathman
import (
"context"
"fmt"
"net"
"sync"
@ -50,8 +51,10 @@ type PathManager struct { @@ -50,8 +51,10 @@ type PathManager struct {
stats *stats.Stats
parent Parent
paths map[string]*path.Path
wg sync.WaitGroup
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
paths map[string]*path.Path
// in
confReload chan map[string]*conf.PathConf
@ -59,10 +62,6 @@ type PathManager struct { @@ -59,10 +62,6 @@ type PathManager struct {
rpDescribe chan readpublisher.DescribeReq
rpSetupPlay chan readpublisher.SetupPlayReq
rpAnnounce chan readpublisher.AnnounceReq
terminate chan struct{}
// out
done chan struct{}
}
// New allocates a PathManager.
@ -77,6 +76,8 @@ func New( @@ -77,6 +76,8 @@ func New(
stats *stats.Stats,
parent Parent) *PathManager {
ctx, ctxCancel := context.WithCancel(context.Background())
pm := &PathManager{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
@ -87,26 +88,28 @@ func New( @@ -87,26 +88,28 @@ func New(
pathConfs: pathConfs,
stats: stats,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
paths: make(map[string]*path.Path),
confReload: make(chan map[string]*conf.PathConf),
pathClose: make(chan *path.Path),
rpDescribe: make(chan readpublisher.DescribeReq),
rpSetupPlay: make(chan readpublisher.SetupPlayReq),
rpAnnounce: make(chan readpublisher.AnnounceReq),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
pm.createPaths()
pm.wg.Add(1)
go pm.run()
return pm
}
// Close closes a PathManager.
func (pm *PathManager) Close() {
close(pm.terminate)
<-pm.done
pm.ctxCancel()
pm.wg.Wait()
}
// Log is the main logging function.
@ -115,7 +118,7 @@ func (pm *PathManager) Log(level logger.Level, format string, args ...interface{ @@ -115,7 +118,7 @@ func (pm *PathManager) Log(level logger.Level, format string, args ...interface{
}
func (pm *PathManager) run() {
defer close(pm.done)
defer pm.wg.Done()
outer:
for {
@ -155,7 +158,7 @@ outer: @@ -155,7 +158,7 @@ outer:
pm.createPaths()
case pa := <-pm.pathClose:
if _, ok := pm.paths[pa.Name()]; !ok {
if pmpa, ok := pm.paths[pa.Name()]; !ok || pmpa != pa {
continue
}
delete(pm.paths, pa.Name())
@ -242,55 +245,16 @@ outer: @@ -242,55 +245,16 @@ outer:
pm.paths[req.PathName].OnPathManAnnounce(req)
case <-pm.terminate:
case <-pm.ctx.Done():
break outer
}
}
go func() {
for {
select {
case _, ok := <-pm.confReload:
if !ok {
return
}
case _, ok := <-pm.pathClose:
if !ok {
return
}
case req, ok := <-pm.rpDescribe:
if !ok {
return
}
req.Res <- readpublisher.DescribeRes{nil, "", fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.rpSetupPlay:
if !ok {
return
}
req.Res <- readpublisher.SetupPlayRes{nil, nil, fmt.Errorf("terminated")} //nolint:govet
case req, ok := <-pm.rpAnnounce:
if !ok {
return
}
req.Res <- readpublisher.AnnounceRes{nil, fmt.Errorf("terminated")} //nolint:govet
}
}
}()
pm.ctxCancel()
for _, pa := range pm.paths {
pa.Close()
}
pm.wg.Wait()
close(pm.confReload)
close(pm.pathClose)
close(pm.rpDescribe)
close(pm.rpSetupPlay)
close(pm.rpAnnounce)
}
func (pm *PathManager) createPath(confName string, conf *conf.PathConf, name string) {
@ -339,27 +303,56 @@ func (pm *PathManager) findPathConf(name string) (string, *conf.PathConf, error) @@ -339,27 +303,56 @@ func (pm *PathManager) findPathConf(name string) (string, *conf.PathConf, error)
// OnProgramConfReload is called by program.
func (pm *PathManager) OnProgramConfReload(pathConfs map[string]*conf.PathConf) {
pm.confReload <- pathConfs
select {
case pm.confReload <- pathConfs:
case <-pm.ctx.Done():
}
}
// OnPathClose is called by path.Path.
func (pm *PathManager) OnPathClose(pa *path.Path) {
pm.pathClose <- pa
select {
case pm.pathClose <- pa:
case <-pm.ctx.Done():
}
}
// OnReadPublisherDescribe is called by a ReadPublisher.
func (pm *PathManager) OnReadPublisherDescribe(req readpublisher.DescribeReq) {
pm.rpDescribe <- req
select {
case pm.rpDescribe <- req:
case <-pm.ctx.Done():
req.Res <- readpublisher.DescribeRes{
SDP: nil,
Redirect: "",
Err: fmt.Errorf("terminated"),
}
}
}
// OnReadPublisherAnnounce is called by a ReadPublisher.
func (pm *PathManager) OnReadPublisherAnnounce(req readpublisher.AnnounceReq) {
pm.rpAnnounce <- req
select {
case pm.rpAnnounce <- req:
case <-pm.ctx.Done():
req.Res <- readpublisher.AnnounceRes{
Path: nil,
Err: fmt.Errorf("terminated"),
}
}
}
// OnReadPublisherSetupPlay is called by a ReadPublisher.
func (pm *PathManager) OnReadPublisherSetupPlay(req readpublisher.SetupPlayReq) {
pm.rpSetupPlay <- req
select {
case pm.rpSetupPlay <- req:
case <-pm.ctx.Done():
req.Res <- readpublisher.SetupPlayRes{
Path: nil,
Tracks: nil,
Err: fmt.Errorf("terminated"),
}
}
}
func (pm *PathManager) authenticate(

23
internal/rtmpconn/conn.go

@ -75,12 +75,10 @@ type Conn struct { @@ -75,12 +75,10 @@ type Conn struct {
pathMan PathMan
parent Parent
ctx context.Context
ctxCancel func()
path readpublisher.Path
ringBuffer *ringbuffer.RingBuffer // read
// in
terminate chan struct{}
parentTerminate chan struct{}
}
// New allocates a Conn.
@ -97,6 +95,8 @@ func New( @@ -97,6 +95,8 @@ func New(
pathMan PathMan,
parent Parent) *Conn {
ctx, ctxCancel := context.WithCancel(context.Background())
c := &Conn{
rtspAddress: rtspAddress,
readTimeout: readTimeout,
@ -109,8 +109,8 @@ func New( @@ -109,8 +109,8 @@ func New(
conn: rtmp.NewServerConn(nconn),
pathMan: pathMan,
parent: parent,
terminate: make(chan struct{}, 1),
parentTerminate: make(chan struct{}),
ctx: ctx,
ctxCancel: ctxCancel,
}
c.log(logger.Info, "opened")
@ -124,15 +124,11 @@ func New( @@ -124,15 +124,11 @@ func New(
// ParentClose closes a Conn.
func (c *Conn) ParentClose() {
c.log(logger.Info, "closed")
close(c.parentTerminate)
}
// Close closes a Conn.
func (c *Conn) Close() {
select {
case c.terminate <- struct{}{}:
default:
}
c.ctxCancel()
}
// IsReadPublisher implements readpublisher.ReadPublisher.
@ -175,11 +171,13 @@ func (c *Conn) run() { @@ -175,11 +171,13 @@ func (c *Conn) run() {
c.log(logger.Info, "ERR: %s", err)
}
case <-c.terminate:
case <-c.ctx.Done():
cancel()
<-runErr
}
c.ctxCancel()
if c.path != nil {
res := make(chan struct{})
c.path.OnReadPublisherRemove(readpublisher.RemoveReq{c, res}) //nolint:govet
@ -187,7 +185,6 @@ func (c *Conn) run() { @@ -187,7 +185,6 @@ func (c *Conn) run() {
}
c.parent.OnConnClose(c)
<-c.parentTerminate
}
func (c *Conn) runInner(ctx context.Context) error {

74
internal/rtmpserver/server.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package rtmpserver
import (
"context"
"net"
"sync"
"time"
@ -28,16 +29,14 @@ type Server struct { @@ -28,16 +29,14 @@ type Server struct {
pathMan *pathman.PathManager
parent Parent
l net.Listener
wg sync.WaitGroup
conns map[*rtmpconn.Conn]struct{}
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
l net.Listener
conns map[*rtmpconn.Conn]struct{}
// in
connClose chan *rtmpconn.Conn
terminate chan struct{}
// out
done chan struct{}
}
// New allocates a Server.
@ -58,6 +57,8 @@ func New( @@ -58,6 +57,8 @@ func New(
return nil, err
}
ctx, ctxCancel := context.WithCancel(context.Background())
s := &Server{
readTimeout: readTimeout,
writeTimeout: writeTimeout,
@ -68,15 +69,16 @@ func New( @@ -68,15 +69,16 @@ func New(
stats: stats,
pathMan: pathMan,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
l: l,
conns: make(map[*rtmpconn.Conn]struct{}),
connClose: make(chan *rtmpconn.Conn),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
s.Log(logger.Info, "listener opened on %s", address)
s.wg.Add(1)
go s.run()
return s, nil
@ -89,28 +91,37 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { @@ -89,28 +91,37 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) {
// Close closes a Server.
func (s *Server) Close() {
close(s.terminate)
<-s.done
s.ctxCancel()
s.wg.Wait()
}
func (s *Server) run() {
defer close(s.done)
defer s.wg.Done()
s.wg.Add(1)
connNew := make(chan net.Conn)
acceptErr := make(chan error)
go func() {
defer s.wg.Done()
acceptErr <- func() error {
err := func() error {
for {
conn, err := s.l.Accept()
if err != nil {
return err
}
connNew <- conn
select {
case connNew <- conn:
case <-s.ctx.Done():
conn.Close()
}
}
}()
select {
case acceptErr <- err:
case <-s.ctx.Done():
}
}()
outer:
@ -141,44 +152,18 @@ outer: @@ -141,44 +152,18 @@ outer:
}
s.doConnClose(c)
case <-s.terminate:
case <-s.ctx.Done():
break outer
}
}
go func() {
for {
select {
case _, ok := <-acceptErr:
if !ok {
return
}
case conn, ok := <-connNew:
if !ok {
return
}
conn.Close()
case _, ok := <-s.connClose:
if !ok {
return
}
}
}
}()
s.ctxCancel()
s.l.Close()
for c := range s.conns {
s.doConnClose(c)
}
s.wg.Wait()
close(acceptErr)
close(connNew)
close(s.connClose)
}
func (s *Server) doConnClose(c *rtmpconn.Conn) {
@ -189,5 +174,8 @@ func (s *Server) doConnClose(c *rtmpconn.Conn) { @@ -189,5 +174,8 @@ func (s *Server) doConnClose(c *rtmpconn.Conn) {
// OnConnClose is called by rtmpconn.Conn.
func (s *Server) OnConnClose(c *rtmpconn.Conn) {
s.connClose <- c
select {
case s.connClose <- c:
case <-s.ctx.Done():
}
}

18
internal/rtmpsource/source.go

@ -40,8 +40,8 @@ type Source struct { @@ -40,8 +40,8 @@ type Source struct {
stats *stats.Stats
parent Parent
// in
terminate chan struct{}
ctx context.Context
ctxCancel func()
}
// New allocates a Source.
@ -51,6 +51,9 @@ func New(ur string, @@ -51,6 +51,9 @@ func New(ur string,
wg *sync.WaitGroup,
stats *stats.Stats,
parent Parent) *Source {
ctx, ctxCancel := context.WithCancel(context.Background())
s := &Source{
ur: ur,
readTimeout: readTimeout,
@ -58,7 +61,8 @@ func New(ur string, @@ -58,7 +61,8 @@ func New(ur string,
wg: wg,
stats: stats,
parent: parent,
terminate: make(chan struct{}),
ctx: ctx,
ctxCancel: ctxCancel,
}
atomic.AddInt64(s.stats.CountSourcesRTMP, +1)
@ -73,7 +77,7 @@ func New(ur string, @@ -73,7 +77,7 @@ func New(ur string,
func (s *Source) Close() {
atomic.AddInt64(s.stats.CountSourcesRTMPRunning, -1)
s.log(logger.Info, "stopped")
close(s.terminate)
s.ctxCancel()
}
// IsSource implements source.Source.
@ -99,7 +103,7 @@ func (s *Source) run() { @@ -99,7 +103,7 @@ func (s *Source) run() {
select {
case <-time.After(retryPause):
return true
case <-s.terminate:
case <-s.ctx.Done():
return false
}
}()
@ -107,6 +111,8 @@ func (s *Source) run() { @@ -107,6 +111,8 @@ func (s *Source) run() {
break
}
}
s.ctxCancel()
}
func (s *Source) runInner() bool {
@ -266,7 +272,7 @@ func (s *Source) runInner() bool { @@ -266,7 +272,7 @@ func (s *Source) runInner() bool {
s.log(logger.Info, "ERR: %s", err)
return true
case <-s.terminate:
case <-s.ctx.Done():
cancel()
<-runErr
return false

54
internal/rtspserver/server.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package rtspserver
import (
"context"
"crypto/rand"
"crypto/tls"
"encoding/binary"
@ -59,16 +60,13 @@ type Server struct { @@ -59,16 +60,13 @@ type Server struct {
pathMan *pathman.PathManager
parent Parent
srv *gortsplib.Server
mutex sync.RWMutex
conns map[*gortsplib.ServerConn]*rtspconn.Conn
sessions map[*gortsplib.ServerSession]*rtspsession.Session
// in
terminate chan struct{}
// out
done chan struct{}
ctx context.Context
ctxCancel func()
wg sync.WaitGroup
srv *gortsplib.Server
mutex sync.RWMutex
conns map[*gortsplib.ServerConn]*rtspconn.Conn
sessions map[*gortsplib.ServerSession]*rtspsession.Session
}
// New allocates a Server.
@ -92,6 +90,8 @@ func New( @@ -92,6 +90,8 @@ func New(
pathMan *pathman.PathManager,
parent Parent) (*Server, error) {
ctx, ctxCancel := context.WithCancel(context.Background())
s := &Server{
readTimeout: readTimeout,
isTLS: isTLS,
@ -100,10 +100,10 @@ func New( @@ -100,10 +100,10 @@ func New(
stats: stats,
pathMan: pathMan,
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
conns: make(map[*gortsplib.ServerConn]*rtspconn.Conn),
sessions: make(map[*gortsplib.ServerSession]*rtspsession.Session),
terminate: make(chan struct{}),
done: make(chan struct{}),
}
s.srv = &gortsplib.Server{
@ -143,6 +143,7 @@ func New( @@ -143,6 +143,7 @@ func New(
s.Log(logger.Info, "TCP listener opened on %s", address)
s.wg.Add(1)
go s.run()
return s, nil
@ -161,18 +162,24 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) { @@ -161,18 +162,24 @@ func (s *Server) Log(level logger.Level, format string, args ...interface{}) {
// Close closes a Server.
func (s *Server) Close() {
close(s.terminate)
<-s.done
s.ctxCancel()
s.wg.Wait()
}
func (s *Server) run() {
defer close(s.done)
defer s.wg.Done()
serverDone := make(chan struct{})
s.wg.Add(1)
serverErr := make(chan error)
go func() {
defer close(serverDone)
serverErr <- s.srv.Wait()
defer s.wg.Done()
err := s.srv.Wait()
select {
case serverErr <- err:
case <-s.ctx.Done():
}
}()
outer:
@ -181,20 +188,13 @@ outer: @@ -181,20 +188,13 @@ outer:
s.Log(logger.Warn, "ERR: %s", err)
break outer
case <-s.terminate:
case <-s.ctx.Done():
break outer
}
go func() {
for range serverErr {
}
}()
s.ctxCancel()
s.srv.Close()
<-serverDone
close(serverErr)
}
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen.

28
internal/rtspsource/source.go

@ -43,8 +43,8 @@ type Source struct { @@ -43,8 +43,8 @@ type Source struct {
stats *stats.Stats
parent Parent
// in
terminate chan struct{}
ctx context.Context
ctxCancel func()
}
// New allocates a Source.
@ -59,6 +59,9 @@ func New( @@ -59,6 +59,9 @@ func New(
wg *sync.WaitGroup,
stats *stats.Stats,
parent Parent) *Source {
ctx, ctxCancel := context.WithCancel(context.Background())
s := &Source{
ur: ur,
proto: proto,
@ -70,7 +73,8 @@ func New( @@ -70,7 +73,8 @@ func New(
wg: wg,
stats: stats,
parent: parent,
terminate: make(chan struct{}),
ctx: ctx,
ctxCancel: ctxCancel,
}
atomic.AddInt64(s.stats.CountSourcesRTSP, +1)
@ -85,7 +89,7 @@ func New( @@ -85,7 +89,7 @@ func New(
func (s *Source) Close() {
atomic.AddInt64(s.stats.CountSourcesRTSP, -1)
s.log(logger.Info, "stopped")
close(s.terminate)
s.ctxCancel()
}
// IsSource implements source.Source.
@ -111,7 +115,7 @@ func (s *Source) run() { @@ -111,7 +115,7 @@ func (s *Source) run() {
select {
case <-time.After(retryPause):
return true
case <-s.terminate:
case <-s.ctx.Done():
return false
}
}()
@ -119,6 +123,8 @@ func (s *Source) run() { @@ -119,6 +123,8 @@ func (s *Source) run() {
break
}
}
s.ctxCancel()
}
func (s *Source) runInner() bool {
@ -154,24 +160,24 @@ func (s *Source) runInner() bool { @@ -154,24 +160,24 @@ func (s *Source) runInner() bool {
},
}
ctx, ctxCancel := context.WithCancel(context.Background())
innerCtx, innerCtxCancel := context.WithCancel(context.Background())
var conn *gortsplib.ClientConn
var err error
dialDone := make(chan struct{})
go func() {
defer close(dialDone)
conn, err = client.DialReadContext(ctx, s.ur)
conn, err = client.DialReadContext(innerCtx, s.ur)
}()
select {
case <-s.terminate:
ctxCancel()
case <-s.ctx.Done():
innerCtxCancel()
<-dialDone
return false
case <-dialDone:
ctxCancel()
innerCtxCancel()
}
if err != nil {
@ -204,7 +210,7 @@ func (s *Source) runInner() bool { @@ -204,7 +210,7 @@ func (s *Source) runInner() bool {
}()
select {
case <-s.terminate:
case <-s.ctx.Done():
conn.Close()
<-readErr
return false

18
main.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package main
import (
"context"
"fmt"
"os"
"reflect"
@ -25,6 +26,8 @@ import ( @@ -25,6 +26,8 @@ import (
var version = "v0.0.0"
type program struct {
ctx context.Context
ctxCancel func()
confPath string
conf *conf.Conf
confFound bool
@ -39,8 +42,8 @@ type program struct { @@ -39,8 +42,8 @@ type program struct {
serverHLS *hlsserver.Server
confWatcher *confwatcher.ConfWatcher
terminate chan struct{}
done chan struct{}
// out
done chan struct{}
}
func newProgram(args []string) (*program, bool) {
@ -62,9 +65,12 @@ func newProgram(args []string) (*program, bool) { @@ -62,9 +65,12 @@ func newProgram(args []string) (*program, bool) {
// do not check for errors
rlimit.Raise()
ctx, ctxCancel := context.WithCancel(context.Background())
p := &program{
ctx: ctx,
ctxCancel: ctxCancel,
confPath: *argConfPath,
terminate: make(chan struct{}),
done: make(chan struct{}),
}
@ -97,7 +103,7 @@ func newProgram(args []string) (*program, bool) { @@ -97,7 +103,7 @@ func newProgram(args []string) (*program, bool) {
}
func (p *program) close() {
close(p.terminate)
p.ctxCancel()
<-p.done
}
@ -129,11 +135,13 @@ outer: @@ -129,11 +135,13 @@ outer:
break outer
}
case <-p.terminate:
case <-p.ctx.Done():
break outer
}
}
p.ctxCancel()
p.closeResources(nil)
if p.confWatcher != nil {

Loading…
Cancel
Save