pony runtimeメモ actor dispatch編 [ponylang]

pony runtime

ponyのランタイムについての論文を読んでいる中でActorの仕組みについての部分を見て現状の実装はどうなっているのか気になったので少しだけ見てみました。

論文は2013年ですが現時点(2016/3/5)での最新コミットのソースを見てみます。 *1

actor.h

まずはActorから https://github.com/ponylang/ponyc/tree/d888dab8bdcc4e02ce043f41cad62a9adcc88b5d/src/libponyrt/actor の辺のディレクトリです。

.hを見ると以下のようになっています、

typedef struct pony_actor_t
{
  pony_type_t* type;
  messageq_t q;
  pony_msg_t* continuation;
  uint8_t flags;

  // keep things accessed by other actors on a separate cache line
  __pony_spec_align__(heap_t heap, 64); // 52/104 bytes
  gc_t gc; // 44/80 bytes
} pony_actor_t;

pony_type_t

pony_type_t は実行時の型情報が入ってるみたいなコメントがありました。vtablesとかあるのでそういうことだと思います。

messageq_t, pony_msg_t

messageq_t は各actorが持っているメッセージキューのことです。 pony_msg_t* continuation というのが後続のメッセージになっています。

pony_spec_align, gc_t

キャッシュラインを分けるために何かスペーシングをしてるなどの細かい工夫がかいま見えます?(あんまり良くわかってないです。。。)

gc_tは文字通りgcに関する情報のようです。具体的には内部のオブジェクトのマップやアクターの参照のマップなどが入っていそうです。ponyのgc機構であるpony ORCAはactor内部のデータが外部からどのくらい参照されているかをactor内部でカウントとして持っておくような機構があるようなのでその辺のことをやってくれるはずです。

関数

actor.h内のその他の関数を見ると、actor_run, actor_destroyなどそれっぽい関数が並んでいます。

bool actor_run(pony_ctx_t* ctx, pony_actor_t* actor, size_t batch);
void actor_destroy(pony_actor_t* actor);
gc_t* actor_gc(pony_actor_t* actor);
heap_t* actor_heap(pony_actor_t* actor);
...

ちなみに論文では以下のようになっています

bool actor_run(actor_t* actor);
void actor_destroy(actor_t* actor, map_t* cycle);
void actor_sendv(actor_t* to, uint64_t id, int argc, arg_t* argv);
actor_t* actor_create(actor_type_t* type);
void pony_send(actor_t* to, uint64_t id);
void pony_sendv(actor_t* to, uint64_t id, int argc, arg_t* argv);
/* convenience functions are omitted */

sendやsendvはどこにいったのでしょうか・・とおもったらactor.cにいました。

ponyのschedulerにactor_runを呼ばれたら、アクターが処理を開始する仕組みになっているようです。

actor.c

論文Listing 2.2では actor_type_t として以下の構造が宣言されています。

typedef const struct actor_type_t
{
  trace_fn trace;
  message_type_fn message_type;
  dispatch_fn dispatch;
  actor_type_t;
}

ところがgithubの方ではversion0.1まで遡ってもありませんでした。 dispatchしているところをみると、これらの情報は先ほど軽くみたpony_type_t に含まれているようです。 ということで現行の実装を追ってみます。今回はactorの生成 => dispatch => destroyの流れを見てみます。ただしGC(destroyがいつ呼ばれるか)、schdulerに関しては別の処理が増えるので深追いはしないことにします。

actor creation

まずactorが生成されるところをザクっと見てみます。

pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type)
{
  assert(type != NULL);

#ifdef USE_TELEMETRY
  ctx->count_alloc_actors++;
#endif

  // allocate variable sized actors correctly
  pony_actor_t* actor = (pony_actor_t*)pool_alloc_size(type->size);
  memset(actor, 0, type->size);
  actor->type = type;

  messageq_init(&actor->q);
  heap_init(&actor->heap);
  gc_done(&actor->gc);

  if(ctx->current != NULL)
  {
    // actors begin unblocked and referenced by the creating actor
    actor->gc.rc = GC_INC_MORE;
    gc_createactor(ctx->current, actor);
  } else {
    // no creator, so the actor isn't referenced by anything
    actor->gc.rc = 0;
  }

  return actor;
}

引数のpony_ctx_tscheduler_initすると生成されるようです。start.cという見るからにそれっぽいところで呼ばれています。 もう一つのpony_type_tは先ほどから何度か出ている型情報を入れる構造体です。

処理の内容としてはnullチェックしてcount増やしてメモリallocして〜という処理につづいて, messageq_init, heap_init によってmessage queueの初期化ヒープの初期化を行っています。 heapの方は次のgcを行うメモリ量が決められています。1 << 14 だと16KBほどでしょうか。heap_setinitialgcという関数で初期値を変えられるようです。この関数はstart.cで呼ばれていました。実行時引数か何かで渡せそうです。 次に、actorのリファレンスカウントを決めています。ctx->currentは現在処理中のアクターのはずなので実行中のアクターがいないとはこれいかに・・・ということでもう少し見てみます。

まずschedulerが初期化された瞬間のctx->currentはnullのような気がしますactor_runを読んでしまうとcurrentは上書きされてしまうのでそこまでのタイミングでアクターが作られている必要がありそうです。 gen_mainを見てみるとpony_initをした後すぐにctxを読み取っています。pony_initは先程のstart.cの関数です。pony_initの直後に呼ばれているcreate_main内では pony_createを呼んでいます。この時点ではまだctx->currentはnullになっていそうです。ということでmainアクターは// no creator, so the actor isn't referenced by anythingの処理に入るのではないかと思いました。

またpony_becomeをnullで呼び出してもctx->currentはnullになるようです。(これはpony_runtime外からpony APIを呼び出す用みたいなことがコメントに書いてあるのでCなどから呼ぶ時に色々出来るのでしょうか・・・)

dispatch

さてactorがメッセージを処理する際はschedulerがactor_runを読んでくれます。 コードは以下です。

bool actor_run(pony_ctx_t* ctx, pony_actor_t* actor, size_t batch)
{
  ctx->current = actor;

  pony_msg_t* msg;
  size_t app = 0;

  while(actor->continuation != NULL)
  {
    msg = actor->continuation;
    actor->continuation = msg->next;
    bool ret = handle_message(ctx, actor, msg);
    pool_free(msg->size, msg);

    if(ret)
    {
      // If we handle an application message, try to gc.
      app++;
      try_gc(ctx, actor);

      if(app == batch)
        return !has_flag(actor, FLAG_UNSCHEDULED);
    }
  }

  while((msg = messageq_pop(&actor->q)) != NULL)
  {
    if(handle_message(ctx, actor, msg))
    {
      // If we handle an application message, try to gc.
      app++;
      try_gc(ctx, actor);

      if(app == batch)
        return !has_flag(actor, FLAG_UNSCHEDULED);
    }
  }

  // We didn't hit our app message batch limit. We now believe our queue to be
  // empty, but we may have received further messages.
  assert(app < batch);
  try_gc(ctx, actor);

  if(has_flag(actor, FLAG_UNSCHEDULED))
  {
    // When unscheduling, don't mark the queue as empty, since we don't want
    // to get rescheduled if we receive a message.
    return false;
  }

  // If we have processed any application level messages, defer blocking.
  if(app > 0)
    return true;

  // Tell the cycle detector we are blocking. We may not actually block if a
  // message is received between now and when we try to mark our queue as
  // empty, but that's ok, we have still logically blocked.
  if(!has_flag(actor, FLAG_BLOCKED | FLAG_SYSTEM) ||
    has_flag(actor, FLAG_RC_CHANGED))
  {
    set_flag(actor, FLAG_BLOCKED);
    unset_flag(actor, FLAG_RC_CHANGED);
    cycle_block(ctx, actor, &actor->gc);
  }

  // Return true (i.e. reschedule immediately) if our queue isn't empty.
  return !messageq_markempty(&actor->q);
}

continuationから読み取ってhandle_messageを読んでいます。メッセージがなければmessageqからpopしています。handle_message以降はgcやscheduler関連のようなので割愛します。handle_messageではmsg->idをベースにswitch-caseで分岐しています。といってもdefault以外はGC系のシステムメッセージのようです。そして最後にtype->dispatchを呼んで終わりです。dispatchの中身は多分libponycなどで生成されている気がします。

destroy

さて最後のdestroyですが、いつ呼ばれるのかを気にしなければdestroyしていくだけなので簡単です。

void actor_destroy(pony_actor_t* actor)
{
  assert(has_flag(actor, FLAG_PENDINGDESTROY));

  // Make sure the actor being destroyed has finished marking its queue
  // as empty. Otherwise, it may spuriously see that tail and head are not
  // the same and fail to mark the queue as empty, resulting in it getting
  // rescheduled.
  pony_msg_t* head = _atomic_load(&actor->q.head);

  while(((uintptr_t)head & (uintptr_t)1) != (uintptr_t)1)
    head = _atomic_load(&actor->q.head);

  messageq_destroy(&actor->q);
  gc_destroy(&actor->gc);
  heap_destroy(&actor->heap);

  // Free variable sized actors correctly.
  pool_free_size(actor->type->size, actor);
}

まとめ

schedulerのワークスティーリングの部分とGCの部分はまだ読めてないですが、Actorの基本的な構造と振る舞いはわかってきた気がします。 またやるか分かりませんがコードリーディングにはちょうど良さそうなので読むのはやっていきたいなと思いました。

*1:0.2.1以降のリリースが途絶えて久しいんですが何か大きい機能を中で作っているのでしょうか・・。コミットも続いてるし、3月7~9日のどこかの日にUsing Pony for Fintechという挑戦的なタイトルでCTOの人が登壇するらしいのでプロジェクト自体はアクティブのようです。

ExecutionContextとblockingについて調べたメモ [scala]

この記事の結論

  • globalExecutionContextではブロックする処理をblockingで包むとスレッド数が勝手に増えるから空きスレッドが無くて実行できないといったことを防げる。

  • ExecutionContext.fromExecutorService(new ForkJoinPool(100)) で生成されるThreadBlockContextトレイトを継承してないのでblockingを使ってもスレッド数を増やした方が良いという情報がスケジューラに伝わらない。

  • akkaのdispatcherExecutionContextとして使うとBlockContext付きのForkJoinPoolを簡単に作れる。

  • ExecutionContext.fromExecutorServiceでもForkJoinPoolのコンストラクタに自前定義したThreadFactoryを渡すようにすればblockContextを渡せる。scala 2.11.xの実装はあまり重大でない(?)バグがあるから2.12.xからコピペしても良いかもしれない。

追記:この記事の続編として blockingとOOM [scala] - だいたいよくわからないブログ を書きました。blockingがあることによって困ることもあるので用法用量を守って使うことが求められます。 追記の追記: 続編に書いたblockingでOOMになる問題はscala 2.12で解決済みのようなのでそこまで気にしなくて良さそうです。

blockingとは・・・

def blocking[T](body: ⇒ T): T

Scala Standard Library API (Scaladoc) 2.10.0

Used to designate a piece of code which potentially blocks, allowing the current BlockContext to adjust the runtime’s behavior.
Properly marking blocking code may improve performance or avoid deadlocks.

ランタイムビヘイビアーが調節されますって言われても困る(´;ω;`)

ということで blocking っていうメソッドが謎に包まれていたのでちょっと調べました。 解説というより自分用メモなのあえてまとめて無くてリファレンスとして持ってこれるように冗長になっています。なのでこの記事を副読本がてらコードを実際に追ったほうが速いと思います(◞‸◟) そして、調べ初めなので間違っている可能性もあります。(特にスケジューラ周りがあやふや)

ForkJoinPool, ManagedBlocker#onBlock, BlockContext については下記のエントリが詳しいです。 ForkJoinPoolとblocking - テストステ論

以下は引用です。

  • BlockContextというのがどこかに存在し, ブロックしたい場合はそこに通知がいく
  • 通知はヒントであり, BlockContextが必要と判断すればスレッドを追加する
  • blockingは, BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)である.
  • ForkJoinPool以外のBlockContextは, DefaultBlockContextであり, blockOnメソッドは空であるから, ForkJoinPool以外の時は「何もしない」である.

ForkJoinPoolについてはよく調べられていないのですが、今のところの理解は以下のようになりました。

  • work-stealing: fork() したタスクを join() するとタスクの終了までブロックするが、これはThread.sleepやIO待ちのブロックのようにスレッドが完全に何もしなくなるのとは違い「一度タスク実行を中断した上で、スレッドが他のタスクの処理を開始してくれるようになる」*1

  • 動的なスレッド生成: スケジューラが必要と判断したらスレッドを増やしてくれる。(ただしタスクがブロックしていることを自動で検出してくれたりはしないのでヒントを伝える必要がある)

スレッドを使いまわしてスレッドの生成コストやコンテキストスイッチを抑えつつ、本当に必要ならスレッド生成もしてくれる便利な奴といったところでしょうか。

blockingの中身

さて、肝心のblockingメソッドは(上の引用にもありますが)、scala.concurrentのパッケージオブジェクト内で定義されています。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/package.scala#L123

def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)(scala.concurrent.AwaitPermission)

実装を見るとBlockContext内のメソッドを呼んでいるようです。BlockContextは先程も出てきました。

BlockContextはどこにあるか

さてBlockContextがどこかに存在するらしいのでどこにあるのかを探すと以下の様なものが見つかります。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L41-L49 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BlockContext.scala#L51-L65

trait BlockContext {
  def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T
}

object BlockContext {
  private object DefaultBlockContext extends BlockContext {
   override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk
 }

  private val contextLocal = new ThreadLocal[BlockContext]()

  /** Obtain the current thread's current `BlockContext`. */
  def current: BlockContext = contextLocal.get match {
    case null => Thread.currentThread match {
      case ctx: BlockContext => ctx
      case _ => DefaultBlockContext
    }
    case some => some
  }
//  ...
}

blockingメソッドで呼んでいたBlockContext.currentは現在のスレッドが「BlockContextを継承している場合は現在のスレッド」を、そうでない場合は「DefaultBlockContextというblockOnメソッドの中が何もせず中身を評価するだけのメソッドになっているBlockContextを返す」ようになっています。

これを見るとBlockContextThreadに継承させて使う物のようです。

ということは、blockOnが実際に何をやるかについて知りたい場合はここを見ても仕方がなさそうです。 IntelliJを使って気合で探すと ExecutionContextImpl 内で見つかります。*2

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L43-L54

def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
  override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
    var result: T = null.asInstanceOf[T]
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      @volatile var isdone = false
      override def block(): Boolean = {
        result = try thunk finally { isdone = true }
        true
      }
      override def isReleasable = isdone
    })
    result
  }
})

overrideしたblockOnでは色々やっていますがつまるところ ForkJoinPool.ManagedBlocker#blockでブロックするかもしれない処理を包んだ後にForkJoinPool.managedBlockというメソッドに渡しているようです。名前からしてblockする処理を渡すと何かマネージしてくれそうなオーラを感じます。

ManagedBlockerについて

ManagedBlockerについては上述した記事でも触れられていますが、 ForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 の「ForkJoinTaskの中で協調的に動作するSleepの実装例」が分かりやすかったです。

詳しい解説は元記事によりますが、isReleasableでまだブロックする必要があるかを判定し、終わっていなかったらblockメソッド呼んでくれるようです。

Sleepの例ではblockが何度も実行されるようになっていましたがExecutionContextImpl#newThreadの実装ではthunkの評価が終わるまでブロックする(そして終わり次第isReleasableがtrueになる)ので一度しか実行されないようです。

ManagedBlockerの内部実装は読んでいないのですが、 ForkJoinPool.ManagedBlocker (Java Platform SE 7 ) では

they allow more efficient internal handling of cases in which additional workers may be, but usually are not, needed to ensure sufficient parallelism

とあります。(usuallyとは・・。)ForkJoinPoolは分割統治アルゴリズムに向けたフレームワークということなのでDB操作でブロックするとかそういうことに使うのはusuallyから逸脱しているとかあるのでしょうか・・。どのくらいスレッドが増えてくれるのかは謎ですが、以下の記事にもスレッドが増えそうな言及がありました。

This will give the thread pool a chance to spawn new threads in order to prevent starvation

When informed that one of those threads is about to block, it compensates by starting an additional thread.

Welcome, ManagedBlocker — the way to signal to the ForkJoinPool that it should extend its parallelism, to compensate for potential blocked worker threads.

とりあえずどれだけ増えるか分からないけどManagedBlockerに渡せば必要なら増えてくれるという理解になりました。

ここまでのまとめ

ここまでの流れを整理すると以下のようになります。

  • blockingメソッドでブロックする処理を包むとThreadBlockContextを継承していればblockOnによって何らかの処理が行われる。(継承していない場合は何もしないblockOnが呼ばれる)
  • ExecutionContextImplではonBlock内でManagedBlockerにブロックする処理を渡すようなThreadを生成するような実装を行っている場所がどうやらあるらしい。
  • ManagedBlocker にブロックする処理を渡すとForkJoinPoolのスケジューラがスレッド数を(必要なら)増やしてくれる。

ExecutionContextImplは何者なのか

ExecutionContextImplが出てきましたが、一部分しか見ていないのでもう少し詳しく見てみます。

ExecutionContextImplは名前の通りExecutionContextの元です。

実装上は以下のように利用されています。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L119-L164

def global: ExecutionContextExecutor = Implicits.global

object Implicits {
  implicit lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
}

def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService =
  impl.ExecutionContextImpl.fromExecutorService(e, reporter)

def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService = fromExecutorService(e, defaultReporter)

def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor =
  impl.ExecutionContextImpl.fromExecutor(e, reporter)

import scala.concurrent.ExecutionContext.Implicits.globalで使えるようになるExecutionContextfromExecutornullを渡すことで生成されているようです。 fromExecutorServiceもありますが、これはJavaと同じようにExecutorにいくつかのメソッドが追加されたExecutorServiceが生成されるというだけで大きくは違わず、最終的にはExecutionContextImplがnewされるようです。(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L132-L150)

ExecutionContextImpl は渡されたes: Executorを以下のようにパターンマッチしてnullの場合に備えています。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L27-L30

val executor: Executor = es match {
  case null => createExecutorService
  case some => some
}

Implicits.globalglobalの際はnullが渡されるので、このcreateExecutorService が呼ばれるようです。(試してないですが、ExecutionContext.fromExecutor(null)とかすると標準のと同じExecutionContextが作れそうです。)

createExecutorServiceは以下のようなっています。(長いので抜粋) https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L74-L97

val threadFactory = new DefaultThreadFactory(daemonic = true)

try {
  new ForkJoinPool(
    desiredParallelism,
    threadFactory,
    uncaughtExceptionHandler,
    true) // Async all the way baby
} catch {...}

ついにForkJoinPoolが生成されました。そしてForkJoinPoolにはデフォルトの並列数*3と、DefaultThreadFactoryが渡されています。uncaughtExceptionHandlerは名前のとおりです。最後のasyncModetrueにすると通常スタックとして積まれるタスクがキューのようにFIFOになるようです。

asyncModeについての余談

playのconfigについてのドキュメントにこの設定について書いてありました。ThreadPools - 2.4.x

# Setting this to LIFO changes the fork-join-executor
# to use a stack discipline for task scheduling. This usually
# improves throughput at the cost of possibly increasing
# latency and risking task starvation (which should be rare).
task-peeking-mode = LIFO

この設定は正確にはakkaのdispatcherの設定です。akkaではtask-peeking-modeというキーでasyncModeが決められています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L443-L448

val asyncMode = config.getString("task-peeking-mode") match {
  case "FIFO"true
  case "LIFO"false
  case unsupported ⇒ throw new IllegalArgumentException("Cannot instantiate ForkJoinExecutorServiceFactory. " +
  """"task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
}

LIFOにすると(少なくともplayでは)レイテンシが悪化してスループットが向上するようです。ちなみにakkaでは何も指定しないとFIFOになるようです。(https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/resources/reference.conf#L351)

DefaultThreadFactory

impl.ExecutionContextImpl.fromExecutornullを渡した時にしか呼ばれないcreateExecutorServiceの中で生成されているDefaultThreadFactoryの実装を見てみます。これはExecutionContextImplの内部クラスとして定義されています。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L33-L56

class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
  def wire[T <: Thread](thread: T): T = {
    thread.setDaemon(daemonic)
    thread.setUncaughtExceptionHandler(uncaughtExceptionHandler)
    thread
  }

def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))

def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
  override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
  var result: T = null.asInstanceOf[T]
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      @volatile var isdone = false
      override def block(): Boolean = {
        result = try thunk finally { isdone = true }
        true
      }
      override def isReleasable = isdone
    })
    result
}

何か見覚えがあるなと思ったら、このDefaultThreadFactoryがまさに先ほどIntelliJを使って気合で探してきたBlockContextを使っている部分でした。

ここまでのまとめ

ここまでの流れを整理すると以下のようになります。

  • blockingメソッドでブロックする処理を包むとThreadBlockContextを継承していればblockOnによって何らかの処理が行われる。(継承していない場合は何もしないblockOnが呼ばれる)
  • impl.ExecutionContextImpl.fromExecutornullを渡した時にしか呼ばれないcreateExecutorServiceの中で生成されているDefaultThreadFactoryではonBlock内でManagedBlockerにブロックする処理を渡すようなBlockContextを継承したForkJoinWorkerThreadを生成するThreadFactoryを定義してForkJoinPoolに渡している。
  • scala.concurrent.ExecutionContext.Implicit.globalで取得できるExecutionContextimpl.ExecutionContextImpl.fromExecutornullを渡した結果生成された物で。
  • ManagedBlocker にブロックする処理を渡すとForkJoinPoolのスケジューラがスレッド数を(必要なら)増やしてくれる。

ForkJoinPoolとfromExecutorService

ここまででglobalExecutionContextだとBlockContextが継承されたThreadが生成されて、blockingが良い感じになるということがわかりましたが、 ExecutionContext.fromExecutorService(new ForkJoinPool(100))とした場合はどうなるのか気になったので調べてみました。

scalaリポジトリ内のForkJoinPoo.java *4 を見ると以下のようになっています。(長いので抜粋) https://github.com/scala/scala/blob/v2.11.7/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java

// L1403~1404
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

// L881~886
static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
  public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
  }
}

// L3678~3747
static {
  defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory();
}

// L2838~2939
public ForkJoinPool(int parallelism) {
  this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

//

これを見るとForkJoinPoolForkJoinWorkerThreadFactoryを渡さない時にデフォルトで指定されるDefaultForkJoinWorkerThreadFactoryBlockContextを継承しないForkJoinWorkerThreadを生成するようです。(そもそもBlockContextscalaの物なのでJavaのクラスがデフォルトで継承してないのは当たり前な気がしてきました。)

ということはExecutionContext.fromExecutorService(new ForkJoinPool(100))のように指定してしまうとblockingで包んでも何もしてくれないDefaultBlockContextで処理されることになりそうです。

globalでないExecutionContextを使う際は予めブロックすることを前提にスレッドを多めに作ったりするものの少し注意が必要なようです。(動的にスレッドを増やすより、予め作っておいたほうが早く処理できると思うのでblockingで包まない戦略もある程度ありな気がしています)

akkaのdispatcher

akkaのDispatcherExecutionContextとして使えるのですが、そちらを使えば自分でForkJoinWorkerThreadFactoryを定義しなくても済みそうです。

akkaのThreadFactoryMonitorableThreadFactoryになっていてこれが、AkkaForkJoinWorkerThreadを生成しています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L183-L188

final case class MonitorableThreadFactory(name: String, daemonic: Boolean, contextClassLoader: Option[ClassLoader], exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing, protected val counter: AtomicLong = new AtomicLong) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
  def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
    val t = wire(new MonitorableThreadFactory.AkkaForkJoinWorkerThread(pool))
    // Name of the threads for the ForkJoinPool are not customizable. Change it here.
    t.setName(name + "-" + counter.incrementAndGet())
    t
  }

そしてAkkaForkJoinWorkerThreadBlockContextを継承しています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala#L161-L174

private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
  override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = {
    val result = new AtomicReference[Option[T]](None)
    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
      def block(): Boolean = {
        result.set(Some(thunk))
        true
      }
      def isReleasable = result.get.isDefined
    })
    result.get.get // Exception intended if None
  }
}

blockOnの中身はExecutioContext.globalで生成されるものとそんなに変わらなさそうです。

MonitorableThreadFactoryActorSystemImplで定義されて、Dispatchersに渡されています。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L572-L573 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/actor/ActorSystem.scala#L642-L643

final val threadFactory: MonitorableThreadFactory =
  MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler)

val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
  threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))

ActorSystem.applyからdispatchers経由でAkkaForkJoinPoolを生成しつつ指定がなければAkkaForkJoinPoolが生成されるようです。 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L321 https://github.com/akka/akka/blob/v2.4.2/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala#L380-L396

// 断片のみ
executor match {
      case null | "" | "fork-join-executor"new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
      case "thread-pool-executor"new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
}
final class AkkaForkJoinPool(parallelism: Int,
                             threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
                             unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
                             asyncMode: Boolean)
  extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics {
  def this(parallelism: Int,
           threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
           unhandledExceptionHandler: Thread.UncaughtExceptionHandler) = this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true)

  override def execute(r: Runnable): Unit =
    if (r ne null)
      super.execute((if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)).asInstanceOf[ForkJoinTask[Any]])
    else
      throw new NullPointerException("Runnable was null")

  def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
}

この中間部分も調べたのですが、この記事の分量が倍になるくらい色々経由していたので割愛します(;´Д`)

結論としては、akkaのdispatcherfork-join-executorを指定しておけば、blockingが使えるExecutionContextが取得できそうです。

おまけ: scala.2.12.xでのDefaultThreadFactoryの改善

akkaのDispatcherを使わなくてもForkJoinPoolのコンストラクタにExecutionContextImpl.DefaultThreadFactoryと同じものを渡せば問題はないはずです。 注意点としてscala2.12.0-M3のコードを見ると以下の様な注意書きが見つかります。 https://github.com/scala/scala/blob/v2.12.0-M3/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L78-L79

// When we block, switch out the BlockContext temporarily so that nested blocking does not created N new Threads
BlockContext.withBlockContext(BlockContext.defaultBlockContext) { thunk }

どうもscala2.11.xのコードではblockingをネストさせるとスケジューラにヒントが行き過ぎてしまう問題があるようです。コピペして使う際には注意が必要そうです。

おまけ: Future.applyとExecutionContext

よく考えたらFutureExecutionContextがどうやって使われてるのかよく知らなかったので調べてみました。

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/Future.scala#L492

  def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)

https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/Future.scala#L18-L34

class PromiseCompletingRunnable[T](body: => T) extends Runnable {
  val promise = new Promise.DefaultPromise[T]()

  override def run() = {
    promise complete {
      try Success(body) catch { case NonFatal(e) => Failure(e) }
    }
  }
}

def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
  val runnable = new PromiseCompletingRunnable(body)
  executor.prepare.execute(runnable)
  runnable.promise.future
}

コードを見る限りexecuteが呼ばれているようです。*5

executeメソッドはExecutionContextImplを見てみると以下の様な実装になっていました。(DefaultThreadFactoryではないのでfromExecutorService非nullを渡してもこのメソッドが呼ばれます) https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L99-L110

def execute(runnable: Runnable): Unit = executor match {
  case fj: ForkJoinPool =>
    val fjt: ForkJoinTask[_] = runnable match {
      case t: ForkJoinTask[_] => t
      case r                  => new ExecutionContextImpl.AdaptedForkJoinTask(r)
    }
    Thread.currentThread match {
      case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork()
      case _                                              => fj execute fjt
    }
  case generic => generic execute runnable
}

AdaptedForkJoinTaskは大雑把にはrunnableForkJoinTaskでラップするような実装です。 https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/impl/ExecutionContextImpl.scala#L118-L130

全体としてはプールがForkJoinPoolならrunnableをとりあえずForkJoinTaskにしてfork()を呼ぶという感じになっているようです。

joinを明示的に行っている部分は見つからなかったので、そういったwork-stealな効果は使っていないのか別の部分で使っているのかは不明です。特にFuturePromise の実装をほとんど見てないのでこの部分は適当です。

感想

以上です。結論は一番上に書いてあります。 akkaのdispatcherもそれなりに追ったのですがコードがわりと飛んででいるので紹介するのは厳しさを感じ断念しました。(コンフィグファイル越しにかなり柔軟に設定できるので仕方ない気がします) コードリーディングの結果をブログに書こうとすると死ぬけど、何かしら読んだメモは残したいしどうしようみたいな気持ちになった。(小学生以下の感想)

*1:かぎかっこ内の表現はForkJoinPoolとForkJoinTaskの使い方とブロッキングの実装方法 - seraphyの日記 を意識しました

*2:BatchingExecutorでも見つかる(https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/BatchingExecutor.scala#L47-L99)のですが、ForkJoinPoolの話とは少しずれてしまうので割愛します

*3:Runtime.availableProcessorsによって得られる論理コア数と同じ数になるようです。http://docs.scala-lang.org/overviews/core/futures.html

*4:scala 2.11.xではJava8以前の処理系にも対応するためにforkjoinパッケージを自前で保持していますが、2.12.xではJava8を前提とするためこのコミットで依存が取り除かれているようです

*5:executor.prepareメソッドについても調べたのですが、https://github.com/scala/scala/blob/v2.11.7/src/library/scala/concurrent/ExecutionContext.scala#L75-L89のコメントがなるほど分からんという感じだったのでスルーしています

Actorの初期化をpreStartでやるべきかプライマリコンストラクタでやるべきかは公式ドキュメントに書いてある - akka

akka2.3.13です。

actorの初期化について迷ったのですが公式ドキュメントに書いてあったのでまとめます。

モチベ

Actorのフィールドでなんらかの値を初期化する際に以下のパターンが考えられます。 *1

var foo = _ // varで宣言

override preStart() {
  super.preStart()
  foo = initFoo // ここで初期化
}
val foo = initFoo // valで宣言&初期化

override preStart() {
  super.preStart()
}

最初はアクターのインスタンスが使いまわされるならrestartのときにコンストラクタが呼ばれないこともありそう・・?みたいな事を思って var foo 方式にしていましたがnull safeじゃない点が気になったので簡単に検証しました。

検証

gist.github.com

実行したログをみると、どうもrestartされた際にはプライマリコンストラクタもpreStartもどちらも呼ばれているようです。

公式に書いてあったこと

職場の先輩に公式に書いてあることを教えてもらったので読んでみました。

Actors — Akka Documentation

によると毎回必ず呼ばれるのはむしろプライマリコンストラクタの方で、コードの書き方によってはpreStartの方は呼ばれないこともあるようです。 だいたい意訳な感じのTweetをしたのでご参考まで

*1:公式ドキュメントには初期化メッセージを送る方式もありますがここでは省略します。

Learning Akkaを読んだ

www.amazon.co.jp を読みました。(2015/12/24発売)

全体を通してJava8とScala版のコードが併記されているので賞味としてはもう少し短くなりそうです。

この本では、akkaを使った並行処理だけでなくakka-remoteを使った分散システムの作成にも取り組みます。 ただ分散システムの難しいところに触れるというよりは、さらっとakka-remote, akka-clusterの使い方を触れるという感じになるので 初めてakka使う人でも楽に読めるはずです。

簡単な所からはじめつつ、

みたいなakka周辺(並行処理・分散システムなど?)を勉強するなら当然抑えておくべき基本的なトピックを体系的に抑えてくれる内容になっています。

akkaの話も地に足がついている感じで初歩的ですが抑えておくべき部分が体系的に学べる内容になっています。

例1: 最初はaskで書いてみて、askはタイムアウトがあるけどタイムアウトしたときのスタックトレースってsystem.schedulerの情報が出るだけでデバッグが難しい。 しかもreplyするときに sender() が原因でハマるとつらいよね。なのでtellで書いてみましょう。 tellバージョンでタイムアウトが欲しいときは使い捨てアクター作りましょう。オーバーヘッドが気になる?askも中で使い捨てアクターつくってるから遅くはならないですよ:)

例2: コンストラクタにDB接続情報を渡して、すぐにDBに接続させたいときはプライマリコンストラクタやpreStartでやるとActorInitializationExceptionが出てしまうので そういうことをしたいときはpreStartで自分にメッセージ投げるとよいです。(初期化情報はコンストラクタに渡さないとrestartしたときに消えてしまいます。なので外側から初期化メッセージを初期化情報と共に送るやり方はうまくいきません)

例3: DBへのリクエストなどのブロッキング処理やCPU負荷が高くなりうる場所では(ExecutionContextを分けるように)Dispatcherを分けましょう。基本はForkJoinを使うdefault dispatcherで良いです。ちなみにTestActorRefで同期テストが出来るのは、dispatcherがCallingThreadDispatcherになるからです。actorのdispatcherは他にもあって例えばBalancingDispatcherにすると全アクターでメールボックスを共有するからワーカーのロードバランシングに良いです。(ただリモートでは使えません。)

みたいな感じで、簡単なサンプル -> 問題点 -> 改善&なぜそうすべきか・akkaの動作の仕組みが簡潔に書かれています。 この辺もきっちり簡単なところから抑えていってくれるので基礎固めにはよさそうです。

またサンプルコードにテストがちゃんと付いてるのでテスト書きたいときは参考にできそうです。(actorsystem.shutdownが適切に呼ばれてなかったりするので鵜呑みは危険ですが・・・。)

テストの件もそうなんですが、サンプルコードもScalaなのにJava版のコードに似せたのかScalaっぽくないというかakkaのJavaAPIを呼んでたりするところがあったりしてうーん?というところがあったり、postStart(多分preStartの間違い?)を使いましょうと書いてあって、完全にはオススメしきれない微妙な気持ち・・。

注意点ですが、kindle, epub版は本文中のコードが改行すら壊滅的に崩れおちているので電子版ならPDFを入手するようにしたほうが良さそうです。 amazonのレビューにkindle版はコードがやばいとあったので、Learning Akka | PACKT Booksで買ったのですが、 epub版もダメでした。PDF版は普通に読めるのでPDF版を読むのが良いと思います。(packtpubでpdf版も入手できます)

処理が止まってしまったactorはKILLできる・・? - akka

注:どうなんだろうというところで終わっているので結論はありません。

akka 2.3.13です。

Actorの中で使ってるライブラリがデッドロックしてる?みたいなケースに遭遇したので雑に調査しました。 とりあえず少なくとも以下の様な状況でKillしようとしても止まらない感じがしました。

gist.github.com

Thread.sleepでデッドロックとかいうのもなんなのでホントはどうなのか若干曖昧です(◞‸◟) そもそもアクターだしデッドロックさせるな的な話とかあって、うーんと思いましたが、そこまで気になるものでもなかったので一旦保留・・・。 Futureでつつんでタイムアウトとかそういう感じが良いですかね。

参考