diff --git a/internal/rtmp/base/base.go b/internal/rtmp/base/base.go index 3f1d82c2..abf062d2 100644 --- a/internal/rtmp/base/base.go +++ b/internal/rtmp/base/base.go @@ -1,5 +1,10 @@ package base +const ( + // ControlChunkStreamID is the stream ID used for control messages. + ControlChunkStreamID = 2 +) + const ( rtmpVersion = 0x03 ) diff --git a/internal/rtmp/base/chunk0.go b/internal/rtmp/base/chunk0.go index 39087aa6..17a4596e 100644 --- a/internal/rtmp/base/chunk0.go +++ b/internal/rtmp/base/chunk0.go @@ -12,7 +12,7 @@ import ( type Chunk0 struct { ChunkStreamID byte Timestamp uint32 - Typ byte + Type MessageType MessageStreamID uint32 BodyLen uint32 Body []byte @@ -33,7 +33,7 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error { c.ChunkStreamID = header[0] & 0x3F c.Timestamp = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1]) c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6]) - c.Typ = header[7] + c.Type = MessageType(header[7]) c.MessageStreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11]) chunkBodyLen := int(c.BodyLen) @@ -56,7 +56,7 @@ func (c Chunk0) Write(w io.Writer) error { header[4] = byte(c.BodyLen >> 16) header[5] = byte(c.BodyLen >> 8) header[6] = byte(c.BodyLen) - header[7] = c.Typ + header[7] = byte(c.Type) header[8] = byte(c.MessageStreamID) _, err := w.Write(header) if err != nil { diff --git a/internal/rtmp/base/chunk1.go b/internal/rtmp/base/chunk1.go index 143cb809..7745e061 100644 --- a/internal/rtmp/base/chunk1.go +++ b/internal/rtmp/base/chunk1.go @@ -12,7 +12,7 @@ import ( // message after the first. type Chunk1 struct { ChunkStreamID byte - Typ byte + Type MessageType BodyLen uint32 Body []byte } @@ -24,7 +24,7 @@ func (c Chunk1) Write(w io.Writer) error { header[4] = byte(c.BodyLen >> 16) header[5] = byte(c.BodyLen >> 8) header[6] = byte(c.BodyLen) - header[7] = c.Typ + header[7] = byte(c.Type) _, err := w.Write(header) if err != nil { return err diff --git a/internal/rtmp/base/message.go b/internal/rtmp/base/message.go index 8578074b..2883f996 100644 --- a/internal/rtmp/base/message.go +++ b/internal/rtmp/base/message.go @@ -4,7 +4,7 @@ package base type Message struct { ChunkStreamID byte Timestamp uint32 - Typ byte + Type MessageType MessageStreamID uint32 Body []byte } diff --git a/internal/rtmp/base/messagetype.go b/internal/rtmp/base/messagetype.go new file mode 100644 index 00000000..17cf99b4 --- /dev/null +++ b/internal/rtmp/base/messagetype.go @@ -0,0 +1,13 @@ +package base + +// MessageType is a message type. +type MessageType byte + +// message types. +const ( + MessageTypeSetChunkSize MessageType = 1 + MessageTypeAbortMessage MessageType = 2 + MessageTypeAcknowledge MessageType = 3 + MessageTypeSetWindowAckSize MessageType = 5 + MessageTypeSetPeerBandwidth MessageType = 6 +) diff --git a/internal/rtmp/base/messagewriter.go b/internal/rtmp/base/messagewriter.go index ba268d61..ad4736ec 100644 --- a/internal/rtmp/base/messagewriter.go +++ b/internal/rtmp/base/messagewriter.go @@ -43,7 +43,7 @@ func (mw *MessageWriter) Write(msg *Message) error { if v, ok := mw.lastMessageStreamIDPerChunkStreamID[msg.ChunkStreamID]; !ok || v != msg.MessageStreamID { err := Chunk0{ ChunkStreamID: msg.ChunkStreamID, - Typ: msg.Typ, + Type: msg.Type, MessageStreamID: msg.MessageStreamID, BodyLen: uint32(bodyLen), Body: msg.Body[pos : pos+chunkBodyLen], @@ -56,7 +56,7 @@ func (mw *MessageWriter) Write(msg *Message) error { } else { err := Chunk1{ ChunkStreamID: msg.ChunkStreamID, - Typ: msg.Typ, + Type: msg.Type, BodyLen: uint32(bodyLen), Body: msg.Body[pos : pos+chunkBodyLen], }.Write(mw.w) diff --git a/internal/rtmp/conn_test.go b/internal/rtmp/conn_test.go index 2e02dbb8..35a7bef4 100644 --- a/internal/rtmp/conn_test.go +++ b/internal/rtmp/conn_test.go @@ -153,7 +153,7 @@ func TestReadTracks(t *testing.T) { }) err = mw.Write(&base.Message{ ChunkStreamID: 3, - Typ: 0x14, + Type: 0x14, Body: byts, }) require.NoError(t, err) @@ -163,8 +163,8 @@ func TestReadTracks(t *testing.T) { err = c0.Read(conn, 128) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 5, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetWindowAckSize, BodyLen: 4, Body: []byte{0x00, 38, 37, 160}, }, c0) @@ -173,8 +173,8 @@ func TestReadTracks(t *testing.T) { err = c0.Read(conn, 128) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 6, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetPeerBandwidth, BodyLen: 5, Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02}, }, c0) @@ -183,8 +183,8 @@ func TestReadTracks(t *testing.T) { err = c0.Read(conn, 128) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 1, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetChunkSize, BodyLen: 4, Body: []byte{0x00, 0x01, 0x00, 0x00}, }, c0) @@ -193,7 +193,7 @@ func TestReadTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(3), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err := flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -213,8 +213,8 @@ func TestReadTracks(t *testing.T) { // C->S set chunk size err = mw.Write(&base.Message{ - ChunkStreamID: 2, - Typ: 1, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetChunkSize, Body: []byte{0x00, 0x01, 0x00, 0x00}, }) require.NoError(t, err) @@ -224,7 +224,7 @@ func TestReadTracks(t *testing.T) { // C->S releaseStream err = mw.Write(&base.Message{ ChunkStreamID: 3, - Typ: 0x14, + Type: 0x14, Body: flvio.FillAMF0ValsMalloc([]interface{}{ "releaseStream", float64(2), @@ -237,7 +237,7 @@ func TestReadTracks(t *testing.T) { // C->S FCPublish err = mw.Write(&base.Message{ ChunkStreamID: 3, - Typ: 0x14, + Type: 0x14, Body: flvio.FillAMF0ValsMalloc([]interface{}{ "FCPublish", float64(3), @@ -250,7 +250,7 @@ func TestReadTracks(t *testing.T) { // C->S createStream err = mw.Write(&base.Message{ ChunkStreamID: 3, - Typ: 0x14, + Type: 0x14, Body: flvio.FillAMF0ValsMalloc([]interface{}{ "createStream", float64(4), @@ -263,7 +263,7 @@ func TestReadTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(3), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -276,7 +276,7 @@ func TestReadTracks(t *testing.T) { // C->S publish err = mw.Write(&base.Message{ ChunkStreamID: 8, - Typ: 0x14, + Type: 0x14, MessageStreamID: 1, Body: flvio.FillAMF0ValsMalloc([]interface{}{ "publish", @@ -292,7 +292,7 @@ func TestReadTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(5), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -333,7 +333,7 @@ func TestReadTracks(t *testing.T) { }) err = mw.Write(&base.Message{ ChunkStreamID: 4, - Typ: 0x12, + Type: 0x12, MessageStreamID: 1, Body: byts, }) @@ -354,7 +354,7 @@ func TestReadTracks(t *testing.T) { body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...) err = mw.Write(&base.Message{ ChunkStreamID: 6, - Typ: flvio.TAG_VIDEO, + Type: flvio.TAG_VIDEO, MessageStreamID: 1, Body: body, }) @@ -369,7 +369,7 @@ func TestReadTracks(t *testing.T) { require.NoError(t, err) err = mw.Write(&base.Message{ ChunkStreamID: 4, - Typ: flvio.TAG_AUDIO, + Type: flvio.TAG_AUDIO, MessageStreamID: 1, Body: append([]byte{ flvio.SOUND_AAC<<4 | flvio.SOUND_44Khz<<2 | flvio.SOUND_16BIT<<1 | flvio.SOUND_STEREO, @@ -400,7 +400,7 @@ func TestReadTracks(t *testing.T) { }) err = mw.Write(&base.Message{ ChunkStreamID: 4, - Typ: 0x12, + Type: 0x12, MessageStreamID: 1, Body: byts, }) @@ -421,7 +421,7 @@ func TestReadTracks(t *testing.T) { body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...) err = mw.Write(&base.Message{ ChunkStreamID: 6, - Typ: flvio.TAG_VIDEO, + Type: flvio.TAG_VIDEO, MessageStreamID: 1, Body: body, }) @@ -443,7 +443,7 @@ func TestReadTracks(t *testing.T) { body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...) err = mw.Write(&base.Message{ ChunkStreamID: 6, - Typ: flvio.TAG_VIDEO, + Type: flvio.TAG_VIDEO, MessageStreamID: 1, Body: body, }) @@ -532,7 +532,7 @@ func TestWriteTracks(t *testing.T) { }) err = mw.Write(&base.Message{ ChunkStreamID: 3, - Typ: 0x14, + Type: 0x14, Body: byts, }) require.NoError(t, err) @@ -542,8 +542,8 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 128) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 5, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetWindowAckSize, BodyLen: 4, Body: []byte{0x00, 38, 37, 160}, }, c0) @@ -552,8 +552,8 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 128) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 6, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetPeerBandwidth, BodyLen: 5, Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02}, }, c0) @@ -562,8 +562,8 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 128) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 1, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetChunkSize, BodyLen: 4, Body: []byte{0x00, 0x01, 0x00, 0x00}, }, c0) @@ -572,7 +572,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(3), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err := flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -592,24 +592,26 @@ func TestWriteTracks(t *testing.T) { // C->S window acknowledgement size err = mw.Write(&base.Message{ - ChunkStreamID: 2, - Typ: 0x05, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetWindowAckSize, Body: []byte{0x00, 0x26, 0x25, 0xa0}, }) require.NoError(t, err) // C->S set chunk size err = mw.Write(&base.Message{ - ChunkStreamID: 2, - Typ: 1, + ChunkStreamID: base.ControlChunkStreamID, + Type: base.MessageTypeSetChunkSize, Body: []byte{0x00, 0x01, 0x00, 0x00}, }) require.NoError(t, err) + mw.SetChunkSize(65536) + // C->S createStream err = mw.Write(&base.Message{ ChunkStreamID: 3, - Typ: 0x14, + Type: 0x14, Body: flvio.FillAMF0ValsMalloc([]interface{}{ "createStream", float64(2), @@ -622,7 +624,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(3), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -655,7 +657,7 @@ func TestWriteTracks(t *testing.T) { }) err = mw.Write(&base.Message{ ChunkStreamID: 8, - Typ: 0x14, + Type: 0x14, Body: byts, }) require.NoError(t, err) @@ -664,8 +666,8 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 4, + ChunkStreamID: base.ControlChunkStreamID, + Type: 4, BodyLen: 6, Body: []byte{0x00, 0x04, 0x00, 0x00, 0x00, 0x01}, }, c0) @@ -674,8 +676,8 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, base.Chunk0{ - ChunkStreamID: 2, - Typ: 4, + ChunkStreamID: base.ControlChunkStreamID, + Type: 4, BodyLen: 6, Body: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x01}, }, c0) @@ -684,7 +686,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(5), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -702,7 +704,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(5), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -720,7 +722,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(5), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -738,7 +740,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(5), c0.ChunkStreamID) - require.Equal(t, uint8(0x14), c0.Typ) + require.Equal(t, base.MessageType(0x14), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -756,7 +758,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(4), c0.ChunkStreamID) - require.Equal(t, uint8(0x12), c0.Typ) + require.Equal(t, base.MessageType(0x12), c0.Type) arr, err = flvio.ParseAMFVals(c0.Body, false) require.NoError(t, err) require.Equal(t, []interface{}{ @@ -773,7 +775,7 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(6), c0.ChunkStreamID) - require.Equal(t, uint8(0x09), c0.Typ) + require.Equal(t, base.MessageType(0x09), c0.Type) require.Equal(t, []byte{ 0x17, 0x0, 0x0, 0x0, 0x0, 0x1, 0x64, 0x0, 0xc, 0xff, 0xe1, 0x0, 0x15, 0x67, 0x64, 0x0, @@ -787,6 +789,6 @@ func TestWriteTracks(t *testing.T) { err = c0.Read(conn, 65536) require.NoError(t, err) require.Equal(t, uint8(4), c0.ChunkStreamID) - require.Equal(t, uint8(0x08), c0.Typ) + require.Equal(t, base.MessageType(0x08), c0.Type) require.Equal(t, []byte{0xae, 0x0, 0x12, 0x10}, c0.Body) }