これはScala Advent Calendar 2017の7日目の記事です。
昨日はsmogamiさんの「Scala.js用の型定義をラクに得る scala-js-ts-importer の紹介」でした。
Play Framework でとある処理の同時実行スレッド数、キュー数を制限しつつ、それが溢れた際のエラーログ出力でハマったのでその備忘を残す。
解決したかというと、いまいち筋のよくない力技で無理やり何とかした、という対応になるが…。
以下、Scala2.12.4, Play Framework 2.6.7で動作を確認している。
昨日はsmogamiさんの「Scala.js用の型定義をラクに得る scala-js-ts-importer の紹介」でした。
Play Framework でとある処理の同時実行スレッド数、キュー数を制限しつつ、それが溢れた際のエラーログ出力でハマったのでその備忘を残す。
解決したかというと、いまいち筋のよくない力技で無理やり何とかした、という対応になるが…。
以下、Scala2.12.4, Play Framework 2.6.7で動作を確認している。
package controllers
import javax.inject.Inject
import com.zaneli.concurrent.ExecutionContexts
import play.api.Logger
import play.api.mvc._
import scala.concurrent.Future
import scala.util.control.NonFatal
class Test @Inject() extends InjectedController {
private[this] val logger = Logger(this.getClass)
def index: Action[AnyContent] = Action.async { params =>
implicit val ec = ExecutionContexts.executionContext
val v = params.getQueryString("v")
Future {
save(v)
}.transform(_.map(Ok(_)).recover {
case NonFatal(e) =>
logger.error(s"faild to save. v=$v", e)
v.fold(NoContent)(Ok(_))
})
}
private[this] def save(v: Option[String]): String = {
if (v.exists(_.length > 5)) {
throw new IllegalArgumentException()
}
Thread.sleep(5000L)
v.getOrElse("")
}
}
こんなcontrollerがあるとして、save(v)の同時実行数とキュー数を制限したいとする。(エラーが起きているのに20xで返す是非は、今回の本題ではないので一旦サンプルコードはこれで…)
RejectedExecutionHandlerを指定しない(デフォルトのAbortPolicyを使用する)
ExecutionContexts.executionContextを以下のように定義した。
package com.zaneli.concurrent
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext
object ExecutionContexts {
lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1))
)
}
これで、maximumPoolSizeの1とBlockingQueueのcapacityの1を超える、つまり5秒以内に3回アクセスするとエラーとなる。
が、例外をキャッチして
logger.error()でエラーログ出力するのではなく、標準エラー出力に以下が出てしまう。java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@6862f5c6 rejected from java.util.concurrent.ThreadPoolExecutor@1965e2d7[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]ThreadPoolExecutorのコンストラクタにRejectedExecutionHandlerを指定しない場合、デフォルトでAbortPolicyが設定され、それが投げる例外がRejectedExecutionException。
今回はvパラメータ文字数が5文字以上だと不正としているが、以下のようにその際のパラメータを含めてログ出力しているので、
タスク溢れの場合にも同様のログを出力したい。
[error] c.Test - faild to save. v=Some(invalid)
自前のRejectedExecutionHandler(もしくはreporter)を指定してログ出力する
ExecutionContexts.executionContextを以下のように書き替えて自前のRejectedExecutionHandlerを設定するとどうだろうか?
package com.zaneli.concurrent
import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionHandler, ThreadPoolExecutor, TimeUnit}
import play.api.Logger
import scala.concurrent.ExecutionContext
object ExecutionContexts {
private val logger = Logger(this.getClass)
lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1), new RejectedExecutionHandler() {
override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit =
logger.error("asynchronous computation failed.")
})
)
}
ちなみにExecutionContext.fromExecutorService()には第二引数にreporterを渡す事ができ、デフォルトが_.printStackTrace()となっている。上記の例はこう書いても同じ事ができそうだ。
package com.zaneli.concurrent
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import play.api.Logger
import scala.concurrent.ExecutionContext
object ExecutionContexts {
private val logger = Logger(this.getClass)
lazy val executionContext3: ExecutionContext = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1)),
logger.error("asynchronous computation failed.", _)
)
}
これで、タスクが溢れた場合にも以下のようにロガーの機能によるログ出力ができた。[error] c.z.c.ExecutionContexts - asynchronous computation failed.が、ここでキャッチすると発生時のvパラメータが分からず、もうひと工夫必要そうだ。
reporterで例外を投げ直す…
これでいいのか感はあるが、reporterはThrowable => Unitなので、引数で受け取った例外を投げ直してcontroller側でキャッチしてもらう、というのを試してみた。
package com.zaneli.concurrent
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import play.api.Logger
import scala.concurrent.ExecutionContext
object ExecutionContexts {
private val logger = Logger(this.getClass)
lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1)),
throw _
)
}
controller側にも修正が必要になる。
package controllers
import javax.inject.Inject
import com.zaneli.concurrent.ExecutionContexts
import play.api.Logger
import play.api.mvc._
import scala.concurrent.Future
import scala.util.control.NonFatal
class Test @Inject() extends InjectedController {
private[this] val logger = Logger(this.getClass)
def index: Action[AnyContent] = Action.async { params =>
implicit val ec = ExecutionContexts.executionContext
val v = params.getQueryString("v")
try {
Future {
save(v)
}.transform(_.map(Ok(_)).recover {
case NonFatal(e) =>
log(v, e)
})
} catch {
case NonFatal(e) =>
Future.successful(log(v, e))
}
}
private[this] def log(v: Option[String], t: Throwable): Result = {
logger.error(s"faild to save. v=$v", t)
v.fold(NoContent)(Ok(_))
}
private[this] def save(v: Option[String]): String = {
if (v.exists(_.length > 5)) {
throw new IllegalArgumentException()
}
Thread.sleep(5000L)
v.getOrElse("")
}
}
Futureを try ~ catch しないといけなかった…。しかしこれで目的は果たせるはず…。ロガーにより、以下のエラーが出力される事を確認できた。
[error] c.Test - faild to save. v=Some(valid) java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@72cc08ea rejected from java.util.concurrent.ThreadPoolExecutor@4cea79cf[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 2] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:159) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367) at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375) at scala.concurrent.impl.Promise.transform(Promise.scala:29) at scala.concurrent.impl.Promise.transform$(Promise.scala:27)が、少し遅れて標準エラー出力にも別途
RejectedExecutionExceptionが出力されてしまった。どうやら
Future.transformがダメらしい…。何とかここで発生したエラーも拾う事ができないかと試行錯誤してみたがうまくいかず、結局諦めてtransformを使わない方向で書き直す事にした…。
package controllers
import javax.inject.Inject
import com.zaneli.concurrent.ExecutionContexts
import play.api.Logger
import play.api.mvc._
import scala.concurrent.Future
import scala.util.control.NonFatal
class Test @Inject() extends InjectedController {
private[this] val logger = Logger(this.getClass)
def index: Action[AnyContent] = Action.async { params =>
implicit val ec = ExecutionContexts.executionContext
val v = params.getQueryString("v")
try {
Future {
try {
Ok(save(v))
} catch {
case NonFatal(e) =>
log(v, e)
}
}
} catch {
case NonFatal(e) =>
Future.successful(log(v, e))
}
}
private[this] def log(v: Option[String], t: Throwable): Result = {
logger.error(s"faild to save. v=$v", t)
v.fold(NoContent)(Ok(_))
}
private[this] def save(v: Option[String]): String = {
if (v.exists(_.length > 5)) {
throw new IllegalArgumentException()
}
Thread.sleep(5000L)
v.getOrElse("")
}
}
うーむ…どんどん微妙な書き方になっていくが、とりあえずエラー出力に関しては期待通りに動く事が確認できた。[error] c.Test - faild to save. v=Some(valid) java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@24f23e5 rejected from java.util.concurrent.ThreadPoolExecutor@28de1d41[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 7] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:159) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367) at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375) at scala.concurrent.impl.Promise.transform(Promise.scala:29) at scala.concurrent.impl.Promise.transform$(Promise.scala:27)
明日はy_taka_23さんの「多分 Stainless について書きます」です。