序
本文主要研究一下AbstractOMSProducer
AbstractOMSProducer
io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final static Logger log = ClientLogger.getLog();
final KeyValue properties;
final DefaultMQProducer rocketmqProducer;
private boolean started = false;
final ClientConfig clientConfig;
AbstractOMSProducer(final KeyValue properties) {
this.properties = properties;
this.rocketmqProducer = new DefaultMQProducer();
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = clientConfig.getOmsAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
String producerId = buildInstanceName();
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
properties.put(PropertyKeys.PRODUCER_ID, producerId);
}
@Override
public synchronized void startup() {
if (!started) {
try {
this.rocketmqProducer.start();
} catch (MQClientException e) {
throw new OMSRuntimeException("-1", e);
}
}
this.started = true;
}
@Override
public synchronized void shutdown() {
if (this.started) {
this.rocketmqProducer.shutdown();
}
this.started = false;
}
OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
if (e instanceof MQClientException) {
if (e.getCause() != null) {
if (e.getCause() instanceof RemotingTimeoutException) {
return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
} else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
MQBrokerException brokerException = (MQBrokerException) e.getCause();
return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
topic, msgId, brokerException.getErrorMessage()), e);
}
}
// Exception thrown by local.
else {
MQClientException clientException = (MQClientException) e;
if (-1 == clientException.getResponseCode()) {
return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
topic, msgId), e);
} else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
topic, msgId), e);
}
}
}
return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
}
protected void checkMessageType(Message message) {
if (!(message instanceof BytesMessage)) {
throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
}
}
@Override
public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.TOPIC, topic);
return bytesMessage;
}
@Override
public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.QUEUE, queue);
return bytesMessage;
}
}
- AbstractOMSProducer实现了ServiceLifecycle以及MessageFactory
- ServiceLifecycle的startup里头调用DefaultMQProducer的start方法,shutdown里头调用DefaultMQProducer的shutdown方法
- MessageFactory的createBytesMessage的方法主要是返回了BytesMessageImpl
MessageFactory
io/openmessaging/openmessaging-api/0.1.0-alpha/openmessaging-api-0.1.0-alpha-sources.jar!/io/openmessaging/MessageFactory.java
/**
* A factory interface for creating {@code Message} objects.
*
* @author vintagewang@apache.org
* @author yukon@apache.org
*/
public interface MessageFactory {
/**
* Creates a {@code BytesMessage} object. A {@code BytesMessage} object is used to send a message containing a
* stream of uninterpreted bytes.
* <p>
* The returned {@code BytesMessage} object only can be sent to the specified topic.
*
* @param topic the target topic to send
* @param body the body data for a message
* @return the created {@code BytesMessage} object
* @throws OMSRuntimeException if the OMS provider fails to create this message due to some internal error.
*/
BytesMessage createBytesMessageToTopic(String topic, byte[] body);
/**
* Creates a {@code BytesMessage} object. A {@code BytesMessage} object is used to send a message containing a
* stream of uninterpreted bytes.
* <p>
* The returned {@code BytesMessage} object only can be sent to the specified queue.
*
* @param queue the target queue to send
* @param body the body data for a message
* @return the created {@code BytesMessage} object
* @throws OMSRuntimeException if the OMS provider fails to create this message due to some internal error.
*/
BytesMessage createBytesMessageToQueue(String queue, byte[] body);
}
- 0.1.0-alpha这个版本区分了topic跟queue,不过在最新版已经移除掉topic,统一为createBytesMessage方法,发送到queue
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java
public interface MessageFactory {
/**
* Creates a {@code BytesMessage} object. A {@code BytesMessage} object is used to send a message containing a
* stream of uninterpreted bytes.
* <p>
* The returned {@code BytesMessage} object only can be sent to the specified queue.
*
* @param queue the target queue to send
* @param body the body data for a message
* @return the created {@code BytesMessage} object
* @throws OMSRuntimeException if the OMS provider fails to create this message due to some internal error.
*/
BytesMessage createBytesMessage(String queue, byte[] body);
}
BytesMessageImpl
io/openmessaging/rocketmq/domain/BytesMessageImpl.java
public class BytesMessageImpl implements BytesMessage {
private KeyValue headers;
private KeyValue properties;
private byte[] body;
public BytesMessageImpl() {
this.headers = OMS.newKeyValue();
this.properties = OMS.newKeyValue();
}
@Override
public byte[] getBody() {
return body;
}
@Override
public BytesMessage setBody(final byte[] body) {
this.body = body;
return this;
}
@Override
public KeyValue headers() {
return headers;
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public Message putHeaders(final String key, final int value) {
headers.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final long value) {
headers.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final double value) {
headers.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final String value) {
headers.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final int value) {
properties.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final long value) {
properties.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final double value) {
properties.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final String value) {
properties.put(key, value);
return this;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}
- 用byte[]作为body实现BytesMessage接口
小结
rocketmq的4.2.0版本的AbstractOMSProducer实现了ServiceLifecycle以及MessageFactory,其实现的open-messaging api的版本为0.1.0-alpha,该版本的MessageFactory里头创建message的方法区分了topic和queue,而在最新的0.3.2-alpha-SNAPSHOT版本,已经移除了topic的概念,统一发送到queue。