模仿rocket实现mq

项目地址
使用和rocket相似

// producer
public class ProducerExample {
    static final String BROKER = "localhost:8989";
    static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        DefaultProducer producer = new DefaultProducer();
        producer.start();
        producer.addBroker(TOPIC, BROKER);
        for (int i = 0; i < 10; i++) {
            Message msg = new Message();
            msg.setTopic(TOPIC);
            msg.setBody(("this is body" + i).getBytes(Charset.forName("UTF-8")));
            SendResult result = producer.send(msg);
            if (result.getSendStatus().equals(SendStatus.SEND_OK)) {
                System.out.println("success");
            } else {
                System.out.println("error");
            }
        }
    }
}
// consumer
public class ConsumerExample {
    static final String BROKER = "localhost:8989";
    static final String TOPIC = "test-topic";

    public static void main(String[] args) {
        DefaultConsumer consumer = new DefaultConsumer();

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(Message msg) {
                System.out.println(new String(msg.getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        consumer.addBroker(TOPIC, BROKER);
        consumer.subscribe(TOPIC);
    }
}

后期准备做的

  • 完善客户端channel管理。
  • broker分布式。这依赖于客户端的负载均衡,将topic分片存储。由于目前无持久化,因此使用同步双写保证数据一致性。
  • 数据持久化。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。