Browse Source

drop existing publisher when a new publisher connects (#187)

pull/235/head
aler9 5 years ago
parent
commit
2f156b59f8
  1. 3
      go.mod
  2. 4
      go.sum
  3. 18
      internal/path/path.go
  4. 101
      main_test.go

3
go.mod

@ -5,11 +5,12 @@ go 1.15 @@ -5,11 +5,12 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210203222351-9ecea799f5f0
github.com/aler9/gortsplib v0.0.0-20210215170952-6ce21a78419c
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/notedit/rtmp v0.0.2
github.com/pion/sdp/v3 v3.0.2
github.com/stretchr/testify v1.6.1
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
gopkg.in/alecthomas/kingpin.v2 v2.2.6

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20210203222351-9ecea799f5f0 h1:GR21cFNTYj4mNdqIz917WqqDQVarc+kKqElBgHXzoG0=
github.com/aler9/gortsplib v0.0.0-20210203222351-9ecea799f5f0/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/aler9/gortsplib v0.0.0-20210215170952-6ce21a78419c h1:ckOJiy0/+h6DmG7gAVvf5NI2Fl1AZAYIDf7DAUbDhl8=
github.com/aler9/gortsplib v0.0.0-20210215170952-6ce21a78419c/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=

18
internal/path/path.go

@ -685,8 +685,22 @@ func (pa *Path) onClientAnnounce(c client.Client, tracks gortsplib.Tracks) error @@ -685,8 +685,22 @@ func (pa *Path) onClientAnnounce(c client.Client, tracks gortsplib.Tracks) error
return fmt.Errorf("already subscribed")
}
if pa.source != nil || pa.hasExternalSource() {
return fmt.Errorf("someone is already publishing to path '%s'", pa.name)
if pa.hasExternalSource() {
return fmt.Errorf("path '%s' is assigned to an external source", pa.name)
}
if pa.source != nil {
pa.Log(logger.Info, "disconnecting existing publisher")
curPublisher := pa.source.(client.Client)
pa.removeClient(curPublisher)
pa.parent.OnPathClientClose(curPublisher)
// prevent path closure
if pa.closeTimerStarted {
pa.closeTimer.Stop()
pa.closeTimer = newEmptyTimer()
pa.closeTimerStarted = false
}
}
pa.addClient(c, clientStatePreRecord)

101
main_test.go

@ -15,6 +15,7 @@ import ( @@ -15,6 +15,7 @@ import (
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
psdp "github.com/pion/sdp/v3"
"github.com/stretchr/testify/require"
)
@ -350,6 +351,100 @@ func TestAutomaticProtocol(t *testing.T) { @@ -350,6 +351,100 @@ func TestAutomaticProtocol(t *testing.T) {
}
}
func TestPublisherOverride(t *testing.T) {
p, ok := testProgram("")
require.Equal(t, true, ok)
defer p.close()
publish := func() (net.Conn, error) {
conn, err := net.Dial("tcp", "127.0.0.1:8554")
if err != nil {
return nil, err
}
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
var tracks gortsplib.Tracks
videoTrack, err := gortsplib.NewTrackH264(96, []byte("123456"), []byte("123456"))
if err != nil {
conn.Close()
return nil, err
}
tracks = append(tracks, videoTrack)
for i, t := range tracks {
t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{
Key: "control",
Value: "trackID=" + strconv.FormatInt(int64(i), 10),
})
}
err = base.Request{
Method: base.Announce,
URL: base.MustParseURL("rtsp://localhost:8554/mypath"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: tracks.Write(),
}.Write(bconn.Writer)
if err != nil {
conn.Close()
return nil, err
}
var res base.Response
err = res.Read(bconn.Reader)
if err != nil {
conn.Close()
return nil, err
}
require.Equal(t, base.StatusOK, res.StatusCode)
err = base.Request{
Method: base.Setup,
URL: base.MustParseURL("rtsp://localhost:8554/mypath/trackID=0"),
Header: base.Header{
"CSeq": base.HeaderValue{"2"},
"Transport": headers.Transport{
Protocol: gortsplib.StreamProtocolTCP,
Delivery: func() *base.StreamDelivery {
v := base.StreamDeliveryUnicast
return &v
}(),
Mode: func() *headers.TransportMode {
v := headers.TransportModeRecord
return &v
}(),
InterleavedIds: &[2]int{0, 1},
}.Write(),
},
}.Write(bconn.Writer)
if err != nil {
conn.Close()
return nil, err
}
err = res.Read(bconn.Reader)
if err != nil {
conn.Close()
return nil, err
}
require.Equal(t, base.StatusOK, res.StatusCode)
return conn, nil
}
conn1, err := publish()
require.NoError(t, err)
defer conn1.Close()
conn2, err := publish()
require.NoError(t, err)
defer conn2.Close()
}
func TestPath(t *testing.T) {
for _, ca := range []struct {
name string
@ -952,7 +1047,7 @@ wait @@ -952,7 +1047,7 @@ wait
defer p1.close()
func() {
conn, err := net.Dial("tcp", ownDockerIP+":8554")
conn, err := net.Dial("tcp", "127.0.0.1:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
@ -992,7 +1087,7 @@ wait @@ -992,7 +1087,7 @@ wait
defer p1.close()
func() {
conn, err := net.Dial("tcp", ownDockerIP+":8554")
conn, err := net.Dial("tcp", "127.0.0.1:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
@ -1057,7 +1152,7 @@ wait @@ -1057,7 +1152,7 @@ wait
defer p1.close()
func() {
conn, err := net.Dial("tcp", ownDockerIP+":8554")
conn, err := net.Dial("tcp", "127.0.0.1:8554")
require.NoError(t, err)
defer conn.Close()
bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))

Loading…
Cancel
Save