Kafkaのproducerのrequest.timeout.msはどのように作用するか

気になったのでコードを追ったりしてみました。備忘録けんメモ書きです。

そもそもtimeoutがどうやって送信されてどうやってbrokerで受信されているのかが追いにくいのですが、

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java#L32 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/api/ProducerRequest.scala#L31 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L32-L40 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L96-L100

RequestHeader.java+ProduceRequest.javaと考えてProtocol.javaに書いてあるSchemaと、ProducerRequest.readfromと照らし合わせると帳尻が合いそうです。

つまりProducerRequest.ackTimeoutMs(コンストラクタ第二引数)を追えばよさそうで、追っていくと request.timeout.ms になりそうです。

this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L256

this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L281-L290

requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L318

ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L333

以上からProducerRequest(サーバー側のコード)に渡ってくるタイムアウトrequest.timeout.msになりそうです。

此処から先は処理をきちんとおえてないのですがgrepする限り def handleProducerRequest(request: RequestChannel.Request) {...} が処理を担当していそうです。

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L298

こいつは最終的に rplicaManager.appendMessages にtimeout値を渡しています。 appendMessagesは以下にあります。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L314

appendされたメッセージはリクエスト煉獄に叩きこまれます。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L343

これが実際にタイムアウトする処理はスケジュール機構なども含むので結構複雑ですが、 かなり紆余曲折があって、以下でタイムアウトします。 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/TimingWheel.scala#L131-L133

そのあと、taskExecutorにsubmitされて、runが呼ばれます。

// Already expired or cancelled
if (!timerTaskEntry.cancelled)
  taskExecutor.submit(timerTaskEntry.timerTask)

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/Timer.scala#L93-L94

runは forceComplete を呼ぶようになっています。(runが呼ばれるとタイムアウトしたかキャンセルしたかみたいな扱いになっていそうです。)

/*
 * run() method defines a task that is executed on timeout
 */
override def run(): Unit = {
  if (forceComplete())
    onExpiration()
}

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedOperation.scala#L105-L108

forceComplete経由で呼ばれるonCompleteでcallbackが呼ばれて、ReplicaManager#L343にもどります。

val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus)

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedProduce.scala#L123

この際statusを変更する処理などがありませんが、statusの初期値がErrors.REQUEST_TIMED_OUT になっているので不要そうです。

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedProduce.scala#L64

ということでrequest.timeout.msを短くするとack待ち時間が短くなるようです。(色々な要素を省略したので本当はもう少し色々なところでタイムアウトしたりしています・・。)

ドキュメントを見ると request.timeout.ms にはackのことが書いてないんですが

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhauste

timeout.ms には書いてあります。

The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.

そしてtimeout.msはdprecatedでREQUEST_TIMEOUT_MS_CONFIGにしろと書いてあるので役割が変わって無ければ同じ説明と認識して良さそうです。

一方で、producerのclientにも同じパラメータがわたっててそっちで能動的に切断するケースも有ります。 NetworkClientに入っているrequest.timeout.ms値がその辺の機能です。 Sender.sendは最終的にclient.sendをよぶけどその正体がこれです。(KafkaProducerでパラメータと一緒に初期化)

そいつでもhandleTimeoutしていてタイムアウトするとdisconnected=trueをレスポンスに突っ込みます。

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L408-L414

その後、 Sender.handleProduceResponseNETWORK_EXCEPTIONになります。 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L260-L265

このフローだとエラーログがtraceやdebugじゃないと出なくて、例外をログにだしても The server disconnected before a response was received. というどこで何が何故切断されたのかみたいな情報が皆無なログが出るだけでつらい感じになります(;´Д`)

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java#L104