Browse Source

webrtc: return 404 when a stream is not present (#1805)

pull/1808/head
Alessandro Ros 3 years ago committed by GitHub
parent
commit
503a131097
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      README.md
  2. 2
      go.mod
  3. 4
      go.sum
  4. 6
      internal/core/webrtc_http_server.go
  5. 22
      internal/core/webrtc_manager.go
  6. 35
      internal/core/webrtc_manager_test.go
  7. 82
      internal/core/webrtc_session.go

2
README.md

@ -4,7 +4,7 @@
<br> <br>
_MediaMTX_/ [_rtsp-simple-server_](#note-about-rtsp-simple-server) is a ready-to-use and zero-dependency server and proxy that allows users to publish, read and proxy live video and audio streams. _MediaMTX_ / [_rtsp-simple-server_](#note-about-rtsp-simple-server) is a ready-to-use and zero-dependency server and proxy that allows users to publish, read and proxy live video and audio streams.
Live streams can be published to the server with: Live streams can be published to the server with:

2
go.mod

@ -8,7 +8,7 @@ require (
github.com/alecthomas/kong v0.7.1 github.com/alecthomas/kong v0.7.1
github.com/asticode/go-astits v1.11.0 github.com/asticode/go-astits v1.11.0
github.com/bluenviron/gohlslib v0.2.4 github.com/bluenviron/gohlslib v0.2.4
github.com/bluenviron/gortsplib/v3 v3.6.1 github.com/bluenviron/gortsplib/v3 v3.6.2
github.com/bluenviron/mediacommon v0.5.0 github.com/bluenviron/mediacommon v0.5.0
github.com/fsnotify/fsnotify v1.6.0 github.com/fsnotify/fsnotify v1.6.0
github.com/gin-gonic/gin v1.9.0 github.com/gin-gonic/gin v1.9.0

4
go.sum

@ -12,8 +12,8 @@ github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2Z
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/gohlslib v0.2.4 h1:I0J6b9jVSYWL8paaC/h/r8Fyrq6SQSQj2DGnhHSi4K4= github.com/bluenviron/gohlslib v0.2.4 h1:I0J6b9jVSYWL8paaC/h/r8Fyrq6SQSQj2DGnhHSi4K4=
github.com/bluenviron/gohlslib v0.2.4/go.mod h1:PghpVEkRW003EqzSny2mUhl/5w7R0w9aAYgTIRCoaTg= github.com/bluenviron/gohlslib v0.2.4/go.mod h1:PghpVEkRW003EqzSny2mUhl/5w7R0w9aAYgTIRCoaTg=
github.com/bluenviron/gortsplib/v3 v3.6.1 h1:+/kPiwmdRwUasU5thOBATJQ4/yD+vrIEutJyRTB/f+0= github.com/bluenviron/gortsplib/v3 v3.6.2 h1:rbMcohs6h3HwToJAyYcvUJm/m3PzWK5bYkNLZPVkjF8=
github.com/bluenviron/gortsplib/v3 v3.6.1/go.mod h1:gc6Z8pBUMC9QBqYxcOY9eVxjDPOrmFcwVH61Xs3Gu2A= github.com/bluenviron/gortsplib/v3 v3.6.2/go.mod h1:U9oODQzfy3xHaWUScGwcGUHxAQWr4b7FpAo7qzu+REE=
github.com/bluenviron/mediacommon v0.5.0 h1:YsVFlEknaXWhZGfz+Y1QbuzXLMVSmHODc7OnRqZoITY= github.com/bluenviron/mediacommon v0.5.0 h1:YsVFlEknaXWhZGfz+Y1QbuzXLMVSmHODc7OnRqZoITY=
github.com/bluenviron/mediacommon v0.5.0/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8= github.com/bluenviron/mediacommon v0.5.0/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=

6
internal/core/webrtc_http_server.go

@ -109,7 +109,7 @@ func marshalICEFragment(offer *webrtc.SessionDescription, candidates []*webrtc.I
type webRTCHTTPServerParent interface { type webRTCHTTPServerParent interface {
logger.Writer logger.Writer
genICEServers() []webrtc.ICEServer genICEServers() []webrtc.ICEServer
sessionNew(req webRTCSessionNewReq) webRTCNewSessionRes sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes
sessionAddCandidates(req webRTCSessionAddCandidatesReq) webRTCSessionAddCandidatesRes sessionAddCandidates(req webRTCSessionAddCandidatesReq) webRTCSessionAddCandidatesRes
} }
@ -315,7 +315,9 @@ func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {
videoBitrate: ctx.Query("video_bitrate"), videoBitrate: ctx.Query("video_bitrate"),
}) })
if res.err != nil { if res.err != nil {
ctx.Writer.WriteHeader(http.StatusInternalServerError) if res.errStatusCode != 0 {
ctx.Writer.WriteHeader(res.errStatusCode)
}
return return
} }

22
internal/core/webrtc_manager.go

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"net/http"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
@ -96,10 +97,11 @@ type webRTCManagerAPISessionsKickReq struct {
res chan webRTCManagerAPISessionsKickRes res chan webRTCManagerAPISessionsKickRes
} }
type webRTCNewSessionRes struct { type webRTCSessionNewRes struct {
sx *webRTCSession sx *webRTCSession
answer []byte answer []byte
err error err error
errStatusCode int
} }
type webRTCSessionNewReq struct { type webRTCSessionNewReq struct {
@ -110,7 +112,7 @@ type webRTCSessionNewReq struct {
videoCodec string videoCodec string
audioCodec string audioCodec string
videoBitrate string videoBitrate string
res chan webRTCNewSessionRes res chan webRTCSessionNewRes
} }
type webRTCSessionAddCandidatesRes struct { type webRTCSessionAddCandidatesRes struct {
@ -289,7 +291,7 @@ outer:
) )
m.sessions[sx] = struct{}{} m.sessions[sx] = struct{}{}
m.sessionsBySecret[sx.secret] = sx m.sessionsBySecret[sx.secret] = sx
req.res <- webRTCNewSessionRes{sx: sx} req.res <- webRTCSessionNewRes{sx: sx}
case sx := <-m.chSessionClose: case sx := <-m.chSessionClose:
delete(m.sessions, sx) delete(m.sessions, sx)
@ -429,8 +431,8 @@ func (m *webRTCManager) genICEServers() []webrtc.ICEServer {
} }
// sessionNew is called by webRTCHTTPServer. // sessionNew is called by webRTCHTTPServer.
func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCNewSessionRes { func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCSessionNewRes {
req.res = make(chan webRTCNewSessionRes) req.res = make(chan webRTCSessionNewRes)
select { select {
case m.chSessionNew <- req: case m.chSessionNew <- req:
@ -441,11 +443,11 @@ func (m *webRTCManager) sessionNew(req webRTCSessionNewReq) webRTCNewSessionRes
return res2 return res2
case <-res1.sx.ctx.Done(): case <-res1.sx.ctx.Done():
return webRTCNewSessionRes{err: fmt.Errorf("terminated")} return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError}
} }
case <-m.ctx.Done(): case <-m.ctx.Done():
return webRTCNewSessionRes{err: fmt.Errorf("terminated")} return webRTCSessionNewRes{err: fmt.Errorf("terminated"), errStatusCode: http.StatusInternalServerError}
} }
} }

35
internal/core/webrtc_manager_test.go

@ -295,6 +295,41 @@ func TestWebRTCRead(t *testing.T) {
}, pkt) }, pkt)
} }
func TestWebRTCReadNotFound(t *testing.T) {
p, ok := newInstance("paths:\n" +
" all:\n")
require.Equal(t, true, ok)
defer p.Close()
iceServers := whipGetICEServers(t, "http://localhost:8889/stream/whep")
pc, err := webrtc.NewPeerConnection(webrtc.Configuration{
ICEServers: iceServers,
})
require.NoError(t, err)
defer pc.Close()
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo)
require.NoError(t, err)
offer, err := pc.CreateOffer(nil)
require.NoError(t, err)
enc, err := json.Marshal(offer)
require.NoError(t, err)
req, err := http.NewRequest("POST", "http://localhost:8889/stream/whep", bytes.NewReader(enc))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/sdp")
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
require.Equal(t, http.StatusNotFound, res.StatusCode)
}
func TestWebRTCPublish(t *testing.T) { func TestWebRTCPublish(t *testing.T) {
p, ok := newInstance("paths:\n" + p, ok := newInstance("paths:\n" +
" all:\n") " all:\n")

82
internal/core/webrtc_session.go

@ -5,7 +5,9 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -155,9 +157,9 @@ type webRTCSession struct {
created time.Time created time.Time
uuid uuid.UUID uuid uuid.UUID
secret uuid.UUID secret uuid.UUID
answerSent bool
pcMutex sync.RWMutex pcMutex sync.RWMutex
pc *peerConnection pc *peerConnection
answerSent bool
chAddRemoteCandidates chan webRTCSessionAddCandidatesReq chAddRemoteCandidates chan webRTCSessionAddCandidatesReq
} }
@ -218,12 +220,13 @@ func (s *webRTCSession) safePC() *peerConnection {
func (s *webRTCSession) run() { func (s *webRTCSession) run() {
defer s.wg.Done() defer s.wg.Done()
err := s.runInner() errStatusCode, err := s.runInner()
if !s.answerSent { if !s.answerSent {
select { select {
case s.req.res <- webRTCNewSessionRes{ case s.req.res <- webRTCSessionNewRes{
err: err, err: err,
errStatusCode: errStatusCode,
}: }:
case <-s.ctx.Done(): case <-s.ctx.Done():
} }
@ -234,28 +237,28 @@ func (s *webRTCSession) run() {
s.Log(logger.Info, "closed (%v)", err) s.Log(logger.Info, "closed (%v)", err)
} }
func (s *webRTCSession) runInner() error { func (s *webRTCSession) runInner() (int, error) {
if s.req.publish { if s.req.publish {
return s.runPublish() return s.runPublish()
} }
return s.runRead() return s.runRead()
} }
func (s *webRTCSession) runPublish() error { func (s *webRTCSession) runPublish() (int, error) {
res := s.pathManager.publisherAdd(pathPublisherAddReq{ res := s.pathManager.publisherAdd(pathPublisherAddReq{
author: s, author: s,
pathName: s.req.pathName, pathName: s.req.pathName,
skipAuth: true, skipAuth: true,
}) })
if res.err != nil { if res.err != nil {
return res.err return http.StatusInternalServerError, res.err
} }
defer res.path.publisherRemove(pathPublisherRemoveReq{author: s}) defer res.path.publisherRemove(pathPublisherRemoveReq{author: s})
offer, err := s.decodeOffer() offer, err := s.decodeOffer()
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
pc, err := newPeerConnection( pc, err := newPeerConnection(
@ -267,7 +270,7 @@ func (s *webRTCSession) runPublish() error {
s.iceTCPMux, s.iceTCPMux,
s) s)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
defer pc.close() defer pc.close()
@ -275,14 +278,14 @@ func (s *webRTCSession) runPublish() error {
Direction: webrtc.RTPTransceiverDirectionRecvonly, Direction: webrtc.RTPTransceiverDirectionRecvonly,
}) })
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
_, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{ _, err = pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionRecvonly, Direction: webrtc.RTPTransceiverDirectionRecvonly,
}) })
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
trackRecv := make(chan trackRecvPair) trackRecv := make(chan trackRecvPair)
@ -296,23 +299,23 @@ func (s *webRTCSession) runPublish() error {
err = pc.SetRemoteDescription(*offer) err = pc.SetRemoteDescription(*offer)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
answer, err := pc.CreateAnswer(nil) answer, err := pc.CreateAnswer(nil)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
err = pc.SetLocalDescription(answer) err = pc.SetLocalDescription(answer)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
if s.req.videoBitrate != "" { if s.req.videoBitrate != "" {
tmp, err := strconv.ParseUint(s.req.videoBitrate, 10, 31) tmp, err := strconv.ParseUint(s.req.videoBitrate, 10, 31)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
insertTias(&answer, tmp*1024) insertTias(&answer, tmp*1024)
@ -320,24 +323,24 @@ func (s *webRTCSession) runPublish() error {
err = s.waitGatheringDone(pc) err = s.waitGatheringDone(pc)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
err = s.writeAnswer(pc.LocalDescription()) err = s.writeAnswer(pc.LocalDescription())
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
go s.readRemoteCandidates(pc) go s.readRemoteCandidates(pc)
err = s.waitUntilConnected(pc) err = s.waitUntilConnected(pc)
if err != nil { if err != nil {
return err return 0, err
} }
tracks, err := gatherIncomingTracks(s.ctx, pc, trackRecv) tracks, err := gatherIncomingTracks(s.ctx, pc, trackRecv)
if err != nil { if err != nil {
return err return 0, err
} }
medias := mediasOfIncomingTracks(tracks) medias := mediasOfIncomingTracks(tracks)
@ -347,7 +350,7 @@ func (s *webRTCSession) runPublish() error {
generateRTPPackets: false, generateRTPPackets: false,
}) })
if rres.err != nil { if rres.err != nil {
return rres.err return 0, rres.err
} }
s.Log(logger.Info, "is publishing to path '%s', %s", s.Log(logger.Info, "is publishing to path '%s', %s",
@ -360,33 +363,36 @@ func (s *webRTCSession) runPublish() error {
select { select {
case <-pc.disconnected: case <-pc.disconnected:
return fmt.Errorf("peer connection closed") return 0, fmt.Errorf("peer connection closed")
case <-s.ctx.Done(): case <-s.ctx.Done():
return fmt.Errorf("terminated") return 0, fmt.Errorf("terminated")
} }
} }
func (s *webRTCSession) runRead() error { func (s *webRTCSession) runRead() (int, error) {
res := s.pathManager.readerAdd(pathReaderAddReq{ res := s.pathManager.readerAdd(pathReaderAddReq{
author: s, author: s,
pathName: s.req.pathName, pathName: s.req.pathName,
skipAuth: true, skipAuth: true,
}) })
if res.err != nil { if res.err != nil {
return res.err if strings.HasPrefix(res.err.Error(), "no one is publishing") {
return http.StatusNotFound, res.err
}
return http.StatusInternalServerError, res.err
} }
defer res.path.readerRemove(pathReaderRemoveReq{author: s}) defer res.path.readerRemove(pathReaderRemoveReq{author: s})
tracks, err := gatherOutgoingTracks(res.stream.medias()) tracks, err := gatherOutgoingTracks(res.stream.medias())
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
offer, err := s.decodeOffer() offer, err := s.decodeOffer()
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
pc, err := newPeerConnection( pc, err := newPeerConnection(
@ -398,7 +404,7 @@ func (s *webRTCSession) runRead() error {
s.iceTCPMux, s.iceTCPMux,
s) s)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
defer pc.close() defer pc.close()
@ -406,40 +412,40 @@ func (s *webRTCSession) runRead() error {
var err error var err error
track.sender, err = pc.AddTrack(track.track) track.sender, err = pc.AddTrack(track.track)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
} }
err = pc.SetRemoteDescription(*offer) err = pc.SetRemoteDescription(*offer)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
answer, err := pc.CreateAnswer(nil) answer, err := pc.CreateAnswer(nil)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
err = pc.SetLocalDescription(answer) err = pc.SetLocalDescription(answer)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
err = s.waitGatheringDone(pc) err = s.waitGatheringDone(pc)
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
err = s.writeAnswer(pc.LocalDescription()) err = s.writeAnswer(pc.LocalDescription())
if err != nil { if err != nil {
return err return http.StatusBadRequest, err
} }
go s.readRemoteCandidates(pc) go s.readRemoteCandidates(pc)
err = s.waitUntilConnected(pc) err = s.waitUntilConnected(pc)
if err != nil { if err != nil {
return err return 0, err
} }
ringBuffer, _ := ringbuffer.New(uint64(s.readBufferCount)) ringBuffer, _ := ringbuffer.New(uint64(s.readBufferCount))
@ -468,13 +474,13 @@ func (s *webRTCSession) runRead() error {
select { select {
case <-pc.disconnected: case <-pc.disconnected:
return fmt.Errorf("peer connection closed") return 0, fmt.Errorf("peer connection closed")
case err := <-writeError: case err := <-writeError:
return err return 0, err
case <-s.ctx.Done(): case <-s.ctx.Done():
return fmt.Errorf("terminated") return 0, fmt.Errorf("terminated")
} }
} }
@ -511,7 +517,7 @@ func (s *webRTCSession) writeAnswer(answer *webrtc.SessionDescription) error {
} }
select { select {
case s.req.res <- webRTCNewSessionRes{ case s.req.res <- webRTCSessionNewRes{
sx: s, sx: s,
answer: enc, answer: enc,
}: }:

Loading…
Cancel
Save