diff --git a/README.md b/README.md index d4806f11..1ca26672 100644 --- a/README.md +++ b/README.md @@ -4,15 +4,14 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/aler9/rtsp-simple-server)](https://goreportcard.com/report/github.com/aler9/rtsp-simple-server) [![Build Status](https://travis-ci.org/aler9/rtsp-simple-server.svg?branch=master)](https://travis-ci.org/aler9/rtsp-simple-server) -_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server, a program that allows multiple users to read or publish live video streams. RTSP a standardized protocol that defines how to perform these operations with the help of a server, that is contacted by both readers and publishers in order to negotiate a streaming protocol and read or write data. The server is then responsible of linking the publisher stream with the readers. +_rtsp-simple-server_ is a simple, ready-to-use and zero-dependency RTSP server, a program that allows multiple users to read or publish live video and audio streams. RTSP a standardized protocol that defines how to perform these operations with the help of a server, that is contacted by both readers and publishers in order to negotiate a streaming protocol and read or write data. The server is then responsible of linking the publisher stream with the readers. This software was developed with the aim of simulating a live camera feed for debugging purposes, and therefore to use files instead of real streams. Another reason for the development was the deprecation of _FFserver_, the component of the FFmpeg project that allowed to create a RTSP server with _FFmpeg_ (but this server can be used with any software that supports RTSP). Features: -* Supports reading streams via UDP and TCP -* Supports publishing streams via UDP and TCP -* Supports a single video stream -* Supports one publisher at once, while readers are limitless +* Supports reading and publishing streams via UDP and TCP +* Supports publishing one stream at once, that can be read by multiple users +* Supports multiple video and audio tracks for each stream * Supports the RTP/RTCP streaming protocol @@ -34,7 +33,7 @@ Precompiled binaries are available in the [release](https://github.com/aler9/rts 2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want): ``` - ffmpeg -re -stream_loop -1 -i file.ts -map 0:v:0 -c:v copy -f rtsp rtsp://localhost:8554/ + ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/ ``` 3. Open the stream with VLC: diff --git a/client.go b/client.go new file mode 100644 index 00000000..7056f8fa --- /dev/null +++ b/client.go @@ -0,0 +1,724 @@ +package main + +import ( + "errors" + "fmt" + "io" + "log" + "net" + "net/url" + "strconv" + "strings" + + "rtsp-server/rtsp" + + "gortc.io/sdp" +) + +var ( + errTeardown = errors.New("teardown") + errPlay = errors.New("play") + errRecord = errors.New("record") +) + +func interleavedChannelToTrack(channel int) (trackFlow, int) { + if (channel % 2) == 0 { + return _TRACK_FLOW_RTP, (channel / 2) + } + return _TRACK_FLOW_RTCP, ((channel - 1) / 2) +} + +func trackToInterleavedChannel(flow trackFlow, id int) int { + if flow == _TRACK_FLOW_RTP { + return id * 2 + } + return (id * 2) + 1 +} + +type transportHeader map[string]struct{} + +func newTransportHeader(in string) transportHeader { + th := make(map[string]struct{}) + for _, t := range strings.Split(in, ";") { + th[t] = struct{}{} + } + return th +} + +func (th transportHeader) getKeyValue(key string) string { + prefix := key + "=" + for t := range th { + if strings.HasPrefix(t, prefix) { + return t[len(prefix):] + } + } + return "" +} + +func (th transportHeader) getClientPorts() (int, int) { + val := th.getKeyValue("client_port") + if val == "" { + return 0, 0 + } + + ports := strings.Split(val, "-") + if len(ports) != 2 { + return 0, 0 + } + + port1, err := strconv.ParseInt(ports[0], 10, 64) + if err != nil { + return 0, 0 + } + + port2, err := strconv.ParseInt(ports[1], 10, 64) + if err != nil { + return 0, 0 + } + + return int(port1), int(port2) +} + +type client struct { + p *program + rconn *rtsp.Conn + state string + ip net.IP + streamSdpText []byte // filled only if publisher + streamSdpParsed *sdp.Message // filled only if publisher + streamProtocol streamProtocol + streamTracks []*track +} + +func newRtspClient(p *program, nconn net.Conn) *client { + c := &client{ + p: p, + rconn: rtsp.NewConn(nconn), + state: "STARTING", + } + + c.p.mutex.Lock() + c.p.clients[c] = struct{}{} + c.p.mutex.Unlock() + + return c +} + +func (c *client) close() error { + // already deleted + if _, ok := c.p.clients[c]; !ok { + return nil + } + + delete(c.p.clients, c) + c.rconn.Close() + + if c.p.publisher == c { + c.p.publisher = nil + + // if the publisher has disconnected + // close all other connections + for oc := range c.p.clients { + oc.close() + } + } + + return nil +} + +func (c *client) log(format string, args ...interface{}) { + format = "[RTSP client " + c.rconn.RemoteAddr().String() + "] " + format + log.Printf(format, args...) +} + +func (c *client) run() { + defer c.log("disconnected") + defer func() { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + c.close() + }() + + ipstr, _, _ := net.SplitHostPort(c.rconn.RemoteAddr().String()) + c.ip = net.ParseIP(ipstr) + + c.log("connected") + + for { + req, err := c.rconn.ReadRequest() + if err != nil { + if err != io.EOF { + c.log("ERR: %s", err) + } + return + } + + c.log(req.Method) + + res, err := c.handleRequest(req) + + switch err { + // normal response + case nil: + err = c.rconn.WriteResponse(res) + if err != nil { + c.log("ERR: %s", err) + return + } + + // TEARDOWN, close connection silently + case errTeardown: + return + + // PLAY: first write response, then set state + // otherwise, in case of TCP connections, RTP packets could be written + // before the response + // then switch to RTP if TCP + case errPlay: + err = c.rconn.WriteResponse(res) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.log("is receiving %d %s via %s", len(c.streamTracks), func() string { + if len(c.streamTracks) == 1 { + return "track" + } + return "tracks" + }(), c.streamProtocol) + + c.p.mutex.Lock() + c.state = "PLAY" + c.p.mutex.Unlock() + + // when protocol is TCP, the RTSP connection becomes a RTP connection + // receive RTP feedback, do not parse it, wait until connection closes + if c.streamProtocol == _STREAM_PROTOCOL_TCP { + buf := make([]byte, 2048) + for { + _, err := c.rconn.Read(buf) + if err != nil { + if err != io.EOF { + c.log("ERR: %s", err) + } + return + } + } + } + + // RECORD: switch to RTP if TCP + case errRecord: + err = c.rconn.WriteResponse(res) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.state = "RECORD" + c.p.mutex.Unlock() + + c.log("is publishing %d %s via %s", len(c.streamTracks), func() string { + if len(c.streamTracks) == 1 { + return "track" + } + return "tracks" + }(), c.streamProtocol) + + // when protocol is TCP, the RTSP connection becomes a RTP connection + // receive RTP data and parse it + if c.streamProtocol == _STREAM_PROTOCOL_TCP { + buf := make([]byte, 2048) + for { + channel, n, err := c.rconn.ReadInterleavedFrame(buf) + if err != nil { + if _, ok := err.(*net.OpError); ok { + } else if err == io.EOF { + } else { + c.log("ERR: %s", err) + } + return + } + + trackFlow, trackId := interleavedChannelToTrack(channel) + + if trackId >= len(c.streamTracks) { + c.log("ERR: invalid track id '%d'", trackId) + return + } + + c.p.mutex.RLock() + c.p.forwardTrack(trackFlow, trackId, buf[:n]) + c.p.mutex.RUnlock() + } + } + + // error: write and exit + default: + c.log("ERR: %s", err) + + if cseq, ok := req.Headers["CSeq"]; ok { + c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 400, + Status: "Bad Request", + Headers: map[string]string{ + "CSeq": cseq, + }, + }) + } else { + c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 400, + Status: "Bad Request", + }) + } + return + } + } +} + +func (c *client) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { + cseq, ok := req.Headers["CSeq"] + if !ok { + return nil, fmt.Errorf("cseq missing") + } + + ur, err := url.Parse(req.Path) + if err != nil { + return nil, fmt.Errorf("unable to parse path '%s'", req.Path) + } + + switch req.Method { + case "OPTIONS": + // do not check state, since OPTIONS can be requested + // in any state + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Public": strings.Join([]string{ + "DESCRIBE", + "ANNOUNCE", + "SETUP", + "PLAY", + "PAUSE", + "RECORD", + "TEARDOWN", + }, ", "), + }, + }, nil + + case "DESCRIBE": + if c.state != "STARTING" { + return nil, fmt.Errorf("client is in state '%s'", c.state) + } + + sdp, err := func() ([]byte, error) { + c.p.mutex.RLock() + defer c.p.mutex.RUnlock() + + if c.p.publisher == nil { + return nil, fmt.Errorf("no one is streaming") + } + + return c.p.publisher.streamSdpText, nil + }() + if err != nil { + return nil, err + } + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Content-Base": ur.String(), + "Content-Type": "application/sdp", + }, + Content: sdp, + }, nil + + case "ANNOUNCE": + if c.state != "STARTING" { + return nil, fmt.Errorf("client is in state '%s'", c.state) + } + + ct, ok := req.Headers["Content-Type"] + if !ok { + return nil, fmt.Errorf("Content-Type header missing") + } + + if ct != "application/sdp" { + return nil, fmt.Errorf("unsupported Content-Type '%s'", ct) + } + + sdpParsed, err := func() (*sdp.Message, error) { + s, err := sdp.DecodeSession(req.Content, nil) + if err != nil { + return nil, err + } + + m := &sdp.Message{} + d := sdp.NewDecoder(s) + err = d.Decode(m) + if err != nil { + return nil, err + } + + return m, nil + }() + if err != nil { + return nil, fmt.Errorf("invalid SDP: %s", err) + } + + err = func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if c.p.publisher != nil { + return fmt.Errorf("another client is already streaming") + } + + c.p.publisher = c + c.streamSdpText = req.Content + c.streamSdpParsed = sdpParsed + c.state = "ANNOUNCE" + return nil + }() + if err != nil { + return nil, err + } + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + }, + }, nil + + case "SETUP": + transportstr, ok := req.Headers["Transport"] + if !ok { + return nil, fmt.Errorf("transport header missing") + } + + th := newTransportHeader(transportstr) + + if _, ok := th["unicast"]; !ok { + return nil, fmt.Errorf("transport header does not contain unicast") + } + + switch c.state { + // play + case "STARTING", "PRE_PLAY": + err := func() error { + c.p.mutex.RLock() + defer c.p.mutex.RUnlock() + + if c.p.publisher == nil { + return fmt.Errorf("no one is streaming") + } + + return nil + }() + if err != nil { + return nil, err + } + + // play via UDP + if _, ok := th["RTP/AVP"]; ok { + rtpPort, rtcpPort := th.getClientPorts() + if rtpPort == 0 || rtcpPort == 0 { + return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transportstr) + } + + err = func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { + return fmt.Errorf("client want to send tracks with different protocols") + } + + if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) { + return fmt.Errorf("all the tracks have already been setup") + } + + c.streamProtocol = _STREAM_PROTOCOL_UDP + c.streamTracks = append(c.streamTracks, &track{ + rtpPort: rtpPort, + rtcpPort: rtcpPort, + }) + + c.state = "PRE_PLAY" + return nil + }() + if err != nil { + return nil, err + } + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), + // use two fake server ports, since we do not want to receive feedback + // from the client + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort+2, c.p.rtcpPort+2), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }, nil + + // play via TCP + } else if _, ok := th["RTP/AVP/TCP"]; ok { + err = func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { + return fmt.Errorf("client want to send tracks with different protocols") + } + + if len(c.streamTracks) >= len(c.p.publisher.streamSdpParsed.Medias) { + return fmt.Errorf("all the tracks have already been setup") + } + + c.streamProtocol = _STREAM_PROTOCOL_TCP + c.streamTracks = append(c.streamTracks, &track{ + rtpPort: 0, + rtcpPort: 0, + }) + + c.state = "PRE_PLAY" + return nil + }() + if err != nil { + return nil, err + } + + interleaved := fmt.Sprintf("%d-%d", ((len(c.streamTracks) - 1) * 2), ((len(c.streamTracks)-1)*2)+1) + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP/TCP", + "unicast", + fmt.Sprintf("interleaved=%s", interleaved), + }, ";"), + "Session": "12345678", + }, + }, nil + + } else { + return nil, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transportstr) + } + + // record + case "ANNOUNCE", "PRE_RECORD": + if _, ok := th["mode=record"]; !ok { + return nil, fmt.Errorf("transport header does not contain mode=record") + } + + // record via UDP + if _, ok := th["RTP/AVP/UDP"]; ok { + rtpPort, rtcpPort := th.getClientPorts() + if rtpPort == 0 || rtcpPort == 0 { + return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transportstr) + } + + err = func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_UDP { + return fmt.Errorf("client want to send tracks with different protocols") + } + + if len(c.streamTracks) >= len(c.streamSdpParsed.Medias) { + return fmt.Errorf("all the tracks have already been setup") + } + + c.streamProtocol = _STREAM_PROTOCOL_UDP + c.streamTracks = append(c.streamTracks, &track{ + rtpPort: rtpPort, + rtcpPort: rtcpPort, + }) + + c.state = "PRE_RECORD" + return nil + }() + if err != nil { + return nil, err + } + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }, nil + + // record via TCP + } else if _, ok := th["RTP/AVP/TCP"]; ok { + var interleaved string + err = func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.streamTracks) > 0 && c.streamProtocol != _STREAM_PROTOCOL_TCP { + return fmt.Errorf("client want to send tracks with different protocols") + } + + if len(c.streamTracks) >= len(c.streamSdpParsed.Medias) { + return fmt.Errorf("all the tracks have already been setup") + } + + interleaved = th.getKeyValue("interleaved") + if interleaved == "" { + return fmt.Errorf("transport header does not contain interleaved field") + } + + expInterleaved := fmt.Sprintf("%d-%d", 0+len(c.streamTracks)*2, 1+len(c.streamTracks)*2) + if interleaved != expInterleaved { + return fmt.Errorf("wrong interleaved value, expected '%s', got '%s'", expInterleaved, interleaved) + } + + c.streamProtocol = _STREAM_PROTOCOL_TCP + c.streamTracks = append(c.streamTracks, &track{ + rtpPort: 0, + rtcpPort: 0, + }) + + c.state = "PRE_RECORD" + return nil + }() + if err != nil { + return nil, err + } + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP/TCP", + "unicast", + fmt.Sprintf("interleaved=%s", interleaved), + }, ";"), + "Session": "12345678", + }, + }, nil + + } else { + return nil, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transportstr) + } + + default: + return nil, fmt.Errorf("client is in state '%s'", c.state) + } + + case "PLAY": + if c.state != "PRE_PLAY" { + return nil, fmt.Errorf("client is in state '%s'", c.state) + } + + err := func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.streamTracks) != len(c.p.publisher.streamSdpParsed.Medias) { + return fmt.Errorf("not all tracks have been setup") + } + + return nil + }() + if err != nil { + return nil, err + } + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }, errPlay + + case "PAUSE": + if c.state != "PLAY" { + return nil, fmt.Errorf("client is in state '%s'", c.state) + } + + c.log("paused") + + c.p.mutex.Lock() + c.state = "PRE_PLAY" + c.p.mutex.Unlock() + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }, nil + + case "RECORD": + if c.state != "PRE_RECORD" { + return nil, fmt.Errorf("client is in state '%s'", c.state) + } + + err := func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.streamTracks) != len(c.streamSdpParsed.Medias) { + return fmt.Errorf("not all tracks have been setup") + } + + return nil + }() + if err != nil { + return nil, err + } + + return &rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }, errRecord + + case "TEARDOWN": + return nil, errTeardown + + default: + return nil, fmt.Errorf("unhandled method '%s'", req.Method) + } +} diff --git a/go.mod b/go.mod index 9dd47e71..5550f356 100644 --- a/go.mod +++ b/go.mod @@ -7,4 +7,5 @@ require ( github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect github.com/stretchr/testify v1.4.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 + gortc.io/sdp v0.17.0 ) diff --git a/go.sum b/go.sum index edf40129..85db1369 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -15,3 +17,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gortc.io/sdp v0.17.0 h1:gPmGXqyszHplnlMeF2X0eCOK+G8aAK4PP6puglIcofc= +gortc.io/sdp v0.17.0/go.mod h1:iwG4LtzGX1MLglrl5x7AL7cfLndFPMfJ02koQl5KIgE= diff --git a/main.go b/main.go index 9607e149..b5c3fa58 100644 --- a/main.go +++ b/main.go @@ -12,17 +12,42 @@ import ( var Version string = "v0.0.0" +type trackFlow int + +const ( + _TRACK_FLOW_RTP trackFlow = iota + _TRACK_FLOW_RTCP +) + +type track struct { + rtpPort int + rtcpPort int +} + +type streamProtocol int + +const ( + _STREAM_PROTOCOL_UDP = iota + _STREAM_PROTOCOL_TCP +) + +func (s streamProtocol) String() string { + if s == _STREAM_PROTOCOL_UDP { + return "udp" + } + return "tcp" +} + type program struct { - rtspPort int - rtpPort int - rtcpPort int - mutex sync.RWMutex - rtspl *rtspListener - rtpl *udpListener - rtcpl *udpListener - clients map[*rtspClient]struct{} - streamAuthor *rtspClient - streamSdp []byte + rtspPort int + rtpPort int + rtcpPort int + mutex sync.RWMutex + rtspl *rtspListener + rtpl *udpListener + rtcpl *udpListener + clients map[*client]struct{} + publisher *client } func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { @@ -30,17 +55,17 @@ func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { rtspPort: rtspPort, rtpPort: rtpPort, rtcpPort: rtcpPort, - clients: make(map[*rtspClient]struct{}), + clients: make(map[*client]struct{}), } var err error - p.rtpl, err = newUdpListener(rtpPort, "RTP", p.handleRtp) + p.rtpl, err = newUdpListener(p, rtpPort, _TRACK_FLOW_RTP) if err != nil { return nil, err } - p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", p.handleRtcp) + p.rtcpl, err = newUdpListener(p, rtcpPort, _TRACK_FLOW_RTCP) if err != nil { return nil, err } @@ -62,37 +87,24 @@ func (p *program) run() { <-infty } -func (p *program) handleRtp(frame []byte) { - p.mutex.RLock() - defer p.mutex.RUnlock() - +func (p *program) forwardTrack(flow trackFlow, id int, frame []byte) { for c := range p.clients { if c.state == "PLAY" { - if c.rtpProto == "udp" { - p.rtpl.nconn.WriteTo(frame, &net.UDPAddr{ - IP: c.IP, - Port: c.rtpPort, - }) - } else { - c.rconn.WriteInterleavedFrame(frame) - } - } - } -} - -func (p *program) handleRtcp(frame []byte) { - p.mutex.RLock() - defer p.mutex.RUnlock() + if c.streamProtocol == _STREAM_PROTOCOL_UDP { + if flow == _TRACK_FLOW_RTP { + p.rtpl.nconn.WriteTo(frame, &net.UDPAddr{ + IP: c.ip, + Port: c.streamTracks[id].rtpPort, + }) + } else { + p.rtcpl.nconn.WriteTo(frame, &net.UDPAddr{ + IP: c.ip, + Port: c.streamTracks[id].rtcpPort, + }) + } - for c := range p.clients { - if c.state == "PLAY" { - if c.rtpProto == "udp" { - p.rtcpl.nconn.WriteTo(frame, &net.UDPAddr{ - IP: c.IP, - Port: c.rtcpPort, - }) } else { - c.rconn.WriteInterleavedFrame(frame) + c.rconn.WriteInterleavedFrame(trackToInterleavedChannel(flow, id), frame) } } } diff --git a/rtsp/conn.go b/rtsp/conn.go index e7c0d1d7..644b0e91 100644 --- a/rtsp/conn.go +++ b/rtsp/conn.go @@ -72,9 +72,9 @@ func (c *Conn) ReadInterleavedFrame(frame []byte) (int, int, error) { return int(header[1]), int(framelen), nil } -func (c *Conn) WriteInterleavedFrame(frame []byte) error { +func (c *Conn) WriteInterleavedFrame(channel int, frame []byte) error { c.writeBuf[0] = 0x24 - c.writeBuf[1] = 0x00 + c.writeBuf[1] = byte(channel) binary.BigEndian.PutUint16(c.writeBuf[2:], uint16(len(frame))) n := copy(c.writeBuf[4:], frame) diff --git a/rtsp_client.go b/rtsp_client.go deleted file mode 100644 index 7080f263..00000000 --- a/rtsp_client.go +++ /dev/null @@ -1,540 +0,0 @@ -package main - -import ( - "errors" - "fmt" - "io" - "log" - "net" - "net/url" - "strconv" - "strings" - - "rtsp-server/rtsp" -) - -var ( - errTeardown = errors.New("teardown") - errPlay = errors.New("play") - errRecord = errors.New("record") -) - -type rtspClient struct { - p *program - rconn *rtsp.Conn - state string - IP net.IP - rtpProto string - rtpPort int - rtcpPort int -} - -func newRtspClient(p *program, nconn net.Conn) *rtspClient { - c := &rtspClient{ - p: p, - rconn: rtsp.NewConn(nconn), - state: "STARTING", - } - - c.p.mutex.Lock() - c.p.clients[c] = struct{}{} - c.p.mutex.Unlock() - - return c -} - -func (c *rtspClient) close() error { - // already deleted - if _, ok := c.p.clients[c]; !ok { - return nil - } - - delete(c.p.clients, c) - c.rconn.Close() - - if c.p.streamAuthor == c { - c.p.streamAuthor = nil - c.p.streamSdp = nil - - // if the publisher has disconnected - // close all other connections - for oc := range c.p.clients { - oc.close() - } - } - - return nil -} - -func (c *rtspClient) log(format string, args ...interface{}) { - format = "[RTSP client " + c.rconn.RemoteAddr().String() + "] " + format - log.Printf(format, args...) -} - -func (c *rtspClient) run() { - defer c.log("disconnected") - defer func() { - c.p.mutex.Lock() - defer c.p.mutex.Unlock() - c.close() - }() - - ipstr, _, _ := net.SplitHostPort(c.rconn.RemoteAddr().String()) - c.IP = net.ParseIP(ipstr) - - c.log("connected") - - for { - req, err := c.rconn.ReadRequest() - if err != nil { - if err != io.EOF { - c.log("ERR: %s", err) - } - return - } - - c.log(req.Method) - - res, err := c.handleRequest(req) - - switch err { - // normal response - case nil: - err = c.rconn.WriteResponse(res) - if err != nil { - c.log("ERR: %s", err) - return - } - - // TEARDOWN, close connection silently - case errTeardown: - return - - // PLAY: first write response, then set state - // otherwise, in case of TCP connections, RTP packets could be written - // before the response - // then switch to RTP if TCP - case errPlay: - err = c.rconn.WriteResponse(res) - if err != nil { - c.log("ERR: %s", err) - return - } - - c.log("is receiving (via %s)", c.rtpProto) - - c.p.mutex.Lock() - c.state = "PLAY" - c.p.mutex.Unlock() - - // when rtp protocol is TCP, the RTSP connection becomes a RTP connection - // receive RTP feedback, do not parse it, wait until connection closes - if c.rtpProto == "tcp" { - buf := make([]byte, 2048) - for { - _, err := c.rconn.Read(buf) - if err != nil { - if err != io.EOF { - c.log("ERR: %s", err) - } - return - } - } - } - - // RECORD: switch to RTP if TCP - case errRecord: - err = c.rconn.WriteResponse(res) - if err != nil { - c.log("ERR: %s", err) - return - } - - c.p.mutex.Lock() - c.state = "RECORD" - c.p.mutex.Unlock() - - c.log("is publishing (via %s)", c.rtpProto) - - // when rtp protocol is TCP, the RTSP connection becomes a RTP connection - // receive RTP data and parse it - if c.rtpProto == "tcp" { - buf := make([]byte, 2048) - for { - channel, n, err := c.rconn.ReadInterleavedFrame(buf) - if err != nil { - if err != io.EOF { - c.log("ERR: %s", err) - } - return - } - - switch channel { - case 0: - c.p.handleRtp(buf[:n]) - - case 1: - c.p.handleRtcp(buf[:n]) - - default: - c.log("ERR: unsupported channel '%d'", channel) - return - } - } - } - - // error: write and exit - default: - c.log("ERR: %s", err) - - if cseq, ok := req.Headers["CSeq"]; ok { - c.rconn.WriteResponse(&rtsp.Response{ - StatusCode: 400, - Status: "Bad Request", - Headers: map[string]string{ - "CSeq": cseq, - }, - }) - } else { - c.rconn.WriteResponse(&rtsp.Response{ - StatusCode: 400, - Status: "Bad Request", - }) - } - return - } - } -} - -func (c *rtspClient) handleRequest(req *rtsp.Request) (*rtsp.Response, error) { - cseq, ok := req.Headers["CSeq"] - if !ok { - return nil, fmt.Errorf("cseq missing") - } - - ur, err := url.Parse(req.Path) - if err != nil { - return nil, fmt.Errorf("unable to parse path '%s'", req.Path) - } - - switch req.Method { - case "OPTIONS": - // do not check state, since OPTIONS can be requested - // in any state - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Public": strings.Join([]string{ - "DESCRIBE", - "ANNOUNCE", - "SETUP", - "PLAY", - "PAUSE", - "RECORD", - "TEARDOWN", - }, ", "), - }, - }, nil - - case "DESCRIBE": - if c.state != "STARTING" { - return nil, fmt.Errorf("client is in state '%s'", c.state) - } - - sdp, err := func() ([]byte, error) { - c.p.mutex.RLock() - defer c.p.mutex.RUnlock() - - if len(c.p.streamSdp) == 0 { - return nil, fmt.Errorf("no one is streaming") - } - - return c.p.streamSdp, nil - }() - if err != nil { - return nil, err - } - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Content-Base": ur.String(), - "Content-Type": "application/sdp", - }, - Content: sdp, - }, nil - - case "ANNOUNCE": - if c.state != "STARTING" { - return nil, fmt.Errorf("client is in state '%s'", c.state) - } - - ct, ok := req.Headers["Content-Type"] - if !ok { - return nil, fmt.Errorf("Content-Type header missing") - } - - if ct != "application/sdp" { - return nil, fmt.Errorf("unsupported Content-Type '%s'", ct) - } - - err := func() error { - c.p.mutex.Lock() - defer c.p.mutex.Unlock() - - if c.p.streamAuthor != nil { - return fmt.Errorf("another client is already streaming") - } - - c.p.streamAuthor = c - c.p.streamSdp = req.Content - return nil - }() - if err != nil { - return nil, err - } - - c.p.mutex.Lock() - c.state = "ANNOUNCE" - c.p.mutex.Unlock() - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - }, - }, nil - - case "SETUP": - transport, ok := req.Headers["Transport"] - if !ok { - return nil, fmt.Errorf("transport header missing") - } - - transports := make(map[string]struct{}) - for _, t := range strings.Split(transport, ";") { - transports[t] = struct{}{} - } - - if _, ok := transports["unicast"]; !ok { - return nil, fmt.Errorf("transport header does not contain unicast") - } - - getPorts := func() (int, int) { - for t := range transports { - if !strings.HasPrefix(t, "client_port=") { - continue - } - t = t[len("client_port="):] - - ports := strings.Split(t, "-") - if len(ports) != 2 { - return 0, 0 - } - - port1, err := strconv.ParseInt(ports[0], 10, 64) - if err != nil { - return 0, 0 - } - - port2, err := strconv.ParseInt(ports[1], 10, 64) - if err != nil { - return 0, 0 - } - - return int(port1), int(port2) - } - return 0, 0 - } - - switch c.state { - // play - case "STARTING": - // UDP - if _, ok := transports["RTP/AVP"]; ok { - clientPort1, clientPort2 := getPorts() - if clientPort1 == 0 || clientPort2 == 0 { - return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transport) - } - - c.p.mutex.Lock() - c.rtpProto = "udp" - c.rtpPort = clientPort1 - c.rtcpPort = clientPort2 - c.state = "PRE_PLAY" - c.p.mutex.Unlock() - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Transport": strings.Join([]string{ - "RTP/AVP", - "unicast", - fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), - // use two fake server ports, since we do not want to receive feedback - // from the client - fmt.Sprintf("server_port=%d-%d", c.p.rtpPort+2, c.p.rtcpPort+2), - "ssrc=1234ABCD", - }, ";"), - "Session": "12345678", - }, - }, nil - - // TCP - } else if _, ok := transports["RTP/AVP/TCP"]; ok { - c.p.mutex.Lock() - c.rtpProto = "tcp" - c.state = "PRE_PLAY" - c.p.mutex.Unlock() - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Transport": strings.Join([]string{ - "RTP/AVP/TCP", - "unicast", - "destination=127.0.0.1", - "source=127.0.0.1", - "interleaved=0-1", - }, ";"), - "Session": "12345678", - }, - }, nil - - } else { - return nil, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transport) - } - - // record - case "ANNOUNCE": - if _, ok := transports["mode=record"]; !ok { - return nil, fmt.Errorf("transport header does not contain mode=record") - } - - if _, ok := transports["RTP/AVP/UDP"]; ok { - clientPort1, clientPort2 := getPorts() - if clientPort1 == 0 || clientPort2 == 0 { - return nil, fmt.Errorf("transport header does not have valid client ports (%s)", transport) - } - - c.p.mutex.Lock() - c.rtpProto = "udp" - c.rtpPort = clientPort1 - c.rtcpPort = clientPort2 - c.state = "PRE_RECORD" - c.p.mutex.Unlock() - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Transport": strings.Join([]string{ - "RTP/AVP", - "unicast", - fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), - fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), - "ssrc=1234ABCD", - }, ";"), - "Session": "12345678", - }, - }, nil - - } else if _, ok := transports["RTP/AVP/TCP"]; ok { - if _, ok := transports["interleaved=0-1"]; !ok { - return nil, fmt.Errorf("transport header does not contain interleaved=0-1") - } - - c.p.mutex.Lock() - c.rtpProto = "tcp" - c.state = "PRE_RECORD" - c.p.mutex.Unlock() - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Transport": strings.Join([]string{ - "RTP/AVP/TCP", - "unicast", - "destination=127.0.0.1", - "source=127.0.0.1", - }, ";"), - "Session": "12345678", - }, - }, nil - - } else { - return nil, fmt.Errorf("transport header does not contain a valid protocol (RTP/AVP or RTP/AVP/TCP) (%s)", transport) - } - - default: - return nil, fmt.Errorf("client is in state '%s'", c.state) - } - - case "PLAY": - if c.state != "PRE_PLAY" { - return nil, fmt.Errorf("client is in state '%s'", c.state) - } - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Session": "12345678", - }, - }, errPlay - - case "PAUSE": - if c.state != "PLAY" { - return nil, fmt.Errorf("client is in state '%s'", c.state) - } - - c.log("paused receiving") - - c.p.mutex.Lock() - c.state = "PRE_PLAY" - c.p.mutex.Unlock() - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Session": "12345678", - }, - }, nil - - case "RECORD": - if c.state != "PRE_RECORD" { - return nil, fmt.Errorf("client is in state '%s'", c.state) - } - - return &rtsp.Response{ - StatusCode: 200, - Status: "OK", - Headers: map[string]string{ - "CSeq": cseq, - "Session": "12345678", - }, - }, errRecord - - case "TEARDOWN": - return nil, errTeardown - - default: - return nil, fmt.Errorf("unhandled method '%s'", req.Method) - } -} diff --git a/rtsp_listener.go b/rtsplistener.go similarity index 100% rename from rtsp_listener.go rename to rtsplistener.go diff --git a/udp_listener.go b/udp_listener.go deleted file mode 100644 index 7cc5966a..00000000 --- a/udp_listener.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "log" - "net" -) - -type udpListener struct { - nconn *net.UDPConn - logPrefix string - cb func([]byte) -} - -func newUdpListener(port int, logPrefix string, cb func([]byte)) (*udpListener, error) { - nconn, err := net.ListenUDP("udp", &net.UDPAddr{ - Port: port, - }) - if err != nil { - return nil, err - } - - l := &udpListener{ - nconn: nconn, - logPrefix: logPrefix, - cb: cb, - } - - l.log("opened on :%d", port) - return l, nil -} - -func (l *udpListener) log(format string, args ...interface{}) { - log.Printf("["+l.logPrefix+" listener] "+format, args...) -} - -func (l *udpListener) run() { - buf := make([]byte, 2048) // UDP MTU is 1400 - - for { - n, _, err := l.nconn.ReadFromUDP(buf) - if err != nil { - l.log("ERR: %s", err) - break - } - - l.cb(buf[:n]) - } -} diff --git a/udplistener.go b/udplistener.go new file mode 100644 index 00000000..5d273679 --- /dev/null +++ b/udplistener.go @@ -0,0 +1,90 @@ +package main + +import ( + "log" + "net" +) + +type udpListener struct { + p *program + nconn *net.UDPConn + flow trackFlow +} + +func newUdpListener(p *program, port int, flow trackFlow) (*udpListener, error) { + nconn, err := net.ListenUDP("udp", &net.UDPAddr{ + Port: port, + }) + if err != nil { + return nil, err + } + + l := &udpListener{ + p: p, + nconn: nconn, + flow: flow, + } + + l.log("opened on :%d", port) + return l, nil +} + +func (l *udpListener) log(format string, args ...interface{}) { + var label string + if l.flow == _TRACK_FLOW_RTP { + label = "RTP" + } else { + label = "RTCP" + } + log.Printf("["+label+" listener] "+format, args...) +} + +func (l *udpListener) run() { + buf := make([]byte, 2048) // UDP MTU is 1400 + + for { + n, addr, err := l.nconn.ReadFromUDP(buf) + if err != nil { + l.log("ERR: %s", err) + break + } + + func() { + l.p.mutex.RLock() + defer l.p.mutex.RUnlock() + + if l.p.publisher == nil { + return + } + + if l.p.publisher.streamProtocol != _STREAM_PROTOCOL_UDP { + return + } + + if !l.p.publisher.ip.Equal(addr.IP) { + return + } + + // get track id by using client port + trackId := func() int { + for i, t := range l.p.publisher.streamTracks { + if l.flow == _TRACK_FLOW_RTP { + if t.rtpPort == addr.Port { + return i + } + } else { + if t.rtcpPort == addr.Port { + return i + } + } + } + return -1 + }() + if trackId < 0 { + return + } + + l.p.forwardTrack(l.flow, trackId, buf[:n]) + }() + } +}