diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..70d34cec --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +.git +/release diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..4f062d69 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/release diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..50bbbd83 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2019 aler9 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..ccc3c5a5 --- /dev/null +++ b/Makefile @@ -0,0 +1,101 @@ + +.PHONY: $(shell ls) + +BASE_IMAGE = amd64/golang:1.13-alpine3.10 + +help: + @echo "usage: make [action]" + @echo "" + @echo "available actions:" + @echo "" + @echo " mod-tidy run go mod tidy" + @echo " format format source files" + @echo " test run available tests" + @echo " run run app" + @echo "" + +mod-tidy: + docker run --rm -it -v $(PWD):/s $(BASE_IMAGE) \ + sh -c "apk add git && cd /s && go get && go mod tidy" + +format: + docker run --rm -it -v $(PWD):/s $(BASE_IMAGE) \ + sh -c "cd /s && find . -type f -name '*.go' | xargs gofmt -l -w -s" + +define DOCKERFILE_TEST +FROM $(BASE_IMAGE) +RUN apk add --no-cache make git +WORKDIR /s +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ +endef +export DOCKERFILE_TEST + +test: + echo "$$DOCKERFILE_TEST" | docker build -q . -f - -t temp + docker run --rm -it \ + --name temp \ + temp \ + make test-nodocker + +IMAGES = $(shell echo test-images/*/ | xargs -n1 basename) + +test-nodocker: + $(eval export CGO_ENABLED = 0) + go test -v ./rtsp + +define DOCKERFILE_RUN +FROM $(BASE_IMAGE) +RUN apk add --no-cache git +WORKDIR /s +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ +RUN go build -o /out . +endef +export DOCKERFILE_RUN + +run: + echo "$$DOCKERFILE_RUN" | docker build -q . -f - -t temp + docker run --rm -it \ + --network=host \ + --name temp \ + temp \ + /out + +define DOCKERFILE_RELEASE +FROM $(BASE_IMAGE) +RUN apk add --no-cache zip make git tar +WORKDIR /s +COPY go.mod go.sum ./ +RUN go mod download +COPY . ./ +RUN make release-nodocker +endef +export DOCKERFILE_RELEASE + +release: + echo "$$DOCKERFILE_RELEASE" | docker build . -f - -t temp \ + && docker run --rm -it -v $(PWD):/out \ + temp sh -c "rm -rf /out/release && cp -r /s/release /out/" + +release-nodocker: + $(eval VERSION := $(shell git describe --tags)) + $(eval GOBUILD := go build -ldflags '-X "main.Version=$(VERSION)"') + rm -rf release && mkdir release + + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server.exe + cd /tmp && zip -q $(PWD)/release/rtsp-simple-server_$(VERSION)_windows_amd64.zip rtsp-simple-server.exe + + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_amd64.tar.gz --owner=0 --group=0 rtsp-simple-server + + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm6.tar.gz --owner=0 --group=0 rtsp-simple-server + + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm7.tar.gz --owner=0 --group=0 rtsp-simple-server + + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 $(GOBUILD) -o /tmp/rtsp-simple-server + tar -C /tmp -czf $(PWD)/release/rtsp-simple-server_$(VERSION)_linux_arm64.tar.gz --owner=0 --group=0 rtsp-simple-server diff --git a/README.md b/README.md index a6878f1c..3a599827 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,58 @@ + # rtsp-simple-server -RTSP video server written in Go + +_rtsp-simple-server_ is a simple an ready-to-use RTSP video server, a program that allows multiple users to publish or read live video and audio streams. RTSP a standardized protocol that defines how to perform these operations with the help of a server, that both publishers and readers can contact in order to negotiate a streaming protocol and write or read data. The server is then responsible of linking the publisher stream with the readers. + +This software was developed with the aim of simulating a live camera feed for debugging purposes, and therefore to use files instead of real streams. Another reason for the development was the deprecation of _FFserver_, the component of the FFmpeg project that allowed to create a RTSP server with _FFmpeg_ (but this server can be used with any software that supports RTSP). + +It actually supports *one* publisher, while readers can be more than one. + + +## Installation + +Precompiled binaries are available in the [release](https://github.com/aler9/rtsp-simple-server/releases) page. Just download and extract the executable. + + +## Usage + +1. Start the server: + ``` + ./rtsp-simple-server + ``` + +2. In another terminal, publish something with FFmpeg (in this example it's a video file, but it can be anything you want): + ``` + ffmpeg -re -stream_loop -1 -i file.ts -map 0:v:0 -c:v copy -f rtsp rtsp://localhost:8554/ + ``` + +3. Open the stream with VLC: + ``` + vlc rtsp://localhost:8554/ + ``` + + you can alternatively use GStreamer: + ``` + gst-launch-1.0 -v rtspsrc location=rtsp://localhost:8554/ ! rtph264depay ! decodebin ! autovideosink + ``` + +## Full command-line usage + +``` +usage: rtsp-simple-server [] + +rtsp-simple-server + +RTSP server. + +Flags: + --help Show context-sensitive help (also try --help-long and --help-man). + --version print rtsp-simple-server version + --rtsp-port=8554 port of the RTSP TCP listener + --rtp-port=8000 port of the RTP UDP listener + --rtcp-port=8001 port of the RTCP UDP listener +``` + +## Links + +Specifications +* https://tools.ietf.org/html/rfc7826 diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..9dd47e71 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module rtsp-server + +go 1.13 + +require ( + github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect + github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect + github.com/stretchr/testify v1.4.0 + gopkg.in/alecthomas/kingpin.v2 v2.2.6 +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..edf40129 --- /dev/null +++ b/go.sum @@ -0,0 +1,17 @@ +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go new file mode 100644 index 00000000..6fef8441 --- /dev/null +++ b/main.go @@ -0,0 +1,116 @@ +package main + +import ( + "fmt" + "log" + "net" + "os" + "sync" + + "gopkg.in/alecthomas/kingpin.v2" +) + +var Version string = "v0.0.0" + +type program struct { + rtspPort int + rtpPort int + rtcpPort int + mutex sync.Mutex + rtspl *rtspListener + rtpl *udpListener + rtcpl *udpListener + clients map[*rtspClient]struct{} + streamAuthor *rtspClient + streamSdp []byte +} + +func newProgram(rtspPort int, rtpPort int, rtcpPort int) (*program, error) { + p := &program{ + rtspPort: rtspPort, + rtpPort: rtpPort, + rtcpPort: rtcpPort, + clients: make(map[*rtspClient]struct{}), + } + + var err error + + p.rtpl, err = newUdpListener(rtpPort, "RTP", func(l *udpListener, buf []byte) { + p.mutex.Lock() + defer p.mutex.Unlock() + for c := range p.clients { + if c.state == "PLAY" { + l.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtpPort, + }) + } + } + }) + if err != nil { + return nil, err + } + + p.rtcpl, err = newUdpListener(rtcpPort, "RTCP", func(l *udpListener, buf []byte) { + p.mutex.Lock() + defer p.mutex.Unlock() + for c := range p.clients { + if c.state == "PLAY" { + l.nconn.WriteTo(buf, &net.UDPAddr{ + IP: c.IP, + Port: c.rtcpPort, + }) + } + } + }) + if err != nil { + return nil, err + } + + p.rtspl, err = newRtspListener(p) + if err != nil { + return nil, err + } + + return p, nil +} + +func (p *program) run() { + var wg sync.WaitGroup + + wg.Add(1) + go p.rtpl.run(wg) + + wg.Add(1) + go p.rtcpl.run(wg) + + wg.Add(1) + go p.rtspl.run(wg) + + wg.Wait() +} + +func main() { + kingpin.CommandLine.Help = "rtsp-simple-server " + Version + "\n\n" + + "RTSP server." + + version := kingpin.Flag("version", "print rtsp-simple-server version").Bool() + + rtspPort := kingpin.Flag("rtsp-port", "port of the RTSP TCP listener").Default("8554").Int() + rtpPort := kingpin.Flag("rtp-port", "port of the RTP UDP listener").Default("8000").Int() + rtcpPort := kingpin.Flag("rtcp-port", "port of the RTCP UDP listener").Default("8001").Int() + + kingpin.Parse() + + if *version == true { + fmt.Println("rtsp-simple-server " + Version) + os.Exit(0) + } + + p, err := newProgram(*rtspPort, *rtpPort, *rtcpPort) + if err != nil { + log.Fatal("ERR:", err) + } + + p.run() +} diff --git a/rtsp/conn.go b/rtsp/conn.go new file mode 100644 index 00000000..3a5aca7b --- /dev/null +++ b/rtsp/conn.go @@ -0,0 +1,39 @@ +package rtsp + +import ( + "net" +) + +type Conn struct { + c net.Conn +} + +func NewConn(c net.Conn) *Conn { + return &Conn{ + c: c, + } +} + +func (c *Conn) Close() error { + return c.c.Close() +} + +func (c *Conn) RemoteAddr() net.Addr { + return c.c.RemoteAddr() +} + +func (c *Conn) ReadRequest() (*Request, error) { + return requestDecode(c.c) +} + +func (c *Conn) WriteRequest(req *Request) error { + return requestEncode(c.c, req) +} + +func (c *Conn) ReadResponse() (*Response, error) { + return responseDecode(c.c) +} + +func (c *Conn) WriteResponse(res *Response) error { + return responseEncode(c.c, res) +} diff --git a/rtsp/request.go b/rtsp/request.go new file mode 100644 index 00000000..44b08bd1 --- /dev/null +++ b/rtsp/request.go @@ -0,0 +1,88 @@ +package rtsp + +import ( + "bufio" + "fmt" + "io" +) + +type Request struct { + Method string + Path string + Headers map[string]string + Content []byte +} + +func requestDecode(r io.Reader) (*Request, error) { + rb := bufio.NewReader(r) + + req := &Request{} + + byts, err := readBytesLimited(rb, ' ', 255) + if err != nil { + return nil, err + } + req.Method = string(byts[:len(byts)-1]) + + if len(req.Method) == 0 { + return nil, fmt.Errorf("empty method") + } + + byts, err = readBytesLimited(rb, ' ', 255) + if err != nil { + return nil, err + } + req.Path = string(byts[:len(byts)-1]) + + if len(req.Path) == 0 { + return nil, fmt.Errorf("empty path") + } + + byts, err = readBytesLimited(rb, '\r', 255) + if err != nil { + return nil, err + } + proto := string(byts[:len(byts)-1]) + + if proto != _RTSP_PROTO { + return nil, fmt.Errorf("expected '%s', got '%s'", _RTSP_PROTO, proto) + } + + err = readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + req.Headers, err = readHeaders(rb) + if err != nil { + return nil, err + } + + req.Content, err = readContent(rb, req.Headers) + if err != nil { + return nil, err + } + + return req, nil +} + +func requestEncode(w io.Writer, req *Request) error { + wb := bufio.NewWriter(w) + + _, err := wb.Write([]byte(req.Method + " " + req.Path + " " + _RTSP_PROTO + "\r\n")) + if err != nil { + return err + } + + err = writeHeaders(wb, req.Headers) + if err != nil { + return err + } + + err = writeContent(wb, req.Content) + if err != nil { + return err + } + + return wb.Flush() +} diff --git a/rtsp/request_test.go b/rtsp/request_test.go new file mode 100644 index 00000000..fb910a8d --- /dev/null +++ b/rtsp/request_test.go @@ -0,0 +1,134 @@ +package rtsp + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var casesRequest = []struct { + name string + byts []byte + req *Request +}{ + { + "options", + []byte("OPTIONS rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 1\r\n" + + "Proxy-Require: gzipped-messages\r\n" + + "Require: implicit-play\r\n" + + "\r\n"), + &Request{ + Method: "OPTIONS", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "1", + "Require": "implicit-play", + "Proxy-Require": "gzipped-messages", + }, + }, + }, + { + "describe", + []byte("DESCRIBE rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 2\r\n" + + "\r\n"), + &Request{ + Method: "DESCRIBE", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "2", + }, + }, + }, + { + "announce", + []byte("ANNOUNCE rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 7\r\n" + + "Content-Length: 306\r\n" + + "Content-Type: application/sdp\r\n" + + "Date: 23 Jan 1997 15:35:06 GMT\r\n" + + "Session: 12345678\r\n" + + "\r\n" + + "v=0\n" + + "o=mhandley 2890844526 2890845468 IN IP4 126.16.64.4\n" + + "s=SDP Seminar\n" + + "i=A Seminar on the session description protocol\n" + + "u=http://www.cs.ucl.ac.uk/staff/M.Handley/sdp.03.ps\n" + + "e=mjh@isi.edu (Mark Handley)\n" + + "c=IN IP4 224.2.17.12/127\n" + + "t=2873397496 2873404696\n" + + "a=recvonly\n" + + "m=audio 3456 RTP/AVP 0\n" + + "m=video 2232 RTP/AVP 31\n"), + &Request{ + Method: "ANNOUNCE", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "7", + "Date": "23 Jan 1997 15:35:06 GMT", + "Session": "12345678", + "Content-Type": "application/sdp", + "Content-Length": "306", + }, + Content: []byte("v=0\n" + + "o=mhandley 2890844526 2890845468 IN IP4 126.16.64.4\n" + + "s=SDP Seminar\n" + + "i=A Seminar on the session description protocol\n" + + "u=http://www.cs.ucl.ac.uk/staff/M.Handley/sdp.03.ps\n" + + "e=mjh@isi.edu (Mark Handley)\n" + + "c=IN IP4 224.2.17.12/127\n" + + "t=2873397496 2873404696\n" + + "a=recvonly\n" + + "m=audio 3456 RTP/AVP 0\n" + + "m=video 2232 RTP/AVP 31\n", + ), + }, + }, + { + "get_parameter", + []byte("GET_PARAMETER rtsp://example.com/media.mp4 RTSP/1.0\r\n" + + "CSeq: 9\r\n" + + "Content-Length: 24\r\n" + + "Content-Type: text/parameters\r\n" + + "Session: 12345678\r\n" + + "\r\n" + + "packets_received\n" + + "jitter\n"), + &Request{ + Method: "GET_PARAMETER", + Path: "rtsp://example.com/media.mp4", + Headers: map[string]string{ + "CSeq": "9", + "Content-Type": "text/parameters", + "Session": "12345678", + "Content-Length": "24", + }, + Content: []byte("packets_received\n" + + "jitter\n", + ), + }, + }, +} + +func TestRequestDecode(t *testing.T) { + for _, c := range casesRequest { + t.Run(c.name, func(t *testing.T) { + req, err := requestDecode(bytes.NewBuffer(c.byts)) + require.NoError(t, err) + require.Equal(t, c.req, req) + }) + } +} + +func TestRequestEncode(t *testing.T) { + for _, c := range casesRequest { + t.Run(c.name, func(t *testing.T) { + var buf bytes.Buffer + err := requestEncode(&buf, c.req) + require.NoError(t, err) + require.Equal(t, c.byts, buf.Bytes()) + }) + } +} diff --git a/rtsp/response.go b/rtsp/response.go new file mode 100644 index 00000000..3edf1789 --- /dev/null +++ b/rtsp/response.go @@ -0,0 +1,95 @@ +package rtsp + +import ( + "bufio" + "fmt" + "io" + "strconv" +) + +type Response struct { + StatusCode int + Status string + Headers map[string]string + Content []byte +} + +func responseDecode(r io.Reader) (*Response, error) { + rb := bufio.NewReader(r) + + res := &Response{} + + byts, err := readBytesLimited(rb, ' ', 255) + if err != nil { + return nil, err + } + proto := string(byts[:len(byts)-1]) + + if proto != _RTSP_PROTO { + return nil, fmt.Errorf("expected '%s', got '%s'", _RTSP_PROTO, proto) + } + + byts, err = readBytesLimited(rb, ' ', 4) + if err != nil { + return nil, err + } + statusCodeStr := string(byts[:len(byts)-1]) + + statusCode64, err := strconv.ParseInt(statusCodeStr, 10, 32) + res.StatusCode = int(statusCode64) + if err != nil { + return nil, fmt.Errorf("unable to parse status code") + } + + byts, err = readBytesLimited(rb, '\r', 255) + if err != nil { + return nil, err + } + res.Status = string(byts[:len(byts)-1]) + + if len(res.Status) == 0 { + return nil, fmt.Errorf("empty status") + } + + err = readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + res.Headers, err = readHeaders(rb) + if err != nil { + return nil, err + } + + res.Content, err = readContent(rb, res.Headers) + if err != nil { + return nil, err + } + + return res, nil +} + +func responseEncode(w io.Writer, res *Response) error { + wb := bufio.NewWriter(w) + + _, err := wb.Write([]byte(_RTSP_PROTO + " " + strconv.FormatInt(int64(res.StatusCode), 10) + " " + res.Status + "\r\n")) + if err != nil { + return err + } + + if len(res.Content) != 0 { + res.Headers["Content-Length"] = strconv.FormatInt(int64(len(res.Content)), 10) + } + + err = writeHeaders(wb, res.Headers) + if err != nil { + return err + } + + err = writeContent(wb, res.Content) + if err != nil { + return err + } + + return wb.Flush() +} diff --git a/rtsp/response_test.go b/rtsp/response_test.go new file mode 100644 index 00000000..087456fc --- /dev/null +++ b/rtsp/response_test.go @@ -0,0 +1,105 @@ +package rtsp + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +var casesResponse = []struct { + name string + byts []byte + res *Response +}{ + { + "ok", + []byte("RTSP/1.0 200 OK\r\n" + + "CSeq: 1\r\n" + + "Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n" + + "\r\n", + ), + &Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": "1", + "Public": "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE", + }, + }, + }, + { + "ok with content", + []byte("RTSP/1.0 200 OK\r\n" + + "CSeq: 2\r\n" + + "Content-Base: rtsp://example.com/media.mp4\r\n" + + "Content-Length: 444\r\n" + + "Content-Type: application/sdp\r\n" + + "\r\n" + + "m=video 0 RTP/AVP 96\n" + + "a=control:streamid=0\n" + + "a=range:npt=0-7.741000\n" + + "a=length:npt=7.741000\n" + + "a=rtpmap:96 MP4V-ES/5544\n" + + "a=mimetype:string;\"video/MP4V-ES\"\n" + + "a=AvgBitRate:integer;304018\n" + + "a=StreamName:string;\"hinted video track\"\n" + + "m=audio 0 RTP/AVP 97\n" + + "a=control:streamid=1\n" + + "a=range:npt=0-7.712000\n" + + "a=length:npt=7.712000\n" + + "a=rtpmap:97 mpeg4-generic/32000/2\n" + + "a=mimetype:string;\"audio/mpeg4-generic\"\n" + + "a=AvgBitRate:integer;65790\n" + + "a=StreamName:string;\"hinted audio track\"\n", + ), + &Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "Content-Base": "rtsp://example.com/media.mp4", + "Content-Length": "444", + "Content-Type": "application/sdp", + "CSeq": "2", + }, + Content: []byte("m=video 0 RTP/AVP 96\n" + + "a=control:streamid=0\n" + + "a=range:npt=0-7.741000\n" + + "a=length:npt=7.741000\n" + + "a=rtpmap:96 MP4V-ES/5544\n" + + "a=mimetype:string;\"video/MP4V-ES\"\n" + + "a=AvgBitRate:integer;304018\n" + + "a=StreamName:string;\"hinted video track\"\n" + + "m=audio 0 RTP/AVP 97\n" + + "a=control:streamid=1\n" + + "a=range:npt=0-7.712000\n" + + "a=length:npt=7.712000\n" + + "a=rtpmap:97 mpeg4-generic/32000/2\n" + + "a=mimetype:string;\"audio/mpeg4-generic\"\n" + + "a=AvgBitRate:integer;65790\n" + + "a=StreamName:string;\"hinted audio track\"\n", + ), + }, + }, +} + +func TestResponseDecode(t *testing.T) { + for _, c := range casesResponse { + t.Run(c.name, func(t *testing.T) { + res, err := responseDecode(bytes.NewBuffer(c.byts)) + require.NoError(t, err) + require.Equal(t, c.res, res) + }) + } +} + +func TestResponseEncode(t *testing.T) { + for _, c := range casesResponse { + t.Run(c.name, func(t *testing.T) { + var buf bytes.Buffer + err := responseEncode(&buf, c.res) + require.NoError(t, err) + require.Equal(t, c.byts, buf.Bytes()) + }) + } +} diff --git a/rtsp/utils.go b/rtsp/utils.go new file mode 100644 index 00000000..0ec613fc --- /dev/null +++ b/rtsp/utils.go @@ -0,0 +1,161 @@ +package rtsp + +import ( + "bufio" + "fmt" + "io" + "sort" + "strconv" +) + +const ( + _RTSP_PROTO = "RTSP/1.0" + _MAX_HEADER_COUNT = 255 + _MAX_HEADER_KEY_LENGTH = 255 + _MAX_HEADER_VALUE_LENGTH = 255 + _MAX_CONTENT_LENGTH = 4096 +) + +func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) { + for i := 1; i <= n; i++ { + byts, err := rb.Peek(i) + if err != nil { + return nil, err + } + + if byts[len(byts)-1] == delim { + rb.Discard(len(byts)) + return byts, nil + } + } + return nil, fmt.Errorf("buffer length exceeds %d", n) +} + +func readByteEqual(rb *bufio.Reader, cmp byte) error { + byt, err := rb.ReadByte() + if err != nil { + return err + } + + if byt != cmp { + return fmt.Errorf("expected '%c', got '%c'", cmp, byt) + } + + return nil +} + +func readHeaders(rb *bufio.Reader) (map[string]string, error) { + ret := make(map[string]string) + + for { + byt, err := rb.ReadByte() + if err != nil { + return nil, err + } + + if byt == '\r' { + err := readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + break + } + + if len(ret) >= _MAX_HEADER_COUNT { + return nil, fmt.Errorf("headers count exceeds %d", _MAX_HEADER_COUNT) + } + + key := string([]byte{byt}) + byts, err := readBytesLimited(rb, ':', _MAX_HEADER_KEY_LENGTH-1) + if err != nil { + return nil, err + } + key += string(byts[:len(byts)-1]) + + err = readByteEqual(rb, ' ') + if err != nil { + return nil, err + } + + byts, err = readBytesLimited(rb, '\r', _MAX_HEADER_VALUE_LENGTH) + if err != nil { + return nil, err + } + val := string(byts[:len(byts)-1]) + + if len(val) == 0 { + return nil, fmt.Errorf("empty header value") + } + + err = readByteEqual(rb, '\n') + if err != nil { + return nil, err + } + + ret[key] = val + } + + return ret, nil +} + +func writeHeaders(wb *bufio.Writer, headers map[string]string) error { + // sort headers by key + // in order to obtain deterministic results + var keys []string + for key := range headers { + keys = append(keys, key) + } + sort.Strings(keys) + + for _, key := range keys { + _, err := wb.Write([]byte(key + ": " + headers[key] + "\r\n")) + if err != nil { + return err + } + } + + _, err := wb.Write([]byte("\r\n")) + if err != nil { + return err + } + + return nil +} + +func readContent(rb *bufio.Reader, headers map[string]string) ([]byte, error) { + cls, ok := headers["Content-Length"] + if !ok { + return nil, nil + } + + cl, err := strconv.ParseInt(cls, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Content-Length") + } + + if cl > _MAX_CONTENT_LENGTH { + return nil, fmt.Errorf("Content-Length exceeds %d", _MAX_CONTENT_LENGTH) + } + + ret := make([]byte, cl) + n, err := io.ReadFull(rb, ret) + if err != nil && n != len(ret) { + return nil, err + } + + return ret, nil +} + +func writeContent(wb *bufio.Writer, content []byte) error { + if len(content) == 0 { + return nil + } + + _, err := wb.Write(content) + if err != nil { + return err + } + + return nil +} diff --git a/rtsp_client.go b/rtsp_client.go new file mode 100644 index 00000000..c3410615 --- /dev/null +++ b/rtsp_client.go @@ -0,0 +1,414 @@ +package main + +import ( + "fmt" + "io" + "log" + "net" + "net/url" + "strconv" + "strings" + "sync" + + "rtsp-server/rtsp" +) + +type rtspClient struct { + p *program + rconn *rtsp.Conn + state string + IP net.IP + rtpPort int + rtcpPort int +} + +func newRtspClient(p *program, rconn *rtsp.Conn) *rtspClient { + c := &rtspClient{ + p: p, + rconn: rconn, + state: "STARTING", + } + + c.p.mutex.Lock() + c.p.clients[c] = struct{}{} + c.p.mutex.Unlock() + + return c +} + +func (c *rtspClient) close() error { + delete(c.p.clients, c) + + if c.p.streamAuthor == c { + c.p.streamAuthor = nil + c.p.streamSdp = nil + + // if the streamer has disconnected + // close all other connections + for oc := range c.p.clients { + oc.close() + } + } + + return c.rconn.Close() +} + +func (c *rtspClient) log(format string, args ...interface{}) { + format = "[RTSP client " + c.rconn.RemoteAddr().String() + "] " + format + log.Printf(format, args...) +} + +func (c *rtspClient) run(wg sync.WaitGroup) { + defer wg.Done() + defer c.log("disconnected") + defer func() { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + c.close() + }() + + c.log("connected") + + for { + req, err := c.rconn.ReadRequest() + if err != nil { + if err != io.EOF { + c.log("ERR: %s", err) + } + return + } + + c.log(req.Method) + + cseq, ok := req.Headers["CSeq"] + if !ok { + c.log("ERR: cseq missing") + return + } + + ur, err := url.Parse(req.Path) + if err != nil { + c.log("ERR: unable to parse path '%s'", req.Path) + return + } + + switch req.Method { + case "OPTIONS": + // do not check state, since OPTIONS can be requested + // in any state + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Public": strings.Join([]string{ + "DESCRIBE", + "SETUP", + "PLAY", + "PAUSE", + "TEARDOWN", + }, ", "), + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + case "DESCRIBE": + if c.state != "STARTING" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + sdp, err := func() ([]byte, error) { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if len(c.p.streamSdp) == 0 { + return nil, fmt.Errorf("no one is streaming") + } + + return c.p.streamSdp, nil + }() + if err != nil { + c.log("ERR: %s", err) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Content-Base": ur.String(), + "Content-Type": "application/sdp", + }, + Content: sdp, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + case "SETUP": + transport, ok := req.Headers["Transport"] + if !ok { + c.log("ERR: transport header missing") + return + } + + transports := strings.Split(transport, ";") + + ok = func() bool { + for _, t := range transports { + if t == "unicast" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain unicast") + return + } + + clientPort1, clientPort2 := func() (int, int) { + for _, t := range transports { + if !strings.HasPrefix(t, "client_port=") { + continue + } + t = t[len("client_port="):] + + ports := strings.Split(t, "-") + if len(ports) != 2 { + return 0, 0 + } + + port1, err := strconv.ParseInt(ports[0], 10, 64) + if err != nil { + return 0, 0 + } + + port2, err := strconv.ParseInt(ports[1], 10, 64) + if err != nil { + return 0, 0 + } + + return int(port1), int(port2) + } + return 0, 0 + }() + if clientPort1 == 0 || clientPort2 == 0 { + c.log("ERR: transport header does not have valid client ports (%s)", transport) + return + } + + switch c.state { + // play + case "STARTING": + ok = func() bool { + for _, t := range transports { + if t == "RTP/AVP" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain RTP/AVP") + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.rtpPort = clientPort1 + c.rtcpPort = clientPort2 + c.state = "PRE_PLAY" + c.p.mutex.Unlock() + + // record + case "ANNOUNCE": + ok = func() bool { + for _, t := range transports { + if t == "RTP/AVP/UDP" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain RTP/AVP/UDP") + return + } + + ok = func() bool { + for _, t := range transports { + if t == "mode=record" { + return true + } + } + return false + }() + if !ok { + c.log("ERR: transport header does not contain mode=record") + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Transport": strings.Join([]string{ + "RTP/AVP", + "unicast", + fmt.Sprintf("client_port=%d-%d", clientPort1, clientPort2), + fmt.Sprintf("server_port=%d-%d", c.p.rtpPort, c.p.rtcpPort), + "ssrc=1234ABCD", + }, ";"), + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + ipstr, _, _ := net.SplitHostPort(c.rconn.RemoteAddr().String()) + c.IP = net.ParseIP(ipstr) + c.rtpPort = clientPort1 + c.rtcpPort = clientPort2 + c.state = "PRE_RECORD" + c.p.mutex.Unlock() + + default: + c.log("ERR: client is in state '%s'", c.state) + return + } + + case "PLAY": + if c.state != "PRE_PLAY" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.state = "PLAY" + c.p.mutex.Unlock() + + case "RECORD": + if c.state != "PRE_RECORD" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + "Session": "12345678", + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.state = "RECORD" + c.p.mutex.Unlock() + + case "ANNOUNCE": + if c.state != "STARTING" { + c.log("ERR: client is in state '%s'", c.state) + return + } + + ct, ok := req.Headers["Content-Type"] + if !ok { + c.log("ERR: Content-Type header missing") + return + } + + if ct != "application/sdp" { + c.log("ERR: unsupported Content-Type '%s'", ct) + return + } + + err := func() error { + c.p.mutex.Lock() + defer c.p.mutex.Unlock() + + if c.p.streamAuthor != nil { + return fmt.Errorf("another client is already streaming") + } + + c.p.streamAuthor = c + c.p.streamSdp = req.Content + return nil + }() + if err != nil { + c.log("ERR: %s", err) + return + } + + err = c.rconn.WriteResponse(&rtsp.Response{ + StatusCode: 200, + Status: "OK", + Headers: map[string]string{ + "CSeq": cseq, + }, + }) + if err != nil { + c.log("ERR: %s", err) + return + } + + c.p.mutex.Lock() + c.state = "ANNOUNCE" + c.p.mutex.Unlock() + + case "TEARDOWN": + return + + default: + c.log("ERR: method %s unhandled", req.Method) + } + } +} diff --git a/rtsp_listener.go b/rtsp_listener.go new file mode 100644 index 00000000..a677296b --- /dev/null +++ b/rtsp_listener.go @@ -0,0 +1,51 @@ +package main + +import ( + "log" + "net" + "sync" + + "rtsp-server/rtsp" +) + +type rtspListener struct { + p *program + netl *net.TCPListener +} + +func newRtspListener(p *program) (*rtspListener, error) { + netl, err := net.ListenTCP("tcp", &net.TCPAddr{ + Port: p.rtspPort, + }) + if err != nil { + return nil, err + } + + s := &rtspListener{ + p: p, + netl: netl, + } + + s.log("opened on :%d", p.rtspPort) + return s, nil +} + +func (l *rtspListener) log(format string, args ...interface{}) { + log.Printf("[RTSP listener] "+format, args...) +} + +func (l *rtspListener) run(wg sync.WaitGroup) { + defer wg.Done() + + for { + conn, err := l.netl.AcceptTCP() + if err != nil { + break + } + + rconn := rtsp.NewConn(conn) + rsc := newRtspClient(l.p, rconn) + wg.Add(1) + go rsc.run(wg) + } +} diff --git a/udp_listener.go b/udp_listener.go new file mode 100644 index 00000000..4aabbd27 --- /dev/null +++ b/udp_listener.go @@ -0,0 +1,53 @@ +package main + +import ( + "log" + "net" + "sync" +) + +type udpListenerCb func(*udpListener, []byte) + +type udpListener struct { + nconn *net.UDPConn + logPrefix string + cb udpListenerCb +} + +func newUdpListener(port int, logPrefix string, cb udpListenerCb) (*udpListener, error) { + nconn, err := net.ListenUDP("udp", &net.UDPAddr{ + Port: port, + }) + if err != nil { + return nil, err + } + + l := &udpListener{ + nconn: nconn, + logPrefix: logPrefix, + cb: cb, + } + + l.log("opened on :%d", port) + return l, nil +} + +func (l *udpListener) log(format string, args ...interface{}) { + log.Printf("["+l.logPrefix+" listener] "+format, args...) +} + +func (l *udpListener) run(wg sync.WaitGroup) { + defer wg.Done() + + buf := make([]byte, 2048) // UDP MTU is 1400 + + for { + n, _, err := l.nconn.ReadFromUDP(buf) + if err != nil { + l.log("ERR: %s", err) + break + } + + l.cb(l, buf[:n]) + } +}