这次的Demo基于的次的基础上,生产者每秒钟向key1,key2分别放10条消息。消费者每秒处理10条消息,要求从key1和key2处理的消息比例是8:2。
代码
消费者类
package t2;
import redis.clients.jedis.Jedis;
public class Consumer implements Runnable {
Jedis jedis = new Jedis("127.0.0.1");
@Override
public void run() {
// TODO Auto-generated method stub
//
for(int i=0;i<10000;i++){
try{
Thread.sleep(1000);
int n;
for(i=1,n=0;i<=8;i++,n++){
System.out.println(jedis.blpop(0,"key1","key2"));
}
for(i=1;i<=(10-n);i++){
System.out.println(jedis.blpop(0,"key2","key1"));
}
System.out.print("");
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
生产者1
package t2;
import redis.clients.jedis.Jedis;
public class Producer1 implements Runnable{
static Jedis jedis = new Jedis("127.0.0.1");
public static void push(String key,String msg){
jedis.rpush(key, msg);
}
public static String getMsg(String tel,String content){
return tel+content;
}
@Override
public void run() {
for (int i = 1; i < 100; i++) {
jedis.rpush("key1", "这是第"+i+"条验证码");
try {
Thread.sleep(100);
System.out.print("");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// TODO Auto-generated method stub
}
}
生产者2
package t2;
import redis.clients.jedis.Jedis;
public class Producer2 implements Runnable{
static Jedis jedis = new Jedis("127.0.0.1");
public static void push(String key,String msg){
jedis.rpush(key, msg);
}
public static String getMsg(String tel,String content){
return tel+content;
}
@Override
public void run() {
for (int i = 1; i < 100; i++) {
try {
Thread.sleep(100);
jedis.rpush("key2", "这是第"+i+"条广告");
System.out.print("");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// TODO Auto-generated method stub
}
}
主程序
package t2;
import redis.clients.jedis.Jedis;
public class Test {
public static void main(String[] args){
Jedis jedis = new Jedis("127.0.0.1");
jedis.del("key1");
jedis.del("key2");
Consumer d1=new Consumer();
Producer1 d2=new Producer1();
Producer2 d3=new Producer2();
Thread t1=new Thread(d1);
Thread t2=new Thread(d2);
Thread t3=new Thread(d3);
t1.start();
t2.start();
t3.start();
}
}