From a867c469fd53fff88389812e08255e6b4e7a9b94 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Wed, 3 Nov 2021 22:28:50 +0100 Subject: [PATCH] fix automatic deletion of regexp paths (#664) (#680) --- go.mod | 3 +- go.sum | 4 +- internal/core/core_test.go | 157 +++++++++++++++++++++++++++++- internal/core/path.go | 16 +++ internal/core/rtsp_server_test.go | 148 +--------------------------- 5 files changed, 179 insertions(+), 149 deletions(-) diff --git a/go.mod b/go.mod index ca024da8..5b5972af 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.16 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20211030125802-b5f1d7ccb6bc + github.com/aler9/gortsplib v0.0.0-20211103204924-c1ca81eb7445 github.com/asticode/go-astits v1.10.0 github.com/fsnotify/fsnotify v1.4.9 github.com/gin-gonic/gin v1.7.2 @@ -14,6 +14,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/notedit/rtmp v0.0.2 github.com/pion/rtp v1.6.2 + github.com/pion/sdp/v3 v3.0.2 github.com/stretchr/testify v1.6.1 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/go.sum b/go.sum index d6222284..273507b1 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20211030125802-b5f1d7ccb6bc h1:GgJgZW0o1AKBfVVnp8XI2Pk7IsySn1VmzySaWw8FtqQ= -github.com/aler9/gortsplib v0.0.0-20211030125802-b5f1d7ccb6bc/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc= +github.com/aler9/gortsplib v0.0.0-20211103204924-c1ca81eb7445 h1:ltLUBHa201xlIWKw5IHMjNCUYlmH+dVh4HYIFHRe7O0= +github.com/aler9/gortsplib v0.0.0-20211103204924-c1ca81eb7445/go.mod h1:fyQrQyHo8QvdR/h357tkv1g36VesZlzEPsdAu2VrHHc= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ= github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= diff --git a/internal/core/core_test.go b/internal/core/core_test.go index 86e55c84..113d6f53 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "io/ioutil" "os" "os/exec" @@ -10,6 +11,9 @@ import ( "time" "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/base" + "github.com/aler9/gortsplib/pkg/headers" + psdp "github.com/pion/sdp/v3" "github.com/stretchr/testify/require" ) @@ -148,6 +152,157 @@ func newInstance(conf string) (*Core, bool) { return New([]string{tmpf}) } +func TestCorePathAutoDeletion(t *testing.T) { + for _, ca := range []string{"describe", "setup"} { + t.Run(ca, func(t *testing.T) { + p, ok := New([]string{}) + require.Equal(t, true, ok) + defer p.close() + + func() { + conn, err := gortsplib.Dial("rtsp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + + if ca == "describe" { + ur, err := base.ParseURL("rtsp://localhost:8554/mypath") + require.NoError(t, err) + + _, _, _, err = conn.Describe(ur) + require.EqualError(t, err, "bad status code: 404 (Not Found)") + } else { + baseURL, err := base.ParseURL("rtsp://localhost:8554/mypath/") + require.NoError(t, err) + + track, err := gortsplib.NewTrackH264(96, + &gortsplib.TrackConfigH264{SPS: []byte{0x01, 0x02, 0x03, 0x04}, PPS: []byte{0x01, 0x02, 0x03, 0x04}}) + require.NoError(t, err) + + track.Media.Attributes = append(track.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=0", + }) + + _, err = conn.Setup(headers.TransportModePlay, baseURL, track, 0, 0) + require.EqualError(t, err, "bad status code: 404 (Not Found)") + } + }() + + res := p.pathManager.onAPIPathsList(apiPathsListReq1{}) + require.NoError(t, res.Err) + + require.Equal(t, 0, len(res.Data.Items)) + }) + } +} + +func TestCorePathRunOnDemand(t *testing.T) { + doneFile := filepath.Join(os.TempDir(), "ondemand_done") + + srcFile := filepath.Join(os.TempDir(), "ondemand.go") + err := ioutil.WriteFile(srcFile, []byte(` +package main + +import ( + "os" + "os/signal" + "syscall" + "io/ioutil" + "github.com/aler9/gortsplib" +) + +func main() { + track, err := gortsplib.NewTrackH264(96, + &gortsplib.TrackConfigH264{SPS: []byte{0x01, 0x02, 0x03, 0x04}, PPS: []byte{0x01, 0x02, 0x03, 0x04}}) + if err != nil { + panic(err) + } + + source, err := gortsplib.DialPublish( + "rtsp://localhost:" + os.Getenv("RTSP_PORT") + "/" + os.Getenv("RTSP_PATH"), + gortsplib.Tracks{track}) + if err != nil { + panic(err) + } + defer source.Close() + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT) + <-c + + err = ioutil.WriteFile("`+doneFile+`", []byte(""), 0644) + if err != nil { + panic(err) + } +} +`), 0o644) + require.NoError(t, err) + + execFile := filepath.Join(os.TempDir(), "ondemand_cmd") + cmd := exec.Command("go", "build", "-o", execFile, srcFile) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err = cmd.Run() + require.NoError(t, err) + defer os.Remove(execFile) + + os.Remove(srcFile) + + for _, ca := range []string{"describe", "setup", "describe and setup"} { + t.Run(ca, func(t *testing.T) { + defer os.Remove(doneFile) + + p1, ok := newInstance(fmt.Sprintf("rtmpDisable: yes\n"+ + "hlsDisable: yes\n"+ + "paths:\n"+ + " all:\n"+ + " runOnDemand: %s\n"+ + " runOnDemandCloseAfter: 1s\n", execFile)) + require.Equal(t, true, ok) + defer p1.close() + + func() { + conn, err := gortsplib.Dial("rtsp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + + if ca == "describe" || ca == "describe and setup" { + ur, err := base.ParseURL("rtsp://localhost:8554/ondemand") + require.NoError(t, err) + + _, _, _, err = conn.Describe(ur) + require.NoError(t, err) + } + + if ca == "setup" || ca == "describe and setup" { + baseURL, err := base.ParseURL("rtsp://localhost:8554/ondemand/") + require.NoError(t, err) + + track, err := gortsplib.NewTrackH264(96, + &gortsplib.TrackConfigH264{SPS: []byte{0x01, 0x02, 0x03, 0x04}, PPS: []byte{0x01, 0x02, 0x03, 0x04}}) + require.NoError(t, err) + + track.Media.Attributes = append(track.Media.Attributes, psdp.Attribute{ + Key: "control", + Value: "trackID=0", + }) + + _, err = conn.Setup(headers.TransportModePlay, baseURL, track, 0, 0) + require.NoError(t, err) + } + }() + + for { + _, err := os.Stat(doneFile) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + }) + } +} + func TestCoreHotReloading(t *testing.T) { confPath := filepath.Join(os.TempDir(), "rtsp-conf") @@ -171,7 +326,7 @@ func TestCoreHotReloading(t *testing.T) { _, err = gortsplib.DialPublish( "rtsp://localhost:8554/test1", gortsplib.Tracks{track}) - require.EqualError(t, err, "invalid status code: 401 (Unauthorized)") + require.EqualError(t, err, "bad status code: 401 (Unauthorized)") }() err = ioutil.WriteFile(confPath, []byte("paths:\n"+ diff --git a/internal/core/path.go b/internal/core/path.go index 81399a90..114551f1 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -369,6 +369,14 @@ outer: case req := <-pa.describe: pa.handleDescribe(req) + if pa.conf.Regexp != nil && + pa.source == nil && + len(pa.readers) == 0 && + len(pa.describeRequests) == 0 && + len(pa.setupPlayRequests) == 0 { + break outer + } + case req := <-pa.publisherRemove: pa.handlePublisherRemove(req) @@ -395,6 +403,14 @@ outer: case req := <-pa.readerSetupPlay: pa.handleReaderSetupPlay(req) + if pa.conf.Regexp != nil && + pa.source == nil && + len(pa.readers) == 0 && + len(pa.describeRequests) == 0 && + len(pa.setupPlayRequests) == 0 { + break outer + } + case req := <-pa.readerPlay: pa.handleReaderPlay(req) diff --git a/internal/core/rtsp_server_test.go b/internal/core/rtsp_server_test.go index 6e8a7fa2..686ed7fe 100644 --- a/internal/core/rtsp_server_test.go +++ b/internal/core/rtsp_server_test.go @@ -1,31 +1,16 @@ package core import ( - "bufio" "bytes" - "fmt" - "io/ioutil" - "net" "os" - "os/exec" - "path/filepath" "testing" "time" "github.com/aler9/gortsplib" "github.com/aler9/gortsplib/pkg/base" - "github.com/aler9/gortsplib/pkg/headers" "github.com/stretchr/testify/require" ) -func mustParseURL(s string) *base.URL { - u, err := base.ParseURL(s) - if err != nil { - panic(err) - } - return u -} - func TestRTSPServerPublishRead(t *testing.T) { for _, ca := range []struct { publisherSoft string @@ -336,7 +321,7 @@ func TestRTSPServerAuthFail(t *testing.T) { "rtsp://"+ca.user+":"+ca.pass+"@localhost:8554/test/stream", gortsplib.Tracks{track}, ) - require.EqualError(t, err, "invalid status code: 401 (Unauthorized)") + require.EqualError(t, err, "bad status code: 401 (Unauthorized)") }) } @@ -374,7 +359,7 @@ func TestRTSPServerAuthFail(t *testing.T) { _, err := gortsplib.DialRead( "rtsp://" + ca.user + ":" + ca.pass + "@localhost:8554/test/stream", ) - require.EqualError(t, err, "invalid status code: 401 (Unauthorized)") + require.EqualError(t, err, "bad status code: 401 (Unauthorized)") }) } @@ -395,7 +380,7 @@ func TestRTSPServerAuthFail(t *testing.T) { "rtsp://localhost:8554/test/stream", gortsplib.Tracks{track}, ) - require.EqualError(t, err, "invalid status code: 401 (Unauthorized)") + require.EqualError(t, err, "bad status code: 401 (Unauthorized)") }) } @@ -679,130 +664,3 @@ func TestRTSPServerFallback(t *testing.T) { }) } } - -func TestRTSPServerRunOnDemand(t *testing.T) { - doneFile := filepath.Join(os.TempDir(), "ondemand_done") - - srcFile := filepath.Join(os.TempDir(), "ondemand.go") - err := ioutil.WriteFile(srcFile, []byte(` -package main - -import ( - "os" - "os/signal" - "syscall" - "io/ioutil" - "github.com/aler9/gortsplib" -) - -func main() { - track, err := gortsplib.NewTrackH264(96, - &gortsplib.TrackConfigH264{SPS: []byte{0x01, 0x02, 0x03, 0x04}, PPS: []byte{0x01, 0x02, 0x03, 0x04}}) - if err != nil { - panic(err) - } - - source, err := gortsplib.DialPublish( - "rtsp://localhost:" + os.Getenv("RTSP_PORT") + "/" + os.Getenv("RTSP_PATH"), - gortsplib.Tracks{track}) - if err != nil { - panic(err) - } - defer source.Close() - - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT) - <-c - - err = ioutil.WriteFile("`+doneFile+`", []byte(""), 0644) - if err != nil { - panic(err) - } -} -`), 0o644) - require.NoError(t, err) - - execFile := filepath.Join(os.TempDir(), "ondemand_cmd") - cmd := exec.Command("go", "build", "-o", execFile, srcFile) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err = cmd.Run() - require.NoError(t, err) - defer os.Remove(execFile) - - os.Remove(srcFile) - - for _, ca := range []string{"describe", "setup", "describe and setup"} { - t.Run(ca, func(t *testing.T) { - defer os.Remove(doneFile) - - p1, ok := newInstance(fmt.Sprintf("rtmpDisable: yes\n"+ - "hlsDisable: yes\n"+ - "paths:\n"+ - " all:\n"+ - " runOnDemand: %s\n"+ - " runOnDemandCloseAfter: 1s\n", execFile)) - require.Equal(t, true, ok) - defer p1.close() - - func() { - conn, err := net.Dial("tcp", "127.0.0.1:8554") - require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - - if ca == "describe" || ca == "describe and setup" { - err = base.Request{ - Method: base.Describe, - URL: mustParseURL("rtsp://localhost:8554/ondemand"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - } - - if ca == "setup" || ca == "describe and setup" { - err = base.Request{ - Method: base.Setup, - URL: mustParseURL("rtsp://localhost:8554/ondemand/trackID=0"), - Header: base.Header{ - "CSeq": base.HeaderValue{"2"}, - "Transport": headers.Transport{ - Protocol: headers.TransportProtocolTCP, - Delivery: func() *headers.TransportDelivery { - v := headers.TransportDeliveryUnicast - return &v - }(), - Mode: func() *headers.TransportMode { - v := headers.TransportModePlay - return &v - }(), - InterleavedIDs: &[2]int{0, 1}, - }.Write(), - }, - }.Write(bconn.Writer) - require.NoError(t, err) - - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - } - }() - - for { - _, err := os.Stat(doneFile) - if err == nil { - break - } - time.Sleep(100 * time.Millisecond) - } - }) - } -}