Browse Source

move HTTP utilities in a dedicated package (#2123)

needed by #2068
pull/2124/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
119d6abf19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      internal/conf/ips_or_cidrs.go
  2. 16
      internal/core/api.go
  3. 14
      internal/core/hls_http_server.go
  4. 4
      internal/core/hls_manager.go
  5. 10
      internal/core/http_server_header.go
  6. 15
      internal/core/http_set_trusted_proxies.go
  7. 16
      internal/core/metrics.go
  8. 10
      internal/core/pprof.go
  9. 14
      internal/core/webrtc_http_server.go
  10. 19
      internal/httpserv/middleware_logger.go
  11. 11
      internal/httpserv/middleware_server_header.go
  12. 24
      internal/httpserv/wrapped_server.go

9
internal/conf/ips_or_cidrs.go

@ -55,3 +55,12 @@ func (d *IPsOrCIDRs) UnmarshalEnv(s string) error {
byts, _ := json.Marshal(strings.Split(s, ",")) byts, _ := json.Marshal(strings.Split(s, ","))
return d.UnmarshalJSON(byts) return d.UnmarshalJSON(byts)
} }
// ToTrustedProxies converts IPsOrCIDRs into a string slice for SetTrustedProxies.
func (d *IPsOrCIDRs) ToTrustedProxies() []string {
ret := make([]string, len(*d))
for i, entry := range *d {
ret[i] = entry.String()
}
return ret
}

16
internal/core/api.go

@ -12,6 +12,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/httpserv"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -195,7 +196,7 @@ type api struct {
webRTCManager apiWebRTCManager webRTCManager apiWebRTCManager
parent apiParent parent apiParent
httpServer *httpServer httpServer *httpserv.WrappedServer
mutex sync.Mutex mutex sync.Mutex
} }
@ -227,9 +228,9 @@ func newAPI(
router := gin.New() router := gin.New()
router.SetTrustedProxies(nil) router.SetTrustedProxies(nil)
mwLog := httpLoggerMiddleware(a) mwLog := httpserv.MiddlewareLogger(a)
router.NoRoute(mwLog, httpServerHeaderMiddleware) router.NoRoute(mwLog, httpserv.MiddlewareServerHeader)
group := router.Group("/", mwLog, httpServerHeaderMiddleware) group := router.Group("/", mwLog, httpserv.MiddlewareServerHeader)
group.GET("/v2/config/get", a.onConfigGet) group.GET("/v2/config/get", a.onConfigGet)
group.POST("/v2/config/set", a.onConfigSet) group.POST("/v2/config/set", a.onConfigSet)
@ -279,8 +280,11 @@ func newAPI(
group.POST("/v2/webrtcsessions/kick/:id", a.onWebRTCSessionsKick) group.POST("/v2/webrtcsessions/kick/:id", a.onWebRTCSessionsKick)
} }
network, address := restrictNetwork("tcp", address)
var err error var err error
a.httpServer, err = newHTTPServer( a.httpServer, err = httpserv.NewWrappedServer(
network,
address, address,
readTimeout, readTimeout,
"", "",
@ -298,7 +302,7 @@ func newAPI(
func (a *api) close() { func (a *api) close() {
a.Log(logger.Info, "listener is closing") a.Log(logger.Info, "listener is closing")
a.httpServer.close() a.httpServer.Close()
} }
func (a *api) Log(level logger.Level, format string, args ...interface{}) { func (a *api) Log(level logger.Level, format string, args ...interface{}) {

14
internal/core/hls_http_server.go

@ -12,6 +12,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/httpserv"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -32,7 +33,7 @@ type hlsHTTPServer struct {
pathManager *pathManager pathManager *pathManager
parent hlsHTTPServerParent parent hlsHTTPServerParent
inner *httpServer inner *httpserv.WrappedServer
} }
func newHLSHTTPServer( //nolint:dupl func newHLSHTTPServer( //nolint:dupl
@ -62,12 +63,15 @@ func newHLSHTTPServer( //nolint:dupl
} }
router := gin.New() router := gin.New()
httpSetTrustedProxies(router, trustedProxies) router.SetTrustedProxies(trustedProxies.ToTrustedProxies())
router.NoRoute(httpLoggerMiddleware(s), httpServerHeaderMiddleware, s.onRequest) router.NoRoute(httpserv.MiddlewareLogger(s), httpserv.MiddlewareServerHeader, s.onRequest)
network, address := restrictNetwork("tcp", address)
var err error var err error
s.inner, err = newHTTPServer( s.inner, err = httpserv.NewWrappedServer(
network,
address, address,
readTimeout, readTimeout,
serverCert, serverCert,
@ -86,7 +90,7 @@ func (s *hlsHTTPServer) Log(level logger.Level, format string, args ...interface
} }
func (s *hlsHTTPServer) close() { func (s *hlsHTTPServer) close() {
s.inner.close() s.inner.Close()
} }
func (s *hlsHTTPServer) onRequest(ctx *gin.Context) { func (s *hlsHTTPServer) onRequest(ctx *gin.Context) {

4
internal/core/hls_manager.go

@ -131,7 +131,7 @@ func newHLSManager(
m.pathManager.hlsManagerSet(m) m.pathManager.hlsManagerSet(m)
if m.metrics != nil { if m.metrics != nil {
m.metrics.hlsManagerSet(m) m.metrics.setHLSManager(m)
} }
m.wg.Add(1) m.wg.Add(1)
@ -226,7 +226,7 @@ outer:
m.pathManager.hlsManagerSet(nil) m.pathManager.hlsManagerSet(nil)
if m.metrics != nil { if m.metrics != nil {
m.metrics.hlsManagerSet(nil) m.metrics.setHLSManager(nil)
} }
} }

10
internal/core/http_server_header.go

@ -1,10 +0,0 @@
package core
import (
"github.com/gin-gonic/gin"
)
func httpServerHeaderMiddleware(ctx *gin.Context) {
ctx.Writer.Header().Set("Server", "mediamtx")
ctx.Next()
}

15
internal/core/http_set_trusted_proxies.go

@ -1,15 +0,0 @@
package core
import (
"github.com/gin-gonic/gin"
"github.com/bluenviron/mediamtx/internal/conf"
)
func httpSetTrustedProxies(router *gin.Engine, trustedProxies conf.IPsOrCIDRs) {
tmp := make([]string, len(trustedProxies))
for i, entry := range trustedProxies {
tmp[i] = entry.String()
}
router.SetTrustedProxies(tmp)
}

16
internal/core/metrics.go

@ -9,6 +9,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/httpserv"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -23,7 +24,7 @@ type metricsParent interface {
type metrics struct { type metrics struct {
parent metricsParent parent metricsParent
httpServer *httpServer httpServer *httpserv.WrappedServer
mutex sync.Mutex mutex sync.Mutex
pathManager apiPathManager pathManager apiPathManager
rtspServer apiRTSPServer rtspServer apiRTSPServer
@ -45,12 +46,15 @@ func newMetrics(
router := gin.New() router := gin.New()
router.SetTrustedProxies(nil) router.SetTrustedProxies(nil)
mwLog := httpLoggerMiddleware(m) mwLog := httpserv.MiddlewareLogger(m)
router.NoRoute(mwLog) router.NoRoute(mwLog)
router.GET("/metrics", mwLog, m.onMetrics) router.GET("/metrics", mwLog, m.onMetrics)
network, address := restrictNetwork("tcp", address)
var err error var err error
m.httpServer, err = newHTTPServer( m.httpServer, err = httpserv.NewWrappedServer(
network,
address, address,
readTimeout, readTimeout,
"", "",
@ -68,7 +72,7 @@ func newMetrics(
func (m *metrics) close() { func (m *metrics) close() {
m.Log(logger.Info, "listener is closing") m.Log(logger.Info, "listener is closing")
m.httpServer.close() m.httpServer.Close()
} }
func (m *metrics) Log(level logger.Level, format string, args ...interface{}) { func (m *metrics) Log(level logger.Level, format string, args ...interface{}) {
@ -221,8 +225,8 @@ func (m *metrics) pathManagerSet(s apiPathManager) {
m.pathManager = s m.pathManager = s
} }
// hlsManagerSet is called by hlsManager. // setHLSManager is called by hlsManager.
func (m *metrics) hlsManagerSet(s apiHLSManager) { func (m *metrics) setHLSManager(s apiHLSManager) {
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.hlsManager = s m.hlsManager = s

10
internal/core/pprof.go

@ -7,6 +7,7 @@ import (
_ "net/http/pprof" _ "net/http/pprof"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/httpserv"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -17,7 +18,7 @@ type pprofParent interface {
type pprof struct { type pprof struct {
parent pprofParent parent pprofParent
httpServer *httpServer httpServer *httpserv.WrappedServer
} }
func newPPROF( func newPPROF(
@ -29,8 +30,11 @@ func newPPROF(
parent: parent, parent: parent,
} }
network, address := restrictNetwork("tcp", address)
var err error var err error
pp.httpServer, err = newHTTPServer( pp.httpServer, err = httpserv.NewWrappedServer(
network,
address, address,
readTimeout, readTimeout,
"", "",
@ -48,7 +52,7 @@ func newPPROF(
func (pp *pprof) close() { func (pp *pprof) close() {
pp.Log(logger.Info, "listener is closing") pp.Log(logger.Info, "listener is closing")
pp.httpServer.close() pp.httpServer.Close()
} }
func (pp *pprof) Log(level logger.Level, format string, args ...interface{}) { func (pp *pprof) Log(level logger.Level, format string, args ...interface{}) {

14
internal/core/webrtc_http_server.go

@ -18,6 +18,7 @@ import (
"github.com/pion/webrtc/v3" "github.com/pion/webrtc/v3"
"github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/conf"
"github.com/bluenviron/mediamtx/internal/httpserv"
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
@ -170,7 +171,7 @@ type webRTCHTTPServer struct {
pathManager *pathManager pathManager *pathManager
parent webRTCHTTPServerParent parent webRTCHTTPServerParent
inner *httpServer inner *httpserv.WrappedServer
} }
func newWebRTCHTTPServer( //nolint:dupl func newWebRTCHTTPServer( //nolint:dupl
@ -200,11 +201,14 @@ func newWebRTCHTTPServer( //nolint:dupl
} }
router := gin.New() router := gin.New()
httpSetTrustedProxies(router, trustedProxies) router.SetTrustedProxies(trustedProxies.ToTrustedProxies())
router.NoRoute(httpLoggerMiddleware(s), httpServerHeaderMiddleware, s.onRequest) router.NoRoute(httpserv.MiddlewareLogger(s), httpserv.MiddlewareServerHeader, s.onRequest)
network, address := restrictNetwork("tcp", address)
var err error var err error
s.inner, err = newHTTPServer( s.inner, err = httpserv.NewWrappedServer(
network,
address, address,
readTimeout, readTimeout,
serverCert, serverCert,
@ -223,7 +227,7 @@ func (s *webRTCHTTPServer) Log(level logger.Level, format string, args ...interf
} }
func (s *webRTCHTTPServer) close() { func (s *webRTCHTTPServer) close() {
s.inner.close() s.inner.Close()
} }
func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) { func (s *webRTCHTTPServer) onRequest(ctx *gin.Context) {

19
internal/core/http_logger.go → internal/httpserv/middleware_logger.go

@ -1,4 +1,4 @@
package core package httpserv
import ( import (
"bytes" "bytes"
@ -11,22 +11,22 @@ import (
"github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/logger"
) )
type httpLoggerWriter struct { type loggerWriter struct {
gin.ResponseWriter gin.ResponseWriter
buf bytes.Buffer buf bytes.Buffer
} }
func (w *httpLoggerWriter) Write(b []byte) (int, error) { func (w *loggerWriter) Write(b []byte) (int, error) {
w.buf.Write(b) w.buf.Write(b)
return w.ResponseWriter.Write(b) return w.ResponseWriter.Write(b)
} }
func (w *httpLoggerWriter) WriteString(s string) (int, error) { func (w *loggerWriter) WriteString(s string) (int, error) {
w.buf.WriteString(s) w.buf.WriteString(s)
return w.ResponseWriter.WriteString(s) return w.ResponseWriter.WriteString(s)
} }
func (w *httpLoggerWriter) dump() string { func (w *loggerWriter) dump() string {
var buf bytes.Buffer var buf bytes.Buffer
fmt.Fprintf(&buf, "%s %d %s\n", "HTTP/1.1", w.ResponseWriter.Status(), http.StatusText(w.ResponseWriter.Status())) fmt.Fprintf(&buf, "%s %d %s\n", "HTTP/1.1", w.ResponseWriter.Status(), http.StatusText(w.ResponseWriter.Status()))
w.ResponseWriter.Header().Write(&buf) w.ResponseWriter.Header().Write(&buf)
@ -37,18 +37,15 @@ func (w *httpLoggerWriter) dump() string {
return buf.String() return buf.String()
} }
type httpLoggerParent interface { // MiddlewareLogger is a middleware that logs requests and responses.
logger.Writer func MiddlewareLogger(p logger.Writer) func(*gin.Context) {
}
func httpLoggerMiddleware(p httpLoggerParent) func(*gin.Context) {
return func(ctx *gin.Context) { return func(ctx *gin.Context) {
p.Log(logger.Debug, "[conn %v] %s %s", ctx.Request.RemoteAddr, ctx.Request.Method, ctx.Request.URL.Path) p.Log(logger.Debug, "[conn %v] %s %s", ctx.Request.RemoteAddr, ctx.Request.Method, ctx.Request.URL.Path)
byts, _ := httputil.DumpRequest(ctx.Request, true) byts, _ := httputil.DumpRequest(ctx.Request, true)
p.Log(logger.Debug, "[conn %v] [c->s] %s", ctx.Request.RemoteAddr, string(byts)) p.Log(logger.Debug, "[conn %v] [c->s] %s", ctx.Request.RemoteAddr, string(byts))
logw := &httpLoggerWriter{ResponseWriter: ctx.Writer} logw := &loggerWriter{ResponseWriter: ctx.Writer}
ctx.Writer = logw ctx.Writer = logw
ctx.Next() ctx.Next()

11
internal/httpserv/middleware_server_header.go

@ -0,0 +1,11 @@
package httpserv
import (
"github.com/gin-gonic/gin"
)
// MiddlewareServerHeader is a middleware that sets the Server header.
func MiddlewareServerHeader(ctx *gin.Context) {
ctx.Writer.Header().Set("Server", "mediamtx")
ctx.Next()
}

24
internal/core/http_server.go → internal/httpserv/wrapped_server.go

@ -1,4 +1,5 @@
package core // Package httpserv contains HTTP server utilities.
package httpserv
import ( import (
"context" "context"
@ -20,7 +21,7 @@ func (nilWriter) Write(p []byte) (int, error) {
return len(p), nil return len(p), nil
} }
// exit when there's a panic inside HTTP handlers. // exit when there's a panic inside the HTTP handler.
// https://github.com/golang/go/issues/16542 // https://github.com/golang/go/issues/16542
type exitOnPanicHandler struct { type exitOnPanicHandler struct {
http.Handler http.Handler
@ -39,19 +40,25 @@ func (h exitOnPanicHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.Handler.ServeHTTP(w, r) h.Handler.ServeHTTP(w, r)
} }
type httpServer struct { // WrappedServer is a wrapper around http.Server that provides:
// - net.Listener allocation and closure
// - TLS allocation
// - exit on panic
type WrappedServer struct {
ln net.Listener ln net.Listener
inner *http.Server inner *http.Server
} }
func newHTTPServer( // NewWrappedServer allocates a WrappedServer.
func NewWrappedServer(
network string,
address string, address string,
readTimeout conf.StringDuration, readTimeout conf.StringDuration,
serverCert string, serverCert string,
serverKey string, serverKey string,
handler http.Handler, handler http.Handler,
) (*httpServer, error) { ) (*WrappedServer, error) {
ln, err := net.Listen(restrictNetwork("tcp", address)) ln, err := net.Listen(network, address)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -69,7 +76,7 @@ func newHTTPServer(
} }
} }
s := &httpServer{ s := &WrappedServer{
ln: ln, ln: ln,
inner: &http.Server{ inner: &http.Server{
Handler: exitOnPanicHandler{handler}, Handler: exitOnPanicHandler{handler},
@ -88,7 +95,8 @@ func newHTTPServer(
return s, nil return s, nil
} }
func (s *httpServer) close() { // Close closes all resources and waits for all routines to return.
func (s *WrappedServer) Close() {
s.inner.Shutdown(context.Background()) s.inner.Shutdown(context.Background())
s.ln.Close() // in case Shutdown() is called before Serve() s.ln.Close() // in case Shutdown() is called before Serve()
} }
Loading…
Cancel
Save