Commit 015c0e82 authored by Leonardo Menezes's avatar Leonardo Menezes
Browse files

url encode indices

es 2.X still supports special characters on indices names

closes #114
parent 1259bb9f
Loading
Loading
Loading
Loading
+82 −76
Original line number Diff line number Diff line
package elastic

import java.net.URLEncoder
import javax.inject.Singleton

import com.google.inject.Inject
import models.{ESAuth, ElasticServer}
import models.ElasticServer
import play.api.libs.json._
import play.api.libs.ws.{WSAuthScheme, WSClient}

@@ -14,101 +15,101 @@ import scala.concurrent.Future
class HTTPElasticClient @Inject()(client: WSClient) extends ElasticClient {

  def main(target: ElasticServer) =
    execute(s"${target.host}", "GET", None, target.authentication)
    execute(s"", "GET", None, target)

  def clusterState(target: ElasticServer) = {
    val path = "/_cluster/state/master_node,routing_table,routing_nodes,blocks"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def indicesStats(target: ElasticServer) = {
    val path = "/_stats/docs,store"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def nodesStats(stats: Seq[String], target: ElasticServer) = {
    val path = s"/_nodes/stats/${stats.mkString(",")}?human=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def indexStats(index: String, target: ElasticServer): Future[ElasticResponse] = {
    val path = s"/$index/_stats?human=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/${encoded(index)}/_stats?human=true"
    execute(path, "GET", None, target)
  }

  def nodeStats(node: String, target: ElasticServer) = {
    val path = s"/_nodes/$node/stats?human"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/_nodes/${encoded(node)}/stats?human"
    execute(path, "GET", None, target)
  }

  def clusterSettings(target: ElasticServer) = {
    val path = "/_cluster/settings"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def aliases(target: ElasticServer) = {
    val path = "/_aliases"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def clusterHealth(target: ElasticServer) = {
    val path = "/_cluster/health"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def nodes(flags: Seq[String], target: ElasticServer) = {
    val path = s"/_nodes/_all/${flags.mkString(",")}?human=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def closeIndex(index: String, target: ElasticServer) = {
    val path = s"/$index/_close"
    execute(s"${target.host}$path", "POST", None, target.authentication)
    val path = s"/${encoded(index)}/_close"
    execute(path, "POST", None, target)
  }

  def openIndex(index: String, target: ElasticServer) = {
    val path = s"/$index/_open"
    execute(s"${target.host}$path", "POST", None, target.authentication)
    val path = s"/${encoded(index)}/_open"
    execute(path, "POST", None, target)
  }

  def refreshIndex(index: String, target: ElasticServer) = {
    val path = s"/$index/_refresh"
    execute(s"${target.host}$path", "POST", None, target.authentication)
    val path = s"/${encoded(index)}/_refresh"
    execute(path, "POST", None, target)
  }

  def forceMerge(index: String, target: ElasticServer) = {
    val path = s"/$index/_forcemerge"
    execute(s"${target.host}$path", "POST", None, target.authentication)
    val path = s"/${encoded(index)}/_forcemerge"
    execute(path, "POST", None, target)
  }

  def clearIndexCache(index: String, target: ElasticServer) = {
    val path = s"/$index/_cache/clear"
    execute(s"${target.host}$path", "POST", None, target.authentication)
    val path = s"/${encoded(index)}/_cache/clear"
    execute(path, "POST", None, target)
  }

  def deleteIndex(index: String, target: ElasticServer) = {
    val path = s"/$index"
    execute(s"${target.host}$path", "DELETE", None, target.authentication)
    val path = s"/${encoded(index)}"
    execute(path, "DELETE", None, target)
  }

  def getIndexSettings(index: String, target: ElasticServer) = {
    val path = s"/$index/_settings"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/${encoded(index)}/_settings"
    execute(path, "GET", None, target)
  }

  def getIndexSettingsFlat(index: String, target: ElasticServer) = {
    val path = s"/$index/_settings?flat_settings=true&include_defaults=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/${encoded(index)}/_settings?flat_settings=true&include_defaults=true"
    execute(path, "GET", None, target)
  }

  def getIndexMapping(index: String, target: ElasticServer) = {
    val path = s"/$index/_mapping"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/${encoded(index)}/_mapping"
    execute(path, "GET", None, target)
  }

  def putClusterSettings(settings: String, target: ElasticServer) = {
    val path = "/_cluster/settings"
    execute(s"${target.host}$path", "PUT", Some(settings), target.authentication)
    execute(path, "PUT", Some(settings), target)
  }

  private def allocationSettings(value: String) =
@@ -121,8 +122,8 @@ class HTTPElasticClient @Inject()(client: WSClient) extends ElasticClient {
    putClusterSettings(allocationSettings("none"), target)

  def getShardStats(index: String, target: ElasticServer) = {
    val path = s"/$index/_stats?level=shards&human=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/${encoded(index)}/_stats?level=shards&human=true"
    execute(path, "GET", None, target)
  }

  def relocateShard(shard: Int, index: String, from: String, to: String, target: ElasticServer) = {
@@ -142,114 +143,114 @@ class HTTPElasticClient @Inject()(client: WSClient) extends ElasticClient {
         |  ]
         |}
       """.stripMargin
    execute(s"${target.host}$path", "POST", Some(commands), target.authentication)
    execute(path, "POST", Some(commands), target)
  }

  def getIndexRecovery(index: String, target: ElasticServer) = {
    val path = s"/$index/_recovery?active_only=true&human=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/${encoded(index)}/_recovery?active_only=true&human=true"
    execute(path, "GET", None, target)
  }

  def getClusterMapping(target: ElasticServer) = {
    val path = "/_mapping"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def getAliases(target: ElasticServer) = {
    val path = "/_aliases"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def updateAliases(changes: Seq[JsValue], target: ElasticServer) = {
    val path = "/_aliases"
    val body = Json.obj("actions" -> JsArray(changes))
    execute(s"${target.host}$path", "POST", Some(body.toString), target.authentication)
    execute(path, "POST", Some(body.toString), target)
  }

  def getIndexMetadata(index: String, target: ElasticServer) = {
    val path = s"/_cluster/state/metadata/$index?human=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/_cluster/state/metadata/${encoded(index)}?human=true"
    execute(path, "GET", None, target)
  }

  def createIndex(index: String, metadata: JsValue, target: ElasticServer) = {
    val path = s"/$index"
    execute(s"${target.host}$path", "PUT", Some(metadata.toString), target.authentication)
    val path = s"/${encoded(index)}"
    execute(path, "PUT", Some(metadata.toString), target)
  }

  def getIndices(target: ElasticServer) = {
    val path = s"/_cat/indices?format=json"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def getTemplates(target: ElasticServer) = {
    val path = s"/_template"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def createTemplate(name: String, template: JsValue, target: ElasticServer) = {
    val path = s"/_template/$name"
    execute(s"${target.host}$path", "PUT", Some(template.toString), target.authentication)
    val path = s"/_template/${encoded(name)}"
    execute(path, "PUT", Some(template.toString), target)
  }

  def deleteTemplate(name: String, target: ElasticServer) = {
    val path = s"/_template/$name"
    execute(s"${target.host}$path", "DELETE", None, target.authentication)
    val path = s"/_template/${encoded(name)}"
    execute(path, "DELETE", None, target)
  }

  def getNodes(target: ElasticServer) = {
    val path = s"/_cat/nodes?format=json"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def analyzeTextByField(index: String, field: String, text: String, target: ElasticServer) = {
    val path = s"/$index/_analyze"
    val path = s"/${encoded(index)}/_analyze"
    val body = Json.obj("text" -> text, "field" -> field).toString()
    execute(s"${target.host}$path", "GET", Some(body), target.authentication)
    execute(path, "GET", Some(body), target)
  }

  def analyzeTextByAnalyzer(index: String, analyzer: String, text: String, target: ElasticServer) = {
    val path = s"/$index/_analyze"
    val path = s"/${encoded(index)}/_analyze"
    val body = Json.obj("text" -> text, "analyzer" -> analyzer).toString()
    execute(s"${target.host}$path", "GET", Some(body), target.authentication)
    execute(path, "GET", Some(body), target)
  }

  def getClusterSettings(target: ElasticServer) = {
    val path = s"/_cluster/settings?flat_settings=true&include_defaults=true"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }


  // Repositories
  def getRepositories(target: ElasticServer) = {
    val path = s"/_snapshot"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    execute(path, "GET", None, target)
  }

  def createRepository(name: String, repoType: String, settings: JsValue, target: ElasticServer) = {
    val path = s"/_snapshot/$name"
    val path = s"/_snapshot/${encoded(name)}"
    val data = Json.obj("type" -> JsString(repoType), "settings" -> settings).toString
    execute(s"${target.host}$path", "PUT", Some(data), target.authentication)
    execute(path, "PUT", Some(data), target)
  }

  def deleteRepository(name: String, target: ElasticServer) = {
    val path = s"/_snapshot/$name"
    execute(s"${target.host}$path", "DELETE", None, target.authentication)
    val path = s"/_snapshot/${encoded(name)}"
    execute(path, "DELETE", None, target)
  }

  // Snapshots
  def getSnapshots(repository: String, target: ElasticServer) = {
    val path = s"/_snapshot/$repository/_all"
    execute(s"${target.host}$path", "GET", None, target.authentication)
    val path = s"/_snapshot/${encoded(repository)}/_all"
    execute(path, "GET", None, target)
  }

  def deleteSnapshot(repository: String, snapshot: String, target: ElasticServer) = {
    val path = s"/_snapshot/$repository/$snapshot"
    execute(s"${target.host}$path", "DELETE", None, target.authentication)
    val path = s"/_snapshot/${encoded(repository)}/${encoded(snapshot)}"
    execute(path, "DELETE", None, target)
  }

  def createSnapshot(repository: String, snapshot: String, ignoreUnavailable: Boolean,
                     includeGlobalState: Boolean, indices: Option[String], target: ElasticServer) = {
    val path = s"/_snapshot/$repository/$snapshot"
    val path = s"/_snapshot/${encoded(repository)}/${encoded(snapshot)}"
    val data = JsObject(
      Seq(
        ("repository", JsString(repository)),
@@ -258,13 +259,13 @@ class HTTPElasticClient @Inject()(client: WSClient) extends ElasticClient {
        ("includeGlobalState", JsBoolean(includeGlobalState))
      ) ++ indices.map { i => Seq(("indices", JsString(i))) }.getOrElse(Nil)
    ).toString
    execute(s"${target.host}$path", "PUT", Some(data), target.authentication)
    execute(path, "PUT", Some(data), target)
  }

  def restoreSnapshot(repository: String, snapshot: String, renamePattern: Option[String],
                      renameReplacement: Option[String], ignoreUnavailable: Boolean, includeAliases: Boolean,
                      includeGlobalState: Boolean, indices: Option[String], target: ElasticServer) = {
    val path = s"/_snapshot/$repository/$snapshot/_restore"
    val path = s"/_snapshot/${encoded(repository)}/${encoded(snapshot)}/_restore"
    val data = JsObject(
      Seq(
        ("ignore_unavailable", JsBoolean(ignoreUnavailable)),
@@ -275,29 +276,31 @@ class HTTPElasticClient @Inject()(client: WSClient) extends ElasticClient {
        renamePattern.map { r => Seq(("rename_pattern", JsString(r))) }.getOrElse(Nil) ++
        renameReplacement.map { r => Seq(("rename_replacement", JsString(r))) }.getOrElse(Nil)
    ).toString
    execute(s"${target.host}$path", "POST", Some(data), target.authentication)
    execute(path, "POST", Some(data), target)
  }

  def saveClusterSettings(settings: JsValue, target: ElasticServer) = {
    val path = s"/_cluster/settings"
    execute(s"${target.host}$path", "PUT", Some(settings.toString), target.authentication)
    execute(path, "PUT", Some(settings.toString), target)
  }

  def updateIndexSettings(index: String, settings: JsValue, target: ElasticServer) = {
    val path = s"/$index/_settings"
    execute(s"${target.host}$path", "PUT", Some(settings.toString), target.authentication)
    val path = s"/${encoded(index)}/_settings"
    execute(path, "PUT", Some(settings.toString), target)
  }

  // Cat requests
  def catRequest(api: String, target: ElasticServer) = {
    val path = s"/_cat/$api"
    execute(s"${target.host}$path?format=json", "GET", None, target.authentication)
    execute(s"$path?format=json", "GET", None, target)
  }

  def executeRequest(method: String, path: String, data: Option[JsValue], target: ElasticServer) =
    execute(s"${target.host}/$path", method, data.map(_.toString), target.authentication)
    execute(s"/${path}", method, data.map(_.toString), target)

  protected def execute[T](url: String, method: String, body: Option[String] = None, authentication: Option[ESAuth] = None) = {
  protected def execute[T](uri: String, method: String, body: Option[String] = None, target: ElasticServer) = {
    val authentication = target.authentication
    val url = s"${target.host}${uri}"
    val request = authentication.foldLeft(client.url(url).withMethod(method)) {
      case (request, auth) => request.withAuth(auth.username, auth.password, WSAuthScheme.BASIC)
    }
@@ -307,8 +310,11 @@ class HTTPElasticClient @Inject()(client: WSClient) extends ElasticClient {
    }
  }

  // FIXME: ES > 5.X does not support indices with special characters, so this could be removed
  private def encoded(text: String): String = URLEncoder.encode(text, "UTF-8")

  override def catMaster(target: ElasticServer): Future[ElasticResponse] = {
    val path = "/_cat/master"
    execute(s"${target.host}$path?format=json", "GET", None, target.authentication)
    execute(s"$path?format=json", "GET", None, target)
  }
}