Browse Source

move stream in a dedicated package (#2121)

needed by #2068
pull/2122/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
db3862cf0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      internal/core/hls_muxer.go
  2. 11
      internal/core/hls_source.go
  3. 17
      internal/core/path.go
  4. 5
      internal/core/rpicamera_source.go
  5. 41
      internal/core/rtmp_conn.go
  6. 2
      internal/core/rtsp_conn.go
  7. 7
      internal/core/rtsp_session.go
  8. 2
      internal/core/rtsp_source.go
  9. 11
      internal/core/udp_source.go
  10. 5
      internal/core/webrtc_incoming_track.go
  11. 5
      internal/core/webrtc_outgoing_track.go
  12. 4
      internal/core/webrtc_session.go
  13. 44
      internal/stream/stream.go
  14. 18
      internal/stream/stream_format.go
  15. 6
      internal/stream/stream_media.go

27
internal/core/hls_muxer.go

@ -21,6 +21,7 @@ import ( @@ -21,6 +21,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
const (
@ -267,7 +268,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -267,7 +268,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
medias = append(medias, audioMedia)
}
defer res.stream.readerRemove(m)
defer res.stream.RemoveReader(m)
if medias == nil {
return fmt.Errorf(
@ -334,15 +335,15 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -334,15 +335,15 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
}
}
func (m *hlsMuxer) createVideoTrack(stream *stream) (*media.Media, *gohlslib.Track) {
func (m *hlsMuxer) createVideoTrack(stream *stream.Stream) (*media.Media, *gohlslib.Track) {
var videoFormatH265 *formats.H265
videoMedia := stream.medias().FindFormat(&videoFormatH265)
videoMedia := stream.Medias().FindFormat(&videoFormatH265)
if videoFormatH265 != nil {
videoStartPTSFilled := false
var videoStartPTS time.Duration
stream.readerAdd(m, videoMedia, videoFormatH265, func(unit formatprocessor.Unit) {
stream.AddReader(m, videoMedia, videoFormatH265, func(unit formatprocessor.Unit) {
m.ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitH265)
@ -377,13 +378,13 @@ func (m *hlsMuxer) createVideoTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -377,13 +378,13 @@ func (m *hlsMuxer) createVideoTrack(stream *stream) (*media.Media, *gohlslib.Tra
}
var videoFormatH264 *formats.H264
videoMedia = stream.medias().FindFormat(&videoFormatH264)
videoMedia = stream.Medias().FindFormat(&videoFormatH264)
if videoFormatH264 != nil {
videoStartPTSFilled := false
var videoStartPTS time.Duration
stream.readerAdd(m, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) {
stream.AddReader(m, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) {
m.ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitH264)
@ -419,15 +420,15 @@ func (m *hlsMuxer) createVideoTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -419,15 +420,15 @@ func (m *hlsMuxer) createVideoTrack(stream *stream) (*media.Media, *gohlslib.Tra
return nil, nil
}
func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Track) {
func (m *hlsMuxer) createAudioTrack(stream *stream.Stream) (*media.Media, *gohlslib.Track) {
var audioFormatMPEG4AudioGeneric *formats.MPEG4AudioGeneric
audioMedia := stream.medias().FindFormat(&audioFormatMPEG4AudioGeneric)
audioMedia := stream.Medias().FindFormat(&audioFormatMPEG4AudioGeneric)
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(m, audioMedia, audioFormatMPEG4AudioGeneric, func(unit formatprocessor.Unit) {
stream.AddReader(m, audioMedia, audioFormatMPEG4AudioGeneric, func(unit formatprocessor.Unit) {
m.ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4AudioGeneric)
@ -461,7 +462,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -461,7 +462,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra
}
var audioFormatMPEG4AudioLATM *formats.MPEG4AudioLATM
audioMedia = stream.medias().FindFormat(&audioFormatMPEG4AudioLATM)
audioMedia = stream.Medias().FindFormat(&audioFormatMPEG4AudioLATM)
if audioMedia != nil &&
audioFormatMPEG4AudioLATM.Config != nil &&
@ -470,7 +471,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -470,7 +471,7 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(m, audioMedia, audioFormatMPEG4AudioLATM, func(unit formatprocessor.Unit) {
stream.AddReader(m, audioMedia, audioFormatMPEG4AudioLATM, func(unit formatprocessor.Unit) {
m.ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4AudioLATM)
@ -504,13 +505,13 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra @@ -504,13 +505,13 @@ func (m *hlsMuxer) createAudioTrack(stream *stream) (*media.Media, *gohlslib.Tra
}
var audioFormatOpus *formats.Opus
audioMedia = stream.medias().FindFormat(&audioFormatOpus)
audioMedia = stream.Medias().FindFormat(&audioFormatOpus)
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(m, audioMedia, audioFormatOpus, func(unit formatprocessor.Unit) {
stream.AddReader(m, audioMedia, audioFormatOpus, func(unit formatprocessor.Unit) {
m.ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitOpus)

11
internal/core/hls_source.go

@ -13,6 +13,7 @@ import ( @@ -13,6 +13,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
type hlsSourceParent interface {
@ -39,7 +40,7 @@ func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) @@ -39,7 +40,7 @@ func (s *hlsSource) Log(level logger.Level, format string, args ...interface{})
// run implements sourceStaticImpl.
func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error {
var stream *stream
var stream *stream.Stream
defer func() {
if stream != nil {
@ -78,7 +79,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -78,7 +79,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
PTS: pts,
AU: au,
NTP: time.Now(),
@ -97,7 +98,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -97,7 +98,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnDataH26x(track, func(pts time.Duration, dts time.Duration, au [][]byte) {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
PTS: pts,
AU: au,
NTP: time.Now(),
@ -117,7 +118,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -117,7 +118,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnDataMPEG4Audio(track, func(pts time.Duration, dts time.Duration, aus [][]byte) {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
PTS: pts,
AUs: aus,
NTP: time.Now(),
@ -134,7 +135,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -134,7 +135,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
}
c.OnDataOpus(track, func(pts time.Duration, dts time.Duration, packets [][]byte) {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
PTS: pts,
Packets: packets,
NTP: time.Now(),

17
internal/core/path.go

@ -16,6 +16,7 @@ import ( @@ -16,6 +16,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
func newEmptyTimer() *time.Timer {
@ -50,7 +51,7 @@ const ( @@ -50,7 +51,7 @@ const (
)
type pathSourceStaticSetReadyRes struct {
stream *stream
stream *stream.Stream
err error
}
@ -88,7 +89,7 @@ type pathGetConfForPathReq struct { @@ -88,7 +89,7 @@ type pathGetConfForPathReq struct {
type pathDescribeRes struct {
path *path
stream *stream
stream *stream.Stream
redirect string
err error
}
@ -102,7 +103,7 @@ type pathDescribeReq struct { @@ -102,7 +103,7 @@ type pathDescribeReq struct {
type pathReaderSetupPlayRes struct {
path *path
stream *stream
stream *stream.Stream
err error
}
@ -128,7 +129,7 @@ type pathPublisherAddReq struct { @@ -128,7 +129,7 @@ type pathPublisherAddReq struct {
}
type pathPublisherRecordRes struct {
stream *stream
stream *stream.Stream
err error
}
@ -187,7 +188,7 @@ type path struct { @@ -187,7 +188,7 @@ type path struct {
ctxCancel func()
confMutex sync.RWMutex
source source
stream *stream
stream *stream.Stream
readyTime time.Time
bytesReceived *uint64
readers map[reader]struct{}
@ -619,7 +620,7 @@ func (pa *path) onDemandPublisherStop() { @@ -619,7 +620,7 @@ func (pa *path) onDemandPublisherStop() {
}
func (pa *path) setReady(medias media.Medias, allocateEncoder bool) error {
stream, err := newStream(
stream, err := stream.New(
pa.udpMaxPayloadSize,
medias,
allocateEncoder,
@ -665,7 +666,7 @@ func (pa *path) setNotReady() { @@ -665,7 +666,7 @@ func (pa *path) setNotReady() {
}
if pa.stream != nil {
pa.stream.close()
pa.stream.Close()
pa.stream = nil
}
}
@ -897,7 +898,7 @@ func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) { @@ -897,7 +898,7 @@ func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) {
if pa.stream == nil {
return []string{}
}
return mediasDescription(pa.stream.medias())
return mediasDescription(pa.stream.Medias())
}(),
BytesReceived: atomic.LoadUint64(pa.bytesReceived),
Readers: func() []interface{} {

5
internal/core/rpicamera_source.go

@ -11,6 +11,7 @@ import ( @@ -11,6 +11,7 @@ import (
"github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/rpicamera"
"github.com/bluenviron/mediamtx/internal/stream"
)
func paramsFromConf(cnf *conf.PathConf) rpicamera.Params {
@ -82,7 +83,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon @@ -82,7 +83,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon
}},
}
medias := media.Medias{medi}
var stream *stream
var stream *stream.Stream
onData := func(dts time.Duration, au [][]byte) {
if stream == nil {
@ -98,7 +99,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon @@ -98,7 +99,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon
stream = res.stream
}
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
PTS: dts,
AU: au,
NTP: time.Now(),

41
internal/core/rtmp_conn.go

@ -27,6 +27,7 @@ import ( @@ -27,6 +27,7 @@ import (
"github.com/bluenviron/mediamtx/internal/rtmp"
"github.com/bluenviron/mediamtx/internal/rtmp/h264conf"
"github.com/bluenviron/mediamtx/internal/rtmp/message"
"github.com/bluenviron/mediamtx/internal/stream"
)
const (
@ -43,7 +44,7 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) { @@ -43,7 +44,7 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) {
type rtmpWriteFunc func(msg interface{}) error
func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) rtmpWriteFunc {
func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream.Stream) rtmpWriteFunc {
switch format.(type) {
case *formats.H264:
return func(msg interface{}) error {
@ -62,7 +63,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -62,7 +63,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
conf.PPS,
}
stream.writeUnit(medi, format, &formatprocessor.UnitH264{
stream.WriteUnit(medi, format, &formatprocessor.UnitH264{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
@ -74,7 +75,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -74,7 +75,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
return fmt.Errorf("unable to decode AVCC: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitH264{
stream.WriteUnit(medi, format, &formatprocessor.UnitH264{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
@ -93,7 +94,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -93,7 +94,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
return fmt.Errorf("unable to decode AVCC: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitH265{
stream.WriteUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
@ -105,7 +106,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -105,7 +106,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
return fmt.Errorf("unable to decode AVCC: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitH265{
stream.WriteUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS,
AU: au,
NTP: time.Now(),
@ -117,7 +118,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -117,7 +118,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
return fmt.Errorf("unable to decode AVCC: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitH265{
stream.WriteUnit(medi, format, &formatprocessor.UnitH265{
PTS: tmsg.DTS + tmsg.PTSDelta,
AU: au,
NTP: time.Now(),
@ -135,7 +136,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -135,7 +136,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
return fmt.Errorf("unable to decode bitstream: %v", err)
}
stream.writeUnit(medi, format, &formatprocessor.UnitAV1{
stream.WriteUnit(medi, format, &formatprocessor.UnitAV1{
PTS: tmsg.DTS,
OBUs: obus,
NTP: time.Now(),
@ -149,7 +150,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -149,7 +150,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
return func(msg interface{}) error {
tmsg := msg.(*message.Audio)
stream.writeUnit(medi, format, &formatprocessor.UnitMPEG2Audio{
stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG2Audio{
PTS: tmsg.DTS,
Frames: [][]byte{tmsg.Payload},
NTP: time.Now(),
@ -163,7 +164,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream) @@ -163,7 +164,7 @@ func getRTMPWriteFunc(medi *media.Media, format formats.Format, stream *stream)
tmsg := msg.(*message.Audio)
if tmsg.AACType == message.AudioAACTypeAU {
stream.writeUnit(medi, format, &formatprocessor.UnitMPEG4AudioGeneric{
stream.WriteUnit(medi, format, &formatprocessor.UnitMPEG4AudioGeneric{
PTS: tmsg.DTS,
AUs: [][]byte{tmsg.Payload},
NTP: time.Now(),
@ -407,7 +408,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -407,7 +408,7 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
"the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio")
}
defer res.stream.readerRemove(c)
defer res.stream.RemoveReader(c)
c.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(medias))
@ -451,18 +452,18 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error { @@ -451,18 +452,18 @@ func (c *rtmpConn) runRead(ctx context.Context, u *url.URL) error {
}
}
func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBuffer,
func (c *rtmpConn) findVideoFormat(stream *stream.Stream, ringBuffer *ringbuffer.RingBuffer,
videoFirstIDRFound *bool, videoStartDTS *time.Duration,
) (*media.Media, formats.Format) {
var videoFormatH264 *formats.H264
videoMedia := stream.medias().FindFormat(&videoFormatH264)
videoMedia := stream.Medias().FindFormat(&videoFormatH264)
if videoFormatH264 != nil {
videoStartPTSFilled := false
var videoStartPTS time.Duration
var videoDTSExtractor *h264.DTSExtractor
stream.readerAdd(c, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) {
stream.AddReader(c, videoMedia, videoFormatH264, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitH264)
@ -556,20 +557,20 @@ func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBu @@ -556,20 +557,20 @@ func (c *rtmpConn) findVideoFormat(stream *stream, ringBuffer *ringbuffer.RingBu
}
func (c *rtmpConn) findAudioFormat(
stream *stream,
stream *stream.Stream,
ringBuffer *ringbuffer.RingBuffer,
videoFormat formats.Format,
videoFirstIDRFound *bool,
videoStartDTS *time.Duration,
) (*media.Media, formats.Format) {
var audioFormatMPEG4Generic *formats.MPEG4AudioGeneric
audioMedia := stream.medias().FindFormat(&audioFormatMPEG4Generic)
audioMedia := stream.Medias().FindFormat(&audioFormatMPEG4Generic)
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(c, audioMedia, audioFormatMPEG4Generic, func(unit formatprocessor.Unit) {
stream.AddReader(c, audioMedia, audioFormatMPEG4Generic, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4AudioGeneric)
@ -621,7 +622,7 @@ func (c *rtmpConn) findAudioFormat( @@ -621,7 +622,7 @@ func (c *rtmpConn) findAudioFormat(
}
var audioFormatMPEG4AudioLATM *formats.MPEG4AudioLATM
audioMedia = stream.medias().FindFormat(&audioFormatMPEG4AudioLATM)
audioMedia = stream.Medias().FindFormat(&audioFormatMPEG4AudioLATM)
if audioMedia != nil &&
audioFormatMPEG4AudioLATM.Config != nil &&
@ -630,7 +631,7 @@ func (c *rtmpConn) findAudioFormat( @@ -630,7 +631,7 @@ func (c *rtmpConn) findAudioFormat(
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(c, audioMedia, audioFormatMPEG4AudioLATM, func(unit formatprocessor.Unit) {
stream.AddReader(c, audioMedia, audioFormatMPEG4AudioLATM, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG4AudioLATM)
@ -679,13 +680,13 @@ func (c *rtmpConn) findAudioFormat( @@ -679,13 +680,13 @@ func (c *rtmpConn) findAudioFormat(
}
var audioFormatMPEG2 *formats.MPEG2Audio
audioMedia = stream.medias().FindFormat(&audioFormatMPEG2)
audioMedia = stream.Medias().FindFormat(&audioFormatMPEG2)
if audioMedia != nil {
audioStartPTSFilled := false
var audioStartPTS time.Duration
stream.readerAdd(c, audioMedia, audioFormatMPEG2, func(unit formatprocessor.Unit) {
stream.AddReader(c, audioMedia, audioFormatMPEG2, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() error {
tunit := unit.(*formatprocessor.UnitMPEG2Audio)

2
internal/core/rtsp_conn.go

@ -182,7 +182,7 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx, @@ -182,7 +182,7 @@ func (c *rtspConn) onDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx,
return &base.Response{
StatusCode: base.StatusOK,
}, res.stream.rtspStream, nil
}, res.stream.RTSPStream(), nil
}
func (c *rtspConn) handleAuthError(authErr error) (*base.Response, error) {

7
internal/core/rtsp_session.go

@ -17,6 +17,7 @@ import ( @@ -17,6 +17,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
type rtspSessionPathManager interface {
@ -40,7 +41,7 @@ type rtspSession struct { @@ -40,7 +41,7 @@ type rtspSession struct {
uuid uuid.UUID
created time.Time
path *path
stream *stream
stream *stream.Stream
onReadCmd *externalcmd.Cmd // read
mutex sync.Mutex
state gortsplib.ServerSessionState
@ -245,7 +246,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt @@ -245,7 +246,7 @@ func (s *rtspSession) onSetup(c *rtspConn, ctx *gortsplib.ServerHandlerOnSetupCt
return &base.Response{
StatusCode: base.StatusOK,
}, res.stream.rtspStream, nil
}, res.stream.RTSPStream(), nil
default: // record
return &base.Response{
@ -315,7 +316,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R @@ -315,7 +316,7 @@ func (s *rtspSession) onRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.R
cforma := forma
ctx.Session.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) {
res.stream.writeRTPPacket(cmedi, cforma, pkt, time.Now())
res.stream.WriteRTPPacket(cmedi, cforma, pkt, time.Now())
})
}
}

2
internal/core/rtsp_source.go

@ -185,7 +185,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -185,7 +185,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
cforma := forma
c.OnPacketRTP(cmedi, cforma, func(pkt *rtp.Packet) {
res.stream.writeRTPPacket(cmedi, cforma, pkt, time.Now())
res.stream.WriteRTPPacket(cmedi, cforma, pkt, time.Now())
})
}
}

11
internal/core/udp_source.go

@ -14,6 +14,7 @@ import ( @@ -14,6 +14,7 @@ import (
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
const (
@ -139,7 +140,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -139,7 +140,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
}
var medias media.Medias
var stream *stream
var stream *stream.Stream
var td *mpegts.TimeDecoder
decodeTime := func(t int64) time.Duration {
@ -163,7 +164,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -163,7 +164,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
}
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{
PTS: decodeTime(pts),
AU: au,
NTP: time.Now(),
@ -180,7 +181,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -180,7 +181,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
}
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{
PTS: decodeTime(pts),
AU: au,
NTP: time.Now(),
@ -201,7 +202,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -201,7 +202,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
}
r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{
PTS: decodeTime(pts),
AUs: aus,
NTP: time.Now(),
@ -219,7 +220,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -219,7 +220,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
}
r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error {
stream.writeUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{
PTS: decodeTime(pts),
Packets: packets,
NTP: time.Now(),

5
internal/core/webrtc_incoming_track.go

@ -7,6 +7,7 @@ import ( @@ -7,6 +7,7 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
)
@ -96,7 +97,7 @@ func newWebRTCIncomingTrack( @@ -96,7 +97,7 @@ func newWebRTCIncomingTrack(
return t, nil
}
func (t *webRTCIncomingTrack) start(stream *stream) {
func (t *webRTCIncomingTrack) start(stream *stream.Stream) {
go func() {
for {
pkt, _, err := t.track.ReadRTP()
@ -109,7 +110,7 @@ func (t *webRTCIncomingTrack) start(stream *stream) { @@ -109,7 +110,7 @@ func (t *webRTCIncomingTrack) start(stream *stream) {
continue
}
stream.writeRTPPacket(t.media, t.format, pkt, time.Now())
stream.WriteRTPPacket(t.media, t.format, pkt, time.Now())
}
}()

5
internal/core/webrtc_outgoing_track.go

@ -16,6 +16,7 @@ import ( @@ -16,6 +16,7 @@ import (
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/stream"
)
// workaround until this gets tagged:
@ -346,7 +347,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err @@ -346,7 +347,7 @@ func newWebRTCOutgoingTrackAudio(medias media.Medias) (*webRTCOutgoingTrack, err
func (t *webRTCOutgoingTrack) start(
ctx context.Context,
r reader,
stream *stream,
stream *stream.Stream,
ringBuffer *ringbuffer.RingBuffer,
writeError chan error,
) {
@ -361,7 +362,7 @@ func (t *webRTCOutgoingTrack) start( @@ -361,7 +362,7 @@ func (t *webRTCOutgoingTrack) start(
}
}()
stream.readerAdd(r, t.media, t.format, func(unit formatprocessor.Unit) {
stream.AddReader(r, t.media, t.format, func(unit formatprocessor.Unit) {
ringBuffer.Push(func() {
err := t.cb(unit)
if err != nil {

4
internal/core/webrtc_session.go

@ -446,7 +446,7 @@ func (s *webRTCSession) runRead() (int, error) { @@ -446,7 +446,7 @@ func (s *webRTCSession) runRead() (int, error) {
defer res.path.readerRemove(pathReaderRemoveReq{author: s})
tracks, err := gatherOutgoingTracks(res.stream.medias())
tracks, err := gatherOutgoingTracks(res.stream.Medias())
if err != nil {
return http.StatusBadRequest, err
}
@ -522,7 +522,7 @@ func (s *webRTCSession) runRead() (int, error) { @@ -522,7 +522,7 @@ func (s *webRTCSession) runRead() (int, error) {
track.start(s.ctx, s, res.stream, ringBuffer, writeError)
}
defer res.stream.readerRemove(s)
defer res.stream.RemoveReader(s)
s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(mediasOfOutgoingTracks(tracks)))

44
internal/core/stream.go → internal/stream/stream.go

@ -1,4 +1,5 @@ @@ -1,4 +1,5 @@
package core
// Package stream contains the Stream object.
package stream
import (
"time"
@ -9,23 +10,27 @@ import ( @@ -9,23 +10,27 @@ import (
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/formatprocessor"
"github.com/bluenviron/mediamtx/internal/logger"
)
type stream struct {
// Stream is a media stream.
// It stores tracks, readers and allow to write data to readers.
type Stream struct {
bytesReceived *uint64
rtspStream *gortsplib.ServerStream
smedias map[*media.Media]*streamMedia
}
func newStream(
// New allocates a Stream.
func New(
udpMaxPayloadSize int,
medias media.Medias,
generateRTPPackets bool,
bytesReceived *uint64,
source source,
) (*stream, error) {
s := &stream{
source logger.Writer,
) (*Stream, error) {
s := &Stream{
bytesReceived: bytesReceived,
rtspStream: gortsplib.NewServerStream(medias),
}
@ -43,35 +48,46 @@ func newStream( @@ -43,35 +48,46 @@ func newStream(
return s, nil
}
func (s *stream) close() {
// Close closes all resources of the stream.
func (s *Stream) Close() {
s.rtspStream.Close()
}
func (s *stream) medias() media.Medias {
// Medias returns medias of the stream.
func (s *Stream) Medias() media.Medias {
return s.rtspStream.Medias()
}
func (s *stream) readerAdd(r reader, medi *media.Media, forma formats.Format, cb func(formatprocessor.Unit)) {
// RTSPStream returns the RTSP stream.
func (s *Stream) RTSPStream() *gortsplib.ServerStream {
return s.rtspStream
}
// AddReader adds a reader.
func (s *Stream) AddReader(r interface{}, medi *media.Media, forma formats.Format, cb func(formatprocessor.Unit)) {
sm := s.smedias[medi]
sf := sm.formats[forma]
sf.readerAdd(r, cb)
sf.addReader(r, cb)
}
func (s *stream) readerRemove(r reader) {
// RemoveReader removes a reader.
func (s *Stream) RemoveReader(r interface{}) {
for _, sm := range s.smedias {
for _, sf := range sm.formats {
sf.readerRemove(r)
sf.removeReader(r)
}
}
}
func (s *stream) writeUnit(medi *media.Media, forma formats.Format, data formatprocessor.Unit) {
// WriteUnit writes a Unit.
func (s *Stream) WriteUnit(medi *media.Media, forma formats.Format, data formatprocessor.Unit) {
sm := s.smedias[medi]
sf := sm.formats[forma]
sf.writeUnit(s, medi, data)
}
func (s *stream) writeRTPPacket(medi *media.Media, forma formats.Format, pkt *rtp.Packet, ntp time.Time) {
// WriteRTPPacket writes a RTP packet.
func (s *Stream) WriteRTPPacket(medi *media.Media, forma formats.Format, pkt *rtp.Packet, ntp time.Time) {
sm := s.smedias[medi]
sf := sm.formats[forma]
sf.writeRTPPacket(s, medi, pkt, ntp)

18
internal/core/stream_format.go → internal/stream/stream_format.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package core
package stream
import (
"sync"
@ -14,17 +14,17 @@ import ( @@ -14,17 +14,17 @@ import (
)
type streamFormat struct {
source source
source logger.Writer
proc formatprocessor.Processor
mutex sync.RWMutex
nonRTSPReaders map[reader]func(formatprocessor.Unit)
nonRTSPReaders map[interface{}]func(formatprocessor.Unit)
}
func newStreamFormat(
udpMaxPayloadSize int,
forma formats.Format,
generateRTPPackets bool,
source source,
source logger.Writer,
) (*streamFormat, error) {
proc, err := formatprocessor.New(udpMaxPayloadSize, forma, generateRTPPackets, source)
if err != nil {
@ -34,25 +34,25 @@ func newStreamFormat( @@ -34,25 +34,25 @@ func newStreamFormat(
sf := &streamFormat{
source: source,
proc: proc,
nonRTSPReaders: make(map[reader]func(formatprocessor.Unit)),
nonRTSPReaders: make(map[interface{}]func(formatprocessor.Unit)),
}
return sf, nil
}
func (sf *streamFormat) readerAdd(r reader, cb func(formatprocessor.Unit)) {
func (sf *streamFormat) addReader(r interface{}, cb func(formatprocessor.Unit)) {
sf.mutex.Lock()
defer sf.mutex.Unlock()
sf.nonRTSPReaders[r] = cb
}
func (sf *streamFormat) readerRemove(r reader) {
func (sf *streamFormat) removeReader(r interface{}) {
sf.mutex.Lock()
defer sf.mutex.Unlock()
delete(sf.nonRTSPReaders, r)
}
func (sf *streamFormat) writeUnit(s *stream, medi *media.Media, data formatprocessor.Unit) {
func (sf *streamFormat) writeUnit(s *Stream, medi *media.Media, data formatprocessor.Unit) {
sf.mutex.RLock()
defer sf.mutex.RUnlock()
@ -76,6 +76,6 @@ func (sf *streamFormat) writeUnit(s *stream, medi *media.Media, data formatproce @@ -76,6 +76,6 @@ func (sf *streamFormat) writeUnit(s *stream, medi *media.Media, data formatproce
}
}
func (sf *streamFormat) writeRTPPacket(s *stream, medi *media.Media, pkt *rtp.Packet, ntp time.Time) {
func (sf *streamFormat) writeRTPPacket(s *Stream, medi *media.Media, pkt *rtp.Packet, ntp time.Time) {
sf.writeUnit(s, medi, sf.proc.UnitForRTPPacket(pkt, ntp))
}

6
internal/core/stream_media.go → internal/stream/stream_media.go

@ -1,8 +1,10 @@ @@ -1,8 +1,10 @@
package core
package stream
import (
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/mediamtx/internal/logger"
)
type streamMedia struct {
@ -12,7 +14,7 @@ type streamMedia struct { @@ -12,7 +14,7 @@ type streamMedia struct {
func newStreamMedia(udpMaxPayloadSize int,
medi *media.Media,
generateRTPPackets bool,
source source,
source logger.Writer,
) (*streamMedia, error) {
sm := &streamMedia{
formats: make(map[formats.Format]*streamFormat),
Loading…
Cancel
Save