文章目录
- 一. ES8 的Java API 环境准备
- 二. ES8 的Java API 索引操作
- 三. ES8 的Java API 文档操作
- 1. 文档的 插入 批量插入 删除等操作
- 2. 文档的查询
- 四、异步客户端操作
一. ES8 的Java API 环境准备
ES8 废除了Type的概念。为了适应这种数据结构的改变,ES官方从7.15版本开始建议使用新的Elasticsearch Java Client。
搭建maven环境:
<!-- Build settings: compile for Java 8 and define the Elasticsearch version once for reuse below. -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <elastic.version>8.6.2</elastic.version>
</properties>
<dependencies>
    <!-- Elasticsearch SQL JDBC driver; reuses elastic.version so it cannot drift from the client version. -->
    <dependency>
        <groupId>org.elasticsearch.plugin</groupId>
        <artifactId>x-pack-sql-jdbc</artifactId>
        <version>${elastic.version}</version>
    </dependency>
    <!-- Elasticsearch Java API Client (co.elastic.clients.*). -->
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>${elastic.version}</version>
    </dependency>
    <!-- Jackson databind, the JSON mapper behind JacksonJsonpMapper. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.3</version>
    </dependency>
    <!-- Jakarta JSON Processing API. -->
    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>2.0.1</version>
    </dependency>
</dependencies>
获取客户端对象:
因为 Elasticsearch 服务启用了基于 HTTPS 的安全认证,所以需要先把之前生成的证书转换一下格式:
openssl pkcs12 -in elastic-stack-ca.p12 -clcerts -nokeys -out es-api-ca.crt
将生成的证书放到项目里面。
创建连接对象:
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
/**
 * Connection demo for the Elasticsearch Java API Client.
 *
 * <p>Builds an HTTPS transport that uses basic authentication, creates the blocking and
 * asynchronous clients on top of it, and closes the transport again.
 */
public class ESClient {

    /** Runs the connection demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
    }

    /**
     * Creates the transport and both clients, then closes the transport.
     *
     * @throws Exception if the certificate cannot be loaded, the SSL context cannot be
     *     built, or closing the transport fails
     */
    public static void initESConnection() throws Exception {
        SSLContext tlsContext = buildSslContext();

        // Basic-auth credentials registered for any authentication scope.
        final BasicCredentialsProvider basicAuth = new BasicCredentialsProvider();
        basicAuth.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        HttpHost node = new HttpHost("192.168.43.133", 9200, "https");
        RestClient lowLevelClient = RestClient.builder(node)
                .setHttpClientConfigCallback(httpBuilder -> httpBuilder
                        .setSSLContext(tlsContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(basicAuth))
                .build();

        RestClientTransport transport =
                new RestClientTransport(lowLevelClient, new JacksonJsonpMapper());
        // Both client flavours share the same transport.
        ElasticsearchClient client = new ElasticsearchClient(transport);
        ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
        transport.close();
    }

    /** Builds an SSL context whose trust store holds only the CA certificate read from disk. */
    private static SSLContext buildSslContext() throws Exception {
        Certificate caCert;
        try (InputStream certStream = Files.newInputStream(
                Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt"))) {
            caCert = CertificateFactory.getInstance("x.509").generateCertificate(certStream);
        }

        // Start from an empty in-memory key store and add the CA under the alias "ca".
        KeyStore caStore = KeyStore.getInstance("pkcs12");
        caStore.load(null, null);
        caStore.setCertificateEntry("ca", caCert);

        return SSLContexts.custom().loadTrustMaterial(caStore, null).build();
    }
}
二. ES8 的Java API 索引操作
ES Java的API相关操作:
- 采用构造器形式来创建所需要的对象。
- 通过lambda来创建。
对象形式操作:
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
/**
 * Index operations (exists / create / get / delete) using explicit request builders.
 */
public class ESClient {
    private static ElasticsearchClient client;
    private static ElasticsearchAsyncClient asyncClient;
    private static ElasticsearchTransport transport;
    public static final String INDEX_HOLMES = "itholmes";

    /** Connects to the cluster and runs the index demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
        operationIndex();
    }

    /**
     * Creates the index if it is missing, reads it back, deletes it, and closes the transport.
     *
     * <p>The transport is closed in a {@code finally} block so it is released even when one
     * of the requests throws.
     *
     * @throws Exception if a request fails or the transport cannot be closed
     */
    private static void operationIndex() throws Exception {
        try {
            ElasticsearchIndicesClient indices = client.indices();

            // Only create the index when it does not exist yet.
            ExistsRequest existsRequest = new ExistsRequest.Builder().index(INDEX_HOLMES).build();
            final boolean flg = indices.exists(existsRequest).value();
            if (flg) {
                System.out.println("索引" + INDEX_HOLMES + "已经存在!");
            } else {
                CreateIndexRequest request = new CreateIndexRequest.Builder()
                        .index(INDEX_HOLMES)
                        .build();
                final CreateIndexResponse createIndexResponse = indices.create(request);
                System.out.println("创建索引的响应对象" + createIndexResponse);
            }

            // Read the index definition back.
            GetIndexRequest getIndexRequest = new GetIndexRequest.Builder().index(INDEX_HOLMES).build();
            final GetIndexResponse getIndexResponse = indices.get(getIndexRequest);
            System.out.println("查询的响应结果:" + getIndexResponse);

            // Remove the index again.
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(INDEX_HOLMES).build();
            DeleteIndexResponse delete = indices.delete(deleteIndexRequest);
            System.out.println("索引删除成功:" + delete);
        } finally {
            transport.close();
        }
    }

    /**
     * Builds the shared transport and both clients.
     *
     * <p>Trusts only the CA certificate read from a fixed local path, authenticates with
     * HTTP basic credentials, and targets {@code https://192.168.43.133:9200}.
     *
     * @throws Exception if the certificate cannot be read or the SSL context cannot be built
     */
    public static void initESConnection() throws Exception {
        // NOTE(review): demo credentials are hard-coded; load them from configuration in real code.
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        // Read the CA certificate to trust from disk.
        Path caCertificatePath = Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt");
        CertificateFactory factory = CertificateFactory.getInstance("x.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
            trustedCa = factory.generateCertificate(is);
        }

        // Empty in-memory trust store holding just that CA.
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.133", 9200, "https"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
        RestClient restClient = builder.build();

        transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
        asyncClient = new ElasticsearchAsyncClient(transport);
    }
}
lambda方式创建索引:(推荐使用,代码简洁)
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import co.elastic.clients.util.ObjectBuilder;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.function.Function;
/**
 * Index operations (exists / create / get / delete) using the lambda builder style.
 */
public class ESClient {
    private static ElasticsearchClient client;
    private static ElasticsearchAsyncClient asyncClient;
    private static ElasticsearchTransport transport;
    public static final String INDEX_HOLMES = "itholmes";

    /** Connects to the cluster and runs the lambda-style index demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
        operationIndexLambda();
    }

    /**
     * Creates the index if it is missing, reads it back, deletes it, and closes the transport.
     *
     * <p>The transport is closed in a {@code finally} block so it is released even when one
     * of the requests throws.
     *
     * @throws Exception if a request fails or the transport cannot be closed
     */
    private static void operationIndexLambda() throws Exception {
        try {
            ElasticsearchIndicesClient indices = client.indices();

            // Only create the index when it does not exist yet.
            boolean flg = indices.exists(req -> req.index(INDEX_HOLMES)).value();
            if (flg) {
                System.out.println("索引" + INDEX_HOLMES + "已经存在!");
            } else {
                final CreateIndexResponse createIndexResponse = indices.create(req -> req.index(INDEX_HOLMES));
                System.out.println("创建索引的响应对象:" + createIndexResponse);
            }

            // Look the entry up by the same constant used in the request.
            final GetIndexResponse getIndexResponse = indices.get(req -> req.index(INDEX_HOLMES));
            System.out.println("查询的响应结果:" + getIndexResponse.get(INDEX_HOLMES));

            DeleteIndexResponse deleteIndexResponse = indices.delete(req -> req.index(INDEX_HOLMES));
            System.out.println("索引删除成功:" + deleteIndexResponse);
        } finally {
            transport.close();
        }
    }

    /**
     * Builds the shared transport and both clients.
     *
     * <p>Trusts only the CA certificate read from a fixed local path, authenticates with
     * HTTP basic credentials, and targets {@code https://192.168.43.133:9200}.
     *
     * @throws Exception if the certificate cannot be read or the SSL context cannot be built
     */
    public static void initESConnection() throws Exception {
        // NOTE(review): demo credentials are hard-coded; load them from configuration in real code.
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        // Read the CA certificate to trust from disk.
        Path caCertificatePath = Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt");
        CertificateFactory factory = CertificateFactory.getInstance("x.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
            trustedCa = factory.generateCertificate(is);
        }

        // Empty in-memory trust store holding just that CA.
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.133", 9200, "https"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
        RestClient restClient = builder.build();

        transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
        asyncClient = new ElasticsearchAsyncClient(transport);
    }
}
三. ES8 的Java API 文档操作
1. 文档的 插入 批量插入 删除等操作
文档对象形式操作:
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Document operations (create, bulk create, delete) using explicit request builders.
 */
public class ESClient2 {
    private static ElasticsearchClient client;
    private static ElasticsearchAsyncClient asyncClient;
    private static ElasticsearchTransport transport;
    public static final String INDEX_HOLMES = "itholmes";

    /** Connects to the cluster and runs the document demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
        operateDocument();
    }

    /**
     * Creates one document, bulk-creates five more, deletes one of them, and closes the
     * transport.
     *
     * <p>The transport is closed in a {@code finally} block so it is released even when one
     * of the requests throws.
     *
     * @throws Exception if a request fails or the transport cannot be closed
     */
    public static void operateDocument() throws Exception {
        try {
            // Single document with id "1001".
            // NOTE(review): create presumably fails when the id already exists (e.g. on a
            // second run) — the finally block below covers that path.
            User user = new User();
            user.setId(1001);
            user.setName("zhangsan");
            user.setAge(30);
            CreateRequest<User> createRequest = new CreateRequest.Builder<User>()
                    .index(INDEX_HOLMES)
                    .id("1001")
                    .document(user)
                    .build();
            CreateResponse createResponse = client.create(createRequest);
            System.out.println("文档创建的响应对象:" + createResponse);

            // Five create operations (ids "2000".."2004") sent in one bulk request.
            List<BulkOperation> opts = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                CreateOperation<User> optObj = new CreateOperation.Builder<User>()
                        .index(INDEX_HOLMES)
                        .id("200" + i)
                        .document(new User(2000 + i, "张三" + i, 30 + i))
                        .build();
                BulkOperation opt = new BulkOperation.Builder()
                        .create(optObj)
                        .build();
                opts.add(opt);
            }
            BulkRequest bulkRequest = new BulkRequest.Builder()
                    .operations(opts)
                    .build();
            final BulkResponse bulk = client.bulk(bulkRequest);
            System.out.println("批量新增数据的响应:" + bulk);

            // Delete one of the documents created by the bulk request.
            DeleteRequest deleteRequest = new DeleteRequest.Builder()
                    .index(INDEX_HOLMES)
                    .id("2001")
                    .build();
            DeleteResponse delete = client.delete(deleteRequest);
            System.out.println("删除后的响应:" + delete);
        } finally {
            transport.close();
        }
    }

    /**
     * Builds the shared transport and both clients.
     *
     * <p>Trusts only the CA certificate read from a fixed local path, authenticates with
     * HTTP basic credentials, and targets {@code https://192.168.43.133:9200}.
     *
     * @throws Exception if the certificate cannot be read or the SSL context cannot be built
     */
    public static void initESConnection() throws Exception {
        // NOTE(review): demo credentials are hard-coded; load them from configuration in real code.
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        // Read the CA certificate to trust from disk.
        Path caCertificatePath = Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt");
        CertificateFactory factory = CertificateFactory.getInstance("x.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
            trustedCa = factory.generateCertificate(is);
        }

        // Empty in-memory trust store holding just that CA.
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.133", 9200, "https"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
        RestClient restClient = builder.build();

        transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
        asyncClient = new ElasticsearchAsyncClient(transport);
    }
}
lambda形式:
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Document operations (create, bulk create, delete) using the lambda builder style.
 */
public class ESClient2 {
    private static ElasticsearchClient client;
    private static ElasticsearchAsyncClient asyncClient;
    private static ElasticsearchTransport transport;
    public static final String INDEX_HOLMES = "itholmes";

    /** Connects to the cluster and runs the lambda-style document demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
        operateDocumentLambda();
    }

    /**
     * Creates one document, bulk-creates five more, deletes one of them, and closes the
     * transport.
     *
     * <p>The transport is closed in a {@code finally} block so it is released even when one
     * of the requests throws.
     *
     * @throws Exception if a request fails or the transport cannot be closed
     */
    public static void operateDocumentLambda() throws Exception {
        try {
            // Single document with id "1001"; only the result status is kept.
            Result result = client.create(
                    req -> req.index(INDEX_HOLMES).id("1001").document(new User(1001, "张三", 30))
            ).result();
            System.out.println("文档创建的响应对象:" + result);

            // Users with ids 3000..3004 to be created in one bulk request.
            List<User> users = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                users.add(new User(3000 + i, "lisi" + i, 30 + i));
            }

            // Each operations(...) call adds one create operation to the bulk builder.
            BulkResponse bulk = client.bulk(req -> {
                users.forEach(u -> req.operations(
                        b -> b.create(
                                d -> d.index(INDEX_HOLMES)
                                        .id(u.getId().toString())
                                        .document(u)
                        )
                ));
                return req;
            });
            System.out.println("批量新增数据的响应:" + bulk);

            // Delete one of the documents created by the bulk request.
            DeleteResponse delete = client.delete(
                    req -> req.index(INDEX_HOLMES).id("3001")
            );
            System.out.println("删除后的响应:" + delete);
        } finally {
            transport.close();
        }
    }

    /**
     * Builds the shared transport and both clients.
     *
     * <p>Trusts only the CA certificate read from a fixed local path, authenticates with
     * HTTP basic credentials, and targets {@code https://192.168.43.133:9200}.
     *
     * @throws Exception if the certificate cannot be read or the SSL context cannot be built
     */
    public static void initESConnection() throws Exception {
        // NOTE(review): demo credentials are hard-coded; load them from configuration in real code.
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        // Read the CA certificate to trust from disk.
        Path caCertificatePath = Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt");
        CertificateFactory factory = CertificateFactory.getInstance("x.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
            trustedCa = factory.generateCertificate(is);
        }

        // Empty in-memory trust store holding just that CA.
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.133", 9200, "https"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
        RestClient restClient = builder.build();

        transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
        asyncClient = new ElasticsearchAsyncClient(transport);
    }
}
2. 文档的查询
对象形式操作:
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Document search with a match query, using explicit request builders.
 */
public class ESClient3 {
    private static ElasticsearchClient client;
    private static ElasticsearchAsyncClient asyncClient;
    private static ElasticsearchTransport transport;
    public static final String INDEX_HOLMES = "itholmes";

    /** Connects to the cluster and runs the query demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
        queryDocument();
    }

    /**
     * Runs a match query for {@code age = 30}, prints the response, and closes the transport.
     *
     * <p>The transport is closed in a {@code finally} block so it is released even when the
     * search throws.
     *
     * @throws Exception if the search fails or the transport cannot be closed
     */
    public static void queryDocument() throws Exception {
        try {
            MatchQuery matchQuery = new MatchQuery.Builder()
                    .field("age").query(30)
                    .build();
            Query query = new Query.Builder()
                    .match(matchQuery)
                    .build();
            // NOTE(review): no index is set on the request, so it is not limited to
            // INDEX_HOLMES — confirm this is intended.
            SearchRequest searchRequest = new SearchRequest.Builder()
                    .query(query)
                    .build();
            // Hits are mapped to plain Object rather than a typed document class.
            SearchResponse<Object> search = client.search(searchRequest, Object.class);
            System.out.println(search);
        } finally {
            transport.close();
        }
    }

    /**
     * Builds the shared transport and both clients.
     *
     * <p>Trusts only the CA certificate read from a fixed local path, authenticates with
     * HTTP basic credentials, and targets {@code https://192.168.43.133:9200}.
     *
     * @throws Exception if the certificate cannot be read or the SSL context cannot be built
     */
    public static void initESConnection() throws Exception {
        // NOTE(review): demo credentials are hard-coded; load them from configuration in real code.
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        // Read the CA certificate to trust from disk.
        Path caCertificatePath = Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt");
        CertificateFactory factory = CertificateFactory.getInstance("x.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
            trustedCa = factory.generateCertificate(is);
        }

        // Empty in-memory trust store holding just that CA.
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.133", 9200, "https"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
        RestClient restClient = builder.build();

        transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
        asyncClient = new ElasticsearchAsyncClient(transport);
    }
}
lambda操作:
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.query_dsl.MatchQuery;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.CreateOperation;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.ArrayList;
import java.util.List;
/**
 * Document search with a match query, using the lambda builder style.
 */
public class ESClient3 {
    private static ElasticsearchClient client;
    private static ElasticsearchAsyncClient asyncClient;
    private static ElasticsearchTransport transport;
    public static final String INDEX_HOLMES = "itholmes";

    /** Connects to the cluster and runs the lambda-style query demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
        queryDocumentLambda();
    }

    /**
     * Runs a match query for {@code name = "zhangsan"}, prints the response, and closes the
     * transport.
     *
     * <p>The transport is closed in a {@code finally} block so it is released even when the
     * search throws.
     *
     * @throws Exception if the search fails or the transport cannot be closed
     */
    public static void queryDocumentLambda() throws Exception {
        try {
            // NOTE(review): no index is set on the request, so it is not limited to
            // INDEX_HOLMES — confirm this is intended.
            // Hits are mapped to plain Object rather than a typed document class.
            SearchResponse<Object> search = client.search(
                    req -> req.query(
                            q -> q.match(
                                    m -> m.field("name").query("zhangsan")
                            )
                    ),
                    Object.class
            );
            System.out.println(search);
        } finally {
            transport.close();
        }
    }

    /**
     * Builds the shared transport and both clients.
     *
     * <p>Trusts only the CA certificate read from a fixed local path, authenticates with
     * HTTP basic credentials, and targets {@code https://192.168.43.133:9200}.
     *
     * @throws Exception if the certificate cannot be read or the SSL context cannot be built
     */
    public static void initESConnection() throws Exception {
        // NOTE(review): demo credentials are hard-coded; load them from configuration in real code.
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        // Read the CA certificate to trust from disk.
        Path caCertificatePath = Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt");
        CertificateFactory factory = CertificateFactory.getInstance("x.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
            trustedCa = factory.generateCertificate(is);
        }

        // Empty in-memory trust store holding just that CA.
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.133", 9200, "https"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
        RestClient restClient = builder.build();

        transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
        asyncClient = new ElasticsearchAsyncClient(transport);
    }
}
四、异步客户端操作
进行异步相关操作:
package com.itholmes.elasticsearch.api;
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
/**
 * Asynchronous index creation using {@code ElasticsearchAsyncClient}.
 */
public class ESClient3 {
    private static ElasticsearchClient client;
    private static ElasticsearchAsyncClient asyncClient;
    private static ElasticsearchTransport transport;
    public static final String INDEX_HOLMES = "itholmes";

    /** Connects to the cluster and runs the asynchronous demo. */
    public static void main(String[] args) throws Exception {
        initESConnection();
        asyncClientOperation();
    }

    /**
     * Starts an asynchronous create-index request for {@code newindex}, prints a message
     * from the calling thread while the request is in flight, then waits for the request
     * to finish and closes the transport.
     *
     * <p>The transport is closed only after the future has completed, so the pending
     * request is not cut off, and in a {@code finally} block so it is always released.
     *
     * @throws Exception if the transport cannot be closed
     */
    public static void asyncClientOperation() throws Exception {
        try {
            java.util.concurrent.CompletableFuture<Boolean> pending = asyncClient.indices()
                    .create(req -> req.index("newindex"))
                    // Reduce the response to its acknowledged flag.
                    .thenApply(resp -> resp.acknowledged())
                    .whenComplete((resp, error) -> {
                        System.out.println("回调方法");
                        if (resp != null) {
                            System.out.println(resp);
                        } else {
                            error.printStackTrace();
                        }
                    });

            // Printed right after the request is submitted, without waiting for the response.
            System.out.println("主线程代码...");

            try {
                pending.join();
            } catch (java.util.concurrent.CompletionException ignored) {
                // The failure was already printed by the whenComplete callback above.
            }
        } finally {
            transport.close();
        }
    }

    /**
     * Builds the shared transport and both clients.
     *
     * <p>Trusts only the CA certificate read from a fixed local path, authenticates with
     * HTTP basic credentials, and targets {@code https://192.168.43.133:9200}.
     *
     * @throws Exception if the certificate cannot be read or the SSL context cannot be built
     */
    public static void initESConnection() throws Exception {
        // NOTE(review): demo credentials are hard-coded; load them from configuration in real code.
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials("elastic", "3j=JYpywv=jMtQB+XIXS"));

        // Read the CA certificate to trust from disk.
        Path caCertificatePath = Paths.get("E:\\itholmes\\demo\\itholmes-es8\\certs\\es-api-ca.crt");
        CertificateFactory factory = CertificateFactory.getInstance("x.509");
        Certificate trustedCa;
        try (InputStream is = Files.newInputStream(caCertificatePath)) {
            trustedCa = factory.generateCertificate(is);
        }

        // Empty in-memory trust store holding just that CA.
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);
        trustStore.setCertificateEntry("ca", trustedCa);
        final SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(trustStore, null)
                .build();

        RestClientBuilder builder = RestClient.builder(new HttpHost("192.168.43.133", 9200, "https"))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        // NoopHostnameVerifier turns hostname verification off.
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultCredentialsProvider(credentialsProvider));
        RestClient restClient = builder.build();

        transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        client = new ElasticsearchClient(transport);
        asyncClient = new ElasticsearchAsyncClient(transport);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)