个人学习笔记分享,当前能力有限,请勿贬低,菜鸟互学,大佬绕道
如有勘误,欢迎指出和讨论,本文后期也会进行修正和补充
前言
前面有一篇文章已经介绍了订阅/发布模式,即生产者和消费者通过一个中介者来交互
- 生产者只负责向中介传递数据,不关心其余步骤
- 消费者在中介者处进行注册,告知中介者自己需要数据
- 中介者接受来自生产者的数据,并传递给在自己这里注册过的消费者
当生产者只有一个的时候,可以省略掉中介者,直接在生产者处注册消费者
通常满足N-1-N或者1-N的交互模型
消费者在中介者处或者直接向生产者订阅消息,而生产者负责发布消息,由中介者或者生产者
因而被称为订阅/发布模式
可以看到,注册过的消费者总是在等待消息,无论消息来自中介者,或者直接来源于生产者,最终目的都是观察生产者
因此这种模式也被称为观察者模式
在实际生活中,最常见的就是订阅,无论是短信订阅,还是微信上的订阅号,我们都是在作为消费者,被动的接受消息(虽然很多时候都是单方面在骚扰我们。。。)
而在开发中,生产者负责生产消息,不关心如何被消费以及消费者是谁;消费者注册并接受消息,不关心消息的来源和时间;生产者和消费者并不需要时刻保持联系
其核心目的还是那个老生常谈的,解耦
1.介绍
适用目的:定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
主要解决:一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。
何时使用:一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。
如何解决:使用面向对象技术,可以将这种依赖关系弱化。
关键代码:在抽象类里有一个集合存放观察者们。
应用实例:短信/公众号推送;平台的公告;股票与股民;
优点:
- 降低了目标与观察者之间的耦合关系,两者之间是抽象耦合关系
- 目标与观察者之间建立了一套触发机制。
缺点:
- 目标与观察者之间的依赖关系并没有完全解除,而且有可能出现循环引用。
- 当观察者对象很多时,通知的发布会花费很多时间,影响程序的效率
使用场景:
- 一个抽象模型有两个方面,其中一个方面依赖于另一个方面。将这些方面封装在独立的对象中使它们可以各自独立地改变和复用。
- 一个对象的改变将导致其他一个或多个对象也发生改变,而不知道具体有多少对象将发生改变,可以降低对象之间的耦合度。
- 一个对象必须通知其他对象,而并不知道这些对象是谁。
- 需要在系统中创建一个触发链,A对象的行为将影响B对象,B对象的行为将影响C对象……,可以使用观察者模式创建一种链式触发机制。
注意事项:
- 避免循环引用
- 异步以防止某一个观察者出错导致整个系统卡壳
2.结构
观察者模式的主要角色
- 抽象主题(Subject):也叫抽象目标类,它提供了一个用于保存观察者对象的聚集类和增加、删除观察者对象的方法,以及通知所有观察者的抽象方法。
- 具体主题(Concrete Subject):也叫具体目标类,它实现抽象目标中的通知方法,当具体主题的内部状态发生改变时,通知所有注册过的观察者对象。
- 抽象观察者(Observer):它是一个抽象类或接口,它包含了一个更新自己的抽象方法,当接到具体主题的更改通知时被调用。
- 具体观察者(Concrete Observer) :实现抽象观察者中定义的抽象方法,以便在得到目标的更改通知时更新自身的状态。
[图片上传失败...(image-253ff8-1624958200920)]
3.步骤
-
创建抽象目标
// 抽象目标 abstract class Subject { protected Collection<Observer> observers = new HashSet<>(); public void add(Observer observer) { observers.add(observer); } public void remove(Observer observer) { observers.remove(observer); } public abstract void notifyObserver(String msg); }
-
创建具体目标,继承抽象目标,并实现其虚拟方法
// 具体目标 class ConcreteSubject extends Subject { @Override public void notifyObserver(String msg) { System.out.println("具体目标发生改变!" + msg); observers.parallelStream().forEach(m -> m.response(msg)); } }
-
创建抽象观察者
// 抽象观察者 interface Observer { void response(String msg); }
-
创建具体观察者,实现抽象观察者接口
// 具体观察者A class ConcreteObserverA implements Observer { @Override public void response(String msg) { System.out.println("具体观察者A作出反应!" + msg); } } // 具体观察者B class ConcreteObserverB implements Observer { @Override public void response(String msg) { System.out.println("具体观察者B作出反应!" + msg); } }
测试代码
public class ObserverTest {
public static void main(String[] args) {
Subject subject = new ConcreteSubject();
Observer observerA = new ConcreteObserverA();
Observer observerB = new ConcreteObserverB();
subject.add(observerA);
subject.add(observerB);
subject.notifyObserver("hello world");
subject.remove(observerA);
subject.notifyObserver("你好");
}
}
运行结果
[图片上传失败...(image-2fd49f-1624958200920)]
4.扩展
实际上在Java的jdk中,已经通过 java.util.Observable 类和 java.util.Observer 接口定义了观察者模式,只要实现他们的子类即可编写观察者模式实例
但是两个已被jdk9弃用,官方推荐的做法是使用java.util.concurrent.Flow的API
下面会对这两种分别给出示例
4.1.Observable类 + Observer类
Observable类是抽象目标类,持有一个Vector向量,用于保存所有要通知的观察者对象。
主要方法如下:
-
void addObserver(Observer o)
:用于将新的观察者对象添加到向量中 -
void notifyObservers(Object arg)
:调用向量中所有观察者的update()
方法,通知他们数据已发生改变。通常先通知后放入的观察者;可以通过参数arg
向update()
传递数据 -
void setChange()
:用于设置一个布尔类型的内部标志位,注明目标对象已发生改变;当它为真时,notifyObservers
才会通知观察者
完整示例如下
package com.company.designPattern.observer;
import java.util.Date;
import java.util.Observable;
import java.util.Observer;
// 被观察者(具体目标)
class NumObservable extends Observable {
private int num = 0;
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
setChanged();
notifyObservers(new Date());
}
}
// 观察者A
class ObserverA implements Observer {
@Override
public void update(Observable o, Object arg) {
NumObservable object = (NumObservable) o;
System.out.println("ObserverA: Num has changed to " + object.getNum() + "\n Message: " + arg);
}
}
// 观察者B
class ObserverB implements Observer {
@Override
public void update(Observable o, Object arg) {
NumObservable object = (NumObservable) o;
System.out.println("ObserverB: Num has changed to " + object.getNum() + "\n Message: " + arg);
}
}
public class ObserverTest1 {
public static void main(String[] args) {
// 创建被观察者和观察者
NumObservable observable = new NumObservable();
Observer observerA = new ObserverA();
Observer observerB = new ObserverB();
// 关联
observable.addObserver(observerA);
observable.addObserver(observerB);
// 修改数据10
observable.setNum(10);
// 修改数据20
observable.setNum(20);
// 解除observerA的观察关联,修改数据30
observable.deleteObserver(observerA);
observable.setNum(30);
}
}
运行结果
[图片上传失败...(image-beeca4-1624958200920)]
前两次,按照后加入先通知的顺序,分别通知了A和B
第三次,解除了A的关联,所以只通知了B
4.2.Flow API
Flow API 是 Java 9 引入的响应式编程的接口,其中包含4个接口:
- Publisher:发布者,负责发布消息;
- Subscriber:订阅者,负责订阅处理消息;
- Subscription:订阅控制类,可用于发布者和订阅者之间通信;
- Processor:处理者,同时充当Publisher和Subscriber的角色。
请注意Flow API仅提供接口,并不提供具体实现,请自行按照需求实现
[图片上传失败...(image-285ca2-1624958200920)]
示例如下
-
定义一个类,用于订阅者和发布者之间传输数据
/** * 定义一个用于传递数据的类 */ class Message { public String msg = ""; public int leftCount = 0; public Message(String msg, int leftCount) { this.msg = msg; this.leftCount = leftCount; } }
可以根据自己的需求构造类的内容
-
定义一个发布者
/** * 自定义发布者 * 需要指定订阅者发送给发布者的数据类型 */ class MyPublisher implements Flow.Publisher<Message> { private int count = 0; // 计数器,从0开始 private final int maxCount; // 最大计数器 private int leftCount = 0; // 剩余计数 private final long interval; // 发送间隔 private boolean isCanceled; // 是否被取消 /** * 构造函数,根据需要初始化数据 * * @param interval 初始化发送间隔 * @param maxCount 最大计数器,达到数量后自动停止 */ public MyPublisher(long interval, int maxCount) { this.interval = interval; this.maxCount = maxCount; } /** * 订阅事件 * 在这里定义订阅者订阅后的操作,通常是在某条件下传递一个对象给订阅者 * 为方便演示,我们每隔一段时间向订阅者发送当前计数N次,N由订阅者传递给我们 * * @param subscriber */ @Override public void subscribe(Flow.Subscriber<? super Message> subscriber) { // 使用线程来异步执行每个订阅操作 new Thread(() -> { try { // 给订阅者分配一个控制器 subscriber.onSubscribe(new MySubscription()); // 循环执行核心操作 while (!isCanceled && count < maxCount) { // 当剩余数量大于0时,传递数据给订阅者 if (leftCount > 0) { subscriber.onNext(new Message(new Date() + ":" + ++count, --leftCount)); Thread.sleep(interval); } } // 结束订阅后,通知订阅者已结束 subscriber.onComplete(); } catch (Exception e) { // 出现错误时,通知订阅者发生错误 subscriber.onError(e); } }).start(); } /** * 自定义订阅控制类 * 重写request和cancel方法,提供给订阅者使用 */ private class MySubscription implements Flow.Subscription { /** * 接受到来自订阅者的数据请求 * * @param n 请求次数 */ @Override public void request(long n) { // 将次数累加到剩余次数中 leftCount += n; } /** * 接收到来自订阅者的取消请求 */ @Override public void cancel() { isCanceled = true; } } }
发布者的核心任务即
subscribe
,需要在这里定义订阅后的操作,通常异步执行 -
定义一个订阅者
/** * 自定义订阅者 * 需要指定从发布者接收到的数据类型 * 模拟事件:请求一定数量的数据,并且根据需要分批请求 */ class MySubscriber implements Flow.Subscriber<Message> { private Flow.Subscription subscription; // 用于持有来自订阅者的控制器(其实并不必要) private int perNum; // 每轮数量 private int count; // 计数器 /** * 构造函数,根据需要初始化数据 * * @param perNum 每轮订阅次数 * @param count 订阅次数 */ public MySubscriber(int perNum, int count) { this.perNum = perNum; this.count = count; } /** * 发起一轮请求 */ private void startNewRound() { System.out.println("Start a new round"); int requestCount = Math.min(count, perNum); count -= requestCount; subscription.request(requestCount); } /** * 订阅事件 * * @param subscription */ @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; // 发起第一轮请求 startNewRound(); } // 接受来自发布者的触发指令 @Override public void onNext(Message item) { System.out.println("receive message: " + item.msg); System.out.println("now left: " + item.leftCount); // 本轮结束的时候,开启下一轮 if (item.leftCount == 0 && count > 0) { startNewRound(); } } // 接受来自发布者的错误 @Override public void onError(Throwable throwable) { System.out.println("onError:" + throwable.getMessage()); } // 接受来自发布者的完成指令 @Override public void onComplete() { System.out.println("onComplete!"); } }
核心部分为
onSubscribe
和onNext
,分别用于发起第一次请求,和发起后续请求
客户端代码
public class FlowDemo {
public static void main(String[] args) {
MyPublisher publisher = new MyPublisher(500L, 10); // 每500ms发送一次,最多20次
MySubscriber subscriber = new MySubscriber(3, 20); //每轮发送3次,总共8轮
publisher.subscribe(subscriber);
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果
<img src="https://i.loli.net/2021/06/29/pwYSqAKDyaRWtl8.png" alt="image-20210629165110149" style="zoom:50%;" />
可以看到一共发起了4轮查询,最后一轮仅有1个数据
完整demo:https://gitee.com/echo_ye/practice/tree/master/src/main/java/com/company/designPattern/observer
后记
在实际使用中观察者模式相当常见,其最根本的生产者-消费者模型更是成为了面试必考题。。。
Flow的做法也是令人眼前一亮,提供全套的模型,但只提供接口,在保证模型的功能和效率的前提下,也尽可能的给我们开发者自由发挥的空间,可以在开发中尝试这种模式
作者:Echo_Ye
WX:Echo_YeZ
Email :echo_yezi@qq.com
个人站点:在搭了在搭了。。。(右键 - 新建文件夹)