核心思想
将存在数据依赖的任务拆分为多个步骤,每个步骤由专门的线程负责;依赖前一步结果的步骤,在前一步完成后接着执行。
示例1
计算(B+C)*B/2
/**
 * Context data carrier: one instance travels through the Plus, Multiply and
 * Div stages and accumulates the result of (B+C)*B/2.
 */
public class Context {
// Operand B: added to C by Plus, then used as the multiplier by Multiply.
public double B;
// Operand C: added to B by Plus.
public double C;
// Running result; each stage overwrites it with its own output.
public double r;
// Text printed by Div in front of "=" (PStreamMain stores the formatted expression here).
public String org;
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * First pipeline stage: computes {@code B + C} for every context it receives.
 *
 * <p>The thread blocks on {@link #bq} until a context is available, stores the
 * sum in {@code ctx.r}, and hands the context over to {@code Multiply.bq}.
 * Interrupting the thread ends the stage.
 */
public class Plus extends Thread {
    /** Inbox of this stage; producers add the contexts to be processed here. */
    public static BlockingQueue<Context> bq = new LinkedBlockingDeque<>();

    /**
     * Processes contexts until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Context ctx = bq.take();
                ctx.r = ctx.B + ctx.C;
                Multiply.bq.add(ctx);
            }
        } catch (InterruptedException e) {
            // take() clears the interrupt status when it throws; restore it and
            // leave the loop so an interrupt actually stops the stage instead of
            // being printed and ignored forever.
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Second pipeline stage: multiplies the running result by {@code B}.
 *
 * <p>The thread blocks on {@link #bq} until a context is available, replaces
 * {@code ctx.r} with {@code ctx.B * ctx.r}, and hands the context over to
 * {@code Div.bq}. Interrupting the thread ends the stage.
 */
public class Multiply extends Thread {
    /** Inbox of this stage; the previous stage adds its finished contexts here. */
    public static BlockingQueue<Context> bq = new LinkedBlockingDeque<>();

    /**
     * Processes contexts until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Context ctx = bq.take();
                ctx.r = ctx.B * ctx.r;
                Div.bq.add(ctx);
            }
        } catch (InterruptedException e) {
            // take() clears the interrupt status when it throws; restore it and
            // leave the loop so an interrupt actually stops the stage instead of
            // being printed and ignored forever.
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Final pipeline stage: halves the running result and prints it.
 *
 * <p>The thread blocks on {@link #bq} until a context is available, replaces
 * {@code ctx.r} with {@code ctx.r / 2}, and writes {@code org=r} to standard
 * output. Interrupting the thread ends the stage.
 */
public class Div extends Thread {
    /** Inbox of this stage; the previous stage adds its finished contexts here. */
    public static BlockingQueue<Context> bq = new LinkedBlockingDeque<>();

    /**
     * Processes contexts until the thread is interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Context ctx = bq.take();
                ctx.r = ctx.r / 2;
                System.out.println(ctx.org + "=" + ctx.r);
            }
        } catch (InterruptedException e) {
            // take() clears the interrupt status when it throws; restore it and
            // leave the loop so an interrupt actually stops the stage instead of
            // being printed and ignored forever.
            Thread.currentThread().interrupt();
        }
    }
}
import java.text.MessageFormat;
/**
 * Driver for the pipeline example: starts the three stage threads and feeds
 * them every (B, C) pair with B and C in 1..10.
 */
public class PStreamMain {
    /**
     * Starts the Plus, Multiply and Div threads, then queues 100 contexts on
     * {@code Plus.bq}.
     *
     * @param args unused
     * @throws InterruptedException declared by the signature; nothing in the body throws it
     */
    public static void main(String[] args) throws InterruptedException {
        // Bring the stages up first; each one waits on its own queue for data.
        Thread[] stages = {new Plus(), new Multiply(), new Div()};
        for (Thread stage : stages) {
            stage.start();
        }

        for (int b = 1; b <= 10; b++) {
            for (int c = 1; c <= 10; c++) {
                Plus.bq.add(newContext(b, c));
            }
        }
    }

    /** Builds a context for the given operands, including its printable expression. */
    private static Context newContext(double b, double c) {
        Context job = new Context();
        job.B = b;
        job.C = c;
        job.org = MessageFormat.format("({0}+{1})*{0}/2", job.B, job.C);
        return job;
    }
}