Kafkaの旧consumerでいうconsumer.idは新consumerだと何なのか

consumerが復数同時に同じpartitionにつないだときに、どのクライアントが担当するかを意図的に決定させるためにconsumer.idを設定していたんですが、新consumerでconsumer.id設定できるところがみつからないなーと思ったので、Kafkaコードを微妙に追いつつ探ってみました。備忘録がてらのメモ書きです。

どうも旧consumerでいうところのconsumer.idは新consumerでいうとclient.idらしいです。partition assignにはmemberIdが指定されるので厳密には違うっぽいですが、client.idを元に計算されるようです。(多分)

まずmemberIdは旧consumerのときは(consumer.idとして)自分から名乗っていたけど、新consumerでは最初は無名で、brokerに名付けられたら次からはその名前を名乗るようです。

if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
  // if the member id is unknown, register the member to the group
  addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else ...

で、このmemberIdはどうやら以下のように決まるようです。

// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L588-L589

generateMemberIdSuffix は以下のように決まっています。

// TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L170 randomにするかどうか迷っているようですが、予測可能だと勝手に偽物consumerが繋いできたりとかあるんでしょうか・・?

で、肝心のclientIdはというと handleJoinGroup の第三引数にありそうです。

def handleJoinGroup(groupId: String,
                      memberId: String,
                      clientId: String,
                      clientHost: String,
                      sessionTimeoutMs: Int,
                      protocolType: String,
                      protocols: List[(String, Array[Byte])],
                      responseCallback: JoinCallback) { ... }

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L106-L108

このメソッドの呼び出し元をみると request.header.clientId とあるので、headerに入っていそうです。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L796

RequestHeaderのクラスを見るとどうも struct.set(CLIENT_ID_FIELD, client); とあるので ここで String client とされているものが client_id っぽいとみてよさそうです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java#L47-L61

headerはConsumerNetworkClientでつくられています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L101

ここでいう client.nextRequestHeader(api);clientは(ややこしいですが)KafkaClientのようです。そして実際には NetworkClient が使われているようです。

public class NetworkClient implements KafkaClient { ...

    public NetworkClient(Selectable selector,
                         Metadata metadata,
                         String clientId, ...) { ... }
    ...
}

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L91-L93

NetworkClientのインスタンス生成やclientIdの生成は以下で行われています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L554

これをみると、 clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); とあるので、 旧consumer.idのような役目はclient.Idとして設定可能になり、かつそのまま使われいてるわけではなく自分から名乗るのか・brokerが名付けるのかや、後ろにbrokerが何かsuffixを付けるのかなどが変更されていそうです。

partition assignment strategy周りも少し変更されていて、旧consumerだと、このAssignmentContextをつかっていますが、 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/PartitionAssignor.scala

新consumerではRangeAssignorといったAbstractPartitionAssignorが担当しているようです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L71-L72

新consumerの話に戻って、client.idのデフォルト値です。 デフォルト値は "" だが、無いと consumer-CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() になるようです。

staticだから同じプロセス内じゃないとclientId変わらないからクラスタ全体だとidentifiyできなさそうだけどドキュメントにもtracking用と書いてあるからいいんでしょうか・・。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L541

そして、ここまで読んだ後に以下のドキュメントを見つけました。

Kafka Client-side Assignment Proposal - Apache Kafka - Apache Software Foundation

  1. Does the consumer needs to provide the member-id through configs? Also is the consumerId in assign() the same as memberId? Currently the consumer-id / member-id is assigned at the coordinator side. The memberId is used exactly the same as consumerId. I just renamed it for the more general usage.

先に出てきて欲しかった(´;ω;`)

slackチャンネルのワードクラウドを生成するslackloudを作った

アイコンを変えました₍₍ (ง´・_・`)ว ⁾⁾

そして作りました。 GitHub - matsu-chara/slackloud

https://github.com/matsu-chara/slackloud/blob/master/example/example.png?raw=true

slackloudとは

slackのトークンさえあれば ./run.sh "#作成対象チャンネル" --post "#画像ポストチャンネル" でワードクラウドの生成、画像投稿までやってくれる便利なやつです。

もちろん会社スラックのワードクラウドを気軽にツイッターに挙げると情報漏えいになりかねないのでご注意ください( ◜◡‾)(サンプルは個人スラックからとってきました)

pythonでそれっぽいライブラリがあったのでやっつけで組み合わせました。

このへんの環境は全部dockerでまとめて、dockerさえあれば(pyenvやvirtualenv無しで) docker runで動かせるようにしました。 docker imageが数ギガあるのが厳しいところです。apt-get cleanくらいはしましたが、あんまり効果がなく・・。

docker history matsuchara/slackloud
IMAGE               CREATED             CREATED BY                                      SIZE                COMMENT
00451faa7bdb        33 minutes ago      /bin/sh -c #(nop)  ENTRYPOINT ["python3" "/ap   0 B
a2cfb84c4f5c        33 minutes ago      /bin/sh -c #(nop)  ENV PYTHONIOENCODING=utf-8   0 B
a5358f81c054        33 minutes ago      /bin/sh -c pip3 install -r requirements.txt     304.9 MB
5851c0ac5ac8        38 minutes ago      /bin/sh -c #(nop) COPY file:3d429e7d497f6ee60   98 B
f2e0d38a8f2a        38 minutes ago      /bin/sh -c apt-get clean && rm -rf /var/lib/a   0 B
c30f3e769c63        38 minutes ago      /bin/sh -c curl -s https://bootstrap.pypa.io/   12.7 MB
3255a7e853ef        38 minutes ago      /bin/sh -c apt-get -y --no-install-recommends   195.9 MB
da904280048c        41 minutes ago      /bin/sh -c ./mecab-ipadic-neologd/bin/install   2.156 GB
22bfc03e65da        42 minutes ago      /bin/sh -c git clone --depth 1 https://github   102.4 MB
1625bd475980        43 minutes ago      /bin/sh -c #(nop)  WORKDIR /app                 0 B
e75ea79a36d8        43 minutes ago      /bin/sh -c update-ca-certificates               274.3 kB
46eaafa43a60        43 minutes ago      /bin/sh -c apt-get -y --no-install-recommends   324.4 MB
ab7270f2e2e0        44 minutes ago      /bin/sh -c apt-get -y update                    21.88 MB
8d816e6fd6ac        45 minutes ago      /bin/sh -c #(nop)  MAINTAINER matsu_chara<mat   0 B
38c759202e30        3 weeks ago         /bin/sh -c #(nop) CMD ["/bin/bash"]             0 B
<missing>           3 weeks ago         /bin/sh -c sed -i 's/^#\s*\(deb.*universe\)$/   1.895 kB
<missing>           3 weeks ago         /bin/sh -c rm -rf /var/lib/apt/lists/*          0 B
<missing>           3 weeks ago         /bin/sh -c set -xe   && echo '#!/bin/sh' > /u   8.841 MB
<missing>           3 weeks ago

以上のようにイメージサイズを見ることが出来ますが、これをみると mecab-ipadic-neologd が大半のようです。mecab周辺はあまり知らないのですがmecabや辞書の役割を考えるとここの部分は仕方なさそうですね(;´Д`)

iterm2 version3.0でssh-host-colorが`44:52: syntax error: end of line~~~`みたいなエラー出すのを直した

細かいものを書いてブログを延命するエブリデイ₍₍ (ง´・_・`)ว ⁾⁾

前置き

iterm2 3.0のshell integrationが結構よくて、

  • 時間がかかるコマンドを実行しちゃった後に、あっ終わったら通知して欲しかった・・・ってときにもCmd-Opt-Aで後付アラートしかけられたり
  • 成功失敗が▷マークで表示されて、失敗されていた場合は右クリックでステータスコードが見れたり
  • ファイルドロップすると雑にscpでアップロードできたり
  • ファイル右クリックで雑にscpでダウンロードできたり

みたいなことが出来るようです。 Shell Integration - Documentation - iTerm2 - Mac OS Terminal Replacement

shell integrationは最初聞いた時うーんと思っていたんですが色々やってくれるのでよさそうです。特にアラートが良いです。 ステータスコード表示は自前でプロンプトに表示していたのから切り替えました。

ssh-host-color

iterm2 version3.0には接続先username@hostnameに応じてプロファイルを切り替える(ことで背景色を変える)ような機能があるんですが、ワイルドカード指定しかなくてどうも正規表現でマッチさせるみたいなことはできないようです。

なので、以前から使っていたssh-host-colorを続投することにしたんですが、sshしてexitすると毎回44:52: syntax error: end of line があるべきところですが identifier が見つかりました。 (-2741)と言われるので直しました。

gist.github.com

使い方は変わらずgistのコメントに従えばよい感じです。

markdown内のjsonコードブロックを拾ってjsonlintかけてくれる君を雑に作った

markdownjson codeblockにAPIレスポンスの仕様とか書いてるけど何か微妙にずれたりしていてワーってなるときに備えて作りました。 codeblockを抜き出してきてjsonlintを叩きまくる仕様です。 実装が雑だけど、困ってないしあんまり継続的にはメンテしないだろうなという気持ちを受けて experimental-markdown-json-lint という名前です。

GitHub - matsu-chara/experimental-markdown-json-lint

www.npmjs.com

以下の様なsample.mdに対して

this is a chapter

here text

aaa

{ "a": 1 }
<?php
echo 'ok';
{ "b: 'a' }
{ "b": 2 }
{ "x: 2 }

yeah

以下の様な結果を返します。

sample.md
Parse error on line 1:
{ "b: 'a' }
--^
Expecting 'STRING', '}', got 'undefined'
Parse error on line 1:
{ "x: 2 }
--^
Expecting 'STRING', '}', got 'undefined'

file-glob対応しているので **/*.md もいけます。

Slackの勢いをグラフにしてくれる君を作った

github.com

https://raw.githubusercontent.com/matsu-chara/slack_ikioi_png/images/all.png

分報チャンネルが社内で話題なので流量を可視化することで仕事の集中度となんかリンクしてるのかなーと思って雑に作りました。 22~0時とかに山がありますがその辺は仕事してるのではなくて帰宅後雑談タイムだなーとかが分かります。

ruby力がなさすぎてrubocopに怒られまくった(;´Д`)