本文参考学习了:
Java代码示例
public class ConnectionFactoryHolder {
private final static ConnectionFactory connectionFactory;
static {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
connectionFactory = factory;
} catch (Exception e) {
throw new RuntimeException("Failed to create connection factory", e);
}
}
public static ConnectionFactory getConnectionFactory(String username, String password, String virtualHost) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
public static ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public static Connection newConnection() throws IOException, TimeoutException {
return getConnectionFactory().newConnection();
}
}
public class InternalConsumer implements DeliverCallback {
private final String name;
private final String queueName;
private final Channel channel;
private final boolean autoAck;
public InternalConsumer(Channel channel, String queueName, boolean autoAck) {
this.channel = channel;
this.queueName = queueName;
this.name = "Consumer-" + queueName;
this.autoAck = autoAck;
}
@Override
public void handle(String consumerTag, Delivery delivery) {
try {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(name + " Received message: " + message);
if (!autoAck) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
} catch (Throwable e) {
e.printStackTrace();
}
}
public void basic() throws IOException {
channel.basicConsume(queueName, autoAck, this, consumerTag -> {
});
}
}
简单队列/Work模式:
生产消息
public class SimpleSend {
public final static String QUEUE_NAME = "test";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryHolder.getConnectionFactory().newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
AtomicInteger atomicInteger = new AtomicInteger(0);
Runnable r = () -> {
for (; atomicInteger.get() < 1; atomicInteger.incrementAndGet()) {
String message = "Message#" + atomicInteger.get();
try {
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
r.run();
// new Thread(r).start();
// new Thread(r).start();
}
}
消费消息
public class SimpleConsumers {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionFactoryHolder.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(SimpleSend.QUEUE_NAME, true, false, false, null);
// channel.basicQos(1);
// System.out.println("Consumer " + 1 + " started");
// channel.basicConsume(Send.QUEUE_NAME, false,
// new InternalConsumer(channel, "consumer" + 1), consumerTag -> {
// });
ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
int finalI = i;
executor.execute(() -> {
try {
Channel cl = connection.createChannel();
cl.basicQos(1);
System.out.println("Consumer " + finalI + " started");
new InternalConsumer(cl, SimpleSend.QUEUE_NAME, false).basic();
} catch (Exception e) {
e.printStackTrace();
}
});
}
// 关闭线程池(这里只是为了展示,实际应用中应根据业务逻辑决定何时关闭)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
executor.shutdown();
try {
if (!executor.awaitTermination(800, java.util.concurrent.TimeUnit.MILLISECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
executor.shutdownNow();
}
}));
}
}
订阅模式
生产消息
public class FanoutSend {
public static final String EXCHANGE_NAME = "fanout_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = ConnectionFactoryHolder.getConnectionFactory();
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费消息
public class FanoutConsumers {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = ConnectionFactoryHolder.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(FanoutSend.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName1 = "fanout_test1";
channel.queueDeclare(queueName1, true, false, false, null);
String queueName2 = "fanout_test2";
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName1, FanoutSend.EXCHANGE_NAME, "");
channel.queueBind(queueName2, FanoutSend.EXCHANGE_NAME, "");
new InternalConsumer(channel, queueName1, false).basic();
new InternalConsumer(channel, queueName2, false).basic();
}
}
路由模式
生产消息
public class DirectSend {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = ConnectionFactoryHolder.getConnectionFactory();
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, "error", null, "error".getBytes());
channel.basicPublish(EXCHANGE_NAME, "info", null, "info".getBytes());
channel.basicPublish(EXCHANGE_NAME, "wran", null, "wran".getBytes());
channel.basicPublish(EXCHANGE_NAME, "debug", null, "debug".getBytes());
channel.basicPublish(EXCHANGE_NAME, "", null, "empty".getBytes());
}
}
}
消费消息
public class DirectConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = ConnectionFactoryHolder.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DirectSend.EXCHANGE_NAME, "direct");
String queue1 = "direct_test1";
channel.queueDeclare(queue1, true, false, false, null);
channel.queueBind(queue1, DirectSend.EXCHANGE_NAME, "error");
channel.queueBind(queue1, DirectSend.EXCHANGE_NAME, "info");
channel.queueBind(queue1, DirectSend.EXCHANGE_NAME, "wran");
channel.queueUnbind(queue1, DirectSend.EXCHANGE_NAME, "");
String queue2 = "direct_test2";
channel.queueDeclare(queue2, true, false, false, null);
channel.queueBind(queue2, DirectSend.EXCHANGE_NAME, "info");
channel.queueBind(queue2, DirectSend.EXCHANGE_NAME, "debug");
channel.queueBind(queue2, DirectSend.EXCHANGE_NAME, "");
new InternalConsumer(channel, queue1, false).basic();
new InternalConsumer(channel, queue2, false).basic();
}
}
主题模式
生产消息
public class TopicSend {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = ConnectionFactoryHolder.getConnectionFactory();
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.basicPublish(EXCHANGE_NAME, "log.error", null, "error".getBytes());
channel.basicPublish(EXCHANGE_NAME, "log.info", null, "info".getBytes());
channel.basicPublish(EXCHANGE_NAME, "log.wran", null, "wran".getBytes());
channel.basicPublish(EXCHANGE_NAME, "log.debug", null, "debug".getBytes());
channel.basicPublish(EXCHANGE_NAME, "log.debug.important.abc", null, "debug important".getBytes());
channel.basicPublish(EXCHANGE_NAME, "", null, "empty".getBytes());
}
}
}
消费消息
public class TopicConsumers {
public static final String EXCHANGE_NAME = TopicSend.EXCHANGE_NAME;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = ConnectionFactoryHolder.getConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName1 = "topic_test1";
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueBind(queueName1, EXCHANGE_NAME, "log.*");
channel.queueBind(queueName1, EXCHANGE_NAME, "log.*.*.abc");
String queueName2 = "topic_test2";
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName2, EXCHANGE_NAME, "log.#");
channel.queueBind(queueName2, EXCHANGE_NAME, "");
new InternalConsumer(channel, queueName1, false).basic();
new InternalConsumer(channel, queueName2, false).basic();
}
}