Browse Source

webrtc: support WHIP/WHEP DELETE method (#2507)

pull/2559/head
Rui Lopes 2 years ago committed by GitHub
parent
commit
9f5169ba26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      internal/core/api_test.go
  2. 2
      internal/core/path_test.go
  3. 384
      internal/core/webrtc_http_server.go
  4. 38
      internal/core/webrtc_manager.go
  5. 43
      internal/core/webrtc_manager_test.go
  6. 50
      internal/core/webrtc_publish_index.html
  7. 162
      internal/core/webrtc_read_index.html
  8. 2
      internal/core/webrtc_source.go
  9. 31
      internal/whip/delete_session.go
  10. 6
      internal/whip/options_ice_servers.go
  11. 7
      internal/whip/patch_candidate.go
  12. 2
      internal/whip/post_offer.go
  13. 2
      internal/whip/whip.go

6
internal/core/api_test.go

@ -809,7 +809,7 @@ func TestAPIProtocolList(t *testing.T) { @@ -809,7 +809,7 @@ func TestAPIProtocolList(t *testing.T) {
defer source.Close()
c := newWebRTCTestClient(t, hc, "http://localhost:8889/mypath/whep", false)
defer c.close()
defer c.close(t, true)
time.Sleep(500 * time.Millisecond)
@ -1095,7 +1095,7 @@ func TestAPIProtocolGet(t *testing.T) { @@ -1095,7 +1095,7 @@ func TestAPIProtocolGet(t *testing.T) {
defer source.Close()
c := newWebRTCTestClient(t, hc, "http://localhost:8889/mypath/whep", false)
defer c.close()
defer c.close(t, true)
time.Sleep(500 * time.Millisecond)
@ -1385,7 +1385,7 @@ func TestAPIProtocolKick(t *testing.T) { @@ -1385,7 +1385,7 @@ func TestAPIProtocolKick(t *testing.T) {
case "webrtc":
c := newWebRTCTestClient(t, hc, "http://localhost:8889/mypath/whip", true)
defer c.close()
defer c.close(t, false)
case "srt":
conf := srt.DefaultConfig()

2
internal/core/path_test.go

@ -365,7 +365,7 @@ func TestPathRunOnRead(t *testing.T) { @@ -365,7 +365,7 @@ func TestPathRunOnRead(t *testing.T) {
case "webrtc":
hc := &http.Client{Transport: &http.Transport{}}
c := newWebRTCTestClient(t, hc, "http://localhost:8889/test/whep?query=value", false)
defer c.close()
defer c.close(t, true)
}
time.Sleep(500 * time.Millisecond)

384
internal/core/webrtc_http_server.go

@ -6,6 +6,8 @@ import ( @@ -6,6 +6,8 @@ import (
"io"
"net"
"net/http"
"net/url"
"regexp"
"strings"
"time"
@ -25,13 +27,29 @@ var webrtcPublishIndex []byte @@ -25,13 +27,29 @@ var webrtcPublishIndex []byte
//go:embed webrtc_read_index.html
var webrtcReadIndex []byte
var (
reWHIPWHEPNoID = regexp.MustCompile("^/(.+?)/(whip|whep)$")
reWHIPWHEPWithID = regexp.MustCompile("^/(.+?)/(whip|whep)/(.+?)$")
)
func relativeLocation(u *url.URL) string {
p := u.Path
if u.RawQuery != "" {
p += "?" + u.RawQuery
}
return p
}
type webRTCHTTPServerParent interface {
logger.Writer
generateICEServers() ([]webrtc.ICEServer, error)
newSession(req webRTCNewSessionReq) webRTCNewSessionRes
addSessionCandidates(req webRTCAddSessionCandidatesReq) webRTCAddSessionCandidatesRes
deleteSession(req webRTCDeleteSessionReq) error
}
// This implements https://datatracker.ietf.org/doc/draft-ietf-wish-whip/
// This implements https://datatracker.ietf.org/doc/draft-murillo-whep/
type webRTCHTTPServer struct {
allowOrigin string
pathManager *pathManager
@ -97,214 +115,244 @@ func (s *webRTCHTTPServer) close() { @@ -97,214 +115,244 @@ func (s *webRTCHTTPServer) close() {
s.inner.Close()
}
func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
ctx.Writer.Header().Set("Access-Control-Allow-Origin", s.allowOrigin)
ctx.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
// remove leading prefix
pa := ctx.Request.URL.Path[1:]
func (s *webRTCHTTPServer) checkAuthOutsideSession(ctx *gin.Context, path string, publish bool) bool {
ip := ctx.ClientIP()
_, port, _ := net.SplitHostPort(ctx.Request.RemoteAddr)
remoteAddr := net.JoinHostPort(ip, port)
user, pass, hasCredentials := ctx.Request.BasicAuth()
isWHIPorWHEP := strings.HasSuffix(pa, "/whip") || strings.HasSuffix(pa, "/whep")
isPreflight := ctx.Request.Method == http.MethodOptions &&
ctx.Request.Header.Get("Access-Control-Request-Method") != ""
res := s.pathManager.getConfForPath(pathGetConfForPathReq{
accessRequest: pathAccessRequest{
name: path,
query: ctx.Request.URL.RawQuery,
publish: publish,
ip: net.ParseIP(ip),
user: user,
pass: pass,
proto: authProtocolWebRTC,
},
})
if res.err != nil {
if terr, ok := res.err.(*errAuthentication); ok {
if !hasCredentials {
ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`)
ctx.Writer.WriteHeader(http.StatusUnauthorized)
return false
}
if !isWHIPorWHEP || isPreflight {
switch ctx.Request.Method {
case http.MethodOptions:
ctx.Writer.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH")
ctx.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
ctx.Writer.WriteHeader(http.StatusNoContent)
return
s.Log(logger.Info, "connection %v failed to authenticate: %v", remoteAddr, terr.message)
case http.MethodGet:
// wait some seconds to stop brute force attacks
<-time.After(webrtcPauseAfterAuthError)
default:
return
ctx.Writer.WriteHeader(http.StatusUnauthorized)
return false
}
ctx.Writer.WriteHeader(http.StatusNotFound)
return false
}
var dir string
var fname string
var publish bool
return true
}
switch {
case pa == "", pa == "favicon.ico":
func (s *webRTCHTTPServer) onWHIPOptions(ctx *gin.Context, path string, publish bool) {
if !s.checkAuthOutsideSession(ctx, path, publish) {
return
}
case strings.HasSuffix(pa, "/publish"):
dir, fname = pa[:len(pa)-len("/publish")], "publish"
publish = true
case strings.HasSuffix(pa, "/whip"):
dir, fname = pa[:len(pa)-len("/whip")], "whip"
publish = true
case strings.HasSuffix(pa, "/whep"):
dir, fname = pa[:len(pa)-len("/whep")], "whep"
publish = false
servers, err := s.parent.generateICEServers()
if err != nil {
ctx.Writer.WriteHeader(http.StatusInternalServerError)
return
}
default:
dir, fname = pa, ""
publish = false
ctx.Writer.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH, DELETE")
ctx.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
ctx.Writer.Header().Set("Access-Control-Expose-Headers", "Link")
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Writer.WriteHeader(http.StatusNoContent)
}
if !strings.HasSuffix(dir, "/") {
l := "/" + dir + "/"
if ctx.Request.URL.RawQuery != "" {
l += "?" + ctx.Request.URL.RawQuery
}
ctx.Writer.Header().Set("Location", l)
ctx.Writer.WriteHeader(http.StatusMovedPermanently)
return
}
func (s *webRTCHTTPServer) onWHIPPost(ctx *gin.Context, path string, publish bool) {
if ctx.Request.Header.Get("Content-Type") != "application/sdp" {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
dir = strings.TrimSuffix(dir, "/")
if dir == "" {
offer, err := io.ReadAll(ctx.Request.Body)
if err != nil {
return
}
ip := ctx.ClientIP()
_, port, _ := net.SplitHostPort(ctx.Request.RemoteAddr)
remoteAddr := net.JoinHostPort(ip, port)
user, pass, hasCredentials := ctx.Request.BasicAuth()
user, pass, _ := ctx.Request.BasicAuth()
res := s.parent.newSession(webRTCNewSessionReq{
pathName: path,
remoteAddr: remoteAddr,
query: ctx.Request.URL.RawQuery,
user: user,
pass: pass,
offer: offer,
publish: publish,
})
if res.err != nil {
ctx.Writer.WriteHeader(res.errStatusCode)
return
}
// if request doesn't belong to a session, check authentication here
if !isWHIPorWHEP || ctx.Request.Method == http.MethodOptions {
res := s.pathManager.getConfForPath(pathGetConfForPathReq{
accessRequest: pathAccessRequest{
name: dir,
query: ctx.Request.URL.RawQuery,
publish: publish,
ip: net.ParseIP(ip),
user: user,
pass: pass,
proto: authProtocolWebRTC,
},
})
if res.err != nil {
if terr, ok := res.err.(*errAuthentication); ok {
if !hasCredentials {
ctx.Header("WWW-Authenticate", `Basic realm="mediamtx"`)
ctx.Writer.WriteHeader(http.StatusUnauthorized)
return
}
s.Log(logger.Info, "connection %v failed to authenticate: %v", remoteAddr, terr.message)
// wait some seconds to stop brute force attacks
<-time.After(webrtcPauseAfterAuthError)
servers, err := s.parent.generateICEServers()
if err != nil {
ctx.Writer.WriteHeader(http.StatusInternalServerError)
return
}
ctx.Writer.WriteHeader(http.StatusUnauthorized)
return
}
ctx.Writer.Header().Set("Content-Type", "application/sdp")
ctx.Writer.Header().Set("Access-Control-Expose-Headers", "ETag, Accept-Patch, Link, Location")
ctx.Writer.Header().Set("ETag", "*")
ctx.Writer.Header().Set("ID", res.sx.uuid.String())
ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Request.URL.Path += "/" + res.sx.secret.String()
ctx.Writer.Header().Set("Location", relativeLocation(ctx.Request.URL))
ctx.Writer.WriteHeader(http.StatusCreated)
ctx.Writer.Write(res.answer)
}
ctx.Writer.WriteHeader(http.StatusNotFound)
return
}
func (s *webRTCHTTPServer) onWHIPPatch(ctx *gin.Context, rawSecret string) {
secret, err := uuid.Parse(rawSecret)
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
switch fname {
case "":
ctx.Writer.Header().Set("Cache-Control", "max-age=3600")
ctx.Writer.Header().Set("Content-Type", "text/html")
ctx.Writer.WriteHeader(http.StatusOK)
ctx.Writer.Write(webrtcReadIndex)
if ctx.Request.Header.Get("Content-Type") != "application/trickle-ice-sdpfrag" {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
case "publish":
ctx.Writer.Header().Set("Cache-Control", "max-age=3600")
ctx.Writer.Header().Set("Content-Type", "text/html")
ctx.Writer.WriteHeader(http.StatusOK)
ctx.Writer.Write(webrtcPublishIndex)
byts, err := io.ReadAll(ctx.Request.Body)
if err != nil {
return
}
case "whip", "whep":
switch ctx.Request.Method {
case http.MethodOptions:
servers, err := s.parent.generateICEServers()
if err != nil {
ctx.Writer.WriteHeader(http.StatusInternalServerError)
return
}
candidates, err := whip.ICEFragmentUnmarshal(byts)
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
ctx.Writer.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH")
ctx.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
ctx.Writer.Header().Set("Access-Control-Expose-Headers", "Link")
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Writer.WriteHeader(http.StatusNoContent)
res := s.parent.addSessionCandidates(webRTCAddSessionCandidatesReq{
secret: secret,
candidates: candidates,
})
if res.err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodPost:
if ctx.Request.Header.Get("Content-Type") != "application/sdp" {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
ctx.Writer.WriteHeader(http.StatusNoContent)
}
offer, err := io.ReadAll(ctx.Request.Body)
if err != nil {
return
}
func (s *webRTCHTTPServer) onWHIPDelete(ctx *gin.Context, rawSecret string) {
secret, err := uuid.Parse(rawSecret)
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
res := s.parent.newSession(webRTCNewSessionReq{
pathName: dir,
remoteAddr: remoteAddr,
query: ctx.Request.URL.RawQuery,
user: user,
pass: pass,
offer: offer,
publish: (fname == "whip"),
})
if res.err != nil {
ctx.Writer.WriteHeader(res.errStatusCode)
return
}
err = s.parent.deleteSession(webRTCDeleteSessionReq{
secret: secret,
})
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
servers, err := s.parent.generateICEServers()
if err != nil {
ctx.Writer.WriteHeader(http.StatusInternalServerError)
return
}
ctx.Writer.WriteHeader(http.StatusOK)
}
ctx.Writer.Header().Set("Content-Type", "application/sdp")
ctx.Writer.Header().Set("Access-Control-Expose-Headers", "ETag, Accept-Patch, Link, Location")
ctx.Writer.Header().Set("ETag", res.sx.secret.String())
ctx.Writer.Header().Set("ID", res.sx.uuid.String())
ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
ctx.Writer.Header()["Link"] = whip.LinkHeaderMarshal(servers)
ctx.Writer.Header().Set("Location", ctx.Request.URL.String())
ctx.Writer.WriteHeader(http.StatusCreated)
ctx.Writer.Write(res.answer)
func (s *webRTCHTTPServer) onPage(ctx *gin.Context, path string, publish bool) {
if !s.checkAuthOutsideSession(ctx, path, publish) {
return
}
ctx.Writer.Header().Set("Cache-Control", "max-age=3600")
ctx.Writer.Header().Set("Content-Type", "text/html")
ctx.Writer.WriteHeader(http.StatusOK)
if publish {
ctx.Writer.Write(webrtcPublishIndex)
} else {
ctx.Writer.Write(webrtcReadIndex)
}
}
func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
ctx.Writer.Header().Set("Access-Control-Allow-Origin", s.allowOrigin)
ctx.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
// preflight requests
if ctx.Request.Method == http.MethodOptions &&
ctx.Request.Header.Get("Access-Control-Request-Method") != "" {
ctx.Writer.Header().Set("Access-Control-Allow-Methods", "OPTIONS, GET, POST, PATCH, DELETE")
ctx.Writer.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, If-Match")
ctx.Writer.WriteHeader(http.StatusNoContent)
return
}
// WHIP, outside session
if m := reWHIPWHEPNoID.FindStringSubmatch(ctx.Request.URL.Path); m != nil {
switch ctx.Request.Method {
case http.MethodOptions:
s.onWHIPOptions(ctx, m[1], m[2] == "whip")
case http.MethodPost:
s.onWHIPPost(ctx, m[1], m[2] == "whip")
case http.MethodGet, http.MethodHead, http.MethodPut:
// RFC draft-ietf-whip-09
// The WHIP endpoints MUST return an "405 Method Not Allowed" response
// for any HTTP GET, HEAD or PUT requests
ctx.Writer.WriteHeader(http.StatusMethodNotAllowed)
}
return
}
// WHIP, inside session
if m := reWHIPWHEPWithID.FindStringSubmatch(ctx.Request.URL.Path); m != nil {
switch ctx.Request.Method {
case http.MethodPatch:
secret, err := uuid.Parse(ctx.Request.Header.Get("If-Match"))
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
s.onWHIPPatch(ctx, m[3])
if ctx.Request.Header.Get("Content-Type") != "application/trickle-ice-sdpfrag" {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodDelete:
s.onWHIPDelete(ctx, m[3])
}
return
}
byts, err := io.ReadAll(ctx.Request.Body)
if err != nil {
return
}
// static resources
if ctx.Request.Method == http.MethodGet {
switch {
case ctx.Request.URL.Path == "/favicon.ico":
candidates, err := whip.ICEFragmentUnmarshal(byts)
if err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
case len(ctx.Request.URL.Path) >= 3:
switch {
case strings.HasSuffix(ctx.Request.URL.Path, "/publish"):
s.onPage(ctx, ctx.Request.URL.Path[1:len(ctx.Request.URL.Path)-len("/publish")], true)
res := s.parent.addSessionCandidates(webRTCAddSessionCandidatesReq{
secret: secret,
candidates: candidates,
})
if res.err != nil {
ctx.Writer.WriteHeader(http.StatusBadRequest)
return
}
case ctx.Request.URL.Path[len(ctx.Request.URL.Path)-1] != '/':
ctx.Request.URL.Path += "/"
ctx.Writer.Header().Set("Location", relativeLocation(ctx.Request.URL))
ctx.Writer.WriteHeader(http.StatusMovedPermanently)
ctx.Writer.WriteHeader(http.StatusNoContent)
default:
s.onPage(ctx, ctx.Request.URL.Path[1:len(ctx.Request.URL.Path)-1], false)
}
}
return
}
}

38
internal/core/webrtc_manager.go

@ -288,6 +288,15 @@ type webRTCAddSessionCandidatesReq struct { @@ -288,6 +288,15 @@ type webRTCAddSessionCandidatesReq struct {
res chan webRTCAddSessionCandidatesRes
}
type webRTCDeleteSessionRes struct {
err error
}
type webRTCDeleteSessionReq struct {
secret uuid.UUID
res chan webRTCDeleteSessionRes
}
type webRTCManagerParent interface {
logger.Writer
}
@ -315,6 +324,7 @@ type webRTCManager struct { @@ -315,6 +324,7 @@ type webRTCManager struct {
chNewSession chan webRTCNewSessionReq
chCloseSession chan *webRTCSession
chAddSessionCandidates chan webRTCAddSessionCandidatesReq
chDeleteSession chan webRTCDeleteSessionReq
chAPISessionsList chan webRTCManagerAPISessionsListReq
chAPISessionsGet chan webRTCManagerAPISessionsGetReq
chAPIConnsKick chan webRTCManagerAPISessionsKickReq
@ -360,6 +370,7 @@ func newWebRTCManager( @@ -360,6 +370,7 @@ func newWebRTCManager(
chNewSession: make(chan webRTCNewSessionReq),
chCloseSession: make(chan *webRTCSession),
chAddSessionCandidates: make(chan webRTCAddSessionCandidatesReq),
chDeleteSession: make(chan webRTCDeleteSessionReq),
chAPISessionsList: make(chan webRTCManagerAPISessionsListReq),
chAPISessionsGet: make(chan webRTCManagerAPISessionsGetReq),
chAPIConnsKick: make(chan webRTCManagerAPISessionsKickReq),
@ -482,6 +493,19 @@ outer: @@ -482,6 +493,19 @@ outer:
req.res <- webRTCAddSessionCandidatesRes{sx: sx}
case req := <-m.chDeleteSession:
sx, ok := m.sessionsBySecret[req.secret]
if !ok {
req.res <- webRTCDeleteSessionRes{err: fmt.Errorf("session not found")}
continue
}
delete(m.sessions, sx)
delete(m.sessionsBySecret, sx.secret)
sx.close()
req.res <- webRTCDeleteSessionRes{}
case req := <-m.chAPISessionsList:
data := &apiWebRTCSessionList{
Items: []*apiWebRTCSession{},
@ -516,6 +540,7 @@ outer: @@ -516,6 +540,7 @@ outer:
delete(m.sessions, sx)
delete(m.sessionsBySecret, sx.secret)
sx.close()
req.res <- webRTCManagerAPISessionsKickRes{}
case <-m.ctx.Done():
@ -619,6 +644,19 @@ func (m *webRTCManager) addSessionCandidates( @@ -619,6 +644,19 @@ func (m *webRTCManager) addSessionCandidates(
}
}
// deleteSession is called by webRTCHTTPServer.
func (m *webRTCManager) deleteSession(req webRTCDeleteSessionReq) error {
req.res = make(chan webRTCDeleteSessionRes)
select {
case m.chDeleteSession <- req:
res := <-req.res
return res.err
case <-m.ctx.Done():
return fmt.Errorf("terminated")
}
}
// apiSessionsList is called by api.
func (m *webRTCManager) apiSessionsList() (*apiWebRTCSessionList, error) {
req := webRTCManagerAPISessionsListReq{

43
internal/core/webrtc_manager_test.go

@ -4,13 +4,14 @@ import ( @@ -4,13 +4,14 @@ import (
"bytes"
"context"
"net/http"
"net/url"
"testing"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/url"
rtspurl "github.com/bluenviron/gortsplib/v4/pkg/url"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
@ -26,6 +27,9 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) { @@ -26,6 +27,9 @@ func (nilLogger) Log(_ logger.Level, _ string, _ ...interface{}) {
}
type webRTCTestClient struct {
hc *http.Client
url *url.URL
pc *webrtcpc.PeerConnection
outgoingTrack1 *webrtc.TrackLocalStaticRTP
outgoingTrack2 *webrtc.TrackLocalStaticRTP
@ -35,13 +39,19 @@ type webRTCTestClient struct { @@ -35,13 +39,19 @@ type webRTCTestClient struct {
func newWebRTCTestClient(
t *testing.T,
hc *http.Client,
ur string,
rawURL string,
publish bool,
) *webRTCTestClient {
iceServers, err := whip.GetICEServers(context.Background(), hc, ur)
c := &webRTCTestClient{
hc: hc,
}
var err error
c.url, err = url.Parse(rawURL)
require.NoError(t, err)
c := &webRTCTestClient{}
iceServers, err := whip.OptionsICEServers(context.Background(), hc, c.url.String())
require.NoError(t, err)
api, err := webrtcNewAPI(nil, nil, nil, nil)
require.NoError(t, err)
@ -94,18 +104,21 @@ func newWebRTCTestClient( @@ -94,18 +104,21 @@ func newWebRTCTestClient(
offer, err := pc.CreateOffer(nil)
require.NoError(t, err)
res, err := whip.PostOffer(context.Background(), hc, ur, &offer)
res, err := whip.PostOffer(context.Background(), hc, c.url.String(), &offer)
require.NoError(t, err)
c.url, err = c.url.Parse(res.Location)
require.NoError(t, err)
err = pc.SetLocalDescription(offer)
require.NoError(t, err)
// test adding additional candidates, even if it is not mandatory here
// test adding additional candidates, even if it is not strictly necessary
outer:
for {
select {
case c := <-pc.NewLocalCandidate():
err := whip.PostCandidate(context.Background(), hc, ur, &offer, res.ETag, c)
case ca := <-pc.NewLocalCandidate():
err := whip.PatchCandidate(context.Background(), hc, c.url.String(), &offer, res.ETag, ca)
require.NoError(t, err)
case <-pc.GatheringDone():
break outer
@ -154,7 +167,11 @@ outer: @@ -154,7 +167,11 @@ outer:
return c
}
func (c *webRTCTestClient) close() {
func (c *webRTCTestClient) close(t *testing.T, delete bool) {
if delete {
err := whip.DeleteSession(context.Background(), c.hc, c.url.String())
require.NoError(t, err)
}
c.pc.Close()
}
@ -239,7 +256,7 @@ func TestWebRTCRead(t *testing.T) { @@ -239,7 +256,7 @@ func TestWebRTCRead(t *testing.T) {
ur += "localhost:8889/teststream/whep?param=value"
c := newWebRTCTestClient(t, hc, ur, false)
defer c.close()
defer c.close(t, true)
time.Sleep(500 * time.Millisecond)
@ -284,7 +301,7 @@ func TestWebRTCReadNotFound(t *testing.T) { @@ -284,7 +301,7 @@ func TestWebRTCReadNotFound(t *testing.T) {
hc := &http.Client{Transport: &http.Transport{}}
iceServers, err := whip.GetICEServers(context.Background(), hc, "http://localhost:8889/stream/whep")
iceServers, err := whip.OptionsICEServers(context.Background(), hc, "http://localhost:8889/stream/whep")
require.NoError(t, err)
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
@ -387,7 +404,7 @@ func TestWebRTCPublish(t *testing.T) { @@ -387,7 +404,7 @@ func TestWebRTCPublish(t *testing.T) {
ur += "localhost:8889/teststream/whip?param=value"
s := newWebRTCTestClient(t, hc, ur, true)
defer s.close()
defer s.close(t, true)
if auth == "external" {
a.close()
@ -401,7 +418,7 @@ func TestWebRTCPublish(t *testing.T) { @@ -401,7 +418,7 @@ func TestWebRTCPublish(t *testing.T) {
},
}
u, err := url.Parse("rtsp://testreader:testpass@127.0.0.1:8554/teststream?param=value")
u, err := rtspurl.Parse("rtsp://testreader:testpass@127.0.0.1:8554/teststream?param=value")
require.NoError(t, err)
err = c.Start(u.Scheme, u.Host)

50
internal/core/webrtc_publish_index.html

@ -5,17 +5,17 @@ @@ -5,17 +5,17 @@
<meta name="viewport" content="width=device-width">
<style>
html, body {
margin: 0;
padding: 0;
height: 100%;
margin: 0;
padding: 0;
height: 100%;
}
body {
display: flex;
flex-direction: column;
}
#video {
height: 100%;
background: black;
height: 100%;
background: black;
flex-grow: 1;
min-height: 0;
}
@ -313,11 +313,11 @@ const editAnswer = (answer, videoCodec, audioCodec, videoBitrate, audioBitrate, @@ -313,11 +313,11 @@ const editAnswer = (answer, videoCodec, audioCodec, videoBitrate, audioBitrate,
class Transmitter {
constructor(stream) {
this.stream = stream;
this.pc = null;
this.restartTimeout = null;
this.eTag = '';
this.pc = null;
this.restartTimeout = null;
this.sessionUrl = '';
this.queuedCandidates = [];
this.start();
this.start();
}
start() {
@ -366,7 +366,7 @@ class Transmitter { @@ -366,7 +366,7 @@ class Transmitter {
if (res.status !== 201) {
throw new Error('bad status code');
}
this.eTag = res.headers.get('ETag');
this.sessionUrl = new URL(res.headers.get('location'), window.location.href).toString();
return res.text();
})
.then((sdp) => this.onRemoteAnswer(new RTCSessionDescription({
@ -393,9 +393,9 @@ class Transmitter { @@ -393,9 +393,9 @@ class Transmitter {
}
onRemoteAnswer(answer) {
if (this.restartTimeout !== null) {
return;
}
if (this.restartTimeout !== null) {
return;
}
editAnswer(
answer,
@ -412,7 +412,7 @@ class Transmitter { @@ -412,7 +412,7 @@ class Transmitter {
this.sendLocalCandidates(this.queuedCandidates);
this.queuedCandidates = [];
}
}
}
onLocalCandidate(evt) {
if (this.restartTimeout !== null) {
@ -420,7 +420,7 @@ class Transmitter { @@ -420,7 +420,7 @@ class Transmitter {
}
if (evt.candidate !== null) {
if (this.eTag === '') {
if (this.sessionUrl === '') {
this.queuedCandidates.push(evt.candidate);
} else {
this.sendLocalCandidates([evt.candidate])
@ -429,11 +429,11 @@ class Transmitter { @@ -429,11 +429,11 @@ class Transmitter {
}
sendLocalCandidates(candidates) {
fetch(new URL('whip', window.location.href) + window.location.search, {
fetch(this.sessionUrl + window.location.search, {
method: 'PATCH',
headers: {
'Content-Type': 'application/trickle-ice-sdpfrag',
'If-Match': this.eTag,
'If-Match': '*',
},
body: generateSdpFragment(this.offerData, candidates),
})
@ -463,7 +463,21 @@ class Transmitter { @@ -463,7 +463,21 @@ class Transmitter {
this.start();
}, restartPause);
this.eTag = '';
if (this.sessionUrl) {
fetch(this.sessionUrl, {
method: 'DELETE',
})
.then((res) => {
if (res.status !== 200) {
throw new Error('bad status code');
}
})
.catch((err) => {
console.log('delete session error: ' + err);
});
}
this.sessionUrl = '';
this.queuedCandidates = [];
}
}

162
internal/core/webrtc_read_index.html

@ -5,15 +5,15 @@ @@ -5,15 +5,15 @@
<meta name="viewport" content="width=device-width">
<style>
html, body {
margin: 0;
padding: 0;
height: 100%;
overflow: hidden;
margin: 0;
padding: 0;
height: 100%;
overflow: hidden;
}
#video {
width: 100%;
height: 100%;
background: black;
width: 100%;
height: 100%;
background: black;
}
</style>
</head>
@ -137,17 +137,17 @@ const generateSdpFragment = (offerData, candidates) => { @@ -137,17 +137,17 @@ const generateSdpFragment = (offerData, candidates) => {
}
class WHEPClient {
constructor(video) {
this.video = video;
this.pc = null;
this.restartTimeout = null;
this.eTag = '';
constructor(video) {
this.video = video;
this.pc = null;
this.restartTimeout = null;
this.sessionUrl = '';
this.queuedCandidates = [];
this.start();
}
this.start();
}
start() {
console.log("requesting ICE servers");
start() {
console.log("requesting ICE servers");
fetch(new URL('whep', window.location.href) + window.location.search, {
method: 'OPTIONS',
@ -157,7 +157,7 @@ class WHEPClient { @@ -157,7 +157,7 @@ class WHEPClient {
console.log('error: ' + err);
this.scheduleRestart();
});
}
}
onIceServers(res) {
this.pc = new RTCPeerConnection({
@ -199,7 +199,7 @@ class WHEPClient { @@ -199,7 +199,7 @@ class WHEPClient {
if (res.status !== 201) {
throw new Error('bad status code');
}
this.eTag = res.headers.get('ETag');
this.sessionUrl = new URL(res.headers.get('location'), window.location.href).toString();
return res.text();
})
.then((sdp) => this.onRemoteAnswer(new RTCSessionDescription({
@ -225,18 +225,18 @@ class WHEPClient { @@ -225,18 +225,18 @@ class WHEPClient {
}
}
onRemoteAnswer(answer) {
onRemoteAnswer(answer) {
if (this.restartTimeout !== null) {
return;
}
this.pc.setRemoteDescription(answer);
this.pc.setRemoteDescription(answer);
if (this.queuedCandidates.length !== 0) {
this.sendLocalCandidates(this.queuedCandidates);
this.queuedCandidates = [];
}
}
}
onLocalCandidate(evt) {
if (this.restartTimeout !== null) {
@ -244,7 +244,7 @@ class WHEPClient { @@ -244,7 +244,7 @@ class WHEPClient {
}
if (evt.candidate !== null) {
if (this.eTag === '') {
if (this.sessionUrl === '') {
this.queuedCandidates.push(evt.candidate);
} else {
this.sendLocalCandidates([evt.candidate])
@ -253,11 +253,11 @@ class WHEPClient { @@ -253,11 +253,11 @@ class WHEPClient {
}
sendLocalCandidates(candidates) {
fetch(new URL('whep', window.location.href) + window.location.search, {
fetch(this.sessionUrl + window.location.search, {
method: 'PATCH',
headers: {
'Content-Type': 'application/trickle-ice-sdpfrag',
'If-Match': this.eTag,
'If-Match': '*',
},
body: generateSdpFragment(this.offerData, candidates),
})
@ -287,7 +287,21 @@ class WHEPClient { @@ -287,7 +287,21 @@ class WHEPClient {
this.start();
}, restartPause);
this.eTag = '';
if (this.sessionUrl) {
fetch(this.sessionUrl, {
method: 'DELETE',
})
.then((res) => {
if (res.status !== 200) {
throw new Error('bad status code');
}
})
.catch((err) => {
console.log('delete session error: ' + err);
});
}
this.sessionUrl = '';
this.queuedCandidates = [];
}
}
@ -300,31 +314,31 @@ class WHEPClient { @@ -300,31 +314,31 @@ class WHEPClient {
* @returns {Object} An object representing the query parameters with keys as parameter names and values as parameter values.
*/
const parseQueryString = (url) => {
const queryString = (url || window.location.search).split("?")[1];
if (!queryString) return {};
const paramsArray = queryString.split("&");
const result = {};
for (let i = 0; i < paramsArray.length; i++) {
const param = paramsArray[i].split("=");
const key = decodeURIComponent(param[0]);
const value = decodeURIComponent(param[1] || "");
if (key) {
if (result[key]) {
if (Array.isArray(result[key])) {
result[key].push(value);
} else {
result[key] = [result[key], value];
}
} else {
result[key] = value;
}
}
}
return result;
const queryString = (url || window.location.search).split("?")[1];
if (!queryString) return {};
const paramsArray = queryString.split("&");
const result = {};
for (let i = 0; i < paramsArray.length; i++) {
const param = paramsArray[i].split("=");
const key = decodeURIComponent(param[0]);
const value = decodeURIComponent(param[1] || "");
if (key) {
if (result[key]) {
if (Array.isArray(result[key])) {
result[key].push(value);
} else {
result[key] = [result[key], value];
}
} else {
result[key] = value;
}
}
}
return result;
};
/**
@ -334,17 +348,17 @@ class WHEPClient { @@ -334,17 +348,17 @@ class WHEPClient {
* @returns {boolean}
*/
const parseBoolString = (str, defaultVal) => {
const trueValues = ["1", "yes", "true"];
const falseValues = ["0", "no", "false"];
str = (str || "").toString();
if (trueValues.includes(str.toLowerCase())) {
return true;
} else if (falseValues.includes(str.toLowerCase())) {
return false;
} else {
return defaultVal;
}
const trueValues = ["1", "yes", "true"];
const falseValues = ["0", "no", "false"];
str = (str || "").toString();
if (trueValues.includes(str.toLowerCase())) {
return true;
} else if (falseValues.includes(str.toLowerCase())) {
return false;
} else {
return defaultVal;
}
};
/**
@ -353,12 +367,12 @@ const parseBoolString = (str, defaultVal) => { @@ -353,12 +367,12 @@ const parseBoolString = (str, defaultVal) => {
* @param {HTMLVideoElement} video - The video element on which to set the attributes.
*/
const setVideoAttributes = (video) => {
let qs = parseQueryString();
let qs = parseQueryString();
video.controls = parseBoolString(qs["controls"], true);
video.muted = parseBoolString(qs["muted"], true);
video.autoplay = parseBoolString(qs["autoplay"], true);
video.playsInline = parseBoolString(qs["playsinline"], true);
video.controls = parseBoolString(qs["controls"], true);
video.muted = parseBoolString(qs["muted"], true);
video.autoplay = parseBoolString(qs["autoplay"], true);
video.playsInline = parseBoolString(qs["playsinline"], true);
};
/**
@ -368,14 +382,14 @@ const setVideoAttributes = (video) => { @@ -368,14 +382,14 @@ const setVideoAttributes = (video) => {
* @returns
*/
const initVideoElement = (callback, container) => {
return () => {
const video = document.createElement("video");
video.id = "video";
setVideoAttributes(video);
container.append(video);
callback(video);
};
return () => {
const video = document.createElement("video");
video.id = "video";
setVideoAttributes(video);
container.append(video);
callback(video);
};
};
window.addEventListener('DOMContentLoaded', initVideoElement((video) => new WHEPClient(video), document.body));

2
internal/core/webrtc_source.go

@ -62,7 +62,7 @@ func (s *webRTCSource) run(ctx context.Context, cnf *conf.Path, _ chan *conf.Pat @@ -62,7 +62,7 @@ func (s *webRTCSource) run(ctx context.Context, cnf *conf.Path, _ chan *conf.Pat
Timeout: time.Duration(s.readTimeout),
}
iceServers, err := whip.GetICEServers(ctx, c, u.String())
iceServers, err := whip.OptionsICEServers(ctx, c, u.String())
if err != nil {
return err
}

31
internal/whip/delete_session.go

@ -0,0 +1,31 @@ @@ -0,0 +1,31 @@
package whip
import (
"context"
"fmt"
"net/http"
)
// DeleteSession deletes a WHIP/WHEP session.
func DeleteSession(
ctx context.Context,
hc *http.Client,
ur string,
) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, ur, nil)
if err != nil {
return err
}
res, err := hc.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("bad status code: %v", res.StatusCode)
}
return nil
}

6
internal/whip/get_ice_servers.go → internal/whip/options_ice_servers.go

@ -8,13 +8,13 @@ import ( @@ -8,13 +8,13 @@ import (
"github.com/pion/webrtc/v3"
)
// GetICEServers posts a WHIP/WHEP request for ICE servers.
func GetICEServers(
// OptionsICEServers sends a WHIP/WHEP request for ICE servers.
func OptionsICEServers(
ctx context.Context,
hc *http.Client,
ur string,
) ([]webrtc.ICEServer, error) {
req, err := http.NewRequestWithContext(ctx, "OPTIONS", ur, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodOptions, ur, nil)
if err != nil {
return nil, err
}

7
internal/whip/post_candidate.go → internal/whip/patch_candidate.go

@ -1,4 +1,3 @@ @@ -1,4 +1,3 @@
// Package whip contains WebRTC / WHIP utilities.
package whip
import (
@ -10,8 +9,8 @@ import ( @@ -10,8 +9,8 @@ import (
"github.com/pion/webrtc/v3"
)
// PostCandidate posts a WHIP/WHEP candidate.
func PostCandidate(
// PatchCandidate sends a WHIP/WHEP candidate.
func PatchCandidate(
ctx context.Context,
hc *http.Client,
ur string,
@ -24,7 +23,7 @@ func PostCandidate( @@ -24,7 +23,7 @@ func PostCandidate(
return err
}
req, err := http.NewRequestWithContext(ctx, "PATCH", ur, bytes.NewReader(frag))
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, ur, bytes.NewReader(frag))
if err != nil {
return err
}

2
internal/whip/post_offer.go

@ -24,7 +24,7 @@ func PostOffer( @@ -24,7 +24,7 @@ func PostOffer(
ur string,
offer *webrtc.SessionDescription,
) (*PostOfferResponse, error) {
req, err := http.NewRequestWithContext(ctx, "POST", ur, bytes.NewReader([]byte(offer.SDP)))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, ur, bytes.NewReader([]byte(offer.SDP)))
if err != nil {
return nil, err
}

2
internal/whip/whip.go

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
// Package whip contains WebRTC / WHIP utilities.
package whip
Loading…
Cancel
Save