From ad52b3fab7c7463b967c0ff3bf665c44dd1b08c2 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Tue, 27 Dec 2022 13:55:30 +0100 Subject: [PATCH] Support publishing with RTMP and H265 (for OBS Studio) (#1333) * support publishing with RTMP and H265 (for OBS Studio) * rtmp source: block H265 tracks --- README.md | 2 +- internal/core/rtmp_conn.go | 39 +++-- internal/core/rtmp_source.go | 4 + internal/rtmp/conn.go | 80 +++++++--- internal/rtmp/conn_test.go | 277 ++++++++++++++++++++++++----------- 5 files changed, 283 insertions(+), 119 deletions(-) diff --git a/README.md b/README.md index a49543d7..77ae556e 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Live streams can be published to the server with: |--------|--------|------| |RTSP clients (FFmpeg, GStreamer, etc)|UDP, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG2, M-JPEG, MP3, MPEG4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| |RTSP servers and cameras|UDP, UDP-Multicast, TCP, RTSPS|H264, H265, VP8, VP9, AV1, MPEG2, M-JPEG, MP3, MPEG4 Audio (AAC), Opus, G711, G722, LPCM and any RTP-compatible codec| -|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, MPEG4 Audio (AAC)| +|RTMP clients (OBS Studio)|RTMP, RTMPS|H264, H265, MPEG4 Audio (AAC)| |RTMP servers and cameras|RTMP, RTMPS|H264, MPEG4 Audio (AAC)| |HLS servers and cameras|Low-Latency HLS, MP4-based HLS, legacy HLS|H264, MPEG4 Audio (AAC)| |Raspberry Pi Cameras||H264| diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 9d154f74..cdcd5e64 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -526,6 +526,32 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { // disable write deadline to allow outgoing acknowledges c.nconn.SetWriteDeadline(time.Time{}) + var onVideoData func(time.Duration, [][]byte) + + if _, ok := videoFormat.(*format.H264); ok { + onVideoData = func(pts time.Duration, nalus [][]byte) { + err = rres.stream.writeData(videoMedia, videoFormat, &dataH264{ + pts: pts, + nalus: nalus, + ntp: time.Now(), + }) + if err != nil { + c.log(logger.Warn, "%v", err) + } + } + } else { + onVideoData = func(pts time.Duration, nalus [][]byte) { + err = rres.stream.writeData(videoMedia, videoFormat, &dataH265{ + pts: pts, + nalus: nalus, + ntp: time.Now(), + }) + if err != nil { + c.log(logger.Warn, "%v", err) + } + } + } + for { c.nconn.SetReadDeadline(time.Now().Add(time.Duration(c.readTimeout))) msg, err := c.conn.ReadMessage() @@ -557,7 +583,7 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { } } else if tmsg.H264Type == flvio.AVC_NALU { if videoFormat == nil { - return fmt.Errorf("received an H264 packet, but track is not set up") + return fmt.Errorf("received a video packet, but track is not set up") } nalus, err := h264.AVCCUnmarshal(tmsg.Payload) @@ -585,20 +611,13 @@ func (c *rtmpConn) runPublish(ctx context.Context, u *url.URL) error { } } - err = rres.stream.writeData(videoMedia, videoFormat, &dataH264{ - pts: tmsg.DTS + tmsg.PTSDelta, - nalus: validNALUs, - ntp: time.Now(), - }) - if err != nil { - c.log(logger.Warn, "%v", err) - } + onVideoData(tmsg.DTS+tmsg.PTSDelta, validNALUs) } case *message.MsgAudio: if tmsg.AACType == flvio.AAC_RAW { if audioFormat == nil { - return fmt.Errorf("received an AAC packet, but track is not set up") + return fmt.Errorf("received an audio packet, but track is not set up") } err := rres.stream.writeData(audioMedia, audioFormat, &dataMPEG4Audio{ diff --git a/internal/core/rtmp_source.go b/internal/core/rtmp_source.go index 57a0336f..0d2c8dc6 100644 --- a/internal/core/rtmp_source.go +++ b/internal/core/rtmp_source.go @@ -121,6 +121,10 @@ func (s *rtmpSource) run(ctx context.Context) error { return err } + if _, ok := videoFormat.(*format.H265); ok { + return fmt.Errorf("proxying H265 streams with RTMP is not supported") + } + var medias media.Medias var videoMedia *media.Media var audioMedia *media.Media diff --git a/internal/rtmp/conn.go b/internal/rtmp/conn.go index bbf825fe..12a1926c 100644 --- a/internal/rtmp/conn.go +++ b/internal/rtmp/conn.go @@ -10,6 +10,8 @@ import ( "time" "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/h264" + "github.com/aler9/gortsplib/v2/pkg/h265" "github.com/aler9/gortsplib/v2/pkg/mpeg4audio" "github.com/notedit/rtmp/format/flv/flvio" @@ -613,7 +615,7 @@ func trackFromAACDecoderConfig(data []byte) (*format.MPEG4Audio, error) { var errEmptyMetadata = errors.New("metadata is empty") -func (c *Conn) readTracksFromMetadata(payload []interface{}) (*format.H264, *format.MPEG4Audio, error) { +func (c *Conn) readTracksFromMetadata(payload []interface{}) (format.Format, *format.MPEG4Audio, error) { if len(payload) != 1 { return nil, nil, fmt.Errorf("invalid metadata") } @@ -683,7 +685,7 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (*format.H264, *for return nil, nil, errEmptyMetadata } - var videoTrack *format.H264 + var videoTrack format.Format var audioTrack *format.MPEG4Audio for { @@ -694,34 +696,63 @@ func (c *Conn) readTracksFromMetadata(payload []interface{}) (*format.H264, *for switch tmsg := msg.(type) { case *message.MsgVideo: - if tmsg.H264Type == flvio.AVC_SEQHDR { - if !hasVideo { - return nil, nil, fmt.Errorf("unexpected video packet") - } + if !hasVideo { + return nil, nil, fmt.Errorf("unexpected video packet") + } - if videoTrack != nil { - return nil, nil, fmt.Errorf("video track setupped twice") - } + if videoTrack == nil { + if tmsg.H264Type == flvio.AVC_SEQHDR { + videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload) + if err != nil { + return nil, nil, err + } + } else if tmsg.H264Type == 1 && tmsg.IsKeyFrame { + nalus, err := h264.AVCCUnmarshal(tmsg.Payload) + if err != nil { + return nil, nil, err + } - videoTrack, err = trackFromH264DecoderConfig(tmsg.Payload) - if err != nil { - return nil, nil, err + var h265VPS []byte + var h265SPS []byte + var h265PPS []byte + + for _, nalu := range nalus { + typ := h265.NALUType((nalu[0] >> 1) & 0b111111) + + switch typ { + case h265.NALUTypeVPS: + h265VPS = append([]byte(nil), nalu...) + + case h265.NALUTypeSPS: + h265SPS = append([]byte(nil), nalu...) + + case h265.NALUTypePPS: + h265PPS = append([]byte(nil), nalu...) + } + } + + if h265VPS != nil && h265SPS != nil && h265PPS != nil { + videoTrack = &format.H265{ + PayloadTyp: 96, + VPS: h265VPS, + SPS: h265SPS, + PPS: h265PPS, + } + } } } case *message.MsgAudio: - if tmsg.AACType == flvio.AVC_SEQHDR { - if !hasAudio { - return nil, nil, fmt.Errorf("unexpected audio packet") - } - - if audioTrack != nil { - return nil, nil, fmt.Errorf("audio track setupped twice") - } + if !hasAudio { + return nil, nil, fmt.Errorf("unexpected audio packet") + } - audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload) - if err != nil { - return nil, nil, err + if audioTrack == nil { + if tmsg.AACType == flvio.AVC_SEQHDR { + audioTrack, err = trackFromAACDecoderConfig(tmsg.Payload) + if err != nil { + return nil, nil, err + } } } } @@ -808,7 +839,8 @@ outer: } // ReadTracks reads track informations. -func (c *Conn) ReadTracks() (*format.H264, *format.MPEG4Audio, error) { +// It returns the video track and the audio track. +func (c *Conn) ReadTracks() (format.Format, *format.MPEG4Audio, error) { msg, err := func() (message.Message, error) { for { msg, err := c.ReadMessage() diff --git a/internal/rtmp/conn_test.go b/internal/rtmp/conn_test.go index 2ed7c438..f5dc6737 100644 --- a/internal/rtmp/conn_test.go +++ b/internal/rtmp/conn_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/aler9/gortsplib/v2/pkg/format" + "github.com/aler9/gortsplib/v2/pkg/h264" "github.com/aler9/gortsplib/v2/pkg/mpeg4audio" "github.com/notedit/rtmp/format/flv/flvio" "github.com/stretchr/testify/require" @@ -471,13 +472,118 @@ func TestReadTracks(t *testing.T) { 0x68, 0xee, 0x3c, 0x80, } - for _, ca := range []string{ - "video+audio", - "video", - "metadata without codec id", - "missing metadata", + for _, ca := range []struct { + name string + videoTrack format.Format + audioTrack format.Format + }{ + { + "video+audio", + &format.H264{ + PayloadTyp: 96, + SPS: sps, + PPS: pps, + PacketizationMode: 1, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }, + }, + { + "video", + &format.H264{ + PayloadTyp: 96, + SPS: sps, + PPS: pps, + PacketizationMode: 1, + }, + (*format.MPEG4Audio)(nil), + }, + { + "metadata without codec id", + &format.H264{ + PayloadTyp: 96, + SPS: sps, + PPS: pps, + PacketizationMode: 1, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }, + }, + { + "missing metadata", + &format.H264{ + PayloadTyp: 96, + SPS: sps, + PPS: pps, + PacketizationMode: 1, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }, + }, + { + "obs studio h265", + &format.H265{ + PayloadTyp: 96, + VPS: []byte{ + 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x01, 0x40, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x7b, 0xac, 0x09, + }, + SPS: []byte{ + 0x42, 0x01, 0x01, 0x01, 0x40, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x7b, 0xa0, 0x03, 0xc0, 0x80, 0x11, + 0x07, 0xcb, 0x96, 0xb4, 0xa4, 0x25, 0x92, 0xe3, + 0x01, 0x6a, 0x02, 0x02, 0x02, 0x08, 0x00, 0x00, + 0x03, 0x00, 0x08, 0x00, 0x00, 0x03, 0x01, 0xe3, + 0x00, 0x2e, 0xf2, 0x88, 0x00, 0x09, 0x89, 0x60, + 0x00, 0x04, 0xc4, 0xb4, 0x20, + }, + PPS: []byte{ + 0x44, 0x01, 0xc0, 0xf7, 0xc0, 0xcc, 0x90, + }, + }, + &format.MPEG4Audio{ + PayloadTyp: 96, + Config: &mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }, + SizeLength: 13, + IndexLength: 3, + IndexDeltaLength: 3, + }, + }, } { - t.Run(ca, func(t *testing.T) { + t.Run(ca.name, func(t *testing.T) { ln, err := net.Listen("tcp", "127.0.0.1:9121") require.NoError(t, err) defer ln.Close() @@ -495,79 +601,8 @@ func TestReadTracks(t *testing.T) { videoTrack, audioTrack, err := rconn.ReadTracks() require.NoError(t, err) - - switch ca { - case "video+audio": - require.Equal(t, &format.H264{ - PayloadTyp: 96, - SPS: sps, - PPS: pps, - PacketizationMode: 1, - }, videoTrack) - - require.Equal(t, &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - }, audioTrack) - - case "video": - require.Equal(t, &format.H264{ - PayloadTyp: 96, - SPS: sps, - PPS: pps, - PacketizationMode: 1, - }, videoTrack) - - require.Nil(t, audioTrack) - - case "metadata without codec id": - require.Equal(t, &format.H264{ - PayloadTyp: 96, - SPS: sps, - PPS: pps, - PacketizationMode: 1, - }, videoTrack) - - require.Equal(t, &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - }, audioTrack) - - case "missing metadata": - require.Equal(t, &format.H264{ - PayloadTyp: 96, - SPS: sps, - PPS: pps, - PacketizationMode: 1, - }, videoTrack) - - require.Equal(t, &format.MPEG4Audio{ - PayloadTyp: 96, - Config: &mpeg4audio.Config{ - Type: 2, - SampleRate: 44100, - ChannelCount: 2, - }, - SizeLength: 13, - IndexLength: 3, - IndexDeltaLength: 3, - }, audioTrack) - } - + require.Equal(t, ca.videoTrack, videoTrack) + require.Equal(t, ca.audioTrack, audioTrack) close(done) }() @@ -718,9 +753,8 @@ func TestReadTracks(t *testing.T) { }, }, msg) - switch ca { + switch ca.name { case "video+audio": - err = mrw.Write(&message.MsgDataAMF0{ ChunkStreamID: 4, MessageStreamID: 1, @@ -780,7 +814,6 @@ func TestReadTracks(t *testing.T) { require.NoError(t, err) case "video": - err = mrw.Write(&message.MsgDataAMF0{ ChunkStreamID: 4, MessageStreamID: 1, @@ -823,7 +856,6 @@ func TestReadTracks(t *testing.T) { require.NoError(t, err) case "metadata without codec id": - err = mrw.Write(&message.MsgDataAMF0{ ChunkStreamID: 4, MessageStreamID: 1, @@ -879,7 +911,6 @@ func TestReadTracks(t *testing.T) { require.NoError(t, err) case "missing metadata": - buf, _ := h264conf.Conf{ SPS: sps, PPS: pps, @@ -893,6 +924,84 @@ func TestReadTracks(t *testing.T) { }) require.NoError(t, err) + enc, err := mpeg4audio.Config{ + Type: 2, + SampleRate: 44100, + ChannelCount: 2, + }.Marshal() + require.NoError(t, err) + err = mrw.Write(&message.MsgAudio{ + ChunkStreamID: message.MsgAudioChunkStreamID, + MessageStreamID: 0x1000000, + Rate: flvio.SOUND_44Khz, + Depth: flvio.SOUND_16BIT, + Channels: flvio.SOUND_STEREO, + AACType: flvio.AAC_SEQHDR, + Payload: enc, + }) + require.NoError(t, err) + + case "obs studio h265": + err = mrw.Write(&message.MsgDataAMF0{ + ChunkStreamID: 4, + MessageStreamID: 1, + Payload: []interface{}{ + "@setDataFrame", + "onMetaData", + flvio.AMFMap{ + { + K: "videodatarate", + V: float64(0), + }, + { + K: "videocodecid", + V: float64(codecH264), + }, + { + K: "audiodatarate", + V: float64(0), + }, + { + K: "audiocodecid", + V: float64(codecAAC), + }, + }, + }, + }) + require.NoError(t, err) + + avcc, err := h264.AVCCMarshal([][]byte{ + { // VPS + 0x40, 0x01, 0x0c, 0x01, 0xff, 0xff, 0x01, 0x40, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x7b, 0xac, 0x09, + }, + { // SPS + 0x42, 0x01, 0x01, 0x01, 0x40, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x7b, 0xa0, 0x03, 0xc0, 0x80, 0x11, + 0x07, 0xcb, 0x96, 0xb4, 0xa4, 0x25, 0x92, 0xe3, + 0x01, 0x6a, 0x02, 0x02, 0x02, 0x08, 0x00, 0x00, + 0x03, 0x00, 0x08, 0x00, 0x00, 0x03, 0x01, 0xe3, + 0x00, 0x2e, 0xf2, 0x88, 0x00, 0x09, 0x89, 0x60, + 0x00, 0x04, 0xc4, 0xb4, 0x20, + }, + { + // PPS + 0x44, 0x01, 0xc0, 0xf7, 0xc0, 0xcc, 0x90, + }, + }) + require.NoError(t, err) + + err = mrw.Write(&message.MsgVideo{ + ChunkStreamID: message.MsgVideoChunkStreamID, + MessageStreamID: 0x1000000, + IsKeyFrame: true, + H264Type: 1, + Payload: avcc, + }) + require.NoError(t, err) + enc, err := mpeg4audio.Config{ Type: 2, SampleRate: 44100,