Browse Source

rtmp: rename Message into RawMessage

pull/1003/head
aler9 3 years ago
parent
commit
76e47686b2
  1. 5
      internal/rtmp/base/messagetype.go
  2. 4
      internal/rtmp/base/rawmessage.go
  3. 34
      internal/rtmp/base/rawmessagereader.go
  4. 26
      internal/rtmp/base/rawmessagewriter.go
  5. 60
      internal/rtmp/conn_test.go

5
internal/rtmp/base/messagetype.go

@ -13,11 +13,12 @@ const ( @@ -13,11 +13,12 @@ const (
MessageTypeUserControl MessageType = 4
MessageTypeDataAMF3 MessageType = 15
MessageTypeCommandAMF3 MessageType = 17
MessageTypeDataAMF0 MessageType = 18
MessageTypeCommandAMF0 MessageType = 20
MessageTypeDataAMF3 MessageType = 15
MessageTypeDataAMF0 MessageType = 18
MessageTypeAudio MessageType = 8
MessageTypeVideo MessageType = 9
)

4
internal/rtmp/base/message.go → internal/rtmp/base/rawmessage.go

@ -1,7 +1,7 @@ @@ -1,7 +1,7 @@
package base
// Message is a message.
type Message struct {
// RawMessage is a message.
type RawMessage struct {
ChunkStreamID byte
Timestamp uint32
Type MessageType

34
internal/rtmp/base/messagereader.go → internal/rtmp/base/rawmessagereader.go

@ -8,8 +8,8 @@ import ( @@ -8,8 +8,8 @@ import (
var errMoreChunksNeeded = errors.New("more chunks are needed")
type messageReaderChunkStream struct {
mr *MessageReader
type rawRawMessageReaderChunkStream struct {
mr *RawMessageReader
curTimestamp *uint32
curType *MessageType
curMessageStreamID *uint32
@ -17,7 +17,7 @@ type messageReaderChunkStream struct { @@ -17,7 +17,7 @@ type messageReaderChunkStream struct {
curBody *[]byte
}
func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) {
func (rc *rawRawMessageReaderChunkStream) read(typ byte) (*RawMessage, error) {
switch typ {
case 0:
if rc.curBody != nil {
@ -44,7 +44,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) { @@ -44,7 +44,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) {
return nil, errMoreChunksNeeded
}
return &Message{
return &RawMessage{
Timestamp: c0.Timestamp,
Type: c0.Type,
MessageStreamID: c0.MessageStreamID,
@ -78,7 +78,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) { @@ -78,7 +78,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) {
return nil, errMoreChunksNeeded
}
return &Message{
return &RawMessage{
Timestamp: *rc.curTimestamp + c1.TimestampDelta,
Type: c1.Type,
MessageStreamID: *rc.curMessageStreamID,
@ -113,7 +113,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) { @@ -113,7 +113,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) {
return nil, errMoreChunksNeeded
}
return &Message{
return &RawMessage{
Timestamp: *rc.curTimestamp + c2.TimestampDelta,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
@ -149,7 +149,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) { @@ -149,7 +149,7 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) {
body := *rc.curBody
rc.curBody = nil
return &Message{
return &RawMessage{
Timestamp: *rc.curTimestamp,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
@ -158,28 +158,28 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) { @@ -158,28 +158,28 @@ func (rc *messageReaderChunkStream) read(typ byte) (*Message, error) {
}
}
// MessageReader is a message reader.
type MessageReader struct {
// RawMessageReader is a message reader.
type RawMessageReader struct {
r *bufio.Reader
chunkSize int
chunkStreams map[byte]*messageReaderChunkStream
chunkStreams map[byte]*rawRawMessageReaderChunkStream
}
// NewMessageReader allocates a MessageReader.
func NewMessageReader(r *bufio.Reader) *MessageReader {
return &MessageReader{
// NewRawMessageReader allocates a RawMessageReader.
func NewRawMessageReader(r *bufio.Reader) *RawMessageReader {
return &RawMessageReader{
r: r,
chunkSize: 128,
chunkStreams: make(map[byte]*messageReaderChunkStream),
chunkStreams: make(map[byte]*rawRawMessageReaderChunkStream),
}
}
// SetChunkSize sets the maximum chunk size.
func (mr *MessageReader) SetChunkSize(v int) {
func (mr *RawMessageReader) SetChunkSize(v int) {
mr.chunkSize = v
}
func (mr *MessageReader) Read() (*Message, error) {
func (mr *RawMessageReader) Read() (*RawMessage, error) {
for {
byt, err := mr.r.ReadByte()
if err != nil {
@ -191,7 +191,7 @@ func (mr *MessageReader) Read() (*Message, error) { @@ -191,7 +191,7 @@ func (mr *MessageReader) Read() (*Message, error) {
rc, ok := mr.chunkStreams[chunkStreamID]
if !ok {
rc = &messageReaderChunkStream{mr: mr}
rc = &rawRawMessageReaderChunkStream{mr: mr}
mr.chunkStreams[chunkStreamID] = rc
}

26
internal/rtmp/base/messagewriter.go → internal/rtmp/base/rawmessagewriter.go

@ -4,8 +4,8 @@ import ( @@ -4,8 +4,8 @@ import (
"io"
)
type messageWriterChunkStream struct {
mw *MessageWriter
type rawMessageWriterChunkStream struct {
mw *RawMessageWriter
lastMessageStreamID *uint32
lastType *MessageType
lastBodyLen *int
@ -13,7 +13,7 @@ type messageWriterChunkStream struct { @@ -13,7 +13,7 @@ type messageWriterChunkStream struct {
lastTimestampDelta *uint32
}
func (wc *messageWriterChunkStream) write(msg *Message) error {
func (wc *rawMessageWriterChunkStream) write(msg *RawMessage) error {
bodyLen := len(msg.Body)
pos := 0
firstChunk := true
@ -115,32 +115,32 @@ func (wc *messageWriterChunkStream) write(msg *Message) error { @@ -115,32 +115,32 @@ func (wc *messageWriterChunkStream) write(msg *Message) error {
}
}
// MessageWriter is a message writer.
type MessageWriter struct {
// RawMessageWriter is a message writer.
type RawMessageWriter struct {
w io.Writer
chunkSize int
chunkStreams map[byte]*messageWriterChunkStream
chunkStreams map[byte]*rawMessageWriterChunkStream
}
// NewMessageWriter allocates a MessageWriter.
func NewMessageWriter(w io.Writer) *MessageWriter {
return &MessageWriter{
// NewRawMessageWriter allocates a RawMessageWriter.
func NewRawMessageWriter(w io.Writer) *RawMessageWriter {
return &RawMessageWriter{
w: w,
chunkSize: 128,
chunkStreams: make(map[byte]*messageWriterChunkStream),
chunkStreams: make(map[byte]*rawMessageWriterChunkStream),
}
}
// SetChunkSize sets the maximum chunk size.
func (mw *MessageWriter) SetChunkSize(v int) {
func (mw *RawMessageWriter) SetChunkSize(v int) {
mw.chunkSize = v
}
// Write writes a Message.
func (mw *MessageWriter) Write(msg *Message) error {
func (mw *RawMessageWriter) Write(msg *RawMessage) error {
wc, ok := mw.chunkStreams[msg.ChunkStreamID]
if !ok {
wc = &messageWriterChunkStream{mw: mw}
wc = &rawMessageWriterChunkStream{mw: mw}
mw.chunkStreams[msg.ChunkStreamID] = wc
}

60
internal/rtmp/conn_test.go

@ -135,8 +135,8 @@ func TestReadTracks(t *testing.T) { @@ -135,8 +135,8 @@ func TestReadTracks(t *testing.T) {
err = base.HandshakeC2{}.Write(conn, s1s2)
require.NoError(t, err)
mw := base.NewMessageWriter(conn)
mr := base.NewMessageReader(bufio.NewReader(conn))
mw := base.NewRawMessageWriter(conn)
mr := base.NewRawMessageReader(bufio.NewReader(conn))
// C->S connect
byts := flvio.FillAMF0ValsMalloc([]interface{}{
@ -153,7 +153,7 @@ func TestReadTracks(t *testing.T) { @@ -153,7 +153,7 @@ func TestReadTracks(t *testing.T) {
{K: "videoFunction", V: 1},
},
})
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: byts,
@ -163,7 +163,7 @@ func TestReadTracks(t *testing.T) { @@ -163,7 +163,7 @@ func TestReadTracks(t *testing.T) {
// S->C window acknowledgement size
msg, err := mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
Body: []byte{0x00, 38, 37, 160},
@ -172,7 +172,7 @@ func TestReadTracks(t *testing.T) { @@ -172,7 +172,7 @@ func TestReadTracks(t *testing.T) {
// S->C set peer bandwidth
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetPeerBandwidth,
Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02},
@ -181,7 +181,7 @@ func TestReadTracks(t *testing.T) { @@ -181,7 +181,7 @@ func TestReadTracks(t *testing.T) {
// S->C set chunk size
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
@ -212,7 +212,7 @@ func TestReadTracks(t *testing.T) { @@ -212,7 +212,7 @@ func TestReadTracks(t *testing.T) {
}, arr)
// C->S set chunk size
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
@ -222,7 +222,7 @@ func TestReadTracks(t *testing.T) { @@ -222,7 +222,7 @@ func TestReadTracks(t *testing.T) {
mw.SetChunkSize(65536)
// C->S releaseStream
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
@ -235,7 +235,7 @@ func TestReadTracks(t *testing.T) { @@ -235,7 +235,7 @@ func TestReadTracks(t *testing.T) {
require.NoError(t, err)
// C->S FCPublish
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
@ -248,7 +248,7 @@ func TestReadTracks(t *testing.T) { @@ -248,7 +248,7 @@ func TestReadTracks(t *testing.T) {
require.NoError(t, err)
// C->S createStream
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
@ -274,7 +274,7 @@ func TestReadTracks(t *testing.T) { @@ -274,7 +274,7 @@ func TestReadTracks(t *testing.T) {
}, arr)
// C->S publish
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 8,
Type: base.MessageTypeCommandAMF0,
MessageStreamID: 1,
@ -331,7 +331,7 @@ func TestReadTracks(t *testing.T) { @@ -331,7 +331,7 @@ func TestReadTracks(t *testing.T) {
},
},
})
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 4,
Type: base.MessageTypeDataAMF0,
MessageStreamID: 1,
@ -352,7 +352,7 @@ func TestReadTracks(t *testing.T) { @@ -352,7 +352,7 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 6,
Type: base.MessageTypeVideo,
MessageStreamID: 1,
@ -367,7 +367,7 @@ func TestReadTracks(t *testing.T) { @@ -367,7 +367,7 @@ func TestReadTracks(t *testing.T) {
ChannelCount: 2,
}.Encode()
require.NoError(t, err)
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 4,
Type: base.MessageTypeAudio,
MessageStreamID: 1,
@ -398,7 +398,7 @@ func TestReadTracks(t *testing.T) { @@ -398,7 +398,7 @@ func TestReadTracks(t *testing.T) {
},
},
})
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 4,
Type: base.MessageTypeDataAMF0,
MessageStreamID: 1,
@ -419,7 +419,7 @@ func TestReadTracks(t *testing.T) { @@ -419,7 +419,7 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 6,
Type: base.MessageTypeVideo,
MessageStreamID: 1,
@ -441,7 +441,7 @@ func TestReadTracks(t *testing.T) { @@ -441,7 +441,7 @@ func TestReadTracks(t *testing.T) {
var n int
codec.ToConfig(b, &n)
body := append([]byte{flvio.FRAME_KEY<<4 | flvio.VIDEO_H264, 0, 0, 0, 0}, b[:n]...)
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 6,
Type: base.MessageTypeVideo,
MessageStreamID: 1,
@ -513,8 +513,8 @@ func TestWriteTracks(t *testing.T) { @@ -513,8 +513,8 @@ func TestWriteTracks(t *testing.T) {
err = base.HandshakeC2{}.Write(conn, s1s2)
require.NoError(t, err)
mw := base.NewMessageWriter(conn)
mr := base.NewMessageReader(bufio.NewReader(conn))
mw := base.NewRawMessageWriter(conn)
mr := base.NewRawMessageReader(bufio.NewReader(conn))
// C->S connect
byts := flvio.FillAMF0ValsMalloc([]interface{}{
@ -531,7 +531,7 @@ func TestWriteTracks(t *testing.T) { @@ -531,7 +531,7 @@ func TestWriteTracks(t *testing.T) {
{K: "videoFunction", V: 1},
},
})
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: byts,
@ -541,7 +541,7 @@ func TestWriteTracks(t *testing.T) { @@ -541,7 +541,7 @@ func TestWriteTracks(t *testing.T) {
// S->C window acknowledgement size
msg, err := mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
Body: []byte{0x00, 38, 37, 160},
@ -550,7 +550,7 @@ func TestWriteTracks(t *testing.T) { @@ -550,7 +550,7 @@ func TestWriteTracks(t *testing.T) {
// S->C set peer bandwidth
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetPeerBandwidth,
Body: []byte{0x00, 0x26, 0x25, 0xa0, 0x02},
@ -559,7 +559,7 @@ func TestWriteTracks(t *testing.T) { @@ -559,7 +559,7 @@ func TestWriteTracks(t *testing.T) {
// S->C set chunk size
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
@ -590,7 +590,7 @@ func TestWriteTracks(t *testing.T) { @@ -590,7 +590,7 @@ func TestWriteTracks(t *testing.T) {
}, arr)
// C->S window acknowledgement size
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetWindowAckSize,
Body: []byte{0x00, 0x26, 0x25, 0xa0},
@ -598,7 +598,7 @@ func TestWriteTracks(t *testing.T) { @@ -598,7 +598,7 @@ func TestWriteTracks(t *testing.T) {
require.NoError(t, err)
// C->S set chunk size
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeSetChunkSize,
Body: []byte{0x00, 0x01, 0x00, 0x00},
@ -608,7 +608,7 @@ func TestWriteTracks(t *testing.T) { @@ -608,7 +608,7 @@ func TestWriteTracks(t *testing.T) {
mw.SetChunkSize(65536)
// C->S createStream
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 3,
Type: base.MessageTypeCommandAMF0,
Body: flvio.FillAMF0ValsMalloc([]interface{}{
@ -640,7 +640,7 @@ func TestWriteTracks(t *testing.T) { @@ -640,7 +640,7 @@ func TestWriteTracks(t *testing.T) {
nil,
"",
})
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 8,
Body: byts,
})
@ -654,7 +654,7 @@ func TestWriteTracks(t *testing.T) { @@ -654,7 +654,7 @@ func TestWriteTracks(t *testing.T) {
"",
float64(-2000),
})
err = mw.Write(&base.Message{
err = mw.Write(&base.RawMessage{
ChunkStreamID: 8,
Type: base.MessageTypeCommandAMF0,
Body: byts,
@ -664,7 +664,7 @@ func TestWriteTracks(t *testing.T) { @@ -664,7 +664,7 @@ func TestWriteTracks(t *testing.T) {
// S->C event "stream is recorded"
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeUserControl,
Body: []byte{0x00, 0x04, 0x00, 0x00, 0x00, 0x01},
@ -673,7 +673,7 @@ func TestWriteTracks(t *testing.T) { @@ -673,7 +673,7 @@ func TestWriteTracks(t *testing.T) {
// S->C event "stream begin 1"
msg, err = mr.Read()
require.NoError(t, err)
require.Equal(t, &base.Message{
require.Equal(t, &base.RawMessage{
ChunkStreamID: base.ControlChunkStreamID,
Type: base.MessageTypeUserControl,
Body: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x01},

Loading…
Cancel
Save