blockingとOOM [scala]

ExecutionContextとblockingについて調べたメモ [scala] - だいたいよくわからないブログ の続きです。

今回の記事の結論

  • blockingをたくさん呼ぶとOOMになるまでスレッド数が増え続けるっぽい。 ==> 追記: scala2.12 だとlimitがあるので安心して使えるらしいです。 scala.concurrent.blocking() - Septeni Engineer's Blog
  • blockingはThreadFactoryをカスタマイズしないと有効にならないのでForkJoinPoolからExecutionContextを生成した場合などは単に無視されるため安全(ただ、スレッド数は増えないのでデッドロックなどには注意)
  • ExecutionContext.global、akkaのdispatcherなどをExecutionContextとして使うとblockingが有効になるため、注意が必要。(ExecutionContext.globalは手抜き用なので良いとしても、akkaの場合は注意したほうが良い。)
  • とはいえblocking自体は有用なので使いたいケースもありそう

前回の記事のまとめ

前回の記事ではblockingでブロックする処理を包むと自動的にスレッド数が増えるため高速に処理されて嬉しいよという話を書きました。加えて、それが有効になるのはExecutionContext.globalなどのScalaが内部で提供しているExecutionContextかakkaのdispatcherを利用した場合だという話も書きました。

blockingとOOM

なるほどーと思ってそのまま放置していたのですが、ふと実際にスレッド数見てみるかと思って下記のような要領で確認してみました。 ちょっと長いですが、下記の3種類のExecutionContextでblockingな処理を大量に呼び出してどうなるかをみています。

  1. ExecutionContext.global
  2. ExecutionContext.fromExecutorService(new ForkJoinPool(50))
  3. ExecutionContext.fromExecutorService(new ForkJoinPool(1000, new DefaultThreadFactory, uncaughtExceptionHandler, false) (BlockContext付きのThreadFactory入りのForkJoinPool)

gist.github.com

上記コードだとxms1G, xmx1Gで、1,3のケースでは 5000~10000回程度呼び出すとOOMになりました。 BlockContextがついていない2のケースでは時間はかかりますがしっかり実行してくれます。 今までblockingだとスレッド数は ManagedBlocker が良い感じにしてくれるという雑な理解をしていましたが、どうも普通に増えるだけっぽいです(?)(単に実験コードがわるいだけの可能性があるので注意が必要ですが、少なくともOOMになるケース自体はあり得るようです。)

そもそもBlockContextつきのThreadFactory自体がscala処理系は簡単に使う方法を外部に公開している訳ではないことからも考えると、blockingはExecutionCotext.globalのような、そこまでヘビーに使わないときに便利なもの・・?みたいな理解になりました。

akkaのdispatcher

前回の記事でakkaのdispatcherはBlockContextを自前で実装していることを述べました。実験として以下の様なコードでOOMになるかどうかを実験しました。

gist.github.com

試した結果、ちゃんと(?)OOMになったので、akkaでブロック処理を行う際はblockingを書かないようにするか、十分に注意(DBでのブロックなら障害時にスレッド数が増えすぎないか?などを考える)する必要がありそうです。

blockingが動かないとコードによってはデッドロックもあり得る・・?といった背景もあるようですが、いずれにせよakkaでblockする処理を行う際はちゃんと並列数を見積もって予め適切なサイズにExecutionContextを分離しておくというオーソドックスなやり方が一番良さそうだなと思った次第です。

openjdk

openjdkの実装を少し見るかーと思ったのですが、あたりは全体的なロジックが掴めてないので部分的に読むのは難しそうでした・・。 今度時間を書けて読みたいような読みたくないような・・・。

jdk7u-jdk/ForkJoinPool.java at f4d80957e89a19a29bb9f9807d2a28351ed7f7df · openjdk-mirror/jdk7u-jdk · GitHub

jdk7u-jdk/ForkJoinPool.java at f4d80957e89a19a29bb9f9807d2a28351ed7f7df · openjdk-mirror/jdk7u-jdk · GitHub

ただ、 no compensation needed, create a replacement というのがあるので必ず増えるわけではないかもしれません。

まとめ

OOMについて調べましたが今回の実験ではメモリを1GB程度と少なめにしているので実際にはもっと耐えてくれるはずです。ただ負荷が強いプロダクションなどに投入するときは注意しましょう。という認識になりました。 blockingが要るのか要らないのかは微妙なところですがスレッドが足りなくてデッドロックしちゃうような処理があるばあいはblockingしたほうが良さそうですねー。とはいえそしたらスレッドプール分ければいいのではという気持ちがあったりしますが、常に簡単に分割できるって感じでもないと思うので・・。逆にスレッド増え続けてしまうくらい処理が重たくなるならblockingしないほうが良い気がします。そもそもblockingで処理包むの忘れるから使えてない

Kafkaのproducerのrequest.timeout.msはどのように作用するか

気になったのでコードを追ったりしてみました。備忘録けんメモ書きです。

そもそもtimeoutがどうやって送信されてどうやってbrokerで受信されているのかが追いにくいのですが、

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java#L32 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/api/ProducerRequest.scala#L31 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L32-L40 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L96-L100

RequestHeader.java+ProduceRequest.javaと考えてProtocol.javaに書いてあるSchemaと、ProducerRequest.readfromと照らし合わせると帳尻が合いそうです。

つまりProducerRequest.ackTimeoutMs(コンストラクタ第二引数)を追えばよさそうで、追っていくと request.timeout.ms になりそうです。

this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L256

this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L281-L290

requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L318

ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L333

以上からProducerRequest(サーバー側のコード)に渡ってくるタイムアウトrequest.timeout.msになりそうです。

此処から先は処理をきちんとおえてないのですがgrepする限り def handleProducerRequest(request: RequestChannel.Request) {...} が処理を担当していそうです。

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L298

こいつは最終的に rplicaManager.appendMessages にtimeout値を渡しています。 appendMessagesは以下にあります。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L314

appendされたメッセージはリクエスト煉獄に叩きこまれます。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L343

これが実際にタイムアウトする処理はスケジュール機構なども含むので結構複雑ですが、 かなり紆余曲折があって、以下でタイムアウトします。 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/TimingWheel.scala#L131-L133

そのあと、taskExecutorにsubmitされて、runが呼ばれます。

// Already expired or cancelled
if (!timerTaskEntry.cancelled)
  taskExecutor.submit(timerTaskEntry.timerTask)

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/Timer.scala#L93-L94

runは forceComplete を呼ぶようになっています。(runが呼ばれるとタイムアウトしたかキャンセルしたかみたいな扱いになっていそうです。)

/*
 * run() method defines a task that is executed on timeout
 */
override def run(): Unit = {
  if (forceComplete())
    onExpiration()
}

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedOperation.scala#L105-L108

forceComplete経由で呼ばれるonCompleteでcallbackが呼ばれて、ReplicaManager#L343にもどります。

val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus)

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedProduce.scala#L123

この際statusを変更する処理などがありませんが、statusの初期値がErrors.REQUEST_TIMED_OUT になっているので不要そうです。

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedProduce.scala#L64

ということでrequest.timeout.msを短くするとack待ち時間が短くなるようです。(色々な要素を省略したので本当はもう少し色々なところでタイムアウトしたりしています・・。)

ドキュメントを見ると request.timeout.ms にはackのことが書いてないんですが

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhauste

timeout.ms には書いてあります。

The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.

そしてtimeout.msはdprecatedでREQUEST_TIMEOUT_MS_CONFIGにしろと書いてあるので役割が変わって無ければ同じ説明と認識して良さそうです。

一方で、producerのclientにも同じパラメータがわたっててそっちで能動的に切断するケースも有ります。 NetworkClientに入っているrequest.timeout.ms値がその辺の機能です。 Sender.sendは最終的にclient.sendをよぶけどその正体がこれです。(KafkaProducerでパラメータと一緒に初期化)

そいつでもhandleTimeoutしていてタイムアウトするとdisconnected=trueをレスポンスに突っ込みます。

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L408-L414

その後、 Sender.handleProduceResponseNETWORK_EXCEPTIONになります。 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L260-L265

このフローだとエラーログがtraceやdebugじゃないと出なくて、例外をログにだしても The server disconnected before a response was received. というどこで何が何故切断されたのかみたいな情報が皆無なログが出るだけでつらい感じになります(;´Д`)

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java#L104

akkaで失敗したメッセージを順番通りにもう一度処理してもらうには

概ね Mailbox with Explicit Acknowledgement — Akka Documentation のお話です。

まずはbuild.sbtです。

name := "akka-scala-seed"

version := "1.0"

scalaVersion := "2.11.8"

val akkaVersion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-contrib" % akkaVersion
)

build.sbt終わり。

さて、conectionが切れるなどメッセージの内容以外の原因でchildの仕事が失敗してrestartする時、akkaでは死んだ時処理していたメッセージは消えてしまいます。

scala - Akka - resending the "breaking" message - Stack Overflow で紹介されているように 以下のようにpreRestartでselfに投げることでメッセージを再処理することが出来るのですが、そうすると順番が変わってしまいます。

override def preRestart(reason: Throwable, message: Option[Any]) {
    message.foreach(self ! x)
}

例えば以下の様な例では、2,3,4,5,1 のように出力順がひっくり返ってしまいます。

package order

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

object OrderMain extends App {
  implicit val system = ActorSystem()
  implicit val executionContext = system.dispatcher
  implicit val timeout = Timeout.durationToTimeout(3.seconds)

  val actor = system.actorOf(PreRestarter.props(new Connection))
  (1 to 5).foreach(actor ! _)

  Await.result((actor ? "finished").flatMap(_ => system.terminate()), Duration.Inf)
}

class PreRestarter(connection: Connection) extends Actor {
  override def receive = {
    case "finished" => sender ! "finished"
    case x: Int => connection.doJob(x)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)

    if (!connection.isConnected) {
      connection.connect()
    }
    message.foreach(self ! _)
  }
}

object PreRestarter {
  def props(connection: Connection) = Props(classOf[PreRestarter], connection)
}

class Connection {
  var isConnected = false

  def connect() = {
    println("connected")
    isConnected = true
  }

  def close() = {
    println("closed")
    isConnected = false
  }

  def doJob(x: Int) = {
    if (!isConnected) throw new RuntimeException(s"Connection already closed. message = $x")

    println(s"the answer is $x")
  }
}

大体良いんですがなんとなくもやもやするとか、タイムアウトを設定したいとなると、待機jobの数も考慮して設定するのかそれともリトライのときにタイムアウトするのは仕方ない?でもそしたらそもそもリトライしなくていい?そうするとエラーレスポンス増えるけどそれでいいの?といったことを判断するといった面倒が増えてしまいます。

そんなときのパターンとして、 childは仕事だけをする。supervisorはジョブを貯めこんで子供が仕事を終わらせたら次の仕事を一個ずつ渡す形式にしてやることが考えられます。*1たとえば以下のようになります。

package order

import akka.actor.SupervisorStrategy.{Restart, Stop}
import akka.actor.{Actor, ActorSystem, OneForOneStrategy, Props}
import order.JobSupervisor2.{AckAndNext, NackAndRetry}

import scala.collection.immutable.Queue
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

object OrderMain2 extends App {
  implicit val system = ActorSystem()
  val actor = system.actorOf(Props[RootActor2], "root")
  (1 to 10).foreach(actor ! _)

  Thread.sleep(300)

  (11 to 20).foreach(actor ! _)

  actor ! "AllJobProduced"
  Await.result(system.whenTerminated, Duration.Inf)
}

class RootActor2 extends Actor {

  val actor = context.actorOf(JobSupervisor2.props, "manager")

  override def receive = {
    case "AllJobFinished" => context.system.terminate
    case x => actor forward x
  }
}

class JobSupervisor2 extends Actor {
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception =>
      self ! NackAndRetry
      Restart
    case _ => Stop
  }

  val worker = context.actorOf(OneWorker2.props(new Connection2), "worker")
  var jobs = Queue[Int]()
  var isAllJobProduced = false

  // たかだか一つのjobをworkerに与える
  override def receive = {
    case "AllJobProduced" =>
      isAllJobProduced = true
      finishCheck()
    case x: Int =>
      // jobが無いとackの時に次が渡せないのでここで補填
      if (jobs.isEmpty) {
        worker ! x
      }
      addJob(x: Int)
    case AckAndNext =>
      removeOldJob()
      produceNext()
      finishCheck()
    case NackAndRetry => produceNext()
  }

  private def addJob(x: Int) = {
    jobs = jobs.enqueue(x)
  }

  private def removeOldJob() = {
    jobs = jobs.dequeue._2
  }

  private def produceNext() = jobs.headOption.foreach(worker ! _)
  private def finishCheck() = if (isAllJobProduced && jobs.isEmpty) context.parent ! "AllJobFinished"

}

object JobSupervisor2 {
  def props = Props[JobSupervisor2]
  case object AckAndNext
  case object NackAndRetry
}

class OneWorker2(connection: Connection2) extends Actor {

  override def receive = {
    case x: Int =>
      connection.doJob(x)
      sender ! AckAndNext
  }

  override def preStart(): Unit = {
    super.preStart()

    if (!connection.isConnected) {
      connection.connect()
    }
  }
}

object OneWorker2 {
  def props(connection: Connection2) = Props(classOf[OneWorker2], connection)
}

class Connection2 {
  var isConnected = false

  def connect() = isConnected = true

  def close() = isConnected = false

  def doJob(x: Int) = {
    if (!isConnected) {
      throw new RuntimeException(s"Connection already closed. message = $x")
    }

    if (x % 5 == Random.nextInt(3)) {
      isConnected = false
      throw new RuntimeException(s"unlucky number $x. connection was reset")
    }

    println(s"the answer is $x")
  }
}

x % 5 == Random.nextInt(3) が成立した時connectionが切断されてdoJobに失敗するようにしました。 この実装だと1~20まで順番通りに表示されます。

余談

今回のようにparentがsupervisorStrategyで失敗したことを認識して投げ直してあげる方式と、childのpreStartでRequestを投げてparentがそれに応える方式があると思うんですよね。 MainでわざとThread.sleepで間を開けてjobが空っぽになる期間を用意してみたところ、Requestをparentに送るパターン(後者)だとそのタイミングでjobが無かったら次のRequestをスケジュールするか、何か状態を持たせて遅れなかったとき限定で receiveの x: Int で直接送りつける?(でもそうすると子供からRequestが来るのと親が送りつけてくるパスが両方できてわかりにくい?)みたいな面倒が生じたので今回は前者のパターンを採用してみました。でもSupervisorStrategyに処理を書くのはそれはそれで・・って感じで困るのでなんかいい指針が欲しいです。

余談終わり

そして、本題ですがこういう明示的にackをするパターンがすでにakka.contribにあるらしいです(・ ω ・) それが akka.contrib.mailbox.PeekMailboxType です。ドキュメントのトップにはなくて External Contributions — Akka Documentation のページに有りました。(akkaのドキュメントはこういうトップからは視えないけどページ内リンクから参照されている隠しページ(?)があるのでpdfで見ると新たなページを発掘できて楽しい)

これを使って書き直すと以下のようになります。

package order

import akka.actor.{Actor, ActorSystem, Props}
import akka.contrib.mailbox.PeekMailboxExtension
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

object OrderMain3 extends App {
  implicit val system = ActorSystem("MySystem", ConfigFactory.parseString(
    """
    peek-dispatcher {
      mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
      max-retries = 2
    }
    """))

  val actor = system.actorOf(Props[RootActor3], "root")
  (1 to 10).foreach(actor ! _)

  Thread.sleep(300)

  (11 to 20).foreach(actor ! _)

  actor ! "AllJobProduced"
  Await.result(system.whenTerminated, Duration.Inf)
}

class RootActor3 extends Actor {

  val actor = context.actorOf(JobSupervisor3.props, "manager")

  override def receive = {
    case "AllJobFinished" => context.system.terminate
    case x => actor forward x
  }
}

class JobSupervisor3 extends Actor {

  val worker = context.actorOf(OneWorker3.props(new Connection).withDispatcher("peek-dispatcher"), "worker")
  var isAllJobProduced = false

  override def receive = {
    case "AllJobProduced" =>
      isAllJobProduced = true
      finishCheck()
    case x: Int =>
      worker ! x
      finishCheck()
  }

  private def finishCheck() = if (isAllJobProduced) context.parent ! "AllJobFinished"

}

object JobSupervisor3 {
  def props = Props[JobSupervisor3]
}

class OneWorker3(connection: Connection) extends Actor {

  override def receive = {
    case x: Int =>
      connection.doJob(x)
      PeekMailboxExtension.ack()
  }

  override def preStart(): Unit = {
    super.preStart()

    if (!connection.isConnected) {
      connection.connect()
    }
  }
}

object OneWorker3 {
  def props(connection: Connection) = Props(classOf[OneWorker3], connection)
}

class Connection3 {
  var isConnected = false

  def connect() = isConnected = true

  def close() = isConnected = false

  def doJob(x: Int) = {
    if (!isConnected) {
      throw new RuntimeException(s"Connection already closed. message = $x")
    }

    if (x % 5 == Random.nextInt(3)) {
      isConnected = false
      throw new RuntimeException(s"unlucky number $x. connection was reset")
    }

    println(s"the answer is $x")
  }
}

JobSupervisor3とかもういらないくらい簡単になりました。 キューの管理を自前でしなくていいのは楽です。間違えて二回送っちゃったりしないことをテストするのは結構頭使いますし・・・。 しかも自作の時はついてなかったメッセージ単位のretry回数制限まで実装されているようなので結構よさそうだなーと思った話でした。

*1:ってドキュメントに書いてありました

Kafkaの旧consumerでいうconsumer.idは新consumerだと何なのか

consumerが復数同時に同じpartitionにつないだときに、どのクライアントが担当するかを意図的に決定させるためにconsumer.idを設定していたんですが、新consumerでconsumer.id設定できるところがみつからないなーと思ったので、Kafkaコードを微妙に追いつつ探ってみました。備忘録がてらのメモ書きです。

どうも旧consumerでいうところのconsumer.idは新consumerでいうとclient.idらしいです。partition assignにはmemberIdが指定されるので厳密には違うっぽいですが、client.idを元に計算されるようです。(多分)

まずmemberIdは旧consumerのときは(consumer.idとして)自分から名乗っていたけど、新consumerでは最初は無名で、brokerに名付けられたら次からはその名前を名乗るようです。

if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
  // if the member id is unknown, register the member to the group
  addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else ...

で、このmemberIdはどうやら以下のように決まるようです。

// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L588-L589

generateMemberIdSuffix は以下のように決まっています。

// TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L170 randomにするかどうか迷っているようですが、予測可能だと勝手に偽物consumerが繋いできたりとかあるんでしょうか・・?

で、肝心のclientIdはというと handleJoinGroup の第三引数にありそうです。

def handleJoinGroup(groupId: String,
                      memberId: String,
                      clientId: String,
                      clientHost: String,
                      sessionTimeoutMs: Int,
                      protocolType: String,
                      protocols: List[(String, Array[Byte])],
                      responseCallback: JoinCallback) { ... }

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L106-L108

このメソッドの呼び出し元をみると request.header.clientId とあるので、headerに入っていそうです。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L796

RequestHeaderのクラスを見るとどうも struct.set(CLIENT_ID_FIELD, client); とあるので ここで String client とされているものが client_id っぽいとみてよさそうです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java#L47-L61

headerはConsumerNetworkClientでつくられています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L101

ここでいう client.nextRequestHeader(api);clientは(ややこしいですが)KafkaClientのようです。そして実際には NetworkClient が使われているようです。

public class NetworkClient implements KafkaClient { ...

    public NetworkClient(Selectable selector,
                         Metadata metadata,
                         String clientId, ...) { ... }
    ...
}

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L91-L93

NetworkClientのインスタンス生成やclientIdの生成は以下で行われています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L554

これをみると、 clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); とあるので、 旧consumer.idのような役目はclient.Idとして設定可能になり、かつそのまま使われいてるわけではなく自分から名乗るのか・brokerが名付けるのかや、後ろにbrokerが何かsuffixを付けるのかなどが変更されていそうです。

partition assignment strategy周りも少し変更されていて、旧consumerだと、このAssignmentContextをつかっていますが、 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/PartitionAssignor.scala

新consumerではRangeAssignorといったAbstractPartitionAssignorが担当しているようです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L71-L72

新consumerの話に戻って、client.idのデフォルト値です。 デフォルト値は "" だが、無いと consumer-CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() になるようです。

staticだから同じプロセス内じゃないとclientId変わらないからクラスタ全体だとidentifiyできなさそうだけどドキュメントにもtracking用と書いてあるからいいんでしょうか・・。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L541

そして、ここまで読んだ後に以下のドキュメントを見つけました。

Kafka Client-side Assignment Proposal - Apache Kafka - Apache Software Foundation

  1. Does the consumer needs to provide the member-id through configs? Also is the consumerId in assign() the same as memberId? Currently the consumer-id / member-id is assigned at the coordinator side. The memberId is used exactly the same as consumerId. I just renamed it for the more general usage.

先に出てきて欲しかった(´;ω;`)

slackチャンネルのワードクラウドを生成するslackloudを作った

アイコンを変えました₍₍ (ง´・_・`)ว ⁾⁾

そして作りました。 GitHub - matsu-chara/slackloud

https://github.com/matsu-chara/slackloud/blob/master/example/example.png?raw=true

slackloudとは

slackのトークンさえあれば ./run.sh "#作成対象チャンネル" --post "#画像ポストチャンネル" でワードクラウドの生成、画像投稿までやってくれる便利なやつです。

もちろん会社スラックのワードクラウドを気軽にツイッターに挙げると情報漏えいになりかねないのでご注意ください( ◜◡‾)(サンプルは個人スラックからとってきました)

pythonでそれっぽいライブラリがあったのでやっつけで組み合わせました。

このへんの環境は全部dockerでまとめて、dockerさえあれば(pyenvやvirtualenv無しで) docker runで動かせるようにしました。 docker imageが数ギガあるのが厳しいところです。apt-get cleanくらいはしましたが、あんまり効果がなく・・。

docker history matsuchara/slackloud
IMAGE               CREATED             CREATED BY                                      SIZE                COMMENT
00451faa7bdb        33 minutes ago      /bin/sh -c #(nop)  ENTRYPOINT ["python3" "/ap   0 B
a2cfb84c4f5c        33 minutes ago      /bin/sh -c #(nop)  ENV PYTHONIOENCODING=utf-8   0 B
a5358f81c054        33 minutes ago      /bin/sh -c pip3 install -r requirements.txt     304.9 MB
5851c0ac5ac8        38 minutes ago      /bin/sh -c #(nop) COPY file:3d429e7d497f6ee60   98 B
f2e0d38a8f2a        38 minutes ago      /bin/sh -c apt-get clean && rm -rf /var/lib/a   0 B
c30f3e769c63        38 minutes ago      /bin/sh -c curl -s https://bootstrap.pypa.io/   12.7 MB
3255a7e853ef        38 minutes ago      /bin/sh -c apt-get -y --no-install-recommends   195.9 MB
da904280048c        41 minutes ago      /bin/sh -c ./mecab-ipadic-neologd/bin/install   2.156 GB
22bfc03e65da        42 minutes ago      /bin/sh -c git clone --depth 1 https://github   102.4 MB
1625bd475980        43 minutes ago      /bin/sh -c #(nop)  WORKDIR /app                 0 B
e75ea79a36d8        43 minutes ago      /bin/sh -c update-ca-certificates               274.3 kB
46eaafa43a60        43 minutes ago      /bin/sh -c apt-get -y --no-install-recommends   324.4 MB
ab7270f2e2e0        44 minutes ago      /bin/sh -c apt-get -y update                    21.88 MB
8d816e6fd6ac        45 minutes ago      /bin/sh -c #(nop)  MAINTAINER matsu_chara<mat   0 B
38c759202e30        3 weeks ago         /bin/sh -c #(nop) CMD ["/bin/bash"]             0 B
<missing>           3 weeks ago         /bin/sh -c sed -i 's/^#\s*\(deb.*universe\)$/   1.895 kB
<missing>           3 weeks ago         /bin/sh -c rm -rf /var/lib/apt/lists/*          0 B
<missing>           3 weeks ago         /bin/sh -c set -xe   && echo '#!/bin/sh' > /u   8.841 MB
<missing>           3 weeks ago

以上のようにイメージサイズを見ることが出来ますが、これをみると mecab-ipadic-neologd が大半のようです。mecab周辺はあまり知らないのですがmecabや辞書の役割を考えるとここの部分は仕方なさそうですね(;´Д`)