diff --git a/elasticsearch-core/pom.xml b/elasticsearch-core/pom.xml index be4adcf..2bb4388 100644 --- a/elasticsearch-core/pom.xml +++ b/elasticsearch-core/pom.xml @@ -69,21 +69,21 @@ org.apache.lucene lucene-core - 4.10.4 + ${lucene.version} test org.apache.lucene lucene-analyzers-common - 4.10.4 + ${lucene.version} test org.elasticsearch elasticsearch - 1.6.2 + ${elasticsearch.version} test diff --git a/elasticsearch-core/src/main/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClient.scala b/elasticsearch-core/src/main/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClient.scala index 834de06..ab8a1bd 100644 --- a/elasticsearch-core/src/main/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClient.scala +++ b/elasticsearch-core/src/main/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClient.scala @@ -161,7 +161,7 @@ class RestlasticSearchClient(endpointProvider: EndpointProvider, signer: Option[ def createIndex(index: Index, settings: Option[IndexSetting] = None): Future[RawJsonResponse] = { implicit val ec = indexExecutionCtx runEsCommand(CreateIndex(settings), s"/${index.name}").recover { - case ElasticErrorResponse(message, status) if message.toString contains "IndexAlreadyExistsException" => + case ElasticErrorResponse(message, status) if message.toString contains "index_already_exists_exception" => throw IndexAlreadyExistsException(message.toString) } } @@ -171,9 +171,14 @@ class RestlasticSearchClient(endpointProvider: EndpointProvider, signer: Option[ runEsCommand(EmptyObject, s"/${index.name}", DELETE) } - def deleteDocument(index: Index, tpe: Type, query: QueryRoot): Future[RawJsonResponse] = { + def deleteDocument(index: Index, tpe: Type, deleteQuery: QueryRoot, pluginEnabled: Boolean = false): Future[RawJsonResponse] = { implicit val ec = indexExecutionCtx - runEsCommand(query, s"/${index.name}/${tpe.name}/_query", DELETE) + if (pluginEnabled) { + runEsCommand(deleteQuery, s"/${index.name}/${tpe.name}/_query", DELETE) + } else { + val documents = Await.result(query(index, tpe, deleteQuery, rawJsonStr = false), 10.seconds).rawSearchResponse.hits.hits.map(_._id) + bulkDelete(index, tpe, documents.map(Document(_, Map()))).map(res => RawJsonResponse(res.toString)) + } } // Scroll requests have optimizations that make them faster when the sort order is _doc. @@ -261,9 +266,10 @@ object RestlasticSearchClient { case class ScrollId(id: String) case class BulkIndexResponse(items: List[Map[String, BulkItem]]) - case class BulkItem(_index: String, _type: String, _id: String, status: Int, error: Option[String]) { + case class BulkIndexError(reason: String) + case class BulkItem(_index: String, _type: String, _id: String, status: Int, error: Option[BulkIndexError]) { def created = status > 200 && status < 299 && !alreadyExists - def alreadyExists = error.exists(_.contains("DocumentAlreadyExists")) + def alreadyExists = error.exists(_.reason.contains("document already exists")) def success = status >= 200 && status <= 299 } diff --git a/elasticsearch-core/src/test/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClientTest.scala b/elasticsearch-core/src/test/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClientTest.scala index daf00cb..aa78239 100644 --- a/elasticsearch-core/src/test/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClientTest.scala +++ b/elasticsearch-core/src/test/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClientTest.scala @@ -293,6 +293,7 @@ class RestlasticSearchClientTest extends WordSpec with Matchers with ScalaFuture } val delFut = restClient.deleteDocument(index, tpe, new QueryRoot(TermQuery("text7", "here7"))) Await.result(delFut, 10.seconds) + refresh() val resFut1 = restClient.query(index, tpe, new QueryRoot(TermQuery("text7", "here7"))) whenReady(resFut1) { res => res.sourceAsMap.toList should be(List()) diff --git a/elasticsearch-test/pom.xml b/elasticsearch-test/pom.xml index 7866d15..04a9a05 100644 --- a/elasticsearch-test/pom.xml +++ b/elasticsearch-test/pom.xml @@ -27,35 +27,29 @@ org.apache.lucene lucene-core - 4.10.4 + ${lucene.version} org.apache.lucene lucene-analyzers-common - 4.10.4 + ${lucene.version} org.elasticsearch elasticsearch - 1.6.2 + ${elasticsearch.version} org.elasticsearch elasticsearch - 1.6.2 + ${elasticsearch.version} test-jar - - org.apache.lucene - lucene-test-framework - 4.10.4 - - org.scalatest scalatest_${scala.version.major} diff --git a/elasticsearch-test/src/main/scala/com/sumologic/elasticsearch_test/ElasticsearchIntegrationTest.scala b/elasticsearch-test/src/main/scala/com/sumologic/elasticsearch_test/ElasticsearchIntegrationTest.scala index 0453a03..5060d42 100644 --- a/elasticsearch-test/src/main/scala/com/sumologic/elasticsearch_test/ElasticsearchIntegrationTest.scala +++ b/elasticsearch-test/src/main/scala/com/sumologic/elasticsearch_test/ElasticsearchIntegrationTest.scala @@ -22,7 +22,7 @@ import java.io.File import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest import org.elasticsearch.client.transport.TransportClient -import org.elasticsearch.common.settings.ImmutableSettings +import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.{InetSocketTransportAddress, LocalTransportAddress} import org.elasticsearch.node.NodeBuilder import org.scalatest.{BeforeAndAfterAll, Suite} @@ -71,10 +71,10 @@ trait ElasticsearchIntegrationTest extends BeforeAndAfterAll { object ElasticsearchIntegrationTest { private val r = new Random() - private lazy val esNodeSettings = ImmutableSettings.settingsBuilder().put("path.data", createTempDir("elasticsearch-test")).build() + private lazy val esNodeSettings = Settings.builder().put("path.home", createTempDir("elasticsearch-test")).build() private lazy val esNode = NodeBuilder.nodeBuilder().local(true).settings(esNodeSettings).node() - private lazy val settings = ImmutableSettings.settingsBuilder().put("node.local", "true").build() - lazy val client = new TransportClient(settings).addTransportAddress(new LocalTransportAddress("1")) + private lazy val settings = Settings.builder().put("node.local", "true").build() + lazy val client = esNode.client() lazy val globalEndpoint = { val nodeInfos = client.admin().cluster().prepareNodesInfo().clear().setSettings(true).setHttp(true).get() val nodeAddress = diff --git a/pom.xml b/pom.xml index 404a40f..a52de50 100644 --- a/pom.xml +++ b/pom.xml @@ -27,6 +27,8 @@ 2.11 2.3.11 + 2.3.5 + 5.5.0