27 private $dataObjectEncoder;
32 private $dataObjectDecoder;
47 private $communicationConfig;
61 QueueConfig $queueConfig,
62 \
Magento\Framework\
Json\EncoderInterface $jsonEncoder,
63 \
Magento\Framework\
Json\DecoderInterface $jsonDecoder,
64 \
Magento\Framework\Webapi\ServiceOutputProcessor $dataObjectEncoder,
65 \
Magento\Framework\Webapi\ServiceInputProcessor $dataObjectDecoder
67 $this->dataObjectEncoder = $dataObjectEncoder;
68 $this->dataObjectDecoder = $dataObjectDecoder;
69 $this->jsonEncoder = $jsonEncoder;
70 $this->jsonDecoder = $jsonDecoder;
85 return $this->jsonEncoder->encode($convertedMessage);
100 $decodedMessage = $this->jsonDecoder->decode(
$message);
101 }
catch (\Exception $e) {
104 return $this->
convertMessage($topic, $decodedMessage, self::DIRECTION_DECODE, $requestType);
117 $topicConfig = $this->getCommunicationConfig()->getTopic($topic);
118 if ($topicConfig ===
null) {
123 'schema_type' => $topicConfig[CommunicationConfig::TOPIC_REQUEST_TYPE],
124 'schema_value' => $topicConfig[CommunicationConfig::TOPIC_REQUEST]
128 'schema_type' => isset($topicConfig[CommunicationConfig::TOPIC_RESPONSE])
129 ? CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS
131 'schema_value' => $topicConfig[CommunicationConfig::TOPIC_RESPONSE]
149 if ($topicSchema[
'schema_type'] == CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS) {
151 $messageDataType = $topicSchema[QueueConfig::TOPIC_SCHEMA_VALUE];
157 'Message with topic "%topic" must be an instance of "%class".',
158 [
'topic' => $topic,
'class' => $messageDataType]
166 $convertedMessage = [];
168 foreach ($topicSchema[
'schema_value'] as $methodParameterMeta) {
169 $paramName = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_NAME];
170 $paramType = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_TYPE];
171 if ($isIndexedArray) {
173 $paramPosition = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_POSITION];
174 if (isset(
$message[$paramPosition])) {
175 $convertedMessage[$paramName] = $this->
getConverter($direction)
176 ->convertValue(
$message[$paramPosition], $paramType);
181 $convertedMessage[$paramName] = $this->
getConverter($direction)
182 ->convertValue(
$message[$paramName], $paramType);
187 if ($methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_IS_REQUIRED]
188 && !isset($convertedMessage[$paramName])
192 'Data item corresponding to "%param" must be specified ' 193 .
'in the message with topic "%topic".',
196 'param' => $paramName
203 return $convertedMessage;
214 return ($direction == self::DIRECTION_ENCODE) ? $this->dataObjectEncoder : $this->dataObjectDecoder;
224 private function getCommunicationConfig()
226 if ($this->communicationConfig ===
null) {
228 ->get(CommunicationConfig::class);
230 return $this->communicationConfig;
getTopicSchema($topic, $requestType)
__construct(QueueConfig $queueConfig, \Magento\Framework\Json\EncoderInterface $jsonEncoder, \Magento\Framework\Json\DecoderInterface $jsonDecoder, \Magento\Framework\Webapi\ServiceOutputProcessor $dataObjectEncoder, \Magento\Framework\Webapi\ServiceInputProcessor $dataObjectDecoder)
convertMessage($topic, $message, $direction, $requestType)
encode($topic, $message, $requestType=true)
decode($topic, $message, $requestType=true)