_queue; } $msgs = array(); $info = $this->_msg_table->info(); $microtime = microtime(true); // cache microtime $db = $this->_msg_table->getAdapter(); try { // transaction must start before the select query. $db->beginTransaction(); // changes: added forUpdate $query = $db->select()->forUpdate(); $query->from($info['name'], array('*')); $query->where('queue_id=?', $this->getQueueId($queue->getName())); $query->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime); $query->limit($maxMessages); foreach ($db->fetchAll($query) as $data) { // setup our changes to the message $data['handle'] = md5(uniqid(rand(), true)); $update = array( 'handle' => $data['handle'], 'timeout' => $microtime ); // update the database $where = array(); $where[] = $db->quoteInto('message_id=?', $data['message_id']); $count = $db->update($info['name'], $update, $where); // we check count to make sure no other thread has gotten // the rows after our select, but before our update. if ($count > 0) { $msgs[] = $data; $this->getLogger()->debug('Received message:' . $data['message_id'] . ' byte size=' . strlen($data['body'])); } } $db->commit(); } catch (Exception $e) { $db->rollBack(); $this->getLogger()->err($e->getMessage() . ' code ' . $e->getCode()); /** * @see Zend_Queue_Exception */ require_once 'Zend/Queue/Exception.php'; throw new Zend_Queue_Exception($e->getMessage(), $e->getCode()); } $config = array( 'queue' => $queue, 'data' => $msgs, 'messageClass' => $queue->getMessageClass() ); $classname = $queue->getMessageSetClass(); Zend_Loader::loadClass($classname); return new $classname($config); } }