akkaで失敗したメッセージを順番通りにもう一度処理してもらうには

概ね Mailbox with Explicit Acknowledgement — Akka Documentation のお話です。

まずはbuild.sbtです。

name := "akka-scala-seed"

version := "1.0"

scalaVersion := "2.11.8"

val akkaVersion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-contrib" % akkaVersion
)

build.sbt終わり。

さて、conectionが切れるなどメッセージの内容以外の原因でchildの仕事が失敗してrestartする時、akkaでは死んだ時処理していたメッセージは消えてしまいます。

scala - Akka - resending the "breaking" message - Stack Overflow で紹介されているように 以下のようにpreRestartでselfに投げることでメッセージを再処理することが出来るのですが、そうすると順番が変わってしまいます。

override def preRestart(reason: Throwable, message: Option[Any]) {
    message.foreach(self ! x)
}

例えば以下の様な例では、2,3,4,5,1 のように出力順がひっくり返ってしまいます。

package order

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object OrderMain extends App {
  implicit val system = ActorSystem()
  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout.durationToTimeout(3.seconds)

  val actor = system.actorOf(PreRestarter.props(new Connection))
  (1 to 5).foreach(actor ! _)

  Await.result((actor ? "finished").flatMap(_ => system.terminate()), Duration.Inf)
}

class PreRestarter(connection: Connection) extends Actor {
  override def receive = {
    case "finished" => sender ! "finished"
    case x: Int => connection.doJob(x)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)

    if (!connection.isConnected) {
      connection.connect()
    }
    message.foreach(self ! _)
  }
}

object PreRestarter {
  def props(connection: Connection) = Props(classOf[PreRestarter], connection)
}

class Connection {
  var isConnected = false

  def connect() = {
    println("connected")
    isConnected = true
  }

  def close() = {
    println("closed")
    isConnected = false
  }

  def doJob(x: Int) = {
    if (!isConnected) throw new RuntimeException(s"Connection already closed. message = $x")

    println(s"the answer is $x")
  }
}

大体良いんですがなんとなくもやもやするとか、タイムアウトを設定したいとなると、待機jobの数も考慮して設定するのかそれともリトライのときにタイムアウトするのは仕方ない?でもそしたらそもそもリトライしなくていい?そうするとエラーレスポンス増えるけどそれでいいの?といったことを判断するといった面倒が増えてしまいます。

そんなときのパターンとして、 childは仕事だけをする。supervisorはジョブを貯めこんで子供が仕事を終わらせたら次の仕事を一個ずつ渡す形式にしてやることが考えられます。*1たとえば以下のようになります。

package order

import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}
import order.JobSupervisor2.{AckAndNext, NackAndRetry}

import scala.collection.immutable.Queue
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

object OrderMain2 extends App {
  implicit val system = ActorSystem()
  val actor = system.actorOf(Props[RootActor2], "root")
  (1 to 10).foreach(actor ! _)

  Thread.sleep(300)

  (11 to 20).foreach(actor ! _)

  actor ! "AllJobProduced"
  Await.result(system.whenTerminated, Duration.Inf)
}

class RootActor2 extends Actor {

  val actor = context.actorOf(JobSupervisor2.props, "manager")

  override def receive = {
    case "AllJobFinished" => context.system.terminate
    case x => actor forward x
  }
}

class JobSupervisor2 extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception =>
      self ! NackAndRetry
      Restart
    case _ => Stop
  }

  val worker = context.actorOf(OneWorker2.props(new Connection2), "worker")
  var jobs = Queue[Int]()
  var isAllJobProduced = false

  // たかだか一つのjobをworkerに与える
  override def receive = {
    case "AllJobProduced" =>
      isAllJobProduced = true
      finishCheck()
    case x: Int =>
      // jobが無いとackの時に次が渡せないのでここで補填
      if (jobs.isEmpty) {
        worker ! x
      }
      addJob(x: Int)
    case AckAndNext =>
      removeOldJob()
      produceNext()
      finishCheck()
    case NackAndRetry => produceNext()
  }

  private def addJob(x: Int) = {
    jobs = jobs.enqueue(x)
  }

  private def removeOldJob() = {
    jobs = jobs.dequeue._2
  }

  private def produceNext() = jobs.headOption.foreach(worker ! _)
  private def finishCheck() = if (isAllJobProduced && jobs.isEmpty) context.parent ! "AllJobFinished"

}

object JobSupervisor2 {
  def props = Props[JobSupervisor2]
  case object AckAndNext
  case object NackAndRetry
}

class OneWorker2(connection: Connection2) extends Actor {

  override def receive = {
    case x: Int =>
      connection.doJob(x)
      sender ! AckAndNext
  }

  override def preStart(): Unit = {
    super.preStart()

    if (!connection.isConnected) {
      connection.connect()
    }
  }
}

object OneWorker2 {
  def props(connection: Connection2) = Props(classOf[OneWorker2], connection)
}

class Connection2 {
  var isConnected = false

  def connect() = isConnected = true

  def close() = isConnected = false

  def doJob(x: Int) = {
    if (!isConnected) {
      throw new RuntimeException(s"Connection already closed. message = $x")
    }

    if (x % 5 == Random.nextInt(3)) {
      isConnected = false
      throw new RuntimeException(s"unlucky number $x. connection was reset")
    }

    println(s"the answer is $x")
  }
}

x % 5 == Random.nextInt(3) が成立した時connectionが切断されてdoJobに失敗するようにしました。 この実装だと1~20まで順番通りに表示されます。

余談

今回のようにparentがsupervisorStrategyで失敗したことを認識して投げ直してあげる方式と、childのpreStartでRequestを投げてparentがそれに応える方式があると思うんですよね。 MainでわざとThread.sleepで間を開けてjobが空っぽになる期間を用意してみたところ、Requestをparentに送るパターン(後者)だとそのタイミングでjobが無かったら次のRequestをスケジュールするか、何か状態を持たせて遅れなかったとき限定で receiveの x: Int で直接送りつける?(でもそうすると子供からRequestが来るのと親が送りつけてくるパスが両方できてわかりにくい?)みたいな面倒が生じたので今回は前者のパターンを採用してみました。でもSupervisorStrategyに処理を書くのはそれはそれで・・って感じで困るのでなんかいい指針が欲しいです。

余談終わり

そして、本題ですがこういう明示的にackをするパターンがすでにakka.contribにあるらしいです(・ ω ・) それが akka.contrib.mailbox.PeekMailboxType です。ドキュメントのトップにはなくて External Contributions — Akka Documentation のページに有りました。(akkaのドキュメントはこういうトップからは視えないけどページ内リンクから参照されている隠しページ(?)があるのでpdfで見ると新たなページを発掘できて楽しい)

これを使って書き直すと以下のようになります。

package order

import akka.actor.{Actor, ActorSystem, Props}
import akka.contrib.mailbox.PeekMailboxExtension
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

object OrderMain3 extends App {
  implicit val system = ActorSystem("MySystem", ConfigFactory.parseString(
    """
    peek-dispatcher {
      mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
      max-retries = 2
    }
    """))

  val actor = system.actorOf(Props[RootActor3], "root")
  (1 to 10).foreach(actor ! _)

  Thread.sleep(300)

  (11 to 20).foreach(actor ! _)

  actor ! "AllJobProduced"
  Await.result(system.whenTerminated, Duration.Inf)
}

class RootActor3 extends Actor {

  val actor = context.actorOf(JobSupervisor3.props, "manager")

  override def receive = {
    case "AllJobFinished" => context.system.terminate
    case x => actor forward x
  }
}

class JobSupervisor3 extends Actor {

  val worker = context.actorOf(OneWorker3.props(new Connection).withDispatcher("peek-dispatcher"), "worker")
  var isAllJobProduced = false

  override def receive = {
    case "AllJobProduced" =>
      isAllJobProduced = true
      finishCheck()
    case x: Int =>
      worker ! x
      finishCheck()
  }

  private def finishCheck() = if (isAllJobProduced) context.parent ! "AllJobFinished"

}

object JobSupervisor3 {
  def props = Props[JobSupervisor3]
}

class OneWorker3(connection: Connection) extends Actor {

  override def receive = {
    case x: Int =>
      connection.doJob(x)
      PeekMailboxExtension.ack()
  }

  override def preStart(): Unit = {
    super.preStart()

    if (!connection.isConnected) {
      connection.connect()
    }
  }
}

object OneWorker3 {
  def props(connection: Connection) = Props(classOf[OneWorker3], connection)
}

class Connection3 {
  var isConnected = false

  def connect() = isConnected = true

  def close() = isConnected = false

  def doJob(x: Int) = {
    if (!isConnected) {
      throw new RuntimeException(s"Connection already closed. message = $x")
    }

    if (x % 5 == Random.nextInt(3)) {
      isConnected = false
      throw new RuntimeException(s"unlucky number $x. connection was reset")
    }

    println(s"the answer is $x")
  }
}

JobSupervisor3とかもういらないくらい簡単になりました。 キューの管理を自前でしなくていいのは楽です。間違えて二回送っちゃったりしないことをテストするのは結構頭使いますし・・・。 しかも自作の時はついてなかったメッセージ単位のretry回数制限まで実装されているようなので結構よさそうだなーと思った話でした。

*1:ってドキュメントに書いてありました

Kafkaの旧consumerでいうconsumer.idは新consumerだと何なのか

consumerが復数同時に同じpartitionにつないだときに、どのクライアントが担当するかを意図的に決定させるためにconsumer.idを設定していたんですが、新consumerでconsumer.id設定できるところがみつからないなーと思ったので、Kafkaコードを微妙に追いつつ探ってみました。備忘録がてらのメモ書きです。

どうも旧consumerでいうところのconsumer.idは新consumerでいうとclient.idらしいです。partition assignにはmemberIdが指定されるので厳密には違うっぽいですが、client.idを元に計算されるようです。(多分)

まずmemberIdは旧consumerのときは(consumer.idとして)自分から名乗っていたけど、新consumerでは最初は無名で、brokerに名付けられたら次からはその名前を名乗るようです。

if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
  // if the member id is unknown, register the member to the group
  addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else ...

で、このmemberIdはどうやら以下のように決まるようです。

// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L588-L589

generateMemberIdSuffix は以下のように決まっています。

// TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L170 randomにするかどうか迷っているようですが、予測可能だと勝手に偽物consumerが繋いできたりとかあるんでしょうか・・?

で、肝心のclientIdはというと handleJoinGroup の第三引数にありそうです。

def handleJoinGroup(groupId: String,
                      memberId: String,
                      clientId: String,
                      clientHost: String,
                      sessionTimeoutMs: Int,
                      protocolType: String,
                      protocols: List[(String, Array[Byte])],
                      responseCallback: JoinCallback) { ... }

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L106-L108

このメソッドの呼び出し元をみると request.header.clientId とあるので、headerに入っていそうです。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L796

RequestHeaderのクラスを見るとどうも struct.set(CLIENT_ID_FIELD, client); とあるので ここで String client とされているものが client_id っぽいとみてよさそうです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java#L47-L61

headerはConsumerNetworkClientでつくられています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L101

ここでいう client.nextRequestHeader(api);clientは(ややこしいですが)KafkaClientのようです。そして実際には NetworkClient が使われているようです。

public class NetworkClient implements KafkaClient { ...

    public NetworkClient(Selectable selector,
                         Metadata metadata,
                         String clientId, ...) { ... }
    ...
}

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L91-L93

NetworkClientのインスタンス生成やclientIdの生成は以下で行われています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L554

これをみると、 clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); とあるので、 旧consumer.idのような役目はclient.Idとして設定可能になり、かつそのまま使われいてるわけではなく自分から名乗るのか・brokerが名付けるのかや、後ろにbrokerが何かsuffixを付けるのかなどが変更されていそうです。

partition assignment strategy周りも少し変更されていて、旧consumerだと、このAssignmentContextをつかっていますが、 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/PartitionAssignor.scala

新consumerではRangeAssignorといったAbstractPartitionAssignorが担当しているようです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L71-L72

新consumerの話に戻って、client.idのデフォルト値です。 デフォルト値は "" だが、無いと consumer-CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() になるようです。

staticだから同じプロセス内じゃないとclientId変わらないからクラスタ全体だとidentifiyできなさそうだけどドキュメントにもtracking用と書いてあるからいいんでしょうか・・。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L541

そして、ここまで読んだ後に以下のドキュメントを見つけました。

Kafka Client-side Assignment Proposal - Apache Kafka - Apache Software Foundation

  1. Does the consumer needs to provide the member-id through configs? Also is the consumerId in assign() the same as memberId? Currently the consumer-id / member-id is assigned at the coordinator side. The memberId is used exactly the same as consumerId. I just renamed it for the more general usage.

先に出てきて欲しかった(´;ω;`)

slackチャンネルのワードクラウドを生成するslackloudを作った

アイコンを変えました₍₍ (ง´・_・`)ว ⁾⁾

そして作りました。 GitHub - matsu-chara/slackloud

https://github.com/matsu-chara/slackloud/blob/master/example/example.png?raw=true

slackloudとは

slackのトークンさえあれば ./run.sh "#作成対象チャンネル" --post "#画像ポストチャンネル" でワードクラウドの生成、画像投稿までやってくれる便利なやつです。

もちろん会社スラックのワードクラウドを気軽にツイッターに挙げると情報漏えいになりかねないのでご注意ください( ◜◡‾)(サンプルは個人スラックからとってきました)

pythonでそれっぽいライブラリがあったのでやっつけで組み合わせました。

このへんの環境は全部dockerでまとめて、dockerさえあれば(pyenvやvirtualenv無しで) docker runで動かせるようにしました。 docker imageが数ギガあるのが厳しいところです。apt-get cleanくらいはしましたが、あんまり効果がなく・・。

docker history matsuchara/slackloud
IMAGE               CREATED             CREATED BY                                      SIZE                COMMENT
00451faa7bdb        33 minutes ago      /bin/sh -c #(nop)  ENTRYPOINT ["python3" "/ap   0 B
a2cfb84c4f5c        33 minutes ago      /bin/sh -c #(nop)  ENV PYTHONIOENCODING=utf-8   0 B
a5358f81c054        33 minutes ago      /bin/sh -c pip3 install -r requirements.txt     304.9 MB
5851c0ac5ac8        38 minutes ago      /bin/sh -c #(nop) COPY file:3d429e7d497f6ee60   98 B
f2e0d38a8f2a        38 minutes ago      /bin/sh -c apt-get clean && rm -rf /var/lib/a   0 B
c30f3e769c63        38 minutes ago      /bin/sh -c curl -s https://bootstrap.pypa.io/   12.7 MB
3255a7e853ef        38 minutes ago      /bin/sh -c apt-get -y --no-install-recommends   195.9 MB
da904280048c        41 minutes ago      /bin/sh -c ./mecab-ipadic-neologd/bin/install   2.156 GB
22bfc03e65da        42 minutes ago      /bin/sh -c git clone --depth 1 https://github   102.4 MB
1625bd475980        43 minutes ago      /bin/sh -c #(nop)  WORKDIR /app                 0 B
e75ea79a36d8        43 minutes ago      /bin/sh -c update-ca-certificates               274.3 kB
46eaafa43a60        43 minutes ago      /bin/sh -c apt-get -y --no-install-recommends   324.4 MB
ab7270f2e2e0        44 minutes ago      /bin/sh -c apt-get -y update                    21.88 MB
8d816e6fd6ac        45 minutes ago      /bin/sh -c #(nop)  MAINTAINER matsu_chara<mat   0 B
38c759202e30        3 weeks ago         /bin/sh -c #(nop) CMD ["/bin/bash"]             0 B
<missing>           3 weeks ago         /bin/sh -c sed -i 's/^#\s*\(deb.*universe\)$/   1.895 kB
<missing>           3 weeks ago         /bin/sh -c rm -rf /var/lib/apt/lists/*          0 B
<missing>           3 weeks ago         /bin/sh -c set -xe   && echo '#!/bin/sh' > /u   8.841 MB
<missing>           3 weeks ago

以上のようにイメージサイズを見ることが出来ますが、これをみると mecab-ipadic-neologd が大半のようです。mecab周辺はあまり知らないのですがmecabや辞書の役割を考えるとここの部分は仕方なさそうですね(;´Д`)

iterm2 version3.0でssh-host-colorが`44:52: syntax error: end of line~~~`みたいなエラー出すのを直した

細かいものを書いてブログを延命するエブリデイ₍₍ (ง´・_・`)ว ⁾⁾

前置き

iterm2 3.0のshell integrationが結構よくて、

  • 時間がかかるコマンドを実行しちゃった後に、あっ終わったら通知して欲しかった・・・ってときにもCmd-Opt-Aで後付アラートしかけられたり
  • 成功失敗が▷マークで表示されて、失敗されていた場合は右クリックでステータスコードが見れたり
  • ファイルドロップすると雑にscpでアップロードできたり
  • ファイル右クリックで雑にscpでダウンロードできたり

みたいなことが出来るようです。 Shell Integration - Documentation - iTerm2 - Mac OS Terminal Replacement

shell integrationは最初聞いた時うーんと思っていたんですが色々やってくれるのでよさそうです。特にアラートが良いです。 ステータスコード表示は自前でプロンプトに表示していたのから切り替えました。

ssh-host-color

iterm2 version3.0には接続先username@hostnameに応じてプロファイルを切り替える(ことで背景色を変える)ような機能があるんですが、ワイルドカード指定しかなくてどうも正規表現でマッチさせるみたいなことはできないようです。

なので、以前から使っていたssh-host-colorを続投することにしたんですが、sshしてexitすると毎回44:52: syntax error: end of line があるべきところですが identifier が見つかりました。 (-2741)と言われるので直しました。

gist.github.com

使い方は変わらずgistのコメントに従えばよい感じです。

markdown内のjsonコードブロックを拾ってjsonlintかけてくれる君を雑に作った

markdownjson codeblockにAPIレスポンスの仕様とか書いてるけど何か微妙にずれたりしていてワーってなるときに備えて作りました。 codeblockを抜き出してきてjsonlintを叩きまくる仕様です。 実装が雑だけど、困ってないしあんまり継続的にはメンテしないだろうなという気持ちを受けて experimental-markdown-json-lint という名前です。

GitHub - matsu-chara/experimental-markdown-json-lint

www.npmjs.com

以下の様なsample.mdに対して

this is a chapter

here text

aaa

{ "a": 1 }
<?php
echo 'ok';
{ "b: 'a' }
{ "b": 2 }
{ "x: 2 }

yeah

以下の様な結果を返します。

sample.md
Parse error on line 1:
{ "b: 'a' }
--^
Expecting 'STRING', '}', got 'undefined'
Parse error on line 1:
{ "x: 2 }
--^
Expecting 'STRING', '}', got 'undefined'

file-glob対応しているので **/*.md もいけます。