IntelliJのライブテンプレートにimport scala.concurrent.ExecutionContext.Implicits.global入れたら便利になった。

タイトルそのままです。 IntelliJ IDEA 2016.3.4です。

知ってる人は知ってるというか自分もなんか登録できるなということは知ってたんですが今まで何登録すればいいんだろうと思って放置していましたが登録したら便利になりました。

Preferences > Editor > Live Templates にある + ボタンを押して登録できます。

注意が必要な点として下記のように No applicable contexts yet. と書いている場合はコード補完の候補に出ないので、隣りにある Define をクリックしてScalaなどをチェックする必要があります。

f:id:matsu_chara:20170302134040p:plain

完成後はこんな感じになるはずです。

f:id:matsu_chara:20170302134017p:plain

登録したテンプレートは command + J で呼び出せます。(exe..とか適当に文字を入れると絞込。Enterで確定)

とりあえずパッと思いついた以下を設定しました。 

名前 template
auto_formatter_off //@formatter:off
auto_formatter_on //@formatter:on
duration import scala.concurrent.duration._
implicit_execution_context import scala.concurrent.ExecutionContext.Implicits.global
java_converter import scala.collection.JavaConverters._
mockito_all import org.mockito.Mockito._
mockito_matchers import org.mockito.Matchers._

こちらのスライドだともっとアグレッシブにやってるっぽいです。

www.slideshare.net

これでJavaConvertersってどこにあるんだっけとかググらないで済む生活が送れそうです。(ExecutionContextとか覚えてても長い・・) ということで知ってる人には当たり前だけどやったことない人がいたら試してみてね!という内容でした。

slackのユーザーを全員取得してアイコンをemojiとして登録する書捨てスクリプト

前々からほしかったので時短で雑に作った。 https://github.com/matsu-chara/slack_user_avatar_emojis

matsu_chara ユーザーだったら :m_atsu_chara: のような絵文字になる。(名前そのままだとメンションになってしまうため回避するためにアンダーバーを入れている。)

既に登録済みでも上書きとかされないので新規ユーザーが入ってくる度に流してもOK。ただしアイコンの更新は未対応。

追記: 誰かがアイコンを更新する度に手動で削除するのが大変だったので一旦emojiを削除してから登録するようにした。

株式会社ドワンゴを退職しました。

2017年2月28日付けで株式会社ドワンゴを退職しました。

新卒入社から二年弱、Scalaを書いたり、ScalaTextの作成に参加したり、Kafkaを運用したりしていました。

短い期間でしたがscalazコントリビューター1位&2位の人*1を含め、色々な側面ですごい人達と同じチームで仕事が出来たのはとてもいい経験になりました。

チラシの裏になりますが、せっかくの節目なのでドワンゴの思い出を書いていきます。個人ブログだしね₍₍ (ง´・_・`)ว ⁾⁾

Slackと分報チャンネル

ドワンゴはSlackが異様に発展していてチャンネルが2700以上あり、自分自身もjoin数が社内2位(700チャンネル)になるほどハマっていました。

最近では分報も急速に発展しています。#times_matsu_charaはジョイン者数55人で、こちらも社内2位でした。 割りと長いこと1位だったのですが人事部長に抜かれてしまいました。権力人望の違いですね(?)

最近はDBの講義を見ていたこともあり、VLDBとかSIGMODとかの論文と読んだ感想をひたすら分報チャンネルに貼っていました。各分野の有識者から意見をもらえるのはそれなりに人数がいる会社ならではという感じがします。

勉強会など

社内勉強会をわりと自由に開催できる雰囲気だったので、以下の本の輪読会を開催したりしました。

他にはDijkstra Prizeを受賞した分散コンピューティングの論文読み会に参加していました。自分はビザンチン障害トランザクショナルメモリの回を担当しました。

勉強熱心な人がたくさんいるのでどんな分野でも募集すれば輪読会が出来るくらいにはメンバーが集まると思います。

社内LTにはあまり参加できなかったのですが、最終出社日間近でMemC3 *2について発表しました。本当はMICA *3の話も入れたかった・・!

社内LTでは毎回すごい発表が行われていて、見ているだけでもかなり面白かった記憶があります。趣味で脳に電気を流す人の話を聞ける場所はかなり限られると思うのでいい経験だったと思います。(?)

終わりに

色々な心残りはあるのですが、別の環境で別のことをやってみるのも面白そうだしチャレンジしてみるぞ₍₍ (ง´・_・`)ว ⁾⁾という気持ちでの転職となりました。

次の会社は株式会社FOLIOです。 うまくいくと日本株を取り扱う独立系の証券会社が約10年ぶりに誕生することになるらしいです。

Wantedlyで一人だけ声をかけたら一発で返信が返ってきてびっくりしたと言われました。そのまま入社することになり自分もびっくりしています。

話を聞きながらデモを見せてもらってこれはもうやるしかない!と思ったのでかなり早い段階で決心が固まりました。 ちなみにFOLIOでもScalaを書きます!(GRPC/Play勢力ドワンゴ*4と違いFinagle/Finatra勢力なので若干雰囲気が変わりそうです。)

資金調達直後で絶賛エンジニア募集中とのことなのでwantedly貼っておきますね!(自分が担当するバックエンド以外ももちろん募集中です)

www.wantedly.com

www.wantedly.com

ということで、今までお世話になった方、本当にありがとうございました!

最後に、ほしいものリスト貼って〜と言われたので貼って終わります。
要らないものは抜けと言われたので抜きました。餃子の皮包むやつが欲しいです。
よろしくお願いします。
http://amzn.asia/h9qN55o

*1:今は順位が変わってるけど一瞬そういう状態になった。

*2:という爆速なmemcachedのようなKVS

*3:というmemc3より更に10倍くらい速いKVS

*4:finagle使ってるところもあるのでやや主語がでかい

akkaでメールボックスをクリアしつつリスタートしたかったけど何か微妙。

リスタートの時にメールボックスクリアしたくなったんですが、(厳密にはRestartではなくてStop => 新しいのをStartみたいなのでもOKなんですが・・)あんまりいい方法がなくてうーんって感じだったので検討をメモ書き程度に。

結論としてはstashするか、deadletterが出るのを我慢するか、unhandledを我慢するか、雑に無視するかのどれかが良いという感じでしたが、まだあんまりこれだ!という手が見つかっていないのでちょっと参考にならないかもしれません。

また、自分が扱ってる状況とサンプルがだいぶ違うのでモチベーションが伝わらないと思います。なんでこんなことやってるんだというツッコミは半分くらい例が悪いことによるものです(;´Д`) (上手い例が思いつかずコードを超多くするか、極端に端折るかの二択になってしまいました。。。)

考えるのに使ったコードは↓にあります。 https://github.com/matsu-chara/AkkaSandbox/tree/master/src/main/scala/clear

そもそも標準APIにないんだからやらない方が良いというようなことな気がしつつ、考えていきます。

ベースの例

いきなり長いですが、やってることは簡単で、1~30の数字をprintlnするだけです。 printするWorkerActorはFSMになっていてconnectメッセージを処理しないとメッセージをprintする処理が出来ないようになっています。(connectが来ていない状態でメッセージを受け取るとunhandledになる。) また、途中で10を受け取ると例外が投げられるようになっています。

実験のためwhenUnhandledでわざとstopを呼んでいるので、下の例を実行しても1~9までしか表示されません。 1~9まで表示 => 10で例外 => 11を処理しようとするがunhandledになりstopする => 以降、全部deadletterとなります。

package clear

import akka.actor.SupervisorStrategy._
import akka.actor._
import clear.WorkerActor.{Running, Starting, State}

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object ClearRestart extends App {
  val system = ActorSystem("clear")
  val actor = system.actorOf(Props[SupervisorActor], "supervisor")

  try {
    Thread.sleep(200)
    (1 to 10).foreach(actor ! _)  // 10を受け取ると例外が出てworkerがrestartされる
    (11 to 20).foreach(actor ! _) // restart時のconnectより先に積まれるのでunhandled

    Thread.sleep(1000)
    (21 to 30).foreach(actor ! _) // 以前のjobは実行されなくてもいいので、unhandledにならないで欲しい。そして、準備が出来たらそこから先のジョブは実行して欲しい
  } finally {
    Thread.sleep(3000)
    Await.result(system.terminate(), Duration.Inf)
  }
}

class SupervisorActor extends Actor {
  private val actor = context.actorOf(Props[WorkerActor], "worker")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: ActorInitializationException ⇒ Restart
    case _: ActorKilledException ⇒ Stop
    case _: DeathPactException ⇒ Stop
    case _: Exception ⇒ Restart
  }

  def receive: Receive = {
    case x => actor forward x
  }
}

class WorkerActor extends FSM[State, Int] {
  startWith(Starting, 0)

  self ! "connect"

  when(Starting) {
    case Event("connect", _) =>
      println("connect")
      goto(Running)
  }

  when(Running) {
    case Event(10, _) =>
      throw new RuntimeException("die")
      stay()
    case Event(x, _) =>
      Thread.sleep(100)
      println(x)
      stay()
  }

  whenUnhandled {
    case _ =>
      println("unhandledになったのでstopします")
      stop()
  }
}

object WorkerActor {
  sealed trait State
  case object Starting extends State
  case object Running extends State
}

これを例に、「死ぬ前に積まれたjobは実行されなくてもいいのでunhandledにならないで欲しい。そして、準備が出来たらそこから先のジョブは実行して欲しい」といった要望を満たすことを考えてみます。 実行結果としては、「1-9は表示」、「10-20は表示されなくても良い」、「21-30は表示」という状態を目指します。 以降はここからのdiffで見ていきます。

1. シンプルにunhandledを無視

WorkerActorのStarting状態でunhandledが出るのが嫌なので、無視するcase節を追加すればいい!という作戦です。

これで結構上手くいくんですが、無理やり無視しているので本来無視したくないものも無視してしまうようなリスクがあるような無いような・・。リスタートしてから前世のメールボックスの内容を一つ一つ無視して捨てるのではなく、もう少しスマートに、リスタート時にmailboxごと破棄できないのか?と思ったのが今回の発端です。

    when(Starting) {
      case Event("connect", _) =>
        println("connect")
        goto(Running)
+    case Event(x, _) =>
+      println(s"初期化中なので無視します $x") // 本当に無視して大丈夫なのか・・状態不整合が隠れていないか不安。
+      stay()
    }

2. stashしてみよう

前節とやっていることはあまり変わらないのですがstashでもやってみました。 こちらは無視されるのではなくunstash時に既存メッセージが戻ってきます。今回は無視したいんですがclearStashはprivate APIなので呼べずunstashするしかなさそうでした。(すごい調べたわけではないですが多分あってるはず・・・。)

unstashしなければ無限に溜め込めるというわけでもない(いずれStashOverFlowで例外が飛ぶ)のでstashだけ呼んでunstashは呼ばない作戦は悪手になりそうです。(厳密にはstashを使う頻度とアクターの寿命とかによりそうですが、そこに頼るのは危険そうです。)

   when(Starting) {
     case Event("connect", _) =>
       println("connect")
+      unstashAll() // 古いジョブを実行したい場合はこちら。unstashしない場合、clearする方法は無さそうなのでいずれStashOverFlowになる?
       goto(Running)
+    case Event(_, _) =>
+      println("stashします")
+      stash()
+      stay()
   }

3. stopしてstartする

一番目の例で書いたスマートな方法に近いイメージのものです。

supervisorではRestartではなくStopを指定します。WorkerActorをwatchしておき、Terminatedメッセージを受け取ったら新しくアクターを作成します。

これにより前世の記憶を消し去ることに成功しました。しかし、こうするとstopした瞬間に積まれていたメッセージがデッドレターになるため、それが許容できるかどうか?といった話が出てきそうです。普段デッドレターが出ないようなケースだと、共通処理でデッドレターを監視して検知とかやっているとちょっとめんどくさそうな。でも普通そこまでやらない気がします。デッドレター自体は普通に作っても出たりしますし・・。ということで大抵のケースでは許容できそうという思いがあります。

 class SupervisorActor extends Actor {
-  private val actor = context.actorOf(Props[WorkerActor], "worker")
+  private var actor = createWorker()
+
+  private def createWorker() = {
+    val a = context.actorOf(Props[WorkerActor], "worker")
+    context.watch(a)
+    a
+  }

   override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
     case _: ActorInitializationException ⇒ Restart
     case _: ActorKilledException ⇒ Stop
     case _: DeathPactException ⇒ Stop
-    case _: Exception ⇒ Restart
+    case _: Exception ⇒ Stop // 止めると未処理メッセージがdeadletterになる
   }

   def receive: Receive = {
+    case Terminated(ref) =>
+      actor = createWorker()
     case x => actor forward x
   }
 }

ちなみにデッドレターログは以下のようにして止めることが出来ます。ActorSystem全体で止めることになるので完全に止めるのは少し不安な気がします。(何かの異常でデッドレター出てる時に気づけなくなるため) ログに出しておいて気にしないようにするのが良さそうです。

設定値のintは、10だとデッドレターを10件までログに出す。20だと20件までログに出す。といった意味です。

-  val system = ActorSystem("clear")
+  val system = ActorSystem("clear", ConfigFactory.parseString("akka.log-dead-letters = 0")) // deadletter logをオフ(システムグローバルにオフだとちょっと不安・・)

4. restart時に頑張る

stop & startと仕組みはほぼ同じですが、中間にアクターを増やすことで「中間のアクターをリスタートさせた結果、WorkerActorがstop => startされる」というロジックでほぼ同じことが出来ます。context.watchし忘れを防止出来る一方で、登場人物が増えるというなんだかどっちもどっちな方法です。

未処理メッセージがデッドレターになるのはこちらも同様です。

 class SupervisorActor extends Actor {
-  private val actor = context.actorOf(Props[WorkerActor], "worker")
+  private val actor = context.actorOf(Props[WorkerVisorActor], "worker_visor")

   override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
     case _: ActorInitializationException ⇒ Restart
@@ -39,6 +40,19 @@
   }
 }

+class WorkerVisorActor extends Actor {
+  private val actor = context.actorOf(Props[WorkerActor], "worker")
+
+  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
+    case _: Exception ⇒ Escalate // 孫はstopされるが、未処理のメッセージがdeadletterになってしまう。
+  }
+
+  def receive: Receive = {
+    case x =>
+      actor forward x
+  }
+}

5. もうunhandledで良くない?

ここまでくると一番最初に書いた単純に無視する案が一番良いように思えてきました。

そして無視するならwhenUnhandledでログ出してstayすれば同じじゃないかという気持ちに・・。(本来のコードではunhandledのときの処理をある程度共通化していて、そこはオーバーライドしたくないという動機があったのですが、それもそこまですごい重大な動機でもなく、ここまで頑張る必要もないな・・・という感じです)

とはいえFSMのStateがいくつかある場合、whenUnhandledでログだけ出してstayする方法ではアクターの全Stateでunhandledなメッセージが無視される一方で、最初に書いた無視案では特定のStateでだけunhandledなメッセージを無視する挙動を書くことができるのでwhenUnhandledではstopさせたりアラートを出したり出来るかもしれません。そう考えるとそこまで捨てたもんじゃない?という気持ちもあります。

デッドレターになるのも良さそうですが、無視するのに比べると若干実装めんどくさいような気もするので(とはいえ数行?)選びどころですかね。 一律で無視すると実装ミスで発生した意図しないメッセージまで無視してしまう可能性あるのでちゃんとやりたいならこっちのほうが良い気がしています。

6. カスタムdispatcher

custom dispatcherを自前で作ればできそうという書き込みをMLでみつけました。

https://groups.google.com/forum/?hl=en.#!searchin/akka-user/Clearing$20all$20mailboxes$20while$20restarting%7Csort:relevance/akka-user/3qJLNUTcLDc/wJxLx75orEEJ

試してはいないんですが、カスタムで作られたdispatcherをメンテしたくない気持ちがあります。(リンク内にもoverkillとありますし・・。) ただカスタムdispatcherってわりと普通に作る物なのかどうかよくわかってないので、その辺どうなんだろうと思いつつ今回は終わりです。

カーネギーメロンのDBに関する講義が面白いのでおすすめ

ここに書くことによって途中でやめられなくするメソッドです。

ハッカーニュースを眺めていたら以下のようなCS系講義動画のまとめリポジトリが流れていました。

GitHub - Developer-Y/cs-video-courses: List of Computer Science courses with video lectures.

へーっと思いながら何個かポチってみたところ以下に出くわしました。

15721.courses.cs.cmu.edu

英語が(自分にとって)聞き取りやすく、動画の品質(画質やスライドがちゃんと見えるかどうかといった部分)も良いものでかつ興味のある内容で出来ればスライドもおしゃれで・・・となるとなかなか少ないですが、これはかなり見やすいです。 スライドも概念図が頻繁に登場したりして、これだけでも聞き取れなかった部分などをかなり補完できます。

スケジュールページのVideoアイコンからページに飛んでもらえるとわかるのですが、スライドごとに動画を飛ばしたり戻したりできるのでわからなかったところはもう一回、この辺はわかるから2スライド分とばすみたいなことがやりやすいです。

http://cmudb.io/15721-s16-lect01

さらにページを良く見てみると結構いろいろな講義が上がっているようです。 今回紹介する、Database Systems 以外にも

など色々あります。

特にSeven Databases in Seven WeeksはMemSQL, Microsoft(Hekaton), NuoDB, MongoDB, Tokutek, VoltDBのCTOやリードエンジニアが一回ずつそれぞれのDBについて紹介してくれるという非常に豪華な内容になっています。

どんな講義?

さてDatabase Systemsの話にもどります。

この講義ではsingle nodeのin-memory databaseの内部アーキテクチャについて扱います。(分散DBについては対象外) classicalなDBMSについては扱わずstate-of-the-artなトピックについて扱うとのことです。(classicalなやつが軽視されているわけではなく、別の講義などで学習済みであることを想定しているっぽいです。)

具体的には、以下のようなトピックがあります。

  • Concurrency Control
  • Indexing
  • Storage Models, Compression
  • Join Algorithms
  • Logging & Recovery Methods
  • Query Optimization, Execution, Compilation
  • New Storage Hardware

より詳細には以下にまとまっています。 Schedule - CMU 15-721 :: Database Systems (Spring 2016)

内容としてはin-memory DBって既存のDBにキャッシュを詰みまくるのとどう違うの?という疑問に直球で答えを返せるようなイメージでしょうか。 ディスク中心のDBでは如何にディスク I/Oを避けるかが主要な関心事になっていますが、in-memoryではデータアクセスコストが(ディスクアクセスと比べれば)かなり低くなるのでボトルネックが変化します。 既存のアーキテクチャはdisk I/Oを避けるのに特化した構成になっているため、in-memoryの速さを活かすためには新たなボトルネックに合わせたin-memoryを前提としたアーキテクチャが必要になります。 この講義では、そのようなアーキテクチャについて一つ一つ学んでいく形になります。

例えば、 ディスク指向のDBではデータアクセスの際、 ディスクアドレスを計算 => buffer pool managerにキャッシュがメモリにあるか問い合わせる 、といった処理が入りますが、in-memoryを前提にすればこの処理はカットできる。とかロックをかけること自体がネックになるのでCASを使ったものにしたりロックの粒度を荒くしてときにはDB全体でロックをかけるといったことも行われる。(そうすると、全体が逐次実行になる。その結果CPUキャッシュが効くようになる!すごい!)といったことなどが挙げられます。

また、この講義では指定された論文を読んでまとめを提出したりコードを書く課題などがあります。 論文リストはスケジュールに載っています。 リスト内には2015年発表のものなど、かなり新しめの論文などが含まれているため読んでいても知ってるモダンなDBの中身について書いてあったり、あるいは全く知らなかった新しいDBについて知ることが出来ます。割とこっちが面白いというか、最近のとりあえずこれ抑えようリストが上がっているのは非常に助かっています。

で、どうなの?

こういう意識高いやつ何回か挑戦しつつ結局二回目くらいまで聞いて挫折しているんですが、現在のところ5回目くらいまで聞けているので結構良いんじゃないかと思っています。 どちらかというと動画を見るよりかは論文を読むのを中心に進めています。

読んだものを簡単に紹介します。(内容のサマリというよりかは、メモ書きに近いです)

M. Stonebraker, et al., What Goes Around Comes Around, in Readings in Database Systems, 4th Edition, 2006 (Optional)

データモデリングについての35年分の歴史を辿り、そこから得られる教訓についてまとめた論文。IMS => CODASYL => Relational => ...と進んでいく中で何を学んだのか?これから同じ失敗や議論を繰り返さないためにはどうすればよいか?について書いてあります。(その分野の巨人が動かないと結局流行らないよね。とか、改良があったとしても移行する動機が薄いと普及しないよね。みたいな技術的なところ以外(そこも含めて技術?)の要因についても背景をしっかりおさえて書いてあります。

A. Pavlo, et al., What's New with NewSQL?, 2015 (Optional)

NewSQLという分野(ACID特性を諦めないでNoSQLのようなスケーラビリティを目指すDBMS)についてのまとめです。スケーラビリティを求めてEventual Consistencyでも動くコードをアプリケーションで頑張って書かなくてもDBがやってくれる方が生産性が良いってGoogle Spannerの著者も言っていたらしいです。

The authors of Spanner even remark that it is better to have their application programmers deal with performance problems due to overuse of transactions, rather than writing code to deal with the lack of transactions as one does with a NoSQL DBMS [24].

この分野ではCockroach DB, SAP HANA, VoltDBなどがありますが(本文中ではもっと紹介されています)、これらのDBがどのようなアーキテクチャになっているかそれぞれざっくり知ることが出来ます。

H. Garcia-Molina, et al., Main Memory Database Systems: An Overview, in IEEE Trans. on Knowl. and Data Eng., 1992

ディスクではなくメモリに全てのデータを格納することを前提としたDBのアーキテクチャについてまとめられています。 いい面ばかりではなく、HDDは故障しても部分的にリストアできたりできるけどメモリは全体が壊れやすい。もしハードウェアが信頼できるようになったとしてもOSが暴走したらメモリが書き換えられたりして結局DBに格納されてるデータが壊れる・・?といった懸念があるのでチェックポイント増やしたほうが良いか・・?でもそうするとチェックポイントたくさんつくるために新しくボトルネック出来てしまう・・?といった事柄についても議論されています。

上の方で書いたMMDBならロックの粒度をDB全体にしても良いっと書いてあるのはこの論文です。ロックの対象をDB全体にするとトランザクションが逐次実行になります。逐次実行になるとCPUのキャッシュが活かせる(今まではトランザクションが待ち状態になるとキャッシュが無駄になっていた)ため更に性能が見込めるかもといった内容もあり面白いです。(今までdisk I/Oと戦っていたのにいきなりCPUのキャッシュの話に・・!)

その他にもConcurreny Control, index, ロックなどあらゆる事柄の変化について解説されています。

X. Yu, et al., Staring into the Abyss: An Evaluation of Concurrency Control with One Thousand Cores, in VLDB, 2014

1000コア時代が到来した時に今のConcurrency Controlの仕組みでスケールするか検証したらダメだったので頑張ろう・・!みたいな論文。 大きく分けて2phase-commit方式とTimeStamp Ordering方式があり、それぞれの中でも異なる実装方式を比較したりしています。 コア数増えるとAtomicIntegerのCAS使うとキャッシュコヒーレンスのための命令が飛び交って遅くなってしまう。 => バッチでtimestampをまとめて発行すればいいかと思ったけど、トランザクションが失敗した時にtimestampがまとめて無効になってしまうから競合が多いと逆効果になったり。などTAoMP に書いてあるような知識が役立ちそうでした。 図10の競合が多い場合の書き込みワークロードを見るとコア数が1のときのほうがコア数500や1000のときよりスループットが高い(10万txn/s => 5万txn/sになってしまう)といったケースもあり、このようなケースでもスケールできる方式が求められていそうです。 この論文でこの方式はうまくいかないと書いてあっても現代の1000より大幅に少ないコア数だと普通に成立したりする気がするのでその辺は注意が必要ですね。

ということで割りとなんとなくですが、学んでいますという紹介というか勉強ログ的な記事でした。