11 use Magento\AsynchronousOperations\Api\Data\BulkSummaryInterfaceFactory;
16 use Magento\AsynchronousOperations\Model\ResourceModel\Operation\CollectionFactory;
29 private $entityManager;
34 private $bulkSummaryFactory;
39 private $operationCollectionFactory;
49 private $metadataPool;
54 private $resourceConnection;
79 BulkSummaryInterfaceFactory $bulkSummaryFactory,
80 CollectionFactory $operationCollectionFactory,
87 $this->entityManager = $entityManager;
88 $this->bulkSummaryFactory= $bulkSummaryFactory;
89 $this->operationCollectionFactory = $operationCollectionFactory;
90 $this->metadataPool = $metadataPool;
92 $this->publisher = $publisher;
// NOTE(review): body fragment only. The enclosing method header is not visible at this
// point; presumably this is scheduleBulk(), whose $bulkUuid, $operations and $userId
// parameters are used below — confirm against the full file.
// Look up the entity metadata for BulkSummaryInterface and obtain the connection named by it.
102 $metadata = $this->metadataPool->getMetadata(BulkSummaryInterface::class);
103 $connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());
// Read the user type from the user context; a null result is special-cased by the
// branch below (the branch body is not visible in this view).
106 $userType = $this->userContext->getUserType();
107 if ($userType ===
null) {
// Create a fresh bulk summary and load into it whatever is stored under $bulkUuid,
// then set the bulk id, user id and user type on it.
112 $bulkSummary = $this->bulkSummaryFactory->create();
113 $this->entityManager->load($bulkSummary, $bulkUuid);
114 $bulkSummary->setBulkId($bulkUuid);
116 $bulkSummary->setUserId($userId);
117 $bulkSummary->setUserType($userType);
// The new operation count is the count already on the summary (cast to int first)
// plus the number of operations passed in.
118 $bulkSummary->setOperationCount((
int)$bulkSummary->getOperationCount() + count(
$operations));
// Persist the updated summary through the entity manager.
120 $this->entityManager->save($bulkSummary);
123 }
// Any exception raised in the guarded section is caught and its message is logged at
// critical level. NOTE(review): what else this handler does is not visible here.
catch (\Exception $exception) {
125 $this->logger->critical($exception->getMessage());
/**
 * Selects the operations of a bulk that failed with one of the given error codes,
 * collects their ids in batches, clears the id on each operation and publishes
 * the selected operations again.
 *
 * NOTE(review): several lines of this method are missing from this view (opening
 * brace, the start of the guarded section, the calls that receive the table name),
 * so this summary covers only the statements shown here.
 *
 * @param mixed $bulkUuid   value matched against the 'bulk_uuid' field with 'eq'
 * @param array $errorCodes values matched against the 'error_code' field with 'in'
 * @return int number of selected operations on the path shown; the value returned
 *             after an exception is not visible here
 */
140 public function retryBulk($bulkUuid, array $errorCodes)
// Look up the entity metadata for BulkSummaryInterface and obtain the connection named by it.
142 $metadata = $this->metadataPool->getMetadata(BulkSummaryInterface::class);
143 $connection = $this->resourceConnection->getConnectionByName($metadata->getEntityConnectionName());
// Build an operation collection restricted to the given error codes and to this bulk.
// NOTE(review): the end of this call chain is not visible in this view.
146 $retriablyFailedOperations = $this->operationCollectionFactory->create()
147 ->addFieldToFilter(
'error_code', [
'in' => $errorCodes])
148 ->addFieldToFilter(
'bulk_uuid', [
'eq' => $bulkUuid])
// Batch bookkeeping: a counter and the batch cap (10000) it is compared against.
155 $currentBatchSize = 0;
156 $maxBatchSize = 10000;
158 foreach ($retriablyFailedOperations as $operation) {
// When the counter equals the cap, a call involving the 'magento_operation' table is
// made and the counter is reset to 0.
// NOTE(review): the call itself is not visible; presumably it removes the collected
// ids from that table — confirm.
159 if ($currentBatchSize === $maxBatchSize) {
161 $this->resourceConnection->getTableName(
'magento_operation'),
165 $currentBatchSize = 0;
// Remember the operation's current id before it is cleared.
168 $operationIds[] = $operation->getId();
// Clear the id on the in-memory operation.
// NOTE(review): presumably so the republished operation is treated as a new record — confirm.
170 $operation->setId(
null);
// Ids still collected after the loop (a final, partial batch) get the same
// 'magento_operation' treatment as a full batch.
173 if (!empty($operationIds)) {
175 $this->resourceConnection->getTableName(
'magento_operation'),
181 }
// Any exception raised in the guarded section is caught and its message is logged at
// critical level. NOTE(review): what else this handler does is not visible here.
catch (\Exception $exception) {
183 $this->logger->critical($exception->getMessage());
// Publish the selected operations again, then report how many there were.
186 $this->publishOperations($retriablyFailedOperations);
188 return count($retriablyFailedOperations);
/**
 * Groups operations by topic name and publishes each group to its topic.
 *
 * NOTE(review): the loop header that iterates the input and feeds the grouping
 * statement is not visible in this view.
 *
 * @param array $operations items exposing getTopicName()
 */
197 private function publishOperations(array
$operations)
// Map of topic name => list of operations belonging to that topic.
199 $operationsByTopics = [];
201 $operationsByTopics[$operation->getTopicName()][] = $operation;
// One publish() call per topic, passing all of that topic's operations together.
// The loop variable reuses the name $operations, shadowing the parameter.
203 foreach ($operationsByTopics as $topicName =>
$operations) {
204 $this->publisher->publish($topicName,
$operations);
213 return $this->entityManager->delete(
214 $this->entityManager->load(
215 $this->bulkSummaryFactory->create(),
__construct(EntityManager $entityManager, BulkSummaryInterfaceFactory $bulkSummaryFactory, CollectionFactory $operationCollectionFactory, BulkPublisherInterface $publisher, MetadataPool $metadataPool, ResourceConnection $resourceConnection, \Psr\Log\LoggerInterface $logger, UserContextInterface $userContext=null)
scheduleBulk($bulkUuid, array $operations, $description, $userId=null)