java

Java의 병렬 처리를 알아보자 - Parallel Stream(병렬 스트림)

Han_5ung 2024. 8. 2. 03:03

때는 2023년 9, 10월...

핀테크 프로젝트를 진행하고 있던 중 API의 호출부터 반환까지 약 3초 이상 걸리는 현상이 발생했다. 사용자 카드에 바코드 번호를 부여해서 반환하는 API였다. 간편 결제 서비스인 건 고사하고 메인 화면에서 3초의 응답 시간을 가진 API를 사용할 수 없어, 디버깅을 진행했고 사용자가 보유한 모든 카드에 바코드 번호를 부여하고 Redis에 부여하는 과정에서 응답 시간이 늘어났다.

 

디버깅을 통해 로직이 순차적으로 진행되면서 시간이 점점 누적되는 것이 가장 큰 문제였다. 카드를 많이 보유한 만큼 시간이 늘어날 수밖에 없는 구조. 바코드 번호를 초기에 생성하고 유지하면 되지 않느냐라는 질문의 대답은 X. 보안상 결제와 직접적인 연관이 있는 바코드 번호가 탈취당했을 경우 언제든 의도치 않은 결제가 이뤄질 수 있으므로 바코드는 일정 시간 후에 초기화되어야 하며 필요시 새로고침으로 초기화가 가능해야 한다고 판단했다.

 

순차적인 로직을 병렬로 처리할 방법을 고안하고 결과적으로 병렬 스트림을 사용해 결과를 얻었지만, 당시 제대로 알고 사용했다는 느낌이 들지 않아 병렬 스트림에 대해 알아보도록 하자.

병렬 스트림 (ParallelStream)이란?

병렬 스트림은 Java 8에서 도입된 Stream 기능 중 하나로, 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 내부적으로 Fork / Join Framework를 사용하여 병렬 처리를 진행하며 개발자가 직접 스레드나 스레드 풀을 생성, 관리하지 않고 메서드로 간단하게 병렬 처리를 진행할 수 있다.

Fork / Join Framwork란?

Fork / Join Framwork란 Java 7에서 도입된 병렬 처리 프레임워크로, 큰 작업을 더 작은 작업으로 분할(Fork)하고, 각각의 작은 작업을 병렬로 처리한 후 결과를 결합(Join)하는 방식으로 동작한다. (분할 정복 알고리즘과 비슷하다.)

Fork / Join Framwork 동작 예시

work stealing (작업 훔치기)

Fork / Join Framwork는 work stealing 기법을 사용한다. 이는 각 스레드가 자신에게 할당된 작업을 처리하면서 다른 스레드의 작업 큐에서 작업을 훔쳐 처리하는 방식이다. work stealing을 이용해서 모든 스레드가 가능한 쉬지 않고 작업을 수행할 수 있다. 역시 이해를 위해 그림을 살펴보자. (담부턴 패드로 그려야겠다..)

.work stealing 동작 예시

1. 각 스레드(A, B)는 작업 큐를 가지고 있으며 분할된 작업을 추가한다.

2. 큐에서 작업을 꺼내 처리한다.

3. 만약 자신의 큐가 비어있으면 다른 작업 큐에서 작업을 훔쳐 온다.

  • B의 작업 큐가 비었기 때문에 A의 작업을 훔쳐온다.

쉽게 말해, 아직도 일하고 있는 친구 도와준다고 생각하면 편하다.(참된 친구다.)

병렬 스트림 예제 코드

병렬 스트림이 어떤 친구인지 알아봤으니 이제 코드로 살펴보도록 하자 비교를 위해 순차적으로 처리하는 for문을 먼저 보자

for문을 이용한 순차 순회

public void cal() throws InterruptedException {
    List<Integer> list = new ArrayList<>();

    for(int i = 1; i <= 100; i++) {
        list.add(i);
    }

    long s = System.currentTimeMillis();
    for(int i : list) {
        System.out.println("현재 숫자 : " + i + " 스레드 이름 : " + Thread.currentThread().getName());
        Thread.sleep(1000);
    }
    long e = System.currentTimeMillis();

    System.out.println((e - s) + "ms");
}

이거 하려고 100초 기다림

100까지의 숫자가 있는 list를 순차적으로 순회한다. 각 요소를 꺼내고 1초를 기다리고 있기 때문에 약 100초 이상이 걸리는 것을 확인할 수 있다. 100,727 ms (100.727초) 또한 스레드 이름을 확인해 보니 Test worker에서 모든 작업을 진행하고 있다. 즉, 하나의 스레드에서 모든 요소를 순회하고 1초씩 기다리면서 시간이 누적된 것이다.

이제 병렬 스트림을 사용해, 스레드 이름과 응답 시간을 살펴보자.

병렬 스트림을 이용한 병렬 처리

public void parallelStream() {
    int availableProcessors = Runtime.getRuntime().availableProcessors();
    System.out.println("사용 가능한 스레드 : " + availableProcessors);

    long s = System.currentTimeMillis();
    list.parallelStream()
            .forEach(i -> {
                try {
                    System.out.println("현재 숫자 : " + i + " 스레드 이름 : " + Thread.currentThread().getName());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
    long e = System.currentTimeMillis();
    System.out.println("총 걸린 시간 : " + (e -s) + "ms");
}

먼저 Runtime.getRuntime(). availableProcessors()를 이용해 사용 가능한 스레드의 수를 확인했을 때 12개가 나온다 이는 Main을 포함한 스레드의 개수이므로 추가적으로 n - 1개의 스레드를 더 사용할 수 있다.

이미지에 보이는 것처럼 Main 스레드인 Test worker와 1 ~ 11까지 총 12개의 스레드가 작업을 처리한다. 그렇다면 12개의 스레드로 100개의 요소를 처리한다면 어떤 결과가 나올지 예상해 봤을 때, 100 / 12 = 8.33 즉, 약 8.3초에 근사한 값이 나올 것이라 예상할 수 있다.

결과는 9.051초로 8.3보다 약간 더 나왔지만 근사한 값이라고 할 수 있을 것이다. 기존 순차 처리에서 병렬 처리로 변경했을 때 100초 -> 9초로 응답 시간이 개선된 것을 확인할 수 있다.

그럼 이번에는 스레드의 수를 고정해서 처리해 보자

public void parallelStream() throws ExecutionException, InterruptedException {
    ForkJoinPool threadPool = new ForkJoinPool(4); // 4개의 스레드를 사용

    long s = System.currentTimeMillis();
    threadPool.submit(() -> {
        list.parallelStream()
                .forEach(i -> {
                    try {
                        System.out.println("현재 숫자 : " + i + " 스레드 이름 : " + Thread.currentThread().getName());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
    }).get();
    threadPool.shutdown();

    long e = System.currentTimeMillis();
    System.out.println("총 걸린 시간 : " + (e -s) + "ms");
}

new ForkJoinPoll(n)을 이용하여 4개의 스레드를 독립적인 스레드 풀로 할당했다. 결과를 예상했을 때, 4개의 스레드로 100개의 요소를 처리하며 100 / 4= 25. 약 25초의 처리 시간이 걸릴 것으로 예상된다.

  • submit() : 주어진 작업을 ForkJoinPoll에 제출한다. submit()은 비동기적으로 작업을 진행하게 된다.
  • get() : submit()으로 제출된 모든 작업이 끝날 때까지 대기한다. submit()이 비동기로 작업을 제출했기 때문에 모든 작업이 끝나기 전까지 다음 단계로 진행되지 않도록 보장하기 위함이다.
  • shutdown() : 더 이상 새로운 작업을 받을 수 없도록 스레드 풀을 종료하고 반환한다. 이는 리소스를 정리하고 사용되지 않는 스레드를 실행시키지 않도록 보장하기 위함이다.

결과를 확인하면 worker 1 ~ 4까지 사용하며 총 4개의 스레드를 사용하여 25823ms -> 25.823초가 소요된 것을 확인할 수 있다.

Parallel Stream 주의사항

1. 요수의 수와 처리 시간

처리하고자 하는 요수의 개수가 적고 처리 시간이 짧다면 되려 반복문을 이용한 순차 처리가 빠를 가능성이 있다.

병렬 스트림을 사용하게 되는 경우 요소들을 분할하고 합치는 과정에서 비용이 발생하기 때문에 반드시 테스트가 필요하다.

2. CPU 바운드와 I/O 바운드

처리하고자 하는 작업이 CPU에 적재되어 처리되느냐, I/O 네트워킹을 진행하느냐를 고려해야 한다. I/O 작업 시 스레드가 block 되기 때문에 스레드는 대기 상태로 진입하여 다른 작업을 수행하지 못한다. 즉, 처리 프로세스를 확인하고 연산 위주의 CPU 바운드 작업인 경우 병렬 스트림이 적절하다. 데이터베이스 접근과 같은 I/O 바운드가 있을 경우 비동기를 통한 멀티 스레드 작업을 진행하는 것이 권장된다.

3. 스레드 풀 공유

병렬 스트림은 독립적인 스레드 풀을 생성하는 것이 아닌 기존 스레드 풀을 공유하게 된다. 즉 미리 설정한 스레드 풀에서 남은 스레드를 가져와 실행하게 된다.

사용 중인 스레드 / 총 스레드 : 8 / 10이라면 병렬 스트림을 사용하더라도 최대 2개의 스레드를 사용하게 되며 모두 사용 중이면 요청을 처리하지 못하고 스레드가 반납될 때까지 기다리게 된다.

 


필자의 경우, 프로젝트 당시 바코드 번호를 생성하고 저장하는 과정에서 redis 통신이 이루어지며 I/O 작업이 진행된다. 결과적으로 성능 평균 2,600ms -> 평균 17ms로 성능 개선의 효과는 봤지만 적절한 선택을 가져가지 못했었다. 해당 부분을 개선하기 위해 다음 글에서는 비동기 통신을 사용하여 해당 작업을 개선해 보겠다.

//당시 사용한 코드
public List<MainCardDto> setBarcodeNum(List<MainCardDto> list, int userId) {
    Faker faker = new Faker(new Locale("ko"));
    long start = System.currentTimeMillis();

    list.parallelStream()
            .forEach(v -> {
                String barcodeNum = makeBarcode(userId, v.getId(), faker);
                v.setBarcodeNum(barcodeNum);
            });

    long end = System.currentTimeMillis() - start;
    log.info("end : {}", end);

    return list.stream()
            .sorted(Comparator.comparing(MainCardDto::getCardOrder))
            .toList();
}