基本的に以下からそのまま取ってきています。
説明用のざっくりポインタとしてまとめる予定だったのに、あれもこれもと欲張ってしまった代物。(その割に全部あるわけではない。)
playからfinagleに移行してきたりするとメソッド名が細かく違ったりするのに最初は戸惑いますが、Scala標準Futureとakkaのschedulerを使ったことがあれば、すぐ慣れつつ便利さを感じられると思います。
例ではThread.sleepを呼びまくっていますが、そのへんの事情はScala標準のFutureと同じなのでちゃんとやるときはFuture.sleepやtimer.doLaterなどを使ったほうが良いです。
目次
Futureの作成
基本的な作り方について。
この作り方だとapplyも含めて全部同期実行になる点に注意。
import com.twitter.util.Future
Future.value(1)
Future.exception(new RuntimeException)
apply/const
Scala標準Futureではapplyは非同期に実行されるが、TwitterFutureでのapplyは同期実行という違いに注意。
import com.twitter.util.Future
Future(1)
Future(throw new RuntimeException)
Future.const(Try(1))
Future.const(Try(throw new RuntimeException))
Futureをまとめる
Scala標準のFutureと同じくfor式を使うだけだと並行実行されないことがあるので注意。
map/flatMap
略
join
ScalaFutureのzipと同じ http://qiita.com/mtoyoshi/items/f68beb17710c3819697f#zip
コード例は略。
失敗のハンドリング
handle/rescue
import com.twitter.util.{Future, Return, Throw}
val successFuture = Future.value(1)
val failedFuture = Future.exception[Int](new RuntimeException)
successFuture.handle {
case e: RuntimeException => 0
}
failedFuture.handle {
case e: RuntimeException => 0
}
successFuture.rescue {
case e: RuntimeException => Future.value(0)
}
failedFuture.rescue {
case e: RuntimeException => Future.value(0)
}
import com.twitter.util.{Future, Return, Throw}
val successFuture = Future.value(1)
val failedFuture = Future.exception[Int](new RuntimeException)
successFuture.transform {
case Return(a) if a == 1 => Future.exception(new RuntimeException)
case Return(a) => Future.value(a)
case Throw(e: RuntimeException) => Future.value(0)
case Throw(e) => Future.exception(e)
}
failedFuture.transform {
case Return(a) => Future.value(a)
case Throw(e: RuntimeException) => Future.value(0)
case Throw(e) => Future.exception(e)
}
FuturePool
非同期実行したい場合はFuturePoolの力が必要。
FuturePool.unboundedPool
スレッドプール内部のExecutorServiceはglobalのもの(https://github.com/twitter/util/blob/util-6.45.0/util-core/src/main/scala/com/twitter/util/FuturePool.scala#L70-L72) が利用される。
import com.twitter.util.FuturePool
val unboundedPool = FuturePool.unboundedPool
unboundedPool(1)
unboundedPool(throw new RuntimeException)
FuturePool(dbExecutorService).apply
import java.util.concurrent.ForkJoinPool
import com.twitter.util.FuturePool
val dbExecutorService = new ForkJoinPool(50)
val myPool = FuturePool(dbExecutorService)
myPool { Thread.sleep(1); 1 }
myPool(throw new RuntimeException)
Timer系列
Timerの種類を例ごとに変えてあるが使い方はどれも同じ。Timerの種類については後述。
sleep
import com.twitter.util.{Future, Await}
import com.twitter.conversions.time._
implicit val timer = com.twitter.finagle.util.DefaultTimer
val f = Future.sleep(3.seconds)
Await.result(f)
delayed
完了が遅れるだけで計算自体はすぐ行われる点に注意。
import com.twitter.util.{Future, Await}
import com.twitter.conversions.time._
implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)
val f = Future {
println("in future")
1
}.delayed(3.seconds).foreach(_ => println("done"))
Await.result(f)
Timer#schedule
Futureのメソッドではないがついでなので紹介。
import com.twitter.util.{Await, Future, Time}
import com.twitter.conversions.time._
implicit val timer = new com.twitter.util.ScheduledThreadPoolTimer(poolSize = 5, makeDaemons = true)
val timerTask = timer.schedule(1.seconds) {
println("1sec!")
}
Thread.sleep(3000)
Await.result(timerTask.close())
val f1 = timer.doLater(2.seconds) {
println("2sec!")
}
Await.result(f1)
val f2 = timer.doAt(Time.Now + 3.seconds) {
println("3sec!")
}
Await.result(f2)
timerの話
tl;dr;
- finagleを使っているならcom.twitter.finagle.util.DefaultTimerを使えばOKだが、blockingな重い処理をするなら自前で定義したほうがよいかも。
- finagle-netty4が依存パスにあればNetty4HashedWheelTimerが使われる。
- 無くてもJavaTimer(isDaemon=true)が使われるので安心。
- finagleは使って無くてtwitter/utilだけ使っている場合はJavaTimerかScheduledThreadPoolTimerが選択肢
- 単に
new JavaTimer
とするとユーザースレッドとして起動するため、明示的にcancelを呼ばないとtimerがGCされるまでプロセスが終了しなくなることがあるのでisDaemon=trueを指定するとよい。
細かい話
twitter/utilのこの手のメソッドはcom.twitter.util.Timer
を要求してくる。
スケジューリングを無視して即時実行するNullTimerやテスト用のMockTimerがあるが、
finagleを使っていない場合、基本的にはJavaTimer(isDaemon=true)を、ある程度の性能が欲しい場合はScheduledThreadPoolTimerを使えば良さそう。
finagleを使っている場合は、もう少し性能が出るタイマーがDefaultTimerとしてfinagle自体に用意されているのでそちらを使う方が良さそう。
ただ、DefaultTimerのインスタンスは共通なのでblockingな処理を行う際は、そのスレッドがブロックされる可能性があるので分けたほうが良いかもしれない。(未検証)
com.twitter.finagle.util.DefaultTimer
の実装はServiceLoaderで一番最初に見つかったクラスを使う。
ServiceLoaderで見つからなかった場合はwarningログが出つつcom.twitter.util.JavaTimer
をdaemonThreadをONにした上で使うようになっている。
https://github.com/twitter/finagle/blob/finagle-6.45.0/finagle-core/src/main/scala/com/twitter/finagle/util/DefaultTimer.scala#L31-L36
finagle-netty4(finagle-{thrift, http}の依存にある)がNetty4HashedWheelTimerを指定しているので大抵の場合はこれが読み込まれる気がする。
https://github.com/twitter/finagle/blob/finagle-6.45.0/finagle-netty4/src/main/resources/META-INF/services/com.twitter.finagle.util.DefaultTimer
もう一つcom.twitter.finagle.util.HashedWheelTimer.Default
というややこしいものが存在するが、こちらはNetty3ベースのHashedWheelTimerを使っている。
Netty3,4間のHashedWheelTimerの差はよくわかっていないが新しい方が良さそうなので基本的にはcom.twitter.finagle.util.DefaultTimerを使うのが良いだろう。
Timerのロードにサービスローダーを使うようになったのはfinagle6.45から。経緯とかは
https://github.com/twitter/finagle/commit/d047b4568e07a56b481b5f7c193b0e8c5ec6ff71 のコミットに書いてある通り、finagle-coreからnetty3依存を剥がすためにそうなっているらしい。
複数の処理
select/or
import com.twitter.util.{Future, Try}
val fs = (1 to 5).map(Future(_))
val (firstDoneTry, others): (Try[Int], Seq[Future[Int]]) = Await.result(Future.select(fs))
println(firstDoneTry)
println(others)
Future(1).or(Future(2))
traverseSequentially/collect/collectToTry
traverseSequentiallyは前のFutureが終了してから次を実行し、collectは同時に実行する。
実装を見るとcollectでも結果に含まれる要素の順番は引数と同じになるっぽい。順序についてはドキュメントやコメントにはのってない気がする。
collectToTryはcollectと異なり一部が失敗したときも成功し、一連の結果がTryで取得できる。
import com.twitter.util.{Await, Future}
val f = Future.traverseSequentially(1 to 5) { i =>
Future(i)
}
val f = Future.collect((1 to 5).map { i =>
Future {
val wait = scala.util.Random.nextInt(300)
println(s"$i => $wait ms")
Thread.sleep(wait)
i
}
})
println(Await.result(f))
val f2 = Future.collectToTry((1 to 5).map { i =>
Future {
if(i % 2 == 0) throw new RuntimeException else i
}
})
println(Await.result(f2))
batched
大量の非同期処理をいくつかのグループに分けて実行する君
同時実行数の制限という意味だと com.twitter.concurrent.AsyncSemaphore
を使う手もある。 Futures — Util 21.5.0 documentation
batchedは複数リクエストをまとめて送るのが前提になっている(Seq[In]=> Seq[Out]で処理を記述する)などの違いがある。
import com.twitter.conversions.time._
import com.twitter.util.{Await, Future}
implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)
val batcher = Future.batched[Int, String](
sizeThreshold = 10,
timeThreshold = 500.milliseconds,
sizePercentile = 0.3f
) { ids =>
Future.sleep(scala.util.Random.nextInt(50).milliseconds).map { _ =>
println(s"${ids.mkString(", ")} are inserted")
ids.map(_.toString)
}
}
val userIds = 1 to 50
val insertedFuture = Future.collect(userIds.map(batcher))
println(Await.result(insertedFuture))
batcher.flushBatch()
便利const
import com.twitter.util.Future
Future.Unit
Future.Done
Future.True
Future.False
Future.None
Future.Nil
Future.???
Future.Void
callback
コールバックよりはmap/flatMapでつなぎたい。ensureはありかも。
onSuccess/onFailure
省略
respond/ensure
import com.twitter.util.Future
val f1 = Future.value(1)
val f2 = f1.respond {
case Return(a) => println(a)
case Throw(e) => println(e)
}
val f1 = Future.value(1)
val f2 = f1.ensure {
println("f1 finished")
}
キャンセル
raise
import com.twitter.util.{Await, Future, FuturePool}
val f1 = FuturePool.interruptibleUnboundedPool {
Thread.sleep(100)
1
}
f1.raise(new RuntimeException("interrupt"))
Await.result(f)
val f2 = FuturePool.unboundedPool {
Thread.sleep(100)
1
}.interruptible()
f2.raise(new RuntimeException("interrupt"))
Await.result(f2)
raiseWithin
N秒以内に終わらないとタイムアウトといった指定が出来る。
import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.conversions.time._
implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)
val f = FuturePool.interruptibleUnboundedPool {
Thread.sleep(3000)
1
}
f.raiseWithin(2.seconds)
Await.result(f)
within/by
winthinとbyはDurationを受け取るかTimeを受け取るかの違いしか無い。
raseWithinとwithin/byには処理自体のfutureをraiseするか、withinなどの呼び出しの返り値のみをraiseするかの微妙な違いがある。詳細は以下のコメントを参照。
import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.conversions.time._
implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)
val f1 = FuturePool.interruptibleUnboundedPool {
Thread.sleep(3000)
1
}
val f2 = f1.within(2.seconds)
println(Await.result(f1))
Await.result(f2)
monitored
Promise使いつつネストしていると辛くなるケースを救えるらしい。いまのところ使ったことは無い。
import java.util.concurrent.ForkJoinPool
import com.twitter.util.{Future, FuturePool, Try, Return, Throw, Promise}
import scala.util.control.NonFatal
val exception = new RuntimeException("test")
val inner1 = new Promise[Int]
val inner2 = new Promise[Int]
val notMonitored: Future[Int] = {
inner1.ensure {
throw exception
inner2.update(Return(2))
}
inner2
}
val monitored: Future[Int] = Future.monitored {
inner1.ensure {
throw exception
inner2.update(Return(2))
}
inner2
}
inner1
inner2
notMonitored.poll
monitored.poll
inner1.update(Return(1))
inner1
inner2
notMonitored.poll
monitored.poll
おまけ: Futureの再帰
ドキュメントにちゃんと実装してあるから再帰してもサンプルにあるようなコードではスタックオーバーフローにならないよと書いてある。
Futures — Util 21.5.0 documentation