DelayQueue란? (feat. Java표준, Redis)
Blocking Queue&DelayQueue
세상에나 마상에나!
Queue는 우리가 보통 pub/sub을 하는 블로킹 큐(Blocking Queue)와 블로킹 지연큐(Delay Queue) 2가지가 있습니다.
- Blocking Queue
- 데이터가 Queue에 들어오는 즉시 Consumer가 가져갑니다.
- Queue가 비어 있으면 Consumer는 새로운 항목이 들어올 때까지 자동으로 대기(Blocking) 합니다.
- 보통 Producer–Consumer 패턴, 작업 큐, 비동기 처리 등에 사용합니다.
- Delay Queue(=지연 큐, DelayQueue)
- Queue에 항목이 들어왔더라도, 각 객체에 설정된 지연시간(Delay Time)이 지나야만 Consumer가 가져갈 수 있습니다.
- 아직 지연시간이 남았다면 Consumer는 해당 항목이 만료될 때까지 대기합니다.
- 보통 지연 작업, 예약 작업 스케줄링에 사용 됩니다.
- 예: “2시간 후 티켓 복구”, “10분 후 취소 처리” 등에 사용합니다.
Java 표준 DelayQueue
tip. psvm+Enter = IntelliJ에서 main을 만들어줍니다.
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
class DelayQueuePlay implements Delayed {
private String tiketId;
private long expireTime;
public DelayQueuePlay(String tiketId, long delay, TimeUnit unit) {
this.tiketId = tiketId;
this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
long diff = expireTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(
this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS)
);
}
public String getTiketId() {
return tiketId;
}
private static void recoveryTicket(String tiketId) {
System.out.println("ticket No."+tiketId+"의 대한 티켓이 복구 되었습니다.");
}
public static void main(String[] args) throws Exception {
DelayQueue<DelayQueuePlay> queue = new DelayQueue<>();
System.out.println("티켓이 사용되었습니다.");
queue.offer(new DelayQueuePlay("ticket-123", 10, TimeUnit.SECONDS));
while (true) {
DelayQueuePlay task = queue.take();
recoveryTicket(task.getTiketId());
}
}
}
10초 후에 사용된 티켓이 복구가 됩니다.
티켓 사용 후 time지정이 가능(스케줄러) 합니다.

Redis 기반 DelayedQueue

Redisson의 RDelayedQueue는 위의 Java의 DelayQueue와 개념이 같지만
Redis 기반의 분산환경에서도 안전하게 동작 합니다.
Interface RDelayedQueue<V>
내부 동작 원리는 아래 참고 사이트들을 기반으로 정리된 내용입니다.
Redisson은 다양한 분산 Lock, Delay Queue 등 고급 기능을 제공하는 Redis 기반 Java 클라이언트입니다.
특히 Redisson의 지연 큐 기능인 RDelayedQueue를 사용하면 지연 실행이 필요한 작업을 쉽게 구현할 수 있습니다.
RDelayedQueue의 내부 동작 원리
Redisson의 Delay Queue(RDelayedQueue)는 Redis SortedSet(ZSET) 을 기반으로 구현되어 있습니다.
SortedSet은 각 요소에 score(정렬 기준 숫자) 를 부여할 수 있는 자료구조이며,
Redisson은 이 score를 실행 예정 시각(timestamp) 으로 사용합니다.
RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
// move object to destinationQueue in 10 seconds
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// move object to destinationQueue in 1 minute
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
public static void main(String[] args) {
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://10.23.3.24:7000")
.addNodeAddress("redis://10.23.3.24:7001")
.addNodeAddress("redis://10.23.3.24:7002")
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue2");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
new Thread() {
public void run() {
while(true) {
try {
String take = blockingQueue.take();
System.out.println("take:"+take);
long time = Long.valueOf(take.split("---")[0]);
System.out.println("delay:"+(System.currentTimeMillis()-time-2*60*1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}.start();
for(int i=1;i<=100000;i++) {
long time = System.currentTimeMillis();
delayedQueue.offer(time+"---"+i, 2, TimeUnit.MINUTES);
}
delayedQueue.destroy();
}
1. 지연 작업 등록
- 작업이 등록되면 Redisson은
- score = 현재시간 + delay 형태로 실행 예정 시각을 score로 하여 SortedSet에 저장합니다.
2. 만료된 작업 탐색(=timer based)
- Redisson 내부의 타이머(HashedWheelTimer) 가 주기적으로 실행되어명령을 사용해 지금 실행해야 할 작업만 조회합니다.
- ZRANGEBYSCORE timeoutSet 0 now LIMIT 0 N
- 전체 SortedSet을 매번 전부 스캔하지 않고,
- 정렬된 구조의 앞부분(만료된 범위) 만 읽기 때문에 매우 효율적입니다.
3. 만료 작업 이동
- 조회된 작업들은 SortedSet에서 제거(ZREM)된 뒤
- 준비 큐(RBlockingQueue 등) 로 이동합니다.
- 준비 큐는 Redis List 기반의 블로킹 큐이므로
- 소비자는 take() 호출 시 작업이 들어오는 즉시 처리할 수 있습니다.
4. 여러 서버에서도 중복 처리 없이 안전
- Redis의 ZSET, LIST 연산은 원자적이기 때문에
여러 애플리케이션 인스턴스가 동시에 사용해도 동일 작업이 중복 처리되지 않습니다.
참고 :
https://leapcell.io/blog/redis-delayed-queues-made-simple
https://github.com/redisson/redisson/blob/master/README.md
https://gitee.com/big-black-fish/redisson-wiki/blob/master/7.-Distributed-collections.md#715-delayed-queue
https://inma.tistory.com/198
https://www.javadoc.io/doc/org.redisson/redisson/3.6.0/index.html
https://www.javadoc.io/doc/org.redisson/redisson/3.6.0/org/redisson/api/RBlockingQueue.html
https://www.jianshu.com/p/eae22e9ee9d8
https://dzone.com/articles/distributed-java-queues-on-top-of-redis?utm_source=chatgpt.com