Browse Source

support multiple paths

pull/2/head v0.2.0
aler9 6 years ago
parent
commit
9728ea823d
  1. 8
      README.md
  2. 141
      client.go
  3. 33
      main.go
  4. 8
      rtsp/request.go
  5. 8
      rtsp/request_test.go
  6. 2
      rtsplistener.go
  7. 42
      udplistener.go

8
README.md

@ -10,7 +10,7 @@ This software was developed with the aim of simulating a live camera feed for de
Features: Features:
* Supports reading and publishing streams via UDP and TCP * Supports reading and publishing streams via UDP and TCP
* Supports publishing one stream at once, that can be read by multiple users * Supports publishing multiple streams at once, each in a separate path, that can be read by multiple users
* Supports multiple video and audio tracks for each stream * Supports multiple video and audio tracks for each stream
* Supports the RTP/RTCP streaming protocol * Supports the RTP/RTCP streaming protocol
@ -33,17 +33,17 @@ Precompiled binaries are available in the [release](https://github.com/aler9/rts
2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want): 2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want):
``` ```
ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/ ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/mystream
``` ```
3. Open the stream with VLC: 3. Open the stream with VLC:
``` ```
vlc rtsp://localhost:8554/ vlc rtsp://localhost:8554/mystream
``` ```
you can alternatively use GStreamer: you can alternatively use GStreamer:
``` ```
gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/ ! rtph264depay ! decodebin ! autovideosink gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/mystream ! rtph264depay ! decodebin ! autovideosink
``` ```
<br /> <br />

141
client.go

@ -21,14 +21,14 @@ var (
errRecord = errors.New("record") errRecord = errors.New("record")
) )
func interleavedChannelToTrack(channel int) (trackFlow, int) { func interleavedChannelToTrack(channel int) (int, trackFlow) {
if (channel % 2) == 0 { if (channel % 2) == 0 {
return _TRACK_FLOW_RTP, (channel / 2) return (channel / 2), _TRACK_FLOW_RTP
} }
return _TRACK_FLOW_RTCP, ((channel - 1) / 2) return ((channel - 1) / 2), _TRACK_FLOW_RTCP
} }
func trackToInterleavedChannel(flow trackFlow, id int) int { func trackToInterleavedChannel(id int, flow trackFlow) int {
if flow == _TRACK_FLOW_RTP { if flow == _TRACK_FLOW_RTP {
return id * 2 return id * 2
} }
@ -84,13 +84,14 @@ type client struct {
rconn *rtsp.Conn rconn *rtsp.Conn
state string state string
ip net.IP ip net.IP
path string
streamSdpText []byte // filled only if publisher streamSdpText []byte // filled only if publisher
streamSdpParsed *sdp.Message // filled only if publisher streamSdpParsed *sdp.Message // filled only if publisher
streamProtocol streamProtocol streamProtocol streamProtocol
streamTracks []*track streamTracks []*track
} }
func newRtspClient(p *program, nconn net.Conn) *client { func newClient(p *program, nconn net.Conn) *client {
c := &client{ c := &client{
p: p, p: p,
rconn: rtsp.NewConn(nconn), rconn: rtsp.NewConn(nconn),
@ -113,16 +114,19 @@ func (c *client) close() error {
delete(c.p.clients, c) delete(c.p.clients, c)
c.rconn.Close() c.rconn.Close()
if c.p.publisher == c { if c.path != "" {
c.p.publisher = nil if pub, ok := c.p.publishers[c.path]; ok && pub == c {
delete(c.p.publishers, c.path)
// if the publisher has disconnected // if the publisher has disconnected
// close all other connections // close all other connections that share the same path
for oc := range c.p.clients { for oc := range c.p.clients {
oc.close() if oc.path == c.path {
oc.close()
}
}
} }
} }
return nil return nil
} }
@ -181,7 +185,7 @@ func (c *client) run() {
return return
} }
c.log("is receiving %d %s via %s", len(c.streamTracks), func() string { c.log("is receiving on path %s, %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 { if len(c.streamTracks) == 1 {
return "track" return "track"
} }
@ -219,7 +223,7 @@ func (c *client) run() {
c.state = "RECORD" c.state = "RECORD"
c.p.mutex.Unlock() c.p.mutex.Unlock()
c.log("is publishing %d %s via %s", len(c.streamTracks), func() string { c.log("is publishing on path %s, %d %s via %s", c.path, len(c.streamTracks), func() string {
if len(c.streamTracks) == 1 { if len(c.streamTracks) == 1 {
return "track" return "track"
} }
@ -241,7 +245,7 @@ func (c *client) run() {
return return
} }
trackFlow, trackId := interleavedChannelToTrack(channel) trackId, trackFlow := interleavedChannelToTrack(channel)
if trackId >= len(c.streamTracks) { if trackId >= len(c.streamTracks) {
c.log("ERR: invalid track id '%d'", trackId) c.log("ERR: invalid track id '%d'", trackId)
@ -249,7 +253,7 @@ func (c *client) run() {
} }
c.p.mutex.RLock() c.p.mutex.RLock()
c.p.forwardTrack(trackFlow, trackId, buf[:n]) c.p.forwardTrack(c.path, trackId, trackFlow, buf[:n])
c.p.mutex.RUnlock() c.p.mutex.RUnlock()
} }
} }
@ -283,10 +287,29 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("cseq missing") return nil, fmt.Errorf("cseq missing")
} }
ur, err := url.Parse(req.Path) path, err := func() (string, error) {
if err != nil { ur, err := url.Parse(req.Url)
return nil, fmt.Errorf("unable to parse path '%s'", req.Path) if err != nil {
} return "", fmt.Errorf("unable to parse path '%s'", req.Url)
}
path := ur.Path
// remove leading slash
if len(path) > 1 {
path = path[1:]
}
// strip any subpath
if n := strings.Index(path, "/"); n >= 0 {
path = path[:n]
}
return path, nil
}()
c.p.mutex.Lock()
c.path = path
c.p.mutex.Unlock()
switch req.Method { switch req.Method {
case "OPTIONS": case "OPTIONS":
@ -319,11 +342,12 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
c.p.mutex.RLock() c.p.mutex.RLock()
defer c.p.mutex.RUnlock() defer c.p.mutex.RUnlock()
if c.p.publisher == nil { pub, ok := c.p.publishers[path]
return nil, fmt.Errorf("no one is streaming") if !ok {
return nil, fmt.Errorf("no one is streaming on path '%s'", path)
} }
return c.p.publisher.streamSdpText, nil return pub.streamSdpText, nil
}() }()
if err != nil { if err != nil {
return nil, err return nil, err
@ -334,7 +358,7 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
Status: "OK", Status: "OK",
Headers: map[string]string{ Headers: map[string]string{
"CSeq": cseq, "CSeq": cseq,
"Content-Base": ur.String(), "Content-Base": req.Url,
"Content-Type": "application/sdp", "Content-Type": "application/sdp",
}, },
Content: sdp, Content: sdp,
@ -377,11 +401,13 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
c.p.mutex.Lock() c.p.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.mutex.Unlock()
if c.p.publisher != nil { _, ok := c.p.publishers[path]
return fmt.Errorf("another client is already streaming") if ok {
return fmt.Errorf("another client is already publishing on path '%s'", path)
} }
c.p.publisher = c c.path = path
c.p.publishers[path] = c
c.streamSdpText = req.Content c.streamSdpText = req.Content
c.streamSdpParsed = sdpParsed c.streamSdpParsed = sdpParsed
c.state = "ANNOUNCE" c.state = "ANNOUNCE"
@ -414,20 +440,6 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
switch c.state { switch c.state {
// play // play
case "STARTING", "PRE_PLAY": case "STARTING", "PRE_PLAY":
err := func() error {
c.p.mutex.RLock()
defer c.p.mutex.RUnlock()
if c.p.publisher == nil {
return fmt.Errorf("no one is streaming")
}
return nil
}()
if err != nil {
return nil, err
}
// play via UDP // play via UDP
if _, ok := th["RTP/AVP"]; ok { if _, ok := th["RTP/AVP"]; ok {
rtpPort, rtcpPort := th.getClientPorts() rtpPort, rtcpPort := th.getClientPorts()
@ -435,18 +447,28 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transportstr) return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transportstr)
} }
if c.path != "" && path != c.path {
return nil, fmt.Errorf("path has changed")
}
err = func() error { err = func() error {
c.p.mutex.Lock() c.p.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.mutex.Unlock()
pub, ok := c.p.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP {
return fmt.Errorf("client want to send tracks with different protocols") return fmt.Errorf("client want to send tracks with different protocols")
} }
if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) { if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) {
return fmt.Errorf("all the tracks have already been setup") return fmt.Errorf("all the tracks have already been setup")
} }
c.path = path
c.streamProtocol = _STREAM_PROTOCOL_UDP c.streamProtocol = _STREAM_PROTOCOL_UDP
c.streamTracks = append(c.streamTracks, &track{ c.streamTracks = append(c.streamTracks, &track{
rtpPort: rtpPort, rtpPort: rtpPort,
@ -480,18 +502,28 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
// play via TCP // play via TCP
} else if _, ok := th["RTP/AVP/TCP"]; ok { } else if _, ok := th["RTP/AVP/TCP"]; ok {
if c.path != "" && path != c.path {
return nil, fmt.Errorf("path has changed")
}
err = func() error { err = func() error {
c.p.mutex.Lock() c.p.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.mutex.Unlock()
pub, ok := c.p.publishers[path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", path)
}
if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP {
return fmt.Errorf("client want to send tracks with different protocols") return fmt.Errorf("client want to send tracks with different protocols")
} }
if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) { if len(c.streamTracks) >= len(pub.streamSdpParsed.Medias) {
return fmt.Errorf("all the tracks have already been setup") return fmt.Errorf("all the tracks have already been setup")
} }
c.path = path
c.streamProtocol = _STREAM_PROTOCOL_TCP c.streamProtocol = _STREAM_PROTOCOL_TCP
c.streamTracks = append(c.streamTracks, &track{ c.streamTracks = append(c.streamTracks, &track{
rtpPort: 0, rtpPort: 0,
@ -531,6 +563,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("transport header does not contain mode=record") return nil, fmt.Errorf("transport header does not contain mode=record")
} }
if path != c.path {
return nil, fmt.Errorf("path has changed")
}
// record via UDP // record via UDP
if _, ok := th["RTP/AVP/UDP"]; ok { if _, ok := th["RTP/AVP/UDP"]; ok {
rtpPort, rtcpPort := th.getClientPorts() rtpPort, rtcpPort := th.getClientPorts()
@ -644,11 +680,20 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("client is in state '%s'", c.state) return nil, fmt.Errorf("client is in state '%s'", c.state)
} }
if path != c.path {
return nil, fmt.Errorf("path has changed")
}
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.mutex.Unlock()
if len(c.streamTracks) != len(c.p.publisher.streamSdpParsed.Medias) { pub, ok := c.p.publishers[c.path]
if !ok {
return fmt.Errorf("no one is streaming on path '%s'", c.path)
}
if len(c.streamTracks) != len(pub.streamSdpParsed.Medias) {
return fmt.Errorf("not all tracks have been setup") return fmt.Errorf("not all tracks have been setup")
} }
@ -672,6 +717,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("client is in state '%s'", c.state) return nil, fmt.Errorf("client is in state '%s'", c.state)
} }
if path != c.path {
return nil, fmt.Errorf("path has changed")
}
c.log("paused") c.log("paused")
c.p.mutex.Lock() c.p.mutex.Lock()
@ -692,6 +741,10 @@ func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) {
return nil, fmt.Errorf("client is in state '%s'", c.state) return nil, fmt.Errorf("client is in state '%s'", c.state)
} }
if path != c.path {
return nil, fmt.Errorf("path has changed")
}
err := func() error { err := func() error {
c.p.mutex.Lock() c.p.mutex.Lock()
defer c.p.mutex.Unlock() defer c.p.mutex.Unlock()

33
main.go

@ -39,23 +39,24 @@ func (s streamProtocol) String() string {
} }
type program struct { type program struct {
rtspPort int rtspPort int
rtpPort int rtpPort int
rtcpPort int rtcpPort int
mutex sync.RWMutex mutex sync.RWMutex
rtspl *rtspListener rtspl *rtspListener
rtpl *udpListener rtpl *udpListener
rtcpl *udpListener rtcpl *udpListener
clients map[*client]struct{} clients map[*client]struct{}
publisher *client publishers map[string]*client
} }
func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) {
p := &program{ p := &program{
rtspPort: rtspPort, rtspPort: rtspPort,
rtpPort: rtpPort, rtpPort: rtpPort,
rtcpPort: rtcpPort, rtcpPort: rtcpPort,
clients: make(map[*client]struct{}), clients: make(map[*client]struct{}),
publishers: make(map[string]*client),
} }
var err error var err error
@ -87,9 +88,9 @@ func (p *program) run() {
<-infty <-infty
} }
func (p *program) forwardTrack(flow trackFlow, id int, frame []byte) { func (p *program) forwardTrack(path string, id int, flow trackFlow, frame []byte) {
for c := range p.clients { for c := range p.clients {
if c.state == "PLAY" { if c.path == path && c.state == "PLAY" {
if c.streamProtocol == _STREAM_PROTOCOL_UDP { if c.streamProtocol == _STREAM_PROTOCOL_UDP {
if flow == _TRACK_FLOW_RTP { if flow == _TRACK_FLOW_RTP {
p.rtpl.nconn.WriteTo(frame, &net.UDPAddr{ p.rtpl.nconn.WriteTo(frame, &net.UDPAddr{
@ -104,7 +105,7 @@ func (p *program) forwardTrack(flow trackFlow, id int, frame []byte) {
} }
} else { } else {
c.rconn.WriteInterleavedFrame(trackToInterleavedChannel(flow, id), frame) c.rconn.WriteInterleavedFrame(trackToInterleavedChannel(id, flow), frame)
} }
} }
} }

8
rtsp/request.go

@ -8,7 +8,7 @@ import (
type Request struct { type Request struct {
Method string Method string
Path string Url string
Headers map[string]string Headers map[string]string
Content []byte Content []byte
} }
@ -32,9 +32,9 @@ func requestDecode(r io.Reader) (*Request, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
req.Path = string(byts[:len(byts)-1]) req.Url = string(byts[:len(byts)-1])
if len(req.Path) == 0 { if len(req.Url) == 0 {
return nil, fmt.Errorf("empty path") return nil, fmt.Errorf("empty path")
} }
@ -69,7 +69,7 @@ func requestDecode(r io.Reader) (*Request, error) {
func requestEncode(w io.Writer, req *Request) error { func requestEncode(w io.Writer, req *Request) error {
wb := bufio.NewWriter(w) wb := bufio.NewWriter(w)
_, err := wb.Write([]byte(req.Method + " " + req.Path + " " + _RTSP_PROTO + "\r\n")) _, err := wb.Write([]byte(req.Method + " " + req.Url + " " + _RTSP_PROTO + "\r\n"))
if err != nil { if err != nil {
return err return err
} }

8
rtsp/request_test.go

@ -21,7 +21,7 @@ var casesRequest = []struct {
"\r\n"), "\r\n"),
&Request{ &Request{
Method: "OPTIONS", Method: "OPTIONS",
Path: "rtsp://example.com/media.mp4", Url: "rtsp://example.com/media.mp4",
Headers: map[string]string{ Headers: map[string]string{
"CSeq": "1", "CSeq": "1",
"Require": "implicit-play", "Require": "implicit-play",
@ -36,7 +36,7 @@ var casesRequest = []struct {
"\r\n"), "\r\n"),
&Request{ &Request{
Method: "DESCRIBE", Method: "DESCRIBE",
Path: "rtsp://example.com/media.mp4", Url: "rtsp://example.com/media.mp4",
Headers: map[string]string{ Headers: map[string]string{
"CSeq": "2", "CSeq": "2",
}, },
@ -64,7 +64,7 @@ var casesRequest = []struct {
"m=video 2232 RTP/AVP 31\n"), "m=video 2232 RTP/AVP 31\n"),
&Request{ &Request{
Method: "ANNOUNCE", Method: "ANNOUNCE",
Path: "rtsp://example.com/media.mp4", Url: "rtsp://example.com/media.mp4",
Headers: map[string]string{ Headers: map[string]string{
"CSeq": "7", "CSeq": "7",
"Date": "23 Jan 1997 15:35:06 GMT", "Date": "23 Jan 1997 15:35:06 GMT",
@ -98,7 +98,7 @@ var casesRequest = []struct {
"jitter\n"), "jitter\n"),
&Request{ &Request{
Method: "GET_PARAMETER", Method: "GET_PARAMETER",
Path: "rtsp://example.com/media.mp4", Url: "rtsp://example.com/media.mp4",
Headers: map[string]string{ Headers: map[string]string{
"CSeq": "9", "CSeq": "9",
"Content-Type": "text/parameters", "Content-Type": "text/parameters",

2
rtsplistener.go

@ -38,7 +38,7 @@ func (l *rtspListener) run() {
break break
} }
rsc := newRtspClient(l.p, nconn) rsc := newClient(l.p, nconn)
go rsc.run() go rsc.run()
} }
} }

42
udplistener.go

@ -53,38 +53,32 @@ func (l *udpListener) run() {
l.p.mutex.RLock() l.p.mutex.RLock()
defer l.p.mutex.RUnlock() defer l.p.mutex.RUnlock()
if l.p.publisher == nil { // find path and track id
return path, trackId := func() (string, int) {
} for _, pub := range l.p.publishers {
for i, t := range pub.streamTracks {
if l.p.publisher.streamProtocol != _STREAM_PROTOCOL_UDP { if !pub.ip.Equal(addr.IP) {
return continue
}
if !l.p.publisher.ip.Equal(addr.IP) {
return
}
// get track id by using client port
trackId := func() int {
for i, t := range l.p.publisher.streamTracks {
if l.flow == _TRACK_FLOW_RTP {
if t.rtpPort == addr.Port {
return i
} }
} else {
if t.rtcpPort == addr.Port { if l.flow == _TRACK_FLOW_RTP {
return i if t.rtpPort == addr.Port {
return pub.path, i
}
} else {
if t.rtcpPort == addr.Port {
return pub.path, i
}
} }
} }
} }
return -1 return "", -1
}() }()
if trackId < 0 { if path == "" {
return return
} }
l.p.forwardTrack(l.flow, trackId, buf[:n]) l.p.forwardTrack(path, trackId, l.flow, buf[:n])
}() }()
} }
} }

Loading…
Cancel
Save