diff --git a/internal/clientrtmp/client.go b/internal/clientrtmp/client.go index 5d110931..4b46f24a 100644 --- a/internal/clientrtmp/client.go +++ b/internal/clientrtmp/client.go @@ -57,7 +57,7 @@ func pathNameAndQuery(inURL *url.URL) (string, url.Values) { return pathName, ur.Query() } -type trackIDBufPair struct { +type trackIDPayloadPair struct { trackID int buf []byte } @@ -82,8 +82,8 @@ type Client struct { readBufferCount int runOnConnect string runOnConnectRestart bool - stats *stats.Stats wg *sync.WaitGroup + stats *stats.Stats conn *rtmp.Conn pathMan PathMan parent Parent @@ -293,8 +293,7 @@ func (c *Client) runRead() { if !ok { return fmt.Errorf("terminated") } - - pair := data.(trackIDBufPair) + pair := data.(trackIDPayloadPair) now := time.Now() @@ -308,14 +307,22 @@ func (c *Client) runRead() { } for _, nt := range nts { + // remove SPS, PPS and AUD, not needed by RTMP + typ := h264.NALUType(nt.NALU[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: + continue + } + if !videoInitialized { videoInitialized = true videoStartDTS = now videoPTS = nt.Timestamp } - // aggregate NALUs by PTS - // this delays the stream by one frame, but is required by RTMP + // aggregate NALUs by PTS. + // for instance, aggregate a SEI and a IDR. + // this delays the stream by one frame, but is required by RTMP. if nt.Timestamp != videoPTS { c.conn.NetConn().SetWriteDeadline(time.Now().Add(c.writeTimeout)) err := c.conn.WriteH264(videoBuf, now.Sub(videoStartDTS)) @@ -515,10 +522,17 @@ func (c *Client) runPublish() { ts := pkt.Time + pkt.CTime var nts []*rtph264.NALUAndTimestamp - for _, nt := range nalus { + for _, nalu := range nalus { + // remove SPS, PPS and AUD, not needed by RTSP / RTMP + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS, h264.NALUTypePPS, h264.NALUTypeAccessUnitDelimiter: + continue + } + nts = append(nts, &rtph264.NALUAndTimestamp{ Timestamp: ts, - NALU: nt, + NALU: nalu, }) } @@ -616,8 +630,8 @@ func (c *Client) Authenticate(authMethods []headers.AuthMethod, } // OnFrame implements path.Reader. -func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, buf []byte) { +func (c *Client) OnFrame(trackID int, streamType gortsplib.StreamType, payload []byte) { if streamType == gortsplib.StreamTypeRTP { - c.ringBuffer.Push(trackIDBufPair{trackID, buf}) + c.ringBuffer.Push(trackIDPayloadPair{trackID, payload}) } } diff --git a/internal/h264/annexb.go b/internal/h264/annexb.go index 4da4e546..a29e608f 100644 --- a/internal/h264/annexb.go +++ b/internal/h264/annexb.go @@ -4,93 +4,6 @@ import ( "fmt" ) -func removeAntiCompetition(nalu []byte) []byte { - // 0x00 0x00 0x03 0x00 -> 0x00 0x00 0x00 - // 0x00 0x00 0x03 0x01 -> 0x00 0x00 0x01 - // 0x00 0x00 0x03 0x02 -> 0x00 0x00 0x02 - // 0x00 0x00 0x03 0x03 -> 0x00 0x00 0x03 - - var ret []byte - step := 0 - start := 0 - - for i, b := range nalu { - switch step { - case 0: - if b == 0 { - step++ - } - - case 1: - if b == 0 { - step++ - } else { - step = 0 - } - - case 2: - if b == 3 { - step++ - } else { - step = 0 - } - - case 3: - switch b { - case 3, 2, 1, 0: - ret = append(ret, nalu[start:i-3]...) - ret = append(ret, []byte{0x00, 0x00, b}...) - step = 0 - start = i + 1 - - default: - step = 0 - } - } - } - - ret = append(ret, nalu[start:]...) - - return ret -} - -func addAntiCompetition(dest []byte, nalu []byte) []byte { - step := 0 - start := 0 - - for i, b := range nalu { - switch step { - case 0: - if b == 0 { - step++ - } - - case 1: - if b == 0 { - step++ - } else { - step = 0 - } - - case 2: - switch b { - case 3, 2, 1, 0: - dest = append(dest, nalu[start:i-2]...) - dest = append(dest, []byte{0x00, 0x00, 0x03, b}...) - step = 0 - start = i + 1 - - default: - step = 0 - } - } - } - - dest = append(dest, nalu[start:]...) - - return dest -} - // DecodeAnnexB decodes NALUs from the Annex-B code stream format. func DecodeAnnexB(byts []byte) ([][]byte, error) { bl := len(byts) @@ -134,7 +47,7 @@ func DecodeAnnexB(byts []byte) ([][]byte, error) { if len(nalu) == 0 { return nil, fmt.Errorf("empty NALU") } - ret = append(ret, removeAntiCompetition(nalu)) + ret = append(ret, nalu) start = i + 1 } zeros = 0 @@ -148,7 +61,7 @@ func DecodeAnnexB(byts []byte) ([][]byte, error) { if len(nalu) == 0 { return nil, fmt.Errorf("empty NALU") } - ret = append(ret, removeAntiCompetition(nalu)) + ret = append(ret, nalu) return ret, nil } @@ -159,7 +72,7 @@ func EncodeAnnexB(nalus [][]byte) ([]byte, error) { for _, nalu := range nalus { ret = append(ret, []byte{0x00, 0x00, 0x00, 0x01}...) - ret = addAntiCompetition(ret, nalu) + ret = append(ret, nalu...) } return ret, nil diff --git a/internal/h264/annexb_test.go b/internal/h264/annexb_test.go index 06f2b0dd..86d87ffc 100644 --- a/internal/h264/annexb_test.go +++ b/internal/h264/annexb_test.go @@ -63,31 +63,6 @@ var annexBCases = []struct { {0xee, 0xff}, }, }, - { - "anti-competition", - []byte{ - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x03, 0x00, - 0x00, 0x00, 0x03, 0x01, - 0x00, 0x00, 0x03, 0x02, - 0x00, 0x00, 0x03, 0x03, - }, - []byte{ - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x03, 0x00, - 0x00, 0x00, 0x03, 0x01, - 0x00, 0x00, 0x03, 0x02, - 0x00, 0x00, 0x03, 0x03, - }, - [][]byte{ - { - 0x00, 0x00, 0x00, - 0x00, 0x00, 0x01, - 0x00, 0x00, 0x02, - 0x00, 0x00, 0x03, - }, - }, - }, } func TestAnnexBDecode(t *testing.T) { diff --git a/internal/h264/anticompetition.go b/internal/h264/anticompetition.go new file mode 100644 index 00000000..3dcc7359 --- /dev/null +++ b/internal/h264/anticompetition.go @@ -0,0 +1,90 @@ +package h264 + +// AntiCompetitionAdd adds the anti-competition bytes to a NALU. +func AntiCompetitionAdd(nalu []byte) []byte { + var ret []byte + step := 0 + start := 0 + + for i, b := range nalu { + switch step { + case 0: + if b == 0 { + step++ + } + + case 1: + if b == 0 { + step++ + } else { + step = 0 + } + + case 2: + switch b { + case 3, 2, 1, 0: + ret = append(ret, nalu[start:i-2]...) + ret = append(ret, []byte{0x00, 0x00, 0x03, b}...) + step = 0 + start = i + 1 + + default: + step = 0 + } + } + } + + ret = append(ret, nalu[start:]...) + return ret +} + +// AntiCompetitionRemove removes the anti-competition bytes from a NALU. +func AntiCompetitionRemove(nalu []byte) []byte { + // 0x00 0x00 0x03 0x00 -> 0x00 0x00 0x00 + // 0x00 0x00 0x03 0x01 -> 0x00 0x00 0x01 + // 0x00 0x00 0x03 0x02 -> 0x00 0x00 0x02 + // 0x00 0x00 0x03 0x03 -> 0x00 0x00 0x03 + + var ret []byte + step := 0 + start := 0 + + for i, b := range nalu { + switch step { + case 0: + if b == 0 { + step++ + } + + case 1: + if b == 0 { + step++ + } else { + step = 0 + } + + case 2: + if b == 3 { + step++ + } else { + step = 0 + } + + case 3: + switch b { + case 3, 2, 1, 0: + ret = append(ret, nalu[start:i-3]...) + ret = append(ret, []byte{0x00, 0x00, b}...) + step = 0 + start = i + 1 + + default: + step = 0 + } + } + } + + ret = append(ret, nalu[start:]...) + + return ret +} diff --git a/internal/h264/anticompetition_test.go b/internal/h264/anticompetition_test.go new file mode 100644 index 00000000..214dca46 --- /dev/null +++ b/internal/h264/anticompetition_test.go @@ -0,0 +1,47 @@ +package h264 + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +var casesAntiCompetition = []struct { + name string + unproc []byte + proc []byte +}{ + { + "base", + []byte{ + 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, + 0x00, 0x00, 0x02, + 0x00, 0x00, 0x03, + }, + []byte{ + 0x00, 0x00, 0x03, 0x00, + 0x00, 0x00, 0x03, 0x01, + 0x00, 0x00, 0x03, 0x02, + 0x00, 0x00, 0x03, 0x03, + }, + }, +} + +func TestAntiCompetitionAdd(t *testing.T) { + for _, ca := range casesAntiCompetition { + t.Run(ca.name, func(t *testing.T) { + proc := AntiCompetitionAdd(ca.unproc) + require.Equal(t, ca.proc, proc) + }) + } +} + +func TestAntiCompetitionRemove(t *testing.T) { + for _, ca := range casesAntiCompetition { + t.Run(ca.name, func(t *testing.T) { + unproc := AntiCompetitionRemove(ca.proc) + require.Equal(t, ca.unproc, unproc) + }) + } +} diff --git a/internal/h264/nalutype.go b/internal/h264/nalutype.go new file mode 100644 index 00000000..120f727c --- /dev/null +++ b/internal/h264/nalutype.go @@ -0,0 +1,106 @@ +package h264 + +import ( + "fmt" +) + +// NALUType is the type of a NALU. +type NALUType uint8 + +// standard NALU types. +const ( + NALUTypeNonIDR NALUType = 1 + NALUTypeDataPartitionA NALUType = 2 + NALUTypeDataPartitionB NALUType = 3 + NALUTypeDataPartitionC NALUType = 4 + NALUTypeIDR NALUType = 5 + NALUTypeSei NALUType = 6 + NALUTypeSPS NALUType = 7 + NALUTypePPS NALUType = 8 + NALUTypeAccessUnitDelimiter NALUType = 9 + NALUTypeEndOfSequence NALUType = 10 + NALUTypeEndOfStream NALUType = 11 + NALUTypeFillerData NALUType = 12 + NALUTypeSPSExtension NALUType = 13 + NALUTypePrefix NALUType = 14 + NALUTypeSubsetSPS NALUType = 15 + NALUTypeReserved16 NALUType = 16 + NALUTypeReserved17 NALUType = 17 + NALUTypeReserved18 NALUType = 18 + NALUTypeSliceLayerWithoutPartitioning NALUType = 19 + NALUTypeSliceExtension NALUType = 20 + NALUTypeSliceExtensionDepth NALUType = 21 + NALUTypeReserved22 NALUType = 22 + NALUTypeReserved23 NALUType = 23 + NALUTypeStapA NALUType = 24 + NALUTypeStapB NALUType = 25 + NALUTypeMtap16 NALUType = 26 + NALUTypeMtap24 NALUType = 27 + NALUTypeFuA NALUType = 28 + NALUTypeFuB NALUType = 29 +) + +// String implements fmt.Stringer. +func (nt NALUType) String() string { + switch nt { + case NALUTypeNonIDR: + return "NonIDR" + case NALUTypeDataPartitionA: + return "DataPartitionA" + case NALUTypeDataPartitionB: + return "DataPartitionB" + case NALUTypeDataPartitionC: + return "DataPartitionC" + case NALUTypeIDR: + return "IDR" + case NALUTypeSei: + return "Sei" + case NALUTypeSPS: + return "SPS" + case NALUTypePPS: + return "PPS" + case NALUTypeAccessUnitDelimiter: + return "AccessUnitDelimiter" + case NALUTypeEndOfSequence: + return "EndOfSequence" + case NALUTypeEndOfStream: + return "EndOfStream" + case NALUTypeFillerData: + return "FillerData" + case NALUTypeSPSExtension: + return "SPSExtension" + case NALUTypePrefix: + return "Prefix" + case NALUTypeSubsetSPS: + return "SubsetSPS" + case NALUTypeReserved16: + return "Reserved16" + case NALUTypeReserved17: + return "Reserved17" + case NALUTypeReserved18: + return "Reserved18" + case NALUTypeSliceLayerWithoutPartitioning: + return "SliceLayerWithoutPartitioning" + case NALUTypeSliceExtension: + return "SliceExtension" + case NALUTypeSliceExtensionDepth: + return "SliceExtensionDepth" + case NALUTypeReserved22: + return "Reserved22" + case NALUTypeReserved23: + return "Reserved23" + case NALUTypeStapA: + return "StapA" + case NALUTypeStapB: + return "StapB" + case NALUTypeMtap16: + return "Mtap16" + case NALUTypeMtap24: + return "Mtap24" + case NALUTypeFuA: + return "FuA" + case NALUTypeFuB: + return "FuB" + } + return fmt.Sprintf("unknown (%d)", nt) +}