Skip to content
Snippets Groups Projects
Commit 1a627f81 authored by Leonardo Menezes's avatar Leonardo Menezes
Browse files

optimised cluster changes api

parent 10fed447
No related branches found
No related tags found
No related merge requests found
......@@ -7,32 +7,17 @@ import elastic.{ElasticClient, Error}
import models.commons.{Indices, Nodes}
import models.{CerebroResponse, Hosts}
import play.api.libs.json.Json
import services.cluster_changes.ClusterChangesDataService
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
class ClusterChangesController @Inject()(val authentication: AuthenticationModule,
val hosts: Hosts,
client: ElasticClient) extends BaseController {
service: ClusterChangesDataService) extends BaseController {
def get = process { request =>
Future.sequence(Seq(
client.getIndices(request.target),
client.getNodes(request.target),
client.main(request.target)
)).map { responses =>
val failed = responses.find(_.isInstanceOf[Error])
failed match {
case None =>
val body = Json.obj(
"indices" -> Indices(responses(0).body),
"nodes" -> Nodes(responses(1).body),
"cluster_name" -> (responses(2).body \ "cluster_name").as[String]
)
CerebroResponse(200, body)
case Some(f) =>
CerebroResponse(f.status, f.body)
}
}
service.data(request.target).map(CerebroResponse(200, _))
}
}
package services.cluster_changes
import com.google.inject.Inject
import elastic.{ElasticClient, Error}
import models.ElasticServer
import play.api.libs.json._
import services.exception.RequestFailedException
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
class ClusterChangesDataService @Inject()(client: ElasticClient) {
val apis = Seq(
"_aliases",
"_nodes/transport",
"_cluster/state/blocks"
)
def data(target: ElasticServer): Future[JsValue] = {
Future.sequence(apis.map(client.executeRequest("GET", _, None, target))).map { responses =>
responses.zipWithIndex.find(_._1.isInstanceOf[Error]) match {
case Some((response, idx)) =>
throw RequestFailedException(apis(idx), response.status, response.body.toString())
case None =>
val aliases = responses(0).body
val nodesInfo = responses(1).body
val state = responses(2).body
// open indices
val indices = aliases.as[JsObject].keys.map(JsString(_)).toSeq
val blocks = (state \ "blocks" \ "indices").asOpt[JsObject].getOrElse(Json.obj())
val closedIndices = blocks.keys.collect {
case index if (blocks \ index \ "4").asOpt[JsObject].isDefined => JsString(index)
}
val nodes = (nodesInfo \\ "name").collect {
case node: JsString => node
}.distinct
val clusterName = (state \ "cluster_name").as[JsValue]
Json.obj(
"indices" -> JsArray(indices ++ closedIndices),
"nodes" -> JsArray(nodes),
"cluster_name" -> clusterName
)
}
}
}
}
package controllers
import controllers.AnalysisControllerSpec.application
import elastic.{ElasticResponse, Success}
import elastic.Success
import models.ElasticServer
import play.api.libs.json.Json
import play.api.test.FakeRequest
......@@ -23,31 +22,104 @@ object ClusterChangesControllerSpec extends MockedServices {
"""
|{
| "cluster_name": "elasticsearch",
| "indices": ["index1", "index2"],
| "nodes": ["Shriek", "Jimaine Szardos"]
| "indices": ["bar", "foo"],
| "nodes": ["foobar", "qux"]
|}
""".stripMargin
)
val mainResponse = Json.parse("""{"cluster_name": "elasticsearch"}""")
val indicesResponse = Json.parse(
val aliasesResponse = Json.parse(
"""
|[
| {"health":"green","status":"open","index":"index1","pri":"10","rep":"0","docs.count":"4330","docs.deleted":"10","store.size":"4.1mb","pri.store.size":"4.1mb"},
| {"health":"green","status":"closed","index":"index2","pri":"10","rep":"0","docs.count":"1497203","docs.deleted":"5048","store.size":"860.9mb","pri.store.size":"860.9mb"}
|]
|{
| "bar": {
| "aliases": {}
| }
|}
""".stripMargin
)
val stateResponse = Json.parse(
"""
|{
| "cluster_name": "elasticsearch",
| "blocks" : {
| "indices" : {
| "foo" : {
| "4" : {
| "description" : "index closed",
| "retryable" : false,
| "levels" : [ "read", "write" ]
| }
| }
| }
| }
|}
""".stripMargin
)
val nodesResponse = Json.parse(
"""
|[
| {"host":"127.0.0.1","ip":"127.0.0.1","name":"Shriek"},
| {"host":"127.0.0.1","ip":"127.0.0.1","name":"Jimaine Szardos"}
|]
|{
| "_nodes": {
| "total": 1,
| "successful": 1,
| "failed": 0
| },
| "cluster_name": "elasticsearch",
| "nodes": {
| "zV4lLJgjQ6y-BkIQI0IChg": {
| "name": "foobar",
| "transport_address": "192.1.1.0:9300",
| "host": "192.1.1.0",
| "ip": "192.1.1.0",
| "version": "5.4.0",
| "build_hash": "780f8c4",
| "roles": [
| "master",
| "data",
| "ingest"
| ],
| "attributes": {
| },
| "transport": {
| "bound_address": [
| "[::]:9300"
| ],
| "publish_address": "10.12.32.25:9300",
| "profiles": {
| }
| }
| },
| "z4LJgjQ6y-BkIQI0IChg": {
| "name": "qux",
| "transport_address": "192.1.1.0:9300",
| "host": "192.1.1.0",
| "ip": "192.1.1.0",
| "version": "5.4.0",
| "build_hash": "780f8c4",
| "roles": [
| "master",
| "data",
| "ingest"
| ],
| "attributes": {
| },
| "transport": {
| "bound_address": [
| "[::]:9300"
| ],
| "publish_address": "10.12.32.25:9300",
| "profiles": {
| }
| }
| }
| }
|}
""".stripMargin
)
client.main(ElasticServer("somehost", None)) returns Future.successful(Success(200, mainResponse))
client.getNodes(ElasticServer("somehost", None)) returns Future.successful(Success(200, nodesResponse))
client.getIndices(ElasticServer("somehost", None)) returns Future.successful(Success(200, indicesResponse))
client.executeRequest("GET", "_cluster/state/blocks", None, ElasticServer("somehost", None)) returns Future.successful(Success(200, stateResponse))
client.executeRequest("GET", "_nodes/transport", None, ElasticServer("somehost", None)) returns Future.successful(Success(200, nodesResponse))
client.executeRequest("GET", "_aliases", None, ElasticServer("somehost", None)) returns Future.successful(Success(200, aliasesResponse))
val response = route(application, FakeRequest(POST, "/cluster_changes").withBody(Json.obj("host" -> "somehost"))).get
ensure(response, 200, expectedResponse)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment