<?php 
 
/** @noinspection PhpComposerExtensionStubsInspection */ 
 
declare(strict_types=1); 
 
error_reporting(E_ALL); 
date_default_timezone_set('UTC'); 
include __DIR__ . '/../vendor/autoload.php'; 
 
use Doctrine\DBAL\Connection; 
use Doctrine\DBAL\DriverManager; 
use MySQLReplication\Config\ConfigBuilder; 
use MySQLReplication\Definitions\ConstEventType; 
use MySQLReplication\Event\DTO\UpdateRowsDTO; 
use MySQLReplication\Event\EventSubscribers; 
use MySQLReplication\MySQLReplicationFactory; 
 
/** 
 * Simple benchmark to test how fast events are consumed 
 */ 
class benchmark 
{ 
    private const DB_NAME = 'mysqlreplication_test'; 
    private const DB_USER = 'root'; 
    private const DB_PASS = 'root'; 
    private const DB_HOST = '127.0.0.1'; 
    private const DB_PORT = 3306; 
 
    private MySQLReplicationFactory $binLogStream; 
 
    public function __construct() 
    { 
        $conn = $this->getConnection(); 
        $conn->executeStatement('DROP DATABASE IF EXISTS ' . self::DB_NAME); 
        $conn->executeStatement('CREATE DATABASE ' . self::DB_NAME); 
        $conn->executeStatement('USE ' . self::DB_NAME); 
        $conn->executeStatement('CREATE TABLE test (i INT) ENGINE = MEMORY'); 
        $conn->executeStatement('INSERT INTO test VALUES(1)'); 
        $conn->executeStatement('CREATE TABLE test2 (i INT) ENGINE = MEMORY'); 
        $conn->executeStatement('INSERT INTO test2 VALUES(1)'); 
        $conn->executeStatement('RESET MASTER'); 
 
        $this->binLogStream = new MySQLReplicationFactory( 
            (new ConfigBuilder()) 
                ->withUser(self::DB_USER) 
                ->withPassword(self::DB_PASS) 
                ->withHost(self::DB_HOST) 
                ->withPort(self::DB_PORT) 
                ->withEventsOnly( 
                    [ 
                        ConstEventType::UPDATE_ROWS_EVENT_V2->value, 
                        // for mariadb v1 
                        ConstEventType::UPDATE_ROWS_EVENT_V1->value, 
                    ] 
                ) 
                ->withSlaveId(9999) 
                ->withDatabasesOnly([self::DB_NAME]) 
                ->build() 
        ); 
 
        $this->binLogStream->registerSubscriber( 
            new class() extends EventSubscribers { 
                private float $start; 
 
                private int $counter = 0; 
 
                public function __construct() 
                { 
                    $this->start = microtime(true); 
                } 
 
                public function onUpdate(UpdateRowsDTO $event): void 
                { 
                    ++$this->counter; 
                    if (0 === ($this->counter % 1000)) { 
                        echo ((int)($this->counter / (microtime( 
                            true 
                        ) - $this->start)) . ' event by seconds (' . $this->counter . ' total)') . PHP_EOL; 
                    } 
                } 
            } 
        ); 
    } 
 
    public function run(): void 
    { 
        $pid = pcntl_fork(); 
        if ($pid === -1) { 
            throw new InvalidArgumentException('Could not fork'); 
        } 
 
        if ($pid) { 
            $this->consume(); 
            pcntl_wait($status); 
        } else { 
            $this->produce(); 
        } 
    } 
 
    private function getConnection(): Connection 
    { 
        return DriverManager::getConnection( 
            [ 
                'user' => self::DB_USER, 
                'password' => self::DB_PASS, 
                'host' => self::DB_HOST, 
                'port' => self::DB_PORT, 
                'driver' => 'pdo_mysql', 
                'dbname' => self::DB_NAME, 
            ] 
        ); 
    } 
 
    private function consume(): void 
    { 
        $this->binLogStream->run(); 
    } 
 
    private function produce(): void 
    { 
        $conn = $this->getConnection(); 
 
        echo 'Start insert data' . PHP_EOL; 
 
        /** @phpstan-ignore-next-line */ 
        while (1) { 
            $conn->executeStatement('UPDATE  test SET i = i + 1;'); 
            $conn->executeStatement('UPDATE test2 SET i = i + 1;'); 
        } 
    } 
} 
 
(new benchmark())->run(); 
 
 |