Browse Source

update gortsplib

pull/2/head v0.4.4
aler9 6 years ago
parent
commit
0285d029c7
  1. 141
      client.go
  2. 2
      go.mod
  3. 4
      go.sum
  4. 6
      main.go

141
client.go

@ -77,18 +77,18 @@ func sdpFilter(msgIn *sdp.Message, byteIn []byte) (*sdp.Message, []byte) {
return msgOut, byteOut return msgOut, byteOut
} }
func interleavedChannelToTrack(channel int) (int, trackFlow) { func interleavedChannelToTrack(channel uint8) (int, trackFlow) {
if (channel % 2) == 0 { if (channel % 2) == 0 {
return (channel / 2), _TRACK_FLOW_RTP return int(channel / 2), _TRACK_FLOW_RTP
} }
return ((channel - 1) / 2), _TRACK_FLOW_RTCP return int((channel - 1) / 2), _TRACK_FLOW_RTCP
} }
func trackToInterleavedChannel(id int, flow trackFlow) int { func trackToInterleavedChannel(id int, flow trackFlow) uint8 {
if flow == _TRACK_FLOW_RTP { if flow == _TRACK_FLOW_RTP {
return id * 2 return uint8(id * 2)
} }
return (id * 2) + 1 return uint8((id * 2) + 1)
} }
type clientState int type clientState int
@ -104,7 +104,7 @@ const (
type client struct { type client struct {
p *program p *program
conn *gortsplib.Conn conn *gortsplib.ConnServer
state clientState state clientState
ip net.IP ip net.IP
path string path string
@ -117,7 +117,7 @@ type client struct {
func newClient(p *program, nconn net.Conn) *client { func newClient(p *program, nconn net.Conn) *client {
c := &client{ c := &client{
p: p, p: p,
conn: gortsplib.NewConn(nconn), conn: gortsplib.NewConnServer(nconn),
state: _CLIENT_STATE_STARTING, state: _CLIENT_STATE_STARTING,
} }
@ -195,12 +195,12 @@ func (c *client) writeResDeadline(res *gortsplib.Response) {
func (c *client) writeResError(req *gortsplib.Request, err error) { func (c *client) writeResError(req *gortsplib.Request, err error) {
c.log("ERR: %s", err) c.log("ERR: %s", err)
if cseq, ok := req.Headers["CSeq"]; ok { if cseq, ok := req.Header["CSeq"]; ok && len(cseq) == 1 {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 400, StatusCode: 400,
Status: "Bad Request", Status: "Bad Request",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
}, },
}) })
} else { } else {
@ -214,8 +214,8 @@ func (c *client) writeResError(req *gortsplib.Request, err error) {
func (c *client) handleRequest(req *gortsplib.Request) bool { func (c *client) handleRequest(req *gortsplib.Request) bool {
c.log(req.Method) c.log(req.Method)
cseq, ok := req.Headers["CSeq"] cseq, ok := req.Header["CSeq"]
if !ok { if !ok || len(cseq) != 1 {
c.writeResError(req, fmt.Errorf("cseq missing")) c.writeResError(req, fmt.Errorf("cseq missing"))
return false return false
} }
@ -250,9 +250,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": cseq,
"Public": strings.Join([]string{ "Public": []string{strings.Join([]string{
"DESCRIBE", "DESCRIBE",
"ANNOUNCE", "ANNOUNCE",
"SETUP", "SETUP",
@ -260,7 +260,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
"PAUSE", "PAUSE",
"RECORD", "RECORD",
"TEARDOWN", "TEARDOWN",
}, ", "), }, ", ")},
}, },
}) })
return true return true
@ -290,10 +290,10 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Content-Base": req.Url, "Content-Base": []string{req.Url},
"Content-Type": "application/sdp", "Content-Type": []string{"application/sdp"},
}, },
Content: sdp, Content: sdp,
}) })
@ -305,13 +305,13 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
return false return false
} }
ct, ok := req.Headers["Content-Type"] ct, ok := req.Header["Content-Type"]
if !ok { if !ok || len(ct) != 1 {
c.writeResError(req, fmt.Errorf("Content-Type header missing")) c.writeResError(req, fmt.Errorf("Content-Type header missing"))
return false return false
} }
if ct != "application/sdp" { if ct[0] != "application/sdp" {
c.writeResError(req, fmt.Errorf("unsupported Content-Type '%s'", ct)) c.writeResError(req, fmt.Errorf("unsupported Content-Type '%s'", ct))
return false return false
} }
@ -338,8 +338,8 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 401, StatusCode: 401,
Status: "Unauthorized", Status: "Unauthorized",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": req.Headers["CSeq"], "CSeq": []string{cseq[0]},
}, },
}) })
return false return false
@ -370,20 +370,20 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
}, },
}) })
return true return true
case "SETUP": case "SETUP":
transportStr, ok := req.Headers["Transport"] tsRaw, ok := req.Header["Transport"]
if !ok { if !ok || len(tsRaw) != 1 {
c.writeResError(req, fmt.Errorf("transport header missing")) c.writeResError(req, fmt.Errorf("transport header missing"))
return false return false
} }
th := gortsplib.NewTransportHeader(transportStr) th := gortsplib.ReadHeaderTransport(tsRaw[0])
if _, ok := th["unicast"]; !ok { if _, ok := th["unicast"]; !ok {
c.writeResError(req, fmt.Errorf("transport header does not contain unicast")) c.writeResError(req, fmt.Errorf("transport header does not contain unicast"))
@ -410,7 +410,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 461, StatusCode: 461,
Status: "Unsupported Transport", Status: "Unsupported Transport",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": cseq,
}, },
}) })
@ -419,7 +419,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
rtpPort, rtcpPort := th.GetPorts("client_port") rtpPort, rtcpPort := th.GetPorts("client_port")
if rtpPort == 0 || rtcpPort == 0 { if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", transportStr)) c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", tsRaw[0]))
return false return false
} }
@ -463,15 +463,15 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Transport": strings.Join([]string{ "Transport": []string{strings.Join([]string{
"RTP/AVP/UDP", "RTP/AVP/UDP",
"unicast", "unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
}, ";"), }, ";")},
"Session": "12345678", "Session": []string{"12345678"},
}, },
}) })
return true return true
@ -483,7 +483,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 461, StatusCode: 461,
Status: "Unsupported Transport", Status: "Unsupported Transport",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": cseq,
}, },
}) })
@ -532,20 +532,20 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Transport": strings.Join([]string{ "Transport": []string{strings.Join([]string{
"RTP/AVP/TCP", "RTP/AVP/TCP",
"unicast", "unicast",
fmt.Sprintf("interleaved=%s", interleaved), fmt.Sprintf("interleaved=%s", interleaved),
}, ";"), }, ";")},
"Session": "12345678", "Session": []string{"12345678"},
}, },
}) })
return true return true
} else { } else {
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", transportStr)) c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", tsRaw[0]))
return false return false
} }
@ -578,7 +578,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 461, StatusCode: 461,
Status: "Unsupported Transport", Status: "Unsupported Transport",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": cseq,
}, },
}) })
@ -587,7 +587,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
rtpPort, rtcpPort := th.GetPorts("client_port") rtpPort, rtcpPort := th.GetPorts("client_port")
if rtpPort == 0 || rtcpPort == 0 { if rtpPort == 0 || rtcpPort == 0 {
c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", transportStr)) c.writeResError(req, fmt.Errorf("transport header does not have valid client ports (%s)", tsRaw[0]))
return false return false
} }
@ -620,15 +620,15 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Transport": strings.Join([]string{ "Transport": []string{strings.Join([]string{
"RTP/AVP/UDP", "RTP/AVP/UDP",
"unicast", "unicast",
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort),
}, ";"), }, ";")},
"Session": "12345678", "Session": []string{"12345678"},
}, },
}) })
return true return true
@ -640,7 +640,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 461, StatusCode: 461,
Status: "Unsupported Transport", Status: "Unsupported Transport",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": cseq,
}, },
}) })
@ -687,20 +687,20 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Transport": strings.Join([]string{ "Transport": []string{strings.Join([]string{
"RTP/AVP/TCP", "RTP/AVP/TCP",
"unicast", "unicast",
fmt.Sprintf("interleaved=%s", interleaved), fmt.Sprintf("interleaved=%s", interleaved),
}, ";"), }, ";")},
"Session": "12345678", "Session": []string{"12345678"},
}, },
}) })
return true return true
} else { } else {
c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", transportStr)) c.writeResError(req, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP, RTP/AVP/UDP or RTP/AVP/TCP) (%s)", tsRaw[0]))
return false return false
} }
@ -746,9 +746,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Session": "12345678", "Session": []string{"12345678"},
}, },
}) })
@ -800,9 +800,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Session": "12345678", "Session": []string{"12345678"},
}, },
}) })
return true return true
@ -836,9 +836,9 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
c.writeResDeadline(&gortsplib.Response{ c.writeResDeadline(&gortsplib.Response{
StatusCode: 200, StatusCode: 200,
Status: "OK", Status: "OK",
Headers: map[string]string{ Header: gortsplib.Header{
"CSeq": cseq, "CSeq": []string{cseq[0]},
"Session": "12345678", "Session": []string{"12345678"},
}, },
}) })
@ -856,16 +856,15 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
// when protocol is TCP, the RTSP connection becomes a RTP connection // when protocol is TCP, the RTSP connection becomes a RTP connection
// receive RTP data and parse it // receive RTP data and parse it
if c.streamProtocol == _STREAM_PROTOCOL_TCP { if c.streamProtocol == _STREAM_PROTOCOL_TCP {
buf := make([]byte, 2048)
for { for {
c.conn.NetConn().SetReadDeadline(time.Now().Add(_READ_TIMEOUT)) c.conn.NetConn().SetReadDeadline(time.Now().Add(_READ_TIMEOUT))
channel, n, err := c.conn.ReadInterleavedFrame(buf) frame, err := c.conn.ReadInterleavedFrame()
if err != nil { if err != nil {
c.log("ERR: %s", err) c.log("ERR: %s", err)
return false return false
} }
trackId, trackFlow := interleavedChannelToTrack(channel) trackId, trackFlow := interleavedChannelToTrack(frame.Channel)
if trackId >= len(c.streamTracks) { if trackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", trackId) c.log("ERR: invalid track id '%d'", trackId)
@ -873,7 +872,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool {
} }
c.p.mutex.RLock() c.p.mutex.RLock()
c.p.forwardTrack(c.path, trackId, trackFlow, buf[:n]) c.p.forwardTrack(c.path, trackId, trackFlow, frame.Content)
c.p.mutex.RUnlock() c.p.mutex.RUnlock()
} }
} }

2
go.mod

@ -5,7 +5,7 @@ go 1.13
require ( require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20200120211423-ea12a2ccff1c github.com/aler9/gortsplib v0.0.0-20200126104709-9d2081e29ccc
gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 v2.2.6
gortc.io/sdp v0.17.0 gortc.io/sdp v0.17.0
) )

4
go.sum

@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/gortsplib v0.0.0-20200120211423-ea12a2ccff1c h1:dKNfvjX6CN/+fPsCxwQPRDy0dqW9K1d+61KXeBfzlcU= github.com/aler9/gortsplib v0.0.0-20200126104709-9d2081e29ccc h1:lRClB+QB904mIwYOy07gArpCwu7wZ9cqgdmrbsP28Rc=
github.com/aler9/gortsplib v0.0.0-20200120211423-ea12a2ccff1c/go.mod h1:YiIgmmv0ELkWUy11Jj2h5AgfqLCpy8sIX/l9MmS8+uw= github.com/aler9/gortsplib v0.0.0-20200126104709-9d2081e29ccc/go.mod h1:YiIgmmv0ELkWUy11Jj2h5AgfqLCpy8sIX/l9MmS8+uw=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=

6
main.go

@ -10,6 +10,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/aler9/gortsplib"
"gopkg.in/alecthomas/kingpin.v2" "gopkg.in/alecthomas/kingpin.v2"
) )
@ -145,7 +146,10 @@ func (p *program) forwardTrack(path string, id int, flow trackFlow, frame []byte
} else { } else {
c.conn.NetConn().SetWriteDeadline(time.Now().Add(_WRITE_TIMEOUT)) c.conn.NetConn().SetWriteDeadline(time.Now().Add(_WRITE_TIMEOUT))
c.conn.WriteInterleavedFrame(trackToInterleavedChannel(id, flow), frame) c.conn.WriteInterleavedFrame(&gortsplib.InterleavedFrame{
Channel: trackToInterleavedChannel(id, flow),
Content: frame,
})
} }
} }
} }

Loading…
Cancel
Save