canal启动的时候第二个启动的类是 CanalAlarmHandler 是canal的alarm报警机制。
if (!alarmHandler.isStart()) {
alarmHandler.start();
}
CanalAlarmHandler 是接口 定义了
/**
* 发送对应destination的报警
*
* @param destination
* @param msg
*/
void sendAlarm(String destination, String msg);
方法,具体是的实现类配置在file-instance.xml配置文件中
<!-- 报警处理类 -->
<bean id="alarmHandler" class="com.alibaba.otter.canal.common.alarm.LogAlarmHandler" />
由于LogAlarmHandler在canal.common模块,所以这次整体分析下这个模块的代码。
1.alarm包 **
CanalAlarmHandler 接口定义了规范,LogAlarmHandler**实现具体的发送方法,看到源码中只是简单的打印了一条日志,所以我们可以自己去实现CanalAlarmHandler 类或者继承LogAlarmHandler 重写sendAlarm方法,实现发邮件等自己的业务通知报警方式。
2.utils包
AddressUtils 获取host的ip和address工具类。
BooleanMutex类 厉害了,这个是用于并发控制的锁实现。根据其实现AQS的代码方式 知道这是个共享锁 关于这一块可以看前面的博客java.util.concurrent.locks包下的锁实现分析
可以看下调用的地方
主要实现系统初始化/授权控制,没权限时阻塞等待。有权限时所有线程都可以快速通过,或者只初始化一次之类的并发问题,到时遇到对应的模块再分析之。
CanalToStringStyle 主要用于输出格式的自定义,可以看ToStringBuilder ToStringStyle这篇文章的记录。
JsonUtils明显 阿里的json处理工具,这里只是包装了下fastjson的方法。
NamedThreadFactory 这个类用于创建线程的工厂,实现了ThreadFactory接口,主要是线程池编程中用到的,该类中还用到了ThreadGroup属性,将这些创建的类放到了一个ThreadGroup中,方便管理。
final private ThreadGroup group;
public NamedThreadFactory(String name, boolean daemon){
this.name = name;
this.daemon = daemon;
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
}
group的复制是System group组,也就是系统管理的组,这里还有个需要注意的是一个属性
final private boolean daemon;
public NamedThreadFactory(){
this(DEFAULT_NAME, true);
}
public NamedThreadFactory(String name){
this(name, true);
}
这里默认是true,可以看出创建的都是守护线程。守护线程是跟jvm的生命一致的。当没有非守护线程后jvm就退出了,守护线程也就退出了。
ThreadGroup是为了方便线程管理出现了,可以统一设定线程组的一些属性,比如setDaemon,设置未处理异常的处理方法,设置统一的安全策略等等;也可以通过线程组方便的获得线程的一些信息。
java中每一个ThreadGroup都可以包含一组的子线程和一组子线程组,在一个进程中线程组是以树形的方式存在,通常情况下根线程组是system线程组。system线程组下是main线程组,默认情况下第一级应用自己的线程组是通过main线程组创建出来的。看线下面例子和输出
package com.alibaba.otter.canal.common;
public class ThreadGroupDemo {
public static void main(String[] args) {
printGroupInfo(Thread.currentThread());
Thread appThread = new Thread(new Runnable(){
@Override
public void run() {
for (int i=0;i<5;i++) {
System.out.println("do loop " + i);
}
}
});
appThread.setName("appThread");
appThread.start();
printGroupInfo(appThread);
}
static void printGroupInfo(Thread t) {
ThreadGroup group = t.getThreadGroup();
System.out.println("thread " + t.getName() + " group name is "
+ group.getName()+ " max priority is " + group.getMaxPriority()
+ " thread count is " + group.activeCount());
ThreadGroup parent=group;
do {
ThreadGroup current = parent;
parent = parent.getParent();
if (parent == null) {
break;
}
System.out.println(current.getName() + "'s parent is " + parent.getName());
} while (true);
System.out.println("--------------------------");
}
}
thread main group name is main max priority is 10 thread count is 1
main's parent is system
--------------------------
thread appThread group name is main max priority is 10 thread count is 2
main's parent is system
--------------------------
do loop 0
do loop 1
do loop 2
do loop 3
do loop 4
关于Thread的setDaemon的作用可以看这篇文章Java中守护线程的总结(转) 这是我转的一篇文章,感觉写的好久不重复造轮子了。
UriUtils 这个类主要做url参数的解析。我们可以以后用到自己的项目中去。
3.zookeeper
该包下主要指对zk操作用到的工具类。zk server运行时的内存数据,各种序列化等,最zk client方法的进一步包装。写zk 源码分析的时候再细入。
4.剩下的三个类
CanalLifeCycle 接口定义了canal中的生命周期方法,start stop 和isStart方法,好多涉及到启动停止的都会实现该类。AbstractCanalLifeCycle抽象类简单实现了CanalLifeCycle 接口,大多继承的子类都重写了该方法。
CanalException 是定义的一个异常类继承了NestableRuntimeException类,比较简单。
canal.common 完毕。。。。。。