22 $this->
_init(
'queue',
'id');
36 [
'topic_name' => $messageTopic,
'body' => $messageBody]
52 $data[] = [
'topic_name' => $messageTopic,
'body' =>
$message];
58 ->where(
'qm.id >= ?', $firstId)
87 foreach ($messageIds as $messageId) {
88 foreach ($queueIds as $queueId) {
99 [
'queue_id',
'message_id',
'status'],
117 ->where(
'queue.name IN (?)', $queueNames);
137 'queue_message.id = queue_message_status.message_id',
148 'queue.id = queue_message_status.queue_id',
151 'queue_message_status.status IN (?)',
153 )->where(
'queue.name = ?', $queueName)
154 ->order(
'queue_message_status.updated_at ASC');
178 $condition = count($messageIds) > 0 ? [
'id NOT IN (?)' => $messageIds] :
null;
190 $takenMessagesRelationIds = [];
191 foreach ($relationIds as $relationId) {
195 [
'id = ?' => $relationId]
203 $takenMessagesRelationIds[] = $relationId;
206 return $takenMessagesRelationIds;
221 'number_of_trials' =>
new \
Zend_Db_Expr(
'number_of_trials+1')
223 [
'id = ?' => $relationId]
239 [
'id IN (?)' => $relationIds]
250 return $this->
getTable(
'queue_message_status');
270 return $this->
getTable(
'queue_message');
getQueueIdsByNames($queueNames)
linkQueues($messageId, $queueNames)
_init($mainTable, $idFieldName)
saveMessages($messageTopic, array $messages)
const MESSAGE_NUMBER_OF_TRIALS
takeMessagesInProgress($relationIds)
pushBackForRetry($relationId)
changeStatus($relationIds, $status)
saveMessage($messageTopic, $messageBody)
const MESSAGE_STATUS_TO_BE_DELETED
getMessages($queueName, $limit=null)
const MESSAGE_STATUS_RETRY_REQUIRED
const MESSAGE_STATUS_IN_PROGRESS
linkMessagesWithQueues(array $messageIds, array $queueNames)
const MESSAGE_QUEUE_RELATION_ID