この記事は FOLIO Advent Calendar 2024 - Adventar の15日目です。
HashedWheelTimerの記事では akka-http のタイムアウトに LightArrayResolverScheduler が使われていることに言及したのですが、実際にどういう経路で呼ばれているのかについてはボカしていました。
今回はその流れを辿ってみましょう。
まずは akka-http のタイムアウトが実際にタスクをスケジューリングしているところを確認してみましょう。(確認はしてないけどpekkoでもだいたい同じだと思います)
akka-http の Timeout で使われている Scheduler の正体
ここから先のソースコード内の日本語コメント、省略表記は筆者が追加したものです。
akka-httpのタイムアウトが materializer の scheduler を呼び出すまでの流れ
まず timeout を設定すると何が起きるのかを見ておきましょう。
一旦概略を説明するので少し飛躍がありますが寄り道で回収していきます。
TimeoutDirective#withRequestTimeoutでタイムアウトを設定。 timeoutAccesss.updateが呼ばれる
def withRequestTimeout(timeout: Duration, handler: Option[HttpRequest => HttpResponse]): Directive0 =
Directive { inner => ctx =>
ctx.request.header[`Timeout-Access`] match {
case Some(t) =>
handler match {
case Some(h) => t.timeoutAccess.update(timeout, h)
case _ => t.timeoutAccess.updateTimeout(timeout)
}
case _ => ctx.log.warning("withRequestTimeout was used in route however no request-timeout is set!")
}
inner(())(ctx)
}
t.timeoutAccess
の中身は(後述するように)HttpServerBluePrint 内にある TimeoutAccessImpl なので HttpServerBluePrint内にあるTimeoutAccessImpl#updateが呼ばれる
override def update(timeout: Duration, handler: HttpRequest => HttpResponse): Unit = {
val promise = Promise[TimeoutSetup]()
for (old <- getAndSet(promise.future).fast)
promise.success {
if ((old.scheduledTask eq null) || old.scheduledTask.cancel()) {
val newHandler = if (handler eq null) old.handler else handler
val newTimeout = if (timeout eq null) old.timeout else timeout
val newScheduling = newTimeout match {
case x: FiniteDuration => schedule(old.timeoutBase + x - Deadline.now, newHandler)
case _ => null
}
currentTimeout = newTimeout
new TimeoutSetup(old.timeoutBase, newScheduling, newTimeout, newHandler)
} else old
}
}
updateが呼ばれると materializer#scheduleが呼ばれる
private def schedule(delay: FiniteDuration, handler: HttpRequest => HttpResponse): Cancellable =
materializer.scheduleOnce(delay, new Runnable { def run() = trigger.invoke((self, handler(request))) })
上記の流れで materializer#schedule が呼ばれるようになっています。
次以降でscheduleメソッドを呼ぶとどうなるのか確認していきたいところですが
ctx.request.header[`Timeout-Access`] とは何なのかとか、 それの中身が HttpServerBluePrint の TimeoutAccessImpl だというのは本当なのかといった疑問があるので寄り道として以下の疑問について調べていきます。
- timeoutを設定するとそもそも何が起きるのか(それはどこで設定されているのか)
- 本節でおもむろに scheduleメソッドの実装を HttpServerBluePrint から引用したが、これはどこで設定されているのか。
- 本節でおもむろに ctx.request.header[
Timeout-Access
]の中身が HttpServerBluePrint.TimeoutAccessImpl としましたがそれは本当なのか
(寄り道1)timeout 時に scheduler によりエラーレスポンスが返ってくることの確認
前節でtimeoutが呼ばれていることは確認できましたが、timeoutすると何が起きるのか確認しておきます。(設定してるだけで実は何も起きないかもしれませんからね)
withRequestTimeout時に handler 指定がなかったら oldHandler が呼ばれるようになっている。実態はinitialTimeout で設定されている this(= TimeoutSetupクラス)
private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, ...略)
extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest => HttpResponse) { self =>
import materializer.executionContext
private var currentTimeout = initialTimeout
initialTimeout match {
case timeout: FiniteDuration => set {
requestEnd.fast.map(_ => new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this))
}
case _ => set {
requestEnd.fast.map(_ => new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this))
}
}
... 略
scheduleされたタスクが発火するとTimeoutAccessImpl#applyが呼ばれて503 HttpResponseが生成される
override def apply(request: HttpRequest) = {
log.info("Request timeout encountered for request [{}]", request.debugString)
HttpResponse(StatusCodes.ServiceUnavailable, entity = "The server was not able " +
"to produce a timely response to your request.\r\nPlease try again in a short while!"
}
ということで timeout の名の通り503エラーレスポンスが返されることがわかりました。
(寄り道2)TimeoutAccessImpl を提供している HttpServerBluePrint と HttpServer の関係
Httpサーバーを立てたときに本当にHttpServerBluePrintクラスが使われているのか(使われてなかったらTimeoutAccessImplも呼ばれてないのではないか)という疑問があるので確認してみます。
akka-http で Httpサーバーを立てるときは Http().newServerAt().bindFlow()
のようにメソッドを呼ぶのでここから HttpServerBluePrint までの流れを辿ってみます。
HttpServerBluePrintは以下の流れで呼ばれています。
Http.newServerAt()...bindFlow
def bindFlow(handlerFlow: Flow[HttpRequest, HttpResponse, _]): Future[ServerBinding] =
http.bindAndHandleImpl(handlerFlow, interface, port, context, settings, log)(materializer)
bindAndHandleImpl
private[http] def bindAndHandleImpl(
handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int,
connectionContext: ConnectionContext,
settings: ServerSettings,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] =
bindAndHandle(handler, interface, port, connectionContext, settings, log)(fm)
bindAndHandle
def bindAndHandle(
handler: Flow[HttpRequest, HttpResponse, Any],
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext = defaultServerHttpContext,
settings: ServerSettings = ServerSettings(system),
log: LoggingAdapter = system.log)(implicit fm: Materializer = systemMaterializer): Future[ServerBinding] = {
if (settings.http2Enabled)
log.warning(s"Binding with a connection source not supported with HTTP/2. Falling back to HTTP/1.1 for port [$port].")
val fullLayer: Flow[ByteString, ByteString, (Future[Done], ServerTerminator)] =
fuseServerFlow(fuseServerBidiFlow(settings, connectionContext, log), handler)
略
private def fuseServerBidiFlow(
settings: ServerSettings,
connectionContext: ConnectionContext,
log: LoggingAdapter): ServerLayerBidiFlow = {
val httpLayer = serverLayer(settings, None, log, connectionContext.isSecure)
val tlsStage = sslTlsServerStage(connectionContext)
val serverBidiFlow =
settings.idleTimeout match {
case t: FiniteDuration => httpLayer atop tlsStage atop HttpConnectionIdleTimeoutBidi(t, None)
case _ => httpLayer atop tlsStage
}
GracefulTerminatorStage(system, settings) atop serverBidiFlow
}
def serverLayer(
settings: ServerSettings = ServerSettings(system),
remoteAddress: Option[InetSocketAddress] = None,
log: LoggingAdapter = system.log,
isSecureConnection: Boolean = false): ServerLayer = {
val server = HttpServerBluePrint(settings, log, isSecureConnection, dateHeaderRendering)
.addAttributes(HttpAttributes.remoteAddress(remoteAddress))
.addAttributes(cancellationStrategyAttributeForDelay(settings.streamCancellationDelay))
server atop delayCancellationStage(settings)
}
無事に Httpサーバーを立てたときに HttpServerBluePrint が設定される = scheduleメソッドの実装もこれが呼ばれていそうであることが確認できました。
もう少し詳しく見るなら server インスタンスが作られているがそれらが結局どういう風にリクエストハンドリングにつながるのかを見ていくべきですが、起動処理を全て追うことになるので今回はここまでとします。
字面から予想するにどうもリクエスト受信後に、リクエストヘッダーに Timeout-Access というフィールドを埋め込んでアクセスしているようです。
ということで Timeout-Access ヘッダを参照しているところを調べてみます。
まずTimeout-Accessはクラスになっているのでコードジャンプが可能です
object `Timeout-Access` extends ModeledCompanion[`Timeout-Access`]
final case class `Timeout-Access`(timeoutAccess: akka.http.scaladsl.TimeoutAccess)
extends jm.headers.TimeoutAccess with SyntheticHeader {
def renderValue[R <: Rendering](r: R): r.type = r ~~ timeoutAccess.toString
protected def companion = `Timeout-Access`
}
HttpServerBluePrint内部で定義されている RequestTimeoutSupport#createLogic内でheaderに TimeoutAccessImpl を追加しています。
class RequestTimeoutSupport(initialTimeout: Duration, log: LoggingAdapter)
extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] {
略
def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
略
setHandler(requestIn, new InHandler {
def onPush(): Unit = {
val request = grab(requestIn)
val (entity, requestEnd) = HttpEntity.captureTermination(request.entity)
val access = new TimeoutAccessImpl(request, initialTimeout, requestEnd, callback,
interpreter.materializer, log)
openTimeouts = openTimeouts.enqueue(access)
push(requestOut, request.addHeader(`Timeout-Access`(access)).withEntity(entity))
}
略
RequestTimeoutSupport は GraphStage を継承しており createLogic はこのクラスのメソッドをオーバーライドしたものです。
RequestTimeoutSupport は HttpServerBluePrintクラスのrequestTimeoutSupportメソッドでtimeoutが非ゼロのときにセットされます
def requestTimeoutSupport(timeout: Duration, log: LoggingAdapter): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] =
if (timeout == Duration.Zero) BidiFlow.identity[HttpResponse, HttpRequest]
else BidiFlow.fromGraph(new RequestTimeoutSupport(timeout, log)).reversed
そして上記メソッドは HttpServerBluePrint#apply で呼ばれています。
private[http] object HttpServerBluePrint {
def apply(settings: ServerSettings, log: LoggingAdapter, isSecureConnection: Boolean, dateHeaderRendering: DateHeaderRendering): Http.ServerLayer =
userHandlerGuard(settings.pipeliningLimit) atop
requestTimeoutSupport(settings.timeouts.requestTimeout, log) atop
requestPreparation(settings) atop
controller(settings, log) atop
parsingRendering(settings, log, isSecureConnection, dateHeaderRendering) atop
websocketSupport(settings, log) atop
tlsSupport atop
logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes)
HttpServerBluePrint#apply
がサーバーを立てるときに呼ばれていることはすでに確認ずみなので、ここまでで Timeout-Access ヘッダに TimeoutAccessImpl のインスタンスが詰め込まれていることを確認できました。
もちろん requestPreparationは何をしているんだろうとか疑問は尽きないわけですが一旦ここで満足することにします。
materializer#schedule と LightArrayResolverScheduler の関係
ここまでで timeout時には materializer#schedule が呼ばれることが分かったので、次は materializer と LightArrayResolverScheduler の関係を調査してみましょう。
まずは Httpサーバーが使っている Scheduler が ActorSystem の Scheduler であることを確認します。
Httpサーバーを立てるときには Http() を呼び出すのでまずはここから見ていきます。
Http() が implicitでActorSytemを引数にとる
def apply()(implicit system: ClassicActorSystemProvider): HttpExt = super.apply(system)
その後、newServerAt がそのままServerBuilder に system を横流しします。
def newServerAt(interface: String, port: Int): ServerBuilder = ServerBuilder(interface, port, system)
そして、ServerBuilder が materializer を抽出(つまり以降のmaterializerはActorSystemのmaterializer)
private[http] object ServerBuilder {
def apply(interface: String, port: Int, system: ClassicActorSystemProvider): ServerBuilder =
Impl(
interface,
port,
scaladsl.HttpConnectionContext,
system.classicSystem.log,
ServerSettings(system.classicSystem),
system,
SystemMaterializer(system).materializer
)
略
前述のようにtimeoutでは materializer#scheduler が呼ばれているので HttpServer が使っている materializer = ActorSystemから抽出されるmaterializerと言えそうです。
そして materiliazer#schedule でタイムアウトを処理しているので、ここで使われている Scheduler は ActorSystem が持っている Scheduler ということで良さそうです。
Schedulerについて
次は ActorSystem の Scheduler が LightArrayRevolverScheduler ( HashedWheelTimer を基本としたスケジューラー)になっていることを確認します。
ActorSystemがSettingとしてSchedulerクラスのクラス名をconfigで保持している
final val SchedulerClass: String = getString("akka.scheduler.implementation")
デフォルトはreference.confで指定されているLightArrayResolverScheduler
implementation = akka.actor.LightArrayRevolverScheduler
このクラス名の文字列はActorSystemの初期化時にインスタンス化されている
protected def createScheduler(): Scheduler =
dynamicAccess
.createInstanceFor[Scheduler](
settings.SchedulerClass,
immutable.Seq(
classOf[Config] -> settings.config,
classOf[LoggingAdapter] -> log,
classOf[ThreadFactory] -> threadFactory.withName(threadFactory.name + "-scheduler")))
.get
コメントにある通りこれがHashedWheelTimerになっている
class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFactory: ThreadFactory)
extends Scheduler
with Closeable {
略
これで akka-http の timeout で使われている scheduler には HashedWheelTimer が使われていると言えそうです。
ActorSytesmとMaterializerの関係について
ここまでで疑問は解決したのですが、一方で materializerを抽出するときに使っている SystemMaterializer(system).materializer
ってなんだろう・・?という疑問が湧いてきたので追跡してみます。
少し考えてみると以下のような点が気になりました。
- akkaでは ActorSystem を Materializer かのように扱う事が可能だが、そもそもakka-actorの概念である ActorSystem が何故 akka-stream の概念である Materializer として使えるのか。
- 当然、ActorSystemの初期化時に SystemMaterializer や Materializer を初期化するコードは(依存関係的にakka-streamのコードに依存できないため)明示的にはない。どうやって初期化しているのか。
これを説明するために、まず前提となる akka の Extension 機能について説明します。
extensions について
MaterializerはActorSystemを作成した際に初期化されますが、ActorSystemはakka-actor, Materializerはakka-streamの概念となっており、原則として明示的に依存することは出来ません。
ここでakkaが利用しているのがExtensionという仕組みです。 Classic Akka Extensions • Akka Documentation
読み込む extension は config ファイルで指定するのですがライブラリ提供者向けの library-extensions と、アプリケーション開発者向けの extensions の2つがあります。
akka-stream の reference.conf では akka.library-extensions += "akka.stream.SystemMaterializer$" のように指定されており、これが2つ目の疑問への鍵(というか答え)になります。
といことで、ActorSystem が SystemMaterializer を読み込むまでの流れをみていきましょう。
ActorSystem が SystemMaterializer を読み込むまでの流れ
ActorSystem#apply を呼ぶと ActorSystemImpl#start
が呼ばれる
def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
val bootstrapSettings = setup.get[BootstrapSetup]
val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)
new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
}
startメソッドの実装が_startの評価となっており、この中でloadExtensionsが呼ばれる
private lazy val _start: this.type = try {
略
if (!terminating)
loadExtensions()
if (LogConfigOnStart) logConfiguration()
this
} catch {
case NonFatal(e) =>
try terminate()
catch { case NonFatal(_) => Try(stopScheduler()) }
throw e
}
loadExtensionsメソッドがconfigを読み取り DynamicAccess#getObjectFor を使って ExtensionId (or ExtensionIdProvider) のインスタンスを作成し registerExtensionに渡す
private def loadExtensions(): Unit = {
def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = {
immutableSeq(settings.config.getStringList(key)).foreach { fqcn =>
dynamicAccess.getObjectFor[AnyRef](fqcn).recoverWith {
case firstProblem =>
dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil).recoverWith { case _ => Failure(firstProblem) }
} match {
case Success(p: ExtensionIdProvider) =>
registerExtension(p.lookup)
case Success(p: ExtensionId[_]) =>
registerExtension(p)
case Success(_) =>
if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'")
case Failure(problem) =>
if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
}
}
}
loadExtensions("akka.library-extensions", throwOnLoadFail = true)
loadExtensions("akka.extensions", throwOnLoadFail = false)
}
DynamicAccess の実態は ReflectiveDynamicAccess
protected def createDynamicAccess(): DynamicAccess = new ReflectiveDynamicAccess(classLoader)
ReflectiveDynamicAccess はあまり特別なことをしておらずリフレクションでインスタンスを作成しているだけ
override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
Try {
val types = args.map(_._1).toArray
val values = args.map(_._2).toArray
val constructor = clazz.getDeclaredConstructor(types: _*)
constructor.setAccessible(true)
val obj = constructor.newInstance(values: _*)
val t = implicitly[ClassTag[T]].runtimeClass
if (t.isInstance(obj)) obj.asInstanceOf[T]
else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
}.recover { case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException }
registerExtension では createExtension を呼び出す
@tailrec
final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = {
findExtension(ext) match {
case null =>
val inProcessOfRegistration = new CountDownLatch(1)
extensions.putIfAbsent(ext, inProcessOfRegistration) match {
case null =>
try {
ext.createExtension(this) match {
case null =>
throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]")
case instance =>
extensions.replace(ext, inProcessOfRegistration, instance)
instance
}
} catch {
case t: Throwable =>
extensions.replace(ext, inProcessOfRegistration, t)
throw t
} finally {
inProcessOfRegistration.countDown()
}
case _ =>
registerExtension(ext)
}
case existing => existing.asInstanceOf[T]
}
}
上記の流れにより SystemMaterializer の createExtension が呼び出されます。
ここまでで ActorSystem と SystemMaterializer の関連性がわかりました。
しかしSystemMaterializer自体はMaterializerではありません(ただのExtension)。ここからMaterializerの初期化の流れを見ていきます。
SystemMaterializer で Materializer が初期化され、利用できるようになるまでの流れ
SystemMaterializer.createExtension で new SystemMaterializer が呼ばれる
object SystemMaterializer extends ExtensionId[SystemMaterializer] with ExtensionIdProvider {
override def get(system: ActorSystem): SystemMaterializer = super.get(system)
override def get(system: ClassicActorSystemProvider): SystemMaterializer = super.get(system)
override def lookup = SystemMaterializer
override def createExtension(system: ExtendedActorSystem): SystemMaterializer =
new SystemMaterializer(system)
}
val materializerGuardian の初期化時に MaterializerGuardian Actorが作成される
private val materializerGuardian = system.systemActorOf(
MaterializerGuardian
.props(systemMaterializerPromise, materializerSettings)
.withDispatcher(Dispatchers.InternalDispatcherId)
.withDeploy(Deploy.local),
"Materializers")
val systemMaterializer の初期化時に startMaterializer が呼ばれ、 PhasedFusingActorMaterializer が返される。これが Materializer の実装
private val systemMaterializer = startMaterializer(None)
また、MaterializerGuardian Actor のコンストラクタでは systemMaterializerPromise の resolve も行われている。、これは SystemMaterializer のフィールドになっている。
systemMaterializerPromise.success(systemMaterializer)
final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
private val systemMaterializerPromise = Promise[Materializer]()
略
上記 Promise の値を取得することで Materializer のインスタンスを取得できる。 SystemMaterializer#materializer はこの処理を行っている
val materializer: Materializer = {
val systemMaterializerFuture = systemMaterializerPromise.future
systemMaterializerFuture.value match {
case Some(tryMaterializer) =>
tryMaterializer.get
case None =>
Await.result(systemMaterializerFuture, materializerTimeout.duration)
}
}
上記の処理により SystemMaterializer Extension が PhasedFusingActorMaterializer を作成していること、それは SystemMaterializer#materializer を呼ぶことで簡単に取得できることがわかりました。
また Extension は上述の公式docの通り簡単にインスタンスを取得可能です。(SystemMaterializerのコンストラクタには ExtendedActorSystem (ようは普通のActorSystem)を渡せば簡単に初期化可能)なのでsystemが参照できればいつでもMaterializerを参照することができると言えそうです。
最後に ActorSystem自体をMaterializer扱いできる理由 を簡単に説明します。
ActorSystem自体をMaterializer扱いできる理由
これは非常に簡単です
Materializerの companion objectに ClassicActorSystemProvider から Materializer への implicit conversion が定義されている(SystemMaterializer#materializerを呼び出している)
object Materializer {
implicit def matFromSystem(implicit provider: ClassicActorSystemProvider): Materializer =
SystemMaterializer(provider.classicSystem).materializer
略
ActorSystem は ClassicActorSystemProvider を継承している
abstract class ActorSystem extends ActorRefFactory with ClassicActorSystemProvider
つまり implicit conversion により ActorSystem は Materializer 扱いできるということになります。
PhasedFusingActorMaterializer が ActorSystemのScheduler を使っていることの確認
ActorSystem由来のmaterializerの schedule メソッドを呼んでいるからといってActorSystemが保有しているSchedulerを使っているとは限りません。
念のために確認してみます。
PhasedFusingActorMaterializer では scheduleOnce のようなメソッドを実装している
override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
system.scheduler.scheduleOnce(delay, task)(executionContext)
実装は system.scheduler.scheduleOnce
なので system.schedulerを利用している
となります。
ここで system.schedulerが何だったかを思い出しつつ今までの流れをおさらいすると
- akka-httpが依存しているのは MaterializerのschedulerOnce
- Materializerのデフォルト実装である PhasedFusingActorMaterializer の schedulerOnce は system.scheduler.scheduleOnce を利用している
- system.scheduler は ActorSystem クラス内で conf に指定されている Schedulerが利用される
- schedulerは reference.conf から上書きしなければ LightArrayResolverScheduler が使われる
- LightArrayResolverScheduler は HashedWheelTimer ライクな実装を使っている
となります。
終わりに
説明してみるとそれなりに大変でした。
akka-actor が依存関係的に直接参照できないはずの akka-stream の Materializer扱いできるというのが直感に反するというところですがakka独自の Extension 機構 と Scalaのimplicit conversionの合わせ技で解決されているのが興味深いかなと思います。
これでようやく akka-http の timeout で使われている scheduler が LightArrayResolverScheduler であり、そのアルゴリズムは HashedWheelTimer ベースのものだということがわかりました。
年末になかなかの長旅でしたがお疲れ様でした。
株式会社FOLIOでは、このような記事を最後まで読んでくれる もの好き 優しい人を大募集中です。 バックエンドエンジニア以外にも大々募集中ですのでまずは下記フォームでカジュアル面談にご応募ください 🙏
※ 入力は名前とメアドだけで大丈夫です 🙆♀️
herp.careers