Loading app/controllers/elasticsearch/GetShardStats.scala 0 → 100644 +21 −0 Original line number Diff line number Diff line package controllers.elasticsearch import elastic.ElasticClient._ import elastic.ElasticResponse import models.ShardStats import scala.concurrent.ExecutionContext.Implicits.global class GetShardStats extends ElasticsearchController { def index = processRequest { request => val index = request.get("index") val node = request.get("node") val shard = request.getInt("shard") getShardStats(index, request.host).zip(getIndexRecovery(index, request.host)).map { case (indexStats, indexRecovery) => val stats = ShardStats(index, node, shard, indexStats.body, indexRecovery.body) ElasticResponse(200, stats) } } } app/elastic/ElasticClient.scala +9 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,8 @@ import play.api.Play.current import play.api.libs.ws.WS import play.api.mvc.Results.EmptyContent import scala.concurrent.Future trait ElasticClient { def main(host: String) = Loading Loading @@ -69,6 +71,13 @@ trait ElasticClient { def disableShardAllocation(host: String) = putClusterSettings(allocationSettings("none"), host) def getShardStats(index: String, host: String) = ElasticResponse(WS.url(s"$host/$index/_stats?level=shards&human=true").get()) def getIndexRecovery(index: String, host: String) = ElasticResponse(WS.url(s"$host/$index/_recovery?active_only=true&human=true").get()) } object ElasticClient extends ElasticClient app/models/CerebroRequest.scala +5 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,11 @@ import play.api.libs.json.JsValue class CerebroRequest(val host: String, body: JsValue) { def get(name: String) = (body \ name).asOpt[String].getOrElse(throw MissingRequiredParamException(name)) def get(name: String) = (body \ name).asOpt[String].getOrElse(throw MissingRequiredParamException(name)) def getInt(name: String) = (body \ name).asOpt[Int].getOrElse(throw MissingRequiredParamException(name)) def getArray(name: String) = (body \ name).asOpt[Array[String]].getOrElse(throw MissingRequiredParamException(name)) Loading app/models/ShardStats.scala 0 → 100644 +30 −0 Original line number Diff line number Diff line package models import play.api.libs.json.{JsArray, JsNull, JsValue} object ShardStats { def apply(index: String, node: String, shard: Int, stats: JsValue, recovery: JsValue): JsValue = { val shardStats = getShardStats(index, node, shard, stats) shardStats.getOrElse(getShardRecovery(index, node, shard, recovery).getOrElse(JsNull)) } private def getShardStats(index: String, node: String, shard: Int, stats: JsValue): Option[JsValue] = (stats \ "indices" \ index \ "shards" \ shard.toString).asOpt[JsArray] match { case Some(JsArray(shards)) => shards.collectFirst { case s if (s \ "routing" \ "node").as[String].equals(node) => s } case _ => None } private def getShardRecovery(index: String, node: String, shard: Int, recovery: JsValue): Option[JsValue] = (recovery \ "index" \ "shards").asOpt[JsArray] match { case Some(JsArray(recoveries)) => recoveries.collectFirst { case r if (r \ "target" \ "id").as[String].equals(node) && (r \ "id").as[Int].equals(shard) => r } case _ => None } } conf/routes +1 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ POST /apis/update_cluster_settings @controllers.elasticsearch.PutClust POST /apis/get_node_stats @controllers.elasticsearch.NodeStatsController.index POST /apis/disable_shard_allocation @controllers.elasticsearch.DisableShardAllocationController.index POST /apis/enable_shard_allocation @controllers.elasticsearch.EnableShardAllocationController.index POST /apis/get_shard_stats @controllers.elasticsearch.GetShardStats.index GET /apis/hosts @controllers.HostsController.index Loading Loading
app/controllers/elasticsearch/GetShardStats.scala 0 → 100644 +21 −0 Original line number Diff line number Diff line package controllers.elasticsearch import elastic.ElasticClient._ import elastic.ElasticResponse import models.ShardStats import scala.concurrent.ExecutionContext.Implicits.global class GetShardStats extends ElasticsearchController { def index = processRequest { request => val index = request.get("index") val node = request.get("node") val shard = request.getInt("shard") getShardStats(index, request.host).zip(getIndexRecovery(index, request.host)).map { case (indexStats, indexRecovery) => val stats = ShardStats(index, node, shard, indexStats.body, indexRecovery.body) ElasticResponse(200, stats) } } }
app/elastic/ElasticClient.scala +9 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,8 @@ import play.api.Play.current import play.api.libs.ws.WS import play.api.mvc.Results.EmptyContent import scala.concurrent.Future trait ElasticClient { def main(host: String) = Loading Loading @@ -69,6 +71,13 @@ trait ElasticClient { def disableShardAllocation(host: String) = putClusterSettings(allocationSettings("none"), host) def getShardStats(index: String, host: String) = ElasticResponse(WS.url(s"$host/$index/_stats?level=shards&human=true").get()) def getIndexRecovery(index: String, host: String) = ElasticResponse(WS.url(s"$host/$index/_recovery?active_only=true&human=true").get()) } object ElasticClient extends ElasticClient
app/models/CerebroRequest.scala +5 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,11 @@ import play.api.libs.json.JsValue class CerebroRequest(val host: String, body: JsValue) { def get(name: String) = (body \ name).asOpt[String].getOrElse(throw MissingRequiredParamException(name)) def get(name: String) = (body \ name).asOpt[String].getOrElse(throw MissingRequiredParamException(name)) def getInt(name: String) = (body \ name).asOpt[Int].getOrElse(throw MissingRequiredParamException(name)) def getArray(name: String) = (body \ name).asOpt[Array[String]].getOrElse(throw MissingRequiredParamException(name)) Loading
app/models/ShardStats.scala 0 → 100644 +30 −0 Original line number Diff line number Diff line package models import play.api.libs.json.{JsArray, JsNull, JsValue} object ShardStats { def apply(index: String, node: String, shard: Int, stats: JsValue, recovery: JsValue): JsValue = { val shardStats = getShardStats(index, node, shard, stats) shardStats.getOrElse(getShardRecovery(index, node, shard, recovery).getOrElse(JsNull)) } private def getShardStats(index: String, node: String, shard: Int, stats: JsValue): Option[JsValue] = (stats \ "indices" \ index \ "shards" \ shard.toString).asOpt[JsArray] match { case Some(JsArray(shards)) => shards.collectFirst { case s if (s \ "routing" \ "node").as[String].equals(node) => s } case _ => None } private def getShardRecovery(index: String, node: String, shard: Int, recovery: JsValue): Option[JsValue] = (recovery \ "index" \ "shards").asOpt[JsArray] match { case Some(JsArray(recoveries)) => recoveries.collectFirst { case r if (r \ "target" \ "id").as[String].equals(node) && (r \ "id").as[Int].equals(shard) => r } case _ => None } }
conf/routes +1 −0 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ POST /apis/update_cluster_settings @controllers.elasticsearch.PutClust POST /apis/get_node_stats @controllers.elasticsearch.NodeStatsController.index POST /apis/disable_shard_allocation @controllers.elasticsearch.DisableShardAllocationController.index POST /apis/enable_shard_allocation @controllers.elasticsearch.EnableShardAllocationController.index POST /apis/get_shard_stats @controllers.elasticsearch.GetShardStats.index GET /apis/hosts @controllers.HostsController.index Loading