-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.php
59 lines (48 loc) · 2.27 KB
/
consumer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
<?php
require __DIR__ . "/vendor/autoload.php";
$amqpConnection = \SimplyCodedSoftware\IntegrationMessaging\Amqp\AmqpConnectionFactory::create()
->setUsername("guest")
->setPassword("guest")
->setHost("rabbitmq")
->setVirtualHost("/");
/** @var \SimplyCodedSoftware\IntegrationMessaging\Amqp\AmqpQueue $amqpQueue */
$amqpQueue = \SimplyCodedSoftware\IntegrationMessaging\Amqp\AmqpQueueBuilder::createWithDirectExchangeBinding("test", "amqp_connection_factory", "matching", "")
->withMessageAck(true)
->withQueueDurability(true)
->withMessageConverterReferenceNames([])
->registerQueueOnInitialization(false)
->build(\SimplyCodedSoftware\IntegrationMessaging\Handler\InMemoryReferenceSearchService::createWith([
"amqp_connection_factory" => $amqpConnection
]));
$requestChannel = \SimplyCodedSoftware\IntegrationMessaging\Channel\DirectChannel::create();
$requestChannel->subscribe(new class implements \SimplyCodedSoftware\IntegrationMessaging\MessageHandler {
/**
* @inheritDoc
*/
public function handle(\SimplyCodedSoftware\IntegrationMessaging\Message $message): void
{
/** @var \SimplyCodedSoftware\IntegrationMessaging\Amqp\AmqpAcknowledgementCallback $ack */
// $ack = $message->getHeaders()->get(\SimplyCodedSoftware\IntegrationMessaging\Amqp\AmqpHeaders::ACKNOWLEDGEMENT_CALLBACK);
// $ack->accept();
echo $message->getPayload() . "\n";
}
});
$amqpInboundAdapter = \SimplyCodedSoftware\IntegrationMessaging\Amqp\AmqpInboundAdapterBuilder::createWith(
"inboundGateway",
"test",
"reply"
)->build(
\SimplyCodedSoftware\IntegrationMessaging\Config\InMemoryChannelResolver::createFromAssociativeArray([
"test" => $amqpQueue,
"reply" => $requestChannel
]),
\SimplyCodedSoftware\IntegrationMessaging\Handler\InMemoryReferenceSearchService::createEmpty()
);
//$receivedMessage = $amqpQueue->receive();
//while(!$receivedMessage) {
// $receivedMessage = $amqpQueue->receive();
//}
///** @var \SimplyCodedSoftware\IntegrationMessaging\Amqp\AcknowledgementCallback $callback */
//$callback = $receivedMessage->getHeaders()->get(\SimplyCodedSoftware\IntegrationMessaging\Amqp\AmqpHeaders::ACKNOWLEDGEMENT_CALLBACK);
//$callback->reject();
$amqpInboundAdapter->start();