Browse Source

support recording to MPEG-TS (#2505)

pull/2506/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
95ab9375c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      README.md
  2. 2
      internal/conf/conf.go
  3. 2
      internal/conf/conf_test.go
  4. 10
      internal/conf/path.go
  5. 56
      internal/conf/record_format.go
  6. 2
      internal/conf/rtsp_range_type.go
  7. 1
      internal/core/core.go
  8. 2
      internal/core/hls_muxer.go
  9. 252
      internal/core/mpegts.go
  10. 3
      internal/core/path.go
  11. 9
      internal/core/reader.go
  12. 27
      internal/core/rtmp_conn.go
  13. 2
      internal/core/rtsp_session.go
  14. 2
      internal/core/source.go
  15. 2
      internal/core/source_static.go
  16. 243
      internal/core/srt_conn.go
  17. 2
      internal/core/srt_source.go
  18. 2
      internal/core/udp_source.go
  19. 10
      internal/core/webrtc_session.go
  20. 862
      internal/record/agent.go
  21. 282
      internal/record/agent_test.go
  22. 13
      internal/record/cleaner.go
  23. 3
      internal/record/cleaner_test.go
  24. 5
      internal/record/rec_format.go
  25. 821
      internal/record/rec_format_fmp4.go
  26. 48
      internal/record/rec_format_fmp4_part.go
  27. 42
      internal/record/rec_format_fmp4_segment.go
  28. 57
      internal/record/rec_format_fmp4_track.go
  29. 332
      internal/record/rec_format_mpegts.go
  30. 73
      internal/record/rec_format_mpegts_segment.go
  31. 57
      internal/record/track.go
  32. 19
      internal/stream/stream.go
  33. 3
      mediamtx.yml

1
README.md

@ -47,6 +47,7 @@ And can be recorded with: @@ -47,6 +47,7 @@ And can be recorded with:
|format|video codecs|audio codecs|
|------|------------|------------|
|[fMP4](#record-streams-to-disk)|AV1, VP9, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3, LPCM|
|[MPEG-TS](#record-streams-to-disk)|H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), AC-3|
**Features**

2
internal/conf/conf.go

@ -165,7 +165,7 @@ type Conf struct { @@ -165,7 +165,7 @@ type Conf struct {
// Record
Record *bool `json:"record,omitempty"` // deprecated
RecordPath *string `json:"recordPath,omitempty"` // deprecated
RecordFormat *string `json:"recordFormat,omitempty"` // deprecated
RecordFormat *RecordFormat `json:"recordFormat,omitempty"` // deprecated
RecordPartDuration *StringDuration `json:"recordPartDuration,omitempty"` // deprecated
RecordSegmentDuration *StringDuration `json:"recordSegmentDuration,omitempty"` // deprecated
RecordDeleteAfter *StringDuration `json:"recordDeleteAfter,omitempty"` // deprecated

2
internal/conf/conf_test.go

@ -52,7 +52,7 @@ func TestConfFromFile(t *testing.T) { @@ -52,7 +52,7 @@ func TestConfFromFile(t *testing.T) {
SourceOnDemandStartTimeout: 10 * StringDuration(time.Second),
SourceOnDemandCloseAfter: 10 * StringDuration(time.Second),
RecordPath: "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f",
RecordFormat: "fmp4",
RecordFormat: RecordFormatFMP4,
RecordPartDuration: 100000000,
RecordSegmentDuration: 3600000000000,
RecordDeleteAfter: 86400000000000,

10
internal/conf/path.go

@ -63,7 +63,7 @@ type Path struct { @@ -63,7 +63,7 @@ type Path struct {
// Record
Record bool `json:"record"`
RecordPath string `json:"recordPath"`
RecordFormat string `json:"recordFormat"`
RecordFormat RecordFormat `json:"recordFormat"`
RecordPartDuration StringDuration `json:"recordPartDuration"`
RecordSegmentDuration StringDuration `json:"recordSegmentDuration"`
RecordDeleteAfter StringDuration `json:"recordDeleteAfter"`
@ -150,7 +150,7 @@ func (pconf *Path) setDefaults() { @@ -150,7 +150,7 @@ func (pconf *Path) setDefaults() {
// Record
pconf.RecordPath = "./recordings/%path/%Y-%m-%d_%H-%M-%S-%f"
pconf.RecordFormat = "fmp4"
pconf.RecordFormat = RecordFormatFMP4
pconf.RecordPartDuration = 100 * StringDuration(time.Millisecond)
pconf.RecordSegmentDuration = 3600 * StringDuration(time.Second)
pconf.RecordDeleteAfter = 24 * 3600 * StringDuration(time.Second)
@ -400,12 +400,6 @@ func (pconf *Path) check(conf *Conf, name string) error { @@ -400,12 +400,6 @@ func (pconf *Path) check(conf *Conf, name string) error {
}
}
// Record
if pconf.RecordFormat != "fmp4" {
return fmt.Errorf("unsupported record format '%s'", pconf.RecordFormat)
}
// Publisher
if pconf.DisablePublisherOverride != nil {

56
internal/conf/record_format.go

@ -0,0 +1,56 @@ @@ -0,0 +1,56 @@
package conf
import (
"encoding/json"
"fmt"
)
// RecordFormat is the recordFormat parameter.
type RecordFormat int
// supported values.
const (
RecordFormatFMP4 RecordFormat = iota
RecordFormatMPEGTS
)
// MarshalJSON implements json.Marshaler.
func (d RecordFormat) MarshalJSON() ([]byte, error) {
var out string
switch d {
case RecordFormatMPEGTS:
out = "mpegts"
default:
out = "fmp4"
}
return json.Marshal(out)
}
// UnmarshalJSON implements json.Unmarshaler.
func (d *RecordFormat) UnmarshalJSON(b []byte) error {
var in string
if err := json.Unmarshal(b, &in); err != nil {
return err
}
switch in {
case "mpegts":
*d = RecordFormatMPEGTS
case "fmp4":
*d = RecordFormatFMP4
default:
return fmt.Errorf("invalid record format '%s'", in)
}
return nil
}
// UnmarshalEnv implements env.Unmarshaler.
func (d *RecordFormat) UnmarshalEnv(_ string, v string) error {
return d.UnmarshalJSON([]byte(`"` + v + `"`))
}

2
internal/conf/rtsp_range_type.go

@ -8,7 +8,7 @@ import ( @@ -8,7 +8,7 @@ import (
// RTSPRangeType is the type used in the Range header.
type RTSPRangeType int
// supported rtsp range types.
// supported values.
const (
RTSPRangeTypeUndefined RTSPRangeType = iota
RTSPRangeTypeClock

1
internal/core/core.go

@ -41,6 +41,7 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry { @@ -41,6 +41,7 @@ func gatherCleanerEntries(paths map[string]*conf.Path) []record.CleanerEntry {
if pa.Record {
entry := record.CleanerEntry{
RecordPath: pa.RecordPath,
RecordFormat: pa.RecordFormat,
RecordDeleteAfter: time.Duration(pa.RecordDeleteAfter),
}
out[entry] = struct{}{}

2
internal/core/hls_muxer.go

@ -302,7 +302,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -302,7 +302,7 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
innerReady <- struct{}{}
m.Log(logger.Info, "is converting into HLS, %s",
sourceMediaInfo(medias))
mediaInfo(medias))
m.writer.Start()

252
internal/core/mpegts.go

@ -1,18 +1,32 @@ @@ -1,18 +1,32 @@
package core
import (
"bufio"
"errors"
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/ac3"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/datarhei/gosrt"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
func mpegtsSetupTracks(r *mpegts.Reader, stream **stream.Stream) ([]*description.Media, error) {
var errMPEGTSNoTracks = errors.New("no supported tracks found (supported are H265, H264," +
" MPEG-4 Video, MPEG-1/2 Video, Opus, MPEG-4 Audio, MPEG-1 Audio, AC-3")
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}
func mpegtsSetupRead(r *mpegts.Reader, stream **stream.Stream) ([]*description.Media, error) {
var medias []*description.Media //nolint:prealloc
var td *mpegts.TimeDecoder
@ -191,8 +205,242 @@ func mpegtsSetupTracks(r *mpegts.Reader, stream **stream.Stream) ([]*description @@ -191,8 +205,242 @@ func mpegtsSetupTracks(r *mpegts.Reader, stream **stream.Stream) ([]*description
}
if len(medias) == 0 {
return nil, fmt.Errorf("no supported tracks found")
return nil, errMPEGTSNoTracks
}
return medias, nil
}
func mpegtsSetupWrite(stream *stream.Stream,
writer *asyncwriter.Writer, bw *bufio.Writer, sconn srt.Conn,
writeTimeout time.Duration,
) error {
var w *mpegts.Writer
var tracks []*mpegts.Track
addTrack := func(codec mpegts.Codec) *mpegts.Track {
track := &mpegts.Track{
Codec: codec,
}
tracks = append(tracks, track)
return track
}
for _, medi := range stream.Desc().Medias {
for _, forma := range medi.Formats {
switch forma := forma.(type) {
case *format.H265: //nolint:dupl
track := addTrack(&mpegts.CodecH265{})
var dtsExtractor *h265.DTSExtractor
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
}
randomAccess := h265.IsRandomAccess(tunit.AU)
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h265.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err = (*w).WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU)
if err != nil {
return err
}
return bw.Flush()
})
case *format.H264: //nolint:dupl
track := addTrack(&mpegts.CodecH264{})
var dtsExtractor *h264.DTSExtractor
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
idrPresent := h264.IDRPresent(tunit.AU)
if dtsExtractor == nil {
if !idrPresent {
return nil
}
dtsExtractor = h264.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err = (*w).WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG4Video:
track := addTrack(&mpegts.CodecMPEG4Video{})
firstReceived := false
var lastPTS time.Duration
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG1Video:
track := addTrack(&mpegts.CodecMPEG1Video{})
firstReceived := false
var lastPTS time.Duration
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
if err != nil {
return err
}
return bw.Flush()
})
case *format.Opus:
track := addTrack(&mpegts.CodecOpus{
ChannelCount: func() int {
if forma.IsStereo {
return 2
}
return 1
}(),
})
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
}
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG4Audio:
track := addTrack(&mpegts.CodecMPEG4Audio{
Config: *forma.GetConfig(),
})
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
}
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG1Audio:
track := addTrack(&mpegts.CodecMPEG1Audio{})
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
}
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames)
if err != nil {
return err
}
return bw.Flush()
})
case *format.AC3:
track := addTrack(&mpegts.CodecAC3{})
sampleRate := time.Duration(forma.SampleRate)
stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
}
for i, frame := range tunit.Frames {
framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame*
time.Second/sampleRate
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteAC3(track, durationGoToMPEGTS(framePTS), frame)
if err != nil {
return err
}
}
return bw.Flush()
})
}
}
}
if len(tracks) == 0 {
return errMPEGTSNoTracks
}
w = mpegts.NewWriter(bw, tracks)
return nil
}

3
internal/core/path.go

@ -649,7 +649,7 @@ func (pa *path) doStartPublisher(req pathStartPublisherReq) { @@ -649,7 +649,7 @@ func (pa *path) doStartPublisher(req pathStartPublisherReq) {
req.author.Log(logger.Info, "is publishing to path '%s', %s",
pa.name,
sourceMediaInfo(req.desc.Medias))
mediaInfo(req.desc.Medias))
if pa.conf.HasOnDemandPublisher() {
pa.onDemandPublisherReadyTimer.Stop()
@ -953,6 +953,7 @@ func (pa *path) startRecording() { @@ -953,6 +953,7 @@ func (pa *path) startRecording() {
pa.recordAgent = record.NewAgent(
pa.writeQueueSize,
pa.conf.RecordPath,
pa.conf.RecordFormat,
time.Duration(pa.conf.RecordPartDuration),
time.Duration(pa.conf.RecordSegmentDuration),
pa.name,

9
internal/core/reader.go

@ -1,7 +1,16 @@ @@ -1,7 +1,16 @@
package core
import (
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/stream"
)
// reader is an entity that can read a stream.
type reader interface {
close()
apiReaderDescribe() apiPathSourceOrReader
}
func readerMediaInfo(r *asyncwriter.Writer, stream *stream.Stream) string {
return mediaInfo(stream.MediasForReader(r))
}

27
internal/core/rtmp_conn.go

@ -233,24 +233,17 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { @@ -233,24 +233,17 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
defer res.stream.RemoveReader(writer)
var medias []*description.Media
var w *rtmp.Writer
videoMedia, videoFormat := c.setupVideo(
videoFormat := c.setupVideo(
&w,
res.stream,
writer)
if videoMedia != nil {
medias = append(medias, videoMedia)
}
audioMedia, audioFormat := c.setupAudio(
audioFormat := c.setupAudio(
&w,
res.stream,
writer)
if audioFormat != nil {
medias = append(medias, audioMedia)
}
if videoFormat == nil && audioFormat == nil {
return fmt.Errorf(
@ -258,7 +251,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { @@ -258,7 +251,7 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
}
c.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(medias))
res.path.name, readerMediaInfo(writer, res.stream))
pathConf := res.path.safeConf()
@ -325,7 +318,7 @@ func (c *rtmpConn) setupVideo( @@ -325,7 +318,7 @@ func (c *rtmpConn) setupVideo(
w **rtmp.Writer,
stream *stream.Stream,
writer *asyncwriter.Writer,
) (*description.Media, format.Format) {
) format.Format {
var videoFormatH264 *format.H264
videoMedia := stream.Desc().FindFormat(&videoFormatH264)
@ -384,17 +377,17 @@ func (c *rtmpConn) setupVideo( @@ -384,17 +377,17 @@ func (c *rtmpConn) setupVideo(
return (*w).WriteH264(tunit.PTS, dts, idrPresent, tunit.AU)
})
return videoMedia, videoFormatH264
return videoFormatH264
}
return nil, nil
return nil
}
func (c *rtmpConn) setupAudio(
w **rtmp.Writer,
stream *stream.Stream,
writer *asyncwriter.Writer,
) (*description.Media, format.Format) {
) format.Format {
var audioFormatMPEG4Audio *format.MPEG4Audio
audioMedia := stream.Desc().FindFormat(&audioFormatMPEG4Audio)
@ -421,7 +414,7 @@ func (c *rtmpConn) setupAudio( @@ -421,7 +414,7 @@ func (c *rtmpConn) setupAudio(
return nil
})
return audioMedia, audioFormatMPEG4Audio
return audioFormatMPEG4Audio
}
var audioFormatMPEG1 *format.MPEG1Audio
@ -457,10 +450,10 @@ func (c *rtmpConn) setupAudio( @@ -457,10 +450,10 @@ func (c *rtmpConn) setupAudio(
return nil
})
return audioMedia, audioFormatMPEG1
return audioFormatMPEG1
}
return nil, nil
return nil
}
func (c *rtmpConn) runPublish(conn *rtmp.Conn, u *url.URL) error {

2
internal/core/rtsp_session.go

@ -308,7 +308,7 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons @@ -308,7 +308,7 @@ func (s *rtspSession) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Respons
s.Log(logger.Info, "is reading from path '%s', with %s, %s",
s.path.name,
s.session.SetuppedTransport(),
sourceMediaInfo(s.session.SetuppedMedias()))
mediaInfo(s.session.SetuppedMedias()))
pathConf := s.path.safeConf()

2
internal/core/source.go

@ -35,7 +35,7 @@ func mediasDescription(medias []*description.Media) []string { @@ -35,7 +35,7 @@ func mediasDescription(medias []*description.Media) []string {
return ret
}
func sourceMediaInfo(medias []*description.Media) string {
func mediaInfo(medias []*description.Media) string {
return fmt.Sprintf("%d %s (%s)",
len(medias),
func() string {

2
internal/core/source_static.go

@ -228,7 +228,7 @@ func (s *sourceStatic) setReady(req pathSourceStaticSetReadyReq) pathSourceStati @@ -228,7 +228,7 @@ func (s *sourceStatic) setReady(req pathSourceStaticSetReadyReq) pathSourceStati
res := <-req.res
if res.err == nil {
s.impl.Log(logger.Info, "ready: %s", sourceMediaInfo(req.desc.Medias))
s.impl.Log(logger.Info, "ready: %s", mediaInfo(req.desc.Medias))
}
return res

243
internal/core/srt_conn.go

@ -11,10 +11,6 @@ import ( @@ -11,10 +11,6 @@ import (
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/ac3"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/datarhei/gosrt"
"github.com/google/uuid"
@ -24,13 +20,8 @@ import ( @@ -24,13 +20,8 @@ import (
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}
func srtCheckPassphrase(connReq srt.ConnRequest, passphrase string) error {
if passphrase == "" {
return nil
@ -287,7 +278,7 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { @@ -287,7 +278,7 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
var stream *stream.Stream
medias, err := mpegtsSetupTracks(r, &stream)
medias, err := mpegtsSetupRead(r, &stream)
if err != nil {
return err
}
@ -357,237 +348,15 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass @@ -357,237 +348,15 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
defer res.stream.RemoveReader(writer)
var w *mpegts.Writer
var tracks []*mpegts.Track
var medias []*description.Media
bw := bufio.NewWriterSize(sconn, srtMaxPayloadSize(c.udpMaxPayloadSize))
addTrack := func(medi *description.Media, codec mpegts.Codec) *mpegts.Track {
track := &mpegts.Track{
Codec: codec,
}
tracks = append(tracks, track)
medias = append(medias, medi)
return track
}
for _, medi := range res.stream.Desc().Medias {
for _, forma := range medi.Formats {
switch forma := forma.(type) {
case *format.H265: //nolint:dupl
track := addTrack(medi, &mpegts.CodecH265{})
var dtsExtractor *h265.DTSExtractor
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
}
randomAccess := h265.IsRandomAccess(tunit.AU)
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h265.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU)
if err != nil {
return err
}
return bw.Flush()
})
case *format.H264: //nolint:dupl
track := addTrack(medi, &mpegts.CodecH264{})
var dtsExtractor *h264.DTSExtractor
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
idrPresent := h264.IDRPresent(tunit.AU)
if dtsExtractor == nil {
if !idrPresent {
return nil
}
dtsExtractor = h264.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteH26x(track, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG4Video:
track := addTrack(medi, &mpegts.CodecMPEG4Video{})
firstReceived := false
var lastPTS time.Duration
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG1Video:
track := addTrack(medi, &mpegts.CodecMPEG1Video{})
firstReceived := false
var lastPTS time.Duration
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG4Audio:
track := addTrack(medi, &mpegts.CodecMPEG4Audio{
Config: *forma.GetConfig(),
})
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs)
if err != nil {
return err
}
return bw.Flush()
})
case *format.Opus:
track := addTrack(medi, &mpegts.CodecOpus{
ChannelCount: func() int {
if forma.IsStereo {
return 2
}
return 1
}(),
})
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG1Audio:
track := addTrack(medi, &mpegts.CodecMPEG1Audio{})
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
}
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames)
if err != nil {
return err
}
return bw.Flush()
})
case *format.AC3:
track := addTrack(medi, &mpegts.CodecAC3{})
sampleRate := time.Duration(forma.SampleRate)
res.stream.AddReader(writer, medi, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
}
for i, frame := range tunit.Frames {
framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame*
time.Second/sampleRate
sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout)))
err = w.WriteAC3(track, durationGoToMPEGTS(framePTS), frame)
if err != nil {
return err
}
}
return bw.Flush()
})
}
}
}
if len(tracks) == 0 {
return true, fmt.Errorf(
"the stream doesn't contain any supported codec, which are currently H265, H264, Opus, MPEG-4 Audio")
err = mpegtsSetupWrite(res.stream, writer, bw, sconn, time.Duration(c.writeTimeout))
if err != nil {
return true, err
}
c.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(medias))
res.path.name, readerMediaInfo(writer, res.stream))
pathConf := res.path.safeConf()
@ -629,8 +398,6 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass @@ -629,8 +398,6 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
}()
}
w = mpegts.NewWriter(bw, tracks)
// disable read deadline
sconn.SetReadDeadline(time.Time{})

2
internal/core/srt_source.go

@ -96,7 +96,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error { @@ -96,7 +96,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error {
var stream *stream.Stream
medias, err := mpegtsSetupTracks(r, &stream)
medias, err := mpegtsSetupRead(r, &stream)
if err != nil {
return err
}

2
internal/core/udp_source.go

@ -129,7 +129,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -129,7 +129,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
var stream *stream.Stream
medias, err := mpegtsSetupTracks(r, &stream)
medias, err := mpegtsSetupRead(r, &stream)
if err != nil {
return err
}

10
internal/core/webrtc_session.go

@ -27,14 +27,6 @@ type trackRecvPair struct { @@ -27,14 +27,6 @@ type trackRecvPair struct {
receiver *webrtc.RTPReceiver
}
func webrtcMediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) []*description.Media {
ret := make([]*description.Media, len(tracks))
for i, track := range tracks {
ret[i] = track.media
}
return ret
}
func webrtcMediasOfIncomingTracks(tracks []*webRTCIncomingTrack) []*description.Media {
ret := make([]*description.Media, len(tracks))
for i, track := range tracks {
@ -525,7 +517,7 @@ func (s *webRTCSession) runRead() (int, error) { @@ -525,7 +517,7 @@ func (s *webRTCSession) runRead() (int, error) {
}
s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks)))
res.path.name, readerMediaInfo(writer, res.stream))
pathConf := res.path.safeConf()

862
internal/record/agent.go

@ -1,108 +1,21 @@ @@ -1,108 +1,21 @@
package record
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/ac3"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/codecs/jpeg"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video"
"github.com/bluenviron/mediacommon/pkg/codecs/opus"
"github.com/bluenviron/mediacommon/pkg/codecs/vp9"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
// OnSegmentFunc is the prototype of the function passed as runOnSegmentStart / runOnSegmentComplete
type OnSegmentFunc = func(string)
func durationGoToMp4(v time.Duration, timeScale uint32) uint64 {
timeScale64 := uint64(timeScale)
secs := v / time.Second
dec := v % time.Second
return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second)
}
func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int {
switch cm {
case mpeg1audio.ChannelModeStereo,
mpeg1audio.ChannelModeJointStereo,
mpeg1audio.ChannelModeDualChannel:
return 2
default:
return 1
}
}
func jpegExtractSize(image []byte) (int, int, error) {
l := len(image)
if l < 2 || image[0] != 0xFF || image[1] != jpeg.MarkerStartOfImage {
return 0, 0, fmt.Errorf("invalid header")
}
image = image[2:]
for {
if len(image) < 2 {
return 0, 0, fmt.Errorf("not enough bits")
}
h0, h1 := image[0], image[1]
image = image[2:]
if h0 != 0xFF {
return 0, 0, fmt.Errorf("invalid image")
}
switch h1 {
case 0xE0, 0xE1, 0xE2, // JFIF
jpeg.MarkerDefineHuffmanTable,
jpeg.MarkerComment,
jpeg.MarkerDefineQuantizationTable,
jpeg.MarkerDefineRestartInterval:
mlen := int(image[0])<<8 | int(image[1])
if len(image) < mlen {
return 0, 0, fmt.Errorf("not enough bits")
}
image = image[mlen:]
case jpeg.MarkerStartOfFrame1:
mlen := int(image[0])<<8 | int(image[1])
if len(image) < mlen {
return 0, 0, fmt.Errorf("not enough bits")
}
var sof jpeg.StartOfFrame1
err := sof.Unmarshal(image[2:mlen])
if err != nil {
return 0, 0, err
}
return sof.Width, sof.Height, nil
case jpeg.MarkerStartOfScan:
return 0, 0, fmt.Errorf("SOF not found")
default:
return 0, 0, fmt.Errorf("unknown marker: 0x%.2x", h1)
}
}
}
type sample struct {
*fmp4.PartSample
dts time.Duration
@ -118,13 +31,10 @@ type Agent struct { @@ -118,13 +31,10 @@ type Agent struct {
onSegmentComplete OnSegmentFunc
parent logger.Writer
ctx context.Context
ctxCancel func()
writer *asyncwriter.Writer
tracks []*track
hasVideo bool
currentSegment *segment
nextSequenceNumber uint32
ctx context.Context
ctxCancel func()
writer *asyncwriter.Writer
format recFormat
done chan struct{}
}
@ -132,7 +42,8 @@ type Agent struct { @@ -132,7 +42,8 @@ type Agent struct {
// NewAgent allocates an Agent.
func NewAgent(
writeQueueSize int,
recordPath string,
path string,
format conf.RecordFormat,
partDuration time.Duration,
segmentDuration time.Duration,
pathName string,
@ -141,13 +52,20 @@ func NewAgent( @@ -141,13 +52,20 @@ func NewAgent(
onSegmentComplete OnSegmentFunc,
parent logger.Writer,
) *Agent {
recordPath = strings.ReplaceAll(recordPath, "%path", pathName)
recordPath += ".mp4"
path = strings.ReplaceAll(path, "%path", pathName)
switch format {
case conf.RecordFormatMPEGTS:
path += ".ts"
default:
path += ".mp4"
}
ctx, ctxCancel := context.WithCancel(context.Background())
r := &Agent{
path: recordPath,
a := &Agent{
path: path,
partDuration: partDuration,
segmentDuration: segmentDuration,
stream: stream,
@ -159,744 +77,48 @@ func NewAgent( @@ -159,744 +77,48 @@ func NewAgent(
done: make(chan struct{}),
}
r.writer = asyncwriter.New(writeQueueSize, r)
nextID := 1
addTrack := func(codec fmp4.Codec) *track {
initTrack := &fmp4.InitTrack{
TimeScale: 90000,
Codec: codec,
}
initTrack.ID = nextID
nextID++
track := newTrack(r, initTrack)
r.tracks = append(r.tracks, track)
return track
}
for _, media := range stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *format.AV1:
codec := &fmp4.CodecAV1{
SequenceHeader: []byte{
8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64,
},
}
track := addTrack(codec)
firstReceived := false
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AV1)
if tunit.TU == nil {
return nil
}
randomAccess := false
for _, obu := range tunit.TU {
var h av1.OBUHeader
err := h.Unmarshal(obu)
if err != nil {
return err
}
if h.Type == av1.OBUTypeSequenceHeader {
if !bytes.Equal(codec.SequenceHeader, obu) {
codec.SequenceHeader = obu
r.updateCodecs()
}
randomAccess = true
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
}
sampl, err := fmp4.NewPartSampleAV1(
randomAccess,
tunit.TU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: tunit.PTS,
})
})
case *format.VP9:
codec := &fmp4.CodecVP9{
Width: 1280,
Height: 720,
Profile: 1,
BitDepth: 8,
ChromaSubsampling: 1,
ColorRange: false,
}
track := addTrack(codec)
firstReceived := false
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.VP9)
if tunit.Frame == nil {
return nil
}
var h vp9.Header
err := h.Unmarshal(tunit.Frame)
if err != nil {
return err
}
randomAccess := false
if h.FrameType == vp9.FrameTypeKeyFrame {
randomAccess = true
if w := h.Width(); codec.Width != w {
codec.Width = w
r.updateCodecs()
}
if h := h.Width(); codec.Height != h {
codec.Height = h
r.updateCodecs()
}
if codec.Profile != h.Profile {
codec.Profile = h.Profile
r.updateCodecs()
}
if codec.BitDepth != h.ColorConfig.BitDepth {
codec.BitDepth = h.ColorConfig.BitDepth
r.updateCodecs()
}
if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c {
codec.ChromaSubsampling = c
r.updateCodecs()
}
if codec.ColorRange != h.ColorConfig.ColorRange {
codec.ColorRange = h.ColorConfig.ColorRange
r.updateCodecs()
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
IsNonSyncSample: !randomAccess,
Payload: tunit.Frame,
},
dts: tunit.PTS,
})
})
case *format.VP8:
// TODO
case *format.H265:
vps, sps, pps := forma.SafeParams()
if vps == nil || sps == nil || pps == nil {
vps = []byte{
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
}
sps = []byte{
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
}
pps = []byte{
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
}
}
codec := &fmp4.CodecH265{
VPS: vps,
SPS: sps,
PPS: pps,
}
track := addTrack(codec)
var dtsExtractor *h265.DTSExtractor
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
}
randomAccess := false
for _, nalu := range tunit.AU {
typ := h265.NALUType((nalu[0] >> 1) & 0b111111)
switch typ {
case h265.NALUType_VPS_NUT:
if !bytes.Equal(codec.VPS, nalu) {
codec.VPS = nalu
r.updateCodecs()
}
case h265.NALUType_SPS_NUT:
if !bytes.Equal(codec.SPS, nalu) {
codec.SPS = nalu
r.updateCodecs()
}
case h265.NALUType_PPS_NUT:
if !bytes.Equal(codec.PPS, nalu) {
codec.PPS = nalu
r.updateCodecs()
}
case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT:
randomAccess = true
}
}
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h265.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sampl, err := fmp4.NewPartSampleH26x(
int32(durationGoToMp4(tunit.PTS-dts, 90000)),
randomAccess,
tunit.AU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: dts,
})
})
case *format.H264:
sps, pps := forma.SafeParams()
if sps == nil || pps == nil {
sps = []byte{
0x67, 0x42, 0xc0, 0x1f, 0xd9, 0x00, 0xf0, 0x11,
0x7e, 0xf0, 0x11, 0x00, 0x00, 0x03, 0x00, 0x01,
0x00, 0x00, 0x03, 0x00, 0x30, 0x8f, 0x18, 0x32,
0x48,
}
pps = []byte{
0x68, 0xcb, 0x8c, 0xb2,
}
}
codec := &fmp4.CodecH264{
SPS: sps,
PPS: pps,
}
track := addTrack(codec)
var dtsExtractor *h264.DTSExtractor
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
a.writer = asyncwriter.New(writeQueueSize, a)
randomAccess := false
switch format {
case conf.RecordFormatMPEGTS:
a.format = newRecFormatMPEGTS(a)
for _, nalu := range tunit.AU {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
if !bytes.Equal(codec.SPS, nalu) {
codec.SPS = nalu
r.updateCodecs()
}
case h264.NALUTypePPS:
if !bytes.Equal(codec.PPS, nalu) {
codec.PPS = nalu
r.updateCodecs()
}
case h264.NALUTypeIDR:
randomAccess = true
}
}
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h264.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sampl, err := fmp4.NewPartSampleH26x(
int32(durationGoToMp4(tunit.PTS-dts, 90000)),
randomAccess,
tunit.AU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: dts,
})
})
case *format.MPEG4Video:
config := forma.SafeParams()
if config == nil {
config = []byte{
0x00, 0x00, 0x01, 0xb0, 0x01, 0x00, 0x00, 0x01,
0xb5, 0x89, 0x13, 0x00, 0x00, 0x01, 0x00, 0x00,
0x00, 0x01, 0x20, 0x00, 0xc4, 0x8d, 0x88, 0x00,
0xf5, 0x3c, 0x04, 0x87, 0x14, 0x63, 0x00, 0x00,
0x01, 0xb2, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38,
0x2e, 0x31, 0x33, 0x34, 0x2e, 0x31, 0x30, 0x30,
}
}
codec := &fmp4.CodecMPEG4Video{
Config: config,
}
track := addTrack(codec)
firstReceived := false
var lastPTS time.Duration
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
}
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) {
end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
if end >= 0 {
config := tunit.Frame[:end+4]
if !bytes.Equal(codec.Config, config) {
codec.Config = config
r.updateCodecs()
}
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
IsNonSyncSample: !randomAccess,
},
dts: tunit.PTS,
})
})
case *format.MPEG1Video:
codec := &fmp4.CodecMPEG1Video{
Config: []byte{
0x00, 0x00, 0x01, 0xb3, 0x78, 0x04, 0x38, 0x35,
0xff, 0xff, 0xe0, 0x18, 0x00, 0x00, 0x01, 0xb5,
0x14, 0x4a, 0x00, 0x01, 0x00, 0x00,
},
}
track := addTrack(codec)
firstReceived := false
var lastPTS time.Duration
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
}
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8})
if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) {
end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8})
if end >= 0 {
config := tunit.Frame[:end+4]
if !bytes.Equal(codec.Config, config) {
codec.Config = config
r.updateCodecs()
}
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
IsNonSyncSample: !randomAccess,
},
dts: tunit.PTS,
})
})
case *format.MJPEG:
codec := &fmp4.CodecMJPEG{
Width: 800,
Height: 600,
}
track := addTrack(codec)
parsed := false
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MJPEG)
if tunit.Frame == nil {
return nil
}
if !parsed {
parsed = true
width, height, err := jpegExtractSize(tunit.Frame)
if err != nil {
return err
}
codec.Width = width
codec.Height = height
r.updateCodecs()
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
},
dts: tunit.PTS,
})
})
case *format.Opus:
codec := &fmp4.CodecOpus{
ChannelCount: func() int {
if forma.IsStereo {
return 2
}
return 1
}(),
}
track := addTrack(codec)
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
}
pts := tunit.PTS
for _, packet := range tunit.Packets {
err := track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: packet,
},
dts: pts,
})
if err != nil {
return err
}
pts += opus.PacketDuration(packet)
}
return nil
})
case *format.MPEG4Audio:
codec := &fmp4.CodecMPEG4Audio{
Config: *forma.GetConfig(),
}
track := addTrack(codec)
sampleRate := time.Duration(forma.ClockRate())
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
}
for i, au := range tunit.AUs {
auPTS := tunit.PTS + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/sampleRate
err := track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: au,
},
dts: auPTS,
})
if err != nil {
return err
}
}
return nil
})
case *format.MPEG1Audio:
codec := &fmp4.CodecMPEG1Audio{
SampleRate: 32000,
ChannelCount: 2,
}
track := addTrack(codec)
parsed := false
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
}
pts := tunit.PTS
for _, frame := range tunit.Frames {
var h mpeg1audio.FrameHeader
err := h.Unmarshal(frame)
if err != nil {
return err
}
if !parsed {
parsed = true
codec.SampleRate = h.SampleRate
codec.ChannelCount = mpeg1audioChannelCount(h.ChannelMode)
r.updateCodecs()
}
err = track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: frame,
},
dts: pts,
})
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}
return nil
})
case *format.AC3:
codec := &fmp4.CodecAC3{
SampleRate: forma.SampleRate,
ChannelCount: forma.ChannelCount,
Fscod: 0,
Bsid: 8,
Bsmod: 0,
Acmod: 7,
LfeOn: true,
BitRateCode: 7,
}
track := addTrack(codec)
parsed := false
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
}
pts := tunit.PTS
for _, frame := range tunit.Frames {
var syncInfo ac3.SyncInfo
err := syncInfo.Unmarshal(frame)
if err != nil {
return fmt.Errorf("invalid AC-3 frame: %s", err)
}
var bsi ac3.BSI
err = bsi.Unmarshal(frame[5:])
if err != nil {
return fmt.Errorf("invalid AC-3 frame: %s", err)
}
if !parsed {
parsed = true
codec.SampleRate = syncInfo.SampleRate()
codec.ChannelCount = bsi.ChannelCount()
codec.Fscod = syncInfo.Fscod
codec.Bsid = bsi.Bsid
codec.Bsmod = bsi.Bsmod
codec.Acmod = bsi.Acmod
codec.LfeOn = bsi.LfeOn
codec.BitRateCode = syncInfo.Frmsizecod >> 1
r.updateCodecs()
}
err = track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: frame,
},
dts: pts,
})
if err != nil {
return err
}
pts += time.Duration(ac3.SamplesPerFrame) *
time.Second / time.Duration(codec.SampleRate)
}
return nil
})
case *format.G722:
// TODO
case *format.G711:
// TODO
case *format.LPCM:
codec := &fmp4.CodecLPCM{
LittleEndian: false,
BitDepth: forma.BitDepth,
SampleRate: forma.SampleRate,
ChannelCount: forma.ChannelCount,
}
track := addTrack(codec)
stream.AddReader(r.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.LPCM)
if tunit.Samples == nil {
return nil
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Samples,
},
dts: tunit.PTS,
})
})
}
}
default:
a.format = newRecFormatFMP4(a)
}
r.Log(logger.Info, "recording %d %s",
len(r.tracks),
func() string {
if len(r.tracks) == 1 {
return "track"
}
return "tracks"
}())
go r.run()
go a.run()
return r
return a
}
// Close closes the Agent.
func (r *Agent) Close() {
r.Log(logger.Info, "recording stopped")
func (a *Agent) Close() {
a.Log(logger.Info, "recording stopped")
r.ctxCancel()
<-r.done
a.ctxCancel()
<-a.done
}
// Log is the main logging function.
func (r *Agent) Log(level logger.Level, format string, args ...interface{}) {
r.parent.Log(level, "[record] "+format, args...)
func (a *Agent) Log(level logger.Level, format string, args ...interface{}) {
a.parent.Log(level, "[record] "+format, args...)
}
func (r *Agent) run() {
defer close(r.done)
func (a *Agent) run() {
defer close(a.done)
r.writer.Start()
a.writer.Start()
select {
case err := <-r.writer.Error():
r.Log(logger.Error, err.Error())
r.stream.RemoveReader(r.writer)
case err := <-a.writer.Error():
a.Log(logger.Error, err.Error())
a.stream.RemoveReader(a.writer)
case <-r.ctx.Done():
r.stream.RemoveReader(r.writer)
r.writer.Stop()
case <-a.ctx.Done():
a.stream.RemoveReader(a.writer)
a.writer.Stop()
}
if r.currentSegment != nil {
r.currentSegment.close() //nolint:errcheck
}
}
func (r *Agent) updateCodecs() {
// if codec parameters have been updated,
// and current segment has already written codec parameters on disk,
// close current segment.
if r.currentSegment != nil && r.currentSegment.f != nil {
r.currentSegment.close() //nolint:errcheck
r.currentSegment = nil
}
a.format.close()
}

282
internal/record/agent_test.go

@ -13,6 +13,7 @@ import ( @@ -13,6 +13,7 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
@ -23,142 +24,161 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { @@ -23,142 +24,161 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
func TestAgent(t *testing.T) {
n := 0
timeNow = func() time.Time {
n++
if n >= 2 {
return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC)
}
return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC)
}
desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
}},
},
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
},
{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}},
},
}}
stream, err := stream.New(
1460,
desc,
true,
&nilLogger{},
)
require.NoError(t, err)
defer stream.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err)
defer os.RemoveAll(dir)
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
segCreated := make(chan struct{}, 2)
segDone := make(chan struct{}, 2)
a := NewAgent(
1024,
recordPath,
100*time.Millisecond,
1*time.Second,
"mypath",
stream,
func(fpath string) {
segCreated <- struct{}{}
},
func(fpath string) {
segDone <- struct{}{}
},
&nilLogger{},
)
for i := 0; i < 3; i++ {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // VPS
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
for _, ca := range []string{"fmp4", "mpegts"} {
t.Run(ca, func(t *testing.T) {
n := 0
timeNow = func() time.Time {
n++
if n >= 2 {
return time.Date(2008, 0o5, 20, 22, 15, 25, 125000, time.UTC)
}
return time.Date(2009, 0o5, 20, 22, 15, 25, 427000, time.UTC)
}
desc := &description.Session{Medias: []*description.Media{
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
}},
},
{ // SPS
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
},
{ // PPS
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
Config: &mpeg4audio.Config{
Type: 2,
SampleRate: 44100,
ChannelCount: 2,
},
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}},
},
{byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR
},
})
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
}}
stream, err := stream.New(
1460,
desc,
true,
&nilLogger{},
)
require.NoError(t, err)
defer stream.Close()
dir, err := os.MkdirTemp("", "mediamtx-agent")
require.NoError(t, err)
defer os.RemoveAll(dir)
recordPath := filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f")
segCreated := make(chan struct{}, 2)
segDone := make(chan struct{}, 2)
var f conf.RecordFormat
if ca == "fmp4" {
f = conf.RecordFormatFMP4
} else {
f = conf.RecordFormatMPEGTS
}
a := NewAgent(
1024,
recordPath,
f,
100*time.Millisecond,
1*time.Second,
"mypath",
stream,
func(fpath string) {
segCreated <- struct{}{}
},
{ // PPS
0x08, 0x06, 0x07, 0x08,
func(fpath string) {
segDone <- struct{}{}
},
{5}, // IDR
},
})
stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AUs: [][]byte{{1, 2, 3, 4}},
&nilLogger{},
)
for i := 0; i < 3; i++ {
stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H265{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // VPS
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
},
{ // SPS
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
},
{ // PPS
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
},
{byte(h265.NALUType_CRA_NUT) << 1, 0}, // IDR
},
})
stream.WriteUnit(desc.Medias[1], desc.Medias[1].Formats[0], &unit.H264{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AU: [][]byte{
{ // SPS
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},
{ // PPS
0x08, 0x06, 0x07, 0x08,
},
{5}, // IDR
},
})
stream.WriteUnit(desc.Medias[2], desc.Medias[2].Formats[0], &unit.MPEG4Audio{
Base: unit.Base{
PTS: (50 + time.Duration(i)) * time.Second,
},
AUs: [][]byte{{1, 2, 3, 4}},
})
}
for i := 0; i < 2; i++ {
<-segCreated
<-segDone
}
a.Close()
var ext string
if ca == "fmp4" {
ext = "mp4"
} else {
ext = "ts"
}
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125."+ext))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427."+ext))
require.NoError(t, err)
})
}
for i := 0; i < 2; i++ {
<-segCreated
<-segDone
}
a.Close()
_, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000125.mp4"))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(dir, "mypath", "2009-05-20_22-15-25-000427.mp4"))
require.NoError(t, err)
}

13
internal/record/cleaner.go

@ -8,6 +8,7 @@ import ( @@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
)
@ -41,6 +42,7 @@ func commonPath(v string) string { @@ -41,6 +42,7 @@ func commonPath(v string) string {
// CleanerEntry is a cleaner entry.
type CleanerEntry struct {
RecordPath string
RecordFormat conf.RecordFormat
RecordDeleteAfter time.Duration
}
@ -115,7 +117,16 @@ func (c *Cleaner) doRun() { @@ -115,7 +117,16 @@ func (c *Cleaner) doRun() {
}
func (c *Cleaner) doRunEntry(e *CleanerEntry) error {
recordPath := e.RecordPath + ".mp4"
recordPath := e.RecordPath
switch e.RecordFormat {
case conf.RecordFormatMPEGTS:
recordPath += ".ts"
default:
recordPath += ".mp4"
}
commonPath := commonPath(recordPath)
now := timeNow()

3
internal/record/cleaner_test.go

@ -7,6 +7,8 @@ import ( @@ -7,6 +7,8 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/conf"
)
func TestCleaner(t *testing.T) {
@ -32,6 +34,7 @@ func TestCleaner(t *testing.T) { @@ -32,6 +34,7 @@ func TestCleaner(t *testing.T) {
c := NewCleaner(
[]CleanerEntry{{
RecordPath: recordPath,
RecordFormat: conf.RecordFormatFMP4,
RecordDeleteAfter: 10 * time.Second,
}},
nilLogger{},

5
internal/record/rec_format.go

@ -0,0 +1,5 @@ @@ -0,0 +1,5 @@
package record
type recFormat interface {
close()
}

821
internal/record/rec_format_fmp4.go

@ -0,0 +1,821 @@ @@ -0,0 +1,821 @@
package record
import (
"bytes"
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/ac3"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/codecs/jpeg"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg1audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video"
"github.com/bluenviron/mediacommon/pkg/codecs/opus"
"github.com/bluenviron/mediacommon/pkg/codecs/vp9"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
func durationGoToMp4(v time.Duration, timeScale uint32) uint64 {
timeScale64 := uint64(timeScale)
secs := v / time.Second
dec := v % time.Second
return uint64(secs)*timeScale64 + uint64(dec)*timeScale64/uint64(time.Second)
}
func mpeg1audioChannelCount(cm mpeg1audio.ChannelMode) int {
switch cm {
case mpeg1audio.ChannelModeStereo,
mpeg1audio.ChannelModeJointStereo,
mpeg1audio.ChannelModeDualChannel:
return 2
default:
return 1
}
}
func jpegExtractSize(image []byte) (int, int, error) {
l := len(image)
if l < 2 || image[0] != 0xFF || image[1] != jpeg.MarkerStartOfImage {
return 0, 0, fmt.Errorf("invalid header")
}
image = image[2:]
for {
if len(image) < 2 {
return 0, 0, fmt.Errorf("not enough bits")
}
h0, h1 := image[0], image[1]
image = image[2:]
if h0 != 0xFF {
return 0, 0, fmt.Errorf("invalid image")
}
switch h1 {
case 0xE0, 0xE1, 0xE2, // JFIF
jpeg.MarkerDefineHuffmanTable,
jpeg.MarkerComment,
jpeg.MarkerDefineQuantizationTable,
jpeg.MarkerDefineRestartInterval:
mlen := int(image[0])<<8 | int(image[1])
if len(image) < mlen {
return 0, 0, fmt.Errorf("not enough bits")
}
image = image[mlen:]
case jpeg.MarkerStartOfFrame1:
mlen := int(image[0])<<8 | int(image[1])
if len(image) < mlen {
return 0, 0, fmt.Errorf("not enough bits")
}
var sof jpeg.StartOfFrame1
err := sof.Unmarshal(image[2:mlen])
if err != nil {
return 0, 0, err
}
return sof.Width, sof.Height, nil
case jpeg.MarkerStartOfScan:
return 0, 0, fmt.Errorf("SOF not found")
default:
return 0, 0, fmt.Errorf("unknown marker: 0x%.2x", h1)
}
}
}
type recFormatFMP4 struct {
a *Agent
tracks []*recFormatFMP4Track
hasVideo bool
currentSegment *recFormatFMP4Segment
nextSequenceNumber uint32
}
func newRecFormatFMP4(a *Agent) recFormat {
f := &recFormatFMP4{
a: a,
}
nextID := 1
addTrack := func(codec fmp4.Codec) *recFormatFMP4Track {
initTrack := &fmp4.InitTrack{
TimeScale: 90000,
Codec: codec,
}
initTrack.ID = nextID
nextID++
track := newRecFormatFMP4Track(f, initTrack)
f.tracks = append(f.tracks, track)
return track
}
updateCodecs := func() {
// if codec parameters have been updated,
// and current segment has already written codec parameters on disk,
// close current segment.
if f.currentSegment != nil && f.currentSegment.fi != nil {
f.currentSegment.close() //nolint:errcheck
f.currentSegment = nil
}
}
for _, media := range a.stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *format.AV1:
codec := &fmp4.CodecAV1{
SequenceHeader: []byte{
8, 0, 0, 0, 66, 167, 191, 228, 96, 13, 0, 64,
},
}
track := addTrack(codec)
firstReceived := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AV1)
if tunit.TU == nil {
return nil
}
randomAccess := false
for _, obu := range tunit.TU {
var h av1.OBUHeader
err := h.Unmarshal(obu)
if err != nil {
return err
}
if h.Type == av1.OBUTypeSequenceHeader {
if !bytes.Equal(codec.SequenceHeader, obu) {
codec.SequenceHeader = obu
updateCodecs()
}
randomAccess = true
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
}
sampl, err := fmp4.NewPartSampleAV1(
randomAccess,
tunit.TU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: tunit.PTS,
})
})
case *format.VP9:
codec := &fmp4.CodecVP9{
Width: 1280,
Height: 720,
Profile: 1,
BitDepth: 8,
ChromaSubsampling: 1,
ColorRange: false,
}
track := addTrack(codec)
firstReceived := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.VP9)
if tunit.Frame == nil {
return nil
}
var h vp9.Header
err := h.Unmarshal(tunit.Frame)
if err != nil {
return err
}
randomAccess := false
if h.FrameType == vp9.FrameTypeKeyFrame {
randomAccess = true
if w := h.Width(); codec.Width != w {
codec.Width = w
updateCodecs()
}
if h := h.Width(); codec.Height != h {
codec.Height = h
updateCodecs()
}
if codec.Profile != h.Profile {
codec.Profile = h.Profile
updateCodecs()
}
if codec.BitDepth != h.ColorConfig.BitDepth {
codec.BitDepth = h.ColorConfig.BitDepth
updateCodecs()
}
if c := h.ChromaSubsampling(); codec.ChromaSubsampling != c {
codec.ChromaSubsampling = c
updateCodecs()
}
if codec.ColorRange != h.ColorConfig.ColorRange {
codec.ColorRange = h.ColorConfig.ColorRange
updateCodecs()
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
IsNonSyncSample: !randomAccess,
Payload: tunit.Frame,
},
dts: tunit.PTS,
})
})
case *format.VP8:
// TODO
case *format.H265:
vps, sps, pps := forma.SafeParams()
if vps == nil || sps == nil || pps == nil {
vps = []byte{
0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x02, 0x20,
0x00, 0x00, 0x03, 0x00, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x7b, 0x18, 0xb0, 0x24,
}
sps = []byte{
0x42, 0x01, 0x01, 0x02, 0x20, 0x00, 0x00, 0x03,
0x00, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x7b, 0xa0, 0x07, 0x82, 0x00, 0x88, 0x7d,
0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88,
0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9,
0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc,
0xa2, 0x23, 0xff, 0x00, 0x01, 0x00, 0x01, 0x6a,
0x02, 0x02, 0x02, 0x01,
}
pps = []byte{
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
}
}
codec := &fmp4.CodecH265{
VPS: vps,
SPS: sps,
PPS: pps,
}
track := addTrack(codec)
var dtsExtractor *h265.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
}
randomAccess := false
for _, nalu := range tunit.AU {
typ := h265.NALUType((nalu[0] >> 1) & 0b111111)
switch typ {
case h265.NALUType_VPS_NUT:
if !bytes.Equal(codec.VPS, nalu) {
codec.VPS = nalu
updateCodecs()
}
case h265.NALUType_SPS_NUT:
if !bytes.Equal(codec.SPS, nalu) {
codec.SPS = nalu
updateCodecs()
}
case h265.NALUType_PPS_NUT:
if !bytes.Equal(codec.PPS, nalu) {
codec.PPS = nalu
updateCodecs()
}
case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT:
randomAccess = true
}
}
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h265.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sampl, err := fmp4.NewPartSampleH26x(
int32(durationGoToMp4(tunit.PTS-dts, 90000)),
randomAccess,
tunit.AU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: dts,
})
})
case *format.H264:
sps, pps := forma.SafeParams()
if sps == nil || pps == nil {
sps = []byte{
0x67, 0x42, 0xc0, 0x1f, 0xd9, 0x00, 0xf0, 0x11,
0x7e, 0xf0, 0x11, 0x00, 0x00, 0x03, 0x00, 0x01,
0x00, 0x00, 0x03, 0x00, 0x30, 0x8f, 0x18, 0x32,
0x48,
}
pps = []byte{
0x68, 0xcb, 0x8c, 0xb2,
}
}
codec := &fmp4.CodecH264{
SPS: sps,
PPS: pps,
}
track := addTrack(codec)
var dtsExtractor *h264.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
randomAccess := false
for _, nalu := range tunit.AU {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
if !bytes.Equal(codec.SPS, nalu) {
codec.SPS = nalu
updateCodecs()
}
case h264.NALUTypePPS:
if !bytes.Equal(codec.PPS, nalu) {
codec.PPS = nalu
updateCodecs()
}
case h264.NALUTypeIDR:
randomAccess = true
}
}
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h264.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
sampl, err := fmp4.NewPartSampleH26x(
int32(durationGoToMp4(tunit.PTS-dts, 90000)),
randomAccess,
tunit.AU)
if err != nil {
return err
}
return track.record(&sample{
PartSample: sampl,
dts: dts,
})
})
case *format.MPEG4Video:
config := forma.SafeParams()
if config == nil {
config = []byte{
0x00, 0x00, 0x01, 0xb0, 0x01, 0x00, 0x00, 0x01,
0xb5, 0x89, 0x13, 0x00, 0x00, 0x01, 0x00, 0x00,
0x00, 0x01, 0x20, 0x00, 0xc4, 0x8d, 0x88, 0x00,
0xf5, 0x3c, 0x04, 0x87, 0x14, 0x63, 0x00, 0x00,
0x01, 0xb2, 0x4c, 0x61, 0x76, 0x63, 0x35, 0x38,
0x2e, 0x31, 0x33, 0x34, 0x2e, 0x31, 0x30, 0x30,
}
}
codec := &fmp4.CodecMPEG4Video{
Config: config,
}
track := addTrack(codec)
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
}
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.VisualObjectSequenceStartCode)}) {
end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
if end >= 0 {
config := tunit.Frame[:end+4]
if !bytes.Equal(codec.Config, config) {
codec.Config = config
updateCodecs()
}
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
IsNonSyncSample: !randomAccess,
},
dts: tunit.PTS,
})
})
case *format.MPEG1Video:
codec := &fmp4.CodecMPEG1Video{
Config: []byte{
0x00, 0x00, 0x01, 0xb3, 0x78, 0x04, 0x38, 0x35,
0xff, 0xff, 0xe0, 0x18, 0x00, 0x00, 0x01, 0xb5,
0x14, 0x4a, 0x00, 0x01, 0x00, 0x00,
},
}
track := addTrack(codec)
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
}
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8})
if bytes.HasPrefix(tunit.Frame, []byte{0, 0, 1, 0xB3}) {
end := bytes.Index(tunit.Frame[4:], []byte{0, 0, 1, 0xB8})
if end >= 0 {
config := tunit.Frame[:end+4]
if !bytes.Equal(codec.Config, config) {
codec.Config = config
updateCodecs()
}
}
}
if !firstReceived {
if !randomAccess {
return nil
}
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
IsNonSyncSample: !randomAccess,
},
dts: tunit.PTS,
})
})
case *format.MJPEG:
codec := &fmp4.CodecMJPEG{
Width: 800,
Height: 600,
}
track := addTrack(codec)
parsed := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MJPEG)
if tunit.Frame == nil {
return nil
}
if !parsed {
parsed = true
width, height, err := jpegExtractSize(tunit.Frame)
if err != nil {
return err
}
codec.Width = width
codec.Height = height
updateCodecs()
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Frame,
},
dts: tunit.PTS,
})
})
case *format.Opus:
codec := &fmp4.CodecOpus{
ChannelCount: func() int {
if forma.IsStereo {
return 2
}
return 1
}(),
}
track := addTrack(codec)
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
}
pts := tunit.PTS
for _, packet := range tunit.Packets {
err := track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: packet,
},
dts: pts,
})
if err != nil {
return err
}
pts += opus.PacketDuration(packet)
}
return nil
})
case *format.MPEG4Audio:
codec := &fmp4.CodecMPEG4Audio{
Config: *forma.GetConfig(),
}
track := addTrack(codec)
sampleRate := time.Duration(forma.ClockRate())
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
}
for i, au := range tunit.AUs {
auPTS := tunit.PTS + time.Duration(i)*mpeg4audio.SamplesPerAccessUnit*
time.Second/sampleRate
err := track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: au,
},
dts: auPTS,
})
if err != nil {
return err
}
}
return nil
})
case *format.MPEG1Audio:
codec := &fmp4.CodecMPEG1Audio{
SampleRate: 32000,
ChannelCount: 2,
}
track := addTrack(codec)
parsed := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
}
pts := tunit.PTS
for _, frame := range tunit.Frames {
var h mpeg1audio.FrameHeader
err := h.Unmarshal(frame)
if err != nil {
return err
}
if !parsed {
parsed = true
codec.SampleRate = h.SampleRate
codec.ChannelCount = mpeg1audioChannelCount(h.ChannelMode)
updateCodecs()
}
err = track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: frame,
},
dts: pts,
})
if err != nil {
return err
}
pts += time.Duration(h.SampleCount()) *
time.Second / time.Duration(h.SampleRate)
}
return nil
})
case *format.AC3:
codec := &fmp4.CodecAC3{
SampleRate: forma.SampleRate,
ChannelCount: forma.ChannelCount,
Fscod: 0,
Bsid: 8,
Bsmod: 0,
Acmod: 7,
LfeOn: true,
BitRateCode: 7,
}
track := addTrack(codec)
parsed := false
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
}
pts := tunit.PTS
for _, frame := range tunit.Frames {
var syncInfo ac3.SyncInfo
err := syncInfo.Unmarshal(frame)
if err != nil {
return fmt.Errorf("invalid AC-3 frame: %s", err)
}
var bsi ac3.BSI
err = bsi.Unmarshal(frame[5:])
if err != nil {
return fmt.Errorf("invalid AC-3 frame: %s", err)
}
if !parsed {
parsed = true
codec.SampleRate = syncInfo.SampleRate()
codec.ChannelCount = bsi.ChannelCount()
codec.Fscod = syncInfo.Fscod
codec.Bsid = bsi.Bsid
codec.Bsmod = bsi.Bsmod
codec.Acmod = bsi.Acmod
codec.LfeOn = bsi.LfeOn
codec.BitRateCode = syncInfo.Frmsizecod >> 1
updateCodecs()
}
err = track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: frame,
},
dts: pts,
})
if err != nil {
return err
}
pts += time.Duration(ac3.SamplesPerFrame) *
time.Second / time.Duration(codec.SampleRate)
}
return nil
})
case *format.G722:
// TODO
case *format.G711:
// TODO
case *format.LPCM:
codec := &fmp4.CodecLPCM{
LittleEndian: false,
BitDepth: forma.BitDepth,
SampleRate: forma.SampleRate,
ChannelCount: forma.ChannelCount,
}
track := addTrack(codec)
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.LPCM)
if tunit.Samples == nil {
return nil
}
return track.record(&sample{
PartSample: &fmp4.PartSample{
Payload: tunit.Samples,
},
dts: tunit.PTS,
})
})
}
}
}
a.Log(logger.Info, "recording %d %s",
len(f.tracks),
func() string {
if len(f.tracks) == 1 {
return "track"
}
return "tracks"
}())
return f
}
func (f *recFormatFMP4) close() {
if f.currentSegment != nil {
f.currentSegment.close() //nolint:errcheck
}
}

48
internal/record/part.go → internal/record/rec_format_fmp4_part.go

@ -12,7 +12,11 @@ import ( @@ -12,7 +12,11 @@ import (
"github.com/bluenviron/mediamtx/internal/logger"
)
func writePart(f io.Writer, sequenceNumber uint32, partTracks map[*track]*fmp4.PartTrack) error {
func writePart(
f io.Writer,
sequenceNumber uint32,
partTracks map[*recFormatFMP4Track]*fmp4.PartTrack,
) error {
fmp4PartTracks := make([]*fmp4.PartTrack, len(partTracks))
i := 0
for _, partTrack := range partTracks {
@ -35,58 +39,60 @@ func writePart(f io.Writer, sequenceNumber uint32, partTracks map[*track]*fmp4.P @@ -35,58 +39,60 @@ func writePart(f io.Writer, sequenceNumber uint32, partTracks map[*track]*fmp4.P
return err
}
type part struct {
s *segment
type recFormatFMP4Part struct {
s *recFormatFMP4Segment
sequenceNumber uint32
startDTS time.Duration
partTracks map[*track]*fmp4.PartTrack
created time.Time
partTracks map[*recFormatFMP4Track]*fmp4.PartTrack
endDTS time.Duration
}
func newPart(
s *segment,
func newRecFormatFMP4Part(
s *recFormatFMP4Segment,
sequenceNumber uint32,
startDTS time.Duration,
) *part {
return &part{
) *recFormatFMP4Part {
return &recFormatFMP4Part{
s: s,
startDTS: startDTS,
sequenceNumber: sequenceNumber,
partTracks: make(map[*track]*fmp4.PartTrack),
created: timeNow(),
partTracks: make(map[*recFormatFMP4Track]*fmp4.PartTrack),
}
}
func (p *part) close() error {
if p.s.f == nil {
p.s.fpath = encodeRecordPath(&recordPathParams{time: timeNow()}, p.s.r.path)
p.s.r.Log(logger.Debug, "creating segment %s", p.s.fpath)
func (p *recFormatFMP4Part) close() error {
if p.s.fi == nil {
p.s.fpath = encodeRecordPath(&recordPathParams{time: p.created}, p.s.f.a.path)
p.s.f.a.Log(logger.Debug, "creating segment %s", p.s.fpath)
err := os.MkdirAll(filepath.Dir(p.s.fpath), 0o755)
if err != nil {
return err
}
f, err := os.Create(p.s.fpath)
fi, err := os.Create(p.s.fpath)
if err != nil {
return err
}
p.s.r.onSegmentCreate(p.s.fpath)
p.s.f.a.onSegmentCreate(p.s.fpath)
err = writeInit(f, p.s.r.tracks)
err = writeInit(fi, p.s.f.tracks)
if err != nil {
f.Close()
fi.Close()
return err
}
p.s.f = f
p.s.fi = fi
}
return writePart(p.s.f, p.sequenceNumber, p.partTracks)
return writePart(p.s.fi, p.sequenceNumber, p.partTracks)
}
func (p *part) record(track *track, sample *sample) error {
func (p *recFormatFMP4Part) record(track *recFormatFMP4Track, sample *sample) error {
partTrack, ok := p.partTracks[track]
if !ok {
partTrack = &fmp4.PartTrack{
@ -102,6 +108,6 @@ func (p *part) record(track *track, sample *sample) error { @@ -102,6 +108,6 @@ func (p *part) record(track *track, sample *sample) error {
return nil
}
func (p *part) duration() time.Duration {
func (p *recFormatFMP4Part) duration() time.Duration {
return p.endDTS - p.startDTS
}

42
internal/record/segment.go → internal/record/rec_format_fmp4_segment.go

@ -13,7 +13,7 @@ import ( @@ -13,7 +13,7 @@ import (
var timeNow = time.Now
func writeInit(f io.Writer, tracks []*track) error {
func writeInit(f io.Writer, tracks []*recFormatFMP4Track) error {
fmp4Tracks := make([]*fmp4.InitTrack, len(tracks))
for i, track := range tracks {
fmp4Tracks[i] = track.initTrack
@ -33,52 +33,52 @@ func writeInit(f io.Writer, tracks []*track) error { @@ -33,52 +33,52 @@ func writeInit(f io.Writer, tracks []*track) error {
return err
}
type segment struct {
r *Agent
type recFormatFMP4Segment struct {
f *recFormatFMP4
startDTS time.Duration
fpath string
f *os.File
curPart *part
fi *os.File
curPart *recFormatFMP4Part
}
func newSegment(
r *Agent,
func newRecFormatFMP4Segment(
f *recFormatFMP4,
startDTS time.Duration,
) *segment {
return &segment{
r: r,
) *recFormatFMP4Segment {
return &recFormatFMP4Segment{
f: f,
startDTS: startDTS,
}
}
func (s *segment) close() error {
func (s *recFormatFMP4Segment) close() error {
var err error
if s.curPart != nil {
err = s.curPart.close()
}
if s.f != nil {
s.r.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.f.Close()
if s.fi != nil {
s.f.a.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.fi.Close()
if err == nil {
err = err2
}
if err2 == nil {
s.r.onSegmentComplete(s.fpath)
s.f.a.onSegmentComplete(s.fpath)
}
}
return err
}
func (s *segment) record(track *track, sample *sample) error {
func (s *recFormatFMP4Segment) record(track *recFormatFMP4Track, sample *sample) error {
if s.curPart == nil {
s.curPart = newPart(s, s.r.nextSequenceNumber, sample.dts)
s.r.nextSequenceNumber++
} else if s.curPart.duration() >= s.r.partDuration {
s.curPart = newRecFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts)
s.f.nextSequenceNumber++
} else if s.curPart.duration() >= s.f.a.partDuration {
err := s.curPart.close()
s.curPart = nil
@ -86,8 +86,8 @@ func (s *segment) record(track *track, sample *sample) error { @@ -86,8 +86,8 @@ func (s *segment) record(track *track, sample *sample) error {
return err
}
s.curPart = newPart(s, s.r.nextSequenceNumber, sample.dts)
s.r.nextSequenceNumber++
s.curPart = newRecFormatFMP4Part(s, s.f.nextSequenceNumber, sample.dts)
s.f.nextSequenceNumber++
}
return s.curPart.record(track, sample)

57
internal/record/rec_format_fmp4_track.go

@ -0,0 +1,57 @@ @@ -0,0 +1,57 @@
package record
import (
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
)
type recFormatFMP4Track struct {
f *recFormatFMP4
initTrack *fmp4.InitTrack
nextSample *sample
}
func newRecFormatFMP4Track(
f *recFormatFMP4,
initTrack *fmp4.InitTrack,
) *recFormatFMP4Track {
return &recFormatFMP4Track{
f: f,
initTrack: initTrack,
}
}
func (t *recFormatFMP4Track) record(sample *sample) error {
// wait the first video sample before setting hasVideo
if t.initTrack.Codec.IsVideo() {
t.f.hasVideo = true
}
if t.f.currentSegment == nil {
t.f.currentSegment = newRecFormatFMP4Segment(t.f, sample.dts)
}
sample, t.nextSample = t.nextSample, sample
if sample == nil {
return nil
}
sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale))
err := t.f.currentSegment.record(t, sample)
if err != nil {
return err
}
if (!t.f.hasVideo || t.initTrack.Codec.IsVideo()) &&
!t.nextSample.IsNonSyncSample &&
(t.nextSample.dts-t.f.currentSegment.startDTS) >= t.f.a.segmentDuration {
err := t.f.currentSegment.close()
if err != nil {
return err
}
t.f.currentSegment = newRecFormatFMP4Segment(t.f, t.nextSample.dts)
}
return nil
}

332
internal/record/rec_format_mpegts.go

@ -0,0 +1,332 @@ @@ -0,0 +1,332 @@
package record
import (
"bufio"
"bytes"
"fmt"
"io"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/ac3"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/codecs/h265"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4video"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
const (
mpegtsMaxBufferSize = 64 * 1024
)
func durationGoToMPEGTS(v time.Duration) int64 {
return int64(v.Seconds() * 90000)
}
type dynamicWriter struct {
w io.Writer
}
func (d *dynamicWriter) Write(p []byte) (int, error) {
return d.w.Write(p)
}
func (d *dynamicWriter) setTarget(w io.Writer) {
d.w = w
}
type recFormatMPEGTS struct {
a *Agent
dw *dynamicWriter
bw *bufio.Writer
mw *mpegts.Writer
hasVideo bool
currentSegment *recFormatMPEGTSSegment
}
func newRecFormatMPEGTS(a *Agent) recFormat {
f := &recFormatMPEGTS{
a: a,
}
var tracks []*mpegts.Track
addTrack := func(codec mpegts.Codec) *mpegts.Track {
track := &mpegts.Track{
Codec: codec,
}
tracks = append(tracks, track)
return track
}
for _, media := range a.stream.Desc().Medias {
for _, forma := range media.Formats {
switch forma := forma.(type) {
case *format.H265:
track := addTrack(&mpegts.CodecH265{})
var dtsExtractor *h265.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H265)
if tunit.AU == nil {
return nil
}
randomAccess := h265.IsRandomAccess(tunit.AU)
if dtsExtractor == nil {
if !randomAccess {
return nil
}
dtsExtractor = h265.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
return f.recordH26x(track, dts, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), randomAccess, tunit.AU)
})
case *format.H264:
track := addTrack(&mpegts.CodecH264{})
var dtsExtractor *h264.DTSExtractor
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
idrPresent := h264.IDRPresent(tunit.AU)
if dtsExtractor == nil {
if !idrPresent {
return nil
}
dtsExtractor = h264.NewDTSExtractor()
}
dts, err := dtsExtractor.Extract(tunit.AU, tunit.PTS)
if err != nil {
return err
}
return f.recordH26x(track, dts, durationGoToMPEGTS(tunit.PTS), durationGoToMPEGTS(dts), idrPresent, tunit.AU)
})
case *format.MPEG4Video:
track := addTrack(&mpegts.CodecMPEG4Video{})
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Video)
if tunit.Frame == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-4 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
f.hasVideo = true
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, byte(mpeg4video.GroupOfVOPStartCode)})
err := f.setupSegment(tunit.PTS, true, randomAccess)
if err != nil {
return err
}
return f.mw.WriteMPEG4Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
})
case *format.MPEG1Video:
track := addTrack(&mpegts.CodecMPEG1Video{})
firstReceived := false
var lastPTS time.Duration
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Video)
if tunit.Frame == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("MPEG-1 Video streams with B-frames are not supported (yet)")
}
lastPTS = tunit.PTS
f.hasVideo = true
randomAccess := bytes.Contains(tunit.Frame, []byte{0, 0, 1, 0xB8})
err := f.setupSegment(tunit.PTS, true, randomAccess)
if err != nil {
return err
}
return f.mw.WriteMPEG1Video(track, durationGoToMPEGTS(tunit.PTS), tunit.Frame)
})
case *format.Opus:
track := addTrack(&mpegts.CodecOpus{
ChannelCount: func() int {
if forma.IsStereo {
return 2
}
return 1
}(),
})
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.Opus)
if tunit.Packets == nil {
return nil
}
err := f.setupSegment(tunit.PTS, false, true)
if err != nil {
return err
}
return f.mw.WriteOpus(track, durationGoToMPEGTS(tunit.PTS), tunit.Packets)
})
case *format.MPEG4Audio:
track := addTrack(&mpegts.CodecMPEG4Audio{
Config: *forma.GetConfig(),
})
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG4Audio)
if tunit.AUs == nil {
return nil
}
err := f.setupSegment(tunit.PTS, false, true)
if err != nil {
return err
}
return f.mw.WriteMPEG4Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.AUs)
})
case *format.MPEG1Audio:
track := addTrack(&mpegts.CodecMPEG1Audio{})
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.MPEG1Audio)
if tunit.Frames == nil {
return nil
}
err := f.setupSegment(tunit.PTS, false, true)
if err != nil {
return err
}
return f.mw.WriteMPEG1Audio(track, durationGoToMPEGTS(tunit.PTS), tunit.Frames)
})
case *format.AC3:
track := addTrack(&mpegts.CodecAC3{})
sampleRate := time.Duration(forma.SampleRate)
a.stream.AddReader(a.writer, media, forma, func(u unit.Unit) error {
tunit := u.(*unit.AC3)
if tunit.Frames == nil {
return nil
}
for i, frame := range tunit.Frames {
framePTS := tunit.PTS + time.Duration(i)*ac3.SamplesPerFrame*
time.Second/sampleRate
err := f.mw.WriteAC3(track, durationGoToMPEGTS(framePTS), frame)
if err != nil {
return err
}
}
return nil
})
}
}
}
f.dw = &dynamicWriter{}
f.bw = bufio.NewWriterSize(f.dw, mpegtsMaxBufferSize)
f.mw = mpegts.NewWriter(f.bw, tracks)
a.Log(logger.Info, "recording %d %s",
len(tracks),
func() string {
if len(tracks) == 1 {
return "track"
}
return "tracks"
}())
return f
}
func (f *recFormatMPEGTS) close() {
if f.currentSegment != nil {
f.currentSegment.close() //nolint:errcheck
}
}
func (f *recFormatMPEGTS) setupSegment(dts time.Duration, isVideo bool, randomAccess bool) error {
switch {
case f.currentSegment == nil:
f.currentSegment = newRecFormatMPEGTSSegment(f, dts)
case (!f.hasVideo || isVideo) &&
randomAccess &&
(dts-f.currentSegment.startDTS) >= f.a.segmentDuration:
err := f.currentSegment.close()
if err != nil {
return err
}
f.currentSegment = newRecFormatMPEGTSSegment(f, dts)
case (dts - f.currentSegment.lastFlush) >= f.a.partDuration:
err := f.bw.Flush()
if err != nil {
return err
}
f.currentSegment.lastFlush = dts
}
return nil
}
func (f *recFormatMPEGTS) recordH26x(track *mpegts.Track, goDTS time.Duration,
pts int64, dts int64, randomAccess bool, au [][]byte,
) error {
f.hasVideo = true
err := f.setupSegment(goDTS, true, randomAccess)
if err != nil {
return err
}
return f.mw.WriteH26x(track, pts, dts, randomAccess, au)
}

73
internal/record/rec_format_mpegts_segment.go

@ -0,0 +1,73 @@ @@ -0,0 +1,73 @@
package record
import (
"os"
"path/filepath"
"time"
"github.com/bluenviron/mediamtx/internal/logger"
)
type recFormatMPEGTSSegment struct {
f *recFormatMPEGTS
startDTS time.Duration
lastFlush time.Duration
created time.Time
fpath string
fi *os.File
}
func newRecFormatMPEGTSSegment(f *recFormatMPEGTS, startDTS time.Duration) *recFormatMPEGTSSegment {
s := &recFormatMPEGTSSegment{
f: f,
startDTS: startDTS,
lastFlush: startDTS,
created: timeNow(),
}
f.dw.setTarget(s)
return s
}
func (s *recFormatMPEGTSSegment) close() error {
err := s.f.bw.Flush()
if s.fi != nil {
s.f.a.Log(logger.Debug, "closing segment %s", s.fpath)
err2 := s.fi.Close()
if err == nil {
err = err2
}
if err2 == nil {
s.f.a.onSegmentComplete(s.fpath)
}
}
return err
}
func (s *recFormatMPEGTSSegment) Write(p []byte) (int, error) {
if s.fi == nil {
s.fpath = encodeRecordPath(&recordPathParams{time: s.created}, s.f.a.path)
s.f.a.Log(logger.Debug, "creating segment %s", s.fpath)
err := os.MkdirAll(filepath.Dir(s.fpath), 0o755)
if err != nil {
return 0, err
}
fi, err := os.Create(s.fpath)
if err != nil {
return 0, err
}
s.f.a.onSegmentCreate(s.fpath)
s.fi = fi
}
return s.fi.Write(p)
}

57
internal/record/track.go

@ -1,57 +0,0 @@ @@ -1,57 +0,0 @@
package record
import (
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
)
type track struct {
r *Agent
initTrack *fmp4.InitTrack
nextSample *sample
}
func newTrack(
r *Agent,
initTrack *fmp4.InitTrack,
) *track {
return &track{
r: r,
initTrack: initTrack,
}
}
func (t *track) record(sample *sample) error {
// wait the first video sample before setting hasVideo
if t.initTrack.Codec.IsVideo() {
t.r.hasVideo = true
}
if t.r.currentSegment == nil {
t.r.currentSegment = newSegment(t.r, sample.dts)
}
sample, t.nextSample = t.nextSample, sample
if sample == nil {
return nil
}
sample.Duration = uint32(durationGoToMp4(t.nextSample.dts-sample.dts, t.initTrack.TimeScale))
err := t.r.currentSegment.record(t, sample)
if err != nil {
return err
}
if (!t.r.hasVideo || t.initTrack.Codec.IsVideo()) &&
!t.nextSample.IsNonSyncSample &&
(t.nextSample.dts-t.r.currentSegment.startDTS) >= t.r.segmentDuration {
err := t.r.currentSegment.close()
if err != nil {
return err
}
t.r.currentSegment = newSegment(t.r, t.nextSample.dts)
}
return nil
}

19
internal/stream/stream.go

@ -119,6 +119,25 @@ func (s *Stream) RemoveReader(r *asyncwriter.Writer) { @@ -119,6 +119,25 @@ func (s *Stream) RemoveReader(r *asyncwriter.Writer) {
}
}
// MediasForReader returns all medias that a reader is reading.
func (s *Stream) MediasForReader(r *asyncwriter.Writer) []*description.Media {
s.mutex.Lock()
defer s.mutex.Unlock()
var medias []*description.Media
for media, sm := range s.smedias {
for _, sf := range sm.formats {
if _, ok := sf.readers[r]; ok {
medias = append(medias, media)
break
}
}
}
return medias
}
// WriteUnit writes a Unit.
func (s *Stream) WriteUnit(medi *description.Media, forma format.Format, u unit.Unit) {
sm := s.smedias[medi]

3
mediamtx.yml

@ -300,9 +300,10 @@ pathDefaults: @@ -300,9 +300,10 @@ pathDefaults:
# Available variables are %path (path name), %Y %m %d %H %M %S %f (time in strftime format)
recordPath: ./recordings/%path/%Y-%m-%d_%H-%M-%S-%f
# Format of recorded segments.
# Currently the only available format is fmp4 (fragmented MP4).
# Available formats are "fmp4" (fragmented MP4) and "mpegts" (MPEG-TS).
recordFormat: fmp4
# fMP4 segments are concatenation of small MP4 files (parts), each with this duration.
# MPEG-TS segments are concatenation of 188-bytes packets, flushed to disk with this period.
# When a system failure occurs, the last part gets lost.
# Therefore, the part duration is equal to the RPO (recovery point objective).
recordPartDuration: 100ms

Loading…
Cancel
Save