Browse Source

rtmp: add MessageType, ControlChunkStreamID

pull/956/head
aler9 3 years ago
parent
commit
896c799f99
  1. 5
      internal/rtmp/base/base.go
  2. 6
      internal/rtmp/base/chunk0.go
  3. 4
      internal/rtmp/base/chunk1.go
  4. 2
      internal/rtmp/base/message.go
  5. 13
      internal/rtmp/base/messagetype.go
  6. 4
      internal/rtmp/base/messagewriter.go
  7. 98
      internal/rtmp/conn_test.go

5
internal/rtmp/base/base.go

@ -1,5 +1,10 @@ @@ -1,5 +1,10 @@
package base
const (
// ControlChunkStreamID is the stream ID used for control messages.
ControlChunkStreamID = 2
)
const (
rtmpVersion = 0x03
)

6
internal/rtmp/base/chunk0.go

@ -12,7 +12,7 @@ import ( @@ -12,7 +12,7 @@ import (
type Chunk0 struct {
ChunkStreamID byte
Timestamp uint32
Typ byte
Type MessageType
MessageStreamID uint32
BodyLen uint32
Body []byte
@ -33,7 +33,7 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error { @@ -33,7 +33,7 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error {
c.ChunkStreamID = header[0] & 0x3F
c.Timestamp = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1])
c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
c.Typ = header[7]
c.Type = MessageType(header[7])
c.MessageStreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11])
chunkBodyLen := int(c.BodyLen)
@ -56,7 +56,7 @@ func (c Chunk0) Write(w io.Writer) error { @@ -56,7 +56,7 @@ func (c Chunk0) Write(w io.Writer) error {
header[4] = byte(c.BodyLen >> 16)
header[5] = byte(c.BodyLen >> 8)
header[6] = byte(c.BodyLen)
header[7] = c.Typ
header[7] = byte(c.Type)
header[8] = byte(c.MessageStreamID)
_, err := w.Write(header)
if err != nil {

4
internal/rtmp/base/chunk1.go

@ -12,7 +12,7 @@ import ( @@ -12,7 +12,7 @@ import (
// message after the first.
type Chunk1 struct {
ChunkStreamID byte
Typ byte
Type MessageType
BodyLen uint32
Body []byte
}
@ -24,7 +24,7 @@ func (c Chunk1) Write(w io.Writer) error { @@ -24,7 +24,7 @@ func (c Chunk1) Write(w io.Writer) error {
header[4] = byte(c.BodyLen >> 16)
header[5] = byte(c.BodyLen >> 8)
header[6] = byte(c.BodyLen)
header[7] = c.Typ
header[7] = byte(c.Type)
_, err := w.Write(header)
if err != nil {
return err

2
internal/rtmp/base/message.go

@ -4,7 +4,7 @@ package base @@ -4,7 +4,7 @@ package base
type Message struct {
ChunkStreamID byte
Timestamp uint32
Typ byte
Type MessageType
MessageStreamID uint32
Body []byte
}

13
internal/rtmp/base/messagetype.go

@ -0,0 +1,13 @@ @@ -0,0 +1,13 @@
package base
// MessageType is a message type.
type MessageType byte
// message types.
const (
MessageTypeSetChunkSize MessageType = 1
MessageTypeAbortMessage MessageType = 2
MessageTypeAcknowledge MessageType = 3
MessageTypeSetWindowAckSize MessageType = 5
MessageTypeSetPeerBandwidth MessageType = 6
)

4
internal/rtmp/base/messagewriter.go

@ -43,7 +43,7 @@ func (mw *MessageWriter) Write(msg *Message) error { @@ -43,7 +43,7 @@ func (mw *MessageWriter) Write(msg *Message) error {
if v, ok := mw.lastMessageStreamIDPerChunkStreamID[msg.ChunkStreamID]; !ok || v != msg.MessageStreamID {
err := Chunk0{
ChunkStreamID: msg.ChunkStreamID,
Typ: msg.Typ,
Type: msg.Type,
MessageStreamID: msg.MessageStreamID,
BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
@ -56,7 +56,7 @@ func (mw *MessageWriter) Write(msg *Message) error { @@ -56,7 +56,7 @@ func (mw *MessageWriter) Write(msg *Message) error {
} else {
err := Chunk1{
ChunkStreamID: msg.ChunkStreamID,
Typ: msg.Typ,
Type: msg.Type,
BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(mw.w)

98
internal/rtmp/conn_test.go

@ -153,7 +153,7 @@ func TestReadTracks(t *testing.T) { @@ -153,7 +153,7 @@ func TestReadTracks(t *testing.T) {
})
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Type: 0x14,
Body: byts,
})
require.NoError(t, err)
@ -163,8 +163,8 @@ func TestReadTracks(t *testing.T) { @@ -163,8 +163,8 @@ func TestReadTracks(t *testing.T) {
err = c0.Read(conn, 128)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 5,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
BodyLen: 4,
Body: []byte{0x00, 38, 37, 160},
}, c0)
@ -173,8 +173,8 @@ func TestReadTracks(t *testing.T) { @@ -173,8 +173,8 @@ func TestReadTracks(t *testing.T) {
err = c0.Read(conn, 128)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 6,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetPeerBandwidth,
BodyLen: 5,
Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02},
}, c0)
@ -183,8 +183,8 @@ func TestReadTracks(t *testing.T) { @@ -183,8 +183,8 @@ func TestReadTracks(t *testing.T) {
err = c0.Read(conn, 128)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 1,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
BodyLen: 4,
Body: []byte{0x00, 0x01, 0x00, 0x00},
}, c0)
@ -193,7 +193,7 @@ func TestReadTracks(t *testing.T) { @@ -193,7 +193,7 @@ func TestReadTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(3), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err := flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -213,8 +213,8 @@ func TestReadTracks(t *testing.T) { @@ -213,8 +213,8 @@ func TestReadTracks(t *testing.T) {
// C->S set chunk size
err = mw.Write(&base.Message{
ChunkStreamID: 2,
Typ: 1,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
})
require.NoError(t, err)
@ -224,7 +224,7 @@ func TestReadTracks(t *testing.T) { @@ -224,7 +224,7 @@ func TestReadTracks(t *testing.T) {
// C->S releaseStream
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Type: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
"releaseStream",
float64(2),
@ -237,7 +237,7 @@ func TestReadTracks(t *testing.T) { @@ -237,7 +237,7 @@ func TestReadTracks(t *testing.T) {
// C->S FCPublish
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Type: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
"FCPublish",
float64(3),
@ -250,7 +250,7 @@ func TestReadTracks(t *testing.T) { @@ -250,7 +250,7 @@ func TestReadTracks(t *testing.T) {
// C->S createStream
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Type: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
"createStream",
float64(4),
@ -263,7 +263,7 @@ func TestReadTracks(t *testing.T) { @@ -263,7 +263,7 @@ func TestReadTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(3), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -276,7 +276,7 @@ func TestReadTracks(t *testing.T) { @@ -276,7 +276,7 @@ func TestReadTracks(t *testing.T) {
// C->S publish
err = mw.Write(&base.Message{
ChunkStreamID: 8,
Typ: 0x14,
Type: 0x14,
MessageStreamID: 1,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
"publish",
@ -292,7 +292,7 @@ func TestReadTracks(t *testing.T) { @@ -292,7 +292,7 @@ func TestReadTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(5), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -333,7 +333,7 @@ func TestReadTracks(t *testing.T) { @@ -333,7 +333,7 @@ func TestReadTracks(t *testing.T) {
})
err = mw.Write(&base.Message{
ChunkStreamID: 4,
Typ: 0x12,
Type: 0x12,
MessageStreamID: 1,
Body: byts,
})
@ -354,7 +354,7 @@ func TestReadTracks(t *testing.T) { @@ -354,7 +354,7 @@ func TestReadTracks(t *testing.T) {
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.Message{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
Type: flvio.TAG_VIDEO,
MessageStreamID: 1,
Body: body,
})
@ -369,7 +369,7 @@ func TestReadTracks(t *testing.T) { @@ -369,7 +369,7 @@ func TestReadTracks(t *testing.T) {
require.NoError(t, err)
err = mw.Write(&base.Message{
ChunkStreamID: 4,
Typ: flvio.TAG_AUDIO,
Type: flvio.TAG_AUDIO,
MessageStreamID: 1,
Body: append([]byte{
flvio.SOUND_AAC<<4 | flvio.SOUND_44Khz<<2 | flvio.SOUND_16BIT<<1 | flvio.SOUND_STEREO,
@ -400,7 +400,7 @@ func TestReadTracks(t *testing.T) { @@ -400,7 +400,7 @@ func TestReadTracks(t *testing.T) {
})
err = mw.Write(&base.Message{
ChunkStreamID: 4,
Typ: 0x12,
Type: 0x12,
MessageStreamID: 1,
Body: byts,
})
@ -421,7 +421,7 @@ func TestReadTracks(t *testing.T) { @@ -421,7 +421,7 @@ func TestReadTracks(t *testing.T) {
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.Message{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
Type: flvio.TAG_VIDEO,
MessageStreamID: 1,
Body: body,
})
@ -443,7 +443,7 @@ func TestReadTracks(t *testing.T) { @@ -443,7 +443,7 @@ func TestReadTracks(t *testing.T) {
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.Message{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
Type: flvio.TAG_VIDEO,
MessageStreamID: 1,
Body: body,
})
@ -532,7 +532,7 @@ func TestWriteTracks(t *testing.T) { @@ -532,7 +532,7 @@ func TestWriteTracks(t *testing.T) {
})
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Type: 0x14,
Body: byts,
})
require.NoError(t, err)
@ -542,8 +542,8 @@ func TestWriteTracks(t *testing.T) { @@ -542,8 +542,8 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 128)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 5,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
BodyLen: 4,
Body: []byte{0x00, 38, 37, 160},
}, c0)
@ -552,8 +552,8 @@ func TestWriteTracks(t *testing.T) { @@ -552,8 +552,8 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 128)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 6,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetPeerBandwidth,
BodyLen: 5,
Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02},
}, c0)
@ -562,8 +562,8 @@ func TestWriteTracks(t *testing.T) { @@ -562,8 +562,8 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 128)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 1,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
BodyLen: 4,
Body: []byte{0x00, 0x01, 0x00, 0x00},
}, c0)
@ -572,7 +572,7 @@ func TestWriteTracks(t *testing.T) { @@ -572,7 +572,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(3), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err := flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -592,24 +592,26 @@ func TestWriteTracks(t *testing.T) { @@ -592,24 +592,26 @@ func TestWriteTracks(t *testing.T) {
// C->S window acknowledgement size
err = mw.Write(&base.Message{
ChunkStreamID: 2,
Typ: 0x05,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
Body: []byte{0x00, 0x26, 0x25, 0xa0},
})
require.NoError(t, err)
// C->S set chunk size
err = mw.Write(&base.Message{
ChunkStreamID: 2,
Typ: 1,
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
})
require.NoError(t, err)
mw.SetChunkSize(65536)
// C->S createStream
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Type: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
"createStream",
float64(2),
@ -622,7 +624,7 @@ func TestWriteTracks(t *testing.T) { @@ -622,7 +624,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(3), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -655,7 +657,7 @@ func TestWriteTracks(t *testing.T) { @@ -655,7 +657,7 @@ func TestWriteTracks(t *testing.T) {
})
err = mw.Write(&base.Message{
ChunkStreamID: 8,
Typ: 0x14,
Type: 0x14,
Body: byts,
})
require.NoError(t, err)
@ -664,8 +666,8 @@ func TestWriteTracks(t *testing.T) { @@ -664,8 +666,8 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 4,
ChunkStreamID: base.ControlChunkStreamID,
Type: 4,
BodyLen: 6,
Body: []byte{0x00, 0x04, 0x00, 0x00, 0x00, 0x01},
}, c0)
@ -674,8 +676,8 @@ func TestWriteTracks(t *testing.T) { @@ -674,8 +676,8 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, base.Chunk0{
ChunkStreamID: 2,
Typ: 4,
ChunkStreamID: base.ControlChunkStreamID,
Type: 4,
BodyLen: 6,
Body: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x01},
}, c0)
@ -684,7 +686,7 @@ func TestWriteTracks(t *testing.T) { @@ -684,7 +686,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(5), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -702,7 +704,7 @@ func TestWriteTracks(t *testing.T) { @@ -702,7 +704,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(5), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -720,7 +722,7 @@ func TestWriteTracks(t *testing.T) { @@ -720,7 +722,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(5), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -738,7 +740,7 @@ func TestWriteTracks(t *testing.T) { @@ -738,7 +740,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(5), c0.ChunkStreamID)
require.Equal(t, uint8(0x14), c0.Typ)
require.Equal(t, base.MessageType(0x14), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -756,7 +758,7 @@ func TestWriteTracks(t *testing.T) { @@ -756,7 +758,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(4), c0.ChunkStreamID)
require.Equal(t, uint8(0x12), c0.Typ)
require.Equal(t, base.MessageType(0x12), c0.Type)
arr, err = flvio.ParseAMFVals(c0.Body, false)
require.NoError(t, err)
require.Equal(t, []interface{}{
@ -773,7 +775,7 @@ func TestWriteTracks(t *testing.T) { @@ -773,7 +775,7 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(6), c0.ChunkStreamID)
require.Equal(t, uint8(0x09), c0.Typ)
require.Equal(t, base.MessageType(0x09), c0.Type)
require.Equal(t, []byte{
0x17, 0x0, 0x0, 0x0, 0x0, 0x1, 0x64, 0x0,
0xc, 0xff, 0xe1, 0x0, 0x15, 0x67, 0x64, 0x0,
@ -787,6 +789,6 @@ func TestWriteTracks(t *testing.T) { @@ -787,6 +789,6 @@ func TestWriteTracks(t *testing.T) {
err = c0.Read(conn, 65536)
require.NoError(t, err)
require.Equal(t, uint8(4), c0.ChunkStreamID)
require.Equal(t, uint8(0x08), c0.Typ)
require.Equal(t, base.MessageType(0x08), c0.Type)
require.Equal(t, []byte{0xae, 0x0, 0x12, 0x10}, c0.Body)
}

Loading…
Cancel
Save