看完上一篇文章之后, 相信大家对闭包代理有了个初步认识.
如果没看上一篇文章, 最好看一下, 闭包代理模式-初创篇
如果对 函数式接口和lambda表达式关系不熟悉的, 也可以看下这篇文章 函数式接口 与 lambda 表达式 的关系
因为闭包代理是针对函数式接口实例的代理, 代理的函数式接口又主要是通过lambda表达式初始化得到的,
而lambda结构很少, 不同的lambda表达式几乎只有参数和返回值的区别, 不同的结构之间又可以相互适配, 灵活连接在一起, 这就使得闭包代理及其强大.
本篇文章, 就让我来写一个可以直接或间接代理所有方法
的通用代理类.
闭包代理集成分析
以上一篇写的 OnceExecutorForConsumer
为例, 对于 OnceExecutorForConsumer
来说, 所有 Consumer 结构的方法(有一个参数, 无返回值), 它都能够代理, 代理后的方法在同一时间只能被一个线程执行.
OnceExecutorForConsumer
如下, 这里去掉了一些无关的注释和多线程的代码!
/**
* 将传入的 Consumer<T> then 作为 被代理函数式接口实例 传入, 将该对象的 onceExe 方法作为 代理方法实例 返回
*
* 逻辑: 完成 函数式接口实例 的 单线程 代理
*/
public class OnceExecutorForConsumer<T> {
private final Lock lock = new ReentrantLock();
/**
* 被代理的方法
*/
private final Consumer<T> then;
private Consumer<T> skip;
public OnceExecutorForConsumer(Consumer<T> then) {
this.then = then;
}
/**
* 代理方法
*
* <p>
* 每次调用新建一个线程对参数进行处理, 主线程不阻塞, 分线程竞争锁, 抢到运行 then, 抢不到运行 skip
* </p>
*/
public void onceExe(final T e) {
if (lock.tryLock()) {
try {
if (then != null) {
then.accept(e);
}
} finally {
lock.unlock();
}
} else {
if (skip != null) {
skip.accept(e);
}
}
}
public OnceExecutorForConsumer<T> setSkip(Consumer<T> skip) {
this.skip = skip;
return this;
}
}
函数式接口之间是可以相互转化的, 那么 OnceExecutorForConsumer
能够代理的就不只是 Consumer
而是所有有一个参数,无返回值
的方法它均可以进行代理.
关于函数式接口之间相互转化的知识可以看我另一篇文章
不通的函数式接口实例也可以通过简单的方式适配连接在一起, 这就使得通用闭包代理工具类成为了可能.
接下来就以 OnceExecutorForConsumer
为例, 让 OnceExecutorForConsumer
同时适配一切函数式接口, 而不仅仅是Consumer
的结构.
在适配之前, 先对函数式接口结构做一下分类
函数式接口结构单一, 再加上泛型, 就使得函数式接口基本结构相当少, 在此我依据结构将函数式接口的结构分为 4大类
结构特性 | 代表函数式接口 |
---|---|
无参数, 无返回值 | Runnable |
有参数, 无返回值 | Consumer |
无参数, 有返回值 | Supplier |
有参数, 有返回值 | Function |
所谓函数式接口的转换就是lambda表达式初始化的过程, 以上四类, 实际上也可以看作是lambda表达式的结构分类
之所以分为上面四大类, 是因为返回值就两种情况, 有返回值和无返回值, 但是参数数量可以有多个, 导致无法穷举, 在这里可以使用数组或Bean的形式将多个参数集成为1到两个来解决多个参数的转换问题.
抽象类 AbstractClosureProxy
OnceExecutorForConsumer
的作用是代理一个方法使之变成一个单线程的方法, 之后我可能设计出其它的代理方法, 如代理一个方法为之提供一个限流的功能, 或者对这个方法的调用进行一个缓存, 记录的功能等等.
这些代理类中一定会有很多冗余的方法, 既然如此, 就把里面的方法抽象出来, 把通用代码和扩展功能分开.
也就是说将 OnceExecutorForConsumer
变成两个类.
- AbstractClosureProxy: 作用是搭配出闭包代理的基本框架, 以及对函数式接口进行组合和适配集成.
- OnceExecClosureProxy: 作用是负责提供代理一个方法使之变成一个单线程的方法的功能
接下来就是这两个类了
-
AbstractClosureProxy: 作用是搭配出闭包代理的基本框架, 以及对函数式接口进行组合和适配集成.
package com.github.cosycode.common.ext.hub; import java.util.Objects; import java.util.function.*; /** * <b>Description : </b> 通用抽象闭包代理类 * <p> * <b>created in </b> 2021/4/5 * * @author CPF * @since 1.0 * * @param <T> 代理方法的函数式接口实例 * @param <P> 代理方法可能传入的参数类型 * @param <R> 代理方法可能的返回值类型 */ public abstract class AbstractClosureProxy<T, P, R> { /** * 代理方法的函数式接口实例 */ protected final T functional; /** * 函数式接口 then 对传入参数的操作函数, 如果该项为 null, 则默认 */ protected final BiFunction<T, P, R> biFunction; protected AbstractClosureProxy(T functional) { Objects.requireNonNull(functional, "functional 不能为 null"); this.functional = functional; this.biFunction = geneDefaultBiFunction(); } protected AbstractClosureProxy(T functional, BiConsumer<T, P> biConsumer) { this(functional, (t, p) -> { biConsumer.accept(t, p); return null; }); } protected AbstractClosureProxy(T functional, BiFunction<T, P, R> biFunction) { Objects.requireNonNull(functional, "functional 不能为 null"); this.functional = functional; if (biFunction == null) { this.biFunction = geneDefaultBiFunction(); } else { this.biFunction = biFunction; } } /** * 闭包代理方法: Function */ public abstract R closureFunction(P params); /** * 闭包代理方法: Consumer */ public void closureConsumer(P params) { closureFunction(params); } /** * 闭包代理方法: Supplier */ public R closureSupplier() { return closureFunction(null); } /** * 闭包代理方法: Runnable */ public void closureRunnable() { closureFunction(null); } /** * 根据 then 返回默认的闭包代理方法 * * @return 默认的闭包代理方法 */ @SuppressWarnings("unchecked") public T proxy() { if (functional instanceof Consumer) { final Consumer<P> proxy = this::closureConsumer; return (T) proxy; } else if (functional instanceof Function) { final Function<P, R> proxy = this::closureFunction; return (T) proxy; } else if (functional instanceof Supplier) { final Supplier<R> proxy = this::closureSupplier; return (T) proxy; } else if (functional instanceof Runnable) { final Runnable proxy = this::closureRunnable; return (T) proxy; } throw new IllegalArgumentException("参数 functional" + functional + " 必须是支持的函数式接口"); } /** * 返回自定义的闭包代理函数式接口实例 * * @param function 自定义闭包代理函数式接口实例返回函数 * @return 自定义的闭包代理函数式接口实例 * @param <V> 自定义的返回函数式接口类型 */ public <V> V proxy(Function<Function<P, R>, V> function) { return function.apply(this::closureFunction); } /** * 根据函数式接口实例, 生成默认的针对(函数接口then调用的处理方式)的处理函数 * * @return 函数接口then调用的处理方式 的处理函数 */ @SuppressWarnings("unchecked") private BiFunction<T, P, R> geneDefaultBiFunction() { if (functional instanceof Consumer) { return (t, p) -> { Consumer<P> consumer = (Consumer<P>) t; consumer.accept(p); return null; }; } else if (functional instanceof Function) { return (t, p) -> { Function<P, R> consumer = (Function<P, R>) t; return consumer.apply(p); }; } else if (functional instanceof Supplier) { return (t, p) -> { Supplier<R> supplier = ((Supplier<R>) t); return supplier.get(); }; } else if (functional instanceof Runnable) { return (t, p) -> { ((Runnable) t).run(); return null; }; } throw new IllegalArgumentException("参数 functional" + functional + " 必须是支持的函数式接口"); } }
-
OnceExecClosureProxy: 作用是负责提供代理一个方法使之变成一个单线程的方法的功能
经它代理过的方法(函数式接口实例)在同一时间内只能够由一个线程执行, 其他的线程则直接跳过.
package com.github.cosycode.common.ext.proxy; import com.github.cosycode.common.ext.hub.AbstractClosureProxy; import lombok.NonNull; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BiFunction; /** * <b>Description : </b> 作用就是使得在多个线程在调用同一个方法时, 同一时间只有一个能够执行, 其它的则执行 skip 方法, 如果 skip 方法为空, 则直接跳过. * <p> * <b>created in </b> 2021/4/6 * * @author CPF * @since 1.0 **/ public class OnceExecClosureProxy<T, P, R> extends AbstractClosureProxy<T, P, R> { private final Lock lock = new ReentrantLock(); private T skip; public OnceExecClosureProxy(@NonNull T then) { super(then); } public OnceExecClosureProxy(@NonNull T then, @NonNull BiFunction<T, P, R> function) { super(then, function); } public OnceExecClosureProxy(@NonNull T then, @NonNull BiConsumer<T, P> biConsumer) { super(then, biConsumer); } public OnceExecClosureProxy<T, P, R> skip(T skip) { this.skip = skip; return this; } @Override public R closureFunction(P params) { if (lock.tryLock()) { try { if (functional != null) { return biFunction.apply(functional, params); } } finally { lock.unlock(); } } else { if (skip != null) { return biFunction.apply(skip, params); } } return null; } public static <T> T of(T then) { return new OnceExecClosureProxy<>(then).proxy(); } }
闭包代理工具类测试
对 AbstractClosureProxy
和 OnceExecClosureProxy
进行基础测试
package cn.cpf.test.proxy;
import com.github.cosycode.common.ext.hub.Throws;
import com.github.cosycode.common.ext.proxy.OnceExecClosureProxy;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.stream.IntStream;
@Slf4j
public class OnceExecClosureProxyTest {
/**
* 当前方法在同一时间内只能够有一个线程执行业务逻辑
*/
private void runDemo(String msg) {
log.info("start {}", msg);
{
// ---- 业务逻辑(用睡眠10 ms 表示) ----
Throws.con(10, Thread::sleep);
}
log.info(" end {}", msg);
}
/**
* 普通情况下 开启8个线程执行
*/
@Test
public void test0() {
OnceExecClosureProxyTest click = new OnceExecClosureProxyTest();
// 正常情况下开启 8 个线程执行
IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(click::runDemo);
}
/**
* 经过 OnceExecClosureProxy 代理类后 开启8个线程执行
* 如果同一时间多个线程执行代理方法, 则将会只有一个线程能够执行被代理方法, 其它线程直接跳过
*/
@Test
public void test1() {
OnceExecClosureProxyTest click = new OnceExecClosureProxyTest();
// 对 OnceExecClosureProxyTest::test 方法代理后 开启8个线程运行
IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(OnceExecClosureProxy.of(click::runDemo));
}
/**
* 经过 OnceExecClosureProxy 代理类后 开启8个线程执行
* 如果同一时间多个线程执行代理方法, 则将会只有一个线程能够执行被代理方法, 其它线程 执行skip 方法
*/
@Test
public void test2() {
OnceExecClosureProxyTest click = new OnceExecClosureProxyTest();
// 对 OnceExecClosureProxyTest::test 方法代理后 开启8个线程运行
IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(
new OnceExecClosureProxy<>((Consumer<String>)click::runDemo).skip(id -> log.debug("id {} 跳过", id)).proxy()
);
}
}
运行 test0() 方法, 打印结果
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 2
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 5
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 0
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 6
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 1
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 7
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 4
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 3
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 6
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 7
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 3
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 2
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 0
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 5
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 4
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22) end 1
8个线程并发执行了 OnceExecClosureProxyTest::test 方法
运行 test1() 方法, 打印结果
[INFO] 信息 11:43:31[%1] (OnceExecClosureProxyTest.java:18) start 5
[INFO] 信息 11:43:31[%1] (OnceExecClosureProxyTest.java:23) end 5
8个线程并发执行, 但是只有一个线程成功执行了 OnceExecClosureProxyTest::test 方法
运行 test2() 方法, 打印结果
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 6 跳过
[INFO] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:18) start 5
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 3 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 1 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 4 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 2 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 0 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 7 跳过
[INFO] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:23) end 5
8个线程并发执行, 但是只有一个线程成功执行了 OnceExecClosureProxyTest::test 方法, 其余线程执行了 skip 方法打印了 日志
由上面的结果可以知道, OnceExecClosureProxy 成功发挥了既定代理作用
使用 AbstractClosureProxy 代理各种各样的方法
既然上面说过了要设计一个 能够适配 所有方法的代理类, 接下来就试试 AbstractClosureProxy 强大的代理语法.
照理说, 依然是应该使用 OnceExecClosureProxy 来去展示代理各种各样的方法的, 但是由于 OnceExecClosureProxy 打印的日志太多, 还涉及到多线程, 而这里只是需要展示下 AbstractClosureProxy 的语法代理功能, 所以此处我们不再使用 OnceExecClosureProxy, 这里我们使用另一个代理类 LogCallExecuteProxy
LogCallExecuteProxy
作用相对简单: 在被代理类调用的前后, 打印出调用的参数和返回的结果, 以及运行的时间等信息
package com.github.cosycode.common.ext.proxy;
import com.github.cosycode.common.ext.hub.AbstractClosureProxy;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
/**
* <b>Description : </b> 闭包代理类: 作用是 在被代理类调用的前后, 打印出调用的参数和返回的结果, 以及运行的时间等信息
* <p>
* <b>created in </b> 2021/4/6
*
* @author CPF
* @since 1.0
**/
@Slf4j
public class LogCallExecuteProxy<T, P, R> extends AbstractClosureProxy<T, P, R> {
public LogCallExecuteProxy(T functional) {
super(functional);
}
public LogCallExecuteProxy(T functional, BiFunction<T, P, R> function) {
super(functional, function);
}
public LogCallExecuteProxy(T functional, BiConsumer<T, P> biConsumer) {
super(functional, biConsumer);
}
/**
* 打印 执行 Runnable 函数的执行时长信息, 以及调用参数与返回值
*/
@Override
public R closureFunction(P params) {
// 计算开始调用时间
final long start = System.nanoTime();
final long threadId = Thread.currentThread().getId();
// 参数
final String paramString;
if (params != null) {
paramString = params.getClass().isArray() ? Arrays.toString((Object[]) params) : params.toString();
} else {
paramString = "";
}
log.info("[{}] ==> call start ==> params : [{}]", threadId, paramString);
final R result = biFunction.apply(functional, params);
// 计算调用时间
final long inVal = System.nanoTime() - start;
log.info("[{}] ==> end, return: {}, consume time: {} ", threadId, result, inVal);
return result;
}
}
接下来使用 LogCallExecuteProxy
代理各种方法, 接口.
AbstractClosureProxy
代理简单方法
-
LogCallExecuteProxy
代理无参数, 无返回值的方法相关代码
public class FunctionTest { /** * 当前方法在同一时间内只能够有一个线程执行业务逻辑 */ private static void fun01() { log.info("call start {}", Thread.currentThread().getId()); { // ---- 业务逻辑(用睡眠10 ms 表示) ---- Throws.con(RandomUtils.nextInt(50, 200), Thread::sleep); } log.info("call start {}", Thread.currentThread().getId()); } @Test public void testProxyForFun01() { final Runnable proxy = new LogCallExecuteProxy<>((Runnable) FunctionTest::fun01).proxy(); proxy.run(); } }
运行结果
[INFO] 2021-04-15 15:42:35.730 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [] [INFO] 2021-04-15 15:42:35.730 [main] (FunctionTest.java:32) call start 1 [INFO] 2021-04-15 15:42:35.817 [main] (FunctionTest.java:37) call start 1 [INFO] 2021-04-15 15:42:35.817 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 85821400
-
LogCallExecuteProxy
代理有一个参数, 无返回值的方法相关代码
public class FunctionTest { public static void fun02(int i) { log.info("call fun2"); } @Test public void testProxyForFun02() { final Consumer<Integer> proxy = new LogCallExecuteProxy<>((Consumer<Integer>) FunctionTest::fun02).proxy(); proxy.accept(400); } }
运行结果
[INFO] 2021-04-15 15:45:17.673 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [400] [INFO] 2021-04-15 15:45:17.673 [main] (FunctionTest.java:47) call fun2 [INFO] 2021-04-15 15:45:17.673 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 7166800
-
LogCallExecuteProxy
代理有一个数组参数, 无返回值的方法相关代码
public class FunctionTest { public void fun03(Object[] objects) { log.info("call fun2"); } @Test public void testProxyForFun03() { Consumer<Object[]> fun03 = new FunctionTest()::fun03; Consumer<Object[]> proxy = new LogCallExecuteProxy<>(fun03).proxy(); proxy.accept(new Object[]{"haa", 400, new ArrayList<>()}); } }
运行结果
[INFO] 2021-04-15 15:46:28.701 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[haa, 400, []]] [INFO] 2021-04-15 15:46:28.709 [main] (FunctionTest.java:57) call fun2 [INFO] 2021-04-15 15:46:28.709 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 7498800
-
LogCallExecuteProxy
代理无参数, 有返回值的方法相关代码
public class FunctionTest { public String fun11() { log.info("call fun11"); return UUID.randomUUID().toString(); } @Test public void testProxyForFun11() { Supplier<String> fun11 = new FunctionTest()::fun11; Supplier<String> proxy = new LogCallExecuteProxy<>(fun11).proxy(); final String result = proxy.get(); System.out.println(result); } }
运行结果
[INFO] 2021-04-15 15:47:45.666 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [] [INFO] 2021-04-15 15:47:45.687 [main] (FunctionTest.java:68) call fun11 [INFO] 2021-04-15 15:47:46.181 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: 616200bd-531c-4b1a-9a08-b59058c49f76, consume time: 509690100 616200bd-531c-4b1a-9a08-b59058c49f76
-
LogCallExecuteProxy
代理有参数, 有返回值的方法相关代码
public class FunctionTest { public String fun12(String str) { log.info("call fun12"); return str + new Random().nextLong(); } @Test public void testProxyForFun12() { Function<String, String> fun12 = new FunctionTest()::fun12; Function<String, String> proxy = new LogCallExecuteProxy<>(fun12).proxy(); final String result = proxy.apply("lalalala"); System.out.println(result); } }
运行结果
[INFO] 2021-04-15 15:48:33.017 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [lalalala] [INFO] 2021-04-15 15:48:33.017 [main] (FunctionTest.java:81) call fun12 [INFO] 2021-04-15 15:48:33.017 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: lalalala6847791148629184306, consume time: 10911400 lalalala6847791148629184306
AbstractClosureProxy
代理复杂方法
AbstractClosureProxy
对于简单的接口, 是可以自动适配的. 因为我在 AbstractClosureProxy
添加了 四种基本函数式接口结构的自动适配, 类似于两个参数的函数是没有自动适配的, 后续扩展应该会把 BiConsumer
, BiFunction
两种结构添加上自动适配.
而对于非基本结构的函数式接口, 那么就需要将它们转化为 基本结构的函数式接口, 进行操作.
但是仅仅这样的话, 就会出现, 代理类对一个复杂函数式接口结构 TDemo
代理后, 返回的函数式接口却是一个简单的接口, 例如返回一个 Function<T, R>
结构的接口.
语法如下
Function<T, R> fun = new LogCallExecuteProxy<>(TDemo, 参数1)::closureFunction;
参数1 负责 将 TDemo 转化为基本结构的函数式接口
然后对这个Function<T, R>
结构的接口进行操作, 也能够实现对 TDemo
的代理.
但是这样的话就使得代理对象和被代理对象的类型不相同了, 关于这个我暂时没有想到比较好的处理方式, 暂时的解决方法类似如下公式.
T t = new LogCallExecuteProxy<>(TDemo, 参数1).proxy(参数2);
参数1 负责 将 TDemo 转化为基本结构的函数式接口
参数2 负责 将 基本的函数式接口再转换为 T 的类型.
接下来就使用这种方式代理几个复杂的接口
-
LogCallExecuteProxy
代理有两个参数, 无返回值的方法相关代码
public class FunctionTest { public void fun13(String str, Map<String, Object> map) { log.info("call fun13"); } @Test public void testProxyForFun13() { BiConsumer<String, Map<String, Object>> biConsumer = new FunctionTest()::fun13; final BiConsumer<String, Map<String, Object>> proxy = new LogCallExecuteProxy<>(biConsumer, (BiConsumer<String, Map<String, Object>> t, Object[] params) -> t.accept((String) params[0], (Map<String, Object>) params[1]) ).proxy(t -> (String s, Map<String, Object> map) -> t.apply(new Object[]{s, map})); Map<String, Object> map = new HashMap<>(); map.put("haha", LocalDate.now()); map.put("hehe", LocalTime.now()); proxy.accept("gaga", map); } }
运行结果
[INFO] 2021-04-15 15:53:33.453 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[gaga, {haha=2021-04-15, hehe=15:53:33.453}]] [INFO] 2021-04-15 15:53:33.453 [main] (FunctionTest.java:92) call fun13 [INFO] 2021-04-15 15:53:33.453 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 10998200
-
LogCallExecuteProxy
代理有两个参数, 有返回值的方法相关代码
public class FunctionTest { public String fun14(String str, Map<String, Object> map) { log.info("call fun14"); return str + " -------- " + map.toString(); } @Test public void testProxyForFun14() { BiFunction<String, Map<String, Object>,String> biConsumer = new FunctionTest()::fun14; final BiFunction<String, Map<String, Object>,String> proxy = new LogCallExecuteProxy<>(biConsumer, (BiFunction<String, Map<String, Object>,String> t, Object[] params) -> t.apply((String) params[0], (Map<String, Object>) params[1]) ).proxy(t -> (String s, Map<String, Object> map) -> t.apply(new Object[]{s, map})); Map<String, Object> map = new HashMap<>(); map.put("haha", LocalDate.now()); map.put("hehe", LocalTime.now()); final String gaga = proxy.apply("gaga", map); System.out.println(gaga); } }
运行结果
[INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[gaga, {haha=2021-04-15, hehe=15:58:02.699}]] [INFO] 2021-04-15 15:58:02.699 [main] (FunctionTest.java:112) call fun14 [INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: gaga -------- {haha=2021-04-15, hehe=15:58:02.699}, consume time: 7540000 gaga -------- {haha=2021-04-15, hehe=15:58:02.699}
-
LogCallExecuteProxy
代理复杂接口的方法这个算是包含所有复杂的方式了, 所有种类的结构都可以通过这种方法去代理
例如我们想要代理这个方法
public class FunctionTest { public String fun14(String str, Map<String, Object> map) { log.info("call fun14"); return str + " -------- " + map.toString(); } }
对于上述的方法来说, 直接去代理是不够的, 应为我们在使用lambda表达式的时候, 需要一个和这个lambda表达式匹配的接口, 如下
@FunctionalInterface interface Function24 { Map<Integer, String> fun24(String str, boolean flag, Integer... integers); }
那么接下来的代理就成了这样
@Test public void testProxyForFun14() { BiFunction<String, Map<String, Object>,String> biConsumer = new FunctionTest()::fun14; final BiFunction<String, Map<String, Object>,String> proxy = new LogCallExecuteProxy<>(biConsumer, (BiFunction<String, Map<String, Object>,String> t, Object[] params) -> t.apply((String) params[0], (Map<String, Object>) params[1]) ).proxy(t -> (String s, Map<String, Object> map) -> t.apply(new Object[]{s, map})); Map<String, Object> map = new HashMap<>(); map.put("haha", LocalDate.now()); map.put("hehe", LocalTime.now()); final String gaga = proxy.apply("gaga", map); System.out.println(gaga); }
运行结果
[INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[gaga, {haha=2021-04-15, hehe=15:58:02.699}]] [INFO] 2021-04-15 15:58:02.699 [main] (FunctionTest.java:112) call fun14 [INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: gaga -------- {haha=2021-04-15, hehe=15:58:02.699}, consume time: 7540000 gaga -------- {haha=2021-04-15, hehe=15:58:02.699}
其它闭包代理实例
除了上述两个代理类 LogCallExecuteProxy 和 OnceExecClosureProxy 之外, 我还实现了两个个代理类
闭包代理实现限流:CurrentLimitClosureProxy
CurrentLimitClosureProxy 是一个闭包代理类, 它的作用是经它代理过的方法(函数式接口实例)在同一时间内只能够允许几个线程运行, 其它的则阻塞等待.
CurrentLimitClosureProxy.java
package com.github.cosycode.common.ext.proxy;
import com.github.cosycode.common.ext.hub.AbstractClosureProxy;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
/**
* <b>Description : </b> 限流闭包代理类
* <p>
* <b>created in </b> 2021/4/7
*
* @author CPF
* @since 1.2
**/
@Slf4j
public class CurrentLimitClosureProxy<T, P, R> extends AbstractClosureProxy<T, P, R> {
private final Semaphore semaphore;
public CurrentLimitClosureProxy(int limit, @NonNull T then) {
super(then);
check(limit);
semaphore = new Semaphore(limit);
}
public CurrentLimitClosureProxy(int limit, @NonNull T then, @NonNull BiFunction<T, P, R> function) {
super(then, function);
check(limit);
semaphore = new Semaphore(limit);
}
public CurrentLimitClosureProxy(int limit, @NonNull T then, @NonNull BiConsumer<T, P> biConsumer) {
super(then, biConsumer);
check(limit);
semaphore = new Semaphore(limit);
}
private void check(int limit) {
if (limit <= 0) {
throw new IllegalArgumentException("limit 不能小于 1");
}
}
@Override
public R closureFunction(P params) {
try {
semaphore.acquire();
biFunction.apply(functional, params);
} catch (InterruptedException e) {
log.error("获取信号失败 params: " + params, e);
Thread.currentThread().interrupt();
} finally {
semaphore.release();
}
return null;
}
}
测试示例:
import com.github.cosycode.common.ext.hub.Throws;
import com.github.cosycode.common.ext.proxy.CurrentLimitClosureProxy;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.function.Consumer;
import java.util.stream.IntStream;
/**
* <b>Description : </b>
* <p>
* <b>created in </b> 2021/4/9
*
* @author CPF
* @since 1.0
**/
@Slf4j
public class CurrentLimitClosureProxyTest {
/**
* 当前方法在同一时间内只能够有一个线程执行业务逻辑
*/
private void runDemo(String msg) {
log.info("start {}", msg);
{
// ---- 业务逻辑(用睡眠1s 表示) ----
Throws.con(1000, Thread::sleep);
}
log.info(" end {}", msg);
}
/**
* 普通情况下 开启8个线程执行
*/
@Test
public void test0() {
CurrentLimitClosureProxyTest click = new CurrentLimitClosureProxyTest();
IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(click::runDemo);
}
/**
* 经过 CurrentLimitClosureProxy 处理后 开启8个线程执行
*/
@Test
public void test1() {
CurrentLimitClosureProxyTest click = new CurrentLimitClosureProxyTest();
// 通过 CurrentLimitClosureProxy 类代理, 同一时间只能够由两个线程执行方法
final Consumer<String> proxy = new CurrentLimitClosureProxy<>(2, (Consumer<String>) click::runDemo).proxy();
IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(proxy);
}
}
运行 test0() 方法, 打印结果
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 6
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 0
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 2
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 3
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 5
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 4
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 1
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 7
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 0
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 3
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 6
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 2
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 5
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 1
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 4
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31) end 7
结果可见, 所有线程同时执行.
运行 test1() 方法, 打印结果
[INFO] 2021-04-13 17:42:54.222 (CurrentLimitClosureProxyTest.java:26) start 2
[INFO] 2021-04-13 17:42:54.222 (CurrentLimitClosureProxyTest.java:26) start 5
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:31) end 2
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:31) end 5
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:26) start 6
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:26) start 1
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:31) end 1
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:31) end 6
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:26) start 7
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:26) start 3
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:31) end 7
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:31) end 3
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:26) start 0
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:26) start 4
[INFO] 2021-04-13 17:42:58.299 (CurrentLimitClosureProxyTest.java:31) end 4
[INFO] 2021-04-13 17:42:58.299 (CurrentLimitClosureProxyTest.java:31) end 0
结果可见, 同一时间内只有两个线程执行了 runDemo 方法, 其它的都阻塞了, 起到了限流的效果.
闭包代理实现单例模式:SingletonClosureProxy
作用是 将一个原本不是单例的方法 在经过代理后变成单例方法.
SingletonClosureProxy.java
package com.github.cosycode.common.ext.hub;
import lombok.NonNull;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;
/**
* <b>Description : </b> 闭包代理单例模式:
* <p> 作用是 将一个原本不是单例的方法 在经过代理后变成单例方法.
* <b>created in </b> 2021/4/6
*
* @author CPF
* @since 1.0
**/
public class SingletonClosureProxy<T, P, R> extends AbstractClosureProxy<T, P, R> {
/**
* 单例模式仅仅保证引用可见性即可
*/
@SuppressWarnings("java:S3077")
private volatile R obj;
public SingletonClosureProxy(@NonNull T then) {
super(then);
}
public SingletonClosureProxy(@NonNull T then, @NonNull BiFunction<T, P, R> function) {
super(then, function);
}
public SingletonClosureProxy(@NonNull T then, @NonNull BiConsumer<T, P> biConsumer) {
super(then, biConsumer);
}
@Override
public R closureFunction(P params) {
if (obj == null) {
synchronized (this) {
if (obj == null) {
final R apply = biFunction.apply(functional, params);
obj = apply;
return apply;
}
}
}
return obj;
}
public static <R> Supplier<R> of(Supplier<R> supplier) {
return new SingletonClosureProxy<>(supplier).proxy();
}
}
测试示例:
// 一个简单的返回 uuid 的方法
public static String geneDemo() {
return UUID.randomUUID().toString();
}
@Test
public void test1() {
System.out.println("代理前");
Supplier<Object> newObject = SingletonClosureProxyTest::geneDemo;
System.out.println(newObject.get());
System.out.println(newObject.get());
System.out.println(newObject.get());
System.out.println(newObject.get());
System.out.println(newObject.get());
newObject = SingletonClosureProxy.of(newObject);
System.out.println("代理后");
System.out.println(newObject.get());
System.out.println(newObject.get());
System.out.println(newObject.get());
System.out.println(newObject.get());
System.out.println(newObject.get());
}
运行 test1() 方法, 打印结果
代理前
026710a3-f1af-4064-8a29-252ca6183a0a
ca56dc28-3129-4ff8-8334-1ad5dd10bc40
bbb15588-123b-48a2-8912-aacfb14c4555
b8cca777-ad08-41d4-add6-b65e0f0f2fff
a367ff34-0c71-4083-a287-1da3fe6f498f
代理后
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a
TODO
-
AbstractClosureProxy 功能升级
适应一些常用的JDK自带函数式接口, 可能会加上 UnaryOperator, BiFunction, BiConsumer 两个类
四种基本接口实际上已经拥有比较宽泛的功能了, 现有的
AbstractClosureProxy<T, P, R>
还是比较简单的;
如果加上 BiFunction, BiConsumer, 会需要再添加一个泛型, 再加上UnaryOperator
也没什么, 但是类似于IntConsumer, IntPredicate
这些基本类型我暂时不打算加上适配, 因为把这些类型添加上之后将会使AbstractClosureProxy
变得比较大, 而且Consumer<Integer>
和IntConsumer
实际上很容易转换, 加上IntConsumer, IntPredicate
暂时没什么必要性. -
抛出异常外部catch的闭包代理接口
JDK 自带函数式接口, 实际上是没有抛出异常的功能的, 之后可能会添加一套可向外抛出异常的函数式接口, 相关内容可参考 ``
-
序列化的函数式接口
序列化的函数式接口比没有序列化的函数式接口有更强的功能的, 但是也会付出一部分转换的性能代价, 后续这个可能会加上.
-
AbstractClosureProxy 抽象优化.
其实在对复杂接口进行代理的时候, 还是可以使用反射进行一些代码简化的, 但是简化之后会降低一些性能, 这个之后可能会添加这个简化的功能.
总之, 有时间再优化吧!
本篇文章应该包含了涉及到的完整的代码, 若可能有遗漏的地方, 可以查看我放到github上面的源码, 欢迎大家查看交流
源码在
common-lang
模块, 类路径参考文章的类的 package 路径.
同时我也将代码打包成jar, 发布到 maven 仓库, 欢迎大家直接使用和交流
git
github: https://github.com/cosycode/common-lang
repo
Apache Maven
<dependency>
<groupId>com.github.cosycode</groupId>
<artifactId>common-lang</artifactId>
<version>1.4</version>
</dependency>
gradle
implementation 'com.github.cosycode:common-lang:1.4'
觉得这篇文章好的, 请点个赞, 有兴趣也可以翻翻我其它的文章.