[Kotlin] Flow (1) - Flow 기초

Flow 기본 사항

Posted by 동식이 블로그 on November 8, 2022

[Kotlin] Flow (1) - Flow 기초

Flow에 대해서 알아보자

Flow란?

  • 순차적으로 값을 내보내고 정상적으로 또는 예외로 완료되는 비동기적인 데이터 스트림이다

Flow Cold

  • Flow는 기본적으로 Cold Stream 이다
  • 즉, Flow { } 빌더의 코드블럭은 Flow가 collect되기 전까지 실행되지 않는다는 것을 의미한다

Rx의 Cold Observable 개념과 유사

Rx에서는 Subscribe할 때 아이템을 흘려보낸다

  • 일반적으로는 Cold Stream 이지만 SharedFlow 하위 타입은 Hot Stream을 나타낸다

Flow Builder

  • flowOf(..) 사용

    • 고정된 값 집합에서 flow를 생성한다
    1
    
      flowOf(1,2,3).collect { value -> println(value) }
    
  • asFlow() 사용

    • 다양한 타입에 대해서 asFlow() 익스텐션 함수를 사용해서 flow로 변환한다
    1
    2
    
      /* 1~3까지의 출력 */  
      (1..3).asFlow().collect { value -> println(value) }
    
  • flow { ... } 사용

    • 빌더 함수로 순차적 호출에서 emit 함수로 임의의 flow를 구성한다
    1
    2
    3
    
      flow {
          (0..10).forEach { emit(it) }
      }.collect { value -> println(value) }
    
  • channelFlow { ... } 사용

    • 빌더 함수를 통해 잠재적으로 동시 호출에서 send 함수로의 임의의 flow를 구성한다
    1
    2
    3
    
      channelFlow {
          (0..10).forEach { send(it) }
      }.collect { value -> println(value) }
    
  • MutableStateFlow / MutableSharedFlow

    • 해당 생성자 함수를 정의하여 직접 업데이트 할 수 있는 Hot flow를 생성한다
    1
    2
    3
    4
    5
    
      val stateFlow = MutableStateFlow<Int>(0)
      val sharedFlow = MutableSharedFlow<Int>()
        
      stateFlow.send(1)
      sharedFlow.send(1)
    

Flow constraints

  • Flow 인터페이스의 모든 구현은 두 가지 주요 속성을 준수해야 한다
  1. Context preservation

    • flow로 만들어진 collection은 항상 이를 호출한 coroutine context에서 수행되며 이를 Context preservation 이라고 한다
    • 자제척으로 실행하는 context를 캡슐화하고 다운스트림에서 전파하거나 누출하지 않으므로 특정 변환 또는 터미널 연산자의 실행 context에 대한 판단을 간단하게 만든다
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
     fun main() = runBlocking {
         test().collect { 
     				value -> println("Collected ${Thread.currentThread().name} $value") 
     		}
     }
        
     fun test() = flow {
         println("Start flow ${Thread.currentThread().name}")
         for (i in 1..3) { emit(i) }
     }
    
    • 위의 예처럼 collect()를 호출한 부분의 context는 main thread를 사용하고 있기 때문에 flow body 영역의 코드 역시 main thread에서 처리된다
    • flow body 부분의 context를 바꿔줄려면 flowOn 이라는 operator를 사용해야 한다
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
     fun main() = runBlocking {
         test().collect { 
     				value -> println("Collected ${Thread.currentThread().name} $value") 
     		}
     }
        
     fun test() = flow {
         println("Start flow ${Thread.currentThread().name}")
         for (i in 1..3) { emit(i) }
     }.flowOn(Dispatchers.Default) // change context
    
  2. Exception transparency

    • flow 구현에 있어서 exception 에 투명해야 한다
    • 이에 따라서 flow { ... } 빌더를 try / catch 블럭 안에서 사용해서 값을 emit하는 것은 exception에 투명하지 못한 행위이다
    • 즉, exception 에 투명하다는 말은 다운스트림에서 발생한 에러르 미리 처리하여 collector가 알 수 없게끔 되어서는 안된다는 의미이다
    • 에러가 나더라도 어떤 형태로는 collector가 알아차릴수 있어야 한다
    1
    2
    3
    4
    5
    
     flow { emitData() } 
         .map { computeOne(it) } 
         .catch {...} // emitData 및 computeOne에서 예외 포착 
         .map { computeTwo(it) } 
         .collect { process(it) } // 다음에서 예외 발생 처리 및 computeTwo
    

Reactive streams

  • Flow는 Reactive Stream과 호환이 가능하다
  • Flow.asPublisher , Publisher.asFlow 를 사용해서 kotlin-coroutines-reactive 모듈의 리엑티브 스트림과 안전하게 상호 작용할 수 있다
  • Coroutine 1.5.0버전 출시에 따라서 Observable, flux 등의 reactive stream과 coroutine의 안정적인 변환을 위한 함수들이 추가되었다

참고사이트