聊聊如何实现一个带幂等模板的Kafka消费者

前言

不知道大家有没有这样的体验,你跟你团队的成员,宣导一些开发时注意事项,比如在使用消息队列时,在消费端处理消息时,需根据业务场景,考虑一下幂等。后面走查代码的时,会发现一些资浅的开发,在需要幂等判断的场景的情况下,仍然没做幂等判断。既然宣导无效,就干脆实现一个带幂等模板的消费者,然后开发基于这个模板进行消费端业务处理。本文就以spring-kafka举例,聊聊如何实现一个带幂等模板的kafka消费者

实现步骤

1、kafka自动提交改为手动提交

spring:
    kafka:
        consumer:
            #  是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
            enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}

2、定义消费端模板抽象基类

@Slf4j
public abstract class BaseComusmeListener {

    @KafkaHandler
    public final void receive(@Payload String data, @Header(value = KafkaHeaders.RECEIVED_TOPIC,required = false) String receivedTopic,
                              @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY,required = false) String receivedMessageKey, @Header(value = KafkaHeaders.RECEIVED_TIMESTAMP,required = false) long receivedTimestamp, Acknowledgment ack){
        KafkaComsumePayLoad kafkaComsumePayLoad = buildKafkaComsumePayLoad(data,receivedTimestamp,receivedTopic,receivedMessageKey);

        boolean isRepeateConsume = isRepeateConsume(kafkaComsumePayLoad);
        if(isRepeateConsume){
            log.warn("messageKey:【{}】,topic:【{}】存在重复消息数据-->【{}】",receivedMessageKey,receivedTopic,data);
            //手工确认
            ack.acknowledge();
            return;
        }

        if(doBiz(kafkaComsumePayLoad)){
            //手工确认
            ack.acknowledge();
        }


    }

    /**
     * 是否重复消费
     * @param kafkaComsumePayLoad
     * @return
     */
    public abstract boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad);

    /**
     * 业务处理
     * @param kafkaComsumerPayLoad
     */
    public abstract boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad);


    private KafkaComsumePayLoad buildKafkaComsumePayLoad(String data, long receivedTimestamp, String receivedTopic, String receivedMessageKey){
        return KafkaComsumePayLoad.builder()
                .data(data)
                .receivedTimestamp(receivedTimestamp)
                .receivedTopic(receivedTopic)
                .receivedMessageKey(receivedMessageKey)
                .build();
    }
}

3、自定义监听注解【可选】

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
@Documented
@Component
public @interface LybGeekKafkaListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")
    String containerFactory() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics() default {};


    @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")
    String topicPattern() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")
    TopicPartition[] topicPartitions() default {};


    @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")
    String containerGroup() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")
    String errorHandler() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
    String groupId() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")
    boolean idIsGroup() default true;


    @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")
    String clientIdPrefix() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "beanRef")
    String beanRef() default "__listener";


    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")
    String autoStartup() default "";


    @AliasFor(annotation = KafkaListener.class, attribute = "properties")
    String[] properties() default {};


    @AliasFor(annotation = Component.class, attribute = "value")
    String value() default "";

}

3、重写KafkaListener注解后置处理器【可选】

注: 因示例项目的springboot版本比较低,直接使用@LybGeekKafkaListener不起作用

public class LybGeekKafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {

    private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";

    /**
     * The bean name of the default {@link KafkaListenerContainerFactory}.
     */
    public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

    private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

    private final Log logger = LogFactory.getLog(getClass());

    private final ListenerScope listenerScope = new ListenerScope();

    private KafkaListenerEndpointRegistry endpointRegistry;

    private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;

    private DefaultListableBeanFactory beanFactory;

    private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
            new KafkaHandlerMethodFactoryAdapter();

    private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();

    private final AtomicInteger counter = new AtomicInteger();

    private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

    private BeanExpressionContext expressionContext;

    private Charset charset = StandardCharsets.UTF_8;

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }

    /**
     * Set the {@link KafkaListenerEndpointRegistry} that will hold the created
     * endpoint and manage the lifecycle of the related listener container.
     * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.
     */
    public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
        this.endpointRegistry = endpointRegistry;
    }

    /**
     * Set the name of the {@link KafkaListenerContainerFactory} to use by default.
     * <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.
     * @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.
     */
    public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
        this.defaultContainerFactoryBeanName = containerFactoryBeanName;
    }

    /**
     * Set the {@link MessageHandlerMethodFactory} to use to configure the message
     * listener responsible to serve an endpoint detected by this processor.
     * <p>By default, {@link DefaultMessageHandlerMethodFactory} is used and it
     * can be configured further to support additional method arguments
     * or to customize conversion and validation support. See
     * {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.
     * @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
     */
    public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {
        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    }

    /**
     * Making a {@link BeanFactory} available is optional; if not set,
     * {@link KafkaListenerConfigurer} beans won't get autodetected and an
     * {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.
     * @param beanFactory the {@link BeanFactory} to be used.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
        if (beanFactory instanceof ConfigurableListableBeanFactory) {
            this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
                    this.listenerScope);
        }
    }

    /**
     * Set a charset to use when converting byte[] to String in method arguments.
     * Default UTF-8.
     * @param charset the charset.
     * @since 2.2
     */
    public void setCharset(Charset charset) {
        Assert.notNull(charset, "'charset' cannot be null");
        this.charset = charset;
    }

    @Override
    public void afterSingletonsInstantiated() {
        this.registrar.setBeanFactory(this.beanFactory);

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, KafkaListenerConfigurer> instances =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
            for (KafkaListenerConfigurer configurer : instances.values()) {
                configurer.configureKafkaListeners(this.registrar);
            }
        }

        if (this.registrar.getEndpointRegistry() == null) {
            if (this.endpointRegistry == null) {
                Assert.state(this.beanFactory != null,
                        "BeanFactory must be set to find endpoint registry by bean name");
                this.endpointRegistry = this.beanFactory.getBean(
                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                        KafkaListenerEndpointRegistry.class);
            }
            this.registrar.setEndpointRegistry(this.endpointRegistry);
        }

        if (this.defaultContainerFactoryBeanName != null) {
            this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
        }

        // Set the custom handler method factory once resolved by the configurer
        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
        if (handlerMethodFactory != null) {
            this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
        }
        else {
            addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
        }

        // Actually register all listeners
        this.registrar.afterPropertiesSet();

        beanFactory.removeBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME);

    }


    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<LybGeekKafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<LybGeekKafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<LybGeekKafkaListener>>) method -> {
                        Set<LybGeekKafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No @LybGeekKafkaListener annotations found on bean type: " + bean.getClass());
                }
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<LybGeekKafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (LybGeekKafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(annotatedMethods.size() + " @LybGeekKafkaListener methods processed on bean '"
                            + beanName + "': " + annotatedMethods);
                }
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }

    /*
     * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
     */
    private Collection<LybGeekKafkaListener> findListenerAnnotations(Class<?> clazz) {
        Set<LybGeekKafkaListener> listeners = new HashSet<>();
        LybGeekKafkaListener ann = AnnotationUtils.findAnnotation(clazz, LybGeekKafkaListener.class);
        if (ann != null) {
            listeners.add(ann);
        }

        return listeners;
    }

    /*
     * AnnotationUtils.getRepeatableAnnotations does not look at interfaces
     */
    private Set<LybGeekKafkaListener> findListenerAnnotations(Method method) {
        Set<LybGeekKafkaListener> listeners = new HashSet<>();
        LybGeekKafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, LybGeekKafkaListener.class);
        if (ann != null) {
            listeners.add(ann);
        }

        return listeners;
    }

    private void processMultiMethodListeners(Collection<LybGeekKafkaListener> classLevelListeners, List<Method> multiMethods,
            Object bean, String beanName) {

        List<Method> checkedMethods = new ArrayList<>();
        Method defaultMethod = null;
        for (Method method : multiMethods) {
            Method checked = checkProxy(method, bean);
            KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
            if (annotation != null && annotation.isDefault()) {
                final Method toAssert = defaultMethod;
                Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
                        + toAssert.toString() + " and " + method.toString());
                defaultMethod = checked;
            }
            checkedMethods.add(checked);
        }
        for (LybGeekKafkaListener classLevelListener : classLevelListeners) {
            MultiMethodKafkaListenerEndpoint<K, V> endpoint =
                    new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
            processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
        }
    }

    protected void processKafkaListener(LybGeekKafkaListener kafkaListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    }

    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                // Found a @LybGeekKafkaListener method on the target class for this JDK proxy ->
                // is it also present on the proxy itself?
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    }
                    catch (NoSuchMethodException noMethod) {
                    }
                }
            }
            catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            }
            catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "@LybGeekKafkaListener method '%s' found on bean target class '%s', " +
                                "but not found in any interface(s) for bean JDK proxy. Either " +
                                "pull the method up to an interface or switch to subclass (CGLIB) " +
                                "proxies by setting proxy-target-class/proxyTargetClass " +
                                "attribute to 'true'", method.getName(),
                        method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, LybGeekKafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {

        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
        String group = kafkaListener.containerGroup();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        String concurrency = kafkaListener.concurrency();
        if (StringUtils.hasText(concurrency)) {
            endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
        }
        String autoStartup = kafkaListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
        }
        resolveKafkaProperties(endpoint, kafkaListener.properties());

        KafkaListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                        + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                        + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
        }
        this.registrar.registerEndpoint(endpoint, factory);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }

    private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
        if (propertyStrings.length > 0) {
            Properties properties = new Properties();
            for (String property : propertyStrings) {
                String value = resolveExpressionAsString(property, "property");
                if (value != null) {
                    try {
                        properties.load(new StringReader(value));
                    }
                    catch (IOException e) {
                        this.logger.error("Failed to load property " + property + ", continuing...", e);
                    }
                }
            }
            endpoint.setConsumerProperties(properties);
        }
    }

    private String getEndpointId(LybGeekKafkaListener kafkaListener) {
        if (StringUtils.hasText(kafkaListener.id())) {
            return resolveExpressionAsString(kafkaListener.id(), "id");
        }
        else {
            return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
        }
    }

    private String getEndpointGroupId(LybGeekKafkaListener kafkaListener, String id) {
        String groupId = null;
        if (StringUtils.hasText(kafkaListener.groupId())) {
            groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
        }
        if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {
            groupId = id;
        }
        return groupId;
    }

    private TopicPartitionInitialOffset[] resolveTopicPartitions(LybGeekKafkaListener kafkaListener) {
        TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        if (topicPartitions.length > 0) {
            for (TopicPartition topicPartition : topicPartitions) {
                result.addAll(resolveTopicPartitionsList(topicPartition));
            }
        }
        return result.toArray(new TopicPartitionInitialOffset[0]);
    }

    private String[] resolveTopics(LybGeekKafkaListener kafkaListener) {
        String[] topics = kafkaListener.topics();
        List<String> result = new ArrayList<>();
        if (topics.length > 0) {
            for (String topic1 : topics) {
                Object topic = resolveExpression(topic1);
                resolveAsString(topic, result);
            }
        }
        return result.toArray(new String[0]);
    }

    private Pattern resolvePattern(LybGeekKafkaListener kafkaListener) {
        Pattern pattern = null;
        String text = kafkaListener.topicPattern();
        if (StringUtils.hasText(text)) {
            Object resolved = resolveExpression(text);
            if (resolved instanceof Pattern) {
                pattern = (Pattern) resolved;
            }
            else if (resolved instanceof String) {
                pattern = Pattern.compile((String) resolved);
            }
            else if (resolved != null) {
                throw new IllegalStateException(
                        "topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
            }
        }
        return pattern;
    }

    private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
        Object topic = resolveExpression(topicPartition.topic());
        Assert.state(topic instanceof String,
                "topic in @TopicPartition must resolve to a String, not " + topic.getClass());
        Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
        String[] partitions = topicPartition.partitions();
        PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
        Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
                "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        for (String partition : partitions) {
            resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
        }

        for (PartitionOffset partitionOffset : partitionOffsets) {
            TopicPartitionInitialOffset topicPartitionOffset =
                    new TopicPartitionInitialOffset((String) topic,
                            resolvePartition(topic, partitionOffset),
                            resolveInitialOffset(topic, partitionOffset),
                            isRelative(topic, partitionOffset));
            if (!result.contains(topicPartitionOffset)) {
                result.add(topicPartitionOffset);
            }
            else {
                throw new IllegalArgumentException(
                        String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
                                topicPartitionOffset));
            }
        }
        return result;
    }

    private Integer resolvePartition(Object topic, PartitionOffset partitionOffset) {
        Object partitionValue = resolveExpression(partitionOffset.partition());
        Integer partition;
        if (partitionValue instanceof String) {
            Assert.state(StringUtils.hasText((String) partitionValue),
                    "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
            partition = Integer.valueOf((String) partitionValue);
        }
        else if (partitionValue instanceof Integer) {
            partition = (Integer) partitionValue;
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
                    topic, partitionOffset.partition(), partitionValue.getClass()));
        }
        return partition;
    }

    private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
        Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
        Long initialOffset;
        if (initialOffsetValue instanceof String) {
            Assert.state(StringUtils.hasText((String) initialOffsetValue),
                    "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
            initialOffset = Long.valueOf((String) initialOffsetValue);
        }
        else if (initialOffsetValue instanceof Long) {
            initialOffset = (Long) initialOffsetValue;
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
                    topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
        }
        return initialOffset;
    }

    private boolean isRelative(Object topic, PartitionOffset partitionOffset) {
        Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
        Boolean relativeToCurrent;
        if (relativeToCurrentValue instanceof String) {
            relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
        }
        else if (relativeToCurrentValue instanceof Boolean) {
            relativeToCurrent = (Boolean) relativeToCurrentValue;
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
                    topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
        }
        return relativeToCurrent;
    }

    @SuppressWarnings("unchecked")
    private void resolveAsString(Object resolvedValue, List<String> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            result.add((String) resolvedValue);
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@LybGeekKafkaListener can't resolve '%s' as a String", resolvedValue));
        }
    }

    @SuppressWarnings("unchecked")
    private void resolvePartitionAsInteger(String topic, Object resolvedValue,
            List<TopicPartitionInitialOffset> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            Assert.state(StringUtils.hasText((String) resolvedValue),
                    "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
            result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
        }
        else if (resolvedValue instanceof Integer[]) {
            for (Integer partition : (Integer[]) resolvedValue) {
                result.add(new TopicPartitionInitialOffset(topic, partition));
            }
        }
        else if (resolvedValue instanceof Integer) {
            result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@LybGeekKafkaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
        }
    }

    private String resolveExpressionAsString(String value, String attribute) {
        Object resolved = resolveExpression(value);
        if (resolved instanceof String) {
            return (String) resolved;
        }
        else if (resolved != null) {
            throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
        return null;
    }

    private Integer resolveExpressionAsInteger(String value, String attribute) {
        Object resolved = resolveExpression(value);
        Integer result = null;
        if (resolved instanceof String) {
            result = Integer.parseInt((String) resolved);
        }
        else if (resolved instanceof Number) {
            result = ((Number) resolved).intValue();
        }
        else if (resolved != null) {
            throw new IllegalStateException(
                    "The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
                            + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
        return result;
    }

    private Boolean resolveExpressionAsBoolean(String value, String attribute) {
        Object resolved = resolveExpression(value);
        Boolean result = null;
        if (resolved instanceof Boolean) {
            result = (Boolean) resolved;
        }
        else if (resolved instanceof String) {
            result = Boolean.parseBoolean((String) resolved);
        }
        else if (resolved != null) {
            throw new IllegalStateException(
                    "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
                            + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
        return result;
    }

    private Object resolveExpression(String value) {
        return this.resolver.evaluate(resolve(value), this.expressionContext);
    }

    /**
     * Resolve the specified value if possible.
     * @param value the value to resolve
     * @return the resolved value
     * @see ConfigurableBeanFactory#resolveEmbeddedValue
     */
    private String resolve(String value) {
        if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
            return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
        }
        return value;
    }

    private void addFormatters(FormatterRegistry registry) {
        for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
            registry.addConverter(converter);
        }
        for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
            registry.addConverter(converter);
        }
        for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {
            registry.addFormatter(formatter);
        }
    }

    private <T> Collection<T> getBeansOfType(Class<T> type) {
        if (LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
            return ((ListableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory)
                    .getBeansOfType(type)
                    .values();
        }
        else {
            return Collections.emptySet();
        }
    }

    /**
     * An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying
     * instance to use. Useful if the factory to use is determined once the endpoints
     * have been registered but not created yet.
     * @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory
     */
    private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

        private final DefaultFormattingConversionService defaultFormattingConversionService =
                new DefaultFormattingConversionService();

        private MessageHandlerMethodFactory messageHandlerMethodFactory;

        public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
            this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
        }

        @Override
        public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
            return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
        }

        private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
            if (this.messageHandlerMethodFactory == null) {
                this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
            }
            return this.messageHandlerMethodFactory;
        }

        private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
            Validator validator = LybGeekKafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
            if (validator != null) {
                defaultFactory.setValidator(validator);
            }
            defaultFactory.setBeanFactory(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory);

            ConfigurableBeanFactory cbf =
                    LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
                            (ConfigurableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory :
                            null;


            this.defaultFormattingConversionService.addConverter(
                    new BytesToStringConverter(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.charset));

            defaultFactory.setConversionService(this.defaultFormattingConversionService);

            List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();

            // Annotation-based argument resolution
            argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
            argumentResolvers.add(new HeadersMethodArgumentResolver());

            // Type-based argument resolution
            final GenericMessageConverter messageConverter =
                    new GenericMessageConverter(this.defaultFormattingConversionService);
            argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
            argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {


                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
                    Object resolved = super.resolveArgument(parameter, message);
                    /*
                     * Replace KafkaNull list elements with null.
                     */
                    if (resolved instanceof List) {
                        List<?> list = ((List<?>) resolved);
                        for (int i = 0; i < list.size(); i++) {
                            if (list.get(i) instanceof KafkaNull) {
                                list.set(i, null);
                            }
                        }
                    }
                    return resolved;
                }

                @Override
                protected boolean isEmptyPayload(Object payload) {
                    return payload == null || payload instanceof KafkaNull;
                }

            });
            defaultFactory.setArgumentResolvers(argumentResolvers);

            defaultFactory.afterPropertiesSet();
            return defaultFactory;
        }

    }

    private static class BytesToStringConverter implements Converter<byte[], String> {


        private final Charset charset;

        BytesToStringConverter(Charset charset) {
            this.charset = charset;
        }

        @Override
        public String convert(byte[] source) {
            return new String(source, this.charset);
        }

    }

    private static class ListenerScope implements Scope {

        private final Map<String, Object> listeners = new HashMap<>();

        ListenerScope() {
            super();
        }

        public void addListener(String key, Object bean) {
            this.listeners.put(key, bean);
        }

        public void removeListener(String key) {
            this.listeners.remove(key);
        }

        @Override
        public Object get(String name, ObjectFactory<?> objectFactory) {
            return this.listeners.get(name);
        }

        @Override
        public Object remove(String name) {
            return null;
        }

        @Override
        public void registerDestructionCallback(String name, Runnable callback) {
        }

        @Override
        public Object resolveContextualObject(String key) {
            return this.listeners.get(key);
        }

        @Override
        public String getConversationId() {
            return null;
        }

    }

}

业务侧如何使用

示例

@LybGeekKafkaListener(id = "createUser",topics = Constant.USER_TOPIC)
public class UserComsumer extends BaseComusmeListener {

    @Autowired
    private UserService userService;

    @Override
    public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
        User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);

        return userService.isExistUserByUsername(user.getUsername());
    }

    @Override
    public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
        User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);
        return userService.save(user);
    }
}

总结

有时候我们在宣导一些事情时,往往会发现即使我们已经说了N遍了,事情仍然会出现纰漏。这时候我们可以考虑把我们想宣导的东西工具化,通过工具来规范。比如有些业务,可能一些开发没考虑全面,我们就可以基于业务,把一些核心的场景抽象成方法,然后开发人员基于这些抽象方法,做具体实现。

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358

推荐阅读更多精彩内容