これはScala Advent Calendar 2017の7日目の記事です。
昨日はsmogamiさんの「Scala.js用の型定義をラクに得る scala-js-ts-importer の紹介」でした。
Play Framework でとある処理の同時実行スレッド数、キュー数を制限しつつ、それが溢れた際のエラーログ出力でハマったのでその備忘を残す。
解決したかというと、いまいち筋のよくない力技で無理やり何とかした、という対応になるが…。
以下、Scala2.12.4, Play Framework 2.6.7で動作を確認している。
昨日はsmogamiさんの「Scala.js用の型定義をラクに得る scala-js-ts-importer の紹介」でした。
Play Framework でとある処理の同時実行スレッド数、キュー数を制限しつつ、それが溢れた際のエラーログ出力でハマったのでその備忘を残す。
解決したかというと、いまいち筋のよくない力技で無理やり何とかした、という対応になるが…。
以下、Scala2.12.4, Play Framework 2.6.7で動作を確認している。
package controllers import javax.inject.Inject import com.zaneli.concurrent.ExecutionContexts import play.api.Logger import play.api.mvc._ import scala.concurrent.Future import scala.util.control.NonFatal class Test @Inject() extends InjectedController { private[this] val logger = Logger(this.getClass) def index: Action[AnyContent] = Action.async { params => implicit val ec = ExecutionContexts.executionContext val v = params.getQueryString("v") Future { save(v) }.transform(_.map(Ok(_)).recover { case NonFatal(e) => logger.error(s"faild to save. v=$v", e) v.fold(NoContent)(Ok(_)) }) } private[this] def save(v: Option[String]): String = { if (v.exists(_.length > 5)) { throw new IllegalArgumentException() } Thread.sleep(5000L) v.getOrElse("") } }こんなcontrollerがあるとして、
save(v)
の同時実行数とキュー数を制限したいとする。(エラーが起きているのに20xで返す是非は、今回の本題ではないので一旦サンプルコードはこれで…)
RejectedExecutionHandlerを指定しない(デフォルトのAbortPolicyを使用する)
ExecutionContexts.executionContext
を以下のように定義した。package com.zaneli.concurrent import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit} import scala.concurrent.ExecutionContext object ExecutionContexts { lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1)) ) }これで、
maximumPoolSize
の1とBlockingQueue
のcapacity
の1を超える、つまり5秒以内に3回アクセスするとエラーとなる。
が、例外をキャッチして
logger.error()
でエラーログ出力するのではなく、標準エラー出力に以下が出てしまう。java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@6862f5c6 rejected from java.util.concurrent.ThreadPoolExecutor@1965e2d7[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]ThreadPoolExecutorのコンストラクタにRejectedExecutionHandlerを指定しない場合、デフォルトでAbortPolicyが設定され、それが投げる例外がRejectedExecutionException。
今回はvパラメータ文字数が5文字以上だと不正としているが、以下のようにその際のパラメータを含めてログ出力しているので、
タスク溢れの場合にも同様のログを出力したい。
[error] c.Test - faild to save. v=Some(invalid)
自前のRejectedExecutionHandler(もしくはreporter)を指定してログ出力する
ExecutionContexts.executionContext
を以下のように書き替えて自前のRejectedExecutionHandler
を設定するとどうだろうか?package com.zaneli.concurrent import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionHandler, ThreadPoolExecutor, TimeUnit} import play.api.Logger import scala.concurrent.ExecutionContext object ExecutionContexts { private val logger = Logger(this.getClass) lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1), new RejectedExecutionHandler() { override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = logger.error("asynchronous computation failed.") }) ) }ちなみにExecutionContext.fromExecutorService()には第二引数に
reporter
を渡す事ができ、デフォルトが_.printStackTrace()
となっている。上記の例はこう書いても同じ事ができそうだ。
package com.zaneli.concurrent import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit} import play.api.Logger import scala.concurrent.ExecutionContext object ExecutionContexts { private val logger = Logger(this.getClass) lazy val executionContext3: ExecutionContext = ExecutionContext.fromExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1)), logger.error("asynchronous computation failed.", _) ) }これで、タスクが溢れた場合にも以下のようにロガーの機能によるログ出力ができた。
[error] c.z.c.ExecutionContexts - asynchronous computation failed.が、ここでキャッチすると発生時のvパラメータが分からず、もうひと工夫必要そうだ。
reporterで例外を投げ直す…
これでいいのか感はあるが、reporter
はThrowable => Unit
なので、引数で受け取った例外を投げ直してcontroller側でキャッチしてもらう、というのを試してみた。package com.zaneli.concurrent import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit} import play.api.Logger import scala.concurrent.ExecutionContext object ExecutionContexts { private val logger = Logger(this.getClass) lazy val executionContext: ExecutionContext = ExecutionContext.fromExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MICROSECONDS, new ArrayBlockingQueue[Runnable](1)), throw _ ) }controller側にも修正が必要になる。
package controllers import javax.inject.Inject import com.zaneli.concurrent.ExecutionContexts import play.api.Logger import play.api.mvc._ import scala.concurrent.Future import scala.util.control.NonFatal class Test @Inject() extends InjectedController { private[this] val logger = Logger(this.getClass) def index: Action[AnyContent] = Action.async { params => implicit val ec = ExecutionContexts.executionContext val v = params.getQueryString("v") try { Future { save(v) }.transform(_.map(Ok(_)).recover { case NonFatal(e) => log(v, e) }) } catch { case NonFatal(e) => Future.successful(log(v, e)) } } private[this] def log(v: Option[String], t: Throwable): Result = { logger.error(s"faild to save. v=$v", t) v.fold(NoContent)(Ok(_)) } private[this] def save(v: Option[String]): String = { if (v.exists(_.length > 5)) { throw new IllegalArgumentException() } Thread.sleep(5000L) v.getOrElse("") } }
Future
を try ~ catch しないといけなかった…。しかしこれで目的は果たせるはず…。ロガーにより、以下のエラーが出力される事を確認できた。
[error] c.Test - faild to save. v=Some(valid) java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@72cc08ea rejected from java.util.concurrent.ThreadPoolExecutor@4cea79cf[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 2] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:159) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367) at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375) at scala.concurrent.impl.Promise.transform(Promise.scala:29) at scala.concurrent.impl.Promise.transform$(Promise.scala:27)が、少し遅れて標準エラー出力にも別途
RejectedExecutionException
が出力されてしまった。どうやら
Future.transform
がダメらしい…。何とかここで発生したエラーも拾う事ができないかと試行錯誤してみたがうまくいかず、結局諦めてtransformを使わない方向で書き直す事にした…。
package controllers import javax.inject.Inject import com.zaneli.concurrent.ExecutionContexts import play.api.Logger import play.api.mvc._ import scala.concurrent.Future import scala.util.control.NonFatal class Test @Inject() extends InjectedController { private[this] val logger = Logger(this.getClass) def index: Action[AnyContent] = Action.async { params => implicit val ec = ExecutionContexts.executionContext val v = params.getQueryString("v") try { Future { try { Ok(save(v)) } catch { case NonFatal(e) => log(v, e) } } } catch { case NonFatal(e) => Future.successful(log(v, e)) } } private[this] def log(v: Option[String], t: Throwable): Result = { logger.error(s"faild to save. v=$v", t) v.fold(NoContent)(Ok(_)) } private[this] def save(v: Option[String]): String = { if (v.exists(_.length > 5)) { throw new IllegalArgumentException() } Thread.sleep(5000L) v.getOrElse("") } }うーむ…どんどん微妙な書き方になっていくが、とりあえずエラー出力に関しては期待通りに動く事が確認できた。
[error] c.Test - faild to save. v=Some(valid) java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@24f23e5 rejected from java.util.concurrent.ThreadPoolExecutor@28de1d41[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 7] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:159) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368) at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367) at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375) at scala.concurrent.impl.Promise.transform(Promise.scala:29) at scala.concurrent.impl.Promise.transform$(Promise.scala:27)
明日はy_taka_23さんの「多分 Stainless について書きます」です。