CompletableFuture
16.1 Future์ ๋จ์ ํ์ฉ
์๊ฐ์ด ๊ฑธ๋ฆด ์ ์๋ ์์ ์ Future ๋ด๋ถ๋ก ์ค์ ํ๋ฉด ํธ์ถ์ ์ค๋ ๋๊ฐ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ๋์ ๋ค๋ฅธ ์์ ์ ํ ์ ์๋ค.
Future ์์ ์ ExecutorService์์ ์ ๊ณตํ๋ ์ค๋ ๋์์ ์ฒ๋ฆฌ๋๊ณ ,
์์ ์ ๊ฒฐ๊ณผ๊ฐ ํ์ํ ์์ ์ Future์ get ๋ฉ์๋๋ก ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๋ค.
ํ์ง๋ง get ๋ฉ์๋๋ฅผ ํธ์ถํ์ ๋ ๊ฒฐ๊ณผ๊ฐ ์ค๋น๋์ด์์ง ์๋ค๋ฉด ์์ ์ด ์๋ฃ๋ ๋๊น์ง ์ค๋ ๋๋ฅผ ๋ธ๋ก์ํจ๋ค.
16.1.1 Future ์ ํ
Future ์ธํฐํ์ด์ค์๋ ๋น๋๊ธฐ ๊ณ์ฐ์ ๋ํ ๋๊ธฐ์ ๊ฒฐ๊ณผ ์ฒ๋ฆฌ ๋ฉ์๋๋ค์ด ์๋ค. ํ์ง๋ง ์ฌ๋ฌ Future ๊ฐ ์์กด์ฑ์ ํํํ๊ธฐ ์ด๋ ต๋ค.
๊ทธ๋์ ์๋ฐ8์์๋ CompletableFuture ํด๋์ค๋ก ๋ค์๊ณผ ๊ฐ์ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค.
๋ ๊ฐ์ ๋น๋๊ธฐ ๊ณ์ฐ ๊ฒฐ๊ณผ๋ฅผ ํฉ์น๋ค. ๋ ๊ฒฐ๊ณผ๋ ์๋ก ๋ ๋ฆฝ์ ๋๋ ํ์ชฝ์ ์์กด์ ์ผ ์ ์๋ค.
Future ์งํฉ์ด ์คํํ๋ ๋ชจ๋ ํ์คํฌ์ ์๋ฃ๋ฅผ ๊ธฐ๋ค๋ฆฐ๋ค.
Future ์งํฉ์์ ๊ฐ์ฅ ๋นจ๋ฆฌ ์๋ฃ๋๋ ํ์คํฌ๋ฅผ ๊ธฐ๋ค๋ ธ๋ค๊ฐ ๊ฒฐ๊ณผ๋ฅผ ์ป๋๋ค.
ํ๋ก๊ทธ๋จ์ ์ผ๋ก Future๋ฅผ ์๋ฃ์ํจ๋ค.(๋น๋๊ธฐ ๋์์์ ์๋์ผ๋ก ๊ฒฐ๊ณผ ์ ๊ณต)
Future ์๋ฃ ๋์์ ๋ฐ์ํ๋ค.(๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ฉด์ ๋ธ๋ก๋์ง ์์)
16.2 ๋น๋๊ธฐ API ๊ตฌํ
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {
delay(); //1์ด๊ฐ ๋ธ๋ก
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
์ฌ์ฉ์๊ฐ getPrice API๋ฅผ ํธ์ถํ๋ฉด ๋น๋๊ธฐ ๋์์ด ์๋ฃ๋ ๋๊น์ง 1์ด๋์ ๋ธ๋ก๋๋ค.
์ด์ ์ด API๋ฅผ ๋น๋๊ธฐ๋ก ๋ง๋ค์ด๋ณด์.
16.2.1 ๋๊ธฐ ๋ฉ์๋๋ฅผ ๋น๋๊ธฐ ๋ฉ์๋๋ก ๋ณํ
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}
๋น๋๊ธฐ ๊ณ์ฐ๊ณผ ์๋ฃ ๊ฒฐ๊ณผ๋ฅผ ํฌํจํ๋ CompletableFuture ์ธ์คํด์ค๋ฅผ ๋ง๋ค์๋ค.
๊ทธ๋ฆฌ๊ณ ๊ณ์ฐ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ๊ฒฐ๊ณผ๋ฅผ ํฌํจํ Future ์ธ์คํดํธ๋ฅผ ๋ฐ๋ก ๋ฐํํ๋ค.
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); //์ ํ ๊ฐ๊ฒฉ ์์ฒญ
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
//์ ํ์ ๊ฐ๊ฒฉ์ ๊ณ์ฐํ๋ ๋์
doSomethingElse();
//๋ค๋ฅธ ์์ ๊ฒ์ ๋ฑ ์์
์ํ
try {
double price = future.get(); //๊ฐ๊ฒฉ์ ๋ณด๋ฅผ ๋ฐ์๋๊น์ง ๋ธ๋ก
} catch (Exception e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
๊ฐ๊ฒฉ ๊ณ์ฐ API๋ ๋น๋๊ธฐ๋ก ์ฒ๋ฆฌ๋๋ฏ๋ก ์ฆ์ Future๋ฅผ ๋ฐํํ๊ณ , ๊ทธ ์ฌ์ด์ ๋ค๋ฅธ ์์ ์ ์ฒ๋ฆฌํ ์ ์๋ค.
๋ค๋ฅธ์์ ์ด ๋๋ฌ๋ค๋ฉด Future์ get ๋ฉ์๋๋ฅผ ํธ์ถํด์ ๊ฐ๊ฒฉ์ ๋ณด๋ฅผ ๋ฐ์ ๋๊น์ง ๋๊ธฐํ๋ค.
16.6.2 ์๋ฌ ์ฒ๋ฆฌ ๋ฐฉ๋ฒ
์ ๋ก์ง์์ ๊ฐ๊ฒฉ์ ๊ณ์ฐํ๋ ๋์ ์๋ฌ๊ฐ ๋ฐ์ํ๋ค๋ฉด ์ด๋ป๊ฒ ๋ ๊น?
์์ธ๊ฐ ๋ฐ์ํ๋ฉด ํด๋น ์ค๋ ๋์๋ง ์ํฅ์ ๋ฏธ์น๊ธฐ ๋๋ฌธ์ ํด๋ผ์ด์ธํธ๋ get ๋ฉ์๋๊ฐ ๋ฐํ๋ ๋๊น์ง ์์ํ ๊ธฐ๋ค๋ฆด ์๋ ์๋ค.
๋ฐ๋ผ์ ํ์์์์ ํ์ฉํด ์์ธ์ฒ๋ฆฌ๋ฅผ ํ๊ณ , completeExceptionally ๋ฉ์๋๋ฅผ ์ด์ฉํด CompletableFuture ๋ด๋ถ์์ ๋ฐ์ํ ์์ธ๋ฅผ ํด๋ผ์ด์ธํธ๋ก ์ ๋ฌํด์ผ ํ๋ค.
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch {
futurePrice.completeExceptionally(ex); //์๋ฌ๋ฅผ ํฌํจ์์ผ Future๋ฅผ ์ข
๋ฃ
}
}).start();
return futurePrice;
}
ํฉํ ๋ฆฌ ๋ฉ์๋ supplyAsync๋ก CompletableFuture ๋ง๋ค๊ธฐ
์ข ๋ ๊ฐ๋จํ๊ฒ CompletableFuture๋ฅผ ๋ง๋๋ ๋ฐฉ๋ฒ๋ ์๋ค.
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
supplyAsync ๋ฉ์๋๋ Supplier๋ฅผ ์ธ์๋ก ๋ฐ์์ CompletableFuture๋ฅผ ๋ฐํํ๋ค.
ForkJoinPool์ Executor ์ค ํ๋๊ฐ Supplier๋ฅผ ์คํํ๋ฉฐ, ๋ ๋ฒ์งธ ์ธ์๋ก ๋ค๋ฅธ Executor๋ฅผ ์ง์ ํ ์๋ ์๋ค.
16.3 ๋น๋ธ๋ก ์ฝ๋ ๋ง๋ค๊ธฐ
๋ค์๊ณผ ๊ฐ์ ์์ ๋ฆฌ์คํธ๊ฐ ์๋ค.
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll"));
๊ทธ๋ฆฌ๊ณ ๋ค์์ฒ๋ผ ์ ํ๋ช ์ ์ ๋ ฅํ๋ฉด ์์ ์ด๋ฆ๊ณผ ์ ํ ๊ฐ๊ฒฉ ๋ฌธ์์ด์ ๋ฐํํ๋ List๋ฅผ ๊ตฌํํด์ผ ํ๋ค.
public List<String> findPrices(String product);
์คํธ๋ฆผ์ ์ด์ฉํ๋ฉด ์ํ๋ ๋์์ ๊ตฌํํ ์ ์๋ค.
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
ํ์ง๋ง ๋ค ๊ฐ์ ์์ ์์ ๊ฐ๊ฐ ๊ฐ๊ฒฉ์ ๊ฒ์ํ๋ ๋์ ๋ธ๋ก๋๋ ์๊ฐ์ด ๋ฐ์ํ ๊ฒ์ด๋ค.
16.3.1 ๋ณ๋ ฌ ์คํธ๋ฆผ์ผ๋ก ์์ฒญ ๋ณ๋ ฌํํ๊ธฐ
public List<String> findPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
์ด์ ๋ค ๊ฐ์ ์์ ์์ ๋ณ๋ ฌ๋ก ๊ฒ์์ด ์งํ๋๋ฏ๋ก ์๊ฐ์ ํ๋์ ์์ ์์ ๊ฐ๊ฒฉ์ ๊ฒ์ํ๋ ์ ๋๋ง ์์๋ ๊ฒ์ด๋ค.
16.3.2 CompletableFutue๋ก ๋น๋๊ธฐ ํธ์ถ ๊ตฌํํ๊ธฐ
์ด๋ฒ์๋ findPrices ๋ฉ์๋์ ํธ์ถ์ ๋น๋๊ธฐ๋ก ๋ฐ๊ฟ๋ณด์.
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.suppltAsync(
() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
์ ์ฝ๋๋ก List<CompletableFuture<String>>๋ฅผ ์ป์ ์ ์๊ณ , ๋ฆฌ์คํธ์ CompletableFuture๋ ๊ฐ๊ฐ ๊ณ์ฐ ๊ฒฐ๊ณผ๊ฐ ๋๋ ์์ ์ ์ด๋ฆ ๋ฌธ์์ด์ ํฌํจํ๋ค.
ํ์ง๋ง ์ฐ๋ฆฌ๋ List<String> ํ์์ ์ป์ด์ผ ํ๋ฏ๋ก ๋ชจ๋ CompletableFuture์ ๋์์ด ์๋ฃ๋๊ณ ๊ฒฐ๊ด๋ฅด ์ถ์ถํ ๋ค์ ๋ฆฌ์คํธ๋ฅผ ๋ฐํํด์ผ ํ๋ค.
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.suppltAsync(
() -> shop.getName() + "price is " + shop.getPrice(product)))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join) //๋ชจ๋ ๋น๋๊ธฐ ๋์์ด ๋๋๊ธธ ๋๊ธฐ
.collect(toList());
}
๋ map ์ฐ์ฐ์ ํ๋์ ์คํธ๋ฆผ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ์ด ์๋, ๋ ๊ฐ์ ํ์ดํ๋ผ์ธ์ผ๋ก ์ฒ๋ฆฌํ๋ค๋ ์ฌ์ค์ ์ฃผ๋ชฉํ์.
์คํธ๋ฆผ ์ฐ์ฐ์ ๊ฒ์ผ๋ฅธ ํน์ฑ์ด ์์ผ๋ฏ๋ก ํ๋์ ํ์ดํ๋ผ์ธ์ผ๋ก ์ฒ๋ฆฌํ๋ค๋ฉด ๋ชจ๋ ๊ฐ๊ฒฉ ์ ๋ณด ์์ฒญ ๋์์ด ๋๊ธฐ์ , ์์ฐจ์ ์ผ๋ก ์ด๋ฃจ์ด์ง๊ฒ ๋๋ค.
CompletableFuture๋ก ๊ฐ ์์ ์ ์ ๋ณด๋ฅผ ์์ฒญํ ๋ ๊ธฐ์กด ์์ฒญ ์์ ์ด ์๋ฃ๋์ด์ผ join์ด ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ๋ฉด์ ๋ค์ ์์ ์ผ๋ก ์ ๋ณด๋ฅผ ์์ฒญํ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค.
์์ฐจ shop1.getPrice() โ future1.join() โ shop2.getPrice() โ future2.join() โ shop3.getPrice() โ future3.join()
๋ณ๋ ฌ shop1.getPrice() โ future1.join() โ โ shop2.getPrice() โ future2.join() โ โ shop3.getPrice() โ future3.join()
ํ์ง๋ง CompletableFuture๋ฅผ ์ฌ์ฉํ ๊ฒฐ๊ณผ๋ ์์ฐจ ๋ฐฉ์๋ณด๋จ ๋น ๋ฅด์ง๋ง ๋ณ๋ ฌ ์คํธ๋ฆผ๋ณด๋จ ๋๋ฆฌ๋ค. ์ด๋ป๊ฒ ๊ฐ์ ํ ์ ์์๊น?
16.3.3 ๋ ํ์ฅ์ฑ์ด ์ข์ ํด๊ฒฐ๋ฐฉ๋ฒ
๋ณ๋ ฌ ์คํธ๋ฆผ ๋ฒ์ ์์๋ 4๊ฐ์ ์ค๋ ๋์ 4๊ฐ์ ์์ ์ ๋ณ๋ ฌ๋ก ์ํํ๋ฉด์ ๊ฒ์ ์๊ฐ์ ์ต์ํํ๋ค.
ํ์ง๋ง ์์ ์ด 5๊ฐ๊ฐ ๋๋ค๋ฉด, 4๊ฐ ์ค ํ๋์ ์ค๋ ๋๊ฐ ์๋ฃ๋ ํ์ ์ถ๊ฐ๋ก 5๋ฒ์งธ ์ง์์ ์ํํ ์ ์๋ค.
CompletableFuture๋ ๋ณ๋ ฌ ์คํธ๋ฆผ์ ๋นํด ์์ ์ ์ด์ฉํ ์ ์๋ Executor๋ฅผ ์ง์ ํ ์ ์๋ค๋ ์ฅ์ ์ด ์๋ค.
16.3.4 ์ปค์คํ Executor ์ฌ์ฉํ๊ธฐ
์ค์ ๋ก ํ์ํ ์์ ๋์ ๊ณ ๋ คํ ํ์์ ๊ด๋ฆฌํ๋ ์ค๋ ๋ ์์ ๋ง๊ฒ Executor๋ฅผ ๋ง๋ค๋ฉด ์ข์ ๊ฒ ๊ฐ๋ค.
ํ์์ ๊ด๋ฆฌํ๋ ์ค๋ ๋ ์๋ ์ด๋ป๊ฒ ๊ฒฐ์ ํ ์ ์์๊น?
์ค๋ ๋ ํ ํฌ๊ธฐ์กฐ์ N_thread_ = N_cpu_ * U_cpu_ * (1 + W/C) - N_cpu_ : Runtime.getRuntime().availableProcessors()๊ฐ ๋ฐํํ๋ ์ฝ์ด ์ - U_cpu_ : 0๊ณผ 1 ์ฌ์ด์ ๊ฐ์ ๊ฐ๋ CPU ํ์ฉ ๋น์จ - W/C : ๋๊ธฐ์๊ฐ๊ณผ ๊ณ์ฐ์๊ฐ์ ๋น์จ
ํ ์์ ์ ํ๋์ ์ค๋ ๋๊ฐ ํ ๋น๋ ์ ์๋๋ก, ์์ ์๋งํผ Executor๋ฅผ ์ค์ ํ๋ค.
์๋ฒ ํฌ๋์ ๋ฐฉ์ง๋ฅผ ์ํด ํ๋์ Executor์์ ์ฌ์ฉํ ์ค๋ ๋์ ์ต๋ ๊ฐ์๋ 100 ์ดํ๋ก ์ค์ ํ๋ค.
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), //์์ ์๋งํผ์ ์ค๋ ๋๋ฅผ ๊ฐ๋ ํ ์์ฑ(0~100 ์ฌ์ด)
new ThreadFactory() {
public Thread new Thread(Runnable r) {
Thread t = new Thread(r);
t.setDeamon(true);
return t;
}
});
๋ฐ๋ชฌ ์ค๋ ๋๋ฅผ ์ฌ์ฉํ๋ฉด ์๋ฐ ํ๋ก๊ทธ๋จ์ด ์ข ๋ฃ๋ ๋ ๊ฐ์ ๋ก ์ค๋ ๋ ์คํ์ด ์ข ๋ฃ๋ ์ ์๋ค.
์คํธ๋ฆผ ๋ณ๋ ฌํ์ CompletableFuture ๋ณ๋ ฌํ
I/O๊ฐ ํฌํจ๋๋ ์์ ๊ณ์ฐ ์ค์ฌ์ ๋์์ ์คํํ ๋๋ ์คํธ๋ฆผ ์ธํฐํ์ด์ค๊ฐ ๊ฐ์ฅ ๊ตฌํํ๊ธฐ ๊ฐ๋จํ๋ฉฐ ํจ์จ์ ์ผ ์ ์๋ค.
I/O๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์์ ์ ๋ณ๋ ฌ๋ก ์คํํ ๋๋ CompletableFuture๊ฐ ๋ ๋ง์ ์ ์ฐ์ฑ์ ์ ๊ณตํ๋ฉฐ, ๋๊ธฐ/๊ณ์ฐ์ ๋น์จ์ ์ ํฉํ ์ค๋ ๋ ์๋ฅผ ์ค์ ํ ์ ์๋ค. ์คํธ๋ฆผ์ ๊ฒ์ผ๋ฅธ ํน์ฑ ๋๋ฌธ์ ์คํธ๋ฆผ์์ I/O๋ฅผ ์ค์ ๋ก ์ธ์ ์ฒ๋ฆฌํ ์ง ์์ธกํ๊ธฐ ์ด๋ ค์ด ๋ฌธ์ ๋ ์๋ค.
16.4 ๋น๋๊ธฐ ์์
ํ์ดํ๋ผ์ธ ๋ง๋ค๊ธฐ
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
private final int percentage;
Code(int percentage) {
this.percentage = percentage;
}
}
...
}
enum์ผ๋ก ํ ์ธ์จ์ ์ ๊ณตํ๋ ์ฝ๋๋ฅผ ์ ์ํ์๋ค.
๊ทธ๋ฆฌ๊ณ getPrice ๋ฉ์๋๋ ShopName:price:DiscountCode ํ์์ ๋ฌธ์์ด์ ๋ฐํํ๋๋ก ์์ ํ๋ค.
public String getPrice(String product) {
double price = calcuatePrice(product);
Discount.Code code = Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.f:%s", name, price, code);
}
16.4.1 ํ ์ธ ์๋น์ค ๊ตฌํ
์์ ์์ ์ ๊ณตํ ๋ฌธ์์ด ํ์ฑ์ ๋ค์์ฒ๋ผ Quote ํด๋์ค๋ก ์บก์ํํ ์ ์๋ค.
public class Quote {
private final String shopName;
private final double price;
private final Discount.code discountCode;
public Quote(String shopName, double price, Discount.code code) {
this.shopName = shopName;
this.price = price;
this.discountCode = discountCode;
}
public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Doule.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}
public String getShopName() {
return shopName;
}
public String getPrice() {
return price;
}
public Discount.code getDiscountCode() {
return discountCode;
}
}
์์ ์์ ์ป์ ๋ฌธ์์ด์ ์ ์ ํฉํ ๋ฆฌ ๋ฉ์๋ parse๋ก ๋๊ฒจ์ฃผ๋ฉด ์์ ์ด๋ฆ, ํ ์ธ์ ๊ฐ๊ฒฉ, ํ ์ธ๋ ๊ฐ๊ฒฉ ์ ๋ณด๋ฅผ ํฌํจํ๋ Quote ํด๋์ค ์ธ์คํด์ค๊ฐ ์์ฑ๋๋ค.
๋ค์์ผ๋ก Discount ์๋น์ค์์๋ Quote ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ๋ฐ์ ํ ์ธ๋ ๊ฐ๊ฒฉ ๋ฌธ์์ด์ ๋ฐํํ๋ applyDiscount ๋ฉ์๋๋ ์ ๊ณตํ๋ค.
public class Discount {
public enum Code {
...
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " + Discount.apply(
quote.getPrice(), quote.getDiscountCode());
}
pivate static double apply(double price, Code code) {
delay();
return format(price * (100 - code.percentage) / 100);
}
}
16.4.2 ํ ์ธ ์๋น์ค ์ด์ฉ
๋จผ์ ๊ฐ์ฅ ์ฌ์ด ๋ฐฉ๋ฒ์ธ ์์ฐจ์ &๋๊ธฐ๋ฐฉ์์ผ๋ก findPrice ๋ฉ์๋๋ฅผ ๊ตฌํํ๋ค.
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> sho.getPrice(product)) //๊ฐ ์์ ์์ ํ ์ธ์ ๊ฐ๊ฒฉ ์ป๊ธฐ
.map(Quote::parse) //๋ฐํ๋ ๋ฌธ์์ด์ Quote ๊ฐ์ฒด๋ก ๋ณํ
.map(Discount::applyDiscount) //Quote์ ํ ์ธ ์ ์ฉ
.collect(toList());
}
์ฝ๋๋ฅผ ์ํํด๋ณด๋ฉด ์์ฐจ์ ์ผ๋ก ๋ค์ฏ ์์ ์ ๊ฐ๊ฒฉ์ ์์ฒญํ๋ฉด์ 5์ด๊ฐ ์์๋๊ณ , ํ ์ธ์ฝ๋๋ฅผ ์ ์ฉํ๋ฉด์ 5์ด๊ฐ ์์๋๋ค.
์์ ํ์ธํ ๊ฒ์ฒ๋ผ ๋ณ๋ ฌ ์คํธ๋ฆผ์ผ๋ก ๋ณํํ๋ฉด ์ฑ๋ฅ์ ๊ฐ์ ํ ์ ์๋ค. ํ์ง๋ง ์คํธ๋ฆผ์ด ์ฌ์ฉํ๋ ์ค๋ ๋ ํ์ ํฌ๊ธฐ๊ฐ ๊ณ ์ ๋์ด ์์ผ๋ฏ๋ก, ์์ ์๊ฐ ๋์ด๋๊ฒ๋๋ฉด ์ ์ฐํ๊ฒ ๋์ํ ์ ์๋ค.
๋ฐ๋ผ์ CompletableFutuer์์ ์ํํ๋ ํ์คํฌ๋ฅผ ์ค์ ํ ์ ์๋ ์ปค์คํ Executoer๋ฅผ ์ ์ํด์ CPU ์ฌ์ฉ์ ๊ทน๋ํํด์ผํ๋ค.
16.4.3 ๋๊ธฐ ์์ ๊ณผ ๋น๋๊ธฐ ์์ ์กฐํฉํ๊ธฐ
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.suppltAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor))
.collect(toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
๊ฐ๊ฒฉ์ ๋ณด ์ป๊ธฐ ํฉํ ๋ฆฌ๋ฉ์๋ suuplyAsync์ ๋๋ค ํํ์5์ ์ ๋ฌํด์ ๋น๋๊ธฐ์ ์ผ๋ก ์์ ์์ ์ ๋ณด๋ฅผ ์กฐํํ๋ค. ๋ฐํ ๊ฒฐ๊ณผ๋ Stream<CompletableFuture<String>>์ด๋ค.
Quote ํ์ฑํ๊ธฐ CompletableFuture์ thenApply ๋ฉ์๋๋ฅผ ํธ์ถํด์ Quote ์ธ์คํด์ค๋ก ๋ณํํ๋ Function์ผ๋ก ์ ๋ฌํ๋ค. thenApply ๋ฉ์๋๋ CompletableFutur๊ฐ ๋๋ ๋๊น์ง ๋ธ๋กํ์ง ์๋๋ค.
CompletableFutuer๋ฅผ ์กฐํฉํด์ ํ ์ธ๋ ๊ฐ๊ฒฉ ๊ณ์ฐํ๊ธฐ ์ด๋ฒ์๋ ์๊ฒฉ ์คํ(1์ด์ ์ง์ฐ์ผ๋ก ๋์ฒด)์ด ํฌํจ๋๋ฏ๋ก ์ด์ ๋ ๋ณํ๊ฐ ๋ฌ๋ฆฌ ๋๊ธฐ์ ์ผ๋ก ์์ ์ ์ํํด์ผ ํ๋ค. ๋๋ค ํํ์์ผ๋ก ์ด ๋์์ supplyAsync์ ์ ๋ฌํ ์ ์๋ค. ๊ทธ๋ฌ๋ฉด ๋ค๋ฅธ CompletableFutuer๊ฐ ๋ฐํ๋๋ค. ๊ฒฐ๊ตญ ๋ ๊ฐ์ง CompletableFuture๋ก ์ด๋ฃจ์ด์ง ์ฐ์์ ์ผ๋ก ์ํ๋๋ ๋ ๊ฐ์ ๋น๋๊ธฐ ๋์์ ๋ง๋ค ์ ์๋ค. - ์์ ์์ ๊ฐ๊ฒฉ ์ ๋ณด๋ฅผ ์ป์ด ์์ Quote๋ก ๋ณํํ๊ธฐ - ๋ณํ๋ Quote๋ฅผ Discount ์๋น์ค๋ก ์ ๋ฌํด์ ํ ์ธ๋ ์ต์ข ๊ฐ๊ฒฉ ํ๋ํ๊ธฐ thenCompose ๋ฉ์๋๋ก ๋ ๋น๋๊ธฐ ์ฐ์ฐ์ ํ์ดํ ๋ผ์ธ์ผ๋ก ๋ง๋ค์ ์๋ค.
16.4.4 ๋ ๋ฆฝ CompletableFuture์ ๋น๋ ๋ฆฝ CompletableFuture ํฉ์น๊ธฐ
๋ ๋ฆฝ์ ์ผ๋ก ์คํ๋ ๋ ๊ฐ์ CompletableFuture ๊ฒฐ๊ณผ๋ฅผ ํฉ์ณ์ผํ ๋ thenCombine ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ค.
thenCombine ๋ฉ์๋์ BiFunction ์ธ์๋ ๊ฒฐ๊ณผ๋ฅผ ์ด๋ป๊ฒ ํฉ์ง์ง ์ ์ํ๋ค.
Funtion<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) //1๋ฒ์งธ ํ์คํฌ - ๊ฐ๊ฒฉ์ ๋ณด ์์ฒญ
.thenCombine(CompletableFuture.suuplyAsync(
() -> exchangeService.getRate(Money.EUR, Money.USD)), //2๋ฒ์งธ ํ์คํฌ - ํ์จ์ ๋ณด ์์ฒญ
(price, rate) -> price * rate)); //๋ ๊ฒฐ๊ณผ ํฉ์นจ
๋ ๋ฆฝ์ ์ธ ๋ ๊ฐ์ ๋น๋๊ธฐ ํ์คํฌ๋ ๊ฐ๊ฐ ์ํ๋๊ณ , ๋ง์ง๋ง์ ํฉ์ณ์ง๋ค.
16.4.5 Future์ ๋ฆฌํ๋ ์ ๊ณผ CompletableFuture์ ๋ฆฌํ๋ ์
CompletableFuture๋ ๋๋ค ํํ์์ ์ฌ์ฉํด ๋๊ธฐ/๋น๋๊ธฐ ํ์คํฌ๋ฅผ ํ์ฉํ ๋ณต์กํ ์ฐ์ฐ ์ํ ๋ฐฉ๋ฒ์ ํจ๊ณผ์ ์ผ๋ก ์ ์ํ ์ ์๋ค.
๋ํ ์ฝ๋ ๊ฐ๋ ์ฑ๋ ํฅ์๋๋ค. ์์ ์ฝ๋๋ฅผ ์๋ฐ7๋ก ๊ตฌํํ๋ฉด์ ๋น๊ตํด๋ณด์.
ExecutorService executor = Executors.newCachedThreadPool();
final Funtion<Double> futureRate = executor.submit(new Callable<Double>() {
public Double call() {
return exchangeService.getRate(Money.EUR, Money.USD);
}
});
final Funtion<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
public Double call() {
double priceInEUR = shop.getPrice(product);
return priceInEUR * futureRate.get();
}
});
16.4.6 ํ์์์ ํจ๊ณผ์ ์ผ๋ก ์ฌ์ฉํ๊ธฐ
Future๊ฐ ์์ ์ ๋๋ด์ง ๋ชปํ ๊ฒฝ์ฐ TimeoutException์ ๋ฐ์์์ผ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์๋ค.
Funtion<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(CompletableFuture.suuplyAsync(
() -> exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate))
.orTimeout(3, TimeUnit.SECONDS);
compleOnTimeout๋ฉ์๋๋ฅผ ํตํด ์์ธ๋ฅผ ๋ฐ์์ํค๋ ๋์ ๋ฏธ๋ฆฌ ์ง์ ๋ ๊ฐ์ ์ฌ์ฉํ๋๋ก ํ ์๋ ์๋ค.
Funtion<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(CompletableFuture.suuplyAsync(
() -> exchangeService.getRate(Money.EUR, Money.USD)),
.completOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
(price, rate) -> price * rate))
.orTimeout(3, TimeUnit.SECONDS);
16.5 CompletableFuture์ ์ข
๋ฃ์ ๋์ํ๋ ๋ฐฉ๋ฒ
๊ฐ ์์ ์์ ๋ฌผ๊ฑด ๊ฐ๊ฒฉ ์ ๋ณด๋ฅผ ์ป์ด์ค๋ findPrices ๋ฉ์๋๊ฐ ๋ชจ๋ 1์ด์ฉ ์ง์ฐ๋๋ ๋์ , 0.5~2.5์ด์ฉ ์์๋ก ์ง์ฐ๋๋ค๊ณ ํ์.
๊ทธ๋ฆฌ๊ณ ๊ฐ ์์ ์์ ๊ฐ๊ฒฉ ์ ๋ณด๋ฅผ ์ ๊ณตํ ๋๋ง๋ค ์ฆ์ ๋ณด์ฌ์ค ์ ์๋ ์ต์ ๊ฐ๊ฒฉ ๊ฒ์ ์ดํ๋ฆฌ์ผ์ด์ ์ ๋ง๋ค์ด๋ณด์.
16.5.1 ์ต์ ๊ฐ๊ฒฉ ๊ฒ์ ์ํ๋ฆฌ์ผ์ด์ ๋ฆฌํฉํฐ๋ง
public Stream<CompletableFuture<String>> findPriceStream(String product) {
return shop.stream()
.map(shop -> CompletableFuture.suppltAsync(
() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));
}
์ด์ findPriceStream ๋ฉ์๋ ๋ด๋ถ์์ ์ธ ๊ฐ์ง map ์ฐ์ฐ์ ์ ์ฉํ๊ณ ๋ฐํํ๋ ์คํธ๋ฆผ์ ๋ค ๋ฒ์งธ map ์ฐ์ฐ์ ์ ์ฉํ์.
findPriceStream("myPhone").map(f -> f.thenAccept(System.out::println));
ํฉํ ๋ฆฌ ๋ฉ์๋ allOf๋ ์ ๋ฌ๋ ๋ชจ๋ CompletableFuture๊ฐ ์๋ฃ๋ ํ์ CompletableFuture<Void>๋ฅผ ๋ฐํํ๋ค.
์ด๋ฅผ ํตํด ๋ชจ๋ ๊ฒฐ๊ณผ๊ฐ ๋ฐํ๋์์์ ํ์ธํ ์ ์๋ค.
CompletableFuture[] futures = findPriceStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futues).join();
๋ง์ฝ CompletableFuture ์ค ํ๋๋ง ์๋ฃ๋๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์ํฉ์ด๋ผ๋ฉด ํฉํ ๋ฆฌ๋ฉ์๋ anyOf๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
Last updated