Browse Source

rtmp: improve tests

pull/1060/head
aler9 4 years ago
parent
commit
2a0567ab4a
  1. 775
      internal/rtmp/conn_test.go

775
internal/rtmp/conn_test.go

@ -49,372 +49,509 @@ func getTcURL(u string) string {
} }
func TestClientHandshake(t *testing.T) { func TestClientHandshake(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:9121") for _, ca := range []string{"read", "publish"} {
require.NoError(t, err) t.Run(ca, func(t *testing.T) {
defer ln.Close() ln, err := net.Listen("tcp", "127.0.0.1:9121")
require.NoError(t, err)
done := make(chan struct{}) defer ln.Close()
go func() { done := make(chan struct{})
conn, err := ln.Accept()
require.NoError(t, err)
defer conn.Close()
bc := bytecounter.NewReadWriter(conn)
// C->S handshake C0 go func() {
err = handshake.C0S0{}.Read(bc) conn, err := ln.Accept()
require.NoError(t, err) require.NoError(t, err)
defer conn.Close()
bc := bytecounter.NewReadWriter(conn)
// C->S handshake C1 // C->S handshake C0
c1 := handshake.C1S1{} err = handshake.C0S0{}.Read(bc)
err = c1.Read(bc, true) require.NoError(t, err)
require.NoError(t, err)
// S->C handshake S0 // C->S handshake C1
err = handshake.C0S0{}.Write(bc) c1 := handshake.C1S1{}
require.NoError(t, err) err = c1.Read(bc, true)
require.NoError(t, err)
// S->C handshake S1 // S->C handshake S0
s1 := handshake.C1S1{} err = handshake.C0S0{}.Write(bc)
err = s1.Write(bc, false) require.NoError(t, err)
require.NoError(t, err)
// S->C handshake S2 // S->C handshake S1
err = handshake.C2S2{Digest: c1.Digest}.Write(bc) s1 := handshake.C1S1{}
require.NoError(t, err) err = s1.Write(bc, false)
require.NoError(t, err)
// C->S handshake C2 // S->C handshake S2
err = (&handshake.C2S2{Digest: s1.Digest}).Read(bc) err = handshake.C2S2{Digest: c1.Digest}.Write(bc)
require.NoError(t, err) require.NoError(t, err)
mrw := message.NewReadWriter(bc) // C->S handshake C2
err = (&handshake.C2S2{Digest: s1.Digest}).Read(bc)
require.NoError(t, err)
// C->S set window ack size mrw := message.NewReadWriter(bc)
msg, err := mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgSetWindowAckSize{
Value: 2500000,
}, msg)
// C->S set peer bandwidth // C->S set window ack size
msg, err = mrw.Read() msg, err := mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgSetPeerBandwidth{ require.Equal(t, &message.MsgSetWindowAckSize{
Value: 0x2625a0, Value: 2500000,
Type: 2, }, msg)
}, msg)
// C->S set chunk size // C->S set peer bandwidth
msg, err = mrw.Read() msg, err = mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgSetChunkSize{ require.Equal(t, &message.MsgSetPeerBandwidth{
Value: 65536, Value: 0x2625a0,
}, msg) Type: 2,
}, msg)
// C->S connect // C->S set chunk size
msg, err = mrw.Read() msg, err = mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{ require.Equal(t, &message.MsgSetChunkSize{
ChunkStreamID: 3, Value: 65536,
Payload: []interface{}{ }, msg)
"connect",
float64(1),
flvio.AMFMap{
{K: "app", V: "stream"},
{K: "flashVer", V: "LNX 9,0,124,2"},
{K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")},
{K: "fpad", V: false},
{K: "capabilities", V: float64(15)},
{K: "audioCodecs", V: float64(4071)},
{K: "videoCodecs", V: float64(252)},
{K: "videoFunction", V: float64(1)},
},
},
}, msg)
// S->C result
err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"_result",
float64(1),
flvio.AMFMap{
{K: "fmsVer", V: "LNX 9,0,124,2"},
{K: "capabilities", V: float64(31)},
},
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetConnection.Connect.Success"},
{K: "description", V: "Connection succeeded."},
{K: "objectEncoding", V: float64(0)},
},
},
})
require.NoError(t, err)
// C->S create stream // C->S connect
msg, err = mrw.Read() msg, err = mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{ require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3, ChunkStreamID: 3,
Payload: []interface{}{ Payload: []interface{}{
"createStream", "connect",
float64(2), float64(1),
nil, flvio.AMFMap{
}, {K: "app", V: "stream"},
}, msg) {K: "flashVer", V: "LNX 9,0,124,2"},
{K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")},
// S->C result {K: "fpad", V: false},
err = mrw.Write(&message.MsgCommandAMF0{ {K: "capabilities", V: float64(15)},
ChunkStreamID: 3, {K: "audioCodecs", V: float64(4071)},
Payload: []interface{}{ {K: "videoCodecs", V: float64(252)},
"_result", {K: "videoFunction", V: float64(1)},
float64(2), },
nil, },
float64(1), }, msg)
},
})
require.NoError(t, err)
// C->S user control set buffer length // S->C result
msg, err = mrw.Read() err = mrw.Write(&message.MsgCommandAMF0{
require.NoError(t, err) ChunkStreamID: 3,
require.Equal(t, &message.MsgUserControlSetBufferLength{ Payload: []interface{}{
BufferLength: 0x64, "_result",
}, msg) float64(1),
flvio.AMFMap{
{K: "fmsVer", V: "LNX 9,0,124,2"},
{K: "capabilities", V: float64(31)},
},
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetConnection.Connect.Success"},
{K: "description", V: "Connection succeeded."},
{K: "objectEncoding", V: float64(0)},
},
},
})
require.NoError(t, err)
// C->S play if ca == "read" {
msg, err = mrw.Read() // C->S create stream
require.NoError(t, err) msg, err = mrw.Read()
require.Equal(t, &message.MsgCommandAMF0{ require.NoError(t, err)
ChunkStreamID: 4, require.Equal(t, &message.MsgCommandAMF0{
MessageStreamID: 16777216, ChunkStreamID: 3,
Payload: []interface{}{ Payload: []interface{}{
"play", "createStream",
float64(0), float64(2),
nil, nil,
"", },
}, }, msg)
}, msg)
// S->C result
// S->C onStatus err = mrw.Write(&message.MsgCommandAMF0{
err = mrw.Write(&message.MsgCommandAMF0{ ChunkStreamID: 3,
ChunkStreamID: 5, Payload: []interface{}{
MessageStreamID: 16777216, "_result",
Payload: []interface{}{ float64(2),
"onStatus", nil,
float64(4), float64(1),
nil, },
flvio.AMFMap{ })
{K: "level", V: "status"}, require.NoError(t, err)
{K: "code", V: "NetStream.Play.Reset"},
{K: "description", V: "play reset"}, // C->S user control set buffer length
}, msg, err = mrw.Read()
}, require.NoError(t, err)
}) require.Equal(t, &message.MsgUserControlSetBufferLength{
require.NoError(t, err) BufferLength: 0x64,
}, msg)
// C->S play
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 4,
MessageStreamID: 16777216,
Payload: []interface{}{
"play",
float64(0),
nil,
"",
},
}, msg)
// S->C onStatus
err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 16777216,
Payload: []interface{}{
"onStatus",
float64(4),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Play.Reset"},
{K: "description", V: "play reset"},
},
},
})
require.NoError(t, err)
} else {
// C->S releaseStream
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"releaseStream",
float64(2),
nil,
"",
},
}, msg)
// C->S FCPublish
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"FCPublish",
float64(3),
nil,
"",
},
}, msg)
// C->S createStream
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"createStream",
float64(4),
nil,
},
}, msg)
// S->C result
err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"_result",
float64(4),
nil,
float64(1),
},
})
require.NoError(t, err)
// C->S publish
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 4,
MessageStreamID: 16777216,
Payload: []interface{}{
"publish",
float64(5),
nil,
"",
"stream",
},
}, msg)
// S->C onStatus
err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 5,
MessageStreamID: 16777216,
Payload: []interface{}{
"onStatus",
float64(5),
nil,
flvio.AMFMap{
{K: "level", V: "status"},
{K: "code", V: "NetStream.Publish.Start"},
{K: "description", V: "publish start"},
},
},
})
require.NoError(t, err)
}
close(done) close(done)
}() }()
u, err := url.Parse("rtmp://127.0.0.1:9121/stream") u, err := url.Parse("rtmp://127.0.0.1:9121/stream")
require.NoError(t, err) require.NoError(t, err)
nconn, err := net.Dial("tcp", u.Host) nconn, err := net.Dial("tcp", u.Host)
require.NoError(t, err) require.NoError(t, err)
defer nconn.Close() defer nconn.Close()
conn := NewClientConn(nconn, u) conn := NewClientConn(nconn, u)
err = conn.ClientHandshake(true) err = conn.ClientHandshake(ca == "read")
require.NoError(t, err) require.NoError(t, err)
<-done <-done
})
}
} }
func TestServerHandshake(t *testing.T) { func TestServerHandshake(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:9121") for _, ca := range []string{"read", "publish"} {
require.NoError(t, err) t.Run(ca, func(t *testing.T) {
defer ln.Close() ln, err := net.Listen("tcp", "127.0.0.1:9121")
require.NoError(t, err)
defer ln.Close()
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
nconn, err := ln.Accept() nconn, err := ln.Accept()
require.NoError(t, err) require.NoError(t, err)
defer nconn.Close() defer nconn.Close()
conn := NewServerConn(nconn) conn := NewServerConn(nconn)
err = conn.ServerHandshake() err = conn.ServerHandshake()
require.NoError(t, err) require.NoError(t, err)
close(done) close(done)
}() }()
conn, err := net.Dial("tcp", "127.0.0.1:9121") conn, err := net.Dial("tcp", "127.0.0.1:9121")
require.NoError(t, err) require.NoError(t, err)
defer conn.Close() defer conn.Close()
bc := bytecounter.NewReadWriter(conn) bc := bytecounter.NewReadWriter(conn)
// C->S handshake C0 // C->S handshake C0
err = handshake.C0S0{}.Write(bc) err = handshake.C0S0{}.Write(bc)
require.NoError(t, err) require.NoError(t, err)
// C->S handshake C1 // C->S handshake C1
c1 := handshake.C1S1{} c1 := handshake.C1S1{}
err = c1.Write(bc, true) err = c1.Write(bc, true)
require.NoError(t, err) require.NoError(t, err)
// S->C handshake S0 // S->C handshake S0
err = handshake.C0S0{}.Read(bc) err = handshake.C0S0{}.Read(bc)
require.NoError(t, err) require.NoError(t, err)
// S->C handshake S1 // S->C handshake S1
s1 := handshake.C1S1{} s1 := handshake.C1S1{}
err = s1.Read(bc, false) err = s1.Read(bc, false)
require.NoError(t, err) require.NoError(t, err)
// S->C handshake S2 // S->C handshake S2
err = (&handshake.C2S2{Digest: c1.Digest}).Read(bc) err = (&handshake.C2S2{Digest: c1.Digest}).Read(bc)
require.NoError(t, err) require.NoError(t, err)
// C->S handshake C2 // C->S handshake C2
err = handshake.C2S2{Digest: s1.Digest}.Write(bc) err = handshake.C2S2{Digest: s1.Digest}.Write(bc)
require.NoError(t, err) require.NoError(t, err)
mrw := message.NewReadWriter(bc) mrw := message.NewReadWriter(bc)
// C->S connect // C->S connect
err = mrw.Write(&message.MsgCommandAMF0{ err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3, ChunkStreamID: 3,
Payload: []interface{}{ Payload: []interface{}{
"connect", "connect",
1, 1,
flvio.AMFMap{ flvio.AMFMap{
{K: "app", V: "/stream"}, {K: "app", V: "/stream"},
{K: "flashVer", V: "LNX 9,0,124,2"}, {K: "flashVer", V: "LNX 9,0,124,2"},
{K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")},
{K: "fpad", V: false}, {K: "fpad", V: false},
{K: "capabilities", V: 15}, {K: "capabilities", V: 15},
{K: "audioCodecs", V: 4071}, {K: "audioCodecs", V: 4071},
{K: "videoCodecs", V: 252}, {K: "videoCodecs", V: 252},
{K: "videoFunction", V: 1}, {K: "videoFunction", V: 1},
}, },
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
// S->C window acknowledgement size // S->C window acknowledgement size
msg, err := mrw.Read() msg, err := mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgSetWindowAckSize{ require.Equal(t, &message.MsgSetWindowAckSize{
Value: 2500000, Value: 2500000,
}, msg) }, msg)
// S->C set peer bandwidth // S->C set peer bandwidth
msg, err = mrw.Read() msg, err = mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgSetPeerBandwidth{ require.Equal(t, &message.MsgSetPeerBandwidth{
Value: 2500000, Value: 2500000,
Type: 2, Type: 2,
}, msg) }, msg)
// S->C set chunk size // S->C set chunk size
msg, err = mrw.Read() msg, err = mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgSetChunkSize{ require.Equal(t, &message.MsgSetChunkSize{
Value: 65536, Value: 65536,
}, msg) }, msg)
// S->C result // S->C result
msg, err = mrw.Read() msg, err = mrw.Read()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{ require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3, ChunkStreamID: 3,
Payload: []interface{}{ Payload: []interface{}{
"_result", "_result",
float64(1), float64(1),
flvio.AMFMap{ flvio.AMFMap{
{K: "fmsVer", V: "LNX 9,0,124,2"}, {K: "fmsVer", V: "LNX 9,0,124,2"},
{K: "capabilities", V: float64(31)}, {K: "capabilities", V: float64(31)},
}, },
flvio.AMFMap{ flvio.AMFMap{
{K: "level", V: "status"}, {K: "level", V: "status"},
{K: "code", V: "NetConnection.Connect.Success"}, {K: "code", V: "NetConnection.Connect.Success"},
{K: "description", V: "Connection succeeded."}, {K: "description", V: "Connection succeeded."},
{K: "objectEncoding", V: float64(0)}, {K: "objectEncoding", V: float64(0)},
}, },
}, },
}, msg) }, msg)
// C->S set chunk size // C->S set chunk size
err = mrw.Write(&message.MsgSetChunkSize{ err = mrw.Write(&message.MsgSetChunkSize{
Value: 65536, Value: 65536,
}) })
require.NoError(t, err) require.NoError(t, err)
// C->S releaseStream if ca == "read" {
err = mrw.Write(&message.MsgCommandAMF0{ // C->S createStream
ChunkStreamID: 3, err = mrw.Write(&message.MsgCommandAMF0{
Payload: []interface{}{ ChunkStreamID: 3,
"releaseStream", Payload: []interface{}{
float64(2), "createStream",
nil, float64(2),
"", nil,
}, },
}) })
require.NoError(t, err) require.NoError(t, err)
// C->S FCPublish // S->C result
err = mrw.Write(&message.MsgCommandAMF0{ msg, err = mrw.Read()
ChunkStreamID: 3, require.NoError(t, err)
Payload: []interface{}{ require.Equal(t, &message.MsgCommandAMF0{
"FCPublish", ChunkStreamID: 3,
float64(3), Payload: []interface{}{
nil, "_result",
"", float64(2),
}, nil,
}) float64(1),
require.NoError(t, err) },
}, msg)
// C->S createStream // C->S user control set buffer length
err = mrw.Write(&message.MsgCommandAMF0{ err = mrw.Write(&message.MsgUserControlSetBufferLength{
ChunkStreamID: 3, BufferLength: 0x64,
Payload: []interface{}{ })
"createStream", require.NoError(t, err)
float64(4),
nil,
},
})
require.NoError(t, err)
// S->C result // C->S play
msg, err = mrw.Read() err = mrw.Write(&message.MsgCommandAMF0{
require.NoError(t, err) ChunkStreamID: 4,
require.Equal(t, &message.MsgCommandAMF0{ MessageStreamID: 16777216,
ChunkStreamID: 3, Payload: []interface{}{
Payload: []interface{}{ "play",
"_result", float64(0),
float64(4), nil,
nil, "",
float64(1), },
}, })
}, msg) require.NoError(t, err)
} else {
// C->S releaseStream
err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"releaseStream",
float64(2),
nil,
"",
},
})
require.NoError(t, err)
// C->S publish // C->S FCPublish
err = mrw.Write(&message.MsgCommandAMF0{ err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 8, ChunkStreamID: 3,
MessageStreamID: 1, Payload: []interface{}{
Payload: []interface{}{ "FCPublish",
"publish", float64(3),
float64(5), nil,
nil, "",
"", },
"live", })
}, require.NoError(t, err)
})
require.NoError(t, err) // C->S createStream
err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"createStream",
float64(4),
nil,
},
})
require.NoError(t, err)
<-done // S->C result
msg, err = mrw.Read()
require.NoError(t, err)
require.Equal(t, &message.MsgCommandAMF0{
ChunkStreamID: 3,
Payload: []interface{}{
"_result",
float64(4),
nil,
float64(1),
},
}, msg)
// C->S publish
err = mrw.Write(&message.MsgCommandAMF0{
ChunkStreamID: 4,
MessageStreamID: 16777216,
Payload: []interface{}{
"publish",
float64(5),
nil,
"",
"stream",
},
})
require.NoError(t, err)
}
<-done
})
}
} }
func TestReadTracks(t *testing.T) { func TestReadTracks(t *testing.T) {

Loading…
Cancel
Save