본문 바로가기
Language/Java

Producer-Consumer 패턴

by 태하팍 2021. 3. 18.
반응형
더보기

Runnable과 Callable의 차이점은? acet.pe.kr/819

> 상황
성능이 좋지 않은 부분이 발생하여 그 부분을 해소하기 위해 멀티 쓰레드로 처리하려고 했다.
그런데 멀티 쓰레드 처리 후 나오는 리스트형 데이터가 재사용 되어지는 문제가 발생.

> 결론
생산자-소비자 패턴을 사용하기로 함.
생상자 - 멀티쓰레드 처리 및 리스트형 데이터를 블라킹 큐에 넣어줌. 넣기만 함!
소비자 - BlockingQueue에 넣어진 것들을 사용함. 사용만 함!

> 간단한 프로젝트 만들어서 테스트 함.

@SpringBootTest
class DemoApplicationTests {

    @Test
    void producerAndConsumerMethodTest() {

        BlockingQueue q = new LinkedBlockingQueue(); // 공용 큐 하나를 만들어준다.
        ProducerAndConsumer pc = new ProducerAndConsumer();  // 클래스에서 큐? 메소드에서 큐?

        // call method
        pc.producer(q); // 메소드 안에서 thread 처리
        pc.consumer(q); // 공용 queue를 사용하는지 보자!


    }
}
public class ProducerAndConsumer {

    Logger logger = LoggerFactory.getLogger(Caller.class);

    public void producer(BlockingQueue queue) {

        List testData = new ArrayList();
        final ExecutorService executor = Executors.newFixedThreadPool(10);

        testData.add("1");
        testData.add("2");
        testData.add("3");
        testData.add("4");
        testData.add("5");
        testData.add("6");
        testData.add("7");
        testData.add("8");
        testData.add("9");
        testData.add("10");

        for (Object item : testData) {
            Future<?> future = executor.submit(() -> {
                try {
                    queue.put(settingValue(item.toString()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

            try {
                // 결과가 리턴되기를 기다립니다. null이면 정상, 비정상 시 예외 발생
                System.out.println("result : " + future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        logger.info("producer >> queue size=" + queue.size());
        executor.shutdown();
        logger.info("end? producer >> queue size=" + queue.size());

    }

    public void consumer(BlockingQueue queue) {

        logger.info("consumer >> queue size=" + queue.size());


        int i=0;

        try {
            while(true){
                if(queue.size() == 0) break;
                TestVo testVo = (TestVo) queue.take();
                logger.info(i+"->"+testVo.getId() + "/" + testVo.getName());
                i++;


            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public TestVo settingValue(String item) {
        logger.info("item =" + item);
        TestVo tv = new TestVo();
        tv.setId(item);
        tv.setName("th-" + item);
        return tv;
    }


}

 

> 내용
패턴을 찾아보면 다들 Class별로 나눴는데 그냥 method로 나눔.
소스의 핵심은 멀티 쓰레드 처리 및 BlockingQueue 사용.

핵심 부분!
future.get(); 가 있고 없고의 차이가 중요!

// 결과가 리턴되기를 기다립니다. null이면 정상, 비정상 시 예외 발생
System.out.println("result : " + future.get());


우선 소스를 이해하기 위해서는?
쓰레드를 사용하기 위해 ExecutorService를 사용!

final ExecutorService executor = Executors.newFixedThreadPool(10);

그리고 lamda식을 사용하여 쓰레드를 수행 함.

Future<?> future = executor.submit(() -> {
    try {
        queue.put(settingValue(item.toString()));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});

여기에서 ExecutorService 메소드 중 submit()과 excute()가 있는데 리턴값이 있는 submit을 사용하여 쓰레드 작업 완료 처리를 하였다.
리턴값은 Future객체를 리턴.

그리고 위에서 중요하다고 했던 future.get() 함수의 리턴에 대해 알아보면
Future의 get() 메소드를 호출하면 쓰레드가 작업을 완료할 때까지 블라킹 했다가 작업이 완료되면 처리 결과를 리턴한다.
아래의 표를 보자.
       ! 쓰레드 작업 처리 완료 후 리턴 타입이 null이면 정상! Exception이 발생하면 예외처리!

리턴 설명


 참고 : m.blog.naver.com/mals93/220743747346 (설명도 좋고 위의 사진을 퍼옴) 

마지막으로 결과를 통해 알아보자!

future.get()을 사용하지 않았을 경우

future.get()을 사용하지 않으면 언제 Thread 완료 시점을 보장 못한다. 

 

future.get()을 사용 했을 경우

future.get()을 통해 Thread가 전부 처리 된 후 Consumer가 수행.

 간단한 테스트를 해보았다.
실제적인 현업 소스에 적용을 해보자~
끝!

 

반응형