読者です 読者をやめる 読者になる 読者になる

pony runtimeメモ actor dispatch編 [ponylang]

pony runtime

ponyのランタイムについての論文を読んでいる中でActorの仕組みについての部分を見て現状の実装はどうなっているのか気になったので少しだけ見てみました。

論文は2013年ですが現時点(2016/3/5)での最新コミットのソースを見てみます。 *1

actor.h

まずはActorから https://github.com/ponylang/ponyc/tree/d888dab8bdcc4e02ce043f41cad62a9adcc88b5d/src/libponyrt/actor の辺のディレクトリです。

.hを見ると以下のようになっています、

typedef struct pony_actor_t
{
  pony_type_t* type;
  messageq_t q;
  pony_msg_t* continuation;
  uint8_t flags;

  // keep things accessed by other actors on a separate cache line
  __pony_spec_align__(heap_t heap, 64); // 52/104 bytes
  gc_t gc; // 44/80 bytes
} pony_actor_t;

pony_type_t

pony_type_t は実行時の型情報が入ってるみたいなコメントがありました。vtablesとかあるのでそういうことだと思います。

messageq_t, pony_msg_t

messageq_t は各actorが持っているメッセージキューのことです。 pony_msg_t* continuation というのが後続のメッセージになっています。

pony_spec_align, gc_t

キャッシュラインを分けるために何かスペーシングをしてるなどの細かい工夫がかいま見えます?(あんまり良くわかってないです。。。)

gc_tは文字通りgcに関する情報のようです。具体的には内部のオブジェクトのマップやアクターの参照のマップなどが入っていそうです。ponyのgc機構であるpony ORCAはactor内部のデータが外部からどのくらい参照されているかをactor内部でカウントとして持っておくような機構があるようなのでその辺のことをやってくれるはずです。

関数

actor.h内のその他の関数を見ると、actor_run, actor_destroyなどそれっぽい関数が並んでいます。

bool actor_run(pony_ctx_t* ctx, pony_actor_t* actor, size_t batch);
void actor_destroy(pony_actor_t* actor);
gc_t* actor_gc(pony_actor_t* actor);
heap_t* actor_heap(pony_actor_t* actor);
...

ちなみに論文では以下のようになっています

bool actor_run(actor_t* actor);
void actor_destroy(actor_t* actor, map_t* cycle);
void actor_sendv(actor_t* to, uint64_t id, int argc, arg_t* argv);
actor_t* actor_create(actor_type_t* type);
void pony_send(actor_t* to, uint64_t id);
void pony_sendv(actor_t* to, uint64_t id, int argc, arg_t* argv);
/* convenience functions are omitted */

sendやsendvはどこにいったのでしょうか・・とおもったらactor.cにいました。

ponyのschedulerにactor_runを呼ばれたら、アクターが処理を開始する仕組みになっているようです。

actor.c

論文Listing 2.2では actor_type_t として以下の構造が宣言されています。

typedef const struct actor_type_t
{
  trace_fn trace;
  message_type_fn message_type;
  dispatch_fn dispatch;
  actor_type_t;
}

ところがgithubの方ではversion0.1まで遡ってもありませんでした。 dispatchしているところをみると、これらの情報は先ほど軽くみたpony_type_t に含まれているようです。 ということで現行の実装を追ってみます。今回はactorの生成 => dispatch => destroyの流れを見てみます。ただしGC(destroyがいつ呼ばれるか)、schdulerに関しては別の処理が増えるので深追いはしないことにします。

actor creation

まずactorが生成されるところをザクっと見てみます。

pony_actor_t* pony_create(pony_ctx_t* ctx, pony_type_t* type)
{
  assert(type != NULL);

#ifdef USE_TELEMETRY
  ctx->count_alloc_actors++;
#endif

  // allocate variable sized actors correctly
  pony_actor_t* actor = (pony_actor_t*)pool_alloc_size(type->size);
  memset(actor, 0, type->size);
  actor->type = type;

  messageq_init(&actor->q);
  heap_init(&actor->heap);
  gc_done(&actor->gc);

  if(ctx->current != NULL)
  {
    // actors begin unblocked and referenced by the creating actor
    actor->gc.rc = GC_INC_MORE;
    gc_createactor(ctx->current, actor);
  } else {
    // no creator, so the actor isn't referenced by anything
    actor->gc.rc = 0;
  }

  return actor;
}

引数のpony_ctx_tscheduler_initすると生成されるようです。start.cという見るからにそれっぽいところで呼ばれています。 もう一つのpony_type_tは先ほどから何度か出ている型情報を入れる構造体です。

処理の内容としてはnullチェックしてcount増やしてメモリallocして〜という処理につづいて, messageq_init, heap_init によってmessage queueの初期化ヒープの初期化を行っています。 heapの方は次のgcを行うメモリ量が決められています。1 << 14 だと16KBほどでしょうか。heap_setinitialgcという関数で初期値を変えられるようです。この関数はstart.cで呼ばれていました。実行時引数か何かで渡せそうです。 次に、actorのリファレンスカウントを決めています。ctx->currentは現在処理中のアクターのはずなので実行中のアクターがいないとはこれいかに・・・ということでもう少し見てみます。

まずschedulerが初期化された瞬間のctx->currentはnullのような気がしますactor_runを読んでしまうとcurrentは上書きされてしまうのでそこまでのタイミングでアクターが作られている必要がありそうです。 gen_mainを見てみるとpony_initをした後すぐにctxを読み取っています。pony_initは先程のstart.cの関数です。pony_initの直後に呼ばれているcreate_main内では pony_createを呼んでいます。この時点ではまだctx->currentはnullになっていそうです。ということでmainアクターは// no creator, so the actor isn't referenced by anythingの処理に入るのではないかと思いました。

またpony_becomeをnullで呼び出してもctx->currentはnullになるようです。(これはpony_runtime外からpony APIを呼び出す用みたいなことがコメントに書いてあるのでCなどから呼ぶ時に色々出来るのでしょうか・・・)

dispatch

さてactorがメッセージを処理する際はschedulerがactor_runを読んでくれます。 コードは以下です。

bool actor_run(pony_ctx_t* ctx, pony_actor_t* actor, size_t batch)
{
  ctx->current = actor;

  pony_msg_t* msg;
  size_t app = 0;

  while(actor->continuation != NULL)
  {
    msg = actor->continuation;
    actor->continuation = msg->next;
    bool ret = handle_message(ctx, actor, msg);
    pool_free(msg->size, msg);

    if(ret)
    {
      // If we handle an application message, try to gc.
      app++;
      try_gc(ctx, actor);

      if(app == batch)
        return !has_flag(actor, FLAG_UNSCHEDULED);
    }
  }

  while((msg = messageq_pop(&actor->q)) != NULL)
  {
    if(handle_message(ctx, actor, msg))
    {
      // If we handle an application message, try to gc.
      app++;
      try_gc(ctx, actor);

      if(app == batch)
        return !has_flag(actor, FLAG_UNSCHEDULED);
    }
  }

  // We didn't hit our app message batch limit. We now believe our queue to be
  // empty, but we may have received further messages.
  assert(app < batch);
  try_gc(ctx, actor);

  if(has_flag(actor, FLAG_UNSCHEDULED))
  {
    // When unscheduling, don't mark the queue as empty, since we don't want
    // to get rescheduled if we receive a message.
    return false;
  }

  // If we have processed any application level messages, defer blocking.
  if(app > 0)
    return true;

  // Tell the cycle detector we are blocking. We may not actually block if a
  // message is received between now and when we try to mark our queue as
  // empty, but that's ok, we have still logically blocked.
  if(!has_flag(actor, FLAG_BLOCKED | FLAG_SYSTEM) ||
    has_flag(actor, FLAG_RC_CHANGED))
  {
    set_flag(actor, FLAG_BLOCKED);
    unset_flag(actor, FLAG_RC_CHANGED);
    cycle_block(ctx, actor, &actor->gc);
  }

  // Return true (i.e. reschedule immediately) if our queue isn't empty.
  return !messageq_markempty(&actor->q);
}

continuationから読み取ってhandle_messageを読んでいます。メッセージがなければmessageqからpopしています。handle_message以降はgcやscheduler関連のようなので割愛します。handle_messageではmsg->idをベースにswitch-caseで分岐しています。といってもdefault以外はGC系のシステムメッセージのようです。そして最後にtype->dispatchを呼んで終わりです。dispatchの中身は多分libponycなどで生成されている気がします。

destroy

さて最後のdestroyですが、いつ呼ばれるのかを気にしなければdestroyしていくだけなので簡単です。

void actor_destroy(pony_actor_t* actor)
{
  assert(has_flag(actor, FLAG_PENDINGDESTROY));

  // Make sure the actor being destroyed has finished marking its queue
  // as empty. Otherwise, it may spuriously see that tail and head are not
  // the same and fail to mark the queue as empty, resulting in it getting
  // rescheduled.
  pony_msg_t* head = _atomic_load(&actor->q.head);

  while(((uintptr_t)head & (uintptr_t)1) != (uintptr_t)1)
    head = _atomic_load(&actor->q.head);

  messageq_destroy(&actor->q);
  gc_destroy(&actor->gc);
  heap_destroy(&actor->heap);

  // Free variable sized actors correctly.
  pool_free_size(actor->type->size, actor);
}

まとめ

schedulerのワークスティーリングの部分とGCの部分はまだ読めてないですが、Actorの基本的な構造と振る舞いはわかってきた気がします。 またやるか分かりませんがコードリーディングにはちょうど良さそうなので読むのはやっていきたいなと思いました。

*1:0.2.1以降のリリースが途絶えて久しいんですが何か大きい機能を中で作っているのでしょうか・・。コミットも続いてるし、3月7~9日のどこかの日にUsing Pony for Fintechという挑戦的なタイトルでCTOの人が登壇するらしいのでプロジェクト自体はアクティブのようです。