ponyで何か作りたいなと思ったのでひとまずScalaで作ってそれを移植しようかなーと思い立ち作ってみた。
get/setしか叩け無いけどまあいいよね( ◜◡‾) レスポンスのパースが限定的なので勉強がてらもう少し作って見るかも。
せっかくだからnon-blockingにしたりとかredis-clusterとかまでやってみようかなー。(ponyとは・・)
ponyで何か作りたいなと思ったのでひとまずScalaで作ってそれを移植しようかなーと思い立ち作ってみた。
get/setしか叩け無いけどまあいいよね( ◜◡‾) レスポンスのパースが限定的なので勉強がてらもう少し作って見るかも。
せっかくだからnon-blockingにしたりとかredis-clusterとかまでやってみようかなー。(ponyとは・・)
はろぽに〜₍₍ (ง´・_・`)ว ⁾⁾
2016年8月26日にPony 0.3がリリースされました。 チュートリアルについてのブログを書いたときから大分経ったような気がするので、 最近のPony事情についてちょっとまとめようかなーと思います。
(ついでにチュートリアルについての記事はdeprecated的な文言を先頭に付けました。試してないですが多分色々動かないところがあると思います。( ◜◡‾))
今年の5月ごろPonyのマスコットキャラクターであるMain君が生まれました。goで言うところのgopherポジションだと思います。 Introducing Main, the Pony Mascot.
最初は( ◜◡‾)???となったんですが、しばらく眺めていると謎の良さみが深まります。slackのemojiに登録しましょう。
ちなみに3月末頃に別の案が提案されていました。
@SeanTAllen @ponylang Looks good, well done. Here is a logo I put together. Could work with you to adapt #ponylang pic.twitter.com/XpduLjQVtO
— Hoogs (@h00gs) March 30, 2016
どちらがいいとかは置いておいて、ponyのマスコットはMain君です。
ちなみにMain君ステッカーはこちらで購入することができます。もしぬいぐるみが出たら家用と職場用で2つ購入しようと思います。 Main by Sean T Allen (#12903) - Sticker Mule
Ponyの開発、商用サポートなどを手掛けていたcausality社がcloseすることになったという情報が8月21日に公開されました。
しかしPonyの開発はコミュニティドリブン(+大学の研究などで?)続いていくと宣言されていますし、ponycのコミットを見る限りちゃんと続いているので言語的には問題なさそうです。
資金獲得などの面で色々あったのだと思いますが、やはりちゃんとした技術を売りにしたスタートアップでもうまくいかないことはあるんだなーという気持ちに。
Ponyのあれこれについて解説するライブ放送の録画がvimeoで見られるようになっています!
僕のオススメは
コンパイル時にメソッドを呼び出して計算した結果を型パラメータに突っ込むみたいなことも出来て結構強力そうです。すごく複雑なことをしなくても、例えば行列の次元数を型パラメータにしておけば、行列積でかけ間違えてたらコンパイルエラーといった分かりやすいメリットを簡単に得ることが出来そうです。
ponylang, reference capability周りが変わるらしい pic.twitter.com/Ss8pjHCzR0
— まつちゃら (@matsu_chara) August 12, 2016
の2つです。
まだPonyのGCについての論文 を読んだことがなければ 「Pony VUG #6: Andrew Turley: The Art of Forgetting - Garbage Collection in Pony」もおすすめです。
という気持ちです。ponylangというとreference capabilityの型システムに目を奪われがちだけど実はGCも攻めてて面白いのである(参照を共有してるけどstop the worldレスでアクターごとにGC出来るなど)https://t.co/0kaND4frMr
— まつちゃら (@matsu_chara) August 30, 2016
Pony VUGは言語の入門だけでなく、実装の詳細やこれから入るかもしれない機能について突っ込んで触れられているので必見です。
なりました。 http://tutorial.ponylang.org/
以前は付いていたsyntax highlightがつかなくなった気がするんですが・・・という気持ちですがメジャー言語になればつくはずなので問題無いですね( ◜◡‾)
ponyのRFC議論場がgithubに公開されているらしい(まだなにもないけど1,2週間で何かでそうとのこと)https://t.co/3H7MtbRBIu #ponylang
— まつちゃら (@matsu_chara) May 19, 2016
すでにいくつかはマージされ、実装されています。
RFC#1 cryptoがめでたくマージされていたhttps://t.co/xR48Pejdcg #ponylang
— まつちゃら (@matsu_chara) July 7, 2016
https://github.com/ponylang/ponyc/search?utf8=%E2%9C%93&q=implement+RFC&type=Issues
ちなみに、この方式はRustなどに影響を受けているようです。
Pony has adopted a RFC process
何百コミットもあったのに一向にリリースされなかったPonyですが、homebrewの人が発端でリリースしようよ => 整えるか! という流れで無事に0.3リリースにたどり着いたようです🎉
ponylang, しばらく出てなかったの、開発が停滞していたわけじゃなくてリリースプロセスが整ってなかったからなんだけどhomebrewのメンテナの人がたてたイシューがきっかけでその辺が整った感じですhttps://t.co/GcmPmYzv1A
— まつちゃら (@matsu_chara) August 27, 2016
今まで0.2.1使って「動かない(´;ω;`) => masterビルドして使ってね」コンボを食らってる人が何人もいたのでこれで少し落ち着きそうです。
ちなみに0.2.1からの差分は 1092 commits 524 changed files with 13,181 additions and 2,797 deletions
です。そりゃ動かんわ!
https://github.com/ponylang/ponyc/compare/0.2.1...0.3.0
詳しくはチェンジログ参照なんですが、32bit ARM/X86対応が加わったりしました。
またletとほぼおなじだけど、ポインタをもたせるのではなくて直接埋め込むのでポインタデリファレンスの分少しだけ高速な Embedded fieldsという概念が加わりました。速そう(?) pony-tutorial/variables.md at 5295d2c35f21953e9fe6f997b9100273a6f72b42 · ponylang/pony-tutorial · GitHub
serializationの実装がmasterに入ったりしてdistributed Ponyへの道が近づいたようです。
This is the first step towards Distributed Pony being "unvapourware". / “First version of Pony serialization lande…” https://t.co/HTn1KzFwO3
— まつちゃら (@matsu_chara) May 19, 2016
他にもコレクションのメソッドが増えたり、速くなったり、ネットワーク周りが改良されたり色々色々あるのですが、全部は把握できていないので誰か教えて下さい。 🙏
ちなみに全くどうでもよいですが、記念すべき1000個めのイシューはスケジューラのセグメンテーションフォールトでした。
Segfault in Pony scheduler · Issue #1000 · ponylang/ponyc · GitHub
特に自分が未来を知っているわけではないですがErlangっぽいprocess monitoringの仕組みをいれる取り組みが進行中?( Erlang-style monitoring · Issue #350 · ponylang/ponyc · GitHub )とありますし、serializeとかが入ったので、distributed Ponyが粛々と進んでいる気配があります。
かと思えば依存型が入る?など謎の高度な仕組みが入ってきそうで最終的にどうなるのかよくわからないですが今後も注目の言語だと言えそうです。
ということで
迷ったらPHPかPonyを書くのじゃ・・・
— まつちゃら (@matsu_chara) September 8, 2016
迷ってなくてもPony書いてください(´;ω;`)
— まつちゃら (@matsu_chara) September 8, 2016
まじハローワールドだけでもいいんで
— まつちゃら (@matsu_chara) September 8, 2016
何卒🙏
ExecutionContextとblockingについて調べたメモ [scala] - だいたいよくわからないブログ の続きです。
前回の記事ではblockingでブロックする処理を包むと自動的にスレッド数が増えるため高速に処理されて嬉しいよという話を書きました。加えて、それが有効になるのはExecutionContext.globalなどのScalaが内部で提供しているExecutionContextかakkaのdispatcherを利用した場合だという話も書きました。
なるほどーと思ってそのまま放置していたのですが、ふと実際にスレッド数見てみるかと思って下記のような要領で確認してみました。 ちょっと長いですが、下記の3種類のExecutionContextでblockingな処理を大量に呼び出してどうなるかをみています。
ExecutionContext.global
ExecutionContext.fromExecutorService(new ForkJoinPool(50))
ExecutionContext.fromExecutorService(new ForkJoinPool(1000, new DefaultThreadFactory, uncaughtExceptionHandler, false)
(BlockContext付きのThreadFactory入りのForkJoinPool)上記コードだとxms1G, xmx1Gで、1,3のケースでは 5000~10000回程度呼び出すとOOMになりました。 BlockContextがついていない2のケースでは時間はかかりますがしっかり実行してくれます。 今までblockingだとスレッド数は ManagedBlocker が良い感じにしてくれるという雑な理解をしていましたが、どうも普通に増えるだけっぽいです(?)(単に実験コードがわるいだけの可能性があるので注意が必要ですが、少なくともOOMになるケース自体はあり得るようです。)
そもそもBlockContextつきのThreadFactory自体がscala処理系は簡単に使う方法を外部に公開している訳ではないことからも考えると、blockingはExecutionCotext.globalのような、そこまでヘビーに使わないときに便利なもの・・?みたいな理解になりました。
前回の記事でakkaのdispatcherはBlockContextを自前で実装していることを述べました。実験として以下の様なコードでOOMになるかどうかを実験しました。
試した結果、ちゃんと(?)OOMになったので、akkaでブロック処理を行う際はblockingを書かないようにするか、十分に注意(DBでのブロックなら障害時にスレッド数が増えすぎないか?などを考える)する必要がありそうです。
blockingが動かないとコードによってはデッドロックもあり得る・・?といった背景もあるようですが、いずれにせよakkaでblockする処理を行う際はちゃんと並列数を見積もって予め適切なサイズにExecutionContextを分離しておくというオーソドックスなやり方が一番良さそうだなと思った次第です。
openjdkの実装を少し見るかーと思ったのですが、あたりは全体的なロジックが掴めてないので部分的に読むのは難しそうでした・・。 今度時間を書けて読みたいような読みたくないような・・・。
ただ、 no compensation needed
, create a replacement
というのがあるので必ず増えるわけではないかもしれません。
OOMについて調べましたが今回の実験ではメモリを1GB程度と少なめにしているので実際にはもっと耐えてくれるはずです。ただ負荷が強いプロダクションなどに投入するときは注意しましょう。という認識になりました。
blockingが要るのか要らないのかは微妙なところですがスレッドが足りなくてデッドロックしちゃうような処理があるばあいはblockingしたほうが良さそうですねー。とはいえそしたらスレッドプール分ければいいのではという気持ちがあったりしますが、常に簡単に分割できるって感じでもないと思うので・・。逆にスレッド増え続けてしまうくらい処理が重たくなるならblockingしないほうが良い気がします。そもそもblockingで処理包むの忘れるから使えてない
気になったのでコードを追ったりしてみました。備忘録けんメモ書きです。
そもそもtimeoutがどうやって送信されてどうやってbrokerで受信されているのかが追いにくいのですが、
https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java#L32 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/api/ProducerRequest.scala#L31 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L32-L40 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L96-L100
RequestHeader.java+ProduceRequest.javaと考えてProtocol.javaに書いてあるSchemaと、ProducerRequest.readfromと照らし合わせると帳尻が合いそうです。
つまりProducerRequest.ackTimeoutMs(コンストラクタ第二引数)を追えばよさそうで、追っていくと request.timeout.ms
になりそうです。
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L256
this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs);
requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
以上からProducerRequest(サーバー側のコード)に渡ってくるタイムアウトはrequest.timeout.ms
になりそうです。
此処から先は処理をきちんとおえてないのですがgrepする限り def handleProducerRequest(request: RequestChannel.Request) {...}
が処理を担当していそうです。
https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L298
こいつは最終的に rplicaManager.appendMessages
にtimeout値を渡しています。
appendMessages
は以下にあります。
https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L314
appendされたメッセージはリクエスト煉獄に叩きこまれます。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L343
これが実際にタイムアウトする処理はスケジュール機構なども含むので結構複雑ですが、 かなり紆余曲折があって、以下でタイムアウトします。 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/TimingWheel.scala#L131-L133
そのあと、taskExecutorにsubmitされて、runが呼ばれます。
// Already expired or cancelled if (!timerTaskEntry.cancelled) taskExecutor.submit(timerTaskEntry.timerTask)
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/Timer.scala#L93-L94
runは forceComplete
を呼ぶようになっています。(runが呼ばれるとタイムアウトしたかキャンセルしたかみたいな扱いになっていそうです。)
/* * run() method defines a task that is executed on timeout */ override def run(): Unit = { if (forceComplete()) onExpiration() }
forceComplete経由で呼ばれるonCompleteでcallbackが呼ばれて、ReplicaManager#L343にもどります。
val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus)
この際statusを変更する処理などがありませんが、statusの初期値がErrors.REQUEST_TIMED_OUT
になっているので不要そうです。
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedProduce.scala#L64
ということでrequest.timeout.msを短くするとack待ち時間が短くなるようです。(色々な要素を省略したので本当はもう少し色々なところでタイムアウトしたりしています・・。)
ドキュメントを見ると request.timeout.ms
にはackのことが書いてないんですが
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhauste
timeout.ms
には書いてあります。
The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.
そしてtimeout.msはdprecatedでREQUEST_TIMEOUT_MS_CONFIGにしろと書いてあるので役割が変わって無ければ同じ説明と認識して良さそうです。
一方で、producerのclientにも同じパラメータがわたっててそっちで能動的に切断するケースも有ります。 NetworkClientに入っているrequest.timeout.ms値がその辺の機能です。 Sender.sendは最終的にclient.sendをよぶけどその正体がこれです。(KafkaProducerでパラメータと一緒に初期化)
そいつでもhandleTimeoutしていてタイムアウトするとdisconnected=trueをレスポンスに突っ込みます。
その後、 Sender.handleProduceResponse
でNETWORK_EXCEPTION
になります。
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L260-L265
このフローだとエラーログがtraceやdebugじゃないと出なくて、例外をログにだしても The server disconnected before a response was received.
というどこで何が何故切断されたのかみたいな情報が皆無なログが出るだけでつらい感じになります(;´Д`)
概ね Mailbox with Explicit Acknowledgement — Akka Documentation のお話です。
まずはbuild.sbtです。
name := "akka-scala-seed" version := "1.0" scalaVersion := "2.11.8" val akkaVersion = "2.4.8" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-contrib" % akkaVersion )
build.sbt終わり。
さて、conectionが切れるなどメッセージの内容以外の原因でchildの仕事が失敗してrestartする時、akkaでは死んだ時処理していたメッセージは消えてしまいます。
scala - Akka - resending the "breaking" message - Stack Overflow で紹介されているように 以下のようにpreRestartでselfに投げることでメッセージを再処理することが出来るのですが、そうすると順番が変わってしまいます。
override def preRestart(reason: Throwable, message: Option[Any]) { message.foreach(self ! x) }
例えば以下の様な例では、2,3,4,5,1
のように出力順がひっくり返ってしまいます。
package order import akka.actor.{Actor, ActorSystem, Props} import akka.pattern.ask import akka.util.Timeout import scala.concurrent.Await import scala.concurrent.duration._ object OrderMain extends App { implicit val system = ActorSystem() implicit val executionContext = system.dispatcher implicit val timeout = Timeout.durationToTimeout(3.seconds) val actor = system.actorOf(PreRestarter.props(new Connection)) (1 to 5).foreach(actor ! _) Await.result((actor ? "finished").flatMap(_ => system.terminate()), Duration.Inf) } class PreRestarter(connection: Connection) extends Actor { override def receive = { case "finished" => sender ! "finished" case x: Int => connection.doJob(x) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { super.preRestart(reason, message) if (!connection.isConnected) { connection.connect() } message.foreach(self ! _) } } object PreRestarter { def props(connection: Connection) = Props(classOf[PreRestarter], connection) } class Connection { var isConnected = false def connect() = { println("connected") isConnected = true } def close() = { println("closed") isConnected = false } def doJob(x: Int) = { if (!isConnected) throw new RuntimeException(s"Connection already closed. message = $x") println(s"the answer is $x") } }
大体良いんですがなんとなくもやもやするとか、タイムアウトを設定したいとなると、待機jobの数も考慮して設定するのかそれともリトライのときにタイムアウトするのは仕方ない?でもそしたらそもそもリトライしなくていい?そうするとエラーレスポンス増えるけどそれでいいの?といったことを判断するといった面倒が増えてしまいます。
そんなときのパターンとして、 childは仕事だけをする。supervisorはジョブを貯めこんで子供が仕事を終わらせたら次の仕事を一個ずつ渡す形式にしてやることが考えられます。*1たとえば以下のようになります。
package order import akka.actor.SupervisorStrategy.{Restart, Stop} import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props} import order.JobSupervisor2.{AckAndNext, NackAndRetry} import scala.collection.immutable.Queue import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.util.Random object OrderMain2 extends App { implicit val system = ActorSystem() val actor = system.actorOf(Props[RootActor2], "root") (1 to 10).foreach(actor ! _) Thread.sleep(300) (11 to 20).foreach(actor ! _) actor ! "AllJobProduced" Await.result(system.whenTerminated, Duration.Inf) } class RootActor2 extends Actor { val actor = context.actorOf(JobSupervisor2.props, "manager") override def receive = { case "AllJobFinished" => context.system.terminate case x => actor forward x } } class JobSupervisor2 extends Actor { override val supervisorStrategy = OneForOneStrategy() { case _: Exception => self ! NackAndRetry Restart case _ => Stop } val worker = context.actorOf(OneWorker2.props(new Connection2), "worker") var jobs = Queue[Int]() var isAllJobProduced = false // たかだか一つのjobをworkerに与える override def receive = { case "AllJobProduced" => isAllJobProduced = true finishCheck() case x: Int => // jobが無いとackの時に次が渡せないのでここで補填 if (jobs.isEmpty) { worker ! x } addJob(x: Int) case AckAndNext => removeOldJob() produceNext() finishCheck() case NackAndRetry => produceNext() } private def addJob(x: Int) = { jobs = jobs.enqueue(x) } private def removeOldJob() = { jobs = jobs.dequeue._2 } private def produceNext() = jobs.headOption.foreach(worker ! _) private def finishCheck() = if (isAllJobProduced && jobs.isEmpty) context.parent ! "AllJobFinished" } object JobSupervisor2 { def props = Props[JobSupervisor2] case object AckAndNext case object NackAndRetry } class OneWorker2(connection: Connection2) extends Actor { override def receive = { case x: Int => connection.doJob(x) sender ! AckAndNext } override def preStart(): Unit = { super.preStart() if (!connection.isConnected) { connection.connect() } } } object OneWorker2 { def props(connection: Connection2) = Props(classOf[OneWorker2], connection) } class Connection2 { var isConnected = false def connect() = isConnected = true def close() = isConnected = false def doJob(x: Int) = { if (!isConnected) { throw new RuntimeException(s"Connection already closed. message = $x") } if (x % 5 == Random.nextInt(3)) { isConnected = false throw new RuntimeException(s"unlucky number $x. connection was reset") } println(s"the answer is $x") } }
x % 5 == Random.nextInt(3)
が成立した時connectionが切断されてdoJobに失敗するようにしました。
この実装だと1~20まで順番通りに表示されます。
余談
今回のようにparentがsupervisorStrategyで失敗したことを認識して投げ直してあげる方式と、childのpreStartでRequestを投げてparentがそれに応える方式があると思うんですよね。
MainでわざとThread.sleepで間を開けてjobが空っぽになる期間を用意してみたところ、Requestをparentに送るパターン(後者)だとそのタイミングでjobが無かったら次のRequestをスケジュールするか、何か状態を持たせて遅れなかったとき限定で receiveの x: Int
で直接送りつける?(でもそうすると子供からRequestが来るのと親が送りつけてくるパスが両方できてわかりにくい?)みたいな面倒が生じたので今回は前者のパターンを採用してみました。でもSupervisorStrategyに処理を書くのはそれはそれで・・って感じで困るのでなんかいい指針が欲しいです。
余談終わり
そして、本題ですがこういう明示的にackをするパターンがすでにakka.contribにあるらしいです(・ ω ・)
それが akka.contrib.mailbox.PeekMailboxType
です。ドキュメントのトップにはなくて External Contributions — Akka Documentation のページに有りました。(akkaのドキュメントはこういうトップからは視えないけどページ内リンクから参照されている隠しページ(?)があるのでpdfで見ると新たなページを発掘できて楽しい)
これを使って書き直すと以下のようになります。
package order import akka.actor.{Actor, ActorSystem, Props} import akka.contrib.mailbox.PeekMailboxExtension import com.typesafe.config.ConfigFactory import scala.concurrent.Await import scala.concurrent.duration.Duration import scala.util.Random object OrderMain3 extends App { implicit val system = ActorSystem("MySystem", ConfigFactory.parseString( """ peek-dispatcher { mailbox-type = "akka.contrib.mailbox.PeekMailboxType" max-retries = 2 } """)) val actor = system.actorOf(Props[RootActor3], "root") (1 to 10).foreach(actor ! _) Thread.sleep(300) (11 to 20).foreach(actor ! _) actor ! "AllJobProduced" Await.result(system.whenTerminated, Duration.Inf) } class RootActor3 extends Actor { val actor = context.actorOf(JobSupervisor3.props, "manager") override def receive = { case "AllJobFinished" => context.system.terminate case x => actor forward x } } class JobSupervisor3 extends Actor { val worker = context.actorOf(OneWorker3.props(new Connection).withDispatcher("peek-dispatcher"), "worker") var isAllJobProduced = false override def receive = { case "AllJobProduced" => isAllJobProduced = true finishCheck() case x: Int => worker ! x finishCheck() } private def finishCheck() = if (isAllJobProduced) context.parent ! "AllJobFinished" } object JobSupervisor3 { def props = Props[JobSupervisor3] } class OneWorker3(connection: Connection) extends Actor { override def receive = { case x: Int => connection.doJob(x) PeekMailboxExtension.ack() } override def preStart(): Unit = { super.preStart() if (!connection.isConnected) { connection.connect() } } } object OneWorker3 { def props(connection: Connection) = Props(classOf[OneWorker3], connection) } class Connection3 { var isConnected = false def connect() = isConnected = true def close() = isConnected = false def doJob(x: Int) = { if (!isConnected) { throw new RuntimeException(s"Connection already closed. message = $x") } if (x % 5 == Random.nextInt(3)) { isConnected = false throw new RuntimeException(s"unlucky number $x. connection was reset") } println(s"the answer is $x") } }
JobSupervisor3とかもういらないくらい簡単になりました。 キューの管理を自前でしなくていいのは楽です。間違えて二回送っちゃったりしないことをテストするのは結構頭使いますし・・・。 しかも自作の時はついてなかったメッセージ単位のretry回数制限まで実装されているようなので結構よさそうだなーと思った話でした。
*1:ってドキュメントに書いてありました