最近接到一个关于dubbo调用的需求,要求如下:
- 不依赖dubbo服务提供方的sdk调用其服务,只知道服务的包名、接口名、方法名。
- 检索出某个特定包下的所有dubbo服务,对其进行监听(监听提供者的上线、下线、版本更新、参数变化)。
现在功能已实现,在此记录一下。
一、大体方案
1.不依赖提供方的sdk调用其服务
可以通过dubbo的泛化调用实现,关于dubbo的泛化调用,可以查看:
http://dubbo.apache.org/zh-cn/blog/dubbo-generic-invoke.html
我们可以了解到,要进行泛化调用,需要的参数有:
- zookeeper的地址以及端口号。
- 服务的包名+接口名+方法名。
- 方法的参数类型,调用时传入的参数。
如果被泛化调用的方法返回的类型是pojo,那么会自动转为Map<String, Any>格式。
泛化调用需要的参数,只有方法的参数类型需要想办法获取。通过查阅资料,得知获取方法的参数类型有两种:
1.使用dubbo 2.7之后的版本且服务提供方在provider.xml中使用了<dubbo:metadata-report>标签,这样zookeeper上就会存储dubbo服务方法的参数类型,否则不会存。
2.通过zkClient获取到服务提供者的ip地址、端口号,再通过socket编程连接到服务提供方获取方法的参数类型。
通过实验发现我这的dubbo版本在2.7之前或没用<dubbo:metadata-report>标签,只能使用方法2了。
2.检索出特定包下的服务并监听
首先是检索出特定包下的服务,查阅资料得知可以通过zkClient连接到zookeeper,然后获取注册到该zookeeper的所有服务,最后筛选出特定包下的服务即可。
服务的监听,包括两种:
1.有新的特定包的服务时出现时要监听到。
2.监听服务的提供者上线、下线、版本更新、参数变化。
经过实验,这两点都能通过给zkClient添加IZkChildListener监听到。
二、一些关键的代码
1.泛化调用
class GenericCaller private constructor(val env: String) {
private val application = ApplicationConfig()
private val registry = RegistryConfig()
// 按照dubbo 泛化调用文档所说,缓存ReferenceConfig<GenericService>
private val referenceMap : ConcurrentHashMap<String, ReferenceConfig<GenericService>> = ConcurrentHashMap(64)
companion object {
private val listCaller: List<GenericCaller> by lazy { listOf(GenericCaller("dev"), GenericCaller("fat")) }
// env 用于区分开发环境、测试环境
fun get(env: String): GenericCaller? {
if (env == "dev") {
return listCaller[0]
}
if (env == "fat") {
return listCaller[1]
}
return null
}
}
init {
application.name = "$env-databank-generic-consumer"
// 到配置中心获取zookeeper的地址和端口号
val zkHost = ApolloConfigReaderUtils.getString("${env}.zookeeper.hostName")
val zkPort = ApolloConfigReaderUtils.getString("${env}.zookeeper.port")
registry.address = "zookeeper://$zkHost:$zkPort"
application.registry = registry
}
fun call(interfaceName: String, methodName: String, version: String, group: String, parameters: List<Parameter>,
jsonObj: JSONObject): Any {
val listParameterValue = parameters.map {
when (it.parameterType) {
"int", "java.lang.Integer" -> jsonObj.getInteger(it.name)
"String", "java.lang.String" -> jsonObj.getString(it.name)
"com.alibaba.fastjson.JSONArray" -> jsonObj.getJSONArray(it.name)
"long" -> jsonObj.getLong(it.name)
"boolean" -> jsonObj.getBoolean(it.name)
else -> jsonObj[it.name] ?: throw HandlerInvokeException("no parameter!")
}
}
val genericService = getReferenceConfig(interfaceName, version, group).get()
// 泛化调用
return genericService.`$invoke`(methodName, parameters.map { it.parameterType }.toTypedArray(),
listParameterValue.toTypedArray())
}
// 先到缓存取ReferenceConfig,取不到再new
private fun getReferenceConfig(interfaceName: String, version : String, group : String) : ReferenceConfig<GenericService>{
val key = "$interfaceName-$version-$group"
if (referenceMap[key] == null){
synchronized("$env-$key".intern()){
if (referenceMap[key] == null){
val referenceTemp = ReferenceConfig<GenericService>()
referenceTemp.setInterface(interfaceName)
referenceTemp.isGeneric = true
referenceTemp.application = application
referenceTemp.version = version
referenceTemp.group = group
referenceMap[key] = referenceTemp
}
}
}
return referenceMap[key]!!
}
}
2.服务的监听
ZkServiceListener监听是否有新的服务
class ZkServiceListener(val env : String) : IZkChildListener {
override fun handleChildChange(path : String?, listService: MutableList<String>?) {
println("--------This is ZkServiceListener-------")
if (listService == null || listService.isEmpty()){
return
}
// 筛选出databank的服务
val listInnerService = listService.filter { it.contains(ZkCommandLineRunner.getKeyWord()) }
synchronized("$env/dubbo".intern()){
// 筛选出cache中没有的服务, 添加到cache
val listNewInnerService = listInnerService.filter {!DubboServiceCache.contains(it, env) }
listNewInnerService.forEach{
ZkCommandLineRunner.get(env)?.updateProvidersToCache(it)
}
}
}
}
ZkProviderListener监听已有服务的上线、下线
class ZkProviderListener(val env : String) : IZkChildListener {
private val moduleFactory : ModuleFactory by lazy { SpringContextUtil.getBean("moduleFactory") as ModuleFactory}
override fun handleChildChange(path : String?, providersUrl: MutableList<String>?) {
println("--------This is ZkProviderListener-------")
// providerUrl为null或长度为0,说明该服务没有提供方
val tmpUrls = providersUrl ?: listOf<String>()
if (path.isNullOrBlank()){
return
}
// path : /dubbo/xxxxx/providers
val serviceName = path.substring(7, path.lastIndexOf("/"))
ZkCommandLineRunner.get(env)?.updateProvidersToCache(serviceName, tmpUrls)
moduleFactory.updateModuleState(serviceName, env)
// 没法应对参数类型不变,但参数意义改变的情况。
}
}
ZkCommandLineRunner负责启动时读取dubbo服务以及添加上述的监听器
/**
* 服务启动时自动到Zookeeper读取所有dubbo服务,然后筛选出与KEY_WORD相关的服务
*/
class ZkCommandLineRunner private constructor(private val env : String){
private val zkServer : String
private lateinit var zkClient : ZkClient
private val listJob = mutableListOf<Job>()
private var inited = false
companion object{
private val listRunner : List<ZkCommandLineRunner> by lazy { listOf(ZkCommandLineRunner("dev"), ZkCommandLineRunner("fat")) }
fun get(env : String) : ZkCommandLineRunner?{
if (env == "dev"){
return listRunner[0]
}
if (env =="fat"){
return listRunner[1]
}
return null
}
private var KEY_WORD = ".databank."
fun getKeyWord() : String{
return KEY_WORD
}
}
init {
val zkHost = ApolloConfigReaderUtils.getString("${env}.zookeeper.hostName")
val zkPort = ApolloConfigReaderUtils.getString("${env}.zookeeper.port")
zkServer = "$zkHost:$zkPort"
KEY_WORD = ApolloConfigReaderUtils.getString("${env}.key_word")
}
fun init() {
zkClient = ZkClient(zkServer, 5000, 5000, MyZkSerializer())
// 监听是否有新增的服务
zkClient.subscribeChildChanges("/dubbo", ZkServiceListener(env))
val listInnerService = getServiceListAndInitCache()
val zkProviderListener = ZkProviderListener(env)
listInnerService.forEach{
// 监听已有服务的上下线、更新
zkClient.subscribeChildChanges("/dubbo/$it/providers", zkProviderListener)
updateProvidersToCache(it)
}
inited = true
}
private fun getServiceListAndInitCache() : List<String>{
// 加锁,防添加service时zookeeper有service上下线
synchronized("$env/dubbo".intern()){
val dubboChildren = zkClient.getChildren("/dubbo")
val listInnerService = dubboChildren.filter { it.contains(KEY_WORD) }
ProviderEditor.initCache(listInnerService, env)
return listInnerService
}
}
// 该函数在启动时调用
fun updateProvidersToCache(serviceName : String){
// 加锁,防添加providers的同时zookeeper上providers的变化
synchronized("$env/$serviceName".intern()){
val listProviderUrl = zkClient.getChildren("/dubbo/$serviceName/providers")
updateProvidersToCache(serviceName, listProviderUrl)
}
}
// 该函数在ZkProviderListener监听到provider变化时调用
fun updateProvidersToCache(serviceName: String, listProviderUrl : List<String>){
// 加锁,防添加providers的同时zookeeper上providers的变化
synchronized("$env/$serviceName".intern()){
val listProvider = ProviderEditor.updateProviders(serviceName, listProviderUrl, env)
listProvider.forEach{
initProviderMethods(it)
}
}
}
// 判断协程是否全部执行完毕
fun isCompleted() : Boolean{
if (listJob.isEmpty()){
return true
}
listJob.forEach{
if (!it.isCompleted){
return false
}
}
listJob.clear()
return true
}
private fun initProviderMethods(provider : Provider){
// 新建协程,分别用socket到dubbo服务方拿方法信息
val job = GlobalScope.launch {
val socket = AccessDubboProviderSocket(provider.host, provider.port)
val strMethods = socket.getMethodsByInterfaceName(provider.serviceName)
strMethods.forEach{
val splitSpace = it.split(" ")
val returnType = splitSpace[0]
val splitLeftParenthesis = splitSpace[1].split("(")
val methodName = splitLeftParenthesis[0]
val parameterTypes = splitLeftParenthesis[1].substring(0, splitLeftParenthesis[1].lastIndexOf(")")).split(",")
provider.methods.add(Method(methodName, returnType, parameterTypes))
}
}
listJob.add(job)
}
fun isInited() : Boolean{
return inited
}
}
3.直接连接dubbo服务提供方,获取方法参数的socket
关于连接dubbo服务方后可执行的命令,可查看:
https://dubbo.gitbooks.io/dubbo-user-book/content/references/telnet.html
/**
* dubbo使用2.7之前的版本 或者2.7之后的版本但没有使用<dubbo:metadata-report>标签,zookeeper不会存接口方法的具体信息
* 该类使用socket的方式直接连接duboo服务提供方,获取接口中方法的参数类型、返回类型
*
*/
class AccessDubboProviderSocket(val host: String, val port: Int) {
// 通过接口名获取该接口所有方法
fun getMethodsByInterfaceName(interfaceName : String): List<String> {
val socket = Socket()
socket.connect(InetSocketAddress(host, port), 3000)
val writer = PrintWriter(socket.getOutputStream())
val input = socket.getInputStream()
try {
writer.println("ls -l $interfaceName")
writer.flush()
val strRead = getReturnString(input)
val strSplit = strRead.split("\r\n")
writer.println("exit")
writer.flush()
// strRead的最后会有一行 "dubbo>" ,将其过滤掉
return strSplit.subList(0, strSplit.size -1 )
} finally {
input.close()
writer.close()
socket.close()
}
}
// 将dubbo 提供方返回的信息全部read,返回一个字符串String
private fun getReturnString(input: InputStream): String {
val strBuilder = StringBuilder()
val byteArray = ByteArray(512)
input.read(byteArray)
var str = String(byteArray).trim('\u0000')
strBuilder.append(str)
while (!strBuilder.endsWith("dubbo>")) {
// byteArray全元素置0, 否则上次read到 "abcdefg" 这次read到"123" ,byteArray的结果是 "123defg",后半不会被覆盖掉
byteArray.fill(0, 0, byteArray.size)
input.read(byteArray)
str = String(byteArray).trim('\u0000')
strBuilder.append(str)
}
return strBuilder.toString()
}
}
4. 保存监听到的服务
/**
* 该类用于操作DubboServiceCache中的provider
*/
object ProviderEditor {
// 此方法往cache内put空的列表是为了表示 "cache已经知道zookeeper上有该服务", 这样zookeeper监听/duubo路径时可以通过cache判断某服务之前是否存在
fun initCache(listService: List<String>, env: String) {
DubboServiceCache.clear(env)
listService.forEach {
DubboServiceCache.put(it, mutableListOf(), env)
}
}
fun updateProviders(serviceName: String, listProviderUrl: List<String>, env: String): List<Provider> {
val listProvider: MutableList<Provider> = mutableListOf()
listProviderUrl.forEach {
val hostPort = UrlUtils.getHostAndPort(it)
val hostPortSplit = hostPort.split(":")
try {
val host = hostPortSplit[0]
val port = hostPortSplit[1].toInt()
val map = UrlUtils.getUrlParameters(it)
val version = map["version"] ?: ""
val group = map["group"] ?: ""
// 不知为何有时会有地址、端口号、版本都相同的、重复的提供者,只好过滤一下
val flag: Boolean = listProvider.none { it.host == host && it.port == port && it.version == version }
if (flag) {
listProvider.add(Provider(host, port, version, mutableListOf(), serviceName, group))
}
} catch (e: Exception) {
e.printStackTrace()
}
}
val tempList = listProvider.sortedBy { it.group }
val listProviderOrderByVersion = tempList.sortedByDescending { it.version }
DubboServiceCache.put(serviceName, listProviderOrderByVersion, env)
return listProviderOrderByVersion
}
}
5. zookeeper URL解析工具(噗,原来是要进行URLDecode,当时不知道。。。)
object UrlUtils {
// 从zookeeper获取的url ,符号都用 % + 符号的ascii码 表示
fun getUrlParameters(url: String): Map<String, String> {
val map = mutableMapOf<String, String>()
if (url.isNullOrBlank()) {
return map
}
val tmpUrl = url.trim()
if (tmpUrl.isNullOrBlank()) {
return map
}
// %3F ?
val urlParts = tmpUrl.split("%3F")
if (urlParts.size == 1) {
return map
}
// %26 &
val params = urlParts[1].split("%26")
params.forEach {
// %3D =
val keyValue = it.split("%3D")
map[keyValue[0]] = keyValue[1]
}
return map
}
fun getHostAndPort(url: String): String {
// %2F /
val strHostAndPort = url.substring(url.indexOf("%2F%2F") + 6, url.lastIndexOf("%2F"))
// $3A :
return strHostAndPort.replace("%3A", ":")
}
}