基本的に以下からそのまま取ってきています。
- ドキュメント Futures — Util 21.5.0 documentation
- コメント util/Future.scala at util-6.45.0 · twitter/util · GitHub
説明用のざっくりポインタとしてまとめる予定だったのに、あれもこれもと欲張ってしまった代物。(その割に全部あるわけではない。) playからfinagleに移行してきたりするとメソッド名が細かく違ったりするのに最初は戸惑いますが、Scala標準Futureとakkaのschedulerを使ったことがあれば、すぐ慣れつつ便利さを感じられると思います。
例ではThread.sleepを呼びまくっていますが、そのへんの事情はScala標準のFutureと同じなのでちゃんとやるときはFuture.sleepやtimer.doLaterなどを使ったほうが良いです。
目次
- 目次
- Futureの作成
- Futureをまとめる
- 失敗のハンドリング
- FuturePool
- Timer系列
- 複数の処理
- 便利const
- callback
- キャンセル
- monitored
- おまけ: Futureの再帰
Futureの作成
基本的な作り方について。 この作り方だとapplyも含めて全部同期実行になる点に注意。
value/exception
import com.twitter.util.Future // 基本の作り方 Future.value(1) Future.exception(new RuntimeException)
apply/const
Scala標準Futureではapplyは非同期に実行されるが、TwitterFutureでのapplyは同期実行という違いに注意。
import com.twitter.util.Future // applyを使うとTryで包むのでReturn/Throwに仕分けてくれる // Scala標準のFutureと異なり、ただTryで包むだけ == 同期実行な点に注意。 Future(1) Future(throw new RuntimeException) // twitter Tryからの変換 Future.const(Try(1)) Future.const(Try(throw new RuntimeException))
Futureをまとめる
Scala標準のFutureと同じくfor式を使うだけだと並行実行されないことがあるので注意。
map/flatMap
略
join
ScalaFutureのzipと同じ http://qiita.com/mtoyoshi/items/f68beb17710c3819697f#zip コード例は略。
失敗のハンドリング
handle/rescue
import com.twitter.util.{Future, Return, Throw} val successFuture = Future.value(1) val failedFuture = Future.exception[Int](new RuntimeException) // handle // 失敗した例外をSuccessにできる。 failedに対するmap // caseはPartialFunctionなのでcaseにマッチしない例外はそのまま例外として扱われる。 successFuture.handle { case e: RuntimeException => 0 } failedFuture.handle { case e: RuntimeException => 0 } // rescue // 失敗した例外をSuccessにしたり別の例外に変換できる。 failedに対するflatMap // caseはPartialFunctionなのでcaseにマッチしない例外はそのまま例外として扱われる。 successFuture.rescue { case e: RuntimeException => Future.value(0) } failedFuture.rescue { case e: RuntimeException => Future.value(0) }
transform
import com.twitter.util.{Future, Return, Throw} val successFuture = Future.value(1) val failedFuture = Future.exception[Int](new RuntimeException) // transform // rescueと異なり成功時の値も同時に変換できる successFuture.transform { case Return(a) if a == 1 => Future.exception(new RuntimeException) case Return(a) => Future.value(a) case Throw(e: RuntimeException) => Future.value(0) case Throw(e) => Future.exception(e) } failedFuture.transform { case Return(a) => Future.value(a) case Throw(e: RuntimeException) => Future.value(0) case Throw(e) => Future.exception(e) } // transformedByというメソッドもあるが、こちらはFutureTransformerを受け取る。 // FutureTransformerはJavaFriendlyと書いてあるので基本的にはtransformを使えば良い。 // 今回は割愛。
FuturePool
非同期実行したい場合はFuturePoolの力が必要。
FuturePool.unboundedPool
スレッドプール内部のExecutorServiceはglobalのもの(https://github.com/twitter/util/blob/util-6.45.0/util-core/src/main/scala/com/twitter/util/FuturePool.scala#L70-L72) が利用される。
import com.twitter.util.FuturePool // 非同期実行されるプール // 処理の実行方法はFuture.applyと同じくpoolのapplyに処理を渡せばOK // unboundedなので際限なく拡張される。 val unboundedPool = FuturePool.unboundedPool unboundedPool(1) unboundedPool(throw new RuntimeException)
FuturePool(dbExecutorService).apply
import java.util.concurrent.ForkJoinPool import com.twitter.util.FuturePool // 非同期実行されるプールを自分で作る。 val dbExecutorService = new ForkJoinPool(50) val myPool = FuturePool(dbExecutorService) myPool { Thread.sleep(1); 1 } myPool(throw new RuntimeException) // interruptibleUnboundedPoolというキャンセルに対応したPoolもある。
Timer系列
Timerの種類を例ごとに変えてあるが使い方はどれも同じ。Timerの種類については後述。
sleep
import com.twitter.util.{Future, Await} import com.twitter.conversions.time._ implicit val timer = com.twitter.finagle.util.DefaultTimer // 3秒後にUnitが返る val f = Future.sleep(3.seconds) Await.result(f)
delayed
完了が遅れるだけで計算自体はすぐ行われる点に注意。
import com.twitter.util.{Future, Await} import com.twitter.conversions.time._ implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true) // 3秒後にIntが返る val f = Future { println("in future") 1 }.delayed(3.seconds).foreach(_ => println("done")) Await.result(f)
Timer#schedule
Futureのメソッドではないがついでなので紹介。
import com.twitter.util.{Await, Future, Time} import com.twitter.conversions.time._ implicit val timer = new com.twitter.util.ScheduledThreadPoolTimer(poolSize = 5, makeDaemons = true) // schedule 1秒ごとに何度も実行する 。キャンセル可能なTimerTaskを返す。 val timerTask = timer.schedule(1.seconds) { println("1sec!") } Thread.sleep(3000) Await.result(timerTask.close()) // doLater 2秒後に1回実行する。Futureを返す。 val f1 = timer.doLater(2.seconds) { println("2sec!") } Await.result(f1) // doAt 具体的な時刻を指定する。Futureを返す。 val f2 = timer.doAt(Time.Now + 3.seconds) { println("3sec!") } Await.result(f2)
timerの話
tl;dr;
- finagleを使っているならcom.twitter.finagle.util.DefaultTimerを使えばOKだが、blockingな重い処理をするなら自前で定義したほうがよいかも。
- finagle-netty4が依存パスにあればNetty4HashedWheelTimerが使われる。
- 無くてもJavaTimer(isDaemon=true)が使われるので安心。
- finagleは使って無くてtwitter/utilだけ使っている場合はJavaTimerかScheduledThreadPoolTimerが選択肢
- 単に
new JavaTimer
とするとユーザースレッドとして起動するため、明示的にcancelを呼ばないとtimerがGCされるまでプロセスが終了しなくなることがあるのでisDaemon=trueを指定するとよい。
- 単に
細かい話
twitter/utilのこの手のメソッドはcom.twitter.util.Timer
を要求してくる。
スケジューリングを無視して即時実行するNullTimerやテスト用のMockTimerがあるが、 finagleを使っていない場合、基本的にはJavaTimer(isDaemon=true)を、ある程度の性能が欲しい場合はScheduledThreadPoolTimerを使えば良さそう。
finagleを使っている場合は、もう少し性能が出るタイマーがDefaultTimerとしてfinagle自体に用意されているのでそちらを使う方が良さそう。 ただ、DefaultTimerのインスタンスは共通なのでblockingな処理を行う際は、そのスレッドがブロックされる可能性があるので分けたほうが良いかもしれない。(未検証)
com.twitter.finagle.util.DefaultTimer
の実装はServiceLoaderで一番最初に見つかったクラスを使う。
ServiceLoaderで見つからなかった場合はwarningログが出つつcom.twitter.util.JavaTimer
をdaemonThreadをONにした上で使うようになっている。
https://github.com/twitter/finagle/blob/finagle-6.45.0/finagle-core/src/main/scala/com/twitter/finagle/util/DefaultTimer.scala#L31-L36
finagle-netty4(finagle-{thrift, http}の依存にある)がNetty4HashedWheelTimerを指定しているので大抵の場合はこれが読み込まれる気がする。 https://github.com/twitter/finagle/blob/finagle-6.45.0/finagle-netty4/src/main/resources/META-INF/services/com.twitter.finagle.util.DefaultTimer
もう一つcom.twitter.finagle.util.HashedWheelTimer.Default
というややこしいものが存在するが、こちらはNetty3ベースのHashedWheelTimerを使っている。
Netty3,4間のHashedWheelTimerの差はよくわかっていないが新しい方が良さそうなので基本的にはcom.twitter.finagle.util.DefaultTimerを使うのが良いだろう。
Timerのロードにサービスローダーを使うようになったのはfinagle6.45から。経緯とかは https://github.com/twitter/finagle/commit/d047b4568e07a56b481b5f7c193b0e8c5ec6ff71 のコミットに書いてある通り、finagle-coreからnetty3依存を剥がすためにそうなっているらしい。
複数の処理
select/or
import com.twitter.util.{Future, Try} // selectは一番最初に終わったFutureの値と残りのFutureを返す。 val fs = (1 to 5).map(Future(_)) val (firstDoneTry, others): (Try[Int], Seq[Future[Int]]) = Await.result(Future.select(fs)) println(firstDoneTry) // Return(1) println(others) // Seq(Future(Return(2)), Future(Return(3)), Future(Return(4)), Future(Return(5))) // orはselectの2つ版。selectと異なり、先に終わった値が含まれるFutureのみを返す。 Future(1).or(Future(2)) // selectIndexという一番最初に終わったSeq[Future[A]]のindexを返すメソッドもあるが割愛。
traverseSequentially/collect/collectToTry
traverseSequentiallyは前のFutureが終了してから次を実行し、collectは同時に実行する。
実装を見るとcollectでも結果に含まれる要素の順番は引数と同じになるっぽい。順序についてはドキュメントやコメントにはのってない気がする。
collectToTryはcollectと異なり一部が失敗したときも成功し、一連の結果がTryで取得できる。
import com.twitter.util.{Await, Future} // 順番に実行される val f = Future.traverseSequentially(1 to 5) { i => Future(i) } // 並列に実行される val f = Future.collect((1 to 5).map { i => Future { val wait = scala.util.Random.nextInt(300) println(s"$i => $wait ms") Thread.sleep(wait) i } }) println(Await.result(f)) // 順序は同じ。ArraySeq(1, 2, 3, 4, 5) // 並列に実行される。失敗も補足できる val f2 = Future.collectToTry((1 to 5).map { i => Future { if(i % 2 == 0) throw new RuntimeException else i } }) println(Await.result(f2)) // 順序は同じ。ArraySeq(Return(1), Throw(java.lang.RuntimeException), Return(3), Throw(java.lang.RuntimeException), Return(5))
batched
大量の非同期処理をいくつかのグループに分けて実行する君
同時実行数の制限という意味だと com.twitter.concurrent.AsyncSemaphore
を使う手もある。 Futures — Util 21.5.0 documentation
batchedは複数リクエストをまとめて送るのが前提になっている(Seq[In]=> Seq[Out]で処理を記述する)などの違いがある。
import com.twitter.conversions.time._ import com.twitter.util.{Await, Future} implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true) // まずbatcherを作る val batcher = Future.batched[Int, String]( sizeThreshold = 10, // 実行しきい値は10。正確にはsizeThreshold*sizePercentile個のジョブがエンキューされるまで実行を待つようになっている。 timeThreshold = 500.milliseconds, // sizeの条件を満たさなくてもenqueueからtimeThresholdを超過したらジョブが実行される sizePercentile = 0.3f // sizeThresholdと合わせて最低ジョブ実行数を決める。 // この例では固定値を入れているが名前渡しになっているのでRandom.nextFloat()とか入れるとバッチサイズを都度変化できるようになっている。 // 必要がなければ指定しないでデフォルト値(1.0f)を使えば良さそう。 ) { ids => // Seq[In]を受け取って、Seq[Out]を返す関数を書く Future.sleep(scala.util.Random.nextInt(50).milliseconds).map { _ => println(s"${ids.mkString(", ")} are inserted") ids.map(_.toString) } } val userIds = 1 to 50 // applyに渡してジョブをenqueueする。中でsynchronizedするのでcollectで呼ぶ意味はあまりない。 // バッチグループごとにsleepを入れたりする機能は無さそうなので、enqueue自体のタイミングで制御すると良さそう。 val insertedFuture = Future.collect(userIds.map(batcher)) println(Await.result(insertedFuture)) // thresholdに関係なく全リクエストの実行を開始したいときはflushBatchを呼ぶ。 batcher.flushBatch() // sizeThreshold = 1でuserIdsを10個投入した場合の出力 // 8 are inserted // 5 are inserted // 2 are inserted // 6 are inserted // 3 are inserted // 4 are inserted // 1 are inserted // 10 are inserted // 9 are inserted // 7 are inserted // ArraySeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // sizeThreshold = 100でuserIdsを10個投入した場合の出力 // timeThreshold秒経過してから以下が出力される。 // 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 are inserted // ArraySeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // sizeThreshold = 20でuserIdsを10個投入した場合の出力(sizePercentile=0.3fなので20*0.3=6個ずつ実行される。 // ただし7~10個目は数が足りないのでtimeThreshold秒経過してから出力される。 // 1, 2, 3, 4, 5, 6 are inserted // 7, 8, 9, 10 are inserted // ArraySeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
便利const
import com.twitter.util.Future // 以下の2つは同じ Future.Unit Future.Done // 便利定数 Future.True Future.False Future.None Future.Nil Future.??? // Java用なので使わなくて良い Future.Void
callback
コールバックよりはmap/flatMapでつなぎたい。ensureはありかも。
onSuccess/onFailure
省略
respond/ensure
import com.twitter.util.Future // respond // 完了した際のコールバックを設定する。 // 主にライブラリなどの汎用コード向けとコメントにある。 // respondは結果表示やリソースの後始末などの副作用を起こす前提となる。(promiseを解決したりしているコードもちょくちょく見かける。) val f1 = Future.value(1) val f2 = f1.respond { case Return(a) => println(a) case Throw(e) => println(e) } // ensure // respondとほぼ同じだが、引数として計算の結果を受け取らない。 // 成功しても失敗してもいいから単にリソースをcloseしたい場合などに使える。 val f1 = Future.value(1) val f2 = f1.ensure { println("f1 finished") }
キャンセル
raise
import com.twitter.util.{Await, Future, FuturePool} // Future.valueは即時評価なのでraiseできない。 // またinterruptibleUnboundedPoolを使っても、state=Doneになるとraiseを呼んでも正常系の値が返ってくるのでsleepでごまかしている val f1 = FuturePool.interruptibleUnboundedPool { Thread.sleep(100) 1 } f1.raise(new RuntimeException("interrupt")) Await.result(f) // java.util.concurrent.CancellationException // FuturePool.unboundedPoolを使う場合はFuture#interruptibleを使うとinterruptできるようになる。逆にinterruptibleを呼ばないとcancel出来ない。 val f2 = FuturePool.unboundedPool { Thread.sleep(100) 1 }.interruptible() f2.raise(new RuntimeException("interrupt")) Await.result(f2)
raiseWithin
N秒以内に終わらないとタイムアウトといった指定が出来る。
import com.twitter.util.{Await, Future, FuturePool} import com.twitter.conversions.time._ implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true) val f = FuturePool.interruptibleUnboundedPool { Thread.sleep(3000) 1 } // 2秒後にraiseされる f.raiseWithin(2.seconds) Await.result(f)
within/by
winthinとbyはDurationを受け取るかTimeを受け取るかの違いしか無い。
raseWithinとwithin/byには処理自体のfutureをraiseするか、withinなどの呼び出しの返り値のみをraiseするかの微妙な違いがある。詳細は以下のコメントを参照。
import com.twitter.util.{Await, Future, FuturePool} import com.twitter.conversions.time._ implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true) val f1 = FuturePool.interruptibleUnboundedPool { Thread.sleep(3000) 1 } // 2秒後にf2がraiseされるが、raseWithinではf1,f2の両方raiseされるのに対し、within/byはf2のみがraiseされるためf1自体の結果は普通に取得することが出来る。 val f2 = f1.within(2.seconds) println(Await.result(f1)) // 1が表示される Await.result(f2)
monitored
Promise使いつつネストしていると辛くなるケースを救えるらしい。いまのところ使ったことは無い。
import java.util.concurrent.ForkJoinPool import com.twitter.util.{Future, FuturePool, Try, Return, Throw, Promise} import scala.util.control.NonFatal val exception = new RuntimeException("test") // 以下のようなケースを考えると、notMonitoredは決して終了しない。 val inner1 = new Promise[Int] val inner2 = new Promise[Int] val notMonitored: Future[Int] = { inner1.ensure { throw exception inner2.update(Return(2)) } inner2 } // このようなケースを防ぐために内部で起きた例外を伝搬してくれるのがFuture.monitored val monitored: Future[Int] = Future.monitored { inner1.ensure { throw exception inner2.update(Return(2)) } inner2 } // before inner1 // state=Waiting inner2 // state=Waiting notMonitored.poll // None monitored.poll // None inner1.update(Return(1)) // after inner1 // state=Done inner2 // state=Interuppted notMonitored.poll // None (永久に終わらない) monitored.poll // Some (例外が伝搬されるので終わる)
おまけ: Futureの再帰
ドキュメントにちゃんと実装してあるから再帰してもサンプルにあるようなコードではスタックオーバーフローにならないよと書いてある。