Browse Source

RTSP session: add author connection to log messages

pull/442/head
aler9 4 years ago
parent
commit
4da59604e1
  1. 2
      go.mod
  2. 4
      go.sum
  3. 2
      internal/clientrtmp/client.go
  4. 2
      internal/clientrtsp/client.go
  5. 2
      internal/serverhls/server.go
  6. 88
      internal/serverrtsp/server.go
  7. 13
      internal/sessionrtsp/session.go

2
go.mod

@ -5,7 +5,7 @@ go 1.15 @@ -5,7 +5,7 @@ go 1.15
require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/aler9/gortsplib v0.0.0-20210508145957-4beda10c06f9
github.com/aler9/gortsplib v0.0.0-20210508200522-b98fa90057b4
github.com/asticode/go-astits v0.0.0-00010101000000-000000000000
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9

4
go.sum

@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c @@ -4,8 +4,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2c
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04 h1:CXgQLsU4uxWAmsXNOjGLbj0A+0IlRcpZpMgI13fmVwo=
github.com/aler9/go-astits v0.0.0-20210423195926-582b09ed7c04/go.mod h1:DkOWmBNQpnr9mv24KfZjq4JawCFX1FCqjLVGvO0DygQ=
github.com/aler9/gortsplib v0.0.0-20210508145957-4beda10c06f9 h1:v9QqgvTa2mCcsEFvwJigabE/fNIIOXC40fGneIPm89Y=
github.com/aler9/gortsplib v0.0.0-20210508145957-4beda10c06f9/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/gortsplib v0.0.0-20210508200522-b98fa90057b4 h1:mL4yLi09Bpph+cLlXdU6CJAsIox9a5grz4ik4M3Q9ik=
github.com/aler9/gortsplib v0.0.0-20210508200522-b98fa90057b4/go.mod h1:zVCg+TQX445hh1pC5QgAuuBvvXZMWLY1XYz626dGFqY=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927 h1:95mXJ5fUCYpBRdSOnLAQAdJHHKxxxJrVCiaqDi965YQ=
github.com/aler9/rtmp v0.0.0-20210403095203-3be4a5535927/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=

2
internal/clientrtmp/client.go

@ -138,7 +138,7 @@ func (c *Client) IsReadPublisher() {} @@ -138,7 +138,7 @@ func (c *Client) IsReadPublisher() {}
func (c *Client) IsSource() {}
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
c.parent.Log(level, "[client %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
}
func (c *Client) ip() net.IP {

2
internal/clientrtsp/client.go

@ -110,7 +110,7 @@ func (c *Client) Close(err error) { @@ -110,7 +110,7 @@ func (c *Client) Close(err error) {
}
func (c *Client) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[client %s] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr().String()}, args...)...)
c.parent.Log(level, "[client %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
}
// Conn returns the RTSP connection.

2
internal/serverhls/server.go

@ -156,7 +156,7 @@ outer: @@ -156,7 +156,7 @@ outer:
// ServeHTTP implements http.Handler.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.Log(logger.Info, "[client %s] %s %s", r.RemoteAddr, r.Method, r.URL.Path)
s.Log(logger.Info, "[client %v] %s %s", r.RemoteAddr, r.Method, r.URL.Path)
// remove leading prefix
path := r.URL.Path[1:]

88
internal/serverrtsp/server.go

@ -1,7 +1,10 @@ @@ -1,7 +1,10 @@
package serverrtsp
import (
"crypto/rand"
"crypto/tls"
"encoding/binary"
"strconv"
"sync"
"time"
@ -15,6 +18,30 @@ import ( @@ -15,6 +18,30 @@ import (
"github.com/aler9/rtsp-simple-server/internal/stats"
)
func newSessionVisualID(sessions map[*gortsplib.ServerSession]*sessionrtsp.Session) (string, error) {
for {
b := make([]byte, 4)
_, err := rand.Read(b)
if err != nil {
return "", err
}
id := strconv.FormatUint(uint64(binary.LittleEndian.Uint32(b)), 10)
alreadyPresent := func() bool {
for _, s := range sessions {
if s.VisualID() == id {
return true
}
}
return false
}()
if !alreadyPresent {
return id, nil
}
}
}
// Parent is implemented by program.
type Parent interface {
Log(logger.Level, string, ...interface{})
@ -170,8 +197,8 @@ outer: @@ -170,8 +197,8 @@ outer:
close(serverErr)
}
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpenCtx.
func (s *Server) OnConnOpen(sc *gortsplib.ServerConn) {
// OnConnOpen implements gortsplib.ServerHandlerOnConnOpen.
func (s *Server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
c := clientrtsp.New(
s.rtspAddress,
s.readTimeout,
@ -179,25 +206,25 @@ func (s *Server) OnConnOpen(sc *gortsplib.ServerConn) { @@ -179,25 +206,25 @@ func (s *Server) OnConnOpen(sc *gortsplib.ServerConn) {
s.runOnConnectRestart,
s.pathMan,
s.stats,
sc,
ctx.Conn,
s)
s.mutex.Lock()
s.clients[sc] = c
s.clients[ctx.Conn] = c
s.mutex.Unlock()
}
// OnConnClose implements gortsplib.ServerHandlerOnConnCloseCtx.
func (s *Server) OnConnClose(sc *gortsplib.ServerConn, err error) {
// OnConnClose implements gortsplib.ServerHandlerOnConnClose.
func (s *Server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
s.mutex.Lock()
c := s.clients[sc]
delete(s.clients, sc)
c := s.clients[ctx.Conn]
delete(s.clients, ctx.Conn)
s.mutex.Unlock()
c.Close(err)
c.Close(ctx.Error)
}
// OnRequest implements gortsplib.ServerHandlerOnRequestCtx.
// OnRequest implements gortsplib.ServerHandlerOnRequest.
func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) {
s.mutex.Lock()
c := s.clients[sc]
@ -206,7 +233,7 @@ func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) { @@ -206,7 +233,7 @@ func (s *Server) OnRequest(sc *gortsplib.ServerConn, req *base.Request) {
c.OnRequest(req)
}
// OnResponse implements gortsplib.ServerHandlerOnResponseCtx.
// OnResponse implements gortsplib.ServerHandlerOnResponse.
func (s *Server) OnResponse(sc *gortsplib.ServerConn, res *base.Response) {
s.mutex.Lock()
c := s.clients[sc]
@ -215,31 +242,38 @@ func (s *Server) OnResponse(sc *gortsplib.ServerConn, res *base.Response) { @@ -215,31 +242,38 @@ func (s *Server) OnResponse(sc *gortsplib.ServerConn, res *base.Response) {
c.OnResponse(res)
}
// OnSessionOpen implements gortsplib.ServerHandlerOnSessionOpenCtx.
func (s *Server) OnSessionOpen(ss *gortsplib.ServerSession) {
// OnSessionOpen implements gortsplib.ServerHandlerOnSessionOpen.
func (s *Server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
s.mutex.Lock()
// do not use ss.ID() in logs, since it allows to take ownership of a session
// use a new random ID
visualID, _ := newSessionVisualID(s.sessions)
se := sessionrtsp.New(
s.rtspAddress,
s.protocols,
ss,
visualID,
ctx.Session,
ctx.Conn,
s.pathMan,
s)
s.mutex.Lock()
s.sessions[ss] = se
s.sessions[ctx.Session] = se
s.mutex.Unlock()
}
// OnSessionClose implements gortsplib.ServerHandlerOnSessionCloseCtx.
func (s *Server) OnSessionClose(ss *gortsplib.ServerSession, err error) {
// OnSessionClose implements gortsplib.ServerHandlerOnSessionClose.
func (s *Server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
s.mutex.Lock()
se := s.sessions[ss]
delete(s.sessions, ss)
se := s.sessions[ctx.Session]
delete(s.sessions, ctx.Session)
s.mutex.Unlock()
se.Close()
}
// OnDescribe implements gortsplib.ServerHandlerOnDescribeCtx.
// OnDescribe implements gortsplib.ServerHandlerOnDescribe.
func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, []byte, error) {
s.mutex.RLock()
c := s.clients[ctx.Conn]
@ -247,7 +281,7 @@ func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Re @@ -247,7 +281,7 @@ func (s *Server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Re
return c.OnDescribe(ctx)
}
// OnAnnounce implements gortsplib.ServerHandlerOnAnnounceCtx.
// OnAnnounce implements gortsplib.ServerHandlerOnAnnounce.
func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
s.mutex.RLock()
c := s.clients[ctx.Conn]
@ -256,7 +290,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re @@ -256,7 +290,7 @@ func (s *Server) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Re
return se.OnAnnounce(c, ctx)
}
// OnSetup implements gortsplib.ServerHandlerOnSetupCtx.
// OnSetup implements gortsplib.ServerHandlerOnSetup.
func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, error) {
s.mutex.RLock()
c := s.clients[ctx.Conn]
@ -265,7 +299,7 @@ func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response @@ -265,7 +299,7 @@ func (s *Server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response
return se.OnSetup(c, ctx)
}
// OnPlay implements gortsplib.ServerHandlerOnPlayCtx.
// OnPlay implements gortsplib.ServerHandlerOnPlay.
func (s *Server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
s.mutex.RLock()
se := s.sessions[ctx.Session]
@ -273,7 +307,7 @@ func (s *Server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, @@ -273,7 +307,7 @@ func (s *Server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response,
return se.OnPlay(ctx)
}
// OnRecord implements gortsplib.ServerHandlerOnRecordCtx.
// OnRecord implements gortsplib.ServerHandlerOnRecord.
func (s *Server) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
s.mutex.RLock()
se := s.sessions[ctx.Session]
@ -281,7 +315,7 @@ func (s *Server) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Respon @@ -281,7 +315,7 @@ func (s *Server) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Respon
return se.OnRecord(ctx)
}
// OnPause implements gortsplib.ServerHandlerOnPauseCtx.
// OnPause implements gortsplib.ServerHandlerOnPause.
func (s *Server) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response, error) {
s.mutex.RLock()
se := s.sessions[ctx.Session]
@ -289,7 +323,7 @@ func (s *Server) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response @@ -289,7 +323,7 @@ func (s *Server) OnPause(ctx *gortsplib.ServerHandlerOnPauseCtx) (*base.Response
return se.OnPause(ctx)
}
// OnFrame implements gortsplib.ServerHandlerOnFrameCtx.
// OnFrame implements gortsplib.ServerHandlerOnFrame.
func (s *Server) OnFrame(ctx *gortsplib.ServerHandlerOnFrameCtx) {
s.mutex.RLock()
se := s.sessions[ctx.Session]

13
internal/sessionrtsp/session.go

@ -39,6 +39,7 @@ type Parent interface { @@ -39,6 +39,7 @@ type Parent interface {
type Session struct {
rtspAddress string
protocols map[gortsplib.StreamProtocol]struct{}
visualID string
ss *gortsplib.ServerSession
pathMan PathMan
parent Parent
@ -54,19 +55,22 @@ type Session struct { @@ -54,19 +55,22 @@ type Session struct {
func New(
rtspAddress string,
protocols map[gortsplib.StreamProtocol]struct{},
visualID string,
ss *gortsplib.ServerSession,
sc *gortsplib.ServerConn,
pathMan PathMan,
parent Parent) *Session {
s := &Session{
rtspAddress: rtspAddress,
protocols: protocols,
visualID: visualID,
ss: ss,
pathMan: pathMan,
parent: parent,
}
s.log(logger.Info, "created")
s.log(logger.Info, "created by %v", sc.NetConn().RemoteAddr())
return s
}
@ -106,8 +110,13 @@ func (s *Session) IsReadPublisher() {} @@ -106,8 +110,13 @@ func (s *Session) IsReadPublisher() {}
// IsSource implements source.Source.
func (s *Session) IsSource() {}
// VisualID returns the visual ID of the session.
func (s *Session) VisualID() string {
return s.visualID
}
func (s *Session) log(level logger.Level, format string, args ...interface{}) {
s.parent.Log(level, "[session %s] "+format, append([]interface{}{"TODO"}, args...)...)
s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.visualID}, args...)...)
}
// OnAnnounce is called by serverrtsp.Server.

Loading…
Cancel
Save