結論
initialize()
https://github.com/akka/akka/blob/v2.4.13/akka-actor/src/main/scala/akka/actor/FSM.scala#L511-L522 をFSMのコンストラクタの最後(または適切なライフサイクルメソッド内)で呼びましょう。
ActorのreceiveTimeout
akka actorにはreceiveTimeoutという一定時間メッセージが来なかったらReceiveTimeout
メッセージを送るという便利機能があります。
これを使うと、返信が一定時間なかったらもう一度メッセージを送るといったリトライ処理が書けるようになります。
http://doc.akka.io/docs/akka/2.4/scala/actors.html#Receive_timeout
class FooActor extends Actor { context.setReceiveTimeout(2.seconds) def receive = { case ReceiveTimeout => println("timeout") context.setReceiveTimeout(Duration.Undefined) // Undefinedにすると解除できる case x => println(x) } }
FSMのstateTimeout
そしてFSMにはstateTimeoutという、その状態になってから一定時間メッセージが来なかったらタイムアウトする機能があります。
object FsmMain extends App { val system = ActorSystem("MyActorSystem") val bar = system.actorOf(Props[Bar]) } // `initialize()`を呼び忘れているのでこのままでは動かない(後述) class Bar extends FSM[Int, Unit] { startWith(0, (), timeout = Some(2.seconds)) // whenで指定したtimeoutを上書きできる when(0, stateTimeout = 3.seconds) { case Event(StateTimeout, _) => println("timeout") stay() case Event(x, _) => println(x) stay() forMax(4.seconds) // whenで指定したtimeoutを上書きできる } }
また、stateTimeoutの値は when
で登録する以外にも startWith
の引数で startWith(state, data, timeout = Some(2.seconds)
としたり、状態遷移で goto(...) forMax(3.seconds)
のようにforMaxを指定することで上書きすることができます。
この機能により「リクエストを送り、最大3秒間Response待ち状態になる。タイムアウトしたらリトライを行うかリクエストに対してエラーを返す」といった処理が書きやすくなります。
なお、FSM自体もActorなので引き続き前述のsetReceiveTimeout
を利用することが出来ます。
その場合はwhen
の中に case Event(ReceiveTimeout, _) =>
を記述すれば取得することが出来ます。(個人的には一緒に使うとややこしいことになりそうなのでやめておきたい気持ちがあるような・・・)
FSM stateTimeoutのtimer起動のタイミング
で、この機能を使うかーと思いTimeoutメッセージが届くか実際に実験をすると、FSM作成直後や再起動直後にtimerが作動しておらず、timeoutメッセージが届かない症状に出くわしました。
例えば一つ上のFsmMain
とBar
の例を実行すると、永久に println("timeout")
を通過することはありません。この状態でstateTimeoutさせるためには一つ以上のメッセージを送る必要がありました。
timeoutのスケジュールはreceiveの中で呼ばれるmakeTransitionの中 https://github.com/akka/akka/blob/v2.4.13/akka-actor/src/main/scala/akka/actor/FSM.scala#L699-L705 で行われています。
ということは初回はmakeTransitionが呼ばれないのか・・?おかしいなー、おかしいなー、と思ったのですが、よくよく考えるとFSMのコンストラクタでinitialize()
を呼び忘れていました 😨
initialize()
は https://github.com/akka/akka/blob/v2.4.13/akka-actor/src/main/scala/akka/actor/FSM.scala#L511-L522 で定義されています。実装的にはほとんど makeTransition
を呼び出すだけです。
ドキュメント http://doc.akka.io/docs/akka/2.4/scala/fsm.html#A_Simple_Example にも以下のようにしっかり書いてあります。
finally starting it up using initialize, which performs the transition into the initial state and sets up timers (if required).
これを呼び出すとしっかりとtimeoutされだすようになりました。
// 修正版 class Bar extends FSM[Int, Unit] { startWith(0, (), timeout = Some(2.seconds)) when(0, stateTimeout = 3.seconds) { case Event(StateTimeout, _) => println("timeout") stay() case Event(x, _) => println(x) stay() forMax(4.seconds) } initialize() }
めでたしめでたし。
FSMだと状態がwhenで初めて登録されるので、登録されるまでmakeTransitionできないためこのような仕様になっている・・んでしょうか?
initialize()
は忘れていても気づきにくい(特に最初はタイムアウト使って無くて後から入れる場合など)ので影響がありそうなら気をつけましょう(´・_・`)