MQ 代码灵活配置(以 RocketMQ 为例)
编写Consumer配置类
/**
 * Spring configuration that declares the RocketMQ consumer beans used by the demo.
 *
 * <p>Two {@link ConsumerBean}s are exposed: {@link #demoConsumer()} bound to
 * {@code demo.rocketmq.demoGroupId}, and {@link #demoBroadcastingConsumer()} bound to
 * {@code demo.rocketmq.broadcastingGroupId} with the broadcasting message model. Both beans are
 * created with an empty subscription table; subscriptions are expected to be added afterwards
 * through {@code ConsumerBean.subscribe(...)}.
 */
@Configuration
public class MqConsumerConfig {

    @Value("${demo.rocketmq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${demo.rocketmq.accessKeyId}")
    private String accessKeyId;

    @Value("${demo.rocketmq.accessKeySecret}")
    private String accessKeySecret;

    // Consumer group used by the broadcasting consumer.
    @Value("${demo.rocketmq.broadcastingGroupId}")
    private String broadcastingGroupId;

    // Consumer group used by the regular demo consumer.
    @Value("${demo.rocketmq.demoGroupId}")
    private String demoGroupId;

    /**
     * Builds a fresh set of connection properties shared by every consumer.
     *
     * @return a new, independent {@link Properties} instance on every call
     */
    private Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, accessKeyId);
        properties.setProperty(PropertyKeyConst.SecretKey, accessKeySecret);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        // Stored as a String: Properties.getProperty() returns null for non-String values,
        // so an Integer put via put() would be invisible to readers that use getProperty().
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "4");
        return properties;
    }

    /**
     * Creates a consumer bean for the given group with an empty subscription table.
     *
     * @param groupId      consumer group id to bind the consumer to
     * @param broadcasting when {@code true}, the broadcasting message model is configured
     * @return a configured, not yet started consumer bean
     */
    private ConsumerBean newConsumerBean(String groupId, boolean broadcasting) {
        Properties properties = getMqProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        properties.setProperty(PropertyKeyConst.MQType, MQType.METAQ.name());
        if (broadcasting) {
            properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        }

        ConsumerBean consumerBean = new ConsumerBean();
        // All properties are complete before they are handed to the bean.
        consumerBean.setProperties(properties);
        // Intentionally empty; listeners are attached later via subscribe(...).
        consumerBean.setSubscriptionTable(new HashMap<>(8));
        return consumerBean;
    }

    /**
     * Consumer bound to {@code demo.rocketmq.demoGroupId}.
     *
     * @return the consumer bean; Spring calls {@code start} on init and {@code shutdown} on destroy
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean demoConsumer() {
        return newConsumerBean(demoGroupId, false);
    }

    /**
     * Broadcasting consumer bound to {@code demo.rocketmq.broadcastingGroupId}.
     *
     * @return the consumer bean; Spring calls {@code start} on init and {@code shutdown} on destroy
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean demoBroadcastingConsumer() {
        return newConsumerBean(broadcastingGroupId, true);
    }
}
编写Consumer注册器
/**
 * Registers message processors on {@link ConsumerBean}s and wraps each processor with tracing,
 * timing and a backtick-delimited log line.
 *
 * <p>Every wrapped listener returns {@code Action.CommitMessage} when the processor completes and
 * {@code Action.ReconsumeLater} when it throws an {@link Exception}.
 *
 * <p>NOTE(review): {@code startTrace(Message)} and {@code endTrace()} are not defined in the
 * visible part of this class; they are presumably provided elsewhere (e.g. a static import) —
 * confirm before compiling this class on its own.
 */
@Slf4j(topic = "MqLogAspect_C")
@Component
public class DemoMqConsumerRegistry {

    private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
    private static final PrivacyFilter PRIVACY_FILTER = new PrivacyFilter();

    /**
     * Charset used to decode message bodies. Pinned to UTF-8 so decoding does not depend on the
     * JVM's platform default charset.
     */
    private static final Charset BODY_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    @Resource
    private ConsumerBean demoBroadcastingConsumer;

    /**
     * Subscribes a body-only processor on the given consumer.
     *
     * @param consumer  consumer to subscribe on
     * @param topic     topic to subscribe to
     * @param tag       tag expression passed to {@code ConsumerBean.subscribe}
     * @param processor callback receiving the decoded message body
     * @return an empty result
     */
    public CityResult<Void> register(ConsumerBean consumer, String topic, String tag,
                                     Consumer<String> processor) {
        registerConsumer(consumer, topic, tag, processor);
        return new CityResult<>();
    }

    /**
     * Subscribes a (tag, body) processor on the given consumer.
     *
     * @param consumer  consumer to subscribe on
     * @param topic     topic to subscribe to
     * @param tag       tag expression passed to {@code ConsumerBean.subscribe}
     * @param processor callback receiving the message tag and the decoded message body
     * @return an empty result
     */
    public CityResult<Void> register(ConsumerBean consumer, String topic, String tag,
                                     BiConsumer<String, String> processor) {
        registerBiConsumer(consumer, topic, tag, processor);
        return new CityResult<>();
    }

    /**
     * Subscribes a (tag, body) processor on the injected broadcasting consumer.
     *
     * @param topic     topic to subscribe to
     * @param tag       tag expression passed to {@code ConsumerBean.subscribe}
     * @param processor callback receiving the message tag and the decoded message body
     * @return an empty result
     */
    public CityResult<Void> registerBroadcasting(String topic, String tag,
                                                 BiConsumer<String, String> processor) {
        registerBiConsumer(demoBroadcastingConsumer, topic, tag, processor);
        return new CityResult<>();
    }

    /** Adapts a body-only processor to the shared listener wrapper. */
    private void registerConsumer(ConsumerBean consumer, String topic, String tag,
                                  Consumer<String> processor) {
        subscribe(consumer, topic, tag, processor,
            message -> processor.accept(decodeBody(message)));
    }

    /** Adapts a (tag, body) processor to the shared listener wrapper. */
    private void registerBiConsumer(ConsumerBean consumer, String topic, String tag,
                                    BiConsumer<String, String> processor) {
        subscribe(consumer, topic, tag, processor,
            message -> processor.accept(message.getTag(), decodeBody(message)));
    }

    /**
     * Subscribes a listener that traces, times and logs one invocation of {@code invoker}.
     *
     * @param processor the user-supplied callback; used only to derive the class name for logging
     * @param invoker   calls the user callback with the data extracted from the message
     */
    private void subscribe(ConsumerBean consumer, String topic, String tag, Object processor,
                           Consumer<Message> invoker) {
        consumer.subscribe(topic, tag, (message, consumeContext) -> {
            startTrace(message);
            long start = System.currentTimeMillis();
            try {
                invoker.accept(message);
                logNormal(processor, message, start);
                return Action.CommitMessage;
            } catch (Exception e) {
                logException(processor, message, start, e);
                return Action.ReconsumeLater;
            } finally {
                // Always runs, whether the processor succeeded or threw.
                endTrace();
            }
        });
    }

    /** Decodes the raw message body as text using {@link #BODY_CHARSET}. */
    private static String decodeBody(Message message) {
        return new String(message.getBody(), BODY_CHARSET);
    }

    /** Logs a successful invocation: processor class, serialized message, {@code true}, elapsed ms. */
    private void logNormal(Object processor, Message message, long start) {
        log.info("{}`{}`{}`{}",
            this.getProcessorClazz(processor),
            JSON.toJSONString(message, new SerializeFilter[] {PRIVACY_FILTER, BYTE_ARRAY_SERIALIZER}),
            true,
            System.currentTimeMillis() - start);
    }

    /** Logs a failed invocation: processor class, serialized message, exception message, elapsed ms. */
    private void logException(Object processor, Message message, long start, Exception e) {
        log.error("{}`{}`{}`{}",
            this.getProcessorClazz(processor),
            JSON.toJSONString(message, new SerializeFilter[] {PRIVACY_FILTER, BYTE_ARRAY_SERIALIZER}),
            e.getMessage(),
            System.currentTimeMillis() - start);
    }

    /**
     * Returns the processor's class name truncated at the first {@code '$'}, so that lambdas and
     * nested/anonymous classes are reported under the name before the {@code '$'}.
     *
     * @param processor the callback whose class name is wanted
     * @return the binary class name up to, but excluding, the first {@code '$'}; the full name
     *         when it contains no {@code '$'} or starts with one
     */
    private String getProcessorClazz(Object processor) {
        String className = processor.getClass().getName();
        int innerIdx = className.indexOf('$');
        return innerIdx > 0 ? className.substring(0, innerIdx) : className;
    }
}
编写Consumer Listener
/**
 * Subscribes a message handler for the topic configured under {@code demo.rocketmq.topic.demo}
 * on the {@code demoConsumer} bean once this component has been constructed.
 */
@Slf4j
@Component
public class DevicePropertyListener {

    @Value("${demo.rocketmq.topic.demo}")
    private String topicDemo;

    @Resource
    private DemoMqConsumerRegistry demoMqConsumerRegistry;

    @Resource
    private ConsumerBean demoConsumer;

    /**
     * Registers {@link #handleMessage(String)} for every tag ({@code "*"}) of the demo topic.
     *
     * <p>Uses the fields injected into this class; the previous code referenced identifiers
     * that were not declared here.
     */
    @PostConstruct
    public void consumerMessage() {
        demoMqConsumerRegistry.register(demoConsumer, topicDemo, "*", this::handleMessage);
    }

    /**
     * Handles one message body delivered by the registry.
     *
     * @param body the message body as text
     */
    private void handleMessage(String body) {
        // do some things
    }
}
生产者的写法与消费者类似,也可以进一步封装:
/**
 * Thin wrapper around a {@link ProducerBean} that hides property setup and converts send
 * failures into {@link CityResult} error values instead of exceptions.
 *
 * <p>{@link #init(String, String, String, String)} must be called before sending. Until then
 * {@link #isStarted()} reports {@code false} and {@link #isClosed()} reports {@code true}.
 */
@Slf4j
public class MqProducer {

    /**
     * Charset used to encode String bodies. Pinned to UTF-8 so encoding does not depend on the
     * JVM's platform default charset.
     */
    private static final java.nio.charset.Charset BODY_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private ProducerBean producer;
    private Properties properties;

    /**
     * Creates and starts the underlying producer.
     *
     * @param group     producer group id
     * @param accessKey access key
     * @param secretKey secret key
     * @param domain    name server address
     */
    public void init(String group, String accessKey, String secretKey, String domain) {
        this.properties = new Properties();
        this.properties.setProperty(PropertyKeyConst.MQType, MQType.METAQ.name());
        this.properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        this.properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        this.properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, domain);
        this.properties.setProperty(PropertyKeyConst.GROUP_ID, group);
        this.producer = new ProducerBean();
        producer.setProperties(this.properties);
        producer.start();
        log.info("producer domain:{}, group:{}, accessKey:{}, isStarted: {}",domain, group, accessKey,producer.isStarted());
    }

    /** Shuts the producer down; does nothing when {@code init} was never called. */
    public void shutdown() {
        if (null == this.producer) {
            return;
        }
        producer.shutdown();
    }

    /**
     * @return {@code true} when the producer exists and reports itself started; {@code false}
     *         when {@code init} has not been called
     */
    public boolean isStarted() {
        return producer != null && producer.isStarted();
    }

    /**
     * @return {@code true} when the producer reports itself closed, or when {@code init} has not
     *         been called (there is no producer to use)
     */
    public boolean isClosed() {
        return producer == null || producer.isClosed();
    }

    /**
     * Sends a message and waits for the result.
     *
     * @param message message to send
     * @return a result wrapping the {@link SendResult}, or a {@code SERVER_ERROR} result when
     *         sending throws
     */
    public CityResult<SendResult> send(Message message) {
        try {
            SendResult sendResult = producer.send(message);
            return new CityResult<>(sendResult);
        } catch (Exception e) {
            log.error("send MQ error,errMsg:{}", e.getMessage(), e);
            return new CityResult<>(JointAppCodes.SERVER_ERROR, e.getMessage(), e.getLocalizedMessage());
        }
    }

    /**
     * Sends a message one-way (no send result is returned by the producer).
     *
     * @param message message to send
     * @return an empty result, or a {@code SERVER_ERROR} result when sending throws
     */
    public CityResult<SendResult> sendOneway(Message message) {
        try {
            producer.sendOneway(message);
            return new CityResult<>();
        } catch (Exception e) {
            log.error("send MQ error,errMsg:{}", e.getMessage(), e);
            return new CityResult<>(JointAppCodes.SERVER_ERROR, e.getMessage(), e.getLocalizedMessage());
        }
    }

    /**
     * Sends a text body to a topic without a tag.
     *
     * @param topic destination topic
     * @param body  text body, encoded as UTF-8
     * @return see {@link #send(Message)}
     */
    public CityResult<SendResult> send(String topic, String body) {
        return send(buildMessage(topic, null, body));
    }

    /**
     * Sends a text body to a topic with a tag.
     *
     * @param topic destination topic
     * @param tag   message tag
     * @param body  text body, encoded as UTF-8
     * @return see {@link #send(Message)}
     */
    public CityResult<SendResult> send(String topic, String tag, String body) {
        Message message = buildMessage(topic, null, body);
        message.setTag(tag);
        return send(message);
    }

    /**
     * Sends a text body to a topic with a tag, one-way.
     *
     * @param topic destination topic
     * @param tag   message tag
     * @param body  text body, encoded as UTF-8
     * @return see {@link #sendOneway(Message)}
     */
    public CityResult<SendResult> sendOneway(String topic, String tag, String body) {
        Message message = buildMessage(topic, null, body);
        message.setTag(tag);
        return sendOneway(message);
    }

    /**
     * Builds a message for the given topic and body.
     *
     * @param tag applied only when non-null; the public overloads that take a tag pass
     *            {@code null} here and set the tag themselves, so their tag is always applied
     */
    private static Message buildMessage(String topic, String tag, String body) {
        Message message = new Message();
        message.setTopic(topic);
        if (tag != null) {
            message.setTag(tag);
        }
        message.setBody(body.getBytes(BODY_CHARSET));
        return message;
    }
}
Q.E.D.