29 protected function setUp()
31 $this->markTestIncomplete(
'Should be converted to queue config v2.');
34 $configPath =
__DIR__ .
'/../etc/queue.xml';
35 $fileResolverMock = $this->createMock(\
Magento\Framework\Config\FileResolverInterface::class);
36 $fileResolverMock->expects($this->any())
41 $xmlReader = $this->objectManager->create(
42 \
Magento\Framework\MessageQueue\Config\Reader\Xml::class,
43 [
'fileResolver' => $fileResolverMock]
46 $newData = $xmlReader->read();
49 $configData = $this->objectManager->get(\
Magento\Framework\MessageQueue\Config\Data::class);
53 $this->publisher = $this->objectManager->create(\
Magento\Framework\MessageQueue\PublisherInterface::class);
56 protected function tearDown()
58 $this->markTestIncomplete(
'Should be converted to queue config v2.');
59 $this->consumeMessages(
'demoConsumerQueueOne', PHP_INT_MAX);
60 $this->consumeMessages(
'demoConsumerQueueTwo', PHP_INT_MAX);
61 $this->consumeMessages(
'demoConsumerQueueThree', PHP_INT_MAX);
62 $this->consumeMessages(
'demoConsumerQueueFour', PHP_INT_MAX);
63 $this->consumeMessages(
'demoConsumerQueueFive', PHP_INT_MAX);
64 $this->consumeMessages(
'demoConsumerQueueOneWithException', PHP_INT_MAX);
66 $objectManagerConfiguration = [\Magento\Framework\MessageQueue\Config\Reader\Xml::class => [
68 'fileResolver' => [
'instance' => \Magento\Framework\Config\FileResolverInterface::class],
72 $this->objectManager->configure($objectManagerConfiguration);
74 $queueConfig = $this->objectManager->get(\
Magento\Framework\MessageQueue\Config\Data::class);
75 $queueConfig->reset();
81 public function testPublishConsumeFlow()
84 $objectFactory = $this->objectManager->create(\
Magento\MysqlMq\Model\DataObjectFactory::class);
86 $object = $objectFactory->create();
87 for (
$i = 0;
$i < 10;
$i++) {
88 $object->setName(
'Object name ' .
$i)->setEntityId(
$i);
89 $this->publisher->publish(
'demo.object.created', $object);
91 for (
$i = 0;
$i < 5;
$i++) {
92 $object->setName(
'Object name ' .
$i)->setEntityId(
$i);
93 $this->publisher->publish(
'demo.object.updated', $object);
95 for (
$i = 0;
$i < 3;
$i++) {
96 $object->setName(
'Object name ' .
$i)->setEntityId(
$i);
97 $this->publisher->publish(
'demo.object.custom.created', $object);
100 $outputPattern =
'/(Processed \d+\s)/';
102 $this->consumeMessages(
'demoConsumerQueueOne', 7, 7, $outputPattern);
104 $this->consumeMessages(
'demoConsumerQueueOne', PHP_INT_MAX, 3, $outputPattern);
105 $this->consumeMessages(
'demoConsumerQueueOne', 7, 0, $outputPattern);
108 $this->consumeMessages(
'demoConsumerQueueTwo', 20, 15, $outputPattern);
111 $this->consumeMessages(
'demoConsumerQueueFour', 11, 0, $outputPattern);
114 $this->consumeMessages(
'demoConsumerQueueThree', 20, 15, $outputPattern);
117 $this->consumeMessages(
'demoConsumerQueueFive', 20, 18, $outputPattern);
123 public function testPublishAndConsumeWithFailedJobs()
126 $objectFactory = $this->objectManager->create(\
Magento\MysqlMq\Model\DataObjectFactory::class);
129 $object = $objectFactory->create();
130 for (
$i = 0;
$i < 5;
$i++) {
131 $object->setName(
'Object name ' .
$i)->setEntityId(
$i);
132 $this->publisher->publish(
'demo.object.created', $object);
134 $outputPattern =
'/(Processed \d+\s)/';
136 $this->consumeMessages(
'demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
138 $this->consumeMessages(
'demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);
141 for (
$i = 0;
$i < 5;
$i++) {
142 $object->setName(
'Object name ' .
$i)->setEntityId(
$i);
143 $this->publisher->publish(
'demo.object.created', $object);
146 for (
$i = 0;
$i < self::MAX_NUMBER_OF_TRIALS + 1;
$i++) {
147 $this->consumeMessages(
'demoConsumerQueueOneWithException', PHP_INT_MAX, 0, $outputPattern);
150 $this->consumeMessages(
'demoConsumerQueueOne', PHP_INT_MAX, 0, $outputPattern);
156 public function testPublishAndConsumeSchemaDefinedByMethod()
159 $objectFactory = $this->objectManager->create(\
Magento\MysqlMq\Model\DataObjectFactory::class);
161 $object = $objectFactory->create();
163 $object->setName(
'Object name ' .
$id)->setEntityId(
$id);
164 $requiredStringParam =
'Required value';
165 $optionalIntParam = 44;
166 $this->publisher->publish(
'test.schema.defined.by.method', [$object, $requiredStringParam, $optionalIntParam]);
167 $outputPattern =
"/Processed '{$object->getEntityId()}'; " 168 .
"Required param '{$requiredStringParam}'; Optional param '{$optionalIntParam}'/";
169 $this->consumeMessages(
'delayedOperationConsumer', PHP_INT_MAX, 1, $outputPattern);
180 protected function consumeMessages(
183 $expectedNumberOfProcessedMessages =
null,
184 $outputPattern =
null 187 $consumerFactory = $this->objectManager->create(\
Magento\Framework\MessageQueue\ConsumerFactory::class);
188 $consumer = $consumerFactory->get($consumerName);
190 $consumer->process($messagesToProcess);
191 $consumersOutput = ob_get_contents();
193 if ($outputPattern) {
195 $expectedNumberOfProcessedMessages,
196 preg_match_all($outputPattern, $consumersOutput)
const MAX_NUMBER_OF_TRIALS
defined('TESTS_BP')||define('TESTS_BP' __DIR__
static getObjectManager()