Browse Source

playback: do not concatenate segments with different tracks (#3197)

main
Alessandro Ros 1 year ago committed by GitHub
parent
commit
50322fc14e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 494
      internal/playback/fmp4.go
  2. 179
      internal/playback/on_get.go
  3. 146
      internal/playback/on_get_test.go
  4. 120
      internal/playback/on_list.go
  5. 134
      internal/playback/on_list_test.go
  6. 30
      internal/playback/segment.go
  7. 184
      internal/playback/server.go

494
internal/playback/fmp4.go

@ -5,7 +5,6 @@ import ( @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"os"
"time"
"github.com/abema/go-mp4"
@ -15,6 +14,8 @@ import ( @@ -15,6 +14,8 @@ import (
const (
sampleFlagIsNonSyncSample = 1 << 16
concatenationTolerance = 1 * time.Second
fmp4Timescale = 90000
)
func durationGoToMp4(v time.Duration, timeScale uint32) uint64 {
@ -33,6 +34,17 @@ func durationMp4ToGo(v uint64, timeScale uint32) time.Duration { @@ -33,6 +34,17 @@ func durationMp4ToGo(v uint64, timeScale uint32) time.Duration {
var errTerminated = errors.New("terminated")
func fmp4CanBeConcatenated(
prevInit []byte,
prevEnd time.Time,
curInit []byte,
curStart time.Time,
) bool {
return bytes.Equal(prevInit, curInit) &&
!curStart.Before(prevEnd.Add(-concatenationTolerance)) &&
!curStart.After(prevEnd.Add(concatenationTolerance))
}
func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) {
buf := make([]byte, 8)
_, err := io.ReadFull(r, buf)
@ -83,6 +95,217 @@ func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) { @@ -83,6 +95,217 @@ func fmp4ReadInit(r io.ReadSeeker) ([]byte, error) {
return buf, nil
}
func fmp4ReadDuration(r io.ReadSeeker) (time.Duration, error) {
// find and skip ftyp
buf := make([]byte, 8)
_, err := io.ReadFull(r, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return 0, fmt.Errorf("ftyp box not found")
}
ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(int64(ftypSize), io.SeekStart)
if err != nil {
return 0, err
}
// find and skip moov
_, err = io.ReadFull(r, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) {
return 0, fmt.Errorf("moov box not found")
}
moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(int64(moovSize)-8, io.SeekCurrent)
if err != nil {
return 0, err
}
// find last valid moof and mdat
lastMoofPos := int64(-1)
for {
moofPos, err := r.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
_, err = io.ReadFull(r, buf)
if err != nil {
break
}
if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'f'}) {
return 0, fmt.Errorf("moof box not found")
}
moofSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(int64(moofSize)-8, io.SeekCurrent)
if err != nil {
break
}
_, err = io.ReadFull(r, buf)
if err != nil {
break
}
if !bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) {
return 0, fmt.Errorf("mdat box not found")
}
mdatSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(int64(mdatSize)-8, io.SeekCurrent)
if err != nil {
break
}
lastMoofPos = moofPos
}
if lastMoofPos < 0 {
return 0, fmt.Errorf("no moof boxes found")
}
// open last moof
_, err = r.Seek(lastMoofPos+8, io.SeekStart)
if err != nil {
return 0, err
}
_, err = io.ReadFull(r, buf)
if err != nil {
return 0, err
}
// skip mfhd
if !bytes.Equal(buf[4:], []byte{'m', 'f', 'h', 'd'}) {
return 0, fmt.Errorf("mfhd box not found")
}
_, err = r.Seek(8, io.SeekCurrent)
if err != nil {
return 0, err
}
maxElapsed := uint64(0)
// foreach traf
for {
_, err := io.ReadFull(r, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'r', 'a', 'f'}) {
if bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) {
break
}
return 0, fmt.Errorf("traf box not found")
}
// skip tfhd
_, err = io.ReadFull(r, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'f', 'h', 'd'}) {
return 0, fmt.Errorf("tfhd box not found")
}
tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = r.Seek(int64(tfhdSize)-8, io.SeekCurrent)
if err != nil {
return 0, err
}
// parse tfdt
_, err = io.ReadFull(r, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'f', 'd', 't'}) {
return 0, fmt.Errorf("tfdt box not found")
}
tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
buf2 := make([]byte, tfdtSize-8)
_, err = io.ReadFull(r, buf2)
if err != nil {
return 0, err
}
var tfdt mp4.Tfdt
_, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &tfdt, mp4.Context{})
if err != nil {
return 0, fmt.Errorf("invalid tfdt box: %w", err)
}
elapsed := tfdt.BaseMediaDecodeTimeV1
// parse trun
_, err = io.ReadFull(r, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'r', 'u', 'n'}) {
return 0, fmt.Errorf("trun box not found")
}
trunSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
buf2 = make([]byte, trunSize-8)
_, err = io.ReadFull(r, buf2)
if err != nil {
return 0, err
}
var trun mp4.Trun
_, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &trun, mp4.Context{})
if err != nil {
return 0, fmt.Errorf("invalid trun box: %w", err)
}
for _, entry := range trun.Entries {
elapsed += uint64(entry.SampleDuration)
}
if elapsed > maxElapsed {
maxElapsed = elapsed
}
}
return durationMp4ToGo(maxElapsed, fmp4Timescale), nil
}
func fmp4SeekAndMuxParts(
r io.ReadSeeker,
init []byte,
@ -90,8 +313,8 @@ func fmp4SeekAndMuxParts( @@ -90,8 +313,8 @@ func fmp4SeekAndMuxParts(
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
minTimeMP4 := durationGoToMp4(minTime, 90000)
maxTimeMP4 := durationGoToMp4(maxTime, 90000)
minTimeMP4 := durationGoToMp4(minTime, fmp4Timescale)
maxTimeMP4 := durationGoToMp4(maxTime, fmp4Timescale)
moofOffset := uint64(0)
var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt
@ -246,7 +469,7 @@ func fmp4SeekAndMuxParts( @@ -246,7 +469,7 @@ func fmp4SeekAndMuxParts(
elapsed -= minTimeMP4
return durationMp4ToGo(elapsed, 90000), nil
return durationMp4ToGo(elapsed, fmp4Timescale), nil
}
func fmp4MuxParts(
@ -255,7 +478,7 @@ func fmp4MuxParts( @@ -255,7 +478,7 @@ func fmp4MuxParts(
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
maxTimeMP4 := durationGoToMp4(maxTime, 90000)
maxTimeMP4 := durationGoToMp4(maxTime, fmp4Timescale)
moofOffset := uint64(0)
var tfhd *mp4.Tfhd
var tfdt *mp4.Tfdt
@ -294,7 +517,7 @@ func fmp4MuxParts( @@ -294,7 +517,7 @@ func fmp4MuxParts(
outTrack = &fmp4.PartTrack{
ID: int(tfhd.TrackID),
BaseTime: tfdt.BaseMediaDecodeTimeV1 + durationGoToMp4(startTime, 90000),
BaseTime: tfdt.BaseMediaDecodeTimeV1 + durationGoToMp4(startTime, fmp4Timescale),
}
case "trun":
@ -367,262 +590,5 @@ func fmp4MuxParts( @@ -367,262 +590,5 @@ func fmp4MuxParts(
return 0, err
}
return durationMp4ToGo(elapsed, 90000), nil
}
func fmp4SeekAndMux(
fpath string,
minTime time.Duration,
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
f, err := os.Open(fpath)
if err != nil {
return 0, err
}
defer f.Close()
init, err := fmp4ReadInit(f)
if err != nil {
return 0, err
}
elapsed, err := fmp4SeekAndMuxParts(f, init, minTime, maxTime, w)
if err != nil {
return 0, err
}
return elapsed, nil
}
func fmp4Mux(
fpath string,
startTime time.Duration,
maxTime time.Duration,
w io.Writer,
) (time.Duration, error) {
f, err := os.Open(fpath)
if err != nil {
return 0, err
}
defer f.Close()
return fmp4MuxParts(f, startTime, maxTime, w)
}
func fmp4Duration(fpath string) (time.Duration, error) {
f, err := os.Open(fpath)
if err != nil {
return 0, err
}
defer f.Close()
// find and skip ftyp
buf := make([]byte, 8)
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'f', 't', 'y', 'p'}) {
return 0, fmt.Errorf("ftyp box not found")
}
ftypSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(ftypSize), io.SeekStart)
if err != nil {
return 0, err
}
// find and skip moov
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'v'}) {
return 0, fmt.Errorf("moov box not found")
}
moovSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(moovSize)-8, io.SeekCurrent)
if err != nil {
return 0, err
}
// find last valid moof and mdat
lastMoofPos := int64(-1)
for {
moofPos, err := f.Seek(0, io.SeekCurrent)
if err != nil {
return 0, err
}
_, err = io.ReadFull(f, buf)
if err != nil {
break
}
if !bytes.Equal(buf[4:], []byte{'m', 'o', 'o', 'f'}) {
return 0, fmt.Errorf("moof box not found")
}
moofSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(moofSize)-8, io.SeekCurrent)
if err != nil {
break
}
_, err = io.ReadFull(f, buf)
if err != nil {
break
}
if !bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) {
return 0, fmt.Errorf("mdat box not found")
}
mdatSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(mdatSize)-8, io.SeekCurrent)
if err != nil {
break
}
lastMoofPos = moofPos
}
if lastMoofPos < 0 {
return 0, fmt.Errorf("no moof boxes found")
}
// open last moof
_, err = f.Seek(lastMoofPos+8, io.SeekStart)
if err != nil {
return 0, err
}
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
// skip mfhd
if !bytes.Equal(buf[4:], []byte{'m', 'f', 'h', 'd'}) {
return 0, fmt.Errorf("mfhd box not found")
}
_, err = f.Seek(8, io.SeekCurrent)
if err != nil {
return 0, err
}
maxElapsed := uint64(0)
// foreach traf
for {
_, err := io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'r', 'a', 'f'}) {
if bytes.Equal(buf[4:], []byte{'m', 'd', 'a', 't'}) {
break
}
return 0, fmt.Errorf("traf box not found")
}
// skip tfhd
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'f', 'h', 'd'}) {
return 0, fmt.Errorf("tfhd box not found")
}
tfhdSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
_, err = f.Seek(int64(tfhdSize)-8, io.SeekCurrent)
if err != nil {
return 0, err
}
// parse tfdt
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'f', 'd', 't'}) {
return 0, fmt.Errorf("tfdt box not found")
}
tfdtSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
buf2 := make([]byte, tfdtSize-8)
_, err = io.ReadFull(f, buf2)
if err != nil {
return 0, err
}
var tfdt mp4.Tfdt
_, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &tfdt, mp4.Context{})
if err != nil {
return 0, fmt.Errorf("invalid tfdt box: %w", err)
}
elapsed := tfdt.BaseMediaDecodeTimeV1
// parse trun
_, err = io.ReadFull(f, buf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf[4:], []byte{'t', 'r', 'u', 'n'}) {
return 0, fmt.Errorf("trun box not found")
}
trunSize := uint32(buf[0])<<24 | uint32(buf[1])<<16 | uint32(buf[2])<<8 | uint32(buf[3])
buf2 = make([]byte, trunSize-8)
_, err = io.ReadFull(f, buf2)
if err != nil {
return 0, err
}
var trun mp4.Trun
_, err = mp4.Unmarshal(bytes.NewReader(buf2), uint64(len(buf2)), &trun, mp4.Context{})
if err != nil {
return 0, fmt.Errorf("invalid trun box: %w", err)
}
for _, entry := range trun.Entries {
elapsed += uint64(entry.SampleDuration)
}
if elapsed > maxElapsed {
maxElapsed = elapsed
}
}
return durationMp4ToGo(maxElapsed, 90000), nil
return durationMp4ToGo(elapsed, fmp4Timescale), nil
}

179
internal/playback/on_get.go

@ -0,0 +1,179 @@ @@ -0,0 +1,179 @@
package playback
import (
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/gin-gonic/gin"
)
var errStopIteration = errors.New("stop iteration")
func parseDuration(raw string) (time.Duration, error) {
// seconds
if secs, err := strconv.ParseFloat(raw, 64); err == nil {
return time.Duration(secs * float64(time.Second)), nil
}
// deprecated, golang format
return time.ParseDuration(raw)
}
func seekAndMux(
recordFormat conf.RecordFormat,
segments []*Segment,
start time.Time,
duration time.Duration,
w io.Writer,
) error {
if recordFormat == conf.RecordFormatFMP4 {
minTime := start.Sub(segments[0].Start)
maxTime := minTime + duration
var init []byte
var elapsed time.Duration
err := func() error {
f, err := os.Open(segments[0].Fpath)
if err != nil {
return err
}
defer f.Close()
init, err = fmp4ReadInit(f)
if err != nil {
return err
}
elapsed, err = fmp4SeekAndMuxParts(f, init, minTime, maxTime, w)
if err != nil {
return err
}
return nil
}()
if err != nil {
return err
}
prevInit := init
prevEnd := start.Add(elapsed)
duration -= elapsed
overallElapsed := elapsed
for _, seg := range segments[1:] {
err := func() error {
f, err := os.Open(seg.Fpath)
if err != nil {
return err
}
defer f.Close()
init, err := fmp4ReadInit(f)
if err != nil {
return err
}
if !fmp4CanBeConcatenated(prevInit, prevEnd, init, seg.Start) {
return errStopIteration
}
elapsed, err = fmp4MuxParts(f, overallElapsed, duration, w)
if err != nil {
return err
}
return nil
}()
if err != nil {
if errors.Is(err, errStopIteration) {
return nil
}
return err
}
prevEnd = seg.Start.Add(elapsed)
duration -= elapsed
overallElapsed += elapsed
}
return nil
}
return fmt.Errorf("MPEG-TS format is not supported yet")
}
func (p *Server) onGet(ctx *gin.Context) {
pathName := ctx.Query("path")
if !p.doAuth(ctx, pathName) {
return
}
start, err := time.Parse(time.RFC3339, ctx.Query("start"))
if err != nil {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid start: %w", err))
return
}
duration, err := parseDuration(ctx.Query("duration"))
if err != nil {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid duration: %w", err))
return
}
format := ctx.Query("format")
if format != "" && format != "fmp4" {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid format: %s", format))
return
}
pathConf, err := p.safeFindPathConf(pathName)
if err != nil {
p.writeError(ctx, http.StatusBadRequest, err)
return
}
segments, err := findSegmentsInTimespan(pathConf, pathName, start, duration)
if err != nil {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
ww := &writerWrapper{ctx: ctx}
err = seekAndMux(pathConf.RecordFormat, segments, start, duration, ww)
if err != nil {
// user aborted the download
var neterr *net.OpError
if errors.As(err, &neterr) {
return
}
// nothing has been written yet; send back JSON
if !ww.written {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
// something has already been written: abort and write logs only
p.Log(logger.Error, err.Error())
return
}
}

146
internal/playback/server_test.go → internal/playback/on_get_test.go

@ -1,7 +1,6 @@ @@ -1,7 +1,6 @@
package playback
import (
"encoding/json"
"io"
"net/http"
"net/url"
@ -10,6 +9,7 @@ import ( @@ -10,6 +9,7 @@ import (
"testing"
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4"
"github.com/bluenviron/mediacommon/pkg/formats/fmp4/seekablebuffer"
"github.com/bluenviron/mediamtx/internal/auth"
@ -20,14 +20,27 @@ import ( @@ -20,14 +20,27 @@ import (
func writeSegment1(t *testing.T, fpath string) {
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
},
}},
{
ID: 2,
TimeScale: 90000,
Codec: &fmp4.CodecMPEG4Audio{
Config: mpeg4audio.Config{
Type: mpeg4audio.ObjectTypeAACLC,
SampleRate: 48000,
ChannelCount: 2,
},
},
},
},
}
var buf1 seekablebuffer.Buffer
@ -78,14 +91,27 @@ func writeSegment1(t *testing.T, fpath string) { @@ -78,14 +91,27 @@ func writeSegment1(t *testing.T, fpath string) {
func writeSegment2(t *testing.T, fpath string) {
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
},
}},
{
ID: 2,
TimeScale: 90000,
Codec: &fmp4.CodecMPEG4Audio{
Config: mpeg4audio.Config{
Type: mpeg4audio.ObjectTypeAACLC,
SampleRate: 48000,
ChannelCount: 2,
},
},
},
},
}
var buf1 seekablebuffer.Buffer
@ -121,6 +147,48 @@ func writeSegment2(t *testing.T, fpath string) { @@ -121,6 +147,48 @@ func writeSegment2(t *testing.T, fpath string) {
require.NoError(t, err)
}
func writeSegment3(t *testing.T, fpath string) {
init := fmp4.Init{
Tracks: []*fmp4.InitTrack{
{
ID: 1,
TimeScale: 90000,
Codec: &fmp4.CodecH264{
SPS: test.FormatH264.SPS,
PPS: test.FormatH264.PPS,
},
},
},
}
var buf1 seekablebuffer.Buffer
err := init.Marshal(&buf1)
require.NoError(t, err)
var buf2 seekablebuffer.Buffer
parts := fmp4.Parts{
{
SequenceNumber: 1,
Tracks: []*fmp4.PartTrack{{
ID: 1,
BaseTime: 0,
Samples: []*fmp4.PartSample{
{
Duration: 1 * 90000,
IsNonSyncSample: false,
Payload: []byte{10, 11},
},
},
}},
},
}
err = parts.Marshal(&buf2)
require.NoError(t, err)
err = os.WriteFile(fpath, append(buf1.Bytes(), buf2.Bytes()...), 0o644)
require.NoError(t, err)
}
var authManager = &auth.Manager{
Method: conf.AuthMethodInternal,
InternalUsers: []conf.AuthInternalUser{
@ -138,7 +206,7 @@ var authManager = &auth.Manager{ @@ -138,7 +206,7 @@ var authManager = &auth.Manager{
RTSPAuthMethods: nil,
}
func TestServerGet(t *testing.T) {
func TestOnGet(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
@ -229,7 +297,7 @@ func TestServerGet(t *testing.T) { @@ -229,7 +297,7 @@ func TestServerGet(t *testing.T) {
}, parts)
}
func TestServerList(t *testing.T) {
func TestOnGetDifferentInit(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
@ -238,8 +306,7 @@ func TestServerList(t *testing.T) { @@ -238,8 +306,7 @@ func TestServerList(t *testing.T) {
require.NoError(t, err)
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2009-11-07_11-23-02-500000.mp4"))
writeSegment3(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
s := &Server{
Address: "127.0.0.1:9996",
@ -257,11 +324,14 @@ func TestServerList(t *testing.T) { @@ -257,11 +324,14 @@ func TestServerList(t *testing.T) {
require.NoError(t, err)
defer s.Close()
u, err := url.Parse("http://myuser:mypass@localhost:9996/list")
u, err := url.Parse("http://myuser:mypass@localhost:9996/get")
require.NoError(t, err)
v := url.Values{}
v.Set("path", "mypath")
v.Set("start", time.Date(2008, 11, 0o7, 11, 23, 1, 500000000, time.Local).Format(time.RFC3339Nano))
v.Set("duration", "2")
v.Set("format", "fmp4")
u.RawQuery = v.Encode()
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
@ -273,18 +343,32 @@ func TestServerList(t *testing.T) { @@ -273,18 +343,32 @@ func TestServerList(t *testing.T) {
require.Equal(t, http.StatusOK, res.StatusCode)
var out interface{}
err = json.NewDecoder(res.Body).Decode(&out)
buf, err := io.ReadAll(res.Body)
require.NoError(t, err)
require.Equal(t, []interface{}{
map[string]interface{}{
"duration": float64(64),
"start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano),
},
map[string]interface{}{
"duration": float64(2),
"start": time.Date(2009, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano),
var parts fmp4.Parts
err = parts.Unmarshal(buf)
require.NoError(t, err)
require.Equal(t, fmp4.Parts{
{
SequenceNumber: 0,
Tracks: []*fmp4.PartTrack{
{
ID: 1,
Samples: []*fmp4.PartSample{
{
Duration: 0,
Payload: []byte{3, 4},
},
{
Duration: 90000,
IsNonSyncSample: true,
Payload: []byte{5, 6},
},
},
},
},
},
}, out)
}, parts)
}

120
internal/playback/on_list.go

@ -0,0 +1,120 @@ @@ -0,0 +1,120 @@
package playback
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/gin-gonic/gin"
)
type listEntryDuration time.Duration
func (d listEntryDuration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(d).Seconds())
}
type listEntry struct {
Start time.Time `json:"start"`
Duration listEntryDuration `json:"duration"`
}
func computeDurationAndConcatenate(recordFormat conf.RecordFormat, segments []*Segment) ([]listEntry, error) {
if recordFormat == conf.RecordFormatFMP4 {
out := []listEntry{}
var prevInit []byte
for _, seg := range segments {
err := func() error {
f, err := os.Open(seg.Fpath)
if err != nil {
return err
}
defer f.Close()
init, err := fmp4ReadInit(f)
if err != nil {
return err
}
_, err = f.Seek(0, io.SeekStart)
if err != nil {
return err
}
duration, err := fmp4ReadDuration(f)
if err != nil {
return err
}
if len(out) != 0 && fmp4CanBeConcatenated(
prevInit,
out[len(out)-1].Start.Add(time.Duration(out[len(out)-1].Duration)),
init,
seg.Start) {
prevStart := out[len(out)-1].Start
curEnd := seg.Start.Add(duration)
out[len(out)-1].Duration = listEntryDuration(curEnd.Sub(prevStart))
} else {
out = append(out, listEntry{
Start: seg.Start,
Duration: listEntryDuration(duration),
})
}
prevInit = init
return nil
}()
if err != nil {
return nil, err
}
}
return out, nil
}
return nil, fmt.Errorf("MPEG-TS format is not supported yet")
}
func (p *Server) onList(ctx *gin.Context) {
pathName := ctx.Query("path")
if !p.doAuth(ctx, pathName) {
return
}
pathConf, err := p.safeFindPathConf(pathName)
if err != nil {
p.writeError(ctx, http.StatusBadRequest, err)
return
}
if !pathConf.Playback {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("playback is disabled on path '%s'", pathName))
return
}
segments, err := FindSegments(pathConf, pathName)
if err != nil {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
out, err := computeDurationAndConcatenate(pathConf.RecordFormat, segments)
if err != nil {
p.writeError(ctx, http.StatusInternalServerError, err)
return
}
ctx.JSON(http.StatusOK, out)
}

134
internal/playback/on_list_test.go

@ -0,0 +1,134 @@ @@ -0,0 +1,134 @@
package playback
import (
"encoding/json"
"net/http"
"net/url"
"os"
"path/filepath"
"testing"
"time"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/stretchr/testify/require"
)
func TestOnList(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
writeSegment2(t, filepath.Join(dir, "mypath", "2009-11-07_11-23-02-500000.mp4"))
s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.StringDuration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Playback: true,
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
AuthManager: authManager,
Parent: &test.NilLogger{},
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
u, err := url.Parse("http://myuser:mypass@localhost:9996/list")
require.NoError(t, err)
v := url.Values{}
v.Set("path", "mypath")
u.RawQuery = v.Encode()
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
var out interface{}
err = json.NewDecoder(res.Body).Decode(&out)
require.NoError(t, err)
require.Equal(t, []interface{}{
map[string]interface{}{
"duration": float64(64),
"start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano),
},
map[string]interface{}{
"duration": float64(2),
"start": time.Date(2009, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano),
},
}, out)
}
func TestOnListDifferentInit(t *testing.T) {
dir, err := os.MkdirTemp("", "mediamtx-playback")
require.NoError(t, err)
defer os.RemoveAll(dir)
err = os.Mkdir(filepath.Join(dir, "mypath"), 0o755)
require.NoError(t, err)
writeSegment1(t, filepath.Join(dir, "mypath", "2008-11-07_11-22-00-500000.mp4"))
writeSegment3(t, filepath.Join(dir, "mypath", "2008-11-07_11-23-02-500000.mp4"))
s := &Server{
Address: "127.0.0.1:9996",
ReadTimeout: conf.StringDuration(10 * time.Second),
PathConfs: map[string]*conf.Path{
"mypath": {
Playback: true,
RecordPath: filepath.Join(dir, "%path/%Y-%m-%d_%H-%M-%S-%f"),
},
},
AuthManager: authManager,
Parent: &test.NilLogger{},
}
err = s.Initialize()
require.NoError(t, err)
defer s.Close()
u, err := url.Parse("http://myuser:mypass@localhost:9996/list")
require.NoError(t, err)
v := url.Values{}
v.Set("path", "mypath")
u.RawQuery = v.Encode()
req, err := http.NewRequest(http.MethodGet, u.String(), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusOK, res.StatusCode)
var out interface{}
err = json.NewDecoder(res.Body).Decode(&out)
require.NoError(t, err)
require.Equal(t, []interface{}{
map[string]interface{}{
"duration": float64(62),
"start": time.Date(2008, 11, 0o7, 11, 22, 0, 500000000, time.Local).Format(time.RFC3339Nano),
},
map[string]interface{}{
"duration": float64(1),
"start": time.Date(2008, 11, 0o7, 11, 23, 2, 500000000, time.Local).Format(time.RFC3339Nano),
},
}, out)
}

30
internal/playback/segment.go

@ -14,9 +14,8 @@ import ( @@ -14,9 +14,8 @@ import (
// Segment is a recording segment.
type Segment struct {
fpath string
Start time.Time
duration time.Duration
Fpath string
Start time.Time
}
func findSegmentsInTimespan(
@ -54,7 +53,7 @@ func findSegmentsInTimespan( @@ -54,7 +53,7 @@ func findSegmentsInTimespan(
// gather all segments that starts before the end of the playback
if ok && !end.Before(pa.Start) {
segments = append(segments, &Segment{
fpath: fpath,
Fpath: fpath,
Start: pa.Start,
})
}
@ -126,7 +125,7 @@ func FindSegments( @@ -126,7 +125,7 @@ func FindSegments(
ok := pa.Decode(recordPath, fpath)
if ok {
segments = append(segments, &Segment{
fpath: fpath,
Fpath: fpath,
Start: pa.Start,
})
}
@ -148,24 +147,3 @@ func FindSegments( @@ -148,24 +147,3 @@ func FindSegments(
return segments, nil
}
func canBeConcatenated(seg1, seg2 *Segment) bool {
end1 := seg1.Start.Add(seg1.duration)
return !seg2.Start.Before(end1.Add(-concatenationTolerance)) && !seg2.Start.After(end1.Add(concatenationTolerance))
}
func mergeConcatenatedSegments(in []*Segment) []*Segment {
var out []*Segment
for _, seg := range in {
if len(out) != 0 && canBeConcatenated(out[len(out)-1], seg) {
start := out[len(out)-1].Start
end := seg.Start.Add(seg.duration)
out[len(out)-1].duration = end.Sub(start)
} else {
out = append(out, seg)
}
}
return out
}

184
internal/playback/server.go

@ -3,10 +3,8 @@ package playback @@ -3,10 +3,8 @@ package playback
import (
"errors"
"fmt"
"net"
"net/http"
"strconv"
"sync"
"time"
@ -18,27 +16,8 @@ import ( @@ -18,27 +16,8 @@ import (
"github.com/gin-gonic/gin"
)
const (
concatenationTolerance = 1 * time.Second
)
var errNoSegmentsFound = errors.New("no recording segments found for the given timestamp")
func parseDuration(raw string) (time.Duration, error) {
// seconds
if secs, err := strconv.ParseFloat(raw, 64); err == nil {
return time.Duration(secs * float64(time.Second)), nil
}
// deprecated, golang format
return time.ParseDuration(raw)
}
type listEntry struct {
Start time.Time `json:"start"`
Duration float64 `json:"duration"`
}
type writerWrapper struct {
ctx *gin.Context
written bool
@ -65,7 +44,7 @@ type Server struct { @@ -65,7 +44,7 @@ type Server struct {
mutex sync.RWMutex
}
// Initialize initializes API.
// Initialize initializes Server.
func (p *Server) Initialize() error {
router := gin.New()
router.SetTrustedProxies(nil) //nolint:errcheck
@ -161,164 +140,3 @@ func (p *Server) doAuth(ctx *gin.Context, pathName string) bool { @@ -161,164 +140,3 @@ func (p *Server) doAuth(ctx *gin.Context, pathName string) bool {
return true
}
func (p *Server) onList(ctx *gin.Context) {
pathName := ctx.Query("path")
if !p.doAuth(ctx, pathName) {
return
}
pathConf, err := p.safeFindPathConf(pathName)
if err != nil {
p.writeError(ctx, http.StatusBadRequest, err)
return
}
if !pathConf.Playback {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("playback is disabled on path '%s'", pathName))
return
}
segments, err := FindSegments(pathConf, pathName)
if err != nil {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
if pathConf.RecordFormat != conf.RecordFormatFMP4 {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("format of recording segments is not fmp4"))
return
}
for _, seg := range segments {
d, err := fmp4Duration(seg.fpath)
if err != nil {
p.writeError(ctx, http.StatusInternalServerError, err)
return
}
seg.duration = d
}
segments = mergeConcatenatedSegments(segments)
out := make([]listEntry, len(segments))
for i, seg := range segments {
out[i] = listEntry{
Start: seg.Start,
Duration: seg.duration.Seconds(),
}
}
ctx.JSON(http.StatusOK, out)
}
func (p *Server) onGet(ctx *gin.Context) {
pathName := ctx.Query("path")
if !p.doAuth(ctx, pathName) {
return
}
start, err := time.Parse(time.RFC3339, ctx.Query("start"))
if err != nil {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid start: %w", err))
return
}
duration, err := parseDuration(ctx.Query("duration"))
if err != nil {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid duration: %w", err))
return
}
format := ctx.Query("format")
if format != "" && format != "fmp4" {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("invalid format: %s", format))
return
}
pathConf, err := p.safeFindPathConf(pathName)
if err != nil {
p.writeError(ctx, http.StatusBadRequest, err)
return
}
segments, err := findSegmentsInTimespan(pathConf, pathName, start, duration)
if err != nil {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
if pathConf.RecordFormat != conf.RecordFormatFMP4 {
p.writeError(ctx, http.StatusBadRequest, fmt.Errorf("format of recording segments is not fmp4"))
return
}
ww := &writerWrapper{ctx: ctx}
minTime := start.Sub(segments[0].Start)
maxTime := minTime + duration
elapsed, err := fmp4SeekAndMux(
segments[0].fpath,
minTime,
maxTime,
ww)
if err != nil {
// user aborted the download
var neterr *net.OpError
if errors.As(err, &neterr) {
return
}
// nothing has been written yet; send back JSON
if !ww.written {
if errors.Is(err, errNoSegmentsFound) {
p.writeError(ctx, http.StatusNotFound, err)
} else {
p.writeError(ctx, http.StatusBadRequest, err)
}
return
}
// something has already been written: abort and write logs only
p.Log(logger.Error, err.Error())
return
}
start = start.Add(elapsed)
duration -= elapsed
overallElapsed := elapsed
for _, seg := range segments[1:] {
// there's a gap between segments, stop serving the recording
if seg.Start.Before(start.Add(-concatenationTolerance)) || seg.Start.After(start.Add(concatenationTolerance)) {
return
}
elapsed, err := fmp4Mux(seg.fpath, overallElapsed, duration, ctx.Writer)
if err != nil {
// user aborted the download
var neterr *net.OpError
if errors.As(err, &neterr) {
return
}
// something has been already written: abort and write to logs only
p.Log(logger.Error, err.Error())
return
}
start = seg.Start.Add(elapsed)
duration -= elapsed
overallElapsed += elapsed
}
}

Loading…
Cancel
Save