Stream API
// https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
스트림Stream은 Java 8에서 추가되었고, 컬렉션의 map/reduce 같은 elements의 흐름을 함수형 스타일로 (람다식도 적용하여) 처리할 수 있도록 지원해주는 라이브러리다. 스트림은 컬렉션과 비슷하지만 다음과 같은 차이점이 있다.
-
스트림은 값을 읽기만 할 뿐 저장하지 않는다.
-
스트림의 모든 함수는 순수함수pure function이다.
-
스트림은 게으르게lazy 동작한다.
-
무한한 스트림이 가능하다.
-
스트림은 일회성으로 사용과 동시에 소멸된다.
Note
|
Functional Interface
Java에서 Functional Interface는 하나의 abstract method를 가진 인터페이스로, 한가지 기능만 할 수 있다는 것을 나타낸다.
java.util.function 패키지 하위에는 lambda expressions 과 method references 를 위한 Functional Interface 들이 존재한다. |
Note
|
Lambda Expressions
람다 표현식(람다식)을 짧게 설명하면 간단한 방식으로 코드를 작성하고 실행하는 방법이라고 할 수 있다. 함수를 간결하게 표현하고, 프로그래밍 언어 개념으로는 단순한 익명 함수를 생성하는 문법으로 볼 수 있다. 아래는 람다식으로 간결하게 표현한 코드 예제이다.
|
Stream Creation
스트림을 생성하는 방법에는 여러가지가 있으며, 이미 많은 클래스들에 구현되어 있다.
Array to Stream
String[] arr = new String[]{"a", "b", "c"};
// String[] arr = Arrays.asList("a", "b", "c");
Stream<String> stream = Arrays.stream(arr);
Collection to Stream
stream()
디폴트 메서드가 Collection 인터페이스에 추가되어 있어 모든 컬렉션에서 스트림을 생성할 수 있다.
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> stream1 = list.stream();
Stream<String> stream2 = list.parallelStream(); // 병렬 처리 스트림
Empty Stream
// Null-Safe
public Stream<String> streamOf(Collection<String> list) {
returen list == null || list.isEmpty() ? Stream.empty() : list.stream();
}
Stream.builder()
Stream<String> s = Stream.<String>builder()
.add("a")
.add("b")
.add("c")
.build();
Stream.generate()
generate()
는 무한 스트림을 생성하기 때문에 최대 크기를 지정해줘야 한다.
Stream.generate(() -> "str").limit(5);
Stream.iterate()
iterate()
는 초깃값과 다음 요소에 반영될 람다식이 들어간다.
Stream<Integer> s = Stream.iterate(10, n -> n + 1).limit(5); // 10, 11, 12, 13, 14
Primitive Type Stream
IntStream s1 = IntStream.range(1, 5); // 1, 2, 3, 4
LongStream s2 = LongStream.rangeClosed(1, 5); // 1, 2, 3, 4, 5
DoubleStream s3 = DoubleStream.of(0, 4, 3); // 0.0, 4.0, 3.0
DoubleStream s4 = new Random().doubles(3)
// boxing
Stream<Integer> s5 = IntStream.range(1, 5).boxed();
원시 스트림primitive stream을 객체 스트림으로 변환하려면 mapToObj()
를 사용한다.
IntStream.rangeClosed(2, 12) // 2 ~ 12
.mapToObj(month -> new Month(month))
.collect(Collectors.toList());
String to Stream
IntStream s1 = "abc".chars();
Stream<String> s2 = Pattern.compile("\\|").splitAsStream("a|b|c");
File to Stream
Stream<String> s1 = Files.lines(Paths.get("~/words.txt"), StandardCharsets.UTF_8);
Stream<Path> s2 = Files.list(Paths.get("~/Documents"));
Parallel Stream
스트림 생성 시 stream()
대신 parallelStream()
을 사용하면 된다.
Stream<String> s = list.parallelStream();
boolean isParallel = s.isParallel(); // Check parallel stream
s.sequential(); // Change parallel to sequential stream
s.parallel(); // Change sequential to parallel stream
스레드를 처리하기 위해 Java 7부터 도입된 Fork/Join framework를 내부적으로 사용한다.
스레드의 개수의 기본값은 Runtime.getRuntime().availableProcessors() - 1
로 'CPU 개수 - 1'이다(1은 메인 스레드이지 않을까?).
스래드 개수는 아래와 같이 지정할 수 있다.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Merging Streams
Stream<Integer> s1 = Stream.of(1, 3, 5);
Stream<Integer> s2 = Stream.of(2, 4, 6);
Stream<Integer> resultingStream = Stream.concat(s1, s2);
Stream<Integer> s1 = Stream.of(1, 3, 5);
Stream<Integer> s2 = Stream.of(2, 4, 6);
Stream<Integer> s2 = Stream.of(18, 15, 36);
Stream<Integer> resultingStream = Stream.concat(Stream.concat(s1, s2), s3);
// using flatMap()
// Stream<Integer> resultingStream = Stream.of(s1, s2, s3).flatMap(i -> i);
Stream Operations
스트림 연산은 크게 intermediate operations(return Stream<T>) 과 terminal operations(특정 타입의 결과를 반환) 나뉘어있다. 자세한 내용은 Stream 문서에서 확인할 수 있다.
Intermediate Operations
중간 작업, 중개 연산, 중간 연산자
모든 Intermediate Operations 은 pure function으로 구현되어 있으며, Stream<T> 을 반환해 메서드 체이닝(chaining)이 가능하다.
long count = list.stream()
.filter(n -> n > 100) // (1)
.distinct()
.count(); // (2)
-
filter()
,distinct()
는 intermediate operations를 나타낸다. -
stream의 사이즈를 반환하는 terminal operations이다.
Filtering
filter
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
.filter(str -> {
return "a".equals(str);
});
Stream<String> s2 = list.stream()
.filter(str -> "a".equals(str));
Stream<String> s3 = list.stream()
.filter("a"::equals); // Java 8 method reference
// a
limit
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
.limit(3);
// a b c
skip
ArrayList<String> list = Arrays.asList("a", "b", "c", "A", "B", "C");
Stream<String> s1 = list.stream()
.skip(3);
// A B C
takeWhile
JDK9에서 추가되었다.
Stream.of(2, 4, 6, 8, 9, 10, 12)
.takeWhile(n -> n % 2 == 0)
.forEach(System.out::println);
// 2
// 4
// 6
// 8
Mapping
map
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> s1 = list.stream()
.map(s -> s.toUpperCase());
Stream<String> s2 = list.stream()
.map(String::toUpperCase); // Java 8 method reference
// A B C
flatMap
중첩 구조를 단일 구조로 풀어주는 작업을 한다. 이러한 작업을 flattening 이라고 한다.
List<String> list1 = Arrays.asList("a", "b", "c");
List<String> list2 = Arrays.asList("A", "B", "C");
Stream<String> s1 = Stream.of(list1.stream(), list2.stream())
.flatMap(strings -> strings);
// a b c A B C
boxed
Primitive Type Stream의 각 Element를 boxing 시켜 Wrapper class 변환시켜준다.
int[] a1 = IntStream.of(3, 2, 1)
.toArray();
// [3, 2, 1]
List<Integer> l1= IntStream.of(3, 2, 1)
.boxed()
.collect(Collectors.toList());
// [3, 2, 1]
Sorting
sorted
int[] a1 = IntStream.of(3, 2, 1)
.sorted()
.toArray();
// [1, 2, 3]
List<User> list1 = Arrays.asList(new User("c"), new User("b"), new User("a"));
Stream<User> s1 = list1.stream()
.sorted(Comparator.comparing(User::getName));
// User(name=a), User(name=b), User(name=c)
Stream<User> s2 = list1.stream()
.sorted(Comparator.comparing(User::getName).reversed());
// User(name=c), User(name=b), User(name=a)
Tip
|
비교하려는 값에 Null이 있을 경우
정렬 기준 값이 Null이 있을 경우 NPE가 발생하게 된다. 이러한 NPE를 피하기 위해서는 Null safe
|
distinct
Stream 내에 Element의 중복을 제거한다.
IntStream.of(1, 2, 3, 3)
.distinct()
.toArray()
// [1, 2, 3]
Iterating
peek
'peek’는 '훔쳐보다', '살짝 보이다’는 뜻으로, Stream API에서는 intermediate operation 에서 값을 확인할 수 있는 메서드로 사용한다. peek 메서드만으로는 스트림을 소비하지 않는다.
IntStream.of(1, 2, 3, 4)
.filter(e -> e > 2)
.peek(e -> System.out.println("Filtered value: " + e))
.map(e -> e * e)
.peek(e -> System.out.println("Mapped value: " + e))
.sum();
// Filtered value: 3
// Mapped value: 9
// Filtered value: 4
// Mapped value: 16
Terminal Operations
최종 연산
스트림을 가지고 결과값을 만들어내는 연산이다. 스트림은 평가되기 전까지 실행되지 않는데(lazy), terminal operation이 스트림을 평가하는 역할이다.
Collecting
collect
아마 가장 많이 사용하는 Terminal Operation은 collect()
일 것 같다.
이 메서드와 Collectors
클래스의 static 메서드를 활용하여 스트림을 여러 결과로 반환할 수 있다.
Collectors
는 컬렉션의 요소element를 다양한 기준에 따라 summarizing 하거나 모으는 등 유용한 리덕션 연산을 static 메서드로 구현해둔 클래스이다.
Note
|
일부 메서드에는 Java 8 이전의 코드스타일과 스트림을 사용한 코드와 비교하기 위해 |
Collectors.toList()
결과를 List
로 변환해주는 메서드이다. 내 경험상 Terminal Operation 내에서 가장 자주 사용하는 메서드인 것 같다.
// as-is
final List<String> list = Arrays.asList("a", "b", "c");
// to-be
// [A, B, C]
List<String> upperCaseList = new ArrayList<>();
for (String str : list) {
upperCaseList.add(str.toUpperCase());
}
list.stream()
.map(String::toUpperCase)
.collect(Collectors.toList())
Note
|
JDK 버전업에 따른 변화
|
Collectors.toMap()
List<T>
에서 특정 값을 key로 사용하여 Map
으로 변환하는 메서드이다.
// as-is
final List<Payment> payments = Arrays.asList(
new Payment("p01", 10_000),
new Payment("p02", 20_000),
new Payment("p03", 50_100));
// to-be
// {p01=10000, p03=50100, p02=20000}
Map<String, Payment> map = new HashMap<>();
for (Payment payment : payments) {
map.put(payment.getId(), payment);
}
final Map<String, Payment> map = payments.stream()
.collect(Collectors.toMap(Payment::getId, payment -> payment));
Collectors.joining()
Stream.of("a", "b", "c")
.map(String::toUpperCase)
.collect(Collectors.joining(", "))
// A, B, C
Stream.of("a", "b", "c")
.map(String::toUpperCase)
.collect(Collectors.joining(", ", "<", ">"))
// <A, B, C>
Collectors.groupingBy()
Map<Integer, List<User>> u1 = Stream.of(
User.builder().name("a").age(29).build(),
User.builder().name("b").age(19).build(),
User.builder().name("c").age(29).build(),
User.builder().name("d").age(19).build(),
User.builder().name("e").age(39).build()
).collect(Collectors.groupingBy(User::getAge));
{
19=[User(name=b, age=19), User(name=d, age=19)],
39=[User(name=e, age=39)],
29=[User(name=a, age=29), User(name=c, age=29)]
}
Note
|
groupingBy 로 chunk 구현하기
chunk(), chunkStream()
References |
Collectors.partitioningBy()
Map<Boolean, List<Integer>> result;
result = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.collect(Collectors.partitioningBy(n -> n > 3));
// {false=[1, 2, 3], true=[4, 5, 6, 7, 8, 9, 10]}
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.collect(Collectors.partitioningBy(n -> n > 3, Collectors.counting()));
// {false=3, true=7}
Collectors.collectingAndThen()
collect 한 이후에 필요한 작업을 추가한다.
Set<Product> unmodifiableSet = productList.stream()
.collect(Collectors.collectingAndThen(
Collectors.toSet(),
Collections::unmodifiableSet));
Collectors.averageingInt()
// average
Integer averageAge = Stream.of(user1, user2, user2)
.collect(Collectors.averageingInt(User::getAge));
Collectors.summingInt()
// 1. sum
Integer sumAge1 = Stream.of(user1, user2, user2)
.collect(Collectors.summingInt(User::getAge));
// 2. mapToInt를 활용해 sum 구하는 방법
Integer sumAge2 = Stream.of(user1, user2, user2)
.mapToInt(User::getAge)
.sum();
Collectors.summarizingInt()
Stream<BigDecimal> s = Stream.iterate(BigDecimal.ONE, n -> n.add(BigDecimal.ONE))
.limit(10);
IntSummaryStatistics i = s.collect(Collectors.summarizingInt(BigDecimal::intValue));
// IntSummaryStatistics{count=10, sum=55, min=1, average=5.500000, max=10}
Collectors.of()
collector를 직접 만들어서 사용하고자 할 경우 of() 를 활용할 수 있다.
List<Integer> c = Arrays.asList(1, 2, 3, 4, 5, 6, 7).stream()
.collect(Collector.of(
ArrayList::new,
List::add,
(left, right) -> {
left.addAll(right);
return left;
}));
// [1, 2, 3, 4, 5, 6, 7]
toArray
int[] a1 = Stream.of(1, 2, 3)
.toArray();
String[] a2 = Arrays.stream("a", "b", "c")
.map(String::toUpperCase)
.toArray(String[]::new);
Reduction
reduce는 캐터모피즘catamorphism 이라는 목록 조작 개념의 특별한 변형으로, 컬렉션을 줄여나가는 방법이다. 스트림 API에서는 reduce() 로 리듀싱을 제공한다.
Stream.of(1, 2, 3).reduce(0, (x, y) -> x + y); // 6
Stream.of(1, 2, 3).reduce(0, Integer::sum); // 6
int sum = Stream.of(1, 2, 3)
.reduce((l, r) -> {
System.out.println("l=" + l + ", r= " + r);
return (l + r);
}).get();
System.out.println("sum:" + sum);
l=1, r= 2 l=3, r= 3 sum: 6
int sum = Stream.of(1, 2, 3)
.reduce(10, (l, r) -> {
System.out.println("l=" + l + ", r= " + r);
return (l + r);
});
System.out.println("sum:" + sum);
l=10, r= 1 l=11, r= 2 l=13, r= 3 sum: 16
Important
|
collect vs reduce
collect 는 변경 가능한(mutable) 결과 객체를 사용하여 동작한다. 즉, 내부적으로 mutable한 collection이 하나 생성되고 각 요소를 collection에 축적해나가며 동작한다. 그에 반해, reduce 는 immutable한 결과 객체(누산기accumulator)를 사용한다.
만약 |
Matching
Predicate 를 받아 해당 조건을 만족하는지 체크한 결과를 반환한다.
IntStream.of(1, 1, 1).allMatch(a -> a == 1); // true
IntStream.of(1, 2, 3).anyMatch(a -> a == 1); // true
IntStream.of(1, 2, 3).noneMatch(a -> a == 4); // true
-
anyMatch : 하나라도 만족하는지
-
allMatch : 모두 만족하는지
-
noneMatch : 모두 만족하지 않는지
Iterating
forEach
// stream
Arrays.asList(1, 2, 3)
.stream()
.forEach(System.out::println);
1 2 3
Important
|
Stream.forEach vs Collection.forEach
두 코드의 결과는 동일하다. 하지만 두 코드의 속도만 비교한다면
|
Calculating
IntStream.of(1, 2, 3).count() // 3
IntStream.of(1, 2, 3).sum() // 6
IntStream.of(1, 2, 3).min() // OptionalInt[1]
IntStream.of(1, 2, 3).max() // OptionalInt[3]
IntStream.of(1, 2, 3).average() // OptionalDouble[2.0]
IntSummaryStatistics s = IntStream.of(1, 2, 3).summaryStatistics();
// {count=3, sum=6, min=1, average=2.000000, max=3}
s.getCount(); // 3
s.getMin(); // 1
s.getMax(); // 3
s.getSum(); // 6
s.getAverage(); // 2.0
Concurrency
- Asynchronous Execution
-
Future (java 5) → ListenableFuture (spring 3.0) → CompletableFuture (java 8)
class Future class ListenableFuture class CompletableFuture Future <|-- CompletableFuture Future <|-- ListenableFuture
- HTTP Client
-
RestTemplate (spring 3.0) → AsyncRestTemplate (spring 4.0) → WebClient (spring 5.0)
Future
Future는 Java 5에 추가된 클래스로, 비동기 연산 결과를 나타낸다.
작업이 완료되었는지 확인하고(isDone()
), 완료를 기다리며, 계산 결과를 확인하는(get()
) 메서드와 작업을 취소(cancel()
)하는 메서드가 제공된다.
일단 작업이 완료되면 취소할 수 없으며, 결과는 작업이 완료되었을 때만 메서드를 통해 확인할 수 있고, 필요한 경우 준비가 될 때까지 blocking 한다.
@Test
void futureTest() throws InterruptedException, ExecutionException {
System.out.println("Thread#" + Thread.currentThread().getId());
ExecutorService executor = Executors.newFixedThreadPool(1); // (1)
Future<Integer> future = executor.submit(() -> {
TimeUnit.SECONDS.sleep(1);
System.out.println("Thread#" + Thread.currentThread().getId());
return 123;
});
System.out.println("future done? " + future.isDone());
Integer result = future.get(); // (2)
System.out.println("future done? " + future.isDone());
System.out.println("result:" + result);
}
-
Java5 에서 멀티스레드와 콜백 사용하기 위해 ExecutorService 사용
-
get() 은 blocking 메서드이다.
Thread#1 future done? false Thread#12 future done? true result:123
ListenableFuture
Spring Framework 4.0 에서 추가된 ListenableFuture는 callback을 적용한 Future 의 확장 클래스이다. AsyncRestTemplate 의 리턴타입이기도 하다. JQuery에서 Promise 사용하기 전 Ajax 호출하는 것과 비슷한것 같다.
@Test
void async() throws InterruptedException {
AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
ListenableFuture<ResponseEntity<Map>> entity =
asyncRestTemplate.getForEntity("https://httpbin.org/get", Map.class);
entity.addCallback(
result -> System.out.println(result.getStatusCode()), // (1)
err -> System.out.println(Arrays.toString(err.getStackTrace()))); // (2)
System.out.println("test1");
TimeUnit.SECONDS.sleep(5); // (3)
System.out.println("test2");
}
-
성공시 실행할 callback 함수(SuccessCallback 인스턴스)
-
실패시 실행할 callback 함수(FailureCallback 인스턴스)
-
sleep() 을 주지않으면 메인 스레드가 바로 종료되면서 HTTP 응답값을 확인 할 수 없다.
DEBUG: Created asynchronous GET request for "https://httpbin.org/get" ... DEBUG: Setting request Accept header to [application/json, application/*+json] test1 (1) DEBUG: Async GET request for "https://httpbin.org/get" resulted in 200 (OK) DEBUG: Reading [interface java.util.Map] as "application/json" using [o.s.h.c.j.MappingJackson2HttpMessageConverter] 200 (2) test2 (3)
-
코드를 실행시키면 바로 출력
-
HTTP 응답을 받을 때 바로 출력(sleep와 무관하게 동작)
-
"test1" 출력하고 5초 이후에 출력
Note
|
Java 8 lambda expression을 활용함으로써 코드가 간결해진다. lambda expression을 사용안할 경우
|
References
CompletableFuture
Java 8에 추가된 CompletableFuture는 Future 뿐만 아니라 CompletionStage를 함께 확장한 클래스이다. CompletableFuture 의 개념은 다른 프로그래밍 언어에서는 Deferred 또는 Promise 라고 부른다.
CompletionStage 는 다른 CompletionStage 가 완료될 때 작업을 수행하거나 계산하기 위한 비동기 작업의 stage이다. stage는 작업이 종료될 때 완료되지만, 종속적인 다른 stage를 트리거 할 수도 있다.
public Future<String> getStringAsync() throws InterruptedException {
CompletableFuture<String> cf = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
TimeUnit.SECONDS.sleep(1);
cf.complete("Hello");
return null;
});
return cf;
}
@Test
public void completableFutureExample() throws InterruptedException {
System.out.println("Thread#" + Thread.currentThread().getId() + ": 1");
CompletableFuture<String> cf = (CompletableFuture)getAsync();
cf.thenAccept(s -> { // (1)
System.out.println("Thread#" + Thread.currentThread().getId() + ": 2");
System.out.println(s);
});
System.out.println("Thread#" + Thread.currentThread().getId() + ": 3");
TimeUnit.SECONDS.sleep(3);
System.out.println("Thread#" + Thread.currentThread().getId() + ": 4");
}
// Output:
// Thread#1: 1
// Thread#1: 3 (2)
// Thread#12: 2 (3)
// Hello
// Thread#1: 4 (4)
-
thenAccept 에는 Callable 의 return 값이 아니라 CompletableFuture 인스턴스의 complete 로부터 전달받은 인자가 파라미터로 들어온다.
-
응용 프로그램이 실행되면 메인 스레드에서 바로 "1", "3"이 출력된다.
-
메인 스레드는 3초 기다리고 있는 중이지만 cf의 값이 반환되면서 "2"가 출력된다.
-
메인 스레드에서 3초가 지나고나서 "4"가 출력되면서 프로그램이 종료된다.
supplyAsync
CompletableFuture 를 생성하는 팩토리 메서드로 Supplier를 인자로 받는 supplyAsync() 가 있다. 비동기적으로 실행해서 결과를 생성하며 람다식을 이용하여 쉽게 구성하고 조합할 수 있다.
@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
print.accept("1");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(1); // (1)
print.accept("2");
return "Hello World";
});
print.accept("3"); // (2)
print.accept(future.get()); // (3)
print.accept("4");
}
// Output:
// Thread#1: 1
// Thread#1: 3
// Thread#12: 2 // (4)
// Thread#12: Hello World (5)
// Thread#1: 4
-
별도의 스레드에서 1초를 기다린다.
-
응용 프로그램을 실행되면 메인 스레드에서 바로 "1", "3"이 출력된다.
-
future
의 값을 확인하기 위해 blocking 된 상태로 기다린다. -
1초간 멈춰있던 Thread#12가 동작하며 "2"를 출력한다.
-
blocking 되어 있던 get 이 동작하면서 "Hello World"를 출력하고 나머지 작업을 진행한다.
runAsync
CompletableFuture 를 생성하는 팩토리 메서드로 Runnable 을 인자로 받기 떄문에 반환값이 없다.
CompletableFuture.runAsync(() -> {
log.info("test");
}).get();
thenApply
private static Function<String, String> appendName = (String name) -> "Hello " + name + "!";
@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
print.accept("1");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(1); // (1)
print.accept("2");
return "Jun";
}).thenApply(appendName);
print.accept("3"); // (2)
print.accept(future.get()); // (3)
print.accept("4");
sleep(3);
}
// Output:
// Thread#1: 1
// Thread#1: 3
// Thread#12: 2 // (4)
// Thread#1: Hello Jun! // (5)
// Thread#1: 4
-
별도의 스레드에서 1초를 기다린다.
-
응용 프로그램을 실행하면 메인 스레드에서 바로 "1", "3"이 출력된다.
-
future
의 값을 확인하기 위해 blocking 된 상태로 기다린다. -
1초간 멈춰있던 Thread#12가 동작하며 "2"를 출력한다.
-
blocking 되어 있던 get 은
appendName
이후의 결과를 출력한다.
thenCompose
thenCompose는 여러 CompletableFuture 를 조립해서 순차적으로 실행하기 위한 메서드이다. CompletableFuture<U>
를 반환하기 때문에 chaining이 가능하다.
private static Consumer<String> print = (String str) ->
System.out.println("Thread#" + Thread.currentThread().getId() + ": " + str);
@Test
public void completableFutureExample() throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
sleep(2);
print.accept("1");
return "Hello";
}).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
sleep(2);
print.accept("2");
return s + " Beautiful";
})).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
sleep(2);
print.accept("3");
return s + " World";
})).thenAccept(str -> {
sleep(2);
print.accept("4");
System.out.println(str);
});
print.accept("5");
System.out.println(cf.isDone());
System.out.println(cf.get());
sleep(10);
}
Thread#1: 5
false (1)
Thread#12: 1 (2)
Thread#13: 2 (3)
Thread#13: 3 (4)
Thread#13: 4 (5)
Hello Beautiful World
null (6)
-
isDone()
의 결과로 아직 완료되지 않아 false가 출력된다. -
2초 뒤 "1"이 출력되고, "Hello"과 함께 CompletableFuture 값을 반환한다.
-
2초 뒤 "2"이 출력되고, 앞에서 전달받은 "Hello"에 "Beautiful"을 조합한 CompletableFuture 값을 반환한다.
-
2초 뒤 "3"이 출력된고, 앞에서 전달받은 "Hello Beautiful"에 "World"을 조합한 CompletableFuture 값을 반환한다.
-
thenAccept()
에서 2초 뒤 "4"와 함께 "Hello Beautiful World"가 출력된다. -
cf
의 작업이 완료되었으므로cf.get()
가 동작한다. 마지막에 호출된 thenAccept 는 Consumer 를 파라미터로 받는데, 이것은 반환값이 없으므로 null 을 출력한다.
exceptionally
exceptionally는 CompletableFuture 작업 내에서 발생하는 Exception을 처리하기 위한 메서드이다. ListenableFuture 에서 FailCallback 의 onFailure 를 대신할 수 있다. ListenableFuture 는 요청별로 실패 로직을 설정하는 반면 exceptionally 는 모든 Exception을 통합적으로 처리할 수 있다.
@Test
public void exceptionallyExample1() throws InterruptedException, ExecutionException {
final String result = CompletableFuture.supplyAsync(() -> {
System.out.println("1");
return "1";
}).thenApply(s -> {
System.out.println("2");
throw new RuntimeException();
}).thenApply(s -> {
System.out.println("3");
return s + "3";
}).exceptionally(throwable -> {
System.out.println("4");
log.error("catch error", throwable);
return "exception!!!";
}).get();
System.out.println(result);
}
1 2 4 21:11:10.349 [main] ERROR com.example.demo.LocalTest - catch error java.util.concurrent.CompletionException: java.lang.RuntimeException at java.base/java.u.c.CompletableFuture.encodeThrowable(CompletableFuture.java:314) at java.base/java.u.c.CompletableFuture.uniApplyNow(CompletableFuture.java:683) at java.base/java.u.c.CompletableFuture.uniApplyStage(CompletableFuture.java:658) at java.base/java.u.c.CompletableFuture.thenApply(CompletableFuture.java:2094) ... exception!!!
allOf
allOf는 여러 CompletableFuture 의 작업들이 모두 완료되었는지 확인하고자 할 때 사용한다.
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
확인할 CompletableFuture 들을 가변인자varargs 를 받고 CompletableFuture<Void>
를 반환한다.
반환값의 get()
혹은 join()
을 통해 모든 작업이 완료되기를 기다릴 수 있다.
allOf 의 반대로는, 여러 CompletableFuture 중 하나라도 완료되었는지 확인할 수 있는 anyOf 가 있다.
Tip
|
CompletableFuture vs Promise
|
private static Consumer<String> print = (String str) ->
System.out.println("Thread#" + Thread.currentThread().getId() + ": " + str);
@Test
public void completableFutureExample() throws InterruptedException, ExecutionException {
print.accept("1");
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
sleep(1);
print.accept("2");
return "Hello";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "World");
print.accept("3");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(f1, f2, f3);
print.accept("4");
combinedFuture.get();
print.accept("5");
TimeUnit.SECONDS.sleep(3);
print.accept("6");
}
Thread#1: 1 Thread#1: 3 (1) Thread#1: 4 Thread#12: 2 (2) Thread#1: 5 Thread#1: 6 (3)
-
실행되면 메인 스레드에서 "1", "3", "4" 가 출력된다.
-
1초 뒤 "2"가 출력되면서
combinedFuture.get()
아래의 "5"도 출력된다. -
"4"가 출력되고 3초 뒤 "5"가 출력되면서 프포그램이 종료된다.
join
join은 CompletableFuture 의 작업이 완료되면 결과를 반환하거나, 예외 발생시 (unchecked)예외를 던지는 메서드이다.
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "World");
String str = Stream.of(f1, f2, f3)
// .map(f -> { return f.join(); })
// .map(f -> f.join())
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
System.out.println(str); // Hello Beautiful World
Note
|
get vs join
CompletableFuture 의 get 과 join 은 작업이 완료되기를 기다리고 값을 반환하는 작업을 한다. 동일한 동작을 하지만 다음과 같은 차이점이 있다.
따라서, get 은 명시적으로 try-catch를 해줘야하는 반면 join 은 에러를 밖으로 던질 수 있게된다. 즉, 여러 CompletableFuture 를 다룰 때 join 과 exceptionally 를 통해 통합적인 에러 핸들링이 가능해진다. References |
RestTemplate with @Async
주로 배치성 프로젝트에서 컨트롤러에 비동기적으로 동작시키기 위해 @Async
을 사용했었다. 이 때, 대부분(내가 본 프로젝트) void
를 반환하였다.
하지만 Async 문서를 보면 void
혹은 Future
를 반환시킬 수 있다. 즉, CompletableFuture
를 활용할 수 있게 된다.
Warning
|
메서드에 |
However, the return type is constrained to either void or Future.
@Service
public class ReceiptService {
@Async
public CompletableFuture<Receipt> getAsync(@NotNull String paymentId) {
final Receipt receipt = restTemplate.getForObject(url, Receipt.class);
return CompletableFuture.completableFuture(receipt);
}
}
@RestController
public class ReceiptController {
@GetMapping("/")
public List<Receipt> getReceipts() {
CompletableFuture<Receipt> receipt1 = receiptService.getAsync(id);
...
CompletableFuture.allOf(receipt1, ...);
return ...;
}
}
TODO: 실제 동작하는 코드인지 테스트해보고 수정할 것
AsyncRestTemplate
AsyncRestTemplate은 Spring Framework 4.0 이후부터 제공되는 비동기 HTTP 요청을 위한 클래스이다. RestTemplate 와 유사하지만 ListenableFuture wrapper를 반환한다.
Spring Framework 5.0 부터 AsyncRestTemplate 클래스는 deprecated 되었고 WebClient 사용을 권장한다. RestTemplate 또한 앞으로 deprecated 될 예정이고 새로운 기능이 추가되지 않는다고 말한다.
Tip
|
Convert ListenableFuture to CompletableFuture
|
WebClient
Spring Framework 5.0에서 추가된 WebClient는 기존의 AsyncRestTemplate 역할을 한다. WebClient 는 Non-Block I/O 기반에 ThreadSafe 하다.
TODO: Reactive programming, Flux, Mono 등 추가적인 학습이 더 필요할 것 같다.
private static final String HOST = "http://fakerestapi.azurewebsites.net/api";
@Test
public void webClientExample() {
WebClient client = WebClient.create(HOST);
WebClient.RequestBodySpec req1 = client
.method(HttpMethod.GET)
.uri("/Users/1")
.contentType(MediaType.APPLICATION_JSON_UTF8)
.accept(MediaType.APPLICATION_JSON_UTF8);
WebClient.RequestBodySpec req2 = client
.method(HttpMethod.GET)
.uri("/Users/2");
User user1 = req1.exchange().block().bodyToMono(User.class).block();
User user2 = req2.exchange().block().bodyToMono(User.class).block();
System.out.println(user1);
System.out.println(user2);
}
User(id=1, name=User 1, password=Password1) User(id=2, name=User 2, password=Password2)
Examples
여러 메서드들을 조합하여 몇가지 예제를 만들어 보고자 한다. Mock 데이터를 위해 Fake Rest API와 Slowwly를 사용할 것이며, 모든 예제 코드에는 아래와 같은 User 클래스가 선언되어 있다고 가정한다.
@Data
class User {
@JsonProperty("ID")
private String id;
@JsonProperty("UserName")
private String name;
@JsonProperty("Password")
private String password;
}
Example 1: AsyncRestTemplate vs RestTemplate
다음 예제는 AsyncRestTemplate
과 RestTemplate
의 속도를 비교해보려 한다.
asyncRestTemplate
, restTemplate
, restTemplateWithParallel
메서드로 나눠져 있으며,
모든 메서드가 ID가 1 부터 9까지의 사용자를 조회하고 응답값을 반환하는 작업을 한다. 모든 HTTP API에는 2초간 딜레이가 존재한다.
결과는 asyncRestTemplate
이 가장 빠르게 동작했다. 모든 요청을 한번에 조회하고 CompletableFuture::join
에서 모든 응답을 모아 작업이 진행되었다.
restTemplate
은 예상대로 한번 요청하고 응답 받고 하는 식으로 진행되었고, 가장 느린 결과를 보였다.
restTemplateWithParallel
은 로컬 환경에서 Runtime.getRuntime().availableProcessors() - 1
이 3으로 3번씩 호출하는 것 같았다.
private final static String SLOW_HOST =
"http://slowwly.robertomurray.co.uk/delay/2000/url/http://fakerestapi.azurewebsites.net";
private final AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate();
private final RestTemplate restTemplate = new RestTemplate();
@Test
public void test() {
final List<String> ids = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9");
final Map<String, Long> resultsTime = new HashMap<>(3);
long startTime;
// asyncRestTemplate
startTime = System.nanoTime();
System.out.println(asyncRestTemplate(ids));
resultsTime.put("asyncRestTemplateElapsedTime", System.nanoTime() - startTime);
// restTemplate
startTime = System.nanoTime();
System.out.println(restTemplate(ids));
resultsTime.put("restTemplateElapsedTime", System.nanoTime() - startTime);
// restTemplate with parallel
startTime = System.nanoTime();
System.out.println(restTemplateWithParallel(ids));
resultsTime.put("restTemplateWithParallelElapsedTime", System.nanoTime() - startTime);
// results
final DecimalFormat formatter = new DecimalFormat("#,###ms");
resultsTime.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getValue))
.forEach(result -> System.out.println(
result.getKey() + ": " + formatter.format(result.getValue() / 1_000_000)));
}
private List<User> asyncRestTemplate(List<String> ids) {
List<CompletableFuture<ResponseEntity<User>>> res = new ArrayList<>(ids.size());
// stream 으로 만들면 lazy하게 동작하면서 한번씩 API 요청?
ids.forEach(id -> res.add(convert(asyncRestTemplate.getForEntity(
SLOW_HOST + "/api/Users/" + id, User.class))));
return res.stream()
.map(CompletableFuture::join)
.map(HttpEntity::getBody)
.collect(Collectors.toList());
}
private List<User> restTemplate(List<String> ids) {
List<User> users = ids.stream()
.map(id -> restTemplate.getForEntity(SLOW_HOST + "/api/Users/" + id, User.class))
.map(HttpEntity::getBody)
.collect(Collectors.toList());
return users;
}
private List<User> restTemplateWithParallel(List<String> ids) {
List<User> users = ids.parallelStream()
.map(id -> restTemplate.getForEntity(SLOW_HOST + "/api/Users/" + id, User.class))
.map(HttpEntity::getBody)
.collect(Collectors.toList());
return users;
}
asyncRestTemplateElapsedTime: 3,746ms restTemplateWithParallelElapsedTime: 8,353ms restTemplateElapsedTime: 25,067ms
Example 2: getUser
슈도 코드
@GetMapping("/users/{userId}")
public UserDetail get(@PathVariable(value = "userId") String id) {
try {
return Stream.of(id)
.map(userMapper::get)
.map(validate) // throw InvalidException
.map(user -> {
final String uuid = user.getId();
CompletableFuture<Accounts> fAccounts = accountConnector.getAccounts(uuid);
CompletableFuture<Cards> fCards = cardConnector.getCards(uuid);
CompletableFuture<Point> fPoint = pointConnector.getPoint(uuid);
return CompletableFuture.allOf(accountsFuture, cardsFuture, pointFuture)
.thenApply(ignore -> UserDetail.builder()
.user(user)
.accounts(fAccounts.join())
.cards(fCards.join())
.point(fPoint.join())
.build())
.exceptionally(err -> {
throw new ConnectorException(err);
});
})
.map(CompletableFuture::join)
.findFirst()
.orElse(null);
} catch (InvalidException | ConnectorException e) {
log.debug(e.getMessage(), e);
throw e;
} catch (Exception e) {
log.error(e.getMessage(), e);
throw e;
}
}
Cautions
TODO