Browse Source

srt: fix memory leak during reader disconnection (#2273)

pull/2276/head
Alessandro Ros 2 years ago committed by GitHub
parent
commit
95baade478
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      internal/core/hls_muxer.go
  2. 4
      internal/core/rtmp_conn.go
  3. 54
      internal/core/srt_conn.go
  4. 52
      internal/core/udp_source.go
  5. 4
      internal/core/webrtc_session.go

4
internal/core/hls_muxer.go

@ -256,6 +256,8 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -256,6 +256,8 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
m.writer = asyncwriter.New(m.writeQueueSize, m)
defer res.stream.RemoveReader(m.writer)
var medias []*description.Media
videoMedia, videoTrack := m.createVideoTrack(res.stream)
@ -268,8 +270,6 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) @@ -268,8 +270,6 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{})
medias = append(medias, audioMedia)
}
defer res.stream.RemoveReader(m.writer)
if medias == nil {
return fmt.Errorf(
"the stream doesn't contain any supported codec, which are currently H265, H264, Opus, MPEG-4 Audio")

4
internal/core/rtmp_conn.go

@ -243,6 +243,8 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { @@ -243,6 +243,8 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
writer := asyncwriter.New(c.writeQueueSize, c)
defer res.stream.RemoveReader(writer)
var medias []*description.Media
var w *rtmp.Writer
@ -267,8 +269,6 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { @@ -267,8 +269,6 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error {
"the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio")
}
defer res.stream.RemoveReader(writer)
c.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(medias))

54
internal/core/srt_conn.go

@ -256,17 +256,16 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { @@ -256,17 +256,16 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
var medi *description.Media
switch tcodec := track.Codec.(type) {
case *mpegts.CodecH264:
case *mpegts.CodecH265:
medi = &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
}},
}
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.H264{
stream.WriteUnit(medi, medi.Formats[0], &unit.H265{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
@ -276,16 +275,17 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { @@ -276,16 +275,17 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
return nil
})
case *mpegts.CodecH265:
case *mpegts.CodecH264:
medi = &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
}
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.H265{
stream.WriteUnit(medi, medi.Formats[0], &unit.H264{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
@ -295,45 +295,45 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { @@ -295,45 +295,45 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error {
return nil
})
case *mpegts.CodecMPEG4Audio:
case *mpegts.CodecOpus:
medi = &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
Config: &tcodec.Config,
Formats: []format.Format{&format.Opus{
PayloadTyp: 96,
IsStereo: (tcodec.ChannelCount == 2),
}},
}
r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{
r.OnDataOpus(track, func(pts int64, packets [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
},
AUs: aus,
Packets: packets,
})
return nil
})
case *mpegts.CodecOpus:
case *mpegts.CodecMPEG4Audio:
medi = &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.Opus{
PayloadTyp: 96,
IsStereo: (tcodec.ChannelCount == 2),
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
Config: &tcodec.Config,
}},
}
r.OnDataOpus(track, func(pts int64, packets [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{
r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
},
Packets: packets,
AUs: aus,
})
return nil
})
@ -424,6 +424,8 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass @@ -424,6 +424,8 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass
writer := asyncwriter.New(c.writeQueueSize, c)
defer res.stream.RemoveReader(writer)
var w *mpegts.Writer
var tracks []*mpegts.Track
var medias []*description.Media

52
internal/core/udp_source.go

@ -161,17 +161,16 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -161,17 +161,16 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
var medi *description.Media
switch tcodec := track.Codec.(type) {
case *mpegts.CodecH264:
case *mpegts.CodecH265:
medi = &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
}},
}
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.H264{
stream.WriteUnit(medi, medi.Formats[0], &unit.H265{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
@ -181,16 +180,17 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -181,16 +180,17 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
return nil
})
case *mpegts.CodecH265:
case *mpegts.CodecH264:
medi = &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{&format.H265{
PayloadTyp: 96,
Formats: []format.Format{&format.H264{
PayloadTyp: 96,
PacketizationMode: 1,
}},
}
r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.H265{
stream.WriteUnit(medi, medi.Formats[0], &unit.H264{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
@ -200,45 +200,45 @@ func (s *udpSource) runReader(pc net.PacketConn) error { @@ -200,45 +200,45 @@ func (s *udpSource) runReader(pc net.PacketConn) error {
return nil
})
case *mpegts.CodecMPEG4Audio:
case *mpegts.CodecOpus:
medi = &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
Config: &tcodec.Config,
Formats: []format.Format{&format.Opus{
PayloadTyp: 96,
IsStereo: (tcodec.ChannelCount == 2),
}},
}
r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{
r.OnDataOpus(track, func(pts int64, packets [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
},
AUs: aus,
Packets: packets,
})
return nil
})
case *mpegts.CodecOpus:
case *mpegts.CodecMPEG4Audio:
medi = &description.Media{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.Opus{
PayloadTyp: 96,
IsStereo: (tcodec.ChannelCount == 2),
Formats: []format.Format{&format.MPEG4Audio{
PayloadTyp: 96,
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
Config: &tcodec.Config,
}},
}
r.OnDataOpus(track, func(pts int64, packets [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{
r.OnDataMPEG4Audio(track, func(pts int64, aus [][]byte) error {
stream.WriteUnit(medi, medi.Formats[0], &unit.MPEG4AudioGeneric{
Base: unit.Base{
NTP: time.Now(),
PTS: decodeTime(pts),
},
Packets: packets,
AUs: aus,
})
return nil
})

4
internal/core/webrtc_session.go

@ -514,12 +514,12 @@ func (s *webRTCSession) runRead() (int, error) { @@ -514,12 +514,12 @@ func (s *webRTCSession) runRead() (int, error) {
writer := asyncwriter.New(s.writeQueueSize, s)
defer res.stream.RemoveReader(writer)
for _, track := range tracks {
track.start(res.stream, writer)
}
defer res.stream.RemoveReader(writer)
s.Log(logger.Info, "is reading from path '%s', %s",
res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks)))

Loading…
Cancel
Save