Browse Source

rtmp: split MessageWriter into single handlers per chunk stream

pull/956/head
aler9 4 years ago
parent
commit
420b6b21c4
  1. 78
      internal/rtmp/base/messagewriter.go

78
internal/rtmp/base/messagewriter.go

@ -4,62 +4,46 @@ import (
"io" "io"
) )
// MessageWriter is a message writer. type messageWriterChunkStream struct {
type MessageWriter struct { mw *MessageWriter
w io.Writer lastMessageStreamID *uint32
chunkMaxBodyLen int
lastMessageStreamIDPerChunkStreamID map[byte]uint32
}
// NewMessageWriter instantiates a MessageWriter.
func NewMessageWriter(w io.Writer) *MessageWriter {
return &MessageWriter{
w: w,
chunkMaxBodyLen: 128,
lastMessageStreamIDPerChunkStreamID: make(map[byte]uint32),
}
} }
// SetChunkSize sets the chunk size. func (wc *messageWriterChunkStream) write(msg *Message) error {
func (mw *MessageWriter) SetChunkSize(v int) {
mw.chunkMaxBodyLen = v
}
// Write writes a Message.
func (mw *MessageWriter) Write(msg *Message) error {
bodyLen := len(msg.Body) bodyLen := len(msg.Body)
pos := 0 pos := 0
first := true firstChunk := true
for { for {
chunkBodyLen := bodyLen - pos chunkBodyLen := bodyLen - pos
if chunkBodyLen > mw.chunkMaxBodyLen { if chunkBodyLen > wc.mw.chunkSize {
chunkBodyLen = mw.chunkMaxBodyLen chunkBodyLen = wc.mw.chunkSize
} }
if first { if firstChunk {
first = false firstChunk = false
if v, ok := mw.lastMessageStreamIDPerChunkStreamID[msg.ChunkStreamID]; !ok || v != msg.MessageStreamID { if wc.lastMessageStreamID == nil || *wc.lastMessageStreamID != msg.MessageStreamID {
err := Chunk0{ err := Chunk0{
ChunkStreamID: msg.ChunkStreamID, ChunkStreamID: msg.ChunkStreamID,
Type: msg.Type, Type: msg.Type,
MessageStreamID: msg.MessageStreamID, MessageStreamID: msg.MessageStreamID,
BodyLen: uint32(bodyLen), BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen], Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(mw.w) }.Write(wc.mw.w)
if err != nil { if err != nil {
return err return err
} }
mw.lastMessageStreamIDPerChunkStreamID[msg.ChunkStreamID] = msg.MessageStreamID v := msg.MessageStreamID
wc.lastMessageStreamID = &v
} else { } else {
err := Chunk1{ err := Chunk1{
ChunkStreamID: msg.ChunkStreamID, ChunkStreamID: msg.ChunkStreamID,
Type: msg.Type, Type: msg.Type,
BodyLen: uint32(bodyLen), BodyLen: uint32(bodyLen),
Body: msg.Body[pos : pos+chunkBodyLen], Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(mw.w) }.Write(wc.mw.w)
if err != nil { if err != nil {
return err return err
} }
@ -68,7 +52,7 @@ func (mw *MessageWriter) Write(msg *Message) error {
err := Chunk3{ err := Chunk3{
ChunkStreamID: msg.ChunkStreamID, ChunkStreamID: msg.ChunkStreamID,
Body: msg.Body[pos : pos+chunkBodyLen], Body: msg.Body[pos : pos+chunkBodyLen],
}.Write(mw.w) }.Write(wc.mw.w)
if err != nil { if err != nil {
return err return err
} }
@ -81,3 +65,35 @@ func (mw *MessageWriter) Write(msg *Message) error {
} }
} }
} }
// MessageWriter is a message writer.
type MessageWriter struct {
w io.Writer
chunkSize int
chunkStreams map[byte]*messageWriterChunkStream
}
// NewMessageWriter instantiates a MessageWriter.
func NewMessageWriter(w io.Writer) *MessageWriter {
return &MessageWriter{
w: w,
chunkSize: 128,
chunkStreams: make(map[byte]*messageWriterChunkStream),
}
}
// SetChunkSize sets the maximum chunk size.
func (mw *MessageWriter) SetChunkSize(v int) {
mw.chunkSize = v
}
// Write writes a Message.
func (mw *MessageWriter) Write(msg *Message) error {
cs, ok := mw.chunkStreams[msg.ChunkStreamID]
if !ok {
cs = &messageWriterChunkStream{mw: mw}
mw.chunkStreams[msg.ChunkStreamID] = cs
}
return cs.write(msg)
}

Loading…
Cancel
Save