7 declare(strict_types=1);
14 use Magento\AsynchronousOperations\Api\Data\ItemStatusInterfaceFactory;
16 use Magento\AsynchronousOperations\Api\Data\AsyncResponseInterfaceFactory;
20 use Psr\Log\LoggerInterface;
34 private $identityService;
39 private $asyncResponseFactory;
44 private $itemStatusInterfaceFactory;
49 private $bulkManagement;
59 private $operationRepository;
79 ItemStatusInterfaceFactory $itemStatusInterfaceFactory,
80 AsyncResponseInterfaceFactory $asyncResponseFactory,
86 $this->identityService = $identityService;
87 $this->itemStatusInterfaceFactory = $itemStatusInterfaceFactory;
88 $this->asyncResponseFactory = $asyncResponseFactory;
89 $this->bulkManagement = $bulkManagement;
91 $this->operationRepository = $operationRepository;
106 public function publishMass($topicName, array $entitiesArray,
$groupId =
null, $userId =
null)
108 $bulkDescription =
__(
'Topic %1', $topicName);
110 if ($userId ==
null) {
111 $userId = $this->userContext->getUserId();
115 $groupId = $this->identityService->generateId();
118 if (!$this->bulkManagement->scheduleBulk(
$groupId, [], $bulkDescription, $userId)) {
119 throw new LocalizedException(
120 __(
'Something went wrong while processing the request.')
127 $bulkException =
new BulkException();
128 foreach ($entitiesArray as $key => $entityParams) {
130 $requestItem = $this->itemStatusInterfaceFactory->create();
133 $operations[] = $this->operationRepository->createByTopic($topicName, $entityParams,
$groupId);
134 $requestItem->setId($key);
136 $requestItems[] = $requestItem;
137 }
catch (\Exception $exception) {
138 $this->logger->error($exception);
139 $requestItem->setId($key);
141 $requestItem->setErrorMessage($exception);
142 $requestItem->setErrorCode($exception);
143 $requestItems[] = $requestItem;
144 $bulkException->addException(
new LocalizedException(
145 __(
'Error processing %key element of input data', [
'key' => $key]),
151 if (!$this->bulkManagement->scheduleBulk(
$groupId,
$operations, $bulkDescription, $userId)) {
152 throw new LocalizedException(
153 __(
'Something went wrong while processing the request.')
157 $asyncResponse = $this->asyncResponseFactory->create();
158 $asyncResponse->setBulkUuid(
$groupId);
159 $asyncResponse->setRequestItems($requestItems);
161 if ($bulkException->wasErrorAdded()) {
162 $asyncResponse->setErrors(
true);
163 $bulkException->addData($asyncResponse);
164 throw $bulkException;
166 $asyncResponse->setErrors(
false);
169 return $asyncResponse;
__construct(IdentityGeneratorInterface $identityService, ItemStatusInterfaceFactory $itemStatusInterfaceFactory, AsyncResponseInterfaceFactory $asyncResponseFactory, BulkManagementInterface $bulkManagement, LoggerInterface $logger, OperationRepository $operationRepository, UserContextInterface $userContext=null)