読者です 読者をやめる 読者になる 読者になる

akka FSMでstateTimeoutが一度メッセージを受け取らないとスケジュールされないときはinitializeの呼び忘れかも

結論

initialize() https://github.com/akka/akka/blob/v2.4.13/akka-actor/src/main/scala/akka/actor/FSM.scala#L511-L522 をFSMのコンストラクタの最後(または適切なライフサイクルメソッド内)で呼びましょう。

ActorのreceiveTimeout

akka actorにはreceiveTimeoutという一定時間メッセージが来なかったらReceiveTimeoutメッセージを送るという便利機能があります。 これを使うと、返信が一定時間なかったらもう一度メッセージを送るといったリトライ処理が書けるようになります。

http://doc.akka.io/docs/akka/2.4/scala/actors.html#Receive_timeout

class FooActor extends Actor {
  context.setReceiveTimeout(2.seconds)

  def receive = {
    case ReceiveTimeout =>
      println("timeout")
      context.setReceiveTimeout(Duration.Undefined) // Undefinedにすると解除できる
    case x => println(x)
  }
}

FSMのstateTimeout

そしてFSMにはstateTimeoutという、その状態になってから一定時間メッセージが来なかったらタイムアウトする機能があります。

object FsmMain extends App {
  val system = ActorSystem("MyActorSystem")
  val bar = system.actorOf(Props[Bar])
}

// `initialize()`を呼び忘れているのでこのままでは動かない(後述)
class Bar extends FSM[Int, Unit] {
  startWith(0, (), timeout = Some(2.seconds)) // whenで指定したtimeoutを上書きできる

  when(0, stateTimeout = 3.seconds) {
    case Event(StateTimeout, _) =>
      println("timeout")
      stay()
    case Event(x, _) =>
      println(x)
      stay() forMax(4.seconds) // whenで指定したtimeoutを上書きできる
  }
}

また、stateTimeoutの値は when で登録する以外にも startWithの引数で startWith(state, data, timeout = Some(2.seconds)としたり、状態遷移で goto(...) forMax(3.seconds) のようにforMaxを指定することで上書きすることができます。 この機能により「リクエストを送り、最大3秒間Response待ち状態になる。タイムアウトしたらリトライを行うかリクエストに対してエラーを返す」といった処理が書きやすくなります。

なお、FSM自体もActorなので引き続き前述のsetReceiveTimeoutを利用することが出来ます。 その場合はwhenの中に case Event(ReceiveTimeout, _) => を記述すれば取得することが出来ます。(個人的には一緒に使うとややこしいことになりそうなのでやめておきたい気持ちがあるような・・・)

FSM stateTimeoutのtimer起動のタイミング

で、この機能を使うかーと思いTimeoutメッセージが届くか実際に実験をすると、FSM作成直後や再起動直後にtimerが作動しておらず、timeoutメッセージが届かない症状に出くわしました。 例えば一つ上のFsmMainBarの例を実行すると、永久に println("timeout") を通過することはありません。この状態でstateTimeoutさせるためには一つ以上のメッセージを送る必要がありました。

timeoutのスケジュールはreceiveの中で呼ばれるmakeTransitionの中 https://github.com/akka/akka/blob/v2.4.13/akka-actor/src/main/scala/akka/actor/FSM.scala#L699-L705 で行われています。 ということは初回はmakeTransitionが呼ばれないのか・・?おかしいなー、おかしいなー、と思ったのですが、よくよく考えるとFSMのコンストラクタでinitialize()を呼び忘れていました 😨

initialize()https://github.com/akka/akka/blob/v2.4.13/akka-actor/src/main/scala/akka/actor/FSM.scala#L511-L522 で定義されています。実装的にはほとんど makeTransitionを呼び出すだけです。 ドキュメント http://doc.akka.io/docs/akka/2.4/scala/fsm.html#A_Simple_Example にも以下のようにしっかり書いてあります。

finally starting it up using initialize, which performs the transition into the initial state and sets up timers (if required).

これを呼び出すとしっかりとtimeoutされだすようになりました。

// 修正版
class Bar extends FSM[Int, Unit] {
  startWith(0, (), timeout = Some(2.seconds))

  when(0, stateTimeout = 3.seconds) {
    case Event(StateTimeout, _) =>
      println("timeout")
      stay()
    case Event(x, _) =>
      println(x)
      stay() forMax(4.seconds)
  }

  initialize()
}

めでたしめでたし。

FSMだと状態がwhenで初めて登録されるので、登録されるまでmakeTransitionできないためこのような仕様になっている・・んでしょうか? initialize()は忘れていても気づきにくい(特に最初はタイムアウト使って無くて後から入れる場合など)ので影響がありそうなら気をつけましょう(´・_・`)