概ね Mailbox with Explicit Acknowledgement — Akka Documentation のお話です。
まずはbuild.sbtです。
name := "akka-scala-seed"
version := "1.0"
scalaVersion := "2.11.8"
val akkaVersion = "2.4.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-contrib" % akkaVersion
)
build.sbt終わり。
さて、conectionが切れるなどメッセージの内容以外の原因でchildの仕事が失敗してrestartする時、akkaでは死んだ時処理していたメッセージは消えてしまいます。
scala - Akka - resending the "breaking" message - Stack Overflow で紹介されているように
以下のようにpreRestartでselfに投げることでメッセージを再処理することが出来るのですが、そうすると順番が変わってしまいます。
override def preRestart(reason: Throwable, message: Option[Any]) {
message.foreach(self ! x)
}
例えば以下の様な例では、2,3,4,5,1
のように出力順がひっくり返ってしまいます。
package order
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
object OrderMain extends App {
implicit val system = ActorSystem()
implicit val executionContext = system.dispatcher
implicit val timeout = Timeout.durationToTimeout(3.seconds)
val actor = system.actorOf(PreRestarter.props(new Connection))
(1 to 5).foreach(actor ! _)
Await.result((actor ? "finished").flatMap(_ => system.terminate()), Duration.Inf)
}
class PreRestarter(connection: Connection) extends Actor {
override def receive = {
case "finished" => sender ! "finished"
case x: Int => connection.doJob(x)
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
if (!connection.isConnected) {
connection.connect()
}
message.foreach(self ! _)
}
}
object PreRestarter {
def props(connection: Connection) = Props(classOf[PreRestarter], connection)
}
class Connection {
var isConnected = false
def connect() = {
println("connected")
isConnected = true
}
def close() = {
println("closed")
isConnected = false
}
def doJob(x: Int) = {
if (!isConnected) throw new RuntimeException(s"Connection already closed. message = $x")
println(s"the answer is $x")
}
}
大体良いんですがなんとなくもやもやするとか、タイムアウトを設定したいとなると、待機jobの数も考慮して設定するのかそれともリトライのときにタイムアウトするのは仕方ない?でもそしたらそもそもリトライしなくていい?そうするとエラーレスポンス増えるけどそれでいいの?といったことを判断するといった面倒が増えてしまいます。
そんなときのパターンとして、
childは仕事だけをする。supervisorはジョブを貯めこんで子供が仕事を終わらせたら次の仕事を一個ずつ渡す形式にしてやることが考えられます。*1たとえば以下のようになります。
package order
import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}
import order.JobSupervisor2.{AckAndNext, NackAndRetry}
import scala.collection.immutable.Queue
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random
object OrderMain2 extends App {
implicit val system = ActorSystem()
val actor = system.actorOf(Props[RootActor2], "root")
(1 to 10).foreach(actor ! _)
Thread.sleep(300)
(11 to 20).foreach(actor ! _)
actor ! "AllJobProduced"
Await.result(system.whenTerminated, Duration.Inf)
}
class RootActor2 extends Actor {
val actor = context.actorOf(JobSupervisor2.props, "manager")
override def receive = {
case "AllJobFinished" => context.system.terminate
case x => actor forward x
}
}
class JobSupervisor2 extends Actor {
override val supervisorStrategy = OneForOneStrategy() {
case _: Exception =>
self ! NackAndRetry
Restart
case _ => Stop
}
val worker = context.actorOf(OneWorker2.props(new Connection2), "worker")
var jobs = Queue[Int]()
var isAllJobProduced = false
override def receive = {
case "AllJobProduced" =>
isAllJobProduced = true
finishCheck()
case x: Int =>
if (jobs.isEmpty) {
worker ! x
}
addJob(x: Int)
case AckAndNext =>
removeOldJob()
produceNext()
finishCheck()
case NackAndRetry => produceNext()
}
private def addJob(x: Int) = {
jobs = jobs.enqueue(x)
}
private def removeOldJob() = {
jobs = jobs.dequeue._2
}
private def produceNext() = jobs.headOption.foreach(worker ! _)
private def finishCheck() = if (isAllJobProduced && jobs.isEmpty) context.parent ! "AllJobFinished"
}
object JobSupervisor2 {
def props = Props[JobSupervisor2]
case object AckAndNext
case object NackAndRetry
}
class OneWorker2(connection: Connection2) extends Actor {
override def receive = {
case x: Int =>
connection.doJob(x)
sender ! AckAndNext
}
override def preStart(): Unit = {
super.preStart()
if (!connection.isConnected) {
connection.connect()
}
}
}
object OneWorker2 {
def props(connection: Connection2) = Props(classOf[OneWorker2], connection)
}
class Connection2 {
var isConnected = false
def connect() = isConnected = true
def close() = isConnected = false
def doJob(x: Int) = {
if (!isConnected) {
throw new RuntimeException(s"Connection already closed. message = $x")
}
if (x % 5 == Random.nextInt(3)) {
isConnected = false
throw new RuntimeException(s"unlucky number $x. connection was reset")
}
println(s"the answer is $x")
}
}
x % 5 == Random.nextInt(3)
が成立した時connectionが切断されてdoJobに失敗するようにしました。
この実装だと1~20まで順番通りに表示されます。
余談
今回のようにparentがsupervisorStrategyで失敗したことを認識して投げ直してあげる方式と、childのpreStartでRequestを投げてparentがそれに応える方式があると思うんですよね。
MainでわざとThread.sleepで間を開けてjobが空っぽになる期間を用意してみたところ、Requestをparentに送るパターン(後者)だとそのタイミングでjobが無かったら次のRequestをスケジュールするか、何か状態を持たせて遅れなかったとき限定で receiveの x: Int
で直接送りつける?(でもそうすると子供からRequestが来るのと親が送りつけてくるパスが両方できてわかりにくい?)みたいな面倒が生じたので今回は前者のパターンを採用してみました。でもSupervisorStrategyに処理を書くのはそれはそれで・・って感じで困るのでなんかいい指針が欲しいです。
余談終わり
そして、本題ですがこういう明示的にackをするパターンがすでにakka.contribにあるらしいです(・ ω ・)
それが akka.contrib.mailbox.PeekMailboxType
です。ドキュメントのトップにはなくて External Contributions — Akka Documentation のページに有りました。(akkaのドキュメントはこういうトップからは視えないけどページ内リンクから参照されている隠しページ(?)があるのでpdfで見ると新たなページを発掘できて楽しい)
これを使って書き直すと以下のようになります。
package order
import akka.actor.{Actor, ActorSystem, Props}
import akka.contrib.mailbox.PeekMailboxExtension
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random
object OrderMain3 extends App {
implicit val system = ActorSystem("MySystem", ConfigFactory.parseString(
"""
peek-dispatcher {
mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
max-retries = 2
}
"""))
val actor = system.actorOf(Props[RootActor3], "root")
(1 to 10).foreach(actor ! _)
Thread.sleep(300)
(11 to 20).foreach(actor ! _)
actor ! "AllJobProduced"
Await.result(system.whenTerminated, Duration.Inf)
}
class RootActor3 extends Actor {
val actor = context.actorOf(JobSupervisor3.props, "manager")
override def receive = {
case "AllJobFinished" => context.system.terminate
case x => actor forward x
}
}
class JobSupervisor3 extends Actor {
val worker = context.actorOf(OneWorker3.props(new Connection).withDispatcher("peek-dispatcher"), "worker")
var isAllJobProduced = false
override def receive = {
case "AllJobProduced" =>
isAllJobProduced = true
finishCheck()
case x: Int =>
worker ! x
finishCheck()
}
private def finishCheck() = if (isAllJobProduced) context.parent ! "AllJobFinished"
}
object JobSupervisor3 {
def props = Props[JobSupervisor3]
}
class OneWorker3(connection: Connection) extends Actor {
override def receive = {
case x: Int =>
connection.doJob(x)
PeekMailboxExtension.ack()
}
override def preStart(): Unit = {
super.preStart()
if (!connection.isConnected) {
connection.connect()
}
}
}
object OneWorker3 {
def props(connection: Connection) = Props(classOf[OneWorker3], connection)
}
class Connection3 {
var isConnected = false
def connect() = isConnected = true
def close() = isConnected = false
def doJob(x: Int) = {
if (!isConnected) {
throw new RuntimeException(s"Connection already closed. message = $x")
}
if (x % 5 == Random.nextInt(3)) {
isConnected = false
throw new RuntimeException(s"unlucky number $x. connection was reset")
}
println(s"the answer is $x")
}
}
JobSupervisor3とかもういらないくらい簡単になりました。
キューの管理を自前でしなくていいのは楽です。間違えて二回送っちゃったりしないことをテストするのは結構頭使いますし・・・。
しかも自作の時はついてなかったメッセージ単位のretry回数制限まで実装されているようなので結構よさそうだなーと思った話でした。