参考文章:
http://ju.outofmemory.cn/entry/152607
http://blog.51cto.com/zero01/2115528
<?php
# Stand-in for a Redis client: it only records the connection target and performs no network I/O.
class MyRedis
{
    /** Host passed to the most recent pconnect() call ('' until then). */
    private $_host = '';

    /** Port passed to the most recent pconnect() call ('' until then). */
    private $_port = '';

    /**
     * Remember the target address and return this object.
     *
     * @param mixed $host Stored unchanged.
     * @param mixed $port Stored unchanged.
     * @return MyRedis This same instance.
     */
    public function pconnect($host, $port)
    {
        list($this->_host, $this->_port) = array($host, $port);

        return $this;
    }
}
# Core class: spreads keys over several Redis servers with consistent hashing
class RedisCache {
    /** Real servers; each entry is array('host' => ..., 'port' => ...). */
    public $servers = array();
    /** Hash ring: virtual-node hash => index of the server in $servers. */
    private $_servers = array();
    /** Virtual-node hashes (the keys of $_servers) in ascending order. */
    private $_serverKeys = array();
    /** Indexes of servers whose latest connection attempt failed, keyed by index. */
    private $_badServers = array();
    /** Number of real servers. */
    private $_count = 0;
    /** Connections opened by this instance, keyed by server index. */
    private $_connections = array();
    /** Virtual nodes per real server; more replicas give a more even key distribution. */
    const SERVER_REPLICAS = 64;

    /**
     * Build the hash ring for the given servers.
     *
     * @param array $servers List of array('host' => ..., 'port' => ...) entries.
     */
    public function __construct( $servers ){
        $this->servers = $servers;
        $this->_count = count($this->servers);
        // Place SERVER_REPLICAS virtual nodes per server on the ring.
        foreach ($this->servers as $k => $server) {
            for ($i = 0; $i < self::SERVER_REPLICAS; $i++) {
                $hash = crc32($server['host'] . '#' . $server['port'] . '#' . $i);
                $this->_servers[$hash] = $k;
            }
        }
        ksort($this->_servers);
        $this->_serverKeys = array_keys($this->_servers);
    }

    /**
     * Pick the server for a key by consistent hashing, failing over to the
     * next servers on the ring when a connection cannot be established.
     *
     * @param string $key Cache key.
     * @return MyRedis|false Connection object, or false when no server is usable.
     */
    public function getRedis($key){
        // Measure the ring itself: two virtual nodes with the same crc32 share
        // one slot, so the ring can be smaller than $_count * SERVER_REPLICAS.
        $slen = count($this->_serverKeys);
        if ($slen === 0) {
            return false;
        }
        $hash = crc32($key);
        // A hash beyond the last virtual node wraps around to the first one.
        $sid = $hash > $this->_serverKeys[$slen - 1]
            ? 0
            : $this->quickSearch($this->_serverKeys, $hash, 0, $slen);
        // Walk the ring clockwise, trying each real server at most once per lookup.
        $tried = array();
        for ($i = 0; $i < $slen && count($tried) < $this->_count; $i++) {
            $server = $this->_servers[$this->_serverKeys[($sid + $i) % $slen]];
            if (isset($tried[$server])) {
                continue;
            }
            $tried[$server] = true;
            $conn = $this->getRedisConnect($server);
            if ($conn) {
                return $conn;
            }
        }
        return false;
    }

    /**
     * Binary search for the first position in a sorted slice whose value is
     * greater than or equal to $find.
     *
     * @param array $stack  Values in ascending order.
     * @param int   $find   Value to locate.
     * @param int   $start  First index of the slice.
     * @param int   $length Number of elements in the slice.
     * @return int Index of the first element >= $find, or the last index of
     *             the slice when every element is smaller.
     */
    private function quickSearch($stack, $find, $start, $length) {
        $low = $start;
        $high = $start + $length - 1;
        while ($low < $high) {
            $mid = $low + intval(($high - $low) / 2);
            if ($find <= $stack[$mid]) {
                $high = $mid;
            } else {
                $low = $mid + 1;
            }
        }
        return $low;
    }

    /**
     * Return the connection for a server, opening it on first use.
     *
     * Connections are cached per RedisCache instance, so two instances with
     * different server lists never hand out each other's connections.
     *
     * @param int|string $server Index of the server in $this->servers.
     * @return MyRedis|false Connection object, or false when connecting failed.
     */
    private function getRedisConnect($server){
        if (!empty($this->_connections[$server])) {
            return $this->_connections[$server];
        }
        $redis = new MyRedis();
        try {
            $ok = $redis->pconnect($this->servers[$server]['host'], $this->servers[$server]['port']);
        } catch (Exception $e) {
            $ok = false;
        }
        if (!$ok) {
            // Keyed by index so repeated failures do not pile up duplicates.
            $this->_badServers[$server] = $server;
            return false;
        }
        unset($this->_badServers[$server]);
        $this->_connections[$server] = $redis;
        return $redis;
    }

    /**
     * Read a value from the server responsible for the key.
     *
     * @param string $key Cache key.
     * @return mixed Result of the client's get(), or null when no server is
     *               reachable or the client throws an Exception.
     */
    public function getValue($key){
        $redis = $this->getRedis($key);
        if (!$redis) {
            return null;
        }
        try {
            return $redis->get($key);
        } catch (Exception $e) {
            return null;
        }
    }

    /**
     * Store a value on the server responsible for the key.
     *
     * @param string $key    Cache key.
     * @param mixed  $value  Value to store.
     * @param int    $expire Lifetime passed to setex(); 0 stores via set() instead.
     * @return mixed Result of the client's set()/setex(), or false when no
     *               server is reachable or the client throws an Exception.
     */
    public function setValue($key,$value,$expire = 0){
        $redis = $this->getRedis($key);
        if (!$redis) {
            return false;
        }
        try {
            // Loose comparison kept on purpose so '0' and null still select set().
            if ($expire == 0) {
                return $redis->set($key, $value);
            }
            return $redis->setex($key, $expire, $value);
        } catch (Exception $e) {
            return false;
        }
    }

    /**
     * Delete a key from the server responsible for it.
     *
     * Exceptions thrown by the client are not caught here.
     *
     * @param string $key Cache key.
     * @return mixed Result of the client's delete(), or false when no server is reachable.
     */
    public function deleteValue($key){
        $redis = $this->getRedis($key);
        if (!$redis) {
            return false;
        }
        return $redis->delete($key);
    }

    /**
     * Placeholder: flushing is not implemented yet, so nothing is flushed.
     *
     * @return bool Always true.
     */
    public function flushValues(){
        //TODO
        return true;
    }
}
// Usage: build a four-server pool, then show which server each of 1000 keys maps to.
$redis_servers = array_map(
    function ($host) {
        // Every demo server uses the same port.
        return array(
            'host' => $host,
            'port' => 6379,
        );
    },
    array('10.0.0.1', '10.0.0.2', '10.0.0.3', '10.0.0.4')
);

$redisCache = new RedisCache($redis_servers);

$i = 0;
while ($i < 1000) {
    $conn = $redisCache->getRedis('key' . $i);
    // Dump the connection object chosen for this key, followed by a separator.
    var_dump($conn);
    echo "<hr>";
    $i++;
}