Magento 2 Documentation  2.3
Documentation for Magento 2 CMS v2.3 (December 2018)
MessageEncoder.php
Go to the documentation of this file.
1 <?php
7 
12 use Magento\Framework\Communication\ConfigInterface as CommunicationConfig;
13 
20 {
/**
 * Direction flag passed to convertMessage() by encode().
 */
const DIRECTION_ENCODE = 'encode';

/**
 * Direction flag passed to convertMessage() by decode().
 */
const DIRECTION_DECODE = 'decode';

/**
 * Converter returned by getConverter() for the encode direction.
 *
 * @var \Magento\Framework\Webapi\ServiceOutputProcessor
 */
private $dataObjectEncoder;

/**
 * Converter returned by getConverter() for any direction other than encode.
 *
 * @var \Magento\Framework\Webapi\ServiceInputProcessor
 */
private $dataObjectDecoder;

/**
 * Serializes the converted message in encode().
 *
 * @var \Magento\Framework\Json\EncoderInterface
 */
private $jsonEncoder;

/**
 * Parses the raw message in decode().
 *
 * @var \Magento\Framework\Json\DecoderInterface
 */
private $jsonDecoder;

/**
 * Communication config; not set by the constructor, resolved on first use
 * by getCommunicationConfig().
 *
 * @var CommunicationConfig|null
 */
private $communicationConfig;
48 
/**
 * Store the JSON codec and the data-object converters used by this encoder.
 *
 * @param QueueConfig $queueConfig Accepted for signature compatibility; not stored or read here.
 * @param \Magento\Framework\Json\EncoderInterface $jsonEncoder
 * @param \Magento\Framework\Json\DecoderInterface $jsonDecoder
 * @param \Magento\Framework\Webapi\ServiceOutputProcessor $dataObjectEncoder
 * @param \Magento\Framework\Webapi\ServiceInputProcessor $dataObjectDecoder
 */
public function __construct(
    QueueConfig $queueConfig,
    \Magento\Framework\Json\EncoderInterface $jsonEncoder,
    \Magento\Framework\Json\DecoderInterface $jsonDecoder,
    \Magento\Framework\Webapi\ServiceOutputProcessor $dataObjectEncoder,
    \Magento\Framework\Webapi\ServiceInputProcessor $dataObjectDecoder
) {
    // JSON layer.
    $this->jsonEncoder = $jsonEncoder;
    $this->jsonDecoder = $jsonDecoder;

    // Data-object conversion layer.
    $this->dataObjectEncoder = $dataObjectEncoder;
    $this->dataObjectDecoder = $dataObjectDecoder;
}
72 
/**
 * Convert a message according to its topic schema and serialize it to JSON.
 *
 * @param string $topic
 * @param mixed $message
 * @param bool $requestType True to use the topic's request schema, false for its response schema.
 * @return string Result of the JSON encoder.
 * @throws LocalizedException Propagated from convertMessage().
 */
public function encode($topic, $message, $requestType = true)
{
    return $this->jsonEncoder->encode(
        $this->convertMessage($topic, $message, self::DIRECTION_ENCODE, $requestType)
    );
}
87 
/**
 * Parse a JSON message and convert it according to its topic schema.
 *
 * @param string $topic
 * @param string $message JSON text handed to the JSON decoder.
 * @param bool $requestType True to use the topic's request schema, false for its response schema.
 * @return mixed Result of convertMessage() in the decode direction.
 * @throws LocalizedException If the JSON decoder throws, or propagated from convertMessage().
 */
public function decode($topic, $message, $requestType = true)
{
    try {
        $decodedMessage = $this->jsonDecoder->decode($message);
    } catch (\Exception $e) {
        // Keep the decoder's exception as the cause so the root failure is not lost.
        throw new LocalizedException(new Phrase("Error occurred during message decoding."), $e);
    }

    return $this->convertMessage($topic, $decodedMessage, self::DIRECTION_DECODE, $requestType);
}
106 
/**
 * Look up the schema descriptor of a topic's request or response.
 *
 * @param string $topic
 * @param bool $requestType True for the request schema, false for the response schema.
 * @return array Array with the keys 'schema_type' and 'schema_value'.
 * @throws LocalizedException If the communication config returns null for the topic.
 */
protected function getTopicSchema($topic, $requestType)
{
    $topicConfig = $this->getCommunicationConfig()->getTopic($topic);
    if ($topicConfig === null) {
        throw new LocalizedException(new Phrase('Specified topic "%topic" is not declared.', ['topic' => $topic]));
    }

    if (!$requestType) {
        // A response that is set is always reported with the class schema type; otherwise the type is null.
        $responseSchemaType = isset($topicConfig[CommunicationConfig::TOPIC_RESPONSE])
            ? CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS
            : null;

        return [
            'schema_type' => $responseSchemaType,
            'schema_value' => $topicConfig[CommunicationConfig::TOPIC_RESPONSE]
        ];
    }

    return [
        'schema_type' => $topicConfig[CommunicationConfig::TOPIC_REQUEST_TYPE],
        'schema_value' => $topicConfig[CommunicationConfig::TOPIC_REQUEST]
    ];
}
135 
/**
 * Convert a message in the given direction according to the topic's schema.
 *
 * When the schema type is the class type, the whole message is converted as a
 * single value of the declared type. Otherwise the schema value is iterated as
 * a list of parameter descriptors and each parameter is converted separately,
 * producing an array keyed by parameter name.
 *
 * @param string $topic
 * @param mixed $message
 * @param string $direction One of self::DIRECTION_ENCODE or self::DIRECTION_DECODE.
 * @param bool $requestType True to use the topic's request schema, false for its response schema.
 * @return mixed Converted value, or an array of converted parameters keyed by parameter name.
 * @throws LocalizedException If the class-typed conversion fails or a required parameter is absent.
 */
protected function convertMessage($topic, $message, $direction, $requestType)
{
    $topicSchema = $this->getTopicSchema($topic, $requestType);

    if ($topicSchema['schema_type'] == CommunicationConfig::TOPIC_REQUEST_TYPE_CLASS) {
        // The whole message is one value of the type declared for the topic.
        $messageDataType = $topicSchema[QueueConfig::TOPIC_SCHEMA_VALUE];
        try {
            return $this->getConverter($direction)->convertValue($message, $messageDataType);
        } catch (LocalizedException $e) {
            // Chain the converter's exception so the underlying reason stays available.
            throw new LocalizedException(
                new Phrase(
                    'Message with topic "%topic" must be an instance of "%class".',
                    ['topic' => $topic, 'class' => $messageDataType]
                ),
                $e
            );
        }
    }

    // The message is a set of method arguments described by the schema's parameter list.
    $message = (array)$message;
    // Keys exactly 0..n-1 mean arguments are addressed by position, otherwise by name.
    $isIndexedArray = array_keys($message) === range(0, count($message) - 1);
    // The converter depends only on the direction, so resolve it once for all parameters.
    $converter = $this->getConverter($direction);
    $convertedMessage = [];

    foreach ($topicSchema['schema_value'] as $methodParameterMeta) {
        $paramName = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_NAME];
        $paramType = $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_TYPE];

        // The position is read only for positional messages, the name is used otherwise.
        $lookupKey = $isIndexedArray
            ? $methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_POSITION]
            : $paramName;

        if (isset($message[$lookupKey])) {
            $convertedMessage[$paramName] = $converter->convertValue($message[$lookupKey], $paramType);
        }

        // A required parameter must have produced a non-null converted value.
        if ($methodParameterMeta[CommunicationConfig::SCHEMA_METHOD_PARAM_IS_REQUIRED]
            && !isset($convertedMessage[$paramName])
        ) {
            throw new LocalizedException(
                new Phrase(
                    'Data item corresponding to "%param" must be specified '
                    . 'in the message with topic "%topic".',
                    [
                        'topic' => $topic,
                        'param' => $paramName
                    ]
                )
            );
        }
    }

    return $convertedMessage;
}
205 
/**
 * Pick the data-object converter for a conversion direction.
 *
 * @param string $direction
 * @return \Magento\Framework\Webapi\ServiceOutputProcessor|\Magento\Framework\Webapi\ServiceInputProcessor
 *         The encoder for self::DIRECTION_ENCODE, the decoder for any other value.
 */
protected function getConverter($direction)
{
    if ($direction == self::DIRECTION_ENCODE) {
        return $this->dataObjectEncoder;
    }

    return $this->dataObjectDecoder;
}
216 
/**
 * Return the communication config, fetching it from the object manager on first use.
 *
 * The config is not a constructor argument, so it is obtained through the
 * ObjectManager singleton and stored on the instance for later calls.
 *
 * @return CommunicationConfig
 */
private function getCommunicationConfig()
{
    if ($this->communicationConfig !== null) {
        return $this->communicationConfig;
    }

    $objectManager = \Magento\Framework\App\ObjectManager::getInstance();
    $this->communicationConfig = $objectManager->get(CommunicationConfig::class);

    return $this->communicationConfig;
}
232 }
__construct(QueueConfig $queueConfig, \Magento\Framework\Json\EncoderInterface $jsonEncoder, \Magento\Framework\Json\DecoderInterface $jsonDecoder, \Magento\Framework\Webapi\ServiceOutputProcessor $dataObjectEncoder, \Magento\Framework\Webapi\ServiceInputProcessor $dataObjectDecoder)
$message
convertMessage($topic, $message, $direction, $requestType)
encode($topic, $message, $requestType=true)
decode($topic, $message, $requestType=true)