読者です 読者をやめる 読者になる 読者になる

akkaでメールボックスをクリアしつつリスタートしたかったけど何か微妙。

リスタートの時にメールボックスクリアしたくなったんですが、(厳密にはRestartではなくてStop => 新しいのをStartみたいなのでもOKなんですが・・)あんまりいい方法がなくてうーんって感じだったので検討をメモ書き程度に。

結論としてはstashするか、deadletterが出るのを我慢するか、unhandledを我慢するか、雑に無視するかのどれかが良いという感じでしたが、まだあんまりこれだ!という手が見つかっていないのでちょっと参考にならないかもしれません。

また、自分が扱ってる状況とサンプルがだいぶ違うのでモチベーションが伝わらないと思います。なんでこんなことやってるんだというツッコミは半分くらい例が悪いことによるものです(;´Д`) (上手い例が思いつかずコードを超多くするか、極端に端折るかの二択になってしまいました。。。)

考えるのに使ったコードは↓にあります。 https://github.com/matsu-chara/AkkaSandbox/tree/master/src/main/scala/clear

そもそも標準APIにないんだからやらない方が良いというようなことな気がしつつ、考えていきます。

ベースの例

いきなり長いですが、やってることは簡単で、1~30の数字をprintlnするだけです。 printするWorkerActorはFSMになっていてconnectメッセージを処理しないとメッセージをprintする処理が出来ないようになっています。(connectが来ていない状態でメッセージを受け取るとunhandledになる。) また、途中で10を受け取ると例外が投げられるようになっています。

実験のためwhenUnhandledでわざとstopを呼んでいるので、下の例を実行しても1~9までしか表示されません。 1~9まで表示 => 10で例外 => 11を処理しようとするがunhandledになりstopする => 以降、全部deadletterとなります。

package clear

import akka.actor.SupervisorStrategy._
import akka.actor._
import clear.WorkerActor.{Running, Starting, State}

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object ClearRestart extends App {
  val system = ActorSystem("clear")
  val actor = system.actorOf(Props[SupervisorActor], "supervisor")

  try {
    Thread.sleep(200)
    (1 to 10).foreach(actor ! _)  // 10を受け取ると例外が出てworkerがrestartされる
    (11 to 20).foreach(actor ! _) // restart時のconnectより先に積まれるのでunhandled

    Thread.sleep(1000)
    (21 to 30).foreach(actor ! _) // 以前のjobは実行されなくてもいいので、unhandledにならないで欲しい。そして、準備が出来たらそこから先のジョブは実行して欲しい
  } finally {
    Thread.sleep(3000)
    Await.result(system.terminate(), Duration.Inf)
  }
}

class SupervisorActor extends Actor {
  private val actor = context.actorOf(Props[WorkerActor], "worker")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: ActorInitializationException ⇒ Restart
    case _: ActorKilledException ⇒ Stop
    case _: DeathPactException ⇒ Stop
    case _: Exception ⇒ Restart
  }

  def receive: Receive = {
    case x => actor forward x
  }
}

class WorkerActor extends FSM[State, Int] {
  startWith(Starting, 0)

  self ! "connect"

  when(Starting) {
    case Event("connect", _) =>
      println("connect")
      goto(Running)
  }

  when(Running) {
    case Event(10, _) =>
      throw new RuntimeException("die")
      stay()
    case Event(x, _) =>
      Thread.sleep(100)
      println(x)
      stay()
  }

  whenUnhandled {
    case _ =>
      println("unhandledになったのでstopします")
      stop()
  }
}

object WorkerActor {
  sealed trait State
  case object Starting extends State
  case object Running extends State
}

これを例に、「死ぬ前に積まれたjobは実行されなくてもいいのでunhandledにならないで欲しい。そして、準備が出来たらそこから先のジョブは実行して欲しい」といった要望を満たすことを考えてみます。 実行結果としては、「1-9は表示」、「10-20は表示されなくても良い」、「21-30は表示」という状態を目指します。 以降はここからのdiffで見ていきます。

1. シンプルにunhandledを無視

WorkerActorのStarting状態でunhandledが出るのが嫌なので、無視するcase節を追加すればいい!という作戦です。

これで結構上手くいくんですが、無理やり無視しているので本来無視したくないものも無視してしまうようなリスクがあるような無いような・・。リスタートしてから前世のメールボックスの内容を一つ一つ無視して捨てるのではなく、もう少しスマートに、リスタート時にmailboxごと破棄できないのか?と思ったのが今回の発端です。

    when(Starting) {
      case Event("connect", _) =>
        println("connect")
        goto(Running)
+    case Event(x, _) =>
+      println(s"初期化中なので無視します $x") // 本当に無視して大丈夫なのか・・状態不整合が隠れていないか不安。
+      stay()
    }

2. stashしてみよう

前節とやっていることはあまり変わらないのですがstashでもやってみました。 こちらは無視されるのではなくunstash時に既存メッセージが戻ってきます。今回は無視したいんですがclearStashはprivate APIなので呼べずunstashするしかなさそうでした。(すごい調べたわけではないですが多分あってるはず・・・。)

unstashしなければ無限に溜め込めるというわけでもない(いずれStashOverFlowで例外が飛ぶ)のでstashだけ呼んでunstashは呼ばない作戦は悪手になりそうです。(厳密にはstashを使う頻度とアクターの寿命とかによりそうですが、そこに頼るのは危険そうです。)

   when(Starting) {
     case Event("connect", _) =>
       println("connect")
+      unstashAll() // 古いジョブを実行したい場合はこちら。unstashしない場合、clearする方法は無さそうなのでいずれStashOverFlowになる?
       goto(Running)
+    case Event(_, _) =>
+      println("stashします")
+      stash()
+      stay()
   }

3. stopしてstartする

一番目の例で書いたスマートな方法に近いイメージのものです。

supervisorではRestartではなくStopを指定します。WorkerActorをwatchしておき、Terminatedメッセージを受け取ったら新しくアクターを作成します。

これにより前世の記憶を消し去ることに成功しました。しかし、こうするとstopした瞬間に積まれていたメッセージがデッドレターになるため、それが許容できるかどうか?といった話が出てきそうです。普段デッドレターが出ないようなケースだと、共通処理でデッドレターを監視して検知とかやっているとちょっとめんどくさそうな。でも普通そこまでやらない気がします。デッドレター自体は普通に作っても出たりしますし・・。ということで大抵のケースでは許容できそうという思いがあります。

 class SupervisorActor extends Actor {
-  private val actor = context.actorOf(Props[WorkerActor], "worker")
+  private var actor = createWorker()
+
+  private def createWorker() = {
+    val a = context.actorOf(Props[WorkerActor], "worker")
+    context.watch(a)
+    a
+  }

   override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
     case _: ActorInitializationException ⇒ Restart
     case _: ActorKilledException ⇒ Stop
     case _: DeathPactException ⇒ Stop
-    case _: Exception ⇒ Restart
+    case _: Exception ⇒ Stop // 止めると未処理メッセージがdeadletterになる
   }

   def receive: Receive = {
+    case Terminated(ref) =>
+      actor = createWorker()
     case x => actor forward x
   }
 }

ちなみにデッドレターログは以下のようにして止めることが出来ます。ActorSystem全体で止めることになるので完全に止めるのは少し不安な気がします。(何かの異常でデッドレター出てる時に気づけなくなるため) ログに出しておいて気にしないようにするのが良さそうです。

設定値のintは、10だとデッドレターを10件までログに出す。20だと20件までログに出す。といった意味です。

-  val system = ActorSystem("clear")
+  val system = ActorSystem("clear", ConfigFactory.parseString("akka.log-dead-letters = 0")) // deadletter logをオフ(システムグローバルにオフだとちょっと不安・・)

4. restart時に頑張る

stop & startと仕組みはほぼ同じですが、中間にアクターを増やすことで「中間のアクターをリスタートさせた結果、WorkerActorがstop => startされる」というロジックでほぼ同じことが出来ます。context.watchし忘れを防止出来る一方で、登場人物が増えるというなんだかどっちもどっちな方法です。

未処理メッセージがデッドレターになるのはこちらも同様です。

 class SupervisorActor extends Actor {
-  private val actor = context.actorOf(Props[WorkerActor], "worker")
+  private val actor = context.actorOf(Props[WorkerVisorActor], "worker_visor")

   override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
     case _: ActorInitializationException ⇒ Restart
@@ -39,6 +40,19 @@
   }
 }

+class WorkerVisorActor extends Actor {
+  private val actor = context.actorOf(Props[WorkerActor], "worker")
+
+  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
+    case _: Exception ⇒ Escalate // 孫はstopされるが、未処理のメッセージがdeadletterになってしまう。
+  }
+
+  def receive: Receive = {
+    case x =>
+      actor forward x
+  }
+}

5. もうunhandledで良くない?

ここまでくると一番最初に書いた単純に無視する案が一番良いように思えてきました。

そして無視するならwhenUnhandledでログ出してstayすれば同じじゃないかという気持ちに・・。(本来のコードではunhandledのときの処理をある程度共通化していて、そこはオーバーライドしたくないという動機があったのですが、それもそこまですごい重大な動機でもなく、ここまで頑張る必要もないな・・・という感じです)

とはいえFSMのStateがいくつかある場合、whenUnhandledでログだけ出してstayする方法ではアクターの全Stateでunhandledなメッセージが無視される一方で、最初に書いた無視案では特定のStateでだけunhandledなメッセージを無視する挙動を書くことができるのでwhenUnhandledではstopさせたりアラートを出したり出来るかもしれません。そう考えるとそこまで捨てたもんじゃない?という気持ちもあります。

デッドレターになるのも良さそうですが、無視するのに比べると若干実装めんどくさいような気もするので(とはいえ数行?)選びどころですかね。 一律で無視すると実装ミスで発生した意図しないメッセージまで無視してしまう可能性あるのでちゃんとやりたいならこっちのほうが良い気がしています。

6. カスタムdispatcher

custom dispatcherを自前で作ればできそうという書き込みをMLでみつけました。

https://groups.google.com/forum/?hl=en.#!searchin/akka-user/Clearing$20all$20mailboxes$20while$20restarting%7Csort:relevance/akka-user/3qJLNUTcLDc/wJxLx75orEEJ

試してはいないんですが、カスタムで作られたdispatcherをメンテしたくない気持ちがあります。(リンク内にもoverkillとありますし・・。) ただカスタムdispatcherってわりと普通に作る物なのかどうかよくわかってないので、その辺どうなんだろうと思いつつ今回は終わりです。