Skip to content

场景

  • 监护仪某个动作包含连续的三个指令,由于设备指令未做延迟,协议解包后三个指令几乎同时发送MQ

问题

订阅者消费数据日志发现,三个指令出现乱序的问题

测试代码

mica-mqtt-client 模拟两个线程分别执行发布和订阅逻辑,可以发现日志中出现了 消息消费顺序的异常,切换QOS无法解决本问题

Java

@Test
@SneakyThrows
public void pub() {
    MqttClient client = MqttClient.create().ip("127.0.0.1").port(1883).connectSync();
    for (int i = 0; i < 9000; i++) {
        client.publish("cms/patient/release", "11111111111111".getBytes(StandardCharsets.UTF_8));
        client.publish("cms/patient/receive", "22222222222222".getBytes(StandardCharsets.UTF_8));
        Thread.sleep(100);
    }
    Thread.sleep(100000);
}

@Test
@SneakyThrows
public void sub() {
    MqttClient client = MqttClient.create().ip("127.0.0.1").port(1883).connectSync();
    client.subscribe(MqttQoS.QOS0, "cms/patient/#",
            (context, topic, message, payload) -> log.info(new String(payload)));
    Thread.sleep(100000);
}

问题原因

  • 网络延迟或抖动可能导致消息到达的顺序不同。
  • 客户端处理消息的速度不同,也可能导致顺序问题。

本问题的解决重点应该放到消息的生产者上,消费端对指令的顺序是敏感的,不同的指令对于不同的状态,消费端处理成本偏高

解决方案

协议解包添加判断标识,ABC三个指令,若B指令和A指令发送间隔小于100ms,则使用延迟消息发送,且C指令发送前也需要判断, $delayed/1/cms/patient/receive

``

问题延申

消费端的并发问题

由于不同指令对于的db操作可能导致数据事务异常或数据状态异常的问题, 消费端若需要控制mq消息的执行顺序,此时,可以通过 额外本地队列或者锁的方式实现

下述使用读写锁来解决

  • readLock

    • 特点: 提供一个悲观的读锁,多个线程可以同时获取。
    • 用法: 适用于需要确保读操作和写操作互斥时使用。
  • writeLock

    • 特点: 提供一个独占的写锁,只有一个线程可以持有。
    • 用法: 适用于需要修改共享资源并确保没有其他线程读或写时使用。

核心利用 ConcurrentHashMapcomputeIfAbsent操作,保证线程安全,避免锁的并发创建和获取异常

java

public static final Map<String, StampedLock> ONLINE_LOCK = new ConcurrentHashMap<>();

public void execute(String topic, String msg) {
    if (beforeMessageArrived(topic, msg)) {
        afterMessageArrived(topic, msg);
        TDevice tDevice = isDeviceMsg(msg);
        if (null == tDevice) {
            return;
        }
        //每个设备key对应一个锁,用于控制消息执行顺序
        String bedKey = tDevice.getBedKey();
        StampedLock lock = ONLINE_LOCK.computeIfAbsent(bedKey, k -> new StampedLock());
        long stamp = lock.writeLock();
        try {
            if (tDevice.getProtocolVersion() == V3_PROTOCOL_VERSION) {
                v3MessageArrived(topic, tDevice);
            } else if (tDevice.getProtocolVersion() == V5_PROTOCOL_VERSION) {
                v5MessageArrived(topic, tDevice);
            } else if (tDevice.getProtocolVersion() == V7_PROTOCOL_VERSION) {
                v7MessageArrived(topic, tDevice);
            } else {
                log.error(" ->topic:{},msg:{},No match found for the method.", topic, msg);
            }
            onComplete(topic, msg);
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}

反压

emqx 本身不提供反压能力,需要发布订阅双方通过其它方式实现

可以通过定义mq主题,来进行交互,当消费端的本地队列或流量监控超过阈值,则发送给生产端消息,控制生产端的消息发布速率