Skip to content

需求

设备配置监听, 实时更新设备配置到数据库中, 某些情况下,设备监听频繁,避免多次执行更新sql

后来者抢占

再不引入异步或队列单独消费的基础上,利用后来者抢占的思路解决此场景问题

  1. 每个设备key都维护一个 配置列表
  2. 判断是否已有线程在处理,是否要终止此线程
  3. 入库前等待一个时间窗口(比如1s), 消费的时候总是取list中的最后一条, 消费完成后清空这个list

实现

Java

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Slf4j
public class LockTests {
    private final Map<Integer, List<Object>> map = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, AtomicReference<Thread>> consumerRefs = new ConcurrentHashMap<>();

    @Test
    @SneakyThrows
    public void test() {
        Thread thread1 = new Thread(() -> {
            long now = System.currentTimeMillis();
            System.out.println("线程1生产 " + now);
            exe(1, now);
        });
        thread1.setName("a");
        thread1.start();

        Thread thread2 = new Thread(() -> {
            LockSupport.parkNanos(30_000L); // 微小延迟确保第二个线程稍后执行
            long now = System.currentTimeMillis();
            System.out.println("线程2生产 " + now);
            exe(1, now);
        });
        thread2.setName("b");
        thread2.start();

        Thread thread3 = new Thread(() -> {
            long now = System.currentTimeMillis();
            System.out.println("线程3生产 " + now);
            exe(2, now);
        });
        thread3.setName("3");
        thread3.start();

        Thread thread4 = new Thread(() -> {
            LockSupport.parkNanos(30_000L); // 微小延迟确保第二个线程稍后执行
            long now = System.currentTimeMillis();
            System.out.println("线程4生产 " + now);
            exe(2, now);
        });
        thread4.setName("4");
        thread4.start();


        Thread.sleep(122000); // 等待消费
    }

    private void exe(Integer k, Object time) {
        map.compute(k, (key, list) -> {
            if (list == null) {
                list = new ArrayList<>();
            }
            list.add(time);
            return list;
        });
        consumerRefs.computeIfAbsent(k, k1 -> new AtomicReference<>());

        AtomicReference<Thread> ref = consumerRefs.get(k);

        if (!ref.compareAndSet(null, Thread.currentThread())) {
            System.out.println(Thread.currentThread().getName() + "<UNK>");
            return; // 已有线程在处理该 key
        }
        try {
            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
            map.computeIfPresent(k, (key, list) -> {
                if (!list.isEmpty()) {
                    System.out.println("线程 " + Thread.currentThread().getName() + " 消费 " + list.getLast());
                    list.clear();
                }
                return list;
            });
        } finally {
            ref.set(null); // 释放消费权
        }
    }
}