功能:可实现使用低级API读取指定topic,指定partition,指定offset的数据
步骤:
- 根据指定的分区从主题元数据中找到副本
- 从主副本拉取分区的消息
方法:
- getLeader() 客户端向种子节点发送主题元数据,将副本集加入备用节点
-
getData() 消费者低级AP I拉取消息的主要方法
低级消费API流程分析
具体代码实现:
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.BrokerEndPoint;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class LowLevelConsumer {
public static void main(String[] args) {
ArrayList<String> brokers = new ArrayList<>();//kafka集群
brokers.add("hadoop102");
brokers.add("hadoop103");
brokers.add("hadoop104");
int port = 9092;//连接kafka集群的端口号
String topic = "first";//待消费的主题
int partition = 0;//待消费的分区
long offset = 0;//待消费的位置信息
getData(brokers, port, topic, partition, offset);
}
//根据集群、主题和分区信息获取待消费的Leader信息
private static BrokerEndPoint getLeader(ArrayList<String> brokers, int port, String topic, int partition) {
for (String broker : brokers) {
//1、遍历集群,根据节点信息创建SimpleConsumer
SimpleConsumer getLeader = new SimpleConsumer(broker, port, 2000, 1024 * 4, "getLeader");
//2、发送元数据信息请求
//2.1、根据传入的主题信息创建元数据请求
TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Collections.singletonList(topic));
//2.2、发送元数据请求得到返回值
TopicMetadataResponse metadataResponse = getLeader.send(topicMetadataRequest);
//3、解析元数据返回值
//3.1、创建请求传入的为Topic集合,返回值中包含多个Topic元数据信息
List<TopicMetadata> topicsMetadata = metadataResponse.topicsMetadata();
//3.2、遍历多个Topic的元数据信息
for (TopicMetadata topicMetadata : topicsMetadata) {
//一个Topic由多个Partition组成
List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
//遍历多个分区的元数据信息
for (PartitionMetadata partitionMetadata : partitionsMetadata) {
//匹配传入的分区号
if (partition == partitionMetadata.partitionId()) {
//匹配上则直接返回leader信息
return partitionMetadata.leader();
}
}
}
}
return null;
}
private static void getData(ArrayList<String> brokers, int port, String topic, int partition, long offset) {
//获取待消费分区的Leader信息
BrokerEndPoint leader = getLeader(brokers, port, topic, partition);
if (leader == null) {
System.out.println("未找到指定主题指定分区的Leader信息!!!");
return;
}
String leaderHost = leader.host();
//4、根据Leader信息创建SimpleConsumer
SimpleConsumer getData = new SimpleConsumer(leaderHost, port, 2000, 1024 * 1024 * 4, "getData");
//5、发送抓取数据的请求
//5.1、构建抓取数据的请求
FetchRequest fetchRequest = new FetchRequestBuilder().addFetch(topic, partition, offset, 1024 * 1024 * 2).build();
//5.2、发送抓取数据的请求并得到返回值
FetchResponse fetchResponse = getData.fetch(fetchRequest);
//6、解析返回值(打印)
//6.1、addFetch方法可以被循环调用,所以此处我们需要传入特定的主题和分区信息来获取具体的数据集
ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
//6.2、对结果集进行迭代操作,逐条解析
for (MessageAndOffset messageAndOffset : messageAndOffsets) {
//提取每一条消息中的数据信息
ByteBuffer payload = messageAndOffset.message().payload();
//将数据写入byte数据
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
//打印结果
System.out.println(new String(bytes));
}
}
}