需求
设备配置监听, 实时更新设备配置到数据库中, 某些情况下,设备监听频繁,避免多次执行更新sql
后来者抢占
再不引入异步或队列单独消费的基础上,利用后来者抢占的思路解决此场景问题
- 每个设备key都维护一个 配置列表
- 判断是否已有线程在处理,是否要终止此线程
- 入库前等待一个时间窗口(比如1s), 消费的时候总是取list中的最后一条, 消费完成后清空这个list
实现
Java
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Slf4j
public class LockTests {
private final Map<Integer, List<Object>> map = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, AtomicReference<Thread>> consumerRefs = new ConcurrentHashMap<>();
@Test
@SneakyThrows
public void test() {
Thread thread1 = new Thread(() -> {
long now = System.currentTimeMillis();
System.out.println("线程1生产 " + now);
exe(1, now);
});
thread1.setName("a");
thread1.start();
Thread thread2 = new Thread(() -> {
LockSupport.parkNanos(30_000L); // 微小延迟确保第二个线程稍后执行
long now = System.currentTimeMillis();
System.out.println("线程2生产 " + now);
exe(1, now);
});
thread2.setName("b");
thread2.start();
Thread thread3 = new Thread(() -> {
long now = System.currentTimeMillis();
System.out.println("线程3生产 " + now);
exe(2, now);
});
thread3.setName("3");
thread3.start();
Thread thread4 = new Thread(() -> {
LockSupport.parkNanos(30_000L); // 微小延迟确保第二个线程稍后执行
long now = System.currentTimeMillis();
System.out.println("线程4生产 " + now);
exe(2, now);
});
thread4.setName("4");
thread4.start();
Thread.sleep(122000); // 等待消费
}
private void exe(Integer k, Object time) {
map.compute(k, (key, list) -> {
if (list == null) {
list = new ArrayList<>();
}
list.add(time);
return list;
});
consumerRefs.computeIfAbsent(k, k1 -> new AtomicReference<>());
AtomicReference<Thread> ref = consumerRefs.get(k);
if (!ref.compareAndSet(null, Thread.currentThread())) {
System.out.println(Thread.currentThread().getName() + "<UNK>");
return; // 已有线程在处理该 key
}
try {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
map.computeIfPresent(k, (key, list) -> {
if (!list.isEmpty()) {
System.out.println("线程 " + Thread.currentThread().getName() + " 消费 " + list.getLast());
list.clear();
}
return list;
});
} finally {
ref.set(null); // 释放消费权
}
}
}