読者です 読者をやめる 読者になる 読者になる

akkaで失敗したメッセージを順番通りにもう一度処理してもらうには

概ね Mailbox with Explicit Acknowledgement — Akka Documentation のお話です。

まずはbuild.sbtです。

name := "akka-scala-seed"

version := "1.0"

scalaVersion := "2.11.8"

val akkaVersion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-contrib" % akkaVersion
)

build.sbt終わり。

さて、conectionが切れるなどメッセージの内容以外の原因でchildの仕事が失敗してrestartする時、akkaでは死んだ時処理していたメッセージは消えてしまいます。

scala - Akka - resending the "breaking" message - Stack Overflow で紹介されているように 以下のようにpreRestartでselfに投げることでメッセージを再処理することが出来るのですが、そうすると順番が変わってしまいます。

override def preRestart(reason: Throwable, message: Option[Any]) {
    message.foreach(self ! x)
}

例えば以下の様な例では、2,3,4,5,1 のように出力順がひっくり返ってしまいます。

package order

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object OrderMain extends App {
  implicit val system = ActorSystem()
  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout.durationToTimeout(3.seconds)

  val actor = system.actorOf(PreRestarter.props(new Connection))
  (1 to 5).foreach(actor ! _)

  Await.result((actor ? "finished").flatMap(_ => system.terminate()), Duration.Inf)
}

class PreRestarter(connection: Connection) extends Actor {
  override def receive = {
    case "finished" => sender ! "finished"
    case x: Int => connection.doJob(x)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)

    if (!connection.isConnected) {
      connection.connect()
    }
    message.foreach(self ! _)
  }
}

object PreRestarter {
  def props(connection: Connection) = Props(classOf[PreRestarter], connection)
}

class Connection {
  var isConnected = false

  def connect() = {
    println("connected")
    isConnected = true
  }

  def close() = {
    println("closed")
    isConnected = false
  }

  def doJob(x: Int) = {
    if (!isConnected) throw new RuntimeException(s"Connection already closed. message = $x")

    println(s"the answer is $x")
  }
}

大体良いんですがなんとなくもやもやするとか、タイムアウトを設定したいとなると、待機jobの数も考慮して設定するのかそれともリトライのときにタイムアウトするのは仕方ない?でもそしたらそもそもリトライしなくていい?そうするとエラーレスポンス増えるけどそれでいいの?といったことを判断するといった面倒が増えてしまいます。

そんなときのパターンとして、 childは仕事だけをする。supervisorはジョブを貯めこんで子供が仕事を終わらせたら次の仕事を一個ずつ渡す形式にしてやることが考えられます。*1たとえば以下のようになります。

package order

import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}
import order.JobSupervisor2.{AckAndNext, NackAndRetry}

import scala.collection.immutable.Queue
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

object OrderMain2 extends App {
  implicit val system = ActorSystem()
  val actor = system.actorOf(Props[RootActor2], "root")
  (1 to 10).foreach(actor ! _)

  Thread.sleep(300)

  (11 to 20).foreach(actor ! _)

  actor ! "AllJobProduced"
  Await.result(system.whenTerminated, Duration.Inf)
}

class RootActor2 extends Actor {

  val actor = context.actorOf(JobSupervisor2.props, "manager")

  override def receive = {
    case "AllJobFinished" => context.system.terminate
    case x => actor forward x
  }
}

class JobSupervisor2 extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception =>
      self ! NackAndRetry
      Restart
    case _ => Stop
  }

  val worker = context.actorOf(OneWorker2.props(new Connection2), "worker")
  var jobs = Queue[Int]()
  var isAllJobProduced = false

  // たかだか一つのjobをworkerに与える
  override def receive = {
    case "AllJobProduced" =>
      isAllJobProduced = true
      finishCheck()
    case x: Int =>
      // jobが無いとackの時に次が渡せないのでここで補填
      if (jobs.isEmpty) {
        worker ! x
      }
      addJob(x: Int)
    case AckAndNext =>
      removeOldJob()
      produceNext()
      finishCheck()
    case NackAndRetry => produceNext()
  }

  private def addJob(x: Int) = {
    jobs = jobs.enqueue(x)
  }

  private def removeOldJob() = {
    jobs = jobs.dequeue._2
  }

  private def produceNext() = jobs.headOption.foreach(worker ! _)
  private def finishCheck() = if (isAllJobProduced && jobs.isEmpty) context.parent ! "AllJobFinished"

}

object JobSupervisor2 {
  def props = Props[JobSupervisor2]
  case object AckAndNext
  case object NackAndRetry
}

class OneWorker2(connection: Connection2) extends Actor {

  override def receive = {
    case x: Int =>
      connection.doJob(x)
      sender ! AckAndNext
  }

  override def preStart(): Unit = {
    super.preStart()

    if (!connection.isConnected) {
      connection.connect()
    }
  }
}

object OneWorker2 {
  def props(connection: Connection2) = Props(classOf[OneWorker2], connection)
}

class Connection2 {
  var isConnected = false

  def connect() = isConnected = true

  def close() = isConnected = false

  def doJob(x: Int) = {
    if (!isConnected) {
      throw new RuntimeException(s"Connection already closed. message = $x")
    }

    if (x % 5 == Random.nextInt(3)) {
      isConnected = false
      throw new RuntimeException(s"unlucky number $x. connection was reset")
    }

    println(s"the answer is $x")
  }
}

x % 5 == Random.nextInt(3) が成立した時connectionが切断されてdoJobに失敗するようにしました。 この実装だと1~20まで順番通りに表示されます。

余談

今回のようにparentがsupervisorStrategyで失敗したことを認識して投げ直してあげる方式と、childのpreStartでRequestを投げてparentがそれに応える方式があると思うんですよね。 MainでわざとThread.sleepで間を開けてjobが空っぽになる期間を用意してみたところ、Requestをparentに送るパターン(後者)だとそのタイミングでjobが無かったら次のRequestをスケジュールするか、何か状態を持たせて遅れなかったとき限定で receiveの x: Int で直接送りつける?(でもそうすると子供からRequestが来るのと親が送りつけてくるパスが両方できてわかりにくい?)みたいな面倒が生じたので今回は前者のパターンを採用してみました。でもSupervisorStrategyに処理を書くのはそれはそれで・・って感じで困るのでなんかいい指針が欲しいです。

余談終わり

そして、本題ですがこういう明示的にackをするパターンがすでにakka.contribにあるらしいです(・ ω ・) それが akka.contrib.mailbox.PeekMailboxType です。ドキュメントのトップにはなくて External Contributions — Akka Documentation のページに有りました。(akkaのドキュメントはこういうトップからは視えないけどページ内リンクから参照されている隠しページ(?)があるのでpdfで見ると新たなページを発掘できて楽しい)

これを使って書き直すと以下のようになります。

package order

import akka.actor.{Actor, ActorSystem, Props}
import akka.contrib.mailbox.PeekMailboxExtension
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

object OrderMain3 extends App {
  implicit val system = ActorSystem("MySystem", ConfigFactory.parseString(
    """
    peek-dispatcher {
      mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
      max-retries = 2
    }
    """))

  val actor = system.actorOf(Props[RootActor3], "root")
  (1 to 10).foreach(actor ! _)

  Thread.sleep(300)

  (11 to 20).foreach(actor ! _)

  actor ! "AllJobProduced"
  Await.result(system.whenTerminated, Duration.Inf)
}

class RootActor3 extends Actor {

  val actor = context.actorOf(JobSupervisor3.props, "manager")

  override def receive = {
    case "AllJobFinished" => context.system.terminate
    case x => actor forward x
  }
}

class JobSupervisor3 extends Actor {

  val worker = context.actorOf(OneWorker3.props(new Connection).withDispatcher("peek-dispatcher"), "worker")
  var isAllJobProduced = false

  override def receive = {
    case "AllJobProduced" =>
      isAllJobProduced = true
      finishCheck()
    case x: Int =>
      worker ! x
      finishCheck()
  }

  private def finishCheck() = if (isAllJobProduced) context.parent ! "AllJobFinished"

}

object JobSupervisor3 {
  def props = Props[JobSupervisor3]
}

class OneWorker3(connection: Connection) extends Actor {

  override def receive = {
    case x: Int =>
      connection.doJob(x)
      PeekMailboxExtension.ack()
  }

  override def preStart(): Unit = {
    super.preStart()

    if (!connection.isConnected) {
      connection.connect()
    }
  }
}

object OneWorker3 {
  def props(connection: Connection) = Props(classOf[OneWorker3], connection)
}

class Connection3 {
  var isConnected = false

  def connect() = isConnected = true

  def close() = isConnected = false

  def doJob(x: Int) = {
    if (!isConnected) {
      throw new RuntimeException(s"Connection already closed. message = $x")
    }

    if (x % 5 == Random.nextInt(3)) {
      isConnected = false
      throw new RuntimeException(s"unlucky number $x. connection was reset")
    }

    println(s"the answer is $x")
  }
}

JobSupervisor3とかもういらないくらい簡単になりました。 キューの管理を自前でしなくていいのは楽です。間違えて二回送っちゃったりしないことをテストするのは結構頭使いますし・・・。 しかも自作の時はついてなかったメッセージ単位のretry回数制限まで実装されているようなので結構よさそうだなーと思った話でした。

*1:ってドキュメントに書いてありました