用PHP写的基于Memcache的Queue实现代码

  php类代码:

  

复制代码 代码如下:

  <?php

  class MQ{

  public static $client;

  private static $m_real;

  private static $m_front;

  private static $m_data = array();

  const QUEUE_MAX_NUM = 100000000;

  const QUEUE_FRONT_KEY = '_queue_item_front';

  const QUEUE_REAL_KEY = '_queue_item_real';

  public static function setupMq($conf) {

  self::$client = memcache_pconnect($conf);

  self::$m_real = memcache_get(self::$client, self::QUEUE_REAL_KEY);

  self::$m_front = memcache_get(self::$client, self::QUEUE_FRONT_KEY);

  if (!isset(self::$m_real) || emptyempty(self::$m_real)) {

  self::$real= 0;

  }

  if (!isset(self::$m_front) || emptyempty(self::$m_front)) {

  self::$m_front = 0;

  }

  return self::$client;

  }

  public static function add($queue, $data) {

  $result = false;

  if (self::$m_real < self::QUEUE_MAX_NUM) {

  if (memcache_add(self::$client, $queue.self::$m_real, $data)) {

  self::mqRealChange();

  $result = true;

  }

  }

  return $result;

  }

  public static function get($key, $count) {

  $num = 0;

  for ($i=self::$m_front;$i<self::$m_front + $count;$i++) {

  if ($dataTmp = memcache_get(self::$client, $key.$i)) {

  self::$m_data[] = $dataTmp;

  memcache_delete(self::$client, $key.$i);

  $num++;

  }

  }

  if ($num>0) {

  self::mqFrontChange($num);

  }

  return self::$m_data;

  }

  private static function mqRealChange() {

  memcache_add(self::$client, self::QUEUE_REAL_KEY, 0);

  self::$m_real = memcache_increment(self::$client, self::QUEUE_REAL_KEY, 1);

  }

  private static function mqFrontChange($num) {

  memcache_add(self::$client, self::QUEUE_FRONT_KEY, 0);

  self::$m_front = memcache_increment(self::$client, self::QUEUE_FRONT_KEY, $num);

  }

  public static function mflush($memcache_obj) {

  memcache_flush($memcache_obj);

  }

  public static function Debug() {

  echo 'real:'.self::$m_real."<br>/r/n";

  echo 'front:'.self::$m_front."<br>/r/n";

  echo 'wait for process data:'.intval(self::$m_real - self::$m_front);

  echo "<br>/r/n";

  echo '<pre>';

  print_r(self::$m_data);

  echo '<pre>';

  }

  }

  define('FLUSH_MQ',0);//CLEAN ALL DATA

  define('IS_ADD',0);//SET DATA

  $mobj = MQ::setupMq('127.0.0.1','11211');

  if (FLUSH_MQ) {

  MQ::mflush($mobj);

  } else {

  if (IS_ADD) {

  MQ::add('user_sync', '1test');

  MQ::add('user_sync', '2test');

  MQ::add('user_sync', '3test');

  MQ::add('user_sync', '4test');

  MQ::add('user_sync', '5test');

  MQ::add('user_sync', '6test');

  } else {

  MQ::get('user_sync', 10);

  }

  }

  MQ::Debug();

  ?>

  使用方法

  

复制代码 代码如下:

  MQ::setupMq('127.0.0.1','11211');//连接

  MQ::add($key, $value);//添加数据到队列

  MQ::add($key, $value);//添加数据到队列

  MQ::add($key, $value);//添加数据到队列

  MQ::add($key, $value);//添加数据到队列

  MQ::add($key, $value);//添加数据到队列

  MQ::add($key, $value);//添加数据到队列

  MQ:get($key, 10);//取出一定数量的数据