Browse Source

un-capitalize private fields

pull/764/head
aler9 4 years ago
parent
commit
8ac665be87
  1. 32
      internal/core/api.go
  2. 4
      internal/core/core_test.go
  3. 102
      internal/core/hls_muxer.go
  4. 46
      internal/core/hls_server.go
  5. 14
      internal/core/hls_source.go
  6. 20
      internal/core/metrics.go
  7. 270
      internal/core/path.go
  8. 90
      internal/core/path_manager.go
  9. 58
      internal/core/rtmp_conn.go
  10. 32
      internal/core/rtmp_server.go
  11. 16
      internal/core/rtmp_source.go
  12. 44
      internal/core/rtsp_conn.go
  13. 18
      internal/core/rtsp_server.go
  14. 68
      internal/core/rtsp_session.go
  15. 28
      internal/core/rtsp_source.go

32
internal/core/api.go

@ -427,29 +427,29 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { @@ -427,29 +427,29 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) {
func (a *api) onPathsList(ctx *gin.Context) {
res := a.pathManager.onAPIPathsList(pathAPIPathsListReq{})
if res.Err != nil {
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.Data)
ctx.JSON(http.StatusOK, res.data)
}
func (a *api) onRTSPSessionsList(ctx *gin.Context) {
res := a.rtspServer.onAPISessionsList(rtspServerAPISessionsListReq{})
if res.Err != nil {
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.Data)
ctx.JSON(http.StatusOK, res.data)
}
func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtspServer.onAPISessionsKick(rtspServerAPISessionsKickReq{ID: id})
if res.Err != nil {
res := a.rtspServer.onAPISessionsKick(rtspServerAPISessionsKickReq{id: id})
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
@ -459,19 +459,19 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { @@ -459,19 +459,19 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) {
func (a *api) onRTSPSSessionsList(ctx *gin.Context) {
res := a.rtspsServer.onAPISessionsList(rtspServerAPISessionsListReq{})
if res.Err != nil {
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.Data)
ctx.JSON(http.StatusOK, res.data)
}
func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtspsServer.onAPISessionsKick(rtspServerAPISessionsKickReq{ID: id})
if res.Err != nil {
res := a.rtspsServer.onAPISessionsKick(rtspServerAPISessionsKickReq{id: id})
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
@ -481,19 +481,19 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { @@ -481,19 +481,19 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) {
func (a *api) onRTMPConnsList(ctx *gin.Context) {
res := a.rtmpServer.onAPIConnsList(rtmpServerAPIConnsListReq{})
if res.Err != nil {
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.Data)
ctx.JSON(http.StatusOK, res.data)
}
func (a *api) onRTMPConnsKick(ctx *gin.Context) {
id := ctx.Param("id")
res := a.rtmpServer.onAPIConnsKick(rtmpServerAPIConnsKickReq{ID: id})
if res.Err != nil {
res := a.rtmpServer.onAPIConnsKick(rtmpServerAPIConnsKickReq{id: id})
if res.err != nil {
ctx.AbortWithStatus(http.StatusNotFound)
return
}
@ -503,12 +503,12 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { @@ -503,12 +503,12 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) {
func (a *api) onHLSMuxersList(ctx *gin.Context) {
res := a.hlsServer.onAPIHLSMuxersList(hlsServerAPIMuxersListReq{})
if res.Err != nil {
if res.err != nil {
ctx.AbortWithStatus(http.StatusInternalServerError)
return
}
ctx.JSON(http.StatusOK, res.Data)
ctx.JSON(http.StatusOK, res.data)
}
// onConfReload is called by core.

4
internal/core/core_test.go

@ -191,9 +191,9 @@ func TestCorePathAutoDeletion(t *testing.T) { @@ -191,9 +191,9 @@ func TestCorePathAutoDeletion(t *testing.T) {
}()
res := p.pathManager.onAPIPathsList(pathAPIPathsListReq{})
require.NoError(t, res.Err)
require.NoError(t, res.err)
require.Equal(t, 0, len(res.Data.Items))
require.Equal(t, 0, len(res.data.Items))
})
}
}

102
internal/core/hls_muxer.go

@ -95,16 +95,16 @@ window.addEventListener('DOMContentLoaded', create); @@ -95,16 +95,16 @@ window.addEventListener('DOMContentLoaded', create);
`
type hlsMuxerResponse struct {
Status int
Header map[string]string
Body io.Reader
status int
header map[string]string
body io.Reader
}
type hlsMuxerRequest struct {
Dir string
File string
Req *http.Request
Res chan hlsMuxerResponse
dir string
file string
req *http.Request
res chan hlsMuxerResponse
}
type hlsMuxerTrackIDPayloadPair struct {
@ -224,21 +224,21 @@ func (m *hlsMuxer) run() { @@ -224,21 +224,21 @@ func (m *hlsMuxer) run() {
case req := <-m.request:
if isReady {
req.Res <- m.handleRequest(req)
req.res <- m.handleRequest(req)
} else {
m.requests = append(m.requests, req)
}
case req := <-m.hlsServerAPIMuxersList:
req.Data.Items[m.name] = hlsServerAPIMuxersListItem{
req.data.Items[m.name] = hlsServerAPIMuxersListItem{
LastRequest: time.Unix(atomic.LoadInt64(m.lastRequestTime), 0).String(),
}
close(req.Res)
close(req.res)
case <-innerReady:
isReady = true
for _, req := range m.requests {
req.Res <- m.handleRequest(req)
req.res <- m.handleRequest(req)
}
m.requests = nil
@ -252,7 +252,7 @@ func (m *hlsMuxer) run() { @@ -252,7 +252,7 @@ func (m *hlsMuxer) run() {
m.ctxCancel()
for _, req := range m.requests {
req.Res <- hlsMuxerResponse{Status: http.StatusNotFound}
req.res <- hlsMuxerResponse{status: http.StatusNotFound}
}
m.parent.onMuxerClose(m)
@ -262,18 +262,18 @@ func (m *hlsMuxer) run() { @@ -262,18 +262,18 @@ func (m *hlsMuxer) run() {
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := m.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: m,
PathName: m.pathName,
Authenticate: nil,
author: m,
pathName: m.pathName,
authenticate: nil,
})
if res.Err != nil {
return res.Err
if res.err != nil {
return res.err
}
m.path = res.Path
m.path = res.path
defer func() {
m.path.onReaderRemove(pathReaderRemoveReq{Author: m})
m.path.onReaderRemove(pathReaderRemoveReq{author: m})
}()
var videoTrack *gortsplib.Track
@ -283,7 +283,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -283,7 +283,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
audioTrackID := -1
var aacDecoder *rtpaac.Decoder
for i, t := range res.Stream.tracks() {
for i, t := range res.stream.tracks() {
if t.IsH264() {
if videoTrack != nil {
return fmt.Errorf("can't read track %d with HLS: too many tracks", i+1)
@ -330,7 +330,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -330,7 +330,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.ringBuffer = ringbuffer.New(uint64(m.readBufferCount))
m.path.onReaderPlay(pathReaderPlayReq{Author: m})
m.path.onReaderPlay(pathReaderPlayReq{author: m})
writerDone := make(chan error)
go func() {
@ -415,67 +415,67 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -415,67 +415,67 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
func (m *hlsMuxer) handleRequest(req hlsMuxerRequest) hlsMuxerResponse {
atomic.StoreInt64(m.lastRequestTime, time.Now().Unix())
err := m.authenticate(req.Req)
err := m.authenticate(req.req)
if err != nil {
if terr, ok := err.(pathErrAuthCritical); ok {
m.log(logger.Info, "authentication error: %s", terr.Message)
m.log(logger.Info, "authentication error: %s", terr.message)
return hlsMuxerResponse{
Status: http.StatusUnauthorized,
status: http.StatusUnauthorized,
}
}
return hlsMuxerResponse{
Status: http.StatusUnauthorized,
Header: map[string]string{
status: http.StatusUnauthorized,
header: map[string]string{
"WWW-Authenticate": `Basic realm="rtsp-simple-server"`,
},
}
}
switch {
case req.File == "index.m3u8":
case req.file == "index.m3u8":
return hlsMuxerResponse{
Status: http.StatusOK,
Header: map[string]string{
status: http.StatusOK,
header: map[string]string{
"Content-Type": `application/x-mpegURL`,
},
Body: m.muxer.PrimaryPlaylist(),
body: m.muxer.PrimaryPlaylist(),
}
case req.File == "stream.m3u8":
case req.file == "stream.m3u8":
return hlsMuxerResponse{
Status: http.StatusOK,
Header: map[string]string{
status: http.StatusOK,
header: map[string]string{
"Content-Type": `application/x-mpegURL`,
},
Body: m.muxer.StreamPlaylist(),
body: m.muxer.StreamPlaylist(),
}
case strings.HasSuffix(req.File, ".ts"):
r := m.muxer.Segment(req.File)
case strings.HasSuffix(req.file, ".ts"):
r := m.muxer.Segment(req.file)
if r == nil {
return hlsMuxerResponse{Status: http.StatusNotFound}
return hlsMuxerResponse{status: http.StatusNotFound}
}
return hlsMuxerResponse{
Status: http.StatusOK,
Header: map[string]string{
status: http.StatusOK,
header: map[string]string{
"Content-Type": `video/MP2T`,
},
Body: r,
body: r,
}
case req.File == "":
case req.file == "":
return hlsMuxerResponse{
Status: http.StatusOK,
Header: map[string]string{
status: http.StatusOK,
header: map[string]string{
"Content-Type": `text/html`,
},
Body: bytes.NewReader([]byte(index)),
body: bytes.NewReader([]byte(index)),
}
default:
return hlsMuxerResponse{Status: http.StatusNotFound}
return hlsMuxerResponse{status: http.StatusNotFound}
}
}
@ -499,7 +499,7 @@ func (m *hlsMuxer) authenticate(req *http.Request) error { @@ -499,7 +499,7 @@ func (m *hlsMuxer) authenticate(req *http.Request) error {
"read")
if err != nil {
return pathErrAuthCritical{
Message: fmt.Sprintf("external authentication failed: %s", err),
message: fmt.Sprintf("external authentication failed: %s", err),
}
}
}
@ -510,7 +510,7 @@ func (m *hlsMuxer) authenticate(req *http.Request) error { @@ -510,7 +510,7 @@ func (m *hlsMuxer) authenticate(req *http.Request) error {
if !ipEqualOrInRange(ip, pathIPs) {
return pathErrAuthCritical{
Message: fmt.Sprintf("IP '%s' not allowed", ip),
message: fmt.Sprintf("IP '%s' not allowed", ip),
}
}
}
@ -523,7 +523,7 @@ func (m *hlsMuxer) authenticate(req *http.Request) error { @@ -523,7 +523,7 @@ func (m *hlsMuxer) authenticate(req *http.Request) error {
if user != string(pathUser) || pass != string(pathPass) {
return pathErrAuthCritical{
Message: "invalid credentials",
message: "invalid credentials",
}
}
}
@ -536,7 +536,7 @@ func (m *hlsMuxer) onRequest(req hlsMuxerRequest) { @@ -536,7 +536,7 @@ func (m *hlsMuxer) onRequest(req hlsMuxerRequest) {
select {
case m.request <- req:
case <-m.ctx.Done():
req.Res <- hlsMuxerResponse{Status: http.StatusNotFound}
req.res <- hlsMuxerResponse{status: http.StatusNotFound}
}
}
@ -563,10 +563,10 @@ func (m *hlsMuxer) onReaderAPIDescribe() interface{} { @@ -563,10 +563,10 @@ func (m *hlsMuxer) onReaderAPIDescribe() interface{} {
// onAPIHLSMuxersList is called by api.
func (m *hlsMuxer) onAPIHLSMuxersList(req hlsServerAPIMuxersListSubReq) {
req.Res = make(chan struct{})
req.res = make(chan struct{})
select {
case m.hlsServerAPIMuxersList <- req:
<-req.Res
<-req.res
case <-m.ctx.Done():
}

46
internal/core/hls_server.go

@ -26,18 +26,18 @@ type hlsServerAPIMuxersListData struct { @@ -26,18 +26,18 @@ type hlsServerAPIMuxersListData struct {
}
type hlsServerAPIMuxersListRes struct {
Data *hlsServerAPIMuxersListData
Muxers map[string]*hlsMuxer
Err error
data *hlsServerAPIMuxersListData
muxers map[string]*hlsMuxer
err error
}
type hlsServerAPIMuxersListReq struct {
Res chan hlsServerAPIMuxersListRes
res chan hlsServerAPIMuxersListRes
}
type hlsServerAPIMuxersListSubReq struct {
Data *hlsServerAPIMuxersListData
Res chan struct{}
data *hlsServerAPIMuxersListData
res chan struct{}
}
type hlsServerParent interface {
@ -151,7 +151,7 @@ outer: @@ -151,7 +151,7 @@ outer:
}
case req := <-s.request:
r := s.findOrCreateMuxer(req.Dir)
r := s.findOrCreateMuxer(req.dir)
r.onRequest(req)
case c := <-s.muxerClose:
@ -167,8 +167,8 @@ outer: @@ -167,8 +167,8 @@ outer:
muxers[name] = m
}
req.Res <- hlsServerAPIMuxersListRes{
Muxers: muxers,
req.res <- hlsServerAPIMuxersListRes{
muxers: muxers,
}
case <-s.ctx.Done():
@ -240,23 +240,23 @@ func (s *hlsServer) onRequest(ctx *gin.Context) { @@ -240,23 +240,23 @@ func (s *hlsServer) onRequest(ctx *gin.Context) {
cres := make(chan hlsMuxerResponse)
hreq := hlsMuxerRequest{
Dir: dir,
File: fname,
Req: ctx.Request,
Res: cres,
dir: dir,
file: fname,
req: ctx.Request,
res: cres,
}
select {
case s.request <- hreq:
res := <-cres
for k, v := range res.Header {
for k, v := range res.header {
ctx.Writer.Header().Set(k, v)
}
ctx.Writer.WriteHeader(res.Status)
ctx.Writer.WriteHeader(res.status)
if res.Body != nil {
io.Copy(ctx.Writer, res.Body)
if res.body != nil {
io.Copy(ctx.Writer, res.body)
}
case <-s.ctx.Done():
@ -303,22 +303,22 @@ func (s *hlsServer) onPathSourceReady(pa *path) { @@ -303,22 +303,22 @@ func (s *hlsServer) onPathSourceReady(pa *path) {
// onAPIHLSMuxersList is called by api.
func (s *hlsServer) onAPIHLSMuxersList(req hlsServerAPIMuxersListReq) hlsServerAPIMuxersListRes {
req.Res = make(chan hlsServerAPIMuxersListRes)
req.res = make(chan hlsServerAPIMuxersListRes)
select {
case s.apiMuxersList <- req:
res := <-req.Res
res := <-req.res
res.Data = &hlsServerAPIMuxersListData{
res.data = &hlsServerAPIMuxersListData{
Items: make(map[string]hlsServerAPIMuxersListItem),
}
for _, pa := range res.Muxers {
pa.onAPIHLSMuxersList(hlsServerAPIMuxersListSubReq{Data: res.Data})
for _, pa := range res.muxers {
pa.onAPIHLSMuxersList(hlsServerAPIMuxersListSubReq{data: res.data})
}
return res
case <-s.ctx.Done():
return hlsServerAPIMuxersListRes{Err: fmt.Errorf("terminated")}
return hlsServerAPIMuxersListRes{err: fmt.Errorf("terminated")}
}
}

14
internal/core/hls_source.go

@ -19,7 +19,7 @@ const ( @@ -19,7 +19,7 @@ const (
type hlsSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
}
type hlsSource struct {
@ -94,7 +94,7 @@ func (s *hlsSource) runInner() bool { @@ -94,7 +94,7 @@ func (s *hlsSource) runInner() bool {
defer func() {
if stream != nil {
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s})
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
rtcpSenders.Close()
}
}()
@ -113,16 +113,16 @@ func (s *hlsSource) runInner() bool { @@ -113,16 +113,16 @@ func (s *hlsSource) runInner() bool {
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s,
Tracks: tracks,
source: s,
tracks: tracks,
})
if res.Err != nil {
return res.Err
if res.err != nil {
return res.err
}
s.Log(logger.Info, "ready")
stream = res.Stream
stream = res.stream
rtcpSenders = rtcpsenderset.New(tracks, stream.onPacketRTCP)
return nil

20
internal/core/metrics.go

@ -96,8 +96,8 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -96,8 +96,8 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
out := ""
res := m.pathManager.onAPIPathsList(pathAPIPathsListReq{})
if res.Err == nil {
for name, p := range res.Data.Items {
if res.err == nil {
for name, p := range res.data.Items {
if p.SourceReady {
out += metric("paths{name=\""+name+"\",state=\"ready\"}", 1)
} else {
@ -108,12 +108,12 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -108,12 +108,12 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
if !interfaceIsEmpty(m.rtspServer) {
res := m.rtspServer.onAPISessionsList(rtspServerAPISessionsListReq{})
if res.Err == nil {
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.Data.Items {
for _, i := range res.data.Items {
switch i.State {
case "idle":
idleCount++
@ -135,12 +135,12 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -135,12 +135,12 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
if !interfaceIsEmpty(m.rtspsServer) {
res := m.rtspsServer.onAPISessionsList(rtspServerAPISessionsListReq{})
if res.Err == nil {
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.Data.Items {
for _, i := range res.data.Items {
switch i.State {
case "idle":
idleCount++
@ -162,12 +162,12 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -162,12 +162,12 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
if !interfaceIsEmpty(m.rtmpServer) {
res := m.rtmpServer.onAPIConnsList(rtmpServerAPIConnsListReq{})
if res.Err == nil {
if res.err == nil {
idleCount := int64(0)
readCount := int64(0)
publishCount := int64(0)
for _, i := range res.Data.Items {
for _, i := range res.data.Items {
switch i.State {
case "idle":
idleCount++
@ -189,8 +189,8 @@ func (m *metrics) onMetrics(ctx *gin.Context) { @@ -189,8 +189,8 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
if !interfaceIsEmpty(m.hlsServer) {
res := m.hlsServer.onAPIHLSMuxersList(hlsServerAPIMuxersListReq{})
if res.Err == nil {
for name := range res.Data.Items {
if res.err == nil {
for name := range res.data.Items {
out += metric("hls_muxers{name=\""+name+"\"}", 1)
}
}

270
internal/core/path.go

@ -30,17 +30,17 @@ type authenticateFunc func( @@ -30,17 +30,17 @@ type authenticateFunc func(
) error
type pathErrNoOnePublishing struct {
PathName string
pathName string
}
// Error implements the error interface.
func (e pathErrNoOnePublishing) Error() string {
return fmt.Sprintf("no one is publishing to path '%s'", e.PathName)
return fmt.Sprintf("no one is publishing to path '%s'", e.pathName)
}
type pathErrAuthNotCritical struct {
Message string
Response *base.Response
message string
response *base.Response
}
// Error implements the error interface.
@ -49,8 +49,8 @@ func (pathErrAuthNotCritical) Error() string { @@ -49,8 +49,8 @@ func (pathErrAuthNotCritical) Error() string {
}
type pathErrAuthCritical struct {
Message string
Response *base.Response
message string
response *base.Response
}
// Error implements the error interface.
@ -94,94 +94,94 @@ const ( @@ -94,94 +94,94 @@ const (
)
type pathSourceStaticSetReadyRes struct {
Stream *stream
Err error
stream *stream
err error
}
type pathSourceStaticSetReadyReq struct {
Source sourceStatic
Tracks gortsplib.Tracks
Res chan pathSourceStaticSetReadyRes
source sourceStatic
tracks gortsplib.Tracks
res chan pathSourceStaticSetReadyRes
}
type pathSourceStaticSetNotReadyReq struct {
Source sourceStatic
Res chan struct{}
source sourceStatic
res chan struct{}
}
type pathReaderRemoveReq struct {
Author reader
Res chan struct{}
author reader
res chan struct{}
}
type pathPublisherRemoveReq struct {
Author publisher
Res chan struct{}
author publisher
res chan struct{}
}
type pathDescribeRes struct {
Path *path
Stream *stream
Redirect string
Err error
path *path
stream *stream
redirect string
err error
}
type pathDescribeReq struct {
PathName string
URL *base.URL
Authenticate authenticateFunc
Res chan pathDescribeRes
pathName string
url *base.URL
authenticate authenticateFunc
res chan pathDescribeRes
}
type pathReaderSetupPlayRes struct {
Path *path
Stream *stream
Err error
path *path
stream *stream
err error
}
type pathReaderSetupPlayReq struct {
Author reader
PathName string
Authenticate authenticateFunc
Res chan pathReaderSetupPlayRes
author reader
pathName string
authenticate authenticateFunc
res chan pathReaderSetupPlayRes
}
type pathPublisherAnnounceRes struct {
Path *path
Err error
path *path
err error
}
type pathPublisherAnnounceReq struct {
Author publisher
PathName string
Authenticate authenticateFunc
Res chan pathPublisherAnnounceRes
author publisher
pathName string
authenticate authenticateFunc
res chan pathPublisherAnnounceRes
}
type pathReaderPlayReq struct {
Author reader
Res chan struct{}
author reader
res chan struct{}
}
type pathPublisherRecordRes struct {
Stream *stream
Err error
stream *stream
err error
}
type pathPublisherRecordReq struct {
Author publisher
Tracks gortsplib.Tracks
Res chan pathPublisherRecordRes
author publisher
tracks gortsplib.Tracks
res chan pathPublisherRecordRes
}
type pathReaderPauseReq struct {
Author reader
Res chan struct{}
author reader
res chan struct{}
}
type pathPublisherPauseReq struct {
Author publisher
Res chan struct{}
author publisher
res chan struct{}
}
type pathAPIPathsListItem struct {
@ -197,18 +197,18 @@ type pathAPIPathsListData struct { @@ -197,18 +197,18 @@ type pathAPIPathsListData struct {
}
type pathAPIPathsListRes struct {
Data *pathAPIPathsListData
Paths map[string]*path
Err error
data *pathAPIPathsListData
paths map[string]*path
err error
}
type pathAPIPathsListReq struct {
Res chan pathAPIPathsListRes
res chan pathAPIPathsListRes
}
type pathAPIPathsListSubReq struct {
Data *pathAPIPathsListData
Res chan struct{}
data *pathAPIPathsListData
res chan struct{}
}
type path struct {
@ -362,12 +362,12 @@ func (pa *path) run() { @@ -362,12 +362,12 @@ func (pa *path) run() {
select {
case <-pa.onDemandReadyTimer.C:
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequests = nil
for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequests = nil
@ -385,22 +385,22 @@ func (pa *path) run() { @@ -385,22 +385,22 @@ func (pa *path) run() {
}
case req := <-pa.sourceStaticSetReady:
if req.Source == pa.source {
pa.sourceSetReady(req.Tracks)
req.Res <- pathSourceStaticSetReadyRes{Stream: pa.stream}
if req.source == pa.source {
pa.sourceSetReady(req.tracks)
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
} else {
req.Res <- pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
req.res <- pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
}
case req := <-pa.sourceStaticSetNotReady:
if req.Source == pa.source {
if req.source == pa.source {
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
pa.onDemandCloseSource()
} else {
pa.sourceSetNotReady()
}
}
close(req.Res)
close(req.res)
if pa.shouldClose() {
return fmt.Errorf("not in use")
@ -469,11 +469,11 @@ func (pa *path) run() { @@ -469,11 +469,11 @@ func (pa *path) run() {
}
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{Err: fmt.Errorf("terminated")}
req.res <- pathDescribeRes{err: fmt.Errorf("terminated")}
}
for _, req := range pa.setupPlayRequests {
req.Res <- pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
}
for rp := range pa.readers {
@ -609,8 +609,8 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) { @@ -609,8 +609,8 @@ func (pa *path) sourceSetReady(tracks gortsplib.Tracks) {
pa.onDemandReadyTimer = newEmptyTimer()
for _, req := range pa.describeRequests {
req.Res <- pathDescribeRes{
Stream: pa.stream,
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequests = nil
@ -711,15 +711,15 @@ func (pa *path) doPublisherRemove() { @@ -711,15 +711,15 @@ func (pa *path) doPublisherRemove() {
func (pa *path) handleDescribe(req pathDescribeReq) {
if _, ok := pa.source.(*sourceRedirect); ok {
req.Res <- pathDescribeRes{
Redirect: pa.conf.SourceRedirect,
req.res <- pathDescribeRes{
redirect: pa.conf.SourceRedirect,
}
return
}
if pa.sourceReady {
req.Res <- pathDescribeRes{
Stream: pa.stream,
req.res <- pathDescribeRes{
stream: pa.stream,
}
return
}
@ -736,38 +736,38 @@ func (pa *path) handleDescribe(req pathDescribeReq) { @@ -736,38 +736,38 @@ func (pa *path) handleDescribe(req pathDescribeReq) {
fallbackURL := func() string {
if strings.HasPrefix(pa.conf.Fallback, "/") {
ur := base.URL{
Scheme: req.URL.Scheme,
User: req.URL.User,
Host: req.URL.Host,
Scheme: req.url.Scheme,
User: req.url.User,
Host: req.url.Host,
Path: pa.conf.Fallback,
}
return ur.String()
}
return pa.conf.Fallback
}()
req.Res <- pathDescribeRes{Redirect: fallbackURL}
req.res <- pathDescribeRes{redirect: fallbackURL}
return
}
req.Res <- pathDescribeRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
req.res <- pathDescribeRes{err: pathErrNoOnePublishing{pathName: pa.name}}
}
func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) {
if pa.source == req.Author {
if pa.source == req.author {
pa.doPublisherRemove()
}
close(req.Res)
close(req.res)
}
func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
if pa.source != nil {
if pa.hasStaticSource() {
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("path '%s' is assigned to a static source", pa.name)}
req.res <- pathPublisherAnnounceRes{err: fmt.Errorf("path '%s' is assigned to a static source", pa.name)}
return
}
if pa.conf.DisablePublisherOverride {
req.Res <- pathPublisherAnnounceRes{Err: fmt.Errorf("another publisher is already publishing to path '%s'", pa.name)}
req.res <- pathPublisherAnnounceRes{err: fmt.Errorf("another publisher is already publishing to path '%s'", pa.name)}
return
}
@ -776,20 +776,20 @@ func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) { @@ -776,20 +776,20 @@ func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
pa.doPublisherRemove()
}
pa.source = req.Author
pa.source = req.author
req.Res <- pathPublisherAnnounceRes{Path: pa}
req.res <- pathPublisherAnnounceRes{path: pa}
}
func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
if pa.source != req.Author {
req.Res <- pathPublisherRecordRes{Err: fmt.Errorf("publisher is not assigned to this path anymore")}
if pa.source != req.author {
req.res <- pathPublisherRecordRes{err: fmt.Errorf("publisher is not assigned to this path anymore")}
return
}
req.Author.onPublisherAccepted(len(req.Tracks))
req.author.onPublisherAccepted(len(req.tracks))
pa.sourceSetReady(req.Tracks)
pa.sourceSetReady(req.tracks)
if pa.conf.RunOnPublish != "" {
pa.log(logger.Info, "runOnPublish command started")
@ -803,25 +803,25 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) { @@ -803,25 +803,25 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
})
}
req.Res <- pathPublisherRecordRes{Stream: pa.stream}
req.res <- pathPublisherRecordRes{stream: pa.stream}
}
func (pa *path) handlePublisherPause(req pathPublisherPauseReq) {
if req.Author == pa.source && pa.sourceReady {
if req.author == pa.source && pa.sourceReady {
if pa.isOnDemand() && pa.onDemandState != pathOnDemandStateInitial {
pa.onDemandCloseSource()
} else {
pa.sourceSetNotReady()
}
}
close(req.Res)
close(req.res)
}
func (pa *path) handleReaderRemove(req pathReaderRemoveReq) {
if _, ok := pa.readers[req.Author]; ok {
pa.doReaderRemove(req.Author)
if _, ok := pa.readers[req.author]; ok {
pa.doReaderRemove(req.author)
}
close(req.Res)
close(req.res)
if pa.isOnDemand() &&
len(pa.readers) == 0 &&
@ -844,11 +844,11 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) { @@ -844,11 +844,11 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
return
}
req.Res <- pathReaderSetupPlayRes{Err: pathErrNoOnePublishing{PathName: pa.name}}
req.res <- pathReaderSetupPlayRes{err: pathErrNoOnePublishing{pathName: pa.name}}
}
func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
pa.readers[req.Author] = pathReaderStatePrePlay
pa.readers[req.author] = pathReaderStatePrePlay
if pa.isOnDemand() && pa.onDemandState == pathOnDemandStateClosing {
pa.onDemandState = pathOnDemandStateReady
@ -856,32 +856,32 @@ func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) { @@ -856,32 +856,32 @@ func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
pa.onDemandCloseTimer = newEmptyTimer()
}
req.Res <- pathReaderSetupPlayRes{
Path: pa,
Stream: pa.stream,
req.res <- pathReaderSetupPlayRes{
path: pa,
stream: pa.stream,
}
}
func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
pa.readers[req.Author] = pathReaderStatePlay
pa.readers[req.author] = pathReaderStatePlay
pa.stream.readerAdd(req.Author)
pa.stream.readerAdd(req.author)
req.Author.onReaderAccepted()
req.author.onReaderAccepted()
close(req.Res)
close(req.res)
}
func (pa *path) handleReaderPause(req pathReaderPauseReq) {
if state, ok := pa.readers[req.Author]; ok && state == pathReaderStatePlay {
pa.readers[req.Author] = pathReaderStatePrePlay
pa.stream.readerRemove(req.Author)
if state, ok := pa.readers[req.author]; ok && state == pathReaderStatePlay {
pa.readers[req.author] = pathReaderStatePrePlay
pa.stream.readerRemove(req.author)
}
close(req.Res)
close(req.res)
}
func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
req.Data.Items[pa.name] = pathAPIPathsListItem{
req.data.Items[pa.name] = pathAPIPathsListItem{
ConfName: pa.confName,
Conf: pa.conf,
Source: func() interface{} {
@ -899,26 +899,26 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) { @@ -899,26 +899,26 @@ func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) {
return ret
}(),
}
close(req.Res)
close(req.res)
}
// onSourceStaticSetReady is called by a sourceStatic.
func (pa *path) onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.Res = make(chan pathSourceStaticSetReadyRes)
req.res = make(chan pathSourceStaticSetReadyRes)
select {
case pa.sourceStaticSetReady <- req:
return <-req.Res
return <-req.res
case <-pa.ctx.Done():
return pathSourceStaticSetReadyRes{Err: fmt.Errorf("terminated")}
return pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
}
}
// OnSourceStaticSetNotReady is called by a sourceStatic.
func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.Res = make(chan struct{})
// onSourceStaticSetNotReady is called by a sourceStatic.
func (pa *path) onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
req.res = make(chan struct{})
select {
case pa.sourceStaticSetNotReady <- req:
<-req.Res
<-req.res
case <-pa.ctx.Done():
}
}
@ -927,18 +927,18 @@ func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) { @@ -927,18 +927,18 @@ func (pa *path) OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq) {
func (pa *path) onDescribe(req pathDescribeReq) pathDescribeRes {
select {
case pa.describe <- req:
return <-req.Res
return <-req.res
case <-pa.ctx.Done():
return pathDescribeRes{Err: fmt.Errorf("terminated")}
return pathDescribeRes{err: fmt.Errorf("terminated")}
}
}
// onPublisherRemove is called by a publisher.
func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) {
req.Res = make(chan struct{})
req.res = make(chan struct{})
select {
case pa.publisherRemove <- req:
<-req.Res
<-req.res
case <-pa.ctx.Done():
}
}
@ -947,39 +947,39 @@ func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) { @@ -947,39 +947,39 @@ func (pa *path) onPublisherRemove(req pathPublisherRemoveReq) {
func (pa *path) onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
select {
case pa.publisherAnnounce <- req:
return <-req.Res
return <-req.res
case <-pa.ctx.Done():
return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
}
}
// onPublisherRecord is called by a publisher.
func (pa *path) onPublisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes {
req.Res = make(chan pathPublisherRecordRes)
req.res = make(chan pathPublisherRecordRes)
select {
case pa.publisherRecord <- req:
return <-req.Res
return <-req.res
case <-pa.ctx.Done():
return pathPublisherRecordRes{Err: fmt.Errorf("terminated")}
return pathPublisherRecordRes{err: fmt.Errorf("terminated")}
}
}
// onPublisherPause is called by a publisher.
func (pa *path) onPublisherPause(req pathPublisherPauseReq) {
req.Res = make(chan struct{})
req.res = make(chan struct{})
select {
case pa.publisherPause <- req:
<-req.Res
<-req.res
case <-pa.ctx.Done():
}
}
// onReaderRemove is called by a reader.
func (pa *path) onReaderRemove(req pathReaderRemoveReq) {
req.Res = make(chan struct{})
req.res = make(chan struct{})
select {
case pa.readerRemove <- req:
<-req.Res
<-req.res
case <-pa.ctx.Done():
}
}
@ -988,38 +988,38 @@ func (pa *path) onReaderRemove(req pathReaderRemoveReq) { @@ -988,38 +988,38 @@ func (pa *path) onReaderRemove(req pathReaderRemoveReq) {
func (pa *path) onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
select {
case pa.readerSetupPlay <- req:
return <-req.Res
return <-req.res
case <-pa.ctx.Done():
return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
}
}
// onReaderPlay is called by a reader.
func (pa *path) onReaderPlay(req pathReaderPlayReq) {
req.Res = make(chan struct{})
req.res = make(chan struct{})
select {
case pa.readerPlay <- req:
<-req.Res
<-req.res
case <-pa.ctx.Done():
}
}
// onReaderPause is called by a reader.
func (pa *path) onReaderPause(req pathReaderPauseReq) {
req.Res = make(chan struct{})
req.res = make(chan struct{})
select {
case pa.readerPause <- req:
<-req.Res
<-req.res
case <-pa.ctx.Done():
}
}
// onAPIPathsList is called by api.
func (pa *path) onAPIPathsList(req pathAPIPathsListSubReq) {
req.Res = make(chan struct{})
req.res = make(chan struct{})
select {
case pa.apiPathsList <- req:
<-req.Res
<-req.res
case <-pa.ctx.Done():
}

90
internal/core/path_manager.go

@ -165,75 +165,75 @@ outer: @@ -165,75 +165,75 @@ outer:
}
case req := <-pm.describe:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.PathName)
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil {
req.Res <- pathDescribeRes{Err: err}
req.res <- pathDescribeRes{err: err}
continue
}
err = req.Authenticate(
err = req.authenticate(
pathConf.ReadIPs,
pathConf.ReadUser,
pathConf.ReadPass)
if err != nil {
req.Res <- pathDescribeRes{Err: err}
req.res <- pathDescribeRes{err: err}
continue
}
// create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok {
pm.createPath(pathConfName, pathConf, req.PathName, pathMatches)
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
req.Res <- pathDescribeRes{Path: pm.paths[req.PathName]}
req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
case req := <-pm.readerSetupPlay:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.PathName)
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil {
req.Res <- pathReaderSetupPlayRes{Err: err}
req.res <- pathReaderSetupPlayRes{err: err}
continue
}
if req.Authenticate != nil {
err = req.Authenticate(
if req.authenticate != nil {
err = req.authenticate(
pathConf.ReadIPs,
pathConf.ReadUser,
pathConf.ReadPass)
if err != nil {
req.Res <- pathReaderSetupPlayRes{Err: err}
req.res <- pathReaderSetupPlayRes{err: err}
continue
}
}
// create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok {
pm.createPath(pathConfName, pathConf, req.PathName, pathMatches)
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
req.Res <- pathReaderSetupPlayRes{Path: pm.paths[req.PathName]}
req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]}
case req := <-pm.publisherAnnounce:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.PathName)
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil {
req.Res <- pathPublisherAnnounceRes{Err: err}
req.res <- pathPublisherAnnounceRes{err: err}
continue
}
err = req.Authenticate(
err = req.authenticate(
pathConf.PublishIPs,
pathConf.PublishUser,
pathConf.PublishPass)
if err != nil {
req.Res <- pathPublisherAnnounceRes{Err: err}
req.res <- pathPublisherAnnounceRes{err: err}
continue
}
// create path if it doesn't exist
if _, ok := pm.paths[req.PathName]; !ok {
pm.createPath(pathConfName, pathConf, req.PathName, pathMatches)
if _, ok := pm.paths[req.pathName]; !ok {
pm.createPath(pathConfName, pathConf, req.pathName, pathMatches)
}
req.Res <- pathPublisherAnnounceRes{Path: pm.paths[req.PathName]}
req.res <- pathPublisherAnnounceRes{path: pm.paths[req.pathName]}
case s := <-pm.hlsServerSet:
pm.hlsServer = s
@ -245,8 +245,8 @@ outer: @@ -245,8 +245,8 @@ outer:
paths[name] = pa
}
req.Res <- pathAPIPathsListRes{
Paths: paths,
req.res <- pathAPIPathsListRes{
paths: paths,
}
case <-pm.ctx.Done():
@ -332,52 +332,52 @@ func (pm *pathManager) onPathClose(pa *path) { @@ -332,52 +332,52 @@ func (pm *pathManager) onPathClose(pa *path) {
// onDescribe is called by a reader or publisher.
func (pm *pathManager) onDescribe(req pathDescribeReq) pathDescribeRes {
req.Res = make(chan pathDescribeRes)
req.res = make(chan pathDescribeRes)
select {
case pm.describe <- req:
res := <-req.Res
if res.Err != nil {
res := <-req.res
if res.err != nil {
return res
}
return res.Path.onDescribe(req)
return res.path.onDescribe(req)
case <-pm.ctx.Done():
return pathDescribeRes{Err: fmt.Errorf("terminated")}
return pathDescribeRes{err: fmt.Errorf("terminated")}
}
}
// onPublisherAnnounce is called by a publisher.
func (pm *pathManager) onPublisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
req.Res = make(chan pathPublisherAnnounceRes)
req.res = make(chan pathPublisherAnnounceRes)
select {
case pm.publisherAnnounce <- req:
res := <-req.Res
if res.Err != nil {
res := <-req.res
if res.err != nil {
return res
}
return res.Path.onPublisherAnnounce(req)
return res.path.onPublisherAnnounce(req)
case <-pm.ctx.Done():
return pathPublisherAnnounceRes{Err: fmt.Errorf("terminated")}
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
}
}
// onReaderSetupPlay is called by a reader.
func (pm *pathManager) onReaderSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
req.Res = make(chan pathReaderSetupPlayRes)
req.res = make(chan pathReaderSetupPlayRes)
select {
case pm.readerSetupPlay <- req:
res := <-req.Res
if res.Err != nil {
res := <-req.res
if res.err != nil {
return res
}
return res.Path.onReaderSetupPlay(req)
return res.path.onReaderSetupPlay(req)
case <-pm.ctx.Done():
return pathReaderSetupPlayRes{Err: fmt.Errorf("terminated")}
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
}
}
@ -391,22 +391,22 @@ func (pm *pathManager) onHLSServerSet(s pathManagerHLSServer) { @@ -391,22 +391,22 @@ func (pm *pathManager) onHLSServerSet(s pathManagerHLSServer) {
// onAPIPathsList is called by api.
func (pm *pathManager) onAPIPathsList(req pathAPIPathsListReq) pathAPIPathsListRes {
req.Res = make(chan pathAPIPathsListRes)
req.res = make(chan pathAPIPathsListRes)
select {
case pm.apiPathsList <- req:
res := <-req.Res
res := <-req.res
res.Data = &pathAPIPathsListData{
res.data = &pathAPIPathsListData{
Items: make(map[string]pathAPIPathsListItem),
}
for _, pa := range res.Paths {
pa.onAPIPathsList(pathAPIPathsListSubReq{Data: res.Data})
for _, pa := range res.paths {
pa.onAPIPathsList(pathAPIPathsListSubReq{data: res.data})
}
return res
case <-pm.ctx.Done():
return pathAPIPathsListRes{Err: fmt.Errorf("terminated")}
return pathAPIPathsListRes{err: fmt.Errorf("terminated")}
}
}

58
internal/core/rtmp_conn.go

@ -220,9 +220,9 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -220,9 +220,9 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL())
res := c.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: c,
PathName: pathName,
Authenticate: func(
author: c,
pathName: pathName,
authenticate: func(
pathIPs []interface{},
pathUser conf.Credential,
pathPass conf.Credential) error {
@ -230,19 +230,19 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -230,19 +230,19 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
},
})
if res.Err != nil {
if terr, ok := res.Err.(pathErrAuthCritical); ok {
if res.err != nil {
if terr, ok := res.err.(pathErrAuthCritical); ok {
// wait some seconds to stop brute force attacks
<-time.After(rtmpConnPauseAfterAuthError)
return errors.New(terr.Message)
return errors.New(terr.message)
}
return res.Err
return res.err
}
c.path = res.Path
c.path = res.path
defer func() {
c.path.onReaderRemove(pathReaderRemoveReq{Author: c})
c.path.onReaderRemove(pathReaderRemoveReq{author: c})
}()
c.stateMutex.Lock()
@ -257,7 +257,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -257,7 +257,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
var audioClockRate int
var aacDecoder *rtpaac.Decoder
for i, t := range res.Stream.tracks() {
for i, t := range res.stream.tracks() {
if t.IsH264() {
if videoTrack != nil {
return fmt.Errorf("can't read track %d with RTMP: too many tracks", i+1)
@ -293,7 +293,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error { @@ -293,7 +293,7 @@ func (c *rtmpConn) runRead(ctx context.Context) error {
}()
c.path.onReaderPlay(pathReaderPlayReq{
Author: c,
author: c,
})
if c.path.Conf().RunOnRead != "" {
@ -465,9 +465,9 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { @@ -465,9 +465,9 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
pathName, query := pathNameAndQuery(c.conn.URL())
res := c.pathManager.onPublisherAnnounce(pathPublisherAnnounceReq{
Author: c,
PathName: pathName,
Authenticate: func(
author: c,
pathName: pathName,
authenticate: func(
pathIPs []interface{},
pathUser conf.Credential,
pathPass conf.Credential) error {
@ -475,19 +475,19 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { @@ -475,19 +475,19 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
},
})
if res.Err != nil {
if terr, ok := res.Err.(pathErrAuthCritical); ok {
if res.err != nil {
if terr, ok := res.err.(pathErrAuthCritical); ok {
// wait some seconds to stop brute force attacks
<-time.After(rtmpConnPauseAfterAuthError)
return errors.New(terr.Message)
return errors.New(terr.message)
}
return res.Err
return res.err
}
c.path = res.Path
c.path = res.path
defer func() {
c.path.onPublisherRemove(pathPublisherRemoveReq{Author: c})
c.path.onPublisherRemove(pathPublisherRemoveReq{author: c})
}()
c.stateMutex.Lock()
@ -498,19 +498,19 @@ func (c *rtmpConn) runPublish(ctx context.Context) error { @@ -498,19 +498,19 @@ func (c *rtmpConn) runPublish(ctx context.Context) error {
c.conn.SetWriteDeadline(time.Time{})
rres := c.path.onPublisherRecord(pathPublisherRecordReq{
Author: c,
Tracks: tracks,
author: c,
tracks: tracks,
})
if rres.Err != nil {
return rres.Err
if rres.err != nil {
return rres.err
}
rtcpSenders := rtcpsenderset.New(tracks, rres.Stream.onPacketRTCP)
rtcpSenders := rtcpsenderset.New(tracks, rres.stream.onPacketRTCP)
defer rtcpSenders.Close()
onPacketRTP := func(trackID int, payload []byte) {
rtcpSenders.OnPacketRTP(trackID, payload)
rres.Stream.onPacketRTP(trackID, payload)
rres.stream.onPacketRTP(trackID, payload)
}
for {
@ -610,7 +610,7 @@ func (c *rtmpConn) authenticate( @@ -610,7 +610,7 @@ func (c *rtmpConn) authenticate(
action)
if err != nil {
return pathErrAuthCritical{
Message: fmt.Sprintf("external authentication failed: %s", err),
message: fmt.Sprintf("external authentication failed: %s", err),
}
}
}
@ -619,7 +619,7 @@ func (c *rtmpConn) authenticate( @@ -619,7 +619,7 @@ func (c *rtmpConn) authenticate(
ip := c.ip()
if !ipEqualOrInRange(ip, pathIPs) {
return pathErrAuthCritical{
Message: fmt.Sprintf("IP '%s' not allowed", ip),
message: fmt.Sprintf("IP '%s' not allowed", ip),
}
}
}
@ -628,7 +628,7 @@ func (c *rtmpConn) authenticate( @@ -628,7 +628,7 @@ func (c *rtmpConn) authenticate(
if query.Get("user") != string(pathUser) ||
query.Get("pass") != string(pathPass) {
return pathErrAuthCritical{
Message: "invalid credentials",
message: "invalid credentials",
}
}
}

32
internal/core/rtmp_server.go

@ -26,21 +26,21 @@ type rtmpServerAPIConnsListData struct { @@ -26,21 +26,21 @@ type rtmpServerAPIConnsListData struct {
}
type rtmpServerAPIConnsListRes struct {
Data *rtmpServerAPIConnsListData
Err error
data *rtmpServerAPIConnsListData
err error
}
type rtmpServerAPIConnsListReq struct {
Res chan rtmpServerAPIConnsListRes
res chan rtmpServerAPIConnsListRes
}
type rtmpServerAPIConnsKickRes struct {
Err error
err error
}
type rtmpServerAPIConnsKickReq struct {
ID string
Res chan rtmpServerAPIConnsKickRes
id string
res chan rtmpServerAPIConnsKickRes
}
type rtmpServerParent interface {
@ -219,12 +219,12 @@ outer: @@ -219,12 +219,12 @@ outer:
}
}
req.Res <- rtmpServerAPIConnsListRes{Data: data}
req.res <- rtmpServerAPIConnsListRes{data: data}
case req := <-s.apiConnsKick:
res := func() bool {
for c := range s.conns {
if c.ID() == req.ID {
if c.ID() == req.id {
delete(s.conns, c)
c.close()
return true
@ -233,9 +233,9 @@ outer: @@ -233,9 +233,9 @@ outer:
return false
}()
if res {
req.Res <- rtmpServerAPIConnsKickRes{}
req.res <- rtmpServerAPIConnsKickRes{}
} else {
req.Res <- rtmpServerAPIConnsKickRes{fmt.Errorf("not found")}
req.res <- rtmpServerAPIConnsKickRes{fmt.Errorf("not found")}
}
case <-s.ctx.Done():
@ -290,24 +290,24 @@ func (s *rtmpServer) onConnClose(c *rtmpConn) { @@ -290,24 +290,24 @@ func (s *rtmpServer) onConnClose(c *rtmpConn) {
// onAPIConnsList is called by api.
func (s *rtmpServer) onAPIConnsList(req rtmpServerAPIConnsListReq) rtmpServerAPIConnsListRes {
req.Res = make(chan rtmpServerAPIConnsListRes)
req.res = make(chan rtmpServerAPIConnsListRes)
select {
case s.apiConnsList <- req:
return <-req.Res
return <-req.res
case <-s.ctx.Done():
return rtmpServerAPIConnsListRes{Err: fmt.Errorf("terminated")}
return rtmpServerAPIConnsListRes{err: fmt.Errorf("terminated")}
}
}
// onAPIConnsKick is called by api.
func (s *rtmpServer) onAPIConnsKick(req rtmpServerAPIConnsKickReq) rtmpServerAPIConnsKickRes {
req.Res = make(chan rtmpServerAPIConnsKickRes)
req.res = make(chan rtmpServerAPIConnsKickRes)
select {
case s.apiConnsKick <- req:
return <-req.Res
return <-req.res
case <-s.ctx.Done():
return rtmpServerAPIConnsKickRes{Err: fmt.Errorf("terminated")}
return rtmpServerAPIConnsKickRes{err: fmt.Errorf("terminated")}
}
}

16
internal/core/rtmp_source.go

@ -25,7 +25,7 @@ const ( @@ -25,7 +25,7 @@ const (
type rtmpSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
}
type rtmpSource struct {
@ -149,25 +149,25 @@ func (s *rtmpSource) runInner() bool { @@ -149,25 +149,25 @@ func (s *rtmpSource) runInner() bool {
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s,
Tracks: tracks,
source: s,
tracks: tracks,
})
if res.Err != nil {
return res.Err
if res.err != nil {
return res.err
}
s.log(logger.Info, "ready")
defer func() {
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s})
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
}()
rtcpSenders := rtcpsenderset.New(tracks, res.Stream.onPacketRTCP)
rtcpSenders := rtcpsenderset.New(tracks, res.stream.onPacketRTCP)
defer rtcpSenders.Close()
onPacketRTP := func(trackID int, payload []byte) {
rtcpSenders.OnPacketRTP(trackID, payload)
res.Stream.onPacketRTP(trackID, payload)
res.stream.onPacketRTP(trackID, payload)
}
for {

44
internal/core/rtsp_conn.go

@ -138,8 +138,8 @@ func (c *rtspConn) authenticate( @@ -138,8 +138,8 @@ func (c *rtspConn) authenticate(
// therefore we must allow up to 3 failures
if c.authFailures > 3 {
return pathErrAuthCritical{
Message: "unauthorized: " + err.Error(),
Response: &base.Response{
message: "unauthorized: " + err.Error(),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
},
}
@ -147,8 +147,8 @@ func (c *rtspConn) authenticate( @@ -147,8 +147,8 @@ func (c *rtspConn) authenticate(
v := "IPCAM"
return pathErrAuthNotCritical{
Message: "unauthorized: " + err.Error(),
Response: &base.Response{
message: "unauthorized: " + err.Error(),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": headers.Authenticate{
@ -165,8 +165,8 @@ func (c *rtspConn) authenticate( @@ -165,8 +165,8 @@ func (c *rtspConn) authenticate(
ip := c.ip()
if !ipEqualOrInRange(ip, pathIPs) {
return pathErrAuthCritical{
Message: fmt.Sprintf("IP '%s' not allowed", ip),
Response: &base.Response{
message: fmt.Sprintf("IP '%s' not allowed", ip),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
},
}
@ -193,15 +193,15 @@ func (c *rtspConn) authenticate( @@ -193,15 +193,15 @@ func (c *rtspConn) authenticate(
// therefore we must allow up to 3 failures
if c.authFailures > 3 {
return pathErrAuthCritical{
Message: "unauthorized: " + err.Error(),
Response: &base.Response{
message: "unauthorized: " + err.Error(),
response: &base.Response{
StatusCode: base.StatusUnauthorized,
},
}
}
return pathErrAuthNotCritical{
Response: &base.Response{
response: &base.Response{
StatusCode: base.StatusUnauthorized,
Header: base.Header{
"WWW-Authenticate": c.authValidator.Header(),
@ -241,9 +241,9 @@ func (c *rtspConn) OnResponse(res *base.Response) { @@ -241,9 +241,9 @@ func (c *rtspConn) OnResponse(res *base.Response) {
func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
) (*base.Response, *gortsplib.ServerStream, error) {
res := c.pathManager.onDescribe(pathDescribeReq{
PathName: ctx.Path,
URL: ctx.Req.URL,
Authenticate: func(
pathName: ctx.Path,
url: ctx.Req.URL,
authenticate: func(
pathIPs []interface{},
pathUser conf.Credential,
pathPass conf.Credential) error {
@ -251,40 +251,40 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, @@ -251,40 +251,40 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
},
})
if res.Err != nil {
switch terr := res.Err.(type) {
if res.err != nil {
switch terr := res.err.(type) {
case pathErrAuthNotCritical:
c.log(logger.Debug, "non-critical authentication error: %s", terr.Message)
return terr.Response, nil, nil
c.log(logger.Debug, "non-critical authentication error: %s", terr.message)
return terr.response, nil, nil
case pathErrAuthCritical:
// wait some seconds to stop brute force attacks
<-time.After(rtspConnPauseAfterAuthError)
return terr.Response, nil, errors.New(terr.Message)
return terr.response, nil, errors.New(terr.message)
case pathErrNoOnePublishing:
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, res.Err
}, nil, res.err
default:
return &base.Response{
StatusCode: base.StatusBadRequest,
}, nil, res.Err
}, nil, res.err
}
}
if res.Redirect != "" {
if res.redirect != "" {
return &base.Response{
StatusCode: base.StatusMovedPermanently,
Header: base.Header{
"Location": base.HeaderValue{res.Redirect},
"Location": base.HeaderValue{res.redirect},
},
}, nil, nil
}
return &base.Response{
StatusCode: base.StatusOK,
}, res.Stream.rtspStream, nil
}, res.stream.rtspStream, nil
}

18
internal/core/rtsp_server.go

@ -31,18 +31,18 @@ type rtspServerAPISessionsListData struct { @@ -31,18 +31,18 @@ type rtspServerAPISessionsListData struct {
}
type rtspServerAPISessionsListRes struct {
Data *rtspServerAPISessionsListData
Err error
data *rtspServerAPISessionsListData
err error
}
type rtspServerAPISessionsListReq struct{}
type rtspServerAPISessionsKickRes struct {
Err error
err error
}
type rtspServerAPISessionsKickReq struct {
ID string
id string
}
type rtspServerParent interface {
@ -405,7 +405,7 @@ func (s *rtspServer) OnPacketRTCP(ctx *gortsplib.ServerHandlerOnPacketRTCPCtx) { @@ -405,7 +405,7 @@ func (s *rtspServer) OnPacketRTCP(ctx *gortsplib.ServerHandlerOnPacketRTCPCtx) {
func (s *rtspServer) onAPISessionsList(req rtspServerAPISessionsListReq) rtspServerAPISessionsListRes {
select {
case <-s.ctx.Done():
return rtspServerAPISessionsListRes{Err: fmt.Errorf("terminated")}
return rtspServerAPISessionsListRes{err: fmt.Errorf("terminated")}
default:
}
@ -434,14 +434,14 @@ func (s *rtspServer) onAPISessionsList(req rtspServerAPISessionsListReq) rtspSer @@ -434,14 +434,14 @@ func (s *rtspServer) onAPISessionsList(req rtspServerAPISessionsListReq) rtspSer
}
}
return rtspServerAPISessionsListRes{Data: data}
return rtspServerAPISessionsListRes{data: data}
}
// onAPISessionsKick is called by api.
func (s *rtspServer) onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspServerAPISessionsKickRes {
select {
case <-s.ctx.Done():
return rtspServerAPISessionsKickRes{Err: fmt.Errorf("terminated")}
return rtspServerAPISessionsKickRes{err: fmt.Errorf("terminated")}
default:
}
@ -449,7 +449,7 @@ func (s *rtspServer) onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspSer @@ -449,7 +449,7 @@ func (s *rtspServer) onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspSer
defer s.mutex.RUnlock()
for key, se := range s.sessions {
if se.ID() == req.ID {
if se.ID() == req.id {
se.close()
delete(s.sessions, key)
se.onClose(liberrors.ErrServerTerminated{})
@ -457,5 +457,5 @@ func (s *rtspServer) onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspSer @@ -457,5 +457,5 @@ func (s *rtspServer) onAPISessionsKick(req rtspServerAPISessionsKickReq) rtspSer
}
}
return rtspServerAPISessionsKickRes{Err: fmt.Errorf("not found")}
return rtspServerAPISessionsKickRes{err: fmt.Errorf("not found")}
}

68
internal/core/rtsp_session.go

@ -112,11 +112,11 @@ func (s *rtspSession) onClose(err error) { @@ -112,11 +112,11 @@ func (s *rtspSession) onClose(err error) {
switch s.ss.State() {
case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead:
s.path.onReaderRemove(pathReaderRemoveReq{Author: s})
s.path.onReaderRemove(pathReaderRemoveReq{author: s})
s.path = nil
case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish:
s.path.onPublisherRemove(pathPublisherRemoveReq{Author: s})
s.path.onPublisherRemove(pathPublisherRemoveReq{author: s})
s.path = nil
}
@ -155,9 +155,9 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno @@ -155,9 +155,9 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
}
res := s.pathManager.onPublisherAnnounce(pathPublisherAnnounceReq{
Author: s,
PathName: ctx.Path,
Authenticate: func(
author: s,
pathName: ctx.Path,
authenticate: func(
pathIPs []interface{},
pathUser conf.Credential,
pathPass conf.Credential) error {
@ -165,26 +165,26 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno @@ -165,26 +165,26 @@ func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnno
},
})
if res.Err != nil {
switch terr := res.Err.(type) {
if res.err != nil {
switch terr := res.err.(type) {
case pathErrAuthNotCritical:
s.log(logger.Debug, "non-critical authentication error: %s", terr.Message)
return terr.Response, nil
s.log(logger.Debug, "non-critical authentication error: %s", terr.message)
return terr.response, nil
case pathErrAuthCritical:
// wait some seconds to stop brute force attacks
<-time.After(pauseAfterAuthError)
return terr.Response, errors.New(terr.Message)
return terr.response, errors.New(terr.message)
default:
return &base.Response{
StatusCode: base.StatusBadRequest,
}, res.Err
}, res.err
}
}
s.path = res.Path
s.path = res.path
s.announcedTracks = ctx.Tracks
s.stateMutex.Lock()
@ -214,9 +214,9 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -214,9 +214,9 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
switch s.ss.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePreRead: // play
res := s.pathManager.onReaderSetupPlay(pathReaderSetupPlayReq{
Author: s,
PathName: ctx.Path,
Authenticate: func(
author: s,
pathName: ctx.Path,
authenticate: func(
pathIPs []interface{},
pathUser conf.Credential,
pathPass conf.Credential) error {
@ -224,33 +224,33 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -224,33 +224,33 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
},
})
if res.Err != nil {
switch terr := res.Err.(type) {
if res.err != nil {
switch terr := res.err.(type) {
case pathErrAuthNotCritical:
s.log(logger.Debug, "non-critical authentication error: %s", terr.Message)
return terr.Response, nil, nil
s.log(logger.Debug, "non-critical authentication error: %s", terr.message)
return terr.response, nil, nil
case pathErrAuthCritical:
// wait some seconds to stop brute force attacks
<-time.After(pauseAfterAuthError)
return terr.Response, nil, errors.New(terr.Message)
return terr.response, nil, errors.New(terr.message)
case pathErrNoOnePublishing:
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, res.Err
}, nil, res.err
default:
return &base.Response{
StatusCode: base.StatusBadRequest,
}, nil, res.Err
}, nil, res.err
}
}
s.path = res.Path
s.path = res.path
if ctx.TrackID >= len(res.Stream.tracks()) {
if ctx.TrackID >= len(res.stream.tracks()) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, nil, fmt.Errorf("track %d does not exist", ctx.TrackID)
@ -259,7 +259,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -259,7 +259,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
if s.setuppedTracks == nil {
s.setuppedTracks = make(map[int]*gortsplib.Track)
}
s.setuppedTracks[ctx.TrackID] = res.Stream.tracks()[ctx.TrackID]
s.setuppedTracks[ctx.TrackID] = res.stream.tracks()[ctx.TrackID]
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePreRead
@ -267,7 +267,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -267,7 +267,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
return &base.Response{
StatusCode: base.StatusOK,
}, res.Stream.rtspStream, nil
}, res.stream.rtspStream, nil
default: // record
return &base.Response{
@ -281,7 +281,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -281,7 +281,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
h := make(base.Header)
if s.ss.State() == gortsplib.ServerSessionStatePreRead {
s.path.onReaderPlay(pathReaderPlayReq{Author: s})
s.path.onReaderPlay(pathReaderPlayReq{author: s})
if s.path.Conf().RunOnRead != "" {
s.log(logger.Info, "runOnRead command started")
@ -309,16 +309,16 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -309,16 +309,16 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
// onRecord is called by rtspServer.
func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.onPublisherRecord(pathPublisherRecordReq{
Author: s,
Tracks: s.announcedTracks,
author: s,
tracks: s.announcedTracks,
})
if res.Err != nil {
if res.err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, res.Err
}, res.err
}
s.stream = res.Stream
s.stream = res.stream
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePublish
@ -338,14 +338,14 @@ func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res @@ -338,14 +338,14 @@ func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
s.onReadCmd.Close()
}
s.path.onReaderPause(pathReaderPauseReq{Author: s})
s.path.onReaderPause(pathReaderPauseReq{author: s})
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePreRead
s.stateMutex.Unlock()
case gortsplib.ServerSessionStatePublish:
s.path.onPublisherPause(pathPublisherPauseReq{Author: s})
s.path.onPublisherPause(pathPublisherPauseReq{author: s})
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePrePublish

28
internal/core/rtsp_source.go

@ -27,7 +27,7 @@ const ( @@ -27,7 +27,7 @@ const (
type rtspSourceParent interface {
log(logger.Level, string, ...interface{})
onSourceStaticSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
OnSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
onSourceStaticSetNotReady(req pathSourceStaticSetNotReadyReq)
}
type rtspSource struct {
@ -195,25 +195,25 @@ func (s *rtspSource) runInner() bool { @@ -195,25 +195,25 @@ func (s *rtspSource) runInner() bool {
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s,
Tracks: c.Tracks(),
source: s,
tracks: c.Tracks(),
})
if res.Err != nil {
return res.Err
if res.err != nil {
return res.err
}
s.log(logger.Info, "ready")
defer func() {
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s})
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
}()
c.OnPacketRTP = func(trackID int, payload []byte) {
res.Stream.onPacketRTP(trackID, payload)
res.stream.onPacketRTP(trackID, payload)
}
c.OnPacketRTCP = func(trackID int, payload []byte) {
res.Stream.onPacketRTCP(trackID, payload)
res.stream.onPacketRTCP(trackID, payload)
}
_, err = c.Play(nil)
@ -358,23 +358,23 @@ func (s *rtspSource) handleMissingH264Params(c *gortsplib.Client, tracks gortspl @@ -358,23 +358,23 @@ func (s *rtspSource) handleMissingH264Params(c *gortsplib.Client, tracks gortspl
tracks[h264TrackID] = track
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s,
Tracks: tracks,
source: s,
tracks: tracks,
})
if res.Err != nil {
return res.Err
if res.err != nil {
return res.err
}
func() {
streamMutex.Lock()
defer streamMutex.Unlock()
stream = res.Stream
stream = res.stream
}()
s.log(logger.Info, "ready")
defer func() {
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s})
s.parent.onSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{source: s})
}()
}

Loading…
Cancel
Save