ExecutionContextとblockingについて調べたメモ [scala]

この記事の結論

  • globalExecutionContextではブロックする処理をblockingで包むとスレッド数が勝手に増えるから空きスレッドが無くて実行できないといったことを防げる。

  • ExecutionContext.fromExecutorService(new ForkJoinPool(100)) で生成されるThreadBlockContextトレイトを継承してないのでblockingを使ってもスレッド数を増やした方が良いという情報がスケジューラに伝わらない。

  • akkaのdispatcherExecutionContextとして使うとBlockContext付きのForkJoinPoolを簡単に作れる。

  • ExecutionContext.fromExecutorServiceでもForkJoinPoolのコンストラクタに自前定義したThreadFactoryを渡すようにすればblockContextを渡せる。scala 2.11.xの実装はあまり重大でない(?)バグがあるから2.12.xからコピペしても良いかもしれない。

追記:この記事の続編として blockingとOOM [scala] - だいたいよくわからないブログ を書きました。blockingがあることによって困ることもあるので用法用量を守って使うことが求められます。 追記の追記: 続編に書いたblockingでOOMになる問題はscala 2.12で解決済みのようなのでそこまで気にしなくて良さそうです。

blockingとは・・・

def blocking[T](body: ⇒ T): T

Scala Standard Library API (Scaladoc) 2.10.0

Used to designate a piece of code which potentially blocks, allowing the current BlockContext to adjust the runtime’s behavior.
Properly marking blocking code may improve performance or avoid deadlocks.

ランタイムビヘイビアーが調節されますって言われても困る(´;ω;`)

ということで blocking っていうメソッドが謎に包まれていたのでちょっと調べました。 解説というより自分用メモなのあえてまとめて無くてリファレンスとして持ってこれるように冗長になっています。なのでこの記事を副読本がてらコードを実際に追ったほうが速いと思います(◞‸◟) そして、調べ初めなので間違っている可能性もあります。(特にスケジューラ周りがあやふや)

ForkJoinPool, ManagedBlocker#onBlock, BlockContext については下記のエントリが詳しいです。 ForkJoinPoolとblocking - テストステ論

以下は引用です。

  • BlockContextというのがどこかに存在し, ブロックしたい場合はそこに通知がいく
  • 通知はヒントであり, BlockContextが必要と判断すればスレッドを追加する
  • blockingは, BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)である.
  • ForkJoinPool以外のBlockContextは, DefaultBlockContextであり, blockOnメソッドは空であるから, ForkJoinPool以外の時は「何もしない」である.

ForkJoinPoolについてはよく調べられていないのですが、今のところの理解は以下のようになりました。

  • work-stealing: fork() したタスクを join() するとタスクの終了までブロックするが、これはThread.sleepやIO待ちのブロックのようにスレッドが完全に何もしなくなるのとは違い「一度タスク実行を中断した上で、スレッドが他のタスクの処理を開始してくれるようになる」*1

  • 動的なスレッド生成: スケジューラが必要と判断したらスレッドを増やしてくれる。(ただしタスクがブロックしていることを自動で検出してくれたりはしないのでヒントを伝える必要がある)

スレッドを使いまわしてスレッドの生成コストやコンテキストスイッチを抑えつつ、本当に必要ならスレッド生成もしてくれる便利な奴といったところでしょうか。

blockingの中身

さて、肝心のblockingメソッドは(上の引用にもありますが)、scala.concurrentのパッケージオブジェクト内で定義されています。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/package.scala#L123

def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)

実装を見るとBlockContext内のメソッドを呼んでいるようです。BlockContextは先程も出てきました。

BlockContextはどこにあるか

さてBlockContextがどこかに存在するらしいのでどこにあるのかを探すと以下の様なものが見つかります。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L41-L49 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L51-L65

trait BlockContext {
  def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T
}

object BlockContext {
  private object DefaultBlockContext extends BlockContext {
   override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk
 }

  private val contextLocal = new ThreadLocal[BlockContext]()

  /** Obtain the current thread's current `BlockContext`. */
  def current: BlockContext = contextLocal.get match {
    case null => Thread.currentThread match {
      case ctx: BlockContext => ctx
      case _ => DefaultBlockContext
    }
    case some => some
  }
//  ...
}

blockingメソッドで呼んでいたBlockContext.currentは現在のスレッドが「BlockContextを継承している場合は現在のスレッド」を、そうでない場合は「DefaultBlockContextというblockOnメソッドの中が何もせず中身を評価するだけのメソッドになっているBlockContextを返す」ようになっています。

これを見るとBlockContextThreadに継承させて使う物のようです。

ということは、blockOnが実際に何をやるかについて知りたい場合はここを見ても仕方がなさそうです。 IntelliJを使って気合で探すと ExecutionContextImpl 内で見つかります。*2

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L43-L54

def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
  override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
    var result: T = null.asInstanceOf[T]
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      @volatile var isdone = false
      override def block(): Boolean = {
        result = try thunk finally { isdone = true }
        true
      }
      override def isReleasable = isdone
    })
    result
  }
})

overrideしたblockOnでは色々やっていますがつまるところ ForkJoinPool.ManagedBlocker#blockでブロックするかもしれない処理を包んだ後にForkJoinPool.managedBlockというメソッドに渡しているようです。名前からしてblockする処理を渡すと何かマネージしてくれそうなオーラを感じます。

ManagedBlockerについて

ManagedBlockerについては上述した記事でも触れられていますが、 ForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 の「ForkJoinTaskの中で協調的に動作するSleepの実装例」が分かりやすかったです。

詳しい解説は元記事によりますが、isReleasableでまだブロックする必要があるかを判定し、終わっていなかったらblockメソッド呼んでくれるようです。

Sleepの例ではblockが何度も実行されるようになっていましたがExecutionContextImpl#newThreadの実装ではthunkの評価が終わるまでブロックする(そして終わり次第isReleasableがtrueになる)ので一度しか実行されないようです。

ManagedBlockerの内部実装は読んでいないのですが、 ForkJoinPool.ManagedBlocker (Java Platform SE 7 ) では

they allow more efficient internal handling of cases in which additional workers may be, but usually are not, needed to ensure sufficient parallelism

とあります。(usuallyとは・・。)ForkJoinPoolは分割統治アルゴリズムに向けたフレームワークということなのでDB操作でブロックするとかそういうことに使うのはusuallyから逸脱しているとかあるのでしょうか・・。どのくらいスレッドが増えてくれるのかは謎ですが、以下の記事にもスレッドが増えそうな言及がありました。

This will give the thread pool a chance to spawn new threads in order to prevent starvation

When informed that one of those threads is about to block, it compensates by starting an additional thread.

Welcome, ManagedBlocker — the way to signal to the ForkJoinPool that it should extend its parallelism, to compensate for potential blocked worker threads.

とりあえずどれだけ増えるか分からないけどManagedBlockerに渡せば必要なら増えてくれるという理解になりました。

ここまでのまとめ

ここまでの流れを整理すると以下のようになります。

  • blockingメソッドでブロックする処理を包むとThreadBlockContextを継承していればblockOnによって何らかの処理が行われる。(継承していない場合は何もしないblockOnが呼ばれる)
  • ExecutionContextImplではonBlock内でManagedBlockerにブロックする処理を渡すようなThreadを生成するような実装を行っている場所がどうやらあるらしい。
  • ManagedBlocker にブロックする処理を渡すとForkJoinPoolのスケジューラがスレッド数を(必要なら)増やしてくれる。

ExecutionContextImplは何者なのか

ExecutionContextImplが出てきましたが、一部分しか見ていないのでもう少し詳しく見てみます。

ExecutionContextImplは名前の通りExecutionContextの元です。

実装上は以下のように利用されています。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L119-L164

def global: ExecutionContextExecutor = Implicits.global

object Implicits {
  implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
}

def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
  impl.ExecutionContextImpl.fromExecutorService(e, reporter)

def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)

def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
  impl.ExecutionContextImpl.fromExecutor(e, reporter)

import scala.concurrent.ExecutionContext.Implicits.globalで使えるようになるExecutionContextfromExecutornullを渡すことで生成されているようです。 fromExecutorServiceもありますが、これはJavaと同じようにExecutorにいくつかのメソッドが追加されたExecutorServiceが生成されるというだけで大きくは違わず、最終的にはExecutionContextImplがnewされるようです。(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L132-L150)

ExecutionContextImpl は渡されたes: Executorを以下のようにパターンマッチしてnullの場合に備えています。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L27-L30

val executor: Executor = es match {
  case null => createExecutorService
  case some => some
}

Implicits.globalglobalの際はnullが渡されるので、このcreateExecutorService が呼ばれるようです。(試してないですが、ExecutionContext.fromExecutor(null)とかすると標準のと同じExecutionContextが作れそうです。)

createExecutorServiceは以下のようなっています。(長いので抜粋) https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L74-L97

val threadFactory = new DefaultThreadFactory(daemonic = true)

try {
  new ForkJoinPool(
    desiredParallelism,
    threadFactory,
    uncaughtExceptionHandler,
    true) // Async all the way baby
} catch {...}

ついにForkJoinPoolが生成されました。そしてForkJoinPoolにはデフォルトの並列数*3と、DefaultThreadFactoryが渡されています。uncaughtExceptionHandlerは名前のとおりです。最後のasyncModetrueにすると通常スタックとして積まれるタスクがキューのようにFIFOになるようです。

asyncModeについての余談

playのconfigについてのドキュメントにこの設定について書いてありました。ThreadPools - 2.4.x

# Setting this to LIFO changes the fork-join-executor
# to use a stack discipline for task scheduling. This usually
# improves throughput at the cost of possibly increasing
# latency and risking task starvation (which should be rare).
task-peeking-mode = LIFO

この設定は正確にはakkaのdispatcherの設定です。akkaではtask-peeking-modeというキーでasyncModeが決められています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L443-L448

val asyncMode = config.getString("task-peeking-mode") match {
  case "FIFO"true
  case "LIFO"false
  case unsupported ⇒ throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " +
  """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}

LIFOにすると(少なくともplayでは)レイテンシが悪化してスループットが向上するようです。ちなみにakkaでは何も指定しないとFIFOになるようです。(https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/resources/reference.conf#L351)

DefaultThreadFactory

impl.ExecutionContextImpl.fromExecutornullを渡した時にしか呼ばれないcreateExecutorServiceの中で生成されているDefaultThreadFactoryの実装を見てみます。これはExecutionContextImplの内部クラスとして定義されています。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L33-L56

class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
  def wire[T <: Thread](thread: T): T = {
    thread.setDaemon(daemonic)
    thread.setUncaughtExceptionHandler(uncaughtExceptionHandler)
    thread
  }

def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))

def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
  override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
  var result: T = null.asInstanceOf[T]
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      @volatile var isdone = false
      override def block(): Boolean = {
        result = try thunk finally { isdone = true }
        true
      }
      override def isReleasable = isdone
    })
    result
}

何か見覚えがあるなと思ったら、このDefaultThreadFactoryがまさに先ほどIntelliJを使って気合で探してきたBlockContextを使っている部分でした。

ここまでのまとめ

ここまでの流れを整理すると以下のようになります。

  • blockingメソッドでブロックする処理を包むとThreadBlockContextを継承していればblockOnによって何らかの処理が行われる。(継承していない場合は何もしないblockOnが呼ばれる)
  • impl.ExecutionContextImpl.fromExecutornullを渡した時にしか呼ばれないcreateExecutorServiceの中で生成されているDefaultThreadFactoryではonBlock内でManagedBlockerにブロックする処理を渡すようなBlockContextを継承したForkJoinWorkerThreadを生成するThreadFactoryを定義してForkJoinPoolに渡している。
  • scala.concurrent.ExecutionContext.Implicit.globalで取得できるExecutionContextimpl.ExecutionContextImpl.fromExecutornullを渡した結果生成された物で。
  • ManagedBlocker にブロックする処理を渡すとForkJoinPoolのスケジューラがスレッド数を(必要なら)増やしてくれる。

ForkJoinPoolとfromExecutorService

ここまででglobalExecutionContextだとBlockContextが継承されたThreadが生成されて、blockingが良い感じになるということがわかりましたが、 ExecutionContext.fromExecutorService(new ForkJoinPool(100))とした場合はどうなるのか気になったので調べてみました。

scalaリポジトリ内のForkJoinPoo.java *4 を見ると以下のようになっています。(長いので抜粋) https://github.com/scala/scala/blob/v2.11.7/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java

// L1403~1404
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

// L881~886
static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
  }
}

// L3678~3747
static {
  defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory();
}

// L2838~2939
public ForkJoinPool(int parallelism) {
  this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

//

これを見るとForkJoinPoolForkJoinWorkerThreadFactoryを渡さない時にデフォルトで指定されるDefaultForkJoinWorkerThreadFactoryBlockContextを継承しないForkJoinWorkerThreadを生成するようです。(そもそもBlockContextscalaの物なのでJavaのクラスがデフォルトで継承してないのは当たり前な気がしてきました。)

ということはExecutionContext.fromExecutorService(new ForkJoinPool(100))のように指定してしまうとblockingで包んでも何もしてくれないDefaultBlockContextで処理されることになりそうです。

globalでないExecutionContextを使う際は予めブロックすることを前提にスレッドを多めに作ったりするものの少し注意が必要なようです。(動的にスレッドを増やすより、予め作っておいたほうが早く処理できると思うのでblockingで包まない戦略もある程度ありな気がしています)

akkaのdispatcher

akkaのDispatcherExecutionContextとして使えるのですが、そちらを使えば自分でForkJoinWorkerThreadFactoryを定義しなくても済みそうです。

akkaのThreadFactoryMonitorableThreadFactoryになっていてこれが、AkkaForkJoinWorkerThreadを生成しています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L183-L188

final case class MonitorableThreadFactory(name: String, daemonic: Boolean, contextClassLoader: Option[ClassLoader], exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing, protected val counter: AtomicLong = new AtomicLong) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
  def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
    val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool))
    // Name of the threads for the ForkJoinPool are not customizable. Change it here.
    t.setName(name + "-" + counter.incrementAndGet())
    t
  }

そしてAkkaForkJoinWorkerThreadBlockContextを継承しています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L161-L174

private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
  override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = {
    val result = new AtomicReference[Option[T]](None)
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      def block(): Boolean = {
        result.set(Some(thunk))
        true
      }
      def isReleasable = result.get.isDefined
    })
    result.get.get // Exception intended if None
  }
}

blockOnの中身はExecutioContext.globalで生成されるものとそんなに変わらなさそうです。

MonitorableThreadFactoryActorSystemImplで定義されて、Dispatchersに渡されています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L572-L573 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L642-L643

final val threadFactory: MonitorableThreadFactory =
  MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler)

val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
  threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))

ActorSystem.applyからdispatchers経由でAkkaForkJoinPoolを生成しつつ指定がなければAkkaForkJoinPoolが生成されるようです。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L321 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L380-L396

// 断片のみ
executor match {
      case null | "" | "fork-join-executor"new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
      case "thread-pool-executor"new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
}
final class AkkaForkJoinPool(parallelism: Int,
                             threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
                             unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
                             asyncMode: Boolean)
  extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics {
  def this(parallelism: Int,
           threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
           unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true)

  override def execute(r: Runnable): Unit =
    if (r ne null)
      super.execute((if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]])
    else
      throw new NullPointerException("Runnable was null")

  def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}

この中間部分も調べたのですが、この記事の分量が倍になるくらい色々経由していたので割愛します(;´Д`)

結論としては、akkaのdispatcherfork-join-executorを指定しておけば、blockingが使えるExecutionContextが取得できそうです。

おまけ: scala.2.12.xでのDefaultThreadFactoryの改善

akkaのDispatcherを使わなくてもForkJoinPoolのコンストラクタにExecutionContextImpl.DefaultThreadFactoryと同じものを渡せば問題はないはずです。 注意点としてscala2.12.0-M3のコードを見ると以下の様な注意書きが見つかります。 https://github.com/scala/scala/blob/v2.12.0-M3/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L78-L79

// When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads
BlockContext.withBlockContext(BlockContext.defaultBlockContext) { thunk }

どうもscala2.11.xのコードではblockingをネストさせるとスケジューラにヒントが行き過ぎてしまう問題があるようです。コピペして使う際には注意が必要そうです。

おまけ: Future.applyとExecutionContext

よく考えたらFutureExecutionContextがどうやって使われてるのかよく知らなかったので調べてみました。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/Future.scala#L492

  def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/Future.scala#L18-L34

class PromiseCompletingRunnable[T](body: => T) extends Runnable {
  val promise = new Promise.DefaultPromise[T]()

  override def run() = {
    promise complete {
      try Success(body) catch { case NonFatal(e) => Failure(e) }
    }
  }
}

def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
  val runnable = new PromiseCompletingRunnable(body)
  executor.prepare.execute(runnable)
  runnable.promise.future
}

コードを見る限りexecuteが呼ばれているようです。*5

executeメソッドはExecutionContextImplを見てみると以下の様な実装になっていました。(DefaultThreadFactoryではないのでfromExecutorService非nullを渡してもこのメソッドが呼ばれます) https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L99-L110

def execute(runnable: Runnable): Unit = executor match {
  case fj: ForkJoinPool =>
    val fjt: ForkJoinTask[_] = runnable match {
      case t: ForkJoinTask[_] => t
      case r                  => new ExecutionContextImpl.AdaptedForkJoinTask(r)
    }
    Thread.currentThread match {
      case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork()
      case _                                              => fj execute fjt
    }
  case generic => generic execute runnable
}

AdaptedForkJoinTaskは大雑把にはrunnableForkJoinTaskでラップするような実装です。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L118-L130

全体としてはプールがForkJoinPoolならrunnableをとりあえずForkJoinTaskにしてfork()を呼ぶという感じになっているようです。

joinを明示的に行っている部分は見つからなかったので、そういったwork-stealな効果は使っていないのか別の部分で使っているのかは不明です。特にFuturePromise の実装をほとんど見てないのでこの部分は適当です。

感想

以上です。結論は一番上に書いてあります。 akkaのdispatcherもそれなりに追ったのですがコードがわりと飛んででいるので紹介するのは厳しさを感じ断念しました。(コンフィグファイル越しにかなり柔軟に設定できるので仕方ない気がします) コードリーディングの結果をブログに書こうとすると死ぬけど、何かしら読んだメモは残したいしどうしようみたいな気持ちになった。(小学生以下の感想)

*1:かぎかっこ内の表現はForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 を意識しました

*2:BatchingExecutorでも見つかる(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BatchingExecutor.scala#L47-L99)のですが、ForkJoinPoolの話とは少しずれてしまうので割愛します

*3:Runtime.availableProcessorsによって得られる論理コア数と同じ数になるようです。http://docs.scala-lang.org/overviews/core/futures.html

*4:scala 2.11.xではJava8以前の処理系にも対応するためにforkjoinパッケージを自前で保持していますが、2.12.xではJava8を前提とするためこのコミットで依存が取り除かれているようです

*5:executor.prepareメソッドについても調べたのですが、https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L75-L89のコメントがなるほど分からんという感じだったのでスルーしています