CompletableFuture

16.1 Future์˜ ๋‹จ์ˆœ ํ™œ์šฉ

์‹œ๊ฐ„์ด ๊ฑธ๋ฆด ์ˆ˜ ์žˆ๋Š” ์ž‘์—…์„ Future ๋‚ด๋ถ€๋กœ ์„ค์ •ํ•˜๋ฉด ํ˜ธ์ถœ์ž ์Šค๋ ˆ๋“œ๊ฐ€ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ๋™์•ˆ ๋‹ค๋ฅธ ์ž‘์—…์„ ํ•  ์ˆ˜ ์žˆ๋‹ค.

Future ์ž‘์—…์€ ExecutorService์—์„œ ์ œ๊ณตํ•˜๋Š” ์Šค๋ ˆ๋“œ์—์„œ ์ฒ˜๋ฆฌ๋˜๊ณ ,

์ž‘์—…์˜ ๊ฒฐ๊ณผ๊ฐ€ ํ•„์š”ํ•œ ์‹œ์ ์— Future์˜ get ๋ฉ”์„œ๋“œ๋กœ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

ํ•˜์ง€๋งŒ get ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ–ˆ์„ ๋•Œ ๊ฒฐ๊ณผ๊ฐ€ ์ค€๋น„๋˜์–ด์žˆ์ง€ ์•Š๋‹ค๋ฉด ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ์Šค๋ ˆ๋“œ๋ฅผ ๋ธ”๋ก์‹œํ‚จ๋‹ค.

16.1.1 Future ์ œํ•œ

Future ์ธํ„ฐํŽ˜์ด์Šค์—๋Š” ๋น„๋™๊ธฐ ๊ณ„์‚ฐ์— ๋Œ€ํ•œ ๋Œ€๊ธฐ์™€ ๊ฒฐ๊ณผ ์ฒ˜๋ฆฌ ๋ฉ”์„œ๋“œ๋“ค์ด ์žˆ๋‹ค. ํ•˜์ง€๋งŒ ์—ฌ๋Ÿฌ Future ๊ฐ„ ์˜์กด์„ฑ์€ ํ‘œํ˜„ํ•˜๊ธฐ ์–ด๋ ต๋‹ค.

๊ทธ๋ž˜์„œ ์ž๋ฐ”8์—์„œ๋Š” CompletableFuture ํด๋ž˜์Šค๋กœ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค.

  • ๋‘ ๊ฐœ์˜ ๋น„๋™๊ธฐ ๊ณ„์‚ฐ ๊ฒฐ๊ณผ๋ฅผ ํ•ฉ์นœ๋‹ค. ๋‘ ๊ฒฐ๊ณผ๋Š” ์„œ๋กœ ๋…๋ฆฝ์  ๋˜๋Š” ํ•œ์ชฝ์— ์˜์กด์ ์ผ ์ˆ˜ ์žˆ๋‹ค.

  • Future ์ง‘ํ•ฉ์ด ์‹คํ–‰ํ•˜๋Š” ๋ชจ๋“  ํƒœ์Šคํฌ์˜ ์™„๋ฃŒ๋ฅผ ๊ธฐ๋‹ค๋ฆฐ๋‹ค.

  • Future ์ง‘ํ•ฉ์—์„œ ๊ฐ€์žฅ ๋นจ๋ฆฌ ์™„๋ฃŒ๋˜๋Š” ํƒœ์Šคํฌ๋ฅผ ๊ธฐ๋‹ค๋ ธ๋‹ค๊ฐ€ ๊ฒฐ๊ณผ๋ฅผ ์–ป๋Š”๋‹ค.

  • ํ”„๋กœ๊ทธ๋žจ์ ์œผ๋กœ Future๋ฅผ ์™„๋ฃŒ์‹œํ‚จ๋‹ค.(๋น„๋™๊ธฐ ๋™์ž‘์—์„œ ์ˆ˜๋™์œผ๋กœ ๊ฒฐ๊ณผ ์ œ๊ณต)

  • Future ์™„๋ฃŒ ๋™์ž‘์— ๋ฐ˜์‘ํ•œ๋‹ค.(๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋ฉด์„œ ๋ธ”๋ก๋˜์ง€ ์•Š์Œ)


16.2 ๋น„๋™๊ธฐ API ๊ตฌํ˜„

public double getPrice(String product) {
  return calculatePrice(product);
}

private double calculatePrice(String product) {
  delay(); //1์ดˆ๊ฐ„ ๋ธ”๋ก
  return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

์‚ฌ์šฉ์ž๊ฐ€ getPrice API๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ๋น„๋™๊ธฐ ๋™์ž‘์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ 1์ดˆ๋™์•ˆ ๋ธ”๋ก๋œ๋‹ค.

์ด์ œ ์ด API๋ฅผ ๋น„๋™๊ธฐ๋กœ ๋งŒ๋“ค์–ด๋ณด์ž.

16.2.1 ๋™๊ธฐ ๋ฉ”์„œ๋“œ๋ฅผ ๋น„๋™๊ธฐ ๋ฉ”์„œ๋“œ๋กœ ๋ณ€ํ™˜

public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread(() -> {
    double price = calculatePrice(product);
    futurePrice.complete(price);
  }).start();
  return futurePrice;
}

๋น„๋™๊ธฐ ๊ณ„์‚ฐ๊ณผ ์™„๋ฃŒ ๊ฒฐ๊ณผ๋ฅผ ํฌํ•จํ•˜๋Š” CompletableFuture ์ธ์Šคํ„ด์Šค๋ฅผ ๋งŒ๋“ค์—ˆ๋‹ค.

๊ทธ๋ฆฌ๊ณ  ๊ณ„์‚ฐ ๊ฒฐ๊ณผ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š๊ณ  ๊ฒฐ๊ณผ๋ฅผ ํฌํ•จํ•  Future ์ธ์Šคํ„ดํŠธ๋ฅผ ๋ฐ”๋กœ ๋ฐ˜ํ™˜ํ–ˆ๋‹ค.

Shop shop = new Shop("BestShop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); //์ œํ’ˆ ๊ฐ€๊ฒฉ ์š”์ฒญ
long invocationTime = ((System.nanoTime() - start) / 1_000_000);

//์ œํ’ˆ์˜ ๊ฐ€๊ฒฉ์„ ๊ณ„์‚ฐํ•˜๋Š” ๋™์•ˆ
doSomethingElse();
//๋‹ค๋ฅธ ์ƒ์  ๊ฒ€์ƒ‰ ๋“ฑ ์ž‘์—… ์ˆ˜ํ–‰
try {
  double price = future.get(); //๊ฐ€๊ฒฉ์ •๋ณด๋ฅผ ๋ฐ›์„๋•Œ๊นŒ์ง€ ๋ธ”๋ก
} catch (Exception e) {
  throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);

๊ฐ€๊ฒฉ ๊ณ„์‚ฐ API๋Š” ๋น„๋™๊ธฐ๋กœ ์ฒ˜๋ฆฌ๋˜๋ฏ€๋กœ ์ฆ‰์‹œ Future๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ณ , ๊ทธ ์‚ฌ์ด์— ๋‹ค๋ฅธ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

๋‹ค๋ฅธ์ž‘์—…์ด ๋๋‚ฌ๋‹ค๋ฉด Future์˜ get ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ด์„œ ๊ฐ€๊ฒฉ์ •๋ณด๋ฅผ ๋ฐ›์„ ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐํ•œ๋‹ค.

16.6.2 ์—๋Ÿฌ ์ฒ˜๋ฆฌ ๋ฐฉ๋ฒ•

์œ„ ๋กœ์ง์—์„œ ๊ฐ€๊ฒฉ์„ ๊ณ„์‚ฐํ•˜๋Š” ๋™์•ˆ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค๋ฉด ์–ด๋–ป๊ฒŒ ๋ ๊นŒ?

์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ํ•ด๋‹น ์Šค๋ ˆ๋“œ์—๋งŒ ์˜ํ–ฅ์„ ๋ฏธ์น˜๊ธฐ ๋•Œ๋ฌธ์— ํด๋ผ์ด์–ธํŠธ๋Š” get ๋ฉ”์„œ๋“œ๊ฐ€ ๋ฐ˜ํ™˜๋  ๋•Œ๊นŒ์ง€ ์˜์›ํžˆ ๊ธฐ๋‹ค๋ฆด ์ˆ˜๋„ ์žˆ๋‹ค.

๋”ฐ๋ผ์„œ ํƒ€์ž„์•„์›ƒ์„ ํ™œ์šฉํ•ด ์˜ˆ์™ธ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๊ณ , completeExceptionally ๋ฉ”์„œ๋“œ๋ฅผ ์ด์šฉํ•ด CompletableFuture ๋‚ด๋ถ€์—์„œ ๋ฐœ์ƒํ•œ ์—์™ธ๋ฅผ ํด๋ผ์ด์–ธํŠธ๋กœ ์ „๋‹ฌํ•ด์•ผ ํ•œ๋‹ค.

public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread(() -> {
    try {
      double price = calculatePrice(product);
      futurePrice.complete(price);
    } catch {
      futurePrice.completeExceptionally(ex); //์—๋Ÿฌ๋ฅผ ํฌํ•จ์‹œ์ผœ Future๋ฅผ ์ข…๋ฃŒ
    }
  }).start();
  return futurePrice;
}

ํŒฉํ† ๋ฆฌ ๋ฉ”์„œ๋“œ supplyAsync๋กœ CompletableFuture ๋งŒ๋“ค๊ธฐ

์ข€ ๋” ๊ฐ„๋‹จํ•˜๊ฒŒ CompletableFuture๋ฅผ ๋งŒ๋“œ๋Š” ๋ฐฉ๋ฒ•๋„ ์žˆ๋‹ค.

public Future<Double> getPriceAsync(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

supplyAsync ๋ฉ”์„œ๋“œ๋Š” Supplier๋ฅผ ์ธ์ˆ˜๋กœ ๋ฐ›์•„์„œ CompletableFuture๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

ForkJoinPool์˜ Executor ์ค‘ ํ•˜๋‚˜๊ฐ€ Supplier๋ฅผ ์‹คํ–‰ํ•˜๋ฉฐ, ๋‘ ๋ฒˆ์งธ ์ธ์ˆ˜๋กœ ๋‹ค๋ฅธ Executor๋ฅผ ์ง€์ •ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.


16.3 ๋น„๋ธ”๋ก ์ฝ”๋“œ ๋งŒ๋“ค๊ธฐ

๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ƒ์  ๋ฆฌ์ŠคํŠธ๊ฐ€ ์žˆ๋‹ค.

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                                 new Shop("LetsSaveBig"),
                                 new Shop("MyFavoriteShop"),
                                 new Shop("BuyItAll"));

๊ทธ๋ฆฌ๊ณ  ๋‹ค์Œ์ฒ˜๋Ÿผ ์ œํ’ˆ๋ช…์„ ์ž…๋ ฅํ•˜๋ฉด ์ƒ์  ์ด๋ฆ„๊ณผ ์ œํ’ˆ ๊ฐ€๊ฒฉ ๋ฌธ์ž์—ด์„ ๋ฐ˜ํ™˜ํ•˜๋Š” List๋ฅผ ๊ตฌํ˜„ํ•ด์•ผ ํ•œ๋‹ค.

public List<String> findPrices(String product);

์ŠคํŠธ๋ฆผ์„ ์ด์šฉํ•˜๋ฉด ์›ํ•˜๋Š” ๋™์ž‘์„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋‹ค.

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

ํ•˜์ง€๋งŒ ๋„ค ๊ฐœ์˜ ์ƒ์ ์—์„œ ๊ฐ๊ฐ ๊ฐ€๊ฒฉ์„ ๊ฒ€์ƒ‰ํ•˜๋Š” ๋™์•ˆ ๋ธ”๋ก๋˜๋Š” ์‹œ๊ฐ„์ด ๋ฐœ์ƒํ•  ๊ฒƒ์ด๋‹ค.

16.3.1 ๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ์œผ๋กœ ์š”์ฒญ ๋ณ‘๋ ฌํ™”ํ•˜๊ธฐ

public List<String> findPrices(String product) {
  return shops.parallelStream()
    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

์ด์ œ ๋„ค ๊ฐœ์˜ ์ƒ์ ์—์„œ ๋ณ‘๋ ฌ๋กœ ๊ฒ€์ƒ‰์ด ์ง„ํ–‰๋˜๋ฏ€๋กœ ์‹œ๊ฐ„์€ ํ•˜๋‚˜์˜ ์ƒ์ ์—์„œ ๊ฐ€๊ฒฉ์„ ๊ฒ€์ƒ‰ํ•˜๋Š” ์ •๋„๋งŒ ์†Œ์š”๋  ๊ฒƒ์ด๋‹ค.

16.3.2 CompletableFutue๋กœ ๋น„๋™๊ธฐ ํ˜ธ์ถœ ๊ตฌํ˜„ํ•˜๊ธฐ

์ด๋ฒˆ์—๋Š” findPrices ๋ฉ”์„œ๋“œ์˜ ํ˜ธ์ถœ์„ ๋น„๋™๊ธฐ๋กœ ๋ฐ”๊ฟ”๋ณด์ž.

List<CompletableFuture<String>> priceFutures = 
  shops.stream()
    .map(shop -> CompletableFuture.suppltAsync(
      () -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

์œ„ ์ฝ”๋“œ๋กœ List<CompletableFuture<String>>๋ฅผ ์–ป์„ ์ˆ˜ ์žˆ๊ณ , ๋ฆฌ์ŠคํŠธ์˜ CompletableFuture๋Š” ๊ฐ๊ฐ ๊ณ„์‚ฐ ๊ฒฐ๊ณผ๊ฐ€ ๋๋‚œ ์ƒ์ ์˜ ์ด๋ฆ„ ๋ฌธ์ž์—ด์„ ํฌํ•จํ•œ๋‹ค.

ํ•˜์ง€๋งŒ ์šฐ๋ฆฌ๋Š” List<String> ํ˜•์‹์„ ์–ป์–ด์•ผ ํ•˜๋ฏ€๋กœ ๋ชจ๋“  CompletableFuture์˜ ๋™์ž‘์ด ์™„๋ฃŒ๋˜๊ณ  ๊ฒฐ๊ด„๋ฅด ์ถ”์ถœํ•œ ๋‹ค์Œ ๋ฆฌ์ŠคํŠธ๋ฅผ ๋ฐ˜ํ™˜ํ•ด์•ผ ํ•œ๋‹ค.

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = 
    shops.stream()
      .map(shop -> CompletableFuture.suppltAsync(
        () -> shop.getName() + "price is " + shop.getPrice(product)))
      .collect(toList());
      
  return priceFutures.stream()
    .map(CompletableFuture::join) //๋ชจ๋“  ๋น„๋™๊ธฐ ๋™์ž‘์ด ๋๋‚˜๊ธธ ๋Œ€๊ธฐ
    .collect(toList());
}

๋‘ map ์—ฐ์‚ฐ์„ ํ•˜๋‚˜์˜ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ ํŒŒ์ดํ”„๋ผ์ธ์ด ์•„๋‹Œ, ๋‘ ๊ฐœ์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ์ฒ˜๋ฆฌํ–ˆ๋‹ค๋Š” ์‚ฌ์‹ค์— ์ฃผ๋ชฉํ•˜์ž.

์ŠคํŠธ๋ฆผ ์—ฐ์‚ฐ์€ ๊ฒŒ์œผ๋ฅธ ํŠน์„ฑ์ด ์žˆ์œผ๋ฏ€๋กœ ํ•˜๋‚˜์˜ ํŒŒ์ดํ”„๋ผ์ธ์œผ๋กœ ์ฒ˜๋ฆฌํ–ˆ๋‹ค๋ฉด ๋ชจ๋“  ๊ฐ€๊ฒฉ ์ •๋ณด ์š”์ฒญ ๋™์ž‘์ด ๋™๊ธฐ์ , ์ˆœ์ฐจ์ ์œผ๋กœ ์ด๋ฃจ์–ด์ง€๊ฒŒ ๋œ๋‹ค.

CompletableFuture๋กœ ๊ฐ ์ƒ์ ์˜ ์ •๋ณด๋ฅผ ์š”์ฒญํ•  ๋•Œ ๊ธฐ์กด ์š”์ฒญ ์ž‘์—…์ด ์™„๋ฃŒ๋˜์–ด์•ผ join์ด ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉด์„œ ๋‹ค์Œ ์ƒ์ ์œผ๋กœ ์ •๋ณด๋ฅผ ์š”์ฒญํ•  ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

  • ์ˆœ์ฐจ shop1.getPrice() โ†’ future1.join() โ†’ shop2.getPrice() โ†’ future2.join() โ†’ shop3.getPrice() โ†’ future3.join()

  • ๋ณ‘๋ ฌ shop1.getPrice() โ†’ future1.join() โ†“ โ†“ shop2.getPrice() โ†’ future2.join() โ†“ โ†“ shop3.getPrice() โ†’ future3.join()

ํ•˜์ง€๋งŒ CompletableFuture๋ฅผ ์‚ฌ์šฉํ•œ ๊ฒฐ๊ณผ๋Š” ์ˆœ์ฐจ ๋ฐฉ์‹๋ณด๋‹จ ๋น ๋ฅด์ง€๋งŒ ๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ๋ณด๋‹จ ๋А๋ฆฌ๋‹ค. ์–ด๋–ป๊ฒŒ ๊ฐœ์„ ํ•  ์ˆ˜ ์žˆ์„๊นŒ?

16.3.3 ๋” ํ™•์žฅ์„ฑ์ด ์ข‹์€ ํ•ด๊ฒฐ๋ฐฉ๋ฒ•

๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ ๋ฒ„์ „์—์„œ๋Š” 4๊ฐœ์˜ ์Šค๋ ˆ๋“œ์— 4๊ฐœ์˜ ์ž‘์—…์„ ๋ณ‘๋ ฌ๋กœ ์ˆ˜ํ–‰ํ•˜๋ฉด์„œ ๊ฒ€์ƒ‰ ์‹œ๊ฐ„์„ ์ตœ์†Œํ™”ํ–ˆ๋‹ค.

ํ•˜์ง€๋งŒ ์ž‘์—…์ด 5๊ฐœ๊ฐ€ ๋œ๋‹ค๋ฉด, 4๊ฐœ ์ค‘ ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๊ฐ€ ์™„๋ฃŒ๋œ ํ›„์— ์ถ”๊ฐ€๋กœ 5๋ฒˆ์งธ ์งˆ์˜์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.

CompletableFuture๋Š” ๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ์— ๋น„ํ•ด ์ž‘์—…์— ์ด์šฉํ•  ์ˆ˜ ์žˆ๋Š” Executor๋ฅผ ์ง€์ •ํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ์žฅ์ ์ด ์žˆ๋‹ค.

16.3.4 ์ปค์Šคํ…€ Executor ์‚ฌ์šฉํ•˜๊ธฐ

์‹ค์ œ๋กœ ํ•„์š”ํ•œ ์ž‘์—…๋Ÿ‰์„ ๊ณ ๋ คํ•œ ํ’€์—์„œ ๊ด€๋ฆฌํ•˜๋Š” ์Šค๋ ˆ๋“œ ์ˆ˜์— ๋งž๊ฒŒ Executor๋ฅผ ๋งŒ๋“ค๋ฉด ์ข‹์„ ๊ฒƒ ๊ฐ™๋‹ค.

ํ’€์—์„œ ๊ด€๋ฆฌํ•˜๋Š” ์Šค๋ ˆ๋“œ ์ˆ˜๋Š” ์–ด๋–ป๊ฒŒ ๊ฒฐ์ •ํ•  ์ˆ˜ ์žˆ์„๊นŒ?

์Šค๋ ˆ๋“œ ํ’€ ํฌ๊ธฐ์กฐ์ ˆ N_thread_ = N_cpu_ * U_cpu_ * (1 + W/C) - N_cpu_ : Runtime.getRuntime().availableProcessors()๊ฐ€ ๋ฐ˜ํ™˜ํ•˜๋Š” ์ฝ”์–ด ์ˆ˜ - U_cpu_ : 0๊ณผ 1 ์‚ฌ์ด์˜ ๊ฐ’์„ ๊ฐ–๋Š” CPU ํ™œ์šฉ ๋น„์œจ - W/C : ๋Œ€๊ธฐ์‹œ๊ฐ„๊ณผ ๊ณ„์‚ฐ์‹œ๊ฐ„์˜ ๋น„์œจ

ํ•œ ์ƒ์ ์— ํ•˜๋‚˜์˜ ์Šค๋ ˆ๋“œ๊ฐ€ ํ• ๋‹น๋  ์ˆ˜ ์žˆ๋„๋ก, ์ƒ์  ์ˆ˜๋งŒํผ Executor๋ฅผ ์„ค์ •ํ•œ๋‹ค.

์„œ๋ฒ„ ํฌ๋ž˜์‹œ ๋ฐฉ์ง€๋ฅผ ์œ„ํ•ด ํ•˜๋‚˜์˜ Executor์—์„œ ์‚ฌ์šฉํ•  ์Šค๋ ˆ๋“œ์˜ ์ตœ๋Œ€ ๊ฐœ์ˆ˜๋Š” 100 ์ดํ•˜๋กœ ์„ค์ •ํ•œ๋‹ค.

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), //์ƒ์  ์ˆ˜๋งŒํผ์˜ ์Šค๋ ˆ๋“œ๋ฅผ ๊ฐ–๋Š” ํ’€ ์ƒ์„ฑ(0~100 ์‚ฌ์ด)
    new ThreadFactory() {
  public Thread new Thread(Runnable r) {
    Thread t = new Thread(r);
    t.setDeamon(true);
    return t;
  }
});

๋ฐ๋ชฌ ์Šค๋ ˆ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์ž๋ฐ” ํ”„๋กœ๊ทธ๋žจ์ด ์ข…๋ฃŒ๋  ๋•Œ ๊ฐ•์ œ๋กœ ์Šค๋ ˆ๋“œ ์‹คํ–‰์ด ์ข…๋ฃŒ๋  ์ˆ˜ ์žˆ๋‹ค.

์ŠคํŠธ๋ฆผ ๋ณ‘๋ ฌํ™”์™€ CompletableFuture ๋ณ‘๋ ฌํ™”

  • I/O๊ฐ€ ํฌํ•จ๋˜๋Š” ์•Š์€ ๊ณ„์‚ฐ ์ค‘์‹ฌ์˜ ๋™์ž‘์„ ์‹คํ–‰ํ•  ๋•Œ๋Š” ์ŠคํŠธ๋ฆผ ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ๊ฐ€์žฅ ๊ตฌํ˜„ํ•˜๊ธฐ ๊ฐ„๋‹จํ•˜๋ฉฐ ํšจ์œจ์ ์ผ ์ˆ˜ ์žˆ๋‹ค.

  • I/O๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ž‘์—…์„ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•  ๋•Œ๋Š” CompletableFuture๊ฐ€ ๋” ๋งŽ์€ ์œ ์—ฐ์„ฑ์„ ์ œ๊ณตํ•˜๋ฉฐ, ๋Œ€๊ธฐ/๊ณ„์‚ฐ์˜ ๋น„์œจ์— ์ ํ•ฉํ•œ ์Šค๋ ˆ๋“œ ์ˆ˜๋ฅผ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ŠคํŠธ๋ฆผ์˜ ๊ฒŒ์œผ๋ฅธ ํŠน์„ฑ ๋•Œ๋ฌธ์— ์ŠคํŠธ๋ฆผ์—์„œ I/O๋ฅผ ์‹ค์ œ๋กœ ์–ธ์ œ ์ฒ˜๋ฆฌํ• ์ง€ ์˜ˆ์ธกํ•˜๊ธฐ ์–ด๋ ค์šด ๋ฌธ์ œ๋„ ์žˆ๋‹ค.

16.4 ๋น„๋™๊ธฐ ์ž‘์—… ํŒŒ์ดํ”„๋ผ์ธ ๋งŒ๋“ค๊ธฐ

public class Discount {
  public enum Code {
    NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);
    
    private final int percentage;
    
    Code(int percentage) {
      this.percentage = percentage;
    }
  }
  ...
}

enum์œผ๋กœ ํ• ์ธ์œจ์„ ์ œ๊ณตํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ์ •์˜ํ•˜์˜€๋‹ค.

๊ทธ๋ฆฌ๊ณ  getPrice ๋ฉ”์„œ๋“œ๋Š” ShopName:price:DiscountCode ํ˜•์‹์˜ ๋ฌธ์ž์—ด์„ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ์ˆ˜์ •ํ–ˆ๋‹ค.

public String getPrice(String product) {
  double price = calcuatePrice(product);
  Discount.Code code = Discount.Code.values()[
    random.nextInt(Discount.Code.values().length)];
  return String.format("%s:%.f:%s", name, price, code);
}

16.4.1 ํ• ์ธ ์„œ๋น„์Šค ๊ตฌํ˜„

์ƒ์ ์—์„œ ์ œ๊ณตํ•œ ๋ฌธ์ž์—ด ํŒŒ์‹ฑ์€ ๋‹ค์Œ์ฒ˜๋Ÿผ Quote ํด๋ž˜์Šค๋กœ ์บก์Аํ™”ํ•  ์ˆ˜ ์žˆ๋‹ค.

public class Quote {
  private final String shopName;
  private final double price;
  private final Discount.code discountCode;
  
  public Quote(String shopName, double price, Discount.code code) {
    this.shopName = shopName;
    this.price = price;
    this.discountCode = discountCode;
  }
  
  public static Quote parse(String s) {
    String[] split = s.split(":");
    String shopName = split[0];
    double price = Doule.parseDouble(split[1]);
    Discount.Code discountCode = Discount.Code.valueOf(split[2]);
    return new Quote(shopName, price, discountCode);
  }
  
  public String getShopName() {
    return shopName;
  }
  
  public String getPrice() {
    return price;
  }
  
  public Discount.code getDiscountCode() {
    return discountCode;
  }
  
  
}

์ƒ์ ์—์„œ ์–ป์€ ๋ฌธ์ž์—ด์„ ์ •์  ํŒฉํ† ๋ฆฌ ๋ฉ”์„œ๋“œ parse๋กœ ๋„˜๊ฒจ์ฃผ๋ฉด ์ƒ์  ์ด๋ฆ„, ํ• ์ธ์ „ ๊ฐ€๊ฒฉ, ํ• ์ธ๋œ ๊ฐ€๊ฒฉ ์ •๋ณด๋ฅผ ํฌํ•จํ•˜๋Š” Quote ํด๋ž˜์Šค ์ธ์Šคํ„ด์Šค๊ฐ€ ์ƒ์„ฑ๋œ๋‹ค.

๋‹ค์Œ์œผ๋กœ Discount ์„œ๋น„์Šค์—์„œ๋Š” Quote ๊ฐ์ฒด๋ฅผ ์ธ์ˆ˜๋กœ ๋ฐ›์•„ ํ• ์ธ๋œ ๊ฐ€๊ฒฉ ๋ฌธ์ž์—ด์„ ๋ฐ˜ํ™˜ํ•˜๋Š” applyDiscount ๋ฉ”์„œ๋“œ๋„ ์ œ๊ณตํ•œ๋‹ค.

public class Discount {
  public enum Code {
    ...
  }
  
  public static String applyDiscount(Quote quote) {
    return quote.getShopName() + " price is " + Discount.apply(
      quote.getPrice(), quote.getDiscountCode());
  }
  
  pivate static double apply(double price, Code code) {
    delay();
    return format(price * (100 - code.percentage) / 100);
  }
}

16.4.2 ํ• ์ธ ์„œ๋น„์Šค ์ด์šฉ

๋จผ์ € ๊ฐ€์žฅ ์‰ฌ์šด ๋ฐฉ๋ฒ•์ธ ์ˆœ์ฐจ์ &๋™๊ธฐ๋ฐฉ์‹์œผ๋กœ findPrice ๋ฉ”์„œ๋“œ๋ฅผ ๊ตฌํ˜„ํ•œ๋‹ค.

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> sho.getPrice(product)) //๊ฐ ์ƒ์ ์—์„œ ํ• ์ธ์ „ ๊ฐ€๊ฒฉ ์–ป๊ธฐ
    .map(Quote::parse) //๋ฐ˜ํ™˜๋œ ๋ฌธ์ž์—ด์„ Quote ๊ฐ์ฒด๋กœ ๋ณ€ํ™˜
    .map(Discount::applyDiscount)  //Quote์— ํ• ์ธ ์ ์šฉ
    .collect(toList());
}

์ฝ”๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•ด๋ณด๋ฉด ์ˆœ์ฐจ์ ์œผ๋กœ ๋‹ค์„ฏ ์ƒ์ ์— ๊ฐ€๊ฒฉ์„ ์š”์ฒญํ•˜๋ฉด์„œ 5์ดˆ๊ฐ€ ์†Œ์š”๋˜๊ณ , ํ• ์ธ์ฝ”๋“œ๋ฅผ ์ ์šฉํ•˜๋ฉด์„œ 5์ดˆ๊ฐ€ ์†Œ์š”๋œ๋‹ค.

์•ž์„œ ํ™•์ธํ•œ ๊ฒƒ์ฒ˜๋Ÿผ ๋ณ‘๋ ฌ ์ŠคํŠธ๋ฆผ์œผ๋กœ ๋ณ€ํ™˜ํ•˜๋ฉด ์„ฑ๋Šฅ์„ ๊ฐœ์„ ํ•  ์ˆ˜ ์žˆ๋‹ค. ํ•˜์ง€๋งŒ ์ŠคํŠธ๋ฆผ์ด ์‚ฌ์šฉํ•˜๋Š” ์Šค๋ ˆ๋“œ ํ’€์˜ ํฌ๊ธฐ๊ฐ€ ๊ณ ์ •๋˜์–ด ์žˆ์œผ๋ฏ€๋กœ, ์ƒ์  ์ˆ˜๊ฐ€ ๋Š˜์–ด๋‚˜๊ฒŒ๋˜๋ฉด ์œ ์—ฐํ•˜๊ฒŒ ๋Œ€์‘ํ•  ์ˆ˜ ์—†๋‹ค.

๋”ฐ๋ผ์„œ CompletableFutuer์—์„œ ์ˆ˜ํ–‰ํ•˜๋Š” ํƒœ์Šคํฌ๋ฅผ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋Š” ์ปค์Šคํ…€ Executoer๋ฅผ ์ •์˜ํ•ด์„œ CPU ์‚ฌ์šฉ์„ ๊ทน๋Œ€ํ™”ํ•ด์•ผํ•œ๋‹ค.

16.4.3 ๋™๊ธฐ ์ž‘์—…๊ณผ ๋น„๋™๊ธฐ ์ž‘์—… ์กฐํ•ฉํ•˜๊ธฐ

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures = 
    shops.stream()
      .map(shop -> CompletableFuture.suppltAsync(
        () -> shop.getPrice(product), executor))
      .map(future -> future.thenApply(Quote::parse))
      .map(future -> future.thenCompose(quote ->
        CompletableFuture.supplyAsync(
          () -> Discount.applyDiscount(quote), executor))
      .collect(toList());
      
  return priceFutures.stream()
    .map(CompletableFuture::join)
    .collect(toList());
}
  1. ๊ฐ€๊ฒฉ์ •๋ณด ์–ป๊ธฐ ํŒฉํ† ๋ฆฌ๋ฉ”์„œ๋“œ suuplyAsync์— ๋žŒ๋‹ค ํ‘œํ˜„์‹5์„ ์ „๋‹ฌํ•ด์„œ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ƒ์ ์—์„œ ์ •๋ณด๋ฅผ ์กฐํšŒํ–ˆ๋‹ค. ๋ฐ˜ํ™˜ ๊ฒฐ๊ณผ๋Š” Stream<CompletableFuture<String>>์ด๋‹ค.

  2. Quote ํŒŒ์‹ฑํ•˜๊ธฐ CompletableFuture์˜ thenApply ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ด์„œ Quote ์ธ์Šคํ„ด์Šค๋กœ ๋ณ€ํ™˜ํ•˜๋Š” Function์œผ๋กœ ์ „๋‹ฌํ•œ๋‹ค. thenApply ๋ฉ”์„œ๋“œ๋Š” CompletableFutur๊ฐ€ ๋๋‚  ๋•Œ๊นŒ์ง€ ๋ธ”๋กํ•˜์ง€ ์•Š๋Š”๋‹ค.

  3. CompletableFutuer๋ฅผ ์กฐํ•ฉํ•ด์„œ ํ• ์ธ๋œ ๊ฐ€๊ฒฉ ๊ณ„์‚ฐํ•˜๊ธฐ ์ด๋ฒˆ์—๋Š” ์›๊ฒฉ ์‹คํ–‰(1์ดˆ์˜ ์ง€์—ฐ์œผ๋กœ ๋Œ€์ฒด)์ด ํฌํ•จ๋˜๋ฏ€๋กœ ์ด์ „ ๋‘ ๋ณ€ํ™˜๊ฐ€ ๋‹ฌ๋ฆฌ ๋™๊ธฐ์ ์œผ๋กœ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•ด์•ผ ํ•œ๋‹ค. ๋žŒ๋‹ค ํ‘œํ˜„์‹์œผ๋กœ ์ด ๋™์ž‘์„ supplyAsync์— ์ „๋‹ฌํ•  ์ˆ˜ ์žˆ๋‹ค. ๊ทธ๋Ÿฌ๋ฉด ๋‹ค๋ฅธ CompletableFutuer๊ฐ€ ๋ฐ˜ํ™˜๋œ๋‹ค. ๊ฒฐ๊ตญ ๋‘ ๊ฐ€์ง€ CompletableFuture๋กœ ์ด๋ฃจ์–ด์ง„ ์—ฐ์‡„์ ์œผ๋กœ ์ˆ˜ํ–‰๋˜๋Š” ๋‘ ๊ฐœ์˜ ๋น„๋™๊ธฐ ๋™์ž‘์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค. - ์ƒ์ ์—์„œ ๊ฐ€๊ฒฉ ์ •๋ณด๋ฅผ ์–ป์–ด ์™€์„œ Quote๋กœ ๋ณ€ํ™˜ํ•˜๊ธฐ - ๋ณ€ํ™˜๋œ Quote๋ฅผ Discount ์„œ๋น„์Šค๋กœ ์ „๋‹ฌํ•ด์„œ ํ• ์ธ๋œ ์ตœ์ข…๊ฐ€๊ฒฉ ํš๋“ํ•˜๊ธฐ thenCompose ๋ฉ”์„œ๋“œ๋กœ ๋‘ ๋น„๋™๊ธฐ ์—ฐ์‚ฐ์„ ํŒŒ์ดํ”„ ๋ผ์ธ์œผ๋กœ ๋งŒ๋“ค์ˆ˜ ์žˆ๋‹ค.

16.4.4 ๋…๋ฆฝ CompletableFuture์™€ ๋น„๋…๋ฆฝ CompletableFuture ํ•ฉ์น˜๊ธฐ

๋…๋ฆฝ์ ์œผ๋กœ ์‹คํ–‰๋œ ๋‘ ๊ฐœ์˜ CompletableFuture ๊ฒฐ๊ณผ๋ฅผ ํ•ฉ์ณ์•ผํ•  ๋•Œ thenCombine ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

thenCombine ๋ฉ”์„œ๋“œ์˜ BiFunction ์ธ์ˆ˜๋Š” ๊ฒฐ๊ณผ๋ฅผ ์–ด๋–ป๊ฒŒ ํ•ฉ์งˆ์ง€ ์ •์˜ํ•œ๋‹ค.

Funtion<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) //1๋ฒˆ์งธ ํƒœ์Šคํฌ - ๊ฐ€๊ฒฉ์ •๋ณด ์š”์ฒญ
  .thenCombine(CompletableFuture.suuplyAsync(
      () -> exchangeService.getRate(Money.EUR, Money.USD)), //2๋ฒˆ์งธ ํƒœ์Šคํฌ - ํ™˜์œจ์ •๋ณด ์š”์ฒญ
    (price, rate) -> price * rate)); //๋‘ ๊ฒฐ๊ณผ ํ•ฉ์นจ

๋…๋ฆฝ์ ์ธ ๋‘ ๊ฐœ์˜ ๋น„๋™๊ธฐ ํƒœ์Šคํฌ๋Š” ๊ฐ๊ฐ ์ˆ˜ํ–‰๋˜๊ณ , ๋งˆ์ง€๋ง‰์— ํ•ฉ์ณ์ง„๋‹ค.

16.4.5 Future์˜ ๋ฆฌํ”Œ๋ ‰์…˜๊ณผ CompletableFuture์˜ ๋ฆฌํ”Œ๋ ‰์…˜

CompletableFuture๋Š” ๋žŒ๋‹ค ํ‘œํ˜„์‹์„ ์‚ฌ์šฉํ•ด ๋™๊ธฐ/๋น„๋™๊ธฐ ํƒœ์Šคํฌ๋ฅผ ํ™œ์šฉํ•œ ๋ณต์žกํ•œ ์—ฐ์‚ฐ ์ˆ˜ํ–‰ ๋ฐฉ๋ฒ•์„ ํšจ๊ณผ์ ์œผ๋กœ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋˜ํ•œ ์ฝ”๋“œ ๊ฐ€๋…์„ฑ๋„ ํ–ฅ์ƒ๋œ๋‹ค. ์•ž์„  ์ฝ”๋“œ๋ฅผ ์ž๋ฐ”7๋กœ ๊ตฌํ˜„ํ•˜๋ฉด์„œ ๋น„๊ตํ•ด๋ณด์ž.

ExecutorService executor = Executors.newCachedThreadPool();
final Funtion<Double> futureRate = executor.submit(new Callable<Double>() {
  public Double call() {
    return exchangeService.getRate(Money.EUR, Money.USD);
  }
});

final Funtion<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
  public Double call() {
    double priceInEUR = shop.getPrice(product);
    return priceInEUR * futureRate.get();
  }
});

16.4.6 ํƒ€์ž„์•„์›ƒ ํšจ๊ณผ์ ์œผ๋กœ ์‚ฌ์šฉํ•˜๊ธฐ

Future๊ฐ€ ์ž‘์—…์„ ๋๋‚ด์ง€ ๋ชปํ•  ๊ฒฝ์šฐ TimeoutException์„ ๋ฐœ์ƒ์‹œ์ผœ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๋‹ค.

Funtion<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
  .thenCombine(CompletableFuture.suuplyAsync(
      () -> exchangeService.getRate(Money.EUR, Money.USD)),
    (price, rate) -> price * rate))
  .orTimeout(3, TimeUnit.SECONDS);

compleOnTimeout๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค๋Š” ๋Œ€์‹  ๋ฏธ๋ฆฌ ์ง€์ •๋œ ๊ฐ’์„ ์‚ฌ์šฉํ•˜๋„๋ก ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

Funtion<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
  .thenCombine(CompletableFuture.suuplyAsync(
      () -> exchangeService.getRate(Money.EUR, Money.USD)),
      .completOnTimeout(DEFAULT_RATE, 1, TimeUnit.SECONDS),
    (price, rate) -> price * rate))
  .orTimeout(3, TimeUnit.SECONDS);

16.5 CompletableFuture์˜ ์ข…๋ฃŒ์— ๋Œ€์‘ํ•˜๋Š” ๋ฐฉ๋ฒ•

๊ฐ ์ƒ์ ์—์„œ ๋ฌผ๊ฑด ๊ฐ€๊ฒฉ ์ •๋ณด๋ฅผ ์–ป์–ด์˜ค๋Š” findPrices ๋ฉ”์„œ๋“œ๊ฐ€ ๋ชจ๋‘ 1์ดˆ์”ฉ ์ง€์—ฐ๋˜๋Š” ๋Œ€์‹ , 0.5~2.5์ดˆ์”ฉ ์ž„์˜๋กœ ์ง€์—ฐ๋œ๋‹ค๊ณ  ํ•˜์ž.

๊ทธ๋ฆฌ๊ณ  ๊ฐ ์ƒ์ ์—์„œ ๊ฐ€๊ฒฉ ์ •๋ณด๋ฅผ ์ œ๊ณตํ• ๋•Œ๋งˆ๋‹ค ์ฆ‰์‹œ ๋ณด์—ฌ์ค„ ์ˆ˜ ์žˆ๋Š” ์ตœ์ €๊ฐ€๊ฒฉ ๊ฒ€์ƒ‰ ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ๋งŒ๋“ค์–ด๋ณด์ž.

16.5.1 ์ตœ์ €๊ฐ€๊ฒฉ ๊ฒ€์ƒ‰ ์—ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๋ฆฌํŒฉํ„ฐ๋ง

public Stream<CompletableFuture<String>> findPriceStream(String product) {
  return shop.stream()
    .map(shop -> CompletableFuture.suppltAsync(
      () -> shop.getPrice(product), executor))
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(quote ->
      CompletableFuture.supplyAsync(
        () -> Discount.applyDiscount(quote), executor)));
}

์ด์ œ findPriceStream ๋ฉ”์„œ๋“œ ๋‚ด๋ถ€์—์„œ ์„ธ ๊ฐ€์ง€ map ์—ฐ์‚ฐ์„ ์ ์šฉํ•˜๊ณ  ๋ฐ˜ํ™˜ํ•˜๋Š” ์ŠคํŠธ๋ฆผ์— ๋„ค ๋ฒˆ์งธ map ์—ฐ์‚ฐ์„ ์ ์šฉํ•˜์ž.

findPriceStream("myPhone").map(f -> f.thenAccept(System.out::println));

ํŒฉํ† ๋ฆฌ ๋ฉ”์„œ๋“œ allOf๋Š” ์ „๋‹ฌ๋œ ๋ชจ๋“  CompletableFuture๊ฐ€ ์™„๋ฃŒ๋œ ํ›„์— CompletableFuture<Void>๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

์ด๋ฅผ ํ†ตํ•ด ๋ชจ๋“  ๊ฒฐ๊ณผ๊ฐ€ ๋ฐ˜ํ™˜๋˜์—ˆ์Œ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

CompletableFuture[] futures = findPriceStream("myPhone")
  .map(f -> f.thenAccept(System.out::println))
  .toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futues).join();

๋งŒ์•ฝ CompletableFuture ์ค‘ ํ•˜๋‚˜๋งŒ ์™„๋ฃŒ๋˜๊ธฐ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ƒํ™ฉ์ด๋ผ๋ฉด ํŒฉํ† ๋ฆฌ๋ฉ”์„œ๋“œ anyOf๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

Last updated