[ * 'connection' => Connection, * 'page' => Page (объект страницы), * 'module' => Название модуля, * 'method' => Название страницы, * 'params' => Параметры страницы указанные при подключении, * 'user_id' => ID пользователя (если был указан в параметрах), * 'last' => Время последней проверки подключения, * ], * ... * ] */ protected $connections = []; /** @var array таймеры */ protected $timers = []; /** * @var int интервал опроса страниц (в секундах) */ protected $pingInterval = 60; /** * @var int интервал подключения к БД (в секундах) * Расчитывается следующим образом: (wait_timeout / 2, show variables like '%wait_timeout%') TODO */ protected $databasePingInterval = 14400; /** @var int время последнего подключения к базе данных */ protected $databasePingLast = 0; /** * Вызывается при старте работы сервера * @var callback */ public $onStart = null; /** * Вызывается при завершении работы сервера * @var callback */ public $onStop = null; /** * @var Worker */ public $worker = null; /** * Server name * @var string */ public $name; /** * Server constructor */ public function __construct() { $this->databasePingLast = time(); } /** * Устанавливаем интервал опроса подключенных страниц * @param integer $interval интервал в секундах */ public function setPingInterval($interval) { if ($interval > 0) { if ($interval < 10) { $interval = 10; } $this->pingInterval = $interval; } } /** * Устанавливаем интервал опроса подключения к базе данных * @param integer $interval интервал в секундах */ public function setDatabasePingInterval($interval) { if ($interval > 0) { $this->databasePingInterval = $interval; } } /** * Адрес websocket сервера * @param string $host хост, по умолчанию 127.0.0.1 * @param string $port порт, по умолчанию 8000 * @return string */ public static function name($host = '127.0.0.1', $port = '8000') { return 'websocket://' . $host . ':' . $port; } /** * Адрес inner сервера для обработки внутренних сообщений * @param string $host хост, по умолчанию 127.0.0.1 * @param string $port порт, по умолчанию 1234 * @return string */ public static function innerName($host = '127.0.0.1', $port = '1234') { return 'tcp://' . $host . ':' . $port; } /** * Строка URL для подключения в javascript * @param array $params параметры подключения [ * string 'module' - название модуля, * string 'method' - название метода, * ... * ] * @param string $uri путь URI, по умолчанию /ws/ * @return string */ public static function connectUrl(array $params = [], $uri = '/ws/') { return Url::to($uri, [ 'scheme' => (Request::scheme(true) === 'https' ? 'wss' : 'ws'), 'query' => $params, 'lang' => false, ]); } /** * Стартуем сервер * @param string $name адрес websocket сервера * @param string|false $innerName адрес inner сервера или false (не стартовать) * @param string $bffBootstrap путь для запуска ядра bff * @return Worker */ public function start($name, $innerName, $bffBootstrap) { if (empty($name)) { $name = static::name(); } $this->worker = new Worker(($this->name = $name)); $this->worker->onWorkerStart = function () use ($bffBootstrap, $innerName) { # Inner messages server if ($innerName !== false) { $this->startInner($innerName); } # Maintenance timer $this->timerStart(function () { $this->onTimer(); }, 10); # Core require_once $bffBootstrap; bff()->setDevelopersMode(false); # Callback if ($this->onStart) { try { call_user_func($this->onStart, $this); } catch (Throwable $e) { $this->log($e->getMessage(), ['function' => __METHOD__]); } } }; $this->worker->onWorkerStop = function () { # Stop timers foreach ($this->timers as $v) { Timer::del($v); } # Callback if ($this->onStop) { try { call_user_func($this->onStop, $this); } catch (Throwable $e) { $this->log($e->getMessage(), ['function' => __METHOD__]); } } }; $this->worker->onConnect = function ($connection) { $connection->onWebSocketConnect = [$this, 'onConnect']; $connection->onMessage = [$this, 'onMessage']; }; $this->worker->onClose = [$this, 'onClose']; return $this->worker; } /** * Обработка события подключения страницы * $_GET[ * string 'module' модуль * string 'method' метод * ] * Модуль должен иметь метод websocket(method, Server), для обработки страниц типа module, method * @param Connection $connection */ public function onConnect($connection) { /** @var \bff\base\Input object */ $input = bff::input(); # проверка подключения к БД if (! bff::database()->isConnected()) { bff::database()->connect(); $this->databasePingLast = time(); } do { $module = $input->get('module', TYPE_TEXT); if (empty($module)) { $this->log('Module is empty', ['function' => __METHOD__]); break; } $method = $input->get('method', TYPE_TEXT); if (empty($method)) { $this->log('Method is empty', ['function' => __METHOD__]); break; } $page = bff()->callController($module, 'websocket', [$method, $this]); if ($page === false) { $this->log('Websocket method not exist: ' . $method, ['function' => __METHOD__]); break; } $params = [ 'module' => $module, 'method' => $method, ]; if (method_exists($page, 'verify')) { $verify = $page->verify(); if ($verify === false) { $this->log('Connection not valid', ['function' => __METHOD__]); break; } $params = array_merge($params, $verify); } $this->connections[] = [ 'connection' => $connection, 'page' => $page, 'module' => $module, 'method' => $method, 'params' => $params, 'user_id' => ! empty($params['userID']) ? $params['userID'] : 0, 'last' => time(), ]; if (method_exists($page, 'onConnect')) { end($this->connections); $k = key($this->connections); $page->onConnect($this->connections[$k]['params'], $k); } return; } while (false); $connection->destroy(); } /** * Обработка события отключения страницы * @param Connection $connection */ public function onClose($connection) { foreach ($this->connections as $k => $v) { if ($v['connection'] == $connection) { if (method_exists($v['page'], 'onClose')) { $v['page']->onClose($v['params'], $k); } unset($this->connections[$k]); } } } /** * Обработка события получения данных от страницы * @param Connection $connection * @param string $data данные в формате JSON [action, ...] */ public function onMessage($connection, $data) { $data = json_decode($data, true); if (! isset($data['action'])) { $this->log('Action not found', ['function' => __METHOD__]); return; } $exist = false; $now = time(); foreach ($this->connections as $k => $v) { if ($v['connection'] == $connection) { # TODO $this->connections[$k]['last'] = $now; if ($data['action'] == 'pong') { $exist = true; continue; } if (method_exists($v['page'], 'onMessage')) { $exist = true; $this->databasePingLast = $now; $v['page']->onMessage($data, $v['params'], $k); } } } if (! $exist) { $this->log('Connection not found', ['function' => __METHOD__]); $connection->destroy(); } } /** * Отправка данных странице * @param integer $connectionID ID страницы * @param string|array $data данные */ public function send($connectionID, $data) { if (! isset($this->connections[$connectionID])) { return; } if (empty($data)) { return; } if (is_array($data)) { $data = json_encode($data); } $this->connections[$connectionID]['connection']->send($data); } /** * Разорвать соединение со страницей * @param int $connectionID ID подключения */ public function destroy($connectionID) { if (! isset($this->connections[$connectionID])) { return; } $this->connections[$connectionID]['connection']->destroy(); unset($this->connections[$connectionID]); } /** * Стартуем обработчик внутренних сообщений * @param string|bool $name адрес inner сервера или false * @return Worker */ public function startInner($name = false) { if (empty($name)) { $name = static::innerName(); } $inner = new Worker($name); $inner->onMessage = array($this, 'onInnerMessage'); $inner->listen(); return $inner; } /** * Отправка внутреннего сообщения * @param string|array $data данные для отправки [module, method, action, ...] * @param string|bool $socket адрес inner сервера */ public static function sendInner($data, $socket = false) { if (empty($socket)) { $socket = static::innerName(); } if (empty($data)) { return; } if (is_array($data)) { $data = json_encode($data); } # TODO set_error_handler(function () { return true; }); try { $instance = stream_socket_client($socket, $errno, $errstr, 30); if (! $instance) { bff::log("sendInner: $errstr ($errno)", Logger::ERROR, 'websockets.log', ['function' => __METHOD__]); return; } fwrite($instance, $data . PHP_EOL); fclose($instance); } catch (Throwable $e) { bff::log($e->getMessage(), Logger::ERROR, 'websockets.log', ['function' => __METHOD__]); } restore_error_handler(); } /** * Обработка внутренних сообщений * @param Connection $connection * @param string $data данные в формате JSON [module, method, action, ...] */ public function onInnerMessage($connection, $data) { $data = json_decode($data, true); do { if (empty($data['module'])) { $this->log('Module is empty', ['function' => __METHOD__]); break; } if (empty($data['method'])) { $this->log('Method is empty', ['function' => __METHOD__]); break; } $page = bff()->callController($data['module'], 'websocket', [$data['method'], $this]); if ($page === false) { $this->log('Page handler does not exist: ' . $data['method'], ['function' => __METHOD__]); break; } if (method_exists($page, 'onInnerMessage')) { $page->onInnerMessage($data); } } while (false); } /** * Старт таймера * @param array|callable $callback обработчик таймера * @param int $interval таймаут вызова обработчика * @param array $args * @param bool $persistent * @return int идентификатор таймера */ public function timerStart(callable $callback, $interval = 10, $args = [], $persistent = true) { $id = Timer::add($interval, $callback, $args, $persistent); $this->timers[] = $id; return $id; } /** * Обработка обслуживающего таймера */ protected function onTimer() { $now = time(); # Проверка связи с подключенными страницами foreach ($this->connections as $k => $v) { if (($now - $v['last']) < $this->pingInterval) { continue; } /* Не работает в Firefox TODO $v['last'] = $now; # отправка PING пакета $v['connection']->send(pack('H*', '890000000000'), true); */ $this->send($k, ['action' => 'ping']); } unset($v); # Отключаемся от БД если нет открытых страниц if (empty($this->connections)) { if (bff('database')->isConnected()) { bff('database')->disconnect(); } } else { # Поддерживаем длительное подключение с БД if (($now - $this->databasePingLast) > $this->databasePingInterval) { $this->databasePingLast = time(); bff('database')->one_data('SELECT COUNT(*) FROM ' . Users::TABLE_USERS); # TODO } } } /** * Подготовка HTML строки, для загрузки в DOMDocument * @param string $html * @return string */ public static function prepareHtml($html) { $html = strtr($html, [ '"' => '__quot__', # " ''' => '__apos__', # ' ' ' => '__nbsp__', # ' ' '<' => '__lt__', # < '>' => '__gt__', # > ]); $html = strtr($html, [ '&' => '&', # & ]); return $html; } /** * Подготовка HTML строки, для отправки странице, после обработки DOMDocument * @param string $html * @return string */ public static function resultHtml($html) { return strtr($html, [ '__quot__' => '"', # " '__apos__' => ''', # ' '__nbsp__' => ' ', # ' ' '__lt__' => '<', # < '__gt__' => '>', # > ]); } /** * Сохранение сообщения в лог файл * @param string|array $message * @param array $context * @param int $level */ public function log($message, $context = [], $level = Logger::ERROR) { bff::log($message, $level, 'websockets.log', $context); } }