MQ-代码灵活配置(以RocketMQ为例)

编写Consumer配置类

/**
 * Spring configuration that declares the ONS/RocketMQ {@link ConsumerBean}s used by the demo.
 *
 * <p>Two consumers are exposed: {@link #demoConsumer()} bound to {@code demoGroupId} and
 * {@link #demoBroadcastingConsumer()} bound to {@code broadcastingGroupId} with the
 * broadcasting message model. Both are created with an empty subscription table;
 * topics are expected to be subscribed programmatically after the bean has been started.
 */
@Configuration
public class MqConsumerConfig {

    @Value("${demo.rocketmq.nameSrvAddr}")
    private String nameSrvAddr;
    @Value("${demo.rocketmq.accessKeyId}")
    private String accessKeyId;
    @Value("${demo.rocketmq.accessKeySecret}")
    private String accessKeySecret;
    // Consumer group used for broadcasting-mode consumption.
    @Value("${demo.rocketmq.broadcastingGroupId}")
    private String broadcastingGroupId;
    // Consumer group used by the default demo consumer.
    @Value("${demo.rocketmq.demoGroupId}")
    private String demoGroupId;

    /**
     * Builds a fresh set of properties shared by every consumer (credentials, name server
     * address, thread count and retry limit).
     *
     * @return a new, independent {@link Properties} instance on every call
     */
    private Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, accessKeyId);
        properties.setProperty(PropertyKeyConst.SecretKey, accessKeySecret);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        // Must be stored as a String: Properties.getProperty() returns null for
        // non-String values, so an Integer put via put() would not be readable that way.
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, "4");
        return properties;
    }

    /**
     * Wraps fully prepared properties into a consumer bean with an empty subscription table.
     *
     * @param properties complete consumer properties
     * @return a consumer bean that has not been started yet
     */
    private ConsumerBean newConsumerBean(Properties properties) {
        ConsumerBean consumerBean = new ConsumerBean();
        consumerBean.setProperties(properties);
        consumerBean.setSubscriptionTable(new HashMap<>(8));
        return consumerBean;
    }

    /**
     * Consumer bound to {@code demo.rocketmq.demoGroupId}.
     *
     * @return the consumer bean; Spring calls {@code start} / {@code shutdown} on it
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean demoConsumer() {
        Properties properties = getMqProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, demoGroupId);
        properties.setProperty(PropertyKeyConst.MQType, MQType.METAQ.name());
        return newConsumerBean(properties);
    }

    /**
     * Consumer bound to {@code demo.rocketmq.broadcastingGroupId} using the broadcasting
     * message model.
     *
     * @return the consumer bean; Spring calls {@code start} / {@code shutdown} on it
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean demoBroadcastingConsumer() {
        Properties properties = getMqProperties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, broadcastingGroupId);
        properties.setProperty(PropertyKeyConst.MQType, MQType.METAQ.name());
        // Set before the properties are handed to the bean so the bean never
        // observes a half-configured property set.
        properties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        return newConsumerBean(properties);
    }
}

编写Consumer注册器

/**
 * Registers message processors on {@link ConsumerBean}s and wraps every processor with
 * tracing, timing, structured logging and commit/retry handling.
 *
 * <p>A processor that returns normally commits the message; a processor that throws an
 * {@link Exception} causes the message to be redelivered later.
 */
@Slf4j(topic = "MqLogAspect_C")
@Component
public class DemoMqConsumerRegistry {

    private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
    private static final PrivacyFilter PRIVACY_FILTER = new PrivacyFilter();

    @Resource
    private ConsumerBean demoBroadcastingConsumer;

    /**
     * Subscribes {@code processor} to {@code topic}/{@code tag}; the processor receives the
     * message body as a string.
     *
     * @param consumer  consumer bean to subscribe on
     * @param topic     topic to subscribe to
     * @param tag       tag expression
     * @param processor callback receiving the decoded message body
     * @return an empty result
     */
    public CityResult<Void> register(ConsumerBean consumer,String topic, String tag, Consumer<String> processor) {
        registerConsumer(consumer, topic, tag, processor);
        return new CityResult<>();
    }

    /**
     * Subscribes {@code processor} to {@code topic}/{@code tag}; the processor receives the
     * message tag and the message body as strings.
     *
     * @param consumer  consumer bean to subscribe on
     * @param topic     topic to subscribe to
     * @param tag       tag expression
     * @param processor callback receiving {@code (tag, body)}
     * @return an empty result
     */
    public CityResult<Void> register(ConsumerBean consumer,String topic, String tag, BiConsumer<String, String> processor) {
        registerBiConsumer(consumer, topic, tag, processor);
        return new CityResult<>();
    }

    /**
     * Same as the {@code (tag, body)} variant of {@code register}, but always uses the
     * injected broadcasting consumer.
     *
     * @param topic     topic to subscribe to
     * @param tag       tag expression
     * @param processor callback receiving {@code (tag, body)}
     * @return an empty result
     */
    public CityResult<Void> registerBroadcasting(String topic, String tag, BiConsumer<String, String> processor) {
        registerBiConsumer(demoBroadcastingConsumer, topic, tag, processor);
        return new CityResult<>();
    }

    /** Adapts a body-only processor to the shared subscription wrapper. */
    private void registerConsumer(ConsumerBean consumer,String topic, String tag, Consumer<String> processor){
        subscribe(consumer, topic, tag, processor,
                message -> processor.accept(decodeBody(message)));
    }

    /** Adapts a {@code (tag, body)} processor to the shared subscription wrapper. */
    private void registerBiConsumer(ConsumerBean consumer,String topic, String tag, BiConsumer<String, String> processor){
        subscribe(consumer, topic, tag, processor,
                message -> processor.accept(message.getTag(), decodeBody(message)));
    }

    /**
     * Subscribes a listener that runs {@code invocation} between {@code startTrace} and
     * {@code endTrace}, logs the outcome, and maps success/failure to commit/retry.
     *
     * @param processor  the user-supplied callback, used only to derive the name that is logged
     * @param invocation how to hand the received message to the callback
     */
    private void subscribe(ConsumerBean consumer, String topic, String tag,
                           Object processor, Consumer<Message> invocation) {
        consumer.subscribe(topic, tag, (message, consumeContext) -> {
            startTrace(message);
            long start = System.currentTimeMillis();
            try {
                invocation.accept(message);
                logNormal(processor, message, start);
                return Action.CommitMessage;
            } catch (Exception e) {
                logException(processor, message, start, e);
                return Action.ReconsumeLater;
            } finally {
                endTrace();
            }
        });
    }

    /**
     * Decodes the message body with the platform default charset.
     *
     * <p>NOTE(review): the default charset is platform dependent; if producers always encode
     * as UTF-8 this should be pinned to UTF-8 together with the producer side.
     */
    private static String decodeBody(Message message) {
        return new String(message.getBody(), Charset.defaultCharset());
    }

    /** Logs a successful consumption as {@code processor`message`true`elapsedMillis}. */
    private void logNormal(Object processor, Message message, long start) {
        log.info("{}`{}`{}`{}",
                this.getProcessorClazz(processor),
                JSON.toJSONString(message, new SerializeFilter[] {PRIVACY_FILTER, BYTE_ARRAY_SERIALIZER}),
                true,
                System.currentTimeMillis() - start);
    }

    /**
     * Logs a failed consumption as {@code processor`message`errorMessage`elapsedMillis}.
     * The exception is passed as the trailing argument so the logger also records the
     * stack trace instead of only the message text.
     */
    private void logException(Object processor, Message message, long start, Exception e) {
        log.error("{}`{}`{}`{}",
                this.getProcessorClazz(processor),
                JSON.toJSONString(message, new SerializeFilter[] {PRIVACY_FILTER, BYTE_ARRAY_SERIALIZER}),
                e.getMessage(),
                System.currentTimeMillis() - start,
                e);
    }

    /**
     * Returns the name of the class that declared the processor: the processor's class name
     * truncated at the first {@code '$'}, so lambdas and anonymous classes are reported under
     * their enclosing class.
     */
    private String getProcessorClazz(Object processor) {
        String className = processor.getClass().getName();
        int innerIdx = className.indexOf('$');
        return innerIdx > 0 ? className.substring(0, innerIdx) : className;
    }
}

编写Consumer Listener

/**
 * Example listener: once this bean has been constructed and its dependencies injected, it
 * subscribes a processor for every tag of the demo topic on the {@code demoConsumer} bean.
 */
@Slf4j
@Component
public class DevicePropertyListener {

    @Value("${demo.rocketmq.topic.demo}")
    private String topicDemo;
    @Resource
    private DemoMqConsumerRegistry demoMqConsumerRegistry;
    @Resource
    private ConsumerBean demoConsumer;

    /**
     * Registers the message processor for {@code topicDemo} with tag expression {@code "*"}
     * through the injected registry and consumer.
     */
    @PostConstruct
    public void consumerMessage() {
        demoMqConsumerRegistry.register(demoConsumer, topicDemo, "*", new Consumer<String>() {
            @Override
            public void accept(String s) {
                // do some things
            }
        });
    }
}

生产者与之类似,也可以进一步封装:

/**
 * Thin wrapper around an ONS {@link ProducerBean} that is configured and started through
 * {@link #init(String, String, String, String)} and converts send failures into
 * {@link CityResult} error values instead of throwing.
 */
@Slf4j
public class MqProducer {

    private ProducerBean producer;

    private Properties properties;

    /**
     * Creates, configures and starts the underlying producer.
     *
     * @param group     producer group id
     * @param accessKey access key
     * @param secretKey secret key
     * @param domain    name server address
     */
    public void init(String group,String accessKey,String secretKey,String domain ){
        this.properties = new Properties();
        this.properties.setProperty(PropertyKeyConst.MQType, MQType.METAQ.name());
        this.properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        this.properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        this.properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, domain);
        this.properties.setProperty(PropertyKeyConst.GROUP_ID,group);
        this.producer = new ProducerBean();
        producer.setProperties(this.properties);
        producer.start();
        // NOTE(review): the access key is written to the log here; confirm this is acceptable.
        log.info("producer domain:{}, group:{}, accessKey:{}, isStarted: {}",domain, group, accessKey,producer.isStarted());
    }

    /** Shuts the producer down; does nothing when {@code init} has not been called. */
    public void shutdown(){
        if (this.producer == null) {
            return;
        }
        producer.shutdown();
    }

    /**
     * @return {@code true} when the producer exists and reports itself as started;
     *         {@code false} when {@code init} has not been called
     */
    public boolean isStarted(){
        return producer != null && producer.isStarted();
    }

    /**
     * @return {@code true} when the producer reports itself as closed, or when {@code init}
     *         has not been called (there is nothing usable to send with)
     */
    public boolean isClosed(){
        return producer == null || producer.isClosed();
    }

    /**
     * Sends a message synchronously.
     *
     * @param message message to send
     * @return the send result on success, or a {@code SERVER_ERROR} result when sending throws
     */
    public CityResult<SendResult> send(Message message) {
        try {
            SendResult sendResult = producer.send(message);
            return new CityResult<>(sendResult);
        } catch (Exception e) {
            log.error("send MQ error,errMsg:{}", e.getMessage(), e);
            return new CityResult<>(JointAppCodes.SERVER_ERROR, e.getMessage(), e.getLocalizedMessage());
        }
    }

    /**
     * Sends a message one-way (no send result is produced).
     *
     * @param message message to send
     * @return an empty result on success, or a {@code SERVER_ERROR} result when sending throws
     */
    public CityResult<SendResult> sendOneway(Message message) {
        try {
            producer.sendOneway(message);
            return new CityResult<>();
        } catch (Exception e) {
            log.error("send MQ error,errMsg:{}", e.getMessage(), e);
            return new CityResult<>(JointAppCodes.SERVER_ERROR, e.getMessage(), e.getLocalizedMessage());
        }
    }

    /**
     * Sends {@code body} to {@code topic} without a tag.
     *
     * @see #send(Message)
     */
    public CityResult<SendResult> send(String topic, String body) {
        return send(newMessage(topic, body));
    }

    /**
     * Sends {@code body} to {@code topic} with the given tag.
     *
     * @see #send(Message)
     */
    public CityResult<SendResult> send(String topic, String tag, String body) {
        Message message = newMessage(topic, body);
        message.setTag(tag);
        return send(message);
    }

    /**
     * Sends {@code body} to {@code topic} with the given tag, one-way.
     *
     * @see #sendOneway(Message)
     */
    public CityResult<SendResult> sendOneway(String topic, String tag, String body) {
        Message message = newMessage(topic, body);
        message.setTag(tag);
        return sendOneway(message);
    }

    /**
     * Builds a message for {@code topic} whose body is {@code body} encoded with the platform
     * default charset.
     *
     * <p>NOTE(review): the default charset is platform dependent; consider pinning UTF-8 here
     * together with the consumer side.
     */
    private static Message newMessage(String topic, String body) {
        Message message = new Message();
        message.setTopic(topic);
        message.setBody(body.getBytes());
        return message;
    }
}

Q.E.D.


爱调味品的大哥