Browse Source

rtmp: add MessageWriter

pull/956/head
aler9 4 years ago
parent
commit
9519bf670a
  1. 52
      internal/rtmp/base/chunk0.go
  2. 16
      internal/rtmp/base/chunk1.go
  3. 6
      internal/rtmp/base/chunk3.go
  4. 10
      internal/rtmp/base/message.go
  5. 83
      internal/rtmp/base/messagewriter.go
  6. 172
      internal/rtmp/conn_test.go

52
internal/rtmp/base/chunk0.go

@ -10,16 +10,16 @@ import ( @@ -10,16 +10,16 @@ import (
// the start of a chunk stream, and whenever the stream timestamp goes
// backward (e.g., because of a backward seek).
type Chunk0 struct {
ChunkStreamID byte
Timestamp uint32
Typ byte
StreamID uint32
BodyLen uint32
Body []byte
ChunkStreamID byte
Timestamp uint32
Typ byte
MessageStreamID uint32
BodyLen uint32
Body []byte
}
// Read reads the chunk.
func (m *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error {
func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error {
header := make([]byte, 12)
_, err := r.Read(header)
if err != nil {
@ -30,39 +30,39 @@ func (m *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error { @@ -30,39 +30,39 @@ func (m *Chunk0) Read(r io.Reader, chunkMaxBodyLen int) error {
return fmt.Errorf("wrong chunk header type")
}
m.ChunkStreamID = header[0] & 0x3F
m.Timestamp = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1])
m.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
m.Typ = header[7]
m.StreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11])
c.ChunkStreamID = header[0] & 0x3F
c.Timestamp = uint32(header[3])<<16 | uint32(header[2])<<8 | uint32(header[1])
c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
c.Typ = header[7]
c.MessageStreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11])
chunkBodyLen := int(m.BodyLen)
chunkBodyLen := int(c.BodyLen)
if chunkBodyLen > chunkMaxBodyLen {
chunkBodyLen = chunkMaxBodyLen
}
m.Body = make([]byte, chunkBodyLen)
_, err = r.Read(m.Body)
c.Body = make([]byte, chunkBodyLen)
_, err = r.Read(c.Body)
return err
}
// Write writes the chunk.
func (m Chunk0) Write(w io.Writer) error {
func (c Chunk0) Write(w io.Writer) error {
header := make([]byte, 12)
header[0] = m.ChunkStreamID
header[1] = byte(m.Timestamp >> 16)
header[2] = byte(m.Timestamp >> 8)
header[3] = byte(m.Timestamp)
header[4] = byte(m.BodyLen >> 16)
header[5] = byte(m.BodyLen >> 8)
header[6] = byte(m.BodyLen)
header[7] = m.Typ
header[8] = byte(m.StreamID)
header[0] = c.ChunkStreamID
header[1] = byte(c.Timestamp >> 16)
header[2] = byte(c.Timestamp >> 8)
header[3] = byte(c.Timestamp)
header[4] = byte(c.BodyLen >> 16)
header[5] = byte(c.BodyLen >> 8)
header[6] = byte(c.BodyLen)
header[7] = c.Typ
header[8] = byte(c.MessageStreamID)
_, err := w.Write(header)
if err != nil {
return err
}
_, err = w.Write(m.Body)
_, err = w.Write(c.Body)
return err
}

16
internal/rtmp/base/chunk1.go

@ -13,23 +13,23 @@ import ( @@ -13,23 +13,23 @@ import (
type Chunk1 struct {
ChunkStreamID byte
Typ byte
BodyLen uint32
Body []byte
}
// Write writes the chunk.
func (m Chunk1) Write(w io.Writer) error {
func (c Chunk1) Write(w io.Writer) error {
header := make([]byte, 8)
header[0] = 1<<6 | m.ChunkStreamID
l := uint32(len(m.Body))
header[4] = byte(l >> 16)
header[5] = byte(l >> 8)
header[6] = byte(l)
header[7] = m.Typ
header[0] = 1<<6 | c.ChunkStreamID
header[4] = byte(c.BodyLen >> 16)
header[5] = byte(c.BodyLen >> 8)
header[6] = byte(c.BodyLen)
header[7] = c.Typ
_, err := w.Write(header)
if err != nil {
return err
}
_, err = w.Write(m.Body)
_, err = w.Write(c.Body)
return err
}

6
internal/rtmp/base/chunk3.go

@ -16,14 +16,14 @@ type Chunk3 struct { @@ -16,14 +16,14 @@ type Chunk3 struct {
}
// Write writes the chunk.
func (m Chunk3) Write(w io.Writer) error {
func (c Chunk3) Write(w io.Writer) error {
header := make([]byte, 1)
header[0] = 3<<6 | m.ChunkStreamID
header[0] = 3<<6 | c.ChunkStreamID
_, err := w.Write(header)
if err != nil {
return err
}
_, err = w.Write(m.Body)
_, err = w.Write(c.Body)
return err
}

10
internal/rtmp/base/message.go

@ -0,0 +1,10 @@ @@ -0,0 +1,10 @@
package base
// Message is a message.
type Message struct {
ChunkStreamID byte
Timestamp uint32
Typ byte
MessageStreamID uint32
Body []byte
}

83
internal/rtmp/base/messagewriter.go

@ -0,0 +1,83 @@ @@ -0,0 +1,83 @@
package base
import (
"io"
)
// MessageWriter is a message writer.
type MessageWriter struct {
w io.Writer
chunkMaxBodyLen int
lastMessageStreamIDPerChunkStreamID map[byte]uint32
}
// NewMessageWriter instantiates a MessageWriter.
func NewMessageWriter(w io.Writer) *MessageWriter {
return &MessageWriter{
w: w,
chunkMaxBodyLen: 128,
lastMessageStreamIDPerChunkStreamID: make(map[byte]uint32),
}
}
// SetChunkSize sets the chunk size.
func (mw *MessageWriter) SetChunkSize(v int) {
mw.chunkMaxBodyLen = v
}
// Write writes a Message.
func (mw *MessageWriter) Write(msg *Message) error {
bodyLen := len(msg.Body)
pos := 0
first := true
for {
chunkBodyLen := bodyLen - pos
if chunkBodyLen > mw.chunkMaxBodyLen {
chunkBodyLen = mw.chunkMaxBodyLen
}
if first {
first = false
if v, ok := mw.lastMessageStreamIDPerChunkStreamID[msg.ChunkStreamID]; !ok || v != msg.MessageStreamID {
err := Chunk0{
ChunkStreamID: msg.ChunkStreamID,
Typ: msg.Typ,
MessageStreamID: msg.MessageStreamID,
BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(mw.w)
if err != nil {
return err
}
mw.lastMessageStreamIDPerChunkStreamID[msg.ChunkStreamID] = msg.MessageStreamID
} else {
err := Chunk1{
ChunkStreamID: msg.ChunkStreamID,
Typ: msg.Typ,
BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(mw.w)
if err != nil {
return err
}
}
} else {
err := Chunk3{
ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(mw.w)
if err != nil {
return err
}
}
pos += chunkBodyLen
if (bodyLen - pos) == 0 {
return nil
}
}
}

172
internal/rtmp/conn_test.go

@ -134,6 +134,8 @@ func TestReadTracks(t *testing.T) { @@ -134,6 +134,8 @@ func TestReadTracks(t *testing.T) {
err = base.HandshakeC2{}.Write(conn, s1s2)
require.NoError(t, err)
mw := base.NewMessageWriter(conn)
// C->S connect
byts := flvio.FillAMF0ValsMalloc([]interface{}{
"connect",
@ -149,17 +151,11 @@ func TestReadTracks(t *testing.T) { @@ -149,17 +151,11 @@ func TestReadTracks(t *testing.T) {
{K: "videoFunction", V: 1},
},
})
err = base.Chunk0{
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
BodyLen: uint32(len(byts)),
Body: byts[:128],
}.Write(conn)
require.NoError(t, err)
err = base.Chunk3{
ChunkStreamID: 3,
Body: byts[128:],
}.Write(conn)
Body: byts,
})
require.NoError(t, err)
// S->C window acknowledgement size
@ -216,16 +212,17 @@ func TestReadTracks(t *testing.T) { @@ -216,16 +212,17 @@ func TestReadTracks(t *testing.T) {
}, arr)
// C->S set chunk size
err = base.Chunk0{
err = mw.Write(&base.Message{
ChunkStreamID: 2,
Typ: 1,
BodyLen: 4,
Body: []byte{0x00, 0x01, 0x00, 0x00},
}.Write(conn)
})
require.NoError(t, err)
mw.SetChunkSize(65536)
// C->S releaseStream
err = base.Chunk1{
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
@ -234,11 +231,11 @@ func TestReadTracks(t *testing.T) { @@ -234,11 +231,11 @@ func TestReadTracks(t *testing.T) {
nil,
"",
}),
}.Write(conn)
})
require.NoError(t, err)
// C->S FCPublish
err = base.Chunk1{
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
@ -247,18 +244,19 @@ func TestReadTracks(t *testing.T) { @@ -247,18 +244,19 @@ func TestReadTracks(t *testing.T) {
nil,
"",
}),
}.Write(conn)
})
require.NoError(t, err)
// C->S createStream
err = base.Chunk3{
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
"createStream",
float64(4),
nil,
}),
}.Write(conn)
})
require.NoError(t, err)
// S->C result
@ -276,20 +274,18 @@ func TestReadTracks(t *testing.T) { @@ -276,20 +274,18 @@ func TestReadTracks(t *testing.T) {
}, arr)
// C->S publish
byts = flvio.FillAMF0ValsMalloc([]interface{}{
"publish",
float64(5),
nil,
"",
"live",
err = mw.Write(&base.Message{
ChunkStreamID: 8,
Typ: 0x14,
MessageStreamID: 1,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
"publish",
float64(5),
nil,
"",
"live",
}),
})
err = base.Chunk0{
ChunkStreamID: 8,
Typ: 0x14,
StreamID: 1,
BodyLen: uint32(len(byts)),
Body: byts,
}.Write(conn)
require.NoError(t, err)
// S->C onStatus
@ -335,13 +331,12 @@ func TestReadTracks(t *testing.T) { @@ -335,13 +331,12 @@ func TestReadTracks(t *testing.T) {
},
},
})
err = base.Chunk0{
ChunkStreamID: 4,
Typ: 0x12,
StreamID: 1,
BodyLen: uint32(len(byts)),
Body: byts,
}.Write(conn)
err = mw.Write(&base.Message{
ChunkStreamID: 4,
Typ: 0x12,
MessageStreamID: 1,
Body: byts,
})
require.NoError(t, err)
// C->S H264 decoder config
@ -357,13 +352,12 @@ func TestReadTracks(t *testing.T) { @@ -357,13 +352,12 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = base.Chunk0{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
StreamID: 1,
BodyLen: uint32(len(body)),
Body: body,
}.Write(conn)
err = mw.Write(&base.Message{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
MessageStreamID: 1,
Body: body,
})
require.NoError(t, err)
// C->S AAC decoder config
@ -373,16 +367,15 @@ func TestReadTracks(t *testing.T) { @@ -373,16 +367,15 @@ func TestReadTracks(t *testing.T) {
ChannelCount: 2,
}.Encode()
require.NoError(t, err)
err = base.Chunk0{
ChunkStreamID: 4,
Typ: flvio.TAG_AUDIO,
StreamID: 1,
BodyLen: uint32(len(enc) + 2),
err = mw.Write(&base.Message{
ChunkStreamID: 4,
Typ: flvio.TAG_AUDIO,
MessageStreamID: 1,
Body: append([]byte{
flvio.SOUND_AAC<<4 | flvio.SOUND_44Khz<<2 | flvio.SOUND_16BIT<<1 | flvio.SOUND_STEREO,
flvio.AAC_SEQHDR,
}, enc...),
}.Write(conn)
})
require.NoError(t, err)
case "metadata without codec id":
@ -405,13 +398,12 @@ func TestReadTracks(t *testing.T) { @@ -405,13 +398,12 @@ func TestReadTracks(t *testing.T) {
},
},
})
err = base.Chunk0{
ChunkStreamID: 4,
Typ: 0x12,
StreamID: 1,
BodyLen: uint32(len(byts)),
Body: byts,
}.Write(conn)
err = mw.Write(&base.Message{
ChunkStreamID: 4,
Typ: 0x12,
MessageStreamID: 1,
Body: byts,
})
require.NoError(t, err)
// C->S H264 decoder config
@ -427,13 +419,12 @@ func TestReadTracks(t *testing.T) { @@ -427,13 +419,12 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = base.Chunk0{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
StreamID: 1,
BodyLen: uint32(len(body)),
Body: body,
}.Write(conn)
err = mw.Write(&base.Message{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
MessageStreamID: 1,
Body: body,
})
require.NoError(t, err)
case "no metadata":
@ -450,13 +441,12 @@ func TestReadTracks(t *testing.T) { @@ -450,13 +441,12 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = base.Chunk0{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
StreamID: 1,
BodyLen: uint32(len(body)),
Body: body,
}.Write(conn)
err = mw.Write(&base.Message{
ChunkStreamID: 6,
Typ: flvio.TAG_VIDEO,
MessageStreamID: 1,
Body: body,
})
require.NoError(t, err)
}
@ -523,6 +513,8 @@ func TestWriteTracks(t *testing.T) { @@ -523,6 +513,8 @@ func TestWriteTracks(t *testing.T) {
err = base.HandshakeC2{}.Write(conn, s1s2)
require.NoError(t, err)
mw := base.NewMessageWriter(conn)
// C->S connect
byts := flvio.FillAMF0ValsMalloc([]interface{}{
"connect",
@ -538,17 +530,11 @@ func TestWriteTracks(t *testing.T) { @@ -538,17 +530,11 @@ func TestWriteTracks(t *testing.T) {
{K: "videoFunction", V: 1},
},
})
err = base.Chunk0{
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
BodyLen: uint32(len(byts)),
Body: byts[:128],
}.Write(conn)
require.NoError(t, err)
err = base.Chunk3{
ChunkStreamID: 3,
Body: byts[128:],
}.Write(conn)
Body: byts,
})
require.NoError(t, err)
// S->C window acknowledgement size
@ -605,25 +591,23 @@ func TestWriteTracks(t *testing.T) { @@ -605,25 +591,23 @@ func TestWriteTracks(t *testing.T) {
}, arr)
// C->S window acknowledgement size
err = base.Chunk0{
err = mw.Write(&base.Message{
ChunkStreamID: 2,
Typ: 0x05,
BodyLen: 4,
Body: []byte{0x00, 0x26, 0x25, 0xa0},
}.Write(conn)
})
require.NoError(t, err)
// C->S set chunk size
err = base.Chunk0{
err = mw.Write(&base.Message{
ChunkStreamID: 2,
Typ: 1,
BodyLen: 4,
Body: []byte{0x00, 0x01, 0x00, 0x00},
}.Write(conn)
})
require.NoError(t, err)
// C->S createStream
err = base.Chunk1{
err = mw.Write(&base.Message{
ChunkStreamID: 3,
Typ: 0x14,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
@ -631,7 +615,7 @@ func TestWriteTracks(t *testing.T) { @@ -631,7 +615,7 @@ func TestWriteTracks(t *testing.T) {
float64(2),
nil,
}),
}.Write(conn)
})
require.NoError(t, err)
// S->C result
@ -655,11 +639,10 @@ func TestWriteTracks(t *testing.T) { @@ -655,11 +639,10 @@ func TestWriteTracks(t *testing.T) {
nil,
"",
})
err = base.Chunk0{
err = mw.Write(&base.Message{
ChunkStreamID: 8,
BodyLen: uint32(len(byts)),
Body: byts,
}.Write(conn)
})
require.NoError(t, err)
// C->S play
@ -670,12 +653,11 @@ func TestWriteTracks(t *testing.T) { @@ -670,12 +653,11 @@ func TestWriteTracks(t *testing.T) {
"",
float64(-2000),
})
err = base.Chunk0{
err = mw.Write(&base.Message{
ChunkStreamID: 8,
Typ: 0x14,
BodyLen: uint32(len(byts)),
Body: byts,
}.Write(conn)
})
require.NoError(t, err)
// S->C event "stream is recorded"

Loading…
Cancel
Save