Code Execute Concurrently And Delay()

@OptIn(FlowPreview::class)
@Test
fun someTest() {
    val t1 = System.currentTimeMillis()
    runTest {
        val range = (1..10).asFlow()
        val concat = range.flatMapMerge { triplicate(it) }.toList(mutableListOf())
        println(concat)
    }
    val t2 = System.currentTimeMillis()
    println("Elapsed time ${t2 - t1}")
}

private suspend fun triplicate(i: Int): Flow<Int> {
    return coroutineScope {
        val x = async {
            val t1 = System.currentTimeMillis()
            println("pre sleep ${Thread.currentThread().name}")
            delay(10000L)
            val t2 = System.currentTimeMillis()
            println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
            listOf(i, i, i).asFlow()
        }
        x.await()
    }
}

Because this is exactly what runTest() does: it creates a special coroutine environment in which the passage of time is simulated. It preserves the correct ordering of operations, but delays complete immediately. We don't want unit tests to run for seconds or even hours just waiting.

If you prefer to wait as normal, use runBlocking() instead of runTest().
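A minimal sketch of the difference, assuming kotlinx-coroutines-test 1.6 or newer is on the classpath. The helper names virtualDelay() and realDelay() are made up for illustration. Inside runTest(), currentTime reads the simulated clock, so a 10-second delay() advances that clock without any real waiting; under runBlocking() the same call waits for real.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlin.system.measureTimeMillis

// Hypothetical helper: returns (virtual ms, wall-clock ms) for a 10 s delay in runTest().
@OptIn(ExperimentalCoroutinesApi::class)
fun virtualDelay(): Pair<Long, Long> {
    var virtualMs = 0L
    val wallMs = measureTimeMillis {
        runTest {
            delay(10_000L)            // skipped: only the simulated clock moves
            virtualMs = currentTime   // virtual time of the test scheduler
        }
    }
    return virtualMs to wallMs
}

// Hypothetical helper: returns wall-clock ms for a short delay in runBlocking().
fun realDelay(): Long = measureTimeMillis {
    runBlocking { delay(200L) }       // really waits about 200 ms
}

fun main() {
    val (virtualMs, wallMs) = virtualDelay()
    println("runTest: virtual=${virtualMs}ms, wall=${wallMs}ms")
    println("runBlocking: wall=${realDelay()}ms")
}
```

The virtual clock should report the full 10 s while the wall-clock time stays far below it, which is why the "Elapsed time" printed by the test looks too small.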

There are three reasons why the code does not run concurrently.

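runTest() by default uses a single thread. If you need more threads, provide a coroutine dispatcher, for example: runTest(Dispatchers.Default) { ... }. If your version of kotlinx-coroutines-test only accepts test dispatchers there, switch inside the test body instead with withContext(Dispatchers.Default) { ... }.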

You should almost never block the thread inside a coroutine (for example with Thread.sleep()). Blocking makes coroutines unresponsive, as you can see in your example. Even with Dispatchers.Default, assuming you have 4 CPU cores, the 10 calls would still take 30s to finish, because only 4 blocking sleeps can run at a time (three rounds of 10s each).

The triplicate() function is really messed up. By calling async() and then await() immediately, you still execute the code pretty much synchronously. triplicate() returns only after waiting for 10s, and because flatMapMerge() "calls transform sequentially", you really execute one triplicate() at a time.
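To illustrate the second reason (blocking versus suspending), here is a scaled-down sketch. It assumes a fixed pool of 4 threads as a stand-in for "4 CPU cores" and uses 100 ms instead of 10 s; timeTenTasks() is a hypothetical helper, not part of any library.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Hypothetical helper: runs 10 coroutines on a 4-thread pool, returns elapsed wall-clock ms.
fun timeTenTasks(task: suspend () -> Unit): Long {
    // Assumption: 4 pool threads model a machine with 4 CPU cores.
    val pool = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    return pool.use { dispatcher ->
        measureTimeMillis {
            runBlocking {
                repeat(10) { launch(dispatcher) { task() } }
                // runBlocking waits for all launched children before returning
            }
        }
    }
}

fun main() {
    val blockingMs = timeTenTasks { Thread.sleep(100L) }  // holds its thread while sleeping
    val suspendingMs = timeTenTasks { delay(100L) }       // releases its thread while waiting
    println("Thread.sleep: ${blockingMs}ms, delay: ${suspendingMs}ms")
}
```

With Thread.sleep() the 10 tasks have to queue for the 4 threads, so the run needs about three rounds of sleeping. With delay() all 10 wait at the same time and the run should finish in roughly one round.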

I guess what you intended (?) is to return a flow immediately and emit items after 10s. Then flatMapMerge() can acquire multiple such flows and collect them concurrently:

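// Not a suspend function: the flow is returned immediately, and the body
// below runs only when flatMapMerge() collects it.
private fun triplicate(i: Int): Flow<Int> = flow {
    val t1 = System.currentTimeMillis()
    println("pre sleep ${Thread.currentThread().name}")
    delay(10000L) // suspends only this flow's collector; other flows keep running
    val t2 = System.currentTimeMillis()
    println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
    emit(i)
    emit(i)
    emit(i)
}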
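
One way to check the difference between the two shapes of triplicate() is to compare how much virtual time each needs under runTest(). This is a sketch, assuming kotlinx-coroutines-test 1.6 or newer and the default concurrency of flatMapMerge(). The names triplicateEager(), triplicateLazy() and compareVirtualTimes() are invented for the example.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

// Shape of the original: suspends for 10 s, then returns a ready-made flow.
suspend fun triplicateEager(i: Int): Flow<Int> {
    delay(10_000L)
    return listOf(i, i, i).asFlow()
}

// Shape of the fix: returns at once; the delay happens during collection.
fun triplicateLazy(i: Int): Flow<Int> = flow {
    delay(10_000L)
    repeat(3) { emit(i) }
}

// Returns the virtual time (ms) each variant needs to process 10 inputs.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun compareVirtualTimes(): Pair<Long, Long> {
    var eagerMs = 0L
    var lazyMs = 0L
    runTest {
        (1..10).asFlow().flatMapMerge { triplicateEager(it) }.toList()
        eagerMs = currentTime            // virtual clock starts at 0
        (1..10).asFlow().flatMapMerge { triplicateLazy(it) }.toList()
        lazyMs = currentTime - eagerMs   // time spent by the second run only
    }
    return eagerMs to lazyMs
}

fun main() {
    val (eagerMs, lazyMs) = compareVirtualTimes()
    println("eager: ${eagerMs}ms of virtual time, lazy: ${lazyMs}ms")
}
```

Under these assumptions the eager variant should accumulate about 100 s of virtual time (ten sequential 10 s waits), while the lazy one needs about 10 s because all ten flows wait concurrently.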
