magnoliaで素振り

magnoliaは型クラスのインスタンス自動導出のためのscalaライブラリです。 GitHub - propensive/magnolia: A better generic macro for Scala

既にscalaz-magnoliascalacheck-magnolia などがリリースされています。

公式チュートリアルもしっかりとあり、それに加えて特に何かというわけではないんですが最近本ばっかり読んでてあんまり色々いじれてないので個人の素振り備忘録です。 (つまりこの記事よりも Magnolia: Home を見たほうが良いです)

magnolia特長

型クラスの自動導出自体はshapelessでもできますが、Genに移した後HListとCoproduct用のインスタンスを定義して〜とやると意外とコード量が増えたりコンパイルエラーが起きてデバッグする時間が多くなったりします。

magnoliaを使うと以下のようなメリットが得られます。

  • 短く書ける
  • 導出に失敗した際のデバッグメッセージが詳しく出る
  • shapelessに比べて4~15倍速いcompile時間

ただフリーランチというわけではなく以下のような弱点もあります。

  • まだexperimentalなので色々変わるかもしれない
  • 一部型チェック諦めてるのでClassCastExceptionが実行時に出る可能性がある
  • 現時点ではhigher-kindな型クラス(Functor)の導出はできない

やってみた

チュートリアルとほぼ内容同じなので解説はそっち読んで貰えればと思います。 チュートリアルの延長線上でStringじゃなくて型名とパラメータ名を構造化してほしいなって思ったのでMapDumpみたいな物を作ってみました。 https://github.com/matsu-chara/magnolia-example

package example.domain

case class UserId(value: Long) extends AnyVal
case class UserName(value: String)

sealed trait UserType
object UserType {
  case object Normal extends UserType
  case object Premium extends UserType
}

case class User(
  id: UserId,
  name: UserName,
  tpe: UserType
)
package example

import example.domain.{User, UserId, UserName, UserType}
import example.dump.MapDump

object Main extends App {
  val u1 = User(UserId(1L), UserName("sato"), UserType.Normal)

//  if derivation failed, then output
//  Main.scala:11:22: magnolia: could not find MapDump.Typeclass for type Long
//       in parameter 'value' of product type example.domain.UserId
//       in parameter 'id' of product type example.domain.User
  println(MapDump.gen[User].dumpAsMap(u1))

//  same as above. but no debug output
//  (User,Map(id -> (UserId,UserId(1)), name -> (UserName,Map(value -> (string,sato))), tpe -> (Normal,Normal)))
  println(implicitly[MapDump[User]].dumpAsMap(u1))
}
package example.dump

import magnolia._

import scala.language.experimental.macros

trait MapDump[A] {
  /** return
    * (className, Map(param1 -> value1, param2 -> value2))
    * or
    * (className, value1)
    * when value class or object
    */
  def dumpAsMap(value: A): (String, Any)
}

object MapDump extends GenericMapDump {
  def apply[A](f: A => (String, Any)): MapDump[A] = new MapDump[A] {
    override def dumpAsMap(value: A): (String, Any) = f(value)
  }

  implicit val stringDump: MapDump[String] = MapDump[String] { value =>
    ("string", value)
  }

  implicit val longDump: MapDump[Long] = MapDump[Long] { value =>
    ("long", value)
  }
}

trait GenericMapDump {
  type Typeclass[T] = MapDump[T]

  def combine[T](ctx: CaseClass[Typeclass, T]): Typeclass[T] = new Typeclass[T] {
    override def dumpAsMap(value: T): (String, Any) = {
      val valueOrMap = if (ctx.isValueClass || ctx.isObject) {
        value
      } else {
        ctx.parameters.map { p =>
          p.label -> p.typeclass.dumpAsMap(p.dereference(value))
        }.toMap
      }
      (ctx.typeName.short, valueOrMap)
    }
  }

  def dispatch[T](ctx: SealedTrait[Typeclass, T]): Typeclass[T] = new Typeclass[T] {
    override def dumpAsMap(value: T): (String, Any) = ctx.dispatch(value) { sub =>
      sub.typeclass.dumpAsMap(sub.cast(value))
    }
  }

  implicit def gen[T]: Typeclass[T] = macro Magnolia.gen[T]
}

この型クラス何の役に立つんだろう・・・?というツッコミとか色々有る気がするけどとりあえず動いてるのでセーフ。

せっかくなのでdecodeも作ってみました

package example

import example.domain.{User, UserId, UserName, UserType}
import example.dump.{MapDump, FromDumpedMap}

object Main extends App {
  val u1 = User(UserId(1L), UserName("sato"), UserType.Normal)

  // User(UserId(1),UserName(sato),Normal)
  val dumped = MapDump.gen[User].dumpAsMap(u1)
  println(FromDumpedMap.gen[User].constructFrom(dumped))
}
package example.dump

import magnolia._

import scala.language.experimental.macros
import scala.util.control.NonFatal

trait FromDumpedMap[A] {
  def constructFrom(value: (String, Any)): A
}

object FromDumpedMap extends GenericFromDumpedMap {
  def apply[A](f: (String, Any) => A): FromDumpedMap[A] =
    new FromDumpedMap[A] {
      override def constructFrom(value: (String, Any)): A = f(value._1, value._2)
    }

  implicit val stringDump: FromDumpedMap[String] = FromDumpedMap[String] {
    case (clazz, param: String) if clazz == "string" =>
      param
    case arg =>
      throw new IllegalArgumentException(s"failed to decode. $arg")
  }

  implicit val longDump: FromDumpedMap[Long] = FromDumpedMap[Long] {
    case (clazz, param: Long) if clazz == "long" =>
      param
    case arg =>
      throw new IllegalArgumentException(s"failed to decode. $arg")
  }
}

trait GenericFromDumpedMap {
  type Typeclass[T] = FromDumpedMap[T]

  def combine[T](ctx: CaseClass[Typeclass, T]): Typeclass[T] = new Typeclass[T] {
    override def constructFrom(value: (String, Any)): T = {
      if (ctx.isValueClass || ctx.isObject) {
        try {
          value._2.asInstanceOf[T]
        } catch {
          case NonFatal(e) => throw new IllegalArgumentException(s"failed to decode. $ctx $value", e)
        }
      } else {
        ctx.construct { p =>
          val paramMap = try {
            value._2.asInstanceOf[Map[String, (String, Any)]]
          } catch {
            case NonFatal(e) => throw new IllegalArgumentException(s"failed to decode. $ctx $value", e)
          }

          val param = if (paramMap.contains(p.label)) {
            paramMap(p.label)
          } else {
            throw new IllegalArgumentException(s"failed to decode. $ctx $p $value")
          }
          p.typeclass.constructFrom(param)
        }
      }
    }
  }

  def dispatch[T](ctx: SealedTrait[Typeclass, T]): Typeclass[T] = new Typeclass[T] {
    override def constructFrom(value: (String, Any)): T = {
      val (subtype, subTypeValue) = ctx.subtypes.find(_.typeName.short == value._1) match {
        case Some(sub) =>
          try {
            (sub, value._2.asInstanceOf[sub.SType])
          } catch {
            case NonFatal(e) => throw new IllegalArgumentException(s"failed to decode. $ctx $value", e)
          }
        case _ => throw new IllegalArgumentException(s"failed to decode. $ctx $value")
      }
      subtype.typeclass.constructFrom((value._1, subTypeValue))
    }
  }

  implicit def gen[T]: Typeclass[T] = macro Magnolia.gen[T]
}

この形式をデシリアライズしたい人間は居るのか?というところは置いといて出来てそうです。 (そしてちゃんとやるならStringじゃなくてTypeNameとかParamNameにしたほうが良い)

consturctメソッドや中で呼んでるrawConstructメソッド的にdecode失敗したらEitherで返すみたいなことはできないっぽいので、そういうことがやりたい場合はまだ難しそうですね。

ある程度パターン化できてるので、さっくりderivingしたいだけの時はかなり便利そうです。(decodeとか書くとdecode自体が面倒なのでさくっとという範疇ではない気がしますがそれはdecode自体の問題)

HackerTackle2018でPonyの発表をしてきました

HACKER TACKLE というイベントでPonyについて発表してきました。

www.slideshare.net

タイトルの主張がやや強めになってしまった感がありますが、僕としてはPony自体というよりPonyがチャレンジしていること、それによって到来する未来に注目してもらえたらなーみたいな素朴な気持ちでつけました。

そんな感じの内容だったのでPony入門と見せかけてPonyのコードは少ししか出てきませんでした。 (でもスライド発表で文法の話しても文字が小さくて見えなかったりするしまあいいかみたいな気持ち・・(( ◜◡‾))

既に発表したことのある内容と被りすぎず、でもそらし過ぎて上級者向きにならないように・・など色々考えました。が!入門的な内容を改定を重ねつつ発表するのはやっぱり難しいですね。 (マイナー言語エバンジェリストあるある(?))

次のセッションがRustコミッターの発表でタイトルが「Concurrency in Rust」という状況でConcurrencyに長所があるPonyの話をするのはなかなか緊張感のある形でしたが、 自分としてはConcurrencyの問題は型で(部分的には)解決できる!というのが世の中に伝わるのが一番(ある意味Ponyの繁栄よりも)重要だと思っているので、むしろ良かったのかなと思います。

現段階でのPonyは結構独自感がありますが、mutable/immutableやモナドのような概念が様々な言語に取り入れられて来ているのと同じ流れでPonyの取り組みの成果みたいなものもどんどん他の言語に取り入れられる流れができると良いなーと思っています。今かどうかは分かりませんがそのうちきっと出来るはず!
そういった流れをいち早くキャッチアップするために、新しい取り組みを行っている尖った言語に触ってみるというのも一興なのではないでしょうか。つまり今すぐponycをインストールしてIntroduction · Pony Tutorialをやりましょう!!

consulの情報を取得するslack bot

consul公式のgo clientを見たらわりかし簡単にたたけそうだったのでノリで作りました。

GitHub - matsu-chara/conbot: consul reader slack bot

特にbot側では対応していないけど環境変数とかを与えるとauthとかhttpsとかやってくれるので楽。 https://github.com/hashicorp/consul/blob/v1.0.6/api/api.go#L282

catalogを見たり、各サービスのIP+portやNoteの一覧が見れたりします。 細かい絞り込みをやったり一覧で取ろうとするとN+1回呼ばないと行けないっぽいのがやや残念だけどゆっくり叩けば死なないかなという気持ち。

Jenkins Groovy ScriptでThrottle Concurrent Builds Pluginのカテゴリーを設定する

ジョブ設定が自動化されててもjenkins設定自体が自動化されてないと意味がない!ということでちょっと自動化してみました。

対象

今回の対象は Throttle Concurrent Builds Plugin - Jenkins - Jenkins Wiki 用のシステム設定で、以下のように表示される部分です。 ※ pluginは v2.0.1で確認しています。

f:id:matsu_chara:20171221132459p:plain

スクリプト

スクリプトコンソールで以下のスクリプトを実行するとカテゴリーや最大実効数が設定されます。

// jenkinsのスクリプトコンソールで実行する場合はimportは省略可能
import jenkins.*
import jenkins.model.*
import hudson.*
import hudson.model.*

def instance = Jenkins.getInstance()
def throttlePluginDescriptor = instance.getDescriptor("hudson.plugins.throttleconcurrents.ThrottleJobProperty")
def deployAllCategory = new hudson.plugins.throttleconcurrents.ThrottleJobProperty.ThrottleCategory("foo", 4, 4, [])

throttlePluginDescriptor.setCategories([deployAllCategory])
throttlePluginDescriptor.save()
throttlePluginDescriptor.load()

setCategoriesなどのメソッドはソース ( https://github.com/jenkinsci/throttle-concurrent-builds-plugin/blob/throttle-concurrents-2.0.1/src/main/java/hudson/plugins/throttleconcurrents/ThrottleJobProperty.java ) から探すのが良さそうです。(descriptorを呼んでやればよいということを探し当てるのに時間がかかった・・。)

プラグイン設定の自動化は他にも https://github.com/glenjamin/jenkins-groovy-examples のような方法もありそうでした。

まとめ

実行はansibleのjenkins_scriptプラグインjenkins_script - Executes a groovy script in the jenkins instance — Ansible Documentationなんかを使えそうです。

ansibleならpluginは jenkins_plugin - Add or remove Jenkins plugin — Ansible Documentation を使えば自動化できますし、 環境変数やクレデンシャルなども Jenkinsの構築それ全部自動でできるよ - Qiita を参考にjenkins_scriptプラグインで自動化できそうなので、だいたいのものは設定できそうです。

CombiningTreeで数を数える

第二のドワンゴアドベントカレンダー10日目です。 昨日は @yyuさんのブロックチェーンを利用した公平なガチャでした。 公平なガチャシリーズは全部おもしろいのでおすすめです。

今回はCombiningTreeによる高速なカウンターを紹介したいと思います。

ベースはThe Art of Multiprocessor ProgrammingScalable Concurrent Counting. 1994にある内容です。

高速なカウンター

1,2,3,...と数を数えることはよくあると思いますが、マルチコア環境で高速に数えたいケースについて考えてみます。 javaなどでは AtomicIntegerなどがよく利用されると思いますが、ここでは更にスケールするカウンターを考えてみたいと思います。

ComibiningTree

スケールすると言っても、一つのアドレスを書き換えあうアプローチでは、値を正確に数えるために必要となる同期がボトルネック(逐次ボトルネック)となってしまいます。 そのため、より高速なカウンターを実現するためには、何らかの意味で並列度を高くする必要があります。

そこで、複数のincrementリクエストを結合することで並列化を達成するCombiningTree(結合ツリー)を使ったカウンターを考えます。

CombiningTreeは下図のような二分木で、各leafには最大2つのスレッドが割り当てられます。 rootにはカウンタが配置されています。図では0に初期化してあります)

f:id:matsu_chara:20171209211051p:plain

割り当てられたスレッドはincrementリクエストをleafからrootに向けて上昇していきます。(precombineフェーズ) 下図ではスレッドA,Bがincrementを開始しようとしています。

f:id:matsu_chara:20171209211124p:plain

上昇の途中で別スレッドとほぼ同時に同じnodeに到達した場合にお互いのincrement値を合算する結合処理が行われます。(combineフェーズ)

具体的には後に到達したスレッド(パッシブスレッド)は結合操作が完了するまで待機し、先に到達したスレッド(アクティブスレッド)がパッシブスレッドの分も含めてincrementリクエストを上位のnodeに伝達します。

f:id:matsu_chara:20171209211147p:plain

この上昇を繰り返し、最終的にrootまで到達した後、加算を行えばincrementは完了です。(opフェーズ)

f:id:matsu_chara:20171209211208p:plain

このように処理を分散させることで1箇所がホットスポットになったり逐次ボトルネックが発生してしまうことを回避し、並列度を稼いでいます。

ただし、これで終わりではなく返り値を返すことも考えなければなりません。incrementAndGetのようなカウンタを増やす前の値(prior)を返すインターフェースを考えると待機したスレッドに値を返す必要があります。(distributeフェーズ)

具体的にはrootまで進んだアクティスレッドは木をleaf側へ下降をしながら、パッシブスレッドにincrement完了を通知します。このとき通知される値は上位レベルから得られたpriorにアクティブスレッドの値を加算した数になります。

例としてrootがすでに3のときに、更に2つのスレッドがincrementを行ったときの例を考えてみます。

f:id:matsu_chara:20171209211228p:plain

スレッドAがrootに到達するとカウンタの値は3+2で5になります。 このときpriorは加算する前の値となるので3になります。

f:id:matsu_chara:20171209211249p:plain

ここから1段ずつ下降していき、待機しているパッシブスレッドにprior + 自分の値を通知します。

f:id:matsu_chara:20171209211315p:plain

leafまで到達したら結果を返せば終了です。 例ではスレッドAは3をそのまま返し、スレッドBはAから通知された4をそのまま返しています。

f:id:matsu_chara:20171209211339p:plain

こうすることで、処理を結合したとしてもパッシブスレッドの前にアクティブスレッドがincrementを終えたという状況が返り値に反映されます。もっとNode数が増えても、リクエストが増えても同じ処理を行えば順番に加算前の値が返されます。

もう少し詳しく

ここでは参考文献2のFigure 2,Figure 3を参照します。 詳しい実装コードはTAoMPのサイト https://booksite.elsevier.com/9780123705914/?ISBN=9780123705914 のChapter 12においても配布されています。

f:id:matsu_chara:20171209211408p:plain)

f:id:matsu_chara:20171209211424p:plain

Part One(precombineフェーズ)

Part Oneではnodeをロックしてからnodeのステータスにより分岐しています。 node.status = FREE の場合にはステータスをCOMBINEに差し替えながらgoing_upがFALSEになるまでツリーを上昇していきます。 node.status = COMBINE または ROOT の場合、そこでそのスレッドの上昇は停止します。(ここでunlockをしないことがCOMBINEのポイントになります。)

node.status = RESULT の場合はRESULT状態が解除されるまでループします。

Part Two(combineフェーズ)

Part Twoでは、先程アクセスしたノードに再びアクセスし、first_incrに今までの値を格納します。スレッドが visited.wait_flag = TRUE の場合はtotalにsecond_incr(他のスレッドが残した値)を追加(combine)していきます。

Part Three(opフェーズ)

Part Threeでは、まずCOMBINEか否かで分岐があります。(ただしスレッドの上昇が止まるのはCOMBINEかROOTだけなのでelse説はROOTに対する処理です)

まずはわかりやすいelseから見ると、node.resultにtotalを足しています。つまり今まで結合してきたgetAndIncrementの本処理(rootの値の更新)を行っています。

次にnodeがCOMBINEだった場合を見ます。今まで足してきた分を先行スレッドに引き渡す必要があるためsecond_incrとして値を格納しCOMBINEが解除されるまで待機します。

Part Four(distributeフェーズ)

rootまでたどり着くと今度は下降しながら結果を通知していくフェーズになります。 下降しながらwait_flag = TRUEのnodeを見つけたらstatusをRESULTに変更してやり、first_incrを結果に加算します。(前述したようにこうすることでfirst_incr分の加算が先に行われたように見せることができます)

また、wait_flagがFALSEの場合は単にFREEにして初期状態に戻しています。 これを下降しながら最後まで辿りつき、各スレッドがsaved_resultを返すことによりgetAndIncrementが順番に行われたかのように値を返すことができます。

lock順が複雑なので様々なシナリオを考えてコードを追う必要があります。TAoMPの方はJavaなのでもう少し読みやすいかもしれません。

スケールするカウンター

ここまでCombiningTreeについて考えてみましたが、このCombiningTreeは確かにスループットは出ます1し、スレッド数の増加にも強いのですがlatencyという意味ではあまり性能が良いとは言えません。

例えば単にロックを使用したカウンターを考えると1回の呼び出しにかかる時間は O(1) です。 一方でCombiningTreeを使ったカウンターでは1回呼び出すのに O(log p) かかります。2,3

ただしスループットを考えてみると、ロックベースのカウンターではN回の呼び出しにかかる時間が O(N) になるの対して、 CombiningTreeを使ったカウンターでのN回の呼び出しは理想的な状況では O(log p) ですみます。4

このようにCombiningTreeでは高いスループットが出ますが、それは必ずしも低遅延を保証するものではありません。 最適なカウンターはアプリケーションによって異なるので、各方式の特性を理解して適切なカウンターを使うことが望ましいです。

まとめ

今回は並列に数をカウントするCombining Treeの紹介を行いました。 本当はカウンティングネットワークについても書こうと思ったのですが、 論文が読み終わらなかった長くて書ききれなかったので、また次の機会に。

TAoMPでは今回紹介したCombiningTreeやカウンティングネットワーク、その他諸々の面白いアルゴリズム・データ構造が載っているのでぜひ読みましょう!というTAoMPの宣伝記事でした。 冒頭で紹介した論文も俯瞰的にcountingについて書いてあって面白いのでぜひ!

参考文献


  1. ベンチマークについては http://people.csail.mit.edu/shanir/publications/HLS.pdf#page=18 の4章を見てください。(shared-memoryとmessage-passingの2つの実装で結構パフォーマンスが違うので注意)

  2. pは物理プロセッサの数です。

  3. ベンチマークを踏まえた議論は https://pdfs.semanticscholar.org/52a9/f8d07bbe6cc905aa94930e7792f12f4ebbc5.pdf を御覧ください。(こちらも同じ実装ではないので参考までに。)

  4. 結合が行われない絶妙なタイミングでincrementが呼ばれるようなベンチマークではこのような結果になりません。ここでの結果はすべてのincrementで結合が行われるような理想的な状況下での話です。