You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
222 lines
6.3 KiB
222 lines
6.3 KiB
<?php |
|
|
|
namespace React\Tests\Stream; |
|
|
|
use React\Stream\Stream; |
|
use React\EventLoop as rel; |
|
|
|
/**
 * Integration tests that exercise Stream over real sockets, once for every
 * event loop implementation listed by loopProvider().
 *
 * The helpers expectCallableOnce() and expectCallableNever() are inherited
 * from TestCase (not shown in this file).
 */
class StreamIntegrationTest extends TestCase
{
    /**
     * Data provider listing the event loop implementations to test against.
     *
     * Each data set is a pair of closures: the first returns true when the
     * loop implementation can be used in the current environment, the second
     * creates a new loop instance.
     *
     * @return array[] list of array(callable $condition, callable $loopFactory)
     */
    public function loopProvider()
    {
        return array(
            array(function () { return true; }, function () { return new rel\StreamSelectLoop; }),
            array(function () { return function_exists('event_base_new'); }, function () { return new rel\LibEventLoop; }),
            array(function () { return class_exists('libev\EventLoop'); }, function () { return new rel\LibEvLoop; }),
            array(function () { return class_exists('EventBase'); }, function () { return new rel\ExtEventLoop; })
        );
    }

    /**
     * Creates the event loop for the current data set, or skips the test when
     * the loop implementation is not available.
     *
     * @param callable $condition   returns true when the loop can be created
     * @param callable $loopFactory creates the loop instance
     * @return mixed whatever $loopFactory returns (an event loop instance)
     */
    private function createLoopOrSkip($condition, $loopFactory)
    {
        if (true !== $condition()) {
            // markTestSkipped() aborts the running test by throwing, so the
            // factory below is never reached for unavailable loops.
            $this->markTestSkipped('Loop implementation not available');
        }

        return $loopFactory();
    }

    /**
     * Writes one byte more than the configured buffer size and checks that the
     * receiving stream ends up with the complete string.
     *
     * @dataProvider loopProvider
     */
    public function testBufferReadsLargeChunks($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $streamA = new Stream($sockA, $loop);
        $streamB = new Stream($sockB, $loop);

        $bufferSize = 4096;
        $streamA->bufferSize = $bufferSize;
        $streamB->bufferSize = $bufferSize;

        // one byte larger than the buffer, so a single read cannot hold it all
        $testString = str_repeat("*", $streamA->bufferSize + 1);

        // collect everything streamB receives
        $buffer = "";
        $streamB->on('data', function ($data) use (&$buffer) {
            $buffer .= $data;
        });

        $streamA->write($testString);

        // run a fixed number of loop iterations instead of run()
        $loop->tick();
        $loop->tick();
        $loop->tick();

        $streamA->close();
        $streamB->close();

        $this->assertSame($testString, $buffer);
    }

    /**
     * Sends a 256 KiB chunk through a socket pair and checks that every byte
     * arrives and that both sides close exactly once without emitting errors.
     *
     * @dataProvider loopProvider
     */
    public function testWriteLargeChunk($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $streamA = new Stream($sockA, $loop);
        $streamB = new Stream($sockB, $loop);

        // limit seems to be 192 KiB (presumably what a single write accepts),
        // so use a payload that is larger than that
        $size = 256 * 1024;

        // sending side sends and expects clean close with no errors
        $streamA->end(str_repeat('*', $size));
        $streamA->on('close', $this->expectCallableOnce());
        $streamA->on('error', $this->expectCallableNever());

        // receiving side counts bytes and expects clean close with no errors
        $received = 0;
        $streamB->on('data', function ($chunk) use (&$received) {
            $received += strlen($chunk);
        });
        $streamB->on('close', $this->expectCallableOnce());
        $streamB->on('error', $this->expectCallableNever());

        $loop->run();

        $streamA->close();
        $streamB->close();

        $this->assertSame($size, $received);
    }

    /**
     * Ends one side of a socket pair without writing anything and checks that
     * the other side never emits a 'data' event.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotEmitDataIfNothingHasBeenWritten($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $streamA = new Stream($sockA, $loop);
        $streamB = new Stream($sockB, $loop);

        // end streamA without writing any data
        $streamA->end();

        // streamB should not emit any data
        $streamB->on('data', $this->expectCallableNever());

        $loop->run();

        $streamA->close();
        $streamB->close();
    }

    /**
     * Writes to one side of a socket pair after the other side has been
     * closed; expects exactly one 'close' event on the writing side and no
     * 'data' event on the closed side.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotWriteDataIfRemoteSideFromPairHasBeenClosed($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        list($sockA, $sockB) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);

        $streamA = new Stream($sockA, $loop);
        $streamB = new Stream($sockB, $loop);

        // pause streamA and queue a write; streamA is expected to close once
        // (pausing presumably keeps the read side from noticing the closed
        // peer first - confirm against Stream)
        $streamA->pause();
        $streamA->write('hello');
        $streamA->on('close', $this->expectCallableOnce());

        // streamB is closed before the loop runs and must never emit data
        $streamB->on('data', $this->expectCallableNever());
        $streamB->close();

        $loop->run();

        $streamA->close();
        $streamB->close();
    }

    /**
     * Writes from the client end of a local TCP connection after the accepted
     * (server side) end has been closed; expects exactly one 'close' event on
     * the client stream and no 'data' event on the closed stream.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotWriteDataIfServerSideHasBeenClosed($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        // port 0 requests any free port; the client looks up the real address
        $server = stream_socket_server('tcp://localhost:0');

        $client = stream_socket_client(stream_socket_get_name($server, false));
        $peer = stream_socket_accept($server);

        $streamA = new Stream($client, $loop);
        $streamB = new Stream($peer, $loop);

        // pause streamA and queue a write; streamA is expected to close once
        $streamA->pause();
        $streamA->write('hello');
        $streamA->on('close', $this->expectCallableOnce());

        // streamB is closed before the loop runs and must never emit data
        $streamB->on('data', $this->expectCallableNever());
        $streamB->close();

        $loop->run();

        $streamA->close();
        $streamB->close();
    }

    /**
     * Writes from the accepted (server side) end of a local TCP connection
     * after the client end has been closed; expects exactly one 'close' event
     * on the writing stream and no 'data' event on the closed stream.
     *
     * @dataProvider loopProvider
     */
    public function testDoesNotWriteDataIfClientSideHasBeenClosed($condition, $loopFactory)
    {
        $loop = $this->createLoopOrSkip($condition, $loopFactory);

        // port 0 requests any free port; the client looks up the real address
        $server = stream_socket_server('tcp://localhost:0');

        $client = stream_socket_client(stream_socket_get_name($server, false));
        $peer = stream_socket_accept($server);

        $streamA = new Stream($peer, $loop);
        $streamB = new Stream($client, $loop);

        // pause streamA and queue a write; streamA is expected to close once
        $streamA->pause();
        $streamA->write('hello');
        $streamA->on('close', $this->expectCallableOnce());

        // streamB is closed before the loop runs and must never emit data
        $streamB->on('data', $this->expectCallableNever());
        $streamB->close();

        $loop->run();

        $streamA->close();
        $streamB->close();
    }
}
|
|
|