Browse Source

move webrtc utilities into internal/webrtc (#2559)

pull/2560/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
28452acf56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 105
      internal/core/api_test.go
  2. 16
      internal/core/path_test.go
  3. 14
      internal/core/webrtc_http_server.go
  4. 177
      internal/core/webrtc_incoming_track.go
  5. 175
      internal/core/webrtc_manager.go
  6. 241
      internal/core/webrtc_manager_test.go
  7. 364
      internal/core/webrtc_outgoing_track.go
  8. 502
      internal/core/webrtc_session.go
  9. 117
      internal/core/webrtc_source.go
  10. 74
      internal/core/webrtc_source_test.go
  11. 156
      internal/webrtc/api.go
  12. 2
      internal/webrtc/ice_fragment.go
  13. 2
      internal/webrtc/ice_fragment_test.go
  14. 152
      internal/webrtc/incoming_track.go
  15. 2
      internal/webrtc/link_header.go
  16. 2
      internal/webrtc/link_header_test.go
  17. 154
      internal/webrtc/outgoing_track.go
  18. 381
      internal/webrtc/peer_connection.go
  19. 37
      internal/webrtc/track_count.go
  20. 2
      internal/webrtc/webrtc.go
  21. 213
      internal/webrtc/whip_client.go
  22. 6
      internal/webrtc/whip_delete_session.go
  23. 6
      internal/webrtc/whip_options_ice_servers.go
  24. 6
      internal/webrtc/whip_patch_candidate.go
  25. 10
      internal/webrtc/whip_post_offer.go
  26. 196
      internal/webrtcpc/pc.go
  27. 2
      internal/whip/whip.go

105
internal/core/api_test.go

@ -3,6 +3,7 @@ package core @@ -3,6 +3,7 @@ package core
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
@ -25,6 +26,7 @@ import ( @@ -25,6 +26,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/rtmp"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
var testFormatH264 = &format.H264{
@ -58,6 +60,10 @@ var testMediaAAC = &description.Media{ @@ -58,6 +60,10 @@ var testMediaAAC = &description.Media{
}},
}
func checkClose(t *testing.T, closeFunc func() error) {
require.NoError(t, closeFunc())
}
func httpRequest(t *testing.T, hc *http.Client, method string, ur string, in interface{}, out interface{}) {
buf := func() io.Reader {
if in == nil {
@ -776,7 +782,7 @@ func TestAPIProtocolList(t *testing.T) { @@ -776,7 +782,7 @@ func TestAPIProtocolList(t *testing.T) {
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20,
},*/
err = source.WritePacketRTP(medi, &rtp.Packet{
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
@ -808,25 +814,34 @@ func TestAPIProtocolList(t *testing.T) { @@ -808,25 +814,34 @@ func TestAPIProtocolList(t *testing.T) {
require.NoError(t, err)
defer source.Close()
c := newWebRTCTestClient(t, hc, "http://localhost:8889/mypath/whep", false)
defer c.close(t, true)
u, err := url.Parse("http://localhost:8889/mypath/whep")
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
go func() {
time.Sleep(500 * time.Millisecond)
err = source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
}()
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
<-c.incomingTrack
_, err = c.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, c.Close)
case "srt":
conf := srt.DefaultConfig()
@ -1094,25 +1109,34 @@ func TestAPIProtocolGet(t *testing.T) { @@ -1094,25 +1109,34 @@ func TestAPIProtocolGet(t *testing.T) {
require.NoError(t, err)
defer source.Close()
c := newWebRTCTestClient(t, hc, "http://localhost:8889/mypath/whep", false)
defer c.close(t, true)
u, err := url.Parse("http://localhost:8889/mypath/whep")
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
go func() {
time.Sleep(500 * time.Millisecond)
err = source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 1, 2, 3, 4},
})
require.NoError(t, err)
}()
<-c.incomingTrack
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
_, err = c.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, c.Close)
case "srt":
conf := srt.DefaultConfig()
@ -1384,8 +1408,19 @@ func TestAPIProtocolKick(t *testing.T) { @@ -1384,8 +1408,19 @@ func TestAPIProtocolKick(t *testing.T) {
require.NoError(t, err)
case "webrtc":
c := newWebRTCTestClient(t, hc, "http://localhost:8889/mypath/whip", true)
defer c.close(t, false)
u, err := url.Parse("http://localhost:8889/mypath/whip")
require.NoError(t, err)
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
_, err = c.Publish(context.Background(), medi.Formats[0], nil)
require.NoError(t, err)
defer func() {
require.Error(t, c.Close())
}()
case "srt":
conf := srt.DefaultConfig()

16
internal/core/path_test.go

@ -2,6 +2,7 @@ package core @@ -2,6 +2,7 @@ package core
import (
"bufio"
"context"
"fmt"
"net"
"net/http"
@ -24,6 +25,7 @@ import ( @@ -24,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/rtmp"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
var runOnDemandSampleScript = `
@ -364,8 +366,18 @@ func TestPathRunOnRead(t *testing.T) { @@ -364,8 +366,18 @@ func TestPathRunOnRead(t *testing.T) {
case "webrtc":
hc := &http.Client{Transport: &http.Transport{}}
c := newWebRTCTestClient(t, hc, "http://localhost:8889/test/whep?query=value", false)
defer c.close(t, true)
u, err := url.Parse("http://localhost:8889/test/whep?query=value")
require.NoError(t, err)
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
_, err = c.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, c.Close)
}
time.Sleep(500 * time.Millisecond)

14
internal/core/webrtc_http_server.go

@ -13,12 +13,12 @@ import ( @@ -13,12 +13,12 @@ import (
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/pion/webrtc/v3"
pwebrtc "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/httpserv"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/whip"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
//go:embed webrtc_publish_index.html
@ -42,14 +42,12 @@ func relativeLocation(u *url.URL) string { @@ -42,14 +42,12 @@ func relativeLocation(u *url.URL) string {
type webRTCHTTPServerParent interface {
logger.Writer
generateICEServers() ([]webrtc.ICEServer, error)
generateICEServers() ([]pwebrtc.ICEServer, error)
newSession(req webRTCNewSessionReq) webRTCNewSessionRes
addSessionCandidates(req webRTCAddSessionCandidatesReq) webRTCAddSessionCandidatesRes
deleteSession(req webRTCDeleteSessionReq) error
}
// This implements https://datatracker.ietf.org/doc/draft-ietf-wish-whip/
// This implements https://datatracker.ietf.org/doc/draft-murillo-whep/
type webRTCHTTPServer struct {
allowOrigin string
pathManager *pathManager
@ -170,7 +168,7 @@ func (s *webRTCHTTPServer) onWHIPOptions(ctx *gin.Context, path string, publish @@ -170,7 +168,7 @@ func (s *webRTCHTTPServer) onWHIPOptions(ctx *gin.Context, path string, publish
ctx.Writer.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH, DELETE")
ctx.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
ctx.Writer.Header().Set("Access-Control-Expose-Headers", "Link")
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Writer.Header()["Link"] = webrtc.LinkHeaderMarshal(servers)
ctx.Writer.WriteHeader(http.StatusNoContent)
}
@ -215,7 +213,7 @@ func (s *webRTCHTTPServer) onWHIPPost(ctx *gin.Context, path string, publish boo @@ -215,7 +213,7 @@ func (s *webRTCHTTPServer) onWHIPPost(ctx *gin.Context, path string, publish boo
ctx.Writer.Header().Set("ETag", "*")
ctx.Writer.Header().Set("ID", res.sx.uuid.String())
ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Writer.Header()["Link"] = webrtc.LinkHeaderMarshal(servers)
ctx.Request.URL.Path += "/" + res.sx.secret.String()
ctx.Writer.Header().Set("Location", relativeLocation(ctx.Request.URL))
ctx.Writer.WriteHeader(http.StatusCreated)
@ -239,7 +237,7 @@ func (s *webRTCHTTPServer) onWHIPPatch(ctx *gin.Context, rawSecret string) { @@ -239,7 +237,7 @@ func (s *webRTCHTTPServer) onWHIPPatch(ctx *gin.Context, rawSecret string) {
return
}
candidates, err := whip.ICEFragmentUnmarshal(byts)
candidates, err := webrtc.ICEFragmentUnmarshal(byts)
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return

177
internal/core/webrtc_incoming_track.go

@ -1,177 +0,0 @@ @@ -1,177 +0,0 @@
package core
import (
"fmt"
"strings"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
)
const (
keyFrameInterval = 2 * time.Second
)
type webRTCIncomingTrack struct {
track *webrtc.TrackRemote
receiver *webrtc.RTPReceiver
writeRTCP func([]rtcp.Packet) error
mediaType description.MediaType
format format.Format
media *description.Media
}
func newWebRTCIncomingTrack(
track *webrtc.TrackRemote,
receiver *webrtc.RTPReceiver,
writeRTCP func([]rtcp.Packet) error,
) (*webRTCIncomingTrack, error) {
t := &webRTCIncomingTrack{
track: track,
receiver: receiver,
writeRTCP: writeRTCP,
}
switch strings.ToLower(track.Codec().MimeType) {
case strings.ToLower(webrtc.MimeTypeAV1):
t.mediaType = description.MediaTypeVideo
t.format = &format.AV1{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeVP9):
t.mediaType = description.MediaTypeVideo
t.format = &format.VP9{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeVP8):
t.mediaType = description.MediaTypeVideo
t.format = &format.VP8{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeH264):
t.mediaType = description.MediaTypeVideo
t.format = &format.H264{
PayloadTyp: uint8(track.PayloadType()),
PacketizationMode: 1,
}
case strings.ToLower(webrtc.MimeTypeOpus):
t.mediaType = description.MediaTypeAudio
t.format = &format.Opus{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeG722):
t.mediaType = description.MediaTypeAudio
t.format = &format.G722{}
case strings.ToLower(webrtc.MimeTypePCMU):
t.mediaType = description.MediaTypeAudio
t.format = &format.G711{
MULaw: true,
}
case strings.ToLower(webrtc.MimeTypePCMA):
t.mediaType = description.MediaTypeAudio
t.format = &format.G711{
MULaw: false,
}
default:
return nil, fmt.Errorf("unsupported codec: %v", track.Codec())
}
t.media = &description.Media{
Type: t.mediaType,
Formats: []format.Format{t.format},
}
return t, nil
}
type webrtcTrackWrapper struct {
clockRate int
}
func (w webrtcTrackWrapper) ClockRate() int {
return w.clockRate
}
func (webrtcTrackWrapper) PTSEqualsDTS(*rtp.Packet) bool {
return true
}
func (t *webRTCIncomingTrack) start(stream *stream.Stream, timeDecoder *rtptime.GlobalDecoder, log logger.Writer) {
lossDetector := rtplossdetector.New()
trackWrapper := &webrtcTrackWrapper{clockRate: int(t.track.Codec().ClockRate)}
go func() {
for {
pkt, _, err := t.track.ReadRTP()
if err != nil {
return
}
lost := lossDetector.Process(pkt)
if lost != 0 {
log.Log(logger.Warn, (liberrors.ErrClientRTPPacketsLost{Lost: lost}).Error())
// do not return
}
// sometimes Chrome sends empty RTP packets. ignore them.
if len(pkt.Payload) == 0 {
continue
}
pts, ok := timeDecoder.Decode(trackWrapper, pkt)
if !ok {
continue
}
stream.WriteRTPPacket(t.media, t.format, pkt, time.Now(), pts)
}
}()
// read incoming RTCP packets to make interceptors work
go func() {
buf := make([]byte, 1500)
for {
_, _, err := t.receiver.Read(buf)
if err != nil {
return
}
}
}()
if t.mediaType == description.MediaTypeVideo {
go func() {
keyframeTicker := time.NewTicker(keyFrameInterval)
defer keyframeTicker.Stop()
for range keyframeTicker.C {
err := t.writeRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{
MediaSSRC: uint32(t.track.SSRC()),
},
})
if err != nil {
return
}
}
}()
}
}

175
internal/core/webrtc_manager.go

@ -16,105 +16,20 @@ import ( @@ -16,105 +16,20 @@ import (
"github.com/google/uuid"
"github.com/pion/ice/v2"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
pwebrtc "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
const (
webrtcPauseAfterAuthError = 2 * time.Second
webrtcHandshakeTimeout = 10 * time.Second
webrtcTrackGatherTimeout = 3 * time.Second
webrtcPayloadMaxSize = 1188 // 1200 - 12 (RTP header)
webrtcStreamID = "mediamtx"
webrtcTurnSecretExpiration = 24 * 3600 * time.Second
webrtcPayloadMaxSize = 1188 // 1200 - 12 (RTP header)
)
var videoCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
ClockRate: 90000,
},
PayloadType: 96,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=0",
},
PayloadType: 97,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=1",
},
PayloadType: 98,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
},
PayloadType: 99,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
},
PayloadType: 100,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
},
PayloadType: 101,
},
}
var audioCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1;stereo=1;sprop-stereo=1",
},
PayloadType: 111,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
ClockRate: 8000,
},
PayloadType: 9,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMU,
ClockRate: 8000,
},
PayloadType: 0,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMA,
ClockRate: 8000,
},
PayloadType: 8,
},
}
func randInt63() (int64, error) {
var b [8]byte
_, err := rand.Read(b[:])
@ -168,69 +83,6 @@ func randomTurnUser() (string, error) { @@ -168,69 +83,6 @@ func randomTurnUser() (string, error) {
return string(b), nil
}
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
func webrtcNewAPI(
iceInterfaces []string,
iceHostNAT1To1IPs []string,
iceUDPMux ice.UDPMux,
iceTCPMux ice.TCPMux,
) (*webrtc.API, error) {
settingsEngine := webrtc.SettingEngine{}
if len(iceInterfaces) != 0 {
settingsEngine.SetInterfaceFilter(func(iface string) bool {
return stringInSlice(iface, iceInterfaces)
})
}
if len(iceHostNAT1To1IPs) != 0 {
settingsEngine.SetNAT1To1IPs(iceHostNAT1To1IPs, webrtc.ICECandidateTypeHost)
}
if iceUDPMux != nil {
settingsEngine.SetICEUDPMux(iceUDPMux)
}
if iceTCPMux != nil {
settingsEngine.SetICETCPMux(iceTCPMux)
settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4})
}
mediaEngine := &webrtc.MediaEngine{}
for _, codec := range videoCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeVideo)
if err != nil {
return nil, err
}
}
for _, codec := range audioCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeAudio)
if err != nil {
return nil, err
}
}
interceptorRegistry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil {
return nil, err
}
return webrtc.NewAPI(
webrtc.WithSettingEngine(settingsEngine),
webrtc.WithMediaEngine(mediaEngine),
webrtc.WithInterceptorRegistry(interceptorRegistry)), nil
}
type webRTCManagerAPISessionsListRes struct {
data *apiWebRTCSessionList
err error
@ -284,7 +136,7 @@ type webRTCAddSessionCandidatesRes struct { @@ -284,7 +136,7 @@ type webRTCAddSessionCandidatesRes struct {
type webRTCAddSessionCandidatesReq struct {
secret uuid.UUID
candidates []*webrtc.ICECandidateInit
candidates []*pwebrtc.ICECandidateInit
res chan webRTCAddSessionCandidatesRes
}
@ -316,7 +168,7 @@ type webRTCManager struct { @@ -316,7 +168,7 @@ type webRTCManager struct {
httpServer *webRTCHTTPServer
udpMuxLn net.PacketConn
tcpMuxLn net.Listener
api *webrtc.API
api *pwebrtc.API
sessions map[*webRTCSession]struct{}
sessionsBySecret map[uuid.UUID]*webRTCSession
@ -403,7 +255,7 @@ func newWebRTCManager( @@ -403,7 +255,7 @@ func newWebRTCManager(
ctxCancel()
return nil, err
}
iceUDPMux = webrtc.NewICEUDPMux(nil, m.udpMuxLn)
iceUDPMux = pwebrtc.NewICEUDPMux(nil, m.udpMuxLn)
}
var iceTCPMux ice.TCPMux
@ -416,10 +268,15 @@ func newWebRTCManager( @@ -416,10 +268,15 @@ func newWebRTCManager(
ctxCancel()
return nil, err
}
iceTCPMux = webrtc.NewICETCPMux(nil, m.tcpMuxLn, 8)
iceTCPMux = pwebrtc.NewICETCPMux(nil, m.tcpMuxLn, 8)
}
m.api, err = webrtcNewAPI(iceInterfaces, iceHostNAT1To1IPs, iceUDPMux, iceTCPMux)
m.api, err = webrtc.NewAPI(webrtc.APIConf{
ICEInterfaces: iceInterfaces,
ICEHostNAT1To1IPs: iceHostNAT1To1IPs,
ICEUDPMux: iceUDPMux,
ICETCPMux: iceTCPMux,
})
if err != nil {
m.udpMuxLn.Close()
m.tcpMuxLn.Close()
@ -572,8 +429,8 @@ func (m *webRTCManager) findSessionByUUID(uuid uuid.UUID) *webRTCSession { @@ -572,8 +429,8 @@ func (m *webRTCManager) findSessionByUUID(uuid uuid.UUID) *webRTCSession {
return nil
}
func (m *webRTCManager) generateICEServers() ([]webrtc.ICEServer, error) {
ret := make([]webrtc.ICEServer, len(m.iceServers))
func (m *webRTCManager) generateICEServers() ([]pwebrtc.ICEServer, error) {
ret := make([]pwebrtc.ICEServer, len(m.iceServers))
for i, server := range m.iceServers {
if server.Username == "AUTH_SECRET" {
@ -592,7 +449,7 @@ func (m *webRTCManager) generateICEServers() ([]webrtc.ICEServer, error) { @@ -592,7 +449,7 @@ func (m *webRTCManager) generateICEServers() ([]webrtc.ICEServer, error) {
server.Password = base64.StdEncoding.EncodeToString(h.Sum(nil))
}
ret[i] = webrtc.ICEServer{
ret[i] = pwebrtc.ICEServer{
URLs: []string{server.URL},
Username: server.Username,
Credential: server.Password,

241
internal/core/webrtc_manager_test.go

@ -13,168 +13,12 @@ import ( @@ -13,168 +13,12 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format"
rtspurl "github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
pwebrtc "github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
"github.com/bluenviron/mediamtx/internal/whip"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
type nilLogger struct{}
func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
type webRTCTestClient struct {
hc *http.Client
url *url.URL
pc *webrtcpc.PeerConnection
outgoingTrack1 *webrtc.TrackLocalStaticRTP
outgoingTrack2 *webrtc.TrackLocalStaticRTP
incomingTrack chan *webrtc.TrackRemote
}
func newWebRTCTestClient(
t *testing.T,
hc *http.Client,
rawURL string,
publish bool,
) *webRTCTestClient {
c := &webRTCTestClient{
hc: hc,
}
var err error
c.url, err = url.Parse(rawURL)
require.NoError(t, err)
iceServers, err := whip.OptionsICEServers(context.Background(), hc, c.url.String())
require.NoError(t, err)
api, err := webrtcNewAPI(nil, nil, nil, nil)
require.NoError(t, err)
pc, err := webrtcpc.New(iceServers, api, nilLogger{})
require.NoError(t, err)
var outgoingTrack1 *webrtc.TrackLocalStaticRTP
var outgoingTrack2 *webrtc.TrackLocalStaticRTP
var incomingTrack chan *webrtc.TrackRemote
if publish {
var err error
outgoingTrack1, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
},
"vp8",
webrtcStreamID,
)
require.NoError(t, err)
_, err = pc.AddTrack(outgoingTrack1)
require.NoError(t, err)
outgoingTrack2, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
},
"opus",
webrtcStreamID,
)
require.NoError(t, err)
_, err = pc.AddTrack(outgoingTrack2)
require.NoError(t, err)
} else {
incomingTrack = make(chan *webrtc.TrackRemote, 1)
pc.OnTrack(func(trak *webrtc.TrackRemote, recv *webrtc.RTPReceiver) {
incomingTrack <- trak
})
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo)
require.NoError(t, err)
}
offer, err := pc.CreateOffer(nil)
require.NoError(t, err)
res, err := whip.PostOffer(context.Background(), hc, c.url.String(), &offer)
require.NoError(t, err)
c.url, err = c.url.Parse(res.Location)
require.NoError(t, err)
err = pc.SetLocalDescription(offer)
require.NoError(t, err)
// test adding additional candidates, even if it is not strictly necessary
outer:
for {
select {
case ca := <-pc.NewLocalCandidate():
err := whip.PatchCandidate(context.Background(), hc, c.url.String(), &offer, res.ETag, ca)
require.NoError(t, err)
case <-pc.GatheringDone():
break outer
}
}
err = pc.SetRemoteDescription(*res.Answer)
require.NoError(t, err)
<-pc.Connected()
if publish {
err := outgoingTrack1.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
err = outgoingTrack2.WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 1123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{2},
})
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
}
c.pc = pc
c.outgoingTrack1 = outgoingTrack1
c.outgoingTrack2 = outgoingTrack2
c.incomingTrack = incomingTrack
return c
}
func (c *webRTCTestClient) close(t *testing.T, delete bool) {
if delete {
err := whip.DeleteSession(context.Background(), c.hc, c.url.String())
require.NoError(t, err)
}
c.pc.Close()
}
func TestWebRTCRead(t *testing.T) {
for _, auth := range []string{
"none",
@ -255,27 +99,36 @@ func TestWebRTCRead(t *testing.T) { @@ -255,27 +99,36 @@ func TestWebRTCRead(t *testing.T) {
}
ur += "localhost:8889/teststream/whep?param=value"
c := newWebRTCTestClient(t, hc, ur, false)
defer c.close(t, true)
time.Sleep(500 * time.Millisecond)
go func() {
time.Sleep(500 * time.Millisecond)
err := source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 3},
})
require.NoError(t, err)
}()
err = source.WritePacketRTP(medi, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{5, 3},
})
u, err := url.Parse(ur)
require.NoError(t, err)
trak := <-c.incomingTrack
c := &webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
}
tracks, err := c.Read(context.Background())
require.NoError(t, err)
defer checkClose(t, c.Close)
pkt, _, err := trak.ReadRTP()
pkt, err := tracks[0].ReadRTP()
require.NoError(t, err)
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
@ -301,16 +154,16 @@ func TestWebRTCReadNotFound(t *testing.T) { @@ -301,16 +154,16 @@ func TestWebRTCReadNotFound(t *testing.T) {
hc := &http.Client{Transport: &http.Transport{}}
iceServers, err := whip.OptionsICEServers(context.Background(), hc, "http://localhost:8889/stream/whep")
iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc, "http://localhost:8889/stream/whep")
require.NoError(t, err)
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
pc, err := pwebrtc.NewPeerConnection(pwebrtc.Configuration{
ICEServers: iceServers,
})
require.NoError(t, err)
defer pc.Close() //nolint:errcheck
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo)
_, err = pc.AddTransceiverFromKind(pwebrtc.RTPCodecTypeVideo)
require.NoError(t, err)
offer, err := pc.CreateOffer(nil)
@ -403,8 +256,32 @@ func TestWebRTCPublish(t *testing.T) { @@ -403,8 +256,32 @@ func TestWebRTCPublish(t *testing.T) {
}
ur += "localhost:8889/teststream/whip?param=value"
s := newWebRTCTestClient(t, hc, ur, true)
defer s.close(t, true)
su, err := url.Parse(ur)
require.NoError(t, err)
s := &webrtc.WHIPClient{
HTTPClient: hc,
URL: su,
}
tracks, err := s.Publish(context.Background(), testMediaH264.Formats[0], nil)
require.NoError(t, err)
defer checkClose(t, s.Close)
err = tracks[0].WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 123,
Timestamp: 45343,
SSRC: 563423,
},
Payload: []byte{1},
})
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
if auth == "external" {
a.close()
@ -428,7 +305,7 @@ func TestWebRTCPublish(t *testing.T) { @@ -428,7 +305,7 @@ func TestWebRTCPublish(t *testing.T) {
desc, _, err := c.Describe(u)
require.NoError(t, err)
var forma *format.VP8
var forma *format.H264
medi := desc.FindFormat(&forma)
_, err = c.Setup(desc.BaseURL, medi, 0, 0)
@ -444,7 +321,7 @@ func TestWebRTCPublish(t *testing.T) { @@ -444,7 +321,7 @@ func TestWebRTCPublish(t *testing.T) {
_, err = c.Play(nil)
require.NoError(t, err)
err = s.outgoingTrack1.WriteRTP(&rtp.Packet{
err = tracks[0].WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,

364
internal/core/webrtc_outgoing_track.go

@ -1,364 +0,0 @@ @@ -1,364 +0,0 @@
package core
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
)
type webRTCOutgoingTrack struct {
sender *webrtc.RTPSender
media *description.Media
format format.Format
track *webrtc.TrackLocalStaticRTP
cb func(unit.Unit) error
}
func newWebRTCOutgoingTrackVideo(desc *description.Session) (*webRTCOutgoingTrack, error) {
var av1Format *format.AV1
videoMedia := desc.FindFormat(&av1Format)
if videoMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
ClockRate: 90000,
},
"av1",
webrtcStreamID,
)
if err != nil {
return nil, err
}
encoder := &rtpav1.Encoder{
PayloadType: 105,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err = encoder.Init()
if err != nil {
return nil, err
}
return &webRTCOutgoingTrack{
media: videoMedia,
format: av1Format,
track: webRTCTrak,
cb: func(u unit.Unit) error {
tunit := u.(*unit.AV1)
if tunit.TU == nil {
return nil
}
packets, err := encoder.Encode(tunit.TU)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
webRTCTrak.WriteRTP(pkt) //nolint:errcheck
}
return nil
},
}, nil
}
var vp9Format *format.VP9
videoMedia = desc.FindFormat(&vp9Format)
if videoMedia != nil { //nolint:dupl
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: uint32(vp9Format.ClockRate()),
},
"vp9",
webrtcStreamID,
)
if err != nil {
return nil, err
}
encoder := &rtpvp9.Encoder{
PayloadType: 96,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err = encoder.Init()
if err != nil {
return nil, err
}
return &webRTCOutgoingTrack{
media: videoMedia,
format: vp9Format,
track: webRTCTrak,
cb: func(u unit.Unit) error {
tunit := u.(*unit.VP9)
if tunit.Frame == nil {
return nil
}
packets, err := encoder.Encode(tunit.Frame)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
webRTCTrak.WriteRTP(pkt) //nolint:errcheck
}
return nil
},
}, nil
}
var vp8Format *format.VP8
videoMedia = desc.FindFormat(&vp8Format)
if videoMedia != nil { //nolint:dupl
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: uint32(vp8Format.ClockRate()),
},
"vp8",
webrtcStreamID,
)
if err != nil {
return nil, err
}
encoder := &rtpvp8.Encoder{
PayloadType: 96,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err = encoder.Init()
if err != nil {
return nil, err
}
return &webRTCOutgoingTrack{
media: videoMedia,
format: vp8Format,
track: webRTCTrak,
cb: func(u unit.Unit) error {
tunit := u.(*unit.VP8)
if tunit.Frame == nil {
return nil
}
packets, err := encoder.Encode(tunit.Frame)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
webRTCTrak.WriteRTP(pkt) //nolint:errcheck
}
return nil
},
}, nil
}
var h264Format *format.H264
videoMedia = desc.FindFormat(&h264Format)
if videoMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: uint32(h264Format.ClockRate()),
},
"h264",
webrtcStreamID,
)
if err != nil {
return nil, err
}
encoder := &rtph264.Encoder{
PayloadType: 96,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err = encoder.Init()
if err != nil {
return nil, err
}
firstReceived := false
var lastPTS time.Duration
return &webRTCOutgoingTrack{
media: videoMedia,
format: h264Format,
track: webRTCTrak,
cb: func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames")
}
lastPTS = tunit.PTS
packets, err := encoder.Encode(tunit.AU)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
webRTCTrak.WriteRTP(pkt) //nolint:errcheck
}
return nil
},
}, nil
}
return nil, nil
}
func newWebRTCOutgoingTrackAudio(desc *description.Session) (*webRTCOutgoingTrack, error) {
var opusFormat *format.Opus
audioMedia := desc.FindFormat(&opusFormat)
if audioMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: uint32(opusFormat.ClockRate()),
Channels: 2,
},
"opus",
webrtcStreamID,
)
if err != nil {
return nil, err
}
return &webRTCOutgoingTrack{
media: audioMedia,
format: opusFormat,
track: webRTCTrak,
cb: func(u unit.Unit) error {
for _, pkt := range u.GetRTPPackets() {
webRTCTrak.WriteRTP(pkt) //nolint:errcheck
}
return nil
},
}, nil
}
var g722Format *format.G722
audioMedia = desc.FindFormat(&g722Format)
if audioMedia != nil {
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
ClockRate: uint32(g722Format.ClockRate()),
},
"g722",
webrtcStreamID,
)
if err != nil {
return nil, err
}
return &webRTCOutgoingTrack{
media: audioMedia,
format: g722Format,
track: webRTCTrak,
cb: func(u unit.Unit) error {
for _, pkt := range u.GetRTPPackets() {
webRTCTrak.WriteRTP(pkt) //nolint:errcheck
}
return nil
},
}, nil
}
var g711Format *format.G711
audioMedia = desc.FindFormat(&g711Format)
if audioMedia != nil {
var mtyp string
if g711Format.MULaw {
mtyp = webrtc.MimeTypePCMU
} else {
mtyp = webrtc.MimeTypePCMA
}
webRTCTrak, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: mtyp,
ClockRate: uint32(g711Format.ClockRate()),
},
"g711",
webrtcStreamID,
)
if err != nil {
return nil, err
}
return &webRTCOutgoingTrack{
media: audioMedia,
format: g711Format,
track: webRTCTrak,
cb: func(u unit.Unit) error {
for _, pkt := range u.GetRTPPackets() {
webRTCTrak.WriteRTP(pkt) //nolint:errcheck
}
return nil
},
}, nil
}
return nil, nil
}
func (t *webRTCOutgoingTrack) start(
stream *stream.Stream,
writer *asyncwriter.Writer,
) {
// read incoming RTCP packets to make interceptors work
go func() {
buf := make([]byte, 1500)
for {
_, _, err := t.sender.Read(buf)
if err != nil {
return
}
}
}()
stream.AddReader(writer, t.media, t.format, t.cb)
}

502
internal/core/webrtc_session.go

@ -11,156 +11,293 @@ import ( @@ -11,156 +11,293 @@ import (
"time"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpav1"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtph264"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp8"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpvp9"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/google/uuid"
"github.com/pion/rtp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
pwebrtc "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/asyncwriter"
"github.com/bluenviron/mediamtx/internal/externalcmd"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/unit"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
type trackRecvPair struct {
track *webrtc.TrackRemote
receiver *webrtc.RTPReceiver
type webrtcTrackWrapper struct {
clockRate int
}
func webrtcMediasOfIncomingTracks(tracks []*webRTCIncomingTrack) []*description.Media {
ret := make([]*description.Media, len(tracks))
for i, track := range tracks {
ret[i] = track.media
}
return ret
func (w webrtcTrackWrapper) ClockRate() int {
return w.clockRate
}
func whipOffer(body []byte) *webrtc.SessionDescription {
return &webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: string(body),
}
func (webrtcTrackWrapper) PTSEqualsDTS(*rtp.Packet) bool {
return true
}
func webrtcWaitUntilConnected(
ctx context.Context,
pc *webrtcpc.PeerConnection,
) error {
t := time.NewTimer(webrtcHandshakeTimeout)
defer t.Stop()
type setupStreamFunc func(*webrtc.OutgoingTrack) error
outer:
for {
select {
case <-t.C:
return fmt.Errorf("deadline exceeded while waiting connection")
func webrtcFindVideoTrack(
stream *stream.Stream,
writer *asyncwriter.Writer,
) (format.Format, setupStreamFunc) {
var av1Format *format.AV1
media := stream.Desc().FindFormat(&av1Format)
case <-pc.Connected():
break outer
if av1Format != nil {
return av1Format, func(track *webrtc.OutgoingTrack) error {
encoder := &rtpav1.Encoder{
PayloadType: 105,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err := encoder.Init()
if err != nil {
return err
}
case <-ctx.Done():
return fmt.Errorf("terminated")
stream.AddReader(writer, media, av1Format, func(u unit.Unit) error {
tunit := u.(*unit.AV1)
if tunit.TU == nil {
return nil
}
packets, err := encoder.Encode(tunit.TU)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
track.WriteRTP(pkt) //nolint:errcheck
}
return nil
})
return nil
}
}
return nil
}
var vp9Format *format.VP9
media = stream.Desc().FindFormat(&vp9Format)
func webrtcGatherOutgoingTracks(desc *description.Session) ([]*webRTCOutgoingTrack, error) {
var tracks []*webRTCOutgoingTrack
if vp9Format != nil {
return vp9Format, func(track *webrtc.OutgoingTrack) error {
encoder := &rtpvp9.Encoder{
PayloadType: 96,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err := encoder.Init()
if err != nil {
return err
}
videoTrack, err := newWebRTCOutgoingTrackVideo(desc)
if err != nil {
return nil, err
}
stream.AddReader(writer, media, vp9Format, func(u unit.Unit) error {
tunit := u.(*unit.VP9)
if videoTrack != nil {
tracks = append(tracks, videoTrack)
}
if tunit.Frame == nil {
return nil
}
audioTrack, err := newWebRTCOutgoingTrackAudio(desc)
if err != nil {
return nil, err
packets, err := encoder.Encode(tunit.Frame)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
track.WriteRTP(pkt) //nolint:errcheck
}
return nil
})
return nil
}
}
if audioTrack != nil {
tracks = append(tracks, audioTrack)
var vp8Format *format.VP8
media = stream.Desc().FindFormat(&vp8Format)
if vp8Format != nil {
return vp8Format, func(track *webrtc.OutgoingTrack) error {
encoder := &rtpvp8.Encoder{
PayloadType: 96,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err := encoder.Init()
if err != nil {
return err
}
stream.AddReader(writer, media, vp8Format, func(u unit.Unit) error {
tunit := u.(*unit.VP8)
if tunit.Frame == nil {
return nil
}
packets, err := encoder.Encode(tunit.Frame)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
track.WriteRTP(pkt) //nolint:errcheck
}
return nil
})
return nil
}
}
if tracks == nil {
return nil, fmt.Errorf(
"the stream doesn't contain any supported codec, which are currently AV1, VP9, VP8, H264, Opus, G722, G711")
var h264Format *format.H264
media = stream.Desc().FindFormat(&h264Format)
if h264Format != nil {
return h264Format, func(track *webrtc.OutgoingTrack) error {
encoder := &rtph264.Encoder{
PayloadType: 96,
PayloadMaxSize: webrtcPayloadMaxSize,
}
err := encoder.Init()
if err != nil {
return err
}
firstReceived := false
var lastPTS time.Duration
stream.AddReader(writer, media, h264Format, func(u unit.Unit) error {
tunit := u.(*unit.H264)
if tunit.AU == nil {
return nil
}
if !firstReceived {
firstReceived = true
} else if tunit.PTS < lastPTS {
return fmt.Errorf("WebRTC doesn't support H264 streams with B-frames")
}
lastPTS = tunit.PTS
packets, err := encoder.Encode(tunit.AU)
if err != nil {
return nil //nolint:nilerr
}
for _, pkt := range packets {
pkt.Timestamp += tunit.RTPPackets[0].Timestamp
track.WriteRTP(pkt) //nolint:errcheck
}
return nil
})
return nil
}
}
return tracks, nil
return nil, nil
}
func webrtcTrackCount(medias []*sdp.MediaDescription) (int, error) {
videoTrack := false
audioTrack := false
trackCount := 0
func webrtcFindAudioTrack(
stream *stream.Stream,
writer *asyncwriter.Writer,
) (format.Format, setupStreamFunc) {
var opusFormat *format.Opus
media := stream.Desc().FindFormat(&opusFormat)
if opusFormat != nil {
return opusFormat, func(track *webrtc.OutgoingTrack) error {
stream.AddReader(writer, media, opusFormat, func(u unit.Unit) error {
for _, pkt := range u.GetRTPPackets() {
track.WriteRTP(pkt) //nolint:errcheck
}
for _, media := range medias {
switch media.MediaName.Media {
case "video":
if videoTrack {
return 0, fmt.Errorf("only a single video and a single audio track are supported")
}
videoTrack = true
return nil
})
return nil
}
}
case "audio":
if audioTrack {
return 0, fmt.Errorf("only a single video and a single audio track are supported")
}
audioTrack = true
var g722Format *format.G722
media = stream.Desc().FindFormat(&g722Format)
default:
return 0, fmt.Errorf("unsupported media '%s'", media.MediaName.Media)
if g722Format != nil {
return g722Format, func(track *webrtc.OutgoingTrack) error {
stream.AddReader(writer, media, g722Format, func(u unit.Unit) error {
for _, pkt := range u.GetRTPPackets() {
track.WriteRTP(pkt) //nolint:errcheck
}
return nil
})
return nil
}
}
var g711Format *format.G711
media = stream.Desc().FindFormat(&g711Format)
if g711Format != nil {
return g711Format, func(track *webrtc.OutgoingTrack) error {
stream.AddReader(writer, media, g711Format, func(u unit.Unit) error {
for _, pkt := range u.GetRTPPackets() {
track.WriteRTP(pkt) //nolint:errcheck
}
trackCount++
return nil
})
return nil
}
}
return trackCount, nil
return nil, nil
}
func webrtcGatherIncomingTracks(
ctx context.Context,
pc *webrtcpc.PeerConnection,
trackRecv chan trackRecvPair,
trackCount int,
) ([]*webRTCIncomingTrack, error) {
var tracks []*webRTCIncomingTrack
t := time.NewTimer(webrtcTrackGatherTimeout)
defer t.Stop()
func webrtcMediasOfIncomingTracks(tracks []*webrtc.IncomingTrack) []*description.Media {
ret := make([]*description.Media, len(tracks))
for {
select {
case <-t.C:
if trackCount == 0 {
return tracks, nil
}
return nil, fmt.Errorf("deadline exceeded while waiting tracks")
for i, track := range tracks {
forma := track.Format()
case pair := <-trackRecv:
track, err := newWebRTCIncomingTrack(pair.track, pair.receiver, pc.WriteRTCP)
if err != nil {
return nil, err
}
tracks = append(tracks, track)
var mediaType description.MediaType
if len(tracks) == trackCount {
return tracks, nil
}
switch forma.(type) {
case *format.AV1, *format.VP9, *format.VP8, *format.H264:
mediaType = description.MediaTypeVideo
case <-pc.Disconnected():
return nil, fmt.Errorf("peer connection closed")
default:
mediaType = description.MediaTypeAudio
}
case <-ctx.Done():
return nil, fmt.Errorf("terminated")
ret[i] = &description.Media{
Type: mediaType,
Formats: []format.Format{forma},
}
}
return ret
}
func whipOffer(body []byte) *pwebrtc.SessionDescription {
return &pwebrtc.SessionDescription{
Type: pwebrtc.SDPTypeOffer,
SDP: string(body),
}
}
type webRTCSessionPathManager interface {
@ -170,7 +307,7 @@ type webRTCSessionPathManager interface { @@ -170,7 +307,7 @@ type webRTCSessionPathManager interface {
type webRTCSession struct {
writeQueueSize int
api *webrtc.API
api *pwebrtc.API
req webRTCNewSessionReq
wg *sync.WaitGroup
externalCmdPool *externalcmd.Pool
@ -183,7 +320,7 @@ type webRTCSession struct { @@ -183,7 +320,7 @@ type webRTCSession struct {
uuid uuid.UUID
secret uuid.UUID
mutex sync.RWMutex
pc *webrtcpc.PeerConnection
pc *webrtc.PeerConnection
chNew chan webRTCNewSessionReq
chAddCandidates chan webRTCAddSessionCandidatesReq
@ -192,7 +329,7 @@ type webRTCSession struct { @@ -192,7 +329,7 @@ type webRTCSession struct {
func newWebRTCSession(
parentCtx context.Context,
writeQueueSize int,
api *webrtc.API,
api *pwebrtc.API,
req webRTCNewSessionReq,
wg *sync.WaitGroup,
externalCmdPool *externalcmd.Pool,
@ -302,15 +439,18 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -302,15 +439,18 @@ func (s *webRTCSession) runPublish() (int, error) {
defer res.path.removePublisher(pathRemovePublisherReq{author: s})
servers, err := s.parent.generateICEServers()
iceServers, err := s.parent.generateICEServers()
if err != nil {
return http.StatusInternalServerError, err
}
pc, err := webrtcpc.New(
servers,
s.api,
s)
pc := &webrtc.PeerConnection{
ICEServers: iceServers,
API: s.api,
Publish: false,
Log: s,
}
err = pc.Start()
if err != nil {
return http.StatusBadRequest, err
}
@ -324,59 +464,21 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -324,59 +464,21 @@ func (s *webRTCSession) runPublish() (int, error) {
return http.StatusBadRequest, err
}
trackCount, err := webrtcTrackCount(sdp.MediaDescriptions)
if err != nil {
return http.StatusBadRequest, err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
trackCount, err := webrtc.TrackCount(sdp.MediaDescriptions)
if err != nil {
return http.StatusBadRequest, err
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
answer, err := pc.CreateFullAnswer(s.ctx, offer)
if err != nil {
return http.StatusBadRequest, err
}
trackRecv := make(chan trackRecvPair)
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
select {
case trackRecv <- trackRecvPair{track, receiver}:
case <-s.ctx.Done():
}
})
err = pc.SetRemoteDescription(*offer)
if err != nil {
return http.StatusBadRequest, err
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
return http.StatusBadRequest, err
}
err = pc.SetLocalDescription(answer)
if err != nil {
return http.StatusBadRequest, err
}
err = pc.WaitGatheringDone(s.ctx)
if err != nil {
return http.StatusBadRequest, err
}
s.writeAnswer(pc.LocalDescription())
s.writeAnswer(answer)
go s.readRemoteCandidates(pc)
err = webrtcWaitUntilConnected(s.ctx, pc)
err = pc.WaitUntilConnected(s.ctx)
if err != nil {
return 0, err
}
@ -385,10 +487,11 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -385,10 +487,11 @@ func (s *webRTCSession) runPublish() (int, error) {
s.pc = pc
s.mutex.Unlock()
tracks, err := webrtcGatherIncomingTracks(s.ctx, pc, trackRecv, trackCount)
tracks, err := pc.GatherIncomingTracks(s.ctx, trackCount)
if err != nil {
return 0, err
}
medias := webrtcMediasOfIncomingTracks(tracks)
rres := res.path.startPublisher(pathStartPublisherReq{
@ -402,8 +505,26 @@ func (s *webRTCSession) runPublish() (int, error) { @@ -402,8 +505,26 @@ func (s *webRTCSession) runPublish() (int, error) {
timeDecoder := rtptime.NewGlobalDecoder()
for _, track := range tracks {
track.start(rres.stream, timeDecoder, s)
for i, media := range medias {
ci := i
cmedia := media
trackWrapper := &webrtcTrackWrapper{clockRate: cmedia.Formats[0].ClockRate()}
go func() {
for {
pkt, err := tracks[ci].ReadRTP()
if err != nil {
return
}
pts, ok := timeDecoder.Decode(trackWrapper, pkt)
if !ok {
continue
}
rres.stream.WriteRTPPacket(cmedia, cmedia.Formats[0], pkt, time.Now(), pts)
}
}()
}
select {
@ -447,60 +568,50 @@ func (s *webRTCSession) runRead() (int, error) { @@ -447,60 +568,50 @@ func (s *webRTCSession) runRead() (int, error) {
defer res.path.removeReader(pathRemoveReaderReq{author: s})
tracks, err := webrtcGatherOutgoingTracks(res.stream.Desc())
if err != nil {
return http.StatusBadRequest, err
}
servers, err := s.parent.generateICEServers()
iceServers, err := s.parent.generateICEServers()
if err != nil {
return http.StatusInternalServerError, err
}
pc, err := webrtcpc.New(
servers,
s.api,
s)
pc := &webrtc.PeerConnection{
ICEServers: iceServers,
API: s.api,
Publish: false,
Log: s,
}
err = pc.Start()
if err != nil {
return http.StatusBadRequest, err
}
defer pc.Close()
for _, track := range tracks {
var err error
track.sender, err = pc.AddTrack(track.track)
if err != nil {
return http.StatusBadRequest, err
}
}
writer := asyncwriter.New(s.writeQueueSize, s)
offer := whipOffer(s.req.offer)
videoTrack, videoSetup := webrtcFindVideoTrack(res.stream, writer)
audioTrack, audioSetup := webrtcFindAudioTrack(res.stream, writer)
err = pc.SetRemoteDescription(*offer)
if err != nil {
return http.StatusBadRequest, err
if videoTrack == nil && audioTrack == nil {
return http.StatusBadRequest, fmt.Errorf(
"the stream doesn't contain any supported codec, which are currently AV1, VP9, VP8, H264, Opus, G722, G711")
}
answer, err := pc.CreateAnswer(nil)
tracks, err := pc.SetupOutgoingTracks(videoTrack, audioTrack)
if err != nil {
return http.StatusBadRequest, err
}
err = pc.SetLocalDescription(answer)
if err != nil {
return http.StatusBadRequest, err
}
offer := whipOffer(s.req.offer)
err = pc.WaitGatheringDone(s.ctx)
answer, err := pc.CreateFullAnswer(s.ctx, offer)
if err != nil {
return http.StatusBadRequest, err
}
s.writeAnswer(pc.LocalDescription())
s.writeAnswer(answer)
go s.readRemoteCandidates(pc)
err = webrtcWaitUntilConnected(s.ctx, pc)
err = pc.WaitUntilConnected(s.ctx)
if err != nil {
return 0, err
}
@ -509,12 +620,23 @@ func (s *webRTCSession) runRead() (int, error) { @@ -509,12 +620,23 @@ func (s *webRTCSession) runRead() (int, error) {
s.pc = pc
s.mutex.Unlock()
writer := asyncwriter.New(s.writeQueueSize, s)
defer res.stream.RemoveReader(writer)
for _, track := range tracks {
track.start(res.stream, writer)
n := 0
if videoTrack != nil {
err := videoSetup(tracks[n])
if err != nil {
return 0, err
}
n++
}
if audioTrack != nil {
err := audioSetup(tracks[n])
if err != nil {
return 0, err
}
}
s.Log(logger.Info, "is reading from path '%s', %s",
@ -548,19 +670,19 @@ func (s *webRTCSession) runRead() (int, error) { @@ -548,19 +670,19 @@ func (s *webRTCSession) runRead() (int, error) {
}
}
func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) {
func (s *webRTCSession) writeAnswer(answer *pwebrtc.SessionDescription) {
s.req.res <- webRTCNewSessionRes{
sx: s,
answer: []byte(answer.SDP),
}
}
func (s *webRTCSession) readRemoteCandidates(pc *webrtcpc.PeerConnection) {
func (s *webRTCSession) readRemoteCandidates(pc *webrtc.PeerConnection) {
for {
select {
case req := <-s.chAddCandidates:
for _, candidate := range req.candidates {
err := pc.AddICECandidate(*candidate)
err := pc.AddRemoteCandidate(*candidate)
if err != nil {
req.res <- webRTCAddSessionCandidatesRes{err: err}
}

117
internal/core/webrtc_source.go

@ -2,7 +2,6 @@ package core @@ -2,7 +2,6 @@ package core
import (
"context"
"fmt"
"net/http"
"net/url"
"strings"
@ -10,13 +9,10 @@ import ( @@ -10,13 +9,10 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
"github.com/bluenviron/mediamtx/internal/whip"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
type webRTCSourceParent interface {
@ -58,91 +54,22 @@ func (s *webRTCSource) run(ctx context.Context, cnf *conf.Path, _ chan *conf.Pat @@ -58,91 +54,22 @@ func (s *webRTCSource) run(ctx context.Context, cnf *conf.Path, _ chan *conf.Pat
u.Scheme = strings.ReplaceAll(u.Scheme, "whep", "http")
c := &http.Client{
hc := &http.Client{
Timeout: time.Duration(s.readTimeout),
}
iceServers, err := whip.OptionsICEServers(ctx, c, u.String())
if err != nil {
return err
}
api, err := webrtcNewAPI(nil, nil, nil, nil)
if err != nil {
return err
}
pc, err := webrtcpc.New(iceServers, api, s)
if err != nil {
return err
}
defer pc.Close()
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo)
if err != nil {
return err
client := webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
Log: s,
}
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio)
tracks, err := client.Read(ctx)
if err != nil {
return err
}
defer client.Close() //nolint:errcheck
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
err = pc.SetLocalDescription(offer)
if err != nil {
return err
}
err = pc.WaitGatheringDone(ctx)
if err != nil {
return err
}
res, err := whip.PostOffer(ctx, c, u.String(), pc.LocalDescription())
if err != nil {
return err
}
var sdp sdp.SessionDescription
err = sdp.Unmarshal([]byte(res.Answer.SDP))
if err != nil {
return err
}
// check that there are at most two tracks
_, err = webrtcTrackCount(sdp.MediaDescriptions)
if err != nil {
return err
}
trackRecv := make(chan trackRecvPair)
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
select {
case trackRecv <- trackRecvPair{track, receiver}:
case <-ctx.Done():
}
})
err = pc.SetRemoteDescription(*res.Answer)
if err != nil {
return err
}
err = webrtcWaitUntilConnected(ctx, pc)
if err != nil {
return err
}
tracks, err := webrtcGatherIncomingTracks(ctx, pc, trackRecv, 0)
if err != nil {
return err
}
medias := webrtcMediasOfIncomingTracks(tracks)
rres := s.parent.setReady(pathSourceStaticSetReadyReq{
@ -157,17 +84,29 @@ func (s *webRTCSource) run(ctx context.Context, cnf *conf.Path, _ chan *conf.Pat @@ -157,17 +84,29 @@ func (s *webRTCSource) run(ctx context.Context, cnf *conf.Path, _ chan *conf.Pat
timeDecoder := rtptime.NewGlobalDecoder()
for _, track := range tracks {
track.start(rres.stream, timeDecoder, s)
}
for i, media := range medias {
ci := i
cmedia := media
trackWrapper := &webrtcTrackWrapper{clockRate: cmedia.Formats[0].ClockRate()}
go func() {
for {
pkt, err := tracks[ci].ReadRTP()
if err != nil {
return
}
select {
case <-pc.Disconnected():
return fmt.Errorf("peer connection closed")
pts, ok := timeDecoder.Decode(trackWrapper, pkt)
if !ok {
continue
}
case <-ctx.Done():
return fmt.Errorf("terminated")
rres.stream.WriteRTPPacket(cmedia, cmedia.Formats[0], pkt, time.Now(), pts)
}
}()
}
return client.Wait(ctx)
}
// apiSourceDescribe implements sourceStaticImpl.

74
internal/core/webrtc_source_test.go

@ -11,49 +11,36 @@ import ( @@ -11,49 +11,36 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"github.com/bluenviron/mediamtx/internal/webrtcpc"
"github.com/bluenviron/mediamtx/internal/webrtc"
)
func TestWebRTCSource(t *testing.T) {
state := 0
api, err := webrtcNewAPI(nil, nil, nil, nil)
api, err := webrtc.NewAPI(webrtc.APIConf{})
require.NoError(t, err)
pc, err := webrtcpc.New(nil, api, nilLogger{})
pc := &webrtc.PeerConnection{
API: api,
Publish: true,
}
err = pc.Start()
require.NoError(t, err)
defer pc.Close()
outgoingTrack1, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
tracks, err := pc.SetupOutgoingTracks(
&format.VP8{
PayloadTyp: 96,
},
"vp8",
webrtcStreamID,
)
require.NoError(t, err)
_, err = pc.AddTrack(outgoingTrack1)
require.NoError(t, err)
outgoingTrack2, err := webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
&format.Opus{
PayloadTyp: 111,
IsStereo: true,
},
"opus",
webrtcStreamID,
)
require.NoError(t, err)
_, err = pc.AddTrack(outgoingTrack2)
require.NoError(t, err)
httpServ := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch state {
@ -74,16 +61,7 @@ func TestWebRTCSource(t *testing.T) { @@ -74,16 +61,7 @@ func TestWebRTCSource(t *testing.T) {
require.NoError(t, err)
offer := whipOffer(body)
err = pc.SetRemoteDescription(*offer)
require.NoError(t, err)
answer, err := pc.CreateAnswer(nil)
require.NoError(t, err)
err = pc.SetLocalDescription(answer)
require.NoError(t, err)
err = pc.WaitGatheringDone(context.Background())
answer, err := pc.CreateFullAnswer(context.Background(), offer)
require.NoError(t, err)
w.Header().Set("Content-Type", "application/sdp")
@ -91,12 +69,13 @@ func TestWebRTCSource(t *testing.T) { @@ -91,12 +69,13 @@ func TestWebRTCSource(t *testing.T) {
w.Header().Set("ETag", "test_etag")
w.Header().Set("Location", "/my/resource/sessionid")
w.WriteHeader(http.StatusCreated)
w.Write([]byte(pc.LocalDescription().SDP))
w.Write([]byte(answer.SDP))
go func() {
<-pc.Connected()
err = pc.WaitUntilConnected(context.Background())
require.NoError(t, err)
err = outgoingTrack1.WriteRTP(&rtp.Packet{
err = tracks[0].WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
@ -109,7 +88,7 @@ func TestWebRTCSource(t *testing.T) { @@ -109,7 +88,7 @@ func TestWebRTCSource(t *testing.T) {
})
require.NoError(t, err)
err = outgoingTrack2.WriteRTP(&rtp.Packet{
err = tracks[1].WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
@ -124,7 +103,18 @@ func TestWebRTCSource(t *testing.T) { @@ -124,7 +103,18 @@ func TestWebRTCSource(t *testing.T) {
}()
default:
t.Errorf("should not happen since there should not be additional candidates")
require.Equal(t, "/my/resource/sessionid", r.URL.Path)
switch r.Method {
case http.MethodPatch:
w.WriteHeader(http.StatusNoContent)
case http.MethodDelete:
w.WriteHeader(http.StatusOK)
default:
t.Errorf("should not happen")
}
}
state++
}),
@ -171,7 +161,7 @@ func TestWebRTCSource(t *testing.T) { @@ -171,7 +161,7 @@ func TestWebRTCSource(t *testing.T) {
_, err = c.Play(nil)
require.NoError(t, err)
err = outgoingTrack1.WriteRTP(&rtp.Packet{
err = tracks[0].WriteRTP(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,

156
internal/webrtc/api.go

@ -0,0 +1,156 @@ @@ -0,0 +1,156 @@
package webrtc
import (
"github.com/pion/ice/v2"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v3"
)
func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}
var videoCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
ClockRate: 90000,
},
PayloadType: 96,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=0",
},
PayloadType: 97,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: 90000,
SDPFmtpLine: "profile-id=1",
},
PayloadType: 98,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: 90000,
},
PayloadType: 99,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f",
},
PayloadType: 100,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: 90000,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f",
},
PayloadType: 101,
},
}
var audioCodecs = []webrtc.RTPCodecParameters{
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: 48000,
Channels: 2,
SDPFmtpLine: "minptime=10;useinbandfec=1;stereo=1;sprop-stereo=1",
},
PayloadType: 111,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
ClockRate: 8000,
},
PayloadType: 9,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMU,
ClockRate: 8000,
},
PayloadType: 0,
},
{
RTPCodecCapability: webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypePCMA,
ClockRate: 8000,
},
PayloadType: 8,
},
}
// APIConf is the configuration passed to NewAPI().
type APIConf struct {
ICEInterfaces []string
ICEHostNAT1To1IPs []string
ICEUDPMux ice.UDPMux
ICETCPMux ice.TCPMux
}
// NewAPI allocates a webrtc API.
func NewAPI(conf APIConf) (*webrtc.API, error) {
settingsEngine := webrtc.SettingEngine{}
if len(conf.ICEInterfaces) != 0 {
settingsEngine.SetInterfaceFilter(func(iface string) bool {
return stringInSlice(iface, conf.ICEInterfaces)
})
}
if len(conf.ICEHostNAT1To1IPs) != 0 {
settingsEngine.SetNAT1To1IPs(conf.ICEHostNAT1To1IPs, webrtc.ICECandidateTypeHost)
}
if conf.ICEUDPMux != nil {
settingsEngine.SetICEUDPMux(conf.ICEUDPMux)
}
if conf.ICETCPMux != nil {
settingsEngine.SetICETCPMux(conf.ICETCPMux)
settingsEngine.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4})
}
mediaEngine := &webrtc.MediaEngine{}
for _, codec := range videoCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeVideo)
if err != nil {
return nil, err
}
}
for _, codec := range audioCodecs {
err := mediaEngine.RegisterCodec(codec, webrtc.RTPCodecTypeAudio)
if err != nil {
return nil, err
}
}
interceptorRegistry := &interceptor.Registry{}
if err := webrtc.RegisterDefaultInterceptors(mediaEngine, interceptorRegistry); err != nil {
return nil, err
}
return webrtc.NewAPI(
webrtc.WithSettingEngine(settingsEngine),
webrtc.WithMediaEngine(mediaEngine),
webrtc.WithInterceptorRegistry(interceptorRegistry)), nil
}

2
internal/whip/ice_fragment.go → internal/webrtc/ice_fragment.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"fmt"

2
internal/whip/ice_fragment_test.go → internal/webrtc/ice_fragment_test.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"testing"

152
internal/webrtc/incoming_track.go

@ -0,0 +1,152 @@ @@ -0,0 +1,152 @@
package webrtc
import (
"fmt"
"strings"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/rtplossdetector"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
)
const (
keyFrameInterval = 2 * time.Second
)
// IncomingTrack is an incoming track.
type IncomingTrack struct {
track *webrtc.TrackRemote
log logger.Writer
format format.Format
lossDetector *rtplossdetector.LossDetector
}
func newIncomingTrack(
track *webrtc.TrackRemote,
receiver *webrtc.RTPReceiver,
writeRTCP func([]rtcp.Packet) error,
log logger.Writer,
) (*IncomingTrack, error) {
t := &IncomingTrack{
track: track,
log: log,
lossDetector: rtplossdetector.New(),
}
isVideo := false
switch strings.ToLower(track.Codec().MimeType) {
case strings.ToLower(webrtc.MimeTypeAV1):
isVideo = true
t.format = &format.AV1{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeVP9):
isVideo = true
t.format = &format.VP9{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeVP8):
isVideo = true
t.format = &format.VP8{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeH264):
isVideo = true
t.format = &format.H264{
PayloadTyp: uint8(track.PayloadType()),
PacketizationMode: 1,
}
case strings.ToLower(webrtc.MimeTypeOpus):
t.format = &format.Opus{
PayloadTyp: uint8(track.PayloadType()),
}
case strings.ToLower(webrtc.MimeTypeG722):
t.format = &format.G722{}
case strings.ToLower(webrtc.MimeTypePCMU):
t.format = &format.G711{
MULaw: true,
}
case strings.ToLower(webrtc.MimeTypePCMA):
t.format = &format.G711{
MULaw: false,
}
default:
return nil, fmt.Errorf("unsupported codec: %v", track.Codec())
}
// read incoming RTCP packets to make interceptors work
go func() {
buf := make([]byte, 1500)
for {
_, _, err := receiver.Read(buf)
if err != nil {
return
}
}
}()
// send period key frame requests
if isVideo {
go func() {
keyframeTicker := time.NewTicker(keyFrameInterval)
defer keyframeTicker.Stop()
for range keyframeTicker.C {
err := writeRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{
MediaSSRC: uint32(t.track.SSRC()),
},
})
if err != nil {
return
}
}
}()
}
return t, nil
}
// Format returns the track format.
func (t *IncomingTrack) Format() format.Format {
return t.format
}
// ReadRTP reads a RTP packet.
func (t *IncomingTrack) ReadRTP() (*rtp.Packet, error) {
for {
pkt, _, err := t.track.ReadRTP()
if err != nil {
return nil, err
}
lost := t.lossDetector.Process(pkt)
if lost != 0 {
t.log.Log(logger.Warn, (liberrors.ErrClientRTPPacketsLost{Lost: lost}).Error())
// do not return
}
// sometimes Chrome sends empty RTP packets. ignore them.
if len(pkt.Payload) == 0 {
continue
}
return pkt, nil
}
}

2
internal/whip/link_header.go → internal/webrtc/link_header.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"encoding/json"

2
internal/whip/link_header_test.go → internal/webrtc/link_header_test.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"testing"

154
internal/webrtc/outgoing_track.go

@ -0,0 +1,154 @@ @@ -0,0 +1,154 @@
package webrtc
import (
"fmt"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
)
type addTrackFunc func(webrtc.TrackLocal) (*webrtc.RTPSender, error)
// OutgoingTrack is a WebRTC outgoing track
type OutgoingTrack struct {
track *webrtc.TrackLocalStaticRTP
}
func newOutgoingTrack(forma format.Format, addTrack addTrackFunc) (*OutgoingTrack, error) {
t := &OutgoingTrack{}
switch forma := forma.(type) {
case *format.AV1:
var err error
t.track, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeAV1,
ClockRate: 90000,
},
"av1",
webrtcStreamID,
)
if err != nil {
return nil, err
}
case *format.VP9:
var err error
t.track, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP9,
ClockRate: uint32(forma.ClockRate()),
},
"vp9",
webrtcStreamID,
)
if err != nil {
return nil, err
}
case *format.VP8:
var err error
t.track, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeVP8,
ClockRate: uint32(forma.ClockRate()),
},
"vp8",
webrtcStreamID,
)
if err != nil {
return nil, err
}
case *format.H264:
var err error
t.track, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: uint32(forma.ClockRate()),
},
"h264",
webrtcStreamID,
)
if err != nil {
return nil, err
}
case *format.Opus:
var err error
t.track, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: uint32(forma.ClockRate()),
Channels: 2,
},
"opus",
webrtcStreamID,
)
if err != nil {
return nil, err
}
case *format.G722:
var err error
t.track, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeG722,
ClockRate: uint32(forma.ClockRate()),
},
"g722",
webrtcStreamID,
)
if err != nil {
return nil, err
}
case *format.G711:
var mtyp string
if forma.MULaw {
mtyp = webrtc.MimeTypePCMU
} else {
mtyp = webrtc.MimeTypePCMA
}
var err error
t.track, err = webrtc.NewTrackLocalStaticRTP(
webrtc.RTPCodecCapability{
MimeType: mtyp,
ClockRate: uint32(forma.ClockRate()),
},
"g711",
webrtcStreamID,
)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported track type: %T", forma)
}
sender, err := addTrack(t.track)
if err != nil {
return nil, err
}
// read incoming RTCP packets to make interceptors work
go func() {
buf := make([]byte, 1500)
for {
_, _, err := sender.Read(buf)
if err != nil {
return
}
}
}()
return t, nil
}
// WriteRTP writes a RTP packet.
func (t *OutgoingTrack) WriteRTP(pkt *rtp.Packet) error {
return t.track.WriteRTP(pkt)
}

381
internal/webrtc/peer_connection.go

@ -0,0 +1,381 @@ @@ -0,0 +1,381 @@
package webrtc
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
)
const (
webrtcHandshakeTimeout = 10 * time.Second
webrtcTrackGatherTimeout = 2 * time.Second
webrtcStreamID = "mediamtx"
)
type nilLogger struct{}
func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
type trackRecvPair struct {
track *webrtc.TrackRemote
receiver *webrtc.RTPReceiver
}
// PeerConnection is a wrapper around webrtc.PeerConnection.
type PeerConnection struct {
ICEServers []webrtc.ICEServer
API *webrtc.API
Publish bool
Log logger.Writer
wr *webrtc.PeerConnection
stateChangeMutex sync.Mutex
newLocalCandidate chan *webrtc.ICECandidateInit
connected chan struct{}
disconnected chan struct{}
closed chan struct{}
gatheringDone chan struct{}
incomingTrack chan trackRecvPair
}
// Start starts the peer connection.
func (co *PeerConnection) Start() error {
if co.Log == nil {
co.Log = &nilLogger{}
}
configuration := webrtc.Configuration{
ICEServers: co.ICEServers,
}
var err error
co.wr, err = co.API.NewPeerConnection(configuration)
if err != nil {
return err
}
co.newLocalCandidate = make(chan *webrtc.ICECandidateInit)
co.connected = make(chan struct{})
co.disconnected = make(chan struct{})
co.closed = make(chan struct{})
co.gatheringDone = make(chan struct{})
co.incomingTrack = make(chan trackRecvPair)
if !co.Publish {
_, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
if err != nil {
co.wr.Close() //nolint:errcheck
return err
}
_, err = co.wr.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly,
})
if err != nil {
co.wr.Close() //nolint:errcheck
return err
}
co.wr.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
select {
case co.incomingTrack <- trackRecvPair{track, receiver}:
case <-co.closed:
}
})
}
co.wr.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
co.stateChangeMutex.Lock()
defer co.stateChangeMutex.Unlock()
select {
case <-co.closed:
return
default:
}
co.Log.Log(logger.Debug, "peer connection state: "+state.String())
switch state {
case webrtc.PeerConnectionStateConnected:
co.Log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
co.LocalCandidate(), co.RemoteCandidate())
close(co.connected)
case webrtc.PeerConnectionStateDisconnected:
close(co.disconnected)
case webrtc.PeerConnectionStateClosed:
close(co.closed)
}
})
co.wr.OnICECandidate(func(i *webrtc.ICECandidate) {
if i != nil {
v := i.ToJSON()
select {
case co.newLocalCandidate <- &v:
case <-co.connected:
case <-co.closed:
}
} else {
close(co.gatheringDone)
}
})
return nil
}
// Close closes the connection.
func (co *PeerConnection) Close() {
co.wr.Close() //nolint:errcheck
<-co.closed
}
// CreatePartialOffer creates a partial offer.
func (co *PeerConnection) CreatePartialOffer() (*webrtc.SessionDescription, error) {
offer, err := co.wr.CreateOffer(nil)
if err != nil {
return nil, err
}
err = co.wr.SetLocalDescription(offer)
if err != nil {
return nil, err
}
return &offer, nil
}
// SetAnswer sets the answer.
func (co *PeerConnection) SetAnswer(answer *webrtc.SessionDescription) error {
return co.wr.SetRemoteDescription(*answer)
}
// AddRemoteCandidate adds a remote candidate.
func (co *PeerConnection) AddRemoteCandidate(candidate webrtc.ICECandidateInit) error {
return co.wr.AddICECandidate(candidate)
}
// CreateFullAnswer creates a full answer.
func (co *PeerConnection) CreateFullAnswer(
ctx context.Context,
offer *webrtc.SessionDescription,
) (*webrtc.SessionDescription, error) {
err := co.wr.SetRemoteDescription(*offer)
if err != nil {
return nil, err
}
answer, err := co.wr.CreateAnswer(nil)
if err != nil {
return nil, err
}
err = co.wr.SetLocalDescription(answer)
if err != nil {
return nil, err
}
err = co.WaitGatheringDone(ctx)
if err != nil {
return nil, err
}
return co.wr.LocalDescription(), nil
}
// WaitGatheringDone waits until candidate gathering is complete.
func (co *PeerConnection) WaitGatheringDone(ctx context.Context) error {
for {
select {
case <-co.NewLocalCandidate():
case <-co.GatheringDone():
return nil
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}
}
// WaitUntilConnected waits until connection is established.
func (co *PeerConnection) WaitUntilConnected(
ctx context.Context,
) error {
t := time.NewTimer(webrtcHandshakeTimeout)
defer t.Stop()
outer:
for {
select {
case <-t.C:
return fmt.Errorf("deadline exceeded while waiting connection")
case <-co.connected:
break outer
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}
return nil
}
// GatherIncomingTracks gathers incoming tracks.
func (co *PeerConnection) GatherIncomingTracks(
ctx context.Context,
count int,
) ([]*IncomingTrack, error) {
var tracks []*IncomingTrack
t := time.NewTimer(webrtcTrackGatherTimeout)
defer t.Stop()
for {
select {
case <-t.C:
if count == 0 {
return tracks, nil
}
return nil, fmt.Errorf("deadline exceeded while waiting tracks")
case pair := <-co.incomingTrack:
track, err := newIncomingTrack(pair.track, pair.receiver, co.wr.WriteRTCP, co.Log)
if err != nil {
return nil, err
}
tracks = append(tracks, track)
if len(tracks) == count || len(tracks) >= 2 {
return tracks, nil
}
case <-co.Disconnected():
return nil, fmt.Errorf("peer connection closed")
case <-ctx.Done():
return nil, fmt.Errorf("terminated")
}
}
}
// SetupOutgoingTracks setups outgoing tracks.
func (co *PeerConnection) SetupOutgoingTracks(
videoTrack format.Format,
audioTrack format.Format,
) ([]*OutgoingTrack, error) {
var tracks []*OutgoingTrack
for _, forma := range []format.Format{videoTrack, audioTrack} {
if forma != nil {
track, err := newOutgoingTrack(forma, co.wr.AddTrack)
if err != nil {
return nil, err
}
tracks = append(tracks, track)
}
}
return tracks, nil
}
// Connected returns when connected.
func (co *PeerConnection) Connected() <-chan struct{} {
return co.connected
}
// Disconnected returns when disconnected.
func (co *PeerConnection) Disconnected() <-chan struct{} {
return co.disconnected
}
// NewLocalCandidate returns when there's a new local candidate.
func (co *PeerConnection) NewLocalCandidate() <-chan *webrtc.ICECandidateInit {
return co.newLocalCandidate
}
// GatheringDone returns when candidate gathering is complete.
func (co *PeerConnection) GatheringDone() <-chan struct{} {
return co.gatheringDone
}
// LocalCandidate returns the local candidate.
func (co *PeerConnection) LocalCandidate() string {
var cid string
for _, stats := range co.wr.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.LocalCandidateID
break
}
}
if cid != "" {
for _, stats := range co.wr.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
// RemoteCandidate returns the remote candidate.
func (co *PeerConnection) RemoteCandidate() string {
var cid string
for _, stats := range co.wr.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.RemoteCandidateID
break
}
}
if cid != "" {
for _, stats := range co.wr.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
// BytesReceived returns received bytes.
func (co *PeerConnection) BytesReceived() uint64 {
for _, stats := range co.wr.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesReceived
}
}
}
return 0
}
// BytesSent returns sent bytes.
func (co *PeerConnection) BytesSent() uint64 {
for _, stats := range co.wr.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesSent
}
}
}
return 0
}

37
internal/webrtc/track_count.go

@ -0,0 +1,37 @@ @@ -0,0 +1,37 @@
package webrtc
import (
"fmt"
"github.com/pion/sdp/v3"
)
// TrackCount returns the track count.
func TrackCount(medias []*sdp.MediaDescription) (int, error) {
videoTrack := false
audioTrack := false
trackCount := 0
for _, media := range medias {
switch media.MediaName.Media {
case "video":
if videoTrack {
return 0, fmt.Errorf("only a single video and a single audio track are supported")
}
videoTrack = true
case "audio":
if audioTrack {
return 0, fmt.Errorf("only a single video and a single audio track are supported")
}
audioTrack = true
default:
return 0, fmt.Errorf("unsupported media '%s'", media.MediaName.Media)
}
trackCount++
}
return trackCount, nil
}

2
internal/webrtc/webrtc.go

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
// Package webrtc contains WebRTC utilities.
package webrtc

213
internal/webrtc/whip_client.go

@ -0,0 +1,213 @@ @@ -0,0 +1,213 @@
package webrtc
import (
"context"
"fmt"
"net/http"
"net/url"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/sdp/v3"
"github.com/bluenviron/mediamtx/internal/logger"
)
// WHIPClient is a WHIP client.
type WHIPClient struct {
HTTPClient *http.Client
URL *url.URL
Log logger.Writer
pc *PeerConnection
}
// Publish publishes tracks.
func (c *WHIPClient) Publish(
ctx context.Context,
videoTrack format.Format,
audioTrack format.Format,
) ([]*OutgoingTrack, error) {
iceServers, err := WHIPOptionsICEServers(ctx, c.HTTPClient, c.URL.String())
if err != nil {
return nil, err
}
api, err := NewAPI(APIConf{})
if err != nil {
return nil, err
}
c.pc = &PeerConnection{
ICEServers: iceServers,
API: api,
Publish: true,
Log: c.Log,
}
err = c.pc.Start()
if err != nil {
return nil, err
}
tracks, err := c.pc.SetupOutgoingTracks(videoTrack, audioTrack)
if err != nil {
c.pc.Close()
return nil, err
}
offer, err := c.pc.CreatePartialOffer()
if err != nil {
c.pc.Close()
return nil, err
}
res, err := PostOffer(ctx, c.HTTPClient, c.URL.String(), offer)
if err != nil {
c.pc.Close()
return nil, err
}
c.URL, err = c.URL.Parse(res.Location)
if err != nil {
c.pc.Close()
return nil, err
}
err = c.pc.SetAnswer(res.Answer)
if err != nil {
c.pc.Close()
return nil, err
}
t := time.NewTimer(webrtcHandshakeTimeout)
defer t.Stop()
outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err := WHIPPatchCandidate(context.Background(), c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
if err != nil {
c.pc.Close()
return nil, err
}
case <-c.pc.GatheringDone():
case <-c.pc.Connected():
break outer
case <-t.C:
c.pc.Close()
return nil, fmt.Errorf("deadline exceeded while waiting connection")
}
}
return tracks, nil
}
// Read reads tracks.
func (c *WHIPClient) Read(ctx context.Context) ([]*IncomingTrack, error) {
iceServers, err := WHIPOptionsICEServers(ctx, c.HTTPClient, c.URL.String())
if err != nil {
return nil, err
}
api, err := NewAPI(APIConf{})
if err != nil {
return nil, err
}
c.pc = &PeerConnection{
ICEServers: iceServers,
API: api,
Publish: false,
Log: c.Log,
}
err = c.pc.Start()
if err != nil {
return nil, err
}
offer, err := c.pc.CreatePartialOffer()
if err != nil {
c.pc.Close()
return nil, err
}
res, err := PostOffer(ctx, c.HTTPClient, c.URL.String(), offer)
if err != nil {
c.pc.Close()
return nil, err
}
c.URL, err = c.URL.Parse(res.Location)
if err != nil {
c.pc.Close()
return nil, err
}
var sdp sdp.SessionDescription
err = sdp.Unmarshal([]byte(res.Answer.SDP))
if err != nil {
c.pc.Close()
return nil, err
}
// check that there are at most two tracks
_, err = TrackCount(sdp.MediaDescriptions)
if err != nil {
c.pc.Close()
return nil, err
}
err = c.pc.SetAnswer(res.Answer)
if err != nil {
c.pc.Close()
return nil, err
}
t := time.NewTimer(webrtcHandshakeTimeout)
defer t.Stop()
outer:
for {
select {
case ca := <-c.pc.NewLocalCandidate():
err := WHIPPatchCandidate(context.Background(), c.HTTPClient, c.URL.String(), offer, res.ETag, ca)
if err != nil {
c.pc.Close()
return nil, err
}
case <-c.pc.GatheringDone():
case <-c.pc.Connected():
break outer
case <-t.C:
c.pc.Close()
return nil, fmt.Errorf("deadline exceeded while waiting connection")
}
}
return c.pc.GatherIncomingTracks(ctx, 0)
}
// Close closes the client.
func (c *WHIPClient) Close() error {
err := WHIPDeleteSession(context.Background(), c.HTTPClient, c.URL.String())
c.pc.Close()
return err
}
// Wait waits for client errors.
func (c *WHIPClient) Wait(ctx context.Context) error {
select {
case <-c.pc.Disconnected():
return fmt.Errorf("peer connection closed")
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}

6
internal/whip/delete_session.go → internal/webrtc/whip_delete_session.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"context"
@ -6,8 +6,8 @@ import ( @@ -6,8 +6,8 @@ import (
"net/http"
)
// DeleteSession deletes a WHIP/WHEP session.
func DeleteSession(
// WHIPDeleteSession deletes a WHIP/WHEP session.
func WHIPDeleteSession(
ctx context.Context,
hc *http.Client,
ur string,

6
internal/whip/options_ice_servers.go → internal/webrtc/whip_options_ice_servers.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"context"
@ -8,8 +8,8 @@ import ( @@ -8,8 +8,8 @@ import (
"github.com/pion/webrtc/v3"
)
// OptionsICEServers sends a WHIP/WHEP request for ICE servers.
func OptionsICEServers(
// WHIPOptionsICEServers sends a WHIP/WHEP request for ICE servers.
func WHIPOptionsICEServers(
ctx context.Context,
hc *http.Client,
ur string,

6
internal/whip/patch_candidate.go → internal/webrtc/whip_patch_candidate.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"bytes"
@ -9,8 +9,8 @@ import ( @@ -9,8 +9,8 @@ import (
"github.com/pion/webrtc/v3"
)
// PatchCandidate sends a WHIP/WHEP candidate.
func PatchCandidate(
// WHIPPatchCandidate sends a WHIP/WHEP candidate.
func WHIPPatchCandidate(
ctx context.Context,
hc *http.Client,
ur string,

10
internal/whip/post_offer.go → internal/webrtc/whip_post_offer.go

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
package whip
package webrtc
import (
"bytes"
@ -10,8 +10,8 @@ import ( @@ -10,8 +10,8 @@ import (
"github.com/pion/webrtc/v3"
)
// PostOfferResponse is the response to a post offer.
type PostOfferResponse struct {
// WHIPPostOfferResponse is the response to a post offer.
type WHIPPostOfferResponse struct {
Answer *webrtc.SessionDescription
Location string
ETag string
@ -23,7 +23,7 @@ func PostOffer( @@ -23,7 +23,7 @@ func PostOffer(
hc *http.Client,
ur string,
offer *webrtc.SessionDescription,
) (*PostOfferResponse, error) {
) (*WHIPPostOfferResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ur, bytes.NewReader([]byte(offer.SDP)))
if err != nil {
return nil, err
@ -68,7 +68,7 @@ func PostOffer( @@ -68,7 +68,7 @@ func PostOffer(
SDP: string(sdp),
}
return &PostOfferResponse{
return &WHIPPostOfferResponse{
Answer: answer,
Location: Location,
ETag: etag,

196
internal/webrtcpc/pc.go

@ -1,196 +0,0 @@ @@ -1,196 +0,0 @@
// Package webrtcpc contains a WebRTC peer connection wrapper.
package webrtcpc
import (
"context"
"fmt"
"strconv"
"sync"
"github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/logger"
)
// PeerConnection is a wrapper around webrtc.PeerConnection.
type PeerConnection struct {
*webrtc.PeerConnection
stateChangeMutex sync.Mutex
newLocalCandidate chan *webrtc.ICECandidateInit
connected chan struct{}
disconnected chan struct{}
closed chan struct{}
gatheringDone chan struct{}
}
// New allocates a PeerConnection.
func New(
iceServers []webrtc.ICEServer,
api *webrtc.API,
log logger.Writer,
) (*PeerConnection, error) {
configuration := webrtc.Configuration{ICEServers: iceServers}
pc, err := api.NewPeerConnection(configuration)
if err != nil {
return nil, err
}
co := &PeerConnection{
PeerConnection: pc,
newLocalCandidate: make(chan *webrtc.ICECandidateInit),
connected: make(chan struct{}),
disconnected: make(chan struct{}),
closed: make(chan struct{}),
gatheringDone: make(chan struct{}),
}
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
co.stateChangeMutex.Lock()
defer co.stateChangeMutex.Unlock()
select {
case <-co.closed:
return
default:
}
log.Log(logger.Debug, "peer connection state: "+state.String())
switch state {
case webrtc.PeerConnectionStateConnected:
log.Log(logger.Info, "peer connection established, local candidate: %v, remote candidate: %v",
co.LocalCandidate(), co.RemoteCandidate())
close(co.connected)
case webrtc.PeerConnectionStateDisconnected:
close(co.disconnected)
case webrtc.PeerConnectionStateClosed:
close(co.closed)
}
})
pc.OnICECandidate(func(i *webrtc.ICECandidate) {
if i != nil {
v := i.ToJSON()
select {
case co.newLocalCandidate <- &v:
case <-co.connected:
case <-co.closed:
}
} else {
close(co.gatheringDone)
}
})
return co, nil
}
// Close closes the connection.
func (co *PeerConnection) Close() {
co.PeerConnection.Close() //nolint:errcheck
<-co.closed
}
// Connected returns when connected.
func (co *PeerConnection) Connected() <-chan struct{} {
return co.connected
}
// Disconnected returns when disconnected.
func (co *PeerConnection) Disconnected() <-chan struct{} {
return co.disconnected
}
// NewLocalCandidate returns when there's a new local candidate.
func (co *PeerConnection) NewLocalCandidate() <-chan *webrtc.ICECandidateInit {
return co.newLocalCandidate
}
// GatheringDone returns when candidate gathering is complete.
func (co *PeerConnection) GatheringDone() <-chan struct{} {
return co.gatheringDone
}
// WaitGatheringDone waits until candidate gathering is complete.
func (co *PeerConnection) WaitGatheringDone(ctx context.Context) error {
for {
select {
case <-co.NewLocalCandidate():
case <-co.GatheringDone():
return nil
case <-ctx.Done():
return fmt.Errorf("terminated")
}
}
}
// LocalCandidate returns the local candidate.
func (co *PeerConnection) LocalCandidate() string {
var cid string
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.LocalCandidateID
break
}
}
if cid != "" {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
// RemoteCandidate returns the remote candidate.
func (co *PeerConnection) RemoteCandidate() string {
var cid string
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidatePairStats); ok && tstats.Nominated {
cid = tstats.RemoteCandidateID
break
}
}
if cid != "" {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.ICECandidateStats); ok && tstats.ID == cid {
return tstats.CandidateType.String() + "/" + tstats.Protocol + "/" +
tstats.IP + "/" + strconv.FormatInt(int64(tstats.Port), 10)
}
}
}
return ""
}
// BytesReceived returns received bytes.
func (co *PeerConnection) BytesReceived() uint64 {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesReceived
}
}
}
return 0
}
// BytesSent returns sent bytes.
func (co *PeerConnection) BytesSent() uint64 {
for _, stats := range co.GetStats() {
if tstats, ok := stats.(webrtc.TransportStats); ok {
if tstats.ID == "iceTransport" {
return tstats.BytesSent
}
}
}
return 0
}

2
internal/whip/whip.go

@ -1,2 +0,0 @@ @@ -1,2 +0,0 @@
// Package whip contains WebRTC / WHIP utilities.
package whip
Loading…
Cancel
Save