paxos算法java代码实现

paxos算法以其难以理解而著称,主要体现在两个方面:

1、Lamport最初的论文以叙事的方式叙述了算法核心

2、算法即使有非常严谨的推导过程也很难被确定为严格正确的(但目前事实证明确实是有效的)

目前在网络上很多类似的推导过程,自己看着也很困惑,所以尝试使用代码实现了这个过程,记录如下:


importjava.util.ArrayList;

importjava.util.List;

importjava.util.Random;

/**

* Phase 1

* (a) A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors.

* (b) If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered pro-posal (if any) that it has accepted.

* Phase 2

* (a) If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v , where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals.

* (b) If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.

*/

public classPaxos {

public static voidmain(String[] args) {

ComputerManager computerManager =newPaxos().newComputerManager();

try{

computerManager.start(7);

}catch(Exception e) {

e.printStackTrace();

}

}

classComputerManager {

privateListcomputers=newArrayList();//定义一个集合

privateIntegerstartSize;

/**

*启动所有服务器

*

*@throwsException

*/

public voidstart(Integer startSize)throwsException {

if(computers!=null&&computers.size() >0)

throw newException("restart error");

this.startSize= startSize;

Paxos paxos =newPaxos();

for(inti =0;i < startSize;i++) {

Computer computer = paxos.newComputer(this);

Thread thread =newThread(computer);

thread.start();

}

}

/**

*启动完成的服务器注册

*

*@return

*/

public voidregister(Computer computer) {

computers.add(computer);

}

/**

*获取所有服务器

*

*@return

*/

publicIntegergetHelfSize() {

returncomputers.size() /2+1;

//            return startSize / 2 + 1;

}

/**

*获取一个法定集合

*

*@return

*/

public synchronizedListgetLegalComputers() {

List list =newArrayList();

intcount =0;

intcomputerSize =computers.size();

inthelfCount = computerSize /2+1;

Random random =newRandom();

while(count < helfCount) {

//生成一个随机数

int_random = Math.abs(random.nextInt(computerSize));

if(_random >=0&& _random < computerSize) {

Computer _computer =computers.get(_random);

if(!list.contains(_computer)) {

list.add(_computer);

count++;

}

}

}

returnlist;

}

}

classComputerimplementsRunnable {

privateIntegerid= Math.abs(newRandom().nextInt());//服务器ID

privateIntegermaxN;//当前接收到的提案号

privateIntegeracceptN;//已经同意的提案号

privateIntegeracceptV;//已经同意的提案号对应的值

privateComputerManagercomputerManager;

Computer(ComputerManager computerManager) {

this.computerManager= computerManager;

}

public synchronizedObject[]prepaer(Integer acceptN) {

System.out.println("---------------------------------------------------分割线------------------------------");

System.out.println(acceptN +"申请提案:"+this.id+".........."+this.maxN+"........"+this.acceptN+"......"+this.acceptV);

/*这里模拟一个断网情况,如果随机为2则断网*/

Random random =newRandom();

intstate = random.nextInt(10);

if(state ==2)

return null;

/*以下为正常情况*/

//如果之前没有接受过提案,直接返回null

if(maxN==null) {

this.maxN= acceptN;//令当前接收到的提案号=当前申请的提案号

return newObject[]{"pok", null, null};

}

if(maxN> acceptN) {

//由于当前申请提案号小于已经同意的提案号,所以不接收提案申请

return newObject[]{"error", null, null};

}

if(acceptN >maxN) {//判断新申请的提案是否为新提案

this.maxN= acceptN;//令当前接收到的提案号=当前申请的提案号

if(this.acceptN==null) {//如果之前没有通过任何提案,返回null

return newObject[]{"pok", null, null};

}else{

//如果之前同意过提案,返回最后同意的提案编号和提案值

return newObject[]{"pok", this.acceptN, this.acceptV};

}

}

return null;

}

public synchronizedStringaccept(Integer acceptN,Integer acceptV) {

//首先当前申请的提案号acceptN不能小于maxN

if(maxN<= acceptN) {

maxN= acceptN;

this.acceptN= acceptN;

this.acceptV= acceptV;

return"aok";

}

return"error";

}

/**

*进行选举

*/

public voidpaxos(Computer computer) {

//获取一个法定集合

List computers =computerManager.getLegalComputers();

Integer _acceptN =0;

Integer _acceptV =0;

intcount =0;

Integer cid = CId.getCid();

for(Computer _computer : computers) {

Object[] prepaer = _computer.prepaer(cid);//申请提交提案

if(prepaer ==null)

continue;

System.out.println(cid +"("+ _acceptN +":"+ _acceptV +")"+"返回提案:"+ _computer.id+".........."+ prepaer[0] +"........"+ prepaer[1] +"......"+ prepaer[2]);

String state = (String) prepaer[0];

if("pok".equals(state))//接收到申请的情况

{

count++;

if(_acceptN ==0&& prepaer[1] ==null) {

//生成一个新的acceptV

_acceptV = computer.id;

}else{

Integer acceptN = (Integer) prepaer[1];

Integer acceptV = (Integer) prepaer[2];

//使用返回的acceptV

if(acceptN >= _acceptN) {

_acceptN = acceptN;

_acceptV = acceptV;

}

}

}

}

//如果接收到的回复超过了半数,则正式提交提案

if(count >=computerManager.getHelfSize()) {

_acceptN = cid;

//获取一个法定集合

List computers1 =computerManager.getLegalComputers();

intacount =0;

for(Computer _computer : computers1) {

System.out.println(_acceptN +"("+ _acceptV +")"+"提交提案:"+ _computer.id+".........."+ _computer.maxN+"........"+ _computer.acceptN+"......"+ _computer.acceptV);

String accept = _computer.accept(_acceptN,_acceptV);//申请提交提案

if("aok".equals(accept)) {

acount++;

}

}

if(acount >=computerManager.getHelfSize()) {

System.out.println("提案被多数通过:"+ _acceptN +"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"+ _acceptV);

for(Computer _computer : computers1) {

System.out.println(_computer.id+".........."+ _computer.maxN+"........"+ _computer.acceptN+"......"+ _computer.acceptV);

}

}

}

}

/**

*启动命令

*/

public voidrun() {

//            Random random = new Random();

//            try {

//                Thread.sleep(random.nextInt(10) * 1000);//随机延迟几秒,模拟消息发送过程或启动过程

//            } catch (InterruptedException e) {

//                e.printStackTrace();

//            }

Computer computer =this;

computerManager.register(computer);//注册到启动集群中

paxos(computer);

}

}

/**

*提案号管理类d

*/

static classCId {

privateIntegercid=1;

privateCId() {

}

private staticCIdinstance=newCId();

public synchronized staticIntegergetCid() {

returninstance.cid++;

}

}

}

程序在运行过程中会出现异常,但是也能选出合适的人选作为Leader,这不正是paxos所期望的事情么?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容