项目地址
使用和rocket相似
// producer
public class ProducerExample {
static final String BROKER = "localhost:8989";
static final String TOPIC = "test-topic";
public static void main(String[] args) {
DefaultProducer producer = new DefaultProducer();
producer.start();
producer.addBroker(TOPIC, BROKER);
for (int i = 0; i < 10; i++) {
Message msg = new Message();
msg.setTopic(TOPIC);
msg.setBody(("this is body" + i).getBytes(Charset.forName("UTF-8")));
SendResult result = producer.send(msg);
if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
System.out.println("success");
} else {
System.out.println("error");
}
}
}
}
// consumer
public class ConsumerExample {
static final String BROKER = "localhost:8989";
static final String TOPIC = "test-topic";
public static void main(String[] args) {
DefaultConsumer consumer = new DefaultConsumer();
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(Message msg) {
System.out.println(new String(msg.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
consumer.addBroker(TOPIC, BROKER);
consumer.subscribe(TOPIC);
}
}
后期准备做的
- 完善客户端channel管理。
- broker分布式。这依赖于客户端的负载均衡,将topic分片存储。由于目前无持久化,因此使用同步双写保证数据一致性。
- 数据持久化。