From 50322fc14eb1c883b6e09cd66a0a0223bf8f674d Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sat, 6 Apr 2024 20:01:23 +0200 Subject: [PATCH] playback: do not concatenate segments with different tracks (#3197) --- internal/playback/fmp4.go | 494 ++++++++---------- internal/playback/on_get.go | 179 +++++++ .../{server_test.go => on_get_test.go} | 146 ++++-- internal/playback/on_list.go | 120 +++++ internal/playback/on_list_test.go | 134 +++++ internal/playback/segment.go | 30 +- internal/playback/server.go | 184 +------ 7 files changed, 783 insertions(+), 504 deletions(-) create mode 100644 internal/playback/on_get.go rename internal/playback/{server_test.go => on_get_test.go} (68%) create mode 100644 internal/playback/on_list.go create mode 100644 internal/playback/on_list_test.go diff --git a/internal/playback/fmp4.go b/internal/playback/fmp4.go index 55630844..c7819749 100644 --- a/internal/playback/fmp4.go +++ b/internal/playback/fmp4.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "os" "time" "github.com/abema/go-mp4" @@ -15,6 +14,8 @@ import ( const ( sampleFlagIsNonSyncSample = 1 << 16 + concatenationTolerance = 1 * time.Second + fmp4Timescale = 90000 ) func durationGoToMp4(v time.Duration, timeScale uint32) uint64 { @@ -33,6 +34,17 @@ func durationMp4ToGo(v uint64, timeScale uint32) time.Duration { var errTerminated = errors.New("terminated") +func fmp4CanBeConcatenated( + prevInit []byte, + prevEnd time.Time, + curInit []byte, + curStart time.Time, +) bool { + return bytes.Equal(prevInit, curInit) && + !curStart.Before(prevEnd.Add(-concatenationTolerance)) && + !curStart.After(prevEnd.Add(concatenationTolerance)) +} + func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) { buf := make([]byte, 8) _, err := io.ReadFull(r, buf) @@ -83,6 +95,217 @@ func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) { return buf, nil } +func fmp4ReadDuration(r io.ReadSeeker) (time.Duration, error) { + // find and skip ftyp + + buf := make([]byte, 8) + _, err := io.ReadFull(r, buf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) { + return 0, fmt.Errorf("ftyp box not found") + } + + ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = r.Seek(int64(ftypSize), io.SeekStart) + if err != nil { + return 0, err + } + + // find and skip moov + + _, err = io.ReadFull(r, buf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) { + return 0, fmt.Errorf("moov box not found") + } + + moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = r.Seek(int64(moovSize)-8, io.SeekCurrent) + if err != nil { + return 0, err + } + + // find last valid moof and mdat + + lastMoofPos := int64(-1) + + for { + moofPos, err := r.Seek(0, io.SeekCurrent) + if err != nil { + return 0, err + } + + _, err = io.ReadFull(r, buf) + if err != nil { + break + } + + if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'f'}) { + return 0, fmt.Errorf("moof box not found") + } + + moofSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = r.Seek(int64(moofSize)-8, io.SeekCurrent) + if err != nil { + break + } + + _, err = io.ReadFull(r, buf) + if err != nil { + break + } + + if !bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) { + return 0, fmt.Errorf("mdat box not found") + } + + mdatSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = r.Seek(int64(mdatSize)-8, io.SeekCurrent) + if err != nil { + break + } + + lastMoofPos = moofPos + } + + if lastMoofPos < 0 { + return 0, fmt.Errorf("no moof boxes found") + } + + // open last moof + + _, err = r.Seek(lastMoofPos+8, io.SeekStart) + if err != nil { + return 0, err + } + + _, err = io.ReadFull(r, buf) + if err != nil { + return 0, err + } + + // skip mfhd + + if !bytes.Equal(buf[4:], []byte{'m', 'f', 'h', 'd'}) { + return 0, fmt.Errorf("mfhd box not found") + } + + _, err = r.Seek(8, io.SeekCurrent) + if err != nil { + return 0, err + } + + maxElapsed := uint64(0) + + // foreach traf + + for { + _, err := io.ReadFull(r, buf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf[4:], []byte{'t', 'r', 'a', 'f'}) { + if bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) { + break + } + return 0, fmt.Errorf("traf box not found") + } + + // skip tfhd + + _, err = io.ReadFull(r, buf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf[4:], []byte{'t', 'f', 'h', 'd'}) { + return 0, fmt.Errorf("tfhd box not found") + } + + tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + _, err = r.Seek(int64(tfhdSize)-8, io.SeekCurrent) + if err != nil { + return 0, err + } + + // parse tfdt + + _, err = io.ReadFull(r, buf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf[4:], []byte{'t', 'f', 'd', 't'}) { + return 0, fmt.Errorf("tfdt box not found") + } + + tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + buf2 := make([]byte, tfdtSize-8) + + _, err = io.ReadFull(r, buf2) + if err != nil { + return 0, err + } + + var tfdt mp4.Tfdt + _, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &tfdt, mp4.Context{}) + if err != nil { + return 0, fmt.Errorf("invalid tfdt box: %w", err) + } + + elapsed := tfdt.BaseMediaDecodeTimeV1 + + // parse trun + + _, err = io.ReadFull(r, buf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf[4:], []byte{'t', 'r', 'u', 'n'}) { + return 0, fmt.Errorf("trun box not found") + } + + trunSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) + + buf2 = make([]byte, trunSize-8) + + _, err = io.ReadFull(r, buf2) + if err != nil { + return 0, err + } + + var trun mp4.Trun + _, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &trun, mp4.Context{}) + if err != nil { + return 0, fmt.Errorf("invalid trun box: %w", err) + } + + for _, entry := range trun.Entries { + elapsed += uint64(entry.SampleDuration) + } + + if elapsed > maxElapsed { + maxElapsed = elapsed + } + } + + return durationMp4ToGo(maxElapsed, fmp4Timescale), nil +} + func fmp4SeekAndMuxParts( r io.ReadSeeker, init []byte, @@ -90,8 +313,8 @@ func fmp4SeekAndMuxParts( maxTime time.Duration, w io.Writer, ) (time.Duration, error) { - minTimeMP4 := durationGoToMp4(minTime, 90000) - maxTimeMP4 := durationGoToMp4(maxTime, 90000) + minTimeMP4 := durationGoToMp4(minTime, fmp4Timescale) + maxTimeMP4 := durationGoToMp4(maxTime, fmp4Timescale) moofOffset := uint64(0) var tfhd *mp4.Tfhd var tfdt *mp4.Tfdt @@ -246,7 +469,7 @@ func fmp4SeekAndMuxParts( elapsed -= minTimeMP4 - return durationMp4ToGo(elapsed, 90000), nil + return durationMp4ToGo(elapsed, fmp4Timescale), nil } func fmp4MuxParts( @@ -255,7 +478,7 @@ func fmp4MuxParts( maxTime time.Duration, w io.Writer, ) (time.Duration, error) { - maxTimeMP4 := durationGoToMp4(maxTime, 90000) + maxTimeMP4 := durationGoToMp4(maxTime, fmp4Timescale) moofOffset := uint64(0) var tfhd *mp4.Tfhd var tfdt *mp4.Tfdt @@ -294,7 +517,7 @@ func fmp4MuxParts( outTrack = &fmp4.PartTrack{ ID: int(tfhd.TrackID), - BaseTime: tfdt.BaseMediaDecodeTimeV1 + durationGoToMp4(startTime, 90000), + BaseTime: tfdt.BaseMediaDecodeTimeV1 + durationGoToMp4(startTime, fmp4Timescale), } case "trun": @@ -367,262 +590,5 @@ func fmp4MuxParts( return 0, err } - return durationMp4ToGo(elapsed, 90000), nil -} - -func fmp4SeekAndMux( - fpath string, - minTime time.Duration, - maxTime time.Duration, - w io.Writer, -) (time.Duration, error) { - f, err := os.Open(fpath) - if err != nil { - return 0, err - } - defer f.Close() - - init, err := fmp4ReadInit(f) - if err != nil { - return 0, err - } - - elapsed, err := fmp4SeekAndMuxParts(f, init, minTime, maxTime, w) - if err != nil { - return 0, err - } - - return elapsed, nil -} - -func fmp4Mux( - fpath string, - startTime time.Duration, - maxTime time.Duration, - w io.Writer, -) (time.Duration, error) { - f, err := os.Open(fpath) - if err != nil { - return 0, err - } - defer f.Close() - - return fmp4MuxParts(f, startTime, maxTime, w) -} - -func fmp4Duration(fpath string) (time.Duration, error) { - f, err := os.Open(fpath) - if err != nil { - return 0, err - } - defer f.Close() - - // find and skip ftyp - - buf := make([]byte, 8) - _, err = io.ReadFull(f, buf) - if err != nil { - return 0, err - } - - if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) { - return 0, fmt.Errorf("ftyp box not found") - } - - ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - - _, err = f.Seek(int64(ftypSize), io.SeekStart) - if err != nil { - return 0, err - } - - // find and skip moov - - _, err = io.ReadFull(f, buf) - if err != nil { - return 0, err - } - - if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) { - return 0, fmt.Errorf("moov box not found") - } - - moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - - _, err = f.Seek(int64(moovSize)-8, io.SeekCurrent) - if err != nil { - return 0, err - } - - // find last valid moof and mdat - - lastMoofPos := int64(-1) - - for { - moofPos, err := f.Seek(0, io.SeekCurrent) - if err != nil { - return 0, err - } - - _, err = io.ReadFull(f, buf) - if err != nil { - break - } - - if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'f'}) { - return 0, fmt.Errorf("moof box not found") - } - - moofSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - - _, err = f.Seek(int64(moofSize)-8, io.SeekCurrent) - if err != nil { - break - } - - _, err = io.ReadFull(f, buf) - if err != nil { - break - } - - if !bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) { - return 0, fmt.Errorf("mdat box not found") - } - - mdatSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - - _, err = f.Seek(int64(mdatSize)-8, io.SeekCurrent) - if err != nil { - break - } - - lastMoofPos = moofPos - } - - if lastMoofPos < 0 { - return 0, fmt.Errorf("no moof boxes found") - } - - // open last moof - - _, err = f.Seek(lastMoofPos+8, io.SeekStart) - if err != nil { - return 0, err - } - - _, err = io.ReadFull(f, buf) - if err != nil { - return 0, err - } - - // skip mfhd - - if !bytes.Equal(buf[4:], []byte{'m', 'f', 'h', 'd'}) { - return 0, fmt.Errorf("mfhd box not found") - } - - _, err = f.Seek(8, io.SeekCurrent) - if err != nil { - return 0, err - } - - maxElapsed := uint64(0) - - // foreach traf - - for { - _, err := io.ReadFull(f, buf) - if err != nil { - return 0, err - } - - if !bytes.Equal(buf[4:], []byte{'t', 'r', 'a', 'f'}) { - if bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) { - break - } - return 0, fmt.Errorf("traf box not found") - } - - // skip tfhd - - _, err = io.ReadFull(f, buf) - if err != nil { - return 0, err - } - - if !bytes.Equal(buf[4:], []byte{'t', 'f', 'h', 'd'}) { - return 0, fmt.Errorf("tfhd box not found") - } - - tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - - _, err = f.Seek(int64(tfhdSize)-8, io.SeekCurrent) - if err != nil { - return 0, err - } - - // parse tfdt - - _, err = io.ReadFull(f, buf) - if err != nil { - return 0, err - } - - if !bytes.Equal(buf[4:], []byte{'t', 'f', 'd', 't'}) { - return 0, fmt.Errorf("tfdt box not found") - } - - tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - - buf2 := make([]byte, tfdtSize-8) - - _, err = io.ReadFull(f, buf2) - if err != nil { - return 0, err - } - - var tfdt mp4.Tfdt - _, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &tfdt, mp4.Context{}) - if err != nil { - return 0, fmt.Errorf("invalid tfdt box: %w", err) - } - - elapsed := tfdt.BaseMediaDecodeTimeV1 - - // parse trun - - _, err = io.ReadFull(f, buf) - if err != nil { - return 0, err - } - - if !bytes.Equal(buf[4:], []byte{'t', 'r', 'u', 'n'}) { - return 0, fmt.Errorf("trun box not found") - } - - trunSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3]) - - buf2 = make([]byte, trunSize-8) - - _, err = io.ReadFull(f, buf2) - if err != nil { - return 0, err - } - - var trun mp4.Trun - _, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &trun, mp4.Context{}) - if err != nil { - return 0, fmt.Errorf("invalid trun box: %w", err) - } - - for _, entry := range trun.Entries { - elapsed += uint64(entry.SampleDuration) - } - - if elapsed > maxElapsed { - maxElapsed = elapsed - } - } - - return durationMp4ToGo(maxElapsed, 90000), nil + return durationMp4ToGo(elapsed, fmp4Timescale), nil } diff --git a/internal/playback/on_get.go b/internal/playback/on_get.go new file mode 100644 index 00000000..4cea1839 --- /dev/null +++ b/internal/playback/on_get.go @@ -0,0 +1,179 @@ +package playback + +import ( + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "strconv" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/gin-gonic/gin" +) + +var errStopIteration = errors.New("stop iteration") + +func parseDuration(raw string) (time.Duration, error) { + // seconds + if secs, err := strconv.ParseFloat(raw, 64); err == nil { + return time.Duration(secs * float64(time.Second)), nil + } + + // deprecated, golang format + return time.ParseDuration(raw) +} + +func seekAndMux( + recordFormat conf.RecordFormat, + segments []*Segment, + start time.Time, + duration time.Duration, + w io.Writer, +) error { + if recordFormat == conf.RecordFormatFMP4 { + minTime := start.Sub(segments[0].Start) + maxTime := minTime + duration + var init []byte + var elapsed time.Duration + + err := func() error { + f, err := os.Open(segments[0].Fpath) + if err != nil { + return err + } + defer f.Close() + + init, err = fmp4ReadInit(f) + if err != nil { + return err + } + + elapsed, err = fmp4SeekAndMuxParts(f, init, minTime, maxTime, w) + if err != nil { + return err + } + + return nil + }() + if err != nil { + return err + } + + prevInit := init + prevEnd := start.Add(elapsed) + duration -= elapsed + overallElapsed := elapsed + + for _, seg := range segments[1:] { + err := func() error { + f, err := os.Open(seg.Fpath) + if err != nil { + return err + } + defer f.Close() + + init, err := fmp4ReadInit(f) + if err != nil { + return err + } + + if !fmp4CanBeConcatenated(prevInit, prevEnd, init, seg.Start) { + return errStopIteration + } + + elapsed, err = fmp4MuxParts(f, overallElapsed, duration, w) + if err != nil { + return err + } + + return nil + }() + if err != nil { + if errors.Is(err, errStopIteration) { + return nil + } + + return err + } + + prevEnd = seg.Start.Add(elapsed) + duration -= elapsed + overallElapsed += elapsed + } + + return nil + } + + return fmt.Errorf("MPEG-TS format is not supported yet") +} + +func (p *Server) onGet(ctx *gin.Context) { + pathName := ctx.Query("path") + + if !p.doAuth(ctx, pathName) { + return + } + + start, err := time.Parse(time.RFC3339, ctx.Query("start")) + if err != nil { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid start: %w", err)) + return + } + + duration, err := parseDuration(ctx.Query("duration")) + if err != nil { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid duration: %w", err)) + return + } + + format := ctx.Query("format") + if format != "" && format != "fmp4" { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid format: %s", format)) + return + } + + pathConf, err := p.safeFindPathConf(pathName) + if err != nil { + p.writeError(ctx, http.StatusBadRequest, err) + return + } + + segments, err := findSegmentsInTimespan(pathConf, pathName, start, duration) + if err != nil { + if errors.Is(err, errNoSegmentsFound) { + p.writeError(ctx, http.StatusNotFound, err) + } else { + p.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + ww := &writerWrapper{ctx: ctx} + + err = seekAndMux(pathConf.RecordFormat, segments, start, duration, ww) + if err != nil { + // user aborted the download + var neterr *net.OpError + if errors.As(err, &neterr) { + return + } + + // nothing has been written yet; send back JSON + if !ww.written { + if errors.Is(err, errNoSegmentsFound) { + p.writeError(ctx, http.StatusNotFound, err) + } else { + p.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + // something has already been written: abort and write logs only + p.Log(logger.Error, err.Error()) + return + } +} diff --git a/internal/playback/server_test.go b/internal/playback/on_get_test.go similarity index 68% rename from internal/playback/server_test.go rename to internal/playback/on_get_test.go index f56d6ec0..266a1588 100644 --- a/internal/playback/server_test.go +++ b/internal/playback/on_get_test.go @@ -1,7 +1,6 @@ package playback import ( - "encoding/json" "io" "net/http" "net/url" @@ -10,6 +9,7 @@ import ( "testing" "time" + "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/bluenviron/mediacommon/pkg/formats/fmp4" "github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer" "github.com/bluenviron/mediamtx/internal/auth" @@ -20,14 +20,27 @@ import ( func writeSegment1(t *testing.T, fpath string) { init := fmp4.Init{ - Tracks: []*fmp4.InitTrack{{ - ID: 1, - TimeScale: 90000, - Codec: &fmp4.CodecH264{ - SPS: test.FormatH264.SPS, - PPS: test.FormatH264.PPS, + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + }, }, - }}, + { + ID: 2, + TimeScale: 90000, + Codec: &fmp4.CodecMPEG4Audio{ + Config: mpeg4audio.Config{ + Type: mpeg4audio.ObjectTypeAACLC, + SampleRate: 48000, + ChannelCount: 2, + }, + }, + }, + }, } var buf1 seekablebuffer.Buffer @@ -78,14 +91,27 @@ func writeSegment1(t *testing.T, fpath string) { func writeSegment2(t *testing.T, fpath string) { init := fmp4.Init{ - Tracks: []*fmp4.InitTrack{{ - ID: 1, - TimeScale: 90000, - Codec: &fmp4.CodecH264{ - SPS: test.FormatH264.SPS, - PPS: test.FormatH264.PPS, + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + }, }, - }}, + { + ID: 2, + TimeScale: 90000, + Codec: &fmp4.CodecMPEG4Audio{ + Config: mpeg4audio.Config{ + Type: mpeg4audio.ObjectTypeAACLC, + SampleRate: 48000, + ChannelCount: 2, + }, + }, + }, + }, } var buf1 seekablebuffer.Buffer @@ -121,6 +147,48 @@ func writeSegment2(t *testing.T, fpath string) { require.NoError(t, err) } +func writeSegment3(t *testing.T, fpath string) { + init := fmp4.Init{ + Tracks: []*fmp4.InitTrack{ + { + ID: 1, + TimeScale: 90000, + Codec: &fmp4.CodecH264{ + SPS: test.FormatH264.SPS, + PPS: test.FormatH264.PPS, + }, + }, + }, + } + + var buf1 seekablebuffer.Buffer + err := init.Marshal(&buf1) + require.NoError(t, err) + + var buf2 seekablebuffer.Buffer + parts := fmp4.Parts{ + { + SequenceNumber: 1, + Tracks: []*fmp4.PartTrack{{ + ID: 1, + BaseTime: 0, + Samples: []*fmp4.PartSample{ + { + Duration: 1 * 90000, + IsNonSyncSample: false, + Payload: []byte{10, 11}, + }, + }, + }}, + }, + } + err = parts.Marshal(&buf2) + require.NoError(t, err) + + err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644) + require.NoError(t, err) +} + var authManager = &auth.Manager{ Method: conf.AuthMethodInternal, InternalUsers: []conf.AuthInternalUser{ @@ -138,7 +206,7 @@ var authManager = &auth.Manager{ RTSPAuthMethods: nil, } -func TestServerGet(t *testing.T) { +func TestOnGet(t *testing.T) { dir, err := os.MkdirTemp("", "mediamtx-playback") require.NoError(t, err) defer os.RemoveAll(dir) @@ -229,7 +297,7 @@ func TestServerGet(t *testing.T) { }, parts) } -func TestServerList(t *testing.T) { +func TestOnGetDifferentInit(t *testing.T) { dir, err := os.MkdirTemp("", "mediamtx-playback") require.NoError(t, err) defer os.RemoveAll(dir) @@ -238,8 +306,7 @@ func TestServerList(t *testing.T) { require.NoError(t, err) writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4")) - writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4")) - writeSegment2(t, filepath.Join(dir, "mypath", "2009-11-07_11-23-02-500000.mp4")) + writeSegment3(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4")) s := &Server{ Address: "127.0.0.1:9996", @@ -257,11 +324,14 @@ func TestServerList(t *testing.T) { require.NoError(t, err) defer s.Close() - u, err := url.Parse("http://myuser:mypass@localhost:9996/list") + u, err := url.Parse("http://myuser:mypass@localhost:9996/get") require.NoError(t, err) v := url.Values{} v.Set("path", "mypath") + v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 500000000, time.Local).Format(time.RFC3339Nano)) + v.Set("duration", "2") + v.Set("format", "fmp4") u.RawQuery = v.Encode() req, err := http.NewRequest(http.MethodGet, u.String(), nil) @@ -273,18 +343,32 @@ func TestServerList(t *testing.T) { require.Equal(t, http.StatusOK, res.StatusCode) - var out interface{} - err = json.NewDecoder(res.Body).Decode(&out) + buf, err := io.ReadAll(res.Body) require.NoError(t, err) - require.Equal(t, []interface{}{ - map[string]interface{}{ - "duration": float64(64), - "start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano), - }, - map[string]interface{}{ - "duration": float64(2), - "start": time.Date(2009, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano), + var parts fmp4.Parts + err = parts.Unmarshal(buf) + require.NoError(t, err) + + require.Equal(t, fmp4.Parts{ + { + SequenceNumber: 0, + Tracks: []*fmp4.PartTrack{ + { + ID: 1, + Samples: []*fmp4.PartSample{ + { + Duration: 0, + Payload: []byte{3, 4}, + }, + { + Duration: 90000, + IsNonSyncSample: true, + Payload: []byte{5, 6}, + }, + }, + }, + }, }, - }, out) + }, parts) } diff --git a/internal/playback/on_list.go b/internal/playback/on_list.go new file mode 100644 index 00000000..57c14355 --- /dev/null +++ b/internal/playback/on_list.go @@ -0,0 +1,120 @@ +package playback + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/gin-gonic/gin" +) + +type listEntryDuration time.Duration + +func (d listEntryDuration) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Duration(d).Seconds()) +} + +type listEntry struct { + Start time.Time `json:"start"` + Duration listEntryDuration `json:"duration"` +} + +func computeDurationAndConcatenate(recordFormat conf.RecordFormat, segments []*Segment) ([]listEntry, error) { + if recordFormat == conf.RecordFormatFMP4 { + out := []listEntry{} + var prevInit []byte + + for _, seg := range segments { + err := func() error { + f, err := os.Open(seg.Fpath) + if err != nil { + return err + } + defer f.Close() + + init, err := fmp4ReadInit(f) + if err != nil { + return err + } + + _, err = f.Seek(0, io.SeekStart) + if err != nil { + return err + } + + duration, err := fmp4ReadDuration(f) + if err != nil { + return err + } + + if len(out) != 0 && fmp4CanBeConcatenated( + prevInit, + out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)), + init, + seg.Start) { + prevStart := out[len(out)-1].Start + curEnd := seg.Start.Add(duration) + out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart)) + } else { + out = append(out, listEntry{ + Start: seg.Start, + Duration: listEntryDuration(duration), + }) + } + + prevInit = init + + return nil + }() + if err != nil { + return nil, err + } + } + + return out, nil + } + + return nil, fmt.Errorf("MPEG-TS format is not supported yet") +} + +func (p *Server) onList(ctx *gin.Context) { + pathName := ctx.Query("path") + + if !p.doAuth(ctx, pathName) { + return + } + + pathConf, err := p.safeFindPathConf(pathName) + if err != nil { + p.writeError(ctx, http.StatusBadRequest, err) + return + } + + if !pathConf.Playback { + p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("playback is disabled on path '%s'", pathName)) + return + } + + segments, err := FindSegments(pathConf, pathName) + if err != nil { + if errors.Is(err, errNoSegmentsFound) { + p.writeError(ctx, http.StatusNotFound, err) + } else { + p.writeError(ctx, http.StatusBadRequest, err) + } + return + } + + out, err := computeDurationAndConcatenate(pathConf.RecordFormat, segments) + if err != nil { + p.writeError(ctx, http.StatusInternalServerError, err) + return + } + + ctx.JSON(http.StatusOK, out) +} diff --git a/internal/playback/on_list_test.go b/internal/playback/on_list_test.go new file mode 100644 index 00000000..639057e4 --- /dev/null +++ b/internal/playback/on_list_test.go @@ -0,0 +1,134 @@ +package playback + +import ( + "encoding/json" + "net/http" + "net/url" + "os" + "path/filepath" + "testing" + "time" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/stretchr/testify/require" +) + +func TestOnList(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755) + require.NoError(t, err) + + writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4")) + writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4")) + writeSegment2(t, filepath.Join(dir, "mypath", "2009-11-07_11-23-02-500000.mp4")) + + s := &Server{ + Address: "127.0.0.1:9996", + ReadTimeout: conf.StringDuration(10 * time.Second), + PathConfs: map[string]*conf.Path{ + "mypath": { + Playback: true, + RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"), + }, + }, + AuthManager: authManager, + Parent: &test.NilLogger{}, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + u, err := url.Parse("http://myuser:mypass@localhost:9996/list") + require.NoError(t, err) + + v := url.Values{} + v.Set("path", "mypath") + u.RawQuery = v.Encode() + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + require.NoError(t, err) + + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + var out interface{} + err = json.NewDecoder(res.Body).Decode(&out) + require.NoError(t, err) + + require.Equal(t, []interface{}{ + map[string]interface{}{ + "duration": float64(64), + "start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano), + }, + map[string]interface{}{ + "duration": float64(2), + "start": time.Date(2009, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano), + }, + }, out) +} + +func TestOnListDifferentInit(t *testing.T) { + dir, err := os.MkdirTemp("", "mediamtx-playback") + require.NoError(t, err) + defer os.RemoveAll(dir) + + err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755) + require.NoError(t, err) + + writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4")) + writeSegment3(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4")) + + s := &Server{ + Address: "127.0.0.1:9996", + ReadTimeout: conf.StringDuration(10 * time.Second), + PathConfs: map[string]*conf.Path{ + "mypath": { + Playback: true, + RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"), + }, + }, + AuthManager: authManager, + Parent: &test.NilLogger{}, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + u, err := url.Parse("http://myuser:mypass@localhost:9996/list") + require.NoError(t, err) + + v := url.Values{} + v.Set("path", "mypath") + u.RawQuery = v.Encode() + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + require.NoError(t, err) + + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer res.Body.Close() + + require.Equal(t, http.StatusOK, res.StatusCode) + + var out interface{} + err = json.NewDecoder(res.Body).Decode(&out) + require.NoError(t, err) + + require.Equal(t, []interface{}{ + map[string]interface{}{ + "duration": float64(62), + "start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano), + }, + map[string]interface{}{ + "duration": float64(1), + "start": time.Date(2008, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano), + }, + }, out) +} diff --git a/internal/playback/segment.go b/internal/playback/segment.go index 33b117f5..67b01b41 100644 --- a/internal/playback/segment.go +++ b/internal/playback/segment.go @@ -14,9 +14,8 @@ import ( // Segment is a recording segment. type Segment struct { - fpath string - Start time.Time - duration time.Duration + Fpath string + Start time.Time } func findSegmentsInTimespan( @@ -54,7 +53,7 @@ func findSegmentsInTimespan( // gather all segments that starts before the end of the playback if ok && !end.Before(pa.Start) { segments = append(segments, &Segment{ - fpath: fpath, + Fpath: fpath, Start: pa.Start, }) } @@ -126,7 +125,7 @@ func FindSegments( ok := pa.Decode(recordPath, fpath) if ok { segments = append(segments, &Segment{ - fpath: fpath, + Fpath: fpath, Start: pa.Start, }) } @@ -148,24 +147,3 @@ func FindSegments( return segments, nil } - -func canBeConcatenated(seg1, seg2 *Segment) bool { - end1 := seg1.Start.Add(seg1.duration) - return !seg2.Start.Before(end1.Add(-concatenationTolerance)) && !seg2.Start.After(end1.Add(concatenationTolerance)) -} - -func mergeConcatenatedSegments(in []*Segment) []*Segment { - var out []*Segment - - for _, seg := range in { - if len(out) != 0 && canBeConcatenated(out[len(out)-1], seg) { - start := out[len(out)-1].Start - end := seg.Start.Add(seg.duration) - out[len(out)-1].duration = end.Sub(start) - } else { - out = append(out, seg) - } - } - - return out -} diff --git a/internal/playback/server.go b/internal/playback/server.go index cea04391..bccf4c5d 100644 --- a/internal/playback/server.go +++ b/internal/playback/server.go @@ -3,10 +3,8 @@ package playback import ( "errors" - "fmt" "net" "net/http" - "strconv" "sync" "time" @@ -18,27 +16,8 @@ import ( "github.com/gin-gonic/gin" ) -const ( - concatenationTolerance = 1 * time.Second -) - var errNoSegmentsFound = errors.New("no recording segments found for the given timestamp") -func parseDuration(raw string) (time.Duration, error) { - // seconds - if secs, err := strconv.ParseFloat(raw, 64); err == nil { - return time.Duration(secs * float64(time.Second)), nil - } - - // deprecated, golang format - return time.ParseDuration(raw) -} - -type listEntry struct { - Start time.Time `json:"start"` - Duration float64 `json:"duration"` -} - type writerWrapper struct { ctx *gin.Context written bool @@ -65,7 +44,7 @@ type Server struct { mutex sync.RWMutex } -// Initialize initializes API. +// Initialize initializes Server. func (p *Server) Initialize() error { router := gin.New() router.SetTrustedProxies(nil) //nolint:errcheck @@ -161,164 +140,3 @@ func (p *Server) doAuth(ctx *gin.Context, pathName string) bool { return true } - -func (p *Server) onList(ctx *gin.Context) { - pathName := ctx.Query("path") - - if !p.doAuth(ctx, pathName) { - return - } - - pathConf, err := p.safeFindPathConf(pathName) - if err != nil { - p.writeError(ctx, http.StatusBadRequest, err) - return - } - - if !pathConf.Playback { - p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("playback is disabled on path '%s'", pathName)) - return - } - - segments, err := FindSegments(pathConf, pathName) - if err != nil { - if errors.Is(err, errNoSegmentsFound) { - p.writeError(ctx, http.StatusNotFound, err) - } else { - p.writeError(ctx, http.StatusBadRequest, err) - } - return - } - - if pathConf.RecordFormat != conf.RecordFormatFMP4 { - p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("format of recording segments is not fmp4")) - return - } - - for _, seg := range segments { - d, err := fmp4Duration(seg.fpath) - if err != nil { - p.writeError(ctx, http.StatusInternalServerError, err) - return - } - seg.duration = d - } - - segments = mergeConcatenatedSegments(segments) - - out := make([]listEntry, len(segments)) - for i, seg := range segments { - out[i] = listEntry{ - Start: seg.Start, - Duration: seg.duration.Seconds(), - } - } - - ctx.JSON(http.StatusOK, out) -} - -func (p *Server) onGet(ctx *gin.Context) { - pathName := ctx.Query("path") - - if !p.doAuth(ctx, pathName) { - return - } - - start, err := time.Parse(time.RFC3339, ctx.Query("start")) - if err != nil { - p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid start: %w", err)) - return - } - - duration, err := parseDuration(ctx.Query("duration")) - if err != nil { - p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid duration: %w", err)) - return - } - - format := ctx.Query("format") - if format != "" && format != "fmp4" { - p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid format: %s", format)) - return - } - - pathConf, err := p.safeFindPathConf(pathName) - if err != nil { - p.writeError(ctx, http.StatusBadRequest, err) - return - } - - segments, err := findSegmentsInTimespan(pathConf, pathName, start, duration) - if err != nil { - if errors.Is(err, errNoSegmentsFound) { - p.writeError(ctx, http.StatusNotFound, err) - } else { - p.writeError(ctx, http.StatusBadRequest, err) - } - return - } - - if pathConf.RecordFormat != conf.RecordFormatFMP4 { - p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("format of recording segments is not fmp4")) - return - } - - ww := &writerWrapper{ctx: ctx} - minTime := start.Sub(segments[0].Start) - maxTime := minTime + duration - - elapsed, err := fmp4SeekAndMux( - segments[0].fpath, - minTime, - maxTime, - ww) - if err != nil { - // user aborted the download - var neterr *net.OpError - if errors.As(err, &neterr) { - return - } - - // nothing has been written yet; send back JSON - if !ww.written { - if errors.Is(err, errNoSegmentsFound) { - p.writeError(ctx, http.StatusNotFound, err) - } else { - p.writeError(ctx, http.StatusBadRequest, err) - } - return - } - - // something has already been written: abort and write logs only - p.Log(logger.Error, err.Error()) - return - } - - start = start.Add(elapsed) - duration -= elapsed - overallElapsed := elapsed - - for _, seg := range segments[1:] { - // there's a gap between segments, stop serving the recording - if seg.Start.Before(start.Add(-concatenationTolerance)) || seg.Start.After(start.Add(concatenationTolerance)) { - return - } - - elapsed, err := fmp4Mux(seg.fpath, overallElapsed, duration, ctx.Writer) - if err != nil { - // user aborted the download - var neterr *net.OpError - if errors.As(err, &neterr) { - return - } - - // something has been already written: abort and write to logs only - p.Log(logger.Error, err.Error()) - return - } - - start = seg.Start.Add(elapsed) - duration -= elapsed - overallElapsed += elapsed - } -}