consumerが復数同時に同じpartitionにつないだときに、どのクライアントが担当するかを意図的に決定させるためにconsumer.idを設定していたんですが、新consumerでconsumer.id設定できるところがみつからないなーと思ったので、Kafkaコードを微妙に追いつつ探ってみました。備忘録がてらのメモ書きです。
どうも旧consumerでいうところのconsumer.idは新consumerでいうとclient.idらしいです。partition assignにはmemberIdが指定されるので厳密には違うっぽいですが、client.idを元に計算されるようです。(多分)
まずmemberIdは旧consumerのときは(consumer.idとして)自分から名乗っていたけど、新consumerでは最初は無名で、brokerに名付けられたら次からはその名前を名乗るようです。
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // if the member id is unknown, register the member to the group addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else ...
で、このmemberIdはどうやら以下のように決まるようです。
// use the client-id with a random id suffix as the member-id val memberId = clientId + "-" + group.generateMemberIdSuffix
generateMemberIdSuffix
は以下のように決まっています。
// TODO: decide if ids should be predictable or random def generateMemberIdSuffix = UUID.randomUUID().toString
https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L170 randomにするかどうか迷っているようですが、予測可能だと勝手に偽物consumerが繋いできたりとかあるんでしょうか・・?
で、肝心のclientIdはというと handleJoinGroup
の第三引数にありそうです。
def handleJoinGroup(groupId: String, memberId: String, clientId: String, clientHost: String, sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback) { ... }
このメソッドの呼び出し元をみると request.header.clientId
とあるので、headerに入っていそうです。
https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L796
RequestHeaderのクラスを見るとどうも struct.set(CLIENT_ID_FIELD, client);
とあるので
ここで String client
とされているものが client_id
っぽいとみてよさそうです。
https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java#L47-L61
headerはConsumerNetworkClient
でつくられています。
https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L101
ここでいう client.nextRequestHeader(api);
のclient
は(ややこしいですが)KafkaClient
のようです。そして実際には NetworkClient
が使われているようです。
public class NetworkClient implements KafkaClient { ... public NetworkClient(Selectable selector, Metadata metadata, String clientId, ...) { ... } ... }
NetworkClientのインスタンス生成やclientIdの生成は以下で行われています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L554
これをみると、 clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
とあるので、
旧consumer.idのような役目はclient.Idとして設定可能になり、かつそのまま使われいてるわけではなく自分から名乗るのか・brokerが名付けるのかや、後ろにbrokerが何かsuffixを付けるのかなどが変更されていそうです。
partition assignment strategy周りも少し変更されていて、旧consumerだと、このAssignmentContextをつかっていますが、 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
新consumerではRangeAssignor
といったAbstractPartitionAssignor
が担当しているようです。
https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L71-L72
新consumerの話に戻って、client.idのデフォルト値です。 デフォルト値は "" だが、無いと consumer-CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() になるようです。
staticだから同じプロセス内じゃないとclientId変わらないからクラスタ全体だとidentifiyできなさそうだけどドキュメントにもtracking用と書いてあるからいいんでしょうか・・。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L541
そして、ここまで読んだ後に以下のドキュメントを見つけました。
Kafka Client-side Assignment Proposal - Apache Kafka - Apache Software Foundation
- Does the consumer needs to provide the member-id through configs? Also is the consumerId in assign() the same as memberId? Currently the consumer-id / member-id is assigned at the coordinator side. The memberId is used exactly the same as consumerId. I just renamed it for the more general usage.
先に出てきて欲しかった(´;ω;`)