この記事の結論
global
なExecutionContext
ではブロックする処理をblocking
で包むとスレッド数が勝手に増えるから空きスレッドが無くて実行できないといったことを防げる。ExecutionContext.fromExecutorService(new ForkJoinPool(100))
で生成されるThread
はBlockContext
トレイトを継承してないのでblocking
を使ってもスレッド数を増やした方が良いという情報がスケジューラに伝わらない。akkaの
dispatcher
をExecutionContext
として使うとBlockContext
付きのForkJoinPool
を簡単に作れる。ExecutionContext.fromExecutorService
でもForkJoinPool
のコンストラクタに自前定義したThreadFactory
を渡すようにすればblockContext
を渡せる。scala 2.11.xの実装はあまり重大でない(?)バグがあるから2.12.xからコピペしても良いかもしれない。
追記:この記事の続編として blockingとOOM [scala] - だいたいよくわからないブログ を書きました。blocking
があることによって困ることもあるので用法用量を守って使うことが求められます。
追記の追記: 続編に書いたblockingでOOMになる問題はscala 2.12で解決済みのようなのでそこまで気にしなくて良さそうです。
blockingとは・・・
def blocking[T](body: ⇒ T): T
Scala Standard Library API (Scaladoc) 2.10.0
Used to designate a piece of code which potentially blocks, allowing the current BlockContext to adjust the runtime’s behavior.
Properly marking blocking code may improve performance or avoid deadlocks.
ランタイムビヘイビアーが調節されますって言われても困る(´;ω;`)
ということで blocking
っていうメソッドが謎に包まれていたのでちょっと調べました。
解説というより自分用メモなのあえてまとめて無くてリファレンスとして持ってこれるように冗長になっています。なのでこの記事を副読本がてらコードを実際に追ったほうが速いと思います(◞‸◟) そして、調べ初めなので間違っている可能性もあります。(特にスケジューラ周りがあやふや)
ForkJoinPool
, ManagedBlocker#onBlock
, BlockContext
については下記のエントリが詳しいです。
ForkJoinPoolとblocking - テストステ論
以下は引用です。
- BlockContextというのがどこかに存在し, ブロックしたい場合はそこに通知がいく
- 通知はヒントであり, BlockContextが必要と判断すればスレッドを追加する
- blockingは, BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)である.
- ForkJoinPool以外のBlockContextは, DefaultBlockContextであり, blockOnメソッドは空であるから, ForkJoinPool以外の時は「何もしない」である.
ForkJoinPoolについてはよく調べられていないのですが、今のところの理解は以下のようになりました。
work-stealing:
fork()
したタスクをjoin()
するとタスクの終了までブロックするが、これはThread.sleep
やIO待ちのブロックのようにスレッドが完全に何もしなくなるのとは違い「一度タスク実行を中断した上で、スレッドが他のタスクの処理を開始してくれるようになる」*1動的なスレッド生成: スケジューラが必要と判断したらスレッドを増やしてくれる。(ただしタスクがブロックしていることを自動で検出してくれたりはしないのでヒントを伝える必要がある)
スレッドを使いまわしてスレッドの生成コストやコンテキストスイッチを抑えつつ、本当に必要ならスレッド生成もしてくれる便利な奴といったところでしょうか。
blockingの中身
さて、肝心のblockingメソッドは(上の引用にもありますが)、scala.concurrent
のパッケージオブジェクト内で定義されています。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/package.scala#L123
def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)
実装を見るとBlockContext内のメソッドを呼んでいるようです。BlockContext
は先程も出てきました。
BlockContextはどこにあるか
さてBlockContext
がどこかに存在するらしいのでどこにあるのかを探すと以下の様なものが見つかります。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L41-L49 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L51-L65
trait BlockContext { def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T } object BlockContext { private object DefaultBlockContext extends BlockContext { override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk } private val contextLocal = new ThreadLocal[BlockContext]() /** Obtain the current thread's current `BlockContext`. */ def current: BlockContext = contextLocal.get match { case null => Thread.currentThread match { case ctx: BlockContext => ctx case _ => DefaultBlockContext } case some => some } // ... }
blocking
メソッドで呼んでいたBlockContext.current
は現在のスレッドが「BlockContextを継承している場合は現在のスレッド」を、そうでない場合は「DefaultBlockContext
というblockOnメソッドの中が何もせず中身を評価するだけのメソッドになっているBlockContextを返す」ようになっています。
これを見るとBlockContext
はThread
に継承させて使う物のようです。
ということは、blockOn
が実際に何をやるかについて知りたい場合はここを見ても仕方がなさそうです。
IntelliJを使って気合で探すと ExecutionContextImpl
内で見つかります。*2
def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { var result: T = null.asInstanceOf[T] ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false override def block(): Boolean = { result = try thunk finally { isdone = true } true } override def isReleasable = isdone }) result } })
overrideしたblockOn
では色々やっていますがつまるところ ForkJoinPool.ManagedBlocker#block
でブロックするかもしれない処理を包んだ後にForkJoinPool.managedBlock
というメソッドに渡しているようです。名前からしてblockする処理を渡すと何かマネージしてくれそうなオーラを感じます。
ManagedBlockerについて
ManagedBlocker
については上述した記事でも触れられていますが、 ForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 の「ForkJoinTaskの中で協調的に動作するSleepの実装例」が分かりやすかったです。
詳しい解説は元記事によりますが、isReleasable
でまだブロックする必要があるかを判定し、終わっていなかったらblock
メソッド呼んでくれるようです。
Sleepの例ではblock
が何度も実行されるようになっていましたがExecutionContextImpl#newThread
の実装ではthunk
の評価が終わるまでブロックする(そして終わり次第isReleasable
がtrueになる)ので一度しか実行されないようです。
ManagedBlockerの内部実装は読んでいないのですが、 ForkJoinPool.ManagedBlocker (Java Platform SE 7 ) では
they allow more efficient internal handling of cases in which additional workers may be, but usually are not, needed to ensure sufficient parallelism
とあります。(usuallyとは・・。)ForkJoinPoolは分割統治アルゴリズムに向けたフレームワークということなのでDB操作でブロックするとかそういうことに使うのはusuallyから逸脱しているとかあるのでしょうか・・。どのくらいスレッドが増えてくれるのかは謎ですが、以下の記事にもスレッドが増えそうな言及がありました。
This will give the thread pool a chance to spawn new threads in order to prevent starvation
When informed that one of those threads is about to block, it compensates by starting an additional thread.
Welcome, ManagedBlocker — the way to signal to the ForkJoinPool that it should extend its parallelism, to compensate for potential blocked worker threads.
とりあえずどれだけ増えるか分からないけどManagedBlocker
に渡せば必要なら増えてくれるという理解になりました。
ここまでのまとめ
ここまでの流れを整理すると以下のようになります。
blocking
メソッドでブロックする処理を包むとThread
がBlockContext
を継承していればblockOn
によって何らかの処理が行われる。(継承していない場合は何もしないblockOn
が呼ばれる)ExecutionContextImpl
ではonBlock
内でManagedBlocker
にブロックする処理を渡すようなThread
を生成するような実装を行っている場所がどうやらあるらしい。ManagedBlocker
にブロックする処理を渡すとForkJoinPool
のスケジューラがスレッド数を(必要なら)増やしてくれる。
ExecutionContextImplは何者なのか
ExecutionContextImpl
が出てきましたが、一部分しか見ていないのでもう少し詳しく見てみます。
ExecutionContextImpl
は名前の通りExecutionContext
の元です。
実装上は以下のように利用されています。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L119-L164
def global: ExecutionContextExecutor = Implicits.global object Implicits { implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor) } def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService = impl.ExecutionContextImpl.fromExecutorService(e, reporter) def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter) def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(e, reporter)
import scala.concurrent.ExecutionContext.Implicits.global
で使えるようになるExecutionContext
はfromExecutor
にnull
を渡すことで生成されているようです。
fromExecutorService
もありますが、これはJavaと同じようにExecutor
にいくつかのメソッドが追加されたExecutorService
が生成されるというだけで大きくは違わず、最終的にはExecutionContextImpl
がnewされるようです。(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L132-L150)
ExecutionContextImpl
は渡されたes: Executor
を以下のようにパターンマッチしてnullの場合に備えています。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L27-L30
val executor: Executor = es match { case null => createExecutorService case some => some }
Implicits.global
やglobal
の際はnull
が渡されるので、このcreateExecutorService
が呼ばれるようです。(試してないですが、ExecutionContext.fromExecutor(null)
とかすると標準のと同じExecutionContext
が作れそうです。)
createExecutorService
は以下のようなっています。(長いので抜粋)
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L74-L97
val threadFactory = new DefaultThreadFactory(daemonic = true) try { new ForkJoinPool( desiredParallelism, threadFactory, uncaughtExceptionHandler, true) // Async all the way baby } catch {...}
ついにForkJoinPoolが生成されました。そしてForkJoinPool
にはデフォルトの並列数*3と、DefaultThreadFactory
が渡されています。uncaughtExceptionHandler
は名前のとおりです。最後のasyncMode
をtrue
にすると通常スタックとして積まれるタスクがキューのようにFIFOになるようです。
asyncModeについての余談
playのconfigについてのドキュメントにこの設定について書いてありました。ThreadPools - 2.4.x
# Setting this to LIFO changes the fork-join-executor # to use a stack discipline for task scheduling. This usually # improves throughput at the cost of possibly increasing # latency and risking task starvation (which should be rare). task-peeking-mode = LIFO
この設定は正確にはakkaのdispatcherの設定です。akkaではtask-peeking-mode
というキーでasyncModeが決められています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L443-L448
val asyncMode = config.getString("task-peeking-mode") match { case "FIFO" ⇒ true case "LIFO" ⇒ false case unsupported ⇒ throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " + """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""") }
LIFOにすると(少なくともplayでは)レイテンシが悪化してスループットが向上するようです。ちなみにakkaでは何も指定しないとFIFOになるようです。(https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/resources/reference.conf#L351)
DefaultThreadFactory
impl.ExecutionContextImpl.fromExecutor
にnull
を渡した時にしか呼ばれないcreateExecutorService
の中で生成されているDefaultThreadFactory
の実装を見てみます。これはExecutionContextImpl
の内部クラスとして定義されています。
class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { def wire[T <: Thread](thread: T): T = { thread.setDaemon(daemonic) thread.setUncaughtExceptionHandler(uncaughtExceptionHandler) thread } def newThread(runnable: Runnable): Thread = wire(new Thread(runnable)) def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { var result: T = null.asInstanceOf[T] ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false override def block(): Boolean = { result = try thunk finally { isdone = true } true } override def isReleasable = isdone }) result }
何か見覚えがあるなと思ったら、このDefaultThreadFactory
がまさに先ほどIntelliJを使って気合で探してきたBlockContext
を使っている部分でした。
ここまでのまとめ
ここまでの流れを整理すると以下のようになります。
blocking
メソッドでブロックする処理を包むとThread
がBlockContext
を継承していればblockOn
によって何らかの処理が行われる。(継承していない場合は何もしないblockOn
が呼ばれる)impl.ExecutionContextImpl.fromExecutor
にnull
を渡した時にしか呼ばれないcreateExecutorService
の中で生成されているDefaultThreadFactory
ではonBlock
内でManagedBlocker
にブロックする処理を渡すようなBlockContext
を継承したForkJoinWorkerThread
を生成するThreadFactory
を定義してForkJoinPool
に渡している。scala.concurrent.ExecutionContext.Implicit.global
で取得できるExecutionContext
はimpl.ExecutionContextImpl.fromExecutor
にnull
を渡した結果生成された物で。ManagedBlocker
にブロックする処理を渡すとForkJoinPool
のスケジューラがスレッド数を(必要なら)増やしてくれる。
ForkJoinPoolとfromExecutorService
ここまででglobal
なExecutionContext
だとBlockContext
が継承されたThread
が生成されて、blockingが良い感じになるということがわかりましたが、
ExecutionContext.fromExecutorService(new ForkJoinPool(100))
とした場合はどうなるのか気になったので調べてみました。
scalaリポジトリ内のForkJoinPoo.java *4 を見ると以下のようになっています。(長いので抜粋) https://github.com/scala/scala/blob/v2.11.7/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
// L1403~1404 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; // L881~886 static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } // L3678~3747 static { defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); } // L2838~2939 public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } //
これを見るとForkJoinPool
にForkJoinWorkerThreadFactory
を渡さない時にデフォルトで指定されるDefaultForkJoinWorkerThreadFactory
はBlockContext
を継承しないForkJoinWorkerThread
を生成するようです。(そもそもBlockContext
はscalaの物なのでJavaのクラスがデフォルトで継承してないのは当たり前な気がしてきました。)
ということはExecutionContext.fromExecutorService(new ForkJoinPool(100))
のように指定してしまうとblocking
で包んでも何もしてくれないDefaultBlockContext
で処理されることになりそうです。
globalでないExecutionContext
を使う際は予めブロックすることを前提にスレッドを多めに作ったりするものの少し注意が必要なようです。(動的にスレッドを増やすより、予め作っておいたほうが早く処理できると思うのでblockingで包まない戦略もある程度ありな気がしています)
akkaのdispatcher
akkaのDispatcher
はExecutionContext
として使えるのですが、そちらを使えば自分でForkJoinWorkerThreadFactory
を定義しなくても済みそうです。
akkaのThreadFactory
はMonitorableThreadFactory
になっていてこれが、AkkaForkJoinWorkerThread
を生成しています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L183-L188
final case class MonitorableThreadFactory(name: String, daemonic: Boolean, contextClassLoader: Option[ClassLoader], exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing, protected val counter: AtomicLong = new AtomicLong) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool)) // Name of the threads for the ForkJoinPool are not customizable. Change it here. t.setName(name + "-" + counter.incrementAndGet()) t }
そしてAkkaForkJoinWorkerThread
はBlockContext
を継承しています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L161-L174
private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext { override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = { val result = new AtomicReference[Option[T]](None) ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { def block(): Boolean = { result.set(Some(thunk)) true } def isReleasable = result.get.isDefined }) result.get.get // Exception intended if None } }
blockOn
の中身はExecutioContext.global
で生成されるものとそんなに変わらなさそうです。
MonitorableThreadFactory
はActorSystemImpl
で定義されて、Dispatchers
に渡されています。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L572-L573
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L642-L643
final val threadFactory: MonitorableThreadFactory = MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler) val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))
ActorSystem.apply
からdispatchers
経由でAkkaForkJoinPool
を生成しつつ指定がなければAkkaForkJoinPool
が生成されるようです。
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L321
https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L380-L396
// 断片のみ executor match { case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) }
final class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler, asyncMode: Boolean) extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics { def this(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true) override def execute(r: Runnable): Unit = if (r ne null) super.execute((if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]]) else throw new NullPointerException("Runnable was null") def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() }
この中間部分も調べたのですが、この記事の分量が倍になるくらい色々経由していたので割愛します(;´Д`)
結論としては、akkaのdispatcher
でfork-join-executor
を指定しておけば、blocking
が使えるExecutionContext
が取得できそうです。
おまけ: scala.2.12.xでのDefaultThreadFactoryの改善
akkaのDispatcherを使わなくてもForkJoinPool
のコンストラクタにExecutionContextImpl.DefaultThreadFactory
と同じものを渡せば問題はないはずです。
注意点としてscala2.12.0-M3のコードを見ると以下の様な注意書きが見つかります。
https://github.com/scala/scala/blob/v2.12.0-M3/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L78-L79
// When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads
BlockContext.withBlockContext(BlockContext.defaultBlockContext) { thunk }
どうもscala2.11.xのコードではblocking
をネストさせるとスケジューラにヒントが行き過ぎてしまう問題があるようです。コピペして使う際には注意が必要そうです。
おまけ: Future.applyとExecutionContext
よく考えたらFuture
でExecutionContext
がどうやって使われてるのかよく知らなかったので調べてみました。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/Future.scala#L492
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/Future.scala#L18-L34
class PromiseCompletingRunnable[T](body: => T) extends Runnable { val promise = new Promise.DefaultPromise[T]() override def run() = { promise complete { try Success(body) catch { case NonFatal(e) => Failure(e) } } } } def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { val runnable = new PromiseCompletingRunnable(body) executor.prepare.execute(runnable) runnable.promise.future }
コードを見る限りexecute
が呼ばれているようです。*5
execute
メソッドはExecutionContextImpl
を見てみると以下の様な実装になっていました。(DefaultThreadFactory
ではないのでfromExecutorService
に非null
を渡してもこのメソッドが呼ばれます)
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L99-L110
def execute(runnable: Runnable): Unit = executor match { case fj: ForkJoinPool => val fjt: ForkJoinTask[_] = runnable match { case t: ForkJoinTask[_] => t case r => new ExecutionContextImpl.AdaptedForkJoinTask(r) } Thread.currentThread match { case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork() case _ => fj execute fjt } case generic => generic execute runnable }
AdaptedForkJoinTask
は大雑把にはrunnable
をForkJoinTask
でラップするような実装です。
https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L118-L130
全体としてはプールがForkJoinPool
ならrunnable
をとりあえずForkJoinTask
にしてfork()
を呼ぶという感じになっているようです。
join
を明示的に行っている部分は見つからなかったので、そういったwork-stealな効果は使っていないのか別の部分で使っているのかは不明です。特にFuture
やPromise
の実装をほとんど見てないのでこの部分は適当です。
感想
以上です。結論は一番上に書いてあります。 akkaのdispatcherもそれなりに追ったのですがコードがわりと飛んででいるので紹介するのは厳しさを感じ断念しました。(コンフィグファイル越しにかなり柔軟に設定できるので仕方ない気がします) コードリーディングの結果をブログに書こうとすると死ぬけど、何かしら読んだメモは残したいしどうしようみたいな気持ちになった。(小学生以下の感想)
*1:かぎかっこ内の表現はForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 を意識しました
*2:BatchingExecutorでも見つかる(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BatchingExecutor.scala#L47-L99)のですが、ForkJoinPoolの話とは少しずれてしまうので割愛します
*3:Runtime.availableProcessorsによって得られる論理コア数と同じ数になるようです。http://docs.scala-lang.org/overviews/core/futures.html
*4:scala 2.11.xではJava8以前の処理系にも対応するためにforkjoinパッケージを自前で保持していますが、2.12.xではJava8を前提とするためこのコミットで依存が取り除かれているようです
*5:executor.prepareメソッドについても調べたのですが、https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L75-L89のコメントがなるほど分からんという感じだったのでスルーしています