ActiveMQ-API(五)

负载均衡(取模做负载均衡)简单利用线程实现吞吐量栗子实现:

package bhz.mq.action;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    //单例模式
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
    
    //3:Session对象
    private Session session;
    
    //4:生产者
    private MessageProducer messageProducer;
    
    public Producer(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.messageProducer = this.session.createProducer(null);
        } catch (Exception e) {
            e.printStackTrace();    
        }
    }
    
    public Session getSession(){
        return this.session;
    }
    
    public void send1(){
        try {
            Destination destination = this.session.createQueue("first");
            for (int i=0;i<100;i++) {
                MapMessage msg = this.session.createMapMessage();
                int id = i;
                msg.setInt("id", id);
                msg.setString("name", "张"+i);
                msg.setStringProperty("age", ""+i);
                String receiver = id%2==0?"A":"B";
                msg.setStringProperty("receiver", receiver);
                this.messageProducer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 2,1000*60*10L);
                System.out.println("message send id :"+id);
            }
            
            
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void send2(){
        try {
            Destination destination = this.session.createQueue("first");
            TextMessage msg = this.session.createTextMessage("我是一个字符串内容");
            this.messageProducer.send(destination,msg,DeliveryMode.NON_PERSISTENT,9,1000*60*10L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        Producer p = new Producer();
        p.send1();
    }
    
    
}

package bhz.mq.action;

import javax.jms.MapMessage;

public class MessageTask implements Runnable{

    private MapMessage message;
    
    public MessageTask(MapMessage message){
        this.message = message;
    }
    
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println("当前线程:"+Thread.currentThread().getName()+"处理任务:"+this.message.getString("id"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

}

package bhz.mq.action;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerA {

    public final String SELECTOR="receiver = 'A'";
    
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
        
    //3:Session对象
    private Session session;
    
    //4:消费者
    private MessageConsumer messageConsumer;
    
    //5:目标地址
    private Destination destination;
    
    public ConsumerA(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination=this.session.createQueue("first");
            //创建消费者的时候发生了变化
            this.messageConsumer = this.session.createConsumer(this.destination, SELECTOR);
            System.out.println("Consumer A start...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void receiver(){
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    class Listener implements MessageListener{

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
        ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),20,120L,TimeUnit.SECONDS,queue);
        
        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage)message;
                    executor.execute(new MessageTask(ret));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
    
    public static void main(String[] args) {
        ConsumerA c = new ConsumerA();
        c.receiver();
    }
    
}

package bhz.mq.action;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerB {

    public final String SELECTOR="receiver = 'B'";
    
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
        
    //3:Session对象
    private Session session;
    
    //4:消费者
    private MessageConsumer messageConsumer;
    
    //5:目标地址
    private Destination destination;
    
    public ConsumerB(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination=this.session.createQueue("first");
            //创建消费者的时候发生了变化
            this.messageConsumer = this.session.createConsumer(this.destination, SELECTOR);
            System.out.println("Consumer B start...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void receiver(){
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    class Listener implements MessageListener{

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
        ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),20,120L,TimeUnit.SECONDS,queue);
        
        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage)message;
                    executor.execute(new MessageTask(ret));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
    
    public static void main(String[] args) {
        ConsumerB c = new ConsumerB();
        c.receiver();
    }
    
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容