diff --git a/app/controllers/ClusterChangesController.scala b/app/controllers/ClusterChangesController.scala index 8fc9e646633fdd818dcf307d83508c2b9e40e0ba..687b69bc5b3ac1837d8aec14ed62e73b12122bd9 100644 --- a/app/controllers/ClusterChangesController.scala +++ b/app/controllers/ClusterChangesController.scala @@ -7,32 +7,17 @@ import elastic.{ElasticClient, Error} import models.commons.{Indices, Nodes} import models.{CerebroResponse, Hosts} import play.api.libs.json.Json +import services.cluster_changes.ClusterChangesDataService import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future class ClusterChangesController @Inject()(val authentication: AuthenticationModule, val hosts: Hosts, - client: ElasticClient) extends BaseController { + service: ClusterChangesDataService) extends BaseController { def get = process { request => - Future.sequence(Seq( - client.getIndices(request.target), - client.getNodes(request.target), - client.main(request.target) - )).map { responses => - val failed = responses.find(_.isInstanceOf[Error]) - failed match { - case None => - val body = Json.obj( - "indices" -> Indices(responses(0).body), - "nodes" -> Nodes(responses(1).body), - "cluster_name" -> (responses(2).body \ "cluster_name").as[String] - ) - CerebroResponse(200, body) - case Some(f) => - CerebroResponse(f.status, f.body) - } - } + service.data(request.target).map(CerebroResponse(200, _)) } + } diff --git a/app/services/cluster_changes/ClusterChangesDataService.scala b/app/services/cluster_changes/ClusterChangesDataService.scala new file mode 100644 index 0000000000000000000000000000000000000000..f8b60f272838bbc6171ddb045a499837fdfc7b1f --- /dev/null +++ b/app/services/cluster_changes/ClusterChangesDataService.scala @@ -0,0 +1,53 @@ +package services.cluster_changes + +import com.google.inject.Inject +import elastic.{ElasticClient, Error} +import models.ElasticServer +import play.api.libs.json._ +import services.exception.RequestFailedException + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future + +class ClusterChangesDataService @Inject()(client: ElasticClient) { + + val apis = Seq( + "_aliases", + "_nodes/transport", + "_cluster/state/blocks" + ) + + def data(target: ElasticServer): Future[JsValue] = { + Future.sequence(apis.map(client.executeRequest("GET", _, None, target))).map { responses => + responses.zipWithIndex.find(_._1.isInstanceOf[Error]) match { + case Some((response, idx)) => + throw RequestFailedException(apis(idx), response.status, response.body.toString()) + case None => + val aliases = responses(0).body + val nodesInfo = responses(1).body + val state = responses(2).body + + // open indices + val indices = aliases.as[JsObject].keys.map(JsString(_)).toSeq + + val blocks = (state \ "blocks" \ "indices").asOpt[JsObject].getOrElse(Json.obj()) + val closedIndices = blocks.keys.collect { + case index if (blocks \ index \ "4").asOpt[JsObject].isDefined => JsString(index) + } + + val nodes = (nodesInfo \\ "name").collect { + case node: JsString => node + }.distinct + + val clusterName = (state \ "cluster_name").as[JsValue] + Json.obj( + "indices" -> JsArray(indices ++ closedIndices), + "nodes" -> JsArray(nodes), + "cluster_name" -> clusterName + ) + } + + } + } + +} diff --git a/test/controllers/ClusterChangesControllerSpec.scala b/test/controllers/ClusterChangesControllerSpec.scala index d7a9c7f7c1fe141aa0ee9efe8650b5521667b5e4..a9ee779fe26ec45a41a1fe44018176cc3afe308e 100644 --- a/test/controllers/ClusterChangesControllerSpec.scala +++ b/test/controllers/ClusterChangesControllerSpec.scala @@ -1,7 +1,6 @@ package controllers -import controllers.AnalysisControllerSpec.application -import elastic.{ElasticResponse, Success} +import elastic.Success import models.ElasticServer import play.api.libs.json.Json import play.api.test.FakeRequest @@ -23,31 +22,104 @@ object ClusterChangesControllerSpec extends MockedServices { """ |{ | "cluster_name": "elasticsearch", - | "indices": ["index1", "index2"], - | "nodes": ["Shriek", "Jimaine Szardos"] + | "indices": ["bar", "foo"], + | "nodes": ["foobar", "qux"] |} """.stripMargin ) - val mainResponse = Json.parse("""{"cluster_name": "elasticsearch"}""") - val indicesResponse = Json.parse( + + val aliasesResponse = Json.parse( + """ + |{ + | "bar": { + | "aliases": {} + | } + |} + """.stripMargin + ) + + val stateResponse = Json.parse( """ - |[ - | {"health":"green","status":"open","index":"index1","pri":"10","rep":"0","docs.count":"4330","docs.deleted":"10","store.size":"4.1mb","pri.store.size":"4.1mb"}, - | {"health":"green","status":"closed","index":"index2","pri":"10","rep":"0","docs.count":"1497203","docs.deleted":"5048","store.size":"860.9mb","pri.store.size":"860.9mb"} - |] + |{ + | "cluster_name": "elasticsearch", + | "blocks" : { + | "indices" : { + | "foo" : { + | "4" : { + | "description" : "index closed", + | "retryable" : false, + | "levels" : [ "read", "write" ] + | } + | } + | } + | } + |} """.stripMargin ) + val nodesResponse = Json.parse( """ - |[ - | {"host":"127.0.0.1","ip":"127.0.0.1","name":"Shriek"}, - | {"host":"127.0.0.1","ip":"127.0.0.1","name":"Jimaine Szardos"} - |] + |{ + | "_nodes": { + | "total": 1, + | "successful": 1, + | "failed": 0 + | }, + | "cluster_name": "elasticsearch", + | "nodes": { + | "zV4lLJgjQ6y-BkIQI0IChg": { + | "name": "foobar", + | "transport_address": "192.1.1.0:9300", + | "host": "192.1.1.0", + | "ip": "192.1.1.0", + | "version": "5.4.0", + | "build_hash": "780f8c4", + | "roles": [ + | "master", + | "data", + | "ingest" + | ], + | "attributes": { + | }, + | "transport": { + | "bound_address": [ + | "[::]:9300" + | ], + | "publish_address": "10.12.32.25:9300", + | "profiles": { + | } + | } + | }, + | "z4LJgjQ6y-BkIQI0IChg": { + | "name": "qux", + | "transport_address": "192.1.1.0:9300", + | "host": "192.1.1.0", + | "ip": "192.1.1.0", + | "version": "5.4.0", + | "build_hash": "780f8c4", + | "roles": [ + | "master", + | "data", + | "ingest" + | ], + | "attributes": { + | }, + | "transport": { + | "bound_address": [ + | "[::]:9300" + | ], + | "publish_address": "10.12.32.25:9300", + | "profiles": { + | } + | } + | } + | } + |} """.stripMargin ) - client.main(ElasticServer("somehost", None)) returns Future.successful(Success(200, mainResponse)) - client.getNodes(ElasticServer("somehost", None)) returns Future.successful(Success(200, nodesResponse)) - client.getIndices(ElasticServer("somehost", None)) returns Future.successful(Success(200, indicesResponse)) + client.executeRequest("GET", "_cluster/state/blocks", None, ElasticServer("somehost", None)) returns Future.successful(Success(200, stateResponse)) + client.executeRequest("GET", "_nodes/transport", None, ElasticServer("somehost", None)) returns Future.successful(Success(200, nodesResponse)) + client.executeRequest("GET", "_aliases", None, ElasticServer("somehost", None)) returns Future.successful(Success(200, aliasesResponse)) val response = route(application, FakeRequest(POST, "/cluster_changes").withBody(Json.obj("host" -> "somehost"))).get ensure(response, 200, expectedResponse) }