Stream API

// https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
int sum = widgets.stream()
                 .filter(w -> w.getColor() == RED)
                 .mapToInt(w -> w.getWeight())
                 .sum();

스트림StreamJava 8에서 추가되었고, 컬렉션의 map/reduce 같은 elements의 흐름을 함수형 스타일로 (람다식도 적용하여) 처리할 수 있도록 지원해주는 라이브러리다. 스트림은 컬렉션과 비슷하지만 다음과 같은 차이점이 있다.

  • 스트림은 값을 읽기만 할 뿐 저장하지 않는다.

  • 스트림의 모든 함수는 순수함수pure function이다.

  • 스트림은 게으르게lazy 동작한다.

  • 무한한 스트림이 가능하다.

  • 스트림은 일회성으로 사용과 동시에 소멸된다.

Note
Functional Interface

Java에서 Functional Interface는 하나의 abstract method를 가진 인터페이스로, 한가지 기능만 할 수 있다는 것을 나타낸다. @FunctionalInterfaceFunctional Interface 로 선언되어 있다는 것을 나타내기 위한 어노테이션으로 컴파일 단계에서 확인해준다. 이는 리팩토링할 때 에러를 빠르게 해결할 수 있도록 해준다.

java.util.function 패키지 하위에는 lambda expressionsmethod references 를 위한 Functional Interface 들이 존재한다.

FunctionalInterfaceNamingGuide
Image: Functional Interface Naming Guide
Note
Lambda Expressions

람다 표현식(람다식)을 짧게 설명하면 간단한 방식으로 코드를 작성하고 실행하는 방법이라고 할 수 있다. 함수를 간결하게 표현하고, 프로그래밍 언어 개념으로는 단순한 익명 함수를 생성하는 문법으로 볼 수 있다. 아래는 람다식으로 간결하게 표현한 코드 예제이다.

interface Printer {
    void print(String str);
}
// as-is
Printer p1 = new Printer() {
    @Override
    public void print(String str) {
        System.out.println(str);
    }
};
// to-be
Printer p2 = str -> System.out.println(str);
Printer p3 = System.out::println; // Java 8 method reference

Stream Creation

스트림을 생성하는 방법에는 여러가지가 있으며, 이미 많은 클래스들에 구현되어 있다.

Array to Stream

String[] arr = new String[]{"a", "b", "c"};
// String[] arr = Arrays.asList("a", "b", "c");
Stream<String> stream = Arrays.stream(arr);

Collection to Stream

stream() 디폴트 메서드가 Collection 인터페이스에 추가되어 있어 모든 컬렉션에서 스트림을 생성할 수 있다.

List<String> list = Arrays.asList("a", "b", "c");
Stream<String> stream1 = list.stream();
Stream<String> stream2 = list.parallelStream(); // 병렬 처리 스트림

Empty Stream

// Null-Safe
public Stream<String> streamOf(Collection<String> list) {
  returen list == null || list.isEmpty() ? Stream.empty() : list.stream();
}

Stream.builder()

Stream<String> s = Stream.<String>builder()
                         .add("a")
                         .add("b")
                         .add("c")
                         .build();

Stream.generate()

generate() 는 무한 스트림을 생성하기 때문에 최대 크기를 지정해줘야 한다.

Stream.generate(() -> "str").limit(5);

Stream.iterate()

iterate() 는 초깃값과 다음 요소에 반영될 람다식이 들어간다.

Stream<Integer> s = Stream.iterate(10, n -> n + 1).limit(5); // 10, 11, 12, 13, 14

Primitive Type Stream

IntStream s1 = IntStream.range(1, 5); // 1, 2, 3, 4
LongStream s2 = LongStream.rangeClosed(1, 5); // 1, 2, 3, 4, 5
DoubleStream s3 = DoubleStream.of(0, 4, 3); // 0.0, 4.0, 3.0
DoubleStream s4 = new Random().doubles(3)
// boxing
Stream<Integer> s5 = IntStream.range(1, 5).boxed();

원시 스트림primitive stream을 객체 스트림으로 변환하려면 mapToObj() 를 사용한다.

IntStream.rangeClosed(2, 12) // 2 ~ 12
         .mapToObj(month -> new Month(month))
         .collect(Collectors.toList());

String to Stream

IntStream s1 = "abc".chars();
Stream<String> s2 = Pattern.compile("\\|").splitAsStream("a|b|c");

File to Stream

Stream<String> s1 = Files.lines(Paths.get("~/words.txt"), StandardCharsets.UTF_8);
Stream<Path> s2 = Files.list(Paths.get("~/Documents"));

Parallel Stream

스트림 생성 시 stream() 대신 parallelStream() 을 사용하면 된다.

Example
Stream<String> s = list.parallelStream();

boolean isParallel = s.isParallel(); // Check parallel stream

s.sequential(); // Change parallel to sequential stream

s.parallel();  // Change sequential to parallel stream

스레드를 처리하기 위해 Java 7부터 도입된 Fork/Join framework를 내부적으로 사용한다. 스레드의 개수의 기본값은 Runtime.getRuntime().availableProcessors() - 1 로 'CPU 개수 - 1'이다(1은 메인 스레드이지 않을까?). 스래드 개수는 아래와 같이 지정할 수 있다.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

Merging Streams

Stream<Integer> s1 = Stream.of(1, 3, 5);
Stream<Integer> s2 = Stream.of(2, 4, 6);

Stream<Integer> resultingStream = Stream.concat(s1, s2);
Stream<Integer> s1 = Stream.of(1, 3, 5);
Stream<Integer> s2 = Stream.of(2, 4, 6);
Stream<Integer> s2 = Stream.of(18, 15, 36);

Stream<Integer> resultingStream = Stream.concat(Stream.concat(s1, s2), s3);
// using flatMap()
// Stream<Integer> resultingStream = Stream.of(s1, s2, s3).flatMap(i -> i);

Stream Operations

스트림 연산은 크게 intermediate operations(return Stream<T>) 과 terminal operations(특정 타입의 결과를 반환) 나뉘어있다. 자세한 내용은 Stream 문서에서 확인할 수 있다.

Intermediate Operations

중간 작업, 중개 연산, 중간 연산자

모든 Intermediate Operations 은 pure function으로 구현되어 있으며, Stream<T> 을 반환해 메서드 체이닝(chaining)이 가능하다.

long count = list.stream()
                 .filter(n -> n > 100) // (1)
                 .distinct()
                 .count(); // (2)
  1. filter() , distinct() 는 intermediate operations를 나타낸다.

  2. stream의 사이즈를 반환하는 terminal operations이다.

Filtering

filter
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
                        .filter(str -> {
                            return "a".equals(str);
                        });
Stream<String> s2 = list.stream()
                        .filter(str -> "a".equals(str));
Stream<String> s3 = list.stream()
                        .filter("a"::equals); // Java 8 method reference
// a
limit
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
                        .limit(3);
// a b c
skip
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
                        .skip(3);
// A B C
takeWhile

JDK9에서 추가되었다.

Stream.of(2, 4, 6, 8, 9, 10, 12)
    .takeWhile(n -> n % 2 == 0)
    .forEach(System.out::println);
// 2
// 4
// 6
// 8

Mapping

map
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> s1 = list.stream()
                        .map(s -> s.toUpperCase());
Stream<String> s2 = list.stream()
                        .map(String::toUpperCase); // Java 8 method reference
// A B C
flatMap

중첩 구조를 단일 구조로 풀어주는 작업을 한다. 이러한 작업을 flattening 이라고 한다.

Flattening1
Image: Flattening
Flattening2
Example
List<String> list1 = Arrays.asList("a", "b", "c");
List<String> list2 = Arrays.asList("A", "B", "C");

Stream<String> s1 = Stream.of(list1.stream(), list2.stream())
                          .flatMap(strings -> strings);
// a b c A B C
boxed

Primitive Type Stream의 각 Element를 boxing 시켜 Wrapper class 변환시켜준다.

int[] a1 = IntStream.of(3, 2, 1)
                    .toArray();
// [3, 2, 1]

List<Integer> l1= IntStream.of(3, 2, 1)
                           .boxed()
                           .collect(Collectors.toList());
// [3, 2, 1]

Sorting

sorted
int[] a1 = IntStream.of(3, 2, 1)
                    .sorted()
                    .toArray();
// [1, 2, 3]
List<User> list1 = Arrays.asList(new User("c"), new User("b"), new User("a"));

Stream<User> s1 = list1.stream()
                       .sorted(Comparator.comparing(User::getName));
// User(name=a), User(name=b), User(name=c)

Stream<User> s2 = list1.stream()
                       .sorted(Comparator.comparing(User::getName).reversed());
// User(name=c), User(name=b), User(name=a)
Tip
비교하려는 값에 Null이 있을 경우

정렬 기준 값이 Null이 있을 경우 NPE가 발생하게 된다. 이러한 NPE를 피하기 위해서는 Comparator.nullsLast(), Comparator.nullsFirst() 를 사용하면 안전하게 정렬을 할 수 있다.

Null safe
List<User> list1 = Arrays.asList(new User("c"), new User(null), new User("a"));

Stream<User> s1 = list1.stream()
                       .sorted(Comparator.comparing(User::getName, Comparator.nullsLast(Comparator.naturalOrder())));
// User(name=a), User(name=c), User(name=null)

Stream<User> s2 = list1.stream()
                       .sorted(Comparator.comparing(User::getName, Comparator.nullsLast(Comparator.reverseOrder())));
// User(name=c), User(name=a), User(name=null)

Stream<User> s3 = list1.stream()
                       .sorted(Comparator.comparing(User::getName, Comparator.nullsFirst(Comparator.naturalOrder())));
// User(name=null), User(name=a), User(name=c)

Stream<User> s4 = list1.stream()
                       .sorted(Comparator.comparing(User::getName, Comparator.nullsFirst(Comparator.reverseOrder())));
// User(name=null), User(name=c), User(name=a)
distinct

Stream 내에 Element의 중복을 제거한다.

IntStream.of(1, 2, 3, 3)
         .distinct()
         .toArray()
// [1, 2, 3]

Iterating

peek

'peek’는 '훔쳐보다', '살짝 보이다’는 뜻으로, Stream API에서는 intermediate operation 에서 값을 확인할 수 있는 메서드로 사용한다. peek 메서드만으로는 스트림을 소비하지 않는다.

IntStream.of(1, 2, 3, 4)
         .filter(e -> e > 2)
         .peek(e -> System.out.println("Filtered value: " + e))
         .map(e -> e * e)
         .peek(e -> System.out.println("Mapped value: " + e))
         .sum();
// Filtered value: 3
// Mapped value: 9
// Filtered value: 4
// Mapped value: 16
Note
peek vs fxjs/tap

peek은 한개씩 볼 수 있는 반면에 fxjs의 tap은 전체를 볼 수 있다.

fx.go(
  [10, 20, 30],
  fx.tap(
    a => a,
    log), // [10, 20, 30]
  a => a,
  log); // [10, 20, 30]

Terminal Operations

최종 연산

스트림을 가지고 결과값을 만들어내는 연산이다. 스트림은 평가되기 전까지 실행되지 않는데(lazy), terminal operation이 스트림을 평가하는 역할이다.

Collecting

collect

아마 가장 많이 사용하는 Terminal Operation은 collect() 일 것 같다. 이 메서드와 Collectors 클래스의 static 메서드를 활용하여 스트림을 여러 결과로 반환할 수 있다. Collectors 는 컬렉션의 요소element를 다양한 기준에 따라 summarizing 하거나 모으는 등 유용한 리덕션 연산을 static 메서드로 구현해둔 클래스이다.

Note

일부 메서드에는 Java 8 이전의 코드스타일과 스트림을 사용한 코드와 비교하기 위해 Before 코드와 After 코드를 추가하였다.

Collectors.toList()

결과를 List 로 변환해주는 메서드이다. 내 경험상 Terminal Operation 내에서 가장 자주 사용하는 메서드인 것 같다.

// as-is
final List<String> list = Arrays.asList("a", "b", "c");

// to-be
// [A, B, C]
Before
List<String> upperCaseList = new ArrayList<>();
for (String str : list) {
    upperCaseList.add(str.toUpperCase());
}
After
list.stream()
    .map(String::toUpperCase)
    .collect(Collectors.toList())
Note
JDK 버전업에 따른 변화
  1. JDK8: .collect(Collectors.toList()) 추가. but, mutable list 반환

  2. JDK10: .collect(Collectors.toUnmodifiableList()) 추가.

  3. JDK16: Stream.toList() 추가.

toUnmodifiableList() 의 경우 엘리먼트로 null 을 허용하지 않으므로 스트림에 null 이 있을 경우 NPE가 발생한다. 이와 달리, Stream.toList()null 을 허용하므로 NPE가 발생하지 않는다.

Collectors.toMap()

List<T> 에서 특정 값을 key로 사용하여 Map 으로 변환하는 메서드이다.

// as-is
final List<Payment> payments = Arrays.asList(
    new Payment("p01", 10_000),
    new Payment("p02", 20_000),
    new Payment("p03", 50_100));

// to-be
// {p01=10000, p03=50100, p02=20000}
Before
Map<String, Payment> map = new HashMap<>();

for (Payment payment : payments) {
    map.put(payment.getId(), payment);
}
After
final Map<String, Payment> map = payments.stream()
    .collect(Collectors.toMap(Payment::getId, payment -> payment));
Collectors.joining()
Stream.of("a", "b", "c")
    .map(String::toUpperCase)
    .collect(Collectors.joining(", "))
// A, B, C

Stream.of("a", "b", "c")
    .map(String::toUpperCase)
    .collect(Collectors.joining(", ", "<", ">"))
// <A, B, C>
Collectors.groupingBy()
Map<Integer, List<User>> u1 = Stream.of(
    User.builder().name("a").age(29).build(),
    User.builder().name("b").age(19).build(),
    User.builder().name("c").age(29).build(),
    User.builder().name("d").age(19).build(),
    User.builder().name("e").age(39).build()
).collect(Collectors.groupingBy(User::getAge));
{
  19=[User(name=b, age=19), User(name=d, age=19)],
  39=[User(name=e, age=39)],
  29=[User(name=a, age=29), User(name=c, age=29)]
}
Note
groupingBy 로 chunk 구현하기
TODO

jdk9 takeWhile 활용해서 작성해보기. 아래 chunk 구현체들은 지연동작하지 않는다. 이미 평가가 된 상태로 반환.

@Test
public void fp() {
    final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

    System.out.println(chunk(list, 4));
    // [[1, 2, 3, 4], [5, 6, 7, 8], [9]]

    chunkStream(list, 4)
        .limit(1)
        .forEach(System.out::println);
    // [1, 2, 3, 4]
}
chunk(), chunkStream()
// https://e.printstacktrace.blog/divide-a-list-to-lists-of-n-size-in-Java-8/
private Collection<? extends List<?>> chunk(List<?> list, int size) {
    final AtomicInteger counter = new AtomicInteger();

    return list.stream()
        .collect(Collectors.groupingBy(
            it -> counter.getAndIncrement() / size))
        .values();
}

private Stream<?> chunkStream(List<?> list, int size) {
    return chunk.stream();
}

References

Collectors.partitioningBy()
Map<Boolean, List<Integer>> result;
result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
               .collect(Collectors.partitioningBy(n -> n > 3));
// {false=[1, 2, 3], true=[4, 5, 6, 7, 8, 9, 10]}
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      .collect(Collectors.partitioningBy(n -> n > 3, Collectors.counting()));
// {false=3, true=7}
Collectors.collectingAndThen()

collect 한 이후에 필요한 작업을 추가한다.

Set<Product> unmodifiableSet = productList.stream()
                                          .collect(Collectors.collectingAndThen(
                                              Collectors.toSet(),
                                              Collections::unmodifiableSet));
Collectors.averageingInt()
// average
Integer averageAge = Stream.of(user1, user2, user2)
                           .collect(Collectors.averageingInt(User::getAge));
Collectors.summingInt()
// 1. sum
Integer sumAge1 = Stream.of(user1, user2, user2)
                        .collect(Collectors.summingInt(User::getAge));
// 2. mapToInt를 활용해 sum 구하는 방법
Integer sumAge2 = Stream.of(user1, user2, user2)
                        .mapToInt(User::getAge)
                        .sum();
Collectors.summarizingInt()
Stream<BigDecimal> s = Stream.iterate(BigDecimal.ONE, n -> n.add(BigDecimal.ONE))
                             .limit(10);
IntSummaryStatistics i = s.collect(Collectors.summarizingInt(BigDecimal::intValue));
// IntSummaryStatistics{count=10, sum=55, min=1, average=5.500000, max=10}
Collectors.of()

collector를 직접 만들어서 사용하고자 할 경우 of() 를 활용할 수 있다.

List<Integer> c = Arrays.asList(1, 2, 3, 4, 5, 6, 7).stream()
                        .collect(Collector.of(
                            ArrayList::new,
                            List::add,
                            (left, right) -> {
                                left.addAll(right);
                                return left;
                            }));
// [1, 2, 3, 4, 5, 6, 7]
toArray
int[] a1 = Stream.of(1, 2, 3)
                 .toArray();
String[] a2 = Arrays.stream("a", "b", "c")
                    .map(String::toUpperCase)
                    .toArray(String[]::new);

Reduction

reduce는 캐터모피즘catamorphism 이라는 목록 조작 개념의 특별한 변형으로, 컬렉션을 줄여나가는 방법이다. 스트림 API에서는 reduce() 로 리듀싱을 제공한다.

Example
Stream.of(1, 2, 3).reduce(0, (x, y) -> x + y); // 6
Stream.of(1, 2, 3).reduce(0, Integer::sum);    // 6
int sum = Stream.of(1, 2, 3)
                .reduce((l, r) -> {
                    System.out.println("l=" + l + ", r= " + r);
                    return (l + r);
                }).get();
System.out.println("sum:" + sum);
Output
l=1, r= 2
l=3, r= 3
sum: 6
int sum = Stream.of(1, 2, 3)
                .reduce(10, (l, r) -> {
                    System.out.println("l=" + l + ", r= " + r);
                    return (l + r);
                });
System.out.println("sum:" + sum);
Output
l=10, r= 1
l=11, r= 2
l=13, r= 3
sum: 16
Important
collect vs reduce

collect 는 변경 가능한(mutable) 결과 객체를 사용하여 동작한다. 즉, 내부적으로 mutable한 collection이 하나 생성되고 각 요소를 collection에 축적해나가며 동작한다. 그에 반해, reduce 는 immutable한 결과 객체(누산기accumulator)를 사용한다.

int sum = Stream.of(1, 2, 3)
                .reduce((l, r) -> {
                    System.out.println("l=" + l + ", r= " + r);
                    return (l + r); // (1)
                }).get();
System.out.println("sum: " + sum);
// l=1, r= 2
// l=3, r= 3
// sum: 6

String result = Stream.of("1", "2", "3")
                      .collect(
                          StringBuilder::new, // supplier
                          (sb, s) -> { // accumulator
                              System.out.println("sb:" + sb + ", s: " + s);
                              sb.append(" ").append(s); // (2)
                          },
                          (r1, r2) -> { // combiner
                              System.out.println(r1 + ", " + r2);
                              r1.append(",").append(r2.toString());
                          })
                      .toString();
System.out.println("result: " + result);
// sb:, s: 1
// sb: 1, s: 2
// sb: 1 2, s: 3
// Result:  1 2 3
  1. reduce 에서는 새로운 immutable 객체를 반환한다.

  2. collect 에서는 새로운 값을 반환하지 않고, 값을 변경 시켜준다.

만약 int, double 같은 immutable한 값을 다룬다면 reduce 를, mutable한 데이터를 다룬다면 collect 를 사용하면 된다.

Matching

Predicate 를 받아 해당 조건을 만족하는지 체크한 결과를 반환한다.

IntStream.of(1, 1, 1).allMatch(a -> a == 1);  // true
IntStream.of(1, 2, 3).anyMatch(a -> a == 1);  // true
IntStream.of(1, 2, 3).noneMatch(a -> a == 4); // true
  • anyMatch : 하나라도 만족하는지

  • allMatch : 모두 만족하는지

  • noneMatch : 모두 만족하지 않는지

Iterating

forEach
// stream
Arrays.asList(1, 2, 3)
      .stream()
      .forEach(System.out::println);
1
2
3
Important
Stream.forEach vs Collection.forEach

두 코드의 결과는 동일하다. 하지만 두 코드의 속도만 비교한다면 Collection.forEach() 가 더 빠르게 동작한다. 그러므로 단순히 stream().forEach() 만 사용할 것이라면 Collection.forEach 를 사용할 것을 권장한다. IntelliJ IDEA에서는 stream().forEach() 코드를 검사해준다.

// iterable
Arrays.asList(1, 2, 3).stream().forEach(System.out::println); // (1)
Arrays.asList(1, 2, 3).forEach(System.out::println); // (2)
  1. for-each Loop of Stream

  2. for-each of Iterable

Calculating

IntStream.of(1, 2, 3).count()   // 3
IntStream.of(1, 2, 3).sum()     // 6
IntStream.of(1, 2, 3).min()     // OptionalInt[1]
IntStream.of(1, 2, 3).max()     // OptionalInt[3]
IntStream.of(1, 2, 3).average() // OptionalDouble[2.0]
IntSummaryStatistics s = IntStream.of(1, 2, 3).summaryStatistics();
// {count=3, sum=6, min=1, average=2.000000, max=3}

s.getCount();   // 3
s.getMin();     // 1
s.getMax();     // 3
s.getSum();     // 6
s.getAverage(); // 2.0

Concurrency

Asynchronous Execution

Future (java 5) → ListenableFuture (spring 3.0) → CompletableFuture (java 8)

class Future
class ListenableFuture
class CompletableFuture

Future <|-- CompletableFuture
Future <|-- ListenableFuture
HTTP Client

RestTemplate (spring 3.0) → AsyncRestTemplate (spring 4.0) → WebClient (spring 5.0)

Future

Future는 Java 5에 추가된 클래스로, 비동기 연산 결과를 나타낸다. 작업이 완료되었는지 확인하고(isDone()), 완료를 기다리며, 계산 결과를 확인하는(get()) 메서드와 작업을 취소(cancel())하는 메서드가 제공된다. 일단 작업이 완료되면 취소할 수 없으며, 결과는 작업이 완료되었을 때만 메서드를 통해 확인할 수 있고, 필요한 경우 준비가 될 때까지 blocking 한다.

Example
@Test
void futureTest() throws InterruptedException, ExecutionException {
    System.out.println("Thread#" + Thread.currentThread().getId());

    ExecutorService executor = Executors.newFixedThreadPool(1); // (1)
    Future<Integer> future = executor.submit(() -> {
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Thread#" + Thread.currentThread().getId());
        return 123;
    });

    System.out.println("future done? " + future.isDone());

    Integer result = future.get(); // (2)

    System.out.println("future done? " + future.isDone());
    System.out.println("result:" + result);
}
  1. Java5 에서 멀티스레드와 콜백 사용하기 위해 ExecutorService 사용

  2. get() 은 blocking 메서드이다.

Output
Thread#1
future done? false
Thread#12
future done? true
result:123

ListenableFuture

Spring Framework 4.0 에서 추가된 ListenableFuture는 callback을 적용한 Future 의 확장 클래스이다. AsyncRestTemplate 의 리턴타입이기도 하다. JQuery에서 Promise 사용하기 전 Ajax 호출하는 것과 비슷한것 같다.

Example
@Test
void async() throws InterruptedException {
    AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();

    ListenableFuture<ResponseEntity<Map>> entity =
        asyncRestTemplate.getForEntity("https://httpbin.org/get", Map.class);

    entity.addCallback(
        result -> System.out.println(result.getStatusCode()), // (1)
        err -> System.out.println(Arrays.toString(err.getStackTrace()))); // (2)

    System.out.println("test1");
    TimeUnit.SECONDS.sleep(5); // (3)
    System.out.println("test2");
}
  1. 성공시 실행할 callback 함수(SuccessCallback 인스턴스)

  2. 실패시 실행할 callback 함수(FailureCallback 인스턴스)

  3. sleep() 을 주지않으면 메인 스레드가 바로 종료되면서 HTTP 응답값을 확인 할 수 없다.

Output
DEBUG: Created asynchronous GET request for "https://httpbin.org/get" ...
DEBUG: Setting request Accept header to [application/json, application/*+json]
test1 (1)
DEBUG: Async GET request for "https://httpbin.org/get" resulted in 200 (OK)
DEBUG: Reading [interface java.util.Map] as "application/json" using [o.s.h.c.j.MappingJackson2HttpMessageConverter]
200 (2)
test2 (3)
  1. 코드를 실행시키면 바로 출력

  2. HTTP 응답을 받을 때 바로 출력(sleep와 무관하게 동작)

  3. "test1" 출력하고 5초 이후에 출력

Note

Java 8 lambda expression을 활용함으로써 코드가 간결해진다.

lambda expression을 사용안할 경우
entity.addCallback(new SuccessCallback<ResponseEntity<Map>>() {
    @Override
    public void onSuccess(ResponseEntity<Map> result) {
        System.out.println(result.getStatusCode());
        System.out.println(result.getBody());
    }
}, new FailureCallback() {
    @Override
    public void onFailure(Throwable err) {
        System.out.println(Arrays.toString(err.getStackTrace()))
    }
});

CompletableFuture

Java 8에 추가된 CompletableFutureFuture 뿐만 아니라 CompletionStage를 함께 확장한 클래스이다. CompletableFuture 의 개념은 다른 프로그래밍 언어에서는 Deferred 또는 Promise 라고 부른다.

CompletionStage 는 다른 CompletionStage 가 완료될 때 작업을 수행하거나 계산하기 위한 비동기 작업의 stage이다. stage는 작업이 종료될 때 완료되지만, 종속적인 다른 stage를 트리거 할 수도 있다.

Example
public Future<String> getStringAsync() throws InterruptedException {
    CompletableFuture<String> cf = new CompletableFuture<>();
    Executors.newCachedThreadPool().submit(() -> {
        TimeUnit.SECONDS.sleep(1);
        cf.complete("Hello");
        return null;
    });
    return cf;
}

@Test
public void completableFutureExample() throws InterruptedException {
    System.out.println("Thread#" + Thread.currentThread().getId() + ": 1");

    CompletableFuture<String> cf = (CompletableFuture)getAsync();
    cf.thenAccept(s -> { // (1)
        System.out.println("Thread#" + Thread.currentThread().getId() + ": 2");
        System.out.println(s);
    });

    System.out.println("Thread#" + Thread.currentThread().getId() + ": 3");
    TimeUnit.SECONDS.sleep(3);

    System.out.println("Thread#" + Thread.currentThread().getId() + ": 4");
}
// Output:
//   Thread#1: 1
//   Thread#1: 3 (2)
//   Thread#12: 2 (3)
//   Hello
//   Thread#1: 4 (4)
  1. thenAccept 에는 Callable 의 return 값이 아니라 CompletableFuture 인스턴스의 complete 로부터 전달받은 인자가 파라미터로 들어온다.

  2. 응용 프로그램이 실행되면 메인 스레드에서 바로 "1", "3"이 출력된다.

  3. 메인 스레드는 3초 기다리고 있는 중이지만 cf의 값이 반환되면서 "2"가 출력된다.

  4. 메인 스레드에서 3초가 지나고나서 "4"가 출력되면서 프로그램이 종료된다.

supplyAsync

CompletableFuture 를 생성하는 팩토리 메서드로 Supplier를 인자로 받는 supplyAsync() 가 있다. 비동기적으로 실행해서 결과를 생성하며 람다식을 이용하여 쉽게 구성하고 조합할 수 있다.

Example
@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
    print.accept("1");

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        sleep(1); // (1)
        print.accept("2");
        return "Hello World";
    });

    print.accept("3"); // (2)
    print.accept(future.get()); // (3)
    print.accept("4");
}
// Output:
//   Thread#1: 1
//   Thread#1: 3
//   Thread#12: 2 // (4)
//   Thread#12: Hello World (5)
//   Thread#1: 4
  1. 별도의 스레드에서 1초를 기다린다.

  2. 응용 프로그램을 실행되면 메인 스레드에서 바로 "1", "3"이 출력된다.

  3. future 의 값을 확인하기 위해 blocking 된 상태로 기다린다.

  4. 1초간 멈춰있던 Thread#12가 동작하며 "2"를 출력한다.

  5. blocking 되어 있던 get 이 동작하면서 "Hello World"를 출력하고 나머지 작업을 진행한다.

runAsync

CompletableFuture 를 생성하는 팩토리 메서드로 Runnable 을 인자로 받기 떄문에 반환값이 없다.

CompletableFuture.runAsync(() -> {
    log.info("test");
}).get();

thenApply

thenApplyFunction을 인자로 받으며, 비동기 연산 결과에 추가 작업을 한 뒤 반환하고 싶을 때 사용된다.

Example
private static Function<String, String> appendName = (String name) -> "Hello " + name + "!";

@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
    print.accept("1");

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        sleep(1); // (1)
        print.accept("2");
        return "Jun";
    }).thenApply(appendName);

    print.accept("3"); // (2)
    print.accept(future.get()); // (3)
    print.accept("4");
    sleep(3);
}
// Output:
//   Thread#1: 1
//   Thread#1: 3
//   Thread#12: 2 // (4)
//   Thread#1: Hello Jun! // (5)
//   Thread#1: 4
  1. 별도의 스레드에서 1초를 기다린다.

  2. 응용 프로그램을 실행하면 메인 스레드에서 바로 "1", "3"이 출력된다.

  3. future 의 값을 확인하기 위해 blocking 된 상태로 기다린다.

  4. 1초간 멈춰있던 Thread#12가 동작하며 "2"를 출력한다.

  5. blocking 되어 있던 getappendName 이후의 결과를 출력한다.

thenCompose

thenCompose는 여러 CompletableFuture 를 조립해서 순차적으로 실행하기 위한 메서드이다. CompletableFuture<U> 를 반환하기 때문에 chaining이 가능하다.

Example
private static Consumer<String> print = (String str) ->
    System.out.println("Thread#" + Thread.currentThread().getId() + ": " + str);

@Test
public void completableFutureExample() throws ExecutionException, InterruptedException {
    CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
        sleep(2);
        print.accept("1");
        return "Hello";
    }).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
        sleep(2);
        print.accept("2");
        return s + " Beautiful";
    })).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
        sleep(2);
        print.accept("3");
        return s + " World";
    })).thenAccept(str -> {
        sleep(2);
        print.accept("4");
        System.out.println(str);
    });

    print.accept("5");
    System.out.println(cf.isDone());
    System.out.println(cf.get());

    sleep(10);
}
Output
Thread#1: 5
false (1)
Thread#12: 1 (2)
Thread#13: 2 (3)
Thread#13: 3 (4)
Thread#13: 4 (5)
Hello Beautiful World
null (6)
  1. isDone() 의 결과로 아직 완료되지 않아 false가 출력된다.

  2. 2초 뒤 "1"이 출력되고, "Hello"과 함께 CompletableFuture 값을 반환한다.

  3. 2초 뒤 "2"이 출력되고, 앞에서 전달받은 "Hello"에 "Beautiful"을 조합한 CompletableFuture 값을 반환한다.

  4. 2초 뒤 "3"이 출력된고, 앞에서 전달받은 "Hello Beautiful"에 "World"을 조합한 CompletableFuture 값을 반환한다.

  5. thenAccept() 에서 2초 뒤 "4"와 함께 "Hello Beautiful World"가 출력된다.

  6. cf 의 작업이 완료되었으므로 cf.get() 가 동작한다. 마지막에 호출된 thenAcceptConsumer 를 파라미터로 받는데, 이것은 반환값이 없으므로 null 을 출력한다.

exceptionally

exceptionallyCompletableFuture 작업 내에서 발생하는 Exception을 처리하기 위한 메서드이다. ListenableFuture 에서 FailCallbackonFailure 를 대신할 수 있다. ListenableFuture 는 요청별로 실패 로직을 설정하는 반면 exceptionally 는 모든 Exception을 통합적으로 처리할 수 있다.

@Test
public void exceptionallyExample1() throws InterruptedException, ExecutionException {
    final String result = CompletableFuture.supplyAsync(() -> {
        System.out.println("1");
        return "1";
    }).thenApply(s -> {
        System.out.println("2");
        throw new RuntimeException();
    }).thenApply(s -> {
        System.out.println("3");
        return s + "3";
    }).exceptionally(throwable -> {
        System.out.println("4");
        log.error("catch error", throwable);
        return "exception!!!";
    }).get();

    System.out.println(result);
}
Output
1
2
4
21:11:10.349 [main] ERROR com.example.demo.LocalTest - catch error
java.util.concurrent.CompletionException: java.lang.RuntimeException
  at java.base/java.u.c.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
  at java.base/java.u.c.CompletableFuture.uniApplyNow(CompletableFuture.java:683)
  at java.base/java.u.c.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
  at java.base/java.u.c.CompletableFuture.thenApply(CompletableFuture.java:2094)
  ...
exception!!!

allOf

allOf는 여러 CompletableFuture 의 작업들이 모두 완료되었는지 확인하고자 할 때 사용한다.

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

확인할 CompletableFuture 들을 가변인자varargs 를 받고 CompletableFuture<Void> 를 반환한다. 반환값의 get() 혹은 join() 을 통해 모든 작업이 완료되기를 기다릴 수 있다.

allOf 의 반대로는, 여러 CompletableFuture 중 하나라도 완료되었는지 확인할 수 있는 anyOf 가 있다.

Tip
CompletableFuture vs Promise
Java JavaScript

CompletableFuture.allOf

Promise.all

CompletableFuture.anyOf

Promise.race

Example
private static Consumer<String> print = (String str) ->
    System.out.println("Thread#" + Thread.currentThread().getId() + ": " + str);

@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
    print.accept("1");
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        sleep(1);
        print.accept("2");
        return "Hello";
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Beautiful");
    CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "World");

    print.accept("3");

    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(f1, f2, f3);
    print.accept("4");
    combinedFuture.get();
    print.accept("5");

    TimeUnit.SECONDS.sleep(3);
    print.accept("6");
}
Output
Thread#1: 1
Thread#1: 3 (1)
Thread#1: 4
Thread#12: 2 (2)
Thread#1: 5
Thread#1: 6 (3)
  1. 실행되면 메인 스레드에서 "1", "3", "4" 가 출력된다.

  2. 1초 뒤 "2"가 출력되면서 combinedFuture.get() 아래의 "5"도 출력된다.

  3. "4"가 출력되고 3초 뒤 "5"가 출력되면서 프포그램이 종료된다.

join

joinCompletableFuture 의 작업이 완료되면 결과를 반환하거나, 예외 발생시 (unchecked)예외를 던지는 메서드이다.

Example
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "World");

String str = Stream.of(f1, f2, f3)
                // .map(f -> { return f.join(); })
                // .map(f -> f.join())
                   .map(CompletableFuture::join)
                   .collect(Collectors.joining(" "));

System.out.println(str); // Hello Beautiful World
Note
get vs join

CompletableFuturegetjoin 은 작업이 완료되기를 기다리고 값을 반환하는 작업을 한다. 동일한 동작을 하지만 다음과 같은 차이점이 있다.

  • getFuture 의 메서드를 구현한 것이고, joinCompletableFuture 의 메서드이다.

  • get 은 checked exception을 던지고, join 은 unchecked exception을 던지지 않는다.

    V get() throws InterruptedException, ExecutionException { ... }
    public T join() { ... }

따라서, get 은 명시적으로 try-catch를 해줘야하는 반면 join 은 에러를 밖으로 던질 수 있게된다. 즉, 여러 CompletableFuture 를 다룰 때 joinexceptionally 를 통해 통합적인 에러 핸들링이 가능해진다.

References

RestTemplate with @Async

주로 배치성 프로젝트에서 컨트롤러에 비동기적으로 동작시키기 위해 @Async 을 사용했었다. 이 때, 대부분(내가 본 프로젝트) void 를 반환하였다. 하지만 Async 문서를 보면 void 혹은 Future 를 반환시킬 수 있다. 즉, CompletableFuture 를 활용할 수 있게 된다.

Warning

메서드에 @Async 사용시 반드시 public 접근자이어야 한다.

However, the return type is constrained to either void or Future.
— Spring Framework, Annotation Type Async
ReceiptService.java
@Service
public class ReceiptService {
    @Async
    public CompletableFuture<Receipt> getAsync(@NotNull String paymentId) {
        final Receipt receipt = restTemplate.getForObject(url, Receipt.class);

        return CompletableFuture.completableFuture(receipt);
    }
}
ReceiptController.java
@RestController
public class ReceiptController {
    @GetMapping("/")
    public List<Receipt> getReceipts() {
        CompletableFuture<Receipt> receipt1 = receiptService.getAsync(id);
        ...
        CompletableFuture.allOf(receipt1, ...);

        return ...;
    }
}

TODO: 실제 동작하는 코드인지 테스트해보고 수정할 것

AsyncRestTemplate

AsyncRestTemplate은 Spring Framework 4.0 이후부터 제공되는 비동기 HTTP 요청을 위한 클래스이다. RestTemplate 와 유사하지만 ListenableFuture wrapper를 반환한다.

Spring Framework 5.0 부터 AsyncRestTemplate 클래스는 deprecated 되었고 WebClient 사용을 권장한다. RestTemplate 또한 앞으로 deprecated 될 예정이고 새로운 기능이 추가되지 않는다고 말한다.

Tip
Convert ListenableFuture to CompletableFuture
// https://jongmin92.github.io/2019/05/05/Java/java-async-4/
private <T> CompletableFuture<T> convert(ListenableFuture<T> lf) {
    final CompletableFuture<T> cf = new CompletableFuture<>();
    lf.addCallback(cf::complete, cf::completeExceptionally);
    return cf;
}

WebClient

Spring Framework 5.0에서 추가된 WebClient는 기존의 AsyncRestTemplate 역할을 한다. WebClient 는 Non-Block I/O 기반에 ThreadSafe 하다.

TODO: Reactive programming, Flux, Mono 등 추가적인 학습이 더 필요할 것 같다.

private static final String HOST = "http://fakerestapi.azurewebsites.net/api";

@Test
public void webClientExample() {
    WebClient client = WebClient.create(HOST);

    WebClient.RequestBodySpec req1 = client
        .method(HttpMethod.GET)
        .uri("/Users/1")
        .contentType(MediaType.APPLICATION_JSON_UTF8)
        .accept(MediaType.APPLICATION_JSON_UTF8);
    WebClient.RequestBodySpec req2 = client
        .method(HttpMethod.GET)
        .uri("/Users/2");

    User user1 = req1.exchange().block().bodyToMono(User.class).block();
    User user2 = req2.exchange().block().bodyToMono(User.class).block();

    System.out.println(user1);
    System.out.println(user2);
}
Output
User(id=1, name=User 1, password=Password1)
User(id=2, name=User 2, password=Password2)

Examples

여러 메서드들을 조합하여 몇가지 예제를 만들어 보고자 한다. Mock 데이터를 위해 Fake Rest APISlowwly를 사용할 것이며, 모든 예제 코드에는 아래와 같은 User 클래스가 선언되어 있다고 가정한다.

User class
@Data
class User {
    @JsonProperty("ID")
    private String id;
    @JsonProperty("UserName")
    private String name;
    @JsonProperty("Password")
    private String password;
}

Example 1: AsyncRestTemplate vs RestTemplate

다음 예제는 AsyncRestTemplateRestTemplate 의 속도를 비교해보려 한다.

asyncRestTemplate , restTemplate , restTemplateWithParallel 메서드로 나눠져 있으며, 모든 메서드가 ID가 1 부터 9까지의 사용자를 조회하고 응답값을 반환하는 작업을 한다. 모든 HTTP API에는 2초간 딜레이가 존재한다.

결과는 asyncRestTemplate 이 가장 빠르게 동작했다. 모든 요청을 한번에 조회하고 CompletableFuture::join 에서 모든 응답을 모아 작업이 진행되었다. restTemplate 은 예상대로 한번 요청하고 응답 받고 하는 식으로 진행되었고, 가장 느린 결과를 보였다. restTemplateWithParallel 은 로컬 환경에서 Runtime.getRuntime().availableProcessors() - 1 이 3으로 3번씩 호출하는 것 같았다.

private final static String SLOW_HOST =
    "http://slowwly.robertomurray.co.uk/delay/2000/url/http://fakerestapi.azurewebsites.net";

private final AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
private final RestTemplate restTemplate = new RestTemplate();

@Test
public void test() {
    final List<String> ids = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9");
    final Map<String, Long> resultsTime = new HashMap<>(3);
    long startTime;

    // asyncRestTemplate
    startTime = System.nanoTime();
    System.out.println(asyncRestTemplate(ids));
    resultsTime.put("asyncRestTemplateElapsedTime", System.nanoTime() - startTime);

    // restTemplate
    startTime = System.nanoTime();
    System.out.println(restTemplate(ids));
    resultsTime.put("restTemplateElapsedTime", System.nanoTime() - startTime);

    // restTemplate with parallel
    startTime = System.nanoTime();
    System.out.println(restTemplateWithParallel(ids));
    resultsTime.put("restTemplateWithParallelElapsedTime", System.nanoTime() - startTime);

    // results
    final DecimalFormat formatter = new DecimalFormat("#,###ms");
    resultsTime.entrySet().stream()
        .sorted(Comparator.comparing(Map.Entry::getValue))
        .forEach(result -> System.out.println(
            result.getKey() + ": " + formatter.format(result.getValue() / 1_000_000)));
}

private List<User> asyncRestTemplate(List<String> ids) {
    List<CompletableFuture<ResponseEntity<User>>> res = new ArrayList<>(ids.size());

    // stream 으로 만들면 lazy하게 동작하면서 한번씩 API 요청?
    ids.forEach(id -> res.add(convert(asyncRestTemplate.getForEntity(
        SLOW_HOST + "/api/Users/" + id, User.class))));

    return res.stream()
        .map(CompletableFuture::join)
        .map(HttpEntity::getBody)
        .collect(Collectors.toList());
}

private List<User> restTemplate(List<String> ids) {
    List<User> users = ids.stream()
        .map(id -> restTemplate.getForEntity(SLOW_HOST + "/api/Users/" + id, User.class))
        .map(HttpEntity::getBody)
        .collect(Collectors.toList());
    return users;
}

private List<User> restTemplateWithParallel(List<String> ids) {
    List<User> users = ids.parallelStream()
        .map(id -> restTemplate.getForEntity(SLOW_HOST + "/api/Users/" + id, User.class))
        .map(HttpEntity::getBody)
        .collect(Collectors.toList());
    return users;
}
Output
asyncRestTemplateElapsedTime: 3,746ms
restTemplateWithParallelElapsedTime: 8,353ms
restTemplateElapsedTime: 25,067ms

Example 2: getUser

슈도 코드

@GetMapping("/users/{userId}")
public UserDetail get(@PathVariable(value = "userId") String id) {
    try {
        return Stream.of(id)
            .map(userMapper::get)
            .map(validate) // throw InvalidException
            .map(user -> {
                final String uuid = user.getId();

                CompletableFuture<Accounts> fAccounts = accountConnector.getAccounts(uuid);
                CompletableFuture<Cards> fCards = cardConnector.getCards(uuid);
                CompletableFuture<Point> fPoint = pointConnector.getPoint(uuid);

                return CompletableFuture.allOf(accountsFuture, cardsFuture, pointFuture)
                    .thenApply(ignore -> UserDetail.builder()
                        .user(user)
                        .accounts(fAccounts.join())
                        .cards(fCards.join())
                        .point(fPoint.join())
                        .build())
                    .exceptionally(err -> {
                        throw new ConnectorException(err);
                    });
            })
            .map(CompletableFuture::join)
            .findFirst()
            .orElse(null);
    } catch (InvalidException | ConnectorException e) {
        log.debug(e.getMessage(), e);
        throw e;
    } catch (Exception e) {
        log.error(e.getMessage(), e);
        throw e;
    }
}

Cautions

References