7 declare(strict_types=1);
19 use Psr\Log\LoggerInterface;
43 private $operationManagement;
48 private $messageEncoder;
53 private $messageValidator;
58 private $configuration;
68 private $serviceOutputProcessor;
73 private $communicationConfig;
94 CommunicationConfig $communicationConfig,
97 $this->messageValidator = $messageValidator;
98 $this->messageEncoder = $messageEncoder;
101 $this->operationManagement = $operationManagement;
103 $this->serviceOutputProcessor = $serviceOutputProcessor;
104 $this->communicationConfig = $communicationConfig;
/**
 * Process one encoded operation message and record its outcome.
 *
 * Visible flow in this excerpt:
 *  1. Decode and validate the message against AsyncConfig::SYSTEM_TOPIC_NAME.
 *  2. Resolve the topic name of the operation and the handlers configured for it.
 *  3. Unserialize the operation data, decode its meta_information entry for the
 *     topic and validate the decoded parameters.
 *  4. When no error code was recorded, run every handler through executeHandler().
 *  5. When output data is set, convert it with the service output processor and
 *     serialize it.
 *  6. Report the result through operationManagement->changeOperationStatus().
 *
 * NOTE(review): several original lines are missing from this excerpt (braces,
 * the try/catch keywords, the initialisation of $errorCode and $messages and most
 * arguments of the multi-line calls). Comments below only describe what is
 * visible; confirm the rest against the full file.
 *
 * @param string $encodedMessage Encoded operation message.
 */
113 public function process(
string $encodedMessage)
// Decode the raw message into an operation object and validate it.
115 $operation = $this->messageEncoder->decode(AsyncConfig::SYSTEM_TOPIC_NAME, $encodedMessage);
116 $this->messageValidator->validate(AsyncConfig::SYSTEM_TOPIC_NAME, $operation);
// The topic of the operation selects which handlers will be executed.
121 $topicName = $operation->getTopicName();
122 $handlers = $this->configuration->getHandlers($topicName);
// Rebuild the handler parameters from the meta_information entry of the
// unserialized operation data and validate them against the topic.
124 $data = $this->jsonHelper->unserialize($operation->getSerializedData());
125 $entityParams = $this->messageEncoder->decode($topicName,
$data[
'meta_information']);
126 $this->messageValidator->validate($topicName, $entityParams);
// Failure path: $e is presumably bound by a catch clause that is not visible
// here. The error is logged and its code and message are recorded.
128 $this->logger->error($e->getMessage());
130 $errorCode = $e->getCode();
131 $messages[] = $e->getMessage();
// Handlers only run when nothing has set an error code so far.
135 if ($errorCode ===
null) {
136 foreach ($handlers as $callback) {
137 $result = $this->executeHandler($callback, $entityParams);
// NOTE(review): error code and output data are overwritten on every iteration,
// so with several handlers only the values of the last one survive; messages
// are accumulated. Confirm this is intended.
139 $errorCode =
$result[
'error_code'];
140 $messages = array_merge($messages,
$result[
'messages']);
141 $outputData =
$result[
'output_data'];
// Post-process handler output only when there is some.
145 if (isset($outputData)) {
147 $communicationConfig = $this->communicationConfig->getTopic($topicName);
// NOTE(review): the left-hand side of this expression is on a line missing from
// the excerpt; $asyncHandler is read on the next lines, so it is presumably
// assigned here.
149 $communicationConfig[CommunicationConfig::TOPIC_HANDLERS][AsyncConfig::DEFAULT_HANDLER_NAME];
// Class and method of the default handler, read from the topic configuration.
150 $serviceClass = $asyncHandler[CommunicationConfig::HANDLER_TYPE];
151 $serviceMethod = $asyncHandler[CommunicationConfig::HANDLER_METHOD];
152 $outputData = $this->serviceOutputProcessor->process(
157 $outputData = $this->jsonHelper->serialize($outputData);
// A failure while converting the output is only added to the messages here
// (presumably inside a catch clause that is not visible).
159 $messages[] = $e->getMessage();
// The original serialized data is passed on only when an error code is set;
// otherwise null is used.
163 $serializedData = (isset($errorCode)) ? $operation->getSerializedData() :
null;
// Report the outcome; all collected messages are joined into one string.
164 $this->operationManagement->changeOperationStatus(
168 implode(
'; ', $messages),
/**
 * Run a single handler callback and collect the outcome in a result array.
 *
 * The visible code writes the keys error_code, messages, output_data and
 * status of $result. Exceptions raised by the handler are logged and turned
 * into an error code plus a message instead of being propagated.
 *
 * NOTE(review): the excerpt omits some original lines (try keyword, first catch
 * clause, part of the initial array, the return statement). The method
 * presumably returns $result, since process() reads those keys from its return
 * value; confirm against the full file.
 *
 * @param array $callback     Indexed as [object, method name]: element 0 is
 *                            passed to get_class() and element 1 is used as
 *                            the method name in the success message.
 * @param mixed $entityParams Arguments forwarded to the callback; presumably
 *                            an array, as required by call_user_func_array().
 */
181 private function executeHandler($callback, $entityParams)
// Initial result values visible in this excerpt.
185 'error_code' =>
null,
187 'output_data' => null
// NOTE(review): on PHP 8 and later call_user_func_array() passes string keys of
// the argument array as named arguments; verify the shape of $entityParams.
190 $result[
'output_data'] = call_user_func_array($callback, $entityParams);
191 $result[
'messages'][] = sprintf(
'Service execution success %s::%s', get_class($callback[0]), $callback[1]);
// Failure path logged as critical; the catch clause that binds $e is not
// visible in this excerpt.
193 $this->logger->critical($e->getMessage());
// Lock wait, deadlock and connection exceptions keep their own message.
194 if ($e instanceof LockWaitException
195 || $e instanceof DeadlockException
196 || $e instanceof ConnectionException
199 $result[
'error_code'] = $e->getCode();
200 $result[
'messages'][] =
__($e->getMessage());
// Remaining exceptions of this branch are reported with a generic message.
// NOTE(review): the text mentions a product prices update although nothing
// else in this method is specific to prices; confirm the wording is intended.
203 $result[
'error_code'] = $e->getCode();
205 __(
'Sorry, something went wrong during product prices update. Please see log for details.');
207 }
catch (NoSuchEntityException $e) {
208 $this->logger->error($e->getMessage());
// The status depends on whether the exception implements
// TemporaryStateExceptionInterface; the two resulting values are on lines
// missing from this excerpt.
209 $result[
'status'] = ($e instanceof TemporaryStateExceptionInterface) ?
212 $result[
'error_code'] = $e->getCode();
213 $result[
'messages'][] = $e->getMessage();
214 }
catch (LocalizedException $e) {
// Localized exceptions: log, then record code and message.
215 $this->logger->error($e->getMessage());
217 $result[
'error_code'] = $e->getCode();
218 $result[
'messages'][] = $e->getMessage();
219 }
catch (\Exception $e) {
// Any other exception: log, then record code and message.
220 $this->logger->error($e->getMessage());
222 $result[
'error_code'] = $e->getCode();
223 $result[
'messages'][] = $e->getMessage();
const STATUS_TYPE_NOT_RETRIABLY_FAILED
const STATUS_TYPE_RETRIABLY_FAILED
__construct(MessageValidator $messageValidator, MessageEncoder $messageEncoder, ConsumerConfigurationInterface $configuration, Json $jsonHelper, OperationManagementInterface $operationManagement, ServiceOutputProcessor $serviceOutputProcessor, CommunicationConfig $communicationConfig, LoggerInterface $logger)
const STATUS_TYPE_COMPLETE
process(string $encodedMessage)