From 854f00afdc0f897b566d6a6a89af8e16315a2d89 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 13 Dec 2020 13:58:56 +0100 Subject: [PATCH] update gortsplib --- go.mod | 2 +- go.sum | 4 +- internal/client/client.go | 389 ++++++++++++++++---------------------- 3 files changed, 169 insertions(+), 226 deletions(-) diff --git a/go.mod b/go.mod index ca475276..0cec0cbe 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect - github.com/aler9/gortsplib v0.0.0-20201212222949-4c942d33fed8 + github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.4.9 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 diff --git a/go.sum b/go.sum index 44bba788..4cd14122 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafo github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/aler9/gortsplib v0.0.0-20201212222949-4c942d33fed8 h1:nDEZtoFBPDPgu9wxujoTEmMXFNTg+d0ATYKSgGHtsgE= -github.com/aler9/gortsplib v0.0.0-20201212222949-4c942d33fed8/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= +github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c h1:NEqO8o8hNAEeYB7OZnkT2k9QVLcBlOZXBLZYQeE1aRg= +github.com/aler9/gortsplib v0.0.0-20201213125208-7ce72fadb91c/go.mod h1:8P09VjpiPJFyfkVosyF5/TY82jNwkMN165NS/7sc32I= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/internal/client/client.go b/internal/client/client.go index 71b00476..0b6e59f4 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -200,181 +200,15 @@ func (c *Client) run() { defer onConnectCmd.Close() } - readDone := c.conn.Read(c.onRequest, c.onFrame) - - select { - case err := <-readDone: - c.conn.Close() - if err != io.EOF && err != errTerminated { - c.log(logger.Info, "ERR: %s", err) - } - - switch c.state { - case statePlay: - c.stopPlay() - - case stateRecord: - c.stopRecord() - } - - if c.path != nil { - c.path.OnClientRemove(c) - c.path = nil - } - - c.parent.OnClientClose(c) - <-c.terminate - - case <-c.terminate: - c.conn.Close() - <-readDone - - switch c.state { - case statePlay: - c.stopPlay() - - case stateRecord: - c.stopRecord() - } - - if c.path != nil { - c.path.OnClientRemove(c) - c.path = nil - } - } -} - -type errAuthNotCritical struct { - *base.Response -} - -func (errAuthNotCritical) Error() string { - return "auth not critical" -} - -type errAuthCritical struct { - *base.Response -} - -func (errAuthCritical) Error() string { - return "auth critical" -} - -// Authenticate performs an authentication. -func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{}, user string, pass string, req *base.Request) error { - // validate ip - if ips != nil { - ip := c.ip() - - if !ipEqualOrInRange(ip, ips) { - c.log(logger.Info, "ERR: ip '%s' not allowed", ip) - - return errAuthCritical{&base.Response{ - StatusCode: base.StatusUnauthorized, - }} - } - } - - // validate user - if user != "" { - // reset authHelper every time the credentials change - if c.authHelper == nil || c.authUser != user || c.authPass != pass { - c.authUser = user - c.authPass = pass - c.authHelper = auth.NewServer(user, pass, authMethods) - } - - err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.URL) - if err != nil { - c.authFailures++ - - // vlc with login prompt sends 4 requests: - // 1) without credentials - // 2) with password but without username - // 3) without credentials - // 4) with password and username - // therefore we must allow up to 3 failures - if c.authFailures > 3 { - c.log(logger.Info, "ERR: unauthorized: %s", err) - - return errAuthCritical{&base.Response{ - StatusCode: base.StatusUnauthorized, - Header: base.Header{ - "WWW-Authenticate": c.authHelper.GenerateHeader(), - }, - }} - } - - if c.authFailures > 1 { - c.log(logger.Debug, "WARN: unauthorized: %s", err) - } - - return errAuthNotCritical{&base.Response{ - StatusCode: base.StatusUnauthorized, - Header: base.Header{ - "WWW-Authenticate": c.authHelper.GenerateHeader(), - }, - }} - } - } - - // login successful, reset authFailures - c.authFailures = 0 - - return nil -} - -func (c *Client) checkState(allowed map[state]struct{}) error { - if _, ok := allowed[c.state]; ok { - return nil + onRequest := func(req *base.Request) { + c.log(logger.Debug, "[c->s] %v", req) } - var allowedList []state - for s := range allowed { - allowedList = append(allowedList, s) + onResponse := func(res *base.Response) { + c.log(logger.Debug, "[s->c] %v", res) } - return fmt.Errorf("client must be in state %v, while is in state %v", - allowedList, c.state) -} - -func (c *Client) onRequest(req *base.Request) (*base.Response, error) { - c.log(logger.Debug, "[c->s] %v", req) - res, err := c.onRequestInner(req) - c.log(logger.Debug, "[s->c] %v", res) - return res, err -} - -func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { - switch req.Method { - case base.Options: - return &base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Public": base.HeaderValue{strings.Join([]string{ - string(base.GetParameter), - string(base.Describe), - string(base.Announce), - string(base.Setup), - string(base.Play), - string(base.Record), - string(base.Pause), - string(base.Teardown), - }, ", ")}, - }, - }, nil - - // GET_PARAMETER is used like a ping - case base.GetParameter: - return &base.Response{ - StatusCode: base.StatusOK, - Header: base.Header{ - "Content-Type": base.HeaderValue{"text/parameters"}, - }, - Content: []byte("\n"), - }, nil - - case base.Describe: + onDescribe := func(req *base.Request) (*base.Response, error) { err := c.checkState(map[state]struct{}{ stateInitial: {}, }) @@ -457,8 +291,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { StatusCode: base.StatusBadRequest, }, errTerminated } + } - case base.Announce: + onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) { err := c.checkState(map[state]struct{}{ stateInitial: {}, }) @@ -475,32 +310,6 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { }, fmt.Errorf("unable to find base path (%s)", req.URL) } - ct, ok := req.Header["Content-Type"] - if !ok || len(ct) != 1 { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("Content-Type header missing") - } - - if ct[0] != "application/sdp" { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("unsupported Content-Type '%s'", ct) - } - - tracks, err := gortsplib.ReadTracks(req.Content) - if err != nil { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("invalid SDP: %s", err) - } - - if len(tracks) == 0 { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("no tracks defined") - } - path, err := c.parent.OnClientAnnounce(c, basePath, tracks, req) if err != nil { switch terr := err.(type) { @@ -528,15 +337,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { return &base.Response{ StatusCode: base.StatusOK, }, nil + } - case base.Setup: - th, err := headers.ReadTransport(req.Header["Transport"]) - if err != nil { - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("transport header: %s", err) - } - + onSetup := func(req *base.Request, th *headers.Transport) (*base.Response, error) { if th.Delivery != nil && *th.Delivery == base.StreamDeliveryMulticast { return &base.Response{ StatusCode: base.StatusBadRequest, @@ -828,8 +631,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { StatusCode: base.StatusBadRequest, }, fmt.Errorf("client is in state '%s'", c.state) } + } - case base.Play: + onPlay := func(req *base.Request) (*base.Response, error) { // play can be sent twice, allow calling it even if we're already playing err := c.checkState(map[state]struct{}{ statePrePlay: {}, @@ -873,8 +677,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { "Session": base.HeaderValue{sessionID}, }, }, nil + } - case base.Record: + onRecord := func(req *base.Request) (*base.Response, error) { err := c.checkState(map[state]struct{}{ statePreRecord: {}, }) @@ -914,8 +719,9 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { "Session": base.HeaderValue{sessionID}, }, }, nil + } - case base.Pause: + onPause := func(req *base.Request) (*base.Response, error) { err := c.checkState(map[state]struct{}{ statePrePlay: {}, statePlay: {}, @@ -944,28 +750,165 @@ func (c *Client) onRequestInner(req *base.Request) (*base.Response, error) { "Session": base.HeaderValue{sessionID}, }, }, nil + } - case base.Teardown: - return &base.Response{ - StatusCode: base.StatusOK, - }, errTerminated + onFrame := func(trackID int, streamType gortsplib.StreamType, content []byte) { + if c.state == stateRecord { + if trackID >= len(c.streamTracks) { + return + } - default: - return &base.Response{ - StatusCode: base.StatusBadRequest, - }, fmt.Errorf("unhandled method '%s'", req.Method) + c.rtcpReceivers[trackID].ProcessFrame(time.Now(), streamType, content) + c.path.OnFrame(trackID, streamType, content) + } + } + + readDone := c.conn.Read(gortsplib.ServerConnReadHandlers{ + OnRequest: onRequest, + OnResponse: onResponse, + OnDescribe: onDescribe, + OnAnnounce: onAnnounce, + OnSetup: onSetup, + OnPlay: onPlay, + OnRecord: onRecord, + OnPause: onPause, + OnFrame: onFrame, + }) + + select { + case err := <-readDone: + c.conn.Close() + if err != io.EOF && err != errTerminated { + c.log(logger.Info, "ERR: %s", err) + } + + switch c.state { + case statePlay: + c.stopPlay() + + case stateRecord: + c.stopRecord() + } + + if c.path != nil { + c.path.OnClientRemove(c) + c.path = nil + } + + c.parent.OnClientClose(c) + <-c.terminate + + case <-c.terminate: + c.conn.Close() + <-readDone + + switch c.state { + case statePlay: + c.stopPlay() + + case stateRecord: + c.stopRecord() + } + + if c.path != nil { + c.path.OnClientRemove(c) + c.path = nil + } } } -func (c *Client) onFrame(trackID int, streamType gortsplib.StreamType, content []byte) { - if c.state == stateRecord { - if trackID >= len(c.streamTracks) { - return +type errAuthNotCritical struct { + *base.Response +} + +func (errAuthNotCritical) Error() string { + return "auth not critical" +} + +type errAuthCritical struct { + *base.Response +} + +func (errAuthCritical) Error() string { + return "auth critical" +} + +// Authenticate performs an authentication. +func (c *Client) Authenticate(authMethods []headers.AuthMethod, ips []interface{}, user string, pass string, req *base.Request) error { + // validate ip + if ips != nil { + ip := c.ip() + + if !ipEqualOrInRange(ip, ips) { + c.log(logger.Info, "ERR: ip '%s' not allowed", ip) + + return errAuthCritical{&base.Response{ + StatusCode: base.StatusUnauthorized, + }} + } + } + + // validate user + if user != "" { + // reset authHelper every time the credentials change + if c.authHelper == nil || c.authUser != user || c.authPass != pass { + c.authUser = user + c.authPass = pass + c.authHelper = auth.NewServer(user, pass, authMethods) + } + + err := c.authHelper.ValidateHeader(req.Header["Authorization"], req.Method, req.URL) + if err != nil { + c.authFailures++ + + // vlc with login prompt sends 4 requests: + // 1) without credentials + // 2) with password but without username + // 3) without credentials + // 4) with password and username + // therefore we must allow up to 3 failures + if c.authFailures > 3 { + c.log(logger.Info, "ERR: unauthorized: %s", err) + + return errAuthCritical{&base.Response{ + StatusCode: base.StatusUnauthorized, + Header: base.Header{ + "WWW-Authenticate": c.authHelper.GenerateHeader(), + }, + }} + } + + if c.authFailures > 1 { + c.log(logger.Debug, "WARN: unauthorized: %s", err) + } + + return errAuthNotCritical{&base.Response{ + StatusCode: base.StatusUnauthorized, + Header: base.Header{ + "WWW-Authenticate": c.authHelper.GenerateHeader(), + }, + }} } + } + + // login successful, reset authFailures + c.authFailures = 0 - c.rtcpReceivers[trackID].ProcessFrame(time.Now(), streamType, content) - c.path.OnFrame(trackID, streamType, content) + return nil +} + +func (c *Client) checkState(allowed map[state]struct{}) error { + if _, ok := allowed[c.state]; ok { + return nil } + + var allowedList []state + for s := range allowed { + allowedList = append(allowedList, s) + } + + return fmt.Errorf("client must be in state %v, while is in state %v", + allowedList, c.state) } func (c *Client) startPlay() {