Browse Source

speed up RTMP sources

pull/235/head
aler9 5 years ago
parent
commit
49ac52ff67
  1. 296
      internal/sourcertmp/source.go
  2. 12
      main_test.go
  3. 2
      testimages/ffmpeg/Dockerfile
  4. BIN
      testimages/ffmpeg/emptyvideoaudio.ts

296
internal/sourcertmp/source.go

@ -13,6 +13,7 @@ import ( @@ -13,6 +13,7 @@ import (
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/notedit/rtmp/av"
"github.com/notedit/rtmp/codec/h264"
"github.com/notedit/rtmp/format/flv/flvio"
"github.com/notedit/rtmp/format/rtmp"
"github.com/aler9/rtsp-simple-server/internal/logger"
@ -20,8 +21,10 @@ import ( @@ -20,8 +21,10 @@ import (
)
const (
retryPause = 5 * time.Second
analyzeTimeout = 8 * time.Second
retryPause = 5 * time.Second
codecH264 = 7
codecAAC = 10
)
// Parent is implemeneted by path.Path.
@ -107,6 +110,33 @@ func (s *Source) run() { @@ -107,6 +110,33 @@ func (s *Source) run() {
}
}
func readMetadata(conn *rtmp.Conn) (flvio.AMFMap, error) {
pkt, err := conn.ReadPacket()
if err != nil {
return nil, err
}
if pkt.Type != av.Metadata {
return nil, fmt.Errorf("first packet must be metadata")
}
arr, err := flvio.ParseAMFVals(pkt.Data, false)
if err != nil {
return nil, err
}
if len(arr) != 1 {
return nil, fmt.Errorf("invalid metadata")
}
ma, ok := arr[0].(flvio.AMFMap)
if !ok {
return nil, fmt.Errorf("invalid metadata")
}
return ma, nil
}
func (s *Source) runInner() bool {
s.log(logger.Info, "connecting")
@ -130,115 +160,130 @@ func (s *Source) runInner() bool { @@ -130,115 +160,130 @@ func (s *Source) runInner() bool {
return true
}
// gather video and audio features
var h264Sps []byte
var h264Pps []byte
var aacConfig []byte
confDone := make(chan struct{})
confClose := uint32(0)
go func() {
defer close(confDone)
var tracks gortsplib.Tracks
for {
var pkt av.Packet
pkt, err = conn.ReadPacket()
if err != nil {
return
}
var videoTrack *gortsplib.Track
var videoRTCPSender *rtcpsender.RTCPSender
var h264Encoder *rtph264.Encoder
if atomic.LoadUint32(&confClose) > 0 {
return
var audioTrack *gortsplib.Track
var audioRTCPSender *rtcpsender.RTCPSender
var aacEncoder *rtpaac.Encoder
confDone := make(chan error)
go func() {
confDone <- func() error {
md, err := readMetadata(conn)
if err != nil {
return err
}
switch pkt.Type {
case av.H264DecoderConfig:
codec, err := h264.FromDecoderConfig(pkt.Data)
if err != nil {
panic(err)
hasVideo := false
if v, ok := md.GetFloat64("videocodecid"); ok {
switch v {
case codecH264:
hasVideo = true
case 0:
default:
return fmt.Errorf("unsupported video codec %v", v)
}
h264Sps, h264Pps = codec.SPS[0], codec.PPS[0]
}
if aacConfig != nil {
return
hasAudio := false
if v, ok := md.GetFloat64("audiocodecid"); ok {
switch v {
case codecAAC:
hasAudio = true
case 0:
default:
return fmt.Errorf("unsupported audio codec %v", v)
}
}
case av.AACDecoderConfig:
aacConfig = pkt.Data
if !hasVideo && !hasAudio {
return fmt.Errorf("stream has no tracks")
}
if h264Sps != nil {
return
for {
var pkt av.Packet
pkt, err = conn.ReadPacket()
if err != nil {
return err
}
}
}
}()
timer := time.NewTimer(analyzeTimeout)
defer timer.Stop()
switch pkt.Type {
case av.H264DecoderConfig:
if !hasVideo {
return fmt.Errorf("unexpected video packet")
}
if videoTrack != nil {
return fmt.Errorf("video track setupped twice")
}
select {
case <-confDone:
case <-timer.C:
atomic.StoreUint32(&confClose, 1)
<-confDone
}
codec, err := h264.FromDecoderConfig(pkt.Data)
if err != nil {
return err
}
if err != nil {
s.log(logger.Info, "ERR: %s", err)
return true
}
videoTrack, err = gortsplib.NewTrackH264(96, codec.SPS[0], codec.PPS[0])
if err != nil {
return err
}
var tracks gortsplib.Tracks
clockRate, _ := videoTrack.ClockRate()
videoRTCPSender = rtcpsender.New(clockRate)
var videoTrack *gortsplib.Track
var videoRTCPSender *rtcpsender.RTCPSender
var h264Encoder *rtph264.Encoder
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
return err
}
var audioTrack *gortsplib.Track
var audioRTCPSender *rtcpsender.RTCPSender
var aacEncoder *rtpaac.Encoder
tracks = append(tracks, videoTrack)
if h264Sps != nil {
videoTrack, err = gortsplib.NewTrackH264(96, h264Sps, h264Pps)
if err != nil {
s.log(logger.Info, "ERR: %s", err)
return true
}
case av.AACDecoderConfig:
if !hasAudio {
return fmt.Errorf("unexpected audio packet")
}
if audioTrack != nil {
return fmt.Errorf("audio track setupped twice")
}
clockRate, _ := videoTrack.ClockRate()
videoRTCPSender = rtcpsender.New(clockRate)
audioTrack, err = gortsplib.NewTrackAAC(96, pkt.Data)
if err != nil {
return err
}
h264Encoder, err = rtph264.NewEncoder(96)
if err != nil {
s.log(logger.Info, "ERR: %s", err)
return true
}
clockRate, _ := audioTrack.ClockRate()
audioRTCPSender = rtcpsender.New(clockRate)
tracks = append(tracks, videoTrack)
}
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
if err != nil {
return err
}
if aacConfig != nil {
audioTrack, err = gortsplib.NewTrackAAC(96, aacConfig)
if err != nil {
s.log(logger.Info, "ERR: %s", err)
return true
}
tracks = append(tracks, audioTrack)
}
clockRate, _ := audioTrack.ClockRate()
audioRTCPSender = rtcpsender.New(clockRate)
if (!hasVideo || videoTrack != nil) &&
(!hasAudio || audioTrack != nil) {
return nil
}
}
}()
}()
aacEncoder, err = rtpaac.NewEncoder(96, clockRate)
select {
case err := <-confDone:
if err != nil {
s.log(logger.Info, "ERR: %s", err)
return true
}
tracks = append(tracks, audioTrack)
}
if len(tracks) == 0 {
s.log(logger.Info, "ERR: no tracks found")
return true
case <-s.terminate:
nconn.Close()
<-confDone
return false
}
for i, t := range tracks {
@ -284,61 +329,56 @@ func (s *Source) runInner() bool { @@ -284,61 +329,56 @@ func (s *Source) runInner() bool {
readerDone := make(chan error)
go func() {
for {
pkt, err := conn.ReadPacket()
if err != nil {
readerDone <- err
return
}
switch pkt.Type {
case av.H264:
if h264Sps == nil {
readerDone <- fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up")
return
readerDone <- func() error {
for {
pkt, err := conn.ReadPacket()
if err != nil {
return err
}
// decode from AVCC format
nalus, typ := h264.SplitNALUs(pkt.Data)
if typ != h264.NALU_AVCC {
readerDone <- fmt.Errorf("invalid NALU format (%d)", typ)
return
}
switch pkt.Type {
case av.H264:
if videoTrack == nil {
return fmt.Errorf("rtmp source ERR: received an H264 frame, but track is not setup up")
}
// encode into RTP/H264 format
frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus)
if err != nil {
readerDone <- err
return
}
// decode from AVCC format
nalus, typ := h264.SplitNALUs(pkt.Data)
if typ != h264.NALU_AVCC {
return fmt.Errorf("invalid NALU format (%d)", typ)
}
for _, f := range frames {
videoRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f)
s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f)
}
// encode into RTP/H264 format
frames, err := h264Encoder.Write(pkt.Time+pkt.CTime, nalus)
if err != nil {
return err
}
case av.AAC:
if aacConfig == nil {
readerDone <- fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up")
return
}
for _, f := range frames {
videoRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f)
s.parent.OnFrame(videoTrack.ID, gortsplib.StreamTypeRTP, f)
}
frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data)
if err != nil {
readerDone <- err
return
}
case av.AAC:
if audioTrack == nil {
return fmt.Errorf("rtmp source ERR: received an AAC frame, but track is not setup up")
}
for _, f := range frames {
audioRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f)
s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f)
}
frames, err := aacEncoder.Write(pkt.Time+pkt.CTime, pkt.Data)
if err != nil {
return err
}
default:
readerDone <- fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type)
return
for _, f := range frames {
audioRTCPSender.ProcessFrame(time.Now(), gortsplib.StreamTypeRTP, f)
s.parent.OnFrame(audioTrack.ID, gortsplib.StreamTypeRTP, f)
}
default:
return fmt.Errorf("rtmp source ERR: unexpected packet: %v", pkt.Type)
}
}
}
}()
}()
for {

12
main_test.go

@ -651,7 +651,8 @@ func TestSource(t *testing.T) { @@ -651,7 +651,8 @@ func TestSource(t *testing.T) {
"rtsp_udp",
"rtsp_tcp",
"rtsps",
"rtmp",
"rtmp_videoaudio",
"rtmp_video",
} {
t.Run(source, func(t *testing.T) {
switch source {
@ -730,15 +731,20 @@ func TestSource(t *testing.T) { @@ -730,15 +731,20 @@ func TestSource(t *testing.T) {
require.Equal(t, true, ok)
defer p2.close()
case "rtmp":
case "rtmp_videoaudio", "rtmp_video":
cnt1, err := newContainer("nginx-rtmp", "rtmpserver", []string{})
require.NoError(t, err)
defer cnt1.close()
input := "emptyvideoaudio.ts"
if source == "rtmp_video" {
input = "emptyvideo.ts"
}
cnt2, err := newContainer("ffmpeg", "source", []string{
"-re",
"-stream_loop", "-1",
"-i", "emptyvideo.ts",
"-i", input,
"-c", "copy",
"-f", "flv",
"rtmp://" + cnt1.ip() + "/stream/test",

2
testimages/ffmpeg/Dockerfile

@ -3,7 +3,7 @@ FROM amd64/alpine:3.12 @@ -3,7 +3,7 @@ FROM amd64/alpine:3.12
RUN apk add --no-cache \
ffmpeg
COPY emptyvideo.ts /
COPY *.ts /
COPY start.sh /
RUN chmod +x /start.sh

BIN
testimages/ffmpeg/emptyvideoaudio.ts

Binary file not shown.
Loading…
Cancel
Save