diff --git a/README.md b/README.md index c05e4bad..fe5dde94 100644 --- a/README.md +++ b/README.md @@ -20,9 +20,11 @@ Live streams can be published to the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[WebRTC](#webrtc)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| -|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec| -|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec| +|[SRT clients](#srt-clients)||H265, H264|Opus, MPEG-4 Audio (AAC)| +|[SRT servers](#srt-servers)||H265, H264|Opus, MPEG-4 Audio (AAC)| +|[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711| +|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| +|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| |[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[RTMP cameras and servers](#rtmp-cameras-and-servers)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS cameras and servers](#hls-cameras-and-servers)|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)| @@ -33,8 +35,9 @@ And can be read from the server with: |protocol|variants|video codecs|audio codecs| |--------|--------|------------|------------| -|[WebRTC](#webrtc-1)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| -|[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G722, G711, LPCM and any RTP-compatible codec| +|[SRT](#srt)||H265, H264|Opus, MPEG-4 Audio (AAC)| +|[WebRTC](#webrtc)|Browser-based, WHEP|AV1, VP9, VP8, H264|Opus, G722, G711| +|[RTSP](#rtsp)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec| |[RTMP](#rtmp)|RTMP, RTMPS, Enhanced RTMP|H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)| |[HLS](#hls)|Low-Latency HLS, MP4-based HLS, legacy HLS|H265, H264|Opus, MPEG-4 Audio (AAC)| @@ -76,7 +79,9 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi * [Generic webcam](#generic-webcam) * [Raspberry Pi Cameras](#raspberry-pi-cameras) * [By protocol](#by-protocol) - * [WebRTC](#webrtc) + * [SRT clients](#srt-clients) + * [SRT servers](#srt-servers) + * [WebRTC clients](#webrtc-clients) * [RTSP clients](#rtsp-clients) * [RTSP cameras and servers](#rtsp-cameras-and-servers) * [RTMP clients](#rtmp-clients) @@ -90,7 +95,8 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi * [VLC](#vlc) * [Web browsers](#web-browsers-1) * [By protocol](#by-protocol-1) - * [WebRTC](#webrtc-1) + * [SRT](#srt) + * [WebRTC](#webrtc) * [RTSP](#rtsp) * [RTMP](#rtmp) * [HLS](#hls) @@ -157,6 +163,7 @@ docker run --rm -it \ -p 1935:1935 \ -p 8888:8888 \ -p 8889:8889 \ +-p 8890:8890/udp \ bluenviron/mediamtx ``` @@ -243,7 +250,7 @@ makepkg -si #### FFmpeg -FFmpeg can publish a stream to the server in multiple ways (RTSP client, RTMP client, UDP/MPEG-TS, WebRTC with WHIP). The recommended one consists in publishing as a [RTSP client](#rtsp-clients): +FFmpeg can publish a stream to the server in multiple ways (SRT client, SRT server, RTSP client, RTMP client, UDP/MPEG-TS, WebRTC with WHIP). The recommended one consists in publishing as a [RTSP client](#rtsp-clients): ``` ffmpeg -re -stream_loop -1 -i file.ts -c copy -f rtsp rtsp://localhost:8554/mystream @@ -259,7 +266,7 @@ The resulting stream will be available in path `/mystream`. #### GStreamer -GStreamer can publish a stream to the server in multiple ways (RTSP client, RTMP client, UDP/MPEG-TS, WebRTC with WHIP). The recommended one consists in publishing as a [RTSP client](#rtsp-clients): +GStreamer can publish a stream to the server in multiple ways (SRT client, SRT server, RTSP client, RTMP client, UDP/MPEG-TS, WebRTC with WHIP). The recommended one consists in publishing as a [RTSP client](#rtsp-clients): ```sh gst-launch-1.0 rtspclientsink name=s location=rtsp://localhost:8554/mystream \ @@ -286,7 +293,7 @@ The resulting stream will be available in path `/mystream`. #### OBS Studio -OBS Studio can publish to the server as a [RTMP client](#rtmp-clients). In `Settings -> Stream` (or in the Auto-configuration Wizard), use the following parameters: +OBS Studio can publish to the server in multiple ways (SRT client, RTMP client, WebRTC client). The recommended one consists in publishing as a [RTMP client](#rtmp-clients). In `Settings -> Stream` (or in the Auto-configuration Wizard), use the following parameters: * Service: `Custom...` * Server: `rtmp://localhost` @@ -324,7 +331,7 @@ The resulting stream will be available in path `/mystream`. #### OpenCV -OpenCV can publish to the server as a [RTSP client](#rtsp-clients). It must be compiled with GStreamer support, by following this procedure: +OpenCV can publish to the server through its GStreamer plugin, as a [RTSP client](#rtsp-clients). It must be compiled with GStreamer support, by following this procedure: ```sh sudo apt install -y libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-ugly gstreamer1.0-rtsp python3-dev python3-numpy @@ -535,7 +542,38 @@ The resulting stream will be available in path `/cam_with_audio`. ### By protocol -#### WebRTC +#### SRT clients + +SRT is a protocol that allows to publish and read live data stream, providing encryption, integrity and a retransmission mechanism. It is usually used to transfer media streams encoded with MPEG-TS. In order to publish a stream to the server with the SRT protocol, use this URL: + +``` +srt://localhost:8890?streamid=publish:mystream&pkt_size=1316 +``` + +Replace `mystream` with any name you want. The resulting stream will be available in path `/mystream`. + +If credentials are enabled, append username and password to `streamid`; + +``` +srt://localhost:8890?streamid=publish:mystream:user:pass&pkt_size=1316 +``` + +If you want to publish a stream by using a client in listening mode (i.e. with `mode=listener` appended to the URL), read the next section. + +Known clients that can publish with SRT are [FFmpeg](#ffmpeg), [Gstreamer](#gstreamer), [OBS Studio](#obs-studio). + +#### SRT servers + +In order to ingest into the server a SRT stream from an existing server, camera or client in listening mode (i.e. with `mode=listener` appended to the URL), add the corresponding URL into the `source` parameter of a path: + +```yml +paths: + proxied: + # url of the source stream, in the format srt://host:port?streamid=streamid&other_parameters + source: srt://original-url +``` + +#### WebRTC clients WebRTC is an API that makes use of a set of protocols and methods to connect two clients together and allow them to exchange real-time media or data streams. You can publish a stream with WebRTC and a web browser by visiting: @@ -553,9 +591,11 @@ http://localhost:8889/mystream/whip Depending on the network it may be difficult to establish a connection between server and clients, see [WebRTC-specific features](#webrtc-specific-features) for remediations. +Known clients that can publish with WebRTC and WHIP are [FFmpeg](#ffmpeg), [Gstreamer](#gstreamer), [OBS Studio](#obs-studio). + #### RTSP clients -RTSP is a protocol that allows to publish and read streams. It supports different underlying transport protocols and allows to encrypt streams in transit (see [RTSP-specific features](#rtsp-specific-features)). In order to publish a stream with the RTSP protocol, you can use this URL: +RTSP is a protocol that allows to publish and read streams. It supports different underlying transport protocols and allows to encrypt streams in transit (see [RTSP-specific features](#rtsp-specific-features)). In order to publish a stream to the server with the RTSP protocol, use this URL: ``` rtsp://localhost:8554/mystream @@ -563,6 +603,8 @@ rtsp://localhost:8554/mystream The resulting stream will be available in path `/mystream`. +Known clients that can publish with RTSP are [FFmpeg](#ffmpeg), [Gstreamer](#gstreamer), [OBS Studio](#obs-studio). + #### RTSP cameras and servers Most IP cameras expose their video stream by using a RTSP server that is embedded into the camera itself. You can use _MediaMTX_ to connect to one or multiple existing RTSP servers and read their video streams: @@ -603,6 +645,8 @@ In case authentication is enabled, credentials can be passed to the server by us rtmp://localhost/mystream?user=myuser&pass=mypass ``` +Known clients that can publish with RTMP are [FFmpeg](#ffmpeg), [Gstreamer](#gstreamer), [OBS Studio](#obs-studio). + #### RTMP cameras and servers You can use _MediaMTX_ to connect to one or multiple existing RTMP servers and read their video streams: @@ -657,13 +701,15 @@ paths: The resulting stream will be available in path `/mypath`. +Known clients that can publish with WebRTC and WHIP are [FFmpeg](#ffmpeg) and [Gstreamer](#gstreamer). + ## Read from the server ### By software #### FFmpeg -FFmpeg can read a stream from the server in multiple ways (RTSP, RTMP, HLS, WebRTC with WHEP). The recommended one consists in reading with [RTSP](#rtsp): +FFmpeg can read a stream from the server in multiple ways (RTSP, RTMP, HLS, WebRTC with WHEP, SRT). The recommended one consists in reading with [RTSP](#rtsp): ```sh ffmpeg -i rtsp://localhost:8554/mystream -c copy output.mp4 @@ -677,7 +723,7 @@ ffmpeg -rtsp_transport tcp -i rtsp://localhost:8554/mystream -c copy output.mp4 #### GStreamer -GStreamer can read a stream from the server in multiple ways (RTSP, RTMP, HLS, WebRTC with WHEP). The recommended one consists in reading with [RTSP](#rtsp): +GStreamer can read a stream from the server in multiple ways (RTSP, RTMP, HLS, WebRTC with WHEP, SRT). The recommended one consists in reading with [RTSP](#rtsp): ```sh gst-launch-1.0 rtspsrc location=rtsp://127.0.0.1:8554/mystream latency=0 ! decodebin ! autovideosink @@ -697,7 +743,7 @@ gst-launch-1.0 rtspsrc tls-validation-flags=0 location=rtsps://ip:8322/... #### VLC -VLC can read a stream from the server in multiple ways (RTSP, RTMP, HLS). The recommended one consists in reading with [RTSP](#rtsp): +VLC can read a stream from the server in multiple ways (RTSP, RTMP, HLS, SRT). The recommended one consists in reading with [RTSP](#rtsp): ```sh vlc --network-caching=50 rtsp://localhost:8554/mystream @@ -764,6 +810,24 @@ This web page can be embedded into another web page by using an iframe: ### By protocol +#### SRT + +SRT is a protocol that allows to publish and read live data stream, providing encryption, integrity and a retransmission mechanism. It is usually used to transfer media streams encoded with MPEG-TS. In order to read a stream from the server with the SRT protocol, use this URL: + +``` +srt://localhost:8890?streamid=read:mystream&pkt_size=1316 +``` + +Replace `mystream` with the path name. + +If credentials are enabled, append username and password to `streamid`; + +``` +srt://localhost:8890?streamid=publish:mystream:user:pass&pkt_size=1316 +``` + +Known clients that can read with SRT are [FFmpeg](#ffmpeg-1), [Gstreamer](#gstreamer-1) and [VLC](#vlc). + #### WebRTC WebRTC is an API that makes use of a set of protocols and methods to connect two clients together and allow them to exchange real-time media or data streams. You can read a stream with WebRTC and a web browser by visiting: @@ -780,14 +844,18 @@ http://localhost:8889/mystream/whep Depending on the network it may be difficult to establish a connection between server and clients, see [WebRTC-specific features](#webrtc-specific-features) for remediations. +Known clients that can read with WebRTC and WHEP are [FFmpeg](#ffmpeg-1), [Gstreamer](#gstreamer-1) and [web browsers](#web-browsers-1). + #### RTSP -RTSP is a protocol that allows to publish and read streams. It supports different underlying transport protocols and allows to encrypt streams in transit (see [RTSP-specific features](#rtsp-specific-features)). In order to read a stream with the RTSP protocol, you can use this URL: +RTSP is a protocol that allows to publish and read streams. It supports different underlying transport protocols and allows to encrypt streams in transit (see [RTSP-specific features](#rtsp-specific-features)). In order to read a stream with the RTSP protocol, use this URL: ``` rtsp://localhost:8554/mystream ``` +Known clients that can read with SRT are [FFmpeg](#ffmpeg-1), [Gstreamer](#gstreamer-1) and [VLC](#vlc). + ##### Latency The RTSP protocol doesn't introduce any latency by itself. Latency is usually introduced by clients, that put frames in a buffer to compensate network fluctuations. In order to decrease latency, the best way consists in tuning the client. For instance, latency can be decreased with VLC by decreasing the Network caching parameter, that is available in the Open network stream dialog or alternatively ca be set with the command line: @@ -810,6 +878,8 @@ In case authentication is enabled, credentials can be passed to the server by us rtmp://localhost/mystream?user=myuser&pass=mypass ``` +Known clients that can read with RTMP are [FFmpeg](#ffmpeg-1), [Gstreamer](#gstreamer-1) and [VLC](#vlc). + #### HLS HLS is a protocol that works by splitting streams into segments, and by serving these segments and a playlist with the HTTP protocol. You can use _MediaMTX_ to generate a HLS stream, that is accessible through a web page: @@ -824,9 +894,9 @@ and can also be accessed without using the browsers, by software that supports t http://localhost:8888/mystream/index.m3u8 ``` -Although the server can produce HLS with a variety of video and audio codecs (that are listed at the beginning of the README), not all browsers can read all codecs. You can check what codecs your browser can read by visiting this page: +Although the server can produce HLS with a variety of video and audio codecs (that are listed at the beginning of the README), not all browsers can read all codecs. - +You can check what codecs your browser can read by [using this tool](https://jsfiddle.net/4msrhudv). If you want to support most browsers, you can to re-encode the stream by using the H264 and AAC codecs, for instance by using FFmpeg: @@ -837,6 +907,8 @@ ffmpeg -i rtsp://original-source \ -f rtsp rtsp://localhost:8554/mystream ``` +Known clients that can read with HLS are [FFmpeg](#ffmpeg-1), [Gstreamer](#gstreamer-1), [VLC](#vlc) and [web browsers](#web-browsers-1). + ##### LL-HLS Low-Latency HLS is a recently standardized variant of the protocol that allows to greatly reduce playback latency. It works by splitting segments into parts, that are served before the segment is complete. LL-HLS is enabled by default. If the stream is not shown correctly, try tuning the hlsPartDuration parameter, for instance: @@ -1050,7 +1122,7 @@ paths: ### Save streams on disk -To save available streams on disk, you can use the `runOnReady` parameter and _FFmpeg_: +To save available streams on disk, use the `runOnReady` parameter and _FFmpeg_: ```yml paths: @@ -1409,24 +1481,30 @@ The command will produce tarballs in folder `binaries/`. ## Standards * RTSP + * [RTSP / RTP / RTCP standards](https://github.com/bluenviron/gortsplib#standards) * HLS + * [HLS standards](https://github.com/bluenviron/gohlslib#standards) * RTMP + * [RTMP](https://rtmp.veriskope.com/pdf/rtmp_specification_1.0.pdf) * [Enhanced RTMP](https://raw.githubusercontent.com/veovera/enhanced-rtmp/main/enhanced-rtmp-v1.pdf) * WebRTC + * [WebRTC: Real-Time Communication in Browsers](https://www.w3.org/TR/webrtc/) * [WebRTC HTTP Ingestion Protocol (WHIP)](https://datatracker.ietf.org/doc/draft-ietf-wish-whip/) * [WebRTC HTTP Egress Protocol (WHEP)](https://datatracker.ietf.org/doc/draft-murillo-whep/) * Video and audio codecs + * [Codec standards](https://github.com/bluenviron/mediacommon#standards) * Other + * [Golang project layout](https://github.com/golang-standards/project-layout) ## Related projects @@ -1434,6 +1512,7 @@ The command will produce tarballs in folder `binaries/`. * [gortsplib (RTSP library used internally)](https://github.com/bluenviron/gortsplib) * [gohlslib (HLS library used internally)](https://github.com/bluenviron/gohlslib) * [mediacommon (codecs and formats library used internally)](https://github.com/bluenviron/mediacommon) +* [datarhei/gosrt (SRT library used internally)](https://github.com/datarhei/gosrt) * [pion/webrtc (WebRTC library used internally)](https://github.com/pion/webrtc) * [pion/sdp (SDP library used internally)](https://github.com/pion/sdp) * [pion/rtp (RTP library used internally)](https://github.com/pion/rtp) diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index c8cdc934..3ffddf52 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -169,6 +169,12 @@ components: webrtcICETCPMuxAddress: type: string + # srt + srt: + type: boolean + srtAddress: + type: string + # paths paths: type: object @@ -348,6 +354,16 @@ components: items: $ref: '#/components/schemas/PathSourceOrReader' + PathsList: + type: object + properties: + pageCount: + type: integer + items: + type: array + items: + $ref: '#/components/schemas/Path' + PathSourceOrReader: type: object properties: @@ -356,6 +372,7 @@ components: enum: - hlsMuxer - hlsSource + - redirect - rpiCameraSource - rtmpConn - rtmpSource @@ -363,48 +380,35 @@ components: - rtspSession - rtspSource - rtspsSession - - redirect + - srtConn + - srtSource - udpSource - webRTCSession id: type: string - RTSPConn: + HLSMuxer: type: object properties: - id: + path: type: string created: type: string - remoteAddr: + lastRequest: type: string - bytesReceived: - type: integer - format: int64 bytesSent: type: integer format: int64 - RTSPSession: + HLSMuxersList: type: object properties: - id: - type: string - created: - type: string - remoteAddr: - type: string - state: - type: string - enum: [idle, read, publish] - path: - type: string - bytesReceived: - type: integer - format: int64 - bytesSent: + pageCount: type: integer - format: int64 + items: + type: array + items: + $ref: '#/components/schemas/HLSMuxer' RTMPConn: type: object @@ -427,20 +431,33 @@ components: type: integer format: int64 - HLSMuxer: + RTMPConnsList: type: object properties: - path: + pageCount: + type: integer + items: + type: array + items: + $ref: '#/components/schemas/RTMPConn' + + RTSPConn: + type: object + properties: + id: type: string created: type: string - lastRequest: + remoteAddr: type: string + bytesReceived: + type: integer + format: int64 bytesSent: type: integer format: int64 - HLSMuxersList: + RTSPConnsList: type: object properties: pageCount: @@ -448,19 +465,30 @@ components: items: type: array items: - $ref: '#/components/schemas/HLSMuxer' + $ref: '#/components/schemas/RTSPConn' - PathsList: + RTSPSession: type: object properties: - pageCount: + id: + type: string + created: + type: string + remoteAddr: + type: string + state: + type: string + enum: [idle, read, publish] + path: + type: string + bytesReceived: type: integer - items: - type: array - items: - $ref: '#/components/schemas/Path' + format: int64 + bytesSent: + type: integer + format: int64 - RTMPConnsList: + RTSPSessionsList: type: object properties: pageCount: @@ -468,19 +496,30 @@ components: items: type: array items: - $ref: '#/components/schemas/RTMPConn' + $ref: '#/components/schemas/RTSPSession' - RTSPConnsList: + SRTConn: type: object properties: - pageCount: + id: + type: string + created: + type: string + remoteAddr: + type: string + state: + type: string + enum: [idle, read, publish] + path: + type: string + bytesReceived: type: integer - items: - type: array - items: - $ref: '#/components/schemas/RTSPConn' + format: int64 + bytesSent: + type: integer + format: int64 - RTSPSessionsList: + SRTConnsList: type: object properties: pageCount: @@ -488,7 +527,7 @@ components: items: type: array items: - $ref: '#/components/schemas/RTSPSession' + $ref: '#/components/schemas/SRTConn' WebRTCSession: type: object @@ -1178,6 +1217,84 @@ paths: '500': description: internal server error. + /v2/srtconns/list: + get: + operationId: srtConnsList + summary: returns all SRT connections. + description: '' + parameters: + - name: page + in: query + description: page number. + schema: + type: number + default: 0 + - name: itemsPerPage + in: query + description: items per page. + schema: + type: number + default: 100 + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/SRTConnsList' + '400': + description: invalid request. + '500': + description: internal server error. + + /v2/srtconns/get/{id}: + get: + operationId: srtConnsGet + summary: returns a SRT connection. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/SRTConn' + '400': + description: invalid request. + '404': + description: connection not found. + '500': + description: internal server error. + + /v2/srtconns/kick/{id}: + post: + operationId: srtConnsKick + summary: kicks out a SRT connection from the server. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + '400': + description: invalid request. + '404': + description: connection not found. + '500': + description: internal server error. + /v2/webrtcsessions/list: get: operationId: webrtcSessionsList diff --git a/go.mod b/go.mod index 494e10e7..a1c2d9ec 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ require ( code.cloudfoundry.org/bytefmt v0.0.0 github.com/abema/go-mp4 v0.11.0 github.com/alecthomas/kong v0.8.0 - github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87 github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072 github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230730204034-8b8b52e689d9 github.com/bluenviron/mediacommon v0.7.1-0.20230730144331-10b74a4f6eda + github.com/datarhei/gosrt v0.5.3 github.com/fsnotify/fsnotify v1.6.0 github.com/gin-gonic/gin v1.9.1 github.com/google/uuid v1.3.0 @@ -33,6 +33,8 @@ require ( require ( github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 // indirect github.com/asticode/go-astikit v0.30.0 // indirect + github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87 // indirect + github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -71,3 +73,5 @@ require ( ) replace code.cloudfoundry.org/bytefmt => github.com/cloudfoundry/bytefmt v0.0.0-20211005130812-5bb3c17173e5 + +replace github.com/datarhei/gosrt => github.com/aler9/gosrt v0.0.0-20230731174125-a330dfc27f6d diff --git a/go.sum b/go.sum index 215e27c8..54d6d67b 100644 --- a/go.sum +++ b/go.sum @@ -4,12 +4,16 @@ github.com/alecthomas/assert/v2 v2.1.0 h1:tbredtNcQnoSd3QBhQWI7QZ3XHOVkw1Moklp2o github.com/alecthomas/kong v0.8.0 h1:ryDCzutfIqJPnNn0omnrgHLbAggDQM2VWHikE1xqK7s= github.com/alecthomas/kong v0.8.0/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U= github.com/alecthomas/repr v0.1.0 h1:ENn2e1+J3k09gyj2shc0dHr/yjaWSHRlrJ4DPMevDqE= +github.com/aler9/gosrt v0.0.0-20230731174125-a330dfc27f6d h1:iHAf7x9eCDva+Zwo50YIg6LHJDsgsq4R+abVaZ/eyBY= +github.com/aler9/gosrt v0.0.0-20230731174125-a330dfc27f6d/go.mod h1:EryxR+Men7aW67IX2FEo56SU+Pay9OHCw3kZx23qKyQ= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82 h1:9WgSzBLo3a9ToSVV7sRTBYZ1GGOZUpq4+5H3SN0UZq4= github.com/aler9/writerseeker v0.0.0-20220601075008-6f0e685b9c82/go.mod h1:qsMrZCbeBf/mCLOeF16KDkPu4gktn/pOWyaq1aYQE7U= github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA= github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87 h1:SCAqalLhgKGDghGz03yYVWr8TavHluP/i7IwshKU9yA= github.com/asticode/go-astits v1.11.1-0.20230727094110-0df190a2dd87/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= +github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYhJeJ2aZxADI2tGADS15AzIF8MQ8XAhT4= +github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI= github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072 h1:pAbC7frXsTMxP7Ck3E50hl7oFeSeD2dgc2lWjmHXztQ= github.com/bluenviron/gohlslib v0.3.1-0.20230730162911-eb9f86511072/go.mod h1:rK4b161qErs82QqvBEl84vpi2xhdZBUT0yubXuytZ7E= github.com/bluenviron/gortsplib/v3 v3.9.1-0.20230730204034-8b8b52e689d9 h1:QBdUlT/taEG0b8dxguJ6GYT7r6vJFRhvwlhs1LGWYlQ= diff --git a/internal/conf/conf.go b/internal/conf/conf.go index e06a5a56..8dc04ce4 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -159,6 +159,10 @@ type Conf struct { WebRTCICEUDPMuxAddress string `json:"webrtcICEUDPMuxAddress"` WebRTCICETCPMuxAddress string `json:"webrtcICETCPMuxAddress"` + // SRT + SRT bool `json:"srt"` + SRTAddress string `json:"srtAddress"` + // paths Paths map[string]*PathConf `json:"paths"` } @@ -336,6 +340,10 @@ func (conf *Conf) UnmarshalJSON(b []byte) error { conf.WebRTCAllowOrigin = "*" conf.WebRTCICEServers2 = []WebRTCICEServer{{URL: "stun:stun.l.google.com:19302"}} + // SRT + conf.SRT = true + conf.SRTAddress = ":8890" + type alias Conf d := json.NewDecoder(bytes.NewReader(b)) d.DisallowUnknownFields() diff --git a/internal/conf/path.go b/internal/conf/path.go index 9c46597b..f5fc4aeb 100644 --- a/internal/conf/path.go +++ b/internal/conf/path.go @@ -210,6 +210,16 @@ func (pconf *PathConf) check(conf *Conf, name string) error { return fmt.Errorf("'%s' is not a valid IP", host) } + case strings.HasPrefix(pconf.Source, "srt://"): + if pconf.Regexp != nil { + return fmt.Errorf("a path with a regular expression (or path 'all') cannot have a SRT source. use another path") + } + + _, err := gourl.Parse(pconf.Source) + if err != nil { + return fmt.Errorf("'%s' is not a valid HLS URL", pconf.Source) + } + case pconf.Source == "redirect": if pconf.SourceRedirect == "" { return fmt.Errorf("source redirect must be filled") @@ -337,6 +347,7 @@ func (pconf PathConf) HasStaticSource() bool { strings.HasPrefix(pconf.Source, "http://") || strings.HasPrefix(pconf.Source, "https://") || strings.HasPrefix(pconf.Source, "udp://") || + strings.HasPrefix(pconf.Source, "srt://") || pconf.Source == "rpiCamera" } diff --git a/internal/core/api.go b/internal/core/api.go index be828f5d..15cace43 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -180,6 +180,12 @@ type apiWebRTCManager interface { apiSessionsKick(uuid.UUID) error } +type apiSRTServer interface { + apiConnsList() (*apiSRTConnsList, error) + apiConnsGet(uuid.UUID) (*apiSRTConn, error) + apiConnsKick(uuid.UUID) error +} + type apiParent interface { logger.Writer apiConfigSet(conf *conf.Conf) @@ -194,6 +200,7 @@ type api struct { rtmpsServer apiRTMPServer hlsManager apiHLSManager webRTCManager apiWebRTCManager + srtServer apiSRTServer parent apiParent httpServer *httpserv.WrappedServer @@ -211,6 +218,7 @@ func newAPI( rtmpsServer apiRTMPServer, hlsManager apiHLSManager, webRTCManager apiWebRTCManager, + srtServer apiSRTServer, parent apiParent, ) (*api, error) { a := &api{ @@ -222,6 +230,7 @@ func newAPI( rtmpsServer: rtmpsServer, hlsManager: hlsManager, webRTCManager: webRTCManager, + srtServer: srtServer, parent: parent, } @@ -280,6 +289,12 @@ func newAPI( group.POST("/v2/webrtcsessions/kick/:id", a.onWebRTCSessionsKick) } + if !interfaceIsEmpty(a.srtServer) { + group.GET("/v2/srtconns/list", a.onSRTConnsList) + group.GET("/v2/srtconns/get/:id", a.onSRTConnsGet) + group.POST("/v2/srtconns/kick/:id", a.onSRTConnsKick) + } + network, address := restrictNetwork("tcp", address) var err error @@ -853,6 +868,56 @@ func (a *api) onWebRTCSessionsKick(ctx *gin.Context) { ctx.Status(http.StatusOK) } +func (a *api) onSRTConnsList(ctx *gin.Context) { + data, err := a.srtServer.apiConnsList() + if err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } + + data.ItemCount = len(data.Items) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onSRTConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + data, err := a.srtServer.apiConnsGet(uuid) + if err != nil { + abortWithError(ctx, err) + return + } + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onSRTConnsKick(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + err = a.srtServer.apiConnsKick(uuid) + if err != nil { + abortWithError(ctx, err) + return + } + + ctx.Status(http.StatusOK) +} + // confReload is called by core. func (a *api) confReload(conf *conf.Conf) { a.mutex.Lock() diff --git a/internal/core/api_defs.go b/internal/core/api_defs.go index baef6a3b..a049c75d 100644 --- a/internal/core/api_defs.go +++ b/internal/core/api_defs.go @@ -104,3 +104,19 @@ type apiWebRTCSessionsList struct { PageCount int `json:"pageCount"` Items []*apiWebRTCSession `json:"items"` } + +type apiSRTConn struct { + ID uuid.UUID `json:"id"` + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + State string `json:"state"` + Path string `json:"path"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` +} + +type apiSRTConnsList struct { + ItemCount int `json:"itemCount"` + PageCount int `json:"pageCount"` + Items []*apiSRTConn `json:"items"` +} diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 5d5c3b1e..a3d03e66 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -1,6 +1,7 @@ package core import ( + "bufio" "bytes" "crypto/tls" "encoding/json" @@ -17,6 +18,8 @@ import ( "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/datarhei/gosrt" "github.com/google/uuid" "github.com/pion/rtp" "github.com/stretchr/testify/require" @@ -509,6 +512,7 @@ func TestAPIProtocolList(t *testing.T) { "rtmps", "hls", "webrtc", + "srt", } { t.Run(ca, func(t *testing.T) { conf := "api: yes\n" @@ -663,10 +667,33 @@ func TestAPIProtocolList(t *testing.T) { }) <-c.incomingTrack + + case "srt": + conf := srt.DefaultConfig() + conf.StreamId = "publish:mypath" + + conn, err := srt.Dial("srt", "localhost:8890", conf) + require.NoError(t, err) + defer conn.Close() + + track := &mpegts.Track{ + PID: 256, + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(conn) + w := mpegts.NewWriter(bw, []*mpegts.Track{track}) + require.NoError(t, err) + + err = w.WriteH26x(track, 0, 0, true, [][]byte{{1}}) + require.NoError(t, err) + bw.Flush() + + time.Sleep(500 * time.Millisecond) } switch ca { - case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps": + case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps", "srt": var pa string switch ca { case "rtsp conns": @@ -686,6 +713,9 @@ func TestAPIProtocolList(t *testing.T) { case "rtmps": pa = "rtmpsconns" + + case "srt": + pa = "srtconns" } type item struct { @@ -763,6 +793,7 @@ func TestAPIProtocolGet(t *testing.T) { "rtmps", "hls", "webrtc", + "srt", } { t.Run(ca, func(t *testing.T) { conf := "api: yes\n" @@ -917,10 +948,33 @@ func TestAPIProtocolGet(t *testing.T) { }) <-c.incomingTrack + + case "srt": + conf := srt.DefaultConfig() + conf.StreamId = "publish:mypath" + + conn, err := srt.Dial("srt", "localhost:8890", conf) + require.NoError(t, err) + defer conn.Close() + + track := &mpegts.Track{ + PID: 256, + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(conn) + w := mpegts.NewWriter(bw, []*mpegts.Track{track}) + require.NoError(t, err) + + err = w.WriteH26x(track, 0, 0, true, [][]byte{{1}}) + require.NoError(t, err) + bw.Flush() + + time.Sleep(500 * time.Millisecond) } switch ca { - case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps": + case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps", "srt": var pa string switch ca { case "rtsp conns": @@ -940,6 +994,9 @@ func TestAPIProtocolGet(t *testing.T) { case "rtmps": pa = "rtmpsconns" + + case "srt": + pa = "srtconns" } type item struct { @@ -1020,6 +1077,7 @@ func TestAPIProtocolGetNotFound(t *testing.T) { "rtmps", "hls", "webrtc", + "srt", } { t.Run(ca, func(t *testing.T) { conf := "api: yes\n" @@ -1071,6 +1129,9 @@ func TestAPIProtocolGetNotFound(t *testing.T) { case "webrtc": pa = "webrtcsessions" + + case "srt": + pa = "srtconns" } func() { @@ -1100,6 +1161,7 @@ func TestAPIProtocolKick(t *testing.T) { "rtsps", "rtmp", "webrtc", + "srt", } { t.Run(ca, func(t *testing.T) { conf := "api: yes\n" @@ -1158,6 +1220,29 @@ func TestAPIProtocolKick(t *testing.T) { case "webrtc": c := newWebRTCTestClient(t, hc, "http://localhost:8889/mypath/whip", true) defer c.close() + + case "srt": + conf := srt.DefaultConfig() + conf.StreamId = "publish:mypath" + + conn, err := srt.Dial("srt", "localhost:8890", conf) + require.NoError(t, err) + defer conn.Close() + + track := &mpegts.Track{ + PID: 256, + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(conn) + w := mpegts.NewWriter(bw, []*mpegts.Track{track}) + require.NoError(t, err) + + err = w.WriteH26x(track, 0, 0, true, [][]byte{{1}}) + require.NoError(t, err) + bw.Flush() + + // time.Sleep(500 * time.Millisecond) } var pa string @@ -1173,6 +1258,9 @@ func TestAPIProtocolKick(t *testing.T) { case "webrtc": pa = "webrtcsessions" + + case "srt": + pa = "srtconns" } var out1 struct { @@ -1209,6 +1297,7 @@ func TestAPIProtocolKickNotFound(t *testing.T) { "rtsps", "rtmp", "webrtc", + "srt", } { t.Run(ca, func(t *testing.T) { conf := "api: yes\n" @@ -1242,6 +1331,9 @@ func TestAPIProtocolKickNotFound(t *testing.T) { case "webrtc": pa = "webrtcsessions" + + case "srt": + pa = "srtconns" } func() { diff --git a/internal/core/authentication.go b/internal/core/authentication.go index f77911a2..3ce28385 100644 --- a/internal/core/authentication.go +++ b/internal/core/authentication.go @@ -50,6 +50,7 @@ const ( authProtocolRTMP authProtocol = "rtmp" authProtocolHLS authProtocol = "hls" authProtocolWebRTC authProtocol = "webrtc" + authProtocolSRT authProtocol = "srt" ) type authCredentials struct { diff --git a/internal/core/core.go b/internal/core/core.go index 743d21fe..01c8519d 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -44,6 +44,7 @@ type Core struct { rtmpsServer *rtmpServer hlsManager *hlsManager webRTCManager *webRTCManager + srtServer *srtServer api *api confWatcher *confwatcher.ConfWatcher @@ -432,6 +433,23 @@ func (p *Core) createResources(initial bool) error { } } + if p.conf.SRT { + if p.srtServer == nil { + p.srtServer, err = newSRTServer( + p.conf.SRTAddress, + p.conf.ReadTimeout, + p.conf.WriteTimeout, + p.conf.ReadBufferCount, + p.conf.UDPMaxPayloadSize, + p.pathManager, + p, + ) + if err != nil { + return err + } + } + } + if p.conf.API { if p.api == nil { p.api, err = newAPI( @@ -445,6 +463,7 @@ func (p *Core) createResources(initial bool) error { p.rtmpsServer, p.hlsManager, p.webRTCManager, + p.srtServer, p, ) if err != nil { @@ -595,6 +614,15 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.WebRTCICEUDPMuxAddress != p.conf.WebRTCICEUDPMuxAddress || newConf.WebRTCICETCPMuxAddress != p.conf.WebRTCICETCPMuxAddress + closeSRTServer := newConf == nil || + newConf.SRT != p.conf.SRT || + newConf.SRTAddress != p.conf.SRTAddress || + newConf.ReadTimeout != p.conf.ReadTimeout || + newConf.WriteTimeout != p.conf.WriteTimeout || + newConf.ReadBufferCount != p.conf.ReadBufferCount || + newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize || + closePathManager + closeAPI := newConf == nil || newConf.API != p.conf.API || newConf.APIAddress != p.conf.APIAddress || @@ -604,7 +632,8 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { closeRTSPSServer || closeRTMPServer || closeHLSManager || - closeWebRTCManager + closeWebRTCManager || + closeSRTServer if newConf == nil && p.confWatcher != nil { p.confWatcher.Close() @@ -620,6 +649,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { } } + if closeSRTServer && p.srtServer != nil { + p.srtServer.close() + p.srtServer = nil + } + if closeWebRTCManager && p.webRTCManager != nil { p.webRTCManager.close() p.webRTCManager = nil diff --git a/internal/core/hls_source_test.go b/internal/core/hls_source_test.go index 91f45332..3f254f4b 100644 --- a/internal/core/hls_source_test.go +++ b/internal/core/hls_source_test.go @@ -8,18 +8,33 @@ import ( "net/http" "testing" - "github.com/asticode/go-astits" "github.com/bluenviron/gortsplib/v3" "github.com/bluenviron/gortsplib/v3/pkg/formats" "github.com/bluenviron/gortsplib/v3/pkg/media" "github.com/bluenviron/gortsplib/v3/pkg/url" - "github.com/bluenviron/mediacommon/pkg/codecs/h264" "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" "github.com/gin-gonic/gin" "github.com/pion/rtp" "github.com/stretchr/testify/require" ) +var track1 = &mpegts.Track{ + PID: 256, + Codec: &mpegts.CodecH264{}, +} + +var track2 = &mpegts.Track{ + PID: 257, + Codec: &mpegts.CodecMPEG4Audio{ + Config: mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + }, +} + type testHLSManager struct { s *http.Server @@ -71,131 +86,29 @@ segment2.ts func (ts *testHLSManager) onSegment1(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", `video/MP2T`) - mux := astits.NewMuxer(context.Background(), ctx.Writer) - - mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 256, - StreamType: astits.StreamTypeH264Video, - }) - - mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 257, - StreamType: astits.StreamTypeAACAudio, - }) - mux.SetPCRPID(256) + w := mpegts.NewWriter(ctx.Writer, []*mpegts.Track{track1, track2}) - mux.WriteTables() - - pkts := mpeg4audio.ADTSPackets{ - { - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - AU: []byte{0x01, 0x02, 0x03, 0x04}, - }, - } - enc, _ := pkts.Marshal() - - mux.WriteData(&astits.MuxerData{ - PID: 257, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, - PTS: &astits.ClockReference{Base: int64(1 * 90000)}, - }, - StreamID: 192, - }, - Data: enc, - }, - }) + w.WriteMPEG4Audio(track2, 1*90000, [][]byte{{1, 2, 3, 4}}) } func (ts *testHLSManager) onSegment2(ctx *gin.Context) { <-ts.clientConnected ctx.Writer.Header().Set("Content-Type", `video/MP2T`) - mux := astits.NewMuxer(context.Background(), ctx.Writer) - - mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 256, - StreamType: astits.StreamTypeH264Video, - }) - - mux.AddElementaryStream(astits.PMTElementaryStream{ - ElementaryPID: 257, - StreamType: astits.StreamTypeAACAudio, - }) - mux.SetPCRPID(256) + w := mpegts.NewWriter(ctx.Writer, []*mpegts.Track{track1, track2}) - mux.WriteTables() - - enc, _ := h264.AnnexBMarshal([][]byte{ + w.WriteH26x(track1, 2*90000, 2*90000, true, [][]byte{ {7, 1, 2, 3}, // SPS {8}, // PPS }) - mux.WriteData(&astits.MuxerData{ - PID: 256, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, - PTS: &astits.ClockReference{Base: int64(2 * 90000)}, - }, - StreamID: 224, // = video - }, - Data: enc, - }, - }) - - pkts := mpeg4audio.ADTSPackets{ - { - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - AU: []byte{0x01, 0x02, 0x03, 0x04}, - }, - } - enc, _ = pkts.Marshal() - - mux.WriteData(&astits.MuxerData{ - PID: 257, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, - PTS: &astits.ClockReference{Base: int64(1 * 90000)}, - }, - StreamID: 192, - }, - Data: enc, - }, - }) + w.WriteMPEG4Audio(track2, 2*90000, [][]byte{{1, 2, 3, 4}}) - enc, _ = h264.AnnexBMarshal([][]byte{ + w.WriteH26x(track1, 2*90000, 2*90000, true, [][]byte{ {5}, // IDR }) - - mux.WriteData(&astits.MuxerData{ - PID: 256, - PES: &astits.PESData{ - Header: &astits.PESHeader{ - OptionalHeader: &astits.PESOptionalHeader{ - MarkerBits: 2, - PTSDTSIndicator: astits.PTSDTSIndicatorOnlyPTS, - PTS: &astits.ClockReference{Base: int64(2 * 90000)}, - }, - StreamID: 224, // = video - }, - Data: enc, - }, - }) } func TestHLSSource(t *testing.T) { diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 486a5dea..df0caaf5 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -41,8 +41,7 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values, string) { type rtmpConnState int const ( - rtmpConnStateIdle rtmpConnState = iota //nolint:deadcode,varcheck - rtmpConnStateRead + rtmpConnStateRead rtmpConnState = iota + 1 rtmpConnStatePublish ) @@ -756,8 +755,10 @@ func (c *rtmpConn) apiItem() *apiRTMPConn { case rtmpConnStatePublish: return "publish" + + default: + return "idle" } - return "idle" }(), Path: c.pathName, BytesReceived: bytesReceived, diff --git a/internal/core/source_static.go b/internal/core/source_static.go index 72c6e298..b5d45b94 100644 --- a/internal/core/source_static.go +++ b/internal/core/source_static.go @@ -86,6 +86,11 @@ func newSourceStatic( readTimeout, s) + case strings.HasPrefix(cnf.Source, "srt://"): + s.impl = newSRTSource( + readTimeout, + s) + case cnf.Source == "rpiCamera": s.impl = newRPICameraSource( s) diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go new file mode 100644 index 00000000..8dea51cc --- /dev/null +++ b/internal/core/srt_conn.go @@ -0,0 +1,810 @@ +package core + +import ( + "bufio" + "context" + "errors" + "fmt" + "net" + "strings" + "sync" + "time" + + "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/bluenviron/gortsplib/v3/pkg/media" + "github.com/bluenviron/gortsplib/v3/pkg/ringbuffer" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/codecs/h265" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/datarhei/gosrt" + "github.com/google/uuid" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/formatprocessor" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" +) + +func durationGoToMPEGTS(v time.Duration) int64 { + return int64(v.Seconds() * 90000) +} + +func h265RandomAccessPresent(au [][]byte) bool { + for _, nalu := range au { + typ := h265.NALUType((nalu[0] >> 1) & 0b111111) + switch typ { + case h265.NALUType_IDR_W_RADL, h265.NALUType_IDR_N_LP, h265.NALUType_CRA_NUT: + return true + } + } + return false +} + +type srtConnState int + +const ( + srtConnStateRead srtConnState = iota + 1 + srtConnStatePublish +) + +type srtConnPathManager interface { + addReader(req pathAddReaderReq) pathAddReaderRes + addPublisher(req pathAddPublisherReq) pathAddPublisherRes +} + +type srtConnParent interface { + logger.Writer + closeConn(*srtConn) +} + +type srtConn struct { + readTimeout conf.StringDuration + writeTimeout conf.StringDuration + readBufferCount int + udpMaxPayloadSize int + connReq srt.ConnRequest + wg *sync.WaitGroup + pathManager srtConnPathManager + parent srtConnParent + + ctx context.Context + ctxCancel func() + created time.Time + uuid uuid.UUID + mutex sync.RWMutex + state srtConnState + pathName string + conn srt.Conn + + chNew chan srtNewConnReq + chSetConn chan srt.Conn +} + +func newSRTConn( + parentCtx context.Context, + readTimeout conf.StringDuration, + writeTimeout conf.StringDuration, + readBufferCount int, + udpMaxPayloadSize int, + connReq srt.ConnRequest, + wg *sync.WaitGroup, + pathManager srtConnPathManager, + parent srtConnParent, +) *srtConn { + ctx, ctxCancel := context.WithCancel(parentCtx) + + c := &srtConn{ + readTimeout: readTimeout, + writeTimeout: writeTimeout, + readBufferCount: readBufferCount, + udpMaxPayloadSize: udpMaxPayloadSize, + connReq: connReq, + wg: wg, + pathManager: pathManager, + parent: parent, + ctx: ctx, + ctxCancel: ctxCancel, + created: time.Now(), + uuid: uuid.New(), + chNew: make(chan srtNewConnReq), + chSetConn: make(chan srt.Conn), + } + + c.Log(logger.Info, "opened") + + c.wg.Add(1) + go c.run() + + return c +} + +func (c *srtConn) close() { + c.ctxCancel() +} + +func (c *srtConn) Log(level logger.Level, format string, args ...interface{}) { + c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.connReq.RemoteAddr()}, args...)...) +} + +func (c *srtConn) ip() net.IP { + return c.connReq.RemoteAddr().(*net.UDPAddr).IP +} + +func (c *srtConn) run() { + defer c.wg.Done() + + err := c.runInner() + + c.ctxCancel() + + c.parent.closeConn(c) + + c.Log(logger.Info, "closed (%v)", err) +} + +func (c *srtConn) runInner() error { + var req srtNewConnReq + select { + case req = <-c.chNew: + case <-c.ctx.Done(): + return errors.New("terminated") + } + + answerSent, err := c.runInner2(req) + + if !answerSent { + req.res <- nil + } + + return err +} + +func (c *srtConn) runInner2(req srtNewConnReq) (bool, error) { + parts := strings.Split(req.connReq.StreamId(), ":") + if (len(parts) != 2 && len(parts) != 4) || (parts[0] != "read" && parts[0] != "publish") { + return false, fmt.Errorf("invalid streamid '%s':"+ + " it must be 'action:pathname' or 'action:pathname:user:pass', "+ + "where action is either read or publish, pathname is the path name, user and pass are the credentials", + req.connReq.StreamId()) + } + + pathName := parts[1] + user := "" + pass := "" + + if len(parts) == 4 { + user, pass = parts[2], parts[3] + } + + if parts[0] == "publish" { + return c.runPublish(req, pathName, user, pass) + } + return c.runRead(req, pathName, user, pass) +} + +func (c *srtConn) runPublish(req srtNewConnReq, pathName string, user string, pass string) (bool, error) { + res := c.pathManager.addPublisher(pathAddPublisherReq{ + author: c, + pathName: pathName, + credentials: authCredentials{ + ip: c.ip(), + user: user, + pass: pass, + proto: authProtocolSRT, + id: &c.uuid, + }, + }) + + if res.err != nil { + if terr, ok := res.err.(*errAuthentication); ok { + // TODO: re-enable. Currently this freezes the listener. + // wait some seconds to stop brute force attacks + // <-time.After(srtPauseAfterAuthError) + return false, terr + } + return false, res.err + } + + defer res.path.removePublisher(pathRemovePublisherReq{author: c}) + + sconn, err := c.exchangeRequestWithConn(req) + if err != nil { + return true, err + } + + c.mutex.Lock() + c.state = srtConnStatePublish + c.pathName = pathName + c.conn = sconn + c.mutex.Unlock() + + readerErr := make(chan error) + go func() { + readerErr <- c.runPublishReader(sconn, res.path) + }() + + select { + case err := <-readerErr: + sconn.Close() + return true, err + + case <-c.ctx.Done(): + sconn.Close() + <-readerErr + return true, errors.New("terminated") + } +} + +func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { + sconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) + r, err := mpegts.NewReader(mpegts.NewBufferedReader(sconn)) + if err != nil { + return err + } + + var medias media.Medias + var stream *stream.Stream + + var td *mpegts.TimeDecoder + decodeTime := func(t int64) time.Duration { + if td == nil { + td = mpegts.NewTimeDecoder(t) + } + return td.Decode(t) + } + + for _, track := range r.Tracks() { //nolint:dupl + var medi *media.Media + + switch tcodec := track.Codec.(type) { + case *mpegts.CodecH264: + medi = &media.Media{ + Type: media.TypeVideo, + Formats: []formats.Format{&formats.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + AU: au, + }) + return nil + }) + + case *mpegts.CodecH265: + medi = &media.Media{ + Type: media.TypeVideo, + Formats: []formats.Format{&formats.H265{ + PayloadTyp: 96, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + AU: au, + }) + return nil + }) + + case *mpegts.CodecMPEG4Audio: + medi = &media.Media{ + Type: media.TypeAudio, + Formats: []formats.Format{&formats.MPEG4Audio{ + PayloadTyp: 96, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + Config: &tcodec.Config, + }}, + } + + r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + AUs: aus, + }) + return nil + }) + + case *mpegts.CodecOpus: + medi = &media.Media{ + Type: media.TypeAudio, + Formats: []formats.Format{&formats.Opus{ + PayloadTyp: 96, + IsStereo: (tcodec.ChannelCount == 2), + }}, + } + + r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + Packets: packets, + }) + return nil + }) + } + + medias = append(medias, medi) + } + + rres := path.startPublisher(pathStartPublisherReq{ + author: c, + medias: medias, + generateRTPPackets: true, + }) + if rres.err != nil { + return rres.err + } + + c.Log(logger.Info, "is publishing to path '%s', %s", + path.name, + sourceMediaInfo(medias)) + + stream = rres.stream + + for { + err := r.Read() + if err != nil { + return err + } + } +} + +func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass string) (bool, error) { + res := c.pathManager.addReader(pathAddReaderReq{ + author: c, + pathName: pathName, + credentials: authCredentials{ + ip: c.ip(), + user: user, + pass: pass, + proto: authProtocolSRT, + id: &c.uuid, + }, + }) + + if res.err != nil { + if terr, ok := res.err.(*errAuthentication); ok { + // TODO: re-enable. Currently this freezes the listener. + // wait some seconds to stop brute force attacks + // <-time.After(srtPauseAfterAuthError) + return false, terr + } + return false, res.err + } + + defer res.path.removeReader(pathRemoveReaderReq{author: c}) + + sconn, err := c.exchangeRequestWithConn(req) + if err != nil { + return true, err + } + defer sconn.Close() + + c.mutex.Lock() + c.state = srtConnStateRead + c.pathName = pathName + c.conn = sconn + c.mutex.Unlock() + + ringBuffer, _ := ringbuffer.New(uint64(c.readBufferCount)) + go func() { + <-c.ctx.Done() + ringBuffer.Close() + }() + + var w *mpegts.Writer + nextPID := uint16(256) + var tracks []*mpegts.Track + var medias media.Medias + bw := bufio.NewWriterSize(sconn, srtMaxPayloadSize(c.udpMaxPayloadSize)) + + leadingTrackChosen := false + leadingTrackInitialized := false + var leadingTrackStartDTS time.Duration + + for _, medi := range res.stream.Medias() { + for _, format := range medi.Formats { + switch format := format.(type) { + case *formats.H265: + track := &mpegts.Track{ + PID: nextPID, + Codec: &mpegts.CodecH265{}, + } + tracks = append(tracks, track) + medias = append(medias, medi) + nextPID++ + + var startPTS time.Duration + startPTSFilled := false + + var isLeadingTrack bool + if !leadingTrackChosen { + isLeadingTrack = true + } else { + isLeadingTrack = false + } + + randomAccessReceived := false + dtsExtractor := h265.NewDTSExtractor() + + res.stream.AddReader(c, medi, format, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitH265) + if tunit.AU == nil { + return nil + } + + if !startPTSFilled { + startPTS = tunit.PTS + startPTSFilled = true + } + + randomAccessPresent := h265RandomAccessPresent(tunit.AU) + + if !randomAccessReceived { + if !randomAccessPresent { + return nil + } + randomAccessReceived = true + } + + pts := tunit.PTS - startPTS + dts, err := dtsExtractor.Extract(tunit.AU, pts) + if err != nil { + return err + } + + if !leadingTrackInitialized { + if isLeadingTrack { + leadingTrackStartDTS = dts + leadingTrackInitialized = true + } else { + return nil + } + } + + dts -= leadingTrackStartDTS + pts -= leadingTrackStartDTS + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), randomAccessPresent, tunit.AU) + if err != nil { + return err + } + return bw.Flush() + }) + }) + + case *formats.H264: + track := &mpegts.Track{ + PID: nextPID, + Codec: &mpegts.CodecH264{}, + } + tracks = append(tracks, track) + medias = append(medias, medi) + nextPID++ + + var startPTS time.Duration + startPTSFilled := false + + var isLeadingTrack bool + if !leadingTrackChosen { + isLeadingTrack = true + } else { + isLeadingTrack = false + } + + firstIDRReceived := false + dtsExtractor := h264.NewDTSExtractor() + + res.stream.AddReader(c, medi, format, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitH264) + if tunit.AU == nil { + return nil + } + + if !startPTSFilled { + startPTS = tunit.PTS + startPTSFilled = true + } + + idrPresent := h264.IDRPresent(tunit.AU) + + if !firstIDRReceived { + if !idrPresent { + return nil + } + firstIDRReceived = true + } + + pts := tunit.PTS - startPTS + dts, err := dtsExtractor.Extract(tunit.AU, pts) + if err != nil { + return err + } + + if !leadingTrackInitialized { + if isLeadingTrack { + leadingTrackStartDTS = dts + leadingTrackInitialized = true + } else { + return nil + } + } + + dts -= leadingTrackStartDTS + pts -= leadingTrackStartDTS + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteH26x(track, durationGoToMPEGTS(pts), durationGoToMPEGTS(dts), idrPresent, tunit.AU) + if err != nil { + return err + } + return bw.Flush() + }) + }) + + case *formats.MPEG4AudioGeneric: + track := &mpegts.Track{ + PID: nextPID, + Codec: &mpegts.CodecMPEG4Audio{ + Config: *format.Config, + }, + } + tracks = append(tracks, track) + medias = append(medias, medi) + nextPID++ + + var startPTS time.Duration + startPTSFilled := false + + res.stream.AddReader(c, medi, format, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitMPEG4AudioGeneric) + if tunit.AUs == nil { + return nil + } + + if !startPTSFilled { + startPTS = tunit.PTS + startPTSFilled = true + } + + if leadingTrackChosen && !leadingTrackInitialized { + return nil + } + + pts := tunit.PTS + pts -= startPTS + pts -= leadingTrackStartDTS + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(pts), tunit.AUs) + if err != nil { + return err + } + return bw.Flush() + }) + }) + + case *formats.MPEG4AudioLATM: + if format.Config != nil && + len(format.Config.Programs) == 1 && + len(format.Config.Programs[0].Layers) == 1 { + track := &mpegts.Track{ + PID: nextPID, + Codec: &mpegts.CodecMPEG4Audio{ + Config: *format.Config.Programs[0].Layers[0].AudioSpecificConfig, + }, + } + tracks = append(tracks, track) + medias = append(medias, medi) + nextPID++ + + var startPTS time.Duration + startPTSFilled := false + + res.stream.AddReader(c, medi, format, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitMPEG4AudioLATM) + if tunit.AU == nil { + return nil + } + + if !startPTSFilled { + startPTS = tunit.PTS + startPTSFilled = true + } + + if leadingTrackChosen && !leadingTrackInitialized { + return nil + } + + pts := tunit.PTS + pts -= startPTS + pts -= leadingTrackStartDTS + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteMPEG4Audio(track, durationGoToMPEGTS(pts), [][]byte{tunit.AU}) + if err != nil { + return err + } + return bw.Flush() + }) + }) + } + + case *formats.Opus: + track := &mpegts.Track{ + PID: nextPID, + Codec: &mpegts.CodecOpus{ + ChannelCount: func() int { + if format.IsStereo { + return 2 + } + return 1 + }(), + }, + } + tracks = append(tracks, track) + medias = append(medias, medi) + nextPID++ + + var startPTS time.Duration + startPTSFilled := false + + res.stream.AddReader(c, medi, format, func(unit formatprocessor.Unit) { + ringBuffer.Push(func() error { + tunit := unit.(*formatprocessor.UnitOpus) + if tunit.Packets == nil { + return nil + } + + if !startPTSFilled { + startPTS = tunit.PTS + startPTSFilled = true + } + + if leadingTrackChosen && !leadingTrackInitialized { + return nil + } + + pts := tunit.PTS + pts -= startPTS + pts -= leadingTrackStartDTS + + sconn.SetWriteDeadline(time.Now().Add(time.Duration(c.writeTimeout))) + err = w.WriteOpus(track, durationGoToMPEGTS(pts), tunit.Packets) + if err != nil { + return err + } + return bw.Flush() + }) + }) + } + } + } + + if len(tracks) == 0 { + return true, fmt.Errorf( + "the stream doesn't contain any supported codec, which are currently H265, H264, Opus, MPEG4-Audio") + } + + c.Log(logger.Info, "is reading from path '%s', %s", + res.path.name, sourceMediaInfo(medias)) + + w = mpegts.NewWriter(bw, tracks) + + // disable read deadline + sconn.SetReadDeadline(time.Time{}) + + for { + item, ok := ringBuffer.Pull() + if !ok { + return true, fmt.Errorf("terminated") + } + + err := item.(func() error)() + if err != nil { + return true, err + } + } +} + +func (c *srtConn) exchangeRequestWithConn(req srtNewConnReq) (srt.Conn, error) { + req.res <- c + + select { + case sconn := <-c.chSetConn: + return sconn, nil + + case <-c.ctx.Done(): + return nil, errors.New("terminated") + } +} + +// new is called by srtListener through srtServer. +func (c *srtConn) new(req srtNewConnReq) *srtConn { + select { + case c.chNew <- req: + return <-req.res + + case <-c.ctx.Done(): + return nil + } +} + +// setConn is called by srtListener . +func (c *srtConn) setConn(sconn srt.Conn) { + select { + case c.chSetConn <- sconn: + case <-c.ctx.Done(): + } +} + +// apiReaderDescribe implements reader. +func (c *srtConn) apiReaderDescribe() pathAPISourceOrReader { + return pathAPISourceOrReader{ + Type: "srtConn", + ID: c.uuid.String(), + } +} + +// apiSourceDescribe implements source. +func (c *srtConn) apiSourceDescribe() pathAPISourceOrReader { + return c.apiReaderDescribe() +} + +func (c *srtConn) apiItem() *apiSRTConn { + c.mutex.RLock() + defer c.mutex.RUnlock() + + bytesReceived := uint64(0) + bytesSent := uint64(0) + + if c.conn != nil { + var s srt.Statistics + c.conn.Stats(&s) + bytesReceived = s.Accumulated.ByteRecv + bytesSent = s.Accumulated.ByteSent + } + + return &apiSRTConn{ + ID: c.uuid, + Created: c.created, + RemoteAddr: c.connReq.RemoteAddr().String(), + State: func() string { + switch c.state { + case srtConnStateRead: + return "read" + + case srtConnStatePublish: + return "publish" + + default: + return "idle" + } + }(), + Path: c.pathName, + BytesReceived: bytesReceived, + BytesSent: bytesSent, + } +} diff --git a/internal/core/srt_listener.go b/internal/core/srt_listener.go new file mode 100644 index 00000000..3fb75fbb --- /dev/null +++ b/internal/core/srt_listener.go @@ -0,0 +1,60 @@ +package core + +import ( + "sync" + + "github.com/datarhei/gosrt" +) + +type srtListener struct { + ln srt.Listener + wg *sync.WaitGroup + parent *srtServer +} + +func newSRTListener( + ln srt.Listener, + wg *sync.WaitGroup, + parent *srtServer, +) *srtListener { + l := &srtListener{ + ln: ln, + wg: wg, + parent: parent, + } + + l.wg.Add(1) + go l.run() + + return l +} + +func (l *srtListener) run() { + defer l.wg.Done() + + err := func() error { + for { + var sconn *srtConn + conn, _, err := l.ln.Accept(func(req srt.ConnRequest) srt.ConnType { + sconn = l.parent.newConnRequest(req) + if sconn == nil { + return srt.REJECT + } + + // currently it's the same to return SUBSCRIBE or PUBLISH + return srt.SUBSCRIBE + }) + if err != nil { + return err + } + + if conn == nil { + continue + } + + sconn.setConn(conn) + } + }() + + l.parent.acceptError(err) +} diff --git a/internal/core/srt_server.go b/internal/core/srt_server.go new file mode 100644 index 00000000..70d0c9ee --- /dev/null +++ b/internal/core/srt_server.go @@ -0,0 +1,308 @@ +package core + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/datarhei/gosrt" + "github.com/google/uuid" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/logger" +) + +func srtMaxPayloadSize(u int) int { + return ((u - 16) / 188) * 188 // 16 = SRT header, 188 = MPEG-TS packet +} + +type srtNewConnReq struct { + connReq srt.ConnRequest + res chan *srtConn +} + +type srtServerAPIConnsListRes struct { + data *apiSRTConnsList + err error +} + +type srtServerAPIConnsListReq struct { + res chan srtServerAPIConnsListRes +} + +type srtServerAPIConnsGetRes struct { + data *apiSRTConn + err error +} + +type srtServerAPIConnsGetReq struct { + uuid uuid.UUID + res chan srtServerAPIConnsGetRes +} + +type srtServerAPIConnsKickRes struct { + err error +} + +type srtServerAPIConnsKickReq struct { + uuid uuid.UUID + res chan srtServerAPIConnsKickRes +} + +type srtServerParent interface { + logger.Writer +} + +type srtServer struct { + readTimeout conf.StringDuration + writeTimeout conf.StringDuration + readBufferCount int + udpMaxPayloadSize int + pathManager *pathManager + parent srtServerParent + + ctx context.Context + ctxCancel func() + wg sync.WaitGroup + ln srt.Listener + conns map[*srtConn]struct{} + + // in + chNewConnRequest chan srtNewConnReq + chAcceptErr chan error + chCloseConn chan *srtConn + chAPIConnsList chan srtServerAPIConnsListReq + chAPIConnsGet chan srtServerAPIConnsGetReq + chAPIConnsKick chan srtServerAPIConnsKickReq +} + +func newSRTServer( + address string, + readTimeout conf.StringDuration, + writeTimeout conf.StringDuration, + readBufferCount int, + udpMaxPayloadSize int, + pathManager *pathManager, + parent srtServerParent, +) (*srtServer, error) { + conf := srt.DefaultConfig() + conf.ConnectionTimeout = time.Duration(readTimeout) + conf.PayloadSize = uint32(srtMaxPayloadSize(udpMaxPayloadSize)) + + ln, err := srt.Listen("srt", address, conf) + if err != nil { + return nil, err + } + + ctx, ctxCancel := context.WithCancel(context.Background()) + + s := &srtServer{ + readTimeout: readTimeout, + writeTimeout: writeTimeout, + readBufferCount: readBufferCount, + udpMaxPayloadSize: udpMaxPayloadSize, + pathManager: pathManager, + parent: parent, + ctx: ctx, + ctxCancel: ctxCancel, + ln: ln, + conns: make(map[*srtConn]struct{}), + chNewConnRequest: make(chan srtNewConnReq), + chAcceptErr: make(chan error), + chCloseConn: make(chan *srtConn), + chAPIConnsList: make(chan srtServerAPIConnsListReq), + chAPIConnsGet: make(chan srtServerAPIConnsGetReq), + chAPIConnsKick: make(chan srtServerAPIConnsKickReq), + } + + s.Log(logger.Info, "listener opened on "+address+" (UDP)") + + newSRTListener( + s.ln, + &s.wg, + s, + ) + + s.wg.Add(1) + go s.run() + + return s, nil +} + +// Log is the main logging function. +func (s *srtServer) Log(level logger.Level, format string, args ...interface{}) { + s.parent.Log(level, "[SRT] "+format, append([]interface{}{}, args...)...) +} + +func (s *srtServer) close() { + s.Log(logger.Info, "listener is closing") + s.ctxCancel() + s.wg.Wait() +} + +func (s *srtServer) run() { + defer s.wg.Done() + +outer: + for { + select { + case err := <-s.chAcceptErr: + s.Log(logger.Error, "%s", err) + break outer + + case req := <-s.chNewConnRequest: + c := newSRTConn( + s.ctx, + s.readTimeout, + s.writeTimeout, + s.readBufferCount, + s.udpMaxPayloadSize, + req.connReq, + &s.wg, + s.pathManager, + s) + s.conns[c] = struct{}{} + req.res <- c + + case c := <-s.chCloseConn: + delete(s.conns, c) + + case req := <-s.chAPIConnsList: + data := &apiSRTConnsList{ + Items: []*apiSRTConn{}, + } + + for c := range s.conns { + data.Items = append(data.Items, c.apiItem()) + } + + sort.Slice(data.Items, func(i, j int) bool { + return data.Items[i].Created.Before(data.Items[j].Created) + }) + + req.res <- srtServerAPIConnsListRes{data: data} + + case req := <-s.chAPIConnsGet: + c := s.findConnByUUID(req.uuid) + if c == nil { + req.res <- srtServerAPIConnsGetRes{err: errAPINotFound} + continue + } + + req.res <- srtServerAPIConnsGetRes{data: c.apiItem()} + + case req := <-s.chAPIConnsKick: + c := s.findConnByUUID(req.uuid) + if c == nil { + req.res <- srtServerAPIConnsKickRes{err: errAPINotFound} + continue + } + + delete(s.conns, c) + c.close() + req.res <- srtServerAPIConnsKickRes{} + + case <-s.ctx.Done(): + break outer + } + } + + s.ctxCancel() + + s.ln.Close() +} + +func (s *srtServer) findConnByUUID(uuid uuid.UUID) *srtConn { + for sx := range s.conns { + if sx.uuid == uuid { + return sx + } + } + return nil +} + +// newConnRequest is called by srtListener. +func (s *srtServer) newConnRequest(connReq srt.ConnRequest) *srtConn { + req := srtNewConnReq{ + connReq: connReq, + res: make(chan *srtConn), + } + + select { + case s.chNewConnRequest <- req: + c := <-req.res + + return c.new(req) + + case <-s.ctx.Done(): + return nil + } +} + +// acceptError is called by srtListener. +func (s *srtServer) acceptError(err error) { + select { + case s.chAcceptErr <- err: + case <-s.ctx.Done(): + } +} + +// closeConn is called by srtConn. +func (s *srtServer) closeConn(c *srtConn) { + select { + case s.chCloseConn <- c: + case <-s.ctx.Done(): + } +} + +// apiConnsList is called by api. +func (s *srtServer) apiConnsList() (*apiSRTConnsList, error) { + req := srtServerAPIConnsListReq{ + res: make(chan srtServerAPIConnsListRes), + } + + select { + case s.chAPIConnsList <- req: + res := <-req.res + return res.data, res.err + + case <-s.ctx.Done(): + return nil, fmt.Errorf("terminated") + } +} + +// apiConnsGet is called by api. +func (s *srtServer) apiConnsGet(uuid uuid.UUID) (*apiSRTConn, error) { + req := srtServerAPIConnsGetReq{ + uuid: uuid, + res: make(chan srtServerAPIConnsGetRes), + } + + select { + case s.chAPIConnsGet <- req: + res := <-req.res + return res.data, res.err + + case <-s.ctx.Done(): + return nil, fmt.Errorf("terminated") + } +} + +// apiConnsKick is called by api. +func (s *srtServer) apiConnsKick(uuid uuid.UUID) error { + req := srtServerAPIConnsKickReq{ + uuid: uuid, + res: make(chan srtServerAPIConnsKickRes), + } + + select { + case s.chAPIConnsKick <- req: + res := <-req.res + return res.err + + case <-s.ctx.Done(): + return fmt.Errorf("terminated") + } +} diff --git a/internal/core/srt_server_test.go b/internal/core/srt_server_test.go new file mode 100644 index 00000000..b3e79063 --- /dev/null +++ b/internal/core/srt_server_test.go @@ -0,0 +1,115 @@ +package core + +import ( + "bufio" + "testing" + "time" + + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/datarhei/gosrt" + "github.com/stretchr/testify/require" +) + +func TestSRTServer(t *testing.T) { + p, ok := newInstance("paths:\n" + + " all:\n") + require.Equal(t, true, ok) + defer p.Close() + + conf := srt.DefaultConfig() + address, err := conf.UnmarshalURL("srt://localhost:8890?streamid=publish:mypath") + require.NoError(t, err) + + err = conf.Validate() + require.NoError(t, err) + + publisher, err := srt.Dial("srt", address, conf) + require.NoError(t, err) + defer publisher.Close() + + track := &mpegts.Track{ + PID: 256, + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(publisher) + w := mpegts.NewWriter(bw, []*mpegts.Track{track}) + require.NoError(t, err) + + err = w.WriteH26x(track, 0, 0, true, [][]byte{ + { // SPS + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, + 0x20, + }, + { // PPS + 0x08, 0x06, 0x07, 0x08, + }, + { // IDR + 0x05, 1, + }, + }) + require.NoError(t, err) + bw.Flush() + + time.Sleep(500 * time.Millisecond) + + conf = srt.DefaultConfig() + address, err = conf.UnmarshalURL("srt://localhost:8890?streamid=read:mypath") + require.NoError(t, err) + + err = conf.Validate() + require.NoError(t, err) + + reader, err := srt.Dial("srt", address, conf) + require.NoError(t, err) + defer reader.Close() + + err = w.WriteH26x(track, 2*90000, 1*90000, true, [][]byte{ + { // IDR + 0x05, 2, + }, + }) + require.NoError(t, err) + bw.Flush() + + r, err := mpegts.NewReader(reader) + require.NoError(t, err) + + require.Equal(t, []*mpegts.Track{{ + PID: 256, + Codec: &mpegts.CodecH264{}, + }}, r.Tracks()) + + received := false + + r.OnDataH26x(r.Tracks()[0], func(pts int64, dts int64, au [][]byte) error { + require.Equal(t, int64(0), pts) + require.Equal(t, int64(0), dts) + require.Equal(t, [][]byte{ + { // SPS + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, + 0x20, + }, + { // PPS + 0x08, 0x06, 0x07, 0x08, + }, + { // IDR + 0x05, 1, + }, + }, au) + received = true + return nil + }) + + for { + err = r.Read() + require.NoError(t, err) + if received { + break + } + } +} diff --git a/internal/core/srt_source.go b/internal/core/srt_source.go new file mode 100644 index 00000000..18d398bc --- /dev/null +++ b/internal/core/srt_source.go @@ -0,0 +1,221 @@ +package core + +import ( + "context" + "time" + + "github.com/bluenviron/gortsplib/v3/pkg/formats" + "github.com/bluenviron/gortsplib/v3/pkg/media" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/datarhei/gosrt" + + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/formatprocessor" + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" +) + +type srtSourceParent interface { + logger.Writer + sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes + sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) +} + +type srtSource struct { + readTimeout conf.StringDuration + parent srtSourceParent +} + +func newSRTSource( + readTimeout conf.StringDuration, + parent srtSourceParent, +) *srtSource { + s := &srtSource{ + readTimeout: readTimeout, + parent: parent, + } + + return s +} + +func (s *srtSource) Log(level logger.Level, format string, args ...interface{}) { + s.parent.Log(level, "[srt source] "+format, args...) +} + +// run implements sourceStaticImpl. +func (s *srtSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan *conf.PathConf) error { + s.Log(logger.Debug, "connecting") + + conf := srt.DefaultConfig() + address, err := conf.UnmarshalURL(cnf.Source) + if err != nil { + return err + } + + err = conf.Validate() + if err != nil { + return err + } + + sconn, err := srt.Dial("srt", address, conf) + if err != nil { + return err + } + + readDone := make(chan error) + go func() { + readDone <- s.runReader(sconn) + }() + + for { + select { + case err := <-readDone: + sconn.Close() + return err + + case <-reloadConf: + + case <-ctx.Done(): + sconn.Close() + <-readDone + return nil + } + } +} + +func (s *srtSource) runReader(sconn srt.Conn) error { + sconn.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) + r, err := mpegts.NewReader(mpegts.NewBufferedReader(sconn)) + if err != nil { + return err + } + + var medias media.Medias + var stream *stream.Stream + + var td *mpegts.TimeDecoder + decodeTime := func(t int64) time.Duration { + if td == nil { + td = mpegts.NewTimeDecoder(t) + } + return td.Decode(t) + } + + for _, track := range r.Tracks() { //nolint:dupl + var medi *media.Media + + switch tcodec := track.Codec.(type) { + case *mpegts.CodecH264: + medi = &media.Media{ + Type: media.TypeVideo, + Formats: []formats.Format{&formats.H264{ + PayloadTyp: 96, + PacketizationMode: 1, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH264{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + AU: au, + }) + return nil + }) + + case *mpegts.CodecH265: + medi = &media.Media{ + Type: media.TypeVideo, + Formats: []formats.Format{&formats.H265{ + PayloadTyp: 96, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitH265{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + AU: au, + }) + return nil + }) + + case *mpegts.CodecMPEG4Audio: + medi = &media.Media{ + Type: media.TypeAudio, + Formats: []formats.Format{&formats.MPEG4Audio{ + PayloadTyp: 96, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + Config: &tcodec.Config, + }}, + } + + r.OnDataMPEG4Audio(track, func(pts int64, _ int64, aus [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitMPEG4AudioGeneric{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + AUs: aus, + }) + return nil + }) + + case *mpegts.CodecOpus: + medi = &media.Media{ + Type: media.TypeAudio, + Formats: []formats.Format{&formats.Opus{ + PayloadTyp: 96, + IsStereo: (tcodec.ChannelCount == 2), + }}, + } + + r.OnDataOpus(track, func(pts int64, _ int64, packets [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &formatprocessor.UnitOpus{ + BaseUnit: formatprocessor.BaseUnit{ + NTP: time.Now(), + }, + PTS: decodeTime(pts), + Packets: packets, + }) + return nil + }) + } + + medias = append(medias, medi) + } + + res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{ + medias: medias, + generateRTPPackets: true, + }) + if res.err != nil { + return res.err + } + + s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias)) + + stream = res.stream + + for { + sconn.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeout))) + err := r.Read() + if err != nil { + return err + } + } +} + +// apiSourceDescribe implements sourceStaticImpl. +func (*srtSource) apiSourceDescribe() pathAPISourceOrReader { + return pathAPISourceOrReader{ + Type: "srtSource", + ID: "", + } +} diff --git a/internal/core/srt_source_test.go b/internal/core/srt_source_test.go new file mode 100644 index 00000000..b85ec42b --- /dev/null +++ b/internal/core/srt_source_test.go @@ -0,0 +1,98 @@ +package core + +import ( + "bufio" + "testing" + + "github.com/bluenviron/gortsplib/v3" + "github.com/bluenviron/gortsplib/v3/pkg/url" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/datarhei/gosrt" + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func TestSRTSource(t *testing.T) { + ln, err := srt.Listen("srt", "localhost:9999", srt.DefaultConfig()) + require.NoError(t, err) + defer ln.Close() + + connected := make(chan struct{}) + received := make(chan struct{}) + done := make(chan struct{}) + + go func() { + conn, _, err := ln.Accept(func(req srt.ConnRequest) srt.ConnType { + require.Equal(t, "sidname", req.StreamId()) + + err := req.SetPassphrase("ttest1234567") + if err != nil { + return srt.REJECT + } + + return srt.SUBSCRIBE + }) + require.NoError(t, err) + require.NotNil(t, conn) + defer conn.Close() + + track := &mpegts.Track{ + PID: 256, + Codec: &mpegts.CodecH264{}, + } + + bw := bufio.NewWriter(conn) + w := mpegts.NewWriter(bw, []*mpegts.Track{track}) + require.NoError(t, err) + + err = w.WriteH26x(track, 0, 0, true, [][]byte{ + { // IDR + 0x05, 1, + }, + }) + require.NoError(t, err) + bw.Flush() + + <-connected + + err = w.WriteH26x(track, 0, 0, true, [][]byte{{5, 2}}) + require.NoError(t, err) + bw.Flush() + + <-done + }() + + p, ok := newInstance("paths:\n" + + " proxied:\n" + + " source: srt://localhost:9999?streamid=sidname&passphrase=ttest1234567\n" + + " sourceOnDemand: yes\n") + require.Equal(t, true, ok) + defer p.Close() + + c := gortsplib.Client{} + + u, err := url.Parse("rtsp://127.0.0.1:8554/proxied") + require.NoError(t, err) + + err = c.Start(u.Scheme, u.Host) + require.NoError(t, err) + defer c.Close() + + medias, baseURL, _, err := c.Describe(u) + require.NoError(t, err) + + err = c.SetupAll(medias, baseURL) + require.NoError(t, err) + + c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) { + require.Equal(t, []byte{5, 1}, pkt.Payload) + close(received) + }) + + _, err = c.Play(nil) + require.NoError(t, err) + + close(connected) + <-received + close(done) +} diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index aebaf512..b57c684e 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -93,15 +93,6 @@ type webRTCManagerAPISessionsListReq struct { res chan webRTCManagerAPISessionsListRes } -type webRTCManagerAPISessionsKickRes struct { - err error -} - -type webRTCManagerAPISessionsKickReq struct { - uuid uuid.UUID - res chan webRTCManagerAPISessionsKickRes -} - type webRTCManagerAPISessionsGetRes struct { data *apiWebRTCSession err error @@ -112,6 +103,15 @@ type webRTCManagerAPISessionsGetReq struct { res chan webRTCManagerAPISessionsGetRes } +type webRTCManagerAPISessionsKickRes struct { + err error +} + +type webRTCManagerAPISessionsKickReq struct { + uuid uuid.UUID + res chan webRTCManagerAPISessionsKickRes +} + type webRTCNewSessionRes struct { sx *webRTCSession answer []byte diff --git a/mediamtx.yml b/mediamtx.yml index 4a9cfef6..575598a2 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -226,6 +226,14 @@ webrtcICEUDPMuxAddress: # optimal for WebRTC. webrtcICETCPMuxAddress: +############################################### +# SRT parameters + +# Enables support for the SRT protocol. +srt: yes +# Address of the SRT listener. +srtAddress: :8890 + ############################################### # Path parameters @@ -238,14 +246,15 @@ webrtcICETCPMuxAddress: paths: all: # Source of the stream. This can be: - # * publisher -> the stream is published by a RTSP or RTMP client + # * publisher -> the stream is published by a RTSP, RTMP, WebRTC or SRT client # * rtsp://existing-url -> the stream is pulled from another RTSP server / camera # * rtsps://existing-url -> the stream is pulled from another RTSP server / camera with RTSPS # * rtmp://existing-url -> the stream is pulled from another RTMP server / camera # * rtmps://existing-url -> the stream is pulled from another RTMP server / camera with RTMPS # * http://existing-url/stream.m3u8 -> the stream is pulled from another HLS server # * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server with HTTPS - # * udp://ip:port -> the stream is pulled from UDP, by listening on the specified IP and port + # * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port + # * srt://existing-url -> the stream is pulled from another SRT server # * redirect -> the stream is provided by another path or server # * rpiCamera -> the stream is provided by a Raspberry Pi Camera source: publisher