diff --git a/src/Illuminate/Broadcasting/BroadcastManager.php b/src/Illuminate/Broadcasting/BroadcastManager.php index 7f48613a1a44..002c9fe891dc 100644 --- a/src/Illuminate/Broadcasting/BroadcastManager.php +++ b/src/Illuminate/Broadcasting/BroadcastManager.php @@ -18,6 +18,7 @@ use Illuminate\Contracts\Bus\Dispatcher as BusDispatcherContract; use Illuminate\Contracts\Cache\Repository as Cache; use Illuminate\Contracts\Foundation\CachesRoutes; +use Illuminate\Support\Queue\Concerns\ResolvesQueueRoutes; use InvalidArgumentException; use Psr\Log\LoggerInterface; use Pusher\Pusher; @@ -29,6 +30,8 @@ */ class BroadcastManager implements FactoryContract { + use ResolvesQueueRoutes; + /** * The application instance. * @@ -199,6 +202,10 @@ public function queue($event) $queue = $event->queue; } + if (is_null($queue)) { + $queue = $this->resolveQueueFromQueueRoute($event) ?? null; + } + $broadcastEvent = new BroadcastEvent(clone $event); if ($event instanceof ShouldBeUnique) { @@ -210,7 +217,11 @@ public function queue($event) } $push = fn () => $this->app->make('queue') - ->connection($event->connection ?? null) + ->connection( + $event->connection + ?? $this->resolveConnectionFromQueueRoute($event) + ?? null + ) ->pushOn($queue, $broadcastEvent); $event instanceof ShouldRescue diff --git a/src/Illuminate/Bus/Dispatcher.php b/src/Illuminate/Bus/Dispatcher.php index 891573219c5a..ff9b5325284a 100644 --- a/src/Illuminate/Bus/Dispatcher.php +++ b/src/Illuminate/Bus/Dispatcher.php @@ -12,10 +12,13 @@ use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\Jobs\SyncJob; use Illuminate\Support\Collection; +use Illuminate\Support\Queue\Concerns\ResolvesQueueRoutes; use RuntimeException; class Dispatcher implements QueueingDispatcher { + use ResolvesQueueRoutes; + /** * The container implementation. * @@ -215,7 +218,9 @@ protected function commandShouldBeQueued($command) */ public function dispatchToQueue($command) { - $connection = $command->connection ?? null; + $connection = $command->connection + ?? $this->resolveConnectionFromQueueRoute($command) + ?? null; $queue = ($this->queueResolver)($connection); @@ -239,11 +244,13 @@ public function dispatchToQueue($command) */ protected function pushCommandToQueue($queue, $command) { + $queueName = $command->queue ?? $this->resolveQueueFromQueueRoute($command) ?? null; + if (isset($command->delay)) { - return $queue->later($command->delay, $command, queue: $command->queue ?? null); + return $queue->later($command->delay, $command, queue: $queueName); } - return $queue->push($command, queue: $command->queue ?? null); + return $queue->push($command, queue: $queueName); } /** diff --git a/src/Illuminate/Events/Dispatcher.php b/src/Illuminate/Events/Dispatcher.php index fd5766f297ad..e287146616c3 100755 --- a/src/Illuminate/Events/Dispatcher.php +++ b/src/Illuminate/Events/Dispatcher.php @@ -16,6 +16,7 @@ use Illuminate\Contracts\Queue\ShouldQueueAfterCommit; use Illuminate\Support\Arr; use Illuminate\Support\Collection; +use Illuminate\Support\Queue\Concerns\ResolvesQueueRoutes; use Illuminate\Support\Str; use Illuminate\Support\Traits\Macroable; use Illuminate\Support\Traits\ReflectsClosures; @@ -25,7 +26,7 @@ class Dispatcher implements DispatcherContract { - use Macroable, ReflectsClosures; + use Macroable, ReflectsClosures, ResolvesQueueRoutes; /** * The IoC container instance. @@ -647,9 +648,13 @@ protected function queueHandler($class, $method, $arguments) { [$listener, $job] = $this->createListenerAndJob($class, $method, $arguments); - $connection = $this->resolveQueue()->connection(method_exists($listener, 'viaConnection') + $connectionName = method_exists($listener, 'viaConnection') ? (isset($arguments[0]) ? $listener->viaConnection($arguments[0]) : $listener->viaConnection()) - : $listener->connection ?? null); + : $listener->connection ?? null; + + $connection = $this->resolveQueue()->connection( + $connectionName ?? $this->resolveConnectionFromQueueRoute($listener) ?? null + ); $queue = method_exists($listener, 'viaQueue') ? (isset($arguments[0]) ? $listener->viaQueue($arguments[0]) : $listener->viaQueue()) @@ -659,6 +664,10 @@ protected function queueHandler($class, $method, $arguments) ? (isset($arguments[0]) ? $listener->withDelay($arguments[0]) : $listener->withDelay()) : $listener->delay ?? null; + if (is_null($queue)) { + $queue = $this->resolveQueueFromQueueRoute($listener) ?? null; + } + is_null($delay) ? $connection->pushOn(enum_value($queue), $job) : $connection->laterOn(enum_value($queue), $delay, $job); diff --git a/src/Illuminate/Mail/Mailable.php b/src/Illuminate/Mail/Mailable.php index 7e47f06a030d..e7233e8803b6 100644 --- a/src/Illuminate/Mail/Mailable.php +++ b/src/Illuminate/Mail/Mailable.php @@ -230,7 +230,17 @@ public function queue(Queue $queue) $connection = property_exists($this, 'connection') ? $this->connection : null; - $queueName = property_exists($this, 'queue') ? $this->queue : null; + if (is_null($connection) && method_exists($queue, 'resolveConnectionFromQueueRoute')) { + $connection = $queue->resolveConnectionFromQueueRoute($this); + } + + $queueName = property_exists($this, 'queue') + ? $this->queue + : null; + + if (is_null($queueName) && method_exists($queue, 'resolveQueueFromQueueRoute')) { + $queueName = $queue->resolveQueueFromQueueRoute($this); + } return $queue->connection($connection)->pushOn( $queueName ?: null, $this->newQueuedJob() @@ -248,7 +258,17 @@ public function later($delay, Queue $queue) { $connection = property_exists($this, 'connection') ? $this->connection : null; - $queueName = property_exists($this, 'queue') ? $this->queue : null; + $queueName = property_exists($this, 'queue') + ? $this->queue + : null; + + if (is_null($connection) && method_exists($queue, 'resolveConnectionFromQueueRoute')) { + $connection = $queue->resolveConnectionFromQueueRoute($this); + } + + if (is_null($queueName) && method_exists($queue, 'resolveQueueFromQueueRoute')) { + $queueName = $queue->resolveQueueFromQueueRoute($this); + } return $queue->connection($connection)->laterOn( $queueName ?: null, $delay, $this->newQueuedJob() diff --git a/src/Illuminate/Notifications/ChannelManager.php b/src/Illuminate/Notifications/ChannelManager.php index 85bb615bd06c..a7b76583683f 100644 --- a/src/Illuminate/Notifications/ChannelManager.php +++ b/src/Illuminate/Notifications/ChannelManager.php @@ -7,10 +7,13 @@ use Illuminate\Contracts\Notifications\Dispatcher as DispatcherContract; use Illuminate\Contracts\Notifications\Factory as FactoryContract; use Illuminate\Support\Manager; +use Illuminate\Support\Queue\Concerns\ResolvesQueueRoutes; use InvalidArgumentException; class ChannelManager extends Manager implements DispatcherContract, FactoryContract { + use ResolvesQueueRoutes; + /** * The default channel used to deliver messages. * diff --git a/src/Illuminate/Notifications/NotificationSender.php b/src/Illuminate/Notifications/NotificationSender.php index 4309d4992db0..d7ed0b1159fa 100644 --- a/src/Illuminate/Notifications/NotificationSender.php +++ b/src/Illuminate/Notifications/NotificationSender.php @@ -229,13 +229,17 @@ protected function queueNotification($notifiables, $notification) $notification->locale = $this->locale; } - $connection = $notification->connection; + $connection = $notification->connection + ?? $this->manager->resolveConnectionFromQueueRoute($notification) + ?? null; if (method_exists($notification, 'viaConnections')) { $connection = $notification->viaConnections()[$channel] ?? $connection; } - $queue = $notification->queue; + $queue = $notification->queue + ?? $this->manager->resolveQueueFromQueueRoute($notification) + ?? null; if (method_exists($notification, 'viaQueues')) { $queue = $notification->viaQueues()[$channel] ?? $queue; diff --git a/src/Illuminate/Queue/QueueManager.php b/src/Illuminate/Queue/QueueManager.php index 9ff422f13f61..2c2ef67e3a11 100755 --- a/src/Illuminate/Queue/QueueManager.php +++ b/src/Illuminate/Queue/QueueManager.php @@ -5,6 +5,7 @@ use Closure; use Illuminate\Contracts\Queue\Factory as FactoryContract; use Illuminate\Contracts\Queue\Monitor as MonitorContract; +use Illuminate\Support\Queue\Concerns\ResolvesQueueRoutes; use InvalidArgumentException; /** @@ -12,6 +13,8 @@ */ class QueueManager implements FactoryContract, MonitorContract { + use ResolvesQueueRoutes; + /** * The application instance. * @@ -120,6 +123,19 @@ public function stopping($callback) $this->app['events']->listen(Events\WorkerStopping::class, $callback); } + /** + * Set the queue route for the given class. + * + * @param array|class-string $class + * @param string|null $queue + * @param string|null $connection + * @return void + */ + public function route(array|string $class, $queue = null, $connection = null) + { + $this->queueRoutes()->set($class, $queue, $connection); + } + /** * Determine if the driver is connected. * diff --git a/src/Illuminate/Queue/QueueRoutes.php b/src/Illuminate/Queue/QueueRoutes.php new file mode 100644 index 000000000000..1ec30328c547 --- /dev/null +++ b/src/Illuminate/Queue/QueueRoutes.php @@ -0,0 +1,106 @@ + + */ + protected $routes = []; + + /** + * Get the queue connection that a given queueable instance should be routed to. + * + * @param object $queueable + * @return string|null + */ + public function getConnection($queueable) + { + $route = $this->getRoute($queueable); + + if (is_null($route)) { + return; + } + + return is_string($route) + ? $route + : $route[0]; + } + + /** + * Get the queue that a given queueable instance should be routed to. + * + * @param object $queueable + * @return string|null + */ + public function getQueue($queueable) + { + $route = $this->getRoute($queueable); + + if (is_null($route)) { + return; + } + + return is_string($route) + ? $route + : $route[1]; + } + + /** + * Get the queue that a given queueable instance should be routed to. + * + * @param object $queueable + * @return string|null + */ + public function getRoute($queueable) + { + if (empty($this->routes)) { + return null; + } + + $classes = array_merge( + [get_class($queueable)], + class_parents($queueable) ?: [], + class_implements($queueable) ?: [], + class_uses_recursive($queueable) + ); + + foreach ($classes as $class) { + if (isset($this->routes[$class])) { + return $this->routes[$class]; + } + } + + return null; + } + + /** + * Register the queue route for the given class. + * + * @param array|class-string $class + * @param string|null $queue + * @param string|null $connection + * @return void + */ + public function set(array|string $class, $queue = null, $connection = null) + { + $routes = is_array($class) ? $class : [$class => [$connection, $queue]]; + + foreach ($routes as $from => $to) { + $this->routes[$from] = $to; + } + } + + /** + * Get all registered queue routes. + * + * @return array + */ + public function all() + { + return $this->routes; + } +} diff --git a/src/Illuminate/Queue/QueueServiceProvider.php b/src/Illuminate/Queue/QueueServiceProvider.php index 898ef7242f3f..b820b07c0b30 100755 --- a/src/Illuminate/Queue/QueueServiceProvider.php +++ b/src/Illuminate/Queue/QueueServiceProvider.php @@ -42,6 +42,7 @@ public function register() $this->registerConnection(); $this->registerWorker(); $this->registerListener(); + $this->registerRoutes(); $this->registerFailedJobServices(); } @@ -288,6 +289,18 @@ protected function registerListener() }); } + /** + * Register the default queue routes binding. + * + * @return void + */ + protected function registerRoutes() + { + $this->app->singleton('queue.routes', function () { + return new QueueRoutes; + }); + } + /** * Register the failed job services. * @@ -388,6 +401,7 @@ public function provides() 'queue.connection', 'queue.failer', 'queue.listener', + 'queue.routes', 'queue.worker', ]; } diff --git a/src/Illuminate/Support/Queue/Concerns/ResolvesQueueRoutes.php b/src/Illuminate/Support/Queue/Concerns/ResolvesQueueRoutes.php new file mode 100644 index 000000000000..641f90dfc95a --- /dev/null +++ b/src/Illuminate/Support/Queue/Concerns/ResolvesQueueRoutes.php @@ -0,0 +1,45 @@ +queueRoutes()->getConnection($queueable); + } + + /** + * Resolve the default queue name for a given queueable instance. + * + * @param object $queueable + * @return string|null + */ + public function resolveQueueFromQueueRoute($queueable) + { + return $this->queueRoutes()->getQueue($queueable); + } + + /** + * Get the queue routes manager instance. + * + * @return \Illuminate\Queue\QueueRoutes + */ + protected function queueRoutes() + { + $container = Container::getInstance(); + + return $container->bound('queue.routes') + ? $container->make('queue.routes') + : new QueueRoutes; + } +} diff --git a/tests/Bus/BusDispatcherTest.php b/tests/Bus/BusDispatcherTest.php index d68b159f73c6..83a65ff13212 100644 --- a/tests/Bus/BusDispatcherTest.php +++ b/tests/Bus/BusDispatcherTest.php @@ -23,6 +23,10 @@ protected function tearDown(): void public function testCommandsThatShouldQueueIsQueued() { $container = new Container; + $container->instance('queue.routes', $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + Container::setInstance($container); $dispatcher = new Dispatcher($container, function () { $mock = m::mock(Queue::class); $mock->shouldReceive('push')->once(); @@ -31,11 +35,17 @@ public function testCommandsThatShouldQueueIsQueued() }); $dispatcher->dispatch(m::mock(ShouldQueue::class)); + + Container::setInstance(null); } public function testCommandsThatShouldQueueIsQueuedUsingCustomHandler() { $container = new Container; + $container->instance('queue.routes', $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + Container::setInstance($container); $dispatcher = new Dispatcher($container, function () { $mock = m::mock(Queue::class); $mock->shouldReceive('push')->once(); @@ -44,11 +54,17 @@ public function testCommandsThatShouldQueueIsQueuedUsingCustomHandler() }); $dispatcher->dispatch(new BusDispatcherTestCustomQueueCommand); + + Container::setInstance(null); } public function testCommandsThatShouldQueueIsQueuedUsingCustomQueueAndDelay() { $container = new Container; + $container->instance('queue.routes', $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + Container::setInstance($container); $dispatcher = new Dispatcher($container, function () { $mock = m::mock(Queue::class); $mock->shouldReceive('later')->once()->with(10, m::type(BusDispatcherTestSpecificQueueAndDelayCommand::class), '', 'foo'); @@ -57,6 +73,27 @@ public function testCommandsThatShouldQueueIsQueuedUsingCustomQueueAndDelay() }); $dispatcher->dispatch(new BusDispatcherTestSpecificQueueAndDelayCommand); + + Container::setInstance(null); + } + + public function testCommandsAreDispatchedWithQueueRoute() + { + Container::setInstance($container = new Container); + $container->instance('queue.routes', $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn('high-priority'); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + + $mock = m::mock(Queue::class); + $mock->shouldReceive('push')->once()->with(BusDispatcherQueueable::class, '', 'high-priority'); + + $dispatcher = new Dispatcher($container, function () use ($mock) { + return $mock; + }); + + $dispatcher->dispatch(new BusDispatcherQueueable); + + Container::setInstance(null); } public function testDispatchNowShouldNeverQueue() @@ -88,7 +125,7 @@ public function testDispatcherCanDispatchStandAloneHandler() public function testOnConnectionOnJobWhenDispatching() { - $container = new Container; + Container::setInstance($container = new Container); $container->singleton('config', function () { return new Config([ 'queue' => [ @@ -99,6 +136,10 @@ public function testOnConnectionOnJobWhenDispatching() ], ]); }); + $container->instance('queue.routes', $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + Container::setInstance($container); $dispatcher = new Dispatcher($container, function () { $mock = m::mock(Queue::class); @@ -110,6 +151,8 @@ public function testOnConnectionOnJobWhenDispatching() $job = (new ShouldNotBeDispatched)->onConnection('null'); $dispatcher->dispatch($job); + + Container::setInstance(null); } } @@ -147,6 +190,11 @@ class BusDispatcherTestSpecificQueueAndDelayCommand implements ShouldQueue public $delay = 10; } +class BusDispatcherQueueable implements ShouldQueue +{ + use Queueable; +} + class StandAloneCommand { // diff --git a/tests/Events/QueuedEventsTest.php b/tests/Events/QueuedEventsTest.php index 6add993c079d..9f023b1bc1d1 100644 --- a/tests/Events/QueuedEventsTest.php +++ b/tests/Events/QueuedEventsTest.php @@ -8,6 +8,7 @@ use Illuminate\Events\CallQueuedListener; use Illuminate\Events\Dispatcher; use Illuminate\Queue\QueueManager; +use Illuminate\Queue\QueueRoutes; use Illuminate\Support\Testing\Fakes\QueueFake; use Laravel\SerializableClosure\SerializableClosure; use Mockery as m; @@ -145,6 +146,31 @@ public function testQueueIsSetByGetConnectionDynamically() ]); } + public function testQueueIsSetUsingQueueRoutes() + { + $container = new Container; + $d = new Dispatcher($container); + + $queueRoutes = new QueueRoutes; + $queueRoutes->set(TestDispatcherQueueRoutes::class, 'event-queue', 'event-connection'); + $container->instance('queue.routes', $queueRoutes); + + $fakeQueue = new QueueFake($container); + + Container::setInstance($container); + + $d->setQueueResolver(function () use ($fakeQueue) { + return $fakeQueue; + }); + + $d->listen('some.event', TestDispatcherQueueRoutes::class.'@handle'); + $d->dispatch('some.event', ['foo', 'bar']); + + $fakeQueue->connection('event-connection')->assertPushedOn('event-queue', CallQueuedListener::class); + + Container::setInstance(null); + } + public function testDelayIsSetByWithDelayDynamically() { $d = new Dispatcher; @@ -553,3 +579,11 @@ public function viaQueue() return TestQueueType::EnumeratedQueue; } } + +class TestDispatcherQueueRoutes implements ShouldQueue +{ + public function handle() + { + // + } +} diff --git a/tests/Integration/Broadcasting/BroadcastManagerTest.php b/tests/Integration/Broadcasting/BroadcastManagerTest.php index 44e3dcec2b33..47f475b37fc6 100644 --- a/tests/Integration/Broadcasting/BroadcastManagerTest.php +++ b/tests/Integration/Broadcasting/BroadcastManagerTest.php @@ -43,6 +43,18 @@ public function testEventsCanBeBroadcast() Queue::assertPushed(BroadcastEvent::class); } + public function testEventsCanBeBroadcastUsingQueueRoutes() + { + Bus::fake(); + Queue::fake(); + + Queue::route(TestEvent::class, 'broadcast-queue', 'broadcast-connection'); + + Broadcast::queue(new TestEvent); + Bus::assertNotDispatched(BroadcastEvent::class); + Queue::connection('broadcast-connection')->assertPushedOn('broadcast-queue', BroadcastEvent::class); + } + public function testEventsCanBeRescued() { Bus::fake(); diff --git a/tests/Integration/Console/JobSchedulingTest.php b/tests/Integration/Console/JobSchedulingTest.php index 5d9dab40c6a5..055c7237240a 100644 --- a/tests/Integration/Console/JobSchedulingTest.php +++ b/tests/Integration/Console/JobSchedulingTest.php @@ -80,6 +80,32 @@ public function testJobQueuingRespectsJobConnection(): void return $job->connection === 'bar'; })->count()); } + + public function testJobQueuingRespectsQueueRoutes(): void + { + Queue::fake(); + + Queue::route(JobWithDefaultQueue::class, 'default-queue'); + Queue::route(JobWithoutDefaultQueue::class, 'fallback-queue'); + Queue::route(JobWithoutDefaultConnection::class, 'some-queue', 'some-connection'); + + /** @var \Illuminate\Console\Scheduling\Schedule $scheduler */ + $scheduler = $this->app->make(Schedule::class); + + $scheduler->job(JobWithDefaultQueue::class)->name('')->everyMinute(); + $scheduler->job(JobWithoutDefaultQueue::class)->name('')->everyMinute(); + $scheduler->job(JobWithoutDefaultConnection::class)->name('')->everyMinute(); + + $events = $scheduler->events(); + foreach ($events as $event) { + $event->run($this->app); + } + + // Own queue takes precedence over default + Queue::assertPushedOn('test-queue', JobWithDefaultQueue::class); + Queue::assertPushedOn('fallback-queue', JobWithoutDefaultQueue::class); + Queue::connection('some-queue')->assertPushedOn('some-queue', JobWithoutDefaultConnection::class); + } } class JobWithDefaultQueue implements ShouldQueue diff --git a/tests/Integration/Mail/SendingQueuedMailTest.php b/tests/Integration/Mail/SendingQueuedMailTest.php index e20a44659cbb..3804d7a10fa0 100644 --- a/tests/Integration/Mail/SendingQueuedMailTest.php +++ b/tests/Integration/Mail/SendingQueuedMailTest.php @@ -28,6 +28,17 @@ public function testMailIsSentWithDefaultLocale() return $job->middleware[0] instanceof RateLimited; }); } + + public function testMailIsSentWhenRoutingQueue() + { + Queue::fake(); + + Queue::route(Mailable::class, 'mail-queue', 'mail-connection'); + + Mail::to('test@mail.com')->queue(new SendingQueuedMailTestMail); + + Queue::connection('mail-connection')->assertPushedOn('mail-queue', SendQueuedMailable::class); + } } class SendingQueuedMailTestMail extends Mailable diff --git a/tests/Notifications/NotificationChannelManagerTest.php b/tests/Notifications/NotificationChannelManagerTest.php index 54cbcc8cffd6..c26c71ada1ac 100644 --- a/tests/Notifications/NotificationChannelManagerTest.php +++ b/tests/Notifications/NotificationChannelManagerTest.php @@ -16,6 +16,7 @@ use Illuminate\Notifications\Notification; use Illuminate\Notifications\SendQueuedNotifications; use Illuminate\Queue\InteractsWithQueue; +use Illuminate\Queue\QueueRoutes; use Illuminate\Queue\SerializesModels; use Illuminate\Support\Collection; use Laravel\SerializableClosure\SerializableClosure; @@ -155,6 +156,10 @@ public function testNotificationCanBeQueued() $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance(QueueRoutes::class, $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + $container->instance('queue.routes', $queueRoutes); $bus->shouldReceive('dispatch')->with(m::type(SendQueuedNotifications::class)); Container::setInstance($container); $manager = m::mock(ChannelManager::class.'[driver]', [$container]); @@ -169,6 +174,10 @@ public function testSendQueuedNotificationsCanBeOverrideViaContainer() $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance(QueueRoutes::class, $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + $container->instance('queue.routes', $queueRoutes); $bus->shouldReceive('dispatch')->with(m::type(TestSendQueuedNotifications::class)); $container->bind(SendQueuedNotifications::class, TestSendQueuedNotifications::class); Container::setInstance($container); @@ -189,6 +198,10 @@ public function testQueuedNotificationForwardsMessageGroupFromMethodToQueueJob() $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance(QueueRoutes::class, $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + $container->instance('queue.routes', $queueRoutes); $bus->shouldReceive('dispatch')->twice()->withArgs(function ($job) use ($mockedMessageGroupId) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertEquals($mockedMessageGroupId, $job->messageGroup); @@ -215,6 +228,10 @@ public function testQueuedNotificationForwardsMessageGroupFromPropertyOverriding $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance(QueueRoutes::class, $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + $container->instance('queue.routes', $queueRoutes); $bus->shouldReceive('dispatch')->twice()->withArgs(function ($job) use ($mockedMessageGroupId) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertEquals($mockedMessageGroupId, $job->messageGroup); @@ -239,6 +256,10 @@ public function testQueuedNotificationForwardsMessageGroupSetToQueueJob() $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance(QueueRoutes::class, $queueRoutes = m::mock()); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); + $container->instance('queue.routes', $queueRoutes); $bus->shouldReceive('dispatch')->twice()->withArgs(function ($job) use ($mockedMessageGroupSet) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertEquals($mockedMessageGroupSet[$job->channels[0]], $job->messageGroup); @@ -264,12 +285,15 @@ public function testQueuedNotificationForwardsMessageGroupSetFromClassToQueueJob $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance('queue.routes', $queueRoutes = m::mock()); $bus->shouldReceive('dispatch')->twice()->withArgs(function ($job) use ($mockedMessageGroupSet) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertEquals($mockedMessageGroupSet[$job->channels[0]], $job->messageGroup); return true; }); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); Container::setInstance($container); $manager = m::mock(ChannelManager::class.'[driver]', [$container]); $events->shouldReceive('listen')->once(); @@ -286,6 +310,7 @@ public function testQueuedNotificationForwardsDeduplicatorToQueueJob() $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance('queue.routes', $queueRoutes = m::mock()); $bus->shouldReceive('dispatch')->once()->withArgs(function ($job) use ($mockedDeduplicator) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertInstanceOf(SerializableClosure::class, $job->deduplicator); @@ -293,6 +318,8 @@ public function testQueuedNotificationForwardsDeduplicatorToQueueJob() return true; }); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); Container::setInstance($container); $manager = m::mock(ChannelManager::class.'[driver]', [$container]); $events->shouldReceive('listen')->once(); @@ -312,6 +339,7 @@ public function testQueuedNotificationForwardsDeduplicatorSetToQueueJob() $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance('queue.routes', $queueRoutes = m::mock()); $bus->shouldReceive('dispatch')->twice()->withArgs(function ($job) use ($mockedDeduplicatorSet) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertInstanceOf(SerializableClosure::class, $job->deduplicator); @@ -319,6 +347,8 @@ public function testQueuedNotificationForwardsDeduplicatorSetToQueueJob() return true; }); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); Container::setInstance($container); $manager = m::mock(ChannelManager::class.'[driver]', [$container]); $events->shouldReceive('listen')->once(); @@ -333,12 +363,15 @@ public function testQueuedNotificationForwardsDeduplicatorSetFromClassToQueueJob $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance('queue.routes', $queueRoutes = m::mock()); $bus->shouldReceive('dispatch')->twice()->withArgs(function ($job) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertEquals($job->notification->deduplicatorResults[$job->channels[0]], call_user_func($job->deduplicator, '', null)); return true; }); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); Container::setInstance($container); $manager = m::mock(ChannelManager::class.'[driver]', [$container]); $events->shouldReceive('listen')->once(); @@ -353,6 +386,7 @@ public function testQueuedNotificationForwardsDeduplicationIdMethodToQueueJob() $container->instance('config', ['app.name' => 'Name', 'app.logo' => 'Logo']); $container->instance(Dispatcher::class, $events = m::mock()); $container->instance(Bus::class, $bus = m::mock()); + $container->instance('queue.routes', $queueRoutes = m::mock()); $bus->shouldReceive('dispatch')->twice()->withArgs(function ($job) { $this->assertInstanceOf(SendQueuedNotifications::class, $job); $this->assertInstanceOf(SerializableClosure::class, $job->deduplicator); @@ -360,6 +394,8 @@ public function testQueuedNotificationForwardsDeduplicationIdMethodToQueueJob() return true; }); + $queueRoutes->shouldReceive('getQueue')->andReturn(null); + $queueRoutes->shouldReceive('getConnection')->andReturn(null); Container::setInstance($container); $manager = m::mock(ChannelManager::class.'[driver]', [$container]); $events->shouldReceive('listen')->once(); diff --git a/tests/Notifications/NotificationSenderTest.php b/tests/Notifications/NotificationSenderTest.php index 966c0e6a29ea..e55a2ab536c3 100644 --- a/tests/Notifications/NotificationSenderTest.php +++ b/tests/Notifications/NotificationSenderTest.php @@ -33,6 +33,8 @@ public function testItCanSendQueuedNotificationsWithAStringVia() $notifiable = m::mock(Notifiable::class); $manager = m::mock(ChannelManager::class); $manager->shouldReceive('getContainer')->andReturn(app()); + $manager->shouldReceive('resolveQueueFromQueueRoute')->andReturn(null); + $manager->shouldReceive('resolveConnectionFromQueueRoute')->andReturn(null); $bus = m::mock(BusDispatcher::class); $bus->shouldReceive('dispatch'); $events = m::mock(EventDispatcher::class); @@ -109,6 +111,8 @@ public function testItCanSendQueuedNotificationsThroughMiddleware() $events = m::mock(EventDispatcher::class); $events->shouldReceive('listen')->once(); $manager->shouldReceive('getContainer')->andReturn(app()); + $manager->shouldReceive('resolveQueueFromQueueRoute')->andReturn(null); + $manager->shouldReceive('resolveConnectionFromQueueRoute')->andReturn(null); $sender = new NotificationSender($manager, $bus, $events); @@ -120,6 +124,8 @@ public function testItCanSendQueuedMultiChannelNotificationsThroughDifferentMidd $notifiable = m::mock(Notifiable::class); $manager = m::mock(ChannelManager::class); $manager->shouldReceive('getContainer')->andReturn(app()); + $manager->shouldReceive('resolveQueueFromQueueRoute')->andReturn(null); + $manager->shouldReceive('resolveConnectionFromQueueRoute')->andReturn(null); $bus = m::mock(BusDispatcher::class); $bus->shouldReceive('dispatch') ->once() @@ -194,6 +200,29 @@ public function testItCanSendQueuedWithViaQueuesNotifications() $sender->send($notifiable, new DummyNotificationWithViaQueues); } + public function testItCanSendQueuedNotificationsWithQueueRoute() + { + $notifiable = new AnonymousNotifiable; + $manager = m::mock(ChannelManager::class); + $manager->shouldReceive('getContainer')->andReturn(app()); + $manager->shouldReceive('resolveQueueFromQueueRoute')->andReturn('notification-queue'); + $manager->shouldReceive('resolveConnectionFromQueueRoute')->andReturn('notification-connection'); + + $bus = m::mock(BusDispatcher::class); + $bus->shouldReceive('dispatch') + ->once() + ->withArgs(function ($job) { + return $job->queue === 'notification-queue' && $job->channels === ['mail'] && $job->connection === 'notification-connection'; + }); + + $events = m::mock(EventDispatcher::class); + $events->shouldReceive('listen')->once(); + + $sender = new NotificationSender($manager, $bus, $events); + + $sender->send($notifiable, new DummyQueuedNotificationWithStringVia); + } + public function testNotificationFailedSentWithoutHttpTransportException() { $this->expectException(TransportException::class); diff --git a/tests/Queue/QueueRoutesTest.php b/tests/Queue/QueueRoutesTest.php new file mode 100644 index 000000000000..46de4cdf8f61 --- /dev/null +++ b/tests/Queue/QueueRoutesTest.php @@ -0,0 +1,97 @@ +set(QueueRoutes::class, 'some-queue'); + $defaults->set(BaseNotification::class, 'some-queue', 'some-connection'); + + $this->assertSame([ + QueueRoutes::class => [null, 'some-queue'], + BaseNotification::class => ['some-connection', 'some-queue'], + ], $defaults->all()); + + // Ensure same class overrides + $defaults->set([ + QueueRoutes::class => 'queue-many', + SomeJob::class => 'important', + ]); + + $this->assertSame([ + QueueRoutes::class => 'queue-many', + BaseNotification::class => ['some-connection', 'some-queue'], + SomeJob::class => 'important', + ], $defaults->all() + ); + } + + public function testGetQueue() + { + $defaults = new QueueRoutes(); + + $defaults->set([ + BaseNotification::class => 'notifications', + CustomTrait::class => 'jobs', + PaymentContract::class => 'payments', + ]); + + // No queue set + $defaults->set(PaymentContract::class, connection: 'payment-connection'); + + $this->assertSame('notifications', $defaults->getQueue(new FinanceNotification)); + $this->assertSame('jobs', $defaults->getQueue(new SomeJob)); + $this->assertNull($defaults->getQueue(new Payment)); + } + + public function testGetConnection() + { + $defaults = new QueueRoutes(); + + $defaults->set([ + BaseNotification::class => ['notification-connection', 'notifications'], + CustomTrait::class => ['job-connection', 'jobs'], + ]); + + // No connection set + $defaults->set(PaymentContract::class, 'payments'); + + $this->assertSame('notification-connection', $defaults->getConnection(new FinanceNotification)); + $this->assertSame('job-connection', $defaults->getConnection(new SomeJob)); + $this->assertNull($defaults->getConnection(new Payment)); + } +} + +trait CustomTrait +{ +} + +class SomeJob +{ + use Queueable, CustomTrait; +} + +class BaseNotification +{ + use Queueable; +} + +class FinanceNotification extends BaseNotification +{ +} + +interface PaymentContract +{ +} + +class Payment implements PaymentContract +{ +} diff --git a/tests/Queue/QueueSqsQueueTest.php b/tests/Queue/QueueSqsQueueTest.php index af5e28abf20c..70dc7927bbbd 100755 --- a/tests/Queue/QueueSqsQueueTest.php +++ b/tests/Queue/QueueSqsQueueTest.php @@ -8,6 +8,7 @@ use Illuminate\Container\Container; use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract; use Illuminate\Queue\Jobs\SqsJob; +use Illuminate\Queue\QueueRoutes; use Illuminate\Queue\SqsQueue; use Illuminate\Support\Carbon; use Illuminate\Support\Str; @@ -100,6 +101,20 @@ protected function setUp(): void ]); } + protected function createSpyContainer() + { + $container = m::spy(Container::class); + + $container->shouldReceive('bound') + ->with('queue.routes') + ->andReturn(true); + $container->shouldReceive('offsetGet') + ->with('queue.routes') + ->andReturn(new QueueRoutes()); + + return $container; + } + public function testPopProperlyPopsJobOffOfSqs() { $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); @@ -273,7 +288,7 @@ public function testPendingDispatchProperlyPushesJobObjectOntoSqs() $pendingDispatch = FakeSqsJob::dispatch(); $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); - $queue->setContainer($container = m::spy(Container::class)); + $queue->setContainer($container = $this->createSpyContainer()); $queue->expects($this->once())->method('createPayload')->with($pendingDispatch->getJob(), $this->queueName, '')->willReturn($this->mockedPayload); $queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->queueUrl); $this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel); @@ -306,7 +321,7 @@ public function testPendingDispatchProperlyPushesJobObjectOntoSqsFairQueue() $pendingDispatch = FakeSqsJob::dispatch()->onGroup($this->mockedMessageGroupId); $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); - $queue->setContainer($container = m::spy(Container::class)); + $queue->setContainer($container = $this->createSpyContainer()); $queue->expects($this->once())->method('createPayload')->with($pendingDispatch->getJob(), $this->queueName, '')->willReturn($this->mockedPayload); $queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->queueUrl); $this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'MessageGroupId' => $this->mockedMessageGroupId])->andReturn($this->mockedSendMessageResponseModel); @@ -471,7 +486,7 @@ public function testPendingDispatchProperlyPushesJobObjectOntoSqsFifoQueue() $pendingDispatch = FakeSqsJob::dispatch()->onGroup($this->mockedMessageGroupId); $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock(); - $queue->setContainer($container = m::spy(Container::class)); + $queue->setContainer($container = $this->createSpyContainer()); $queue->expects($this->once())->method('createPayload')->with($pendingDispatch->getJob(), $this->fifoQueueName, '')->willReturn($this->mockedPayload); $queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->fifoQueueUrl); $this->sqs->shouldReceive('sendMessage')->once()->with([ @@ -499,7 +514,7 @@ public function testPendingDispatchProperlyPushesJobObjectOntoSqsFifoQueueWithDe $pendingDispatch = FakeSqsJobWithDeduplication::dispatch()->onGroup($this->mockedMessageGroupId); $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock(); - $queue->setContainer($container = m::spy(Container::class)); + $queue->setContainer($container = $this->createSpyContainer()); $queue->expects($this->once())->method('createPayload')->with($pendingDispatch->getJob(), $this->fifoQueueName, '')->willReturn($this->mockedPayload); $queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->fifoQueueUrl); $this->sqs->shouldReceive('sendMessage')->once()->with([ @@ -536,7 +551,7 @@ public function testPendingDispatchProperlyPushesJobObjectOntoSqsFifoQueueWithDe }); $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock(); - $queue->setContainer($container = m::spy(Container::class)); + $queue->setContainer($container = $this->createSpyContainer()); $queue->expects($this->once())->method('createPayload')->with($pendingDispatch->getJob(), $this->fifoQueueName, '')->willReturn($this->mockedPayload); $queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->fifoQueueUrl); $this->sqs->shouldReceive('sendMessage')->once()->with([ @@ -567,7 +582,7 @@ public function testJobObjectCanBeSerializedOntoSqsFifoQueueWithDeduplicator() }); $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock(); - $queue->setContainer($container = m::spy(Container::class)); + $queue->setContainer($container = $this->createSpyContainer()); $queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->fifoQueueUrl); $this->sqs->shouldReceive('sendMessage')->once()->withArgs(function ($args) { $this->assertIsArray($args); @@ -646,7 +661,7 @@ public function testDelayedPendingDispatchProperlyPushesJobObjectOntoSqsFifoQueu $pendingDispatch = FakeSqsJob::dispatch()->onGroup($this->mockedMessageGroupId)->delay($this->mockedDelay); $queue = $this->getMockBuilder(SqsQueue::class)->onlyMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->fifoQueueName, $this->account])->getMock(); - $queue->setContainer($container = m::spy(Container::class)); + $queue->setContainer($container = $this->createSpyContainer()); $queue->expects($this->once())->method('createPayload')->with($pendingDispatch->getJob(), $this->fifoQueueName, '')->willReturn($this->mockedPayload); $queue->expects($this->once())->method('getQueue')->with(null)->willReturn($this->fifoQueueUrl); $this->sqs->shouldReceive('sendMessage')->once()->with([