Kafkaの旧consumerでいうconsumer.idは新consumerだと何なのか

consumerが復数同時に同じpartitionにつないだときに、どのクライアントが担当するかを意図的に決定させるためにconsumer.idを設定していたんですが、新consumerでconsumer.id設定できるところがみつからないなーと思ったので、Kafkaコードを微妙に追いつつ探ってみました。備忘録がてらのメモ書きです。

どうも旧consumerでいうところのconsumer.idは新consumerでいうとclient.idらしいです。partition assignにはmemberIdが指定されるので厳密には違うっぽいですが、client.idを元に計算されるようです。(多分)

まずmemberIdは旧consumerのときは(consumer.idとして)自分から名乗っていたけど、新consumerでは最初は無名で、brokerに名付けられたら次からはその名前を名乗るようです。

if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
  // if the member id is unknown, register the member to the group
  addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
} else ...

で、このmemberIdはどうやら以下のように決まるようです。

// use the client-id with a random id suffix as the member-id
val memberId = clientId + "-" + group.generateMemberIdSuffix

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L588-L589

generateMemberIdSuffix は以下のように決まっています。

// TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupMetadata.scala#L170 randomにするかどうか迷っているようですが、予測可能だと勝手に偽物consumerが繋いできたりとかあるんでしょうか・・?

で、肝心のclientIdはというと handleJoinGroup の第三引数にありそうです。

def handleJoinGroup(groupId: String,
                      memberId: String,
                      clientId: String,
                      clientHost: String,
                      sessionTimeoutMs: Int,
                      protocolType: String,
                      protocols: List[(String, Array[Byte])],
                      responseCallback: JoinCallback) { ... }

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala#L106-L108

このメソッドの呼び出し元をみると request.header.clientId とあるので、headerに入っていそうです。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L796

RequestHeaderのクラスを見るとどうも struct.set(CLIENT_ID_FIELD, client); とあるので ここで String client とされているものが client_id っぽいとみてよさそうです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java#L47-L61

headerはConsumerNetworkClientでつくられています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L101

ここでいう client.nextRequestHeader(api);clientは(ややこしいですが)KafkaClientのようです。そして実際には NetworkClient が使われているようです。

public class NetworkClient implements KafkaClient { ...

    public NetworkClient(Selectable selector,
                         Metadata metadata,
                         String clientId, ...) { ... }
    ...
}

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L91-L93

NetworkClientのインスタンス生成やclientIdの生成は以下で行われています。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L554

これをみると、 clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG); とあるので、 旧consumer.idのような役目はclient.Idとして設定可能になり、かつそのまま使われいてるわけではなく自分から名乗るのか・brokerが名付けるのかや、後ろにbrokerが何かsuffixを付けるのかなどが変更されていそうです。

partition assignment strategy周りも少し変更されていて、旧consumerだと、このAssignmentContextをつかっていますが、 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/consumer/PartitionAssignor.scala

新consumerではRangeAssignorといったAbstractPartitionAssignorが担当しているようです。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java#L71-L72

新consumerの話に戻って、client.idのデフォルト値です。 デフォルト値は "" だが、無いと consumer-CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement() になるようです。

staticだから同じプロセス内じゃないとclientId変わらないからクラスタ全体だとidentifiyできなさそうだけどドキュメントにもtracking用と書いてあるからいいんでしょうか・・。 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L539-L541

そして、ここまで読んだ後に以下のドキュメントを見つけました。

Kafka Client-side Assignment Proposal - Apache Kafka - Apache Software Foundation

  1. Does the consumer needs to provide the member-id through configs? Also is the consumerId in assign() the same as memberId? Currently the consumer-id / member-id is assigned at the coordinator side. The memberId is used exactly the same as consumerId. I just renamed it for the more general usage.

先に出てきて欲しかった(´;ω;`)