DigdagのExtensionを実装する
モチベーション
個人のリポジトリでdigdag-plugin-datadogを開発していますが、これを使うユースケースは主に _error
のタスクでワークフローのエラーをDatadogに通知することであり、これは複数のワークフローが存在する場合、各ワークフローに書いて回る必要があります。
エラー処理を書いて回るのはとても面倒なので、Digdagサーバで共通のエラー通知を実現したいです。
これを実現するためにExtension機能が使えないかと思ったのがきっかけです。
Extensionとは
Digdagを設計した @frsyuki さんのツイートを引用させていただきます🙇
Digdag Extensionは、GuiceのModuleを仕込める物で、Guiceの起動前にロードされ、システム全体のGuiceに対して自由に影響できる:https://t.co/UKCAwjyifz
— Sadayuki Furuhashi (@frsyuki) May 19, 2017
このExtensionを使ってエラー通知の振る舞いを変更したいです。
結論
今回の結論を話してしまうと、掲題の「ワークフロー共通のエラー通知の実現」という面だけであれば、Digdagのnotification機能を使ってやるのが一番早いと思います。
少なくとも自分がやろうとした手段ではエラー時の取れる情報も変わりません。後述しますが、Extensionの導入は少し複雑な面もあり、notification.shell
を使うほうが比較的シンプルです。
ただ、ExtensionにはDigdag全体の振る舞いに対して影響を及ぼすことが可能なので、カスタマイズの可能性はとても高く、このユースケース以外でも色々作れそうな気がします。
成果物
サンプルのリポジトリはこちら
以下、実装方法に関する詳細です。
ビルド設定
今回、ExtensionはScalaで実装します。
build.sbt
に digdag-spi
を Provided
で追加しておきます。
lazy val root = (project in file(".")) .settings( name := "digdag-extension-example", libraryDependencies ++= Seq( "io.digdag" % "digdag-spi" % "0.9.42" % Provided, "org.scalatest" %% "scalatest" % "3.0.8" % Test ) )
Extensionの実装
まずはExtension本体の実装です。
io.digdag.spi.Extension
を実装したクラス、GuiceのModule、実際にDIしたいクラスを追加します。
今回はワークフローがエラーで終了した際に通知を送る際に実行される NotificationSender
を追加し、Names.named
で名前をつけてやります。(サンプルはただログを出すだけ)
そうすることで、notification機能のデフォルト実装である DefaultNotifier
は notification.type
プロパティを参照し、その名前のアノテーションのついた NotifierSender
の実装を使ってくれます。
package dev.nomadblacky.digdag_extension_example import java.util import com.google.common.collect.ImmutableList import com.google.inject.name.Names import com.google.inject.{Binder, Module} import io.digdag.spi.{Extension, Notification, NotificationSender} import org.slf4j.LoggerFactory class ExampleExtension extends Extension { override def getModules: util.List[Module] = ImmutableList.of(ExampleModule) } object ExampleModule extends Module { override def configure(binder: Binder): Unit = binder .bind(classOf[NotificationSender]) .annotatedWith(Names.named("example")) .to(classOf[ExampleNotificationSender]) } class ExampleNotificationSender extends NotificationSender { private val logger = LoggerFactory.getLogger(classOf[ExampleNotificationSender]) override def sendNotification(notification: Notification): Unit = { logger.info("========== ExampleNotificationSender ==========") logger.info(notification.toString) logger.info("===============================================") } }
ServiceLoader の設定
DigdagのExtensionおよびPluginの機構はJava標準ライブラリに含まれるServiceLoaderという仕組みで実装が追加されるようになっています。
Digdagに対して「このプラグインを読み込んでね」と教えるために以下のようにリソースファイルを追加します。
$ cat src/main/resources/META-INF/services/io.digdag.spi.Extension
dev.nomadblacky.digdag_extension_example.ExampleExtension
Extensionのパブリッシュ
基本的なExtensionの実装は以上です。これを実際にDigdagに読み込めるようにするべく、Jarに固めておきます。
$ sbt publishLocal
$ tree -L 1 target/scala-2.13/
target/scala-2.13/
├── api
├── classes
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT-javadoc.jar
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT-sources.jar
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT.jar
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT.pom
├── ivy-0.1.0-SNAPSHOT.xml
├── resolution-cache
└── update
4 directories, 5 files
今回は、digdag-spi
以外に依存がなかったのでこれで良いですが、これ以外の依存が含まれる場合は assembly
などでライブラリを含めた fatjar などを作ってやる必要がありそうです。
ワークフローを実行
さて、Extensionを使ってワークフローを実行しましょう。
今回は fail>
するだけの簡単なワークフローを用意します。
timezone: UTC +oops: fail>: Oops!!!
Extensionを読み込む方法ですが、現状は java
コマンドの -cp
オプションで直接クラスパスに含める必要があります。
以下のように digdag
と今回作成したExtensionのJarをクラスパスに含めてDigdagのCLIを実行します。
$ java -cp $(which digdag):target/scala-2.13/digdag-extension-example_2.13-0.1.0-SNAPSHOT.jar \
io.digdag.cli.Main run --no-save example.dig --config digdag.properties
以下のようにログが流れます。
2020-07-20 20:26:24 +0900: Digdag v0.9.41
2020-07-20 20:26:26 +0900 [WARN] (main): Using a new session time 2020-07-20T00:00:00+00:00.
2020-07-20 20:26:26 +0900 [INFO] (main): Starting a new session project id=1 workflow name=example session_time=2020-07-20T00:00:00+00:00
2020-07-20 20:26:27 +0900 [INFO] (0017@[0:default]+example+oops): fail>: Oops!!!
2020-07-20 20:26:27 +0900 [ERROR] (0017@[0:default]+example+oops): Task +example+oops failed.
Oops!!!
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): type: notify
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): ========== ExampleNotificationSender ==========
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): Notification{timestamp=2020-07-20T11:26:28.154Z, message=Workflow session attempt failed, siteId=0, projectId=1, projectName=default, workflowName=example, revision=2020-07-20T11:26:26.466Z, attemptId=1, sessionId=1, taskName=+example^failure-alert, timeZone=UTC, sessionUuid=b3ceed18-aeed-4b75-8e1c-f9124b13d34b, sessionTime=2020-07-20T00:00Z}
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): ===============================================
error:
* +example+oops:
Oops!!!
しっかりとログが流れていますね。
Notification
の内容は notification.type=shell
などと同じ内容で、残念ながら現状は原因となったエラーメッセージとスタックトレースは取得できません。
どうやらここばかりはDigdagの実装自体を変更する必要がありそうです。
ただ、どのワークフローのどのセッションでエラーが発生したかなど、最低限必要な情報は揃っているので一旦はこれで通知を実装してやっても十分かなと思います。
最後に
今回はExtensionの実装にScalaを用いましたが、JVMのクラスパスに直接含める必要があるぶん、これ以外のExtensionやPluginにバイナリ互換製のないScalaバージョンが使われていたときに何か悪さを起こしてしまうかもしれません。
(このあたり、ClassLoader周りに詳しくないので間違っていることを言ってるかもしれません…)
なにより、Extensionを使うことでDigdagの色々な振る舞いをいじれることがわかったのは大きいです。
Pluginなどでは痒くて届かなかった部分に手を出すことができるようになるかもしれません。
参考にしたもの
- https://github.com/yoyama/digdag-wait-op
- https://twitter.com/frsyuki/status/865462007163637762
- https://github.com/treasure-data/digdag
- https://github.com/treasure-data/digdag/blob/6547542a274ac1a30c88c82e509ac0742d6d872e/digdag-core/src/main/java/io/digdag/core/ExtensionServiceLoaderModule.java#L32
- https://github.com/treasure-data/digdag/blob/d02fb5e6d757979a979fde548b7ccf3d1fc5716e/digdag-standards/src/main/java/io/digdag/standards/StandardsExtension.java
- https://github.com/treasure-data/digdag/blob/a7039bc76c3de08022411b8897517757bec42172/digdag-core/src/main/java/io/digdag/core/DigdagEmbed.java#L212
- https://github.com/treasure-data/digdag/blob/6547542a274ac1a30c88c82e509ac0742d6d872e/digdag-storage-s3/src/main/java/io/digdag/storage/s3/S3StorageExtension.java