Browse Source

rename interaction functions between paths and readers / publishers

readers and publishers can now interact with paths by using:

readerAdd()
readerStart()
readerStop()

publisherAdd()
publisherStart()
publisherStop()
pull/1128/head
aler9 3 years ago
parent
commit
83faae8a8e
  1. 6
      internal/core/hls_muxer.go
  2. 112
      internal/core/path.go
  3. 24
      internal/core/path_manager.go
  4. 12
      internal/core/rtmp_conn.go
  5. 16
      internal/core/rtsp_session.go

6
internal/core/hls_muxer.go

@ -103,7 +103,7 @@ type hlsMuxerRequest struct { @@ -103,7 +103,7 @@ type hlsMuxerRequest struct {
}
type hlsMuxerPathManager interface {
readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes
}
type hlsMuxerParent interface {
@ -276,7 +276,7 @@ func (m *hlsMuxer) run() { @@ -276,7 +276,7 @@ func (m *hlsMuxer) run() {
}
func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) error {
res := m.pathManager.readerSetupPlay(pathReaderSetupPlayReq{
res := m.pathManager.readerAdd(pathReaderAddReq{
author: m,
pathName: m.pathName,
authenticate: nil,
@ -347,7 +347,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -347,7 +347,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.ringBuffer, _ = ringbuffer.New(uint64(m.readBufferCount))
m.path.readerPlay(pathReaderPlayReq{author: m})
m.path.readerStart(pathReaderStartReq{author: m})
m.log(logger.Info, "is converting into HLS, %s",
sourceTrackInfo(res.stream.tracks()))

112
internal/core/path.go

@ -131,7 +131,7 @@ type pathReaderSetupPlayRes struct { @@ -131,7 +131,7 @@ type pathReaderSetupPlayRes struct {
err error
}
type pathReaderSetupPlayReq struct {
type pathReaderAddReq struct {
author reader
pathName string
authenticate authenticateFunc
@ -143,14 +143,14 @@ type pathPublisherAnnounceRes struct { @@ -143,14 +143,14 @@ type pathPublisherAnnounceRes struct {
err error
}
type pathPublisherAnnounceReq struct {
type pathPublisherAddReq struct {
author publisher
pathName string
authenticate authenticateFunc
res chan pathPublisherAnnounceRes
}
type pathReaderPlayReq struct {
type pathReaderStartReq struct {
author reader
res chan struct{}
}
@ -160,19 +160,19 @@ type pathPublisherRecordRes struct { @@ -160,19 +160,19 @@ type pathPublisherRecordRes struct {
err error
}
type pathPublisherRecordReq struct {
type pathPublisherStartReq struct {
author publisher
tracks gortsplib.Tracks
generateRTPPackets bool
res chan pathPublisherRecordRes
}
type pathReaderPauseReq struct {
type pathReaderStopReq struct {
author reader
res chan struct{}
}
type pathPublisherPauseReq struct {
type pathPublisherStopReq struct {
author publisher
res chan struct{}
}
@ -224,7 +224,7 @@ type path struct { @@ -224,7 +224,7 @@ type path struct {
stream *stream
readers map[reader]pathReaderState
describeRequestsOnHold []pathDescribeReq
setupPlayRequestsOnHold []pathReaderSetupPlayReq
readerAddRequestsOnHold []pathReaderAddReq
onDemandCmd *externalcmd.Cmd
onReadyCmd *externalcmd.Cmd
onDemandStaticSourceState pathOnDemandState
@ -239,13 +239,13 @@ type path struct { @@ -239,13 +239,13 @@ type path struct {
chSourceStaticSetNotReady chan pathSourceStaticSetNotReadyReq
chDescribe chan pathDescribeReq
chPublisherRemove chan pathPublisherRemoveReq
chPublisherAnnounce chan pathPublisherAnnounceReq
chPublisherRecord chan pathPublisherRecordReq
chPublisherPause chan pathPublisherPauseReq
chPublisherAdd chan pathPublisherAddReq
chPublisherStart chan pathPublisherStartReq
chPublisherStop chan pathPublisherStopReq
chReaderRemove chan pathReaderRemoveReq
chReaderSetupPlay chan pathReaderSetupPlayReq
chReaderPlay chan pathReaderPlayReq
chReaderPause chan pathReaderPauseReq
chReaderAdd chan pathReaderAddReq
chReaderStart chan pathReaderStartReq
chReaderStop chan pathReaderStopReq
chAPIPathsList chan pathAPIPathsListSubReq
}
@ -288,13 +288,13 @@ func newPath( @@ -288,13 +288,13 @@ func newPath(
chSourceStaticSetNotReady: make(chan pathSourceStaticSetNotReadyReq),
chDescribe: make(chan pathDescribeReq),
chPublisherRemove: make(chan pathPublisherRemoveReq),
chPublisherAnnounce: make(chan pathPublisherAnnounceReq),
chPublisherRecord: make(chan pathPublisherRecordReq),
chPublisherPause: make(chan pathPublisherPauseReq),
chPublisherAdd: make(chan pathPublisherAddReq),
chPublisherStart: make(chan pathPublisherStartReq),
chPublisherStop: make(chan pathPublisherStopReq),
chReaderRemove: make(chan pathReaderRemoveReq),
chReaderSetupPlay: make(chan pathReaderSetupPlayReq),
chReaderPlay: make(chan pathReaderPlayReq),
chReaderPause: make(chan pathReaderPauseReq),
chReaderAdd: make(chan pathReaderAddReq),
chReaderStart: make(chan pathReaderStartReq),
chReaderStop: make(chan pathReaderStopReq),
chAPIPathsList: make(chan pathAPIPathsListSubReq),
}
@ -388,10 +388,10 @@ func (pa *path) run() { @@ -388,10 +388,10 @@ func (pa *path) run() {
}
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequestsOnHold = nil
pa.readerAddRequestsOnHold = nil
pa.onDemandStaticSourceStop()
@ -413,10 +413,10 @@ func (pa *path) run() { @@ -413,10 +413,10 @@ func (pa *path) run() {
}
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.setupPlayRequestsOnHold = nil
pa.readerAddRequestsOnHold = nil
pa.onDemandPublisherStop()
@ -449,10 +449,10 @@ func (pa *path) run() { @@ -449,10 +449,10 @@ func (pa *path) run() {
}
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
for _, req := range pa.readerAddRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
pa.readerAddRequestsOnHold = nil
}
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
@ -487,13 +487,13 @@ func (pa *path) run() { @@ -487,13 +487,13 @@ func (pa *path) run() {
return fmt.Errorf("not in use")
}
case req := <-pa.chPublisherAnnounce:
case req := <-pa.chPublisherAdd:
pa.handlePublisherAnnounce(req)
case req := <-pa.chPublisherRecord:
case req := <-pa.chPublisherStart:
pa.handlePublisherRecord(req)
case req := <-pa.chPublisherPause:
case req := <-pa.chPublisherStop:
pa.handlePublisherPause(req)
if pa.shouldClose() {
@ -503,17 +503,17 @@ func (pa *path) run() { @@ -503,17 +503,17 @@ func (pa *path) run() {
case req := <-pa.chReaderRemove:
pa.handleReaderRemove(req)
case req := <-pa.chReaderSetupPlay:
case req := <-pa.chReaderAdd:
pa.handleReaderSetupPlay(req)
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case req := <-pa.chReaderPlay:
case req := <-pa.chReaderStart:
pa.handleReaderPlay(req)
case req := <-pa.chReaderPause:
case req := <-pa.chReaderStop:
pa.handleReaderPause(req)
case req := <-pa.chAPIPathsList:
@ -541,7 +541,7 @@ func (pa *path) run() { @@ -541,7 +541,7 @@ func (pa *path) run() {
req.res <- pathDescribeRes{err: fmt.Errorf("terminated")}
}
for _, req := range pa.setupPlayRequestsOnHold {
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
}
@ -572,7 +572,7 @@ func (pa *path) shouldClose() bool { @@ -572,7 +572,7 @@ func (pa *path) shouldClose() bool {
pa.source == nil &&
len(pa.readers) == 0 &&
len(pa.describeRequestsOnHold) == 0 &&
len(pa.setupPlayRequestsOnHold) == 0
len(pa.readerAddRequestsOnHold) == 0
}
func (pa *path) externalCmdEnv() externalcmd.Environment {
@ -788,7 +788,7 @@ func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) { @@ -788,7 +788,7 @@ func (pa *path) handlePublisherRemove(req pathPublisherRemoveReq) {
close(req.res)
}
func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
func (pa *path) handlePublisherAnnounce(req pathPublisherAddReq) {
if pa.conf.Source != "publisher" {
req.res <- pathPublisherAnnounceRes{
err: fmt.Errorf("can't publish to path '%s' since 'source' is not 'publisher'", pa.name),
@ -812,7 +812,7 @@ func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) { @@ -812,7 +812,7 @@ func (pa *path) handlePublisherAnnounce(req pathPublisherAnnounceReq) {
req.res <- pathPublisherAnnounceRes{path: pa}
}
func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
func (pa *path) handlePublisherRecord(req pathPublisherStartReq) {
if pa.source != req.author {
req.res <- pathPublisherRecordRes{err: fmt.Errorf("publisher is not assigned to this path anymore")}
return
@ -837,16 +837,16 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) { @@ -837,16 +837,16 @@ func (pa *path) handlePublisherRecord(req pathPublisherRecordReq) {
}
pa.describeRequestsOnHold = nil
for _, req := range pa.setupPlayRequestsOnHold {
for _, req := range pa.readerAddRequestsOnHold {
pa.handleReaderSetupPlayPost(req)
}
pa.setupPlayRequestsOnHold = nil
pa.readerAddRequestsOnHold = nil
}
req.res <- pathPublisherRecordRes{stream: pa.stream}
}
func (pa *path) handlePublisherPause(req pathPublisherPauseReq) {
func (pa *path) handlePublisherPause(req pathPublisherStopReq) {
if req.author == pa.source && pa.stream != nil {
if pa.hasOnDemandPublisher() && pa.onDemandPublisherState != pathOnDemandStateInitial {
pa.onDemandPublisherStop()
@ -876,7 +876,7 @@ func (pa *path) handleReaderRemove(req pathReaderRemoveReq) { @@ -876,7 +876,7 @@ func (pa *path) handleReaderRemove(req pathReaderRemoveReq) {
}
}
func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
func (pa *path) handleReaderSetupPlay(req pathReaderAddReq) {
if pa.stream != nil {
pa.handleReaderSetupPlayPost(req)
return
@ -886,7 +886,7 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) { @@ -886,7 +886,7 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
if pa.onDemandStaticSourceState == pathOnDemandStateInitial {
pa.onDemandStaticSourceStart()
}
pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req)
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req)
return
}
@ -894,14 +894,14 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) { @@ -894,14 +894,14 @@ func (pa *path) handleReaderSetupPlay(req pathReaderSetupPlayReq) {
if pa.onDemandPublisherState == pathOnDemandStateInitial {
pa.onDemandPublisherStart()
}
pa.setupPlayRequestsOnHold = append(pa.setupPlayRequestsOnHold, req)
pa.readerAddRequestsOnHold = append(pa.readerAddRequestsOnHold, req)
return
}
req.res <- pathReaderSetupPlayRes{err: pathErrNoOnePublishing{pathName: pa.name}}
}
func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
func (pa *path) handleReaderSetupPlayPost(req pathReaderAddReq) {
pa.readers[req.author] = pathReaderStatePrePlay
if pa.hasOnDemandStaticSource() {
@ -924,7 +924,7 @@ func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) { @@ -924,7 +924,7 @@ func (pa *path) handleReaderSetupPlayPost(req pathReaderSetupPlayReq) {
}
}
func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
func (pa *path) handleReaderPlay(req pathReaderStartReq) {
pa.readers[req.author] = pathReaderStatePlay
pa.stream.readerAdd(req.author)
@ -932,7 +932,7 @@ func (pa *path) handleReaderPlay(req pathReaderPlayReq) { @@ -932,7 +932,7 @@ func (pa *path) handleReaderPlay(req pathReaderPlayReq) {
close(req.res)
}
func (pa *path) handleReaderPause(req pathReaderPauseReq) {
func (pa *path) handleReaderPause(req pathReaderStopReq) {
if state, ok := pa.readers[req.author]; ok && state == pathReaderStatePlay {
pa.readers[req.author] = pathReaderStatePrePlay
pa.stream.readerRemove(req.author)
@ -1021,9 +1021,9 @@ func (pa *path) publisherRemove(req pathPublisherRemoveReq) { @@ -1021,9 +1021,9 @@ func (pa *path) publisherRemove(req pathPublisherRemoveReq) {
}
// publisherAnnounce is called by a publisher through pathManager.
func (pa *path) publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
func (pa *path) publisherAdd(req pathPublisherAddReq) pathPublisherAnnounceRes {
select {
case pa.chPublisherAnnounce <- req:
case pa.chPublisherAdd <- req:
return <-req.res
case <-pa.ctx.Done():
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
@ -1031,10 +1031,10 @@ func (pa *path) publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnn @@ -1031,10 +1031,10 @@ func (pa *path) publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnn
}
// publisherRecord is called by a publisher.
func (pa *path) publisherRecord(req pathPublisherRecordReq) pathPublisherRecordRes {
func (pa *path) publisherStart(req pathPublisherStartReq) pathPublisherRecordRes {
req.res = make(chan pathPublisherRecordRes)
select {
case pa.chPublisherRecord <- req:
case pa.chPublisherStart <- req:
return <-req.res
case <-pa.ctx.Done():
return pathPublisherRecordRes{err: fmt.Errorf("terminated")}
@ -1042,10 +1042,10 @@ func (pa *path) publisherRecord(req pathPublisherRecordReq) pathPublisherRecordR @@ -1042,10 +1042,10 @@ func (pa *path) publisherRecord(req pathPublisherRecordReq) pathPublisherRecordR
}
// publisherPause is called by a publisher.
func (pa *path) publisherPause(req pathPublisherPauseReq) {
func (pa *path) publisherStop(req pathPublisherStopReq) {
req.res = make(chan struct{})
select {
case pa.chPublisherPause <- req:
case pa.chPublisherStop <- req:
<-req.res
case <-pa.ctx.Done():
}
@ -1062,9 +1062,9 @@ func (pa *path) readerRemove(req pathReaderRemoveReq) { @@ -1062,9 +1062,9 @@ func (pa *path) readerRemove(req pathReaderRemoveReq) {
}
// readerSetupPlay is called by a reader through pathManager.
func (pa *path) readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
func (pa *path) readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes {
select {
case pa.chReaderSetupPlay <- req:
case pa.chReaderAdd <- req:
return <-req.res
case <-pa.ctx.Done():
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}
@ -1072,20 +1072,20 @@ func (pa *path) readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayR @@ -1072,20 +1072,20 @@ func (pa *path) readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayR
}
// readerPlay is called by a reader.
func (pa *path) readerPlay(req pathReaderPlayReq) {
func (pa *path) readerStart(req pathReaderStartReq) {
req.res = make(chan struct{})
select {
case pa.chReaderPlay <- req:
case pa.chReaderStart <- req:
<-req.res
case <-pa.ctx.Done():
}
}
// readerPause is called by a reader.
func (pa *path) readerPause(req pathReaderPauseReq) {
func (pa *path) readerStop(req pathReaderStopReq) {
req.res = make(chan struct{})
select {
case pa.chReaderPause <- req:
case pa.chReaderStop <- req:
<-req.res
case <-pa.ctx.Done():
}

24
internal/core/path_manager.go

@ -41,8 +41,8 @@ type pathManager struct { @@ -41,8 +41,8 @@ type pathManager struct {
chPathSourceReady chan *path
chPathSourceNotReady chan *path
chDescribe chan pathDescribeReq
chReaderSetupPlay chan pathReaderSetupPlayReq
chPublisherAnnounce chan pathPublisherAnnounceReq
chReaderAdd chan pathReaderAddReq
chPublisherAdd chan pathPublisherAddReq
chHLSServerSet chan pathManagerHLSServer
chAPIPathsList chan pathAPIPathsListReq
}
@ -77,8 +77,8 @@ func newPathManager( @@ -77,8 +77,8 @@ func newPathManager(
chPathSourceReady: make(chan *path),
chPathSourceNotReady: make(chan *path),
chDescribe: make(chan pathDescribeReq),
chReaderSetupPlay: make(chan pathReaderSetupPlayReq),
chPublisherAnnounce: make(chan pathPublisherAnnounceReq),
chReaderAdd: make(chan pathReaderAddReq),
chPublisherAdd: make(chan pathPublisherAddReq),
chHLSServerSet: make(chan pathManagerHLSServer),
chAPIPathsList: make(chan pathAPIPathsListReq),
}
@ -196,7 +196,7 @@ outer: @@ -196,7 +196,7 @@ outer:
req.res <- pathDescribeRes{path: pm.paths[req.pathName]}
case req := <-pm.chReaderSetupPlay:
case req := <-pm.chReaderAdd:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil {
req.res <- pathReaderSetupPlayRes{err: err}
@ -221,7 +221,7 @@ outer: @@ -221,7 +221,7 @@ outer:
req.res <- pathReaderSetupPlayRes{path: pm.paths[req.pathName]}
case req := <-pm.chPublisherAnnounce:
case req := <-pm.chPublisherAdd:
pathConfName, pathConf, pathMatches, err := pm.findPathConf(req.pathName)
if err != nil {
req.res <- pathPublisherAnnounceRes{err: err}
@ -365,16 +365,16 @@ func (pm *pathManager) describe(req pathDescribeReq) pathDescribeRes { @@ -365,16 +365,16 @@ func (pm *pathManager) describe(req pathDescribeReq) pathDescribeRes {
}
// publisherAnnounce is called by a publisher.
func (pm *pathManager) publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes {
func (pm *pathManager) publisherAdd(req pathPublisherAddReq) pathPublisherAnnounceRes {
req.res = make(chan pathPublisherAnnounceRes)
select {
case pm.chPublisherAnnounce <- req:
case pm.chPublisherAdd <- req:
res := <-req.res
if res.err != nil {
return res
}
return res.path.publisherAnnounce(req)
return res.path.publisherAdd(req)
case <-pm.ctx.Done():
return pathPublisherAnnounceRes{err: fmt.Errorf("terminated")}
@ -382,16 +382,16 @@ func (pm *pathManager) publisherAnnounce(req pathPublisherAnnounceReq) pathPubli @@ -382,16 +382,16 @@ func (pm *pathManager) publisherAnnounce(req pathPublisherAnnounceReq) pathPubli
}
// readerSetupPlay is called by a reader.
func (pm *pathManager) readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes {
func (pm *pathManager) readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes {
req.res = make(chan pathReaderSetupPlayRes)
select {
case pm.chReaderSetupPlay <- req:
case pm.chReaderAdd <- req:
res := <-req.res
if res.err != nil {
return res
}
return res.path.readerSetupPlay(req)
return res.path.readerAdd(req)
case <-pm.ctx.Done():
return pathReaderSetupPlayRes{err: fmt.Errorf("terminated")}

12
internal/core/rtmp_conn.go

@ -46,8 +46,8 @@ const ( @@ -46,8 +46,8 @@ const (
)
type rtmpConnPathManager interface {
readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes
publisherAdd(req pathPublisherAddReq) pathPublisherAnnounceRes
}
type rtmpConnParent interface {
@ -220,7 +220,7 @@ func (c *rtmpConn) runInner(ctx context.Context) error { @@ -220,7 +220,7 @@ func (c *rtmpConn) runInner(ctx context.Context) error {
func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.readerSetupPlay(pathReaderSetupPlayReq{
res := c.pathManager.readerAdd(pathReaderAddReq{
author: c,
pathName: pathName,
authenticate: func(
@ -294,7 +294,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -294,7 +294,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
c.ringBuffer.Close()
}()
c.path.readerPlay(pathReaderPlayReq{
c.path.readerStart(pathReaderStartReq{
author: c,
})
@ -483,7 +483,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -483,7 +483,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
pathName, query, rawQuery := pathNameAndQuery(u)
res := c.pathManager.publisherAnnounce(pathPublisherAnnounceReq{
res := c.pathManager.publisherAdd(pathPublisherAddReq{
author: c,
pathName: pathName,
authenticate: func(
@ -514,7 +514,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { @@ -514,7 +514,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error {
c.state = rtmpConnStatePublish
c.stateMutex.Unlock()
rres := c.path.publisherRecord(pathPublisherRecordReq{
rres := c.path.publisherStart(pathPublisherStartReq{
author: c,
tracks: tracks,
generateRTPPackets: true,

16
internal/core/rtsp_session.go

@ -20,8 +20,8 @@ const ( @@ -20,8 +20,8 @@ const (
)
type rtspSessionPathManager interface {
publisherAnnounce(req pathPublisherAnnounceReq) pathPublisherAnnounceRes
readerSetupPlay(req pathReaderSetupPlayReq) pathReaderSetupPlayRes
publisherAdd(req pathPublisherAddReq) pathPublisherAnnounceRes
readerAdd(req pathReaderAddReq) pathReaderSetupPlayRes
}
type rtspSessionParent interface {
@ -122,7 +122,7 @@ func (s *rtspSession) onClose(err error) { @@ -122,7 +122,7 @@ func (s *rtspSession) onClose(err error) {
// onAnnounce is called by rtspServer.
func (s *rtspSession) onAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
res := s.pathManager.publisherAnnounce(pathPublisherAnnounceReq{
res := s.pathManager.publisherAdd(pathPublisherAddReq{
author: s,
pathName: ctx.Path,
authenticate: func(
@ -182,7 +182,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -182,7 +182,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
switch s.ss.State() {
case gortsplib.ServerSessionStateInitial, gortsplib.ServerSessionStatePrePlay: // play
res := s.pathManager.readerSetupPlay(pathReaderSetupPlayReq{
res := s.pathManager.readerAdd(pathReaderAddReq{
author: s,
pathName: ctx.Path,
authenticate: func(
@ -247,7 +247,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -247,7 +247,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
h := make(base.Header)
if s.ss.State() == gortsplib.ServerSessionStatePrePlay {
s.path.readerPlay(pathReaderPlayReq{author: s})
s.path.readerStart(pathReaderStartReq{author: s})
tracks := make(gortsplib.Tracks, len(s.ss.SetuppedTracks()))
n := 0
@ -286,7 +286,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo @@ -286,7 +286,7 @@ func (s *rtspSession) onPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Respo
// onRecord is called by rtspServer.
func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
res := s.path.publisherRecord(pathPublisherRecordReq{
res := s.path.publisherStart(pathPublisherStartReq{
author: s,
tracks: s.announcedTracks,
generateRTPPackets: false,
@ -322,14 +322,14 @@ func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res @@ -322,14 +322,14 @@ func (s *rtspSession) onPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Res
s.onReadCmd.Close()
}
s.path.readerPause(pathReaderPauseReq{author: s})
s.path.readerStop(pathReaderStopReq{author: s})
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePrePlay
s.stateMutex.Unlock()
case gortsplib.ServerSessionStateRecord:
s.path.publisherPause(pathPublisherPauseReq{author: s})
s.path.publisherStop(pathPublisherStopReq{author: s})
s.stateMutex.Lock()
s.state = gortsplib.ServerSessionStatePreRecord

Loading…
Cancel
Save