ProtocolBuffersでprimitiveのデフォルト値と値が入っていないことを区別したいときにどう書くか

結論

wrappers.protoが便利

背景

protobufでは値を省略したときに、その型で定められたデフォルト値が代入されます。 例えばstringを省略すると自動的に""を指定したことになり、「値が指定されなかった」のか「空文字列を明示的に入れた」のかを区別しないように書くことが求められます。

この仕様は便利なのですが、時には区別したいケースもあります。 このとき下の方式2, 3のようなテクニックを用いて未指定とデフォルト値を区別することができます。

message Example {
    // 1. ダイレクトに定義(デフォルト値と未指定を区別できない)
    string foo = 1;

    // 2. 専用のmessageを定義
    message Bar {
        string value = 1;
    }
    Bar bar = 2;

    // 3. oneofで定義
    oneof baz_option {
        string baz = 3;
    }
}

2のmessageをがんばって定義する形式は、パラメータが増えるとどんどんmessageが増えて辛くなってしまいます。 3は少しトリッキーなのがネックです。(また値の有無を switch-caseif (getBazOptionCase == BAZ) のように確認することになるため若干記述量も増します。)

wrappers.proto

と思ったらprimitive値をラップするものが公式で存在しました。

https://github.com/google/protobuf/blob/v3.1.0/src/google/protobuf/wrappers.proto#L31-L34

// Wrappers for primitive (non-message) types. These types are useful // for embedding primitives in the google.protobuf.Any type and for places // where we need to distinguish between the absence of a primitive // typed field and its default value.

とあり、まさに値がない事とデフォルト値を指定されたことを区別することができます。(Anyに値を突っ込むのにも使えるようです。)

これはmessageを都度定義する方式をただ汎用化したものかと思っていたのですが、仕様上で特別扱いされ、 Jsonマッピングした際にラップを無視して単に1.0や"str"のようなプリミティブな値として表現されるようです。 https://github.com/google/protobuf/blob/v3.1.0/src/google/protobuf/wrappers.proto#L50

都度メッセージを定義してしまうと "nyan": { "value": "str" } のような冗長なJsonになってしまうので、これを回避できるのは便利そうです。

仕様にもひっそりと Wrapper types という表現で特別扱いされていました。 protocol buffers - JSON MAPPING

Wrappers use the same representation in JSON as the wrapped primitive type, except that null is allowed and preserved during data conversion and transfer.

実際に動かしてみると以下のようになります。(コードは https://github.com/matsu-chara/proto_wrap にあります)

syntax = "proto3";

package example;

import "google/protobuf/wrappers.proto";

message Example {
    // 1. ダイレクトに定義(デフォルト値と未指定を区別できない)
    string foo = 1;

    // 2. 専用のmessageを定義
    message Bar {
        string value = 1;
    }
    Bar bar = 2;

    // 3. oneofで定義
    oneof baz_option {
        string baz = 3;
    }

    // 4. wrapperで定義
    google.protobuf.StringValue mofu = 4;
}
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
import com.google.protobuf.util.JsonFormat;
import example.Mofu.Example;

public class Main {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        Example ex = Example.newBuilder()
                .setFoo("foo")
                .setBar(Example.Bar.newBuilder().setValue("bar"))
                .setBaz("baz")
                .setMofu(StringValue.newBuilder().setValue("mofu"))
                .build();

        System.out.println("instance.toString");
        System.out.println(ex.toString());
        System.out.println("json");
        System.out.println(JsonFormat.printer().print(ex));
        System.out.printf("");

        Example empty = Example.getDefaultInstance();
        // foo == ""
        System.out.println("foo isEmpty: " + empty.getFoo().isEmpty());

        // bar == null (getBar returns defaultBar if bar == null)
        System.out.println("hasBar: " + empty.hasBar());
        System.out.println("bar isEmpty: " + empty.getBar().getValue().isEmpty());

        // baz ==  BAZOPTION_NOT_SET (getBaz returns "" if BazOptionCase != Baz)
        System.out.println("bazOptionCase: " + empty.getBazOptionCase());
        System.out.println("baz isEmpty: " + empty.getBaz().isEmpty());

        // mofu == null (getMofu returns defaultMofu if mofu == null)
        System.out.println("mofu hasMofu: " + empty.hasMofu());
        System.out.println("mofu isEmpty: " + empty.getMofu().getValue().isEmpty());
    }
}
instance.toString
instance.toString
foo: "foo"
bar {
  value: "bar"
}
baz: "baz"
mofu {
  value: "mofu"
}

json
{
  "foo": "foo",
  "bar": {
    "value": "bar"
  },
  "baz": "baz",
  "mofu": "mofu"
}
foo isEmpty: true
hasBar: false
bar isEmpty: true
bazOptionCase: BAZOPTION_NOT_SET
baz isEmpty: true
mofu hasMofu: false
mofu isEmpty: true

jsonの出力ではStringWrapperにあたる部分が簡略化されていることが分かります。
emptyの取得ではhasMofuやoneofといったテクニックで値が無いことが確認できます。

このようにWrapper typesを使えば未指定とデフォルト値を区別することが簡単にできて、なおかつjson変換の際に冗長さを減らすことができます。 もちろんフィールドが増えたり使いまわされたりするならmessageを定義するのが良いこともあるので、適切に使い分ける必要があるとは思いますが、単純にwrapしたいだけであれば選択肢に入ると思います。

ただし不用意にラップすると(クラスが増えるので)ビルド時間が増えたりするといったデメリットがあったりしますし、そもそものprotobufの設計思想から若干それているような気がしなくもないので第一選択肢としてはデフォルト値を利用する戦略を取るのが望ましい気がします。

redis叩く君をつくった

ponyで何か作りたいなと思ったのでひとまずScalaで作ってそれを移植しようかなーと思い立ち作ってみた。

GitHub - matsu-chara/redica

get/setしか叩け無いけどまあいいよね( ◜◡‾) レスポンスのパースが限定的なので勉強がてらもう少し作って見るかも。

せっかくだからnon-blockingにしたりとかredis-clusterとかまでやってみようかなー。(ponyとは・・)

Ponylang 0.3祝いと最近のPony事情

はろぽに〜₍₍ (ง´・_・`)ว ⁾⁾

Pony 0.3リリース 🎉

2016年8月26日にPony 0.3がリリースされました。 チュートリアルについてのブログを書いたときから大分経ったような気がするので、 最近のPony事情についてちょっとまとめようかなーと思います。

(ついでにチュートリアルについての記事はdeprecated的な文言を先頭に付けました。試してないですが多分色々動かないところがあると思います。( ◜◡‾))

Pony を取り巻く環境の変化

Main君爆誕

今年の5月ごろPonyのマスコットキャラクターであるMain君が生まれました。goで言うところのgopherポジションだと思います。 Introducing Main, the Pony Mascot.

https://pony.groups.io/g/user/attachment/285/0

最初は( ◜◡‾)???となったんですが、しばらく眺めていると謎の良さみが深まります。slackのemojiに登録しましょう。

ちなみに3月末頃に別の案が提案されていました。

どちらがいいとかは置いておいて、ponyのマスコットはMain君です。

ちなみにMain君ステッカーはこちらで購入することができます。もしぬいぐるみが出たら家用と職場用で2つ購入しようと思います。 Main by Sean T Allen (#12903) - Sticker Mule

さよならcausality

Ponyの開発、商用サポートなどを手掛けていたcausality社がcloseすることになったという情報が8月21日に公開されました。

しかしPonyの開発はコミュニティドリブン(+大学の研究などで?)続いていくと宣言されていますし、ponycのコミットを見る限りちゃんと続いているので言語的には問題なさそうです。

資金獲得などの面で色々あったのだと思いますが、やはりちゃんとした技術を売りにしたスタートアップでもうまくいかないことはあるんだなーという気持ちに。

Pony VUG始まる

Ponyのあれこれについて解説するライブ放送の録画がvimeoで見られるようになっています!

Sean Allen on Vimeo

僕のオススメは

  • Ponyに導入される(?)依存型について説明した 「Pony VUG #4: Luke Cheeseman: Simple Value-Dependent Types In Pony」

vimeo.com

コンパイル時にメソッドを呼び出して計算した結果を型パラメータに突っ込むみたいなことも出来て結構強力そうです。すごく複雑なことをしなくても、例えば行列の次元数を型パラメータにしておけば、行列積でかけ間違えてたらコンパイルエラーといった分かりやすいメリットを簡単に得ることが出来そうです。

  • reference capabilityのsubtyping関連を見なおして色々証明したらしい 「Pony VUG #5: George Steed: A Principled Design of Capabilities in Pony」

vimeo.com

の2つです。

まだPonyのGCについての論文 を読んだことがなければ 「Pony VUG #6: Andrew Turley: The Art of Forgetting - Garbage Collection in Pony」もおすすめです。

vimeo.com

という気持ちです。

Pony VUGは言語の入門だけでなく、実装の詳細やこれから入るかもしれない機能について突っ込んで触れられているので必見です。

チュートリアルがgitbookになった

なりました。 http://tutorial.ponylang.org/

以前は付いていたsyntax highlightがつかなくなった気がするんですが・・・という気持ちですがメジャー言語になればつくはずなので問題無いですね( ◜◡‾)

rfcの募集が始まった

すでにいくつかはマージされ、実装されています。

https://github.com/ponylang/ponyc/search?utf8=%E2%9C%93&q=implement+RFC&type=Issues

ちなみに、この方式はRustなどに影響を受けているようです。

Pony has adopted a RFC process

リリースサイクルがついに整う

何百コミットもあったのに一向にリリースされなかったPonyですが、homebrewの人が発端でリリースしようよ => 整えるか! という流れで無事に0.3リリースにたどり着いたようです🎉

今まで0.2.1使って「動かない(´;ω;`) => masterビルドして使ってね」コンボを食らってる人が何人もいたのでこれで少し落ち着きそうです。

ちなみに0.2.1からの差分は 1092 commits 524 changed files with 13,181 additions and 2,797 deletions です。そりゃ動かんわ! https://github.com/ponylang/ponyc/compare/0.2.1...0.3.0

言語で変わったところ

詳しくはチェンジログ参照なんですが、32bit ARM/X86対応が加わったりしました。

またletとほぼおなじだけど、ポインタをもたせるのではなくて直接埋め込むのでポインタデリファレンスの分少しだけ高速な Embedded fieldsという概念が加わりました。速そう(?) pony-tutorial/variables.md at 5295d2c35f21953e9fe6f997b9100273a6f72b42 · ponylang/pony-tutorial · GitHub

serializationの実装がmasterに入ったりしてdistributed Ponyへの道が近づいたようです。

https://pony.groups.io/g/user/topic/first_version_of_pony/1680751?p=,,,20,0,0,0:RecentPostDate%2FSticky,,,20,2,0,1680751

他にもコレクションのメソッドが増えたり、速くなったり、ネットワーク周りが改良されたり色々色々あるのですが、全部は把握できていないので誰か教えて下さい。 🙏

ちなみに全くどうでもよいですが、記念すべき1000個めのイシューはスケジューラのセグメンテーションフォールトでした。

Segfault in Pony scheduler · Issue #1000 · ponylang/ponyc · GitHub

今後のPony?

特に自分が未来を知っているわけではないですがErlangっぽいprocess monitoringの仕組みをいれる取り組みが進行中?( Erlang-style monitoring · Issue #350 · ponylang/ponyc · GitHub )とありますし、serializeとかが入ったので、distributed Ponyが粛々と進んでいる気配があります。 かと思えば依存型が入る?など謎の高度な仕組みが入ってきそうで最終的にどうなるのかよくわからないですが今後も注目の言語だと言えそうです。 ということで

何卒🙏

blockingとOOM [scala]

ExecutionContextとblockingについて調べたメモ [scala] - だいたいよくわからないブログ の続きです。

今回の記事の結論

  • blockingをたくさん呼ぶとOOMになるまでスレッド数が増え続けるっぽい。 ==> 追記: scala2.12 だとlimitがあるので安心して使えるらしいです。 scala.concurrent.blocking() - Septeni Engineer's Blog
  • blockingはThreadFactoryをカスタマイズしないと有効にならないのでForkJoinPoolからExecutionContextを生成した場合などは単に無視されるため安全(ただ、スレッド数は増えないのでデッドロックなどには注意)
  • ExecutionContext.global、akkaのdispatcherなどをExecutionContextとして使うとblockingが有効になるため、注意が必要。(ExecutionContext.globalは手抜き用なので良いとしても、akkaの場合は注意したほうが良い。)
  • とはいえblocking自体は有用なので使いたいケースもありそう

前回の記事のまとめ

前回の記事ではblockingでブロックする処理を包むと自動的にスレッド数が増えるため高速に処理されて嬉しいよという話を書きました。加えて、それが有効になるのはExecutionContext.globalなどのScalaが内部で提供しているExecutionContextかakkaのdispatcherを利用した場合だという話も書きました。

blockingとOOM

なるほどーと思ってそのまま放置していたのですが、ふと実際にスレッド数見てみるかと思って下記のような要領で確認してみました。 ちょっと長いですが、下記の3種類のExecutionContextでblockingな処理を大量に呼び出してどうなるかをみています。

  1. ExecutionContext.global
  2. ExecutionContext.fromExecutorService(new ForkJoinPool(50))
  3. ExecutionContext.fromExecutorService(new ForkJoinPool(1000, new DefaultThreadFactory, uncaughtExceptionHandler, false) (BlockContext付きのThreadFactory入りのForkJoinPool)

gist.github.com

上記コードだとxms1G, xmx1Gで、1,3のケースでは 5000~10000回程度呼び出すとOOMになりました。 BlockContextがついていない2のケースでは時間はかかりますがしっかり実行してくれます。 今までblockingだとスレッド数は ManagedBlocker が良い感じにしてくれるという雑な理解をしていましたが、どうも普通に増えるだけっぽいです(?)(単に実験コードがわるいだけの可能性があるので注意が必要ですが、少なくともOOMになるケース自体はあり得るようです。)

そもそもBlockContextつきのThreadFactory自体がscala処理系は簡単に使う方法を外部に公開している訳ではないことからも考えると、blockingはExecutionCotext.globalのような、そこまでヘビーに使わないときに便利なもの・・?みたいな理解になりました。

akkaのdispatcher

前回の記事でakkaのdispatcherはBlockContextを自前で実装していることを述べました。実験として以下の様なコードでOOMになるかどうかを実験しました。

gist.github.com

試した結果、ちゃんと(?)OOMになったので、akkaでブロック処理を行う際はblockingを書かないようにするか、十分に注意(DBでのブロックなら障害時にスレッド数が増えすぎないか?などを考える)する必要がありそうです。

blockingが動かないとコードによってはデッドロックもあり得る・・?といった背景もあるようですが、いずれにせよakkaでblockする処理を行う際はちゃんと並列数を見積もって予め適切なサイズにExecutionContextを分離しておくというオーソドックスなやり方が一番良さそうだなと思った次第です。

openjdk

openjdkの実装を少し見るかーと思ったのですが、あたりは全体的なロジックが掴めてないので部分的に読むのは難しそうでした・・。 今度時間を書けて読みたいような読みたくないような・・・。

jdk7u-jdk/ForkJoinPool.java at f4d80957e89a19a29bb9f9807d2a28351ed7f7df · openjdk-mirror/jdk7u-jdk · GitHub

jdk7u-jdk/ForkJoinPool.java at f4d80957e89a19a29bb9f9807d2a28351ed7f7df · openjdk-mirror/jdk7u-jdk · GitHub

ただ、 no compensation needed, create a replacement というのがあるので必ず増えるわけではないかもしれません。

まとめ

OOMについて調べましたが今回の実験ではメモリを1GB程度と少なめにしているので実際にはもっと耐えてくれるはずです。ただ負荷が強いプロダクションなどに投入するときは注意しましょう。という認識になりました。 blockingが要るのか要らないのかは微妙なところですがスレッドが足りなくてデッドロックしちゃうような処理があるばあいはblockingしたほうが良さそうですねー。とはいえそしたらスレッドプール分ければいいのではという気持ちがあったりしますが、常に簡単に分割できるって感じでもないと思うので・・。逆にスレッド増え続けてしまうくらい処理が重たくなるならblockingしないほうが良い気がします。そもそもblockingで処理包むの忘れるから使えてない

Kafkaのproducerのrequest.timeout.msはどのように作用するか

気になったのでコードを追ったりしてみました。備忘録けんメモ書きです。

そもそもtimeoutがどうやって送信されてどうやってbrokerで受信されているのかが追いにくいのですが、

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java#L32 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/api/ProducerRequest.scala#L31 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L32-L40 https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java#L96-L100

RequestHeader.java+ProduceRequest.javaと考えてProtocol.javaに書いてあるSchemaと、ProducerRequest.readfromと照らし合わせると帳尻が合いそうです。

つまりProducerRequest.ackTimeoutMs(コンストラクタ第二引数)を追えばよさそうで、追っていくと request.timeout.ms になりそうです。

this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L256

this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L281-L290

requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L318

ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);

https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L333

以上からProducerRequest(サーバー側のコード)に渡ってくるタイムアウトrequest.timeout.msになりそうです。

此処から先は処理をきちんとおえてないのですがgrepする限り def handleProducerRequest(request: RequestChannel.Request) {...} が処理を担当していそうです。

https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/KafkaApis.scala#L298

こいつは最終的に rplicaManager.appendMessages にtimeout値を渡しています。 appendMessagesは以下にあります。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L314

appendされたメッセージはリクエスト煉獄に叩きこまれます。 https://github.com/apache/kafka/blob/0.9.0.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L343

これが実際にタイムアウトする処理はスケジュール機構なども含むので結構複雑ですが、 かなり紆余曲折があって、以下でタイムアウトします。 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/TimingWheel.scala#L131-L133

そのあと、taskExecutorにsubmitされて、runが呼ばれます。

// Already expired or cancelled
if (!timerTaskEntry.cancelled)
  taskExecutor.submit(timerTaskEntry.timerTask)

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/timer/Timer.scala#L93-L94

runは forceComplete を呼ぶようになっています。(runが呼ばれるとタイムアウトしたかキャンセルしたかみたいな扱いになっていそうです。)

/*
 * run() method defines a task that is executed on timeout
 */
override def run(): Unit = {
  if (forceComplete())
    onExpiration()
}

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedOperation.scala#L105-L108

forceComplete経由で呼ばれるonCompleteでcallbackが呼ばれて、ReplicaManager#L343にもどります。

val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus)

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedProduce.scala#L123

この際statusを変更する処理などがありませんが、statusの初期値がErrors.REQUEST_TIMED_OUT になっているので不要そうです。

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedProduce.scala#L64

ということでrequest.timeout.msを短くするとack待ち時間が短くなるようです。(色々な要素を省略したので本当はもう少し色々なところでタイムアウトしたりしています・・。)

ドキュメントを見ると request.timeout.ms にはackのことが書いてないんですが

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhauste

timeout.ms には書いてあります。

The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.

そしてtimeout.msはdprecatedでREQUEST_TIMEOUT_MS_CONFIGにしろと書いてあるので役割が変わって無ければ同じ説明と認識して良さそうです。

一方で、producerのclientにも同じパラメータがわたっててそっちで能動的に切断するケースも有ります。 NetworkClientに入っているrequest.timeout.ms値がその辺の機能です。 Sender.sendは最終的にclient.sendをよぶけどその正体がこれです。(KafkaProducerでパラメータと一緒に初期化)

そいつでもhandleTimeoutしていてタイムアウトするとdisconnected=trueをレスポンスに突っ込みます。

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L408-L414

その後、 Sender.handleProduceResponseNETWORK_EXCEPTIONになります。 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L260-L265

このフローだとエラーログがtraceやdebugじゃないと出なくて、例外をログにだしても The server disconnected before a response was received. というどこで何が何故切断されたのかみたいな情報が皆無なログが出るだけでつらい感じになります(;´Д`)

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java#L104