사용자가 getPrice API를 호출하면 비동기 동작이 완료될 때까지 1초동안 블록된다.
이제 이 API를 비동기로 만들어보자.
16.2.1 동기 메서드를 비동기 메서드로 변환
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}
비동기 계산과 완료 결과를 포함하는 CompletableFuture 인스턴스를 만들었다.
그리고 계산 결과를 기다리지 않고 결과를 포함할 Future 인스턴트를 바로 반환했다.
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); //제품 가격 요청
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
//제품의 가격을 계산하는 동안
doSomethingElse();
//다른 상점 검색 등 작업 수행
try {
double price = future.get(); //가격정보를 받을때까지 블록
} catch (Exception e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
가격 계산 API는 비동기로 처리되므로 즉시 Future를 반환하고, 그 사이에 다른 작업을 처리할 수 있다.
다른작업이 끝났다면 Future의 get 메서드를 호출해서 가격정보를 받을 때까지 대기한다.
16.6.2 에러 처리 방법
위 로직에서 가격을 계산하는 동안 에러가 발생한다면 어떻게 될까?
예외가 발생하면 해당 스레드에만 영향을 미치기 때문에 클라이언트는 get 메서드가 반환될 때까지 영원히 기다릴 수도 있다.
따라서 타임아웃을 활용해 예외처리를 하고, completeExceptionally 메서드를 이용해 CompletableFuture 내부에서 발생한 에외를 클라이언트로 전달해야 한다.
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch {
futurePrice.completeExceptionally(ex); //에러를 포함시켜 Future를 종료
}
}).start();
return futurePrice;
}
팩토리 메서드 supplyAsync로 CompletableFuture 만들기
좀 더 간단하게 CompletableFuture를 만드는 방법도 있다.
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
supplyAsync 메서드는 Supplier를 인수로 받아서 CompletableFuture를 반환한다.
ForkJoinPool의 Executor 중 하나가 Supplier를 실행하며, 두 번째 인수로 다른 Executor를 지정할 수도 있다.
16.3 비블록 코드 만들기
다음과 같은 상점 리스트가 있다.
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));
그리고 다음처럼 제품명을 입력하면 상점 이름과 제품 가격 문자열을 반환하는 List를 구현해야 한다.
public List<String> findPrices(String product);
스트림을 이용하면 원하는 동작을 구현할 수 있다.
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
하지만 네 개의 상점에서 각각 가격을 검색하는 동안 블록되는 시간이 발생할 것이다.
16.3.1 병렬 스트림으로 요청 병렬화하기
public List<String> findPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
이제 네 개의 상점에서 병렬로 검색이 진행되므로 시간은 하나의 상점에서 가격을 검색하는 정도만 소요될 것이다.
하지만 CompletableFuture를 사용한 결과는 순차 방식보단 빠르지만 병렬 스트림보단 느리다. 어떻게 개선할 수 있을까?
16.3.3 더 확장성이 좋은 해결방법
병렬 스트림 버전에서는 4개의 스레드에 4개의 작업을 병렬로 수행하면서 검색 시간을 최소화했다.
하지만 작업이 5개가 된다면, 4개 중 하나의 스레드가 완료된 후에 추가로 5번째 질의을 수행할 수 있다.
CompletableFuture는 병렬 스트림에 비해 작업에 이용할 수 있는 Executor를 지정할 수 있다는 장점이 있다.
16.3.4 커스텀 Executor 사용하기
실제로 필요한 작업량을 고려한 풀에서 관리하는 스레드 수에 맞게 Executor를 만들면 좋을 것 같다.
풀에서 관리하는 스레드 수는 어떻게 결정할 수 있을까?
스레드 풀 크기조절
N_thread_ = N_cpu_ * U_cpu_ * (1 + W/C)
- N_cpu_ : Runtime.getRuntime().availableProcessors()가 반환하는 코어 수
- U_cpu_ : 0과 1 사이의 값을 갖는 CPU 활용 비율
- W/C : 대기시간과 계산시간의 비율
한 상점에 하나의 스레드가 할당될 수 있도록, 상점 수만큼 Executor를 설정한다.
서버 크래시 방지를 위해 하나의 Executor에서 사용할 스레드의 최대 개수는 100 이하로 설정한다.
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), //상점 수만큼의 스레드를 갖는 풀 생성(0~100 사이)
new ThreadFactory() {
public Thread new Thread(Runnable r) {
Thread t = new Thread(r);
t.setDeamon(true);
return t;
}
});
데몬 스레드를 사용하면 자바 프로그램이 종료될 때 강제로 스레드 실행이 종료될 수 있다.
스트림 병렬화와 CompletableFuture 병렬화
I/O가 포함되는 않은 계산 중심의 동작을 실행할 때는 스트림 인터페이스가 가장 구현하기 간단하며 효율적일 수 있다.
I/O를 기다리는 작업을 병렬로 실행할 때는 CompletableFuture가 더 많은 유연성을 제공하며, 대기/계산의 비율에 적합한 스레드 수를 설정할 수 있다. 스트림의 게으른 특성 때문에 스트림에서 I/O를 실제로 언제 처리할지 예측하기 어려운 문제도 있다.
16.4 비동기 작업 파이프라인 만들기
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
...
}
enum으로 할인율을 제공하는 코드를 정의하였다.
그리고 getPrice 메서드는 ShopName:price:DiscountCode 형식의 문자열을 반환하도록 수정했다.
가격정보 얻기
팩토리메서드 suuplyAsync에 람다 표현식5을 전달해서 비동기적으로 상점에서 정보를 조회했다.
반환 결과는 Stream<CompletableFuture<String>>이다.
Quote 파싱하기
CompletableFuture의 thenApply 메서드를 호출해서 Quote 인스턴스로 변환하는 Function으로 전달한다.
thenApply 메서드는 CompletableFutur가 끝날 때까지 블록하지 않는다.
CompletableFutuer를 조합해서 할인된 가격 계산하기
이번에는 원격 실행(1초의 지연으로 대체)이 포함되므로 이전 두 변환가 달리 동기적으로 작업을 수행해야 한다.
람다 표현식으로 이 동작을 supplyAsync에 전달할 수 있다. 그러면 다른 CompletableFutuer가 반환된다.
결국 두 가지 CompletableFuture로 이루어진 연쇄적으로 수행되는 두 개의 비동기 동작을 만들 수 있다.
- 상점에서 가격 정보를 얻어 와서 Quote로 변환하기
- 변환된 Quote를 Discount 서비스로 전달해서 할인된 최종가격 획득하기
thenCompose 메서드로 두 비동기 연산을 파이프 라인으로 만들수 있다.
16.4.4 독립 CompletableFuture와 비독립 CompletableFuture 합치기
독립적으로 실행된 두 개의 CompletableFuture 결과를 합쳐야할 때 thenCombine 메서드를 사용한다.