我的理解是:观察者模式在一定情况下可以看作一种事件驱动的形式。
它由被观察对象的行为触发,再由各个观察者执行各自的处理逻辑,因此具有很好的解耦性。
/**
 * Created by kevin on 17/8/8.
 *
 * Contract for anything that can be told about a {@link Message}.
 */
public interface IObserver {

    /**
     * Delivers a message to this observer.
     *
     * @param message the message being delivered
     */
    void notifyUser(Message message);
}
/**
 * Created by kevin on 17/8/8.
 *
 * Base class for observers: holds a reference to a {@link Message} and leaves the
 * reaction to a notification up to each subclass.
 */
public abstract class AbstractObserver {

    /** Message reference available to subclasses; not assigned by this class itself. */
    protected Message message;

    /**
     * Reacts to a notification about the given message.
     *
     * @param message the message this observer is being notified about
     */
    protected abstract void notifyUser(Message message);
}
/**
 * Created by kevin on 17/8/8.
 *
 * Observer "A": remembers the message it was handed and prints its text to standard output.
 */
public class AObserverImpl extends AbstractObserver {

    /**
     * Stores the message, registers this observer on it, and prints the message text.
     * Note that the registration happens on every call, not just the first one.
     *
     * @param message the message being delivered
     */
    @Override
    public void notifyUser(Message message) {
        this.message = message;
        message.addObserver(this);

        final String received = message.getMessage();
        System.out.println("A已接收到消息!" + received);
    }
}
/**
 * Created by kevin on 17/8/8.
 *
 * Observer "B": remembers the message it was handed and prints its text to standard output.
 */
public class BObserverImpl extends AbstractObserver {

    /**
     * Stores the message, registers this observer on it, and prints the message text.
     * Note that the registration happens on every call, not just the first one.
     *
     * @param message the message being delivered
     */
    @Override
    public void notifyUser(Message message) {
        this.message = message;
        message.addObserver(this);

        final String received = message.getMessage();
        System.out.println("B已接收到消息!" + received);
    }
}
/**
 * Created by kevin on 17/8/8.
 *
 * Observer "C": remembers the message it was handed and prints its text to standard output.
 */
public class CObserverImpl extends AbstractObserver {

    /**
     * Stores the message, registers this observer on it, and prints the message text.
     * Note that the registration happens on every call, not just the first one.
     *
     * @param message the message being delivered
     */
    @Override
    public void notifyUser(Message message) {
        this.message = message;
        message.addObserver(this);

        final String received = message.getMessage();
        System.out.println("C已接收到消息!" + received);
    }
}
/**
 * Created by kevin on 17/8/8.
 *
 * The subject of the observer pattern: carries a text payload and keeps the list of
 * observers that should be told about it.
 */
public class Message {

    /** Registered observers, in registration order, without duplicates. */
    private final List<AbstractObserver> observers = new ArrayList<AbstractObserver>();

    /** Current text payload; {@code null} until {@link #pushMessage(String)} is called. */
    private String message;

    /**
     * Registers an observer. Registering an observer that is already present is a no-op.
     *
     * <p>The observers in this file call {@code addObserver(this)} from inside
     * {@code notifyUser}; without the duplicate check the list would grow on every
     * notification and {@link #notifyMessage()} would never terminate.
     *
     * @param observer the observer to register
     */
    public void addObserver(AbstractObserver observer) {
        if (observers.contains(observer)) {
            return;
        }
        observers.add(observer);
    }

    /**
     * @return the current text payload, or {@code null} if none has been pushed yet
     */
    public String getMessage() {
        return message;
    }

    /**
     * Replaces the text payload. Observers are not notified by this call.
     *
     * @param message the new payload
     */
    public void pushMessage(String message) {
        this.message = message;
    }

    /**
     * Notifies every registered observer, passing this message to each of them.
     */
    public void notifyMessage() {
        // Iterate over a snapshot so that observers which register (themselves or others)
        // while being notified cannot affect the traversal in progress.
        List<AbstractObserver> snapshot = new ArrayList<AbstractObserver>(observers);
        for (AbstractObserver observer : snapshot) {
            observer.notifyUser(this);
        }
    }
}
完全可以模拟这样的场景:先读取数据,再通过发布/订阅的方式进行分发。
因此,可以将 NIO 通道内的数据一次分发给三个观察者,以此来模拟事件触发。
/**
 * Created by kevin on 17/8/10.
 *
 * Worker thread that reads one contiguous slice of a memory-mapped file and hands it,
 * in fixed-size segments, to the A, B and C observers.
 *
 * <p>Worker {@code i} covers the bytes from {@code i * singleCpuPosition} up to
 * {@code (i + 1) * singleCpuPosition}; the worker whose index is the last processor
 * index reads up to the end of the file instead, so the division remainder is not lost.
 */
public class RunThread extends Thread {

    /** Number of bytes sent to the observers (a, b, c) per message. */
    private static final int SEGMENT_LENGTH = 50;

    private final int i;
    private final MappedByteBuffer mappedByteBuffer;
    private final int singleCpuPosition;
    private final FileChannel fileChannel;

    /**
     * @param i                 zero-based index of this worker
     * @param mappedByteBuffer  mapping of the file; shared with the other workers
     * @param singleCpuPosition number of bytes assigned to each worker
     * @param fileChannel       channel of the mapped file, used to look up its size
     */
    public RunThread(int i, MappedByteBuffer mappedByteBuffer, int singleCpuPosition, FileChannel fileChannel) {
        this.i = i;
        this.mappedByteBuffer = mappedByteBuffer;
        this.singleCpuPosition = singleCpuPosition;
        this.fileChannel = fileChannel;
    }

    /**
     * Reads this worker's slice segment by segment and dispatches every segment to the
     * three observers. Any failure is printed and ends the worker.
     */
    @Override
    public void run() {
        // The mapped buffer can be very large, but data is sent to the clients in segments.
        try {
            int start = i * singleCpuPosition;
            int end = (i + 1) * singleCpuPosition;
            if (Runtime.getRuntime().availableProcessors() == i + 1) {
                end = (int) fileChannel.size();
            }

            // The mapped buffer is shared by all workers and its position/limit are not
            // thread-safe. A duplicate shares the content but has its own position and
            // limit, so each worker can walk its slice without disturbing the others.
            java.nio.ByteBuffer slice = mappedByteBuffer.duplicate();
            slice.clear();
            slice.limit(Math.min(end, slice.capacity()));
            slice.position(start);

            // get(byte[]) advances the position by itself; no manual repositioning is
            // needed. The final segment is simply shorter when the slice length is not a
            // multiple of SEGMENT_LENGTH.
            while (slice.hasRemaining()) {
                byte[] segment = new byte[Math.min(SEGMENT_LENGTH, slice.remaining())];
                slice.get(segment);
                // NOTE(review): decodes with the platform default charset, and a multi-byte
                // character that straddles a segment boundary will not decode correctly.
                dispatch(new String(segment));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Wraps the text in a fresh message and notifies the A, B and C observers with it. */
    private static void dispatch(String text) {
        Message message = new Message();
        message.pushMessage(text);
        new AObserverImpl().notifyUser(message);
        new BObserverImpl().notifyUser(message);
        new CObserverImpl().notifyUser(message);
    }
}
/**
 * Simulates an efficient file-reading tool: the file is large but does not exceed 2 GB,
 * and its content is sent to three recipients a, b and c.
 *
 * Created by kevin on 17/8/9.
 */
public class runNio {

    /**
     * Maps {@code src/main/resources/spider.txt} (relative to the working directory) into
     * memory and starts one {@link RunThread} per available processor to distribute it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Read the file relative to the current working directory.
        String root = System.getProperty("user.dir");

        // try-with-resources closes the channel and the file even when mapping fails.
        try (RandomAccessFile raf = new RandomAccessFile(root + "/src/main/resources/spider.txt", "r");
             FileChannel fileChannel = raf.getChannel()) {

            // Map the whole file into memory.
            final MappedByteBuffer mappedByteBuffer =
                    fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
            // Number of CPU cores currently available.
            final int cpus = Runtime.getRuntime().availableProcessors();
            // Number of bytes each worker is responsible for.
            final int singleCpuPosition = (int) (fileChannel.size() / cpus);

            // Start one worker per core.
            Thread[] workers = new Thread[cpus];
            for (int i = 0; i < cpus; i++) {
                workers[i] = new RunThread(i, mappedByteBuffer, singleCpuPosition, fileChannel);
                workers[i].start();
            }

            // The workers are handed the channel, so it must stay open until they finish.
            for (Thread worker : workers) {
                try {
                    worker.join();
                } catch (InterruptedException e) {
                    // Restore the flag and stop waiting; the resources are closed on exit.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (IOException e) {
            // Also covers FileNotFoundException, which is a subclass of IOException.
            e.printStackTrace();
        }
    }
}
自行准备一个文件(放在 src/main/resources/spider.txt)后,即可尝试运行。