これはScala Advent Calendar 2015, Embulk Advent Calendar 2015, Couchbase Advent Calendar 2015の14日目の記事です。
昨日はOE_uiaさんの「Scala標準のPromiseがAndroidで便利だという話」(Scala),
daichikeさんの「N1QLがJSONデータモデルの柔軟さを引き出す」(Couchbase)でした。

Couchbaseへの書き込みを行うEmbulkのOutputプラグインをScalaで書いてみたのでその紹介と手探り感を記事にした。
EmbulkプラグインはJavaまたはRubyで書くことができるが、すでにJavaプラグインをScalaで書く試みをされている方はいるようで、
embulkのpluginをScalaで作ってみようと思った時の備忘録や、
実際にScalaで書かれているembulk-output-aerospikeなどを参考にさせていただいた。

GitHubリポジトリ

プラグインの実装

Javaプラグインの雛形を作成
embulk new java-output couchbase
で、embulk-output-couchbase ディレクトリ以下に作成されたOutputプラグインの雛形を元に、JavaソースファイルをScalaに書き換えていく。
Java -> Scalaへの変換
build.gradle のpluginsid "scala"を、dependenciescompile "org.scala-lang:scala-library:2.11.7"を追加する。
src/main/java 以下の CouchbaseOutputPlugin.java を CouchbaseOutputPlugin.scala に変更し、 src/main/scala 以下に置く。

./gradlew compileScala
でコンパイルが通るところまでの変更をおこなったのがこちら。884e267

JavaのOptionalをScalaのOptionとして扱いたいと思い、scala-java8-compatの使用も検討したが、
Embulkの設定ファイル読み取りに使用しているのはjava.util.Optionalではなくcom.google.common.base.Optionalだったので
そのまま使うことができず、
大した処理でもないので自前でOptionConvertersを書いた。
Couchbaseへの書き込み
couchbase-java-clientを使用して、Inputから渡ってきたデータを1レコード1 JsonDocument として書き込む。

JsonDocument にする際のID文字列をどう扱うかという問題があったが、設定ファイルでIDに使用するカラムを指定することにしてみた。
キーを除くカラムは全て JsonObject 化して書き込むことにした。

org.embulk.spi.time.Timestampをどのように扱うか、JavaのDateなどに変換するか、と思ったが、
Couchbase的には日付型が存在せず、JavaクライアントでもJsonValue.checkTypeで書き込める型をチェックしている。
(参考:「Storing Date in Couchbase」)
今回は手抜きでTimestamp.toEpochMilliしたLong値として書き込むことにしたが、
設定ファイルで変換方法を指定させるなどしたほうが良かっただろうか?
もしくは、Timestamp からそれ以外の型への変換はInputかFilterで行うことを前提として、Timestampが含まれていれば例外を投げるなど、
embulk-output-couchbase では Timestamp はサポートしない、という扱いにしてしまっても良かったかもしれない。

実行

embulk run -L /path/to/embulk-output-couchbase config.yml
で実行する。
embulk exampleで生成されるサンプル入力データをCouchbaseに書き込んだ結果はこうなった。


ログは以下のように処理したレコード数を出すことにしてみた。
[INFO] (main): Next config diff: {"in":{"last_path":"/Users/zaneli/embulk/try1/csv/sample_01.csv.gz"},"out":{"rans":4}}
書き込みが失敗した場合は以下のように。
[INFO] (main): Next config diff: {"in":{"last_path":"/Users/zaneli/embulk/try1/csv/sample_01.csv.gz"},"out":{"rans":4,"failures":[{"id":"embulk_4","cause":"com.couchbase.client.java.error.DocumentAlreadyExistsException"},{"id":"embulk_1","cause":"com.couchbase.client.java.error.DocumentAlreadyExistsException"},{"id":"embulk_2","cause":"com.couchbase.client.java.error.DocumentAlreadyExistsException"},{"id":"embulk_3","cause":"com.couchbase.client.java.error.DocumentAlreadyExistsException"}]}}
この辺はまだどうするのが良いかイマイチ分かっていない…
他のプラグインを参考にしようとしてみたが、TaskReport に何も設定せず返しているものも少なくないようだ。

ユニットテスト

GradleでScalaTestを実行する
さて、ユニットテストをScalaTestで書いてみよう。
dependenciestestCompile "org.scalatest:scalatest_2.11:2.2.5"を追加するだけでいいかと思ったが、
これだけでは GradleのtestタスクでScalaTestで書いたテストは実行してくれないようだ。
これを実現する方法はいくつかあるようだったが、今回はGradleプラグインのgradle-scalatestを使用することにした。
./gradlew test
でテストが実行されるのを確認できた。
テストケース内でEmbulkを実行する
さて、自作のOutputプラグインの挙動をチェックできるテストを書きたかったのだが…
公開されている既存のプラグインにテストコードつきのものがあまりなく、やり方がよく分からない。
(参考:「テストコードのついているEmbulkプラグイン一覧(2015年6月版)」)

embulk-output-oracleにEmbulkPluginTester.javaを用意して、その中でEmbulkEmbedを実行するテストがあったので、
見様見真似でやってみることにする。
が…これがなかなか上手くいかない。
同じようなコードをScalaで書いて実行したつもりが、以下のようなエラーが出る…。
  com.google.inject.ConfigurationException: Guice configuration errors:

1) No implementation for java.util.Set<org.embulk.plugin.PluginType> annotated with @org.embulk.exec.ForGuess() was bound.
  while locating java.util.Set<org.embulk.plugin.PluginType> annotated with @org.embulk.exec.ForGuess()
    for parameter 1 at org.embulk.exec.GuessExecutor.<init>(GuessExecutor.java:69)
  while locating org.embulk.exec.GuessExecutor

1 error
  at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1042)
  at com.google.inject.internal.InjectorImpl.getProvider(InjectorImpl.java:1001)
  at com.google.inject.internal.InjectorImpl.getInstance(InjectorImpl.java:1051)
  at org.embulk.guice.InjectorProxy.getInstance(InjectorProxy.java:113)
  at org.embulk.EmbulkEmbed.<init>(EmbulkEmbed.java:130)
  at org.embulk.EmbulkEmbed$Bootstrap.build(EmbulkEmbed.java:116)
  at org.embulk.EmbulkEmbed$Bootstrap.initializeCloseable(EmbulkEmbed.java:96)
  at org.embulk.output.couchbase.EmbulkPluginTester$.embulk$lzycompute(EmbulkPluginTester.scala:17)
  at org.embulk.output.couchbase.EmbulkPluginTester$.embulk(EmbulkPluginTester.scala:17)
  at org.embulk.output.couchbase.EmbulkPluginTester$.run(EmbulkPluginTester.scala:20)
Embulk本体のコードを読んでみて色々試した結果、
プラグインの登録と同じようにGuessExecutor.registerDefaultGuessPluginToを呼んでおくことで回避できた。
が、これでいいのかよく分かっていないし完全におまじないコードと化している…。

Outputプラグインの動作のみ確認できれば良かったのだが、
テストの際に都合の良いInputデータを作るためにダミーのInputプラグインも用意することにした。
設定ファイルに指定したデータをそのままInputデータとして読み取って以降の処理に渡すプラグインとなっている。

かなり手探り手探りではあるが、何とかユニットテストも書くことができた。
Travis CI で Couchbaseのセットアップをしてテストを流す
最後に、Travis CIでテストを実行できるようにする。
Gradleのtestタスクとしてテストを実行できるようになっているので、ここでのネックはどのようにしてTravis上でCouchbaseをセットアップするか。
これについてはcouchbase-structuresを参考に、
Couchbase Serverのインストールと起動を行い、couchbase-cliで初期設定・テスト用バケットの作成を行い、うまくいった。

というわけで、何とかプラグインの作成からテストまで一通りできた!

所感

(3Advent Calendar兼用の記事なのにEmbulkの話ばかりで恐縮だが)
ユニットテストを書くのが結構大変な印象があったので、もう少し公式にテストサポートがあると嬉しいように思う。
EmbulkPluginTester的なものを公式に提供してもらうとか…?

また、PageReader.readXXXした結果を型に応じて保持しておくクラスをこのように書いてみたが、
他のプラグインでも同じような処理をそれぞれ実装しているような感じだったので(e.g. embulk-output-aerospike)
これも公式に用意してもらえれば便利かもしれない。
(…と書いていて気づいたけど、今回のような場合はorg.embulk.spi.ColumnVisitorで処理していけば事足りる話かも…?)

何か誤りなどあれば、ブログにコメント欄とか無いのでTwitterか何かで教えていただけるとありがたいです。

明日はKuchitamaさんの「Play2.4+Swaggerでフロントエンドエンジニアとコミュニケーションする」(Scala),
kamatama_41さんの「今年作ったEmbulkプラグインのご紹介(input-remote, filter-hash)」(Embulk),
ijokarumawakさんの「N1QL: DMLのRETURNING句を調査しながらGo言語の基礎を学ぶ」(Couchbase)です。

Copyright© 2011-2019 Shunsuke Otani All Right Reserved .