Browse Source

rtmp: fix decoding of chunk3 + chunk3

pull/1060/head
aler9 4 years ago
parent
commit
f7c08f577a
  1. 5
      internal/rtmp/rawmessage/reader.go
  2. 335
      internal/rtmp/rawmessage/reader_test.go
  3. 146
      internal/rtmp/rawmessage/writer_test.go

5
internal/rtmp/rawmessage/reader.go

@ -201,6 +201,11 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
v1 := *rc.curTimestamp + *rc.curTimestampDelta v1 := *rc.curTimestamp + *rc.curTimestampDelta
rc.curTimestamp = &v1 rc.curTimestamp = &v1
if *rc.curBodyLen != uint32(len(c3.Body)) {
rc.curBody = &c3.Body
return nil, errMoreChunksNeeded
}
return &Message{ return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond, Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: *rc.curType, Type: *rc.curType,

335
internal/rtmp/rawmessage/reader_test.go

@ -11,154 +11,191 @@ import (
"github.com/aler9/rtsp-simple-server/internal/rtmp/chunk" "github.com/aler9/rtsp-simple-server/internal/rtmp/chunk"
) )
func TestReader(t *testing.T) { var cases = []struct {
type sequenceEntry struct { name string
chunk chunk.Chunk messages []*Message
msg *Message chunks []chunk.Chunk
} chunkSizes []uint32
}{
for _, ca := range []struct { {
name string "(chunk0) + (chunk1)",
sequence []sequenceEntry []*Message{
}{ {
{ ChunkStreamID: 27,
"chunk0 + chunk1", Timestamp: 18576 * time.Millisecond,
[]sequenceEntry{ Type: chunk.MessageTypeSetPeerBandwidth,
{ MessageStreamID: 3123,
&chunk.Chunk0{ Body: bytes.Repeat([]byte{0x03}, 64),
ChunkStreamID: 27, },
Timestamp: 18576, {
Type: chunk.MessageTypeSetPeerBandwidth, ChunkStreamID: 27,
MessageStreamID: 3123, Timestamp: (18576 + 15) * time.Millisecond,
BodyLen: 64, Type: chunk.MessageTypeSetWindowAckSize,
Body: bytes.Repeat([]byte{0x02}, 64), MessageStreamID: 3123,
}, Body: bytes.Repeat([]byte{0x04}, 64),
&Message{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x02}, 64),
},
},
{
&chunk.Chunk1{
ChunkStreamID: 27,
TimestampDelta: 15,
Type: chunk.MessageTypeSetPeerBandwidth,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x03}, 64),
},
&Message{
ChunkStreamID: 27,
Timestamp: (18576 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 64),
},
},
}, },
}, },
{ []chunk.Chunk{
"chunk0 + chunk2 + chunk3", &chunk.Chunk0{
[]sequenceEntry{ ChunkStreamID: 27,
{ Timestamp: 18576,
&chunk.Chunk0{ Type: chunk.MessageTypeSetPeerBandwidth,
ChunkStreamID: 27, MessageStreamID: 3123,
Timestamp: 18576, BodyLen: 64,
Type: chunk.MessageTypeSetPeerBandwidth, Body: bytes.Repeat([]byte{0x03}, 64),
MessageStreamID: 3123, },
BodyLen: 64, &chunk.Chunk1{
Body: bytes.Repeat([]byte{0x02}, 64), ChunkStreamID: 27,
}, TimestampDelta: 15,
&Message{ Type: chunk.MessageTypeSetWindowAckSize,
ChunkStreamID: 27, BodyLen: 64,
Timestamp: 18576 * time.Millisecond, Body: bytes.Repeat([]byte{0x04}, 64),
Type: chunk.MessageTypeSetPeerBandwidth, },
MessageStreamID: 3123, },
Body: bytes.Repeat([]byte{0x02}, 64), []uint32{
}, 128,
}, 128,
{ },
&chunk.Chunk2{ },
ChunkStreamID: 27, {
TimestampDelta: 15, "(chunk0) + (chunk2) + (chunk3)",
Body: bytes.Repeat([]byte{0x03}, 64), []*Message{
}, {
&Message{ ChunkStreamID: 27,
ChunkStreamID: 27, Timestamp: 18576 * time.Millisecond,
Timestamp: (18576 + 15) * time.Millisecond, Type: chunk.MessageTypeSetPeerBandwidth,
Type: chunk.MessageTypeSetPeerBandwidth, MessageStreamID: 3123,
MessageStreamID: 3123, Body: bytes.Repeat([]byte{0x03}, 64),
Body: bytes.Repeat([]byte{0x03}, 64), },
}, {
}, ChunkStreamID: 27,
{ Timestamp: (18576 + 15) * time.Millisecond,
&chunk.Chunk3{ Type: chunk.MessageTypeSetPeerBandwidth,
ChunkStreamID: 27, MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 64), Body: bytes.Repeat([]byte{0x04}, 64),
}, },
&Message{ {
ChunkStreamID: 27, ChunkStreamID: 27,
Timestamp: (18576 + 15 + 15) * time.Millisecond, Timestamp: (18576 + 15 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth, Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123, MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 64), Body: bytes.Repeat([]byte{0x05}, 64),
},
},
}, },
}, },
{ []chunk.Chunk{
"chunk0 + chunk3 + chunk2 + chunk3", &chunk.Chunk0{
[]sequenceEntry{ ChunkStreamID: 27,
{ Timestamp: 18576,
&chunk.Chunk0{ Type: chunk.MessageTypeSetPeerBandwidth,
ChunkStreamID: 27, MessageStreamID: 3123,
Timestamp: 18576, BodyLen: 64,
Type: chunk.MessageTypeSetPeerBandwidth, Body: bytes.Repeat([]byte{0x03}, 64),
MessageStreamID: 3123, },
BodyLen: 192, &chunk.Chunk2{
Body: bytes.Repeat([]byte{0x03}, 128), ChunkStreamID: 27,
}, TimestampDelta: 15,
nil, Body: bytes.Repeat([]byte{0x04}, 64),
}, },
{ &chunk.Chunk3{
&chunk.Chunk3{ ChunkStreamID: 27,
ChunkStreamID: 27, Body: bytes.Repeat([]byte{0x05}, 64),
Body: bytes.Repeat([]byte{0x03}, 64),
},
&Message{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 192),
},
},
{
&chunk.Chunk2{
ChunkStreamID: 27,
TimestampDelta: 15,
Body: bytes.Repeat([]byte{0x04}, 128),
},
nil,
},
{
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x04}, 64),
},
&Message{
ChunkStreamID: 27,
Timestamp: 18591 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 192),
},
},
}, },
}, },
} { []uint32{
128,
64,
64,
},
},
{
"(chunk0 + chunk3) + (chunk1 + chunk3) + (chunk2 + chunk3) + (chunk3 + chunk3)",
[]*Message{
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 190),
},
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 192),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x05}, 192),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x06}, 192),
},
},
[]chunk.Chunk{
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
BodyLen: 190,
Body: bytes.Repeat([]byte{0x03}, 128),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x03}, 62),
},
&chunk.Chunk1{
ChunkStreamID: 27,
TimestampDelta: 0,
Type: chunk.MessageTypeSetPeerBandwidth,
BodyLen: 192,
Body: bytes.Repeat([]byte{0x04}, 128),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x04}, 64),
},
&chunk.Chunk2{
ChunkStreamID: 27,
TimestampDelta: 15,
Body: bytes.Repeat([]byte{0x05}, 128),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x05}, 64),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x06}, 128),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x06}, 64),
},
},
[]uint32{
128,
62,
128,
64,
128,
64,
128,
64,
},
},
}
func TestReader(t *testing.T) {
for _, ca := range cases {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer var buf bytes.Buffer
bcr := bytecounter.NewReader(&buf) bcr := bytecounter.NewReader(&buf)
@ -166,16 +203,16 @@ func TestReader(t *testing.T) {
return nil return nil
}) })
for _, entry := range ca.sequence { for _, cach := range ca.chunks {
buf2, err := entry.chunk.Marshal() buf2, err := cach.Marshal()
require.NoError(t, err) require.NoError(t, err)
buf.Write(buf2) buf.Write(buf2)
}
if entry.msg != nil { for _, camsg := range ca.messages {
msg, err := r.Read() msg, err := r.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, entry.msg, msg) require.Equal(t, camsg, msg)
}
} }
}) })
} }

146
internal/rtmp/rawmessage/writer_test.go

@ -12,151 +12,7 @@ import (
) )
func TestWriter(t *testing.T) { func TestWriter(t *testing.T) {
for _, ca := range []struct { for _, ca := range cases {
name string
messages []*Message
chunks []chunk.Chunk
chunkSizes []uint32
}{
{
"chunk0 + chunk1",
[]*Message{
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 64),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetWindowAckSize,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 64),
},
},
[]chunk.Chunk{
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x03}, 64),
},
&chunk.Chunk1{
ChunkStreamID: 27,
TimestampDelta: 15,
Type: chunk.MessageTypeSetWindowAckSize,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x04}, 64),
},
},
[]uint32{
128,
128,
},
},
{
"chunk0 + chunk2 + chunk3",
[]*Message{
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 64),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 64),
},
{
ChunkStreamID: 27,
Timestamp: (18576 + 15 + 15) * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x05}, 64),
},
},
[]chunk.Chunk{
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x03}, 64),
},
&chunk.Chunk2{
ChunkStreamID: 27,
TimestampDelta: 15,
Body: bytes.Repeat([]byte{0x04}, 64),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x05}, 64),
},
},
[]uint32{
128,
64,
64,
},
},
{
"chunk0 + chunk3 + chunk2 + chunk3",
[]*Message{
{
ChunkStreamID: 27,
Timestamp: 18576 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 192),
},
{
ChunkStreamID: 27,
Timestamp: 18591 * time.Millisecond,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x04}, 192),
},
},
[]chunk.Chunk{
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
BodyLen: 192,
Body: bytes.Repeat([]byte{0x03}, 128),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x03}, 64),
},
&chunk.Chunk2{
ChunkStreamID: 27,
TimestampDelta: 15,
Body: bytes.Repeat([]byte{0x04}, 128),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{0x04}, 64),
},
},
[]uint32{
128,
64,
128,
64,
},
},
} {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer var buf bytes.Buffer
w := NewWriter(bytecounter.NewWriter(&buf), true) w := NewWriter(bytecounter.NewWriter(&buf), true)

Loading…
Cancel
Save