Implementing chat application with Pony lang | Software Development Ramblings に影響を受けて、chatではなくpub/subっぽい何かを作ってみることにしました。
内容的にはpub/subのアーキテクチャがどうあるべきかということではなく、 ponylangでこういうことが書きたいときはこうすれば良いかもしれない。といったサンプルのような感じです。 (なのでtopicが1個しか作れないなど、機能がほとんど無いのでそのまま実用は出来ないと思います。) もちろんponylangは絶賛勉強中なのでベストプラクティスではないところもあると思いますが、 コンパイルは通っているのでうわあああコンパイル通らないよおおおという時には役立つかもしれません。
pony-pubsub
コードは以下にあります。コミットごとに解説していきます。 GitHub - matsu-chara/pony-pubsub: sample application of ponylang また、上記のチュートリアルで解説していることはカットしています。
構成
mainでpublisher, subscriberをいくつか作成します。 その後、serverに登録してpublisherがpublishしたメッセージをsubscriberが受け取ってコンソールに出力します。topicの購読機能などは無く、publishされたメッセージは問答無用で全てのsubscriberに配信されます。
この時serverがactorなので、server.register_publisher()
に渡す引数はval
などの「安全に渡せる」参照である必要があります。
こんかいはそのためにp1, p2, s1, s2といった変数をval
で定義しています。*1またPublisher
やSubscriber
のregisterメソッドで、レシーバーがval
参照であることを要求しています。
これがないとレシーバーが以外でもregisterメソッドを呼べてしまうことになりコンパイルエラーになります。
// main.pony actor Main new create(env:Env) => let server = Server(env) let p1: Publisher val = recover Publisher("niconare") end let p2: Publisher val = recover Publisher("nicolun") end let s1: Subscriber val = recover Subscriber("user1", env) end let s2: Subscriber val = recover Subscriber("user2", env) end p1.register(server) p2.register(server) s1.register(server) s2.register(server) p1.publish(server, "new presentation!") p2.publish(server, "foo joined!")
// publisher.pony class Publisher let name: String new create(name': String) => name = name' fun val register(server: Server) => server.register_publisher(this) fun publish(server: Server, message: String) => server.publish(name + " sends " + message)
// subscriber.pony class Subscriber let name: String let _env: Env new create(name': String, env': Env) => name = name' _env = env' fun val register(server: Server) => server.register_subscriber(this) fun box receive(message: String) => _env.out.print(name + " received [" + message + "]")
// server.pony use "collections" actor Server let _env: Env let pubs: List[Publisher val] let subs: List[Subscriber val] new create(env': Env) => _env = env' pubs = List[Publisher val] subs = List[Subscriber val] be register_publisher(pub: Publisher val) => pubs.push(pub) be register_subscriber(sub: Subscriber val) => subs.push(sub) be publish(message: String) => for sub in subs.values() do sub.receive(message) end
これが1つめのコミットの大まかな内容です。
コンストラクタ
始めのうちはval
を表現するために以下のように、recoverを使っていました。
let p1: Publisher val = recover Publisher("niconare", server) end let p2: Publisher val = recover Publisher("nicolun", server) end
これでもOKですが2つめのコミットのようにnew val create
とすると、
refの代わりにvalが返ってくるので以下のようにrecover無しで書くことが出来ます。
let p1 = Publisher("niconare", server) let p2 = Publisher("nicolun", server)
これを反映させたコードと以前のコードとのdiffを表示すると以下のようになります。
diff --git a/main.pony b/main.pony index c124bc1..0cb0653 100644 --- a/main.pony +++ b/main.pony @@ -2,10 +2,10 @@ actor Main new create(env:Env) => let server = Server(env) - let p1: Publisher val = recover Publisher("niconare") end - let p2: Publisher val = recover Publisher("nicolun") end - let s1: Subscriber val = recover Subscriber("user1", env) end - let s2: Subscriber val = recover Subscriber("user2", env) end + let p1 = Publisher("niconare") + let p2 = Publisher("nicolun") + let s1 = Subscriber("user1", env) + let s2 = Subscriber("user2", env) p1.register(server) p2.register(server) diff --git a/publisher.pony b/publisher.pony index 0bb9ab3..4fb591e 100644 --- a/publisher.pony +++ b/publisher.pony @@ -1,7 +1,7 @@ class Publisher let name: String - new create(name': String) => + new val create(name': String) => name = name' fun val register(server: Server) => diff --git a/subscriber.pony b/subscriber.pony index 37c9367..bc5c864 100644 --- a/subscriber.pony +++ b/subscriber.pony @@ -2,7 +2,7 @@ class Subscriber let name: String let _env: Env - new create(name': String, env': Env) => + new val create(name': String, env': Env) => name = name' _env = env'
ここまでで2つめのコミットの内容になりました。
subscriber
Serverでsubscriber.receive(message)
を直接呼ぶと、
Subscriberがクラスなのでブロックします。これは動作が遅いSubscriberが居る場合に致命的なパフォーマンス悪化を招く可能性があります。
この問題に対処するために、3つめのコミットのように各々のSubscriberにreceiveさせるためのWorker Actorを作るようにしました。
workerはsubscriberにreceiveをdelegateするだけですが、
こうするとServerはSubscriberの受け取る処理の重さにかかわらず速やかに待機状態に戻れます。
workerの実装は以下のようになります。
// worker.pony actor Worker let sub: Subscriber val new create(sub': Subscriber val) => sub = sub' be receive(message: String) => sub.receive(message)
これを利用するために以下のような変更を行います。
diff --git a/server.pony b/server.pony index 6e8f402..057e4c1 100644 --- a/server.pony +++ b/server.pony @@ -3,21 +3,21 @@ use "collections" actor Server let _env: Env let pubs: List[Publisher val] - let subs: List[Subscriber val] + let sub_workers: List[Worker] new create(env': Env) => _env = env' pubs = List[Publisher val] - subs = List[Subscriber val] + sub_workers = List[Worker] be register_publisher(pub: Publisher val) => pubs.push(pub) be register_subscriber(sub: Subscriber val) => - subs.push(sub) + sub_workers.push(Worker(sub)) be publish(message: String) => - for sub in subs.values() do - sub.receive(message) + for worker in sub_workers.values() do + worker.receive(message) end
ここまでで3つめのコミットの内容になりました。
publisher
話を面白くするためにpublisherは何らかの時点でアプリケーションを止めずにリロードをかけたいことがあるとします。 この仕様を満たすようなpublisherのリロード機能をつけるために以下の関数を定義したくなります。
be reload(pubs': List[Publisher val])
しかし、actorのbehaviorにはiso, val, tagしか渡すことができません。 なので以下のようにメンバー変数の型修飾子とreloadのシグネチャを変更することで対応したくなります。
var pubs: List[Publisher val] val be reload(pubs': List[Publisher val] val)
しかしこうすると以下のpush
操作がうまく行かなくなります。
be register_publisher(pub: Publisher val) => pubs.push(pub)
これはpubs
がmutableな参照であることを要求しているからです。
このようなvarではあるけど安全にmutableなデータを渡したいという際に役立つのがiso
のような
mutableを安全に扱うことができるreference capabilityです。
これを利用するとserverの実装は以下のように変化します。
diff --git a/server.pony b/server.pony index 057e4c1..d24ca61 100644 --- a/server.pony +++ b/server.pony @@ -2,17 +2,20 @@ use "collections" actor Server let _env: Env - let pubs: List[Publisher val] + var pubs: List[Publisher val] iso let sub_workers: List[Worker] new create(env': Env) => _env = env' - pubs = List[Publisher val] + pubs = recover List[Publisher val] end sub_workers = List[Worker] be register_publisher(pub: Publisher val) => pubs.push(pub) + be reload(pubs': List[Publisher val] iso) => + pubs = consume pubs' + be register_subscriber(sub: Subscriber val) => sub_workers.push(Worker(sub))
reloadのメソッドを読んで見るためにmain.ponyに以下の行を追加してみましょう。
diff --git a/main.pony b/main.pony index 0cb0653..dbad1d2 100644 --- a/main.pony +++ b/main.pony @@ -1,3 +1,5 @@ +use "collections" + actor Main new create(env:Env) => let server = Server(env) @@ -15,3 +17,18 @@ actor Main p1.publish(server, "new presentation!") p2.publish(server, "foo joined!") + let p3 = Publisher("niconico") + let p4 = Publisher("neconeco") + let p5 = Publisher("noconoco") + + let new_publishers = recover + let ps = List[Publisher val] + ps.push(p3) + ps.push(p4) + ps.push(p5) + end + + server.reload(consume new_publishers) + p3.publish(server, "niconico!") + p4.publish(server, "nyanyan!") + p5.publish(server, "kameeee!")
ここまでで4つめのコミットの内容になりました。
main.ponyを実行してみて気がついたのですが、ここまで来てpublisherがserverに登録されていても何も意味がないことが判明しました。 しかたがないので登録されていないpublisherからのpublishは無視する仕様を追加することにしましょう。 今回はpublishの時点でpublisherリストに載っていない場合はpublishせずに無視するという単純な仕組みにしました。
そのためにはfor文でpubs
を走査し、publish
してきたpublisher
と一致する物がリストに含まれているかを検査すれば良いのですが、
メンバー変数のpubs
を調子に乗ってiso
にしてしまったので困ったことになりました。
具体的には、以下の様なfor文がコンパイルエラーになってしまいます。
var isRegistered = false for pub in pubs.values() do isRegistered = isRegistered or (sender == pub) end
/Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:24:27: receiver capability is not a subtype of method capability for pub in pubs.values() do ^ /Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:24:16: receiver type: List[Publisher val] iso! for pub in pubs.values() do ^ /usr/local/Cellar/ponyc/0.2.1/packages/collections/list.pony:217:3: target type: List[Publisher val] box fun values(): ListValues[A, this->ListNode[A]]^ => ^ /Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:24:27: this would be possible if the arguments and return value were all sendable for pub in pubs.values() do
これはList[A].values()
のreceiverの指定がbox
だからです。
しかし前述のようにregister_publisher
メソッドで、pubs
を書きかえたいので
valやboxにはできません。
この場合は、
register_publisher
で受け取るメッセージiso
のままにしておき、メンバー変数をtrn
にすると対処できます。
つまり以下の様なコードにすればOKです。
diff --git a/server.pony b/server.pony index d24ca61..6748551 100644 --- a/server.pony +++ b/server.pony @@ -2,7 +2,7 @@ use "collections" actor Server let _env: Env - var pubs: List[Publisher val] iso + var pubs: List[Publisher val] trn let sub_workers: List[Worker] new create(env': Env) => @@ -19,7 +19,13 @@ actor Server be register_subscriber(sub: Subscriber val) => sub_workers.push(Worker(sub)) - be publish(message: String) => + be publish(sender: Publisher val, message: String) => + var isRegistered = false + for pub in pubs.values() do + isRegistered = isRegistered or (sender == pub) + end + if isRegistered == false then return end + for worker in sub_workers.values() do worker.receive(message) end
publishメソッドで登録済みpublisherかどうかを判定するためにsender: Publisher
を受け取るようにしました。
trn
はwrite uniqueのみを保証するのでiso
と同じ問題は発生しません。(問題なくvalues()
も呼べます。)
またtrn
ならwriteできるので、register_publisher
で値を書き換えることも出来ます。
val
で受け取ってListをまるまるcopyしてref
にする。という方法は取らないことに注意してください。
ponyのreference capabilityはこのようなメッセージのコピーをどうやったら安全になくすことができるか?という問題への解決策なので、コピーしてしまうと少しもったいないことになります。
(もちろんパフォーマンスが重要でないケースではシンプルに出来るメリットもあるはずなのでケースバイケースではあります。)
さて上記の変更を加えてponyc
でコンパイルすると以下の様なエラーになると思います。
/Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:25:46: couldn't find 'eq' in 'Publisher' isRegistered = isRegistered or (sender == pub)
これはPublisher
にeq
メソッドが無いというエラーです。ponyでは==
はeq
のエイリアスになっています。
pobyには置演算子は対応するメソッドで実装するというルールがあります。 詳しくはhttp://tutorial.ponylang.org/expressions/infix-ops/を参照してください。
さきほどのpublishメソッドの変更を合わせると以下の様な変更になります。
diff --git a/publisher.pony b/publisher.pony index 4fb591e..845ae9a 100644 --- a/publisher.pony +++ b/publisher.pony @@ -7,6 +7,9 @@ class Publisher fun val register(server: Server) => server.register_publisher(this) - fun publish(server: Server, message: String) => - server.publish(name + " sends " + message) + fun val publish(server: Server, message: String) => + server.publish(this, name + " sends " + message) + + fun box eq(that: Publisher box): Bool => + name == that.name
mainを変更して未登録publisherからのpublishが無視されることを確認しましょう。
diff --git a/main.pony b/main.pony index dbad1d2..d70a896 100644 --- a/main.pony +++ b/main.pony @@ -32,3 +32,7 @@ actor Main p3.publish(server, "niconico!") p4.publish(server, "nyanyan!") p5.publish(server, "kameeee!") + + // this message will be ignored + p1.publish(server, "new presentation!") +
ここまでで5つめのコミットの内容になりました。
さて、未登録publisherを弾く機能は出来ましたが、publishのたびに毎回Listを一巡するのは少々効率が悪そうです。 そこでSetを使った実装に変更します。
ponylangのSetの実装(https://github.com/CausalityLtd/ponyc/blob/0.2.1/packages/collections/set.pony)を見るとSet
はHashSet
の型パラメータをいくつか指定したものになっています。Equatable
とHashable
なA
を渡せばSetにしてくれるようです。 始めはSet
でやっていたんですが、上手く動かなかったので一旦SetIs
に逃げることにしました。SetIs
はHashIs
というis
(pointerによる比較)ベースでhash()
とeq
を実装してくれている物を使っているようです。実装はhashfun.ponyにあります。
また、Set.contains()
的なメソッドが見つからなかったのでSet[A].set(target) < That
のような実装でごまかすことにしました。微妙に残念ですがそこまで悪く無いような気もします。
変更は以下のようになります。
diff --git a/server.pony b/server.pony index 6748551..42c3207 100644 --- a/server.pony +++ b/server.pony @@ -2,29 +2,26 @@ use "collections" actor Server let _env: Env - var pubs: List[Publisher val] trn + var pubs: SetIs[Publisher val] trn let sub_workers: List[Worker] new create(env': Env) => _env = env' - pubs = recover List[Publisher val] end + pubs = recover SetIs[Publisher val] end sub_workers = List[Worker] be register_publisher(pub: Publisher val) => - pubs.push(pub) + pubs.set(pub) - be reload(pubs': List[Publisher val] iso) => + be reload(pubs': SetIs[Publisher val] iso) => pubs = consume pubs' be register_subscriber(sub: Subscriber val) => sub_workers.push(Worker(sub)) be publish(sender: Publisher val, message: String) => - var isRegistered = false - for pub in pubs.values() do - isRegistered = isRegistered or (sender == pub) - end - if isRegistered == false then return end + let isRegistered = (SetIs[Publisher val].set(sender) < pubs) + if(isRegistered == false) then return end for worker in sub_workers.values() do worker.receive(message)
pubsの型を変えたのでmainにも以下のような修正が必要です。
diff --git a/main.pony b/main.pony index d70a896..9d52be6 100644 --- a/main.pony +++ b/main.pony @@ -22,10 +22,10 @@ actor Main let p5 = Publisher("noconoco") let new_publishers = recover - let ps = List[Publisher val] - ps.push(p3) - ps.push(p4) - ps.push(p5) + let ps = SetIs[Publisher val] + ps.set(p3) + ps.set(p4) + ps.set(p5) end server.reload(consume new_publishers)
ここまでで6つめのコミットの内容になりました。
ここで、SetIs
が出来たのでSet
の実装に再チャレンジします。
hash()
は実装しているのでinterface Hashable
の要求には答えているのに何故だろうと思ったらEquatable
はeq
と共にne
も要求するようです。なおHashable
はhttps://github.com/CausalityLtd/ponyc/blob/0.2.1/packages/collections/hashfun.ponyにあり、fun hash(): U64
を要求します。またEquatable
はhttps://github.com/CausalityLtd/ponyc/blob/0.2.1/packages/builtin/comparable.ponyにあり、fun eq(that: box->A): Bool
とfun ne(that: box->A): Bool => not eq(that)
を要求します。ということで、忘れていたne
の実装を行い無事にSet
による実装に切り替えることが出来ました。
実装は以下のように変更されます。
diff --git a/main.pony b/main.pony index 9d52be6..773852b 100644 --- a/main.pony +++ b/main.pony @@ -22,7 +22,7 @@ actor Main let p5 = Publisher("noconoco") let new_publishers = recover - let ps = SetIs[Publisher val] + let ps = Set[Publisher val] ps.set(p3) ps.set(p4) ps.set(p5) diff --git a/publisher.pony b/publisher.pony index 845ae9a..a952c47 100644 --- a/publisher.pony +++ b/publisher.pony @@ -13,3 +13,9 @@ class Publisher fun box eq(that: Publisher box): Bool => name == that.name + fun box ne(that: Publisher box): Bool => + name != that.name + + fun hash(): U64 => + name.hash() + diff --git a/server.pony b/server.pony index 42c3207..e33a2da 100644 --- a/server.pony +++ b/server.pony @@ -2,25 +2,25 @@ use "collections" actor Server let _env: Env - var pubs: SetIs[Publisher val] trn + var pubs: Set[Publisher val] trn let sub_workers: List[Worker] new create(env': Env) => _env = env' - pubs = recover SetIs[Publisher val] end + pubs = recover Set[Publisher val] end sub_workers = List[Worker] be register_publisher(pub: Publisher val) => pubs.set(pub) - be reload(pubs': SetIs[Publisher val] iso) => + be reload(pubs': Set[Publisher val] iso) => pubs = consume pubs' be register_subscriber(sub: Subscriber val) => sub_workers.push(Worker(sub)) be publish(sender: Publisher val, message: String) => - let isRegistered = (SetIs[Publisher val].set(sender) < pubs) + let isRegistered = (Set[Publisher val].set(sender) < pubs) if(isRegistered == false) then return end for worker in sub_workers.values() do
これで7つめのコミットの内容になりました。
ToDo?
debug print packageを使おうと思ったんですが、masterにはあったけどpony 0.2.1には無いという罠にはまったのでそのうち使いたいなと思います。
まとめ
コンパイル通らない時にどうするんだーみたいな時にこれが少しでも役に立つと良いなと思います。
*1:追記:recoverでvalにしていますが、subscirberのfactory methodをnew val ~~のように定義したほうがスマートかもしれません。