Browse Source

hls: decrease number of idle connections kept open after a reconnection (#3128)

pull/3132/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
24cc62e344
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 60
      internal/api/api_test.go
  2. 40
      internal/core/api_test.go
  3. 10
      internal/core/metrics_test.go
  4. 8
      internal/core/path_test.go
  5. 4
      internal/highleveltests/hls_manager_test.go
  6. 4
      internal/servers/hls/server_test.go
  7. 16
      internal/servers/webrtc/server_test.go
  8. 11
      internal/staticsources/hls/source.go
  9. 18
      internal/staticsources/webrtc/source.go

60
internal/api/api_test.go

@ -136,7 +136,9 @@ func TestConfigGlobalGet(t *testing.T) { @@ -136,7 +136,9 @@ func TestConfigGlobalGet(t *testing.T) {
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out map[string]interface{}
httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/config/global/get", nil, &out)
@ -157,7 +159,9 @@ func TestConfigGlobalPatch(t *testing.T) { @@ -157,7 +159,9 @@ func TestConfigGlobalPatch(t *testing.T) {
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPatch, "http://myuser:mypass@localhost:9997/v3/config/global/patch",
map[string]interface{}{
@ -198,7 +202,9 @@ func TestAPIConfigGlobalPatchUnknownField(t *testing.T) { //nolint:dupl @@ -198,7 +202,9 @@ func TestAPIConfigGlobalPatchUnknownField(t *testing.T) { //nolint:dupl
byts, err := json.Marshal(b)
require.NoError(t, err)
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
req, err := http.NewRequest(http.MethodPatch, "http://myuser:mypass@localhost:9997/v3/config/global/patch",
bytes.NewReader(byts))
@ -226,7 +232,9 @@ func TestAPIConfigPathDefaultsGet(t *testing.T) { @@ -226,7 +232,9 @@ func TestAPIConfigPathDefaultsGet(t *testing.T) {
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out map[string]interface{}
httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/config/pathdefaults/get", nil, &out)
@ -247,7 +255,9 @@ func TestAPIConfigPathDefaultsPatch(t *testing.T) { @@ -247,7 +255,9 @@ func TestAPIConfigPathDefaultsPatch(t *testing.T) {
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPatch, "http://myuser:mypass@localhost:9997/v3/config/pathdefaults/patch",
map[string]interface{}{
@ -292,7 +302,9 @@ func TestAPIConfigPathsList(t *testing.T) { @@ -292,7 +302,9 @@ func TestAPIConfigPathsList(t *testing.T) {
Items []pathConfig `json:"items"`
}
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out listRes
httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/config/paths/list", nil, &out)
@ -324,7 +336,9 @@ func TestAPIConfigPathsGet(t *testing.T) { @@ -324,7 +336,9 @@ func TestAPIConfigPathsGet(t *testing.T) {
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out map[string]interface{}
httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/config/paths/get/my/path", nil, &out)
@ -346,7 +360,9 @@ func TestAPIConfigPathsAdd(t *testing.T) { @@ -346,7 +360,9 @@ func TestAPIConfigPathsAdd(t *testing.T) {
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPost, "http://myuser:mypass@localhost:9997/v3/config/paths/add/my/path",
map[string]interface{}{
@ -385,7 +401,9 @@ func TestAPIConfigPathsAddUnknownField(t *testing.T) { //nolint:dupl @@ -385,7 +401,9 @@ func TestAPIConfigPathsAddUnknownField(t *testing.T) { //nolint:dupl
byts, err := json.Marshal(b)
require.NoError(t, err)
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
req, err := http.NewRequest(http.MethodPost,
"http://myuser:mypass@localhost:9997/v3/config/paths/add/my/path", bytes.NewReader(byts))
@ -413,7 +431,9 @@ func TestAPIConfigPathsPatch(t *testing.T) { //nolint:dupl @@ -413,7 +431,9 @@ func TestAPIConfigPathsPatch(t *testing.T) { //nolint:dupl
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPost, "http://myuser:mypass@localhost:9997/v3/config/paths/add/my/path",
map[string]interface{}{
@ -451,7 +471,9 @@ func TestAPIConfigPathsReplace(t *testing.T) { //nolint:dupl @@ -451,7 +471,9 @@ func TestAPIConfigPathsReplace(t *testing.T) { //nolint:dupl
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPost, "http://myuser:mypass@localhost:9997/v3/config/paths/add/my/path",
map[string]interface{}{
@ -489,7 +511,9 @@ func TestAPIConfigPathsDelete(t *testing.T) { @@ -489,7 +511,9 @@ func TestAPIConfigPathsDelete(t *testing.T) {
require.NoError(t, err)
defer api.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPost, "http://myuser:mypass@localhost:9997/v3/config/paths/add/my/path",
map[string]interface{}{
@ -546,7 +570,9 @@ func TestRecordingsList(t *testing.T) { @@ -546,7 +570,9 @@ func TestRecordingsList(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, "mypath2", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644)
require.NoError(t, err)
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out interface{}
httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/recordings/list", nil, &out)
@ -607,7 +633,9 @@ func TestRecordingsGet(t *testing.T) { @@ -607,7 +633,9 @@ func TestRecordingsGet(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, "mypath1", "2009-11-07_11-22-00-900000.mp4"), []byte(""), 0o644)
require.NoError(t, err)
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out interface{}
httpRequest(t, hc, http.MethodGet, "http://myuser:mypass@localhost:9997/v3/recordings/get/mypath1", nil, &out)
@ -651,7 +679,9 @@ func TestRecordingsDeleteSegment(t *testing.T) { @@ -651,7 +679,9 @@ func TestRecordingsDeleteSegment(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, "mypath1", "2008-11-07_11-22-00-900000.mp4"), []byte(""), 0o644)
require.NoError(t, err)
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
u, err := url.Parse("http://myuser:mypass@localhost:9997/v3/recordings/deletesegment")
require.NoError(t, err)

40
internal/core/api_test.go

@ -97,7 +97,9 @@ func TestAPIPathsList(t *testing.T) { @@ -97,7 +97,9 @@ func TestAPIPathsList(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
media0 := test.UniqueMediaH264()
@ -155,7 +157,9 @@ func TestAPIPathsList(t *testing.T) { @@ -155,7 +157,9 @@ func TestAPIPathsList(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
source := gortsplib.Client{TLSConfig: &tls.Config{InsecureSkipVerify: true}}
err = source.StartRecording("rtsps://localhost:8322/mypath",
@ -191,7 +195,9 @@ func TestAPIPathsList(t *testing.T) { @@ -191,7 +195,9 @@ func TestAPIPathsList(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
@ -218,7 +224,9 @@ func TestAPIPathsList(t *testing.T) { @@ -218,7 +224,9 @@ func TestAPIPathsList(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
@ -245,7 +253,9 @@ func TestAPIPathsList(t *testing.T) { @@ -245,7 +253,9 @@ func TestAPIPathsList(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var out pathList
httpRequest(t, hc, http.MethodGet, "http://localhost:9997/v3/paths/list", nil, &out)
@ -271,7 +281,9 @@ func TestAPIPathsGet(t *testing.T) { @@ -271,7 +281,9 @@ func TestAPIPathsGet(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
for _, ca := range []string{"ok", "ok-nested", "not found"} {
t.Run(ca, func(t *testing.T) {
@ -371,7 +383,9 @@ func TestAPIProtocolListGet(t *testing.T) { @@ -371,7 +383,9 @@ func TestAPIProtocolListGet(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
medi := test.UniqueMediaH264()
@ -844,7 +858,9 @@ func TestAPIProtocolGetNotFound(t *testing.T) { @@ -844,7 +858,9 @@ func TestAPIProtocolGetNotFound(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var pa string
switch ca {
@ -934,7 +950,9 @@ func TestAPIProtocolKick(t *testing.T) { @@ -934,7 +950,9 @@ func TestAPIProtocolKick(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
medi := test.MediaH264
@ -1081,7 +1099,9 @@ func TestAPIProtocolKickNotFound(t *testing.T) { @@ -1081,7 +1099,9 @@ func TestAPIProtocolKickNotFound(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
var pa string
switch ca {

10
internal/core/metrics_test.go

@ -65,7 +65,9 @@ func TestMetrics(t *testing.T) { @@ -65,7 +65,9 @@ func TestMetrics(t *testing.T) {
require.Equal(t, true, ok)
defer p.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
t.Run("initial", func(t *testing.T) {
bo := httpPullFile(t, hc, "http://localhost:9998/metrics")
@ -165,8 +167,12 @@ webrtc_sessions_bytes_sent 0 @@ -165,8 +167,12 @@ webrtc_sessions_bytes_sent 0
su, err := url.Parse("http://localhost:8889/webrtc_path/whip")
require.NoError(t, err)
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
s := &webrtc.WHIPClient{
HTTPClient: &http.Client{Transport: &http.Transport{}},
HTTPClient: hc,
URL: su,
Log: test.NilLogger{},
}

8
internal/core/path_test.go

@ -397,7 +397,9 @@ func TestPathRunOnRead(t *testing.T) { @@ -397,7 +397,9 @@ func TestPathRunOnRead(t *testing.T) {
defer reader.Close()
case "webrtc":
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
u, err := url.Parse("http://localhost:8889/test/whep?query=value")
require.NoError(t, err)
@ -543,7 +545,9 @@ func TestPathRecord(t *testing.T) { @@ -543,7 +545,9 @@ func TestPathRecord(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(files))
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
httpRequest(t, hc, http.MethodPatch, "http://localhost:9997/v3/config/paths/patch/all_others", map[string]interface{}{
"record": false,

4
internal/highleveltests/hls_manager_test.go

@ -77,7 +77,9 @@ func TestHLSServerAuth(t *testing.T) { @@ -77,7 +77,9 @@ func TestHLSServerAuth(t *testing.T) {
usr = "testreader2"
}
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
res, err := hc.Get("http://" + usr + ":testpass@127.0.0.1:8888/teststream/index.m3u8?param=value")
require.NoError(t, err)

4
internal/servers/hls/server_test.go

@ -94,7 +94,9 @@ func TestServerNotFound(t *testing.T) { @@ -94,7 +94,9 @@ func TestServerNotFound(t *testing.T) {
require.NoError(t, err)
defer s.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
func() {
req, err := http.NewRequest(http.MethodGet, "http://myuser:mypass@127.0.0.1:8888/nonexisting/", nil)

16
internal/servers/webrtc/server_test.go

@ -114,7 +114,9 @@ func TestServerStaticPages(t *testing.T) { @@ -114,7 +114,9 @@ func TestServerStaticPages(t *testing.T) {
require.NoError(t, err)
defer s.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
for _, path := range []string{"/stream", "/stream/publish", "/publish"} {
func() {
@ -160,7 +162,9 @@ func TestServerPublish(t *testing.T) { @@ -160,7 +162,9 @@ func TestServerPublish(t *testing.T) {
require.NoError(t, err)
defer s.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
// preflight requests must always work, without authentication
func() {
@ -285,7 +289,9 @@ func TestServerRead(t *testing.T) { @@ -285,7 +289,9 @@ func TestServerRead(t *testing.T) {
u, err := url.Parse(ur)
require.NoError(t, err)
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
wc := &webrtc.WHIPClient{
HTTPClient: hc,
@ -370,7 +376,9 @@ func TestServerReadNotFound(t *testing.T) { @@ -370,7 +376,9 @@ func TestServerReadNotFound(t *testing.T) {
require.NoError(t, err)
defer s.Close()
hc := &http.Client{Transport: &http.Transport{}}
tr := &http.Transport{}
defer tr.CloseIdleConnections()
hc := &http.Client{Transport: tr}
iceServers, err := webrtc.WHIPOptionsICEServers(context.Background(), hc,
"http://myuser:mypass@localhost:8886/nonexisting/whep")

11
internal/staticsources/hls/source.go

@ -42,14 +42,17 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { @@ -42,14 +42,17 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
decodeErrLogger := logger.NewLimitedLogger(s)
tr := &http.Transport{
TLSClientConfig: tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
}
defer tr.CloseIdleConnections()
var c *gohlslib.Client
c = &gohlslib.Client{
URI: s.ResolvedSource,
HTTPClient: &http.Client{
Timeout: time.Duration(s.ReadTimeout),
Transport: &http.Transport{
TLSClientConfig: tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
},
Timeout: time.Duration(s.ReadTimeout),
Transport: tr,
},
OnDownloadPrimaryPlaylist: func(u string) {
s.Log(logger.Debug, "downloading primary playlist %v", u)

18
internal/staticsources/webrtc/source.go

@ -40,18 +40,18 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error { @@ -40,18 +40,18 @@ func (s *Source) Run(params defs.StaticSourceRunParams) error {
u.Scheme = strings.ReplaceAll(u.Scheme, "whep", "http")
hc := &http.Client{
Timeout: time.Duration(s.ReadTimeout),
Transport: &http.Transport{
TLSClientConfig: tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
},
tr := &http.Transport{
TLSClientConfig: tls.ConfigForFingerprint(params.Conf.SourceFingerprint),
}
defer hc.CloseIdleConnections()
defer tr.CloseIdleConnections()
client := webrtc.WHIPClient{
HTTPClient: hc,
URL: u,
Log: s,
HTTPClient: &http.Client{
Timeout: time.Duration(s.ReadTimeout),
Transport: tr,
},
URL: u,
Log: s,
}
tracks, err := client.Read(params.Context)

Loading…
Cancel
Save