经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Elasticsearch » 查看文章
Elasticsearch Java API 很全的整理
来源:cnblogs  作者:张永清  时间:2019/10/18 8:45:31  对本文有异议

Elasticsearch 的API 分为 REST Client API(http请求形式)以及 transportClient API两种。相比来说transportClient API效率更高,transportClient 是通过Elasticsearch内部RPC的形式进行请求的,连接可以是一个长连接,相当于是把客户端的请求当成

Elasticsearch 集群的一个节点。但是从Elasticsearch 7 后就会移除transportClient 。主要原因是transportClient 难以向下兼容版本。

本文中所有的讲解和操作都是基于jdk 1.8 和elasticsearch 6.2.4版本。

备注:本文参考了很多Elasticsearch 的官方文档以及部l网络资料做的综合整理。

一、High REST Client

High Client 基于 Low Client, 主要目的是暴露一些 API,这些 API 可以接受请求对象为参数,返回响应对象,而对请求和响应细节的处理都是由 client 自动完成的。

API 在调用时都可以是同步或者异步两种形式
同步 API 会导致阻塞,一直等待数据返回
异步 API 在命名上会加上 async 后缀,需要有一个 listener 作为参数,等这个请求返回结果或者发生错误时,这个 listener 就会被调用,listener主要是解决自动回调的问题,有点像安卓 开发里面的listener监听回调。

Elasticsearch REST APi 官方 地址:https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html

Maven 依赖

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>

client初始化:

RestHighLevelClient 实例依赖 REST low-level client builder

  1. public class ElasticSearchClient {
    private String[] hostsAndPorts;

    public ElasticSearchClient(String[] hostsAndPorts) {
    this.hostsAndPorts = hostsAndPorts;
    }
  1. public RestHighLevelClient getClient() {
  2. RestHighLevelClient client = null;
  3. List<HttpHost> httpHosts = new ArrayList<HttpHost>();
  4. if (hostsAndPorts.length > 0) {
  5. for (String hostsAndPort : hostsAndPorts) {
  6. String[] hp = hostsAndPort.split(":");
  7. httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
  8. }
  9. client = new RestHighLevelClient(
  10. RestClient.builder(httpHosts.toArray(new HttpHost[0])));
  11. } else {
  12. client = new RestHighLevelClient(
  13. RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
  14. }
  15. return client;
  16. }
    }

 

文档 API(High level rest 客户端支持下面的 文档(Document) API):

  • 单文档 API:
  • index API
  • Get API
  • Delete API
  • Update API
  • 多文档 API:
  • Bulk API
  • Multi-Get API

1、Index API:
IndexRequest:
封装好的参考方法:

  1. private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
  2. IndexRequest indexRequest = null;
  3. if (null == index || null == indexType) {
  4. throw new ElasticsearchException("index or indexType must not be null");
  5. }
  6. if (null == docId) {
  7. indexRequest = new IndexRequest(index, indexType);
  8. } else {
  9. indexRequest = new IndexRequest(index, indexType, docId);
  10. }
  11. return indexRequest;
  12. }
  13.  
  14. /**
  15. * 同步执行索引
  16. *
  17. * @param index
  18. * @param indexType
  19. * @param docId
  20. * @param dataMap
  21. * @throws IOException
  22. */
  23. public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
  24. return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
  25. }
  26.  
  27. /**
  28. * 异步执行
  29. *
  30. * @param index
  31. * @param indexType
  32. * @param docId
  33. * @param dataMap
  34. * @param indexResponseActionListener
  35. * @throws IOException
  36. */
  37. public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
  38. getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
  39. }

API解释:  

 

  1. IndexRequest request = new IndexRequest(
  2. "posts", // 索引 Index
  3. "doc", // Type
  4. "1"); // 文档 Document Id
  5. String jsonString = "{" +
  6. "\"user\":\"kimchy\"," +
  7. "\"postDate\":\"2013-01-30\"," +
  8. "\"message\":\"trying out Elasticsearch\"" +
  9. "}";
  10. request.source(jsonString, XContentType.JSON); // 文档源格式为 json string

Document Source
document source 可以是下面的格式

Map类型的输入:

  1. Map<String, Object> jsonMap = new HashMap<>();
  2. jsonMap.put("user", "kimchy");
  3. jsonMap.put("postDate", new Date());
  4. jsonMap.put("message", "trying out Elasticsearch");
  5. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  6. .source(jsonMap); // 会自动将 Map 转换为 JSON 格式

XContentBuilder : 这是 Document Source 提供的帮助类,专门用来产生 json 格式的数据:

  1. XContentBuilder builder = XContentFactory.jsonBuilder();
  2. builder.startObject();
  3. {
  4. builder.field("user", "kimchy");
  5. builder.timeField("postDate", new Date());
  6. builder.field("message", "trying out Elasticsearch");
  7. }
  8. builder.endObject();
  9. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  10. .source(builder);

Object 键对:

  1. IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
  2. .source("user", "kimchy",
  3. "postDate", new Date(),
  4. "message", "trying out Elasticsearch");

同步索引:

  1. IndexResponse indexResponse = client.index(request);

异步索引:异步执行函数需要添加 listener, 而对于 index 而言,这个 listener 的类型就是 ActionListener

  1. client.indexAsync(request, listener);

异步方法执行后会立刻返回,在索引操作执行完成后,ActionListener 就会被回调:

执行成功,调用 onResponse 函数
执行失败,调用 onFailure 函数

  1. ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
  2. @Override
  3. public void onResponse(IndexResponse indexResponse) {
  4. }
  5. @Override
  6. public void onFailure(Exception e) {
  7. }
  8. };

IndexResponse:
不管是同步回调还是异步回调,如果调用成功,都会返回 IndexRespose 对象。 

  1. String index = indexResponse.getIndex();
  2. String type = indexResponse.getType();
  3. String id = indexResponse.getId();
  4. long version = indexResponse.getVersion();
  5. if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
  6. // 文档第一次创建
  7. } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  8. // 文档之前已存在,当前是重写
  9. }
  10. ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
  11. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  12. // 成功的分片数量少于总分片数量
  13. }
  14. if (shardInfo.getFailed() > 0) {
  15. for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  16. String reason = failure.reason(); // 处理潜在的失败信息
  17. }
  18. }

在索引时有版本冲突的话,会抛出 ElasticsearchException

  1. IndexRequest request = new IndexRequest("posts", "doc", "1")
  2. .source("field", "value")
  3. .version(1); // 这里是文档版本号
  4. try {
  5. IndexResponse response = client.index(request);
  6. } catch(ElasticsearchException e) {
  7. if (e.status() == RestStatus.CONFLICT) {
  8. // 冲突了
  9. }
  10. }

如果将 opType 设置为 create, 而且如果索引的文档与已存在的文档在 index, type 和 id 上均相同,也会抛出冲突异常。

  1. IndexRequest request = new IndexRequest("posts", "doc", "1")
  2. .source("field", "value")
  3. .opType(DocWriteRequest.OpType.CREATE);
  4. try {
  5. IndexResponse response = client.index(request);
  6. } catch(ElasticsearchException e) {
  7. if (e.status() == RestStatus.CONFLICT) {
  8. }
  9. }

2、GET API
GET 请求
每个 GET 请求都必须需传入下面 3 个参数:

  • Index
  • Type
  • Document id
  1. GetRequest getRequest = new GetRequest(
  2. "posts",
  3. "doc",
  4. "1");

可选参数
下面的参数都是可选的, 里面的选项并不完整,如要获取完整的属性,请参考 官方文档

不获取源数据,默认是获取的

  1. request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

配置返回数据中包含指定字段

  1. String[] includes = new String[]{"message", "*Date"};
  2. String[] excludes = Strings.EMPTY_ARRAY;
  3. FetchSourceContext fetchSourceContext =
  4. new FetchSourceContext(true, includes, excludes);
  5. request.fetchSourceContext(fetchSourceContext);

配置返回数据中排除指定字段

  1. String[] includes = Strings.EMPTY_ARRAY;
  2. String[] excludes = new String[]{"message"};
  3. FetchSourceContext fetchSourceContext =
  4. new FetchSourceContext(true, includes, excludes);
  5. request.fetchSourceContext(fetchSourceContext);

实时 默认为 true

  1. request.realtime(false);

版本

  1. request.version(2);

版本类型

  1. request.versionType(VersionType.EXTERNAL);

同步执行

  1. GetResponse getResponse = client.get(getRequest);

异步执行
此部分与 index 相似, 只有一点不同, 返回类型为 GetResponse

Get Response
返回的 GetResponse 对象包含要请求的文档数据(包含元数据和字段)

 

  1. String index = getResponse.getIndex();
  2. String type = getResponse.getType();
  3. String id = getResponse.getId();
  4. if (getResponse.isExists()) {
  5. long version = getResponse.getVersion();
  6. String sourceAsString = getResponse.getSourceAsString(); // string 形式
  7. Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map
  8. byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 字节形式
  9. } else {
  10. // 没有发现请求的文档
  11. }

在请求中如果包含特定的文档版本,如果与已存在的文档版本不匹配, 就会出现冲突

  1. try {
  2. GetRequest request = new GetRequest("posts", "doc", "1").version(2);
  3. GetResponse getResponse = client.get(request);
  4. } catch (ElasticsearchException exception) {
  5. if (exception.status() == RestStatus.CONFLICT) {
  6. // 版本冲突
  7. }
  8. }
  1. 封装好的参考方法:
  2. /**
  3. * @param index
  4. * @param indexType
  5. * @param docId
  6. * @param includes 返回需要包含的字段,可以传入空
  7. * @param excludes 返回需要不包含的字段,可以传入为空
  8. * @param excludes version
  9. * @param excludes versionType
  10. * @return
  11. * @throws IOException
  12. */
  13.  
  14. public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
  15. if (null == includes || includes.length == 0) {
  16. includes = Strings.EMPTY_ARRAY;
  17. }
  18. if (null == excludes || excludes.length == 0) {
  19. excludes = Strings.EMPTY_ARRAY;
  20. }
  21. GetRequest getRequest = new GetRequest(index, indexType, docId);
  22. FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
  23. getRequest.realtime(true);
  24. if (null != version) {
  25. getRequest.version(version);
  26. }
  27. if (null != versionType) {
  28. getRequest.versionType(versionType);
  29. }
  30. return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
  31. }
  32. /**
  33. * @param index
  34. * @param indexType
  35. * @param docId
  36. * @param includes
  37. * @param excludes
  38. * @return
  39. * @throws IOException
  40. */
  41.  
  42. public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
  43. return getRequest(index, indexType, docId, includes, excludes, null, null);
  44. }
  45. /**
  46. * @param index
  47. * @param indexType
  48. * @param docId
  49. * @return
  50. * @throws IOException
  51. */
  52. public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
  53. GetRequest getRequest = new GetRequest(index, indexType, docId);
  54. return getClient().get(getRequest);
  55. }

3、Exists API

如果文档存在 Exists API 返回 true, 否则返回 fasle。

Exists Request

GetRequest 用法和 Get API 差不多,两个对象的可选参数是相同的。由于 exists() 方法只返回 true 或者 false, 建议将获取 _source 以及任何存储字段的值关闭,尽量使请求轻量级。

  1. GetRequest getRequest = new GetRequest(
  2. "posts", // Index
  3. "doc", // Type
  4. "1"); // Document id
  5. getRequest.fetchSourceContext(new FetchSourceContext(false)); // 禁用 _source 字段
  6. getRequest.storedFields("_none_"); // 禁止存储任何字段

同步请求

  1. boolean exists = client.exists(getRequest);

异步请求
异步请求与 Index API 相似,此处不赘述,只粘贴代码。如需详细了解,请参阅官方地址

  1. ActionListener<Boolean> listener = new ActionListener<Boolean>() {
  2. @Override
  3. public void onResponse(Boolean exists) {
  4. }
  5. @Override
  6. public void onFailure(Exception e) {
  7. }
  8. };
  9. client.existsAsync(getRequest, listener);

封装的参考方法:

  1. /**
  2. * @param index
  3. * @param indexType
  4. * @param docId
  5. * @return
  6. * @throws IOException
  7. */
  8. public Boolean existDoc(String index, String indexType, String docId) throws IOException {
  9. GetRequest getRequest = new GetRequest(index, indexType, docId);
  10. getRequest.fetchSourceContext(new FetchSourceContext(false));
  11. getRequest.storedFields("_none_");
  12. return getClient().exists(getRequest);
  13. }

4、Delete API

Delete Request
DeleteRequest 必须传入下面参数

  1. DeleteRequest request = new DeleteRequest(
  2. "posts", // index
  3. "doc", // doc
  4. "1"); // document id

可选参数
超时时间

  1. request.timeout(TimeValue.timeValueMinutes(2));
  2. request.timeout("2m");

刷新策略

  1. request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
  2. request.setRefreshPolicy("wait_for");

版本

  1. request.version(2);

版本类型

  1. request.versionType(VersionType.EXTERNAL);
  1. 同步执行
    DeleteResponse deleteResponse = client.delete(request);

异步执行

  1. ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
  2. @Override
  3. public void onResponse(DeleteResponse deleteResponse) {
  4. }
  5. @Override
  6. public void onFailure(Exception e) {
  7. }
  8. };


client.deleteAsync(request, listener);
Delete Response

  1.  

DeleteResponse 可以检索执行操作的信息

  1. String index = deleteResponse.getIndex();
  2. String type = deleteResponse.getType();
  3. String id = deleteResponse.getId();
  4. long version = deleteResponse.getVersion();
  5. ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
  6. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  7. // 成功分片数目小于总分片
  8. }
  9. if (shardInfo.getFailed() > 0) {
  10. for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  11. String reason = failure.reason(); // 处理潜在失败
  12. }
  13. }

也可以来检查文档是否存在

  1. DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
  2. DeleteResponse deleteResponse = client.delete(request);
  3. if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
  4. // 文档不存在
  5. }
  6. 版本冲突时也会抛出 `ElasticsearchException
  7. try {
  8. DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
  9. DeleteResponse deleteResponse = client.delete(request);
  10. } catch (ElasticsearchException exception) {
  11. if (exception.status() == RestStatus.CONFLICT) {
  12. // 版本冲突
  13. }
  14. }

封装好的参考方法:

  1. /**
  2. * @param index
  3. * @param indexType
  4. * @param docId
  5. * @param timeValue
  6. * @param refreshPolicy
  7. * @param version
  8. * @param versionType
  9. * @return
  10. * @throws IOException
  11. */
  12. public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException {
  13. DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
  14. if (null != timeValue) {
  15. deleteRequest.timeout(timeValue);
  16. }
  17. if (null != refreshPolicy) {
  18. deleteRequest.setRefreshPolicy(refreshPolicy);
  19. }
  20. if (null != version) {
  21. deleteRequest.version(version);
  22. }
  23. if (null != versionType) {
  24. deleteRequest.versionType(versionType);
  25. }
  26. return getClient().delete(deleteRequest);
  27. }
  28. /**
  29. * @param index
  30. * @param indexType
  31. * @param docId
  32. * @return
  33. * @throws IOException
  34. */
  35. public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
  36. return deleteDoc(index, indexType, docId, null, null, null, null);
  37. }

5、Update API

Update Request
UpdateRequest 的必需参数如下

  1. UpdateRequest request = new UpdateRequest(
  2. "posts", // Index
  3. "doc", // 类型
  4. "1"); // 文档 Id

使用脚本更新

部分文档更新:
在更新部分文档时,已存在文档与部分文档会合并。

部分文档可以有以下形式:

JSON 格式:

  1. UpdateRequest request = new UpdateRequest("posts", "doc", "1");
  2. String jsonString = "{" +
  3. "\"updated\":\"2017-01-01\"," +
  4. "\"reason\":\"daily update\"" +
  5. "}";
  6. request.doc(jsonString, XContentType.JSON);

Map 格式:

  1. Map<String, Object> jsonMap = new HashMap<>();
  2. jsonMap.put("updated", new Date());
  3. jsonMap.put("reason", "daily update");
  4. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  5. .doc(jsonMap);

XContentBuilder 对象:

  1. XContentBuilder builder = XContentFactory.jsonBuilder();
  2. builder.startObject();
  3. {
  4. builder.timeField("updated", new Date());
  5. builder.field("reason", "daily update");
  6. }
  7. builder.endObject();
  8. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  9. .doc(builder);
  10. Object key-pairs
  11. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  12. .doc("updated", new Date(),
  13. "reason", "daily update");

Upserts:如果文档不存在,可以使用 upserts 方法将文档以新文档的方式创建。

  1. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  2. .doc("updated", new Date(),
  3. "reason", "daily update");

upserts 方法支持的文档格式与 update 方法相同。

可选参数:
超时时间

  1. request.timeout(TimeValue.timeValueSeconds(1));
  2. request.timeout("1s");

刷新策略

  1. request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
  2. request.setRefreshPolicy("wait_for");

冲突后重试次数

  1. request.retryOnConflict(3);

获取数据源,默认是开启的

  1. request.fetchSource(true);

包括特定字段

  1. String[] includes = new String[]{"updated", "r*"};
  2. String[] excludes = Strings.EMPTY_ARRAY;
  3. request.fetchSource(new FetchSourceContext(true, includes, excludes));

排除特定字段

  1. String[] includes = Strings.EMPTY_ARRAY;
  2. String[] excludes = new String[]{"updated"};
  3. request.fetchSource(new FetchSourceContext(true, includes, excludes));

指定版本

  1. request.version(2);

禁用 noop detection

  1. request.scriptedUpsert(true);

 

设置如果更新的文档不存在,就必须要创建一个

  1. request.docAsUpsert(true);

同步执行

  1. UpdateResponse updateResponse = client.update(request);

异步执行

  1. ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
  2. @Override
  3. public void onResponse(UpdateResponse updateResponse) {
  4. }
  5. @Override
  6. public void onFailure(Exception e) {
  7. }
  8. };
  9. client.updateAsync(request, listener);

Update Response

  1. String index = updateResponse.getIndex();
  2. String type = updateResponse.getType();
  3. String id = updateResponse.getId();
  4. long version = updateResponse.getVersion();
  5. if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
  6. // 文档已创建
  7. } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
  8. // 文档已更新
  9. } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
  10. // 文档已删除
  11. } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
  12. // 文档不受更新的影响
  13. }

如果在 UpdateRequest 中使能了获取源数据,响应中则包含了更新后的源文档信息。

  1. GetResult result = updateResponse.getGetResult();
  2. if (result.isExists()) {
  3. String sourceAsString = result.sourceAsString(); // 将获取的文档以 string 格式输出
  4. Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式输出
  5. byte[] sourceAsBytes = result.source(); // 字节形式
  6. } else {
  7. // 默认情况下,不会返回文档源数据
  8. }

也可以检测是否分片失败

  1. ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
  2. if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
  3. // 成功的分片数量小于总分片数量
  4. }
  5. if (shardInfo.getFailed() > 0) {
  6. for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
  7. String reason = failure.reason(); // 得到分片失败的原因
  8. }
  9. }

如果在执行 UpdateRequest 时,文档不存在,响应中会包含 404 状态码,而且会抛出 ElasticsearchException 。

  1. UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
  2. .doc("field", "value");
  3. try {
  4. UpdateResponse updateResponse = client.update(request);
  5. } catch (ElasticsearchException e) {
  6. if (e.status() == RestStatus.NOT_FOUND) {
  7. // 处理文档不存在的情况
  8. }
  9. }

如果版本冲突,也会抛出 ElasticsearchException

  1. UpdateRequest request = new UpdateRequest("posts", "doc", "1")
  2. .doc("field", "value")
  3. .version(1);
  4. try {
  5. UpdateResponse updateResponse = client.update(request);
  6. } catch(ElasticsearchException e) {
  7. if (e.status() == RestStatus.CONFLICT) {
  8. // 处理版本冲突的情况
  9. }
  10. }

封装好的参考方法:

  1. /**
  2. * @param index
  3. * @param indexType
  4. * @param docId
  5. * @param dataMap
  6. * @param timeValue
  7. * @param refreshPolicy
  8. * @param version
  9. * @param versionType
  10. * @param docAsUpsert
  11. * @param includes
  12. * @param excludes
  13. * @return
  14. * @throws IOException
  15. */
  16. public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException {
  17. UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
  18. updateRequest.doc(dataMap);
  19. if (null != timeValue) {
  20. updateRequest.timeout(timeValue);
  21. }
  22. if (null != refreshPolicy) {
  23. updateRequest.setRefreshPolicy(refreshPolicy);
  24. }
  25. if (null != version) {
  26. updateRequest.version(version);
  27. }
  28. if (null != versionType) {
  29. updateRequest.versionType(versionType);
  30. }
  31. updateRequest.docAsUpsert(docAsUpsert);
  32. //冲突时重试的次数
  33. updateRequest.retryOnConflict(3);
  34. if (null == includes && null == excludes) {
  35. return getClient().update(updateRequest);
  36. } else {
  37. if (null == includes || includes.length == 0) {
  38. includes = Strings.EMPTY_ARRAY;
  39. }
  40. if (null == excludes || excludes.length == 0) {
  41. excludes = Strings.EMPTY_ARRAY;
  42. }
  43. return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
  44. }
  45. }
  46. /**
  47. * 更新时不存在就插入
  48. *
  49. * @param index
  50. * @param indexType
  51. * @param docId
  52. * @param dataMap
  53. * @return
  54. * @throws IOException
  55. */
  56. public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
  57. return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null);
  58. }
  59. /**
  60. * 存在才更新
  61. *
  62. * @param index
  63. * @param indexType
  64. * @param docId
  65. * @param dataMap
  66. * @return
  67. * @throws IOException
  68. */
  69. public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
  70. return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null);
  71. }

 

6、Bulk API 批量处理

批量请求
使用 BulkRequest 可以在一次请求中执行多个索引,更新和删除的操作。

  1. BulkRequest request = new BulkRequest();
  2. request.add(new IndexRequest("posts", "doc", "1")
  3. .source(XContentType.JSON,"field", "foo")); // 将第一个 IndexRequest 添加到批量请求中
  4. request.add(new IndexRequest("posts", "doc", "2")
  5. .source(XContentType.JSON,"field", "bar")); // 第二个
  6. request.add(new IndexRequest("posts", "doc", "3")
  7. .source(XContentType.JSON,"field", "baz")); // 第三个

在同一个 BulkRequest 也可以添加不同的操作类型

  1. BulkRequest request = new BulkRequest();
  2. request.add(new DeleteRequest("posts", "doc", "3"));
  3. request.add(new UpdateRequest("posts", "doc", "2")
  4. .doc(XContentType.JSON,"other", "test"));
  5. request.add(new IndexRequest("posts", "doc", "4")
  6. .source(XContentType.JSON,"field", "baz"));

可选参数
超时时间

  1. request.timeout(TimeValue.timeValueMinutes(2));
  2. request.timeout("2m");

刷新策略

  1. request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
  2. request.setRefreshPolicy("wait_for");

设置在批量操作前必须有几个分片处于激活状态

 

  1. request.waitForActiveShards(2);
  2. request.waitForActiveShards(ActiveShardCount.ALL); // 全部分片都处于激活状态
  3. request.waitForActiveShards(ActiveShardCount.DEFAULT); // 默认
  4. request.waitForActiveShards(ActiveShardCount.ONE); // 一个

同步请求

  1. BulkResponse bulkResponse = client.bulk(request);

异步请求

  1. ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
  2. @Override
  3. public void onResponse(BulkResponse bulkResponse) {
  4. }
  5. @Override
  6. public void onFailure(Exception e) {
  7. }
  8. };
  9. client.bulkAsync(request, listener);

Bulk Response
BulkResponse 中包含执行操作后的信息,并允许对每个操作结果迭代。

  1. for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍历所有的操作结果
  2. DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 获取操作结果的响应,可以是 IndexResponse, UpdateResponse or DeleteResponse, 它们都可以惭怍是 DocWriteResponse 实例
  3.  
  4. if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
  5. || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
  6. IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操作后的响应结果
  7. } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
  8. UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操作后的响应结果
  9. } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
  10. DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操作后的响应结果
  11. }
  12. }

此外,批量响应还有一个非常便捷的方法来检测是否有一个或多个操作失败

  1. if (bulkResponse.hasFailures()) {
  2. // 表示至少有一个操作失败
  3. }

在这种情况下,我们要遍历所有的操作结果,检查是否是失败的操作,并获取对应的失败信息

  1. for (BulkItemResponse bulkItemResponse : bulkResponse) {
  2. if (bulkItemResponse.isFailed()) { // 检测给定的操作是否失败
  3. BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 获取失败信息
  4. }
  5. }

Bulk Processor
BulkProcessor 是为了简化 Bulk API 的操作提供的一个工具类,要执行操作,就需要下面组件

RestHighLevelClient 用来执行 BulkRequest 并获取 BulkResponse`
BulkProcessor.Listener 对 BulkRequest 执行前后以及失败时监听
BulkProcessor.builder 方法用来构建一个新的BulkProcessor

  1. BulkProcessor.Listener listener = new BulkProcessor.Listener() {
  2. @Override
  3. public void beforeBulk(long executionId, BulkRequest request) {
  4. // 在每个 BulkRequest 执行前调用
  5. }
  6. @Override
  7. public void afterBulk(long executionId, BulkRequest request,
  8. BulkResponse response) {
  9. // 在每个 BulkRequest 执行后调用
  10. }
  11. @Override
  12. public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
  13. // 失败时调用
  14. }
  15. };

BulkProcessor.Builder 提供了多个方法来配置 BulkProcessor
如何来处理请求的执行。

  1. BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
  2. builder.setBulkActions(500); // 指定多少操作时,就会刷新一次
  3. builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
  4. builder.setConcurrentRequests(0); // 指定多大容量,就会刷新一次
  5. builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 允许并发执行的数量
  6. builder.setBackoffPolicy(BackoffPolicy
  7. .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
  8. BulkProcessor 创建后,各种请求就可以添加进去:
  9. IndexRequest one = new IndexRequest("posts", "doc", "1").
  10. source(XContentType.JSON, "title",
  11. "In which order are my Elasticsearch queries executed?");
  12. IndexRequest two = new IndexRequest("posts", "doc", "2")
  13. .source(XContentType.JSON, "title",
  14. "Current status and upcoming changes in Elasticsearch");
  15. IndexRequest three = new IndexRequest("posts", "doc", "3")
  16. .source(XContentType.JSON, "title",
  17. "The Future of Federated Search in Elasticsearch");
  18. bulkProcessor.add(one);
  19. bulkProcessor.add(two);
  20. bulkProcessor.add(three);

BulkProcessor 执行时,会对每个 bulk request调用 BulkProcessor.Listener , listener 提供了下面方法来访问 BulkRequest 和 BulkResponse:

  1. BulkProcessor.Listener listener = new BulkProcessor.Listener() {
  2. @Override
  3. public void beforeBulk(long executionId, BulkRequest request) {
  4. int numberOfActions = request.numberOfActions(); // 在执行前获取操作的数量
  5. logger.debug("Executing bulk [{}] with {} requests",
  6. executionId, numberOfActions);
  7. }
  8. @Override
  9. public void afterBulk(long executionId, BulkRequest request,
  10. BulkResponse response) {
  11. if (response.hasFailures()) { // 执行后查看响应中是否包含失败的操作
  12. logger.warn("Bulk [{}] executed with failures", executionId);
  13. } else {
  14. logger.debug("Bulk [{}] completed in {} milliseconds",
  15. executionId, response.getTook().getMillis());
  16. }
  17. }
  18. @Override
  19. public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
  20. logger.error("Failed to execute bulk", failure); // 请求失败时打印信息
  21. }
  22. };

请求添加到 BulkProcessor , 它的实例可以使用下面两种方法关闭请求。

awaitClose() 在请求返回后或等待一定时间关闭

  1. boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

close() 立刻关闭

  1. bulkProcessor.close();

两个方法都会在关闭前对处理器中的请求进行刷新,并避免新的请求添加进去。

封装好的参考方法:

  1. /**
  2. * 批量操作
  3. *
  4. * @param indexBeanList
  5. * @param timeValue
  6. * @param refreshPolicy
  7. * @return
  8. * @throws IOException
  9. */
  10. public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy) throws IOException {
  11. BulkRequest bulkRequest = getBulkRequest(indexBeanList);
  12. if (null != timeValue) {
  13. bulkRequest.timeout(timeValue);
  14. }
  15. if (null != refreshPolicy) {
  16. bulkRequest.setRefreshPolicy(refreshPolicy);
  17. }
  18. return getClient().bulk(bulkRequest);
  19. }
  20. private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) {
  21. BulkRequest bulkRequest = new BulkRequest();
  22. indexBeanList.forEach(indexBean -> {
  23. if ("1".equals(indexBean.getOperateType())) {
  24. bulkRequest.add(null != indexBean.getDocId() ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()) : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType()));
  25. } else if ("2".equals(indexBean.getOperateType())) {
  26. if ((null != indexBean.getDocId())) {
  27. throw new ElasticsearchException("update action docId must not be null");
  28. }
  29. bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
  30. } else if ("3".equals(indexBean.getOperateType())) {
  31. if ((null != indexBean.getDocId())) {
  32. throw new ElasticsearchException("delete action docId must not be null");
  33. }
  34. bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
  35. } else {
  36. throw new ElasticsearchException("OperateType" + indexBean.getOperateType() + "is not support");
  37. }
  38. });
  39. return bulkRequest;
  40. }
  41. /**
  42. * 批量操作
  43. *
  44. * @param indexBeanList
  45. * @return
  46. */
  47. public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException {
  48. return bulkRequest(indexBeanList, null, null);
  49. }
  50. /**
  51. * 批量异步操作
  52. *
  53. * @param indexBeanList
  54. * @param bulkResponseActionListener
  55. */
  56.  
  57. public void AsyncBulkRequest(List<IndexBean> indexBeanList, ActionListener<BulkResponse> bulkResponseActionListener) {
  58. getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener);
  59. }

7、Search APIs:

Java High Level REST Client 支持下面的 Search API:

  • Search API
  • Search Scroll API
  • Clear Scroll API
  • Multi-Search API
  • Ranking Evaluation API

Search API
Search Request
searchRequest 用来完成和搜索文档,聚合,建议等相关的任何操作同时也提供了各种方式来完成对查询结果的高亮操作。

最基本的查询操作如下

  1. SearchRequest searchRequest = new SearchRequest();
  2. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  3. searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // 添加 match_all 查询
  4. searchRequest.source(searchSourceBuilder); // 将 SearchSourceBuilder 添加到 SeachRequest 中

可选参数

  1. SearchRequest searchRequest = new SearchRequest("posts"); // 设置搜索的 index
  2. searchRequest.types("doc"); // 设置搜索的 type

除了配置 index 和 type 外,还有一些其他的可选参数

  1. searchRequest.routing("routing"); // 设置 routing 参数
  2. searchRequest.preference("_local"); // 配置搜索时偏爱使用本地分片,默认是使用随机分片

什么是 routing 参数?
当索引一个文档的时候,文档会被存储在一个主分片上。在存储时一般都会有多个主分片。Elasticsearch 如何知道一个文档应该放置在哪个分片呢?这个过程是根据下面的这个公式来决定的:

shard = hash(routing) % number_of_primary_shards
routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值
number_of_primary_shards 是主分片数量
所有的文档 API 都接受一个叫做 routing 的路由参数,通过这个参数我们可以自定义文档到分片的映射。一个自定义的路由参数可以用来确保所有相关的文档——例如所有属于同一个用户的文档——都被存储到同一个分片中。

使用 SearchSourceBuilder
对搜索行为的配置可以使用 SearchSourceBuilder 来完成,来看一个实例

  1. SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); // 默认配置
  2. sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy")); // 设置搜索,可以是任何类型的 QueryBuilder
  3. sourceBuilder.from(0); // 起始 index
  4. sourceBuilder.size(5); // 大小 size
  5. sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); // 设置搜索的超时时间

设置完成后,就可以添加到 SearchRequest 中。

  1. SearchRequest searchRequest = new SearchRequest();
  2. searchRequest.source(sourceBuilder);

构建查询条件
查询请求是通过使用 QueryBuilder 对象来完成的,并且支持 Query DSL。

DSL (domain-specific language) 领域特定语言,是指专注于某个应用程序领域的计算机语言。

可以使用构造函数来创建 QueryBuilder

  1. MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");

QueryBuilder 创建后,就可以调用方法来配置它的查询选项:

  1. matchQueryBuilder.fuzziness(Fuzziness.AUTO); // 模糊查询
  2. matchQueryBuilder.prefixLength(3); // 前缀查询的长度
  3. matchQueryBuilder.maxExpansions(10); // max expansion 选项,用来控制模糊查询

也可以使用QueryBuilders 工具类来创建 QueryBuilder 对象。这个类提供了函数式编程风格的各种方法用来快速创建 QueryBuilder 对象。

  1. QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
  2. .fuzziness(Fuzziness.AUTO)
  3. .prefixLength(3)
  4. .maxExpansions(10);

fuzzy-matching 拼写错误时的匹配:

好的全文检索不应该是完全相同的限定逻辑,相反,可以扩大范围来包括可能的匹配,从而根据相关性得分将更好的匹配放在前面。

例如,搜索 quick brown fox 时会匹配一个包含 fast brown foxes 的文档

不论什么方式创建的 QueryBuilder ,最后都需要添加到 `SearchSourceBuilder 中

  1. searchSourceBuilder.query(matchQueryBuilder);

构建查询 文档中提供了一个丰富的查询列表,里面包含各种查询对应的QueryBuilder 对象以及QueryBuilder helper 方法,大家可以去参考。

关于构建查询的内容会在下篇文章中讲解,敬请期待。

指定排序
SearchSourceBuilder 允许添加一个或多个SortBuilder 实例。这里包含 4 种特殊的实现, (Field-, Score-, GeoDistance- 和 ScriptSortBuilder)

  1. sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // 根据分数 _score 降序排列 (默认行为)
  2. sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // 根据 id 降序排列

过滤数据源
默认情况下,查询请求会返回文档的内容 _source ,当然我们也可以配置它。例如,禁止对 _source 的获取

  1. sourceBuilder.fetchSource(false);

也可以使用通配符模式以更细的粒度包含或排除特定的字段:

  1. String[] includeFields = new String[] {"title", "user", "innerObject.*"};
  2. String[] excludeFields = new String[] {"_type"};
  3. sourceBuilder.fetchSource(includeFields, excludeFields);

高亮请求
可以通过在 SearchSourceBuilder 上设置 HighlightBuilder 完成对结果的高亮,而且可以配置不同的字段具有不同的高亮行为。

  1. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  2. HighlightBuilder highlightBuilder = new HighlightBuilder();
  3. HighlightBuilder.Field highlightTitle =
  4. new HighlightBuilder.Field("title"); // title 字段高亮
  5. highlightTitle.highlighterType("unified"); // 配置高亮类型
  6. highlightBuilder.field(highlightTitle); // 添加到 builder
  7. HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
  8. highlightBuilder.field(highlightUser);
  9. searchSourceBuilder.highlighter(highlightBuilder);

聚合请求
要实现聚合请求分两步

创建合适的 `AggregationBuilder
作为参数配置在 `SearchSourceBuilder 上

  1. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  2. TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
  3. .field("company.keyword");
  4. aggregation.subAggregation(AggregationBuilders.avg("average_age")
  5. .field("age"));
  6. searchSourceBuilder.aggregation(aggregation);

建议请求 Requesting Suggestions

SuggestionBuilder 实现类是由 SuggestBuilders 工厂类来创建的。

  1. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  2. SuggestionBuilder termSuggestionBuilder =
  3. SuggestBuilders.termSuggestion("user").text("kmichy");
  4. SuggestBuilder suggestBuilder = new SuggestBuilder();
  5. suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
  6. searchSourceBuilder.suggest(suggestBuilder);

对请求和聚合分析
分析 API 可用来对一个特定的查询操作中的请求和聚合进行分析,此时要将SearchSourceBuilder 的 profile标志位设置为 true

  1. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  2. searchSourceBuilder.profile(true);

只要 SearchRequest 执行完成,对应的 SearchResponse 响应中就会包含 分析结果

同步执行
同步执行是阻塞式的,只有结果返回后才能继续执行。

  1. SearchResponse searchResponse = client.search(searchRequest);

异步执行
异步执行使用的是 listener 对结果进行处理。

  1. ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
  2. @Override
  3. public void onResponse(SearchResponse searchResponse) {
  4. // 查询成功
  5. }
  6. @Override
  7. public void onFailure(Exception e) {
  8. // 查询失败
  9. }
  10. };

SearchResponse

查询执行完成后,会返回 SearchResponse 对象,并在对象中包含查询执行的细节和符合条件的文档集合。

归纳一下, SerchResponse 包含的信息如下

请求本身的信息,如 HTTP 状态码,执行时间,或者请求是否超时

  1. RestStatus status = searchResponse.status(); // HTTP 状态码
  2. TimeValue took = searchResponse.getTook(); // 查询占用的时间
  3. Boolean terminatedEarly = searchResponse.isTerminatedEarly(); // 是否由于 SearchSourceBuilder 中设置 terminateAfter 而过早终止
  4. boolean timedOut = searchResponse.isTimedOut(); // 是否超时

查询影响的分片数量的统计信息,成功和失败的分片

  1. int totalShards = searchResponse.getTotalShards();
  2. int successfulShards = searchResponse.getSuccessfulShards();
  3. int failedShards = searchResponse.getFailedShards();
  4. for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
  5. // failures should be handled here
  6. }

检索 SearchHits
要访问返回的文档,首先要在响应中获取其中的 SearchHits

  1. SearchHits hits = searchResponse.getHits();

SearchHits 中包含了所有命中的全局信息,如查询命中的数量或者最大分值:

  1. long totalHits = hits.getTotalHits();
  2. float maxScore = hits.getMaxScore();

查询的结果嵌套在 SearchHits 中,可以通过遍历循环获取

  1. SearchHit[] searchHits = hits.getHits();
  2. for (SearchHit hit : searchHits) {
  3. // do something with the SearchHit
  4. }
  1. SearchHit 提供了如 index type docId 和每个命中查询的分数
  1. String index = hit.getIndex();
  2. String type = hit.getType();
  3. String id = hit.getId();
  4. float score = hit.getScore();

而且,还可以获取到文档的源数据,以 JSON-String 形式或者 key-value map 对的形式。在 map 中,字段可以是普通类型,或者是列表类型,嵌套对象。

  1. String sourceAsString = hit.getSourceAsString();
  2. Map<String, Object> sourceAsMap = hit.getSourceAsMap();
  3. String documentTitle = (String) sourceAsMap.get("title");
  4. List<Object> users = (List<Object>) sourceAsMap.get("user");
  5. Map<String, Object> innerObject =
  6. (Map<String, Object>) sourceAsMap.get("innerObject");

Search API 查询关系
上面的 QueryBuilder , SearchSourceBuilder 和 SearchRequest 之间都是嵌套关系, 可以参考下图:

 

 

8、全文查询 Full Text Queries

什么是全文查询?
像使用 match 或者 query_string 这样的高层查询都属于全文查询,

查询 日期(date) 或整数(integer) 字段,会将查询字符串分别作为日期或整数对待。
查询一个( not_analyzed )未分析的精确值字符串字段,会将整个查询字符串作为单个词项对待。
查询一个( analyzed )已分析的全文字段,会先将查询字符串传递到一个合适的分析器,然后生成一个供查询的词项列表
组成了词项列表,后面就会对每个词项逐一执行底层查询,将查询结果合并,并且为每个文档生成最终的相关度评分。

Match
match 查询的单个词的步骤是什么?
检查字段类型,查看字段是 analyzed, not_analyzed
分析查询字符串,如果只有一个单词项, match 查询在执行时就会是单个底层的 term 查询
查找匹配的文档,会在倒排索引中查找匹配文档,然后获取一组包含该项的文档
为每个文档评分
构建 Match 查询
match 查询可以接受 text/numeric/dates 格式的参数,分析,并构建一个查询。

  1. GET /_search
  2. {
  3. "query": {
  4. "match" : {
  5. "message" : "this is a test"
  6. }
  7. }
  8. }

上面的实例中 message 是一个字段名。

对应的 QueryBuilder class : MatchQueryBuilder

具体方法 : QueryBuilders.matchQuery()

全文查询 API 列表

Search QueryQueryBuilder ClassMethod in QueryBuilders
Match MatchQueryBuilder QueryBuilders.matchQuery()
Match Phrase MatchPhraseQueryBuilder QueryBuilders.matchPhraseQuery()
Match Phrase Prefix MatchPhrasePrefixQueryBuilder QueryBuilders.matchPhrasePrefixQuery()
Multi Match MultiMatchQueryBuilder QueryBuilders.multiMatchQuery()
Common Terms CommonTermsQueryBuilder QueryBuilders.commonTermsQuery()
Query String QueryStringQueryBuilder QueryBuilders.queryStringQuery()
Simple Query String SimpleQueryStringBuilder QueryBuilders.simpleQueryStringQuery()

基于词项的查询
这种类型的查询不需要分析,它们是对单个词项操作,只是在倒排索引中查找准确的词项(精确匹配)并且使用 TF/IDF 算法为每个包含词项的文档计算相关度评分 _score。

Term
term 查询可用作精确值匹配,精确值的类型则可以是数字,时间,布尔类型,或者是那些 not_analyzed 的字符串。

对应的 QueryBuilder class 是TermQueryBuilder

具体方法是 QueryBuilders.termQuery()

Terms
terms 查询允许指定多个值进行匹配。如果这个字段包含了指定值中的任何一个值,就表示该文档满足条件。

对应的 QueryBuilder class 是 TermsQueryBuilder

具体方法是 QueryBuilders.termsQuery()

Wildcard
wildcard 通配符查询是一种底层基于词的查询,它允许指定匹配的正则表达式。而且它使用的是标准的 shell 通配符查询:

? 匹配任意字符
* 匹配 0 个或多个字符
wildcard 需要扫描倒排索引中的词列表才能找到所有匹配的词,然后依次获取每个词相关的文档 ID。

由于通配符和正则表达式只能在查询时才能完成,因此查询效率会比较低,在需要高性能的场合,应当谨慎使用。

对应的 QueryBuilder class 是 WildcardQueryBuilder

具体方法是 QueryBuilders.wildcardQuery()

基于词项 API 列表

Search QueryQueryBuilder ClassMethod in QueryBuilders
Term TermQueryBuilder QueryBuilders.termQuery()
Terms TermsQueryBuilder QueryBuilders.termsQuery()
Range RangeQueryBuilder QueryBuilders.rangeQuery()
Exists ExistsQueryBuilder QueryBuilders.existsQuery()
Prefix PrefixQueryBuilder QueryBuilders.prefixQuery()
Wildcard WildcardQueryBuilder QueryBuilders.wildcardQuery()
Regexp RegexpQueryBuilder QueryBuilders.regexpQuery()
Fuzzy FuzzyQueryBuilder QueryBuilders.fuzzyQuery()
Type TypeQueryBuilder QueryBuilders.typeQuery()
Ids IdsQueryBuilder QueryBuilders.idsQuery()

复合查询
什么是复合查询?
复合查询会将其他的复合查询或者叶查询包裹起来,以嵌套的形式展示和执行,得到的结果也是对各个子查询结果和分数的合并。可以分为下面几种:

constant_score query

经常用在使用 filter 的场合,所有匹配的文档分数都是一个不变的常量

bool query

可以将多个叶查询和组合查询再组合起来,可接受的参数如下

must : 文档必须匹配这些条件才能被包含进来
must_not 文档必须不匹配才能被包含进来
should 如果满足其中的任何语句,都会增加分数;即使不满足,也没有影响
filter 以过滤模式进行,不评分,但是必须匹配
dis_max query

叫做分离最大化查询,它会将任何与查询匹配的文档都作为结果返回,但是只是将其中最佳匹配的评分作为最终的评分返回。

function_score query

允许为每个与主查询匹配的文档应用一个函数,可用来改变甚至替换原始的评分

boosting query

用来控制(提高或降低)复合查询中子查询的权重。

Search QueryQueryBuilder ClassMethod in QueryBuilders
Constant Score ConstantScoreQueryBuilder QueryBuilders.constantScoreQuery()
Bool BoolQueryBuilder QueryBuilders.boolQuery()
Dis Max DisMaxQueryBuilder QueryBuilders.disMaxQuery()
Function Score FunctionScoreQueryBuilder QueryBuilders.functionScoreQuery()
Boosting BoostingQueryBuilder QueryBuilders.boostingQuery()

特殊查询
Wrapper Query
这里比较重要的一个是 Wrapper Query,是说可以接受任何其他 base64 编码的字符串作为子查询。

主要应用场合就是在 Rest High-Level REST client 中接受 json 字符串作为参数。比如使用 gson 等 json 库将要查询的语句拼接好,直接塞到 Wrapper Query 中查询就可以了,非常方便。

Wrapper Query 对应的 QueryBuilder class 是WrapperQueryBuilder

具体方法是 QueryBuilders.wrapperQuery()

9、关于 REST Client的完整工具类代码

  1. public class IndexBean {
  2. //index name
  3. private String index;
  4. //index type
  5. private String indexType;
  6. //index doc id
  7. private String docId;
  8. // 1 IndexRequest 2 UpdateRequest 3 DeleteRequest
  9. private String operateType;
  10. public String getOperateType() {
  11. return operateType;
  12. }
  13. public void setOperateType(String operateType) {
  14. this.operateType = operateType;
  15. }
  16. public String getIndex() {
  17. return index;
  18. }
  19. public void setIndex(String index) {
  20. this.index = index;
  21. }
  22. public String getIndexType() {
  23. return indexType;
  24. }
  25. public void setIndexType(String indexType) {
  26. this.indexType = indexType;
  27. }
  28. public String getDocId() {
  29. return docId;
  30. }
  31. public void setDocId(String docId) {
  32. this.docId = docId;
  33. }
  34. }
  1. /**
  2. * 自定义的es异常类
  3. */
  4. public class ElasticsearchException extends RuntimeException {
  5. public ElasticsearchException(String s, Exception e) {
  6. super(s, e);
  7. }
  8. public ElasticsearchException(String s){
  9. super(s);
  10. }
  11. }
  1. import org.apache.http.HttpHost;
  2. import org.elasticsearch.action.ActionListener;
  3. import org.elasticsearch.action.bulk.BulkRequest;
  4. import org.elasticsearch.action.bulk.BulkResponse;
  5. import org.elasticsearch.action.delete.DeleteRequest;
  6. import org.elasticsearch.action.delete.DeleteResponse;
  7. import org.elasticsearch.action.get.GetRequest;
  8. import org.elasticsearch.action.get.GetResponse;
  9. import org.elasticsearch.action.index.IndexRequest;
  10. import org.elasticsearch.action.index.IndexResponse;
  11. import org.elasticsearch.action.search.SearchRequest;
  12. import org.elasticsearch.action.search.SearchResponse;
  13. import org.elasticsearch.action.support.WriteRequest;
  14. import org.elasticsearch.action.update.UpdateRequest;
  15. import org.elasticsearch.action.update.UpdateResponse;
  16. import org.elasticsearch.client.RestClient;
  17. import org.elasticsearch.client.RestHighLevelClient;
  18. import org.elasticsearch.common.Strings;
  19. import org.elasticsearch.common.unit.TimeValue;
  20. import org.elasticsearch.index.VersionType;
  21. import org.elasticsearch.index.query.MatchQueryBuilder;
  22. import org.elasticsearch.index.query.TermQueryBuilder;
  23. import org.elasticsearch.search.builder.SearchSourceBuilder;
  24. import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
  25. import org.elasticsearch.search.sort.SortBuilder;
  26. import java.io.IOException;
  27. import java.util.ArrayList;
  28. import java.util.List;
  29. import java.util.Map;
  30. import java.util.concurrent.TimeUnit;
  31. /**
  32. * es操作
  33. *
  34. */
  35. public class ElasticSearchClient {
  36. private String[] hostsAndPorts;
  37. public ElasticSearchClient(String[] hostsAndPorts) {
  38. this.hostsAndPorts = hostsAndPorts;
  39. }
  40. public RestHighLevelClient getClient() {
  41. RestHighLevelClient client = null;
  42. List<HttpHost> httpHosts = new ArrayList<HttpHost>();
  43. if (hostsAndPorts.length > 0) {
  44. for (String hostsAndPort : hostsAndPorts) {
  45. String[] hp = hostsAndPort.split(":");
  46. httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
  47. }
  48. client = new RestHighLevelClient(
  49. RestClient.builder(httpHosts.toArray(new HttpHost[0])));
  50. } else {
  51. client = new RestHighLevelClient(
  52. RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
  53. }
  54. return client;
  55. }
  56. private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
  57. IndexRequest indexRequest = null;
  58. if (null == index || null == indexType) {
  59. throw new ElasticsearchException("index or indexType must not be null");
  60. }
  61. if (null == docId) {
  62. indexRequest = new IndexRequest(index, indexType);
  63. } else {
  64. indexRequest = new IndexRequest(index, indexType, docId);
  65. }
  66. return indexRequest;
  67. }
  68. /**
  69. * 同步执行索引
  70. *
  71. * @param index
  72. * @param indexType
  73. * @param docId
  74. * @param dataMap
  75. * @throws IOException
  76. */
  77. public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
  78. return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
  79. }
  80. /**
  81. * 异步执行
  82. *
  83. * @param index
  84. * @param indexType
  85. * @param docId
  86. * @param dataMap
  87. * @param indexResponseActionListener
  88. * @throws IOException
  89. */
  90. public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
  91. getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
  92. }
  93. /**
  94. * @param index
  95. * @param indexType
  96. * @param docId
  97. * @param includes 返回需要包含的字段,可以传入空
  98. * @param excludes 返回需要不包含的字段,可以传入为空
  99. * @param excludes version
  100. * @param excludes versionType
  101. * @return
  102. * @throws IOException
  103. */
  104.  
  105. public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
  106. if (null == includes || includes.length == 0) {
  107. includes = Strings.EMPTY_ARRAY;
  108. }
  109. if (null == excludes || excludes.length == 0) {
  110. excludes = Strings.EMPTY_ARRAY;
  111. }
  112. GetRequest getRequest = new GetRequest(index, indexType, docId);
  113. FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
  114. getRequest.realtime(true);
  115. if (null != version) {
  116. getRequest.version(version);
  117. }
  118. if (null != versionType) {
  119. getRequest.versionType(versionType);
  120. }
  121. return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
  122. }
  123. /**
  124. * @param index
  125. * @param indexType
  126. * @param docId
  127. * @param includes
  128. * @param excludes
  129. * @return
  130. * @throws IOException
  131. */
  132.  
  133. public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
  134. return getRequest(index, indexType, docId, includes, excludes, null, null);
  135. }
  136. /**
  137. * @param index
  138. * @param indexType
  139. * @param docId
  140. * @return
  141. * @throws IOException
  142. */
  143. public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
  144. GetRequest getRequest = new GetRequest(index, indexType, docId);
  145. return getClient().get(getRequest);
  146. }
  147. /**
  148. * @param index
  149. * @param indexType
  150. * @param docId
  151. * @return
  152. * @throws IOException
  153. */
  154. public Boolean existDoc(String index, String indexType, String docId) throws IOException {
  155. GetRequest getRequest = new GetRequest(index, indexType, docId);
  156. getRequest.fetchSourceContext(new FetchSourceContext(false));
  157. getRequest.storedFields("_none_");
  158. return getClient().exists(getRequest);
  159. }
  160. /**
  161. * @param index
  162. * @param indexType
  163. * @param docId
  164. * @param timeValue
  165. * @param refreshPolicy
  166. * @param version
  167. * @param versionType
  168. * @return
  169. * @throws IOException
  170. */
  171. public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException {
  172. DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
  173. if (null != timeValue) {
  174. deleteRequest.timeout(timeValue);
  175. }
  176. if (null != refreshPolicy) {
  177. deleteRequest.setRefreshPolicy(refreshPolicy);
  178. }
  179. if (null != version) {
  180. deleteRequest.version(version);
  181. }
  182. if (null != versionType) {
  183. deleteRequest.versionType(versionType);
  184. }
  185. return getClient().delete(deleteRequest);
  186. }
  187. /**
  188. * @param index
  189. * @param indexType
  190. * @param docId
  191. * @return
  192. * @throws IOException
  193. */
  194. public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
  195. return deleteDoc(index, indexType, docId, null, null, null, null);
  196. }
  197. /**
  198. * @param index
  199. * @param indexType
  200. * @param docId
  201. * @param dataMap
  202. * @param timeValue
  203. * @param refreshPolicy
  204. * @param version
  205. * @param versionType
  206. * @param docAsUpsert
  207. * @param includes
  208. * @param excludes
  209. * @return
  210. * @throws IOException
  211. */
  212. public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException {
  213. UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
  214. updateRequest.doc(dataMap);
  215. if (null != timeValue) {
  216. updateRequest.timeout(timeValue);
  217. }
  218. if (null != refreshPolicy) {
  219. updateRequest.setRefreshPolicy(refreshPolicy);
  220. }
  221. if (null != version) {
  222. updateRequest.version(version);
  223. }
  224. if (null != versionType) {
  225. updateRequest.versionType(versionType);
  226. }
  227. updateRequest.docAsUpsert(docAsUpsert);
  228. //冲突时重试的次数
  229. updateRequest.retryOnConflict(3);
  230. if (null == includes && null == excludes) {
  231. return getClient().update(updateRequest);
  232. } else {
  233. if (null == includes || includes.length == 0) {
  234. includes = Strings.EMPTY_ARRAY;
  235. }
  236. if (null == excludes || excludes.length == 0) {
  237. excludes = Strings.EMPTY_ARRAY;
  238. }
  239. return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
  240. }
  241. }
  242. /**
  243. * 更新时不存在就插入
  244. *
  245. * @param index
  246. * @param indexType
  247. * @param docId
  248. * @param dataMap
  249. * @return
  250. * @throws IOException
  251. */
  252. public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
  253. return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null);
  254. }
  255. /**
  256. * 存在才更新
  257. *
  258. * @param index
  259. * @param indexType
  260. * @param docId
  261. * @param dataMap
  262. * @return
  263. * @throws IOException
  264. */
  265. public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
  266. return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null);
  267. }
  268. /**
  269. * 批量操作
  270. *
  271. * @param indexBeanList
  272. * @param timeValue
  273. * @param refreshPolicy
  274. * @return
  275. * @throws IOException
  276. */
  277. public BulkResponse bulkRequest(List<IndexBean> indexBeanList, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy) throws IOException {
  278. BulkRequest bulkRequest = getBulkRequest(indexBeanList);
  279. if (null != timeValue) {
  280. bulkRequest.timeout(timeValue);
  281. }
  282. if (null != refreshPolicy) {
  283. bulkRequest.setRefreshPolicy(refreshPolicy);
  284. }
  285. return getClient().bulk(bulkRequest);
  286. }
  287. private BulkRequest getBulkRequest(List<IndexBean> indexBeanList) {
  288. BulkRequest bulkRequest = new BulkRequest();
  289. indexBeanList.forEach(indexBean -> {
  290. if ("1".equals(indexBean.getOperateType())) {
  291. bulkRequest.add(null != indexBean.getDocId() ? new IndexRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()) : new IndexRequest(indexBean.getIndex(), indexBean.getIndexType()));
  292. } else if ("2".equals(indexBean.getOperateType())) {
  293. if ((null != indexBean.getDocId())) {
  294. throw new ElasticsearchException("update action docId must not be null");
  295. }
  296. bulkRequest.add(new UpdateRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
  297. } else if ("3".equals(indexBean.getOperateType())) {
  298. if ((null != indexBean.getDocId())) {
  299. throw new ElasticsearchException("delete action docId must not be null");
  300. }
  301. bulkRequest.add(new DeleteRequest(indexBean.getIndex(), indexBean.getIndexType(), indexBean.getDocId()));
  302. } else {
  303. throw new ElasticsearchException("OperateType" + indexBean.getOperateType() + "is not support");
  304. }
  305. });
  306. return bulkRequest;
  307. }
  308. /**
  309. * 批量操作
  310. *
  311. * @param indexBeanList
  312. * @return
  313. */
  314. public BulkResponse bulkRequest(List<IndexBean> indexBeanList) throws IOException {
  315. return bulkRequest(indexBeanList, null, null);
  316. }
  317. /**
  318. * 批量异步操作
  319. *
  320. * @param indexBeanList
  321. * @param bulkResponseActionListener
  322. */
  323.  
  324. public void AsyncBulkRequest(List<IndexBean> indexBeanList, ActionListener<BulkResponse> bulkResponseActionListener) {
  325. getClient().bulkAsync(getBulkRequest(indexBeanList), bulkResponseActionListener);
  326. }
  327. private SearchRequest getSearchRequest(String index, String indexType) {
  328. SearchRequest searchRequest;
  329. if (null == index) {
  330. throw new ElasticsearchException("index name must not be null");
  331. }
  332. if (null != indexType) {
  333. searchRequest = new SearchRequest(index, indexType);
  334. } else {
  335. searchRequest = new SearchRequest(index);
  336. }
  337. return searchRequest;
  338. }
  339. /**
  340. * @param index
  341. * @param indexType
  342. * @return
  343. * @throws IOException
  344. */
  345. public SearchResponse searchRequest(String index, String indexType) throws IOException {
  346. return getClient().search(getSearchRequest(index, indexType));
  347. }
  348. /**
  349. * @param index
  350. * @param indexType
  351. * @param from
  352. * @param size
  353. * @param termQueryBuilder
  354. * @return
  355. * @throws IOException
  356. */
  357. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder) throws IOException {
  358. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, null, null)));
  359. }
  360. private SearchSourceBuilder getSearchSourceBuilder(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, String sortField, SortBuilder sortBuilder, Boolean fetchSource) {
  361. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  362. if (null != termQueryBuilder) {
  363. searchSourceBuilder.query(termQueryBuilder);
  364. }
  365. searchSourceBuilder.from(from);
  366. searchSourceBuilder.size(size);
  367. if (null != sortField) {
  368. searchSourceBuilder.sort(sortField);
  369. }
  370. if (null != sortBuilder) {
  371. searchSourceBuilder.sort(sortBuilder);
  372. }
  373. //设置超时时间
  374. searchSourceBuilder.timeout(new TimeValue(120, TimeUnit.SECONDS));
  375. if (null != fetchSource) {
  376. searchSourceBuilder.fetchSource(fetchSource);
  377. }
  378. return searchSourceBuilder;
  379. }
  380. /**
  381. * @param index
  382. * @param indexType
  383. * @param from
  384. * @param size
  385. * @param termQueryBuilder
  386. * @param matchQueryBuilder
  387. * @return
  388. * @throws IOException
  389. */
  390. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder) throws IOException {
  391. if (null == matchQueryBuilder) {
  392. throw new ElasticsearchException("matchQueryBuilder is null");
  393. }
  394. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, null, null).query(matchQueryBuilder)));
  395. }
  396. /**
  397. * @param index
  398. * @param indexType
  399. * @param from
  400. * @param size
  401. * @param matchQueryBuilder
  402. * @return
  403. * @throws IOException
  404. */
  405. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder) throws IOException {
  406. if (null == matchQueryBuilder) {
  407. throw new ElasticsearchException("matchQueryBuilder is null");
  408. }
  409. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, null, null, null).query(matchQueryBuilder)));
  410. }
  411. /**
  412. * @param index
  413. * @param indexType
  414. * @param from
  415. * @param size
  416. * @param matchQueryBuilder
  417. * @param sortField
  418. * @return
  419. * @throws IOException
  420. */
  421. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, String sortField) throws IOException {
  422. if (null == matchQueryBuilder) {
  423. throw new ElasticsearchException("matchQueryBuilder is null");
  424. }
  425. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, sortField, null, null).query(matchQueryBuilder)));
  426. }
  427. /**
  428. * @param index
  429. * @param indexType
  430. * @param from
  431. * @param size
  432. * @param matchQueryBuilder
  433. * @param sortField
  434. * @param fetchSource
  435. * @return
  436. * @throws IOException
  437. */
  438. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, String sortField, Boolean fetchSource) throws IOException {
  439. if (null == matchQueryBuilder) {
  440. throw new ElasticsearchException("matchQueryBuilder is null");
  441. }
  442. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, sortField, null, fetchSource).query(matchQueryBuilder)));
  443. }
  444. /**
  445. * @param index
  446. * @param indexType
  447. * @param from
  448. * @param size
  449. * @param matchQueryBuilder
  450. * @param sortBuilder
  451. * @return
  452. * @throws IOException
  453. */
  454. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder) throws IOException {
  455. if (null == matchQueryBuilder) {
  456. throw new ElasticsearchException("matchQueryBuilder is null");
  457. }
  458. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, null, null, sortBuilder, null).query(matchQueryBuilder)));
  459. }
  460. /**
  461. * 支持排序
  462. *
  463. * @param index
  464. * @param indexType
  465. * @param from
  466. * @param size
  467. * @param termQueryBuilder
  468. * @param matchQueryBuilder
  469. * @param sortField
  470. * @return
  471. * @throws IOException
  472. */
  473. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField) throws IOException {
  474. if (null == matchQueryBuilder) {
  475. throw new ElasticsearchException("matchQueryBuilder is null");
  476. }
  477. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, null).query(matchQueryBuilder)));
  478. }
  479. /**
  480. * @param index
  481. * @param indexType
  482. * @param from
  483. * @param size
  484. * @param termQueryBuilder
  485. * @param matchQueryBuilder
  486. * @param sortBuilder
  487. * @param fetchSource 开关
  488. * @return
  489. * @throws IOException
  490. */
  491. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder, Boolean fetchSource) throws IOException {
  492. if (null == matchQueryBuilder) {
  493. throw new ElasticsearchException("matchQueryBuilder is null");
  494. }
  495. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, fetchSource).query(matchQueryBuilder)));
  496. }
  497. /**
  498. * @param index
  499. * @param indexType
  500. * @param from
  501. * @param size
  502. * @param termQueryBuilder
  503. * @param matchQueryBuilder
  504. * @param sortBuilder
  505. * @return
  506. * @throws IOException
  507. */
  508. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder) throws IOException {
  509. if (null == matchQueryBuilder) {
  510. throw new ElasticsearchException("matchQueryBuilder is null");
  511. }
  512. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, null).query(matchQueryBuilder)));
  513. }
  514. /**
  515. * @param index
  516. * @param indexType
  517. * @param from
  518. * @param size
  519. * @param termQueryBuilder
  520. * @param matchQueryBuilder
  521. * @param sortField
  522. * @param fetchSource
  523. * @return
  524. * @throws IOException
  525. */
  526. public SearchResponse searchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField, Boolean fetchSource) throws IOException {
  527. if (null == matchQueryBuilder) {
  528. throw new ElasticsearchException("matchQueryBuilder is null");
  529. }
  530. return getClient().search(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, fetchSource).query(matchQueryBuilder)));
  531. }
  532. /**
  533. * 异步操作
  534. * @param index
  535. * @param indexType
  536. * @param from
  537. * @param size
  538. * @param termQueryBuilder
  539. * @param matchQueryBuilder
  540. * @param sortBuilder
  541. * @param listener
  542. * @throws IOException
  543. */
  544.  
  545. public void asyncSearchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, SortBuilder sortBuilder,ActionListener<SearchResponse> listener) throws IOException {
  546. if (null == matchQueryBuilder) {
  547. throw new ElasticsearchException("matchQueryBuilder is null");
  548. }
  549. getClient().searchAsync(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, null, sortBuilder, null).query(matchQueryBuilder)),listener);
  550. }
  551. /**
  552. * 异步操作
  553. * @param index
  554. * @param indexType
  555. * @param from
  556. * @param size
  557. * @param termQueryBuilder
  558. * @param matchQueryBuilder
  559. * @param sortField
  560. * @param listener
  561. * @throws IOException
  562. */
  563. public void asyncSearchRequest(String index, String indexType, Integer from, Integer size, TermQueryBuilder termQueryBuilder, MatchQueryBuilder matchQueryBuilder, String sortField,ActionListener<SearchResponse> listener) throws IOException {
  564. if (null == matchQueryBuilder) {
  565. throw new ElasticsearchException("matchQueryBuilder is null");
  566. }
  567. getClient().searchAsync(getSearchRequest(index, indexType).source(getSearchSourceBuilder(index, indexType, from, size, termQueryBuilder, sortField, null, null).query(matchQueryBuilder)),listener);
  568. }
  569. }

二、transportClient API

未完待续,近期继续整理

 

原文链接:http://www.cnblogs.com/laoqing/p/11693144.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号