场景
- 监护仪某个动作包含连续的三个指令,由于设备指令未做延迟,协议解包后三个指令几乎同时发送MQ
问题
订阅者消费数据日志发现,三个指令出现乱序的问题
测试代码
mica-mqtt-client
模拟两个线程分别执行发布和订阅逻辑,可以发现日志中出现了 消息消费顺序的异常,切换QOS
无法解决本问题
Java
@Test
@SneakyThrows
public void pub() {
MqttClient client = MqttClient.create().ip("127.0.0.1").port(1883).connectSync();
for (int i = 0; i < 9000; i++) {
client.publish("cms/patient/release", "11111111111111".getBytes(StandardCharsets.UTF_8));
client.publish("cms/patient/receive", "22222222222222".getBytes(StandardCharsets.UTF_8));
Thread.sleep(100);
}
Thread.sleep(100000);
}
@Test
@SneakyThrows
public void sub() {
MqttClient client = MqttClient.create().ip("127.0.0.1").port(1883).connectSync();
client.subscribe(MqttQoS.QOS0, "cms/patient/#",
(context, topic, message, payload) -> log.info(new String(payload)));
Thread.sleep(100000);
}
问题原因
- 网络延迟或抖动可能导致消息到达的顺序不同。
- 客户端处理消息的速度不同,也可能导致顺序问题。
本问题的解决重点应该放到消息的生产者上,消费端对指令的顺序是敏感的,不同的指令对于不同的状态,消费端处理成本偏高
解决方案
协议解包添加判断标识,ABC三个指令,若B指令和A指令发送间隔小于100ms,则使用延迟消息发送,且C指令发送前也需要判断, $delayed/1/cms/patient/receive
``
问题延申
消费端的并发问题
由于不同指令对于的db操作可能导致数据事务异常或数据状态异常的问题, 消费端若需要控制mq消息的执行顺序,此时,可以通过 额外本地队列或者锁的方式实现
下述使用读写锁来解决
readLock
- 特点: 提供一个悲观的读锁,多个线程可以同时获取。
- 用法: 适用于需要确保读操作和写操作互斥时使用。
writeLock
- 特点: 提供一个独占的写锁,只有一个线程可以持有。
- 用法: 适用于需要修改共享资源并确保没有其他线程读或写时使用。
核心利用 ConcurrentHashMap
的 computeIfAbsent
操作,保证线程安全,避免锁的并发创建和获取异常
java
public static final Map<String, StampedLock> ONLINE_LOCK = new ConcurrentHashMap<>();
public void execute(String topic, String msg) {
if (beforeMessageArrived(topic, msg)) {
afterMessageArrived(topic, msg);
TDevice tDevice = isDeviceMsg(msg);
if (null == tDevice) {
return;
}
//每个设备key对应一个锁,用于控制消息执行顺序
String bedKey = tDevice.getBedKey();
StampedLock lock = ONLINE_LOCK.computeIfAbsent(bedKey, k -> new StampedLock());
long stamp = lock.writeLock();
try {
if (tDevice.getProtocolVersion() == V3_PROTOCOL_VERSION) {
v3MessageArrived(topic, tDevice);
} else if (tDevice.getProtocolVersion() == V5_PROTOCOL_VERSION) {
v5MessageArrived(topic, tDevice);
} else if (tDevice.getProtocolVersion() == V7_PROTOCOL_VERSION) {
v7MessageArrived(topic, tDevice);
} else {
log.error(" ->topic:{},msg:{},No match found for the method.", topic, msg);
}
onComplete(topic, msg);
} finally {
lock.unlockWrite(stamp);
}
}
}
反压
emqx
本身不提供反压能力,需要发布订阅双方通过其它方式实现
可以通过定义mq主题,来进行交互,当消费端的本地队列或流量监控超过阈值,则发送给生产端消息,控制生产端的消息发布速率