TwitterのFutureについてのざっくりまとめ

基本的に以下からそのまま取ってきています。

説明用のざっくりポインタとしてまとめる予定だったのに、あれもこれもと欲張ってしまった代物。(その割に全部あるわけではない。) playからfinagleに移行してきたりするとメソッド名が細かく違ったりするのに最初は戸惑いますが、Scala標準Futureとakkaのschedulerを使ったことがあれば、すぐ慣れつつ便利さを感じられると思います。

例ではThread.sleepを呼びまくっていますが、そのへんの事情はScala標準のFutureと同じなのでちゃんとやるときはFuture.sleepやtimer.doLaterなどを使ったほうが良いです。

目次

Futureの作成

基本的な作り方について。 この作り方だとapplyも含めて全部同期実行になる点に注意。

value/exception

import com.twitter.util.Future

// 基本の作り方
Future.value(1)
Future.exception(new RuntimeException)

apply/const

Scala標準Futureではapplyは非同期に実行されるが、TwitterFutureでのapplyは同期実行という違いに注意。

import com.twitter.util.Future

// applyを使うとTryで包むのでReturn/Throwに仕分けてくれる
// Scala標準のFutureと異なり、ただTryで包むだけ == 同期実行な点に注意。
Future(1)
Future(throw new RuntimeException)

// twitter Tryからの変換
Future.const(Try(1))
Future.const(Try(throw new RuntimeException))

Futureをまとめる

Scala標準のFutureと同じくfor式を使うだけだと並行実行されないことがあるので注意。

map/flatMap

join

ScalaFutureのzipと同じ http://qiita.com/mtoyoshi/items/f68beb17710c3819697f#zip コード例は略。

失敗のハンドリング

handle/rescure

import com.twitter.util.{Future, Return, Throw}

val successFuture = Future.value(1)
val failedFuture = Future.exception[Int](new RuntimeException)

// handle
// 失敗した例外をSuccessにできる。 failedに対するmap
// caseはPartialFunctionなのでcaseにマッチしない例外はそのまま例外として扱われる。
successFuture.handle {
  case e: RuntimeException => 0
}
failedFuture.handle {
  case e: RuntimeException => 0
}

// rescue
// 失敗した例外をSuccessにしたり別の例外に変換できる。 failedに対するflatMap
// caseはPartialFunctionなのでcaseにマッチしない例外はそのまま例外として扱われる。
successFuture.rescue {
  case e: RuntimeException => Future.value(0)
}
failedFuture.rescue {
  case e: RuntimeException => Future.value(0)
}

transform

import com.twitter.util.{Future, Return, Throw}

val successFuture = Future.value(1)
val failedFuture = Future.exception[Int](new RuntimeException)

// transform
// rescueと異なり成功時の値も同時に変換できる
successFuture.transform {
  case Return(a) if a == 1 => Future.exception(new RuntimeException)
  case Return(a) => Future.value(a)
  case Throw(e: RuntimeException) => Future.value(0)
  case Throw(e) => Future.exception(e)
}
failedFuture.transform {
  case Return(a) => Future.value(a)
  case Throw(e: RuntimeException) => Future.value(0)
  case Throw(e) => Future.exception(e)
}

// transformedByというメソッドもあるが、こちらはFutureTransformerを受け取る。
// FutureTransformerはJavaFriendlyと書いてあるので基本的にはtransformを使えば良い。
// 今回は割愛。

FuturePool

非同期実行したい場合はFuturePoolの力が必要。

FuturePool.unboundedPool

スレッドプール内部のExecutorServiceはglobalのもの(https://github.com/twitter/util/blob/util-6.45.0/util-core/src/main/scala/com/twitter/util/FuturePool.scala#L70-L72) が利用される。

import com.twitter.util.FuturePool

// 非同期実行されるプール
// 処理の実行方法はFuture.applyと同じくpoolのapplyに処理を渡せばOK
// unboundedなので際限なく拡張される。
val unboundedPool = FuturePool.unboundedPool
unboundedPool(1)
unboundedPool(throw new RuntimeException)

FuturePool(dbExecutorService).apply

import java.util.concurrent.ForkJoinPool

import com.twitter.util.FuturePool

// 非同期実行されるプールを自分で作る。
val dbExecutorService = new ForkJoinPool(50)
val myPool = FuturePool(dbExecutorService)

myPool { Thread.sleep(1); 1 }
myPool(throw new RuntimeException)

// interruptibleUnboundedPoolというキャンセルに対応したPoolもある。

Timer系列

Timerの種類を例ごとに変えてあるが使い方はどれも同じ。Timerの種類については後述。

sleep

import com.twitter.util.{Future, Await}
import com.twitter.conversions.time._

implicit val timer = com.twitter.finagle.util.DefaultTimer

// 3秒後にUnitが返る
val f = Future.sleep(3.seconds)

Await.result(f)

delayed

完了が遅れるだけで計算自体はすぐ行われる点に注意。

import com.twitter.util.{Future, Await}
import com.twitter.conversions.time._

implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)

// 3秒後にIntが返る
val f = Future {
  println("in future")
  1
}.delayed(3.seconds).foreach(_ => println("done"))

Await.result(f)

Timer#schedule

Futureのメソッドではないがついでなので紹介。

import com.twitter.util.{Await, Future, Time}
import com.twitter.conversions.time._

implicit val timer = new com.twitter.util.ScheduledThreadPoolTimer(poolSize = 5, makeDaemons = true)

// schedule 1秒ごとに何度も実行する 。キャンセル可能なTimerTaskを返す。
val timerTask = timer.schedule(1.seconds) {
  println("1sec!")
}

Thread.sleep(3000)
Await.result(timerTask.close())

// doLater 2秒後に1回実行する。Futureを返す。
val f1 =  timer.doLater(2.seconds) {
  println("2sec!")
}
Await.result(f1)

// doAt 具体的な時刻を指定する。Futureを返す。
val f2 =  timer.doAt(Time.Now + 3.seconds) {
  println("3sec!")
}
Await.result(f2)

timerの話

tl;dr;

  • finagleを使っているならcom.twitter.finagle.util.DefaultTimerを使えばOK
    • finagle-netty4が依存パスにあればNetty4HashedWheelTimerが使われる。
    • 無くてもJavaTimer(isDaemon=true)が使われるので安心。
  • finagleは使って無くてtwitter/utilだけ使っている場合はJavaTimerかScheduledThreadPoolTimerが選択肢
    • 単に new JavaTimer とするとユーザースレッドとして起動するため、明示的にcancelを呼ばないとtimerがGCされるまでプロセスが終了しなくなることがあるのでisDaemon=trueを指定するとよい。

細かい話

twitter/utilのこの手のメソッドはcom.twitter.util.Timerを要求してくる。

スケジューリングを無視して即時実行するNullTimerやテスト用のMockTimerがあるが、 finagleを使っていない場合、基本的にはJavaTimer(isDaemon=true)を、ある程度の性能が欲しい場合はScheduledThreadPoolTimerを使えば良さそう。

finagleを使っている場合は、もう少し性能が出るタイマーがDefaultTimerとしてfinagle自体に用意されているのでそちらを使う方が良さそう。

com.twitter.finagle.util.DefaultTimerの実装はServiceLoaderで一番最初に見つかったクラスを使う。 ServiceLoaderで見つからなかった場合はwarningログが出つつcom.twitter.util.JavaTimerをdaemonThreadをONにした上で使うようになっている。 https://github.com/twitter/finagle/blob/finagle-6.45.0/finagle-core/src/main/scala/com/twitter/finagle/util/DefaultTimer.scala#L31-L36

finagle-netty4(finagle-{thrift, http}の依存にある)がNetty4HashedWheelTimerを指定しているので大抵の場合はこれが読み込まれる気がする。  https://github.com/twitter/finagle/blob/finagle-6.45.0/finagle-netty4/src/main/resources/META-INF/services/com.twitter.finagle.util.DefaultTimer

もう一つcom.twitter.finagle.util.HashedWheelTimer.Defaultというややこしいものが存在するが、こちらはNetty3ベースのHashedWheelTimerを使っている。 Netty3,4間のHashedWheelTimerの差はよくわかっていないが新しい方が良さそうなので基本的にはcom.twitter.finagle.util.DefaultTimerを使うのが良いだろう。

Timerのロードにサービスローダーを使うようになったのはfinagle6.45から。経緯とかは https://github.com/twitter/finagle/commit/d047b4568e07a56b481b5f7c193b0e8c5ec6ff71 のコミットに書いてある通り、finagle-coreからnetty3依存を剥がすためにそうなっているらしい。

複数の処理

select/or

import com.twitter.util.{Future, Try}

// selectは一番最初に終わったFutureの値と残りのFutureを返す。
val fs = (1 to 5).map(Future(_))
val (firstDoneTry, others): (Try[Int], Seq[Future[Int]]) = Await.result(Future.select(fs))
println(firstDoneTry) // Return(1)
println(others) // Seq(Future(Return(2)), Future(Return(3)), Future(Return(4)), Future(Return(5)))


// orはselectの2つ版。selectと異なり、先に終わった値が含まれるFutureのみを返す。
Future(1).or(Future(2))

// selectIndexという一番最初に終わったSeq[Future[A]]のindexを返すメソッドもあるが割愛。

traverseSequentially/collect/collectToTry

traverseSequentiallyは前のFutureが終了してから次を実行し、collectは同時に実行する。

実装を見るとcollectでも結果に含まれる要素の順番は引数と同じになるっぽい。順序についてはドキュメントやコメントにはのってない気がする。

collectToTryはcollectと異なり一部が失敗したときも成功し、一連の結果がTryで取得できる。

import com.twitter.util.{Await, Future}

// 順番に実行される
val f = Future.traverseSequentially(1 to 5) { i =>
  Future(i)
}

// 並列に実行される
val f = Future.collect((1 to 5).map { i =>
  Future {
    val wait = scala.util.Random.nextInt(300)
    println(s"$i => $wait ms")
    Thread.sleep(wait)
    i
  }
})
println(Await.result(f)) // 順序は同じ。ArraySeq(1, 2, 3, 4, 5)

// 並列に実行される。失敗も補足できる
val f2 = Future.collectToTry((1 to 5).map { i =>
  Future {
    if(i % 2 == 0) throw new RuntimeException else i
  }
})

println(Await.result(f2)) // 順序は同じ。ArraySeq(Return(1), Throw(java.lang.RuntimeException), Return(3), Throw(java.lang.RuntimeException), Return(5))

batched

大量の非同期処理をいくつかのグループに分けて実行する君

同時実行数の制限という意味だと com.twitter.concurrent.AsyncSemaphore を使う手もある。 Futures — Util 6.45.0 documentation

batchedは複数リクエストをまとめて送るのが前提になっている(Seq[In]=> Seq[Out]で処理を記述する)などの違いがある。

import com.twitter.conversions.time._
import com.twitter.util.{Await, Future}

implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)

// まずbatcherを作る
val batcher = Future.batched[Int, String](
  sizeThreshold = 10, // 実行しきい値は10。正確にはsizeThreshold*sizePercentile個のジョブがエンキューされるまで実行を待つようになっている。
  timeThreshold = 500.milliseconds, // sizeの条件を満たさなくてもenqueueからtimeThresholdを超過したらジョブが実行される
  sizePercentile = 0.3f  // sizeThresholdと合わせて最低ジョブ実行数を決める。
                         // この例では固定値を入れているが名前渡しになっているのでRandom.nextFloat()とか入れるとバッチサイズを都度変化できるようになっている。
                         // 必要がなければ指定しないでデフォルト値(1.0f)を使えば良さそう。
) { ids =>
  // Seq[In]を受け取って、Seq[Out]を返す関数を書く
  Future.sleep(scala.util.Random.nextInt(50).milliseconds).map { _ =>
    println(s"${ids.mkString(", ")} are inserted")
    ids.map(_.toString)
  }
}

val userIds = 1 to 50

// applyに渡してジョブをenqueueする。中でsynchronizedするのでcollectで呼ぶ意味はあまりない。
// バッチグループごとにsleepを入れたりする機能は無さそうなので、enqueue自体のタイミングで制御すると良さそう。
val insertedFuture = Future.collect(userIds.map(batcher)) 

println(Await.result(insertedFuture))

// thresholdに関係なく全リクエストの実行を開始したいときはflushBatchを呼ぶ。
batcher.flushBatch() 

// sizeThreshold = 1でuserIdsを10個投入した場合の出力
// 8 are inserted
// 5 are inserted
// 2 are inserted
// 6 are inserted
// 3 are inserted
// 4 are inserted
// 1 are inserted
// 10 are inserted
// 9 are inserted
// 7 are inserted
// ArraySeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// sizeThreshold = 100でuserIdsを10個投入した場合の出力
// timeThreshold秒経過してから以下が出力される。
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 are inserted
// ArraySeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// sizeThreshold = 20でuserIdsを10個投入した場合の出力(sizePercentile=0.3fなので20*0.3=6個ずつ実行される。
// ただし7~10個目は数が足りないのでtimeThreshold秒経過してから出力される。
// 1, 2, 3, 4, 5, 6 are inserted
// 7, 8, 9, 10 are inserted
// ArraySeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

便利const

import com.twitter.util.Future

// 以下の2つは同じ
Future.Unit
Future.Done

// 便利定数
Future.True
Future.False
Future.None
Future.Nil
Future.???

// Java用なので使わなくて良い
Future.Void

callback

コールバックよりはmap/flatMapでつなぎたい。ensureはありかも。

onSuccess/onFailure

省略

respond/ensure

import com.twitter.util.Future

// respond
// 完了した際のコールバックを設定する。
// 主にライブラリなどの汎用コード向けとコメントにある。
// respondが返すFutureはコールバックが実行されたときではなく、元の処理(f1の処理)が完了されたときに完了となる。
// そのためrespondは結果表示やリソースの後始末などの副作用を起こす前提となる。(promiseを解決したりしているコードがちょくちょく見かけられる。)
val f1 = Future.value(1)
val f2 = f1.respond {
  case Return(a) => println(a)
  case Throw(e) => println(e)
}

// ensure
// respondとほぼ同じだが、引数として計算の結果を受け取らない。
// 成功しても失敗してもいいから単にリソースをcloseしたい場合などに使える。
val f1 = Future.value(1)
val f2 = f1.ensure {
  println("f1 finished")
}

キャンセル

raise

import com.twitter.util.{Await, Future, FuturePool}

// Future.valueは即時評価なのでraiseできない。
// またinterruptibleUnboundedPoolを使っても、state=Doneになるとraiseを呼んでも正常系の値が返ってくるのでsleepでごまかしている
val f1 = FuturePool.interruptibleUnboundedPool {
  Thread.sleep(100)
  1
}
f1.raise(new RuntimeException("interrupt"))
Await.result(f) //  java.util.concurrent.CancellationException

// FuturePool.unboundedPoolを使う場合はFuture#interruptibleを使うとinterruptできるようになる。逆にinterruptibleを呼ばないとcancel出来ない。
val f2 = FuturePool.unboundedPool {
  Thread.sleep(100)
  1
}.interruptible()
f2.raise(new RuntimeException("interrupt"))
Await.result(f2)

raiseWithin

N秒以内に終わらないとタイムアウトといった指定が出来る。

import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.conversions.time._

implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)

val f = FuturePool.interruptibleUnboundedPool {
  Thread.sleep(3000)
  1
}

// 2秒後にraiseされる
f.raiseWithin(2.seconds)

Await.result(f)

within/by

winthinとbyはDurationを受け取るかTimeを受け取るかの違いしか無い。

raseWithinとwithin/byには処理自体のfutureをraiseするか、withinなどの呼び出しの返り値のみをraiseするかの微妙な違いがある。詳細は以下のコメントを参照。

import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.conversions.time._

implicit val timer = new com.twitter.util.JavaTimer(isDaemon = true)

val f1 = FuturePool.interruptibleUnboundedPool {
  Thread.sleep(3000)
  1
}

// 2秒後にf2がraiseされるが、raseWithinではf1,f2の両方raiseされるのに対し、within/byはf2のみがraiseされるためf1自体の結果は普通に取得することが出来る。
val f2 = f1.within(2.seconds)

println(Await.result(f1)) // 1が表示される

Await.result(f2)

monitored

Promise使いつつネストしていると辛くなるケースを救えるらしい。いまのところ使ったことは無い。

import java.util.concurrent.ForkJoinPool
import com.twitter.util.{Future, FuturePool, Try, Return, Throw, Promise}

import scala.util.control.NonFatal

val exception = new RuntimeException("test")

// 以下のようなケースを考えると、notMonitoredは決して終了しない。
val inner1 = new Promise[Int]
val inner2 = new Promise[Int]

val notMonitored: Future[Int] = {
  inner1.ensure {
    throw exception
    inner2.update(Return(2))
  }
  inner2
}

// このようなケースを防ぐために内部で起きた例外を伝搬してくれるのがFuture.monitored
val monitored: Future[Int] = Future.monitored {
  inner1.ensure {
    throw exception
    inner2.update(Return(2))
  }
  inner2
}

// before
inner1 // state=Waiting
inner2 // state=Waiting
notMonitored.poll // None
monitored.poll // None

inner1.update(Return(1))

// after
inner1 // state=Done
inner2 // state=Interuppted
notMonitored.poll // None (永久に終わらない)
monitored.poll // Some (例外が伝搬されるので終わる)

おまけ: Futureの再帰

ドキュメントにちゃんと実装してあるから再帰してもサンプルにあるようなコードではスタックオーバーフローにならないよと書いてある。

Futures — Util 6.45.0 documentation

ReladomoというORMを触ってみた

Reladomoゴールドマン・サックスが公開しているJava向けORMです。

https://github.com/goldmansachs/reladomo-kataチュートリアルが公開されていたので触ってみました。

ちなみにReladomoもReladomo-KataもApache License 2.0です。

Chaining

JavaのORMにも色々種類があると思いますが、Reladomoの特徴的な機能としてChainingが挙げられます。

Chainingという単語自体には色々意味があるようですが、Reladomoが持つChainingは監査のためにオブジェクトに対する変更履歴を全て保存し後から履歴を追跡出来るようにするための機能です。

監査用履歴は基本的には通常のアプリケーションから利用されることはあまりありませんが、履歴の保存や定期的な参照が必要になるようなユースケースは多数存在します。(たぶん) こういった履歴管理はエンティティの変更ごとに追加等が必要になるので結構な労力が必要になりますが、 Reladomoを使うとその辺の履歴管理をよしなにやってくれるので他のORMを使うより効率的に開発ができるということのようです。

肝心の履歴をどうやって保持するかですが、基本的にはデータの有効期限を繋いでいくモデリングになっているようです。 具体的には以下のようにIN_Z・OUT_Zというカラムでデータが有効になった日時とデータが更新されて無効になった日時を保存します。

item id name IN_Z OUT_Z
1 foo 2004-04-01T10:00Z 2004-04-01T12:00Z
1 bar 2004-04-01T12:00Z

このようなItemテーブルがあるとき、Reladomoに itemId == 1 のitemを問い合わせると自動的にOUT_Zを指定したクエリを生成して問い合わせてくれます。

SELECT * FROM ITEM
WHERE item_id = 1 AND OUT_Z = '9999-12-01 23:59:00.000'

ちなみにIN_Z, OUT_Zというカラム名9999-12-01... (infinity)などはオプションで指定可能です。

もちろんitemId = 1 のアイテムに関する変更履歴やある時点での情報を取得することも可能です。 データ更新時は履歴のことは考えずにデータをORM経由で更新すればOKです。

またデータの削除の代わりにOUT_Zに入っているinfinityを具体的な時刻に更新することでデータを無効にするterminate操作が可能です。

BiTemporal Modeling

履歴管理にはさらにBiTemporal Modelingというモデリング手法があります。

ざっくりいうとアプリケーションレベルの有効・無効時刻(BussinessDate)とトランザクションが起きた時刻レベルでの有効・無効時刻(ProcessingDate)を組み合わせる方式になっています。 この方式で時刻を管理すると、8/1にあるべきだった更新の反映が8/4日になってしまったので今までの履歴を全部調節する必要があるが、その際にどういう変更があったかも追跡できるようにしておきたいといった要望に答えることができます。

BiTemporal Modelingについては別途まとめたので詳しくはそちらも御覧ください。

二種類の時刻を使った詳細な履歴管理を行いたい場合、自分で色々書いているとどうしても複雑になりミスも考えられるのでORMである程度吸収されるというのはかなり魅力的に感じました。

メモ

以下は触ってみた感想メモです。

オブジェクト定義

  • objectの定義はxmlで書くらしい。最初はうおー現代ーと思ったけどIntelliJXMLサポートが賢いので意外と悪くなかった。IntelliJnoNamespaceSchemaLocation を見て適切に補完を出してくれるらしい。
    • schemaにはnullableとかmaxlengthやらindexやらを書く必要があるのでとにかく補完が効くのが重要で、それが達成されているのでjsonyamlかみたいなのはそこまで重要じゃないかなと思った。
    • そしてスキーマファイルにはちゃんと属性の意味がコメントで書いてあった・・!コードジャンプで行ける・・!
  • とはいえプログラミング言語で書きたい気持ちがあったりなかったり。

コード生成

  • コード生成はチュートリアルにある構成ならmvn compileですぐ出来る。
  • srcとgenerated_src以下にコードが生成される。srcの方は一回だけ生成して自分でいじる用途(ほぼ空のクラスが出来るだけ)。generated_srcの方はコード生成っぽいコードが出来る。
  • ついでにgenerated_resourceの方にddlも書き出してくれる。

コード

  • selectした結果がPersonListみたいな型になって、これScalaから使いにくいのではと思ったけどListのサブクラスになっているらしく、やりたければすぐasScalaできそうだった。(まだしてない)

この後、使い方について書こうかなと思ったんですがtutorialの例みるだけでいい感じになれそうだったので省略。

BiTemporal Data Modelに入門中

BiTemporal Modelingについてちょっとだけ調べたりしたのでメモ。 基本的に Temporal
Data Models(ppt注意)Temporal Databasesを大幅に端折ったものに他の資料を少しだけ入れた感じの内容です。

Temporal Data Models

BiTemporal ModelingはTemporal Data Modelsという分野で研究されているモデリングの一種です。 Temporal Data Modelでは時間によって変わっていくデータを扱います。 ちなみにここでいうData ModelはData Structure+Query Languageを意味します。

データが時間によって変わるということは何らかのUpdate操作が必要です。一番単純な方法を考えるとDBのレコードを直接更新(無効になったデータは削除)することでUpdate操作を実現できそうです。 この方式を使う場合、DBはある瞬間におけるシステムの状態を保持していることになります。

一方で、データの変化を後から追跡したい場合には単純なUpdate処理に加えて履歴データを含めた更新処理を作り込むことになります。 例えば人事管理システムでは、このユーザーが今どの部署に在籍しているのか?も重要ですが、このユーザーは今までどんな部署に在籍していたのかという履歴も重要になるためこのような作り込みが必要になります。

データの変化を追跡可能にするためには履歴データを保存する必要があります。このとき、履歴データにはなんらかの意味での時刻を含める必要があります。 考えられる時刻にはtransaction time, valid time, publication time,…など様々な種類の時間が考えられます。 どういう時刻が必要とされるかはアプリケーション次第・・なのですが、valid timeとtransaction timeが重要というbroad consensusがあるようです。[1]

valid time, transaction timeとは

valid timeは、あるfactがtrueになる時刻のことです。 例えば「Johnは10/1に入社した」という事実があればvalid timeは10/1 ~ infinityとなります。 また、「Johnの給料は8/1から8/4まで10ドルだった」という事実を考えるとvalid timeは8/1~8/4となります。

valid timeは定義上どのようなデータにも存在し、DBに登録されているかどうかは関係がありません。(factがtrueになるかどうかが問題) また、期間が定まっていてもよいですし無期限でも構いません。

一方transaction timeは、DBにデータが存在した時間 = (insertされた時刻〜delete時刻された時刻)です。 例えば「Johnは10/1に入社した」というデータが10/5にinsertされ3/31にdeleteされた場合、transaction timeは10/5~3/31です。

4つのデータモデル

さきほど説明した2種類の時間を使うか使わないかで4つのデータモデルが考えられます。

  • どちらもなし => Snapshot Model
  • valid timeのみ => Valid Time Data Model
  • transactional timeのみ => Transactional Data Model
  • valid time & transactional timeの組み合わせ => BiTemporal Data Model

なおこれらのデータモデルを使ったDBにはそれぞれ名前がついているようです[3]

  • Historical DB → valid-time DB
  • Rollback DB → transaction-time DB
  • Temporal DB → bitemporal DB

4つのデータモデルについて

さて、それぞれについて見ていきます。   Snapshot Modelは時刻情報を保持しません。そのため履歴の追跡などは行えません。*1 ということで、ここからはSnapshot Model以外の3つについて見ていきます。

[1]のp.9にある例そのままですが、以下のようなケースを考えます。

  1. John was hired as a programmer (PRG) 
 with initial salary 2000 at time 1;
  2. John’s salary was raised to 3000 at time 3 
 (but recorded in the DB at time 4);
  3. John became a database administrator (DBA)
 at time 6.

Transaction-Time Model

transaction-time Modelで上記の例を考えると以下のようになります。

step1.(時刻1)

name job salary transaction_time
john PRG 2000 [1,NOW]

step2.(時刻4)

※1レコード目のtransaction-timeをNOWから3に更新することでdeleteを表現しています。

name job salary transaction_time
john PRG 2000 [1,3]
john PRG 3000 [4,NOW]

step3.(時刻6)

name job salary transaction_time
john PRG 2000 [1,3]
john PRG 3000 [4,5]
john DBA 3000 [6,Now]

transaction-timeではDBの記録時刻単位でしか保存できないのでデータの変更を遡って行うケースをうまく表現できません。 そのため本来は時刻3から給料が上がったはずのjohnの給料が実際には時刻4から反映されることになってしまいます。*2

Valid-Time Model

valid-timeでは以下のようになります。

step1.(時刻1)

name job salary valid_time
john PRG 2000 [1,NOW]

step2.(時刻4)

name job salary valid_time
john PRG 2000 [1,2]
john PRG 3000 [3,NOW]

step3.(時刻6)

name job salary valid_time
john PRG 2000 [1,2]
john PRG 3000 [3,5]
john DBA 3000 [6,Now]

valid-timeではfactの時刻を記録するため、transaction-timeのときと違い遡ったデータの更新を正確に表現できています。 これでいいような気もしますが、これだけだとどの時点のデータが遡って更新されたのかが判別できません。 step3でいうと2レコード目が遡って更新されたデータですが、これは1, 3レコード目となんら違いはない普通のデータに見えます。

しかし、時刻3でのDBにSELECTをかけたとするとvalid_time=3でのJohnの給料は2000になっているはずです。 一方で時刻4でのDBにSELECTをかけたとするとvalid_time=3でのJohnの給料は3000になっているはずです。

遡った修正があるとこういった乖離が起こるわけですが、乖離が実際に存在するかどうか?(あるデータに遡った更新があったかどうか)はValid-Time Modelでは分かりません。

BiTemporal Model

Valid-Time Modelでは遡ったデータの更新があったかどうかを追跡することができませんでした。 医療情報などの特に重要なデータを扱う場合、このような遡った更新による影響も含めて追跡したいことがあります。*3

データの変更を遡って行えるようにしつつ、どのデータが遡って更新されたのか、遡った更新の前はどんな状態だったのか、といった色々な種類の履歴を追跡できるのがBiTemporal Modelです。

前述したようにBiTemporal Modelはvalid-timeとtransaction-timeの両方を保存する方式です。 BiTemporal Modelでさきほどの例を表現すると以下のようなデータになります。

step1.(時刻1)

name job salary transaction_time valid_time
john PRG 2000 [1,Now] [1,Now]

step2.(時刻4)

name job salary transaction_time valid_time
john PRG 2000 [1,3] [1,Now]
john PRG 2000 [4,Now] [1,2]
john PRG 3000 [4,Now] [3,Now]

step3.(時刻6)

name job salary transaction_time valid_time
john PRG 2000 [1,3] [1,Now]
john PRG 2000 [4,Now] [1,2]
john PRG 3000 [4,5] [3,Now]
john PRG 3000 [6,Now] [3,5]
john DBA 3000 [6,Now] [6,Now]

transaction_time.Start > valid_time.Start となっているデータが後から更新されたデータと捉えることができます。(正確にはfactがtrueになった後にinsertされたデータ) step3での2, 3, 4レコード目が該当します。

読み方が少しむずかしいですがtransaction_timeを固定するとわかりやすいです。

  • transaction_time=1とすると、1~Nowの時点でjohnは(job = PRG, salary = 2000)です。
  • transaction_time=4とすると、1~2の時点でjohnは(job = PRG, salary = 2000)で、3~NOWの時点でjohnは(job = PRG, salary = 3000)です。
  • transaction_time=6とすると、1~2の時点でjohnは(job = PRG, salary = 2000)で、3~5の時点でjohnは(job = PRG, salary = 3000)で6~NOWの時点でjohnは(job = DBA, salary = 3000)です。

履歴の変更という観点で見るとstep3における1レコード目のvalid_timeが2レコード目のvalid_time = [1, 2]によって変更されたとみることができます。この変更がinsertされた時刻はtransaction_timeを見れば良いので4だとわかります。 3レコード目は要件にあった遡った更新で入れたいデータです。このデータのvalid_timeは4レコード目によってvalid_time = [3,5]に変更されたと見ることができます。

アプリケーションで最も用があると思われる今現在アプリケーション的に有効なデータはtransaction_timeを最新に固定すれば取得することができます。 また、transaction_timeを過去のものに固定すれば、その時点でDBが認識していたデータも取得できます。 もちろん、transaction_timeを固定すれば、その時点でDBが認識していたvalid_timeベースでの変更履歴を取得することもできるため、様々な追跡の要望に答えられます。

まとめ

  • DBに保存する時刻としてはtransaction-time/valid-timeの二種類が有用
  • transaction-time DBでは過去に遡ったデータの更新は難しい
  • valid-time DBでは過去に遡ったデータの更新は可能だが、遡って更新を行ったデータか行っていないデータかの判別が難しい
  • BiTemporal Dataでは遡ったデータの更新も更新自体の追跡も可能

etc..

Temporal Database機能はOracleでサポートされているようです。  PostgreSQLにもextensionで存在しているようでした。これらのDBサポートがどこまでやってくれてどのくらい便利なのかは未調査です。

今回は期間で時刻を区切る方法にしか言及しませんでしたがTimestampの方式も色々あるようです。他にもRDBっぽいデータの持ち方以外の方法など、話題が結構あってそれに対応するクエリ言語をどうするかといった部分もおもしろそうですがなんとなく一区切りついたのでまた気が向いたら調べようかなと思います。

参考

  1. Temporal
Data Models(ppt)
  2. Temporal Databases - Richard T. Snodgrass 1998
  3. Temporal Databases - Richard T. Snodgrass and Ilsoo Ahn 1986
  4. Temporal and Real-Time Databases: A Survey(ppt)
  5. Temporal Data and The Relational Model

*1:とはいえ追跡が不要ならこれで十分です。

*2:現実的にはupdated_atをいじったりして反映できそうでが、その場合はvalid-time modelで時刻を管理していると言えそうです。

*3:valid-timeとtransaction-timeの乖離が発生する他の例としては、ある時間にセンサーで観測したデータが地理的に別の地点にあるDBに保存されるケースが考えられます。観測したデータに加えてデータが何分遅れで到着したのかといったデータも欲しい場合はvalid-time, transaction-timeの片方のみでは表現できないのでBiTemporal Modelのような柔軟なモデリングが必要になる可能性があります。

case classのフィールド名とフィールドに対応する値を渡すと型安全にフィールド名と対応する値の組を渡してくれるアノテーションをscalametaで書いた

ややこしいタイトルシリーズ(?)

モチベーションが伝わりづらいけどDBへのアップデートでフィールドを4つか5つ指定したい(かつcase classのインスタンスは情報が足りなくて作れないという制約がある)という状況を考えます。

このとき sql.update(テーブル, Map[更新するカラムの名前 -> 更新する値]) のようなインターフェースがあるとするとMap[フィールド名 => Any] のようなものが必要になります。 例えば User(id: Long, tpe: Int, name: String) では Map("id" -> 0L, tpe: 1, name: "モフたろう") のようなものになる。

フィールド名を手書きするのは嫌だし、idに間違えてStringを渡してしまうことも避けたいので (フィールド名, そのフィールドに応じた型) というタプルを型安全に作ってからMap[String, Any]を生成する方針にしたい。

ということで Mofu.MacroPorter.wan(value = 1) のようにすると型チェックされた上で “wan” -> 1 がかえってくるマクロを作りました。 コンパニオンオブジェクトにフィールド名と全く同じ名前のメソッドが生えます。(同じ名前で使いやすいのか微妙だ)

gist.github.com

感想

  • shapelessのLensを使えばフィールド名の取得は行けそうだったけど、渡されたフィールドの型に応じた型をチェックするのが難しかった。LabelledGenericもLensもインスタンスがないとフィールドの型チェックが難しそうにみえた。情報としては揃っていてcan not proveになやまされたのでテクニックを知っていれば多分取れそう。

  • 書いたけど例のごとくIntelliJでは真っ赤なので作ってみたけど微妙だなーとなったのでそっ閉じ。

opensslでBASE64エンコードされた文字列をdecryptしようとしたら769bytes以上になるとエラーになる件

scalaでファイルを暗号化&base64エンコードしてopensslでファイルを平文にしようとしたところ平文サイズが768byte以下のファイルは平文にできるのに、769byte以上の文字列を入れると下記のエラーが出る現象について。

$ openssl aes-256-cbc -iv $IV -K $KEY -d -base64 -in mofu
bad decrypt
140735797384200:error:0606506D:digital envelope routines:EVP_DecryptFinal_ex:wrong final block length:evp_enc.c:518:

Enc - OpenSSLWiki を読むと、以下のような記述がありました。

These flags tell OpenSSL to apply Base64-encoding before or after the cryptographic operation. The -a and -base64 are equivalent. If you want to decode a base64 file it is necessary to use the -d option. By default the encoded file has a line break every 64 characters. To suppress this you can use in addition to -base64 the -A flag. This will produce a file with no line breaks at all. You can use these flags just for encoding Base64 without any ciphers involved.

-base64(または-a)つけるとbase64デコードするよ。このとき64文字ごとに改行されていることを想定するよ。改行がない場合は-base64と一緒に -A つけてね。とありました。 ということで以下のようにして成功しました。

$ openssl aes-256-cbc -iv $IV -K $KEY -d -base64 -A -in mofu

勉強不足・・