Coroutine은 Kotlin만의 기능은 아니지만, Kotlin을 사용하면서 처음 접하게 되어서 /kotlin 에 추가하였다.

들어가기 앞서

코루틴(coroutine)은 루틴의 일종으로서, 협동 루틴이라 할 수 있다(코루틴의 "Co"는 with 또는 togather를 뜻한다). 상호 연계 프로그램을 일컫는다고도 표현가능하다. 루틴과 서브 루틴은 서로 비대칭적인 관계이지만, 코루틴들은 완전히 대칭적인, 즉 서로가 서로를 호출하는 관계이다. 코루틴들에서는 무엇이 무엇의 서브루틴인지를 구분하는 것이 불가능하다. 코루틴 A와 B가 있다고 할 때, A를 프로그래밍 할 때는 B를 A의 서브루틴으로 생각한다. 그러나 B를 프로그래밍할 때는 A가 B의 서브루틴이라고 생각한다. 어떠한 코루틴이 발동될 때 마다 해당 코루틴은 이전에 자신의 실행이 마지막으로 중단되었던 지점 다음의 장소에서 실행을 재개한다.

— 위키백과
코루틴

코루틴은 컴퓨터 프로그램 구성 요소 중 하나로 비선점형 멀티태스킹(non-preemptive multitasking)을 수행하는 일반화한 서브루틴(subroutine)이다. 코루틴은 실행을 일시 중단(suspend)하고 재개(resume)할 수 있는 여러 진입 지점(entry point)을 허용한다.

— Kotlin in Action
서브루틴subroutine
  • 여러 명령어를 모아 이름을 부여해서 반복 호출을 할 수 있게 정의한 프로그램 구성 요소(a.k.a. 함수).

  • 객체지향 언어에서는 메서드도 서브루틴이라 할 수 있음.

  • 서브루틴에 진입하는 방법은 오직 한 가지 뿐.

    • 해당 함수를 호출하면 서브루틴의 맨 처음부터 실행 시작.

    • 시작될 때 마다 활성 레코드activation record가 스택에 할당되면서 서브루틴 내부의 로컬 변수 등이 초기화 됨.

  • 서브루틴 안에서 여러 번 return 을 사용할 수 있음.

    • 서브루틴이 실행을 중단하고 제어를 호출한쪽caller에게 돌려주는 지점은 여럴 있을 수 있음.

    • 다만 서브루틴에서 반환되고 나면 활성 레코드가 스택에서 사라지기 때문에 실행 중이던 모든 상태를 잃어버림.

      • 서브루틴을 여러 번 반복 실행해도 항상 같은 결과를 얻게 됨(side-effect가 있지 않는 한)

비선점형non-preemptive
  • 멀티태스킹의 각 작업을 실행하는 참여자들의 실행을 운영체제가 강제로 일시 중단시키고 다른 참여자를 실행하게 만들 수 없다는 뜻.

  • 각 참여자들이 서로 자발적으로 협력해야만 비선점형 멀티태스킹이 제대로 작동할 수 있음.

멀티태스킹multitasking
  • 여러 작업을 동시에 수행하는 것처럼 보이거나, 실제로 동시에 수행하는 것.

Coroutine

  • 코루틴coroutine은 코틀린kotlin과 이름이 비슷해서 코틀린 기능이라고 생각할 수 있지만, 여러 언어에서 지원하는 개념.

  • concurrency design pattern.

  • 코틀린 팀은 코루틴을 경량 스레드: Light-weighted thread 로 정의.

    • 한 thread에서 다수의 coroutine을 수행할 수 있음과 Context Switching이 필요없기 때문에 이렇게 부름

  • 코루틴은 코드 블록을 실행하고 비슷한 라이프 사이클을 가졌지만 반환 값이나 예외를 사용해 완료할 수 있는 아주 가벼운 스레드다.

  • 기술적으로 코루틴은 중지 가능한 계산의 인스턴스며, 일시 중단할 수 있는 계산이다.

  • 코루틴은 특정 스레드에 바인딩되지 않으며, 한 스레드에서 일시 중지하고 다른 스레드에서 재개할 수 있다.

코루틴을 사용하면…​

  • Thread 보다 리소스를 더 효율적으로 사용하면서 더 쉽게 작동.

  • Thread는 아니지만 비동기적인asynchronous 프로그래밍이 가능하게 만들어줌.

  • Thread는 '중단block'되지만, Coroutine은 '보류suspend' 됨.

  • Thread보다 더 좋은 성능을 제공.

  • Thread는 중단될 때 중단이 풀릴 때까지 아무 일도 할 수 없음.

  • 코루틴은 Thread에 의해 실행되며, 코루틴을 실행하는 스레드를 중단시키지 않음.

    • 대신에 보류된 함수를 실행하는 Thread는 다른 Coroutine을 실행하는 데 사용될 수 있음.

    • 내부적으로 실행이 보류되는 함수를 suspend 키워드로 나타냄.

@startuml
[Component] --> Interface1
[Component] -> Interface2
@enduml

Thread vs Coroutine

  • 아래 예제는 Thread.sleep 을 사용하 I/O 계산을 시뮬레이팅한다.

    코루틴이 없는 간단한 예제 1
    import kotlin.concurrent.thread
    
    fun main(args: Array><String>) {
      thread {
        Thread.sleep(1_000)
        println("World!")
      }
      print("Hello ")
      Thread.sleep(2_000)
    }
  • 보다 예쁜 코드는 아래와 같다.

    fun main(args: Array<String>) {
      var computation = thread {
        Thread.sleep(1_000)
        println("World!")
      }
      print("Hello ")
      computation.join() // (1)
    }
    1. 이 메서드가 완료되기를 기다리므로, 예측한 시간을 기다리는 것보다 훨씬 똑똑한 방식이다.

  • 스레드는 JVM에서 비동기 동시 애플리케이션의 빌딩 블록

  • JVM 스레드는 대부분 (프로세서 내의 코어 같은)하드웨어 스레드에 의해 백업된다.

  • 하드웨어 스레드는 여러 소프트웨어 스레드(JVM 스레드는 소프트웨어 스레드의 일종이다)를 지원할 수 있지만, 오직 하나의 소프트웨어 스레드만이 주어진 시간에 실행된다.

  • OS는 각 하드웨어 스레드에서 실행되는 소프트웨어 스레드를 결정하고 생존한 스레드 사이를 빠르게 전환하므로, 여러 소프트웨어 스레드가 동시에 실행되는 것처럼 보이게 한다. (라운드로빈?)

  • JVM 스레드는 매루 빠르고 반응이 좋지만 비용이 크다.

    • 각 스레드는 생성, 처분, 컨텍스트 스위치 시 CPU 타임과 메모리를 소모한다.

    • 이 비용이 상대적으로 높기 때문에 JVM 애플리케이션은 많은 수의 스레드를 가질 수 없다.

  • 현재의 JVM 애플리케이션에서 스레드를 생성하고 파괴하는 것은 나쁜 습관 습관으로 간주된다.

    • 대신 스레드를 관리하고 재사용해 생성과 처분의 비용을 줄일 수 있는 추상적인 Excutor를 사용한다.

      fun main(args: Array<String>) {
        val executor = Executors.newFixedThreadPool(1024)
        repeat(10_000) {
          executor.submit {
            Thread.sleep(1_000)
            println(".")
          }
          executor.shutdown()
        }
      }

Kotlin의 Coroutine

Suspending functions


async 생성하기
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async

fun fetchCharacterData(): Deferred<CharacterGenerator.CharacterData> { // (1)
  return GlobalScope.async { // (2)
    val apiData = URL(API_URL).readText()
    CharacterGenerator.fromApiData(apiData)
  }
}
  1. Deferred 는 우리가 요청할 때까지 데이터를 반환하지 않는다.

  2. async 는 하나의 인자로 람다를 받으며, 람다에 백그라운드에서 처리할 작업을 지정한다.

await로 결과 기다리기
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch

class AppService {
  fun onCreate() {
    GlobalScope.launch(Dispatchers.Main) { // (1) (2)
      characterData = fetchCharacterData().await() (4)
      displayData() // (3)
    }
  }
}
  1. launch 함수는 코루팀을 생성하며, launch 함수는 블록안에 지정한 람다(코루틴 코드)를 시작시킨다.

  2. launch 함수의 파라미터에는 해당 작업이 실행되는 스레드를 나타낸다. Dispatcher.Main 은 안드로이드의 UI 스레드이다.

  3. 이 코드를 안드로이드로 예를 들었을 때, displayData() 함수는 UI를 변경시키는 작업이므로, UI 스레드를 지정시켰다.

  4. 코루틴 컨텍스트의 기본 인자는 CommonPool 이다. 이것은 코루틴이 실행될 때 사용될 수 있는 백그라운드 스레드 풀이다.
    따라서 await 를 호출할 때 해당 작업은 CommonPool의 스레드 중 하나를 사용한다.

launch vs async/await

  • async, launch 함수를 coroutine builder function 이라고 한다.

    • 이 함수들은 특정 방법으로 작업을 수행하도록 코루틴을 설정한다.

  • launch 는 우리가 지정한 작업을 올바르게 수행하는 코루틴을 빌드한다.

  • async 는 지연된(아직 완료되지 않은) 작업을 나타내는 Deferred 를 반환하는 코루팀을 빌드한다.

    • 즉, 해당 작업이 바로 시작되어 끝나는 것이 아니다.

  • Deferred 타입은 await 함수를 제공한다.

    • await 함수는 우리가 원하는 작업 수행 시점에 호출한다.

    • await 함수는 지연된 작업이 완료될 때까지 다음에 할 작업을 보류한다.

  • Deferred 는 Java의 Future 와 유사한 방법으로 동작한다.

yield

reactive vs coroutine

  • 리액티브 프로그래밍은 현재의 프로그래밍 패러다임으로, 변화의 전파에 대해 말한다.

    • 즉, 일련의 상태를 월드로 표현하는 대신 리팩티브 프로그래밍 모델 행동으로 표현한다.

  • 리액티브 프로그래밍은 데이터 스트림과 변화의 전파를 중심으로 하는 비동기 프로그래밍 패러다임이다.

    • 간단히 말하자면 데이터/데이터 스트림에 영향을 주는 모든 변경점을 관련된 당사자에게 전파하는 프로그램을 리액티브 프로그램으로 부른다.

  • 리액티브 매니페스토Reactive Manifesto(https://www.reactivemanifesto.org/)는 다음과 같은 네가지 리액티브 원리를 정의하느 문서다.

    1. 반응Responsive

    2. 복원Resilient

    3. 탄력Elastic

    4. 메시지 중심Message-driven

Fiber

WebClient with Coroutines

suspending extension 함수인 awaitBody() 를 활용할 수 있다.

val htmlResponse = webClient.get()
    .uri("https://www.baeldung.com/")
    .retrieve()
    .awaitBody<String>()

retrieve() 함수는 API 요청의 응답 코드가 2xx일 경우에만 반환하고, 나머지는 예외를 던진다. 다양한 응답 코드에 대한 핸들링이 필요하다면 awaitExchange() 확장 함수를 활용할 수 있다.

val response: ResponseEntity<String> = webClient.get()
    .uri("https://www.baeldung.com/")
    .awaitExchange()
    .awaitEntity()

위와 같은 코드에서는 API 응답이 ResponseEntity 로 반환되므로, 상태 코드에 따른 처리가 가능해진다.

@GetMapping("/payments/{id}/")
suspend fun fundPayment(@PathVariable id: String): PaymentView {
    val

    return PaymentView()
}

Flow

List > Collection > Iterable
fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
  simple().forEach { println(it) }
}
Sequence
fun simple(): Sequence<Int> = sequence {
  for (i in 1..3) {
    Thread.sleep(100)
    yield(i)
  }
}

fun main() {
  simple().forEach { println(it) }
}
  • 첫번째 코드와 동일하지만 각 숫자를 출력할 때마다 100ms를 기다림

  • (위 코드 기준으로) 메인 스레드를 스탑함

suspend funtion
suspend fun simple(): List<Int> {
  delay(100)
  return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
  simple().forEach { println(it) }
}
Flow
suspend fun simple(): Flow<Int> = flow{
  for (i in 1..3) {
    delay(100)
    emit(i)
  }
}

fun main() = runBlocking<Unit> {
  simple().collect { println(it) }
}
  • List<Int> 를 반환하면 한번에 모든 값을 반환함

  • 동기식으로 계산된 값에 Sequence<Int> 를 사용하것처럼 비동기식으로 계산되는 값의 스트림을 나타내기 위해 Flow<Int> 를 사용할 수 있음

  • Flow는 cold stream.

    • collect가 호출될때까지 실행되지 않음

  • multiple flow로 동작하도록 하지 않는이상 sequential하게 동작함

cold stream vs. hot stream

Coroutine Context

  • 코루틴은 항상 컨텍스트에서 실행된다.

  • asyncsk launch같은 코루틴 빌더는 기본적으로 DefaultDispatcher 디스페처를 사용한다(현재 코루틴 버전 0.2.1에서는 DefaultDispatcher와 CommonPool은 동일하다.)

  • 코루틴 컨텍스트는 값을 보유할 수도 있다.

Channel

  • 두 코루틴이 통신할 수 있는 방법

    • Deferred<T>: 값 하나는 가능하다 시쿼스나 스트림을 불가능

      fun main(args: Array<String>) = runBlocking {
        val result = CompletableDeferred<String>()
      
        val world = launch {
          delay(500)
          result.complete("World")
        }
      
        val hello = launch {
          println("Hello ${result.await()}")
        }
      
        hello.join()
        world.join()
      }
    • Channel

      fun main(args: Array<String>) = runBlocking {
        val channel = Channel<String>()
      
        val world = launch {
          delay(500)
          channel.send("World")
        }
      
        val hello = launch {
          println("Hello ${result.receive()}")
        }
      
        hello.join()
        world.join()
      }
      • 당연히 이러한 코드는 채널의 의도된 용도가 아니다.

  • 일반적으로 단일 혹은 여러 코루틴은 채널로 메시지를 보낸다.

    fun main(args: Array<String>) = funBlocking<Unit> {
      val channel = Channel<Char>()
    
      val sender = launch {
        repeat(1000) {
          delay(10)
          channel.send('.')
          delay(10)
          channel.send(',')
        }
        channel.close()
      }
    
      for (msg in channel) { // (1)
        println(mst)
      }
    
      sender.join()
    }
    1. 채널 자체는 반복자이므로 for 블록에서 사용할 수 있다.

  • 위 코드는 produce 빌드를 사용해서 더 간단하게 작성할 수 있다.

    fun dotsAndCommas(size: Int) = produce { // (1)
      repeat(size) {
        delay(10)
        channel.send('.')
        delay(10)
        channel.send(',')
      }
    }
    
    fun main(args: Array<String>) = funBlocking<Unit> {
      val channel = dotsAndCommas(1000)
    
      for (msg in channel) { // (1)
        println(mst)
      }
    }
    1. produce 빌더는 수신만을 위한 채널 타입 ReceiveChannel<T>를 반환한다.

  • 채널 파이프라인

    • 채널이 있을 때 파이프라인과 같은 관련된 패턴을 가질 수 있다.

    • 파이프라인은 유닉스 파이프나 엔터프라이즈 인티그레이션 패턴EIP, Enterprise Integration Pattern 같이 소비자와 생산자를 연결하는 일련의 채널이다.

Actor

  • 비동기 코드를 다를 때의 주요 관심사는 변경 가능한 상태를 처리하는 방법

  • 액터는 메시지를 통해 외부 월드 및 다른 액터와 상호작용하는 일종의 오브젝트다.

  • 액터 오브젝트는 메시지를 통해 외부적으로 수정 및 접근할 수 있지만, 직접 할 수는 없는 private 내부 변경 가능한 상태를 가진다.

ETC

  • Iterator

  • Mono, Flux, Flow, Sequence

  • Sequence는 코루틴 빌더가 아닌 kotlin 에서 제공하는 빌더

  • suspend

References