概ね Mailbox with Explicit Acknowledgement — Akka Documentation のお話です。
まずはbuild.sbtです。
name := "akka-scala-seed" version := "1.0" scalaVersion := "2.11.8" val akkaVersion = "2.4.8" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-contrib" % akkaVersion )
build.sbt終わり。
さて、conectionが切れるなどメッセージの内容以外の原因でchildの仕事が失敗してrestartする時、akkaでは死んだ時処理していたメッセージは消えてしまいます。
scala - Akka - resending the "breaking" message - Stack Overflow で紹介されているように 以下のようにpreRestartでselfに投げることでメッセージを再処理することが出来るのですが、そうすると順番が変わってしまいます。
override def preRestart(reason: Throwable, message: Option[Any]) { message.foreach(self ! x) }
例えば以下の様な例では、2,3,4,5,1
のように出力順がひっくり返ってしまいます。
package order import akka.actor.{Actor, ActorSystem, Props} import akka.pattern.ask import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ object OrderMain extends App { implicit val system = ActorSystem() implicit val executionContext = system.dispatcher implicit val timeout = Timeout.durationToTimeout(3.seconds) val actor = system.actorOf(PreRestarter.props(new Connection)) (1 to 5).foreach(actor ! _) Await.result((actor ? "finished").flatMap(_ => system.terminate()), Duration.Inf) } class PreRestarter(connection: Connection) extends Actor { override def receive = { case "finished" => sender ! "finished" case x: Int => connection.doJob(x) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { super.preRestart(reason, message) if (!connection.isConnected) { connection.connect() } message.foreach(self ! _) } } object PreRestarter { def props(connection: Connection) = Props(classOf[PreRestarter], connection) } class Connection { var isConnected = false def connect() = { println("connected") isConnected = true } def close() = { println("closed") isConnected = false } def doJob(x: Int) = { if (!isConnected) throw new RuntimeException(s"Connection already closed. message = $x") println(s"the answer is $x") } }
大体良いんですがなんとなくもやもやするとか、タイムアウトを設定したいとなると、待機jobの数も考慮して設定するのかそれともリトライのときにタイムアウトするのは仕方ない?でもそしたらそもそもリトライしなくていい?そうするとエラーレスポンス増えるけどそれでいいの?といったことを判断するといった面倒が増えてしまいます。
そんなときのパターンとして、 childは仕事だけをする。supervisorはジョブを貯めこんで子供が仕事を終わらせたら次の仕事を一個ずつ渡す形式にしてやることが考えられます。*1たとえば以下のようになります。
package order import akka.actor.SupervisorStrategy.{Restart, Stop} import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props} import order.JobSupervisor2.{AckAndNext, NackAndRetry} import scala.collection.immutable.Queue import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.util.Random object OrderMain2 extends App { implicit val system = ActorSystem() val actor = system.actorOf(Props[RootActor2], "root") (1 to 10).foreach(actor ! _) Thread.sleep(300) (11 to 20).foreach(actor ! _) actor ! "AllJobProduced" Await.result(system.whenTerminated, Duration.Inf) } class RootActor2 extends Actor { val actor = context.actorOf(JobSupervisor2.props, "manager") override def receive = { case "AllJobFinished" => context.system.terminate case x => actor forward x } } class JobSupervisor2 extends Actor { override val supervisorStrategy = OneForOneStrategy() { case _: Exception => self ! NackAndRetry Restart case _ => Stop } val worker = context.actorOf(OneWorker2.props(new Connection2), "worker") var jobs = Queue[Int]() var isAllJobProduced = false // たかだか一つのjobをworkerに与える override def receive = { case "AllJobProduced" => isAllJobProduced = true finishCheck() case x: Int => // jobが無いとackの時に次が渡せないのでここで補填 if (jobs.isEmpty) { worker ! x } addJob(x: Int) case AckAndNext => removeOldJob() produceNext() finishCheck() case NackAndRetry => produceNext() } private def addJob(x: Int) = { jobs = jobs.enqueue(x) } private def removeOldJob() = { jobs = jobs.dequeue._2 } private def produceNext() = jobs.headOption.foreach(worker ! _) private def finishCheck() = if (isAllJobProduced && jobs.isEmpty) context.parent ! "AllJobFinished" } object JobSupervisor2 { def props = Props[JobSupervisor2] case object AckAndNext case object NackAndRetry } class OneWorker2(connection: Connection2) extends Actor { override def receive = { case x: Int => connection.doJob(x) sender ! AckAndNext } override def preStart(): Unit = { super.preStart() if (!connection.isConnected) { connection.connect() } } } object OneWorker2 { def props(connection: Connection2) = Props(classOf[OneWorker2], connection) } class Connection2 { var isConnected = false def connect() = isConnected = true def close() = isConnected = false def doJob(x: Int) = { if (!isConnected) { throw new RuntimeException(s"Connection already closed. message = $x") } if (x % 5 == Random.nextInt(3)) { isConnected = false throw new RuntimeException(s"unlucky number $x. connection was reset") } println(s"the answer is $x") } }
x % 5 == Random.nextInt(3)
が成立した時connectionが切断されてdoJobに失敗するようにしました。
この実装だと1~20まで順番通りに表示されます。
余談
今回のようにparentがsupervisorStrategyで失敗したことを認識して投げ直してあげる方式と、childのpreStartでRequestを投げてparentがそれに応える方式があると思うんですよね。
MainでわざとThread.sleepで間を開けてjobが空っぽになる期間を用意してみたところ、Requestをparentに送るパターン(後者)だとそのタイミングでjobが無かったら次のRequestをスケジュールするか、何か状態を持たせて遅れなかったとき限定で receiveの x: Int
で直接送りつける?(でもそうすると子供からRequestが来るのと親が送りつけてくるパスが両方できてわかりにくい?)みたいな面倒が生じたので今回は前者のパターンを採用してみました。でもSupervisorStrategyに処理を書くのはそれはそれで・・って感じで困るのでなんかいい指針が欲しいです。
余談終わり
そして、本題ですがこういう明示的にackをするパターンがすでにakka.contribにあるらしいです(・ ω ・)
それが akka.contrib.mailbox.PeekMailboxType
です。ドキュメントのトップにはなくて External Contributions — Akka Documentation のページに有りました。(akkaのドキュメントはこういうトップからは視えないけどページ内リンクから参照されている隠しページ(?)があるのでpdfで見ると新たなページを発掘できて楽しい)
これを使って書き直すと以下のようになります。
package order import akka.actor.{Actor, ActorSystem, Props} import akka.contrib.mailbox.PeekMailboxExtension import com.typesafe.config.ConfigFactory import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.util.Random object OrderMain3 extends App { implicit val system = ActorSystem("MySystem", ConfigFactory.parseString( """ peek-dispatcher { mailbox-type = "akka.contrib.mailbox.PeekMailboxType" max-retries = 2 } """)) val actor = system.actorOf(Props[RootActor3], "root") (1 to 10).foreach(actor ! _) Thread.sleep(300) (11 to 20).foreach(actor ! _) actor ! "AllJobProduced" Await.result(system.whenTerminated, Duration.Inf) } class RootActor3 extends Actor { val actor = context.actorOf(JobSupervisor3.props, "manager") override def receive = { case "AllJobFinished" => context.system.terminate case x => actor forward x } } class JobSupervisor3 extends Actor { val worker = context.actorOf(OneWorker3.props(new Connection).withDispatcher("peek-dispatcher"), "worker") var isAllJobProduced = false override def receive = { case "AllJobProduced" => isAllJobProduced = true finishCheck() case x: Int => worker ! x finishCheck() } private def finishCheck() = if (isAllJobProduced) context.parent ! "AllJobFinished" } object JobSupervisor3 { def props = Props[JobSupervisor3] } class OneWorker3(connection: Connection) extends Actor { override def receive = { case x: Int => connection.doJob(x) PeekMailboxExtension.ack() } override def preStart(): Unit = { super.preStart() if (!connection.isConnected) { connection.connect() } } } object OneWorker3 { def props(connection: Connection) = Props(classOf[OneWorker3], connection) } class Connection3 { var isConnected = false def connect() = isConnected = true def close() = isConnected = false def doJob(x: Int) = { if (!isConnected) { throw new RuntimeException(s"Connection already closed. message = $x") } if (x % 5 == Random.nextInt(3)) { isConnected = false throw new RuntimeException(s"unlucky number $x. connection was reset") } println(s"the answer is $x") } }
JobSupervisor3とかもういらないくらい簡単になりました。 キューの管理を自前でしなくていいのは楽です。間違えて二回送っちゃったりしないことをテストするのは結構頭使いますし・・・。 しかも自作の時はついてなかったメッセージ単位のretry回数制限まで実装されているようなので結構よさそうだなーと思った話でした。
*1:ってドキュメントに書いてありました