ぶらっ記ぃ

日本語の練習をしています

DigdagのExtensionを実装する

モチベーション

個人のリポジトリdigdag-plugin-datadogを開発していますが、これを使うユースケースは主に _error のタスクでワークフローのエラーをDatadogに通知することであり、これは複数のワークフローが存在する場合、各ワークフローに書いて回る必要があります。
エラー処理を書いて回るのはとても面倒なので、Digdagサーバで共通のエラー通知を実現したいです。
これを実現するためにExtension機能が使えないかと思ったのがきっかけです。

Extensionとは

Digdagを設計した @frsyuki さんのツイートを引用させていただきます🙇

このExtensionを使ってエラー通知の振る舞いを変更したいです。

結論

今回の結論を話してしまうと、掲題の「ワークフロー共通のエラー通知の実現」という面だけであれば、Digdagのnotification機能を使ってやるのが一番早いと思います。
少なくとも自分がやろうとした手段ではエラー時の取れる情報も変わりません。後述しますが、Extensionの導入は少し複雑な面もあり、notification.shell を使うほうが比較的シンプルです。
ただ、ExtensionにはDigdag全体の振る舞いに対して影響を及ぼすことが可能なので、カスタマイズの可能性はとても高く、このユースケース以外でも色々作れそうな気がします。

成果物

サンプルのリポジトリはこちら

以下、実装方法に関する詳細です。

ビルド設定

今回、ExtensionはScalaで実装します。 build.sbtdigdag-spiProvided で追加しておきます。

lazy val root = (project in file("."))
  .settings(
    name := "digdag-extension-example",
    libraryDependencies ++= Seq(
        "io.digdag"     % "digdag-spi" % "0.9.42" % Provided,
        "org.scalatest" %% "scalatest" % "3.0.8"  % Test
      )
  )

Extensionの実装

まずはExtension本体の実装です。
io.digdag.spi.Extension を実装したクラス、GuiceのModule、実際にDIしたいクラスを追加します。
今回はワークフローがエラーで終了した際に通知を送る際に実行される NotificationSender を追加し、Names.named で名前をつけてやります。(サンプルはただログを出すだけ)
そうすることで、notification機能のデフォルト実装である DefaultNotifiernotification.type プロパティを参照し、その名前のアノテーションのついた NotifierSender の実装を使ってくれます。

package dev.nomadblacky.digdag_extension_example
import java.util
    
import com.google.common.collect.ImmutableList
import com.google.inject.name.Names
import com.google.inject.{Binder, Module}
import io.digdag.spi.{Extension, Notification, NotificationSender}
import org.slf4j.LoggerFactory
    
class ExampleExtension extends Extension {
  override def getModules: util.List[Module] = ImmutableList.of(ExampleModule)
}
    
object ExampleModule extends Module {
  override def configure(binder: Binder): Unit =
    binder
      .bind(classOf[NotificationSender])
      .annotatedWith(Names.named("example"))
      .to(classOf[ExampleNotificationSender])
}
    
class ExampleNotificationSender extends NotificationSender {
  private val logger = LoggerFactory.getLogger(classOf[ExampleNotificationSender])

  override def sendNotification(notification: Notification): Unit = {
    logger.info("========== ExampleNotificationSender ==========")
    logger.info(notification.toString)
    logger.info("===============================================")
  }
}

ServiceLoader の設定

DigdagのExtensionおよびPluginの機構はJava標準ライブラリに含まれるServiceLoaderという仕組みで実装が追加されるようになっています。
Digdagに対して「このプラグインを読み込んでね」と教えるために以下のようにリソースファイルを追加します。

$ cat src/main/resources/META-INF/services/io.digdag.spi.Extension
dev.nomadblacky.digdag_extension_example.ExampleExtension

Extensionのパブリッシュ

基本的なExtensionの実装は以上です。これを実際にDigdagに読み込めるようにするべく、Jarに固めておきます。

$ sbt publishLocal
$ tree -L 1 target/scala-2.13/
target/scala-2.13/
├── api
├── classes
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT-javadoc.jar
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT-sources.jar
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT.jar
├── digdag-extension-example_2.13-0.1.0-SNAPSHOT.pom
├── ivy-0.1.0-SNAPSHOT.xml
├── resolution-cache
└── update

4 directories, 5 files

今回は、digdag-spi 以外に依存がなかったのでこれで良いですが、これ以外の依存が含まれる場合は assembly などでライブラリを含めた fatjar などを作ってやる必要がありそうです。

ワークフローを実行

さて、Extensionを使ってワークフローを実行しましょう。
今回は fail> するだけの簡単なワークフローを用意します。

timezone: UTC

+oops:
  fail>: Oops!!!

Extensionを読み込む方法ですが、現状は java コマンドの -cp オプションで直接クラスパスに含める必要があります。
以下のように digdag と今回作成したExtensionのJarをクラスパスに含めてDigdagのCLIを実行します。

$ java -cp $(which digdag):target/scala-2.13/digdag-extension-example_2.13-0.1.0-SNAPSHOT.jar \
  io.digdag.cli.Main run --no-save example.dig --config digdag.properties

以下のようにログが流れます。

2020-07-20 20:26:24 +0900: Digdag v0.9.41
2020-07-20 20:26:26 +0900 [WARN] (main): Using a new session time 2020-07-20T00:00:00+00:00.
2020-07-20 20:26:26 +0900 [INFO] (main): Starting a new session project id=1 workflow name=example session_time=2020-07-20T00:00:00+00:00
2020-07-20 20:26:27 +0900 [INFO] (0017@[0:default]+example+oops): fail>: Oops!!!
2020-07-20 20:26:27 +0900 [ERROR] (0017@[0:default]+example+oops): Task +example+oops failed.
Oops!!!
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): type: notify
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): ========== ExampleNotificationSender ==========
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): Notification{timestamp=2020-07-20T11:26:28.154Z, message=Workflow session attempt failed, siteId=0, projectId=1, projectName=default, workflowName=example, revision=2020-07-20T11:26:26.466Z, attemptId=1, sessionId=1, taskName=+example^failure-alert, timeZone=UTC, sessionUuid=b3ceed18-aeed-4b75-8e1c-f9124b13d34b, sessionTime=2020-07-20T00:00Z}
2020-07-20 20:26:28 +0900 [INFO] (0017@[0:default]+example^failure-alert): ===============================================
error:
  * +example+oops:
    Oops!!!

しっかりとログが流れていますね。
Notification の内容は notification.type=shell などと同じ内容で、残念ながら現状は原因となったエラーメッセージとスタックトレースは取得できません。
どうやらここばかりはDigdagの実装自体を変更する必要がありそうです。
ただ、どのワークフローのどのセッションでエラーが発生したかなど、最低限必要な情報は揃っているので一旦はこれで通知を実装してやっても十分かなと思います。

最後に

今回はExtensionの実装にScalaを用いましたが、JVMのクラスパスに直接含める必要があるぶん、これ以外のExtensionやPluginにバイナリ互換製のないScalaバージョンが使われていたときに何か悪さを起こしてしまうかもしれません。
(このあたり、ClassLoader周りに詳しくないので間違っていることを言ってるかもしれません…)

なにより、Extensionを使うことでDigdagの色々な振る舞いをいじれることがわかったのは大きいです。
Pluginなどでは痒くて届かなかった部分に手を出すことができるようになるかもしれません。

参考にしたもの