import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.jms.support.JmsUtils;
/**
 * Worker thread that repeatedly consumes a batch of JMS messages, triggers their
 * processing, and acknowledges the batch once a successful response is received.
 *
 * <p>Other threads control the worker through {@link #doPause()}, {@link #doResume()}
 * and {@link #doStop()}. While paused, the worker closes its JMS resources and
 * re-creates them when resumed.
 *
 * <p>NOTE(review): several literals in this class ({@code 1L}, {@code 2L}, {@code ""})
 * appear to be placeholders for sequence numbers / destination names that are not
 * wired up yet — confirm before relying on the logged or notified values.
 */
public class MyThread extends Thread {
    /** Maximum number of trigger attempts made for one consumed batch. */
    private static final int REDELIVER_LIMIT = 3;
    /** Maximum number of messages pulled from the consumer per batch. */
    private static final int CONSUME_BATCH_SIZE = 100;
    /** Maximum number of attempts to send one notification. */
    private static final int NOTIFY_ATTEMPTS = 3;

    // pause/stop are written by controller threads and read by the worker.
    private volatile boolean pause = false;
    private volatile boolean stop = false;

    private final ActiveMQConnectionFactory factory;
    private final String selector;
    private Session session;
    private Connection conn;
    private MessageProducer producer;
    private MessageConsumer consumer;

    /** Messages of the current batch that still have to be acknowledged. */
    private final List<Message> consumedMessage = new ArrayList<Message>();
    private final List<Long> cachedProcessed = new ArrayList<Long>();

    /**
     * Creates a worker that only consumes messages whose {@code clientId} property
     * equals the given id.
     *
     * @param factory  factory used to (re-)create the JMS connection
     * @param clientId value matched by the message selector
     */
    public MyThread(ActiveMQConnectionFactory factory, String clientId) {
        this.factory = factory;
        // A single quote inside a selector string literal is written as two quotes;
        // escaping keeps an id containing a quote from breaking the selector.
        String escapedId = String.valueOf(clientId).replace("'", "''");
        selector = String.format("clientId='%s'", escapedId);
    }

    /**
     * Main loop: connect, drain stale responses, then process batches until
     * {@link #doStop()} is called. Any failure pauses the worker.
     */
    @Override
    public void run() {
        doConnect();
        cleanRemainedResponse();
        while (!stop) {
            try {
                while (pause && !stop) {
                    // Is a clean needed here?
                    doClean();
                    doDisconnect();
                    onPause();
                    if (!stop) {
                        doConnect();
                    }
                }
                if (stop) {
                    break;
                }
                if (!process()) {
                    pause = true;
                }
                doClean();
            } catch (Exception e) {
                // Clear the interrupt status so the wait() performed while paused does
                // not immediately throw again and turn this loop into a busy spin.
                boolean interrupted = Thread.interrupted()
                        || e instanceof InterruptedException
                        || e.getCause() instanceof InterruptedException;
                if (interrupted) {
                    System.out.println("Interrupted");
                }
                pause = true;
                // Is a clean needed here?
                doClean();
            }
        }
        doClean();
        doDisconnect();
    }

    // Is this meaningful? On success the message cannot be acked any more; on
    // failure it may be better to simply retry.
    // (Assuming the coSeqNo can be obtained.)
    /**
     * Drains every response that is immediately available on the consumer and
     * records the ones whose text form equals {@code "T"}.
     */
    private void cleanRemainedResponse() {
        if (consumer == null) {
            // doConnect() failed; there is nothing to drain.
            return;
        }
        while (true) {
            try {
                Message response = consumer.receiveNoWait();
                if (response == null) {
                    break; // nothing left to drain
                }
                // NOTE(review): doHandleResponse compares against lower-case "t" —
                // confirm which spelling the responder actually sends.
                if ("T".equals(response.toString())) {
                    cachedProcessed.add(1L);
                }
            } catch (JMSException e) {
                e.printStackTrace();
                // A failing consumer would otherwise fail again on every iteration.
                break;
            }
        }
    }

    /**
     * Consumes one batch and tries up to {@link #REDELIVER_LIMIT} times to get it
     * processed.
     *
     * @return {@code true} as soon as one attempt succeeds, {@code false} when all
     *         attempts failed
     * @throws Exception if a notification could not be sent or a JMS call failed
     */
    private boolean process() throws Exception {
        doConsume();
        for (int attempt = 1; attempt <= REDELIVER_LIMIT; attempt++) {
            System.out.println(String.format("Start to process, redeliver count: %d", attempt));
            String messageFinished = doTrigger(attempt);
            if (messageFinished == null) {
                continue;
            }
            boolean status = doHandleResponse(messageFinished, attempt);
            System.out.println(String.format("Finish to process, status:%b, redeliver count: %d", status, attempt));
            if (status) {
                return true;
            }
        }
        return false;
    }

    /**
     * Receives up to {@link #CONSUME_BATCH_SIZE} messages into {@code consumedMessage}.
     * A message that fails {@link #parseMessage(Message)} is reported as a data error,
     * acknowledged, and ends the batch with the worker flagged for pause. A JMS
     * failure ends the batch and re-creates the connection.
     *
     * @throws Exception if the data-error notification could not be sent
     */
    private void doConsume() throws Exception {
        try {
            for (int i = 0; i < CONSUME_BATCH_SIZE; i++) {
                // NOTE(review): receive() blocks without a timeout although the log
                // below says "time out" — confirm whether a timed receive was intended.
                Message message = consumer.receive();
                if (message == null) {
                    // receive() returns null when the consumer is closed concurrently.
                    break;
                }
                if (!cachedProcessed.isEmpty()
                        && 1L > cachedProcessed.get(cachedProcessed.size() - 1)) {
                    // The cached entries are no longer meaningful.
                    cachedProcessed.clear();
                }
                if (!parseMessage(message)) {
                    doHandleDataError(message, "", 1L);
                    pause = true; // Stop consuming
                    System.out.println(String.format("Finish consuming, cause: the format of %d is incorrect. Consumed message: [%s]", 1L, ""));
                    return;
                }
                // Remember the message so doHandleResponse can acknowledge it.
                consumedMessage.add(message);
            }
            System.out.println(String.format("Finish consuming, cause: time out. Consumed message: [%s]", ""));
        } catch (JMSException e) {
            // Receive failed; stop receiving.
            System.out.println(String.format("Finish consuming, cause: %s. Consumed message: [%s]", e.getMessage(), ""));
            doHandleJmsException();
        }
    }

    /** Recovers from a JMS failure by closing and re-creating all JMS resources. */
    private void doHandleJmsException() {
        doDisconnect();
        doConnect();
    }

    /**
     * Waits for the trigger id and then for the "finished" response of one attempt.
     *
     * @param redeliverCount 1-based attempt number
     * @return text form of the finished response, or {@code null} when the trigger or
     *         the response could not be obtained
     * @throws Exception if a JMS call fails or the error notification cannot be sent
     */
    private String doTrigger(int redeliverCount) throws Exception {
        Message trigger = consumer.receive();
        String triggerId = (trigger == null) ? null : trigger.toString();
        if (triggerId == null) {
            // ">=" matches doHandleResponse: the last attempt reports the failure.
            if (redeliverCount >= REDELIVER_LIMIT) {
                doNotify("PROCESS_ERROR", "", 1L, 2L);
            }
            System.out.println(String.format("Failure to generate mini-batch trigger for coSeqNo:[%s], redeliver count: %d", "", redeliverCount));
            return null;
        }
        System.out.println(String.format("coSeqNo:[%s] is merged to %s", "", triggerId));
        Message finished = consumer.receive();
        return (finished == null) ? null : finished.toString();
    }

    /**
     * Reports a malformed message and then acknowledges it.
     *
     * @throws Exception if the notification cannot be sent or the ack fails
     */
    private void doHandleDataError(Message message, String stackTrace, long conSeq) throws Exception {
        // Send the notification first: if sending fails the message is not acked.
        sendNotification("DATA_ERROR", stackTrace, conSeq);
        message.acknowledge();
    }

    /**
     * Evaluates the finished response of one attempt. On {@code "t"} a success
     * notification is sent and every message of the batch is acknowledged; otherwise
     * a failure notification is sent once the last attempt has been reached.
     *
     * @return {@code true} when the batch was processed successfully
     * @throws Exception if a notification cannot be sent or an ack fails
     */
    private boolean doHandleResponse(String messageFinished, int redeliverCount) throws Exception {
        if ("t".equals(messageFinished)) {
            System.out.println(String.format("coSeqNo:[%s] is successfully processed", ""));
            doNotify("NO_ERROR", null, 1L, 2L);
            for (Message message : consumedMessage) {
                message.acknowledge();
            }
            return true;
        }
        if (redeliverCount >= REDELIVER_LIMIT) {
            System.out.println(String.format("coSeqNo:[%s] is failed processed, redeliver count: %d", "", redeliverCount));
            doNotify("PROCESS_ERROR", null, 1L, 2L);
        }
        return false;
    }

    /** Delegates to {@link #sendNotification(String, String, long...)}. */
    private boolean doNotify(String type, String stackTrace, long... conSeqNoList) throws Exception {
        return sendNotification(type, stackTrace, conSeqNoList);
    }

    /**
     * Sends a notification message, retrying up to {@link #NOTIFY_ATTEMPTS} times
     * with a reconnect and a one-second back-off between attempts.
     *
     * @return {@code true} when the message was sent
     * @throws Exception if every attempt failed or the back-off was interrupted; the
     *         last {@link JMSException} is attached as the cause
     */
    private boolean sendNotification(String type, String stackTrace, long... conSeqNoList) throws Exception {
        Message msg = new ActiveMQTextMessage();
        String seqNos = Arrays.toString(conSeqNoList);
        JMSException lastError = null;
        for (int attempt = 0; attempt < NOTIFY_ATTEMPTS; attempt++) {
            try {
                producer.send(msg);
                return true;
            } catch (JMSException e) {
                lastError = e;
                System.out.println(String.format("Failure to send notifaction, coSeqNo:[%s], retry count: %d", seqNos, attempt));
                doHandleJmsException();
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        System.out.println(String.format("Failure to send notifaction, coSeqNo:[%s]", seqNos));
        throw new Exception("Failed to send " + type + " notification for coSeqNo " + seqNos, lastError);
    }

    /** Requests the worker to pause after its current step. */
    public void doPause() {
        pause = true;
    }

    /** Requests the worker to stop and wakes it up if it is currently paused. */
    public synchronized void doStop() {
        stop = true;
        notifyAll();
    }

    /** @return {@code true} while a pause is requested or in effect */
    public boolean isPaused() {
        return pause;
    }

    /**
     * Blocks the calling thread until the worker is resumed or stopped.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void onPause() throws InterruptedException {
        // Re-check the condition in a loop: this guards against spurious wake-ups and
        // against a resume/stop that was issued before the wait started.
        while (pause && !stop) {
            wait();
        }
    }

    /** Clears the pause request and wakes the worker. */
    public synchronized void doResume() {
        pause = false;
        notifyAll();
    }

    /** Closes producer, consumer, session and connection. */
    private void doDisconnect() {
        JmsUtils.closeMessageProducer(producer);
        JmsUtils.closeMessageConsumer(consumer);
        JmsUtils.closeSession(session);
        JmsUtils.closeConnection(conn);
    }

    /**
     * Creates connection, session, producer and selector-filtered consumer, then
     * starts the connection.
     *
     * @return {@code true} on success, {@code false} if any JMS call failed
     */
    private boolean doConnect() {
        try {
            conn = factory.createConnection();
            session = conn.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            producer = session.createProducer(session.createQueue(""));
            consumer = session.createConsumer(session.createQueue(""), selector);
            // A JMS connection is created stopped; no message is delivered to the
            // consumer until start() has been called.
            conn.start();
        } catch (JMSException e) {
            e.printStackTrace();
            // Release whatever was created before the failure.
            doDisconnect();
            return false;
        }
        return true;
    }

    /** Forgets the messages of the current batch. */
    private void doClean() {
        consumedMessage.clear();
    }

    /**
     * Validates a consumed message.
     *
     * @return {@code true} when the message is well-formed (currently always)
     */
    private boolean parseMessage(Message msg) {
        return true;
    }
}