Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
ConsumersRunner.php
Go to the documentation of this file.
1 <?php
7 
12 use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface;
14 use Psr\Log\LoggerInterface;
15 use Symfony\Component\Process\PhpExecutableFinder;
17 
22 {
26  const PID_FILE_EXT = '.pid';
27 
33  private $shellBackground;
34 
40  private $consumerConfig;
41 
47  private $deploymentConfig;
48 
54  private $phpExecutableFinder;
55 
61  private $pidConsumerManager;
62 
66  private $mqConnectionTypeResolver;
67 
71  private $logger;
72 
/**
 * Stores the collaborators needed to launch queue consumers.
 *
 * The last two dependencies are optional: when either one is omitted it is
 * fetched from the global ObjectManager instance instead.
 *
 * @param PhpExecutableFinder $phpExecutableFinder Locates the PHP binary used to start consumers.
 * @param ConsumerConfigInterface $consumerConfig Source of the configured consumers.
 * @param DeploymentConfig $deploymentConfig Source of the "cron_consumers_runner/*" settings.
 * @param ShellInterface $shellBackground Shell used to execute the consumer start command.
 * @param PidConsumerManager $pidConsumerManager Tells whether a consumer is already running.
 * @param ConnectionTypeResolver|null $mqConnectionTypeResolver Resolves the connection type of a consumer.
 * @param LoggerInterface|null $logger Receives a message whenever a consumer is skipped.
 */
public function __construct(
    PhpExecutableFinder $phpExecutableFinder,
    ConsumerConfigInterface $consumerConfig,
    DeploymentConfig $deploymentConfig,
    ShellInterface $shellBackground,
    PidConsumerManager $pidConsumerManager,
    ConnectionTypeResolver $mqConnectionTypeResolver = null,
    LoggerInterface $logger = null
) {
    // Resolve the optional collaborators first, in the same order as they are declared.
    if ($mqConnectionTypeResolver === null) {
        $mqConnectionTypeResolver = ObjectManager::getInstance()->get(ConnectionTypeResolver::class);
    }
    if ($logger === null) {
        $logger = ObjectManager::getInstance()->get(LoggerInterface::class);
    }

    $this->phpExecutableFinder = $phpExecutableFinder;
    $this->consumerConfig = $consumerConfig;
    $this->deploymentConfig = $deploymentConfig;
    $this->shellBackground = $shellBackground;
    $this->pidConsumerManager = $pidConsumerManager;
    $this->mqConnectionTypeResolver = $mqConnectionTypeResolver;
    $this->logger = $logger;
}
102 
/**
 * Starts every eligible consumer through the background shell.
 *
 * Nothing happens when the "cron_consumers_runner/cron_run" deployment
 * setting is falsy (it defaults to true). Otherwise each configured consumer
 * accepted by canBeRun() is started with "bin/magento queue:consumers:start",
 * receiving its name, its PID file path and, when the
 * "cron_consumers_runner/max_messages" setting (default 10000) is non-zero,
 * a "--max-messages" option.
 *
 * @return void
 */
public function run()
{
    if (!$this->deploymentConfig->get('cron_consumers_runner/cron_run', true)) {
        return;
    }

    $messageLimit = (int) $this->deploymentConfig->get('cron_consumers_runner/max_messages', 10000);
    $whitelist = $this->deploymentConfig->get('cron_consumers_runner/consumers', []);
    // Fall back to a bare "php" when the finder cannot locate the executable.
    $phpBinary = $this->phpExecutableFinder->find() ?: 'php';

    foreach ($this->consumerConfig->getConsumers() as $consumer) {
        if ($this->canBeRun($consumer, $whitelist)) {
            $name = $consumer->getName();

            $parameters = [$name, '--pid-file-path=' . $this->getPidFilePath($name)];
            $commandLine = $phpBinary . ' ' . BP . '/bin/magento queue:consumers:start %s %s';

            // The third placeholder is only present when there is a third argument to fill it.
            if ($messageLimit) {
                $parameters[] = '--max-messages=' . $messageLimit;
                $commandLine .= ' %s';
            }

            $this->shellBackground->execute($commandLine, $parameters);
        }
    }
}
140 
/**
 * Decides whether the given consumer should be started now.
 *
 * A consumer is rejected when:
 *  - a non-empty whitelist is supplied and the consumer name is not in it;
 *  - the PID consumer manager reports it as already running;
 *  - resolving its connection type throws a \LogicException (the skip is
 *    logged at info level).
 *
 * @param ConsumerConfigItemInterface $consumerConfig Consumer to check.
 * @param array $allowedConsumers Whitelist of consumer names; empty means "allow all".
 * @return bool True when the consumer may be started.
 */
private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $allowedConsumers = []): bool
{
    $consumerName = $consumerConfig->getName();

    // Strict comparison: with loose matching a non-string whitelist entry
    // (e.g. true, or 0 on PHP 7) would equal every consumer name and
    // silently disable the whitelist.
    if (!empty($allowedConsumers) && !in_array($consumerName, $allowedConsumers, true)) {
        return false;
    }

    if ($this->pidConsumerManager->isRun($this->getPidFilePath($consumerName))) {
        return false;
    }

    $connectionName = $consumerConfig->getConnection();
    try {
        // Called only for its side effect of throwing when the connection is unknown.
        $this->mqConnectionTypeResolver->getConnectionType($connectionName);
    } catch (\LogicException $e) {
        $this->logger->info(sprintf(
            'Consumer "%s" skipped as required connection "%s" is not configured. %s',
            $consumerName,
            $connectionName,
            $e->getMessage()
        ));
        return false;
    }

    return true;
}
176 
/**
 * Builds the PID file name for a consumer on the current host.
 *
 * The result has the form "<consumerName>-<hostname><PID_FILE_EXT>", where
 * the hostname is reduced to its ASCII letters and digits.
 *
 * @param string $consumerName Name of the consumer.
 * @return string PID file name.
 */
private function getPidFilePath($consumerName)
{
    // Strip everything except letters and digits from the host name.
    $hostToken = preg_replace('/[^a-z0-9]/i', '', gethostname());

    return "{$consumerName}-{$hostToken}" . static::PID_FILE_EXT;
}
189 }
__construct(PhpExecutableFinder $phpExecutableFinder, ConsumerConfigInterface $consumerConfig, DeploymentConfig $deploymentConfig, ShellInterface $shellBackground, PidConsumerManager $pidConsumerManager, ConnectionTypeResolver $mqConnectionTypeResolver=null, LoggerInterface $logger=null)
$deploymentConfig
$arguments
const BP
Definition: autoload.php:14