diff --git a/internal/rtmp/conn_test.go b/internal/rtmp/conn_test.go index 67cc09eb..0c1275f3 100644 --- a/internal/rtmp/conn_test.go +++ b/internal/rtmp/conn_test.go @@ -49,372 +49,509 @@ func getTcURL(u string) string { } func TestClientHandshake(t *testing.T) { - ln, err := net.Listen("tcp", "127.0.0.1:9121") - require.NoError(t, err) - defer ln.Close() - - done := make(chan struct{}) + for _, ca := range []string{"read", "publish"} { + t.Run(ca, func(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:9121") + require.NoError(t, err) + defer ln.Close() - go func() { - conn, err := ln.Accept() - require.NoError(t, err) - defer conn.Close() - bc := bytecounter.NewReadWriter(conn) + done := make(chan struct{}) - // C->S handshake C0 - err = handshake.C0S0{}.Read(bc) - require.NoError(t, err) + go func() { + conn, err := ln.Accept() + require.NoError(t, err) + defer conn.Close() + bc := bytecounter.NewReadWriter(conn) - // C->S handshake C1 - c1 := handshake.C1S1{} - err = c1.Read(bc, true) - require.NoError(t, err) + // C->S handshake C0 + err = handshake.C0S0{}.Read(bc) + require.NoError(t, err) - // S->C handshake S0 - err = handshake.C0S0{}.Write(bc) - require.NoError(t, err) + // C->S handshake C1 + c1 := handshake.C1S1{} + err = c1.Read(bc, true) + require.NoError(t, err) - // S->C handshake S1 - s1 := handshake.C1S1{} - err = s1.Write(bc, false) - require.NoError(t, err) + // S->C handshake S0 + err = handshake.C0S0{}.Write(bc) + require.NoError(t, err) - // S->C handshake S2 - err = handshake.C2S2{Digest: c1.Digest}.Write(bc) - require.NoError(t, err) + // S->C handshake S1 + s1 := handshake.C1S1{} + err = s1.Write(bc, false) + require.NoError(t, err) - // C->S handshake C2 - err = (&handshake.C2S2{Digest: s1.Digest}).Read(bc) - require.NoError(t, err) + // S->C handshake S2 + err = handshake.C2S2{Digest: c1.Digest}.Write(bc) + require.NoError(t, err) - mrw := message.NewReadWriter(bc) + // C->S handshake C2 + err = (&handshake.C2S2{Digest: s1.Digest}).Read(bc) + require.NoError(t, err) - // C->S set window ack size - msg, err := mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgSetWindowAckSize{ - Value: 2500000, - }, msg) + mrw := message.NewReadWriter(bc) - // C->S set peer bandwidth - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgSetPeerBandwidth{ - Value: 0x2625a0, - Type: 2, - }, msg) + // C->S set window ack size + msg, err := mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgSetWindowAckSize{ + Value: 2500000, + }, msg) - // C->S set chunk size - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgSetChunkSize{ - Value: 65536, - }, msg) + // C->S set peer bandwidth + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgSetPeerBandwidth{ + Value: 0x2625a0, + Type: 2, + }, msg) - // C->S connect - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "connect", - float64(1), - flvio.AMFMap{ - {K: "app", V: "stream"}, - {K: "flashVer", V: "LNX 9,0,124,2"}, - {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, - {K: "fpad", V: false}, - {K: "capabilities", V: float64(15)}, - {K: "audioCodecs", V: float64(4071)}, - {K: "videoCodecs", V: float64(252)}, - {K: "videoFunction", V: float64(1)}, - }, - }, - }, msg) - - // S->C result - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "_result", - float64(1), - flvio.AMFMap{ - {K: "fmsVer", V: "LNX 9,0,124,2"}, - {K: "capabilities", V: float64(31)}, - }, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetConnection.Connect.Success"}, - {K: "description", V: "Connection succeeded."}, - {K: "objectEncoding", V: float64(0)}, - }, - }, - }) - require.NoError(t, err) + // C->S set chunk size + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgSetChunkSize{ + Value: 65536, + }, msg) - // C->S create stream - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "createStream", - float64(2), - nil, - }, - }, msg) - - // S->C result - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "_result", - float64(2), - nil, - float64(1), - }, - }) - require.NoError(t, err) + // C->S connect + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "connect", + float64(1), + flvio.AMFMap{ + {K: "app", V: "stream"}, + {K: "flashVer", V: "LNX 9,0,124,2"}, + {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, + {K: "fpad", V: false}, + {K: "capabilities", V: float64(15)}, + {K: "audioCodecs", V: float64(4071)}, + {K: "videoCodecs", V: float64(252)}, + {K: "videoFunction", V: float64(1)}, + }, + }, + }, msg) - // C->S user control set buffer length - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgUserControlSetBufferLength{ - BufferLength: 0x64, - }, msg) + // S->C result + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(1), + flvio.AMFMap{ + {K: "fmsVer", V: "LNX 9,0,124,2"}, + {K: "capabilities", V: float64(31)}, + }, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetConnection.Connect.Success"}, + {K: "description", V: "Connection succeeded."}, + {K: "objectEncoding", V: float64(0)}, + }, + }, + }) + require.NoError(t, err) - // C->S play - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgCommandAMF0{ - ChunkStreamID: 4, - MessageStreamID: 16777216, - Payload: []interface{}{ - "play", - float64(0), - nil, - "", - }, - }, msg) - - // S->C onStatus - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 5, - MessageStreamID: 16777216, - Payload: []interface{}{ - "onStatus", - float64(4), - nil, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetStream.Play.Reset"}, - {K: "description", V: "play reset"}, - }, - }, - }) - require.NoError(t, err) + if ca == "read" { + // C->S create stream + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "createStream", + float64(2), + nil, + }, + }, msg) + + // S->C result + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(2), + nil, + float64(1), + }, + }) + require.NoError(t, err) + + // C->S user control set buffer length + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgUserControlSetBufferLength{ + BufferLength: 0x64, + }, msg) + + // C->S play + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 4, + MessageStreamID: 16777216, + Payload: []interface{}{ + "play", + float64(0), + nil, + "", + }, + }, msg) + + // S->C onStatus + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 5, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onStatus", + float64(4), + nil, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetStream.Play.Reset"}, + {K: "description", V: "play reset"}, + }, + }, + }) + require.NoError(t, err) + } else { + // C->S releaseStream + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "releaseStream", + float64(2), + nil, + "", + }, + }, msg) + + // C->S FCPublish + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "FCPublish", + float64(3), + nil, + "", + }, + }, msg) + + // C->S createStream + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "createStream", + float64(4), + nil, + }, + }, msg) + + // S->C result + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(4), + nil, + float64(1), + }, + }) + require.NoError(t, err) + + // C->S publish + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 4, + MessageStreamID: 16777216, + Payload: []interface{}{ + "publish", + float64(5), + nil, + "", + "stream", + }, + }, msg) + + // S->C onStatus + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 5, + MessageStreamID: 16777216, + Payload: []interface{}{ + "onStatus", + float64(5), + nil, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetStream.Publish.Start"}, + {K: "description", V: "publish start"}, + }, + }, + }) + require.NoError(t, err) + } - close(done) - }() + close(done) + }() - u, err := url.Parse("rtmp://127.0.0.1:9121/stream") - require.NoError(t, err) + u, err := url.Parse("rtmp://127.0.0.1:9121/stream") + require.NoError(t, err) - nconn, err := net.Dial("tcp", u.Host) - require.NoError(t, err) - defer nconn.Close() - conn := NewClientConn(nconn, u) + nconn, err := net.Dial("tcp", u.Host) + require.NoError(t, err) + defer nconn.Close() + conn := NewClientConn(nconn, u) - err = conn.ClientHandshake(true) - require.NoError(t, err) + err = conn.ClientHandshake(ca == "read") + require.NoError(t, err) - <-done + <-done + }) + } } func TestServerHandshake(t *testing.T) { - ln, err := net.Listen("tcp", "127.0.0.1:9121") - require.NoError(t, err) - defer ln.Close() + for _, ca := range []string{"read", "publish"} { + t.Run(ca, func(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:9121") + require.NoError(t, err) + defer ln.Close() - done := make(chan struct{}) + done := make(chan struct{}) - go func() { - nconn, err := ln.Accept() - require.NoError(t, err) - defer nconn.Close() + go func() { + nconn, err := ln.Accept() + require.NoError(t, err) + defer nconn.Close() - conn := NewServerConn(nconn) - err = conn.ServerHandshake() - require.NoError(t, err) + conn := NewServerConn(nconn) + err = conn.ServerHandshake() + require.NoError(t, err) - close(done) - }() + close(done) + }() - conn, err := net.Dial("tcp", "127.0.0.1:9121") - require.NoError(t, err) - defer conn.Close() - bc := bytecounter.NewReadWriter(conn) + conn, err := net.Dial("tcp", "127.0.0.1:9121") + require.NoError(t, err) + defer conn.Close() + bc := bytecounter.NewReadWriter(conn) - // C->S handshake C0 - err = handshake.C0S0{}.Write(bc) - require.NoError(t, err) + // C->S handshake C0 + err = handshake.C0S0{}.Write(bc) + require.NoError(t, err) - // C->S handshake C1 - c1 := handshake.C1S1{} - err = c1.Write(bc, true) - require.NoError(t, err) + // C->S handshake C1 + c1 := handshake.C1S1{} + err = c1.Write(bc, true) + require.NoError(t, err) - // S->C handshake S0 - err = handshake.C0S0{}.Read(bc) - require.NoError(t, err) + // S->C handshake S0 + err = handshake.C0S0{}.Read(bc) + require.NoError(t, err) - // S->C handshake S1 - s1 := handshake.C1S1{} - err = s1.Read(bc, false) - require.NoError(t, err) + // S->C handshake S1 + s1 := handshake.C1S1{} + err = s1.Read(bc, false) + require.NoError(t, err) - // S->C handshake S2 - err = (&handshake.C2S2{Digest: c1.Digest}).Read(bc) - require.NoError(t, err) + // S->C handshake S2 + err = (&handshake.C2S2{Digest: c1.Digest}).Read(bc) + require.NoError(t, err) - // C->S handshake C2 - err = handshake.C2S2{Digest: s1.Digest}.Write(bc) - require.NoError(t, err) + // C->S handshake C2 + err = handshake.C2S2{Digest: s1.Digest}.Write(bc) + require.NoError(t, err) - mrw := message.NewReadWriter(bc) + mrw := message.NewReadWriter(bc) - // C->S connect - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "connect", - 1, - flvio.AMFMap{ - {K: "app", V: "/stream"}, - {K: "flashVer", V: "LNX 9,0,124,2"}, - {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, - {K: "fpad", V: false}, - {K: "capabilities", V: 15}, - {K: "audioCodecs", V: 4071}, - {K: "videoCodecs", V: 252}, - {K: "videoFunction", V: 1}, - }, - }, - }) - require.NoError(t, err) + // C->S connect + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "connect", + 1, + flvio.AMFMap{ + {K: "app", V: "/stream"}, + {K: "flashVer", V: "LNX 9,0,124,2"}, + {K: "tcUrl", V: getTcURL("rtmp://127.0.0.1:9121/stream")}, + {K: "fpad", V: false}, + {K: "capabilities", V: 15}, + {K: "audioCodecs", V: 4071}, + {K: "videoCodecs", V: 252}, + {K: "videoFunction", V: 1}, + }, + }, + }) + require.NoError(t, err) - // S->C window acknowledgement size - msg, err := mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgSetWindowAckSize{ - Value: 2500000, - }, msg) + // S->C window acknowledgement size + msg, err := mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgSetWindowAckSize{ + Value: 2500000, + }, msg) - // S->C set peer bandwidth - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgSetPeerBandwidth{ - Value: 2500000, - Type: 2, - }, msg) + // S->C set peer bandwidth + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgSetPeerBandwidth{ + Value: 2500000, + Type: 2, + }, msg) - // S->C set chunk size - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgSetChunkSize{ - Value: 65536, - }, msg) + // S->C set chunk size + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgSetChunkSize{ + Value: 65536, + }, msg) - // S->C result - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "_result", - float64(1), - flvio.AMFMap{ - {K: "fmsVer", V: "LNX 9,0,124,2"}, - {K: "capabilities", V: float64(31)}, - }, - flvio.AMFMap{ - {K: "level", V: "status"}, - {K: "code", V: "NetConnection.Connect.Success"}, - {K: "description", V: "Connection succeeded."}, - {K: "objectEncoding", V: float64(0)}, - }, - }, - }, msg) + // S->C result + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(1), + flvio.AMFMap{ + {K: "fmsVer", V: "LNX 9,0,124,2"}, + {K: "capabilities", V: float64(31)}, + }, + flvio.AMFMap{ + {K: "level", V: "status"}, + {K: "code", V: "NetConnection.Connect.Success"}, + {K: "description", V: "Connection succeeded."}, + {K: "objectEncoding", V: float64(0)}, + }, + }, + }, msg) - // C->S set chunk size - err = mrw.Write(&message.MsgSetChunkSize{ - Value: 65536, - }) - require.NoError(t, err) + // C->S set chunk size + err = mrw.Write(&message.MsgSetChunkSize{ + Value: 65536, + }) + require.NoError(t, err) - // C->S releaseStream - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "releaseStream", - float64(2), - nil, - "", - }, - }) - require.NoError(t, err) + if ca == "read" { + // C->S createStream + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "createStream", + float64(2), + nil, + }, + }) + require.NoError(t, err) - // C->S FCPublish - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "FCPublish", - float64(3), - nil, - "", - }, - }) - require.NoError(t, err) + // S->C result + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(2), + nil, + float64(1), + }, + }, msg) - // C->S createStream - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "createStream", - float64(4), - nil, - }, - }) - require.NoError(t, err) + // C->S user control set buffer length + err = mrw.Write(&message.MsgUserControlSetBufferLength{ + BufferLength: 0x64, + }) + require.NoError(t, err) - // S->C result - msg, err = mrw.Read() - require.NoError(t, err) - require.Equal(t, &message.MsgCommandAMF0{ - ChunkStreamID: 3, - Payload: []interface{}{ - "_result", - float64(4), - nil, - float64(1), - }, - }, msg) + // C->S play + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 4, + MessageStreamID: 16777216, + Payload: []interface{}{ + "play", + float64(0), + nil, + "", + }, + }) + require.NoError(t, err) + } else { + // C->S releaseStream + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "releaseStream", + float64(2), + nil, + "", + }, + }) + require.NoError(t, err) - // C->S publish - err = mrw.Write(&message.MsgCommandAMF0{ - ChunkStreamID: 8, - MessageStreamID: 1, - Payload: []interface{}{ - "publish", - float64(5), - nil, - "", - "live", - }, - }) - require.NoError(t, err) + // C->S FCPublish + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "FCPublish", + float64(3), + nil, + "", + }, + }) + require.NoError(t, err) + + // C->S createStream + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "createStream", + float64(4), + nil, + }, + }) + require.NoError(t, err) - <-done + // S->C result + msg, err = mrw.Read() + require.NoError(t, err) + require.Equal(t, &message.MsgCommandAMF0{ + ChunkStreamID: 3, + Payload: []interface{}{ + "_result", + float64(4), + nil, + float64(1), + }, + }, msg) + + // C->S publish + err = mrw.Write(&message.MsgCommandAMF0{ + ChunkStreamID: 4, + MessageStreamID: 16777216, + Payload: []interface{}{ + "publish", + float64(5), + nil, + "", + "stream", + }, + }) + require.NoError(t, err) + } + + <-done + }) + } } func TestReadTracks(t *testing.T) {