Documents

Getting Started the Stream API

Stream API

// https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
int sum = widgets.stream()
                 .filter(w -> w.getColor() == RED)
                 .mapToInt(w -> w.getWeight())
                 .sum();

스트림StreamJava 8에서 추가되었고, 컬렉션의 map/reduce 같은 elements의 흐름을 함수형 스타일로 (람다식도 적용하여) 처리할 수 있도록 지원해주는 라이브러리다. 스트림은 컬렉션과 비슷하지만 다음과 같은 차이점이 있다.

  • 스트림은 값을 읽기만 할 뿐 저장하지 않는다.

  • 스트림의 모든 함수는 순수함수pure function이다.

  • 스트림은 게으르게lazy 동작한다.

  • 무한한 스트림이 가능하다.

  • 스트림은 일회성으로 사용과 동시에 소멸된다.

Functional Interface

Java에서 Functional Interface는 하나의 abstract method를 가진 인터페이스로, 한가지 기능만 할 수 있다는 것을 나타낸다. @FunctionalInterfaceFunctional Interface 로 선언되어 있다는 것을 나타내기 위한 어노테이션으로 컴파일 단계에서 확인해준다. 이는 리팩토링할 때 에러를 빠르게 해결할 수 있도록 해준다.

java.util.function 패키지 하위에는 lambda expressionsmethod references 를 위한 Functional Interface 들이 존재한다.

FunctionalInterfaceNamingGuide
Image: Functional Interface Naming Guide
Lambda Expressions

람다 표현식(람다식)을 짧게 설명하면 간단한 방식으로 코드를 작성하고 실행하는 방법이라고 할 수 있다. 함수를 간결하게 표현하고, 프로그래밍 언어 개념으로는 단순한 익명 함수를 생성하는 문법으로 볼 수 있다. 아래는 람다식으로 간결하게 표현한 코드 예제이다.

interface Printer {
    void print(String str);
}
// as-is
Printer p1 = new Printer() {
    @Override
    public void print(String str) {
        System.out.println(str);
    }
};
// to-be
Printer p2 = str -> System.out.println(str);
Printer p3 = System.out::println; // Java 8 method reference

Stream Creation

스트림을 생성하는 방법에는 여러가지가 있으며, 이미 많은 클래스들에 구현되어 있다.

Array to Stream

String[] arr = new String[]{"a", "b", "c"};
// String[] arr = Arrays.asList("a", "b", "c");
Stream<String> stream = Arrays.stream(arr);

Collection to Stream

stream() 디폴트 메서드가 Collection 인터페이스에 추가되어 있어 모든 컬렉션에서 스트림을 생성할 수 있다.

List<String> list = Arrays.asList("a", "b", "c");
Stream<String> stream1 = list.stream();
Stream<String> stream2 = list.parallelStream(); // 병렬 처리 스트림

Empty Stream

// Null-Safe
public Stream<String> streamOf(Collection<String> list) {
  returen list == null || list.isEmpty() ? Stream.empty() : list.stream();
}

Stream.builder()

Stream<String> s = Stream.<String>builder()
                         .add("a")
                         .add("b")
                         .add("c")
                         .build();

Stream.generate()

generate() 는 무한 스트림을 생성하기 때문에 최대 크기를 지정해줘야 한다.

Stream.generate(() -> "str").limit(5);

Stream.iterate()

iterate() 는 초깃값과 다음 요소에 반영될 람다식이 들어간다.

Stream<Integer> s = Stream.iterate(10, n -> n + 1).limit(5); // 10, 11, 12, 13, 14

Primitive Type Stream

IntStream s1 = IntStream.range(1, 5); // 1, 2, 3, 4
LongStream s2 = LongStream.rangeClosed(1, 5); // 1, 2, 3, 4, 5
DoubleStream s3 = DoubleStream.of(0, 4, 3); // 0.0, 4.0, 3.0
DoubleStream s4 = new Random().doubles(3)
// boxing
Stream<Integer> s5 = IntStream.range(1, 5).boxed();

원시 스트림primitive stream을 객체 스트림으로 변환하려면 mapToObj() 를 사용한다.

IntStream.rangeClosed(2, 12) // 2 ~ 12
         .mapToObj(month -> new Month(month))
         .collect(Collectors.toList());

String to Stream

IntStream s1 = "abc".chars();
Stream<String> s2 = Pattern.compile("\\|").splitAsStream("a|b|c");

File to Stream

Stream<String> s1 = Files.lines(Paths.get("~/words.txt"), StandardCharsets.UTF_8);
Stream<Path> s2 = Files.list(Paths.get("~/Documents"));

Parallel Stream

스트림 생성 시 stream() 대신 parallelStream() 을 사용하면 된다.

Example
Stream<String> s = list.parallelStream();

boolean isParallel = s.isParallel(); // Check parallel stream

s.sequential(); // Change parallel to sequential stream

s.parallel();  // Change sequential to parallel stream

스레드를 처리하기 위해 Java 7부터 도입된 Fork/Join framework를 내부적으로 사용한다. 스레드의 갯수의 기본값은 Runtime.getRuntime().availableProcessors() - 1 로 'CPU 갯수 - 1'이다(1은 메인 스레드이지 않을까?). 스래드 갯수는 아래와 같이 지정할 수 있다.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

Merging Streams

Stream<Integer> s1 = Stream.of(1, 3, 5);
Stream<Integer> s2 = Stream.of(2, 4, 6);

Stream<Integer> resultingStream = Stream.concat(s1, s2);
Stream<Integer> s1 = Stream.of(1, 3, 5);
Stream<Integer> s2 = Stream.of(2, 4, 6);
Stream<Integer> s2 = Stream.of(18, 15, 36);

Stream<Integer> resultingStream = Stream.concat(Stream.concat(s1, s2), s3);
// using flatMap()
// Stream<Integer> resultingStream = Stream.of(s1, s2, s3).flatMap(i -> i);

Stream Operations

스트림 연산은 크게 intermediate operations(return Stream<T>) 과 terminal operations(특정 타입의 결과를 반환) 나뉘어있다. 자세한 내용은 Stream 문서에서 확인할 수 있다.

Intermediate Operations

중간 작업, 중개 연산, 중간 연산자

모든 Intermediate Operations 은 pure function으로 구현되어 있으며, Stream<T> 을 반환해 메서드 체이닝(chaining)이 가능하다.

long count = list.stream()
                 .filter(n -> n > 100) (1)
                 .distinct()
                 .count(); (2)
1 filter() , distinct() 는 intermediate operations를 나타낸다.
2 stream의 사이즈를 반환하는 terminal operations이다.

Filtering

filter
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
                        .filter(str -> {
                            return "a".equals(str);
                        });
Stream<String> s2 = list.stream()
                        .filter(str -> "a".equals(str));
Stream<String> s3 = list.stream()
                        .filter("a"::equals); // Java 8 method reference
// a
limit
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
                        .limit(3);
// a b c
skip
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
                        .skip(3);
// A B C
takeWhile

JDK9에서 추가되었다.

Stream.of(2, 4, 6, 8, 9, 10, 12)
    .takeWhile(n -> n % 2 == 0)
    .forEach(System.out::println);
// 2
// 4
// 6
// 8

Mapping

map
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> s1 = list.stream()
                        .map(s -> s.toUpperCase());
Stream<String> s2 = list.stream()
                        .map(String::toUpperCase); // Java 8 method reference
// A B C
flatMap

중첩 구조를 단일 구조로 풀어주는 작업을 한다. 이러한 작업을 flattening 이라고 한다.

Flattening1
Image: Flattening
Flattening2
Example
List<String> list1 = Arrays.asList("a", "b", "c");
List<String> list2 = Arrays.asList("A", "B", "C");

Stream<String> s1 = Stream.of(list1.stream(), list2.stream())
                          .flatMap(strings -> strings);
// a b c A B C
boxed

Primitive Type Stream의 각 Element를 boxing 시켜 Wrapper class 변환시켜준다.

int[] a1 = IntStream.of(3, 2, 1)
                    .toArray();
// [3, 2, 1]

List<Integer> l1= IntStream.of(3, 2, 1)
                           .boxed()
                           .collect(Collectors.toList());
// [3, 2, 1]

Sorting

sorted
int[] a1 = IntStream.of(3, 2, 1)
                    .sorted()
                    .toArray();
// [1, 2, 3]
List<User> list1 = Arrays.asList(new User("c"), new User("b"), new User("a"));

Stream<User> s1 = list1.stream()
                       .sorted(Comparator.comparing(User::getName));
// User(name=a), User(name=b), User(name=c)

Stream<User> s2 = list1.stream()
                       .sorted(Comparator.comparing(User::getName).reversed());
// User(name=c), User(name=b), User(name=a)
distinct

Stream 내에 Element의 중복을 제거한다.

IntStream.of(1, 2, 3, 3)
         .distinct()
         .toArray()
// [1, 2, 3]

Iterating

peek

'peek’는 '훔쳐보다', '살짝 보이다’는 뜻으로, Stream API에서는 intermediate operation 에서 값을 확인할 수 있는 메서드로 사용한다. peek 메서드만으로는 스트림을 소비하지 않는다.

IntStream.of(1, 2, 3, 4)
         .filter(e -> e > 2)
         .peek(e -> System.out.println("Filtered value: " + e))
         .map(e -> e * e)
         .peek(e -> System.out.println("Mapped value: " + e))
         .sum();
// Filtered value: 3
// Mapped value: 9
// Filtered value: 4
// Mapped value: 16
peek vs fxjs/tap

peek은 한개씩 볼 수 있는 반면에 fxjs의 tap은 전체를 볼 수 있다.

fx.go(
  [10, 20, 30],
  fx.tap(
    a => a,
    log), // [10, 20, 30]
  a => a,
  log); // [10, 20, 30]

Terminal Operations

최종 연산

스트림을 가지고 결과값을 만들어내는 연산이다. 스트림은 평가되기 전까지 실행되지 않는데(lazy), terminal operation이 스트림을 평가하는 역할이다.

Collecting

collect

아마 가장 많이 사용하는 Terminal Operation은 collect() 일 것 같다. 이 메서드와 Collectors 클래스의 static 메서드를 활용하여 스트림을 여러 결과로 반환할 수 있다. Collectors 는 컬렉션의 요소element를 다양한 기준에 따라 summarizing 하거나 모으는 등 유용한 리덕션 연산을 static 메서드로 구현해둔 클래스이다.

일부 메서드에는 Java 8 이전의 코드스타일과 스트림을 사용한 코드와 비교하기 위해 Before 코드와 After 코드를 추가하였다.

Collectors.toList()

결과를 List 로 변환해주는 메서드이다. 내 경험상 Terminal Operation 내에서 가장 자주 사용하는 메서드인 것 같다.

// as-is
final List<String> list = Arrays.asList("a", "b", "c");

// to-be
// [A, B, C]
Before
List<String> upperCaseList = new ArrayList<>();
for (String str : list) {
    upperCaseList.add(str.toUpperCase());
}
After
list.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList())
Collectors.toMap()

List<T> 에서 특정 값을 key로 사용하여 Map 으로 변환하는 메서드이다.

// as-is
final List<Payment> payments = Arrays.asList(
    new Payment("p01", 10_000),
    new Payment("p02", 20_000),
    new Payment("p03", 50_100));

// to-be
// {p01=10000, p03=50100, p02=20000}
Before
Map<String, Payment> map = new HashMap<>();

for (Payment payment : payments) {
    map.put(payment.getId(), payment);
}
After
final Map<String, Payment> map = payments.stream()
    .collect(Collectors.toMap(Payment::getId, payment -> payment));
Collectors.joining()
Stream.of("a", "b", "c")
    .map(String::toUpperCase)
    .collect(Collectors.joining(", "))
// A, B, C

Stream.of("a", "b", "c")
    .map(String::toUpperCase)
    .collect(Collectors.joining(", ", "<", ">"))
// <A, B, C>
Collectors.groupingBy()
Map<Integer, List<User>> u1 = Stream.of(
    User.builder().name("a").age(29).build(),
    User.builder().name("b").age(19).build(),
    User.builder().name("c").age(29).build(),
    User.builder().name("d").age(19).build(),
    User.builder().name("e").age(39).build()
).collect(Collectors.groupingBy(User::getAge));
{
  19=[User(name=b, age=19), User(name=d, age=19)],
  39=[User(name=e, age=39)],
  29=[User(name=a, age=29), User(name=c, age=29)]
}
groupingBy 로 chunk 구현하기
TODO

jdk9 takeWhile 활용해서 작성해보기. 아래 chunk 구현체들은 지연동작하지 않는다. 이미 평가가 된 상태로 반환.

@Test
public void fp() {
    final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

    System.out.println(chunk(list, 4));
    // [[1, 2, 3, 4], [5, 6, 7, 8], [9]]

    chunkStream(list, 4)
        .limit(1)
        .forEach(System.out::println);
    // [1, 2, 3, 4]
}
chunk(), chunkStream()
// https://e.printstacktrace.blog/divide-a-list-to-lists-of-n-size-in-Java-8/
private Collection<? extends List<?>> chunk(List<?> list, int size) {
    final AtomicInteger counter = new AtomicInteger();

    return list.stream()
        .collect(Collectors.groupingBy(
            it -> counter.getAndIncrement() / size))
        .values();
}

private Stream<?> chunkStream(List<?> list, int size) {
    return chunk.stream();
}

References

Collectors.partitioningBy()
Map<Boolean, List<Integer>> result;
result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
               .collect(Collectors.partitioningBy(n -> n > 3));
// {false=[1, 2, 3], true=[4, 5, 6, 7, 8, 9, 10]}
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .collect(Collectors.partitioningBy(n -> n > 3, Collectors.counting()));
// {false=3, true=7}
Collectors.collectingAndThen()

collect 한 이후에 필요한 작업을 추가한다.

Set<Product> unmodifiableSet = productList.stream()
                                          .collect(Collectors.collectingAndThen(
                                              Collectors.toSet(),
                                              Collections::unmodifiableSet));
Collectors.averageingInt()
// average
Integer averageAge = Stream.of(user1, user2, user2)
                           .collect(Collectors.averageingInt(User::getAge));
Collectors.summingInt()
// 1. sum
Integer sumAge1 = Stream.of(user1, user2, user2)
                        .collect(Collectors.summingInt(User::getAge));
// 2. mapToInt를 활용해 sum 구하는 방법
Integer sumAge2 = Stream.of(user1, user2, user2)
                        .mapToInt(User::getAge)
                        .sum();
Collectors.summarizingInt()
Stream<BigDecimal> s = Stream.iterate(BigDecimal.ONE, n -> n.add(BigDecimal.ONE))
                             .limit(10);
IntSummaryStatistics i = s.collect(Collectors.summarizingInt(BigDecimal::intValue));
// IntSummaryStatistics{count=10, sum=55, min=1, average=5.500000, max=10}
Collectors.of()

collector를 직접 만들어서 사용하고자 할 경우 of() 를 활용할 수 있다.

List<Integer> c = Arrays.asList(1, 2, 3, 4, 5, 6, 7).stream()
                        .collect(Collector.of(
                            ArrayList::new,
                            List::add,
                            (left, right) -> {
                                left.addAll(right);
                                return left;
                            }));
// [1, 2, 3, 4, 5, 6, 7]
toArray
int[] a1 = Stream.of(1, 2, 3)
                 .toArray();
String[] a2 = Arrays.stream("a", "b", "c")
                    .map(String::toUpperCase)
                    .toArray(String[]::new);

Reduction

reduce는 캐터모피즘catamorphism 이라는 목록 조작 개념의 특별한 변형으로, 컬렉션을 줄여나가는 방법이다. 스트림 API에서는 reduce() 로 리듀싱을 제공한다.

Example
Stream.of(1, 2, 3).reduce(0, (x, y) -> x + y); // 6
Stream.of(1, 2, 3).reduce(0, Integer::sum);    // 6
int sum = Stream.of(1, 2, 3)
                .reduce((l, r) -> {
                    System.out.println("l=" + l + ", r= " + r);
                    return (l + r);
                }).get();
System.out.println("sum:" + sum);
Output
l=1, r= 2
l=3, r= 3
sum: 6
int sum = Stream.of(1, 2, 3)
                .reduce(10, (l, r) -> {
                    System.out.println("l=" + l + ", r= " + r);
                    return (l + r);
                });
System.out.println("sum:" + sum);
Output
l=10, r= 1
l=11, r= 2
l=13, r= 3
sum: 16
collect vs reduce

collect 는 변경 가능한(mutable) 결과 객체를 사용하여 동작한다. 즉, 내부적으로 mutable한 collection이 하나 생성되고 각 요소를 collection에 축적해나가며 동작한다. 그에 반해, reduce 는 immutable한 결과 객체(누산기accumulator)를 사용한다.

int sum = Stream.of(1, 2, 3)
                .reduce((l, r) -> {
                    System.out.println("l=" + l + ", r= " + r);
                    return (l + r); (1)
                }).get();
System.out.println("sum: " + sum);
// l=1, r= 2
// l=3, r= 3
// sum: 6

String result = Stream.of("1", "2", "3")
                      .collect(
                          StringBuilder::new, // supplier
                          (sb, s) -> { // accumulator
                              System.out.println("sb:" + sb + ", s: " + s);
                              sb.append(" ").append(s); (2)
                          },
                          (r1, r2) -> { // combiner
                              System.out.println(r1 + ", " + r2);
                              r1.append(",").append(r2.toString());
                          })
                      .toString();
System.out.println("result: " + result);
// sb:, s: 1
// sb: 1, s: 2
// sb: 1 2, s: 3
// Result:  1 2 3
1 reduce 에서는 새로운 immutable 객체를 반환한다.
2 collect 에서는 새로운 값을 반환하지 않고, 값을 변경 시켜준다.

만약 int, double 같은 immutable한 값을 다룬다면 reduce 를, mutable한 데이터를 다룬다면 collect 를 사용하면 된다.

Matching

Predicate 를 받아 해당 조건을 만족하는지 체크한 결과를 반환한다.

IntStream.of(1, 1, 1).allMatch(a -> a == 1);  // true
IntStream.of(1, 2, 3).anyMatch(a -> a == 1);  // true
IntStream.of(1, 2, 3).noneMatch(a -> a == 4); // true
  • anyMatch : 하나라도 만족하는지

  • allMatch : 모두 만족하는지

  • noneMatch : 모두 만족하지 않는지

Iterating

forEach
// stream
Arrays.asList(1, 2, 3)
      .stream()
      .forEach(System.out::println);
1
2
3
Stream.forEach vs Collection.forEach

두 코드의 결과는 동일하다. 하지만 두 코드의 속도만 비교한다면 Collection.forEach() 가 더 빠르게 동작한다. 그러므로 단순히 stream().forEach() 만 사용할 것이라면 Collection.forEach 를 사용할 것을 권장한다. IntelliJ IDEA에서는 stream().forEach() 코드를 검사해준다.

// iterable
Arrays.asList(1, 2, 3).stream().forEach(System.out::println); (1)
Arrays.asList(1, 2, 3).forEach(System.out::println); (2)
1 for-each Loop of Stream
2 for-each of Iterable

Calculating

IntStream.of(1, 2, 3).count()   // 3
IntStream.of(1, 2, 3).sum()     // 6
IntStream.of(1, 2, 3).min()     // OptionalInt[1]
IntStream.of(1, 2, 3).max()     // OptionalInt[3]
IntStream.of(1, 2, 3).average() // OptionalDouble[2.0]
IntSummaryStatistics s = IntStream.of(1, 2, 3).summaryStatistics();
// {count=3, sum=6, min=1, average=2.000000, max=3}

s.getCount();   // 3
s.getMin();     // 1
s.getMax();     // 3
s.getSum();     // 6
s.getAverage(); // 2.0

Concurrency

Asynchronous Execution

Future (java 5) → ListenableFuture (spring 3.0) → CompletableFuture (java 8)

class Future
class ListenableFuture
class CompletableFuture

Future <|-- CompletableFuture
Future <|-- ListenableFuture
HTTP Client

RestTemplate (spring 3.0) → AsyncRestTemplate (spring 4.0) → WebClient (spring 5.0)

Future

Future는 Java 5에 추가된 클래스로, 비동기 연산 결과를 나타낸다. 작업이 완료되었는지 확인하고(isDone()), 완료를 기다리며, 계산 결과를 확인하는(get()) 메서드와 작업을 취소(cancel())하는 메서드가 제공된다. 일단 작업이 완료되면 취소할 수 없으며, 결과는 작업이 완료되었을 때만 메서드를 통해 확인할 수 있고, 필요한 경우 준비가 될 때까지 blocking 한다.

Example
@Test
void futureTest() throws InterruptedException, ExecutionException {
    System.out.println("Thread#" + Thread.currentThread().getId());

    ExecutorService executor = Executors.newFixedThreadPool(1); (1)
    Future<Integer> future = executor.submit(() -> {
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Thread#" + Thread.currentThread().getId());
        return 123;
    });

    System.out.println("future done? " + future.isDone());

    Integer result = future.get(); (2)

    System.out.println("future done? " + future.isDone());
    System.out.println("result:" + result);
}
1 Java5 에서 멀티스레드와 콜백 사용하기 위해 ExecutorService 사용
2 get() 은 blocking 메서드이다.
Output
Thread#1
future done? false
Thread#12
future done? true
result:123

ListenableFuture

Spring Framework 4.0 에서 추가된 ListenableFuture는 callback을 적용한 Future 의 확장 클래스이다. AsyncRestTemplate 의 리턴타입이기도 하다. JQuery에서 Promise 사용하기 전 Ajax 호출하는 것과 비슷한것 같다.

Example
@Test
void async() throws InterruptedException {
    AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();

    ListenableFuture<ResponseEntity<Map>> entity =
        asyncRestTemplate.getForEntity("https://httpbin.org/get", Map.class);

    entity.addCallback(
        result -> System.out.println(result.getStatusCode()), (1)
        err -> System.out.println(Arrays.toString(err.getStackTrace()))); (2)

    System.out.println("test1");
    TimeUnit.SECONDS.sleep(5); (3)
    System.out.println("test2");
}
1 성공시 실행할 callback 함수(SuccessCallback 인스턴스)
2 실패시 실행할 callback 함수(FailureCallback 인스턴스)
3 sleep() 을 주지않으면 메인 스레드가 바로 종료되면서 HTTP 응답값을 확인 할 수 없다.
Output
DEBUG: Created asynchronous GET request for "https://httpbin.org/get" ...
DEBUG: Setting request Accept header to [application/json, application/*+json]
test1 (1)
DEBUG: Async GET request for "https://httpbin.org/get" resulted in 200 (OK)
DEBUG: Reading [interface java.util.Map] as "application/json" using [o.s.h.c.j.MappingJackson2HttpMessageConverter]
200 (2)
test2 (3)
1 코드를 실행시키면 바로 출력
2 HTTP 응답을 받을 때 바로 출력(sleep와 무관하게 동작)
3 "test1" 출력하고 5초 이후에 출력

Java 8 lambda expression을 활용함으로써 코드가 간결해진다.

lambda expression을 사용안할 경우
entity.addCallback(new SuccessCallback<ResponseEntity<Map>>() {
    @Override
    public void onSuccess(ResponseEntity<Map> result) {
        System.out.println(result.getStatusCode());
        System.out.println(result.getBody());
    }
}, new FailureCallback() {
    @Override
    public void onFailure(Throwable err) {
        System.out.println(Arrays.toString(err.getStackTrace()))
    }
});

CompletableFuture

Java 8에 추가된 CompletableFutureFuture 뿐만 아니라 CompletionStage를 함께 확장한 클래스이다. CompletableFuture 의 개념은 다른 프로그래밍 언어에서는 Deferred 또는 Promise 라고 부른다.

CompletionStage 는 다른 CompletionStage 가 완료될 때 작업을 수행하거나 계산하기 위한 비동기 작업의 stage이다. stage는 작업이 종료될 때 완료되지만, 종속적인 다른 stage를 트리거 할 수도 있다.

Example
public Future<String> getStringAsync() throws InterruptedException {
    CompletableFuture<String> cf = new CompletableFuture<>();
    Executors.newCachedThreadPool().submit(() -> {
        TimeUnit.SECONDS.sleep(1);
        cf.complete("Hello");
        return null;
    });
    return cf;
}

@Test
public void completableFutureExample() throws InterruptedException {
    System.out.println("Thread#" + Thread.currentThread().getId() + ": 1");

    CompletableFuture<String> cf = (CompletableFuture)getAsync();
    cf.thenAccept(s -> { (1)
        System.out.println("Thread#" + Thread.currentThread().getId() + ": 2");
        System.out.println(s);
    });

    System.out.println("Thread#" + Thread.currentThread().getId() + ": 3");
    TimeUnit.SECONDS.sleep(3);

    System.out.println("Thread#" + Thread.currentThread().getId() + ": 4");
}
// Output:
//   Thread#1: 1
//   Thread#1: 3 (2)
//   Thread#12: 2 (3)
//   Hello
//   Thread#1: 4 (4)
1 thenAccept 에는 Callable 의 return 값이 아니라 CompletableFuture 인스턴스의 complete 로부터 전달받은 인자가 파라미터로 들어온다.
2 응용 프로그램이 실행되면 메인 스레드에서 바로 "1", "3"이 출력된다.
3 메인 스레드는 3초 기다리고 있는 중이지만 cf의 값이 반환되면서 "2"가 출력된다.
4 메인 스레드에서 3초가 지나고나서 "4"가 출력되면서 프로그램이 종료된다.

supplyAsync

CompletableFuture 를 생성하는 팩토리 메서드로 Supplier를 인자로 받는 supplyAsync() 가 있다. 비동기적으로 실행해서 결과를 생성하며 람다식을 이용하여 쉽게 구성하고 조합할 수 있다.

Example
@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
    print.accept("1");

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        sleep(1); (1)
        print.accept("2");
        return "Hello World";
    });

    print.accept("3"); (2)
    print.accept(future.get()); (3)
    print.accept("4");
}
// Output:
//   Thread#1: 1
//   Thread#1: 3
//   Thread#12: 2 (4)
//   Thread#12: Hello World (5)
//   Thread#1: 4
1 별도의 스레드에서 1초를 기다린다.
2 응용 프로그램을 실행되면 메인 스레드에서 바로 "1", "3"이 출력된다.
3 future 의 값을 확인하기 위해 blocking 된 상태로 기다린다.
4 1초간 멈춰있던 Thread#12가 동작하며 "2"를 출력한다.
5 blocking 되어 있던 get 이 동작하면서 "Hello World"를 출력하고 나머지 작업을 진행한다.

thenApply

thenApplyFunction을 인자로 받으며, 비동기 연산 결과에 추가 작업을 한 뒤 반환하고 싶을 때 사용된다.

Example
private static Function<String, String> appendName = (String name) -> "Hello " + name + "!";

@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
    print.accept("1");

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        sleep(1); (1)
        print.accept("2");
        return "Jun";
    }).thenApply(appendName);

    print.accept("3"); (2)
    print.accept(future.get()); (3)
    print.accept("4");
    sleep(3);
}
// Output:
//   Thread#1: 1
//   Thread#1: 3
//   Thread#12: 2 (4)
//   Thread#1: Hello Jun! (5)
//   Thread#1: 4
1 별도의 스레드에서 1초를 기다린다.
2 응용 프로그램을 실행하면 메인 스레드에서 바로 "1", "3"이 출력된다.
3 future 의 값을 확인하기 위해 blocking 된 상태로 기다린다.
4 1초간 멈춰있던 Thread#12가 동작하며 "2"를 출력한다.
5 blocking 되어 있던 getappendName 이후의 결과를 출력한다.

thenCompose

thenCompose는 여러 CompletableFuture 를 조립해서 순차적으로 실행하기 위한 메서드이다. CompletableFuture<U> 를 반환하기 때문에 chaining이 가능하다.

Example
private static Consumer<String> print = (String str) ->
    System.out.println("Thread#" + Thread.currentThread().getId() + ": " + str);

@Test
public void completableFutureExample() throws ExecutionException, InterruptedException {
    CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
        sleep(2);
        print.accept("1");
        return "Hello";
    }).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
        sleep(2);
        print.accept("2");
        return s + " Beautiful";
    })).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
        sleep(2);
        print.accept("3");
        return s + " World";
    })).thenAccept(str -> {
        sleep(2);
        print.accept("4");
        System.out.println(str);
    });

    print.accept("5");
    System.out.println(cf.isDone());
    System.out.println(cf.get());

    sleep(10);
}
Output
Thread#1: 5
false (1)
Thread#12: 1 (2)
Thread#13: 2 (3)
Thread#13: 3 (4)
Thread#13: 4 (5)
Hello Beautiful World
null (6)
1 isDone() 의 결과로 아직 완료되지 않아 false가 출력된다.
2 2초 뒤 "1"이 출력되고, "Hello"과 함께 CompletableFuture 값을 반환한다.
3 2초 뒤 "2"이 출력되고, 앞에서 전달받은 "Hello"에 "Beautiful"을 조합한 CompletableFuture 값을 반환한다.
4 2초 뒤 "3"이 출력된고, 앞에서 전달받은 "Hello Beautiful"에 "World"을 조합한 CompletableFuture 값을 반환한다.
5 thenAccept() 에서 2초 뒤 "4"와 함께 "Hello Beautiful World"가 출력된다.
6 cf 의 작업이 완료되었으므로 cf.get() 가 동작한다. 마지막에 호출된 thenAcceptConsumer 를 파라미터로 받는데, 이것은 반환값이 없으므로 null 을 출력한다.

exceptionally

exceptionallyCompletableFuture 작업 내에서 발생하는 Exception을 처리하기 위한 메서드이다. ListenableFuture 에서 FailCallbackonFailure 를 대신할 수 있다. ListenableFuture 는 요청별로 실패 로직을 설정하는 반면 exceptionally 는 모든 Exception을 통합적으로 처리할 수 있다.

@Test
public void exceptionallyExample1() throws InterruptedException, ExecutionException {
    final String result = CompletableFuture.supplyAsync(() -> {
        System.out.println("1");
        return "1";
    }).thenApply(s -> {
        System.out.println("2");
        throw new RuntimeException();
    }).thenApply(s -> {
        System.out.println("3");
        return s + "3";
    }).exceptionally(throwable -> {
        System.out.println("4");
        log.error("catch error", throwable);
        return "exception!!!";
    }).get();

    System.out.println(result);
}
Output
1
2
4
21:11:10.349 [main] ERROR com.example.demo.LocalTest - catch error
java.util.concurrent.CompletionException: java.lang.RuntimeException
  at java.base/java.u.c.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
  at java.base/java.u.c.CompletableFuture.uniApplyNow(CompletableFuture.java:683)
  at java.base/java.u.c.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
  at java.base/java.u.c.CompletableFuture.thenApply(CompletableFuture.java:2094)
  ...
exception!!!

allOf

allOf는 여러 CompletableFuture 의 작업들이 모두 완료되었는지 확인하고자 할 때 사용한다.

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

확인할 CompletableFuture 들을 가변인자varargs 를 받고 CompletableFuture<Void> 를 반환한다. 반환값의 get() 혹은 join() 을 통해 모든 작업이 완료되기를 기다릴 수 있다.

allOf 의 반대로는, 여러 CompletableFuture 중 하나라도 완료되었는지 확인할 수 있는 anyOf 가 있다.

CompletableFuture vs Promise
Java JavaScript

CompletableFuture.allOf

Promise.all

CompletableFuture.anyOf

Promise.race

Example
private static Consumer<String> print = (String str) ->
    System.out.println("Thread#" + Thread.currentThread().getId() + ": " + str);

@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
    print.accept("1");
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        sleep(1);
        print.accept("2");
        return "Hello";
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Beautiful");
    CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "World");

    print.accept("3");

    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(f1, f2, f3);
    print.accept("4");
    combinedFuture.get();
    print.accept("5");

    TimeUnit.SECONDS.sleep(3);
    print.accept("6");
}
Output
Thread#1: 1
Thread#1: 3 (1)
Thread#1: 4
Thread#12: 2 (2)
Thread#1: 5
Thread#1: 6 (3)
1 실행되면 메인 스레드에서 "1", "3", "4" 가 출력된다.
2 1초 뒤 "2"가 출력되면서 combinedFuture.get() 아래의 "5"도 출력된다.
3 "4"가 출력되고 3초 뒤 "5"가 출력되면서 프포그램이 종료된다.

join

joinCompletableFuture 의 작업이 완료되면 결과를 반환하거나, 예외 발생시 (unchecked)예외를 던지는 메서드이다.

Example
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "World");

String str = Stream.of(f1, f2, f3)
                // .map(f -> { return f.join(); })
                // .map(f -> f.join())
                   .map(CompletableFuture::join)
                   .collect(Collectors.joining(" "));

System.out.println(str); // Hello Beautiful World
get vs join

CompletableFuturegetjoin 은 작업이 완료되기를 기다리고 값을 반환하는 작업을 한다. 동일한 동작을 하지만 다음과 같은 차이점이 있다.

  • getFuture 의 메서드를 구현한 것이고, joinCompletableFuture 의 메서드이다.

  • get 은 checked exception을 던지고, join 은 unchecked exception을 던지지 않는다.

    V get() throws InterruptedException, ExecutionException { ... }
    public T join() { ... }

따라서, get 은 명시적으로 try-catch를 해줘야하는 반면 join 은 에러를 밖으로 던질 수 있게된다. 즉, 여러 CompletableFuture 를 다룰 때 joinexceptionally 를 통해 통합적인 에러 핸들링이 가능해진다.

References

RestTemplate with @Async

주로 배치성 프로젝트에서 컨트롤러에 비동기적으로 동작시키기 위해 @Async 을 사용했었다. 이 때, 대부분(내가 본 프로젝트) void 를 반환하였다. 하지만 Async 문서를 보면 void 혹은 Future 를 반환시킬 수 있다. 즉, CompletableFuture 를 활용할 수 있게 된다.

메서드에 @Async 사용시 반드시 public 접근자이어야 한다.

However, the return type is constrained to either void or Future.
— Spring Framework, Annotation Type Async
ReceiptService.java
@Service
public class ReceiptService {
    @Async
    public CompletableFuture<Receipt> getAsync(@NotNull String paymentId) {
        final Receipt receipt = restTemplate.getForObject(url, Receipt.class);

        return CompletableFuture.completableFuture(receipt);
    }
}
ReceiptController.java
@RestController
public class ReceiptController {
    @GetMapping("/")
    public List<Receipt> getReceipts() {
        CompletableFuture<Receipt> receipt1 = receiptService.getAsync(id);
        ...
        CompletableFuture.allOf(receipt1, ...);

        return ...;
    }
}

TODO: 실제 동작하는 코드인지 테스트해보고 수정할 것

AsyncRestTemplate

AsyncRestTemplate은 Spring Framework 4.0 이후부터 제공되는 비동기 HTTP 요청을 위한 클래스이다. RestTemplate 와 유사하지만 ListenableFuture wrapper를 반환한다.

Spring Framework 5.0 부터 AsyncRestTemplate 클래스는 deprecated 되었고 WebClient 사용을 권장한다. RestTemplate 또한 앞으로 deprecated 될 예정이고 새로운 기능이 추가되지 않는다고 말한다.

Convert ListenableFuture to CompletableFuture
// https://jongmin92.github.io/2019/05/05/Java/java-async-4/
private <T> CompletableFuture<T> convert(ListenableFuture<T> lf) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    lf.addCallback(cf::complete, cf::completeExceptionally);
    return cf;
}

WebClient

Spring Framework 5.0에서 추가된 WebClient는 기존의 AsyncRestTemplate 역할을 한다. WebClient 는 Non-Block I/O 기반에 ThreadSafe 하다.

TODO: Reactive programming, Flux, Mono 등 추가적인 학습이 더 필요할 것 같다.

private static final String HOST = "http://fakerestapi.azurewebsites.net/api";

@Test
public void webClientExample() {
    WebClient client = WebClient.create(HOST);

    WebClient.RequestBodySpec req1 = client
        .method(HttpMethod.GET)
        .uri("/Users/1")
        .contentType(MediaType.APPLICATION_JSON_UTF8)
        .accept(MediaType.APPLICATION_JSON_UTF8);
    WebClient.RequestBodySpec req2 = client
        .method(HttpMethod.GET)
        .uri("/Users/2");

    User user1 = req1.exchange().block().bodyToMono(User.class).block();
    User user2 = req2.exchange().block().bodyToMono(User.class).block();

    System.out.println(user1);
    System.out.println(user2);
}
Output
User(id=1, name=User 1, password=Password1)
User(id=2, name=User 2, password=Password2)

Examples

여러 메서드들을 조합하여 몇가지 예제를 만들어 보고자 한다. Mock 데이터를 위해 Fake Rest APISlowwly를 사용할 것이며, 모든 예제 코드에는 아래와 같은 User 클래스가 선언되어 있다고 가정한다.

User class
@Data
class User {
    @JsonProperty("ID")
    private String id;
    @JsonProperty("UserName")
    private String name;
    @JsonProperty("Password")
    private String password;
}

Example 1: AsyncRestTemplate vs RestTemplate

다음 예제는 AsyncRestTemplateRestTemplate 의 속도를 비교해보려 한다.

asyncRestTemplate , restTemplate , restTemplateWithParallel 메서드로 나눠져 있으며, 모든 메서드가 ID가 1 부터 9까지의 사용자를 조회하고 응답값을 반환하는 작업을 한다. 모든 HTTP API에는 2초간 딜레이가 존재한다.

결과는 asyncRestTemplate 이 가장 빠르게 동작했다. 모든 요청을 한번에 조회하고 CompletableFuture::join 에서 모든 응답을 모아 작업이 진행되었다. restTemplate 은 예상대로 한번 요청하고 응답 받고 하는 식으로 진행되었고, 가장 느린 결과를 보였다. restTemplateWithParallel 은 로컬 환경에서 Runtime.getRuntime().availableProcessors() - 1 이 3으로 3번씩 호출하는 것 같았다.

private final static String SLOW_HOST =
    "http://slowwly.robertomurray.co.uk/delay/2000/url/http://fakerestapi.azurewebsites.net";

private final AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
private final RestTemplate restTemplate = new RestTemplate();

@Test
public void test() {
    final List<String> ids = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9");
    final Map<String, Long> resultsTime = new HashMap<>(3);
    long startTime;

    // asyncRestTemplate
    startTime = System.nanoTime();
    System.out.println(asyncRestTemplate(ids));
    resultsTime.put("asyncRestTemplateElapsedTime", System.nanoTime() - startTime);

    // restTemplate
    startTime = System.nanoTime();
    System.out.println(restTemplate(ids));
    resultsTime.put("restTemplateElapsedTime", System.nanoTime() - startTime);

    // restTemplate with parallel
    startTime = System.nanoTime();
    System.out.println(restTemplateWithParallel(ids));
    resultsTime.put("restTemplateWithParallelElapsedTime", System.nanoTime() - startTime);

    // results
    final DecimalFormat formatter = new DecimalFormat("#,###ms");
    resultsTime.entrySet().stream()
        .sorted(Comparator.comparing(Map.Entry::getValue))
        .forEach(result -> System.out.println(
            result.getKey() + ": " + formatter.format(result.getValue() / 1_000_000)));
}

private List<User> asyncRestTemplate(List<String> ids) {
    List<CompletableFuture<ResponseEntity<User>>> res = new ArrayList<>(ids.size());

    // stream 으로 만들면 lazy하게 동작하면서 한번씩 API 요청?
    ids.forEach(id -> res.add(convert(asyncRestTemplate.getForEntity(
        SLOW_HOST + "/api/Users/" + id, User.class))));

    return res.stream()
        .map(CompletableFuture::join)
        .map(HttpEntity::getBody)
        .collect(Collectors.toList());
}

private List<User> restTemplate(List<String> ids) {
    List<User> users = ids.stream()
        .map(id -> restTemplate.getForEntity(SLOW_HOST + "/api/Users/" + id, User.class))
        .map(HttpEntity::getBody)
        .collect(Collectors.toList());
    return users;
}

private List<User> restTemplateWithParallel(List<String> ids) {
    List<User> users = ids.parallelStream()
        .map(id -> restTemplate.getForEntity(SLOW_HOST + "/api/Users/" + id, User.class))
        .map(HttpEntity::getBody)
        .collect(Collectors.toList());
    return users;
}
Output
asyncRestTemplateElapsedTime: 3,746ms
restTemplateWithParallelElapsedTime: 8,353ms
restTemplateElapsedTime: 25,067ms

Example 2: getUser

슈도 코드

@GetMapping("/users/{userId}")
public UserDetail get(@PathVariable(value = "userId") String id) {
    try {
        return Stream.of(id)
            .map(userMapper::get)
            .map(validate) // throw InvalidException
            .map(user -> {
                final String uuid = user.getId();

                CompletableFuture<Accounts> fAccounts = accountConnector.getAccounts(uuid);
                CompletableFuture<Cards> fCards = cardConnector.getCards(uuid);
                CompletableFuture<Point> fPoint = pointConnector.getPoint(uuid);

                return CompletableFuture.allOf(accountsFuture, cardsFuture, pointFuture)
                    .thenApply(ignore -> UserDetail.builder()
                        .user(user)
                        .accounts(fAccounts.join())
                        .cards(fCards.join())
                        .point(fPoint.join())
                        .build())
                    .exceptionally(err -> {
                        throw new ConnectorException(err);
                    });
            })
            .map(CompletableFuture::join)
            .findFirst()
            .orElse(null);
    } catch (InvalidException | ConnectorException e) {
        log.debug(e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error(e.getMessage(), e);
        throw e;
    }
}
java,stream