RabbitMQ消息队列Android的实现方式之接收(二)

上一期说了RabbitMQ发送消息的方法,这部分发一下接收和关闭连接的方法实现,废话少说直接上代码。

private void subscribe(final Handler handler) {//创建消费者,并监听消费信息
        subscribeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (isCycle) {//定义轮询方法
                    try {
                        connection = factory.newConnection();//创建新的连接
                        channel = connection.createChannel();//创建通道
                        channel.basicQos(1);//一条一条接收消息
                        String queueName = "queueName" + System.currentTimeMillis();//channel.queueDeclare().getQueue();//随机生成队列名
                        //参数:1、队列名 2、是否永久保留队列 3、是否独占队列 4、连接不存在时是否自动删除队列 5其他属性(构造参数),没有可以null
                        AMQP.Queue.DeclareOk q = channel.queueDeclare(queueName, false, false, true, null);
                        //将队列名、交换机名称、交换机key绑定在一起。
                        channel.queueBind(q.getQueue(), "CalonDirectExchange", "CalonDirectRouting");
                        isCycle = false;//禁止连接循环
                        Consumer consumer = new DefaultConsumer(channel) {//新版本更新的接收方法,无需手写轮询
                            @Override
                            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                                super.handleDelivery(consumerTag, envelope, properties, body);
                                String mMessage = new String(body, "UTF-8");
                                //根据自己的需求定义的接收内容
                                Gson gson = new Gson();
                                AgvStateBean bean = gson.fromJson(mMessage, AgvStateBean.class);//json转实体
                                int code = bean.getCode();
                                Message message = handler.obtainMessage();
                                Bundle bundle = new Bundle();
                                bundle.putSerializable("bean", bean);
                                message.setData(bundle);
                                switch (code) {
                                    case 2000:
                                        message.what = 2000;
                                        handler.sendMessage(message);
                                        Log.e("AGV", "2000");
                                        break;
                                    case 4000:
                                        message.what = 4000;
                                        handler.sendMessage(message);
                                        Log.e("AGV", "4000");
                                        break;
                                }

                            }
                        };
                        channel.basicConsume(q.getQueue(), true, consumer);
                        Message messageCode = new Message();
                        messageCode.what = 200;//连接成功
                        handler.sendMessage(messageCode);
                        while (!isCycle) {//判断连接是否端口,断开重连
                            if (!connection.isOpen()) {
                                try {
                                    Thread.sleep(5000); //sleep and then try again沉睡5秒返回循环
                                    isCycle = true;
                                } catch (InterruptedException e) {
                                    break;
                                }
                            }

                        }
                        /*while (true) {老方法
                            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                            String message = new String(delivery.getBody());
                            Log.d("","[r] " + message);
                            Message msg = handler.obtainMessage();
                            Bundle bundle = new Bundle();
                            bundle.putString("msg", message);
                            msg.setData(bundle);
                            handler.sendMessage(msg);
                        }*/
                    } catch (Exception e1) {
                        Message message = new Message();
                        message.what = 400;
                        Bundle bundle = new Bundle();
                        if (reconnection_count > 5) {//自定义重连次数,到数停止重连
                            String strConn = "尝试重连失败!";
                            bundle.putString("conn_interrupt", strConn);
                            bundle.putBoolean("close_conn", true);
                            message.setData(bundle);
                            handler.sendMessage(message);
                            reconnection_count = 1;
                            return;
                        }

                        String strConn = "连接中断,正在尝试第" + reconnection_count + "次重连";
                        bundle.putBoolean("close_conn", false);
                        bundle.putString("conn_interrupt", strConn);
                        message.setData(bundle);
                        handler.sendMessage(message);
                        reconnection_count++;
                        Log.d("", "Connection broken: " + e1.getClass().getName());
                        try {
                            Thread.sleep(5000); //sleep and then try again
                        } catch (InterruptedException e) {
                            break;
                        }
                    }
                }
            }
        });
        subscribeThread.start();//start thread
    }

断开连接的方法

public void closeConn() {//关闭连接并暂停子线程
        Thread closeThread = new Thread(() -> {
            try {
                //判断连接是否存在或连接,否则会崩溃
                if (connection != null && connection.isOpen()) connection.close();
                if (channel != null && channel.isOpen()) channel.close();
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        });
        closeThread.start();
        closeThread.interrupt();

        if (publishThread != null) publishThread.interrupt();
        if (subscribeThread != null) subscribeThread.interrupt();
    }

好了,到这里关于Android版RabbitMQ的发送和接收方法就都完成了。代码写的欠妥的地方欢迎各位大神们指正。谢谢大家

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容