こんにちは!
UPSIDERのWebチームでサーバサイドKotlinを書いているエンジニアのおかだです。
突然ですが、コルーチンの並行処理数、制限したくなったことありませんか?
ぼくはよくあります。
数千数万という数のコルーチンを同時に起動することもできる、と軽量さが強調されることもありますが、それはメモリ消費が少ないとかそういう話であって、コルーチンによって呼び出される側がその同時大量アクセスをさばけるかは別問題です。たとえば次のような制約が問題になることが時々あります。
特に、DBコネクションの取り扱いについてはKotlinコルーチンならではの落とし穴があったりするので、また別の記事で取り上げたいと思います。
追記:書きました → KotlinコルーチンでDBアクセスするときのアンチパターン - UPSIDER Techblog
まずはシンプルに実装してみる
さて、コルーチン数制限、雑でよければいろいろ書きようはありますが、結構あちこちで必要になってくるのでライブラリ化しておきたいところ。とりあえず、なるべくシンプルに作ってみましょう。
class LimitedCoroutineScope( private val underlying: CoroutineScope, limit: Int ) : CoroutineScope by underlying { private val semaphore: Semaphore = Semaphore(limit) private fun <T> withPermit(block: suspend CoroutineScope.() -> T): suspend CoroutineScope.() -> T = { semaphore.withPermit { block() } } fun launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job = underlying.launch(context, start, withPermit(block)) fun <T> async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T> = underlying.async(context, start, withPermit(block)) } suspend fun <T> limitedCoroutineScope( limit: Int, context: CoroutineContext = EmptyCoroutineContext, block: suspend LimitedCoroutineScope.() -> T ): T = withContext(context) { LimitedCoroutineScope(this, limit).block() }
使い方はこう。こちらもシンプルです。
fun doSomething() = limitedCoroutineScope(20, context = Dispatchers.IO) { val users = userRepository.getAll().map { user -> async { findExtraInfoOf(user) } }.awaitAll() // ... }
説明の必要はあまりないと思いますが、一点だけ。3行目の async
は、 CoroutineScope.async
ではなく、 LimitedCoroutineScope
クラスの async
メソッドです。これは、ある型に対して同じシグネチャの拡張関数とメンバ関数が定義されている場合、メンバ関数が優先される、というKotlinの仕様に依拠しています。ちょっと紛らわしいかもしれない。
CoroutineContextを使った実用的な実装
実際のプロジェクトでコルーチンまわりを共通化しようとするとき、上記のように新しい型を作ってそこに機能をもたせるやり方だと拡張が難しくなります。 CoroutineScope
ってネストさせることもよくあり、たとえば次のように書くだけで、コルーチン数制限はできなくなります。
fun doSomething() = limitedCoroutineScope(20, context = Dispatchers.IO) { withTimeout(60.seconds) { // thisはLimitedCoroutineScopeではないので、コルーチン数制限はできない } }
そもそも解決の仕方がKotlinコルーチンの設計思想とマッチしてないんでしょうね、たぶん。同時処理数をセマフォで制限する、という点は問題なさそうなので、そこは踏襲しつつ、もっとKotlinコルーチンらしい設計に変えてみましょう。こういうときに使えそうなのが CoroutineContext
です。
たとえば、UPSIDERでも使っているORMのExposed。コルーチンがサスペンドしてスレッドを失ってもDBトランザクションを保つためには、スレッドローカルではなくコルーチン側にトランザクションを紐付けておく必要があります。Exposedは CoroutineContext
内にトランザクションを保持することでそれを実現しています。トランザクションの途中で新しい CoroutineScope
を起ち上げた場合も、 CoroutineContext
なら親から子へ引き継がれるので、その処理は同じトランザクションの一部となります。
まず、セマフォをラップして、 CoroutineContext
の要素( CoroutineContext.Element
型)として扱えるようにします。要素と言いましたが、 CoroutineContext
は構成要素ひとつひとつが CoroutineContext
でもあります。Compositeパターンですね。
internal data class CoroutineLimit( val limit: Int ) : AbstractCoroutineContextElement(CoroutineLimit) { internal val semaphore: Semaphore by lazy { Semaphore(limit) } companion object Key : CoroutineContext.Key<CoroutineLimit> override fun toString(): String = "CoroutineLimit($limit)" }
続いて、 CoroutineLimit
を CoroutineContext
に付け足してくれる CoroutineScope
ビルダを作ります。 limit
が0以下のときはコルーチン数制限はされない、ということにしておきます。
suspend fun <T> limitedCoroutineScope( limit: Int, context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T ): T { val newContext = coroutineContext + context + (if (limit > 0) CoroutineLimit(limit) else EmptyCoroutineContext) return withContext(newContext, block) }
最後に、 CoroutineLimit
が見つかったらコルーチン数を制限するという機能付きのコルーチンビルダを、 CoroutineScope
の拡張関数として用意します。最初の例と違ってわかりやすい名前にしてみました。
fun CoroutineScope.launchWithLimitedConcurrency( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = coroutineContext + context return this.launch(context, start, newContext.createLimitedBlock(block)) } fun <T> CoroutineScope.asyncWithLimitedConcurrency( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T> { val newContext = coroutineContext + context return this.async(context, start, newContext.createLimitedBlock(block)) } private fun <T> CoroutineContext.createLimitedBlock(block: suspend CoroutineScope.() -> T): suspend CoroutineScope.() -> T = this[CoroutineLimit]?.let { { it.semaphore.withPermit { this.block() } } } ?: block
使い方は、 launch
と async
の関数名が異なるだけで、基本的に最初の例と同じです。この実装なら、 CoroutineScope
を入れ子にしても正しく動いてくれます。
fun doSomething() = limitedCoroutineScope(20, context = Dispatchers.IO) { val users = userRepository.getAll().map { user -> asyncWithLimitedConcurrency { findExtraInfoOf(user) } }.awaitAll() // ... } fun doSomething() = limitedCoroutineScope(20, context = Dispatchers.IO) { withTimeout(60.seconds) { repeat(100) { // 内側のCoroutineScopeにCoroutineLimitが引き継がれているので、 // 並行処理数は期待通り20個までに制限される launchWithLimitedConcurrency { /* do something */ } } } }
CoroutineLimit
を共有しさえすれば、スコープが入れ子になっていなくても同じコルーチン数制限が適用されてしまう点は、役に立つ場合もあるかもしれない反面、自由度が高すぎてシステムの挙動が読めなくなる可能性がありそうです。 CoroutineLimit
を internal
としているのはそのためです。
宣伝
UPSIDERでは、成長企業のための法人カード「UPSIDER」と、すべてのBtoB取引でクレジットカードを利用できるビジネスあと払いサービス「支払い.com」を提供しています。
ぼくが所属するWebチームはもちろん、UPSIDERの開発組織のいたるところでエンジニアが足りません。いろんな新しい技術にもこれから挑戦していきます。
カジュアル面談もやっておりますので、少しでもご興味のある方は、ぜひご連絡ください!