Browse Source

support proxying WebRTC streams (#2142)

pull/2148/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
bc3084ae7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      README.md
  2. 1
      apidocs/openapi.yaml
  3. 24
      internal/conf/path.go
  4. 12
      internal/core/core.go
  5. 108
      internal/core/hls_manager_test.go
  6. 11
      internal/core/hls_source.go
  7. 7
      internal/core/hls_source_test.go
  8. 294
      internal/core/path.go
  9. 11
      internal/core/rpicamera_source.go
  10. 12
      internal/core/rtmp_source.go
  11. 7
      internal/core/rtmp_source_test.go
  12. 12
      internal/core/rtsp_source.go
  13. 14
      internal/core/rtsp_source_test.go
  14. 23
      internal/core/source_static.go
  15. 10
      internal/core/srt_source.go
  16. 8
      internal/core/srt_source_test.go
  17. 12
      internal/core/udp_source.go
  18. 142
      internal/core/webrtc_http_server.go
  19. 178
      internal/core/webrtc_manager.go
  20. 195
      internal/core/webrtc_manager_test.go
  21. 279
      internal/core/webrtc_pc.go
  22. 214
      internal/core/webrtc_session.go
  23. 175
      internal/core/webrtc_source.go
  24. 188
      internal/core/webrtc_source_test.go
  25. 93
      internal/highleveltests/hls_manager_test.go
  26. 182
      internal/highleveltests/hls_server_test.go
  27. 196
      internal/webrtcpc/pc.go
  28. 18
      internal/websocket/serverconn_test.go
  29. 33
      internal/whip/get_ice_servers.go
  30. 83
      internal/whip/ice_fragment.go
  31. 208
      internal/whip/ice_fragment_test.go
  32. 66
      internal/whip/link_header.go
  33. 52
      internal/whip/link_header_test.go
  34. 46
      internal/whip/post_candidate.go
  35. 76
      internal/whip/post_offer.go
  36. 2
      mediamtx.yml

13
README.md

@ -23,6 +23,7 @@ Live streams can be published to the server with: @@ -23,6 +23,7 @@ Live streams can be published to the server with:
|[SRT clients](#srt-clients)||H265, H264|Opus, MPEG-4 Audio (AAC)|
|[SRT servers](#srt-servers)||H265, H264|Opus, MPEG-4 Audio (AAC)|
|[WebRTC clients](#webrtc-clients)|Browser-based, WHIP|AV1, VP9, VP8, H264|Opus, G722, G711|
|[WebRTC servers](#webrtc-servers)|WHEP|AV1, VP9, VP8, H264|Opus, G722, G711|
|[RTSP clients](#rtsp-clients)|UDP, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec|
|[RTSP cameras and servers](#rtsp-cameras-and-servers)|UDP, UDP-Multicast, TCP, RTSPS|AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), MPEG-1/2 Video, M-JPEG and any RTP-compatible codec|Opus, MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3), G726, G722, G711, LPCM and any RTP-compatible codec|
|[RTMP clients](#rtmp-clients)|RTMP, RTMPS, Enhanced RTMP|AV1, H265, H264|MPEG-4 Audio (AAC), MPEG-1/2 Audio (MP3)|
@ -82,6 +83,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi @@ -82,6 +83,7 @@ _rtsp-simple-server_ has been rebranded as _MediaMTX_. The reason is pretty obvi
* [SRT clients](#srt-clients)
* [SRT servers](#srt-servers)
* [WebRTC clients](#webrtc-clients)
* [WebRTC servers](#webrtc-servers)
* [RTSP clients](#rtsp-clients)
* [RTSP cameras and servers](#rtsp-cameras-and-servers)
* [RTMP clients](#rtmp-clients)
@ -593,6 +595,17 @@ Depending on the network it may be difficult to establish a connection between s @@ -593,6 +595,17 @@ Depending on the network it may be difficult to establish a connection between s
Known clients that can publish with WebRTC and WHIP are [FFmpeg](#ffmpeg), [Gstreamer](#gstreamer), [OBS Studio](#obs-studio).
#### WebRTC servers
In order to ingest into the server a WebRTC stream from an existing server, add the corresponding WHEP URL into the `source` parameter of a path:
```yml
paths:
proxied:
# url of the source stream, in the format whep://host:port/path (HTTP) or wheps:// (HTTPS)
source: wheps://host:port/path
```
#### RTSP clients
RTSP is a protocol that allows to publish and read streams. It supports different underlying transport protocols and allows to encrypt streams in transit (see [RTSP-specific features](#rtsp-specific-features)). In order to publish a stream to the server with the RTSP protocol, use this URL:

1
apidocs/openapi.yaml

@ -384,6 +384,7 @@ components: @@ -384,6 +384,7 @@ components:
- srtSource
- udpSource
- webRTCSession
- webRTCSource
id:
type: string

24
internal/conf/path.go

@ -149,7 +149,7 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -149,7 +149,7 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
_, err := url.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid RTSP URL", pconf.Source)
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
}
case strings.HasPrefix(pconf.Source, "rtmp://") ||
@ -160,7 +160,7 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -160,7 +160,7 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
u, err := gourl.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid RTMP URL", pconf.Source)
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
}
if u.User != nil {
@ -180,10 +180,10 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -180,10 +180,10 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
u, err := gourl.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid HLS URL", pconf.Source)
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
}
if u.Scheme != "http" && u.Scheme != "https" {
return fmt.Errorf("'%s' is not a valid HLS URL", pconf.Source)
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
}
if u.User != nil {
@ -217,7 +217,19 @@ func (pconf *PathConf) check(conf *Conf, name string) error { @@ -217,7 +217,19 @@ func (pconf *PathConf) check(conf *Conf, name string) error {
_, err := gourl.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid HLS URL", pconf.Source)
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
}
case strings.HasPrefix(pconf.Source, "whep://") ||
strings.HasPrefix(pconf.Source, "wheps://"):
if pconf.Regexp != nil {
return fmt.Errorf("a path with a regular expression (or path 'all') " +
"cannot have a WebRTC/WHEP source. use another path")
}
_, err := gourl.Parse(pconf.Source)
if err != nil {
return fmt.Errorf("'%s' is not a valid URL", pconf.Source)
}
case pconf.Source == "redirect":
@ -348,6 +360,8 @@ func (pconf PathConf) HasStaticSource() bool { @@ -348,6 +360,8 @@ func (pconf PathConf) HasStaticSource() bool {
strings.HasPrefix(pconf.Source, "https://") ||
strings.HasPrefix(pconf.Source, "udp://") ||
strings.HasPrefix(pconf.Source, "srt://") ||
strings.HasPrefix(pconf.Source, "whep://") ||
strings.HasPrefix(pconf.Source, "wheps://") ||
pconf.Source == "rpiCamera"
}

12
internal/core/core.go

@ -420,12 +420,12 @@ func (p *Core) createResources(initial bool) error { @@ -420,12 +420,12 @@ func (p *Core) createResources(initial bool) error {
p.conf.WebRTCICEServers2,
p.conf.ReadTimeout,
p.conf.ReadBufferCount,
p.pathManager,
p.metrics,
p,
p.conf.WebRTCICEHostNAT1To1IPs,
p.conf.WebRTCICEUDPMuxAddress,
p.conf.WebRTCICETCPMuxAddress,
p.pathManager,
p.metrics,
p,
)
if err != nil {
return err
@ -608,11 +608,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { @@ -608,11 +608,11 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) {
!reflect.DeepEqual(newConf.WebRTCICEServers2, p.conf.WebRTCICEServers2) ||
newConf.ReadTimeout != p.conf.ReadTimeout ||
newConf.ReadBufferCount != p.conf.ReadBufferCount ||
closeMetrics ||
closePathManager ||
!reflect.DeepEqual(newConf.WebRTCICEHostNAT1To1IPs, p.conf.WebRTCICEHostNAT1To1IPs) ||
newConf.WebRTCICEUDPMuxAddress != p.conf.WebRTCICEUDPMuxAddress ||
newConf.WebRTCICETCPMuxAddress != p.conf.WebRTCICETCPMuxAddress
newConf.WebRTCICETCPMuxAddress != p.conf.WebRTCICETCPMuxAddress ||
closeMetrics ||
closePathManager
closeSRTServer := newConf == nil ||
newConf.SRT != p.conf.SRT ||

108
internal/core/hls_manager_test.go

@ -12,80 +12,72 @@ import ( @@ -12,80 +12,72 @@ import (
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/gin-gonic/gin"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
type testHTTPAuthenticator struct {
protocol string
action string
s *http.Server
firstReceived bool
*http.Server
}
func newTestHTTPAuthenticator(t *testing.T, protocol string, action string) *testHTTPAuthenticator {
ln, err := net.Listen("tcp", "127.0.0.1:9120")
require.NoError(t, err)
ts := &testHTTPAuthenticator{
protocol: protocol,
action: action,
firstReceived := false
ts := &testHTTPAuthenticator{}
ts.Server = &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "/auth", r.URL.Path)
var in struct {
IP string `json:"ip"`
User string `json:"user"`
Password string `json:"password"`
Path string `json:"path"`
Protocol string `json:"protocol"`
ID string `json:"id"`
Action string `json:"action"`
Query string `json:"query"`
}
err := json.NewDecoder(r.Body).Decode(&in)
require.NoError(t, err)
var user string
if action == "publish" {
user = "testpublisher"
} else {
user = "testreader"
}
if in.IP != "127.0.0.1" ||
in.User != user ||
in.Password != "testpass" ||
in.Path != "teststream" ||
in.Protocol != protocol ||
(firstReceived && in.ID == "") ||
in.Action != action ||
(in.Query != "user=testreader&pass=testpass&param=value" &&
in.Query != "user=testpublisher&pass=testpass&param=value" &&
in.Query != "param=value") {
w.WriteHeader(http.StatusBadRequest)
return
}
firstReceived = true
}),
}
router := gin.New()
router.POST("/auth", ts.onAuth)
ln, err := net.Listen("tcp", "127.0.0.1:9120")
require.NoError(t, err)
ts.s = &http.Server{Handler: router}
go ts.s.Serve(ln)
go ts.Server.Serve(ln)
return ts
}
func (ts *testHTTPAuthenticator) close() {
ts.s.Shutdown(context.Background())
}
func (ts *testHTTPAuthenticator) onAuth(ctx *gin.Context) {
var in struct {
IP string `json:"ip"`
User string `json:"user"`
Password string `json:"password"`
Path string `json:"path"`
Protocol string `json:"protocol"`
ID string `json:"id"`
Action string `json:"action"`
Query string `json:"query"`
}
err := json.NewDecoder(ctx.Request.Body).Decode(&in)
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
var user string
if ts.action == "publish" {
user = "testpublisher"
} else {
user = "testreader"
}
if in.IP != "127.0.0.1" ||
in.User != user ||
in.Password != "testpass" ||
in.Path != "teststream" ||
in.Protocol != ts.protocol ||
(ts.firstReceived && in.ID == "") ||
in.Action != ts.action ||
(in.Query != "user=testreader&pass=testpass&param=value" &&
in.Query != "user=testpublisher&pass=testpass&param=value" &&
in.Query != "param=value") {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
ts.firstReceived = true
ts.Server.Shutdown(context.Background())
}
func httpPullFile(t *testing.T, hc *http.Client, u string) []byte {

11
internal/core/hls_source.go

@ -18,8 +18,8 @@ import ( @@ -18,8 +18,8 @@ import (
type hlsSourceParent interface {
logger.Writer
sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
setNotReady(req pathSourceStaticSetNotReadyReq)
}
type hlsSource struct {
@ -35,7 +35,7 @@ func newHLSSource( @@ -35,7 +35,7 @@ func newHLSSource(
}
func (s *hlsSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[hls source] "+format, args...)
s.parent.Log(level, "[HLS source] "+format, args...)
}
// run implements sourceStaticImpl.
@ -44,7 +44,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -44,7 +44,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
defer func() {
if stream != nil {
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
s.parent.setNotReady(pathSourceStaticSetNotReadyReq{})
}
}()
@ -163,7 +163,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -163,7 +163,7 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
medias = append(medias, medi)
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
res := s.parent.setReady(pathSourceStaticSetReadyReq{
medias: medias,
generateRTPPackets: true,
})
@ -171,7 +171,6 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan @@ -171,7 +171,6 @@ func (s *hlsSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf chan
return res.err
}
s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))
stream = res.stream
return nil

7
internal/core/hls_source_test.go

@ -171,10 +171,13 @@ func TestHLSSource(t *testing.T) { @@ -171,10 +171,13 @@ func TestHLSSource(t *testing.T) {
},
}, medias)
err = c.SetupAll(medias, baseURL)
var forma *formats.H264
medi := medias.FindFormat(&forma)
_, err = c.Setup(medi, baseURL, 0, 0)
require.NoError(t, err)
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,

294
internal/core/path.go

@ -331,196 +331,198 @@ func (pa *path) run() { @@ -331,196 +331,198 @@ func (pa *path) run() {
})
}
err := func() error {
for {
select {
case <-pa.onDemandStaticSourceReadyTimer.C:
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequestsOnHold = nil
err := pa.runInner()
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.readerAddRequestsOnHold = nil
// call before destroying context
pa.parent.closePath(pa)
pa.onDemandStaticSourceStop()
pa.ctxCancel()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherCloseTimer.Stop()
case <-pa.onDemandStaticSourceCloseTimer.C:
pa.setNotReady()
pa.onDemandStaticSourceStop()
if onInitCmd != nil {
onInitCmd.Close()
pa.Log(logger.Info, "runOnInit command stopped")
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("terminated")}
}
case <-pa.onDemandPublisherReadyTimer.C:
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequestsOnHold = nil
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("terminated")}
}
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.readerAddRequestsOnHold = nil
if pa.stream != nil {
pa.setNotReady()
}
pa.onDemandStopPublisher()
if pa.source != nil {
if source, ok := pa.source.(*sourceStatic); ok {
source.close()
} else if source, ok := pa.source.(publisher); ok {
source.close()
}
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.Log(logger.Info, "runOnDemand command stopped")
}
case <-pa.onDemandPublisherCloseTimer.C:
pa.onDemandStopPublisher()
pa.Log(logger.Debug, "destroyed (%v)", err)
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
func (pa *path) runInner() error {
for {
select {
case <-pa.onDemandStaticSourceReadyTimer.C:
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequestsOnHold = nil
case newConf := <-pa.chReloadConf:
if pa.conf.HasStaticSource() {
go pa.source.(*sourceStatic).reloadConf(newConf)
}
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.readerAddRequestsOnHold = nil
pa.confMutex.Lock()
pa.conf = newConf
pa.confMutex.Unlock()
case req := <-pa.chSourceStaticSetReady:
err := pa.setReady(req.medias, req.generateRTPPackets)
if err != nil {
req.res <- pathSourceStaticSetReadyRes{err: err}
} else {
if pa.conf.HasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
pa.onDemandStaticSourceScheduleClose()
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
pa.onDemandStaticSourceStop()
for _, req := range pa.readerAddRequestsOnHold {
pa.handleAddReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
}
case <-pa.onDemandStaticSourceCloseTimer.C:
pa.setNotReady()
pa.onDemandStaticSourceStop()
case req := <-pa.chSourceStaticSetNotReady:
pa.setNotReady()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
// send response before calling onDemandStaticSourceStop()
// in order to avoid a deadlock due to sourceStatic.stop()
close(req.res)
case <-pa.onDemandPublisherReadyTimer.C:
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.describeRequestsOnHold = nil
if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
pa.onDemandStaticSourceStop()
}
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("source of path '%s' has timed out", pa.name)}
}
pa.readerAddRequestsOnHold = nil
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
pa.onDemandStopPublisher()
case req := <-pa.chDescribe:
pa.handleDescribe(req)
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case <-pa.onDemandPublisherCloseTimer.C:
pa.onDemandStopPublisher()
case req := <-pa.chRemovePublisher:
pa.handleRemovePublisher(req)
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case newConf := <-pa.chReloadConf:
if pa.conf.HasStaticSource() {
go pa.source.(*sourceStatic).reloadConf(newConf)
}
pa.confMutex.Lock()
pa.conf = newConf
pa.confMutex.Unlock()
case req := <-pa.chAddPublisher:
pa.handleAddPublisher(req)
case req := <-pa.chSourceStaticSetReady:
err := pa.setReady(req.medias, req.generateRTPPackets)
if err != nil {
req.res <- pathSourceStaticSetReadyRes{err: err}
} else {
if pa.conf.HasOnDemandStaticSource() {
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceReadyTimer = newEmptyTimer()
case req := <-pa.chStartPublisher:
pa.handleStartPublisher(req)
pa.onDemandStaticSourceScheduleClose()
case req := <-pa.chStopPublisher:
pa.handleStopPublisher(req)
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{
stream: pa.stream,
}
}
pa.describeRequestsOnHold = nil
if pa.shouldClose() {
return fmt.Errorf("not in use")
for _, req := range pa.readerAddRequestsOnHold {
pa.handleAddReaderPost(req)
}
pa.readerAddRequestsOnHold = nil
}
case req := <-pa.chAddReader:
pa.handleAddReader(req)
req.res <- pathSourceStaticSetReadyRes{stream: pa.stream}
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case req := <-pa.chSourceStaticSetNotReady:
pa.setNotReady()
case req := <-pa.chRemoveReader:
pa.handleRemoveReader(req)
// send response before calling onDemandStaticSourceStop()
// in order to avoid a deadlock due to sourceStatic.stop()
close(req.res)
case req := <-pa.chAPIPathsGet:
pa.handleAPIPathsGet(req)
if pa.conf.HasOnDemandStaticSource() && pa.onDemandStaticSourceState != pathOnDemandStateInitial {
pa.onDemandStaticSourceStop()
}
case <-pa.ctx.Done():
return fmt.Errorf("terminated")
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
}
}()
// call before destroying context
pa.parent.closePath(pa)
case req := <-pa.chDescribe:
pa.handleDescribe(req)
pa.ctxCancel()
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
pa.onDemandStaticSourceReadyTimer.Stop()
pa.onDemandStaticSourceCloseTimer.Stop()
pa.onDemandPublisherReadyTimer.Stop()
pa.onDemandPublisherCloseTimer.Stop()
case req := <-pa.chRemovePublisher:
pa.handleRemovePublisher(req)
if onInitCmd != nil {
onInitCmd.Close()
pa.Log(logger.Info, "runOnInit command stopped")
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
for _, req := range pa.describeRequestsOnHold {
req.res <- pathDescribeRes{err: fmt.Errorf("terminated")}
}
case req := <-pa.chAddPublisher:
pa.handleAddPublisher(req)
for _, req := range pa.readerAddRequestsOnHold {
req.res <- pathAddReaderRes{err: fmt.Errorf("terminated")}
}
case req := <-pa.chStartPublisher:
pa.handleStartPublisher(req)
if pa.stream != nil {
pa.setNotReady()
}
case req := <-pa.chStopPublisher:
pa.handleStopPublisher(req)
if pa.source != nil {
if source, ok := pa.source.(*sourceStatic); ok {
source.close()
} else if source, ok := pa.source.(publisher); ok {
source.close()
}
}
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
if pa.onDemandCmd != nil {
pa.onDemandCmd.Close()
pa.Log(logger.Info, "runOnDemand command stopped")
}
case req := <-pa.chAddReader:
pa.handleAddReader(req)
pa.Log(logger.Debug, "destroyed (%v)", err)
if pa.shouldClose() {
return fmt.Errorf("not in use")
}
case req := <-pa.chRemoveReader:
pa.handleRemoveReader(req)
case req := <-pa.chAPIPathsGet:
pa.handleAPIPathsGet(req)
case <-pa.ctx.Done():
return fmt.Errorf("terminated")
}
}
}
func (pa *path) shouldClose() bool {

11
internal/core/rpicamera_source.go

@ -53,8 +53,8 @@ func paramsFromConf(cnf *conf.PathConf) rpicamera.Params { @@ -53,8 +53,8 @@ func paramsFromConf(cnf *conf.PathConf) rpicamera.Params {
type rpiCameraSourceParent interface {
logger.Writer
sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
setNotReady(req pathSourceStaticSetNotReadyReq)
}
type rpiCameraSource struct {
@ -70,7 +70,7 @@ func newRPICameraSource( @@ -70,7 +70,7 @@ func newRPICameraSource(
}
func (s *rpiCameraSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rpicamera source] "+format, args...)
s.parent.Log(level, "[RPI Camera source] "+format, args...)
}
// run implements sourceStaticImpl.
@ -87,7 +87,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon @@ -87,7 +87,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon
onData := func(dts time.Duration, au [][]byte) {
if stream == nil {
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
res := s.parent.setReady(pathSourceStaticSetReadyReq{
medias: medias,
generateRTPPackets: true,
})
@ -95,7 +95,6 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon @@ -95,7 +95,6 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon
return
}
s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))
stream = res.stream
}
@ -116,7 +115,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon @@ -116,7 +115,7 @@ func (s *rpiCameraSource) run(ctx context.Context, cnf *conf.PathConf, reloadCon
defer func() {
if stream != nil {
s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
s.parent.setNotReady(pathSourceStaticSetNotReadyReq{})
}
}()

12
internal/core/rtmp_source.go

@ -20,8 +20,8 @@ import ( @@ -20,8 +20,8 @@ import (
type rtmpSourceParent interface {
logger.Writer
sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
setNotReady(req pathSourceStaticSetNotReadyReq)
}
type rtmpSource struct {
@ -43,7 +43,7 @@ func newRTMPSource( @@ -43,7 +43,7 @@ func newRTMPSource(
}
func (s *rtmpSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtmp source] "+format, args...)
s.parent.Log(level, "[RTMP source] "+format, args...)
}
// run implements sourceStaticImpl.
@ -173,7 +173,7 @@ func (s *rtmpSource) runReader(u *url.URL, nconn net.Conn) error { @@ -173,7 +173,7 @@ func (s *rtmpSource) runReader(u *url.URL, nconn net.Conn) error {
}
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
res := s.parent.setReady(pathSourceStaticSetReadyReq{
medias: medias,
generateRTPPackets: true,
})
@ -181,9 +181,7 @@ func (s *rtmpSource) runReader(u *url.URL, nconn net.Conn) error { @@ -181,9 +181,7 @@ func (s *rtmpSource) runReader(u *url.URL, nconn net.Conn) error {
return res.err
}
defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))
defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{})
stream = res.stream

7
internal/core/rtmp_source_test.go

@ -119,10 +119,13 @@ func TestRTMPSource(t *testing.T) { @@ -119,10 +119,13 @@ func TestRTMPSource(t *testing.T) {
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
var forma *formats.H264
medi := medias.FindFormat(&forma)
_, err = c.Setup(medi, baseURL, 0, 0)
require.NoError(t, err)
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
require.Equal(t, []byte{
0x18, 0x0, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9,
0x0, 0x78, 0x2, 0x27, 0xe5, 0x84, 0x0, 0x0,

12
internal/core/rtsp_source.go

@ -61,8 +61,8 @@ func createRangeHeader(cnf *conf.PathConf) (*headers.Range, error) { @@ -61,8 +61,8 @@ func createRangeHeader(cnf *conf.PathConf) (*headers.Range, error) {
type rtspSourceParent interface {
logger.Writer
sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
setNotReady(req pathSourceStaticSetNotReadyReq)
}
type rtspSource struct {
@ -87,7 +87,7 @@ func newRTSPSource( @@ -87,7 +87,7 @@ func newRTSPSource(
}
func (s *rtspSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[rtsp source] "+format, args...)
s.parent.Log(level, "[RTSP source] "+format, args...)
}
// run implements sourceStaticImpl.
@ -142,7 +142,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -142,7 +142,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
return err
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
res := s.parent.setReady(pathSourceStaticSetReadyReq{
medias: medias,
generateRTPPackets: false,
})
@ -150,9 +150,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha @@ -150,9 +150,7 @@ func (s *rtspSource) run(ctx context.Context, cnf *conf.PathConf, reloadConf cha
return res.err
}
s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))
defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{})
for _, medi := range medias {
for _, forma := range medi.Formats {

14
internal/core/rtsp_source_test.go

@ -9,6 +9,7 @@ import ( @@ -9,6 +9,7 @@ import (
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/auth"
"github.com/bluenviron/gortsplib/v3/pkg/base"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/gortsplib/v3/pkg/url"
"github.com/pion/rtp"
@ -41,8 +42,8 @@ func TestRTSPSource(t *testing.T) { @@ -41,8 +42,8 @@ func TestRTSPSource(t *testing.T) {
"tls",
} {
t.Run(source, func(t *testing.T) {
medi := testMediaH264
stream := gortsplib.NewServerStream(media.Medias{medi})
serverMedia := testMediaH264
stream := gortsplib.NewServerStream(media.Medias{serverMedia})
nonce, err := auth.GenerateNonce2()
require.NoError(t, err)
@ -73,7 +74,7 @@ func TestRTSPSource(t *testing.T) { @@ -73,7 +74,7 @@ func TestRTSPSource(t *testing.T) {
onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
time.Sleep(1 * time.Second)
stream.WritePacketRTP(medi, &rtp.Packet{
stream.WritePacketRTP(serverMedia, &rtp.Packet{
Header: rtp.Header{
Version: 0x02,
PayloadType: 96,
@ -151,10 +152,13 @@ func TestRTSPSource(t *testing.T) { @@ -151,10 +152,13 @@ func TestRTSPSource(t *testing.T) {
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
var forma *formats.H264
medi := medias.FindFormat(&forma)
_, err = c.Setup(medi, baseURL, 0, 0)
require.NoError(t, err)
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, pkt.Payload)
close(received)
})

23
internal/core/source_static.go

@ -91,6 +91,12 @@ func newSourceStatic( @@ -91,6 +91,12 @@ func newSourceStatic(
readTimeout,
s)
case strings.HasPrefix(cnf.Source, "whep://") ||
strings.HasPrefix(cnf.Source, "wheps://"):
s.impl = newWebRTCSource(
readTimeout,
s)
case cnf.Source == "rpiCamera":
s.impl = newRPICameraSource(
s)
@ -210,19 +216,26 @@ func (s *sourceStatic) apiSourceDescribe() pathAPISourceOrReader { @@ -210,19 +216,26 @@ func (s *sourceStatic) apiSourceDescribe() pathAPISourceOrReader {
return s.impl.apiSourceDescribe()
}
// sourceStaticImplSetReady is called by a sourceStaticImpl.
func (s *sourceStatic) sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
// setReady is called by a sourceStaticImpl.
func (s *sourceStatic) setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes {
req.res = make(chan pathSourceStaticSetReadyRes)
select {
case s.chSourceStaticImplSetReady <- req:
return <-req.res
res := <-req.res
if res.err == nil {
s.impl.Log(logger.Info, "ready: %s", sourceMediaInfo(req.medias))
}
return res
case <-s.ctx.Done():
return pathSourceStaticSetReadyRes{err: fmt.Errorf("terminated")}
}
}
// sourceStaticImplSetNotReady is called by a sourceStaticImpl.
func (s *sourceStatic) sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq) {
// setNotReady is called by a sourceStaticImpl.
func (s *sourceStatic) setNotReady(req pathSourceStaticSetNotReadyReq) {
req.res = make(chan struct{})
select {
case s.chSourceStaticImplSetNotReady <- req:

10
internal/core/srt_source.go

@ -17,8 +17,8 @@ import ( @@ -17,8 +17,8 @@ import (
type srtSourceParent interface {
logger.Writer
sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
setNotReady(req pathSourceStaticSetNotReadyReq)
}
type srtSource struct {
@ -39,7 +39,7 @@ func newSRTSource( @@ -39,7 +39,7 @@ func newSRTSource(
}
func (s *srtSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[srt source] "+format, args...)
s.parent.Log(level, "[SRT source] "+format, args...)
}
// run implements sourceStaticImpl.
@ -191,7 +191,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error { @@ -191,7 +191,7 @@ func (s *srtSource) runReader(sconn srt.Conn) error {
medias = append(medias, medi)
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
res := s.parent.setReady(pathSourceStaticSetReadyReq{
medias: medias,
generateRTPPackets: true,
})
@ -199,8 +199,6 @@ func (s *srtSource) runReader(sconn srt.Conn) error { @@ -199,8 +199,6 @@ func (s *srtSource) runReader(sconn srt.Conn) error {
return res.err
}
s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))
stream = res.stream
for {

8
internal/core/srt_source_test.go

@ -5,6 +5,7 @@ import ( @@ -5,6 +5,7 @@ import (
"testing"
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/url"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/datarhei/gosrt"
@ -81,10 +82,13 @@ func TestSRTSource(t *testing.T) { @@ -81,10 +82,13 @@ func TestSRTSource(t *testing.T) {
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
err = c.SetupAll(medias, baseURL)
var forma *formats.H264
medi := medias.FindFormat(&forma)
_, err = c.Setup(medi, baseURL, 0, 0)
require.NoError(t, err)
c.OnPacketRTP(medias[0], medias[0].Formats[0], func(pkt *rtp.Packet) {
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
require.Equal(t, []byte{5, 1}, pkt.Payload)
close(received)
})

12
internal/core/udp_source.go

@ -63,8 +63,8 @@ func (r *packetConnReader) Read(p []byte) (int, error) { @@ -63,8 +63,8 @@ func (r *packetConnReader) Read(p []byte) (int, error) {
type udpSourceParent interface {
logger.Writer
sourceStaticImplSetReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
sourceStaticImplSetNotReady(req pathSourceStaticSetNotReadyReq)
setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
setNotReady(req pathSourceStaticSetNotReadyReq)
}
type udpSource struct {
@ -83,7 +83,7 @@ func newUDPSource( @@ -83,7 +83,7 @@ func newUDPSource(
}
func (s *udpSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[udp source] "+format, args...)
s.parent.Log(level, "[UDP source] "+format, args...)
}
// run implements sourceStaticImpl.
@ -239,7 +239,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -239,7 +239,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
medias = append(medias, medi)
}
res := s.parent.sourceStaticImplSetReady(pathSourceStaticSetReadyReq{
res := s.parent.setReady(pathSourceStaticSetReadyReq{
medias: medias,
generateRTPPackets: true,
})
@ -247,9 +247,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -247,9 +247,7 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
return res.err
}
defer s.parent.sourceStaticImplSetNotReady(pathSourceStaticSetNotReadyReq{})
s.Log(logger.Info, "ready: %s", sourceMediaInfo(medias))
defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{})
stream = res.stream

142
internal/core/webrtc_http_server.go

@ -2,24 +2,21 @@ package core @@ -2,24 +2,21 @@ package core
import (
_ "embed"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/httpserv"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/whip"
)
//go:embed webrtc_publish_index.html
@ -28,137 +25,6 @@ var webrtcPublishIndex []byte @@ -28,137 +25,6 @@ var webrtcPublishIndex []byte
//go:embed webrtc_read_index.html
var webrtcReadIndex []byte
func quoteCredential(v string) string {
b, _ := json.Marshal(v)
s := string(b)
return s[1 : len(s)-1]
}
func unquoteCredential(v string) string {
var s string
json.Unmarshal([]byte("\""+v+"\""), &s)
return s
}
func iceServersToLinkHeader(iceServers []webrtc.ICEServer) []string {
ret := make([]string, len(iceServers))
for i, server := range iceServers {
link := "<" + server.URLs[0] + ">; rel=\"ice-server\""
if server.Username != "" {
link += "; username=\"" + quoteCredential(server.Username) + "\"" +
"; credential=\"" + quoteCredential(server.Credential.(string)) + "\"; credential-type=\"password\""
}
ret[i] = link
}
return ret
}
var reLink = regexp.MustCompile(`^<(.+?)>; rel="ice-server"(; username="(.+?)"` +
`; credential="(.+?)"; credential-type="password")?`)
func linkHeaderToIceServers(link []string) []webrtc.ICEServer {
var ret []webrtc.ICEServer
for _, li := range link {
m := reLink.FindStringSubmatch(li)
if m != nil {
s := webrtc.ICEServer{
URLs: []string{m[1]},
}
if m[3] != "" {
s.Username = unquoteCredential(m[3])
s.Credential = unquoteCredential(m[4])
s.CredentialType = webrtc.ICECredentialTypePassword
}
ret = append(ret, s)
}
}
return ret
}
func unmarshalICEFragment(buf []byte) ([]*webrtc.ICECandidateInit, error) {
buf = append([]byte("v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n"), buf...)
var sdp sdp.SessionDescription
err := sdp.Unmarshal(buf)
if err != nil {
return nil, err
}
usernameFragment, ok := sdp.Attribute("ice-ufrag")
if !ok {
return nil, fmt.Errorf("ice-ufrag attribute is missing")
}
var ret []*webrtc.ICECandidateInit
for _, media := range sdp.MediaDescriptions {
mid, ok := media.Attribute("mid")
if !ok {
return nil, fmt.Errorf("mid attribute is missing")
}
tmp, err := strconv.ParseUint(mid, 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid mid attribute")
}
midNum := uint16(tmp)
for _, attr := range media.Attributes {
if attr.Key == "candidate" {
ret = append(ret, &webrtc.ICECandidateInit{
Candidate: attr.Value,
SDPMid: &mid,
SDPMLineIndex: &midNum,
UsernameFragment: &usernameFragment,
})
}
}
}
return ret, nil
}
func marshalICEFragment(offer *webrtc.SessionDescription, candidates []*webrtc.ICECandidateInit) ([]byte, error) {
var sdp sdp.SessionDescription
err := sdp.Unmarshal([]byte(offer.SDP))
if err != nil || len(sdp.MediaDescriptions) == 0 {
return nil, err
}
firstMedia := sdp.MediaDescriptions[0]
iceUfrag, _ := firstMedia.Attribute("ice-ufrag")
icePwd, _ := firstMedia.Attribute("ice-pwd")
candidatesByMedia := make(map[uint16][]*webrtc.ICECandidateInit)
for _, candidate := range candidates {
mid := *candidate.SDPMLineIndex
candidatesByMedia[mid] = append(candidatesByMedia[mid], candidate)
}
frag := "a=ice-ufrag:" + iceUfrag + "\r\n" +
"a=ice-pwd:" + icePwd + "\r\n"
for mid, media := range sdp.MediaDescriptions {
cbm, ok := candidatesByMedia[uint16(mid)]
if ok {
frag += "m=" + media.MediaName.String() + "\r\n" +
"a=mid:" + strconv.FormatUint(uint64(mid), 10) + "\r\n"
for _, candidate := range cbm {
frag += "a=" + candidate.Candidate + "\r\n"
}
}
}
return []byte(frag), nil
}
type webRTCHTTPServerParent interface {
logger.Writer
generateICEServers() ([]webrtc.ICEServer, error)
@ -358,7 +224,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { @@ -358,7 +224,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
ctx.Writer.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH")
ctx.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
ctx.Writer.Header()["Link"] = iceServersToLinkHeader(servers)
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Writer.WriteHeader(http.StatusNoContent)
case http.MethodPost:
@ -397,7 +263,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { @@ -397,7 +263,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
ctx.Writer.Header().Set("E-Tag", res.sx.secret.String())
ctx.Writer.Header().Set("ID", res.sx.uuid.String())
ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
ctx.Writer.Header()["Link"] = iceServersToLinkHeader(servers)
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Writer.Header().Set("Location", ctx.Request.URL.String())
ctx.Writer.WriteHeader(http.StatusCreated)
ctx.Writer.Write(res.answer)
@ -419,7 +285,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { @@ -419,7 +285,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
return
}
candidates, err := unmarshalICEFragment(byts)
candidates, err := whip.ICEFragmentUnmarshal(byts)
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return

178
internal/core/webrtc_manager.go

@ -16,6 +16,7 @@ import ( @@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/pion/ice/v2"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf"
@ -25,12 +26,94 @@ import ( @@ -25,12 +26,94 @@ import (
const (
webrtcPauseAfterAuthError = 2 * time.Second
webrtcHandshakeTimeout = 10 * time.Second
webrtcTrackGatherTimeout = 5 * time.Second
webrtcTrackGatherTimeout = 3 * time.Second
webrtcPayloadMaxSize = 1188 // 1200 - 12 (RTP header)
webrtcStreamID = "mediamtx"
webrtcTurnSecretExpiration = 24 * 3600 * time.Second
)
var videoCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
ClockRate: 90000,
},
PayloadType: 96,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=0",
},
PayloadType: 97,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=1",
},
PayloadType: 98,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
},
PayloadType: 99,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
},
PayloadType: 100,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
},
PayloadType: 101,
},
}
var audioCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1",
},
PayloadType: 111,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
ClockRate: 8000,
},
PayloadType: 9,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMU,
ClockRate: 8000,
},
PayloadType: 0,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMA,
ClockRate: 8000,
},
PayloadType: 8,
},
}
func randInt63() (int64, error) {
var b [8]byte
_, err := rand.Read(b[:])
@ -84,6 +167,53 @@ func randomTurnUser() (string, error) { @@ -84,6 +167,53 @@ func randomTurnUser() (string, error) {
return string(b), nil
}
func webrtcNewAPI(
iceHostNAT1To1IPs []string,
iceUDPMux ice.UDPMux,
iceTCPMux ice.TCPMux,
) (*webrtc.API, error) {
settingsEngine := webrtc.SettingEngine{}
if len(iceHostNAT1To1IPs) != 0 {
settingsEngine.SetNAT1To1IPs(iceHostNAT1To1IPs, webrtc.ICECandidateTypeHost)
}
if iceUDPMux != nil {
settingsEngine.SetICEUDPMux(iceUDPMux)
}
if iceTCPMux != nil {
settingsEngine.SetICETCPMux(iceTCPMux)
settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4})
}
mediaEngine := &webrtc.MediaEngine{}
for _, codec := range videoCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeVideo)
if err != nil {
return nil, err
}
}
for _, codec := range audioCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeAudio)
if err != nil {
return nil, err
}
}
interceptorRegistry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil {
return nil, err
}
return webrtc.NewAPI(
webrtc.WithSettingEngine(settingsEngine),
webrtc.WithMediaEngine(mediaEngine),
webrtc.WithInterceptorRegistry(interceptorRegistry)), nil
}
type webRTCManagerAPISessionsListRes struct {
data *apiWebRTCSessionsList
err error
@ -154,16 +284,14 @@ type webRTCManager struct { @@ -154,16 +284,14 @@ type webRTCManager struct {
metrics *metrics
parent webRTCManagerParent
ctx context.Context
ctxCancel func()
httpServer *webRTCHTTPServer
udpMuxLn net.PacketConn
tcpMuxLn net.Listener
sessions map[*webRTCSession]struct{}
sessionsBySecret map[uuid.UUID]*webRTCSession
iceHostNAT1To1IPs []string
iceUDPMux ice.UDPMux
iceTCPMux ice.TCPMux
ctx context.Context
ctxCancel func()
httpServer *webRTCHTTPServer
udpMuxLn net.PacketConn
tcpMuxLn net.Listener
api *webrtc.API
sessions map[*webRTCSession]struct{}
sessionsBySecret map[uuid.UUID]*webRTCSession
// in
chNewSession chan webRTCNewSessionReq
@ -187,12 +315,12 @@ func newWebRTCManager( @@ -187,12 +315,12 @@ func newWebRTCManager(
iceServers []conf.WebRTCICEServer,
readTimeout conf.StringDuration,
readBufferCount int,
pathManager *pathManager,
metrics *metrics,
parent webRTCManagerParent,
iceHostNAT1To1IPs []string,
iceUDPMuxAddress string,
iceTCPMuxAddress string,
pathManager *pathManager,
metrics *metrics,
parent webRTCManagerParent,
) (*webRTCManager, error) {
ctx, ctxCancel := context.WithCancel(context.Background())
@ -206,7 +334,6 @@ func newWebRTCManager( @@ -206,7 +334,6 @@ func newWebRTCManager(
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
iceHostNAT1To1IPs: iceHostNAT1To1IPs,
sessions: make(map[*webRTCSession]struct{}),
sessionsBySecret: make(map[uuid.UUID]*webRTCSession),
chNewSession: make(chan webRTCNewSessionReq),
@ -235,6 +362,8 @@ func newWebRTCManager( @@ -235,6 +362,8 @@ func newWebRTCManager(
return nil, err
}
var iceUDPMux ice.UDPMux
if iceUDPMuxAddress != "" {
m.udpMuxLn, err = net.ListenPacket(restrictNetwork("udp", iceUDPMuxAddress))
if err != nil {
@ -242,9 +371,11 @@ func newWebRTCManager( @@ -242,9 +371,11 @@ func newWebRTCManager(
ctxCancel()
return nil, err
}
m.iceUDPMux = webrtc.NewICEUDPMux(nil, m.udpMuxLn)
iceUDPMux = webrtc.NewICEUDPMux(nil, m.udpMuxLn)
}
var iceTCPMux ice.TCPMux
if iceTCPMuxAddress != "" {
m.tcpMuxLn, err = net.Listen(restrictNetwork("tcp", iceTCPMuxAddress))
if err != nil {
@ -253,7 +384,16 @@ func newWebRTCManager( @@ -253,7 +384,16 @@ func newWebRTCManager(
ctxCancel()
return nil, err
}
m.iceTCPMux = webrtc.NewICETCPMux(nil, m.tcpMuxLn, 8)
iceTCPMux = webrtc.NewICETCPMux(nil, m.tcpMuxLn, 8)
}
m.api, err = webrtcNewAPI(iceHostNAT1To1IPs, iceUDPMux, iceTCPMux)
if err != nil {
m.udpMuxLn.Close()
m.tcpMuxLn.Close()
m.httpServer.close()
ctxCancel()
return nil, err
}
str := "listener opened on " + address + " (HTTP)"
@ -297,11 +437,9 @@ outer: @@ -297,11 +437,9 @@ outer:
sx := newWebRTCSession(
m.ctx,
m.readBufferCount,
m.api,
req,
&wg,
m.iceHostNAT1To1IPs,
m.iceUDPMux,
m.iceTCPMux,
m.pathManager,
m,
)

195
internal/core/webrtc_manager_test.go

@ -2,9 +2,8 @@ package core @@ -2,9 +2,8 @@ package core
import (
"bytes"
"io"
"context"
"net/http"
"sync"
"testing"
"time"
@ -15,106 +14,22 @@ import ( @@ -15,106 +14,22 @@ import (
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
)
func whipGetICEServers(
t *testing.T,
hc *http.Client,
ur string,
) []webrtc.ICEServer {
req, err := http.NewRequest("OPTIONS", ur, nil)
require.NoError(t, err)
res, err := hc.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNoContent, res.StatusCode)
link, ok := res.Header["Link"]
require.Equal(t, true, ok)
servers := linkHeaderToIceServers(link)
require.NotEqual(t, 0, len(servers))
return servers
}
func whipPostOffer(
t *testing.T,
hc *http.Client,
ur string,
offer *webrtc.SessionDescription,
) (*webrtc.SessionDescription, string) {
req, err := http.NewRequest("POST", ur, bytes.NewReader([]byte(offer.SDP)))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/sdp")
res, err := hc.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusCreated, res.StatusCode)
require.Equal(t, "application/sdp", res.Header.Get("Content-Type"))
require.Equal(t, "application/trickle-ice-sdpfrag", res.Header.Get("Accept-Patch"))
loc := req.URL.Path
if req.URL.RawQuery != "" {
loc += "?" + req.URL.RawQuery
}
require.Equal(t, loc, res.Header.Get("Location"))
link, ok := res.Header["Link"]
require.Equal(t, true, ok)
servers := linkHeaderToIceServers(link)
require.NotEqual(t, 0, len(servers))
etag := res.Header.Get("E-Tag")
require.NotEqual(t, "", etag)
require.NotEqual(t, "", res.Header.Get("ID"))
sdp, err := io.ReadAll(res.Body)
require.NoError(t, err)
answer := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: string(sdp),
}
return answer, etag
}
func whipPostCandidate(
t *testing.T,
ur string,
offer *webrtc.SessionDescription,
etag string,
candidate *webrtc.ICECandidateInit,
) {
frag, err := marshalICEFragment(offer, []*webrtc.ICECandidateInit{candidate})
require.NoError(t, err)
req, err := http.NewRequest("PATCH", ur, bytes.NewReader(frag))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag")
req.Header.Set("If-Match", etag)
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
"github.com/bluenviron/mediamtx/internal/whip"
)
hc := &http.Client{Transport: &http.Transport{}}
type nilLogger struct{}
res, err := hc.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNoContent, res.StatusCode)
func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
type webRTCTestClient struct {
pc *webrtc.PeerConnection
pc *webrtcpc.PeerConnection
outgoingTrack1 *webrtc.TrackLocalStaticRTP
outgoingTrack2 *webrtc.TrackLocalStaticRTP
incomingTrack chan *webrtc.TrackRemote
closed chan struct{}
}
func newWebRTCTestClient(
@ -123,35 +38,16 @@ func newWebRTCTestClient( @@ -123,35 +38,16 @@ func newWebRTCTestClient(
ur string,
publish bool,
) *webRTCTestClient {
iceServers := whipGetICEServers(t, hc, ur)
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
ICEServers: iceServers,
})
iceServers, err := whip.GetICEServers(context.Background(), hc, ur)
require.NoError(t, err)
connected := make(chan struct{})
closed := make(chan struct{})
var stateChangeMutex sync.Mutex
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
stateChangeMutex.Lock()
defer stateChangeMutex.Unlock()
select {
case <-closed:
return
default:
}
c := &webRTCTestClient{}
switch state {
case webrtc.PeerConnectionStateConnected:
close(connected)
api, err := webrtcNewAPI(nil, nil, nil)
require.NoError(t, err)
case webrtc.PeerConnectionStateClosed:
close(closed)
}
})
pc, err := webrtcpc.New(iceServers, api, nilLogger{})
require.NoError(t, err)
var outgoingTrack1 *webrtc.TrackLocalStaticRTP
var outgoingTrack2 *webrtc.TrackLocalStaticRTP
@ -198,31 +94,30 @@ func newWebRTCTestClient( @@ -198,31 +94,30 @@ func newWebRTCTestClient(
offer, err := pc.CreateOffer(nil)
require.NoError(t, err)
answer, etag := whipPostOffer(t, hc, ur, &offer)
// test adding additional candidates, even if it is not mandatory here
gatheringDone := make(chan struct{})
pc.OnICECandidate(func(i *webrtc.ICECandidate) {
if i != nil {
c := i.ToJSON()
whipPostCandidate(t, ur, &offer, etag, &c)
} else {
close(gatheringDone)
}
})
res, err := whip.PostOffer(context.Background(), hc, ur, &offer)
require.NoError(t, err)
err = pc.SetLocalDescription(offer)
require.NoError(t, err)
err = pc.SetRemoteDescription(*answer)
// test adding additional candidates, even if it is not mandatory here
outer:
for {
select {
case c := <-pc.NewLocalCandidate():
err := whip.PostCandidate(context.Background(), hc, ur, &offer, res.ETag, c)
require.NoError(t, err)
case <-pc.GatheringDone():
break outer
}
}
err = pc.SetRemoteDescription(*res.Answer)
require.NoError(t, err)
<-gatheringDone
<-connected
<-pc.Connected()
if publish {
time.Sleep(200 * time.Millisecond)
err := outgoingTrack1.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
@ -232,7 +127,7 @@ func newWebRTCTestClient( @@ -232,7 +127,7 @@ func newWebRTCTestClient(
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
Payload: []byte{1},
})
require.NoError(t, err)
@ -245,25 +140,22 @@ func newWebRTCTestClient( @@ -245,25 +140,22 @@ func newWebRTCTestClient(
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
Payload: []byte{2},
})
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
}
return &webRTCTestClient{
pc: pc,
outgoingTrack1: outgoingTrack1,
outgoingTrack2: outgoingTrack2,
incomingTrack: incomingTrack,
closed: closed,
}
c.pc = pc
c.outgoingTrack1 = outgoingTrack1
c.outgoingTrack2 = outgoingTrack2
c.incomingTrack = incomingTrack
return c
}
func (c *webRTCTestClient) close() {
c.pc.Close()
<-c.closed
}
func TestWebRTCRead(t *testing.T) {
@ -359,7 +251,7 @@ func TestWebRTCRead(t *testing.T) { @@ -359,7 +251,7 @@ func TestWebRTCRead(t *testing.T) {
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
Payload: []byte{3},
})
trak := <-c.incomingTrack
@ -370,13 +262,13 @@ func TestWebRTCRead(t *testing.T) { @@ -370,13 +262,13 @@ func TestWebRTCRead(t *testing.T) {
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 102,
PayloadType: 100,
SequenceNumber: pkt.SequenceNumber,
Timestamp: pkt.Timestamp,
SSRC: pkt.SSRC,
CSRC: []uint32{},
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
Payload: []byte{3},
}, pkt)
})
}
@ -390,7 +282,8 @@ func TestWebRTCReadNotFound(t *testing.T) { @@ -390,7 +282,8 @@ func TestWebRTCReadNotFound(t *testing.T) {
hc := &http.Client{Transport: &http.Transport{}}
iceServers := whipGetICEServers(t, hc, "http://localhost:8889/stream/whep")
iceServers, err := whip.GetICEServers(context.Background(), hc, "http://localhost:8889/stream/whep")
require.NoError(t, err)
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
ICEServers: iceServers,
@ -525,7 +418,7 @@ func TestWebRTCPublish(t *testing.T) { @@ -525,7 +418,7 @@ func TestWebRTCPublish(t *testing.T) {
received := make(chan struct{})
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, pkt.Payload)
require.Equal(t, []byte{3}, pkt.Payload)
close(received)
})
@ -541,7 +434,7 @@ func TestWebRTCPublish(t *testing.T) { @@ -541,7 +434,7 @@ func TestWebRTCPublish(t *testing.T) {
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{0x05, 0x06, 0x07, 0x08},
Payload: []byte{3},
})
require.NoError(t, err)

279
internal/core/webrtc_pc.go

@ -1,279 +0,0 @@ @@ -1,279 +0,0 @@
package core
import (
"strconv"
"sync"
"github.com/pion/ice/v2"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
)
var videoCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
ClockRate: 90000,
},
PayloadType: 96,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=0",
},
PayloadType: 97,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=1",
},
PayloadType: 98,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
},
PayloadType: 99,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
},
PayloadType: 100,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
},
PayloadType: 101,
},
}
var audioCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1",
},
PayloadType: 111,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
ClockRate: 8000,
},
PayloadType: 9,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMU,
ClockRate: 8000,
},
PayloadType: 0,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMA,
ClockRate: 8000,
},
PayloadType: 8,
},
}
type peerConnection struct {
*webrtc.PeerConnection
stateChangeMutex sync.Mutex
localCandidateRecv chan *webrtc.ICECandidateInit
connected chan struct{}
disconnected chan struct{}
closed chan struct{}
gatheringDone chan struct{}
}
func newPeerConnection(
iceServers []webrtc.ICEServer,
iceHostNAT1To1IPs []string,
iceUDPMux ice.UDPMux,
iceTCPMux ice.TCPMux,
log logger.Writer,
) (*peerConnection, error) {
configuration := webrtc.Configuration{ICEServers: iceServers}
settingsEngine := webrtc.SettingEngine{}
if len(iceHostNAT1To1IPs) != 0 {
settingsEngine.SetNAT1To1IPs(iceHostNAT1To1IPs, webrtc.ICECandidateTypeHost)
}
if iceUDPMux != nil {
settingsEngine.SetICEUDPMux(iceUDPMux)
}
if iceTCPMux != nil {
settingsEngine.SetICETCPMux(iceTCPMux)
settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4})
}
mediaEngine := &webrtc.MediaEngine{}
for _, codec := range videoCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeVideo)
if err != nil {
return nil, err
}
}
for _, codec := range audioCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeAudio)
if err != nil {
return nil, err
}
}
interceptorRegistry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil {
return nil, err
}
api := webrtc.NewAPI(
webrtc.WithSettingEngine(settingsEngine),
webrtc.WithMediaEngine(mediaEngine),
webrtc.WithInterceptorRegistry(interceptorRegistry))
pc, err := api.NewPeerConnection(configuration)
if err != nil {
return nil, err
}
co := &peerConnection{
PeerConnection: pc,
localCandidateRecv: make(chan *webrtc.ICECandidateInit),
connected: make(chan struct{}),
disconnected: make(chan struct{}),
closed: make(chan struct{}),
gatheringDone: make(chan struct{}),
}
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
co.stateChangeMutex.Lock()
defer co.stateChangeMutex.Unlock()
select {
case <-co.closed:
return
default:
}
log.Log(logger.Debug, "peer connection state: "+state.String())
switch state {
case webrtc.PeerConnectionStateConnected:
log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
co.localCandidate(), co.remoteCandidate())
close(co.connected)
case webrtc.PeerConnectionStateDisconnected:
close(co.disconnected)
case webrtc.PeerConnectionStateClosed:
close(co.closed)
}
})
pc.OnICECandidate(func(i *webrtc.ICECandidate) {
if i != nil {
v := i.ToJSON()
select {
case co.localCandidateRecv <- &v:
case <-co.connected:
case <-co.closed:
}
} else {
close(co.gatheringDone)
}
})
return co, nil
}
func (co *peerConnection) close() {
co.PeerConnection.Close()
<-co.closed
}
func (co *peerConnection) localCandidate() string {
var cid string
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.LocalCandidateID
break
}
}
if cid != "" {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
func (co *peerConnection) remoteCandidate() string {
var cid string
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.RemoteCandidateID
break
}
}
if cid != "" {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
func (co *peerConnection) bytesReceived() uint64 {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesReceived
}
}
}
return 0
}
func (co *peerConnection) bytesSent() uint64 {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesSent
}
}
}
return 0
}

214
internal/core/webrtc_session.go

@ -13,11 +13,11 @@ import ( @@ -13,11 +13,11 @@ import (
"github.com/bluenviron/gortsplib/v3/pkg/media"
"github.com/bluenviron/gortsplib/v3/pkg/ringbuffer"
"github.com/google/uuid"
"github.com/pion/ice/v2"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
)
type trackRecvPair struct {
@ -25,7 +25,7 @@ type trackRecvPair struct { @@ -25,7 +25,7 @@ type trackRecvPair struct {
receiver *webrtc.RTPReceiver
}
func mediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias {
func webrtcMediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias {
ret := make(media.Medias, len(tracks))
for i, track := range tracks {
ret[i] = track.media
@ -33,7 +33,7 @@ func mediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias { @@ -33,7 +33,7 @@ func mediasOfOutgoingTracks(tracks []*webRTCOutgoingTrack) media.Medias {
return ret
}
func mediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias {
func webrtcMediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias {
ret := make(media.Medias, len(tracks))
for i, track := range tracks {
ret[i] = track.media
@ -41,9 +41,16 @@ func mediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias { @@ -41,9 +41,16 @@ func mediasOfIncomingTracks(tracks []*webRTCIncomingTrack) media.Medias {
return ret
}
func waitUntilConnected(
func whipOffer(body []byte) *webrtc.SessionDescription {
return &webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(body),
}
}
func webrtcWaitUntilConnected(
ctx context.Context,
pc *peerConnection,
pc *webrtcpc.PeerConnection,
) error {
t := time.NewTimer(webrtcHandshakeTimeout)
defer t.Stop()
@ -54,7 +61,7 @@ outer: @@ -54,7 +61,7 @@ outer:
case <-t.C:
return fmt.Errorf("deadline exceeded while waiting connection")
case <-pc.connected:
case <-pc.Connected():
break outer
case <-ctx.Done():
@ -65,7 +72,7 @@ outer: @@ -65,7 +72,7 @@ outer:
return nil
}
func gatherOutgoingTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) {
func webrtcGatherOutgoingTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) {
var tracks []*webRTCOutgoingTrack
videoTrack, err := newWebRTCOutgoingTrackVideo(medias)
@ -94,9 +101,38 @@ func gatherOutgoingTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) { @@ -94,9 +101,38 @@ func gatherOutgoingTracks(medias media.Medias) ([]*webRTCOutgoingTrack, error) {
return tracks, nil
}
func gatherIncomingTracks(
func webrtcTrackCount(medias []*sdp.MediaDescription) (int, error) {
videoTrack := false
audioTrack := false
trackCount := 0
for _, media := range medias {
switch media.MediaName.Media {
case "video":
if videoTrack {
return 0, fmt.Errorf("only a single video and a single audio track are supported")
}
videoTrack = true
case "audio":
if audioTrack {
return 0, fmt.Errorf("only a single video and a single audio track are supported")
}
audioTrack = true
default:
return 0, fmt.Errorf("unsupported media '%s'", media.MediaName.Media)
}
trackCount++
}
return trackCount, nil
}
func webrtcGatherIncomingTracks(
ctx context.Context,
pc *peerConnection,
pc *webrtcpc.PeerConnection,
trackRecv chan trackRecvPair,
trackCount int,
) ([]*webRTCIncomingTrack, error) {
@ -108,6 +144,9 @@ func gatherIncomingTracks( @@ -108,6 +144,9 @@ func gatherIncomingTracks(
for {
select {
case <-t.C:
if trackCount == 0 {
return tracks, nil
}
return nil, fmt.Errorf("deadline exceeded while waiting tracks")
case pair := <-trackRecv:
@ -121,7 +160,7 @@ func gatherIncomingTracks( @@ -121,7 +160,7 @@ func gatherIncomingTracks(
return tracks, nil
}
case <-pc.disconnected:
case <-pc.Disconnected():
return nil, fmt.Errorf("peer connection closed")
case <-ctx.Done():
@ -136,14 +175,12 @@ type webRTCSessionPathManager interface { @@ -136,14 +175,12 @@ type webRTCSessionPathManager interface {
}
type webRTCSession struct {
readBufferCount int
req webRTCNewSessionReq
wg *sync.WaitGroup
iceHostNAT1To1IPs []string
iceUDPMux ice.UDPMux
iceTCPMux ice.TCPMux
pathManager webRTCSessionPathManager
parent *webRTCManager
readBufferCount int
api *webrtc.API
req webRTCNewSessionReq
wg *sync.WaitGroup
pathManager webRTCSessionPathManager
parent *webRTCManager
ctx context.Context
ctxCancel func()
@ -151,7 +188,7 @@ type webRTCSession struct { @@ -151,7 +188,7 @@ type webRTCSession struct {
uuid uuid.UUID
secret uuid.UUID
mutex sync.RWMutex
pc *peerConnection
pc *webrtcpc.PeerConnection
chNew chan webRTCNewSessionReq
chAddCandidates chan webRTCAddSessionCandidatesReq
@ -160,32 +197,28 @@ type webRTCSession struct { @@ -160,32 +197,28 @@ type webRTCSession struct {
func newWebRTCSession(
parentCtx context.Context,
readBufferCount int,
api *webrtc.API,
req webRTCNewSessionReq,
wg *sync.WaitGroup,
iceHostNAT1To1IPs []string,
iceUDPMux ice.UDPMux,
iceTCPMux ice.TCPMux,
pathManager webRTCSessionPathManager,
parent *webRTCManager,
) *webRTCSession {
ctx, ctxCancel := context.WithCancel(parentCtx)
s := &webRTCSession{
readBufferCount: readBufferCount,
req: req,
wg: wg,
iceHostNAT1To1IPs: iceHostNAT1To1IPs,
iceUDPMux: iceUDPMux,
iceTCPMux: iceTCPMux,
parent: parent,
pathManager: pathManager,
ctx: ctx,
ctxCancel: ctxCancel,
created: time.Now(),
uuid: uuid.New(),
secret: uuid.New(),
chNew: make(chan webRTCNewSessionReq),
chAddCandidates: make(chan webRTCAddSessionCandidatesReq),
readBufferCount: readBufferCount,
api: api,
req: req,
wg: wg,
parent: parent,
pathManager: pathManager,
ctx: ctx,
ctxCancel: ctxCancel,
created: time.Now(),
uuid: uuid.New(),
secret: uuid.New(),
chNew: make(chan webRTCNewSessionReq),
chAddCandidates: make(chan webRTCAddSessionCandidatesReq),
}
s.Log(logger.Info, "created by %s", req.remoteAddr)
@ -276,18 +309,16 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -276,18 +309,16 @@ func (s *webRTCSession) runPublish() (int, error) {
return http.StatusInternalServerError, err
}
pc, err := newPeerConnection(
pc, err := webrtcpc.New(
servers,
s.iceHostNAT1To1IPs,
s.iceUDPMux,
s.iceTCPMux,
s.api,
s)
if err != nil {
return http.StatusBadRequest, err
}
defer pc.close()
defer pc.Close()
offer := s.offer()
offer := whipOffer(s.req.offer)
var sdp sdp.SessionDescription
err = sdp.Unmarshal([]byte(offer.SDP))
@ -295,29 +326,9 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -295,29 +326,9 @@ func (s *webRTCSession) runPublish() (int, error) {
return http.StatusBadRequest, err
}
videoTrack := false
audioTrack := false
trackCount := 0
for _, media := range sdp.MediaDescriptions {
switch media.MediaName.Media {
case "video":
if videoTrack {
return http.StatusBadRequest, fmt.Errorf("only a single video and a single audio track are supported")
}
videoTrack = true
case "audio":
if audioTrack {
return http.StatusBadRequest, fmt.Errorf("only a single video and a single audio track are supported")
}
audioTrack = true
default:
return http.StatusBadRequest, fmt.Errorf("unsupported media '%s'", media.MediaName.Media)
}
trackCount++
trackCount, err := webrtcTrackCount(sdp.MediaDescriptions)
if err != nil {
return http.StatusBadRequest, err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{
@ -339,7 +350,7 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -339,7 +350,7 @@ func (s *webRTCSession) runPublish() (int, error) {
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
select {
case trackRecv <- trackRecvPair{track, receiver}:
case <-pc.closed:
case <-s.ctx.Done():
}
})
@ -358,19 +369,16 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -358,19 +369,16 @@ func (s *webRTCSession) runPublish() (int, error) {
return http.StatusBadRequest, err
}
err = s.waitGatheringDone(pc)
err = pc.WaitGatheringDone(s.ctx)
if err != nil {
return http.StatusBadRequest, err
}
tmp := pc.LocalDescription()
answer = *tmp
s.writeAnswer(&answer)
s.writeAnswer(pc.LocalDescription())
go s.readRemoteCandidates(pc)
err = waitUntilConnected(s.ctx, pc)
err = webrtcWaitUntilConnected(s.ctx, pc)
if err != nil {
return 0, err
}
@ -379,11 +387,11 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -379,11 +387,11 @@ func (s *webRTCSession) runPublish() (int, error) {
s.pc = pc
s.mutex.Unlock()
tracks, err := gatherIncomingTracks(s.ctx, pc, trackRecv, trackCount)
tracks, err := webrtcGatherIncomingTracks(s.ctx, pc, trackRecv, trackCount)
if err != nil {
return 0, err
}
medias := mediasOfIncomingTracks(tracks)
medias := webrtcMediasOfIncomingTracks(tracks)
rres := res.path.startPublisher(pathStartPublisherReq{
author: s,
@ -403,7 +411,7 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -403,7 +411,7 @@ func (s *webRTCSession) runPublish() (int, error) {
}
select {
case <-pc.disconnected:
case <-pc.Disconnected():
return 0, fmt.Errorf("peer connection closed")
case <-s.ctx.Done():
@ -443,7 +451,7 @@ func (s *webRTCSession) runRead() (int, error) { @@ -443,7 +451,7 @@ func (s *webRTCSession) runRead() (int, error) {
defer res.path.removeReader(pathRemoveReaderReq{author: s})
tracks, err := gatherOutgoingTracks(res.stream.Medias())
tracks, err := webrtcGatherOutgoingTracks(res.stream.Medias())
if err != nil {
return http.StatusBadRequest, err
}
@ -453,16 +461,14 @@ func (s *webRTCSession) runRead() (int, error) { @@ -453,16 +461,14 @@ func (s *webRTCSession) runRead() (int, error) {
return http.StatusInternalServerError, err
}
pc, err := newPeerConnection(
pc, err := webrtcpc.New(
servers,
s.iceHostNAT1To1IPs,
s.iceUDPMux,
s.iceTCPMux,
s.api,
s)
if err != nil {
return http.StatusBadRequest, err
}
defer pc.close()
defer pc.Close()
for _, track := range tracks {
var err error
@ -472,7 +478,7 @@ func (s *webRTCSession) runRead() (int, error) { @@ -472,7 +478,7 @@ func (s *webRTCSession) runRead() (int, error) {
}
}
offer := s.offer()
offer := whipOffer(s.req.offer)
err = pc.SetRemoteDescription(*offer)
if err != nil {
@ -489,19 +495,16 @@ func (s *webRTCSession) runRead() (int, error) { @@ -489,19 +495,16 @@ func (s *webRTCSession) runRead() (int, error) {
return http.StatusBadRequest, err
}
err = s.waitGatheringDone(pc)
err = pc.WaitGatheringDone(s.ctx)
if err != nil {
return http.StatusBadRequest, err
}
tmp := pc.LocalDescription()
answer = *tmp
s.writeAnswer(&answer)
s.writeAnswer(pc.LocalDescription())
go s.readRemoteCandidates(pc)
err = waitUntilConnected(s.ctx, pc)
err = webrtcWaitUntilConnected(s.ctx, pc)
if err != nil {
return 0, err
}
@ -522,7 +525,7 @@ func (s *webRTCSession) runRead() (int, error) { @@ -522,7 +525,7 @@ func (s *webRTCSession) runRead() (int, error) {
defer res.stream.RemoveReader(s)
s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(mediasOfOutgoingTracks(tracks)))
res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks)))
go func() {
for {
@ -535,7 +538,7 @@ func (s *webRTCSession) runRead() (int, error) { @@ -535,7 +538,7 @@ func (s *webRTCSession) runRead() (int, error) {
}()
select {
case <-pc.disconnected:
case <-pc.Disconnected():
return 0, fmt.Errorf("peer connection closed")
case err := <-writeError:
@ -546,25 +549,6 @@ func (s *webRTCSession) runRead() (int, error) { @@ -546,25 +549,6 @@ func (s *webRTCSession) runRead() (int, error) {
}
}
func (s *webRTCSession) offer() *webrtc.SessionDescription {
return &webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(s.req.offer),
}
}
func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error {
for {
select {
case <-pc.localCandidateRecv:
case <-pc.gatheringDone:
return nil
case <-s.ctx.Done():
return fmt.Errorf("terminated")
}
}
}
func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) {
s.req.res <- webRTCNewSessionRes{
sx: s,
@ -572,7 +556,7 @@ func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) { @@ -572,7 +556,7 @@ func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) {
}
}
func (s *webRTCSession) readRemoteCandidates(pc *peerConnection) {
func (s *webRTCSession) readRemoteCandidates(pc *webrtcpc.PeerConnection) {
for {
select {
case req := <-s.chAddCandidates:
@ -639,10 +623,10 @@ func (s *webRTCSession) apiItem() *apiWebRTCSession { @@ -639,10 +623,10 @@ func (s *webRTCSession) apiItem() *apiWebRTCSession {
if s.pc != nil {
peerConnectionEstablished = true
localCandidate = s.pc.localCandidate()
remoteCandidate = s.pc.remoteCandidate()
bytesReceived = s.pc.bytesReceived()
bytesSent = s.pc.bytesSent()
localCandidate = s.pc.LocalCandidate()
remoteCandidate = s.pc.RemoteCandidate()
bytesReceived = s.pc.BytesReceived()
bytesSent = s.pc.BytesSent()
}
return &apiWebRTCSession{

175
internal/core/webrtc_source.go

@ -0,0 +1,175 @@ @@ -0,0 +1,175 @@
package core
import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
"github.com/bluenviron/mediamtx/internal/whip"
)
type webRTCSourceParent interface {
logger.Writer
setReady(req pathSourceStaticSetReadyReq) pathSourceStaticSetReadyRes
setNotReady(req pathSourceStaticSetNotReadyReq)
}
type webRTCSource struct {
readTimeout conf.StringDuration
parent webRTCSourceParent
}
func newWebRTCSource(
readTimeout conf.StringDuration,
parent webRTCSourceParent,
) *webRTCSource {
s := &webRTCSource{
readTimeout: readTimeout,
parent: parent,
}
return s
}
func (s *webRTCSource) Log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[WebRTC source] "+format, args...)
}
// run implements sourceStaticImpl.
func (s *webRTCSource) run(ctx context.Context, cnf *conf.PathConf, _ chan *conf.PathConf) error {
s.Log(logger.Debug, "connecting")
u, err := url.Parse(cnf.Source)
if err != nil {
return err
}
u.Scheme = strings.ReplaceAll(u.Scheme, "whep", "http")
c := &http.Client{
Timeout: time.Duration(s.readTimeout),
}
iceServers, err := whip.GetICEServers(ctx, c, u.String())
if err != nil {
return err
}
api, err := webrtcNewAPI(nil, nil, nil)
if err != nil {
return err
}
pc, err := webrtcpc.New(iceServers, api, s)
if err != nil {
return err
}
defer pc.Close()
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo)
if err != nil {
return err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio)
if err != nil {
return err
}
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
err = pc.SetLocalDescription(offer)
if err != nil {
return err
}
err = pc.WaitGatheringDone(ctx)
if err != nil {
return err
}
res, err := whip.PostOffer(ctx, c, u.String(), pc.LocalDescription())
if err != nil {
return err
}
var sdp sdp.SessionDescription
err = sdp.Unmarshal([]byte(res.Answer.SDP))
if err != nil {
return err
}
// check that there are at most two tracks
_, err = webrtcTrackCount(sdp.MediaDescriptions)
if err != nil {
return err
}
trackRecv := make(chan trackRecvPair)
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
select {
case trackRecv <- trackRecvPair{track, receiver}:
case <-ctx.Done():
}
})
err = pc.SetRemoteDescription(*res.Answer)
if err != nil {
return err
}
err = webrtcWaitUntilConnected(ctx, pc)
if err != nil {
return err
}
tracks, err := webrtcGatherIncomingTracks(ctx, pc, trackRecv, 0)
if err != nil {
return err
}
medias := webrtcMediasOfIncomingTracks(tracks)
rres := s.parent.setReady(pathSourceStaticSetReadyReq{
medias: medias,
generateRTPPackets: true,
})
if rres.err != nil {
return rres.err
}
defer s.parent.setNotReady(pathSourceStaticSetNotReadyReq{})
for _, track := range tracks {
track.start(rres.stream)
}
select {
case <-pc.Disconnected():
return fmt.Errorf("peer connection closed")
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}
// apiSourceDescribe implements sourceStaticImpl.
func (*webRTCSource) apiSourceDescribe() pathAPISourceOrReader {
return pathAPISourceOrReader{
Type: "webRTCSource",
ID: "",
}
}

188
internal/core/webrtc_source_test.go

@ -0,0 +1,188 @@ @@ -0,0 +1,188 @@
package core
import (
"context"
"io"
"net"
"net/http"
"testing"
"github.com/bluenviron/gortsplib/v3"
"github.com/bluenviron/gortsplib/v3/pkg/formats"
"github.com/bluenviron/gortsplib/v3/pkg/url"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
)
func TestWebRTCSource(t *testing.T) {
state := 0
api, err := webrtcNewAPI(nil, nil, nil)
require.NoError(t, err)
pc, err := webrtcpc.New(nil, api, nilLogger{})
require.NoError(t, err)
defer pc.Close()
outgoingTrack1, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
},
"vp8",
webrtcStreamID,
)
require.NoError(t, err)
_, err = pc.AddTrack(outgoingTrack1)
require.NoError(t, err)
outgoingTrack2, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
},
"opus",
webrtcStreamID,
)
require.NoError(t, err)
_, err = pc.AddTrack(outgoingTrack2)
require.NoError(t, err)
httpServ := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch state {
case 0:
require.Equal(t, http.MethodOptions, r.Method)
require.Equal(t, "/my/resource", r.URL.Path)
w.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH")
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
w.WriteHeader(http.StatusNoContent)
case 1:
require.Equal(t, http.MethodPost, r.Method)
require.Equal(t, "/my/resource", r.URL.Path)
require.Equal(t, "application/sdp", r.Header.Get("Content-Type"))
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
offer := whipOffer(body)
err = pc.SetRemoteDescription(*offer)
require.NoError(t, err)
answer, err := pc.CreateAnswer(nil)
require.NoError(t, err)
err = pc.SetLocalDescription(answer)
require.NoError(t, err)
err = pc.WaitGatheringDone(context.Background())
require.NoError(t, err)
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
w.Header().Set("E-Tag", "test_etag")
w.Header().Set("Location", "/my/resource/sessionid")
w.WriteHeader(http.StatusCreated)
w.Write([]byte(pc.LocalDescription().SDP))
go func() {
<-pc.Connected()
err = outgoingTrack1.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
err = outgoingTrack2.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 97,
SequenceNumber: 1123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{2},
})
require.NoError(t, err)
}()
default:
t.Errorf("should not happen since there should not be additional candidates")
}
state++
}),
}
ln, err := net.Listen("tcp", "localhost:5555")
require.NoError(t, err)
go httpServ.Serve(ln)
defer httpServ.Shutdown(context.Background())
p, ok := newInstance("paths:\n" +
" proxied:\n" +
" source: whep://localhost:5555/my/resource\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.Close()
c := gortsplib.Client{}
u, err := url.Parse("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)
require.NoError(t, err)
defer c.Close()
medias, baseURL, _, err := c.Describe(u)
require.NoError(t, err)
var forma *formats.VP8
medi := medias.FindFormat(&forma)
_, err = c.Setup(medi, baseURL, 0, 0)
require.NoError(t, err)
received := make(chan struct{})
c.OnPacketRTP(medi, forma, func(pkt *rtp.Packet) {
require.Equal(t, []byte{3}, pkt.Payload)
close(received)
})
_, err = c.Play(nil)
require.NoError(t, err)
err = outgoingTrack1.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 124,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{3},
})
require.NoError(t, err)
<-received
}

93
internal/highleveltests/hls_manager_test.go

@ -0,0 +1,93 @@ @@ -0,0 +1,93 @@
//go:build enable_highlevel_tests
// +build enable_highlevel_tests
package highleveltests
import (
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestHLSServerRead(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "rtsp",
"rtsp://127.0.0.1:8554/test/stream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "http://127.0.0.1:8888/test/stream/index.m3u8",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
func TestHLSServerAuth(t *testing.T) {
for _, result := range []string{
"success",
"fail",
} {
t.Run(result, func(t *testing.T) {
conf := "paths:\n" +
" all:\n" +
" readUser: testreader\n" +
" readPass: testpass\n" +
" readIPs: [127.0.0.0/16]\n"
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "rtsp",
"rtsp://testpublisher:testpass@127.0.0.1:8554/teststream?param=value",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
var usr string
if result == "success" {
usr = "testreader"
} else {
usr = "testreader2"
}
hc := &http.Client{Transport: &http.Transport{}}
res, err := hc.Get("http://" + usr + ":testpass@127.0.0.1:8888/teststream/index.m3u8?param=value")
require.NoError(t, err)
defer res.Body.Close()
if result == "success" {
require.Equal(t, http.StatusOK, res.StatusCode)
} else {
require.Equal(t, http.StatusUnauthorized, res.StatusCode)
}
})
}
}

182
internal/highleveltests/hls_server_test.go

@ -1,182 +0,0 @@ @@ -1,182 +0,0 @@
//go:build enable_highlevel_tests
// +build enable_highlevel_tests
package highleveltests
import (
"context"
"encoding/json"
"net"
"net/http"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
type testHTTPAuthenticator struct {
action string
s *http.Server
}
func newTestHTTPAuthenticator(t *testing.T, action string) *testHTTPAuthenticator {
ln, err := net.Listen("tcp", "127.0.0.1:9120")
require.NoError(t, err)
ts := &testHTTPAuthenticator{
action: action,
}
router := gin.New()
router.POST("/auth", ts.onAuth)
ts.s = &http.Server{Handler: router}
go ts.s.Serve(ln)
return ts
}
func (ts *testHTTPAuthenticator) close() {
ts.s.Shutdown(context.Background())
}
func (ts *testHTTPAuthenticator) onAuth(ctx *gin.Context) {
var in struct {
IP string `json:"ip"`
User string `json:"user"`
Password string `json:"password"`
Path string `json:"path"`
Action string `json:"action"`
Query string `json:"query"`
}
err := json.NewDecoder(ctx.Request.Body).Decode(&in)
if err != nil {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
var user string
if ts.action == "publish" {
user = "testpublisher"
} else {
user = "testreader"
}
if in.IP != "127.0.0.1" ||
in.User != user ||
in.Password != "testpass" ||
in.Path != "teststream" ||
in.Action != ts.action ||
(in.Query != "user=testreader&pass=testpass&param=value" &&
in.Query != "user=testpublisher&pass=testpass&param=value" &&
in.Query != "param=value") {
ctx.AbortWithStatus(http.StatusBadRequest)
return
}
}
func TestHLSServerRead(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "rtsp",
"rtsp://127.0.0.1:8554/test/stream",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "dest", []string{
"-i", "http://127.0.0.1:8888/test/stream/index.m3u8",
"-vframes", "1",
"-f", "image2",
"-y", "/dev/null",
})
require.NoError(t, err)
defer cnt2.close()
require.Equal(t, 0, cnt2.wait())
}
func TestHLSServerAuth(t *testing.T) {
for _, mode := range []string{
"internal",
"external",
} {
for _, result := range []string{
"success",
"fail",
} {
t.Run(mode+"_"+result, func(t *testing.T) {
var conf string
if mode == "internal" {
conf = "paths:\n" +
" all:\n" +
" readUser: testreader\n" +
" readPass: testpass\n" +
" readIPs: [127.0.0.0/16]\n"
} else {
conf = "externalAuthenticationURL: http://127.0.0.1:9120/auth\n" +
"paths:\n" +
" all:\n"
}
p, ok := newInstance(conf)
require.Equal(t, true, ok)
defer p.Close()
var a *testHTTPAuthenticator
if mode == "external" {
a = newTestHTTPAuthenticator(t, "publish")
}
cnt1, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.mkv",
"-c", "copy",
"-f", "rtsp",
"rtsp://testpublisher:testpass@127.0.0.1:8554/teststream?param=value",
})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
if mode == "external" {
a.close()
a = newTestHTTPAuthenticator(t, "read")
defer a.close()
}
var usr string
if result == "success" {
usr = "testreader"
} else {
usr = "testreader2"
}
hc := &http.Client{Transport: &http.Transport{}}
res, err := hc.Get("http://" + usr + ":testpass@127.0.0.1:8888/teststream/index.m3u8?param=value")
require.NoError(t, err)
defer res.Body.Close()
if result == "success" {
require.Equal(t, http.StatusOK, res.StatusCode)
} else {
require.Equal(t, http.StatusUnauthorized, res.StatusCode)
}
})
}
}
}

196
internal/webrtcpc/pc.go

@ -0,0 +1,196 @@ @@ -0,0 +1,196 @@
// Package webrtcpc contains a WebRTC peer connection wrapper.
package webrtcpc
import (
"context"
"fmt"
"strconv"
"sync"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
)
// PeerConnection is a wrapper around webrtc.PeerConnection.
type PeerConnection struct {
*webrtc.PeerConnection
stateChangeMutex sync.Mutex
newLocalCandidate chan *webrtc.ICECandidateInit
connected chan struct{}
disconnected chan struct{}
closed chan struct{}
gatheringDone chan struct{}
}
// New allocates a PeerConnection.
func New(
iceServers []webrtc.ICEServer,
api *webrtc.API,
log logger.Writer,
) (*PeerConnection, error) {
configuration := webrtc.Configuration{ICEServers: iceServers}
pc, err := api.NewPeerConnection(configuration)
if err != nil {
return nil, err
}
co := &PeerConnection{
PeerConnection: pc,
newLocalCandidate: make(chan *webrtc.ICECandidateInit),
connected: make(chan struct{}),
disconnected: make(chan struct{}),
closed: make(chan struct{}),
gatheringDone: make(chan struct{}),
}
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
co.stateChangeMutex.Lock()
defer co.stateChangeMutex.Unlock()
select {
case <-co.closed:
return
default:
}
log.Log(logger.Debug, "peer connection state: "+state.String())
switch state {
case webrtc.PeerConnectionStateConnected:
log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
co.LocalCandidate(), co.RemoteCandidate())
close(co.connected)
case webrtc.PeerConnectionStateDisconnected:
close(co.disconnected)
case webrtc.PeerConnectionStateClosed:
close(co.closed)
}
})
pc.OnICECandidate(func(i *webrtc.ICECandidate) {
if i != nil {
v := i.ToJSON()
select {
case co.newLocalCandidate <- &v:
case <-co.connected:
case <-co.closed:
}
} else {
close(co.gatheringDone)
}
})
return co, nil
}
// Close closes the connection.
func (co *PeerConnection) Close() {
co.PeerConnection.Close()
<-co.closed
}
// Connected returns when connected.
func (co *PeerConnection) Connected() <-chan struct{} {
return co.connected
}
// Disconnected returns when disconnected.
func (co *PeerConnection) Disconnected() <-chan struct{} {
return co.disconnected
}
// NewLocalCandidate returns when there's a new local candidate.
func (co *PeerConnection) NewLocalCandidate() <-chan *webrtc.ICECandidateInit {
return co.newLocalCandidate
}
// GatheringDone returns when candidate gathering is complete.
func (co *PeerConnection) GatheringDone() <-chan struct{} {
return co.gatheringDone
}
// WaitGatheringDone waits until candidate gathering is complete.
func (co *PeerConnection) WaitGatheringDone(ctx context.Context) error {
for {
select {
case <-co.NewLocalCandidate():
case <-co.GatheringDone():
return nil
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}
}
// LocalCandidate returns the local candidate.
func (co *PeerConnection) LocalCandidate() string {
var cid string
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.LocalCandidateID
break
}
}
if cid != "" {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
// RemoteCandidate returns the remote candidate.
func (co *PeerConnection) RemoteCandidate() string {
var cid string
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.RemoteCandidateID
break
}
}
if cid != "" {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
// BytesReceived returns received bytes.
func (co *PeerConnection) BytesReceived() uint64 {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesReceived
}
}
}
return 0
}
// BytesSent returns sent bytes.
func (co *PeerConnection) BytesSent() uint64 {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesSent
}
}
}
return 0
}

18
internal/websocket/serverconn_test.go

@ -15,22 +15,22 @@ func TestServerConn(t *testing.T) { @@ -15,22 +15,22 @@ func TestServerConn(t *testing.T) {
pingReceived := make(chan struct{})
pingInterval = 100 * time.Millisecond
handler := func(w http.ResponseWriter, r *http.Request) {
c, err := NewServerConn(w, r)
require.NoError(t, err)
defer c.Close()
s := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := NewServerConn(w, r)
require.NoError(t, err)
defer c.Close()
err = c.WriteJSON("testing")
require.NoError(t, err)
err = c.WriteJSON("testing")
require.NoError(t, err)
<-pingReceived
<-pingReceived
}),
}
ln, err := net.Listen("tcp", "localhost:6344")
require.NoError(t, err)
defer ln.Close()
s := &http.Server{Handler: http.HandlerFunc(handler)}
go s.Serve(ln)
defer s.Shutdown(context.Background())

33
internal/whip/get_ice_servers.go

@ -0,0 +1,33 @@ @@ -0,0 +1,33 @@
package whip
import (
"context"
"fmt"
"net/http"
"github.com/pion/webrtc/v3"
)
// GetICEServers posts a WHIP/WHEP request for ICE servers.
func GetICEServers(
ctx context.Context,
hc *http.Client,
ur string,
) ([]webrtc.ICEServer, error) {
req, err := http.NewRequestWithContext(ctx, "OPTIONS", ur, nil)
if err != nil {
return nil, err
}
res, err := hc.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNoContent {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
return LinkHeaderUnmarshal(res.Header["Link"])
}

83
internal/whip/ice_fragment.go

@ -0,0 +1,83 @@ @@ -0,0 +1,83 @@
package whip
import (
"fmt"
"strconv"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
)
// ICEFragmentUnmarshal decodes an ICE fragment.
func ICEFragmentUnmarshal(buf []byte) ([]*webrtc.ICECandidateInit, error) {
buf = append([]byte("v=0\r\no=- 0 0 IN IP4 0.0.0.0\r\ns=-\r\nt=0 0\r\n"), buf...)
var sdp sdp.SessionDescription
err := sdp.Unmarshal(buf)
if err != nil {
return nil, err
}
var ret []*webrtc.ICECandidateInit
for _, media := range sdp.MediaDescriptions {
mid, ok := media.Attribute("mid")
if !ok {
return nil, fmt.Errorf("mid attribute is missing")
}
tmp, err := strconv.ParseUint(mid, 10, 16)
if err != nil {
return nil, fmt.Errorf("invalid mid attribute")
}
midNum := uint16(tmp)
for _, attr := range media.Attributes {
if attr.Key == "candidate" {
ret = append(ret, &webrtc.ICECandidateInit{
Candidate: attr.Value,
SDPMid: &mid,
SDPMLineIndex: &midNum,
})
}
}
}
return ret, nil
}
// ICEFragmentMarshal encodes an ICE fragment.
func ICEFragmentMarshal(offer string, candidates []*webrtc.ICECandidateInit) ([]byte, error) {
var sdp sdp.SessionDescription
err := sdp.Unmarshal([]byte(offer))
if err != nil || len(sdp.MediaDescriptions) == 0 {
return nil, err
}
firstMedia := sdp.MediaDescriptions[0]
iceUfrag, _ := firstMedia.Attribute("ice-ufrag")
icePwd, _ := firstMedia.Attribute("ice-pwd")
candidatesByMedia := make(map[uint16][]*webrtc.ICECandidateInit)
for _, candidate := range candidates {
mid := *candidate.SDPMLineIndex
candidatesByMedia[mid] = append(candidatesByMedia[mid], candidate)
}
frag := "a=ice-ufrag:" + iceUfrag + "\r\n" +
"a=ice-pwd:" + icePwd + "\r\n"
for mid, media := range sdp.MediaDescriptions {
cbm, ok := candidatesByMedia[uint16(mid)]
if ok {
frag += "m=" + media.MediaName.String() + "\r\n" +
"a=mid:" + strconv.FormatUint(uint64(mid), 10) + "\r\n"
for _, candidate := range cbm {
frag += "a=candidate:" + candidate.Candidate + "\r\n"
}
}
}
return []byte(frag), nil
}

208
internal/whip/ice_fragment_test.go

@ -0,0 +1,208 @@ @@ -0,0 +1,208 @@
package whip
import (
"testing"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
)
func stringPtr(v string) *string {
return &v
}
func uint16Ptr(v uint16) *uint16 {
return &v
}
var iceFragmentCases = []struct {
name string
offer string
candidates []*webrtc.ICECandidateInit
enc string
}{
{
"a",
"v=0\n" +
"o=- 8429658789122714282 1690995382 IN IP4 0.0.0.0\n" +
"s=-\n" +
"t=0 0\n" +
"a=fingerprint:sha-256 EA:05:9D:04:8F:56:41:92:3E:D5:2B:55:03:" +
"1B:5A:2C:3D:D8:B3:FB:1B:D9:F7:1F:DA:77:0E:B9:E0:3D:B6:FF\n" +
"a=extmap-allow-mixed\n" +
"a=group:BUNDLE 0\n" +
"m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 123 118 45 46 116\n" +
"c=IN IP4 0.0.0.0\n" +
"a=setup:actpass\n" +
"a=mid:0\n" +
"a=ice-ufrag:tUQMzoQAVLzlvBys\n" +
"a=ice-pwd:pimyGfJcjjRwvUjnmGOODSjtIxyDljQj\n" +
"a=rtcp-mux\n" +
"a=rtcp-rsize\n" +
"a=rtpmap:96 VP8/90000\n" +
"a=rtcp-fb:96 goog-remb \n" +
"a=rtcp-fb:96 ccm fir\n" +
"a=rtcp-fb:96 nack \n" +
"a=rtcp-fb:96 nack pli\n" +
"a=rtcp-fb:96 nack \n" +
"a=rtcp-fb:96 nack pli\n" +
"a=rtcp-fb:96 transport-cc \n" +
"a=rtpmap:97 rtx/90000\n" +
"a=fmtp:97 apt=96\n" +
"a=rtcp-fb:97 nack \n" +
"a=rtcp-fb:97 nack pli\n" +
"a=rtcp-fb:97 transport-cc \n" +
"a=rtpmap:98 VP9/90000\n" +
"a=fmtp:98 profile-id=0\n" +
"a=rtcp-fb:98 goog-remb \n" +
"a=rtcp-fb:98 ccm fir\n" +
"a=rtcp-fb:98 nack \n" +
"a=rtcp-fb:98 nack pli\n" +
"a=rtcp-fb:98 nack \n" +
"a=rtcp-fb:98 nack pli\n" +
"a=rtcp-fb:98 transport-cc \n" +
"a=rtpmap:99 rtx/90000\n" +
"a=fmtp:99 apt=98\n" +
"a=rtcp-fb:99 nack \n" +
"a=rtcp-fb:99 nack pli\n" +
"a=rtcp-fb:99 transport-cc \n" +
"a=rtpmap:100 VP9/90000\n" +
"a=fmtp:100 profile-id=1\n" +
"a=rtcp-fb:100 goog-remb \n" +
"a=rtcp-fb:100 ccm fir\n" +
"a=rtcp-fb:100 nack \n" +
"a=rtcp-fb:100 nack pli\n" +
"a=rtcp-fb:100 nack \n" +
"a=rtcp-fb:100 nack pli\n" +
"a=rtcp-fb:100 transport-cc \n" +
"a=rtpmap:101 rtx/90000\n" +
"a=fmtp:101 apt=100\n" +
"a=rtcp-fb:101 nack \n" +
"a=rtcp-fb:101 nack pli\n" +
"a=rtcp-fb:101 transport-cc \n" +
"a=rtpmap:102 H264/90000\n" +
"a=fmtp:102 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f\n" +
"a=rtcp-fb:102 goog-remb \n" +
"a=rtcp-fb:102 ccm fir\n" +
"a=rtcp-fb:102 nack \n" +
"a=rtcp-fb:102 nack pli\n" +
"a=rtcp-fb:102 nack \n" +
"a=rtcp-fb:102 nack pli\n" +
"a=rtcp-fb:102 transport-cc \n" +
"a=rtpmap:121 rtx/90000\n" +
"a=fmtp:121 apt=102\n" +
"a=rtcp-fb:121 nack \n" +
"a=rtcp-fb:121 nack pli\n" +
"a=rtcp-fb:121 transport-cc \n" +
"a=rtpmap:127 H264/90000\n" +
"a=fmtp:127 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f\n" +
"a=rtcp-fb:127 goog-remb \n" +
"a=rtcp-fb:127 ccm fir\n" +
"a=rtcp-fb:127 nack \n" +
"a=rtcp-fb:127 nack pli\n" +
"a=rtcp-fb:127 nack \n" +
"a=rtcp-fb:127 nack pli\n" +
"a=rtcp-fb:127 transport-cc \n" +
"a=rtpmap:120 rtx/90000\n" +
"a=fmtp:120 apt=127\n" +
"a=rtcp-fb:120 nack \n" +
"a=rtcp-fb:120 nack pli\n" +
"a=rtcp-fb:120 transport-cc \n" +
"a=rtpmap:125 H264/90000\n" +
"a=fmtp:125 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f\n" +
"a=rtcp-fb:125 goog-remb \n" +
"a=rtcp-fb:125 ccm fir\n" +
"a=rtcp-fb:125 nack \n" +
"a=rtcp-fb:125 nack pli\n" +
"a=rtcp-fb:125 nack \n" +
"a=rtcp-fb:125 nack pli\n" +
"a=rtcp-fb:125 transport-cc \n" +
"a=rtpmap:107 rtx/90000\n" +
"a=fmtp:107 apt=125\n" +
"a=rtcp-fb:107 nack \n" +
"a=rtcp-fb:107 nack pli\n" +
"a=rtcp-fb:107 transport-cc \n" +
"a=rtpmap:108 H264/90000\n" +
"a=fmtp:108 level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f\n" +
"a=rtcp-fb:108 goog-remb \n" +
"a=rtcp-fb:108 ccm fir\n" +
"a=rtcp-fb:108 nack \n" +
"a=rtcp-fb:108 nack pli\n" +
"a=rtcp-fb:108 nack \n" +
"a=rtcp-fb:108 nack pli\n" +
"a=rtcp-fb:108 transport-cc \n" +
"a=rtpmap:109 rtx/90000\n" +
"a=fmtp:109 apt=108\n" +
"a=rtcp-fb:109 nack \n" +
"a=rtcp-fb:109 nack pli\n" +
"a=rtcp-fb:109 transport-cc \n" +
"a=rtpmap:123 H264/90000\n" +
"a=fmtp:123 level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032\n" +
"a=rtcp-fb:123 goog-remb \n" +
"a=rtcp-fb:123 ccm fir\n" +
"a=rtcp-fb:123 nack \n" +
"a=rtcp-fb:123 nack pli\n" +
"a=rtcp-fb:123 nack \n" +
"a=rtcp-fb:123 nack pli\n" +
"a=rtcp-fb:123 transport-cc \n" +
"a=rtpmap:118 rtx/90000\n" +
"a=fmtp:118 apt=123\n" +
"a=rtcp-fb:118 nack \n" +
"a=rtcp-fb:118 nack pli\n" +
"a=rtcp-fb:118 transport-cc \n" +
"a=rtpmap:45 AV1/90000\n" +
"a=rtcp-fb:45 goog-remb \n" +
"a=rtcp-fb:45 ccm fir\n" +
"a=rtcp-fb:45 nack \n" +
"a=rtcp-fb:45 nack pli\n" +
"a=rtcp-fb:45 nack \n" +
"a=rtcp-fb:45 nack pli\n" +
"a=rtcp-fb:45 transport-cc \n" +
"a=rtpmap:46 rtx/90000\n" +
"a=fmtp:46 apt=45\n" +
"a=rtcp-fb:46 nack \n" +
"a=rtcp-fb:46 nack pli\n" +
"a=rtcp-fb:46 transport-cc \n" +
"a=rtpmap:116 ulpfec/90000\n" +
"a=rtcp-fb:116 nack \n" +
"a=rtcp-fb:116 nack pli\n" +
"a=rtcp-fb:116 transport-cc \n" +
"a=extmap:1 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\n" +
"a=ssrc:3421396091 cname:BmFVQDtOlcBwXZCl\n" +
"a=ssrc:3421396091 msid:BmFVQDtOlcBwXZCl CLgunVCazXXKLyEx\n" +
"a=ssrc:3421396091 mslabel:BmFVQDtOlcBwXZCl\n" +
"a=ssrc:3421396091 label:CLgunVCazXXKLyEx\n" +
"a=msid:BmFVQDtOlcBwXZCl CLgunVCazXXKLyEx\n" +
"a=sendrecv\n",
[]*webrtc.ICECandidateInit{{
Candidate: "3628911098 1 udp 2130706431 192.168.3.218 49462 typ host",
SDPMid: stringPtr("0"),
SDPMLineIndex: uint16Ptr(0),
}},
"a=ice-ufrag:tUQMzoQAVLzlvBys\r\n" +
"a=ice-pwd:pimyGfJcjjRwvUjnmGOODSjtIxyDljQj\r\n" +
"m=video 9 UDP/TLS/RTP/SAVPF 96 97 98 99 100 101 102 121 127 120 125 107 108 109 123 118 45 46 116\r\n" +
"a=mid:0\r\n" +
"a=candidate:3628911098 1 udp 2130706431 192.168.3.218 49462 typ host\r\n",
},
}
func TestICEFragmentUnmarshal(t *testing.T) {
for _, ca := range iceFragmentCases {
t.Run(ca.name, func(t *testing.T) {
candidates, err := ICEFragmentUnmarshal([]byte(ca.enc))
require.NoError(t, err)
require.Equal(t, ca.candidates, candidates)
})
}
}
func TestICEFragmentMarshal(t *testing.T) {
for _, ca := range iceFragmentCases {
t.Run(ca.name, func(t *testing.T) {
byts, err := ICEFragmentMarshal(ca.offer, ca.candidates)
require.NoError(t, err)
require.Equal(t, ca.enc, string(byts))
})
}
}

66
internal/whip/link_header.go

@ -0,0 +1,66 @@ @@ -0,0 +1,66 @@
package whip
import (
"encoding/json"
"fmt"
"regexp"
"github.com/pion/webrtc/v3"
)
func quoteCredential(v string) string {
b, _ := json.Marshal(v)
s := string(b)
return s[1 : len(s)-1]
}
func unquoteCredential(v string) string {
var s string
json.Unmarshal([]byte("\""+v+"\""), &s)
return s
}
// LinkHeaderMarshal encodes a link header.
func LinkHeaderMarshal(iceServers []webrtc.ICEServer) []string {
ret := make([]string, len(iceServers))
for i, server := range iceServers {
link := "<" + server.URLs[0] + ">; rel=\"ice-server\""
if server.Username != "" {
link += "; username=\"" + quoteCredential(server.Username) + "\"" +
"; credential=\"" + quoteCredential(server.Credential.(string)) + "\"; credential-type=\"password\""
}
ret[i] = link
}
return ret
}
var reLink = regexp.MustCompile(`^<(.+?)>; rel="ice-server"(; username="(.+?)"` +
`; credential="(.+?)"; credential-type="password")?`)
// LinkHeaderUnmarshal decodes a link header.
func LinkHeaderUnmarshal(link []string) ([]webrtc.ICEServer, error) {
ret := make([]webrtc.ICEServer, len(link))
for i, li := range link {
m := reLink.FindStringSubmatch(li)
if m == nil {
return nil, fmt.Errorf("invalid link header: '%s'", li)
}
s := webrtc.ICEServer{
URLs: []string{m[1]},
}
if m[3] != "" {
s.Username = unquoteCredential(m[3])
s.Credential = unquoteCredential(m[4])
s.CredentialType = webrtc.ICECredentialTypePassword
}
ret[i] = s
}
return ret, nil
}

52
internal/whip/link_header_test.go

@ -0,0 +1,52 @@ @@ -0,0 +1,52 @@
package whip
import (
"testing"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
)
var linkHeaderCases = []struct {
name string
enc []string
dec []webrtc.ICEServer
}{
{
"a",
[]string{
`<stun:stun.l.google.com:19302>; rel="ice-server"`,
`<turns:turn.example.com>; rel="ice-server"; username="myuser\"a?2;B"; ` +
`credential="mypwd"; credential-type="password"`,
},
[]webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
{
URLs: []string{"turns:turn.example.com"},
Username: "myuser\"a?2;B",
Credential: "mypwd",
},
},
},
}
func TestLinkHeaderUnmarshal(t *testing.T) {
for _, ca := range linkHeaderCases {
t.Run(ca.name, func(t *testing.T) {
dec, err := LinkHeaderUnmarshal(ca.enc)
require.NoError(t, err)
require.Equal(t, ca.dec, dec)
})
}
}
func TestLinkHeaderMarshal(t *testing.T) {
for _, ca := range linkHeaderCases {
t.Run(ca.name, func(t *testing.T) {
enc := LinkHeaderMarshal(ca.dec)
require.Equal(t, ca.enc, enc)
})
}
}

46
internal/whip/post_candidate.go

@ -0,0 +1,46 @@ @@ -0,0 +1,46 @@
// Package whip contains WebRTC / WHIP utilities.
package whip
import (
"bytes"
"context"
"fmt"
"net/http"
"github.com/pion/webrtc/v3"
)
// PostCandidate posts a WHIP/WHEP candidate.
func PostCandidate(
ctx context.Context,
hc *http.Client,
ur string,
offer *webrtc.SessionDescription,
etag string,
candidate *webrtc.ICECandidateInit,
) error {
frag, err := ICEFragmentMarshal(offer.SDP, []*webrtc.ICECandidateInit{candidate})
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "PATCH", ur, bytes.NewReader(frag))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/trickle-ice-sdpfrag")
req.Header.Set("If-Match", etag)
res, err := hc.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusNoContent {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}

76
internal/whip/post_offer.go

@ -0,0 +1,76 @@ @@ -0,0 +1,76 @@
package whip
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"github.com/pion/webrtc/v3"
)
// PostOfferResponse is the response to a post offer.
type PostOfferResponse struct {
Answer *webrtc.SessionDescription
Location string
ETag string
}
// PostOffer posts a WHIP/WHEP offer.
func PostOffer(
ctx context.Context,
hc *http.Client,
ur string,
offer *webrtc.SessionDescription,
) (*PostOfferResponse, error) {
req, err := http.NewRequestWithContext(ctx, "POST", ur, bytes.NewReader([]byte(offer.SDP)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/sdp")
res, err := hc.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("bad status code: %v", res.StatusCode)
}
contentType := res.Header.Get("Content-Type")
if contentType != "application/sdp" {
return nil, fmt.Errorf("bad Content-Type: expected 'application/sdp', got '%s'", contentType)
}
acceptPatch := res.Header.Get("Accept-Patch")
if acceptPatch != "application/trickle-ice-sdpfrag" {
return nil, fmt.Errorf("wrong Accept-Patch: expected 'application/trickle-ice-sdpfrag', got '%s'", acceptPatch)
}
Location := res.Header.Get("Location")
etag := res.Header.Get("E-Tag")
if etag == "" {
return nil, fmt.Errorf("E-Tag is missing")
}
sdp, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
answer := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: string(sdp),
}
return &PostOfferResponse{
Answer: answer,
Location: Location,
ETag: etag,
}, nil
}

2
mediamtx.yml

@ -255,6 +255,8 @@ paths: @@ -255,6 +255,8 @@ paths:
# * https://existing-url/stream.m3u8 -> the stream is pulled from another HLS server with HTTPS
# * udp://ip:port -> the stream is pulled with UDP, by listening on the specified IP and port
# * srt://existing-url -> the stream is pulled from another SRT server
# * whep://existing-url -> the stream is pulled from another WebRTC server
# * wheps://existing-url -> the stream is pulled from another WebRTC server with HTTPS
# * redirect -> the stream is provided by another path or server
# * rpiCamera -> the stream is provided by a Raspberry Pi Camera
source: publisher

Loading…
Cancel
Save