Browse Source

rtmp: improve tests

pull/1060/head
aler9 3 years ago
parent
commit
772c5b2363
  1. 5
      internal/rtmp/bytecounter/reader.go
  2. 4
      internal/rtmp/bytecounter/reader_test.go
  3. 5
      internal/rtmp/bytecounter/writer.go
  4. 5
      internal/rtmp/bytecounter/writer_test.go
  5. 1
      internal/rtmp/rawmessage/reader.go
  6. 60
      internal/rtmp/rawmessage/reader_test.go
  7. 19
      internal/rtmp/rawmessage/writer.go
  8. 53
      internal/rtmp/rawmessage/writer_test.go

5
internal/rtmp/bytecounter/reader.go

@ -35,3 +35,8 @@ func NewReader(r io.Reader) *Reader { @@ -35,3 +35,8 @@ func NewReader(r io.Reader) *Reader {
func (r Reader) Count() uint32 {
return r.ri.count
}
// SetCount sets read bytes.
func (r *Reader) SetCount(v uint32) {
r.ri.count = v
}

4
internal/rtmp/bytecounter/reader_test.go

@ -12,10 +12,12 @@ func TestReader(t *testing.T) { @@ -12,10 +12,12 @@ func TestReader(t *testing.T) {
buf.Write(bytes.Repeat([]byte{0x01}, 1024))
r := NewReader(&buf)
r.SetCount(100)
buf2 := make([]byte, 64)
n, err := r.Read(buf2)
require.NoError(t, err)
require.Equal(t, 64, n)
require.Equal(t, uint32(1024), r.Count())
require.Equal(t, uint32(100+1024), r.Count())
}

5
internal/rtmp/bytecounter/writer.go

@ -28,3 +28,8 @@ func (w *Writer) Write(p []byte) (int, error) { @@ -28,3 +28,8 @@ func (w *Writer) Write(p []byte) (int, error) {
func (w Writer) Count() uint32 {
return w.count
}
// SetCount sets written bytes.
func (w *Writer) SetCount(v uint32) {
w.count = v
}

5
internal/rtmp/bytecounter/writer_test.go

@ -9,7 +9,10 @@ import ( @@ -9,7 +9,10 @@ import (
func TestWriter(t *testing.T) {
var buf bytes.Buffer
w := NewWriter(&buf)
w.SetCount(100)
w.Write(bytes.Repeat([]byte{0x01}, 64))
require.Equal(t, uint32(64), w.Count())
require.Equal(t, uint32(100+64), w.Count())
}

1
internal/rtmp/rawmessage/reader.go

@ -30,7 +30,6 @@ func (rc *readerChunkStream) readChunk(c chunk.Chunk, chunkBodySize uint32) erro @@ -30,7 +30,6 @@ func (rc *readerChunkStream) readChunk(c chunk.Chunk, chunkBodySize uint32) erro
if rc.mr.ackWindowSize != 0 {
count := rc.mr.r.Count()
diff := count - rc.mr.lastAckCount
// TODO: handle overflow
if diff > (rc.mr.ackWindowSize) {
err := rc.mr.onAckNeeded(count)

60
internal/rtmp/rawmessage/reader_test.go

@ -158,34 +158,40 @@ func TestReader(t *testing.T) { @@ -158,34 +158,40 @@ func TestReader(t *testing.T) {
}
func TestReaderAcknowledge(t *testing.T) {
onAckCalled := make(chan struct{})
var buf bytes.Buffer
bcr := bytecounter.NewReader(&buf)
r := NewReader(bcr, func(count uint32) error {
close(onAckCalled)
return nil
})
r.SetWindowAckSize(100)
for _, ca := range []string{"standard", "overflow"} {
t.Run(ca, func(t *testing.T) {
onAckCalled := make(chan struct{})
var buf bytes.Buffer
bcr := bytecounter.NewReader(&buf)
r := NewReader(bcr, func(count uint32) error {
close(onAckCalled)
return nil
})
if ca == "overflow" {
bcr.SetCount(4294967096)
r.lastAckCount = 4294967096
}
r.SetChunkSize(65536)
r.SetWindowAckSize(100)
buf2, err := chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
BodyLen: 200,
Body: bytes.Repeat([]byte{0x03}, 200),
}.Marshal()
require.NoError(t, err)
buf.Write(buf2)
for i := 0; i < 2; i++ {
buf2, err := chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
BodyLen: 64,
Body: bytes.Repeat([]byte{0x03}, 64),
}.Marshal()
require.NoError(t, err)
buf.Write(buf2)
}
_, err = r.Read()
require.NoError(t, err)
for i := 0; i < 2; i++ {
_, err := r.Read()
require.NoError(t, err)
<-onAckCalled
})
}
<-onAckCalled
}

19
internal/rtmp/rawmessage/writer.go

@ -17,6 +17,15 @@ type writerChunkStream struct { @@ -17,6 +17,15 @@ type writerChunkStream struct {
}
func (wc *writerChunkStream) writeChunk(c chunk.Chunk) error {
// check if we received an acknowledge
if wc.mw.ackWindowSize != 0 {
diff := wc.mw.w.Count() - (wc.mw.ackValue)
if diff > (wc.mw.ackWindowSize * 3 / 2) {
return fmt.Errorf("no acknowledge received within window")
}
}
buf, err := c.Marshal()
if err != nil {
return err
@ -27,16 +36,6 @@ func (wc *writerChunkStream) writeChunk(c chunk.Chunk) error { @@ -27,16 +36,6 @@ func (wc *writerChunkStream) writeChunk(c chunk.Chunk) error {
return err
}
// check if we received an acknowledge
if wc.mw.ackWindowSize != 0 {
diff := wc.mw.w.Count() - (wc.mw.ackValue)
// TODO: handle overflow
if diff > (wc.mw.ackWindowSize * 3 / 2) {
return fmt.Errorf("no acknowledge received within window")
}
}
return nil
}

53
internal/rtmp/rawmessage/writer_test.go

@ -153,28 +153,37 @@ func TestWriter(t *testing.T) { @@ -153,28 +153,37 @@ func TestWriter(t *testing.T) {
}
func TestWriterAcknowledge(t *testing.T) {
var buf bytes.Buffer
w := NewWriter(bytecounter.NewWriter(&buf))
w.SetWindowAckSize(100)
for i := 0; i < 2; i++ {
err := w.Write(&Message{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 64),
for _, ca := range []string{"standard", "overflow"} {
t.Run(ca, func(t *testing.T) {
var buf bytes.Buffer
bcw := bytecounter.NewWriter(&buf)
w := NewWriter(bcw)
if ca == "overflow" {
bcw.SetCount(4294967096)
w.ackValue = 4294967096
}
w.SetChunkSize(65536)
w.SetWindowAckSize(100)
err := w.Write(&Message{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 200),
})
require.NoError(t, err)
err = w.Write(&Message{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 200),
})
require.EqualError(t, err, "no acknowledge received within window")
})
require.NoError(t, err)
}
err := w.Write(&Message{
ChunkStreamID: 27,
Timestamp: 18576,
Type: chunk.MessageTypeSetPeerBandwidth,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{0x03}, 64),
})
require.EqualError(t, err, "no acknowledge received within window")
}

Loading…
Cancel
Save