これはScala Advent Calendar 2017の7日目の記事です。
昨日はsmogamiさんの「Scala.js用の型定義をラクに得る scala-js-ts-importer の紹介」でした。
Play Framework でとある処理の同時実行スレッド数、キュー数を制限しつつ、それが溢れた際のエラーログ出力でハマったのでその備忘を残す。
解決したかというと、いまいち筋のよくない力技で無理やり何とかした、という対応になるが…。

以下、Scala2.12.4, Play Framework 2.6.7で動作を確認している。
package controllers

import javax.inject.Inject

import com.zaneli.concurrent.ExecutionContexts
import play.api.Logger
import play.api.mvc._

import scala.concurrent.Future
import scala.util.control.NonFatal

class Test @Inject() extends InjectedController {

  private[this] val logger = Logger(this.getClass)

  def index: Action[AnyContent] = Action.async { params =>
    implicit val ec = ExecutionContexts.executionContext

    val v = params.getQueryString("v")
    Future {
      save(v)
    }.transform(_.map(Ok(_)).recover {
      case NonFatal(e) =>
        logger.error(s"faild to save. v=$v", e)
        v.fold(NoContent)(Ok(_))
    })
  }

  private[this] def save(v: Option[String]): String = {
    if (v.exists(_.length > 5)) {
      throw new IllegalArgumentException()
    }
    Thread.sleep(5000L)
    v.getOrElse("")
  }
}
こんなcontrollerがあるとして、save(v)の同時実行数とキュー数を制限したいとする。
(エラーが起きているのに20xで返す是非は、今回の本題ではないので一旦サンプルコードはこれで…)

RejectedExecutionHandlerを指定しない(デフォルトのAbortPolicyを使用する)

ExecutionContexts.executionContextを以下のように定義した。
package com.zaneli.concurrent

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext

object ExecutionContexts {
  lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1))
  )
}
これで、maximumPoolSizeの1とBlockingQueuecapacityの1を超える、
つまり5秒以内に3回アクセスするとエラーとなる。
が、例外をキャッチしてlogger.error()でエラーログ出力するのではなく、標準エラー出力に以下が出てしまう。
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@6862f5c6 rejected from java.util.concurrent.ThreadPoolExecutor@1965e2d7[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
ThreadPoolExecutorのコンストラクタにRejectedExecutionHandlerを指定しない場合、デフォルトでAbortPolicyが設定され、それが投げる例外がRejectedExecutionException
今回はvパラメータ文字数が5文字以上だと不正としているが、以下のようにその際のパラメータを含めてログ出力しているので、
タスク溢れの場合にも同様のログを出力したい。
[error] c.Test - faild to save. v=Some(invalid)

自前のRejectedExecutionHandler(もしくはreporter)を指定してログ出力する

ExecutionContexts.executionContextを以下のように書き替えて自前のRejectedExecutionHandlerを設定するとどうだろうか?
package com.zaneli.concurrent

import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionHandler, ThreadPoolExecutor, TimeUnit}
import play.api.Logger
import scala.concurrent.ExecutionContext

object ExecutionContexts {
  private val logger = Logger(this.getClass)

  lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1), new RejectedExecutionHandler() {
      override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit =
        logger.error("asynchronous computation failed.")
    })
  )
}
ちなみにExecutionContext.fromExecutorService()には第二引数にreporterを渡す事ができ、デフォルトが_.printStackTrace()となっている。
上記の例はこう書いても同じ事ができそうだ。
package com.zaneli.concurrent

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import play.api.Logger
import scala.concurrent.ExecutionContext

object ExecutionContexts {
  private val logger = Logger(this.getClass)

  lazy val executionContext3: ExecutionContext = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1)),
    logger.error("asynchronous computation failed.", _)
  )
}
これで、タスクが溢れた場合にも以下のようにロガーの機能によるログ出力ができた。
[error] c.z.c.ExecutionContexts - asynchronous computation failed.
が、ここでキャッチすると発生時のvパラメータが分からず、もうひと工夫必要そうだ。

reporterで例外を投げ直す…

これでいいのか感はあるが、reporterThrowable => Unitなので、引数で受け取った例外を投げ直してcontroller側でキャッチしてもらう、というのを試してみた。
package com.zaneli.concurrent

import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import play.api.Logger
import scala.concurrent.ExecutionContext

object ExecutionContexts {
  private val logger = Logger(this.getClass)

  lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1)),
    throw _
  )
}
controller側にも修正が必要になる。
package controllers

import javax.inject.Inject

import com.zaneli.concurrent.ExecutionContexts
import play.api.Logger
import play.api.mvc._

import scala.concurrent.Future
import scala.util.control.NonFatal

class Test @Inject() extends InjectedController {

  private[this] val logger = Logger(this.getClass)

  def index: Action[AnyContent] = Action.async { params =>
    implicit val ec = ExecutionContexts.executionContext

    val v = params.getQueryString("v")
    try {
      Future {
        save(v)
      }.transform(_.map(Ok(_)).recover {
        case NonFatal(e) =>
          log(v, e)
      })
    } catch {
      case NonFatal(e) =>
        Future.successful(log(v, e))
    }
  }

  private[this] def log(v: Option[String], t: Throwable): Result = {
    logger.error(s"faild to save. v=$v", t)
    v.fold(NoContent)(Ok(_))
  }

  private[this] def save(v: Option[String]): String = {
    if (v.exists(_.length > 5)) {
      throw new IllegalArgumentException()
    }
    Thread.sleep(5000L)
    v.getOrElse("")
  }
}
Futureを try ~ catch しないといけなかった…。しかしこれで目的は果たせるはず…。
ロガーにより、以下のエラーが出力される事を確認できた。
[error] c.Test - faild to save. v=Some(valid)
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@72cc08ea rejected from java.util.concurrent.ThreadPoolExecutor@4cea79cf[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 2]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:159)
  at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
  at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368)
  at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367)
  at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375)
  at scala.concurrent.impl.Promise.transform(Promise.scala:29)
  at scala.concurrent.impl.Promise.transform$(Promise.scala:27)
が、少し遅れて標準エラー出力にも別途RejectedExecutionExceptionが出力されてしまった。
どうやらFuture.transformがダメらしい…。
何とかここで発生したエラーも拾う事ができないかと試行錯誤してみたがうまくいかず、結局諦めてtransformを使わない方向で書き直す事にした…。
package controllers

import javax.inject.Inject

import com.zaneli.concurrent.ExecutionContexts
import play.api.Logger
import play.api.mvc._

import scala.concurrent.Future
import scala.util.control.NonFatal

class Test @Inject() extends InjectedController {

  private[this] val logger = Logger(this.getClass)

  def index: Action[AnyContent] = Action.async { params =>
    implicit val ec = ExecutionContexts.executionContext

    val v = params.getQueryString("v")
    try {
      Future {
        try {
          Ok(save(v))
        } catch {
          case NonFatal(e) =>
            log(v, e)
        }
      }
    } catch {
      case NonFatal(e) =>
        Future.successful(log(v, e))
    }
  }

  private[this] def log(v: Option[String], t: Throwable): Result = {
    logger.error(s"faild to save. v=$v", t)
    v.fold(NoContent)(Ok(_))
  }

  private[this] def save(v: Option[String]): String = {
    if (v.exists(_.length > 5)) {
      throw new IllegalArgumentException()
    }
    Thread.sleep(5000L)
    v.getOrElse("")
  }
}
うーむ…どんどん微妙な書き方になっていくが、とりあえずエラー出力に関しては期待通りに動く事が確認できた。
[error] c.Test - faild to save. v=Some(valid)
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@24f23e5 rejected from java.util.concurrent.ThreadPoolExecutor@28de1d41[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 7]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:159)
  at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
  at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368)
  at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367)
  at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375)
  at scala.concurrent.impl.Promise.transform(Promise.scala:29)
  at scala.concurrent.impl.Promise.transform$(Promise.scala:27)

明日はy_taka_23さんの「多分 Stainless について書きます」です。

Copyright© 2011-2018 Shunsuke Otani All Right Reserved .