站在巨人的肩膀上
- ContinuousQuery(简称CQ):持续查询,是指Client可以按照OQL(Object Query Language)查询语句注册自己感兴趣的event,而这些event将发送给Client的Listener,一旦Server有event发生,就会将此event传递给Client。
- 监听的事件类型:update create destroy
- CQ查询的特性:
能够使用标准的OQL语句
对CQ事件进行管理
完全整合C/S架构
基于数据值的订阅
活跃查询执行
一个简单的业务需求:
Client向Server订阅监听年龄在15~35岁Customer之间的数据:
Server
@SpringBootApplication
@CacheServerApplication(name = "GemFireContinuousQueryServer")
public class Application {
@Bean(name = "Customers")
PartitionedRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
PartitionedRegionFactoryBean<Long, Customer> customers = new PartitionedRegionFactoryBean<>();
customers.setCache(gemfireCache);
customers.setClose(false);
customers.setPersistent(false);
return customers;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
pom.xml 中的关键依赖:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-gemfire</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.gj.demo</groupId>
<artifactId>gemfire-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
Client
@SpringBootApplication
@ClientCacheApplication(name = "GemFireContinuousQueryClient", subscriptionEnabled = true)
@SuppressWarnings("unused")
public class Application {
@Bean(name = "Customers")
ClientRegionFactoryBean<Long, Customer> customersRegion(GemFireCache gemfireCache) {
ClientRegionFactoryBean<Long, Customer> customers = new ClientRegionFactoryBean<>();
customers.setCache(gemfireCache);
customers.setClose(true);
customers.setShortcut(ClientRegionShortcut.PROXY);
return customers;
}
@Bean
ContinuousQueryListenerContainer continuousQueryListenerContainer(GemFireCache gemfireCache) {
Region<Long, Customer> customers = gemfireCache.getRegion("/Customers");
ContinuousQueryListenerContainer container = new ContinuousQueryListenerContainer();
container.setCache(gemfireCache);
container.setQueryListeners(asSet(ageQueryDefinition(customers, 15,35)));
return container;
}
private ContinuousQueryDefinition ageQueryDefinition(Region<Long, Customer> customers, int
ageFrom,int ageTo){
String query = String.format("SELECT * FROM /Customers c WHERE c.getAge().intValue() > %d AND c.getAge().intValue() < %d ", ageFrom,ageTo);
return new ContinuousQueryDefinition("Young Query ",query,newQueryListener(customers,"Young Query"));
}
private ContinuousQueryListener newQueryListener(Region<Long, Customer> customers, String qualifier) {
return event -> {
System.err.printf("new order!" + event.toString());
};
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Customer:
@Region
public class Customer implements Serializable {
private static final long serialVersionUID = -3860687524824507124L;
private String firstname, lastname;
private int age;
Long id;
//此处省略了get,set方法
@Override
public String toString() {
return String.format("%1$s %2$s %3$s", getFirstname(), getLastname(),getAge());
}
}
程序跑起来~
分别启动Server和Client,在浏览器传入相关参数:
查看Client的控制台日志:
至此,就是一个完整的Client向Server订阅监听,收到订阅消息的全过程。
CQ查询的数据流
当数据条目在服务器端更新时,新数据会经过下面的步骤:
1.region条目发生变更
2.每一个事件,服务器的CQ处理框架检查是否与运行的CQ匹配
3.如果数据条目的变更匹配了CQ查询,CQ事件将被发送到客户端上的CQ监听器,CQ监听器获得此事件。
如上图所示:
X条目新值和旧值都匹配了CQ查询,因此查询结果的更新的事件被发送出来。
Y条目旧值匹配了,但这是查询结果的一部分,Y条目操作为失败 ,因为查询结果被销毁的事件被发送出来。
Z条目为新创,并不匹配CQ事件,所以事件不发送。
值得注意的是,CQ并不更新客户端的Region,CQ作为CQ监听器的通告工具而服务,CQ监听器可以按照客户应用的要求任意编程。
当一个CQ运行在服务器Region的时候,每一个Server条目更新线程都放在CQ查询中,如果old value或者new value 满足查询条件,线程将放到CqEvent的Client队列中去,一旦Client接受了此事件,CqEvent将被传递到CqListeners的onEvent方法上,如下图所示:
QueryService 接口提供的方法
create a new CQ and specify whether it is durable
execute a CQ,with or without an initial set
list all the CQs registered by the client
close and stop CQs at the cache and region level
get a handle on CqStatistics for the client
CqQuery:管理持续查询的方法,通过QueryService 创建,用于开启和停止CQ执行,同时查询其他与CQ想关联的对象,such as CQ属性,CQ统计和CQ状态。
CqListener:用于处理持续查询的事件。
*CqEvent:提供了从Server发送的所有的CQ事件信息,此事件被传递到CqListener的onEvent方法。
程序媛小白一枚,如有错误,烦请批评指正!(#.#)