上一期说了RabbitMQ发送消息的方法,这部分发一下接收和关闭连接的方法实现,废话少说直接上代码。
private void subscribe(final Handler handler) {//创建消费者,并监听消费信息
subscribeThread = new Thread(new Runnable() {
@Override
public void run() {
while (isCycle) {//定义轮询方法
try {
connection = factory.newConnection();//创建新的连接
channel = connection.createChannel();//创建通道
channel.basicQos(1);//一条一条接收消息
String queueName = "queueName" + System.currentTimeMillis();//channel.queueDeclare().getQueue();//随机生成队列名
//参数:1、队列名 2、是否永久保留队列 3、是否独占队列 4、连接不存在时是否自动删除队列 5其他属性(构造参数),没有可以null
AMQP.Queue.DeclareOk q = channel.queueDeclare(queueName, false, false, true, null);
//将队列名、交换机名称、交换机key绑定在一起。
channel.queueBind(q.getQueue(), "CalonDirectExchange", "CalonDirectRouting");
isCycle = false;//禁止连接循环
Consumer consumer = new DefaultConsumer(channel) {//新版本更新的接收方法,无需手写轮询
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String mMessage = new String(body, "UTF-8");
//根据自己的需求定义的接收内容
Gson gson = new Gson();
AgvStateBean bean = gson.fromJson(mMessage, AgvStateBean.class);//json转实体
int code = bean.getCode();
Message message = handler.obtainMessage();
Bundle bundle = new Bundle();
bundle.putSerializable("bean", bean);
message.setData(bundle);
switch (code) {
case 2000:
message.what = 2000;
handler.sendMessage(message);
Log.e("AGV", "2000");
break;
case 4000:
message.what = 4000;
handler.sendMessage(message);
Log.e("AGV", "4000");
break;
}
}
};
channel.basicConsume(q.getQueue(), true, consumer);
Message messageCode = new Message();
messageCode.what = 200;//连接成功
handler.sendMessage(messageCode);
while (!isCycle) {//判断连接是否端口,断开重连
if (!connection.isOpen()) {
try {
Thread.sleep(5000); //sleep and then try again沉睡5秒返回循环
isCycle = true;
} catch (InterruptedException e) {
break;
}
}
}
/*while (true) {老方法
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
Log.d("","[r] " + message);
Message msg = handler.obtainMessage();
Bundle bundle = new Bundle();
bundle.putString("msg", message);
msg.setData(bundle);
handler.sendMessage(msg);
}*/
} catch (Exception e1) {
Message message = new Message();
message.what = 400;
Bundle bundle = new Bundle();
if (reconnection_count > 5) {//自定义重连次数,到数停止重连
String strConn = "尝试重连失败!";
bundle.putString("conn_interrupt", strConn);
bundle.putBoolean("close_conn", true);
message.setData(bundle);
handler.sendMessage(message);
reconnection_count = 1;
return;
}
String strConn = "连接中断,正在尝试第" + reconnection_count + "次重连";
bundle.putBoolean("close_conn", false);
bundle.putString("conn_interrupt", strConn);
message.setData(bundle);
handler.sendMessage(message);
reconnection_count++;
Log.d("", "Connection broken: " + e1.getClass().getName());
try {
Thread.sleep(5000); //sleep and then try again
} catch (InterruptedException e) {
break;
}
}
}
}
});
subscribeThread.start();//start thread
}
断开连接的方法
public void closeConn() {//关闭连接并暂停子线程
Thread closeThread = new Thread(() -> {
try {
//判断连接是否存在或连接,否则会崩溃
if (connection != null && connection.isOpen()) connection.close();
if (channel != null && channel.isOpen()) channel.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
});
closeThread.start();
closeThread.interrupt();
if (publishThread != null) publishThread.interrupt();
if (subscribeThread != null) subscribeThread.interrupt();
}
好了,到这里关于Android版RabbitMQ的发送和接收方法就都完成了。代码写的欠妥的地方欢迎各位大神们指正。谢谢大家