再看kafka——spring boot集成kafka

之前自己写过一篇入门文章kafka简单入门及与spring boot整合,主要是结合kafka官方的文档入门,学习了一些基本概念,以及在spring boot中简单使用。这次主要是学习如何使用kafka,尤其是自定义的一些配置。自己看了一些文档感觉都不是特别的清晰,而且github上的demo又过于简略,所以自己决定来踩踩坑。spring kafka文档,这个是spring的感觉相对还是不错的,比spring boot文档中kafka的内容要详细很多。今天这次学习主要是使用kafka作为消息队列使用,不涉及关于"流"方面的内容。


一、项目准备

快速创建一个spring boot项目,选择需要的依赖web、kafka等等。我因为使用的还是以前的项目,主要就是postgresql、web、mybatis、kafka还有mybatis的generator插件等等。根据数据库表生成对应的model。创建相应的controller、service。
先本地启动kafka,因为kafka依赖于zookeeper,所以先启动zookeeper

./zookeeper-server-start.sh ../config/zookeeper.properties 

然后启动kafka

./kafka-server-start.sh ../config/server.properties

接着在相关的 service注入KafkaTemplate,以便发送消息。KafkaTemplate有多个重载的send方法,虽然方法参数不同,但是最终执行的都是doSend方法,而这个方法的参数为ProducerRecord<?,?>,其实就是将重载方法的参数都转换成ProducerRecord<?,?>而已,看下KafkaTemplate的代码:

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
        return doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
        return doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
        return doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
            @Nullable V data) {

        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
        return doSend(producerRecord);
    }

    @Override
    public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
        return doSend(record);
    }

    @SuppressWarnings("unchecked")
    @Override
    public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
        ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
        if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
            byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
            if (correlationId != null) {
                producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
            }
        }
        return doSend((ProducerRecord<K, V>) producerRecord);
    }

这一点知道就可以了,现在也没必要去深究,目前还是学习如何使用为目的。

二、使用自动配置

目前我的项目有3张表,3个对应的model以及对应的controller、service、mapper。现在假设一个场景:在成功向数据库写入一条数据之后,使用kafka发送一条消息,消息内容就是对应的model,即写入数据库的数据,然后消费者接到消息后做进一步处理(进一步处理是啥我也不清楚....)。
所以我需要一个consumer来接收消息。接下来定一个MessageConsumer用来接收kafka发送的消息,因为有3个不同的model,所以我定义3个不同的Topic来区分。代码如下:

@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {
    @KafkaHandler
    public void listen(ConsumerRecord<?,?> record) {
        log.info(">>>> kafka consumer record message={} <<<<",record.toString());
    }

}

@KafkaListener注解加在类上或者方法上都是可以的,个人感觉没什么区别。这里topics是一个数组,监听所有Topic的消息。不过如果@KafkaListener加在类上,监听方法则需要再加上@KafkaHandler注解。这里接收参数使用的是ConsumerRecord<?,?>,对应上面提到的ProducerRecord<?,?>。都是将消息进行了一层包装而已。
基本上一个简单的消息发送和接收就完成了,发送消息时带上自定义的Topic和对应的数据就可以了。到现在还没有做任何和kafka相关的配置内容,但是这样启动项目肯定是会报错的,因为没有配置消费者的groupId。当然如果不配置也是可以的,不过需要在@KafkaListener增加一个id属性,我选择在配置文件指定一个groupId,配置文件增加一个配置:

spring.kafka.consumer.group-id=custom_kafka

接着再启动项目,并在写入数据库之后使用KafkaTemplate发送消息,service代码如下:

    @Override
    public Map<String, Object> save(Product product) {
        Map<String,Object> resultMap = new HashMap<>();

        int insert = productMapper.insert(product);
        if (insert == 1) {
            kafkaTemplate.send(TOPIC,product);
            resultMap.put("success",true);
            resultMap.put("message","save success and send to MQ succeed");
        } else {
            // 保存失败
            resultMap.put("message","save failed");
        }
        return resultMap;
    }

项目启动后使用rest client调用接口,结果抛出异常,异常信息如下:

java.lang.ClassCastException: com.ypc.kafka.entity.Product cannot be cast to java.lang.String
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar:na]

根据上面的代码可知发送的消息为一个Product对象,而异常信息表示是Product不能转成String。看到应该很清楚了就是序列化的问题,而且kafka默认的kafka默认的序列化就是StringSerializer。而且根据项目启动时kafka的一些信息也可以得出这个结论。


图-1.png

既然kafka默认不配置不行,那就需要进行自己的配置。


三、自定义配置

既然要修改默认序列化配置,那么我们需要知道有哪些,在"org.apache.kafka.common.serialization"下有9个序列化相关的类,但是除了StringSerializer之外其他的序列化基本都是基础数据类型,如果是自定义类型还是没办法使用。其实每次说到序列化我一般就会想到使用json,因为json使用非常的广泛,后面网上找了一下spring提供了对json序列化的支持,所以配置文件添加对key和value该项配置:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

接着我们再次启动项目,并调用接口测试接收消息,依然报错,这个真的有点出乎意料,异常信息如下:

rg.springframework.kafka.KafkaException: No method found for class java.lang.String
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:170) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:279) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:67) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]

No method found for class java.lang.String??只能跟踪源码了。看DelegatingInvocableHandler的invoke方法的代码:

    public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
        Class<? extends Object> payloadClass = message.getPayload().getClass();
        InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
        Object result = handler.invoke(message, providedArgs);
        Expression replyTo = this.handlerSendTo.get(handler);
        return new InvocationResult(result, replyTo, this.handlerReturnsMessage.get(handler));
    }

    protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> payloadClass) {
        InvocableHandlerMethod handler = this.cachedHandlers.get(payloadClass);
        if (handler == null) {
            handler = findHandlerForPayload(payloadClass);
            if (handler == null) {
                throw new KafkaException("No method found for " + payloadClass);
            }
            this.cachedHandlers.putIfAbsent(payloadClass, handler); //NOSONAR
            setupReplyTo(handler);
        }
        return handler;
    }

Message中封装的就是发送的消息,即payload,因为前面将Product序列化成了json,即String类型,所以得到的payloadClass变量为String类型,接着getHandlerForPayload方法从cachedHandlers没有获取到对应的handler,findHandlerForPayload方法返回结果依然为null。难道是因为Consumer里面没有定义接收String类型参数的方法吗??测试一下吧,修改MessageConsumer代码如下:

@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {

    @KafkaHandler
    public void listen(ConsumerRecord<?,?> record) {
        System.out.println(record);
        log.info(">>>> kafka consumer record message={} <<<<",record.toString());
    }

    @KafkaHandler
    public void listen(String message) {
        log.info(">>>> kafka consumer String message={} <<<<",message);
    }
}

添加一个专门接收String类型参数的方法,然后再次测试,测试结果成功:


图-2.png

反思一下:其实之所以遇到上面这个问题,有自己先入为主的思想,以为监听的时候接收的数据类型都可以包装在ConsumerRecord<?,?>对象中,然后从这个对象中获取具体的消息体即可。但是自己忘了一个前提条件,就是这个项目中@KafkaListener注解是在类上,而不是方法上,有兴趣的可以去测试一下,对比一下@KafkaListener在类上和在方法上的区别。我测试发现如果@KafkaListener加在方法上使用ConsumerRecord<?,?>是可以接收任何消息的,因为这时候使用的是默认的handler。另外如果@KafkaListener在方法上和在类上的消费者同时存在,在类上的消费者是无法接收到消息的。也就是优先使用@KafkaListene在方法上的消费者。
根据上面遇到的情况,我觉得可以重载多个不同的方法针处理不同的消息类型,比如我有Order、User、Product三种不同的类型,那么我可以定义三个不同的方法来处理这些数据。不过因为对数据进行了json格式序列化,所以如果要接收这三种类型的数据,还需要对其进行反序列化处理,所以配置文件增加反序列化配置:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

然后MessageConsumer增加两个方法,代码如下:

@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {

    @KafkaHandler
    public void user(User user) {
        // 处理 user ...
        log.info(">>>> kafka consumer UserType message={} <<<<",user.toString());
    }

    @KafkaHandler
    public void order(Order order) {
        // 处理 order ...
        log.info(">>>> kafka consumer OrderType message={} <<<<",order.toString());
    }

    @KafkaHandler
    public void listen(ConsumerRecord<?,?> record) {
        log.info(">>>> kafka consumer record message={} <<<<",record.toString());
    }

    @KafkaHandler
    public void listen(String message) {
        log.info(">>>> kafka consumer String message={} <<<<",message);
    }

}

之所以没有定义接收Product类型方法,我还是想通过ConsumerRecord<?,?>这个对象来处理所有未知数据类型。
启动项目,调用Order类型的接口,测试能否接收到数据。调用之后报错,报错信息如下:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ORDER_TOPIC-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.ypc.kafka.entity.Order' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

意思就是我定义的实体类不在反序列信任的包内,默认只信任java.util, java.lang这两个包中的类,所以需要将实体类的包添加为可信任,或者使用"*"将所有的都添加为可信任的。所以需要再添加一个配置

spring.kafka.consumer.properties.spring.json.trusted.packages=com.ypc.kafka.entity

然后再次测试,没有报错结果如下:


图-3.png

通过上面的这些配置,一步步实现了发送数据的序列化和反序列化,也完成了通过具体数据类型来执行不同的方法。当然也不是非要这么做,其实也可以通过监听器监听的Topic来实现,即不同的数据类型使用不同的Topic,比如Order类型那么只监听该类型的Topic就行了,但是前提是@KafkaListener需要在方法上。


接下来就是关于通用类型的处理,在MessageConsumer中我只写了处理Order和User这两种类型的方法,之所以没有写Product的类型,我是想准备一个通用的方法来处理未知类型(假设Product类型未知),但是ConsumerRecord<?,?>类型接收是不可行的,这个上面已经说过了。那我使用一个Object类型是否可行呢?添加一个方法:

    @KafkaHandler
    public void object(Object object) {
        log.info(">>>> kafka consumer UnknownType message={} <<<<",object.toString());

    }

然后调用Product相关接口测试一下,结果报错:


图-4.png

可以接收到Product类型,而且根据输出结果可以看出消息其实还是封装在ConsumerRecord里面的,是不是我上面使用ConsumerRecord的姿势不正确啊???使用Object真的可行吗,当然不行,至少现在是不可行的。如果我再调用一下Order的接口,结果报错:

org.springframework.kafka.KafkaException: Ambiguous methods for payload type: class com.ypc.kafka.entity.Order: object and order

也就是说其实识别出来这个类型为Order了,但是因为有了Object的存在导致它不知道该使用哪个方法来处理这个数据。如果只定义一个方法,即所有类型都使用Object这样是没问题的。那么能不能指定一个默认的handler,即如果类型确定的情况下由相应的方法处理,不确定的情况则由公用方法处理。@KafkaHandler正好有一个属性就是设置是否为默认处理器的,所以Object方法修改如下:

    @KafkaHandler(isDefault = true)
    public void object(Object object) {
        log.info(">>>> kafka consumer UnknownType message={} <<<<",object.toString());

    }

再次使用Order和Product的接口进行测试,结果如下:


图-5.png

从日志输出结果来看是没有问题了,即可以接收其他类型数据,且不影响已知的类型数据处理。但是因为这个Object之包含消息体本身,所以很难进一步处理,所以我觉得还是回到使用ConsumerRecord<?,?>这个路子上比较好,将MessageConsumer代码最终修改如下:

@Slf4j
@Component
@KafkaListener(topics = {"USER_TOPIC","ORDER_TOPIC","PRODUCT_TOPIC"})
public class MessageConsumer {

    @KafkaHandler
    public void user(User user) {
        // 处理 user ...
        log.info(">>>> kafka consumer UserType message={} <<<<",user.toString());
    }

    @KafkaHandler
    public void order(Order order) {
        // 处理 order ...
        log.info(">>>> kafka consumer OrderType message={} <<<<",order.toString());
    }

    @KafkaHandler(isDefault = true)
    public void listen(ConsumerRecord<?, ?> record) {
        log.info(">>>> kafka consumer CommonType message={} <<<<",record.toString());
    }
}

四、使用java配置的方式

因为spring推荐使用java配置的方式,所以根据将相关配置转成java代码就行了,代码如下:

@Configuration
//@EnableKafka
public class KafkaConfig {

    @Bean
    public DefaultKafkaHeaderMapper defaultKafkaHeaderMapper() {
        DefaultKafkaHeaderMapper defaultKafkaHeaderMapper = new DefaultKafkaHeaderMapper();
        defaultKafkaHeaderMapper.addTrustedPackages("com.ypc.kafka.entity");

        return defaultKafkaHeaderMapper;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return props;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"custom_kafka");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES,"com.ypc.kafka.entity");

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public ConsumerFactory<?,?> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<String, Object>(producerFactory(),true);
    }

}

我看的官方文档说需要在配置类加上@EnableKafka注解,但是我测试的时候发现不加也是可以的,这个不知道是什么情况。
配置文件如下:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=custom_kafka

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=true
#
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# kafka额外配置 
#spring.kafka.consumer.properties.spring.json.value.default.type=com.ypc.kafka.entity.Order    
#spring.kafka.producer.properties.spring.json.type.mapping=order:com.ypc.kafka.entity.Order,user:com.ypc.kafka.entity.User
spring.kafka.consumer.properties.spring.json.trusted.packages=com.ypc.kafka.entity
#spring.kafka.consumer.properties.spring.json.type.mapping=order:com.ypc.kafka.entity.Order,user:com.ypc.kafka.entity.User

有些配置自己是在java代码中没有设置的,比如自动提交、添加json类型到header等等。


五、小结

其实这次学习也算是从0入门的一个过程,即从默认配置一点点增加、修改,然后测试,内容并不算多。这次相对上次有几个不同之处:一是使用注解不同,这次@KafkaListener注解在类上,且使用了@KafkaHandler注解;二是一个监听器监听多个Topic,且有多个类型的handler。当然其中涉及到自定义序列化和反序列化的问题是这次学习的主要内容,现在看来确实挺简单,但是实际中还是遇到了很多的坑。在上面的配置文件中还有两个遗留问题,没有解决,一是spring.kafka.producer.properties.spring.json.add.type.headers=true
这个配置的使用和意义,自己没有去查询具体head的内容,因为是二进制数据,debug的时候没有看到具体信息。二是:
spring.kafka.producer.properties.spring.json.type.mapping=
spring.kafka.consumer.properties.spring.json.type.mapping=
这两个配置的使用问题,这个让我有点迷惑,等有时间自己再试着看看。这次的代码提交到我的github,如果有什么问题欢迎交流。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容