akka-http の Timeout で使われている アルゴリズム の正体について、そして何故 ActorSystem は Materializer としても使えるのか

この記事は FOLIO Advent Calendar 2024 - Adventar の15日目です。

HashedWheelTimerの記事では akka-http のタイムアウトに LightArrayResolverScheduler が使われていることに言及したのですが、実際にどういう経路で呼ばれているのかについてはボカしていました。 今回はその流れを辿ってみましょう。

まずは akka-http のタイムアウトが実際にタスクをスケジューリングしているところを確認してみましょう。(確認はしてないけどpekkoでもだいたい同じだと思います)

akka-http の Timeout で使われている Scheduler の正体

ここから先のソースコード内の日本語コメント、省略表記は筆者が追加したものです。

akka-httpのタイムアウトが materializer の scheduler を呼び出すまでの流れ

まず timeout を設定すると何が起きるのかを見ておきましょう。 一旦概略を説明するので少し飛躍がありますが寄り道で回収していきます。

TimeoutDirective#withRequestTimeoutでタイムアウトを設定。 timeoutAccesss.updateが呼ばれる

def withRequestTimeout(timeout: Duration, handler: Option[HttpRequest => HttpResponse]): Directive0 =
    Directive { inner => ctx =>
      ctx.request.header[`Timeout-Access`] match {
        case Some(t) =>
          handler match {
            case Some(h) => t.timeoutAccess.update(timeout, h)  // ← ココ
            case _       => t.timeoutAccess.updateTimeout(timeout)
          }
        case _ => ctx.log.warning("withRequestTimeout was used in route however no request-timeout is set!")
      }
      inner(())(ctx)
    }

t.timeoutAccess の中身は(後述するように)HttpServerBluePrint 内にある TimeoutAccessImpl なので HttpServerBluePrint内にあるTimeoutAccessImpl#updateが呼ばれる

    override def update(timeout: Duration, handler: HttpRequest => HttpResponse): Unit = {
      val promise = Promise[TimeoutSetup]()
      for (old <- getAndSet(promise.future).fast)
        promise.success {
          if ((old.scheduledTask eq null) || old.scheduledTask.cancel()) {
            val newHandler = if (handler eq null) old.handler else handler
            val newTimeout = if (timeout eq null) old.timeout else timeout
            val newScheduling = newTimeout match {
              case x: FiniteDuration => schedule(old.timeoutBase + x - Deadline.now, newHandler) // ← ココ
              case _                 => null // don't schedule a new timeout
            }
            currentTimeout = newTimeout
            new TimeoutSetup(old.timeoutBase, newScheduling, newTimeout, newHandler)
          } else old // too late, the previously set timeout cannot be cancelled anymore
        }
    }

updateが呼ばれると materializer#scheduleが呼ばれる

    private def schedule(delay: FiniteDuration, handler: HttpRequest => HttpResponse): Cancellable =
      materializer.scheduleOnce(delay, new Runnable { def run() = trigger.invoke((self, handler(request))) }) // ← ココ

上記の流れで materializer#schedule が呼ばれるようになっています。

次以降でscheduleメソッドを呼ぶとどうなるのか確認していきたいところですが ctx.request.header[`Timeout-Access`] とは何なのかとか、 それの中身が HttpServerBluePrint の TimeoutAccessImpl だというのは本当なのかといった疑問があるので寄り道として以下の疑問について調べていきます。

  • timeoutを設定するとそもそも何が起きるのか(それはどこで設定されているのか)
  • 本節でおもむろに scheduleメソッドの実装を HttpServerBluePrint から引用したが、これはどこで設定されているのか。
  • 本節でおもむろに ctx.request.header[Timeout-Access]の中身が HttpServerBluePrint.TimeoutAccessImpl としましたがそれは本当なのか

(寄り道1)timeout 時に scheduler によりエラーレスポンスが返ってくることの確認

前節でtimeoutが呼ばれていることは確認できましたが、timeoutすると何が起きるのか確認しておきます。(設定してるだけで実は何も起きないかもしれませんからね)

withRequestTimeout時に handler 指定がなかったら oldHandler が呼ばれるようになっている。実態はinitialTimeout で設定されている this(= TimeoutSetupクラス)

  private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, ...略)
    extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest => HttpResponse) { self =>
    import materializer.executionContext

    private var currentTimeout = initialTimeout

    initialTimeout match {
      case timeout: FiniteDuration => set {
        requestEnd.fast.map(_ => new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this)) // ← ココ
      }
      case _ => set {
        requestEnd.fast.map(_ => new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this))
      }
    }

... 略

scheduleされたタスクが発火するとTimeoutAccessImpl#applyが呼ばれて503 HttpResponseが生成される

    override def apply(request: HttpRequest) = {
      log.info("Request timeout encountered for request [{}]", request.debugString)
      //#default-request-timeout-httpresponse
      HttpResponse(StatusCodes.ServiceUnavailable, entity = "The server was not able " +  // ← ココ
        "to produce a timely response to your request.\r\nPlease try again in a short while!"
      //#default-request-timeout-httpresponse
    }

ということで timeout の名の通り503エラーレスポンスが返されることがわかりました。

(寄り道2)TimeoutAccessImpl を提供している HttpServerBluePrint と HttpServer の関係

Httpサーバーを立てたときに本当にHttpServerBluePrintクラスが使われているのか(使われてなかったらTimeoutAccessImplも呼ばれてないのではないか)という疑問があるので確認してみます。

akka-http で Httpサーバーを立てるときは Http().newServerAt().bindFlow() のようにメソッドを呼ぶのでここから HttpServerBluePrint までの流れを辿ってみます。

HttpServerBluePrintは以下の流れで呼ばれています。

Http.newServerAt()...bindFlow

    def bindFlow(handlerFlow: Flow[HttpRequest, HttpResponse, _]): Future[ServerBinding] =
      http.bindAndHandleImpl(handlerFlow, interface, port, context, settings, log)(materializer) // ← ココ

bindAndHandleImpl

  private[http] def bindAndHandleImpl(
    handler:   Flow[HttpRequest, HttpResponse, Any],
    interface: String, port: Int,
    connectionContext: ConnectionContext,
    settings:          ServerSettings,
    log:               LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] =
    bindAndHandle(handler, interface, port, connectionContext, settings, log)(fm) // ← ココ

bindAndHandle

  def bindAndHandle(
    handler:   Flow[HttpRequest, HttpResponse, Any],
    interface: String, port: Int = DefaultPortForProtocol,
    connectionContext: ConnectionContext = defaultServerHttpContext,
    settings:          ServerSettings    = ServerSettings(system),
    log:               LoggingAdapter    = system.log)(implicit fm: Materializer = systemMaterializer): Future[ServerBinding] = {
    if (settings.http2Enabled)
      log.warning(s"Binding with a connection source not supported with HTTP/2. Falling back to HTTP/1.1 for port [$port].")

    val fullLayer: Flow[ByteString, ByteString, (Future[Done], ServerTerminator)] =
      fuseServerFlow(fuseServerBidiFlow(settings, connectionContext, log), handler) // ← ココ (fuseServerFlowではなく、その引数で呼ばれてる方)
  private def fuseServerBidiFlow(
    settings:          ServerSettings,
    connectionContext: ConnectionContext,
    log:               LoggingAdapter): ServerLayerBidiFlow = {
    val httpLayer = serverLayer(settings, None, log, connectionContext.isSecure) // ← ココ
    val tlsStage = sslTlsServerStage(connectionContext)

    val serverBidiFlow =
      settings.idleTimeout match {
        case t: FiniteDuration => httpLayer atop tlsStage atop HttpConnectionIdleTimeoutBidi(t, None)
        case _                 => httpLayer atop tlsStage
      }

    GracefulTerminatorStage(system, settings) atop serverBidiFlow
  }
  def serverLayer(
    settings:           ServerSettings            = ServerSettings(system),
    remoteAddress:      Option[InetSocketAddress] = None,
    log:                LoggingAdapter            = system.log,
    isSecureConnection: Boolean                   = false): ServerLayer = {
    val server = HttpServerBluePrint(settings, log, isSecureConnection, dateHeaderRendering) // ← ココでHttpServerBluePrintが呼ばれている
      .addAttributes(HttpAttributes.remoteAddress(remoteAddress))
      .addAttributes(cancellationStrategyAttributeForDelay(settings.streamCancellationDelay))

    server atop delayCancellationStage(settings)
  }

無事に Httpサーバーを立てたときに HttpServerBluePrint が設定される = scheduleメソッドの実装もこれが呼ばれていそうであることが確認できました。

もう少し詳しく見るなら server インスタンスが作られているがそれらが結局どういう風にリクエストハンドリングにつながるのかを見ていくべきですが、起動処理を全て追うことになるので今回はここまでとします。

(寄り道3) ctx.request.header[`Timeout-Access`]の中身が HttpServerBluePrint.TimeoutAccessImpl であることの確認

字面から予想するにどうもリクエスト受信後に、リクエストヘッダーに Timeout-Access というフィールドを埋め込んでアクセスしているようです。 ということで Timeout-Access ヘッダを参照しているところを調べてみます。

まずTimeout-Accessはクラスになっているのでコードジャンプが可能です

object `Timeout-Access` extends ModeledCompanion[`Timeout-Access`]  // ← ココ
final case class `Timeout-Access`(timeoutAccess: akka.http.scaladsl.TimeoutAccess)
  extends jm.headers.TimeoutAccess with SyntheticHeader {
  def renderValue[R <: Rendering](r: R): r.type = r ~~ timeoutAccess.toString
  protected def companion = `Timeout-Access`
}

HttpServerBluePrint内部で定義されている RequestTimeoutSupport#createLogic内でheaderに TimeoutAccessImpl を追加しています。

class RequestTimeoutSupport(initialTimeout: Duration, log: LoggingAdapter)
    extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] {
略
    def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
略
      setHandler(requestIn, new InHandler {
        def onPush(): Unit = {
          val request = grab(requestIn)
          val (entity, requestEnd) = HttpEntity.captureTermination(request.entity)
          val access = new TimeoutAccessImpl(request, initialTimeout, requestEnd, callback, // ← ココ
            interpreter.materializer, log)
          openTimeouts = openTimeouts.enqueue(access)
          push(requestOut, request.addHeader(`Timeout-Access`(access)).withEntity(entity))  // ← ココ
        }
略

RequestTimeoutSupport は GraphStage を継承しており createLogic はこのクラスのメソッドをオーバーライドしたものです。

RequestTimeoutSupport は HttpServerBluePrintクラスのrequestTimeoutSupportメソッドでtimeoutが非ゼロのときにセットされます

  def requestTimeoutSupport(timeout: Duration, log: LoggingAdapter): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] =
    if (timeout == Duration.Zero) BidiFlow.identity[HttpResponse, HttpRequest]
    else BidiFlow.fromGraph(new RequestTimeoutSupport(timeout, log)).reversed // ← ココ

そして上記メソッドは HttpServerBluePrint#apply で呼ばれています。

private[http] object HttpServerBluePrint {
  def apply(settings: ServerSettings, log: LoggingAdapter, isSecureConnection: Boolean, dateHeaderRendering: DateHeaderRendering): Http.ServerLayer =
    userHandlerGuard(settings.pipeliningLimit) atop
      requestTimeoutSupport(settings.timeouts.requestTimeout, log) atop // ← ココ
      requestPreparation(settings) atop
      controller(settings, log) atop
      parsingRendering(settings, log, isSecureConnection, dateHeaderRendering) atop
      websocketSupport(settings, log) atop
      tlsSupport atop
      logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes)

HttpServerBluePrint#apply がサーバーを立てるときに呼ばれていることはすでに確認ずみなので、ここまでで Timeout-Access ヘッダに TimeoutAccessImpl のインスタンスが詰め込まれていることを確認できました。

もちろん requestPreparationは何をしているんだろうとか疑問は尽きないわけですが一旦ここで満足することにします。

materializer#schedule と LightArrayResolverScheduler の関係

ここまでで timeout時には materializer#schedule が呼ばれることが分かったので、次は materializer と LightArrayResolverScheduler の関係を調査してみましょう。

まずは Httpサーバーが使っている Scheduler が ActorSystem の Scheduler であることを確認します。

Httpサーバーを立てるときには Http() を呼び出すのでまずはここから見ていきます。

Http() が implicitでActorSytemを引数にとる

  def apply()(implicit system: ClassicActorSystemProvider): HttpExt = super.apply(system) // ← ココ

その後、newServerAt がそのままServerBuilder に system を横流しします。

  def newServerAt(interface: String, port: Int): ServerBuilder = ServerBuilder(interface, port, system) // ← ココ

そして、ServerBuilder が materializer を抽出(つまり以降のmaterializerはActorSystemのmaterializer)

private[http] object ServerBuilder {
  def apply(interface: String, port: Int, system: ClassicActorSystemProvider): ServerBuilder =
    Impl(
      interface,
      port,
      scaladsl.HttpConnectionContext,
      system.classicSystem.log,
      ServerSettings(system.classicSystem),
      system,
      SystemMaterializer(system).materializer // ← ココ
    )
略

前述のようにtimeoutでは materializer#scheduler が呼ばれているので HttpServer が使っている materializer = ActorSystemから抽出されるmaterializerと言えそうです。 そして materiliazer#schedule でタイムアウトを処理しているので、ここで使われている Scheduler は ActorSystem が持っている Scheduler ということで良さそうです。

Schedulerについて

次は ActorSystem の Scheduler が LightArrayRevolverScheduler ( HashedWheelTimer を基本としたスケジューラー)になっていることを確認します。

ActorSystemがSettingとしてSchedulerクラスのクラス名をconfigで保持している

    final val SchedulerClass: String = getString("akka.scheduler.implementation")

デフォルトはreference.confで指定されているLightArrayResolverScheduler

    # This setting selects the timer implementation which shall be loaded at
    # system start-up.
    # The class given here must implement the akka.actor.Scheduler interface
    # and offer a public constructor which takes three arguments:
    #  1) com.typesafe.config.Config
    #  2) akka.event.LoggingAdapter
    #  3) java.util.concurrent.ThreadFactory
    implementation = akka.actor.LightArrayRevolverScheduler // ← ココ

このクラス名の文字列はActorSystemの初期化時にインスタンス化されている

  protected def createScheduler(): Scheduler =
    dynamicAccess
      .createInstanceFor[Scheduler]( // ← ココ
        settings.SchedulerClass,
        immutable.Seq(
          classOf[Config] -> settings.config,
          classOf[LoggingAdapter] -> log,
          classOf[ThreadFactory] -> threadFactory.withName(threadFactory.name + "-scheduler")))
      .get

コメントにある通りこれがHashedWheelTimerになっている

/**
 * This scheduler implementation is based on a revolving wheel of buckets,
 * like Netty’s HashedWheelTimer, which it advances at a fixed tick rate and // ← ココ
 * dispatches tasks it finds in the current bucket to their respective
 * ExecutionContexts. The tasks are held in TaskHolders, which upon
 * cancellation null out their reference to the actual task, leaving only this
 * shell to be cleaned up when the wheel reaches that bucket next time. This
 * enables the use of a simple linked list to chain the TaskHolders off the
 * wheel.
 *
 * Also noteworthy is that this scheduler does not obtain a current time stamp
 * when scheduling single-shot tasks, instead it always rounds up the task
 * delay to a full multiple of the TickDuration. This means that tasks are
 * scheduled possibly one tick later than they could be (if checking that
 * “now() + delay &lt;= nextTick” were done).
 */
class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFactory: ThreadFactory)
    extends Scheduler
    with Closeable {
略

これで akka-http の timeout で使われている scheduler には HashedWheelTimer が使われていると言えそうです。

ActorSytesmとMaterializerの関係について

ここまでで疑問は解決したのですが、一方で materializerを抽出するときに使っている SystemMaterializer(system).materializer ってなんだろう・・?という疑問が湧いてきたので追跡してみます。

少し考えてみると以下のような点が気になりました。

  • akkaでは ActorSystem を Materializer かのように扱う事が可能だが、そもそもakka-actorの概念である ActorSystem が何故 akka-stream の概念である Materializer として使えるのか。
  • 当然、ActorSystemの初期化時に SystemMaterializer や Materializer を初期化するコードは(依存関係的にakka-streamのコードに依存できないため)明示的にはない。どうやって初期化しているのか。

これを説明するために、まず前提となる akka の Extension 機能について説明します。

extensions について

MaterializerはActorSystemを作成した際に初期化されますが、ActorSystemはakka-actor, Materializerはakka-streamの概念となっており、原則として明示的に依存することは出来ません。 ここでakkaが利用しているのがExtensionという仕組みです。 Classic Akka Extensions • Akka Documentation

読み込む extension は config ファイルで指定するのですがライブラリ提供者向けの library-extensions と、アプリケーション開発者向けの extensions の2つがあります。 akka-stream の reference.conf では akka.library-extensions += "akka.stream.SystemMaterializer$" のように指定されており、これが2つ目の疑問への鍵(というか答え)になります。

といことで、ActorSystem が SystemMaterializer を読み込むまでの流れをみていきましょう。

ActorSystem が SystemMaterializer を読み込むまでの流れ

ActorSystem#apply を呼ぶと ActorSystemImpl#start が呼ばれる

  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
    val bootstrapSettings = setup.get[BootstrapSetup]
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

    new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start() // ← ココ
  }

startメソッドの実装が_startの評価となっており、この中でloadExtensionsが呼ばれる

  private lazy val _start: this.type = try {
略
    if (!terminating)
      loadExtensions() // ← ココ
    if (LogConfigOnStart) logConfiguration()
    this
  } catch {
    case NonFatal(e) =>
      try terminate()
      catch { case NonFatal(_) => Try(stopScheduler()) }
      throw e
  }

loadExtensionsメソッドがconfigを読み取り DynamicAccess#getObjectFor を使って ExtensionId (or ExtensionIdProvider) のインスタンスを作成し registerExtensionに渡す

  private def loadExtensions(): Unit = {

    /*
     * @param throwOnLoadFail
     *  Throw exception when an extension fails to load (needed for backwards compatibility.
     *    when the extension cannot be found at all we throw regardless of this setting)
     */
    def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = {

      immutableSeq(settings.config.getStringList(key)).foreach { fqcn =>
        dynamicAccess.getObjectFor[AnyRef](fqcn).recoverWith {
          case firstProblem =>
            dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil).recoverWith { case _ => Failure(firstProblem) }
        } match {
          case Success(p: ExtensionIdProvider) =>
            registerExtension(p.lookup)
          case Success(p: ExtensionId[_]) =>
            registerExtension(p)
          case Success(_) =>
            if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
            else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'")
          case Failure(problem) =>
            if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
            else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
        }
      }
    }

    loadExtensions("akka.library-extensions", throwOnLoadFail = true) // ← ココ
    loadExtensions("akka.extensions", throwOnLoadFail = false)
  }

DynamicAccess の実態は ReflectiveDynamicAccess

  protected def createDynamicAccess(): DynamicAccess = new ReflectiveDynamicAccess(classLoader) // ← ココ

ReflectiveDynamicAccess はあまり特別なことをしておらずリフレクションでインスタンスを作成しているだけ

  override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
    Try {
      val types = args.map(_._1).toArray
      val values = args.map(_._2).toArray
      val constructor = clazz.getDeclaredConstructor(types: _*)
      constructor.setAccessible(true)
      val obj = constructor.newInstance(values: _*)  // ← ココ
      val t = implicitly[ClassTag[T]].runtimeClass
      if (t.isInstance(obj)) obj.asInstanceOf[T]
      else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
    }.recover { case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException }

registerExtension では createExtension を呼び出す

  @tailrec
  final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = {
    findExtension(ext) match {
      case null => //Doesn't already exist, commence registration
        val inProcessOfRegistration = new CountDownLatch(1)
        extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process
          case null =>
            try { // Signal was successfully sent
              ext.createExtension(this) match { // Create and initialize the extension  // ← ココ
                case null =>
                  throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]")
                case instance =>
                  extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension
                  instance //Profit!
              }
            } catch {
              case t: Throwable =>
                extensions.replace(ext, inProcessOfRegistration, t) //In case shit hits the fan, remove the inProcess signal
                throw t //Escalate to caller
            } finally {
              inProcessOfRegistration.countDown() //Always notify listeners of the inProcess signal
            }
          case _ =>
            registerExtension(ext) //Someone else is in process of registering an extension for this Extension, retry
        }
      case existing => existing.asInstanceOf[T]
    }
  }

上記の流れにより SystemMaterializer の createExtension が呼び出されます。

ここまでで ActorSystem と SystemMaterializer の関連性がわかりました。 しかしSystemMaterializer自体はMaterializerではありません(ただのExtension)。ここからMaterializerの初期化の流れを見ていきます。

SystemMaterializer で Materializer が初期化され、利用できるようになるまでの流れ

SystemMaterializer.createExtension で new SystemMaterializer が呼ばれる

object SystemMaterializer extends ExtensionId[SystemMaterializer] with ExtensionIdProvider {
  override def get(system: ActorSystem): SystemMaterializer = super.get(system)
  override def get(system: ClassicActorSystemProvider): SystemMaterializer = super.get(system)

  override def lookup = SystemMaterializer

  override def createExtension(system: ExtendedActorSystem): SystemMaterializer =
    new SystemMaterializer(system)  // ← ココ
}

val materializerGuardian の初期化時に MaterializerGuardian Actorが作成される

  private val materializerGuardian = system.systemActorOf(   // ← ココ
    MaterializerGuardian
      .props(systemMaterializerPromise, materializerSettings)
      // #28037 run on internal dispatcher to make sure default dispatcher starvation doesn't stop materializer creation
      .withDispatcher(Dispatchers.InternalDispatcherId)
      .withDeploy(Deploy.local),
    "Materializers")

val systemMaterializer の初期化時に startMaterializer が呼ばれ、 PhasedFusingActorMaterializer が返される。これが Materializer の実装

  private val systemMaterializer = startMaterializer(None)  // ← ココ

また、MaterializerGuardian Actor のコンストラクタでは systemMaterializerPromise の resolve も行われている。これは SystemMaterializer のフィールドになっている。

  systemMaterializerPromise.success(systemMaterializer)  // ← ココ
final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
  private val systemMaterializerPromise = Promise[Materializer]()  // ← ココ

上記 Promise の値を取得することで Materializer のインスタンスを取得できる。 SystemMaterializer#materializer はこの処理を行っている

  val materializer: Materializer = { // ← ココ
    val systemMaterializerFuture = systemMaterializerPromise.future
    systemMaterializerFuture.value match {
      case Some(tryMaterializer) =>
        tryMaterializer.get
      case None =>
        // block on async creation to make it effectively final
        Await.result(systemMaterializerFuture, materializerTimeout.duration)
    }
  }

上記の処理により SystemMaterializer Extension が PhasedFusingActorMaterializer を作成していること、それは SystemMaterializer#materializer を呼ぶことで簡単に取得できることがわかりました。

また Extension は上述の公式docの通り簡単にインスタンスを取得可能です。(SystemMaterializerのコンストラクタには ExtendedActorSystem (ようは普通のActorSystem)を渡せば簡単に初期化可能)なのでsystemが参照できればいつでもMaterializerを参照することができると言えそうです。

最後に ActorSystem自体をMaterializer扱いできる理由 を簡単に説明します。

ActorSystem自体をMaterializer扱いできる理由

これは非常に簡単です

Materializerの companion objectに ClassicActorSystemProvider から Materializer への implicit conversion が定義されている(SystemMaterializer#materializerを呼び出している)

object Materializer {

  /**
   * Implicitly provides the system wide materializer from a classic or typed `ActorSystem`
   */
  implicit def matFromSystem(implicit provider: ClassicActorSystemProvider): Materializer =  // ← ココ
    SystemMaterializer(provider.classicSystem).materializer
略

ActorSystem は ClassicActorSystemProvider を継承している

abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider // ← ココ

つまり implicit conversion により ActorSystem は Materializer 扱いできるということになります。

PhasedFusingActorMaterializer が ActorSystemのScheduler を使っていることの確認

ActorSystem由来のmaterializerの schedule メソッドを呼んでいるからといってActorSystemが保有しているSchedulerを使っているとは限りません。 念のために確認してみます。

PhasedFusingActorMaterializer では scheduleOnce のようなメソッドを実装している

  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext) // ← ココ

実装は system.scheduler.scheduleOnce なので system.schedulerを利用している

となります。

ここで system.schedulerが何だったかを思い出しつつ今までの流れをおさらいすると

  • akka-httpが依存しているのは MaterializerのschedulerOnce
  • Materializerのデフォルト実装である PhasedFusingActorMaterializer の schedulerOnce は system.scheduler.scheduleOnce を利用している
  • system.scheduler は ActorSystem クラス内で conf に指定されている Schedulerが利用される
  • schedulerは reference.conf から上書きしなければ LightArrayResolverScheduler が使われる
  • LightArrayResolverScheduler は HashedWheelTimer ライクな実装を使っている

となります。

終わりに

説明してみるとそれなりに大変でした。

akka-actor が依存関係的に直接参照できないはずの akka-stream の Materializer扱いできるというのが直感に反するというところですがakka独自の Extension 機構 と Scalaのimplicit conversionの合わせ技で解決されているのが興味深いかなと思います。 これでようやく akka-http の timeout で使われている scheduler が LightArrayResolverScheduler であり、そのアルゴリズムは HashedWheelTimer ベースのものだということがわかりました。

年末になかなかの長旅でしたがお疲れ様でした。

株式会社FOLIOでは、このような記事を最後まで読んでくれる もの好き 優しい人を大募集中です。 バックエンドエンジニア以外にも大々募集中ですのでまずは下記フォームでカジュアル面談にご応募ください 🙏 ※ 入力は名前とメアドだけで大丈夫です 🙆‍♀️

herp.careers