Browse Source

webrtc: fix WHIP/WHEP implementation (#1857) (#1861)

offers and answers are now encoded in SDP in place of JSON; Location
header is set by the server.

This fixes compatibility with GStreamer and whipsink
pull/1862/head v0.23.3
Alessandro Ros 3 years ago committed by GitHub
parent
commit
99aa0d0ac9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      internal/core/webrtc_http_server.go
  2. 27
      internal/core/webrtc_manager_test.go
  3. 9
      internal/core/webrtc_publish_index.html
  4. 9
      internal/core/webrtc_read_index.html
  5. 35
      internal/core/webrtc_session.go

1
internal/core/webrtc_http_server.go

@ -309,6 +309,7 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
ctx.Writer.Header().Set("E-Tag", res.sx.secret.String()) ctx.Writer.Header().Set("E-Tag", res.sx.secret.String())
ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag") ctx.Writer.Header().Set("Accept-Patch", "application/trickle-ice-sdpfrag")
ctx.Writer.Header()["Link"] = iceServersToLinkHeader(s.parent.genICEServers()) ctx.Writer.Header()["Link"] = iceServersToLinkHeader(s.parent.genICEServers())
ctx.Writer.Header().Set("Location", ctx.Request.URL.String())
ctx.Writer.WriteHeader(http.StatusCreated) ctx.Writer.WriteHeader(http.StatusCreated)
ctx.Writer.Write(res.answer) ctx.Writer.Write(res.answer)

27
internal/core/webrtc_manager_test.go

@ -2,7 +2,7 @@ package core
import ( import (
"bytes" "bytes"
"encoding/json" "io"
"net/http" "net/http"
"sync" "sync"
"testing" "testing"
@ -38,10 +38,7 @@ func whipGetICEServers(t *testing.T, hc *http.Client, ur string) []webrtc.ICESer
func whipPostOffer(t *testing.T, hc *http.Client, ur string, func whipPostOffer(t *testing.T, hc *http.Client, ur string,
offer *webrtc.SessionDescription, offer *webrtc.SessionDescription,
) (*webrtc.SessionDescription, string) { ) (*webrtc.SessionDescription, string) {
enc, err := json.Marshal(offer) req, err := http.NewRequest("POST", ur, bytes.NewReader([]byte(offer.SDP)))
require.NoError(t, err)
req, err := http.NewRequest("POST", ur, bytes.NewReader(enc))
require.NoError(t, err) require.NoError(t, err)
req.Header.Set("Content-Type", "application/sdp") req.Header.Set("Content-Type", "application/sdp")
@ -51,22 +48,27 @@ func whipPostOffer(t *testing.T, hc *http.Client, ur string,
defer res.Body.Close() defer res.Body.Close()
require.Equal(t, http.StatusCreated, res.StatusCode) require.Equal(t, http.StatusCreated, res.StatusCode)
require.Equal(t, "application/sdp", res.Header.Get("Content-Type"))
require.Equal(t, "application/trickle-ice-sdpfrag", res.Header.Get("Accept-Patch"))
require.Equal(t, req.URL.Path, res.Header.Get("Location"))
link, ok := res.Header["Link"] link, ok := res.Header["Link"]
require.Equal(t, true, ok) require.Equal(t, true, ok)
servers := linkHeaderToIceServers(link) servers := linkHeaderToIceServers(link)
require.NotEqual(t, 0, len(servers)) require.NotEqual(t, 0, len(servers))
require.Equal(t, "application/sdp", res.Header.Get("Content-Type"))
etag := res.Header.Get("E-Tag") etag := res.Header.Get("E-Tag")
require.NotEqual(t, 0, len(etag)) require.NotEqual(t, 0, len(etag))
require.Equal(t, "application/trickle-ice-sdpfrag", res.Header.Get("Accept-Patch"))
var answer webrtc.SessionDescription sdp, err := io.ReadAll(res.Body)
err = json.NewDecoder(res.Body).Decode(&answer)
require.NoError(t, err) require.NoError(t, err)
return &answer, etag answer := &webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: string(sdp),
}
return answer, etag
} }
func whipPostCandidate(t *testing.T, ur string, offer *webrtc.SessionDescription, func whipPostCandidate(t *testing.T, ur string, offer *webrtc.SessionDescription,
@ -323,10 +325,7 @@ func TestWebRTCReadNotFound(t *testing.T) {
offer, err := pc.CreateOffer(nil) offer, err := pc.CreateOffer(nil)
require.NoError(t, err) require.NoError(t, err)
enc, err := json.Marshal(offer) req, err := http.NewRequest("POST", "http://localhost:8889/stream/whep", bytes.NewReader([]byte(offer.SDP)))
require.NoError(t, err)
req, err := http.NewRequest("POST", "http://localhost:8889/stream/whep", bytes.NewReader(enc))
require.NoError(t, err) require.NoError(t, err)
req.Header.Set("Content-Type", "application/sdp") req.Header.Set("Content-Type", "application/sdp")

9
internal/core/webrtc_publish_index.html

@ -232,16 +232,19 @@ class Transmitter {
headers: { headers: {
'Content-Type': 'application/sdp', 'Content-Type': 'application/sdp',
}, },
body: JSON.stringify(desc), body: desc.sdp,
}) })
.then((res) => { .then((res) => {
if (res.status !== 201) { if (res.status !== 201) {
throw new Error('bad status code'); throw new Error('bad status code');
} }
this.eTag = res.headers.get('E-Tag'); this.eTag = res.headers.get('E-Tag');
return res.json(); return res.text();
}) })
.then((answer) => this.onRemoteDescription(answer)) .then((sdp) => this.onRemoteDescription(new RTCSessionDescription({
type: 'answer',
sdp,
})))
.catch((err) => { .catch((err) => {
console.log('error: ' + err); console.log('error: ' + err);
this.scheduleRestart(); this.scheduleRestart();

9
internal/core/webrtc_read_index.html

@ -143,16 +143,19 @@ class WHEPClient {
headers: { headers: {
'Content-Type': 'application/sdp', 'Content-Type': 'application/sdp',
}, },
body: JSON.stringify(desc), body: desc.sdp,
}) })
.then((res) => { .then((res) => {
if (res.status !== 201) { if (res.status !== 201) {
throw new Error('bad status code'); throw new Error('bad status code');
} }
this.eTag = res.headers.get('E-Tag'); this.eTag = res.headers.get('E-Tag');
return res.json(); return res.text();
}) })
.then((answer) => this.onRemoteDescription(answer)) .then((sdp) => this.onRemoteDescription(new RTCSessionDescription({
type: 'answer',
sdp,
})))
.catch((err) => { .catch((err) => {
console.log('error: ' + err); console.log('error: ' + err);
this.scheduleRestart(); this.scheduleRestart();

35
internal/core/webrtc_session.go

@ -3,7 +3,6 @@ package core
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"strconv" "strconv"
@ -256,11 +255,6 @@ func (s *webRTCSession) runPublish() (int, error) {
defer res.path.publisherRemove(pathPublisherRemoveReq{author: s}) defer res.path.publisherRemove(pathPublisherRemoveReq{author: s})
offer, err := s.decodeOffer()
if err != nil {
return http.StatusBadRequest, err
}
pc, err := newPeerConnection( pc, err := newPeerConnection(
s.req.videoCodec, s.req.videoCodec,
s.req.audioCodec, s.req.audioCodec,
@ -297,6 +291,7 @@ func (s *webRTCSession) runPublish() (int, error) {
} }
}) })
offer := s.buildOffer()
err = pc.SetRemoteDescription(*offer) err = pc.SetRemoteDescription(*offer)
if err != nil { if err != nil {
return http.StatusBadRequest, err return http.StatusBadRequest, err
@ -390,11 +385,6 @@ func (s *webRTCSession) runRead() (int, error) {
return http.StatusBadRequest, err return http.StatusBadRequest, err
} }
offer, err := s.decodeOffer()
if err != nil {
return http.StatusBadRequest, err
}
pc, err := newPeerConnection( pc, err := newPeerConnection(
"", "",
"", "",
@ -416,6 +406,7 @@ func (s *webRTCSession) runRead() (int, error) {
} }
} }
offer := s.buildOffer()
err = pc.SetRemoteDescription(*offer) err = pc.SetRemoteDescription(*offer)
if err != nil { if err != nil {
return http.StatusBadRequest, err return http.StatusBadRequest, err
@ -484,18 +475,11 @@ func (s *webRTCSession) runRead() (int, error) {
} }
} }
func (s *webRTCSession) decodeOffer() (*webrtc.SessionDescription, error) { func (s *webRTCSession) buildOffer() *webrtc.SessionDescription {
var offer webrtc.SessionDescription return &webrtc.SessionDescription{
err := json.Unmarshal(s.req.offer, &offer) Type: webrtc.SDPTypeOffer,
if err != nil { SDP: string(s.req.offer),
return nil, err
}
if offer.Type != webrtc.SDPTypeOffer {
return nil, fmt.Errorf("received SDP is not an offer")
} }
return &offer, nil
} }
func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error { func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error {
@ -511,15 +495,10 @@ func (s *webRTCSession) waitGatheringDone(pc *peerConnection) error {
} }
func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) error { func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) error {
enc, err := json.Marshal(answer)
if err != nil {
return err
}
select { select {
case s.req.res <- webRTCSessionNewRes{ case s.req.res <- webRTCSessionNewRes{
sx: s, sx: s,
answer: enc, answer: []byte(answer.SDP),
}: }:
s.answerSent = true s.answerSent = true
case <-s.ctx.Done(): case <-s.ctx.Done():

Loading…
Cancel
Save