マルチスレッドでもActorでもない並行処理

前回Javaのマルチスレッドについて勉強して、辛すぎる(∩´﹏`∩)となったので、akkaを勉強するぞ―!と思っていたのですが、気になる資料を見つけました。

並行処理の統一モデルへの動向

コンピュータ業界はオブジェクト指向を基に関数型言語に移行する動きにあります。 更に並行処理はマルチスレッドから、Actorモデルに移行しようとしています。 Actorの利点だけがクローズアップされて、全て解決のような風潮があります。 そこで、Actor, Agent, CSPについて調べてみました。

_人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人_
> Actorの利点だけがクローズアップされて、全て解決のような風潮があります。 <
 ̄YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY

Actorだけで満足してはいけないアトモスフィアを感じたので、マルチスレッドやActorモデル以外の並行処理について勉強してみました。

なお、この文章は最近並行処理の勉強を始めた人が書いているので用語の使い方が間違っていたり変な解釈を含んでいる可能性があります。個人用メモ的な意味合いが強いのでご注意ください。最後につけた参考文献は役に立つと思いますので、でっかいリンク集だと思って御覧ください。

マルチスレッド?並行処理?

ところでマルチスレッドという用語ですが今まで何となく並行処理と同じ意味で使っていました。 上記の資料だとJavaなどで用いられている並行処理の特定の計算方法を指しているようです。

佐藤先生がErlang、Scala、Javaなどの並行処理を斬る! - スティルハウスの書庫中の引用*1によると、

そして1990年後ぐらいから議論されてきた話題のひとつは、 各オブジェクトは高々のひとつの実行スレッドとするシングルスレッド実行モデルと、一つのオブジェクトでも複数同時処理を許すマルチスレッド実行モデルのどちらがいいかというもの。
さてシングルスレッド実行モデルとマルチスレッド実行モデルは何が違うかというと、前者は各オブジェクトは能動的に処理してもよい、つまり各オブジェクトに高々一つのスレッドを割り当てるが、(中略)。一方、マルチスレッド実行モデルでは各オブジェクトは能動的に処理されますし、各オブジェクトにも複数のスレッドが割り当てられます。(略)

ということで、あるオブジェクトが複数のスレッドに同時に処理されるようなモデルをマルチスレッド実行モデルと呼ぶようです。いままではなんとなく複数スレッドが立ち上がって計算されてたらマルチスレッドとか並行処理と思っていましたが、「一つのオブジェクトの処理に割り当てられるスレッドがシングルかマルチか」が重要なようです。(このへんの分類は理解が足りず、正確でないかもしれません。)

調べる限りシングルスレッド実行モデルではmessage passing方式、マルチスレッド実行モデルではshared memory方式が有名なようです。

プロセス計算

一番最初に上げた資料を読み進めていくとP.8に、ActorのセマンティクスはCSP/CCSから導かれた(なのでActorはCSP/CCSのサブセットとして考えて良い)とあります。*2

調べてみるとCSP(= Communicating Sequential Processes) やCCS(= Calculus of Communicating Systems)はプロセス計算と呼ばれる分野に属するようです。 Wikipediaで歴史を見てみると、CSPやCCSが源流となってプロセス計算と分野が出来上がり、これらの発展形としてπ計算などが生まれているようです。

プロセス計算はプロセス代数とも呼ばれています。並行処理を含む計算をモデル化することで、「こことここは等価だよね」、「ここは分配法則が使えるよね」といった具合に振る舞いを変えずに簡略化できるようになるようです。テストによる動作検証が非常に難しい並行処理において、数学的に動作が変わらないことが保証されている変形を使えるプロセス計算は、複雑なアプリケーションを記述するための強力な道具になりそうです。その辺の等価性(や双模倣意味論)についてはプロセス代数の基本と関連ツールの紹介電子情報通信学会「知識ベース」 c 電子情報通信学会 2010 1/(39) 7 群(コンピュータ-ソフトウェア)-- 1 編(ソフトウェア基礎)1-2 通信プロセス計算が詳しいです。

CSP

CSPはGoのgoroutineのベースになっているようで、資料がたくさんあります。*3

goroutine(CSP)とAkka(Actor)の違いを考えると、

  • akkaはメッセージの送り先を明示的に指定する。
  • goroutineはチャンネルへのメッセージを受け取るプロセスが誰だかは指定しない。

という違いがあり、どうやらこのメッセージの受け手の匿名性はそのままCSPとActorモデルの違いということが出来そうです。 *4

π計算

「CSP is Goのアレ」という雑な理解をしたので、CCSの方も勉強したくなりますね。今回は欲張ってCCSの発展形、π計算について調べました。

プロセス代数の基本と関連ツールの紹介によると、 π計算はCCS+チャネル渡しとのことなので、π計算を抑えておけばCCSもいい感じに抑えられるんじゃないでしょうか。(若干適当・・・。)

チュートリアル:π計算P.8によると、CCSはチャネルが静的に決定されているので、 ノードが動的に決定されるようなネットワーク構造を持つ問題に対して応用できないという欠点があり、これを克服するためにチャネル渡しを追加したのがπ計算のようです。

さきほど挙げた、 チュートリアル:π計算 では、CCSやπ計算の表現方法が数式や図を多用して解説されていますので、詳しく知りたい方はそちらを御覧ください。

Scalaでのπ計算

ここからは、PiLib: A Hosted Language for Pi-Calculus Style Concurrency - Vincent Cremet, Martin Oderskyを参照します。

PiLibはπ計算のためのScalaライブラリです。ですがもうdeprecatedなので使えません(Actorを使えとのこと)(´・ω・`) Scala2.9.3で使えることを確認したので、今回はそちらを使うことにします。

論文のサンプルそのままに、容量2のBufferを作ってみます。

Bufferは以下の様な特徴を持ちます。

  • B0の状態で、 putチャネルにxが来るとB1(x)になる。
  • B1(x)の状態で、putチャネルにyが来るとB2(x, y)になる。getチャネルにxを送るとB0になる。
  • B2(x, y)の状態で、getチャネルにxを送るとB1(y)になる。

これをPiLibで表現すると以下のようになります。

import scala.concurrent.pilib._

object Buffer {
  /**
   * π計算風な記法で書くと以下のとおり。(注意: 「get$」は「getの上にバーが付いている(texで言うと、\overline{get}) 」のを表現できなかったので、代わりに使っている。)
   * Buffer(put, get) = B0(put, get)
   * B0(put, get) = put(x).B1(put, get, x)
   * B1(put, get, x) = get$<x>.B0(put, get) + put(y).B2(put, get, x, y)
   * B2(put, get, x, y) = get$<x>.B1(put, get, y)
   */
  def Buffer[A](put: Chan[A], get: Chan[A]): Unit = {
    def B0: Unit = choice(put * { x => B1(x) })                                  // putからデータxを受け取ってB1(x)になる
    def B1(x: A): Unit = choice(get(x) * B0, put * { y => B2(x, y) })    // getにデータxを送ってB0になるか、 putからデータyを受け取ってB2(x, y)になる。
    def B2(x: A, y: A): Unit = choice(get(x) * B1(y))                          // getにデータxを受け取ってB1(y)になる。
    B0
  }
}

put * { x => B1(x) }のように、 a * { x => c} の形式になっているものはInput Guarded Processと呼ばれます。cにはxを受け取った後の「継続」が入ります。

get(x) * B0のように、a(ν) * cとなっているものはOutput Guarded Processと呼ばれます。やはりcはνを出力した後の「継続」が入ります。

また、def choice[A](gs: Gp[A]*): Aはn個のguarded processを受け取り、そのどれか(チャネルの受給が満たされたもの)を実行します。どのチャネルともコミュニケーションが取れない場合はブロックします。

ところでChan[A]だと特定のオブジェクトAしか送れませんが、以下のように定義すると π計算っぽく、「チャネルを送れるチャネル」を作ることが出来ます。

class Channel extends Chan[Channel]

これらを使って、Bufferにデータを送ったり消費したりするProducerとConsumerを定義しましょう。

  /**
   * π計算風な記法で書くと以下のとおり。(注意: 「get$」は「getの上にバーが付いている(texで言うと、\overline{get}) 」のを表現できなかったので、代わりに使っている。)
   * Producer(put, get) = νx.put$<x>.Producer(put, get)
   * Consumer(put, get) = get(x).Consumer(put, get)
   */
  def Producer(put: Channel, get: Channel): Unit = {
    val x = new Channel
    choice(put(x) * Producer(put, get))  // putチャネルにデータxを送り続ける
  }

  def Consumer(put: Channel, get: Channel): Unit = {
    choice(get * { x => Consumer(put, get)}) // getチャネルからデータxを受け取り続ける。(受け取ったデータxは無視)
  }

あとは、spawn関数で呼ぶことが出来ます。

    val put = new Channel
    val get = new Channel
    spawn < Producer(put, get) | Buffer(put, get) | Consumer(put, get) >

|で区切られている部分は並行に処理されます。この例では「Producerがデータを送り続ける」、「Bufferは2つまでデータを貯める」、「Consumerはデータを取得する」という一連の操作を行います。

上記の例ではProducerが無限にデータを送り続けスタックオーバーフローしてしまったので、回数制限を設けてついでにprintlnもつけたバージョンで実行してみます。

実行ファイルはこちら => https://github.com/matsu-chara/pilib_sample/blob/master/src/main/scala/Buffer.scala

import scala.concurrent.pilib._

// from: [PiLib: A Hosted Language for Pi-Calculus Style Concurrency - Vincent Cremet, Martin Odersky](http://lampwww.epfl.ch/~cremet/publications/pilib.pdf)
object Buffer {

  /**
   * Buffer(put, get) = B0(put, get)
   * B0(put, get) = put(x).B1(put, get, x)
   * B1(put, get, x) = get$<x>.B0(put, get) + put(y).B2(put, get, x, y)
   * B2(put, get, x, y) = get$<x>.B1(put, get, y)
   */
  def Buffer[A](put: Chan[A], get: Chan[A]): Unit = {
    def B0: Unit = choice(put * { x => println("B0 => B1"); B1(x) })
    def B1(x: A): Unit = choice(get(x) * {
      println("B1 => B0"); B0
    }, put * { y => println("B1 => B2"); B2(x, y) })
    def B2(x: A, y: A): Unit = choice(get(x) * {
      println("B2 => B1"); B1(y)
    })
    B0
  }

  // A π-calculus channel is a channel that can carry other π-calculus channel
  class Channel extends Chan[Channel]

  /**
   * Producer(put, get) = νx.put$<x>.Producer(put, get)
   * Consumer(put, get) = get(x).Consumer(put, get)
   *
   * ν put, get. Producer(put, get) | Buffer(put, get) | Consumer(put, get)
   */
  def Producer(put: Channel, get: Channel, count: Int = 5): Unit = {
    if (count > 0) {
      val x = new Channel
      choice(put(x) * {
        println("current: " + count)
        Producer(put, get, count - 1)
      })
    } else {
      sys.exit(0)
    }
  }

  def Consumer(put: Channel, get: Channel): Unit = {
    choice(get * { x =>
      Consumer(put, get)
    })
  }

  def main(args: Array[String]): Unit = {
    val put = new Channel
    val get = new Channel
    spawn < Producer(put, get) | Buffer(put, get) | Consumer(put, get) >
  }
}

並行っぽく実行するたびに経過が変わるのか見てみました。

// ===1回目===
B0 => B1
current: 5
B1 => B2
current: 4
B2 => B1
B1 => B2
current: 3
B2 => B1
B1 => B2
current: 2
B2 => B1
B1 => B2
current: 1
B2 => B1
B1 => B0

// ===2回目===
B0 => B1
current: 5
B1 => B2
current: 4
B2 => B1
B1 => B2
current: 3
B2 => B1
B1 => B0
current: 2
B0 => B1
B1 => B0
B0 => B1
current: 1
B1 => B0

となり、いい感じですね!

Consumerにわざと遅い処理を加えるとProducerがデータを送りたくてもBufferが一杯でブロックされてしまう様子が見られます。 逆にProducerにわざと遅い処理を加えるとConsumerがデータを取得したいけどデータが無くてブロックされている様子も見ることが出来ます。

元の論文では、このサンプルのあとPiLibの実装について詳しく紹介されていますが今回は使い方の紹介に留めます。

CSP-π

再び最初の資料P.32によると CSPモデルとπ計算は(ほぼ)統一できるとのことです。 とても気になりますが、疲れたので今回はとりあえずこの辺で切り上げたいと思います。

参考文献

*1:引用の元記事は http://home.att.ne.jp/sigma/satoh/diary.html

*2:そうなるとなんだか、タイトルが怪しくなってきそう・・・

*3:CCSを利用した有名な(?)言語の例は見つけられませんでした

*4:とはいえActorはCSPから生まれているようなので、”違い”と言ってしまうと語弊があるかもしれません(?)