読者です 読者をやめる 読者になる 読者になる

pub subっぽいponylangのサンプルコードを書いた

Implementing chat application with Pony lang | Software Development Ramblings に影響を受けて、chatではなくpub/subっぽい何かを作ってみることにしました。

内容的にはpub/subのアーキテクチャがどうあるべきかということではなく、 ponylangでこういうことが書きたいときはこうすれば良いかもしれない。といったサンプルのような感じです。 (なのでtopicが1個しか作れないなど、機能がほとんど無いのでそのまま実用は出来ないと思います。) もちろんponylangは絶賛勉強中なのでベストプラクティスではないところもあると思いますが、 コンパイルは通っているのでうわあああコンパイル通らないよおおおという時には役立つかもしれません。

pony-pubsub

コードは以下にあります。コミットごとに解説していきます。 GitHub - matsu-chara/pony-pubsub: sample application of ponylang また、上記のチュートリアルで解説していることはカットしています。

構成

mainでpublisher, subscriberをいくつか作成します。 その後、serverに登録してpublisherがpublishしたメッセージをsubscriberが受け取ってコンソールに出力します。topicの購読機能などは無く、publishされたメッセージは問答無用で全てのsubscriberに配信されます。

この時serverがactorなので、server.register_publisher()に渡す引数はvalなどの「安全に渡せる」参照である必要があります。 こんかいはそのためにp1, p2, s1, s2といった変数をvalで定義しています。*1またPublisherSubscriberのregisterメソッドで、レシーバーがval参照であることを要求しています。 これがないとレシーバーが以外でもregisterメソッドを呼べてしまうことになりコンパイルエラーになります。

// main.pony
actor Main
  new create(env:Env) =>
    let server = Server(env)

    let p1: Publisher val = recover Publisher("niconare") end
    let p2: Publisher val = recover Publisher("nicolun") end
    let s1: Subscriber val = recover Subscriber("user1", env) end
    let s2: Subscriber val = recover Subscriber("user2", env) end

    p1.register(server)
    p2.register(server)
    s1.register(server)
    s2.register(server)

    p1.publish(server, "new presentation!")
    p2.publish(server, "foo joined!")
// publisher.pony
class Publisher
  let name: String

  new create(name': String) =>
    name = name'

  fun val register(server: Server) =>
    server.register_publisher(this)

  fun publish(server: Server, message: String) =>
    server.publish(name + " sends " + message)
// subscriber.pony
class Subscriber
  let name: String
  let _env: Env

  new create(name': String, env': Env) =>
    name = name'
    _env = env'

  fun val register(server: Server) =>
    server.register_subscriber(this)

  fun box receive(message: String) =>
    _env.out.print(name + " received [" + message + "]")
// server.pony
use "collections"

actor Server
  let _env: Env
  let pubs: List[Publisher val]
  let subs: List[Subscriber val]

  new create(env': Env) =>
    _env = env'
    pubs = List[Publisher val]
    subs = List[Subscriber val]

  be register_publisher(pub: Publisher val) =>
    pubs.push(pub)

  be register_subscriber(sub: Subscriber val) =>
    subs.push(sub)

  be publish(message: String) =>
    for sub in subs.values() do
      sub.receive(message)
    end

これが1つめのコミットの大まかな内容です。

コンストラクタ

始めのうちはvalを表現するために以下のように、recoverを使っていました。

let p1: Publisher val = recover Publisher("niconare", server) end
let p2: Publisher val = recover Publisher("nicolun", server) end

これでもOKですが2つめのコミットのようにnew val createとすると、 refの代わりにvalが返ってくるので以下のようにrecover無しで書くことが出来ます。

let p1 = Publisher("niconare", server)
let p2 = Publisher("nicolun", server)

これを反映させたコードと以前のコードとのdiffを表示すると以下のようになります。

diff --git a/main.pony b/main.pony
index c124bc1..0cb0653 100644
--- a/main.pony
+++ b/main.pony
@@ -2,10 +2,10 @@ actor Main
   new create(env:Env) =>
     let server = Server(env)
 
-    let p1: Publisher val = recover Publisher("niconare") end
-    let p2: Publisher val = recover Publisher("nicolun") end
-    let s1: Subscriber val = recover Subscriber("user1", env) end
-    let s2: Subscriber val = recover Subscriber("user2", env) end
+    let p1 = Publisher("niconare")
+    let p2 = Publisher("nicolun")
+    let s1 = Subscriber("user1", env)
+    let s2 = Subscriber("user2", env)
 
     p1.register(server)
     p2.register(server)
diff --git a/publisher.pony b/publisher.pony
index 0bb9ab3..4fb591e 100644
--- a/publisher.pony
+++ b/publisher.pony
@@ -1,7 +1,7 @@
 class Publisher
   let name: String
 
-  new create(name': String) =>
+  new val create(name': String) =>
     name = name'
 
   fun val register(server: Server) =>
diff --git a/subscriber.pony b/subscriber.pony
index 37c9367..bc5c864 100644
--- a/subscriber.pony
+++ b/subscriber.pony
@@ -2,7 +2,7 @@ class Subscriber
   let name: String
   let _env: Env
 
-  new create(name': String, env': Env) =>
+  new val create(name': String, env': Env) =>
     name = name'
     _env = env'

ここまでで2つめのコミットの内容になりました。

subscriber

Serverでsubscriber.receive(message)を直接呼ぶと、 Subscriberがクラスなのでブロックします。これは動作が遅いSubscriberが居る場合に致命的なパフォーマンス悪化を招く可能性があります。 この問題に対処するために、3つめのコミットのように各々のSubscriberにreceiveさせるためのWorker Actorを作るようにしました。 workerはsubscriberにreceiveをdelegateするだけですが、 こうするとServerはSubscriberの受け取る処理の重さにかかわらず速やかに待機状態に戻れます。 workerの実装は以下のようになります。

// worker.pony
actor Worker
  let sub: Subscriber val

  new create(sub': Subscriber val) =>
    sub = sub'

  be receive(message: String) =>
    sub.receive(message)

これを利用するために以下のような変更を行います。

diff --git a/server.pony b/server.pony
index 6e8f402..057e4c1 100644
--- a/server.pony
+++ b/server.pony
@@ -3,21 +3,21 @@ use "collections"
 actor Server
   let _env: Env
   let pubs: List[Publisher val]
-  let subs: List[Subscriber val]
+  let sub_workers: List[Worker]
 
   new create(env': Env) =>
     _env = env'
     pubs = List[Publisher val]
-    subs = List[Subscriber val]
+    sub_workers = List[Worker]
 
   be register_publisher(pub: Publisher val) =>
     pubs.push(pub)
 
   be register_subscriber(sub: Subscriber val) =>
-    subs.push(sub)
+    sub_workers.push(Worker(sub))
 
   be publish(message: String) =>
-    for sub in subs.values() do
-      sub.receive(message)
+    for worker in sub_workers.values() do
+      worker.receive(message)
     end

ここまでで3つめのコミットの内容になりました。

publisher

話を面白くするためにpublisherは何らかの時点でアプリケーションを止めずにリロードをかけたいことがあるとします。 この仕様を満たすようなpublisherのリロード機能をつけるために以下の関数を定義したくなります。

be reload(pubs': List[Publisher val])

しかし、actorのbehaviorにはiso, val, tagしか渡すことができません。 なので以下のようにメンバー変数の型修飾子とreloadのシグネチャを変更することで対応したくなります。

var pubs: List[Publisher val] val
be reload(pubs': List[Publisher val] val)

しかしこうすると以下のpush操作がうまく行かなくなります。

be register_publisher(pub: Publisher val) => pubs.push(pub)

これはpubsがmutableな参照であることを要求しているからです。

このようなvarではあるけど安全にmutableなデータを渡したいという際に役立つのがisoのような mutableを安全に扱うことができるreference capabilityです。

これを利用するとserverの実装は以下のように変化します。

diff --git a/server.pony b/server.pony
index 057e4c1..d24ca61 100644
--- a/server.pony
+++ b/server.pony
@@ -2,17 +2,20 @@ use "collections"
 
 actor Server
   let _env: Env
-  let pubs: List[Publisher val]
+  var pubs: List[Publisher val] iso
   let sub_workers: List[Worker]
 
   new create(env': Env) =>
     _env = env'
-    pubs = List[Publisher val]
+    pubs = recover List[Publisher val] end
     sub_workers = List[Worker]
 
   be register_publisher(pub: Publisher val) =>
     pubs.push(pub)
 
+  be reload(pubs': List[Publisher val] iso) =>
+    pubs = consume pubs'
+
   be register_subscriber(sub: Subscriber val) =>
     sub_workers.push(Worker(sub))

reloadのメソッドを読んで見るためにmain.ponyに以下の行を追加してみましょう。

diff --git a/main.pony b/main.pony
index 0cb0653..dbad1d2 100644
--- a/main.pony
+++ b/main.pony
@@ -1,3 +1,5 @@
+use "collections"
+
 actor Main
   new create(env:Env) =>
     let server = Server(env)
@@ -15,3 +17,18 @@ actor Main
     p1.publish(server, "new presentation!")
     p2.publish(server, "foo joined!")
 
+    let p3 = Publisher("niconico")
+    let p4 = Publisher("neconeco")
+    let p5 = Publisher("noconoco")
+
+    let new_publishers  = recover
+      let ps = List[Publisher val]
+      ps.push(p3)
+      ps.push(p4)
+      ps.push(p5)
+    end
+
+    server.reload(consume new_publishers)
+    p3.publish(server, "niconico!")
+    p4.publish(server, "nyanyan!")
+    p5.publish(server, "kameeee!")

ここまでで4つめのコミットの内容になりました。

main.ponyを実行してみて気がついたのですが、ここまで来てpublisherがserverに登録されていても何も意味がないことが判明しました。 しかたがないので登録されていないpublisherからのpublishは無視する仕様を追加することにしましょう。 今回はpublishの時点でpublisherリストに載っていない場合はpublishせずに無視するという単純な仕組みにしました。

そのためにはfor文でpubsを走査し、publishしてきたpublisherと一致する物がリストに含まれているかを検査すれば良いのですが、 メンバー変数のpubsを調子に乗ってisoにしてしまったので困ったことになりました。 具体的には、以下の様なfor文がコンパイルエラーになってしまいます。

var isRegistered = false
for pub in pubs.values() do
  isRegistered = isRegistered or (sender == pub)
end
/Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:24:27: receiver capability is not a subtype of method capability
    for pub in pubs.values() do
                          ^
/Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:24:16: receiver type: List[Publisher val] iso!
    for pub in pubs.values() do
               ^
/usr/local/Cellar/ponyc/0.2.1/packages/collections/list.pony:217:3: target type: List[Publisher val] box
  fun values(): ListValues[A, this->ListNode[A]]^ =>
  ^
/Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:24:27: this would be possible if the arguments and return value were all sendable
    for pub in pubs.values() do

これはList[A].values()のreceiverの指定がboxだからです。 しかし前述のようにregister_publisherメソッドで、pubsを書きかえたいので valやboxにはできません。

この場合は、 register_publisherで受け取るメッセージisoのままにしておき、メンバー変数をtrnにすると対処できます。 つまり以下の様なコードにすればOKです。

diff --git a/server.pony b/server.pony
index d24ca61..6748551 100644
--- a/server.pony
+++ b/server.pony
@@ -2,7 +2,7 @@ use "collections"
 
 actor Server
   let _env: Env
-  var pubs: List[Publisher val] iso
+  var pubs: List[Publisher val] trn
   let sub_workers: List[Worker]
 
   new create(env': Env) =>
@@ -19,7 +19,13 @@ actor Server
   be register_subscriber(sub: Subscriber val) =>
     sub_workers.push(Worker(sub))
 
-  be publish(message: String) =>
+  be publish(sender: Publisher val, message: String) =>
+    var isRegistered = false
+    for pub in pubs.values() do
+      isRegistered = isRegistered or (sender == pub)
+    end
+    if isRegistered == false then return end
+
     for worker in sub_workers.values() do
       worker.receive(message)
     end

publishメソッドで登録済みpublisherかどうかを判定するためにsender: Publisherを受け取るようにしました。

trnはwrite uniqueのみを保証するのでisoと同じ問題は発生しません。(問題なくvalues()も呼べます。) またtrnならwriteできるので、register_publisherで値を書き換えることも出来ます。

valで受け取ってListをまるまるcopyしてrefにする。という方法は取らないことに注意してください。 ponyのreference capabilityはこのようなメッセージのコピーをどうやったら安全になくすことができるか?という問題への解決策なので、コピーしてしまうと少しもったいないことになります。 (もちろんパフォーマンスが重要でないケースではシンプルに出来るメリットもあるはずなのでケースバイケースではあります。)

さて上記の変更を加えてponycコンパイルすると以下の様なエラーになると思います。

/Users/matsu_chara/Documents/sand/pony-pubsub/server.pony:25:46: couldn't find 'eq' in 'Publisher'
      isRegistered = isRegistered or (sender == pub)

これはPublishereqメソッドが無いというエラーです。ponyでは==eqエイリアスになっています。 pobyには置演算子は対応するメソッドで実装するというルールがあります。 詳しくはhttp://tutorial.ponylang.org/expressions/infix-ops/を参照してください。 さきほどのpublishメソッドの変更を合わせると以下の様な変更になります。

diff --git a/publisher.pony b/publisher.pony
index 4fb591e..845ae9a 100644
--- a/publisher.pony
+++ b/publisher.pony
@@ -7,6 +7,9 @@ class Publisher
   fun val register(server: Server) =>
     server.register_publisher(this)
 
-  fun publish(server: Server, message: String) =>
-    server.publish(name + " sends " + message)
+  fun val publish(server: Server, message: String) =>
+    server.publish(this, name + " sends " + message)
+
+  fun box eq(that: Publisher box): Bool =>
+    name == that.name

mainを変更して未登録publisherからのpublishが無視されることを確認しましょう。

diff --git a/main.pony b/main.pony
index dbad1d2..d70a896 100644
--- a/main.pony
+++ b/main.pony
@@ -32,3 +32,7 @@ actor Main
     p3.publish(server, "niconico!")
     p4.publish(server, "nyanyan!")
     p5.publish(server, "kameeee!")
+
+    // this message will be ignored
+    p1.publish(server, "new presentation!")
+

ここまでで5つめのコミットの内容になりました。

さて、未登録publisherを弾く機能は出来ましたが、publishのたびに毎回Listを一巡するのは少々効率が悪そうです。 そこでSetを使った実装に変更します。

ponylangのSetの実装(https://github.com/CausalityLtd/ponyc/blob/0.2.1/packages/collections/set.pony)を見るとSetHashSetの型パラメータをいくつか指定したものになっています。EquatableHashableAを渡せばSetにしてくれるようです。 始めはSetでやっていたんですが、上手く動かなかったので一旦SetIsに逃げることにしました。SetIsHashIsというis (pointerによる比較)ベースでhash()eqを実装してくれている物を使っているようです。実装はhashfun.ponyにあります。

また、Set.contains()的なメソッドが見つからなかったのでSet[A].set(target) < Thatのような実装でごまかすことにしました。微妙に残念ですがそこまで悪く無いような気もします。 変更は以下のようになります。

diff --git a/server.pony b/server.pony
index 6748551..42c3207 100644
--- a/server.pony
+++ b/server.pony
@@ -2,29 +2,26 @@ use "collections"
 
 actor Server
   let _env: Env
-  var pubs: List[Publisher val] trn
+  var pubs: SetIs[Publisher val] trn
   let sub_workers: List[Worker]
 
   new create(env': Env) =>
     _env = env'
-    pubs = recover List[Publisher val] end
+    pubs = recover SetIs[Publisher val] end
     sub_workers = List[Worker]
 
   be register_publisher(pub: Publisher val) =>
-    pubs.push(pub)
+    pubs.set(pub)
 
-  be reload(pubs': List[Publisher val] iso) =>
+  be reload(pubs': SetIs[Publisher val] iso) =>
     pubs = consume pubs'
 
   be register_subscriber(sub: Subscriber val) =>
     sub_workers.push(Worker(sub))
 
   be publish(sender: Publisher val, message: String) =>
-    var isRegistered = false
-    for pub in pubs.values() do
-      isRegistered = isRegistered or (sender == pub)
-    end
-    if isRegistered == false then return end
+    let isRegistered = (SetIs[Publisher val].set(sender) < pubs)
+    if(isRegistered == false) then return end
 
     for worker in sub_workers.values() do
       worker.receive(message)

pubsの型を変えたのでmainにも以下のような修正が必要です。

diff --git a/main.pony b/main.pony
index d70a896..9d52be6 100644
--- a/main.pony
+++ b/main.pony
@@ -22,10 +22,10 @@ actor Main
     let p5 = Publisher("noconoco")

     let new_publishers  = recover
-      let ps = List[Publisher val]
-      ps.push(p3)
-      ps.push(p4)
-      ps.push(p5)
+      let ps = SetIs[Publisher val]
+      ps.set(p3)
+      ps.set(p4)
+      ps.set(p5)
     end

     server.reload(consume new_publishers)

ここまでで6つめのコミットの内容になりました。

ここで、SetIsが出来たのでSetの実装に再チャレンジします。 hash()は実装しているのでinterface Hashableの要求には答えているのに何故だろうと思ったらEquatableeqと共にneも要求するようです。なおHashablehttps://github.com/CausalityLtd/ponyc/blob/0.2.1/packages/collections/hashfun.ponyにあり、fun hash(): U64 を要求します。またEquatablehttps://github.com/CausalityLtd/ponyc/blob/0.2.1/packages/builtin/comparable.ponyにあり、fun eq(that: box->A): Boolfun ne(that: box->A): Bool => not eq(that)を要求します。ということで、忘れていたneの実装を行い無事にSetによる実装に切り替えることが出来ました。 実装は以下のように変更されます。

diff --git a/main.pony b/main.pony
index 9d52be6..773852b 100644
--- a/main.pony
+++ b/main.pony
@@ -22,7 +22,7 @@ actor Main
     let p5 = Publisher("noconoco")
 
     let new_publishers  = recover
-      let ps = SetIs[Publisher val]
+      let ps = Set[Publisher val]
       ps.set(p3)
       ps.set(p4)
       ps.set(p5)
diff --git a/publisher.pony b/publisher.pony
index 845ae9a..a952c47 100644
--- a/publisher.pony
+++ b/publisher.pony
@@ -13,3 +13,9 @@ class Publisher
   fun box eq(that: Publisher box): Bool =>
     name == that.name
 
+  fun box ne(that: Publisher box): Bool =>
+    name != that.name
+
+  fun hash(): U64 =>
+    name.hash()
+
diff --git a/server.pony b/server.pony
index 42c3207..e33a2da 100644
--- a/server.pony
+++ b/server.pony
@@ -2,25 +2,25 @@ use "collections"
 
 actor Server
   let _env: Env
-  var pubs: SetIs[Publisher val] trn
+  var pubs: Set[Publisher val] trn
   let sub_workers: List[Worker]
 
   new create(env': Env) =>
     _env = env'
-    pubs = recover SetIs[Publisher val] end
+    pubs = recover Set[Publisher val] end
     sub_workers = List[Worker]
 
   be register_publisher(pub: Publisher val) =>
     pubs.set(pub)
 
-  be reload(pubs': SetIs[Publisher val] iso) =>
+  be reload(pubs': Set[Publisher val] iso) =>
     pubs = consume pubs'
 
   be register_subscriber(sub: Subscriber val) =>
     sub_workers.push(Worker(sub))
 
   be publish(sender: Publisher val, message: String) =>
-    let isRegistered = (SetIs[Publisher val].set(sender) < pubs)
+    let isRegistered = (Set[Publisher val].set(sender) < pubs)
     if(isRegistered == false) then return end
 
     for worker in sub_workers.values() do

これで7つめのコミットの内容になりました。

ToDo?

debug print packageを使おうと思ったんですが、masterにはあったけどpony 0.2.1には無いという罠にはまったのでそのうち使いたいなと思います。

まとめ

コンパイル通らない時にどうするんだーみたいな時にこれが少しでも役に立つと良いなと思います。

*1:追記:recoverでvalにしていますが、subscirberのfactory methodをnew val ~~のように定義したほうがスマートかもしれません。