|
2 | 2 |
|
3 | 3 | namespace Aloware\FairQueue;
|
4 | 4 |
|
| 5 | +use Aloware\FairQueue\Facades\FairQueue; |
5 | 6 | use Aloware\FairQueue\Interfaces\RepositoryInterface;
|
| 7 | +use Aloware\FairQueue\Repositories\RedisKeys; |
6 | 8 | use Illuminate\Bus\Queueable;
|
7 | 9 | use Illuminate\Contracts\Queue\ShouldQueue;
|
8 | 10 | use Illuminate\Foundation\Bus\Dispatchable;
|
|
11 | 13 |
|
12 | 14 | class FairSignalJob implements ShouldQueue
|
13 | 15 | {
|
14 |
| - use Dispatchable, InteractsWithQueue, Queueable; |
| 16 | + use Dispatchable, InteractsWithQueue, Queueable, RedisKeys; |
15 | 17 |
|
16 | 18 | public $partition;
|
17 | 19 |
|
@@ -63,6 +65,10 @@ public function handle()
|
63 | 65 | }
|
64 | 66 |
|
65 | 67 | $job->handle();
|
| 68 | + |
| 69 | + // Update Fair Queue Stats |
| 70 | + $this->updateStats($job->uuid); |
| 71 | + |
66 | 72 | } catch (\Throwable $e) {
|
67 | 73 | printf('[%s] %s' . PHP_EOL, get_class($job), $e->getMessage());
|
68 | 74 |
|
@@ -171,4 +177,18 @@ private function selectPartition($repository, $partitionsMethod = 'partitions')
|
171 | 177 |
|
172 | 178 | return $partitions[$partitionIndex];
|
173 | 179 | }
|
| 180 | + |
| 181 | + public function updateStats($uuid) |
| 182 | + { |
| 183 | + $redis = FairQueue::getConnection(); |
| 184 | + $queue = $this->queue; |
| 185 | + $partition = $this->partition; |
| 186 | + |
| 187 | + $past_minute_key = $this->partitionProcessedJobsInPastMinutesKey($queue, $partition, 1); |
| 188 | + $past_20minute_key = $this->partitionProcessedJobsInPastMinutesKey($queue, $partition, 20); |
| 189 | + $past_60minute_key = $this->partitionProcessedJobsInPastMinutesKey($queue, $partition, 60); |
| 190 | + $redis->zadd($past_minute_key, now()->getPreciseTimestamp(3), $uuid); |
| 191 | + $redis->zadd($past_20minute_key, now()->getPreciseTimestamp(3), $uuid); |
| 192 | + $redis->zadd($past_60minute_key, now()->getPreciseTimestamp(3), $uuid); |
| 193 | + } |
174 | 194 | }
|
0 commit comments