typelevel/cats を 2.10 から 2.11にアップデートする際の注意点 (traverse編)

cats を最新にアップデートする際にハマった点を書きます*1

何が変わったのか

https://github.com/typelevel/cats/pull/4498 により List(1,2,3).traverse(i => IO(i)) の実装が List(1,2,3).map(i => IO(i)).sequence と同じ評価順になりました。(ちょっと雑な解説なので詳しくはPRを参照)

どう違うのか

  • before: 先頭要素のIOを作成 => 評価 => エラーじゃなければ次要素のIOを作成 =>評価
  • after: 各要素のIOを作成 => 先頭を評価 => エラーじゃなければ次要素のIOを評価

何が違うのか

IOインスタンスを作成する際に副作用が入っていると挙動が微妙に変わります。

副作用がなければ全く同じ挙動。IO作成時に副作用起こす人なんて居ないですよね!

困る例1: mockのcall countは副作用だよ編

以下はmockitoを使っている場合の例です

// test
when(barMock.run(any)).thenReturn(IO.raiseError(...)) // mockでエラーを返す異常系のテスト

// main
fooList.traverse  { foo =>
  for {
     result <- barMock.run(foo) <----- ここは今まで1回しか呼ばれなかったけどN回呼ばれる。(コールをカウントするのは副作用)
                                       なお1回目のrunの返り値はIO(エラー) なので、mock的にはN回呼ばれてIOインスタンスがN個作成されるものの2番目以降のIOインスタンスは実際には評価されない(中のIOが実行されることはない)
     ...
  } yield result
}

ちなみに今回の事象は traverse内の先頭のIO作成が副作用をもつ場合(今回はmockによるコールカウント)でのみ起こります。(for式の先頭IOしか作成されないので、mockがfor式の2番目とかであればflatMapにつつまれてIOインスタンスの作成が先頭IOの評価後に延期されるため挙動は今までと変わらない)

回避例

fooList.traverse  { foo =>
  for {
     _      <- IO.unit          <-- このインスタンスがN個作られるだけでその次の行は呼ばれない
     result <- barMock.run(foo) <----- List1要素目のIOチェーンを評価したタイミングでエラーになるため、2要素目のIOチェーンはインスタンスは作成されるものの評価されることはない = barMock.runがN回呼ばれることはない
     ...
  } yield result
}

無駄に IO.unit を入れるのは微妙なのでログを追加するのもいいでしょう。(なおそのログを消すと何故かテストが落ちる爆弾になるのでコメント必須) 現実的にはMockの検証を緩めて上げるのもありなのではと思います。

困る例2: 連番idのgenerateは副作用だよ編

今度はこちら側の書き方が悪かった例です。

// test
val ids = (1 to 100)
when(mockBarIdGenerator.generate()).thenReturn(ids.head, ids.tail: _*) // 呼ばれるたびに連番を返すようmock設定

// main
fooList.traverse { foo =>
  for {
    bar <- createBarIO(barIdGenerator.generate())          // A  <-- IO[Bar]のインスタンスがN回呼ばれるようになった。当然引数が評価されるので `barIdGenerator.generate()` がN回呼ばれる 。
    ...
    barId <- barIdGenerator.generate()                     // B <- 昔は A => B => A => B... と呼ばれていた。update後は A => A => ... => B => B => ... となる
    ...
  } yield bar

回避例

そもそも状態持つなら barIdGeneartor.generate の返り値の型を BarId ではなく IO[BarId] にすべきで、それをしていないのが敗因(例外が出ないとサボりがち...)

fooList.traverse { foo
  for {
    barId <- IO.delay(barIdGenerator.generate())       <-- IO.delayは `=> A` を受け取るので引数が評価されるのはIO作成時ではなく評価時。 
    bar    <- createBarIO(barId)                       <-- これで挙動は今までと同じになる
    ...
  } yield bar

困る例というか直しているときにハマった例

困る例1を直しているときに

fooList.traverse  { foo =>
     IO.unit *> barMock.run(foo)
}

のように修正したのですが、def *>[B](another: IO[B]): IO[B] なので何の意味もなかったです。(IOインスタンスが作成されないようにしたいのに引数評価時に作成されてしまう) 普通はIOインスタンスが作成されたところで副作用なんてないに決まっているのだからそこまで気にしなくて良いところではあります。 IO.defer(barMock.run(foo)) とかしてあげても良いと思いますが、やはりtestのためにmain側に手を加える(しかも一件するとdeferする意味がわかりにくい)のが微妙なようなありなような、、というところです

終わりに

いまさらMay 28, 2024のバージョン(Release v2.11.0 · typelevel/cats · GitHub) に上げとるんかいなというツッコミどころはありますが誰かの役に立てば幸いです。

*1:ハマったのは2.10から2.11の変更点なので執筆時点の最新2.13の問題ではありません

Amazon Auroraの論文(Amazon Aurora: Design Considerations for High Throughput Cloud-Native Relational Databases)輪読会 in FOLIOを開催した

Amazon Aurora: Design Considerations for High Throughput Cloud-Native Relational Databasesの輪読会を開催したのでレポブログです。

勉強会の概要

FOLIOでも使っているAmazon Auroraについてより詳しく知るために、SIGMOD2017で発表されたAmazon Auroraアーキテクチャについての論文を社内勉強会で読みました。

参加人数は12名で論文自体は10ページほどで全6回で終わりました。

  • 論文を読む
    • 事前の読み込みはなし
    • 日本語訳でOK
    • AIも活用(notebookLMとか)ただし誰でも分かるような要約を生成してそれだけ読むのはNG(原文のレベルで理解する)

こぼれ話パートについて

毎回論文パートに入る前に論文の背景知識となる部分のこぼれ話をしました

第1回: 論文が提出されてる学会について

SIGMODってなに?とかVLDB(= Very Large Data Bases)っていうカンファレンスもあるよねとか、Spanner論文はOSDIから出てるよねといった話をしました。

第2回: quorumについて

auroraはquorumで読み書きするのでまずはquorumについて解説しました。 このあたりは Amazon Aurora のアーキテクチャまとめ - おくみん公式ブログ を大変参考にさせていただきました。

第3回: block, page, bufferについて

論文ではauroraのストレージ周りの仕組みや内部構造についての説明に触れることになりますが、その前提知識となるblockやpage, bufferについて説明しました。 ハードウェア側のセクタサイズ、ファイルシステム側のブロックサイズ、DB側のページサイズなど似てる概念があるよといった話や 例えばセクタサイズとブロックサイズがずれるとどうなる? (セクターサイズとクラスタサイズ(ブロックサイズ)の違い, 不良セクタとfsck/chkdsk について | SEの道標)といった話もしました。

また mysqlのDouble writer bufferやtorn writeの話もしました。 REDOログがあれば復旧できるのでは? => REDOログはレコード単位だけどtorn writeはページ単位の話だよ(acid - Why does MySQL need double-write buffer if there is redo log and undo log? - Database Administrators Stack Exchange)=> 二回書いたら性能半分になるのでは? => シーケンシャルライトだから性能は落ちるけど倍にはならないよ(MySQL :: MySQL 8.0 リファレンスマニュアル :: 15.6.4 二重書き込みバッファー, Percona legacy documentation) といったトピックにも触れられたのは良かったのかと思いました。

最後にExt4のBigAllocを使うとtorn write発生しないからdouble write bufferはOFFにできるよといった話もしたのですが、冷静に考えてAurora使ってるんだし別に知らなくて良かったのでは・・と話してて思いました。(もう解説してしまったから時すでに遅し)

第4回: WAL, UNDOログ, REDOログについて

AuroraのキモといえばREDOログ転送ですが(諸説あるり?)、それについて知るためにはそもそもREDOログって何?という話を知っている必要があるので解説しました。

crash recoveryに欠かせない諸概念について実際クラッシュした際にどうするかについて概要を説明しました。

第5回: concurrency control, トランザクション分離レベル, MVCCについて

若干趣味に走っていますが一応論文でもトランザクション分離レベルについて語られていたので解説しました。

two phase lockで説明するトランザクション分離レベルについて最強データベース講義 を参考に説明したり、

ANSI/ISO SQL標準で定められている4つの分離レベルだけが分離レベルではないといった話を Jepsen: MySQL 8.0.34を参考に話したり、

MVCCの概要とともにMVCCとガベージコレクションについて InnoDBのMVCCのガベージコレクションについて - shallowな暮らし を参考に話したりしました。(本当は Achieve a high-speed InnoDB purge on Amazon RDS for MySQL and Amazon Aurora MySQL | AWS Database Blog も話したかったんですがさすがに時間がかかりすぎたので割愛)

第6回: DBについて詳しく知る方法

最後はDBについて詳しく知る方法について話しました。

終わりに

忙しい業務の合間をぬって参加してくださった皆様に感謝🙏

また次の勉強会を企画したいなと思っています。

Argon2とJVMとK8Sの組み合わせでおきたOoM について

はじめに

argon2-jvmでOoMが発生した問題と戦ったのでメモです。

まとめ

  • argon2-jvm は JNA (Java Native Access)を介して(オフヒープの)メモリ確保を行う
  • Kubernetesでのメモリクォータ設定時に上記を考慮しないと Out of Memoryを引き起こす可能性がある
  • argon2やk8sJVMの設定をチューニングを実施することで解決できる

発生した問題

  • とあるサーバーのAPIがたまに504 Gateway Timeoutを返す
    • 本番リリース前のQA環境(= ごく軽い負荷)での症状
      • 原因は不明だが解決しないと本番ではまともに動かないと想定される状況
  • 調査したところpodがkillされている(exit code = 137)イベントが存在する
  • 認証用のapiを叩くと即時ではないものの再現するためこの周辺が疑わしい

前提知識

今回の話ではargon2, k8s, jvmといったいくつか前提となる概念があるのでまずそちらについて解説します。

Argon2とは

Argon2は、2015年のパスワードハッシュ競技会で優勝したhash関数です。 Argon2にはいくつかのバリエーションがあります。

  • Argon2d: GPUによる攻撃に強い
  • Argon2i: サイドチャネル攻撃に強い
  • Argon2id: 上記2つのハイブリッドで、両方の攻撃に耐性がある

詳細について気になる人は Argon2とは - マネックスクリプトバンク株式会社 をご参照ください。 Argon2の特徴としてメモリハードな設計が挙げられます。計算には大量のメモリが必要なので攻撃者が並列処理で大量に攻撃するのを困難にしているようです。 また、実行時間、メモリ使用量、並列度などのパラメータをカスタマイズ可能で、セキュリティとパフォーマンスのバランスを調整できるのも利点です。

Argon2-jvmについて

今回利用していたライブラリはArgon2-jvmです。 Argon2-jvmはArgon2パスワードハッシュアルゴリズムJavaバインディングで、Java Native Access (JNA)を使用してネイティブCライブラリを呼び出しています。 なおargon2-jvmには以下のバリエーションがあります。

  • argon2-jvm-nolibs: パッケージマネージャーでArgon2がインストールされているシステムを前提とする
  • argon2-jvm: 様々なOSとアーキテクチャ用のpre-compiledされたArgon2ライブラリを含むバージョン

使用方法は簡単で、パスワードのハッシュ化とハッシュの検証のためのメソッドが提供されています。

以下は GitHub - phxql/argon2-jvm: Argon2 Binding for the JVM の例です。

Argon2 argon2 = Argon2Factory.create();

char[] password = readPasswordFromUser();

try {
    // Hash password
    String hash = argon2.hash(10, 65536, 1, password);

    // Verify password
    if (argon2.verify(hash, password)) {
        // Hash matches password
    } else {
        // Hash doesn't match password
    }
} finally {
    // Wipe confidential data
    argon2.wipeArray(password);
}

備考:その他の選択肢

argon2-jvmはネイティブライブラリに依存しているので、それを避けたい場合はSpring SecurityのArgon2PasswordEncoderが使えそうです。(Bouncy Castleの純粋なJava実装を使用しているとのこと。詳細は GitHub - phxql/argon2-jvm: Argon2 Binding for the JVM )

またargon2以外のハッシュ関数(PBKDF2など)を使いたい場合はOWASP Cheat Sheetを参照するとよいでしょう。

KubernetesのOoMについて

Kubernetes環境でOut of Memory (OOM) Killerによりポッドが終了した場合、ポッドのステータスが OOMKilled となり、ExitCodeが 137になります 。(余談ですがlinux上のプロセスがシグナルによって終了した場合、ExitCodeは128を引くと元のsignalが分かります。例. 137-128=9 (SIGKILL), 143-128=15(SIGTERM))

通常、JVMでOoMになる場合はJVMのヒープが最大値を超えたことによるOoMエラーがでる(java.lang.OutOfMemoryError: Java heap space等)ことが多いと思いますが、上記の場合はJVMがクラッシュしたのではなくk8sのOOM killerによりSIGKILLが送信されてJVMが終了しています。

このあたりについては Understanding Kubernetes Exit Code 137: A Silent-killer of Java apps | by Daniel Louvier | mossfinance | Medium が参考になります。

JVM実行時にマシンが使うメモリについて

JVMが動いているマシン上で消費される可能性があるメモリはいくつかありますが大きく分けてヒープメモリとオフヒープメモリに分けられます。

  • ヒープメモリ
    • Javaオブジェクトなどが格納される領域
  • オフヒープメモリ
    • オフヒープ以外のメモリ領域

※ 図などは 5.5. JVM ヒープおよびオフヒープメモリー | Data Grid キャッシュの設定 | Red Hat Data Grid | 8.3 | Red Hat Documentation を見ると想像しやすいと思います。

オフヒープメモリの例としては以下が挙げられます。

  • GCが使う領域
  • スレッドスタックのための領域
  • メタスペース
  • コードキャッシュ
    • JITコンパイラによって生成されたネイティブコードを格納するための領域
  • ダイレクトバッファが利用するメモリ
    • java.nio. ByteBuffer.allocateDirectで作成するオフヒープ領域
    • MaxDirectMemorySizeで上限を指定できる
  • JNI/JNAで呼び出したライブラリが利用するメモリ
    • C言語のライブラリなどが内部でmallocを呼び出す場合等で利用される

オフヒープメモリのトラッキングについて

オフヒープメモリのトラッキングはヒープメモリと比べると難しいですが不可能なわけではありません。

NativeMemoryTracking

JVMが把握しているネイティブメモリーは -XX:NativeMemoryTracking=detail を指定することで  jcmd $pid VM.native_memory によって確認可能です。これによりGCやスレッドスタック、メタスペースなどがトレース可能ですが一方で全てのネイティブメモリーがトラックできるわけではありません

参考

DirectBuffer

DirectBufferを使うとアプリケーションでネイティブ・メモリーのブロックを割り当てることができます。 これは上記の NativeMemoryTracking では確認できませんが DirectBuffer の利用量は java.nio.BitsRESERVED_MEMORY フィールドをリフレクションで取得することで確認可能なようです。(下記記事では sun.misc.SharedSecrets.getJavaNioAccess().getDirectBufferPool().getMemoryUsed()sun.misc.VM.maxDirectMemory() も使えるとの記載がありましたがいずれも未確認です。)

参考

JNI/JNAで呼び出したライブラリが利用するメモリ

この辺になるとJVMが知ってるはずもなく・・という状況です。 しかし手がないわけではなく呼び出し先のライブラリのmallocをトレースすることが可能なようです。

特にjemallocはどこの関数からmallocされてるかをサンプリングしたプロファイリングが出せるので LD_PRELOAD を使って普通のmalloc(GNU malloc)を置き換えつつ、 MALLOC_CONF を設定するとプロファイル結果が出せるようです。このプロファイルは jeprof で可視化可能なようです。

詳細は以下を御覧ください。(1つ目の記事だとmallocで困っていた問題がjemallocに差し替えたことで解決してしまったらしいので、差し替える場合は挙動が変わる可能性にはご留意ください。)

調査結果

調査

  • JVM側で出るヒープのOoMではなくK8SのOoM Killerにpodがkillされている
    • Event として以下が出ている
      • Task ... ran out of memory
      • ... 1 oom 1 die 1 create 1 start on ...
    • 通常出てくる java.lang.OutOfMemoryError: Java heap space というエラーが出ていない
    • Exit Codeが137なのでsigkillを受け取っていると思われる

仮説への道筋

  • 今回の問題の場合、ヒープのOoMではなさそうなのでオフヒープ周りが疑われる状況
    • 特に疑わしいライブラリなどに心当たりがない場合はダイレクトバッファ周りが気になるところ
    • メトリクスを見に行っても良いが、今回は認証apiを呼ぶと落ちやすいことが分かっていたのでまずはそのあたりを調査し、argon2を使う際にメモリ関連の設定値が指定されていることを確認した
    • (とはいえ対象のapiではメモリ量の指定があるhashメソッドは認証時は読んでおらず、指定のないverifyのみが呼ばれていたので関係はありそうだけど・・・という状況。ただ直感的にはverify時もメモリは使うはずという予測は立つ。)
    • (ただし、この時点ではargon2もヒープメモリを使うと考えていたためメモリ量は足りるしな・・・などと考えていた)
  • 少し調べると argon2-jvmは内部でJNA (java native access)を利用してargon2の(C言語の)ネイティブライブラリを呼び出していることが分かった
    • ネイティブライブラリの中でmallocが呼ばれているとその分だけメモリを消費する可能性があることに思い至る
  • podのメモリーリミットを設定する際にargon2のメモリ消費は特に考慮せずにJVMヒープの最大サイズを設定していることを確認
    • そのため認証apiを複数同時に呼ぶとpodのメモリリミットを超過してOoMKillerからSIGKILLを飛ばされていると推測した

立てた仮説とその検証

  • 仮説
    • 同時アクセス時にargon2のメモリ消費量もリクエスト数分増えてしまいOoMが発生する。
    • 状況的にargon2がメモリーリークを起こしているわけではなさそうと判断(※ メトリクスを見てないので確定ではない)
  • 検証
    • 仮説を確認するために ヒープサイズを固定し、podのmemory limitを以下のように調整しAPIにアクセス
      • limit - heapサイズをギリギリにするパターン (1リクエストでOoMが発生するはず)
      • limitを上記より増やすパターン (argon2の設定値から計算できる同時リクエスト数は耐えられるはず)

上記の検証が上手くいき、無事にargon2の設定とpodのmemory limit, JVMのメモリ設定が問題になっていることが分かりました。

チューニング

  • 同時リクエスト数を見積もる
  • argon2の設定をチューニング
  • podのメモリークォータ, JVMヒープをチューニング
    • k8sのlimit - jvmのHeap をargon2が消費するメモリを考慮した値に指定する
    • ヒープ外でメモリを必要とするのはargon2だけではないのでギリギリにしないように注意(DirectBufferによるオフヒープメモリなどもあるし、そもそもメタスペースなどでもメモリが必要)
    • k8sJVMのバランスは以下を参考にするとよいです。
  • セマフォでargon2が同時にさばくリクエスト件数を減らす
    • スループットやレイテンシは減りますがCPU負荷、メモリ負荷も同時に制約できます。
  • (補足) cats-effectを利用している場合は以下のチューニングガイドも参考になるでしょう
    • Starvation and Tuning · Cats Effect
    • ※ IO.blockingで包む理由はあまりありません。(結局CPUが物理的に専有されるのでcompute poolでやってもblocking poolでやっても同じ)
    • hash化によりCPUが足りなくてcompute poolがblockされてしまう場合はCPU割当を増やすかSemaphoreで包むあたりが解決になります。(前後でIO.cedeすることで他タスクにスケジューリングの機会を与えてもよいですがargon2呼び出し前後の処理が長くなければ効果は限定的かもしれません。)

終わりに

興味としてはネイティブメモリーのトラッキング調査などをやってみたかったのですが、その前に問題が解決してしまったので今回は実施していません。 そもそもプロジェクトの開発に参加していなかった関係でまずargon2-jvmを使っていることを知らなかったところからスタートしたのでなんとか解決できて良かったです。😇

protobuf editions=2023 について

先日 Go Protobuf: The new Opaque API - The Go Programming Language という記事を見てprotobufの2023とは・・?となったので調べたメモです。(公式が正なので気になる方は公式を見ましょう。)

公式記事: Protobuf Editions Overview | Protocol Buffers Documentation

edition = 2023 とは

proto2, proto3 の後継となるのが edition = 2023 です。 とはいえ proto3 から大きな変更があるわけではなく、今後のインクリメンタルリリースに備えて editions という概念を導入したのが edition = 2023 となるようです。(後述するように差分は0ではありません。)

editions について

editions 導入後は各種機能がfeature flagで管理され、例えばファイルやフィールド単位でフラグのON/OFFを管理できます。

Feature Settings for Editions | Protocol Buffers Documentation のサンプル抜粋

edition = "2023";

// File-scope definition
option features.bar = BAZ;

enum Foo {
  // Enum-scope definition
  option features.bar = QUX;

  A = 1;
  B = 2;
}

edition はそういったfeatureごとのflagのデフォルト値をまとめたものです。 edition でベースの動作を決め、後方互換のために古いAPIで挙動を変えたくないといった場合に明示的にフラグを指定することで新しい機能を柔軟に取り込むことができるようになります。

また edition を更新する際は Prototiller というツールに proto ファイルをマイグレーションしてもらえば挙動が変わらないようにフラグを書き込んでもらえるようになっています。

edition = 2023 での違い

Feature Settings for Editions | Protocol Buffers Documentation で proto2, proto3 と edition = 2023 の違いが述べられています。 ここでは proto3 との比較について見ていきます。

features.field_presence

optional/required周りのデフォルト値が変更になりました。

  • LEGACY_REQUIRED: フィールドはパースとシリアライズに必須である(proto2 requiredと同じ挙動)
  • EXPLICIT: デフォルト値を受け取ったのか、unsetなまま送られたのかが追跡される。 デフォルト値を明示的に指定した場合でもシリアライズ時にデフォルト値がバイナリに含まれる (proto3のoptional相当)
  • IMPLICIT:デフォルト値はシリアライズされないのでデフォルト値が送られた or unsetのまま送られたかは判別できない。(proto3のデフォルト動作)。

※ proto3 の optional は後から追加された物で最初からあったわけではない

defaultは以下のようになっています。

  • proto3はimplicit
  • edition=2023はexplicit

フィールドが空文字列なのかnullなのかが分かりやすくなって便利なケースは多そうですね。

以上!なんとこれ以外は proto3 と edition = 2023 のフラグは同一なようです(すごい真面目に調査したわけではないので過信は禁物)

akka-http の Timeout で使われている アルゴリズム の正体について、そして何故 ActorSystem は Materializer としても使えるのか

この記事は FOLIO Advent Calendar 2024 - Adventar の15日目です。

HashedWheelTimerの記事では akka-http のタイムアウトに LightArrayResolverScheduler が使われていることに言及したのですが、実際にどういう経路で呼ばれているのかについてはボカしていました。 今回はその流れを辿ってみましょう。

まずは akka-http のタイムアウトが実際にタスクをスケジューリングしているところを確認してみましょう。(確認はしてないけどpekkoでもだいたい同じだと思います)

akka-http の Timeout で使われている Scheduler の正体

ここから先のソースコード内の日本語コメント、省略表記は筆者が追加したものです。

akka-httpのタイムアウトが materializer の scheduler を呼び出すまでの流れ

まず timeout を設定すると何が起きるのかを見ておきましょう。 一旦概略を説明するので少し飛躍がありますが寄り道で回収していきます。

TimeoutDirective#withRequestTimeoutでタイムアウトを設定。 timeoutAccesss.updateが呼ばれる

def withRequestTimeout(timeout: Duration, handler: Option[HttpRequest => HttpResponse]): Directive0 =
    Directive { inner => ctx =>
      ctx.request.header[`Timeout-Access`] match {
        case Some(t) =>
          handler match {
            case Some(h) => t.timeoutAccess.update(timeout, h)  // ← ココ
            case _       => t.timeoutAccess.updateTimeout(timeout)
          }
        case _ => ctx.log.warning("withRequestTimeout was used in route however no request-timeout is set!")
      }
      inner(())(ctx)
    }

t.timeoutAccess の中身は(後述するように)HttpServerBluePrint 内にある TimeoutAccessImpl なので HttpServerBluePrint内にあるTimeoutAccessImpl#updateが呼ばれる

    override def update(timeout: Duration, handler: HttpRequest => HttpResponse): Unit = {
      val promise = Promise[TimeoutSetup]()
      for (old <- getAndSet(promise.future).fast)
        promise.success {
          if ((old.scheduledTask eq null) || old.scheduledTask.cancel()) {
            val newHandler = if (handler eq null) old.handler else handler
            val newTimeout = if (timeout eq null) old.timeout else timeout
            val newScheduling = newTimeout match {
              case x: FiniteDuration => schedule(old.timeoutBase + x - Deadline.now, newHandler) // ← ココ
              case _                 => null // don't schedule a new timeout
            }
            currentTimeout = newTimeout
            new TimeoutSetup(old.timeoutBase, newScheduling, newTimeout, newHandler)
          } else old // too late, the previously set timeout cannot be cancelled anymore
        }
    }

updateが呼ばれると materializer#scheduleが呼ばれる

    private def schedule(delay: FiniteDuration, handler: HttpRequest => HttpResponse): Cancellable =
      materializer.scheduleOnce(delay, new Runnable { def run() = trigger.invoke((self, handler(request))) }) // ← ココ

上記の流れで materializer#schedule が呼ばれるようになっています。

次以降でscheduleメソッドを呼ぶとどうなるのか確認していきたいところですが ctx.request.header[`Timeout-Access`] とは何なのかとか、 それの中身が HttpServerBluePrint の TimeoutAccessImpl だというのは本当なのかといった疑問があるので寄り道として以下の疑問について調べていきます。

  • timeoutを設定するとそもそも何が起きるのか(それはどこで設定されているのか)
  • 本節でおもむろに scheduleメソッドの実装を HttpServerBluePrint から引用したが、これはどこで設定されているのか。
  • 本節でおもむろに ctx.request.header[Timeout-Access]の中身が HttpServerBluePrint.TimeoutAccessImpl としましたがそれは本当なのか

(寄り道1)timeout 時に scheduler によりエラーレスポンスが返ってくることの確認

前節でtimeoutが呼ばれていることは確認できましたが、timeoutすると何が起きるのか確認しておきます。(設定してるだけで実は何も起きないかもしれませんからね)

withRequestTimeout時に handler 指定がなかったら oldHandler が呼ばれるようになっている。実態はinitialTimeout で設定されている this(= TimeoutSetupクラス)

  private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, ...略)
    extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest => HttpResponse) { self =>
    import materializer.executionContext

    private var currentTimeout = initialTimeout

    initialTimeout match {
      case timeout: FiniteDuration => set {
        requestEnd.fast.map(_ => new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this)) // ← ココ
      }
      case _ => set {
        requestEnd.fast.map(_ => new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this))
      }
    }

... 略

scheduleされたタスクが発火するとTimeoutAccessImpl#applyが呼ばれて503 HttpResponseが生成される

    override def apply(request: HttpRequest) = {
      log.info("Request timeout encountered for request [{}]", request.debugString)
      //#default-request-timeout-httpresponse
      HttpResponse(StatusCodes.ServiceUnavailable, entity = "The server was not able " +  // ← ココ
        "to produce a timely response to your request.\r\nPlease try again in a short while!"
      //#default-request-timeout-httpresponse
    }

ということで timeout の名の通り503エラーレスポンスが返されることがわかりました。

(寄り道2)TimeoutAccessImpl を提供している HttpServerBluePrint と HttpServer の関係

Httpサーバーを立てたときに本当にHttpServerBluePrintクラスが使われているのか(使われてなかったらTimeoutAccessImplも呼ばれてないのではないか)という疑問があるので確認してみます。

akka-http で Httpサーバーを立てるときは Http().newServerAt().bindFlow() のようにメソッドを呼ぶのでここから HttpServerBluePrint までの流れを辿ってみます。

HttpServerBluePrintは以下の流れで呼ばれています。

Http.newServerAt()...bindFlow

    def bindFlow(handlerFlow: Flow[HttpRequest, HttpResponse, _]): Future[ServerBinding] =
      http.bindAndHandleImpl(handlerFlow, interface, port, context, settings, log)(materializer) // ← ココ

bindAndHandleImpl

  private[http] def bindAndHandleImpl(
    handler:   Flow[HttpRequest, HttpResponse, Any],
    interface: String, port: Int,
    connectionContext: ConnectionContext,
    settings:          ServerSettings,
    log:               LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] =
    bindAndHandle(handler, interface, port, connectionContext, settings, log)(fm) // ← ココ

bindAndHandle

  def bindAndHandle(
    handler:   Flow[HttpRequest, HttpResponse, Any],
    interface: String, port: Int = DefaultPortForProtocol,
    connectionContext: ConnectionContext = defaultServerHttpContext,
    settings:          ServerSettings    = ServerSettings(system),
    log:               LoggingAdapter    = system.log)(implicit fm: Materializer = systemMaterializer): Future[ServerBinding] = {
    if (settings.http2Enabled)
      log.warning(s"Binding with a connection source not supported with HTTP/2. Falling back to HTTP/1.1 for port [$port].")

    val fullLayer: Flow[ByteString, ByteString, (Future[Done], ServerTerminator)] =
      fuseServerFlow(fuseServerBidiFlow(settings, connectionContext, log), handler) // ← ココ (fuseServerFlowではなく、その引数で呼ばれてる方)
  private def fuseServerBidiFlow(
    settings:          ServerSettings,
    connectionContext: ConnectionContext,
    log:               LoggingAdapter): ServerLayerBidiFlow = {
    val httpLayer = serverLayer(settings, None, log, connectionContext.isSecure) // ← ココ
    val tlsStage = sslTlsServerStage(connectionContext)

    val serverBidiFlow =
      settings.idleTimeout match {
        case t: FiniteDuration => httpLayer atop tlsStage atop HttpConnectionIdleTimeoutBidi(t, None)
        case _                 => httpLayer atop tlsStage
      }

    GracefulTerminatorStage(system, settings) atop serverBidiFlow
  }
  def serverLayer(
    settings:           ServerSettings            = ServerSettings(system),
    remoteAddress:      Option[InetSocketAddress] = None,
    log:                LoggingAdapter            = system.log,
    isSecureConnection: Boolean                   = false): ServerLayer = {
    val server = HttpServerBluePrint(settings, log, isSecureConnection, dateHeaderRendering) // ← ココでHttpServerBluePrintが呼ばれている
      .addAttributes(HttpAttributes.remoteAddress(remoteAddress))
      .addAttributes(cancellationStrategyAttributeForDelay(settings.streamCancellationDelay))

    server atop delayCancellationStage(settings)
  }

無事に Httpサーバーを立てたときに HttpServerBluePrint が設定される = scheduleメソッドの実装もこれが呼ばれていそうであることが確認できました。

もう少し詳しく見るなら server インスタンスが作られているがそれらが結局どういう風にリクエストハンドリングにつながるのかを見ていくべきですが、起動処理を全て追うことになるので今回はここまでとします。

(寄り道3) ctx.request.header[`Timeout-Access`]の中身が HttpServerBluePrint.TimeoutAccessImpl であることの確認

字面から予想するにどうもリクエスト受信後に、リクエストヘッダーに Timeout-Access というフィールドを埋め込んでアクセスしているようです。 ということで Timeout-Access ヘッダを参照しているところを調べてみます。

まずTimeout-Accessはクラスになっているのでコードジャンプが可能です

object `Timeout-Access` extends ModeledCompanion[`Timeout-Access`]  // ← ココ
final case class `Timeout-Access`(timeoutAccess: akka.http.scaladsl.TimeoutAccess)
  extends jm.headers.TimeoutAccess with SyntheticHeader {
  def renderValue[R <: Rendering](r: R): r.type = r ~~ timeoutAccess.toString
  protected def companion = `Timeout-Access`
}

HttpServerBluePrint内部で定義されている RequestTimeoutSupport#createLogic内でheaderに TimeoutAccessImpl を追加しています。

class RequestTimeoutSupport(initialTimeout: Duration, log: LoggingAdapter)
    extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] {
略
    def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
略
      setHandler(requestIn, new InHandler {
        def onPush(): Unit = {
          val request = grab(requestIn)
          val (entity, requestEnd) = HttpEntity.captureTermination(request.entity)
          val access = new TimeoutAccessImpl(request, initialTimeout, requestEnd, callback, // ← ココ
            interpreter.materializer, log)
          openTimeouts = openTimeouts.enqueue(access)
          push(requestOut, request.addHeader(`Timeout-Access`(access)).withEntity(entity))  // ← ココ
        }
略

RequestTimeoutSupport は GraphStage を継承しており createLogic はこのクラスのメソッドをオーバーライドしたものです。

RequestTimeoutSupport は HttpServerBluePrintクラスのrequestTimeoutSupportメソッドでtimeoutが非ゼロのときにセットされます

  def requestTimeoutSupport(timeout: Duration, log: LoggingAdapter): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] =
    if (timeout == Duration.Zero) BidiFlow.identity[HttpResponse, HttpRequest]
    else BidiFlow.fromGraph(new RequestTimeoutSupport(timeout, log)).reversed // ← ココ

そして上記メソッドは HttpServerBluePrint#apply で呼ばれています。

private[http] object HttpServerBluePrint {
  def apply(settings: ServerSettings, log: LoggingAdapter, isSecureConnection: Boolean, dateHeaderRendering: DateHeaderRendering): Http.ServerLayer =
    userHandlerGuard(settings.pipeliningLimit) atop
      requestTimeoutSupport(settings.timeouts.requestTimeout, log) atop // ← ココ
      requestPreparation(settings) atop
      controller(settings, log) atop
      parsingRendering(settings, log, isSecureConnection, dateHeaderRendering) atop
      websocketSupport(settings, log) atop
      tlsSupport atop
      logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes)

HttpServerBluePrint#apply がサーバーを立てるときに呼ばれていることはすでに確認ずみなので、ここまでで Timeout-Access ヘッダに TimeoutAccessImpl のインスタンスが詰め込まれていることを確認できました。

もちろん requestPreparationは何をしているんだろうとか疑問は尽きないわけですが一旦ここで満足することにします。

materializer#schedule と LightArrayResolverScheduler の関係

ここまでで timeout時には materializer#schedule が呼ばれることが分かったので、次は materializer と LightArrayResolverScheduler の関係を調査してみましょう。

まずは Httpサーバーが使っている Scheduler が ActorSystem の Scheduler であることを確認します。

Httpサーバーを立てるときには Http() を呼び出すのでまずはここから見ていきます。

Http() が implicitでActorSytemを引数にとる

  def apply()(implicit system: ClassicActorSystemProvider): HttpExt = super.apply(system) // ← ココ

その後、newServerAt がそのままServerBuilder に system を横流しします。

  def newServerAt(interface: String, port: Int): ServerBuilder = ServerBuilder(interface, port, system) // ← ココ

そして、ServerBuilder が materializer を抽出(つまり以降のmaterializerはActorSystemのmaterializer)

private[http] object ServerBuilder {
  def apply(interface: String, port: Int, system: ClassicActorSystemProvider): ServerBuilder =
    Impl(
      interface,
      port,
      scaladsl.HttpConnectionContext,
      system.classicSystem.log,
      ServerSettings(system.classicSystem),
      system,
      SystemMaterializer(system).materializer // ← ココ
    )
略

前述のようにtimeoutでは materializer#scheduler が呼ばれているので HttpServer が使っている materializer = ActorSystemから抽出されるmaterializerと言えそうです。 そして materiliazer#schedule でタイムアウトを処理しているので、ここで使われている Scheduler は ActorSystem が持っている Scheduler ということで良さそうです。

Schedulerについて

次は ActorSystem の Scheduler が LightArrayRevolverScheduler ( HashedWheelTimer を基本としたスケジューラー)になっていることを確認します。

ActorSystemがSettingとしてSchedulerクラスのクラス名をconfigで保持している

    final val SchedulerClass: String = getString("akka.scheduler.implementation")

デフォルトはreference.confで指定されているLightArrayResolverScheduler

    # This setting selects the timer implementation which shall be loaded at
    # system start-up.
    # The class given here must implement the akka.actor.Scheduler interface
    # and offer a public constructor which takes three arguments:
    #  1) com.typesafe.config.Config
    #  2) akka.event.LoggingAdapter
    #  3) java.util.concurrent.ThreadFactory
    implementation = akka.actor.LightArrayRevolverScheduler // ← ココ

このクラス名の文字列はActorSystemの初期化時にインスタンス化されている

  protected def createScheduler(): Scheduler =
    dynamicAccess
      .createInstanceFor[Scheduler]( // ← ココ
        settings.SchedulerClass,
        immutable.Seq(
          classOf[Config] -> settings.config,
          classOf[LoggingAdapter] -> log,
          classOf[ThreadFactory] -> threadFactory.withName(threadFactory.name + "-scheduler")))
      .get

コメントにある通りこれがHashedWheelTimerになっている

/**
 * This scheduler implementation is based on a revolving wheel of buckets,
 * like Netty’s HashedWheelTimer, which it advances at a fixed tick rate and // ← ココ
 * dispatches tasks it finds in the current bucket to their respective
 * ExecutionContexts. The tasks are held in TaskHolders, which upon
 * cancellation null out their reference to the actual task, leaving only this
 * shell to be cleaned up when the wheel reaches that bucket next time. This
 * enables the use of a simple linked list to chain the TaskHolders off the
 * wheel.
 *
 * Also noteworthy is that this scheduler does not obtain a current time stamp
 * when scheduling single-shot tasks, instead it always rounds up the task
 * delay to a full multiple of the TickDuration. This means that tasks are
 * scheduled possibly one tick later than they could be (if checking that
 * “now() + delay &lt;= nextTick” were done).
 */
class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFactory: ThreadFactory)
    extends Scheduler
    with Closeable {
略

これで akka-http の timeout で使われている scheduler には HashedWheelTimer が使われていると言えそうです。

ActorSytesmとMaterializerの関係について

ここまでで疑問は解決したのですが、一方で materializerを抽出するときに使っている SystemMaterializer(system).materializer ってなんだろう・・?という疑問が湧いてきたので追跡してみます。

少し考えてみると以下のような点が気になりました。

  • akkaでは ActorSystem を Materializer かのように扱う事が可能だが、そもそもakka-actorの概念である ActorSystem が何故 akka-stream の概念である Materializer として使えるのか。
  • 当然、ActorSystemの初期化時に SystemMaterializer や Materializer を初期化するコードは(依存関係的にakka-streamのコードに依存できないため)明示的にはない。どうやって初期化しているのか。

これを説明するために、まず前提となる akka の Extension 機能について説明します。

extensions について

MaterializerはActorSystemを作成した際に初期化されますが、ActorSystemはakka-actor, Materializerはakka-streamの概念となっており、原則として明示的に依存することは出来ません。 ここでakkaが利用しているのがExtensionという仕組みです。 Classic Akka Extensions • Akka Documentation

読み込む extension は config ファイルで指定するのですがライブラリ提供者向けの library-extensions と、アプリケーション開発者向けの extensions の2つがあります。 akka-stream の reference.conf では akka.library-extensions += "akka.stream.SystemMaterializer$" のように指定されており、これが2つ目の疑問への鍵(というか答え)になります。

といことで、ActorSystem が SystemMaterializer を読み込むまでの流れをみていきましょう。

ActorSystem が SystemMaterializer を読み込むまでの流れ

ActorSystem#apply を呼ぶと ActorSystemImpl#start が呼ばれる

  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
    val bootstrapSettings = setup.get[BootstrapSetup]
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

    new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start() // ← ココ
  }

startメソッドの実装が_startの評価となっており、この中でloadExtensionsが呼ばれる

  private lazy val _start: this.type = try {
略
    if (!terminating)
      loadExtensions() // ← ココ
    if (LogConfigOnStart) logConfiguration()
    this
  } catch {
    case NonFatal(e) =>
      try terminate()
      catch { case NonFatal(_) => Try(stopScheduler()) }
      throw e
  }

loadExtensionsメソッドがconfigを読み取り DynamicAccess#getObjectFor を使って ExtensionId (or ExtensionIdProvider) のインスタンスを作成し registerExtensionに渡す

  private def loadExtensions(): Unit = {

    /*
     * @param throwOnLoadFail
     *  Throw exception when an extension fails to load (needed for backwards compatibility.
     *    when the extension cannot be found at all we throw regardless of this setting)
     */
    def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = {

      immutableSeq(settings.config.getStringList(key)).foreach { fqcn =>
        dynamicAccess.getObjectFor[AnyRef](fqcn).recoverWith {
          case firstProblem =>
            dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil).recoverWith { case _ => Failure(firstProblem) }
        } match {
          case Success(p: ExtensionIdProvider) =>
            registerExtension(p.lookup)
          case Success(p: ExtensionId[_]) =>
            registerExtension(p)
          case Success(_) =>
            if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
            else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'")
          case Failure(problem) =>
            if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
            else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
        }
      }
    }

    loadExtensions("akka.library-extensions", throwOnLoadFail = true) // ← ココ
    loadExtensions("akka.extensions", throwOnLoadFail = false)
  }

DynamicAccess の実態は ReflectiveDynamicAccess

  protected def createDynamicAccess(): DynamicAccess = new ReflectiveDynamicAccess(classLoader) // ← ココ

ReflectiveDynamicAccess はあまり特別なことをしておらずリフレクションでインスタンスを作成しているだけ

  override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
    Try {
      val types = args.map(_._1).toArray
      val values = args.map(_._2).toArray
      val constructor = clazz.getDeclaredConstructor(types: _*)
      constructor.setAccessible(true)
      val obj = constructor.newInstance(values: _*)  // ← ココ
      val t = implicitly[ClassTag[T]].runtimeClass
      if (t.isInstance(obj)) obj.asInstanceOf[T]
      else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
    }.recover { case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException }

registerExtension では createExtension を呼び出す

  @tailrec
  final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = {
    findExtension(ext) match {
      case null => //Doesn't already exist, commence registration
        val inProcessOfRegistration = new CountDownLatch(1)
        extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process
          case null =>
            try { // Signal was successfully sent
              ext.createExtension(this) match { // Create and initialize the extension  // ← ココ
                case null =>
                  throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]")
                case instance =>
                  extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension
                  instance //Profit!
              }
            } catch {
              case t: Throwable =>
                extensions.replace(ext, inProcessOfRegistration, t) //In case shit hits the fan, remove the inProcess signal
                throw t //Escalate to caller
            } finally {
              inProcessOfRegistration.countDown() //Always notify listeners of the inProcess signal
            }
          case _ =>
            registerExtension(ext) //Someone else is in process of registering an extension for this Extension, retry
        }
      case existing => existing.asInstanceOf[T]
    }
  }

上記の流れにより SystemMaterializer の createExtension が呼び出されます。

ここまでで ActorSystem と SystemMaterializer の関連性がわかりました。 しかしSystemMaterializer自体はMaterializerではありません(ただのExtension)。ここからMaterializerの初期化の流れを見ていきます。

SystemMaterializer で Materializer が初期化され、利用できるようになるまでの流れ

SystemMaterializer.createExtension で new SystemMaterializer が呼ばれる

object SystemMaterializer extends ExtensionId[SystemMaterializer] with ExtensionIdProvider {
  override def get(system: ActorSystem): SystemMaterializer = super.get(system)
  override def get(system: ClassicActorSystemProvider): SystemMaterializer = super.get(system)

  override def lookup = SystemMaterializer

  override def createExtension(system: ExtendedActorSystem): SystemMaterializer =
    new SystemMaterializer(system)  // ← ココ
}

val materializerGuardian の初期化時に MaterializerGuardian Actorが作成される

  private val materializerGuardian = system.systemActorOf(   // ← ココ
    MaterializerGuardian
      .props(systemMaterializerPromise, materializerSettings)
      // #28037 run on internal dispatcher to make sure default dispatcher starvation doesn't stop materializer creation
      .withDispatcher(Dispatchers.InternalDispatcherId)
      .withDeploy(Deploy.local),
    "Materializers")

val systemMaterializer の初期化時に startMaterializer が呼ばれ、 PhasedFusingActorMaterializer が返される。これが Materializer の実装

  private val systemMaterializer = startMaterializer(None)  // ← ココ

また、MaterializerGuardian Actor のコンストラクタでは systemMaterializerPromise の resolve も行われている。これは SystemMaterializer のフィールドになっている。

  systemMaterializerPromise.success(systemMaterializer)  // ← ココ
final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
  private val systemMaterializerPromise = Promise[Materializer]()  // ← ココ

上記 Promise の値を取得することで Materializer のインスタンスを取得できる。 SystemMaterializer#materializer はこの処理を行っている

  val materializer: Materializer = { // ← ココ
    val systemMaterializerFuture = systemMaterializerPromise.future
    systemMaterializerFuture.value match {
      case Some(tryMaterializer) =>
        tryMaterializer.get
      case None =>
        // block on async creation to make it effectively final
        Await.result(systemMaterializerFuture, materializerTimeout.duration)
    }
  }

上記の処理により SystemMaterializer Extension が PhasedFusingActorMaterializer を作成していること、それは SystemMaterializer#materializer を呼ぶことで簡単に取得できることがわかりました。

また Extension は上述の公式docの通り簡単にインスタンスを取得可能です。(SystemMaterializerのコンストラクタには ExtendedActorSystem (ようは普通のActorSystem)を渡せば簡単に初期化可能)なのでsystemが参照できればいつでもMaterializerを参照することができると言えそうです。

最後に ActorSystem自体をMaterializer扱いできる理由 を簡単に説明します。

ActorSystem自体をMaterializer扱いできる理由

これは非常に簡単です

Materializerの companion objectに ClassicActorSystemProvider から Materializer への implicit conversion が定義されている(SystemMaterializer#materializerを呼び出している)

object Materializer {

  /**
   * Implicitly provides the system wide materializer from a classic or typed `ActorSystem`
   */
  implicit def matFromSystem(implicit provider: ClassicActorSystemProvider): Materializer =  // ← ココ
    SystemMaterializer(provider.classicSystem).materializer
略

ActorSystem は ClassicActorSystemProvider を継承している

abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider // ← ココ

つまり implicit conversion により ActorSystem は Materializer 扱いできるということになります。

PhasedFusingActorMaterializer が ActorSystemのScheduler を使っていることの確認

ActorSystem由来のmaterializerの schedule メソッドを呼んでいるからといってActorSystemが保有しているSchedulerを使っているとは限りません。 念のために確認してみます。

PhasedFusingActorMaterializer では scheduleOnce のようなメソッドを実装している

  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext) // ← ココ

実装は system.scheduler.scheduleOnce なので system.schedulerを利用している

となります。

ここで system.schedulerが何だったかを思い出しつつ今までの流れをおさらいすると

  • akka-httpが依存しているのは MaterializerのschedulerOnce
  • Materializerのデフォルト実装である PhasedFusingActorMaterializer の schedulerOnce は system.scheduler.scheduleOnce を利用している
  • system.scheduler は ActorSystem クラス内で conf に指定されている Schedulerが利用される
  • schedulerは reference.conf から上書きしなければ LightArrayResolverScheduler が使われる
  • LightArrayResolverScheduler は HashedWheelTimer ライクな実装を使っている

となります。

終わりに

説明してみるとそれなりに大変でした。

akka-actor が依存関係的に直接参照できないはずの akka-stream の Materializer扱いできるというのが直感に反するというところですがakka独自の Extension 機構 と Scalaのimplicit conversionの合わせ技で解決されているのが興味深いかなと思います。 これでようやく akka-http の timeout で使われている scheduler が LightArrayResolverScheduler であり、そのアルゴリズムは HashedWheelTimer ベースのものだということがわかりました。

年末になかなかの長旅でしたがお疲れ様でした。

株式会社FOLIOでは、このような記事を最後まで読んでくれる もの好き 優しい人を大募集中です。 バックエンドエンジニア以外にも大々募集中ですのでまずは下記フォームでカジュアル面談にご応募ください 🙏 ※ 入力は名前とメアドだけで大丈夫です 🙆‍♀️

herp.careers