java基于注解实现注册不同topic的处理器到消息总线

1: 自定义注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface MessageSubscribeHandler {

    @AliasFor("topic")
    String value() default "";

    @AliasFor("value")
    String topic() default "";
}

2: 定义处理器函数接口, 用户处理MQTT消息

@FunctionalInterface
public interface Handler {

  /**
   * Something has happened, so handle it.
   *
   * @param mqttMessage  the eventMessage to handle
   */
  void handle(MqttPublishMessage mqttMessage);
}

3: 函数接口实现:

@MessageSubscribeHandler("/+/+/event/+")
@Component
public class EventMessageHandler  implements Handler  {

  
    @Override
    public void handle(MqttPublishMessage mqttMessage) {
        log.debug("注册事件消息处理器");
    }
}


4: 消息总线定义

public interface MessageBus {

    //注册不同topic消息处理器
    void registerHandler(String topic, Handler handler);
}

5: 消息总线实现:
分布式消息总线可基于分布式的消息中间件(如emqx等)实现, 不建议使用单机版的消息总线,具体实现省略

6:扫描代码中的事件处理器, 并注册到消息总线中


@Slf4j
@Component
public class EventBusLogicRegisterProvider implements EventBusLogicRegister {

    @Override
    public void registerTopics(MessageBus messageBus) throws Exception {
        ResourceLoader resourceLoader = new PathMatchingResourcePatternResolver();
        ResourcePatternResolver resolver = ResourcePatternUtils.getResourcePatternResolver(resourceLoader);
        MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourceLoader);

        String path = ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX
                .concat(ClassUtils.convertClassNameToResourcePath("org.lbb.demo")
                        .concat("/**/*.class"));
        for (Resource resource: resolver.getResources(path)) {
            if (resource.isReadable()) {
                MetadataReader reader = readerFactory.getMetadataReader(resource);
                ClassMetadata classMetadata = reader.getClassMetadata();
                if (!classMetadata.isInterface()) {
                    Class handlerClass = ClassUtils.forName(classMetadata.getClassName(), null);
                    if (handlerClass.getAnnotation(MessageSubscribeHandler.class) != null) {
                        MessageSubscribeHandler messageSubscribeHandler = AnnotationUtils.findAnnotation(handlerClass, MessageSubscribeHandler.class);
                        String topic = Optional.of(messageSubscribeHandler.topic()).orElseThrow(() ->new NullPointerException("topic不能为空!"));
                        Handler handler = (Handler) SpringContextUtils.getBean(handlerClass);
                        messageBus.registerHandler(topic, handler);
                    }
                }
            }
        }
    }
}


7: 系统启动时自动注册消息处理器

@Configuration
@Slf4j
public class MessageRegisterConfiguration implements CommandLineRunner {

    @Autowired
    private MessageBus messageBus;

    @Autowired
    private EventBusLogicRegisterProvider eventBusLogicRegisterProvider;

  

    @Override
    public void run(String... args) throws Exception {
        eventBusLogicRegisterProvider.registerTopics(messageBus);
    }
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容