Preface
This article takes a look at the GossipProtocol in scalecube-cluster.
GossipProtocol
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocol.java
/**
 * Gossip Protocol component responsible for spreading information (gossips) over the cluster
 * members using infection-style dissemination algorithms. It provides reliable cross-cluster
 * broadcast.
 */
public interface GossipProtocol {

  /** Starts running gossip protocol. After started it begins to receive and send gossip messages */
  void start();

  /** Stops running gossip protocol and releases occupied resources. */
  void stop();

  /**
   * Spreads given message between cluster members.
   *
   * @return future result with gossip id once gossip fully spread.
   */
  Mono<String> spread(Message message);

  /** Listens for gossips from other cluster members. */
  Flux<Message> listen();
}
- The GossipProtocol interface defines four methods: start, stop, spread, and listen.
GossipProtocolImpl
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
public final class GossipProtocolImpl implements GossipProtocol {

  private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);

  // Qualifiers
  public static final String GOSSIP_REQ = "sc/gossip/req";

  // Injected
  private final Member localMember;
  private final Transport transport;
  private final GossipConfig config;

  // Local State
  private long currentPeriod = 0;
  private long gossipCounter = 0;
  private Map<String, GossipState> gossips = new HashMap<>();
  private Map<String, MonoSink<String>> futures = new HashMap<>();
  private List<Member> remoteMembers = new ArrayList<>();
  private int remoteMembersIndex = -1;

  // Disposables
  private final Disposable.Composite actionsDisposables = Disposables.composite();

  // Subject
  private final FluxProcessor<Message, Message> subject =
      DirectProcessor.<Message>create().serialize();
  private final FluxSink<Message> sink = subject.sink();

  // Scheduled
  private final Scheduler scheduler;

  /**
   * Creates new instance of gossip protocol with given memberId, transport and settings.
   *
   * @param localMember local cluster member
   * @param transport cluster transport
   * @param membershipProcessor membership event processor
   * @param config gossip protocol settings
   * @param scheduler scheduler
   */
  public GossipProtocolImpl(
      Member localMember,
      Transport transport,
      Flux<MembershipEvent> membershipProcessor,
      GossipConfig config,
      Scheduler scheduler) {
    this.transport = Objects.requireNonNull(transport);
    this.config = Objects.requireNonNull(config);
    this.localMember = Objects.requireNonNull(localMember);
    this.scheduler = Objects.requireNonNull(scheduler);

    // Subscribe
    actionsDisposables.addAll(
        Arrays.asList(
            membershipProcessor //
                .publishOn(scheduler)
                .subscribe(this::onMemberEvent, this::onError),
            transport
                .listen()
                .publishOn(scheduler)
                .filter(this::isGossipReq)
                .subscribe(this::onGossipReq, this::onError)));
  }

  @Override
  public void start() {
    actionsDisposables.add(
        scheduler.schedulePeriodically(
            this::doSpreadGossip,
            config.getGossipInterval(),
            config.getGossipInterval(),
            TimeUnit.MILLISECONDS));
  }

  @Override
  public void stop() {
    // Stop accepting gossip requests and spreading gossips
    actionsDisposables.dispose();

    // Stop publishing events
    sink.complete();
  }

  @Override
  public Mono<String> spread(Message message) {
    return Mono.fromCallable(() -> message)
        .subscribeOn(scheduler)
        .flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink)));
  }

  @Override
  public Flux<Message> listen() {
    return subject.onBackpressureBuffer();
  }

  private void onMemberEvent(MembershipEvent event) {
    Member member = event.member();
    if (event.isRemoved()) {
      remoteMembers.remove(member);
    }
    if (event.isAdded()) {
      remoteMembers.add(member);
    }
  }

  private void onGossipReq(Message message) {
    long period = this.currentPeriod;
    GossipRequest gossipRequest = message.data();
    for (Gossip gossip : gossipRequest.gossips()) {
      GossipState gossipState = gossips.get(gossip.gossipId());
      if (gossipState == null) { // new gossip
        gossipState = new GossipState(gossip, period);
        gossips.put(gossip.gossipId(), gossipState);
        sink.next(gossip.message());
      }
      gossipState.addToInfected(gossipRequest.from());
    }
  }

  private boolean isGossipReq(Message message) {
    return GOSSIP_REQ.equals(message.qualifier());
  }

  private String createAndPutGossip(Message message) {
    long period = this.currentPeriod;
    Gossip gossip = new Gossip(generateGossipId(), message);
    GossipState gossipState = new GossipState(gossip, period);
    gossips.put(gossip.gossipId(), gossipState);
    return gossip.gossipId();
  }

  //......
}
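- GossipProtocolImpl implements the GossipProtocol interface. It maintains a map named gossips from gossipId to GossipState, plus the remoteMembers list.
- Its constructor subscribes to membershipProcessor, which triggers onMemberEvent; that method adds members to or removes them from remoteMembers according to the MembershipEvent. It also subscribes to transport.listen(), keeps only GOSSIP_REQ messages, and triggers onGossipReq, which merges the gossips of the GossipRequest into the local gossips map. For a gossip it has not seen before, it creates a GossipState and emits the gossip's message to sink; in either case it marks the sending member (gossipRequest.from()) as infected for that gossip.
- start schedules doSpreadGossip to run every gossipInterval milliseconds. spread creates a Gossip via createAndPutGossip, puts it into gossips, and registers the caller's MonoSink in futures under the gossip id.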
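The merge behaviour of onGossipReq described above can be sketched in isolation. The following is a simplified stand-in, not the scalecube code: GossipMergeSketch, State, and the delivered list (which plays the role of sink.next) are hypothetical names, and gossips are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for the gossips map handling; not the scalecube classes. */
final class GossipMergeSketch {

  /** Hypothetical, trimmed-down counterpart of GossipState. */
  static final class State {
    final String payload;
    final long infectionPeriod;
    final Set<String> infected = new HashSet<>();

    State(String payload, long infectionPeriod) {
      this.payload = payload;
      this.infectionPeriod = infectionPeriod;
    }
  }

  final Map<String, State> gossips = new HashMap<>();
  final List<String> delivered = new ArrayList<>(); // stands in for sink.next(...)
  long currentPeriod = 0;

  /** Deliver a gossip only the first time it is seen; always record the sender as infected. */
  void onGossipReq(String gossipId, String payload, String fromMemberId) {
    State state = gossips.get(gossipId);
    if (state == null) { // new gossip
      state = new State(payload, currentPeriod);
      gossips.put(gossipId, state);
      delivered.add(payload);
    }
    state.infected.add(fromMemberId);
  }

  public static void main(String[] args) {
    GossipMergeSketch node = new GossipMergeSketch();
    node.onGossipReq("g-1", "hello", "member-A");
    node.onGossipReq("g-1", "hello", "member-B"); // duplicate: not delivered again
    System.out.println(node.delivered); // prints [hello]
    System.out.println(node.gossips.get("g-1").infected.size()); // prints 2
  }
}
```

The point of the sketch is that a listener sees each gossip at most once per node while the gossip is still in the map, and that every sender is remembered so the gossip is not sent back to it later.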
doSpreadGossip
scalecube-cluster-2.2.5/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
public final class GossipProtocolImpl implements GossipProtocol {

  //......

  private List<Member> remoteMembers = new ArrayList<>();
  private int remoteMembersIndex = -1;

  private void doSpreadGossip() {
    // Increment period
    long period = currentPeriod++;

    // Check any gossips exists
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }

    try {
      // Spread gossips to randomly selected member(s)
      selectGossipMembers().forEach(member -> spreadGossipsTo(period, member));

      // Sweep gossips
      sweepGossips(period);
    } catch (Exception ex) {
      LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex);
    }
  }

  private void spreadGossipsTo(long period, Member member) {
    // Select gossips to send
    List<Gossip> gossips = selectGossipsToSend(period, member);
    if (gossips.isEmpty()) {
      return; // nothing to spread
    }

    // Send gossip request
    Address address = member.address();

    gossips
        .stream()
        .map(this::buildGossipRequestMessage)
        .forEach(
            message ->
                transport
                    .send(address, message)
                    .subscribe(
                        null,
                        ex ->
                            LOGGER.debug(
                                "Failed to send GossipReq[{}]: {} to {}, cause: {}",
                                period,
                                message,
                                address,
                                ex.toString())));
  }

  private List<Gossip> selectGossipsToSend(long period, Member member) {
    int periodsToSpread =
        ClusterMath.gossipPeriodsToSpread(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    return gossips
        .values()
        .stream()
        .filter(
            gossipState -> gossipState.infectionPeriod() + periodsToSpread >= period) // max rounds
        .filter(gossipState -> !gossipState.isInfected(member.id())) // already infected
        .map(GossipState::gossip)
        .collect(Collectors.toList());
  }

  private List<Member> selectGossipMembers() {
    int gossipFanout = config.getGossipFanout();
    if (remoteMembers.size() < gossipFanout) { // select all
      return remoteMembers;
    } else { // select random members
      // Shuffle members initially and once reached top bound
      if (remoteMembersIndex < 0 || remoteMembersIndex + gossipFanout > remoteMembers.size()) {
        Collections.shuffle(remoteMembers);
        remoteMembersIndex = 0;
      }

      // Select members
      List<Member> selectedMembers =
          gossipFanout == 1
              ? Collections.singletonList(remoteMembers.get(remoteMembersIndex))
              : remoteMembers.subList(remoteMembersIndex, remoteMembersIndex + gossipFanout);

      // Increment index and return result
      remoteMembersIndex += gossipFanout;
      return selectedMembers;
    }
  }

  private Message buildGossipRequestMessage(Gossip gossip) {
    GossipRequest gossipRequest = new GossipRequest(gossip, localMember.id());
    return Message.withData(gossipRequest)
        .qualifier(GOSSIP_REQ)
        .sender(localMember.address())
        .build();
  }

  private void sweepGossips(long period) {
    // Select gossips to sweep
    int periodsToSweep =
        ClusterMath.gossipPeriodsToSweep(config.getGossipRepeatMult(), remoteMembers.size() + 1);
    Set<GossipState> gossipsToRemove =
        gossips
            .values()
            .stream()
            .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
            .collect(Collectors.toSet());

    // Check if anything selected
    if (gossipsToRemove.isEmpty()) {
      return; // nothing to sweep
    }

    // Sweep gossips
    LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
    for (GossipState gossipState : gossipsToRemove) {
      gossips.remove(gossipState.gossip().gossipId());
      MonoSink<String> sink = futures.remove(gossipState.gossip().gossipId());
      if (sink != null) {
        sink.success(gossipState.gossip().gossipId());
      }
    }
  }

  //......
}
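- doSpreadGossip first increments currentPeriod (keeping the pre-increment value as period) and returns early if gossips is empty. Otherwise it calls selectGossipMembers, runs spreadGossipsTo for each selected member, and finally calls sweepGossips.
- selectGossipMembers picks gossipFanout members according to the gossipFanout setting; if remoteMembers holds fewer than gossipFanout entries, it returns all of them. Selection is driven by remoteMembersIndex: when it is less than 0 or remoteMembersIndex + gossipFanout > remoteMembers.size(), the method calls Collections.shuffle(remoteMembers) and resets remoteMembersIndex to 0. It then takes a subList of remoteMembers starting at remoteMembersIndex and advances remoteMembersIndex by gossipFanout.
- spreadGossipsTo first calls selectGossipsToSend to get the gossips to send (those still within periodsToSpread of their infection period and for which the target member is not yet marked infected), then builds a GOSSIP_REQ message for each one via buildGossipRequestMessage and sends it with transport.send.
- sweepGossips computes periodsToSweep, then removes from gossips every gossipState for which period > gossipState.infectionPeriod() + periodsToSweep. For each removed gossip it also completes the matching MonoSink in futures, if there is one, with the gossip id.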
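To make the shuffle-then-slice selection in selectGossipMembers easier to follow, here is a minimal self-contained sketch. FanoutSelectorSketch and its String members are hypothetical; unlike the original, it returns defensive copies instead of the live list or a subList view, which is a simplification made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative re-creation of the member selection logic; names are hypothetical. */
final class FanoutSelectorSketch {
  private final List<String> remoteMembers;
  private final int fanout;
  private int index = -1; // -1 forces a shuffle on the first call

  FanoutSelectorSketch(List<String> members, int fanout) {
    this.remoteMembers = new ArrayList<>(members);
    this.fanout = fanout;
  }

  List<String> select() {
    if (remoteMembers.size() < fanout) {
      return new ArrayList<>(remoteMembers); // fewer members than fanout: select all
    }
    // Shuffle initially and whenever the next slice would run past the end
    if (index < 0 || index + fanout > remoteMembers.size()) {
      Collections.shuffle(remoteMembers);
      index = 0;
    }
    // Copy the slice so a later shuffle cannot change an already returned result
    List<String> selected = new ArrayList<>(remoteMembers.subList(index, index + fanout));
    index += fanout;
    return selected;
  }

  public static void main(String[] args) {
    FanoutSelectorSketch selector =
        new FanoutSelectorSketch(Arrays.asList("a", "b", "c", "d", "e", "f"), 3);
    // Two consecutive rounds walk one shuffled order, so together they cover all six members
    System.out.println(selector.select());
    System.out.println(selector.select());
    // The third round triggers a reshuffle and starts over from index 0
    System.out.println(selector.select());
  }
}
```

One consequence of this scheme is that, between two shuffles, no member is selected twice, so every member is contacted before any is repeated as long as the member count is a multiple of the fanout.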
Summary
- The GossipProtocol interface defines the start, stop, spread, and listen methods. GossipProtocolImpl implements it and maintains a map named gossips from gossipId to GossipState, plus the remoteMembers list.
- The GossipProtocolImpl constructor subscribes to membershipProcessor, which triggers onMemberEvent to add members to or remove them from remoteMembers according to the MembershipEvent. It also subscribes to transport.listen(), keeps only GOSSIP_REQ messages, and triggers onGossipReq, which merges the gossips of the GossipRequest into the local gossips map, emits the message of each new gossip to sink, and marks the sending member as infected in that gossip's GossipState.
- start schedules doSpreadGossip every gossipInterval; spread creates a Gossip via createAndPutGossip and puts it into gossips. doSpreadGossip increments currentPeriod, calls selectGossipMembers, runs spreadGossipsTo for each selected member, and finally calls sweepGossips.
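GossipProtocolImpl registers two handlers, onMemberEvent and onGossipReq. onMemberEvent listens for MembershipEvent and keeps the remoteMembers list up to date. onGossipReq handles the GossipReq messages that other members send from their own doSpreadGossip and merges the gossips they carry into the local gossips map. doSpreadGossip runs every gossipInterval, picks gossipFanout members from the shuffled remoteMembers list (or all of them if fewer are known), and for each one selects the gossips to send and spreads them. In short, onGossipReq and spread modify gossips, while the periodically triggered doSpreadGossip picks the gossips still pending from that map and sends them.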
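The two period comparisons used by selectGossipsToSend and sweepGossips give each gossip a simple lifecycle: it is spread for a while, then only retained, then removed. The sketch below replays that lifecycle with the same inequalities as the source. GossipWindowSketch is a hypothetical name, and the values 3 and 8 are made up for illustration; in the real code they come from ClusterMath and depend on gossipRepeatMult and the cluster size.

```java
/** Replays the spread and sweep period checks; class name and numbers are illustrative. */
final class GossipWindowSketch {

  /** Same condition as the "max rounds" filter in selectGossipsToSend. */
  static boolean shouldSpread(long infectionPeriod, int periodsToSpread, long period) {
    return infectionPeriod + periodsToSpread >= period;
  }

  /** Same condition as the filter in sweepGossips. */
  static boolean shouldSweep(long infectionPeriod, int periodsToSweep, long period) {
    return period > infectionPeriod + periodsToSweep;
  }

  static String phase(long infectionPeriod, int toSpread, int toSweep, long period) {
    if (shouldSweep(infectionPeriod, toSweep, period)) {
      return "swept";
    }
    return shouldSpread(infectionPeriod, toSpread, period) ? "spreading" : "retained";
  }

  public static void main(String[] args) {
    int periodsToSpread = 3; // hypothetical value
    int periodsToSweep = 8; // hypothetical value
    long infectionPeriod = 0; // period in which the gossip was first seen or created
    for (long period = 0; period <= 9; period++) {
      System.out.println(
          "period " + period + ": "
              + phase(infectionPeriod, periodsToSpread, periodsToSweep, period));
    }
  }
}
```

With these assumed values the gossip is eligible for sending in periods 0 to 3, stays in the map without being sent in periods 4 to 8, and is removed in period 9. Keeping it around after spreading stops presumably lets onGossipReq still recognize late duplicates instead of delivering them again.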