7 declare(strict_types=1);
12 use Psr\Log\LoggerInterface;
// Consumer configuration. In this excerpt it supplies the queue (getQueue()),
// the consumer name (getConsumerName()) and the topic names (getTopicNames()).
44 private $configuration;
// Injected through the constructor; used in this excerpt to lock an incoming
// message for the configured consumer name.
49 private $messageController;
// Built in the constructor via OperationProcessorFactory::create(); receives
// the message body when the message topic is among the configured topic names.
59 private $operationProcessor;
// Factory used below to build the operation processor.
76 OperationProcessorFactory $operationProcessorFactory,
// Keep the injected collaborators on the instance.
79 $this->invoker = $invoker;
81 $this->messageController = $messageController;
// The processor is created once, at construction time. The array passed to
// create() continues past this excerpt, so its contents are not documented here.
83 $this->operationProcessor = $operationProcessorFactory->create([
/**
 * Consume messages from the queue returned by the consumer configuration.
 *
 * When $maxNumberOfMessages is not set (null, the default), the transaction
 * callback is subscribed directly on the queue. Otherwise the queue, the limit
 * and the same callback are handed to the invoker.
 *
 * @param mixed $maxNumberOfMessages Optional limit forwarded to the invoker;
 *                                   presumably an integer count, but no type
 *                                   is declared here (TODO confirm).
 */
92 public function process($maxNumberOfMessages =
null)
94 $queue = $this->configuration->getQueue();
// isset() is false for null, so this branch runs when no limit was supplied.
96 if (!isset($maxNumberOfMessages)) {
97 $queue->subscribe($this->getTransactionCallback(
$queue));
// NOTE(review): the line between the two calls (original line 98) is not
// visible in this excerpt; presumably an else branch. Confirm against the file.
99 $this->invoker->invoke(
$queue, $maxNumberOfMessages, $this->getTransactionCallback(
$queue));
// Message-handling body with its exception handlers. The enclosing declaration
// is not visible in this excerpt; presumably this is the callback built by
// getTransactionCallback() (TODO confirm).
// Read the topic from the message properties under the 'topic_name' key.
115 $topicName =
$message->getProperties()[
'topic_name'];
// Lock the message for the configured consumer name. The returned lock object
// is used by the catch handlers below to remove its 'queue_lock' row.
116 $lock = $this->messageController->lock(
$message, $this->configuration->getConsumerName());
118 $allowedTopics = $this->configuration->getTopicNames();
// Only messages whose topic is in the configured list are processed.
// NOTE(review): in_array() is called without the strict flag, so the comparison
// is loose; consider passing true as the third argument.
119 if (in_array($topicName, $allowedTopics)) {
120 $this->operationProcessor->process(
$message->getBody());
126 }
catch (MessageLockException $exception) {
// The handler body for MessageLockException is not visible in this excerpt.
128 }
catch (ConnectionLostException $e) {
// Delete the 'queue_lock' table row whose id matches the lock taken above.
// NOTE(review): if the exception is raised before $lock is assigned (for
// example inside lock() itself), $lock->getId() would be called on an unset
// variable. Confirm that the omitted lines guard against this.
130 $this->resource->getConnection()
131 ->delete($this->resource->getTableName(
'queue_lock'), [
'id = ?' => $lock->getId()]);
133 }
catch (NotFoundException $e) {
// Log the exception message at warning level.
135 $this->logger->warning($e->getMessage());
136 }
catch (\Exception $e) {
// Any other exception: delete the lock row with the same statement as the
// ConnectionLostException handler. The same $lock caveat applies.
139 $this->resource->getConnection()
140 ->delete($this->resource->getTableName(
'queue_lock'), [
'id = ?' => $lock->getId()]);
process($maxNumberOfMessages=null)
__construct(CallbackInvoker $invoker, ResourceConnection $resource, MessageController $messageController, ConsumerConfigurationInterface $configuration, OperationProcessorFactory $operationProcessorFactory, LoggerInterface $logger)