From d0c2c3a58653ea7fe650c5eb851c228615149beb Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 3 Aug 2020 17:35:34 +0200 Subject: [PATCH] support receiving single tracks (#48) --- client.go | 99 ++++++++++++++++++++++++++++++++++---------------- main.go | 105 +++++++++++++++++++++++++----------------------------- utils.go | 2 +- 3 files changed, 119 insertions(+), 87 deletions(-) diff --git a/client.go b/client.go index f349a135..994f6be2 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ import ( "net" "os" "os/exec" + "strconv" "strings" "time" @@ -88,7 +89,7 @@ type client struct { authHelper *gortsplib.AuthServer authFailures int streamProtocol gortsplib.StreamProtocol - streamTracks []*clientTrack + streamTracks map[int]*clientTrack rtcpReceivers []*gortsplib.RtcpReceiver readBuf *doubleBuffer writeBuf *doubleBuffer @@ -106,9 +107,10 @@ func newClient(p *program, nconn net.Conn) *client { ReadTimeout: p.conf.ReadTimeout, WriteTimeout: p.conf.WriteTimeout, }), - state: clientStateInitial, - readBuf: newDoubleBuffer(clientTcpReadBufferSize), - done: make(chan struct{}), + state: clientStateInitial, + streamTracks: make(map[int]*clientTrack), + readBuf: newDoubleBuffer(clientTcpReadBufferSize), + done: make(chan struct{}), } go c.run() @@ -433,7 +435,7 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - basePath, _, err := splitPath(path) + basePath, controlPath, err := splitPath(path) if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) return false @@ -457,6 +459,28 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return true } + if c.pathId != "" && basePath != c.pathId { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.pathId, basePath)) + return false + } + + if !strings.HasPrefix(controlPath, "trackID=") { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("invalid control path (%s)", controlPath)) + return false + } + + tmp, err := strconv.ParseInt(controlPath[len("trackID="):], 10, 64) + if err != nil || tmp < 0 { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("invalid track id (%s)", controlPath)) + return false + } + trackId := int(tmp) + + if _, ok := c.streamTracks[trackId]; ok { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("track %d has already been setup", trackId)) + return false + } + // play via UDP if func() bool { _, ok := th["RTP/AVP"] @@ -474,30 +498,31 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - rtpPort, rtcpPort := th.Ports("client_port") - if rtpPort == 0 || rtcpPort == 0 { - c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"])) - return false - } - - if c.pathId != "" && basePath != c.pathId { - c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.pathId, basePath)) + if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return false } - if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { - c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) + rtpPort, rtcpPort := th.Ports("client_port") + if rtpPort == 0 || rtcpPort == 0 { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%v)", req.Header["Transport"])) return false } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, basePath, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort} + c.p.events <- programEventClientSetupPlay{res, c, basePath, trackId} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) return false } + c.streamProtocol = gortsplib.StreamProtocolUdp + c.streamTracks[trackId] = &clientTrack{ + rtpPort: rtpPort, + rtcpPort: rtcpPort, + } + c.conn.WriteResponse(&gortsplib.Response{ StatusCode: gortsplib.StatusOK, Header: gortsplib.Header{ @@ -520,25 +545,26 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - if c.pathId != "" && basePath != c.pathId { - c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("path has changed, was '%s', now is '%s'", c.pathId, basePath)) - return false - } - if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolTcp { c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't receive tracks with different protocols")) return false } res := make(chan error) - c.p.events <- programEventClientSetupPlay{res, c, basePath, gortsplib.StreamProtocolTcp, 0, 0} + c.p.events <- programEventClientSetupPlay{res, c, basePath, trackId} err = <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) return false } - interleaved := fmt.Sprintf("%d-%d", ((len(c.streamTracks) - 1) * 2), ((len(c.streamTracks)-1)*2)+1) + c.streamProtocol = gortsplib.StreamProtocolTcp + c.streamTracks[trackId] = &clientTrack{ + rtpPort: 0, + rtcpPort: 0, + } + + interleaved := fmt.Sprintf("%d-%d", ((trackId) * 2), ((trackId)*2)+1) c.conn.WriteResponse(&gortsplib.Response{ StatusCode: gortsplib.StatusOK, @@ -589,14 +615,14 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { return false } - rtpPort, rtcpPort := th.Ports("client_port") - if rtpPort == 0 || rtcpPort == 0 { - c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"])) + if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) return false } - if len(c.streamTracks) > 0 && c.streamProtocol != gortsplib.StreamProtocolUdp { - c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("can't publish tracks with different protocols")) + rtpPort, rtcpPort := th.Ports("client_port") + if rtpPort == 0 || rtcpPort == 0 { + c.writeResError(req, gortsplib.StatusBadRequest, fmt.Errorf("transport header does not have valid client ports (%s)", req.Header["Transport"])) return false } @@ -606,13 +632,19 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { } res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolUdp, rtpPort, rtcpPort} + c.p.events <- programEventClientSetupRecord{res, c} err := <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) return false } + c.streamProtocol = gortsplib.StreamProtocolUdp + c.streamTracks[len(c.streamTracks)] = &clientTrack{ + rtpPort: rtpPort, + rtcpPort: rtcpPort, + } + c.conn.WriteResponse(&gortsplib.Response{ StatusCode: gortsplib.StatusOK, Header: gortsplib.Header{ @@ -658,13 +690,19 @@ func (c *client) handleRequest(req *gortsplib.Request) bool { } res := make(chan error) - c.p.events <- programEventClientSetupRecord{res, c, gortsplib.StreamProtocolTcp, 0, 0} + c.p.events <- programEventClientSetupRecord{res, c} err := <-res if err != nil { c.writeResError(req, gortsplib.StatusBadRequest, err) return false } + c.streamProtocol = gortsplib.StreamProtocolTcp + c.streamTracks[len(c.streamTracks)] = &clientTrack{ + rtpPort: 0, + rtcpPort: 0, + } + c.conn.WriteResponse(&gortsplib.Response{ StatusCode: gortsplib.StatusOK, Header: gortsplib.Header{ @@ -776,6 +814,7 @@ func (c *client) runPlay(path string) { c.events = make(chan clientEvent) } + // start sending frames only after sending the response to the PLAY request done := make(chan struct{}) c.p.events <- programEventClientPlay2{done, c} <-done diff --git a/main.go b/main.go index 307a4007..1a86ea5d 100644 --- a/main.go +++ b/main.go @@ -70,22 +70,17 @@ type programEventClientAnnounce struct { func (programEventClientAnnounce) isProgramEvent() {} type programEventClientSetupPlay struct { - res chan error - client *client - path string - protocol gortsplib.StreamProtocol - rtpPort int - rtcpPort int + res chan error + client *client + path string + trackId int } func (programEventClientSetupPlay) isProgramEvent() {} type programEventClientSetupRecord struct { - res chan error - client *client - protocol gortsplib.StreamProtocol - rtpPort int - rtcpPort int + res chan error + client *client } func (programEventClientSetupRecord) isProgramEvent() {} @@ -384,26 +379,16 @@ outer: continue } - if len(evt.client.streamTracks) >= len(path.publisherSdpParsed.MediaDescriptions) { - evt.res <- fmt.Errorf("all the tracks have already been setup") + if evt.trackId >= len(path.publisherSdpParsed.MediaDescriptions) { + evt.res <- fmt.Errorf("track %d does not exist", evt.trackId) continue } evt.client.pathId = evt.path - evt.client.streamProtocol = evt.protocol - evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{ - rtpPort: evt.rtpPort, - rtcpPort: evt.rtcpPort, - }) evt.client.state = clientStatePrePlay evt.res <- nil case programEventClientSetupRecord: - evt.client.streamProtocol = evt.protocol - evt.client.streamTracks = append(evt.client.streamTracks, &clientTrack{ - rtpPort: evt.rtpPort, - rtcpPort: evt.rtcpPort, - }) evt.client.state = clientStatePreRecord evt.res <- nil @@ -414,8 +399,8 @@ outer: continue } - if len(evt.client.streamTracks) != len(path.publisherSdpParsed.MediaDescriptions) { - evt.res <- fmt.Errorf("not all tracks have been setup") + if len(evt.client.streamTracks) == 0 { + evt.res <- fmt.Errorf("no tracks have been setup") continue } @@ -589,40 +574,48 @@ func (p *program) findClientPublisher(addr *net.UDPAddr, streamType gortsplib.St func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) { for c := range p.clients { - if c.pathId == path && c.state == clientStatePlay { - if c.streamProtocol == gortsplib.StreamProtocolUdp { - if streamType == gortsplib.StreamTypeRtp { - p.serverRtp.write(&udpAddrBufPair{ - addr: &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: c.streamTracks[trackId].rtpPort, - }, - buf: frame, - }) - } else { - p.serverRtcp.write(&udpAddrBufPair{ - addr: &net.UDPAddr{ - IP: c.ip(), - Zone: c.zone(), - Port: c.streamTracks[trackId].rtcpPort, - }, - buf: frame, - }) - } + if c.pathId != path || + c.state != clientStatePlay { + continue + } + track, ok := c.streamTracks[trackId] + if !ok { + continue + } + + if c.streamProtocol == gortsplib.StreamProtocolUdp { + if streamType == gortsplib.StreamTypeRtp { + p.serverRtp.write(&udpAddrBufPair{ + addr: &net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: track.rtpPort, + }, + buf: frame, + }) } else { - buf := c.writeBuf.swap() - buf = buf[:len(frame)] - copy(buf, frame) - - c.events <- clientEventFrameTcp{ - frame: &gortsplib.InterleavedFrame{ - TrackId: trackId, - StreamType: streamType, - Content: buf, + p.serverRtcp.write(&udpAddrBufPair{ + addr: &net.UDPAddr{ + IP: c.ip(), + Zone: c.zone(), + Port: track.rtcpPort, }, - } + buf: frame, + }) + } + + } else { + buf := c.writeBuf.swap() + buf = buf[:len(frame)] + copy(buf, frame) + + c.events <- clientEventFrameTcp{ + frame: &gortsplib.InterleavedFrame{ + TrackId: trackId, + StreamType: streamType, + Content: buf, + }, } } } diff --git a/utils.go b/utils.go index ba74291a..dde19d83 100644 --- a/utils.go +++ b/utils.go @@ -110,7 +110,7 @@ func sdpForServer(tracks []*gortsplib.Track) (*sdp.SessionDescription, []byte) { } } - // control attribute is mandatory, and is the path that is appended + // control attribute is the path that is appended // to the stream path in SETUP ret = append(ret, sdp.Attribute{ Key: "control",