atomic包
在java.util并发包中,有很多继承number的原子计数类,例如AtomicLong, AtomicInteger等等。在分布式开发过程中,往往也有相应的需求对同一资源进行计数。在curator中也提供了分布式原子计数类,例如DistributedAtomicInteger,DistributedAtomicLong。在zk中每一个节点,可以存储相应的数据,这些数据是二进制方式存储的。不光有对应的数据,还有节点存储的状态信息。
在zookeeper包中DataNode类中,有对应的状态对象StatPersisted
public class StatPersisted implements Record {
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private long pzxid;
// getter and setter...
}
熟悉一下几个重要的属性
cZxid:数据节点创建时的事务ID
ctime:数据节点创建时的时间
mZxid:数据节点最后一次更新时的事务ID
mtime:数据节点最后一次更新时的时间
version:数据节点的版本号
有一个记录节点数据变更的属性version,通过测试观察,对节点数据变更,version版本也会随着变化,会逐渐递增。在curator的分布式原子计数类,也真是通过该特性实现的。
• DistributedAtomicInteger
举个integer类型的分布式原子计数,考虑一下,因为zk DataNode中存储的是二进制数据,那么获取数据肯定需要通过二进制转成integer,或者更新数据时需要将integer转换成二进制。其实本质就是二进制数据的变更,与需要Integer,或者Long类型没有关系。所以方法中 get,increment,decrement等等,通过DistributedAtomicValue类实现的。
举一个increment方法为例
private AtomicValue<Integer> worker(final Integer addAmount) throws Exception {
Preconditions.checkNotNull(addAmount, "addAmount cannot be null");
MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
int previousValue = (previous != null) ? bytesToValue(previous) : 0;
int newValue = previousValue + addAmount;
return valueToBytes(newValue);
}
};
AtomicValue<byte[]> result = value.trySet(makeValue);
return new AtomicInteger(result);
}
在DistributedAtomicInteger中increment或者decrement方法,都是调用worker方法,只是参数中addAmount 可以设置为1,或-1表示增加或者减少。其中有个内部类MakeValue该接口主要实现了将int转换成byte数组了。
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception {
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
tryWithMutex(result, makeValue);
}
return result;
}
在DistributedAtomicValue中,如何设置更新zk节点值。首先,是通过tryOptimistic方法,该方法采用乐观方式去更新,如果更新失败,最后通过方法tryWithMutex 分布式锁去更新数据。
private void tryOptimistic(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception
{
long startMs = System.currentTimeMillis();
int retryCount = 0;
boolean done = false;
while ( !done )
{
result.stats.incrementOptimisticTries();
if ( tryOnce(result, makeValue) )
{
result.succeeded = true;
done = true;
}
else
{
if ( !retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) )
{
done = true;
}
}
}
result.stats.setOptimisticTimeMs(System.currentTimeMillis() - startMs);
}
乐观方式更新,主要思想是多次更新,直到更新成功或者尝试次数,时间已经到达最大值。看一下tryOnce方法:
private boolean tryOnce(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception {
Stat stat = new Stat();
boolean createIt = getCurrentValue(result, stat);
boolean success = false;
try {
byte[] newValue = makeValue.makeFrom(result.preValue);
if ( createIt ) {
client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
} else {
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
}
result.postValue = Arrays.copyOf(newValue, newValue.length);
success = true;
}
catch ( KeeperException.NodeExistsException e )
// do Retry
}
catch ( KeeperException.BadVersionException e ) {
// do Retry
}
catch ( KeeperException.NoNodeException e ) {
// do Retry
}
return success;
}
首先需要获取当前节点的值,通过getCurrentValue方法,并且stat状态信息也得到了最新的值。在getCurrentValue方法也能知道createIt状态,节点是否存在然后去创建。如果不需要创建的时候,更新值时,会withVersion(stat.getVersion())执行该方法,就是带了最近的stat中version信息。在抛异常时,一定要合理处理,
例如:
NodeExistsException异常,说明创建的时候,已经存在了,需要重试;
BadVersionException异常,版本异常,通过stat.getVersion()控制,如果zk当前的记录节点的version与请求的version不一致时拒绝更新,返回异常。这个异常在更新时最为常见;
NoNodeException异常,可能节点突然被删除了,那么节点不存在了,需要重试;
通过BadVersionException异常中,看到一些与数据库设计中乐观锁很相似。
既然乐观更新失败,那么需要更加严格方式,例如通过锁,就执行了互斥方式更新
private void tryWithMutex(MutableAtomicValue<byte[]> result, MakeValue makeValue) throws Exception {
long startMs = System.currentTimeMillis();
int retryCount = 0;
if ( mutex.acquire(promotedToLock.getMaxLockTime(), promotedToLock.getMaxLockTimeUnit()) ) {
try {
boolean done = false;
while ( !done )
{
result.stats.incrementPromotedTries();
if ( tryOnce(result, makeValue) ) {
result.succeeded = true;
done = true;
}
else {
if ( !promotedToLock.getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMs, RetryLoop.getDefaultRetrySleeper()) ) {
done = true;
}
}
}
}
finally {
mutex.release();
}
}
result.stats.setPromotedTimeMs(System.currentTimeMillis() - startMs);
}
他很简单,就是在更新值前,通过mutex尝试获取锁。其他与乐观方式基本一致。
在分布式原子计数中,都有可能会设置失败的,需要我们后续处理。
在DistributedAtomicInteger中,任然以increment举例,他返回的对象是AtomicValue接口,其中包含了判断是否成功succeeded方法。
leader包
选举包,在分布式开发,选举leader也是很重要的功能。因为在分布式服务中,服务器的角色是对等,但是有些业务往往需要通过leader单独处理,不能同时多个服务处理业务,可能会造成数据功能混乱等。
在curator中 选举leader提供了2中方式:
LeaderLatch, LeaderSelector。其中LeaderSelector实现原理很简单,利用Mutex互斥锁,获取锁即是leader,利用实现LeaderSelectorListener接口,takeLeadership方法就是当是leader时调用该方法,该方法执行完后,会主动释放leader(释放锁),让其他线程或者服务争夺leader。
LeaderLatch,稍微复杂,但是本质与分布式锁的实现原理一致。首先如何知道获取leader,或者不是leader,通过实现接口 LeaderLatchListener
/**
* This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
*
* Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
* this occurs, you can expect {@link #notLeader()} to also be called.
*/
public void isLeader();
/**
* This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
*
* Note that it is possible that by the time this method call happens, hasLeadership has become true. If
* this occurs, you can expect {@link #isLeader()} to also be called.
*/
public void notLeader();
其中调用isLeader时,说明当前是leader;调用notLeader时,说明不是leader;其中实现方法即是开发人员需要去实现的业务逻辑。
在调用start方法时,启动线程后调用internalStart方法,最终调用reset方法
void reset() throws Exception
{
setLeadership(false);
setNode(null);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( debugResetWaitLatch != null )
{
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
setNode(event.getName());
if ( state.get() == State.CLOSED )
{
setNode(null);
}
else
{
getChildren();
}
}
else
{
log.error("getChildren() failed. rc = " + event.getResultCode());
}
}
};
client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
其中创建的路径是EPHEMERAL_SEQUENTIAL模式的,说明节点是非持久且递增的。然后创建成功时候,调用callback回调类。确认创建成功后,开始判断是否是leader。setNode方法记录一下当前的节点名称,便于变更或者close时删除路径。那么getChildren方法,肯定是得到当前有多少子节点创建了
private void getChildren() throws Exception
{
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
checkLeadership(event.getChildren());
}
}
};
client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}
通过该方法能知道,他来控制选举产生leader,其实通过getChildren后,执行回调方法,最终执行checkLeadership方法。
private void checkLeadership(List<String> children) throws Exception
{
final String localOurPath = ourPath.get();
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
if ( ourIndex < 0 )
{
log.error("Can't find our node. Resetting. Index: " + ourIndex);
reset();
}
else if ( ourIndex == 0 )
{
setLeadership(true);
}
else
{
String watchPath = sortedChildren.get(ourIndex - 1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
{
try
{
getChildren();
}
catch ( Exception ex )
{
ThreadUtils.checkInterrupted(ex);
log.error("An error occurred checking the leadership.", ex);
}
}
}
};
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
// previous node is gone - reset
reset();
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
该方法核心思想也是,得到最小节点的编号,选出他会leader。如果当前不是leader,那么需要监听排名前一位的路径节点,如果前一位的路径变更,那么就需要重新执行getChildren方法了。
private synchronized void setLeadership(boolean newValue)
{
boolean oldValue = hasLeadership.getAndSet(newValue);
if ( oldValue && !newValue )
{ // Lost leadership, was true, now false
listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener listener)
{
listener.notLeader();
return null;
}
});
}
else if ( !oldValue && newValue )
{ // Gained leadership, was false, now true
listeners.forEach(new Function<LeaderLatchListener, Void>()
{
@Override
public Void apply(LeaderLatchListener input)
{
input.isLeader();
return null;
}
});
}
notifyAll();
}
确认leader状态变化,最后遍历所有的LeaderLatchListener。