<?php
 
 
namespace ZeusTest;
 
 
use PHPUnit_Framework_TestCase;
 
use Zend\EventManager\EventManager;
 
use Zeus\Kernel\ProcessManager\Process;
 
use Zeus\Kernel\ProcessManager\SchedulerEvent;
 
use Zeus\ServerService\Async\Config;
 
use Zeus\ServerService\Shared\Networking\SocketEventSubscriber;
 
use Zeus\Kernel\Networking\SocketServer;
 
use ZeusTest\Helpers\SocketTestMessage;
 
use ZeusTest\Helpers\ZeusFactories;
 
 
/**
 * Tests SocketEventSubscriber against a real SocketServer and a real TCP client.
 *
 * Each test wires a SocketEventSubscriber and a Process into an EventManager,
 * fires the scheduler/process life-cycle events by hand and observes what the
 * SocketTestMessage callbacks receive.
 */
class SocketEventSubscriberTest extends PHPUnit_Framework_TestCase
{
    use ZeusFactories;

    /** @var SocketServer */
    protected $server;

    /** @var int TCP port the server under test is configured to listen on */
    protected $port;

    /**
     * Creates a fresh SocketServer configured for 0.0.0.0:7777 before each test.
     */
    public function setUp()
    {
        $this->port = 7777;

        $config = new Config();
        $config->setListenPort($this->port);
        $config->setListenAddress('0.0.0.0');

        $this->server = new SocketServer($config);
    }

    /**
     * Stops the server created in setUp() after each test.
     */
    public function tearDown()
    {
        $this->server->stop();
    }

    /**
     * Sends one request, runs the process loop twice and checks that the
     * message callback got the request once and the second callback ran twice.
     */
    public function testSubscriberRequestHandling()
    {
        $events = new EventManager();
        $event = $this->createAttachedSchedulerEvent($events);

        $received = null;
        $steps = 0;
        // Initialised explicitly; the closure below mutates it by reference.
        $heartBeats = 0;

        $message = new SocketTestMessage(
            function ($connection, $data) use (&$received, &$steps) {
                $received = $data;
                $steps++;
            },
            function ($connection) use (&$heartBeats) {
                $heartBeats++;

                // Drop the connection on the second invocation.
                if ($heartBeats === 2) {
                    $connection->close();
                }
            }
        );

        $this->bootSubscriber($events, $event, $message);

        $requestString = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n";
        $client = $this->openClientAndSend($requestString);

        try {
            $event->setName(SchedulerEvent::EVENT_PROCESS_LOOP);
            $events->triggerEvent($event);
            $events->triggerEvent($event);
        } finally {
            // Always release the client socket, even when a loop iteration throws.
            fclose($client);
        }

        $event->setName(SchedulerEvent::EVENT_PROCESS_EXIT);
        $events->triggerEvent($event);

        $this->assertEquals($requestString, $received);
        $this->assertSame(1, $steps, "Message should be fetched once");
        $this->assertSame(2, $heartBeats, "Heartbeat should be called twice");
    }

    /**
     * Checks that a RuntimeException thrown by the message callback propagates
     * out of the process loop and that the client ends up with an empty,
     * end-of-file stream.
     */
    public function testSubscriberErrorHandling()
    {
        $events = new EventManager();
        $event = $this->createAttachedSchedulerEvent($events);

        $message = new SocketTestMessage(function ($connection, $data) {
            throw new \RuntimeException("TEST");
        });

        $this->bootSubscriber($events, $event, $message);

        $requestString = "GET / HTTP/1.0\r\nConnection: keep-alive\r\n\r\n";
        $client = $this->openClientAndSend($requestString);

        try {
            $event->setName(SchedulerEvent::EVENT_PROCESS_LOOP);

            $exception = null;
            try {
                $events->triggerEvent($event);
            } catch (\RuntimeException $caught) {
                // Expected path: remember the exception for the assertions below.
                $exception = $caught;
            }

            $this->assertNotNull($exception, 'Exception should be raised');
            $this->assertInstanceOf(\RuntimeException::class, $exception, 'Correct exception should be raised');
            $this->assertSame("TEST", $exception->getMessage(), 'Correct exception should be raised');

            // Warnings are suppressed here as in the original test; the read is
            // expected to yield nothing. NOTE(review): presumably guards against a
            // warning when the peer has already dropped the connection - confirm.
            $read = @stream_get_contents($client);
            $eof = feof($client);

            $this->assertEquals("", $read, 'Stream should not contain any message');
            $this->assertTrue($eof, 'Client stream should not be readable when disconnected');
        } finally {
            // Close the client even when one of the assertions above fails.
            fclose($client);
        }
    }

    /**
     * Creates a SchedulerEvent carrying a scheduler and a Process, and attaches
     * that Process to the given event manager.
     *
     * getScheduler() is not defined in this class; presumably it is provided by
     * the ZeusFactories trait.
     *
     * @param EventManager $events event manager the Process is attached to
     * @return SchedulerEvent the event shared by all triggers in a test
     */
    private function createAttachedSchedulerEvent(EventManager $events)
    {
        $event = new SchedulerEvent();
        $event->setScheduler($this->getScheduler(0));

        $process = new Process($event);
        $process->setConfig(new \Zeus\Kernel\ProcessManager\Config([]));
        $event->setProcess($process);
        $process->attach($events);

        return $event;
    }

    /**
     * Attaches a SocketEventSubscriber for the server under test and fires the
     * scheduler-start and process-init events.
     *
     * For both events a listener registered at PRIORITY_FINALIZE + 1 stops
     * propagation. NOTE(review): presumably this keeps finalize-stage listeners
     * from running inside the test - confirm against the scheduler.
     *
     * @param EventManager $events event manager used to dispatch the events
     * @param SchedulerEvent $event event object that is renamed and triggered
     * @param SocketTestMessage $message message handler given to the subscriber
     */
    private function bootSubscriber(EventManager $events, SchedulerEvent $event, SocketTestMessage $message)
    {
        $eventSubscriber = new SocketEventSubscriber($this->server, $message);
        $eventSubscriber->attach($events);

        $stopPropagation = function (SchedulerEvent $event) {
            $event->stopPropagation(true);
        };

        $bootEvents = [
            SchedulerEvent::EVENT_SCHEDULER_START,
            SchedulerEvent::EVENT_PROCESS_INIT,
        ];

        // Register both listeners first, then trigger, matching the original order.
        foreach ($bootEvents as $eventName) {
            $events->attach($eventName, $stopPropagation, SchedulerEvent::PRIORITY_FINALIZE + 1);
        }

        foreach ($bootEvents as $eventName) {
            $event->setName($eventName);
            $events->triggerEvent($event);
        }
    }

    /**
     * Opens a non-blocking TCP client to the server port on localhost and
     * writes the given payload to it.
     *
     * @param string $payload raw bytes to send
     * @return resource the connected client stream; the caller must fclose() it
     */
    private function openClientAndSend($payload)
    {
        $client = stream_socket_client('tcp://localhost:' . $this->port);
        $this->assertTrue(is_resource($client), 'Client should be able to connect to the server');

        stream_set_blocking($client, false);
        fwrite($client, $payload);

        return $client;
    }
}
 
 |