前回Javaのマルチスレッドについて勉強して、辛すぎる(∩´﹏`∩)となったので、akkaを勉強するぞ―!と思っていたのですが、気になる資料を見つけました。
コンピュータ業界はオブジェクト指向を基に関数型言語に移行する動きにあります。 更に並行処理はマルチスレッドから、Actorモデルに移行しようとしています。 Actorの利点だけがクローズアップされて、全て解決のような風潮があります。 そこで、Actor, Agent, CSPについて調べてみました。
_人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人人_
> Actorの利点だけがクローズアップされて、全て解決のような風潮があります。 <
 ̄YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY ̄
Actorだけで満足してはいけないアトモスフィアを感じたので、マルチスレッドやActorモデル以外の並行処理について勉強してみました。
なお、この文章は最近並行処理の勉強を始めた人が書いているので用語の使い方が間違っていたり変な解釈を含んでいる可能性があります。個人用メモ的な意味合いが強いのでご注意ください。最後につけた参考文献は役に立つと思いますので、でっかいリンク集だと思って御覧ください。
マルチスレッド?並行処理?
ところでマルチスレッドという用語ですが今まで何となく並行処理と同じ意味で使っていました。 上記の資料だとJavaなどで用いられている並行処理の特定の計算方法を指しているようです。
佐藤先生がErlang、Scala、Javaなどの並行処理を斬る! - スティルハウスの書庫中の引用*1によると、
そして1990年後ぐらいから議論されてきた話題のひとつは、 各オブジェクトは高々のひとつの実行スレッドとするシングルスレッド実行モデルと、一つのオブジェクトでも複数同時処理を許すマルチスレッド実行モデルのどちらがいいかというもの。
さてシングルスレッド実行モデルとマルチスレッド実行モデルは何が違うかというと、前者は各オブジェクトは能動的に処理してもよい、つまり各オブジェクトに高々一つのスレッドを割り当てるが、(中略)。一方、マルチスレッド実行モデルでは各オブジェクトは能動的に処理されますし、各オブジェクトにも複数のスレッドが割り当てられます。(略)
ということで、あるオブジェクトが複数のスレッドに同時に処理されるようなモデルをマルチスレッド実行モデルと呼ぶようです。いままではなんとなく複数スレッドが立ち上がって計算されてたらマルチスレッドとか並行処理と思っていましたが、「一つのオブジェクトの処理に割り当てられるスレッドがシングルかマルチか」が重要なようです。(このへんの分類は理解が足りず、正確でないかもしれません。)
調べる限りシングルスレッド実行モデルではmessage passing方式、マルチスレッド実行モデルではshared memory方式が有名なようです。
プロセス計算
一番最初に上げた資料を読み進めていくとP.8に、ActorのセマンティクスはCSP/CCSから導かれた(なのでActorはCSP/CCSのサブセットとして考えて良い)とあります。*2
調べてみるとCSP(= Communicating Sequential Processes) やCCS(= Calculus of Communicating Systems)はプロセス計算と呼ばれる分野に属するようです。 Wikipediaで歴史を見てみると、CSPやCCSが源流となってプロセス計算と分野が出来上がり、これらの発展形としてπ計算などが生まれているようです。
プロセス計算はプロセス代数とも呼ばれています。並行処理を含む計算をモデル化することで、「こことここは等価だよね」、「ここは分配法則が使えるよね」といった具合に振る舞いを変えずに簡略化できるようになるようです。テストによる動作検証が非常に難しい並行処理において、数学的に動作が変わらないことが保証されている変形を使えるプロセス計算は、複雑なアプリケーションを記述するための強力な道具になりそうです。その辺の等価性(や双模倣意味論)についてはプロセス代数の基本と関連ツールの紹介や電子情報通信学会「知識ベース」 c 電子情報通信学会 2010 1/(39) 7 群(コンピュータ-ソフトウェア)-- 1 編(ソフトウェア基礎)1-2 通信プロセス計算が詳しいです。
CSP
CSPはGoのgoroutineのベースになっているようで、資料がたくさんあります。*3
goroutine(CSP)とAkka(Actor)の違いを考えると、
- akkaはメッセージの送り先を明示的に指定する。
- goroutineはチャンネルへのメッセージを受け取るプロセスが誰だかは指定しない。
という違いがあり、どうやらこのメッセージの受け手の匿名性はそのままCSPとActorモデルの違いということが出来そうです。 *4
π計算
「CSP is Goのアレ」という雑な理解をしたので、CCSの方も勉強したくなりますね。今回は欲張ってCCSの発展形、π計算について調べました。
プロセス代数の基本と関連ツールの紹介によると、 π計算はCCS+チャネル渡しとのことなので、π計算を抑えておけばCCSもいい感じに抑えられるんじゃないでしょうか。(若干適当・・・。)
チュートリアル:π計算P.8によると、CCSはチャネルが静的に決定されているので、 ノードが動的に決定されるようなネットワーク構造を持つ問題に対して応用できないという欠点があり、これを克服するためにチャネル渡しを追加したのがπ計算のようです。
さきほど挙げた、 チュートリアル:π計算 では、CCSやπ計算の表現方法が数式や図を多用して解説されていますので、詳しく知りたい方はそちらを御覧ください。
Scalaでのπ計算
ここからは、PiLib: A Hosted Language for Pi-Calculus Style Concurrency - Vincent Cremet, Martin Oderskyを参照します。
PiLibはπ計算のためのScalaライブラリです。ですがもうdeprecatedなので使えません(Actorを使えとのこと)(´・ω・`) Scala2.9.3で使えることを確認したので、今回はそちらを使うことにします。
論文のサンプルそのままに、容量2のBufferを作ってみます。
Bufferは以下の様な特徴を持ちます。
- B0の状態で、 putチャネルに
x
が来るとB1(x)になる。 - B1(x)の状態で、putチャネルに
y
が来るとB2(x, y)になる。getチャネルにx
を送るとB0になる。 - B2(x, y)の状態で、getチャネルに
x
を送るとB1(y)になる。
これをPiLibで表現すると以下のようになります。
import scala.concurrent.pilib._ object Buffer { /** * π計算風な記法で書くと以下のとおり。(注意: 「get$」は「getの上にバーが付いている(texで言うと、\overline{get}) 」のを表現できなかったので、代わりに使っている。) * Buffer(put, get) = B0(put, get) * B0(put, get) = put(x).B1(put, get, x) * B1(put, get, x) = get$<x>.B0(put, get) + put(y).B2(put, get, x, y) * B2(put, get, x, y) = get$<x>.B1(put, get, y) */ def Buffer[A](put: Chan[A], get: Chan[A]): Unit = { def B0: Unit = choice(put * { x => B1(x) }) // putからデータxを受け取ってB1(x)になる def B1(x: A): Unit = choice(get(x) * B0, put * { y => B2(x, y) }) // getにデータxを送ってB0になるか、 putからデータyを受け取ってB2(x, y)になる。 def B2(x: A, y: A): Unit = choice(get(x) * B1(y)) // getにデータxを受け取ってB1(y)になる。 B0 } }
put * { x => B1(x) }
のように、 a * { x => c}
の形式になっているものはInput Guarded Processと呼ばれます。cにはxを受け取った後の「継続」が入ります。
get(x) * B0
のように、a(ν) * c
となっているものはOutput Guarded Processと呼ばれます。やはりcはνを出力した後の「継続」が入ります。
また、def choice[A](gs: Gp[A]*): A
はn個のguarded process
を受け取り、そのどれか(チャネルの受給が満たされたもの)を実行します。どのチャネルともコミュニケーションが取れない場合はブロックします。
ところでChan[A]
だと特定のオブジェクトA
しか送れませんが、以下のように定義すると
π計算っぽく、「チャネルを送れるチャネル」を作ることが出来ます。
class Channel extends Chan[Channel]
これらを使って、Bufferにデータを送ったり消費したりするProducerとConsumerを定義しましょう。
/** * π計算風な記法で書くと以下のとおり。(注意: 「get$」は「getの上にバーが付いている(texで言うと、\overline{get}) 」のを表現できなかったので、代わりに使っている。) * Producer(put, get) = νx.put$<x>.Producer(put, get) * Consumer(put, get) = get(x).Consumer(put, get) */ def Producer(put: Channel, get: Channel): Unit = { val x = new Channel choice(put(x) * Producer(put, get)) // putチャネルにデータxを送り続ける } def Consumer(put: Channel, get: Channel): Unit = { choice(get * { x => Consumer(put, get)}) // getチャネルからデータxを受け取り続ける。(受け取ったデータxは無視) }
あとは、spawn
関数で呼ぶことが出来ます。
val put = new Channel val get = new Channel spawn < Producer(put, get) | Buffer(put, get) | Consumer(put, get) >
|
で区切られている部分は並行に処理されます。この例では「Producerがデータを送り続ける」、「Bufferは2つまでデータを貯める」、「Consumerはデータを取得する」という一連の操作を行います。
上記の例ではProducerが無限にデータを送り続けスタックオーバーフローしてしまったので、回数制限を設けてついでにprintlnもつけたバージョンで実行してみます。
実行ファイルはこちら => https://github.com/matsu-chara/pilib_sample/blob/master/src/main/scala/Buffer.scala
import scala.concurrent.pilib._ // from: [PiLib: A Hosted Language for Pi-Calculus Style Concurrency - Vincent Cremet, Martin Odersky](http://lampwww.epfl.ch/~cremet/publications/pilib.pdf) object Buffer { /** * Buffer(put, get) = B0(put, get) * B0(put, get) = put(x).B1(put, get, x) * B1(put, get, x) = get$<x>.B0(put, get) + put(y).B2(put, get, x, y) * B2(put, get, x, y) = get$<x>.B1(put, get, y) */ def Buffer[A](put: Chan[A], get: Chan[A]): Unit = { def B0: Unit = choice(put * { x => println("B0 => B1"); B1(x) }) def B1(x: A): Unit = choice(get(x) * { println("B1 => B0"); B0 }, put * { y => println("B1 => B2"); B2(x, y) }) def B2(x: A, y: A): Unit = choice(get(x) * { println("B2 => B1"); B1(y) }) B0 } // A π-calculus channel is a channel that can carry other π-calculus channel class Channel extends Chan[Channel] /** * Producer(put, get) = νx.put$<x>.Producer(put, get) * Consumer(put, get) = get(x).Consumer(put, get) * * ν put, get. Producer(put, get) | Buffer(put, get) | Consumer(put, get) */ def Producer(put: Channel, get: Channel, count: Int = 5): Unit = { if (count > 0) { val x = new Channel choice(put(x) * { println("current: " + count) Producer(put, get, count - 1) }) } else { sys.exit(0) } } def Consumer(put: Channel, get: Channel): Unit = { choice(get * { x => Consumer(put, get) }) } def main(args: Array[String]): Unit = { val put = new Channel val get = new Channel spawn < Producer(put, get) | Buffer(put, get) | Consumer(put, get) > } }
並行っぽく実行するたびに経過が変わるのか見てみました。
// ===1回目=== B0 => B1 current: 5 B1 => B2 current: 4 B2 => B1 B1 => B2 current: 3 B2 => B1 B1 => B2 current: 2 B2 => B1 B1 => B2 current: 1 B2 => B1 B1 => B0 // ===2回目=== B0 => B1 current: 5 B1 => B2 current: 4 B2 => B1 B1 => B2 current: 3 B2 => B1 B1 => B0 current: 2 B0 => B1 B1 => B0 B0 => B1 current: 1 B1 => B0
となり、いい感じですね!
Consumerにわざと遅い処理を加えるとProducerがデータを送りたくてもBufferが一杯でブロックされてしまう様子が見られます。 逆にProducerにわざと遅い処理を加えるとConsumerがデータを取得したいけどデータが無くてブロックされている様子も見ることが出来ます。
元の論文では、このサンプルのあとPiLibの実装について詳しく紹介されていますが今回は使い方の紹介に留めます。
CSP-π
再び最初の資料P.32によると CSPモデルとπ計算は(ほぼ)統一できるとのことです。 とても気になりますが、疲れたので今回はとりあえずこの辺で切り上げたいと思います。
参考文献
- 並行処理の統一モデルへの動向
- actorとCSPの違いや、統一モデルについて。
- 佐藤先生がErlang、Scala、Javaなどの並行処理を斬る! - スティルハウスの書庫
- 記事で引用した部分以外もおもしろいので一度は読むべし。
- プロセス計算 - Wikipedia
- いつもどおり詳しいwikipedia
- プロセス代数の基本と関連ツールの紹介
- プロセス代数について。付録の「なぜプロセス”代数”?」も必見
- 電子情報通信学会「知識ベース」 c 電子情報通信学会 2010 1/(39) 7 群(コンピュータ-ソフトウェア)-- 1 編(ソフトウェア基礎)1-2 通信プロセス計算
- CCSやπ計算の等価性や意味論などについて詳しく載っている。
- Communicating Sequential Processes (CSP)
- CSPについて。ActorとCSPのブロッキングの違いなどが図解されていてとても分かりやすい
- Goの並列処理について: Slides
- ErlangとGolangを比較してみる - Qiita
- ErlangとGo(ActorとCSP)の比較。対比や使い分けが詳細に書いてあります。
- チュートリアル:π計算 日本ソフトウェア科学会 FOSE'97, 1997年12月 講師:佐藤 一郎
- CCS・π計算がどういったものか数式や図を多用して説明されている。神。
- PiLib: A Hosted Language for Pi-Calculus Style Concurrency
- π計算のScalaライブラリPiLibについての論文
- A π-Calculus Internal Domain-Specific
Language for Scala
- 詳しい。読めてない。
*1:引用の元記事は http://home.att.ne.jp/sigma/satoh/diary.html
*2:そうなるとなんだか、タイトルが怪しくなってきそう・・・
*3:CCSを利用した有名な(?)言語の例は見つけられませんでした
*4:とはいえActorはCSPから生まれているようなので、”違い”と言ってしまうと語弊があるかもしれません(?)