JAVA 设计模式之代理模式——闭包代理(集成篇)

看完上一篇文章之后, 相信大家对闭包代理有了个初步认识.
如果没看上一篇文章, 最好看一下, 闭包代理模式-初创篇
如果对 函数式接口和lambda表达式关系不熟悉的, 也可以看下这篇文章 函数式接口 与 lambda 表达式 的关系

因为闭包代理是针对函数式接口实例的代理, 代理的函数式接口又主要是通过lambda表达式初始化得到的,

而lambda结构很少, 不同的lambda表达式几乎只有参数和返回值的区别, 不同的结构之间又可以相互适配, 灵活连接在一起, 这就使得闭包代理及其强大.

本篇文章, 就让我来写一个可以直接或间接代理所有方法的通用代理类.

闭包代理集成分析

以上一篇写的 OnceExecutorForConsumer 为例, 对于 OnceExecutorForConsumer 来说, 所有 Consumer 结构的方法(有一个参数, 无返回值), 它都能够代理, 代理后的方法在同一时间只能被一个线程执行.

OnceExecutorForConsumer 如下, 这里去掉了一些无关的注释和多线程的代码!

/**
* 将传入的 Consumer<T> then 作为 被代理函数式接口实例 传入, 将该对象的 onceExe 方法作为 代理方法实例 返回
* 
* 逻辑: 完成 函数式接口实例 的 单线程 代理
*/
public class OnceExecutorForConsumer<T> {

   private final Lock lock = new ReentrantLock();

   /**
   * 被代理的方法
   */
   private final Consumer<T> then;

   private Consumer<T> skip;

   public OnceExecutorForConsumer(Consumer<T> then) {
      this.then = then;
   }

   /**
   * 代理方法
   * 
   * <p>
   *     每次调用新建一个线程对参数进行处理, 主线程不阻塞, 分线程竞争锁, 抢到运行 then, 抢不到运行 skip
   * </p>
   */
   public void onceExe(final T e) {
      if (lock.tryLock()) {
         try {
            if (then != null) {
                  then.accept(e);
            }
         } finally {
            lock.unlock();
         }
      } else {
         if (skip != null) {
            skip.accept(e);
         }
      }
   }

   public OnceExecutorForConsumer<T> setSkip(Consumer<T> skip) {
      this.skip = skip;
      return this;
   }
}

函数式接口之间是可以相互转化的, 那么 OnceExecutorForConsumer 能够代理的就不只是 Consumer 而是所有有一个参数,无返回值的方法它均可以进行代理.

关于函数式接口之间相互转化的知识可以看我另一篇文章

不通的函数式接口实例也可以通过简单的方式适配连接在一起, 这就使得通用闭包代理工具类成为了可能.

接下来就以 OnceExecutorForConsumer 为例, 让 OnceExecutorForConsumer 同时适配一切函数式接口, 而不仅仅是Consumer的结构.

在适配之前, 先对函数式接口结构做一下分类

函数式接口结构单一, 再加上泛型, 就使得函数式接口基本结构相当少, 在此我依据结构将函数式接口的结构分为 4大类

结构特性 代表函数式接口
无参数, 无返回值 Runnable
有参数, 无返回值 Consumer
无参数, 有返回值 Supplier
有参数, 有返回值 Function

所谓函数式接口的转换就是lambda表达式初始化的过程, 以上四类, 实际上也可以看作是lambda表达式的结构分类

之所以分为上面四大类, 是因为返回值就两种情况, 有返回值和无返回值, 但是参数数量可以有多个, 导致无法穷举, 在这里可以使用数组或Bean的形式将多个参数集成为1到两个来解决多个参数的转换问题.

抽象类 AbstractClosureProxy

OnceExecutorForConsumer 的作用是代理一个方法使之变成一个单线程的方法, 之后我可能设计出其它的代理方法, 如代理一个方法为之提供一个限流的功能, 或者对这个方法的调用进行一个缓存, 记录的功能等等.

这些代理类中一定会有很多冗余的方法, 既然如此, 就把里面的方法抽象出来, 把通用代码和扩展功能分开.

也就是说将 OnceExecutorForConsumer 变成两个类.

  1. AbstractClosureProxy: 作用是搭配出闭包代理的基本框架, 以及对函数式接口进行组合和适配集成.
  2. OnceExecClosureProxy: 作用是负责提供代理一个方法使之变成一个单线程的方法的功能

接下来就是这两个类了

  1. AbstractClosureProxy: 作用是搭配出闭包代理的基本框架, 以及对函数式接口进行组合和适配集成.

    package com.github.cosycode.common.ext.hub;
    
    import java.util.Objects;
    import java.util.function.*;
    
    /**
    * <b>Description : </b> 通用抽象闭包代理类
    * <p>
    * <b>created in </b> 2021/4/5
    *
    * @author CPF
    * @since 1.0
    *
    * @param <T> 代理方法的函数式接口实例
    * @param <P> 代理方法可能传入的参数类型
    * @param <R> 代理方法可能的返回值类型
    */
    public abstract class AbstractClosureProxy<T, P, R> {
    
       /**
       * 代理方法的函数式接口实例
       */
       protected final T functional;
    
       /**
       * 函数式接口 then 对传入参数的操作函数, 如果该项为 null, 则默认
       */
       protected final BiFunction<T, P, R> biFunction;
    
       protected AbstractClosureProxy(T functional) {
          Objects.requireNonNull(functional, "functional 不能为 null");
          this.functional = functional;
          this.biFunction = geneDefaultBiFunction();
       }
    
       protected AbstractClosureProxy(T functional, BiConsumer<T, P> biConsumer) {
          this(functional, (t, p) -> {
                biConsumer.accept(t, p);
                return null;
          });
       }
    
       protected AbstractClosureProxy(T functional, BiFunction<T, P, R> biFunction) {
          Objects.requireNonNull(functional, "functional 不能为 null");
          this.functional = functional;
          if (biFunction == null) {
                this.biFunction = geneDefaultBiFunction();
          } else {
                this.biFunction = biFunction;
          }
       }
    
       /**
       * 闭包代理方法: Function
       */
       public abstract R closureFunction(P params);
    
       /**
       * 闭包代理方法: Consumer
       */
       public void closureConsumer(P params) {
          closureFunction(params);
       }
    
       /**
       * 闭包代理方法: Supplier
       */
       public R closureSupplier() {
          return closureFunction(null);
       }
    
       /**
       * 闭包代理方法: Runnable
       */
       public void closureRunnable() {
          closureFunction(null);
       }
    
       /**
       * 根据 then 返回默认的闭包代理方法
       *
       * @return 默认的闭包代理方法
       */
       @SuppressWarnings("unchecked")
       public T proxy() {
          if (functional instanceof Consumer) {
                final Consumer<P> proxy = this::closureConsumer;
                return (T) proxy;
          } else if (functional instanceof Function) {
                final Function<P, R> proxy = this::closureFunction;
                return (T) proxy;
          } else if (functional instanceof Supplier) {
                final Supplier<R> proxy = this::closureSupplier;
                return (T) proxy;
          } else if (functional instanceof Runnable) {
                final Runnable proxy = this::closureRunnable;
                return (T) proxy;
          }
          throw new IllegalArgumentException("参数 functional" + functional + " 必须是支持的函数式接口");
       }
    
       /**
       * 返回自定义的闭包代理函数式接口实例
       *
       * @param function 自定义闭包代理函数式接口实例返回函数
       * @return 自定义的闭包代理函数式接口实例
       * @param <V> 自定义的返回函数式接口类型
       */
       public <V> V proxy(Function<Function<P, R>, V> function) {
          return function.apply(this::closureFunction);
       }
    
       /**
       * 根据函数式接口实例, 生成默认的针对(函数接口then调用的处理方式)的处理函数
       *
       * @return 函数接口then调用的处理方式 的处理函数
       */
       @SuppressWarnings("unchecked")
       private BiFunction<T, P, R> geneDefaultBiFunction() {
          if (functional instanceof Consumer) {
                return (t, p) -> {
                   Consumer<P> consumer = (Consumer<P>) t;
                   consumer.accept(p);
                   return null;
                };
          } else if (functional instanceof Function) {
                return (t, p) -> {
                   Function<P, R> consumer = (Function<P, R>) t;
                   return consumer.apply(p);
                };
          } else if (functional instanceof Supplier) {
                return (t, p) -> {
                   Supplier<R> supplier = ((Supplier<R>) t);
                   return supplier.get();
                };
          } else if (functional instanceof Runnable) {
                return (t, p) -> {
                   ((Runnable) t).run();
                   return null;
                };
          }
          throw new IllegalArgumentException("参数 functional" + functional + " 必须是支持的函数式接口");
       }
    
    }
    
  2. OnceExecClosureProxy: 作用是负责提供代理一个方法使之变成一个单线程的方法的功能

    经它代理过的方法(函数式接口实例)在同一时间内只能够由一个线程执行, 其他的线程则直接跳过.

    package com.github.cosycode.common.ext.proxy;
    
    import com.github.cosycode.common.ext.hub.AbstractClosureProxy;
    import lombok.NonNull;
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.BiConsumer;
    import java.util.function.BiFunction;
    
    /**
    * <b>Description : </b> 作用就是使得在多个线程在调用同一个方法时, 同一时间只有一个能够执行, 其它的则执行 skip 方法, 如果 skip 方法为空, 则直接跳过.
    * <p>
    * <b>created in </b> 2021/4/6
    *
    * @author CPF
    * @since 1.0
    **/
    public class OnceExecClosureProxy<T, P, R> extends AbstractClosureProxy<T, P, R> {
    
       private final Lock lock = new ReentrantLock();
    
       private T skip;
    
       public OnceExecClosureProxy(@NonNull T then) {
          super(then);
       }
    
       public OnceExecClosureProxy(@NonNull T then, @NonNull BiFunction<T, P, R> function) {
          super(then, function);
       }
    
       public OnceExecClosureProxy(@NonNull T then, @NonNull BiConsumer<T, P> biConsumer) {
          super(then, biConsumer);
       }
    
       public OnceExecClosureProxy<T, P, R> skip(T skip) {
          this.skip = skip;
          return this;
       }
    
       @Override
       public R closureFunction(P params) {
          if (lock.tryLock()) {
                try {
                   if (functional != null) {
                      return biFunction.apply(functional, params);
                   }
                } finally {
                   lock.unlock();
                }
          } else {
                if (skip != null) {
                   return biFunction.apply(skip, params);
                }
          }
          return null;
       }
    
       public static <T> T of(T then) {
          return new OnceExecClosureProxy<>(then).proxy();
       }
    }
    
    

闭包代理工具类测试

AbstractClosureProxyOnceExecClosureProxy 进行基础测试

package cn.cpf.test.proxy;

import com.github.cosycode.common.ext.hub.Throws;
import com.github.cosycode.common.ext.proxy.OnceExecClosureProxy;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.stream.IntStream;

@Slf4j
public class OnceExecClosureProxyTest {

   /**
   * 当前方法在同一时间内只能够有一个线程执行业务逻辑
   */
   private void runDemo(String msg) {
      log.info("start {}", msg);
      {
            // ---- 业务逻辑(用睡眠10 ms 表示) ----
            Throws.con(10, Thread::sleep);
      }
      log.info(" end  {}", msg);
   }

   /**
   * 普通情况下 开启8个线程执行
   */
   @Test
   public void test0() {
      OnceExecClosureProxyTest click = new OnceExecClosureProxyTest();
      // 正常情况下开启 8 个线程执行
      IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(click::runDemo);
   }

   /**
   * 经过 OnceExecClosureProxy 代理类后 开启8个线程执行
   * 如果同一时间多个线程执行代理方法, 则将会只有一个线程能够执行被代理方法, 其它线程直接跳过
   */
   @Test
   public void test1() {
      OnceExecClosureProxyTest click = new OnceExecClosureProxyTest();
      // 对 OnceExecClosureProxyTest::test 方法代理后 开启8个线程运行
      IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(OnceExecClosureProxy.of(click::runDemo));
   }

   /**
   * 经过 OnceExecClosureProxy 代理类后 开启8个线程执行
   * 如果同一时间多个线程执行代理方法, 则将会只有一个线程能够执行被代理方法, 其它线程 执行skip 方法
   */
   @Test
   public void test2() {
      OnceExecClosureProxyTest click = new OnceExecClosureProxyTest();
      // 对 OnceExecClosureProxyTest::test 方法代理后 开启8个线程运行
      IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(
               new OnceExecClosureProxy<>((Consumer<String>)click::runDemo).skip(id -> log.debug("id {} 跳过", id)).proxy()
      );
   }

}

运行 test0() 方法, 打印结果

[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 2
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 5
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 0
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 6
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 1
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 7
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 4
[INFO] 信息 11:38:26[%1] (OnceExecClosureProxyTest.java:17) start 3
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  6
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  7
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  3
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  2
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  0
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  5
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  4
[INFO] 信息 11:38:27[%1] (OnceExecClosureProxyTest.java:22)  end  1

8个线程并发执行了 OnceExecClosureProxyTest::test 方法

运行 test1() 方法, 打印结果

[INFO] 信息 11:43:31[%1] (OnceExecClosureProxyTest.java:18) start 5
[INFO] 信息 11:43:31[%1] (OnceExecClosureProxyTest.java:23)  end  5

8个线程并发执行, 但是只有一个线程成功执行了 OnceExecClosureProxyTest::test 方法

运行 test2() 方法, 打印结果

[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 6 跳过
[INFO] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:18) start 5
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 3 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 1 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 4 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 2 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 0 跳过
[DEBUG] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:56) id 7 跳过
[INFO] 信息 11:47:17[%1] (OnceExecClosureProxyTest.java:23)  end  5

8个线程并发执行, 但是只有一个线程成功执行了 OnceExecClosureProxyTest::test 方法, 其余线程执行了 skip 方法打印了 日志

由上面的结果可以知道, OnceExecClosureProxy 成功发挥了既定代理作用

使用 AbstractClosureProxy 代理各种各样的方法

既然上面说过了要设计一个 能够适配 所有方法的代理类, 接下来就试试 AbstractClosureProxy 强大的代理语法.

照理说, 依然是应该使用 OnceExecClosureProxy 来去展示代理各种各样的方法的, 但是由于 OnceExecClosureProxy 打印的日志太多, 还涉及到多线程, 而这里只是需要展示下 AbstractClosureProxy 的语法代理功能, 所以此处我们不再使用 OnceExecClosureProxy, 这里我们使用另一个代理类 LogCallExecuteProxy

LogCallExecuteProxy 作用相对简单: 在被代理类调用的前后, 打印出调用的参数和返回的结果, 以及运行的时间等信息

package com.github.cosycode.common.ext.proxy;

import com.github.cosycode.common.ext.hub.AbstractClosureProxy;
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/**
* <b>Description : </b> 闭包代理类: 作用是 在被代理类调用的前后, 打印出调用的参数和返回的结果, 以及运行的时间等信息
* <p>
* <b>created in </b> 2021/4/6
*
* @author CPF
* @since 1.0
**/
@Slf4j
public class LogCallExecuteProxy<T, P, R> extends AbstractClosureProxy<T, P, R> {

   public LogCallExecuteProxy(T functional) {
      super(functional);
   }

   public LogCallExecuteProxy(T functional, BiFunction<T, P, R> function) {
      super(functional, function);
   }

   public LogCallExecuteProxy(T functional, BiConsumer<T, P> biConsumer) {
      super(functional, biConsumer);
   }

   /**
   * 打印 执行 Runnable 函数的执行时长信息, 以及调用参数与返回值
   */
   @Override
   public R closureFunction(P params) {
      // 计算开始调用时间
      final long start = System.nanoTime();
      final long threadId = Thread.currentThread().getId();
      // 参数
      final String paramString;
      if (params != null) {
            paramString = params.getClass().isArray() ? Arrays.toString((Object[]) params) : params.toString();
      } else {
            paramString = "";
      }
      log.info("[{}] ==> call start ==> params : [{}]", threadId, paramString);
      final R result = biFunction.apply(functional, params);
      // 计算调用时间
      final long inVal = System.nanoTime() - start;
      log.info("[{}] ==> end, return: {}, consume time: {} ", threadId, result, inVal);
      return result;
   }

}

接下来使用 LogCallExecuteProxy 代理各种方法, 接口.

AbstractClosureProxy 代理简单方法

  1. LogCallExecuteProxy 代理无参数, 无返回值的方法

    相关代码

    public class FunctionTest {
       /**
       * 当前方法在同一时间内只能够有一个线程执行业务逻辑
       */
       private static void fun01() {
          log.info("call start {}", Thread.currentThread().getId());
          {
                // ---- 业务逻辑(用睡眠10 ms 表示) ----
                Throws.con(RandomUtils.nextInt(50, 200), Thread::sleep);
          }
          log.info("call start {}", Thread.currentThread().getId());
       }
    
       @Test
       public void testProxyForFun01() {
          final Runnable proxy = new LogCallExecuteProxy<>((Runnable) FunctionTest::fun01).proxy();
          proxy.run();
       }
    }
    

    运行结果

    [INFO] 2021-04-15 15:42:35.730 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : []
    [INFO] 2021-04-15 15:42:35.730 [main] (FunctionTest.java:32) call start 1
    [INFO] 2021-04-15 15:42:35.817 [main] (FunctionTest.java:37) call start 1
    [INFO] 2021-04-15 15:42:35.817 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 85821400 
    
  2. LogCallExecuteProxy 代理有一个参数, 无返回值的方法

    相关代码

    public class FunctionTest {
       public static void fun02(int i) {
          log.info("call fun2");
       }
    
       @Test
       public void testProxyForFun02() {
          final Consumer<Integer> proxy = new LogCallExecuteProxy<>((Consumer<Integer>) FunctionTest::fun02).proxy();
          proxy.accept(400);
       }
    }
    

    运行结果

    [INFO] 2021-04-15 15:45:17.673 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [400]
    [INFO] 2021-04-15 15:45:17.673 [main] (FunctionTest.java:47) call fun2
    [INFO] 2021-04-15 15:45:17.673 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 7166800 
    
  3. LogCallExecuteProxy 代理有一个数组参数, 无返回值的方法

    相关代码

    public class FunctionTest {
       public void fun03(Object[] objects) {
          log.info("call fun2");
       }
    
       @Test
       public void testProxyForFun03() {
          Consumer<Object[]> fun03 = new FunctionTest()::fun03;
          Consumer<Object[]> proxy = new LogCallExecuteProxy<>(fun03).proxy();
          proxy.accept(new Object[]{"haa", 400, new ArrayList<>()});
       }
    }
    

    运行结果

    [INFO] 2021-04-15 15:46:28.701 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[haa, 400, []]]
    [INFO] 2021-04-15 15:46:28.709 [main] (FunctionTest.java:57) call fun2
    [INFO] 2021-04-15 15:46:28.709 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 7498800 
    
  4. LogCallExecuteProxy 代理无参数, 有返回值的方法

    相关代码

    public class FunctionTest {
       public String fun11() {
          log.info("call fun11");
          return UUID.randomUUID().toString();
       }
    
       @Test
       public void testProxyForFun11() {
          Supplier<String> fun11 = new FunctionTest()::fun11;
          Supplier<String> proxy = new LogCallExecuteProxy<>(fun11).proxy();
          final String result = proxy.get();
          System.out.println(result);
       }
    }
    

    运行结果

    [INFO] 2021-04-15 15:47:45.666 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : []
    [INFO] 2021-04-15 15:47:45.687 [main] (FunctionTest.java:68) call fun11
    [INFO] 2021-04-15 15:47:46.181 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: 616200bd-531c-4b1a-9a08-b59058c49f76, consume time: 509690100 
    616200bd-531c-4b1a-9a08-b59058c49f76
    
  5. LogCallExecuteProxy 代理有参数, 有返回值的方法

    相关代码

    public class FunctionTest {
       public String fun12(String str) {
          log.info("call fun12");
          return str + new Random().nextLong();
       }
    
       @Test
       public void testProxyForFun12() {
          Function<String, String> fun12 = new FunctionTest()::fun12;
          Function<String, String> proxy = new LogCallExecuteProxy<>(fun12).proxy();
          final String result = proxy.apply("lalalala");
          System.out.println(result);
       }
    }
    

    运行结果

    [INFO] 2021-04-15 15:48:33.017 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [lalalala]
    [INFO] 2021-04-15 15:48:33.017 [main] (FunctionTest.java:81) call fun12
    [INFO] 2021-04-15 15:48:33.017 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: lalalala6847791148629184306, consume time: 10911400 
    lalalala6847791148629184306
    

AbstractClosureProxy 代理复杂方法

AbstractClosureProxy 对于简单的接口, 是可以自动适配的. 因为我在 AbstractClosureProxy 添加了 四种基本函数式接口结构的自动适配, 类似于两个参数的函数是没有自动适配的, 后续扩展应该会把 BiConsumer, BiFunction 两种结构添加上自动适配.

而对于非基本结构的函数式接口, 那么就需要将它们转化为 基本结构的函数式接口, 进行操作.

但是仅仅这样的话, 就会出现, 代理类对一个复杂函数式接口结构 TDemo 代理后, 返回的函数式接口却是一个简单的接口, 例如返回一个 Function<T, R> 结构的接口.

语法如下

Function<T, R> fun = new LogCallExecuteProxy<>(TDemo, 参数1)::closureFunction;

参数1 负责 将 TDemo 转化为基本结构的函数式接口

然后对这个Function<T, R> 结构的接口进行操作, 也能够实现对 TDemo 的代理.

但是这样的话就使得代理对象和被代理对象的类型不相同了, 关于这个我暂时没有想到比较好的处理方式, 暂时的解决方法类似如下公式.

T t = new LogCallExecuteProxy<>(TDemo, 参数1).proxy(参数2);

参数1 负责 将 TDemo 转化为基本结构的函数式接口
参数2 负责 将 基本的函数式接口再转换为 T 的类型.

接下来就使用这种方式代理几个复杂的接口

  1. LogCallExecuteProxy 代理有两个参数, 无返回值的方法

    相关代码

    public class FunctionTest {
       public void fun13(String str, Map<String, Object> map) {
          log.info("call fun13");
       }
    
       @Test
       public void testProxyForFun13() {
          BiConsumer<String, Map<String, Object>> biConsumer = new FunctionTest()::fun13;
    
          final BiConsumer<String, Map<String, Object>> proxy = new LogCallExecuteProxy<>(biConsumer,
                   (BiConsumer<String, Map<String, Object>> t, Object[] params) -> t.accept((String) params[0], (Map<String, Object>) params[1])
          ).proxy(t -> (String s, Map<String, Object> map) -> t.apply(new Object[]{s, map}));
    
          Map<String, Object> map = new HashMap<>();
          map.put("haha", LocalDate.now());
          map.put("hehe", LocalTime.now());
    
          proxy.accept("gaga", map);
       }
    }
    

    运行结果

    [INFO] 2021-04-15 15:53:33.453 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[gaga, {haha=2021-04-15, hehe=15:53:33.453}]]
    [INFO] 2021-04-15 15:53:33.453 [main] (FunctionTest.java:92) call fun13
    [INFO] 2021-04-15 15:53:33.453 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: null, consume time: 10998200 
    
  2. LogCallExecuteProxy 代理有两个参数, 有返回值的方法

    相关代码

    public class FunctionTest {
       public String fun14(String str, Map<String, Object> map) {
          log.info("call fun14");
          return str + " -------- " + map.toString();
       }
    
       @Test
       public void testProxyForFun14() {
          BiFunction<String, Map<String, Object>,String> biConsumer = new FunctionTest()::fun14;
    
          final BiFunction<String, Map<String, Object>,String> proxy = new LogCallExecuteProxy<>(biConsumer,
                   (BiFunction<String, Map<String, Object>,String> t, Object[] params) -> t.apply((String) params[0], (Map<String, Object>) params[1])
          ).proxy(t -> (String s, Map<String, Object> map) -> t.apply(new Object[]{s, map}));
    
          Map<String, Object> map = new HashMap<>();
          map.put("haha", LocalDate.now());
          map.put("hehe", LocalTime.now());
    
          final String gaga = proxy.apply("gaga", map);
          System.out.println(gaga);
       }
    }
    

    运行结果

    [INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[gaga, {haha=2021-04-15, hehe=15:58:02.699}]]
    [INFO] 2021-04-15 15:58:02.699 [main] (FunctionTest.java:112) call fun14
    [INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: gaga -------- {haha=2021-04-15, hehe=15:58:02.699}, consume time: 7540000 
    gaga -------- {haha=2021-04-15, hehe=15:58:02.699}
    
  3. LogCallExecuteProxy 代理复杂接口的方法

    这个算是包含所有复杂的方式了, 所有种类的结构都可以通过这种方法去代理

    例如我们想要代理这个方法

    public class FunctionTest {
       public String fun14(String str, Map<String, Object> map) {
          log.info("call fun14");
          return str + " -------- " + map.toString();
       }
    }
    

    对于上述的方法来说, 直接去代理是不够的, 应为我们在使用lambda表达式的时候, 需要一个和这个lambda表达式匹配的接口, 如下

    @FunctionalInterface
    interface Function24 {
       Map<Integer, String> fun24(String str, boolean flag, Integer... integers);
    }
    

    那么接下来的代理就成了这样

    @Test
    public void testProxyForFun14() {
       BiFunction<String, Map<String, Object>,String> biConsumer = new FunctionTest()::fun14;
    
       final BiFunction<String, Map<String, Object>,String> proxy = new LogCallExecuteProxy<>(biConsumer,
                (BiFunction<String, Map<String, Object>,String> t, Object[] params) -> t.apply((String) params[0], (Map<String, Object>) params[1])
       ).proxy(t -> (String s, Map<String, Object> map) -> t.apply(new Object[]{s, map}));
    
       Map<String, Object> map = new HashMap<>();
       map.put("haha", LocalDate.now());
       map.put("hehe", LocalTime.now());
    
       final String gaga = proxy.apply("gaga", map);
       System.out.println(gaga);
    }
    

    运行结果

    [INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:48) [1] ==> call start ==> params : [[gaga, {haha=2021-04-15, hehe=15:58:02.699}]]
    [INFO] 2021-04-15 15:58:02.699 [main] (FunctionTest.java:112) call fun14
    [INFO] 2021-04-15 15:58:02.699 [main] (LogCallExecuteProxy.java:52) [1] ==> end, return: gaga -------- {haha=2021-04-15, hehe=15:58:02.699}, consume time: 7540000 
    gaga -------- {haha=2021-04-15, hehe=15:58:02.699}
    

其它闭包代理实例

除了上述两个代理类 LogCallExecuteProxy 和 OnceExecClosureProxy 之外, 我还实现了两个个代理类

闭包代理实现限流:CurrentLimitClosureProxy

CurrentLimitClosureProxy 是一个闭包代理类, 它的作用是经它代理过的方法(函数式接口实例)在同一时间内只能够允许几个线程运行, 其它的则阻塞等待.

CurrentLimitClosureProxy.java

package com.github.cosycode.common.ext.proxy;

import com.github.cosycode.common.ext.hub.AbstractClosureProxy;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/**
* <b>Description : </b> 限流闭包代理类
* <p>
* <b>created in </b> 2021/4/7
*
* @author CPF
* @since 1.2
**/
@Slf4j
public class CurrentLimitClosureProxy<T, P, R> extends AbstractClosureProxy<T, P, R> {

   private final Semaphore semaphore;

   public CurrentLimitClosureProxy(int limit, @NonNull T then) {
      super(then);
      check(limit);
      semaphore = new Semaphore(limit);
   }

   public CurrentLimitClosureProxy(int limit, @NonNull T then, @NonNull BiFunction<T, P, R> function) {
      super(then, function);
      check(limit);
      semaphore = new Semaphore(limit);
   }

   public CurrentLimitClosureProxy(int limit, @NonNull T then, @NonNull BiConsumer<T, P> biConsumer) {
      super(then, biConsumer);
      check(limit);
      semaphore = new Semaphore(limit);
   }

   private void check(int limit) {
      if (limit <= 0) {
            throw new IllegalArgumentException("limit 不能小于 1");
      }
   }

   @Override
   public R closureFunction(P params) {
      try {
            semaphore.acquire();
            biFunction.apply(functional, params);
      } catch (InterruptedException e) {
            log.error("获取信号失败 params: " + params, e);
            Thread.currentThread().interrupt();
      } finally {
            semaphore.release();
      }
      return null;
   }
}

测试示例:

import com.github.cosycode.common.ext.hub.Throws;
import com.github.cosycode.common.ext.proxy.CurrentLimitClosureProxy;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.function.Consumer;
import java.util.stream.IntStream;

/**
* <b>Description : </b>
* <p>
* <b>created in </b> 2021/4/9
*
* @author CPF
* @since 1.0
**/
@Slf4j
public class CurrentLimitClosureProxyTest {

   /**
   * 当前方法在同一时间内只能够有一个线程执行业务逻辑
   */
   private void runDemo(String msg) {
      log.info("start {}", msg);
      {
            // ---- 业务逻辑(用睡眠1s 表示) ----
            Throws.con(1000, Thread::sleep);
      }
      log.info(" end  {}", msg);
   }

   /**
   * 普通情况下 开启8个线程执行
   */
   @Test
   public void test0() {
      CurrentLimitClosureProxyTest click = new CurrentLimitClosureProxyTest();
      IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(click::runDemo);
   }

   /**
   * 经过 CurrentLimitClosureProxy 处理后 开启8个线程执行
   */
   @Test
   public void test1() {
      CurrentLimitClosureProxyTest click = new CurrentLimitClosureProxyTest();
      // 通过 CurrentLimitClosureProxy 类代理, 同一时间只能够由两个线程执行方法
      final Consumer<String> proxy = new CurrentLimitClosureProxy<>(2, (Consumer<String>) click::runDemo).proxy();
      IntStream.range(0, 8).parallel().mapToObj(Integer::toString).forEach(proxy);
   }
}

运行 test0() 方法, 打印结果

[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 6
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 0
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 2
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 3
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 5
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 4
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 1
[INFO] 2021-04-13 17:41:24.262 (CurrentLimitClosureProxyTest.java:26) start 7
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  0
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  3
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  6
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  2
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  5
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  1
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  4
[INFO] 2021-04-13 17:41:25.293 (CurrentLimitClosureProxyTest.java:31)  end  7

结果可见, 所有线程同时执行.

运行 test1() 方法, 打印结果

[INFO] 2021-04-13 17:42:54.222 (CurrentLimitClosureProxyTest.java:26) start 2
[INFO] 2021-04-13 17:42:54.222 (CurrentLimitClosureProxyTest.java:26) start 5
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:31)  end  2
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:31)  end  5
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:26) start 6
[INFO] 2021-04-13 17:42:55.253 (CurrentLimitClosureProxyTest.java:26) start 1
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:31)  end  1
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:31)  end  6
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:26) start 7
[INFO] 2021-04-13 17:42:56.268 (CurrentLimitClosureProxyTest.java:26) start 3
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:31)  end  7
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:31)  end  3
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:26) start 0
[INFO] 2021-04-13 17:42:57.284 (CurrentLimitClosureProxyTest.java:26) start 4
[INFO] 2021-04-13 17:42:58.299 (CurrentLimitClosureProxyTest.java:31)  end  4
[INFO] 2021-04-13 17:42:58.299 (CurrentLimitClosureProxyTest.java:31)  end  0

结果可见, 同一时间内只有两个线程执行了 runDemo 方法, 其它的都阻塞了, 起到了限流的效果.

闭包代理实现单例模式:SingletonClosureProxy

作用是 将一个原本不是单例的方法 在经过代理后变成单例方法.

SingletonClosureProxy.java

package com.github.cosycode.common.ext.hub;

import lombok.NonNull;

import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/**
* <b>Description : </b> 闭包代理单例模式:
* <p> 作用是 将一个原本不是单例的方法 在经过代理后变成单例方法.
* <b>created in </b> 2021/4/6
*
* @author CPF
* @since 1.0
**/
public class SingletonClosureProxy<T, P, R> extends AbstractClosureProxy<T, P, R> {

   /**
   * 单例模式仅仅保证引用可见性即可
   */
   @SuppressWarnings("java:S3077")
   private volatile R obj;

   public SingletonClosureProxy(@NonNull T then) {
      super(then);
   }

   public SingletonClosureProxy(@NonNull T then, @NonNull BiFunction<T, P, R> function) {
      super(then, function);
   }

   public SingletonClosureProxy(@NonNull T then, @NonNull BiConsumer<T, P> biConsumer) {
      super(then, biConsumer);
   }

   @Override
   public R closureFunction(P params) {
      if (obj == null) {
            synchronized (this) {
               if (obj == null) {
                  final R apply = biFunction.apply(functional, params);
                  obj = apply;
                  return apply;
               }
            }
      }
      return obj;
   }

   public static <R> Supplier<R> of(Supplier<R> supplier) {
      return new SingletonClosureProxy<>(supplier).proxy();
   }

}

测试示例:

   // 一个简单的返回 uuid 的方法
   public static String geneDemo() {
      return UUID.randomUUID().toString();
   }

   @Test
   public void test1() {
      System.out.println("代理前");
      Supplier<Object> newObject = SingletonClosureProxyTest::geneDemo;
      System.out.println(newObject.get());
      System.out.println(newObject.get());
      System.out.println(newObject.get());
      System.out.println(newObject.get());
      System.out.println(newObject.get());

      newObject = SingletonClosureProxy.of(newObject);
      System.out.println("代理后");
      System.out.println(newObject.get());
      System.out.println(newObject.get());
      System.out.println(newObject.get());
      System.out.println(newObject.get());
      System.out.println(newObject.get());
   }

运行 test1() 方法, 打印结果

代理前
026710a3-f1af-4064-8a29-252ca6183a0a
ca56dc28-3129-4ff8-8334-1ad5dd10bc40
bbb15588-123b-48a2-8912-aacfb14c4555
b8cca777-ad08-41d4-add6-b65e0f0f2fff
a367ff34-0c71-4083-a287-1da3fe6f498f
代理后
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a
44e04209-5f55-48b8-b4dd-ff920a5eab9a

TODO

  1. AbstractClosureProxy 功能升级

    适应一些常用的JDK自带函数式接口, 可能会加上 UnaryOperator, BiFunction, BiConsumer 两个类

    四种基本接口实际上已经拥有比较宽泛的功能了, 现有的AbstractClosureProxy<T, P, R> 还是比较简单的;
    如果加上 BiFunction, BiConsumer, 会需要再添加一个泛型, 再加上 UnaryOperator 也没什么, 但是类似于 IntConsumer, IntPredicate 这些基本类型我暂时不打算加上适配, 因为把这些类型添加上之后将会使AbstractClosureProxy 变得比较大, 而且 Consumer<Integer>IntConsumer 实际上很容易转换, 加上IntConsumer, IntPredicate暂时没什么必要性.

  2. 抛出异常外部catch的闭包代理接口

    JDK 自带函数式接口, 实际上是没有抛出异常的功能的, 之后可能会添加一套可向外抛出异常的函数式接口, 相关内容可参考 ``

  3. 序列化的函数式接口

    序列化的函数式接口比没有序列化的函数式接口有更强的功能的, 但是也会付出一部分转换的性能代价, 后续这个可能会加上.

  4. AbstractClosureProxy 抽象优化.

    其实在对复杂接口进行代理的时候, 还是可以使用反射进行一些代码简化的, 但是简化之后会降低一些性能, 这个之后可能会添加这个简化的功能.

总之, 有时间再优化吧!


本篇文章应该包含了涉及到的完整的代码, 若可能有遗漏的地方, 可以查看我放到github上面的源码, 欢迎大家查看交流

源码在 common-lang 模块, 类路径参考文章的类的 package 路径.

同时我也将代码打包成jar, 发布到 maven 仓库, 欢迎大家直接使用和交流

git

github: https://github.com/cosycode/common-lang

repo

Apache Maven

<dependency>
   <groupId>com.github.cosycode</groupId>
   <artifactId>common-lang</artifactId>
   <version>1.4</version>
</dependency>

gradle

implementation 'com.github.cosycode:common-lang:1.4'

觉得这篇文章好的, 请点个赞, 有兴趣也可以翻翻我其它的文章.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容