1. 服务发现类
public class ServiceDiscovery {
private static Logger log = LoggerFactory.getLogger(ServiceRegistry.class);
private String zookeeperConnect;
private CountDownLatch latch = new CountDownLatch(1);
private volatile List<String> serviceAddresses = new ArrayList<>();
private String nodePath = "/ppath";
private int timeout = 3000;
public ServiceDiscovery(String zookeeperConnect) {
this.zookeeperConnect = zookeeperConnect;
ZooKeeper zk = connectServer();
if (zk != null) {
//监听节点
watchNode(zk);
}
}
public String discover() {
String data = null;
int size = serviceAddresses.size();
if(size > 0) {
if (size == 1) {
data = serviceAddresses.get(0);
log.info("unique service address :{}", data);
} else {
//使用随机分配法,简单的负载均衡法
data = serviceAddresses.get(ThreadLocalRandom.current().nextInt(size));
log.info("choose an address : {}",data);
}
}
return data;
}
/**
* 连接上zk以后,监听服务列表相关的节点
* @param zk
*/
private void watchNode(ZooKeeper zk) {
try {
List<String> nodeList = zk.getChildren(nodePath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//监听节点变化事件
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
watchNode(zk);
}
}
});
List<String> dataList = new ArrayList<>();
nodeList.stream().forEach(node -> {
byte[] data = new byte[0];
try {
data = zk.getData(nodePath + "/" + node, false, null);
} catch (KeeperException | InterruptedException e) {
log.error(e.getMessage(), e);
}
dataList.add(new String(data));
log.info("node data: {}", dataList);
this.serviceAddresses = dataList;
});
} catch (KeeperException | InterruptedException e) {
log.error(e.getMessage(), e);
}
}
/**
* 连接 zookeeper
* @return
*/
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(zookeeperConnect, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
log.error("", e);
e.printStackTrace();
}
return zk;
}
}
2. 服务注册类
public class ServiceRegistry {
private static Logger log = LoggerFactory.getLogger(ServiceRegistry.class);
private String zookeeperConnect;
private String ip;
private String port;
private CountDownLatch latch = new CountDownLatch(1);
private String parentPath = "/ppath";
private String childPath = "/cpath";
private int timeout = 3000;
public ServiceRegistry(String zookeeperConnect, String ip, String port){
this.zookeeperConnect = zookeeperConnect;
this.ip = ip;
this.port = port;
register(this.ip + ":" + this.port);
}
/**
* 与zookeeper建立连接并设置监听
* @return
*/
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try{
zk = new ZooKeeper(zookeeperConnect, timeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
log.info("Watcher..........");
}
}
});
latch.await();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return zk;
}
/**
* 创建节点
* @param zk
* @param data
*/
private void createNode(ZooKeeper zk, String data) {
try{
Stat stat = zk.exists(parentPath, true);
if(stat == null) {
zk.create(parentPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
String createPath = zk.create(parentPath + childPath, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("create zookeeper node ({} => {} => {})", data, createPath);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 服务注册
* @param data
*/
public void register(String data) {
if (data != null) {
ZooKeeper zk = connectServer();
if (zk != null) {
createNode(zk, data);
}
}
}
}
3. 配置类
@Configuration
public class CommonConfig {
@Bean
public ServiceDiscovery serviceDiscovery(@Value("${zookeeper.connect_string}") String zookeeperConnect){
return new ServiceDiscovery(zookeeperConnect);
}
@Bean
public ServiceRegistry serviceRegistry( @Value("${zookeeper.connect_string}") String zookeeperConnect,
@Value("${server.ip}") String ip,
@Value("${server.port}") String port){
return new ServiceRegistry(zookeeperConnect, ip, port);
}
}
4. 测试
@RestController
public class DiscoverController {
@Autowired
private ServiceDiscovery serviceDiscovery;
@RequestMapping("/adderss")
public String discoveryAddress() {
return serviceDiscovery.discover();
}
}
5. application.properties
image.png