OpenSource/Redis

DelayQueue란? (feat. Java표준, Redis)

태하팍 2025. 11. 24. 16:20
반응형

Blocking Queue&DelayQueue

세상에나 마상에나!
Queue는 우리가 보통 pub/sub을 하는 블로킹 큐(Blocking Queue)와 블로킹 지연큐(Delay Queue) 2가지가 있습니다.

  • Blocking Queue
    • 데이터가 Queue에 들어오는 즉시 Consumer가 가져갑니다.
    • Queue가 비어 있으면 Consumer는 새로운 항목이 들어올 때까지 자동으로 대기(Blocking) 합니다.
    • 보통 Producer–Consumer 패턴, 작업 큐, 비동기 처리 등에 사용합니다.
  • Delay Queue(=지연 큐, DelayQueue)
    • Queue에 항목이 들어왔더라도, 각 객체에 설정된 지연시간(Delay Time)이 지나야만 Consumer가 가져갈 수 있습니다.
    • 아직 지연시간이 남았다면 Consumer는 해당 항목이 만료될 때까지 대기합니다.
    • 보통 지연 작업, 예약 작업 스케줄링에 사용 됩니다.
      • 예: “2시간 후 티켓 복구”, “10분 후 취소 처리” 등에 사용합니다.

Java 표준 DelayQueue

tip. psvm+Enter = IntelliJ에서 main을 만들어줍니다.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueuePlay implements Delayed {
    private String tiketId;
    private long expireTime;

    public DelayQueuePlay(String tiketId, long delay, TimeUnit unit) {
        this.tiketId = tiketId;
        this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long diff = expireTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return Long.compare(
                this.getDelay(TimeUnit.MILLISECONDS),
                o.getDelay(TimeUnit.MILLISECONDS)
        );
    }

    public String getTiketId() {
        return tiketId;
    }

    private static void recoveryTicket(String tiketId) {
        System.out.println("ticket No."+tiketId+"의 대한 티켓이 복구 되었습니다.");
    }

    public static void main(String[] args) throws Exception {
        DelayQueue<DelayQueuePlay> queue = new DelayQueue<>();
        System.out.println("티켓이 사용되었습니다.");
        queue.offer(new DelayQueuePlay("ticket-123", 10, TimeUnit.SECONDS));

        while (true) {
            DelayQueuePlay task = queue.take();
            recoveryTicket(task.getTiketId());
        }
    }
}

10초 후에 사용된 티켓이 복구가 됩니다.
티켓 사용 후 time지정이 가능(스케줄러) 합니다.

10초 후에 티켓이 복구 됩니다:)

Redis 기반 DelayedQueue

Redisson의 RDelayedQueue는 위의 Java의 DelayQueue와 개념이 같지만
Redis 기반의 분산환경에서도 안전하게 동작 합니다.

org.redisson.api

Interface RDelayedQueue<V>

https://github.com/redisson/redisson/blob/master/redisson/src/main/java/org/redisson/RedissonDelayedQueue.java

내부 동작 원리는 아래 참고 사이트들을 기반으로 정리된 내용입니다.

Redisson은 다양한 분산 Lock, Delay Queue 등 고급 기능을 제공하는 Redis 기반 Java 클라이언트입니다.
특히 Redisson의 지연 큐 기능인 RDelayedQueue를 사용하면 지연 실행이 필요한 작업을 쉽게 구현할 수 있습니다.

RDelayedQueue의 내부 동작 원리

Redisson의 Delay Queue(RDelayedQueue)는 Redis SortedSet(ZSET) 을 기반으로 구현되어 있습니다.
SortedSet은 각 요소에 score(정렬 기준 숫자) 를 부여할 수 있는 자료구조이며,
Redisson은 이 score를 실행 예정 시각(timestamp) 으로 사용합니다.

RQueue<String> destinationQueue = redisson.getQueue("anyQueue");

RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);

// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);

// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

 

public static void main(String[] args) {
        Config config = new Config();
        config.useClusterServers()
                .addNodeAddress("redis://10.23.3.24:7000")
                .addNodeAddress("redis://10.23.3.24:7001")
                .addNodeAddress("redis://10.23.3.24:7002")
        RedissonClient redisson = Redisson.create(config);
        RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue2");
        RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
        new Thread() {
            public void run() {
                while(true) {
                    try {
                        String take = blockingQueue.take();
                        System.out.println("take:"+take);
                        long time = Long.valueOf(take.split("---")[0]);                        
                        System.out.println("delay:"+(System.currentTimeMillis()-time-2*60*1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
        }.start();

        for(int i=1;i<=100000;i++) {            
            long time = System.currentTimeMillis();
            delayedQueue.offer(time+"---"+i, 2, TimeUnit.MINUTES);
        }
        delayedQueue.destroy();
    }


1. 지연 작업 등록

  • 작업이 등록되면 Redisson은
  • score = 현재시간 + delay 형태로 실행 예정 시각을 score로 하여 SortedSet에 저장합니다.

2. 만료된 작업 탐색(=timer based)

  • Redisson 내부의 타이머(HashedWheelTimer) 가 주기적으로 실행되어명령을 사용해 지금 실행해야 할 작업만 조회합니다.
  • ZRANGEBYSCORE timeoutSet 0 now LIMIT 0 N
  • 전체 SortedSet을 매번 전부 스캔하지 않고,
  • 정렬된 구조의 앞부분(만료된 범위) 만 읽기 때문에 매우 효율적입니다.

3. 만료 작업 이동

  • 조회된 작업들은 SortedSet에서 제거(ZREM)된 뒤
  • 준비 큐(RBlockingQueue 등) 로 이동합니다.
  • 준비 큐는 Redis List 기반의 블로킹 큐이므로
  • 소비자는 take() 호출 시 작업이 들어오는 즉시 처리할 수 있습니다.

4. 여러 서버에서도 중복 처리 없이 안전

  • Redis의 ZSET, LIST 연산은 원자적이기 때문에
    여러 애플리케이션 인스턴스가 동시에 사용해도 동일 작업이 중복 처리되지 않습니다.

 

참고 :

https://leapcell.io/blog/redis-delayed-queues-made-simple
https://github.com/redisson/redisson/blob/master/README.md
https://gitee.com/big-black-fish/redisson-wiki/blob/master/7.-Distributed-collections.md#715-delayed-queue
https://inma.tistory.com/198

https://www.javadoc.io/doc/org.redisson/redisson/3.6.0/index.html

https://www.javadoc.io/doc/org.redisson/redisson/3.6.0/org/redisson/api/RBlockingQueue.html

https://www.jianshu.com/p/eae22e9ee9d8

https://dzone.com/articles/distributed-java-queues-on-top-of-redis?utm_source=chatgpt.com

반응형