Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
Queue.php
Go to the documentation of this file.
1 <?php
7 
11 use Psr\Log\LoggerInterface;
13 use Magento\Framework\Locale\ResolverInterface as LocaleResolver;
14 
20 class Queue
21 {
26 
/** Default value for the $maxExecTime constructor argument; compared against elapsed seconds in checkTimeout(). */
30  const DEFAULT_MAX_EXEC_TIME = 400;
31 
/** @var array[] Queued entries keyed by package path; each entry holds 'package' and 'dependencies'. */
35  private $packages = [];
36 
/** @var int[] Child process ids returned by pcntl_fork(), keyed by package path. */
40  private $processIds = [];
41 
/** @var Package[] Packages handed to a forked child and not yet seen as finished, keyed by package path. */
45  private $inProgress = [];
46 
/** @var int Upper bound on simultaneously running deployments; forking is only considered when it is > 1. */
50  private $maxProcesses;
51 
/** @var int Seconds that may pass since the last job was started before the polling loops give up. */
55  private $maxExecTime;
56 
/** @var AppState Used to emulate the package's area code while pre-processors run. */
60  private $appState;
61 
/** @var LocaleResolver Receives the package locale before pre-processors run. */
65  private $localeResolver;
66 
/** @var ResourceConnection Closed after forked deployments have been awaited. */
70  private $resourceConnection;
71 
/** @var LoggerInterface Receives the "Execute: ..." entries and the progress dots. */
75  private $logger;
76 
/** @var DeployPackage Service that performs the deployment of a single package. */
80  private $deployPackageService;
81 
/** @var array Options forwarded to package pre-processors and to the deploy service. */
85  private $options = [];
86 
/** @var int Unix timestamp taken when process() begins; not read anywhere in the visible code. */
90  private $start = 0;
91 
/** @var int Unix timestamp of the most recent job start; basis for the timeout check. */
95  private $lastJobStarted = 0;
96 
/**
 * Stores the collaborators and limits used while working through the queue.
 *
 * @param AppState $appState emulates the package area code while pre-processors run
 * @param LocaleResolver $localeResolver receives the package locale before pre-processors run
 * @param ResourceConnection $resourceConnection closed once forked deployments have been awaited
 * @param LoggerInterface $logger receives execution entries and progress dots
 * @param DeployPackage $deployPackageService deploys a single package
 * @param array $options forwarded to pre-processors and to the deploy service
 * @param int $maxProcesses upper bound on simultaneous deployments; forking is only considered when > 1
 * @param int $maxExecTime seconds allowed since the last job start before polling stops
 */
public function __construct(
    AppState $appState,
    LocaleResolver $localeResolver,
    ResourceConnection $resourceConnection,
    LoggerInterface $logger,
    DeployPackage $deployPackageService,
    array $options = [],
    $maxProcesses = self::DEFAULT_MAX_PROCESSES_AMOUNT,
    $maxExecTime = self::DEFAULT_MAX_EXEC_TIME
) {
    // Tunables first ...
    $this->options = $options;
    $this->maxProcesses = $maxProcesses;
    $this->maxExecTime = $maxExecTime;

    // ... then the injected services.
    $this->appState = $appState;
    $this->localeResolver = $localeResolver;
    $this->resourceConnection = $resourceConnection;
    $this->logger = $logger;
    $this->deployPackageService = $deployPackageService;
}
126 
/**
 * Registers a package together with the packages it depends on.
 *
 * Entries are keyed by the package path, so adding a second package with the
 * same path replaces the earlier entry.
 *
 * @param Package $package
 * @param Package[] $dependencies keyed by the name under which each dependency is itself queued
 * @return bool always true
 */
public function add(Package $package, array $dependencies = [])
{
    $entry = [
        'package' => $package,
        'dependencies' => $dependencies,
    ];
    $this->packages[$package->getPath()] = $entry;

    return true;
}
141 
/**
 * Returns every entry registered through add().
 *
 * @return array[] package path => ['package' => Package, 'dependencies' => array]
 */
public function getPackages()
{
    return $this->packages;
}
149 
/**
 * Works through every queued package, then waits for outstanding deployments.
 *
 * Each pass offers all remaining packages for execution, logs a progress dot,
 * sleeps three seconds and drops finished packages from the in-progress list.
 * Passes stop when nothing is left to start or when the time limit, measured
 * from the most recent job start, has been exceeded.
 *
 * @return int always 0
 */
public function process()
{
    $exitCode = 0;

    $now = time();
    $this->lastJobStarted = $now;
    $this->start = $now;

    $pending = $this->packages;
    while (count($pending) && $this->checkTimeout()) {
        foreach ($pending as $path => $job) {
            // $pending is handed over by reference: started packages are removed from it.
            $this->assertAndExecute($path, $pending, $job);
        }

        $this->logger->info('.');
        sleep(3);

        foreach ($this->inProgress as $path => $running) {
            if (!$this->isDeployed($running)) {
                continue;
            }
            unset($this->inProgress[$path]);
        }
    }

    $this->awaitForAllProcesses();

    return $exitCode;
}
177 
/**
 * Starts the dependencies of a package where possible and then offers the
 * package itself for execution.
 *
 * Dependencies are only inspected when the package has a parent other than
 * itself. A dependency that is still queued is started recursively; one that
 * is no longer queued but not yet deployed is treated as "in progress".
 * The package is held back whenever any dependency is still unfinished.
 *
 * @param string $name key of the package inside $packages
 * @param array $packages remaining queue, passed by reference; started packages are removed from it
 * @param array $packageJob queue entry with the keys 'package' and 'dependencies'
 * @return void
 */
private function assertAndExecute($name, array & $packages, array $packageJob)
{
    /** @var Package $package */
    $package = $packageJob['package'];
    $dependenciesNotFinished = false;
    if ($package->getParent() && $package->getParent() !== $package) {
        foreach ($packageJob['dependencies'] as $dependencyName => $dependency) {
            if ($this->isDeployed($dependency)) {
                continue;
            }

            if (array_key_exists($dependencyName, $packages)) {
                // Still queued: try to start it (and, recursively, its own dependencies).
                $this->assertAndExecute(
                    $dependencyName,
                    $packages,
                    $packages[$dependencyName]
                );

                // A synchronous deployment is complete at this point. A forked one
                // has only just been launched, and a dependency that found no free
                // slot has not started at all. In both of those cases the dependent
                // package must wait for a later pass.
                if ($this->isDeployed($dependency)) {
                    continue;
                }
            }

            // Either already running (no longer queued) or just launched/deferred above.
            $dependenciesNotFinished = true;
        }
    }
    $this->executePackage($package, $name, $packages, $dependenciesNotFinished);
}
210 
/**
 * Removes a package from the queue and executes it if it is eligible.
 *
 * A package is eligible when it is still queued, none of its dependencies is
 * unfinished, it is not already deployed, and either fewer than two processes
 * are allowed or the number of running deployments is below the limit.
 *
 * @param Package $package
 * @param string $name key of the package inside $packages
 * @param array $packages remaining queue, passed by reference
 * @param bool $dependenciesNotFinished true when at least one dependency is still unfinished
 * @return void
 */
private function executePackage(
    Package $package,
    string $name,
    array &$packages,
    bool $dependenciesNotFinished
) {
    // The caller may be walking a snapshot of the queue taken before this package
    // was started through another package's dependency chain. A name that is no
    // longer queued has already been handed to execute(); starting it again would
    // fork a second child and overwrite the stored pid of the first.
    if (!array_key_exists($name, $packages)) {
        return;
    }

    if (!$dependenciesNotFinished
        && !$this->isDeployed($package)
        && ($this->maxProcesses < 2 || (count($this->inProgress) < $this->maxProcesses))
    ) {
        unset($packages[$name]);
        $this->execute($package);
    }
}
232 
/**
 * Polls the running deployments until all have finished or the time limit
 * measured from the most recent job start is exceeded.
 *
 * Packages still running when the limit is hit stay in the in-progress list.
 * When deployments may have been forked, the resource connection is closed
 * afterwards.
 *
 * @return void
 */
private function awaitForAllProcesses()
{
    while ($this->inProgress && $this->checkTimeout()) {
        foreach ($this->inProgress as $name => $package) {
            if ($this->isDeployed($package)) {
                unset($this->inProgress[$name]);
            }
        }
        $this->logger->info('.');

        // Only pause when something is still running; sleeping after the last
        // deployment has been collected just delays the return by five seconds.
        if ($this->inProgress) {
            sleep(5);
        }
    }
    if ($this->isCanBeParalleled()) {
        // close connections only if ran with forks
        $this->resourceConnection->closeConnection();
    }
}
254 
/**
 * Tells whether deployments are run in forked child processes.
 *
 * @return bool true when more than one process is allowed and pcntl_fork() exists
 */
private function isCanBeParalleled()
{
    if (!function_exists('pcntl_fork')) {
        return false;
    }

    return $this->maxProcesses > 1;
}
262 
/**
 * Runs the package pre-processors and then deploys the package, in a forked
 * child when forking is available, otherwise synchronously.
 *
 * In the parent of a fork the package is recorded as in progress together
 * with the child's pid. The child deploys the package and terminates with
 * exit status 0.
 *
 * @param Package $package
 * @return bool true (the forked child never returns)
 * @throws \RuntimeException when the process cannot be forked
 */
private function execute(Package $package)
{
    $this->lastJobStarted = time();

    $message = "Execute: " . $package->getPath();
    $context = [
        'process' => $package->getPath(),
        'count' => count($package->getFiles()),
    ];
    $this->logger->info($message, $context);

    $areaCode = $package->getArea() == Package::BASE_AREA ? 'global' : $package->getArea();
    $runPreProcessors = function () use ($package) {
        // the package locale is applied first; the original author notes it is
        // needed for correct file path resolving
        $this->localeResolver->setLocale($package->getLocale());

        // pre-processors run here, before any fork (per the original author they
        // may add more files to deploy)
        foreach ($package->getPreProcessors() as $processor) {
            $processor->process($package, $this->options);
        }
    };
    $this->appState->emulateAreaCode($areaCode, $runPreProcessors);

    if (!$this->isCanBeParalleled()) {
        $this->deployPackageService->deploy($package, $this->options);
        return true;
    }

    $childPid = pcntl_fork();
    if ($childPid === -1) {
        throw new \RuntimeException('Unable to fork a new process');
    }

    if ($childPid) {
        // parent: remember the child so its completion can be polled later
        $this->inProgress[$package->getPath()] = $package;
        $this->processIds[$package->getPath()] = $childPid;
        return true;
    }

    // child: forget the inherited in-progress list so the destructor has nothing to wait for
    $this->inProgress = [];
    // NOTE(review): the meaning of the third argument is defined by DeployPackage::deploy — confirm there
    $this->deployPackageService->deploy($package, $this->options, true);
    exit(0);
}
314 
/**
 * Checks whether a package has been deployed.
 *
 * With forking enabled and no state set on the package, the package's child
 * process is polled without blocking. Once the child has exited, the package
 * is marked completed, removed from the in-progress list, and the result is
 * whether the child's exit status was 0. In every other case the package
 * state itself is returned.
 *
 * NOTE(review): a child that exits non-zero is still marked completed, so this
 * call returns false once and later calls return the stored state.
 *
 * @param Package $package
 * @return bool|mixed bool from the polling path, otherwise the value of Package::getState()
 */
private function isDeployed(Package $package)
{
    if ($this->isCanBeParalleled() && $package->getState() === null) {
        $pid = $this->getPid($package);
        if ($pid === null) {
            // Never forked. Passing null to pcntl_waitpid() would be coerced to
            // pid 0, i.e. "any child in this process group", which can reap an
            // unrelated finished child and discard its exit status.
            return false;
        }

        $finishedPid = pcntl_waitpid($pid, $status, WNOHANG);
        if ($finishedPid === $pid) {
            $package->setState(Package::STATE_COMPLETED);

            unset($this->inProgress[$package->getPath()]);
            return pcntl_wexitstatus($status) === 0;
        }

        // still running (0) or waitpid error (-1)
        return false;
    }

    return $package->getState();
}
335 
/**
 * Looks up the child process id recorded for a package.
 *
 * @param Package $package
 * @return int|null the stored pid, or null when none is recorded for the package path
 */
private function getPid(Package $package)
{
    return $this->processIds[$package->getPath()] ?? null;
}
346 
/**
 * Tells whether the time limit has not yet been reached.
 *
 * @return bool true while fewer than $maxExecTime seconds have passed since the last job start
 */
private function checkTimeout()
{
    $elapsed = time() - $this->lastJobStarted;

    return $elapsed < $this->maxExecTime;
}
354 
/**
 * Blocks until every package still listed as in progress has had its child
 * process collected.
 *
 * @throws \RuntimeException when waiting for a child process fails
 */
public function __destruct()
{
    foreach ($this->inProgress as $package) {
        $waitResult = pcntl_waitpid($this->getPid($package), $status);
        if ($waitResult !== -1) {
            continue;
        }

        throw new \RuntimeException(
            'Error while waiting for package deployed: ' . $this->getPid($package) . '; Status: ' . $status
        );
    }
}
372 }
$processor
Definition: 404.php:10
$logger
const DEFAULT_MAX_PROCESSES_AMOUNT
Definition: Queue.php:25
exit
Definition: redirect.phtml:12
__construct(AppState $appState, LocaleResolver $localeResolver, ResourceConnection $resourceConnection, LoggerInterface $logger, DeployPackage $deployPackageService, array $options=[], $maxProcesses=self::DEFAULT_MAX_PROCESSES_AMOUNT, $maxExecTime=self::DEFAULT_MAX_EXEC_TIME)
Definition: Queue.php:107
$status
Definition: order_status.php:8
add(Package $package, array $dependencies=[])
Definition: Queue.php:132
if(!isset($_GET['name'])) $name
Definition: log.php:14