[Coroutine] 데이터 통신을 위한 Channel에 대해 알아보자

최근 MVI를 공부하면서 여러 예제 프로젝트를 보면서

공부하던 중 Channel을 이용하여 상태값을 주고 받는 것을 알게 되었습니다. 

이참에 Channel에 대해서도 알아보고자 합니다!


1. Channel 이란

Channel은 두개의 Coroutine 사이를 연결한 파이프이며, 단방향보다는 여러 방향에서 데이터를 주고 받는 형식으로 코루틴끼리 데이터를 전달하기 위한 것입니다.

Channel은 BlockingQueue와 유사하지만, 약간의 차이점이 존재합니다.

BlockingQueue은 put(), take()를 사용하여 차단해서 전송하는 방법이고 Channel은 send(), receive()를 사용하여 일시중단(suspend)으로 전송하는 방식입니다. 또한, Channel은 더 이상 사용하지 않을 때 채널을 close로 닫음으로서 더 이상 오지 않음을 표현할 수 있습니다.

 

2. 코드 예제

Channel은 송신측에서 채널에 send로 데이터를 전달하고 수신 측에서 채널을 통해 receive 받습니다.

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..10) {
            channel.send(x)
        }
    }

    repeat(10) {
            println(channel.receive())
    }
    println("완료")
}

Channel<>()를 통해 생성할 수 있으며, 데이터를 스트림에 넣을 때는 send(), 스트림에서 받을 때에는 receive를 사용하면 됩니다. send와 receive는 supsend 함수이기 때문에 코루틴 내부에서 호출해야 합니다. 또한, Channel에 더 이상 아무 데이터도 보내거나 받지 않는다면 close 메서드를 통해 채널을 종료시켜야 합니다.

 

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..10) {
            channel.send(x)
        }
        channel.close()
    }

    for (x in channel) {
        println(x)
    }
    println("완료")
}

위 예제처럼, 채널에서 더 이상 보낼 자료가 없으면 close 메서드를 이용해 채널을 닫을 수 있습니다. 채널은 for in 을 이용해서 반복적으로 receive할 수 있고 close 되면 for in은 자동으로 종료됩니다. 수신자의 for루프에서 특별한 '닫기 토큰'을 수신하면 반복이 중지되므로 이전의 모든 전송요소들이 보장됩니다.

 

 

🧐 만약 같은 코루틴에서 채널을 읽고 쓰면 어떻게 될까요?

- send나 receive가 suspension point이고 서로에게 의존적이기 때문에 같은 코루틴에서 사용하는 것은 위험할 수 있으며 코드가 무한으로 실행될 수 있습니다.

 

 

3. Channel Producer

채널은 일반적으로 생산하는 쪽과 소비하는 쪽을 구현하는 producer-consumer 패턴입니다. 이를 구현하기 위해  한 쪽에서 데이터를 만들고 다른 쪽에서 받는 것을 도와주는 확장 함수들이 존재합니다. 

생산하는 형태를 쉽게 구현하도록 제공하는 coroutine builder produce가 있습니다.

소비하는 쪽에서 사용하는 extension function으로 cosumeEach가 있습니다.(for-loop로 대체 가능가 가능합니다.)

fun main() = runBlocking<Unit> {
    val oneToTen = produce {
        for (x in 1..10) {
            channel.send(x)
        }
    }

    oneToTen.consumeEach {
        println(it)
    }
    println("완료")
}

 

produce 내부 구조를 살펴봅시다.

@ExperimentalCoroutinesApi
fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0, 
    block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>

produce를 사용하면 ProducerScope를 상속받은 ProducerCoroutine을 얻게 됩니다. 여기서, ProducerScope는 CoroutineScope와 SendChannel 인터페이스를 상속받아 코루틴 컨텍스트와 몇몇 채널 인터페이스를 사용할 수 있는 스코프입니다.

 

 

4. 데이터를 받는 방식

채널의 데이터를 가져오는 방법은 receive 이외에도 여러 가지 방법이 존재합니다.

1. receive를 통해서 하나의 값 받기

2. for-loop, consumeEach를 통해서 값 전체 하나씩 받기

3. consumeAsFlow를 이용하여 데이터를 Flow 형식으로 받기 

 - 해당 메서드는 소비를 cold stream으로 할뿐, 데이터 배출 자체는 hot stream이기 떄문에 사용을 기다리지 않고 한 번에 배출합니다. 

   즉, 한번 밖에 배출이 안되니, 실제 Flow처럼 여러 곳에서 동일한 데이터를 받는 것은 불가능합니다.

channel.consumeAsFlow().collect{
    println(it)
}

그렇다면 consumeEach와 달리 consumeAsFlow는 언제 사용하는 것이 좋을까요?

 

데이터를 가공하지 않고 그대로 사용한다면, consumeAsFlow를 굳이 사용할 필요가 없습니다. 만약, 데이터를 조작하기 위해

find, map, filter와 같은 메서드를 사용해야 한다면 consumeAsFlow를 사용해주면 됩니다.

 

 

마무리

최근 MVI 패턴을 공부하면서 다양한 예제를 찾아보니 MVI+coroutine을 사용한 예제가 대다수였고 코루틴을 더 열심히 공부해야겠다고 느끼게 되었습니다~!

 

 

출처

https://tourspace.tistory.com/156

https://kotlinlang.org/docs/channels.html