diff --git a/.gitignore b/.gitignore index dea414f..a74ad8b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ !/extensions/available/.gitkeep !/extensions/enabled/.gitkeep +/tmp/events.bin /tmp/cache/di_* /tmp/plugin_artifacts/* /tmp/cache/database/* diff --git a/chandler-example.yml b/chandler-example.yml index 428ca16..7fe8f69 100644 --- a/chandler-example.yml +++ b/chandler-example.yml @@ -33,3 +33,5 @@ chandler: pass: "word" addr: "noreply@example.com" ssl: true + + redisUrl: "tcp://10.0.0.1:6379" diff --git a/chandler/Signaling/SignalManager.php b/chandler/Signaling/SignalManager.php index b38793f..459fe47 100644 --- a/chandler/Signaling/SignalManager.php +++ b/chandler/Signaling/SignalManager.php @@ -1,12 +1,14 @@ + * @author Vladimir Barinov */ class SignalManager { @@ -43,7 +45,7 @@ private function __construct() * @return array|null Array of events if there are any, null otherwise */ private function eventFor(int $for): ?array - { + { $since = $this->since - 1; $statement = $this->connection->query("SELECT * FROM pool WHERE `for` = $for AND `since` > $since ORDER BY since DESC"); $event = $statement->fetch(\PDO::FETCH_LAZY); @@ -66,8 +68,41 @@ private function eventFor(int $for): ?array */ function listen(\Closure $callback, int $for, int $time = 25): void { + try { + $redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"], ['read_write_timeout' => $time]); + + // We will catch the old message first + $oldEvent = $this->eventFor($for); + + if ($oldEvent) { + list($id, $evt) = $oldEvent; + $id = crc32((string)$id); + $callback($evt, $id); + } + + // And then we will subscribe to user's channel + $subscriber = $redisClient->pubSubLoop(); + $subscriber->subscribe('im'.$for); + + foreach($subscriber as $event) { + if ($event->kind == 'message' && $event->channel == 'im'.$for) { + list($id, $evt) = json_decode($event->payload); + $id = crc32((string)$id); + $evt = unserialize(hex2bin($evt)); + $callback($evt, $id); + } + } + + // On timeout we're returning nothing + exit("[]"); + } + catch (Exception $e) + { + error_log("Couldn't connect to Redis server, fallback to old sqlite method. Exception Message: ".$e->getMessage()); + } + $this->since = time() - 1; - for($i = 0; $i < $time; $i++) { + for($i = 0; $i < ($time / 5); $i++) { sleep(1); $event = $this->eventFor($for); @@ -131,7 +166,16 @@ function triggerEvent(object $event, int $for): bool $event = bin2hex(serialize($event)); $since = time(); + // add it to the history $this->connection->query("INSERT INTO pool VALUES (NULL, $since, $for, '$event')"); + $id = $this->connection->lastInsertId(); + + try { + $redisClient = new RedisClient(CHANDLER_ROOT_CONF["redisUrl"]); + $redisClient->publish('im'.$for, json_encode([$id, $event])); + } catch (Exception $e) { + error_log("Couldn't connect to Redis server and push the event. Exception Message: ".$e->getMessage()); + } return true; } diff --git a/composer.json b/composer.json index 7ed21c6..31abd70 100644 --- a/composer.json +++ b/composer.json @@ -14,7 +14,8 @@ "symfony/yaml": "^5.3", "guzzlehttp/guzzle": "^6.0", "wildbit/postmark-php": "^4.0", - "tracy/tracy": "^2.10" + "tracy/tracy": "^2.10", + "predis/predis": "^3.2" }, "suggest": { "ext-yaml": "for faster yaml parsing"