ES Bulk API
When working with ES, there are many situations where you need to insert a large amount of data at once. There are two ways to do this.

The first is to call ES's single-document API in a loop. The drawbacks become obvious as the request count grows: many small requests hurt performance, and the effective payload of each insert (the actual useful content carried in one HTTP request body) is low. So when the data volume is large, the second approach is preferred.

The second is ES's bulk endpoint. As the name suggests, it is designed for batch operations: a single request can mix multiple action types (index, delete, update, and so on) and submit them to ES in one round trip, improving the effective network payload and reducing the number of requests ES has to handle.
Bulk API
ES bulk operations go through the `_bulk` endpoint via `POST`. A single bulk request can carry different kinds of operations at once. The syntax looks like this:
```
POST /INDEX/_bulk
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
```
Each action is described by a JSON object, and actions are separated by `\n`. The `index`, `create`, and `update` actions each carry a body, which is itself a JSON document, also separated by `\n` and placed immediately after the action line; `delete` has no body. If an action specifies `_index`, the INDEX in the POST path can be omitted.

Note: versions below 7.0 also require specifying `_type`.
An `index` example:
```
POST /test-20201228/_bulk
{"index": {"_id": "123", "_type" : "_doc"}}
{"name": "xaiver", "age": 25}
{"index": {"_id": "124", "_type": "_doc"}}
{"name": "sharon", "age": 24}
```
The other actions follow the same syntax.
Bulk Java API
The Java bulk API is essentially just a wrapper around `_bulk`. The official documentation covers it well; a simple example:
```java
import java.io.IOException;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class BulkTest {
    public static void main(String[] args) {
        RestHighLevelClient restClient = EsClient.getRestClient();

        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new IndexRequest("test-20201228", "_doc")
                .source(XContentType.JSON, "name", "jack", "age", 35));
        bulkRequest.add(new IndexRequest("test-20201228", "_doc")
                .source(XContentType.JSON, "name", "tony", "age", 50));

        try {
            BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                // inspect per-item results to find out which requests failed
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        System.out.println("error....");
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
BulkProcessor
In ordinary scenarios, calling `bulk` (or its wrapped Java API) is enough. In some scenarios, however, plain `bulk` falls short. For instance, we may want to submit as soon as the number of buffered documents, or their total size, reaches a threshold, so that no single batch grows large enough to slow ES down; and when neither threshold is reached, submit at a fixed interval anyway, so that data never sits unsubmitted for long. A rough sketch of this idea follows.
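To make the requirement concrete, here is a rough, hypothetical hand-rolled sketch (the name `NaiveBatcher` and all numbers are mine, and the flush body is left as a stub); it is emphatically not the ES implementation, which we look at next:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.index.IndexRequest;

public class NaiveBatcher {
    private final int maxActions = 1000;           // count threshold
    private final long maxBytes = 5 * 1024 * 1024; // size threshold (5 MB)
    private final List<IndexRequest> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public NaiveBatcher() {
        // time-based trigger: flush periodically even if no other threshold is hit
        timer.scheduleWithFixedDelay(this::flush, 10, 10, TimeUnit.SECONDS);
    }

    public synchronized void add(IndexRequest request) {
        buffer.add(request);
        bufferedBytes += request.source().length();
        // count/size trigger: flush as soon as either threshold is reached
        if (buffer.size() >= maxActions || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    private synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // build a BulkRequest from the buffer and submit it to ES here, then reset
        buffer.clear();
        bufferedBytes = 0;
    }
}
```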
BulkProcessor使用
We could implement the above requirements ourselves without much trouble, but this time the wheel has already been built by the ES Java client. It is used as follows:
```java
public static void main(String[] args) {
    RestHighLevelClient restClient = EsClient.getRestClient();
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {

        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.println("executionId: " + executionId);
            System.out.println("before bulk, bulk size: " + request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            System.out.println("after bulk, executionId: " + executionId);
            List<DocWriteRequest<?>> totalRequest = request.requests();

            // collect the items that failed with a retryable status
            List<DocWriteRequest<?>> retryList = new ArrayList<>();
            for (BulkItemResponse bulkItem : response.getItems()) {
                if (!bulkItem.isFailed()) {
                    continue;
                }

                RestStatus failStatus = bulkItem.getFailure().getStatus();
                switch (failStatus) {
                    case TOO_MANY_REQUESTS:
                    case SERVICE_UNAVAILABLE:
                        retryList.add(totalRequest.get(bulkItem.getItemId()));
                        break;
                    default:
                        break;
                }
            }
            // retryList could be re-submitted here; this example stops at collecting it
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.out.println("failed: " + failure.getMessage());
        }
    };

    BulkProcessor bulkProcessor = BulkProcessor
            .builder((req, bulkResponseActionListener) ->
                    restClient.bulkAsync(req, RequestOptions.DEFAULT, bulkResponseActionListener), listener)
            .setBulkActions(1000)
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(10))
            .setConcurrentRequests(2)
            .build();
}

public void addEvent(BulkProcessor bulkProcessor, IndexRequest request) {
    bulkProcessor.add(request);
}
```
As you can see, with the official API we only need to configure a `BulkProcessor.Listener` (which gets notified before and after each submission and is where retry logic goes) and set the thresholds that trigger a submission. After that, we just keep adding data; whenever a threshold is reached, the submission is triggered automatically.
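One detail the example leaves out is shutdown: if the process exits while documents are still buffered below the thresholds, they are silently lost. A BulkProcessor should therefore be closed via `awaitClose`, which flushes what remains and waits for in-flight bulks. A minimal sketch, reusing the `bulkProcessor` from above:

```java
try {
    // flush remaining requests and wait up to 30s for in-flight bulks to finish
    boolean terminated = bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
    if (!terminated) {
        System.out.println("some bulk requests did not complete in time");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
```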
BulkProcessor implementation
Now that we roughly know how it is used, let's look at how it is implemented, so we know not just the what but also the why.
The `BulkProcessor.Listener` interface needs no deep analysis; it is just the three callbacks we saw above.
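For reference, a sketch of the interface, paraphrased from the ES source:

```java
public interface Listener {
    // called just before the bulk request is executed
    void beforeBulk(long executionId, BulkRequest request);

    // called after the bulk finished with a response (which may contain per-item failures)
    void afterBulk(long executionId, BulkRequest request, BulkResponse response);

    // called when the whole bulk request failed with an exception
    void afterBulk(long executionId, BulkRequest request, Throwable failure);
}
```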
Builder
BulkProcessor uses the Builder pattern to make parameter configuration more convenient, and exposes two methods for creating a builder:
```java
public static Builder builder(Client client, Listener listener) {
    Objects.requireNonNull(client, "client");
    Objects.requireNonNull(listener, "listener");
    return new Builder(client::bulk, listener, client.threadPool(), () -> {});
}

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
    Objects.requireNonNull(consumer, "consumer");
    Objects.requireNonNull(listener, "listener");
    final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
    return new Builder(consumer, listener, buildScheduler(scheduledThreadPoolExecutor),
            () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
}
```
Several of the Builder's parameters have already appeared above; here is the complete list:
```java
public static class Builder {

    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final Listener listener;
    private final Scheduler scheduler;
    private final Runnable onClose;
    private int concurrentRequests = 1;
    private int bulkActions = 1000;
    private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
    private TimeValue flushInterval = null;
    private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff();
    private String globalIndex;
    private String globalType;
    private String globalRouting;
    private String globalPipeline;

    private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
                    Scheduler scheduler, Runnable onClose) {
        this.consumer = consumer;
        this.listener = listener;
        this.scheduler = scheduler;
        this.onClose = onClose;
    }

    public BulkProcessor build() {
        return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
                bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
    }

    private Supplier<BulkRequest> createBulkRequestWithGlobalDefaults() {
        return () -> new BulkRequest(globalIndex, globalType)
                .pipeline(globalPipeline)
                .routing(globalRouting);
    }
}
```
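The `global*` fields explain the `bulkRequestSupplier` we are about to see in the constructor: every fresh batch starts from a BulkRequest pre-filled with these defaults. The Builder also has setters for them; a hypothetical usage (reusing the `consumer` and `listener` from the earlier example, with a made-up pipeline name):

```java
BulkProcessor processor = BulkProcessor.builder(consumer, listener)
        .setGlobalIndex("test-20201228")   // default index for requests that don't set one
        .setGlobalPipeline("my-pipeline")  // hypothetical ingest pipeline applied by default
        .build();
```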
BulkProcessor initialization
```java
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
              Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize,
              @Nullable TimeValue flushInterval, Scheduler scheduler, Runnable onClose,
              Supplier<BulkRequest> bulkRequestSupplier) {
    this.bulkActions = bulkActions;
    this.bulkSize = bulkSize.getBytes();
    this.scheduler = scheduler;
    this.bulkRequest = bulkRequestSupplier.get();
    this.bulkRequestSupplier = bulkRequestSupplier;
    // the handler actually submits batches (see "Submitting data" below)
    this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
    // the fixed-interval flush task (see "Scheduled flush" below)
    this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
    this.onClose = onClose;
}
```
Adding data
BulkProcessor offers many overloaded `add` methods, but they all end up calling `internalAdd`, so that is the one worth looking at:
```java
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
    ensureOpen();
    bulkRequest.add(request, payload);
    executeIfNeeded();
}
```
executeIfNeeded
```java
private void executeIfNeeded() {
    ensureOpen();
    if (!isOverTheLimit()) {
        return;
    }
    execute();
}
```
isOverTheLimit
```java
private boolean isOverTheLimit() {
    if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
        return true;
    }
    if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
        return true;
    }
    return false;
}
```
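The `-1` guards mean either threshold can be disabled individually. A sketch, assuming the builder call from the usage example, that keeps only the time-based trigger:

```java
BulkProcessor processor = BulkProcessor.builder(consumer, listener)
        .setBulkActions(-1)                              // disable the document-count trigger
        .setBulkSize(new ByteSizeValue(-1))              // disable the size trigger
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush on time alone
        .build();
```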
execute
```java
private void execute() {
    final BulkRequest bulkRequest = this.bulkRequest;
    final long executionId = executionIdGen.incrementAndGet();

    // swap in a fresh BulkRequest so new adds accumulate while this batch is submitted
    this.bulkRequest = bulkRequestSupplier.get();
    this.bulkRequestHandler.execute(bulkRequest, executionId);
}
```
Submitting data: BulkRequestHandler#execute
```java
public void execute(BulkRequest bulkRequest, long executionId) {
    Runnable toRelease = () -> {};
    boolean bulkRequestSetupSuccessful = false;
    try {
        listener.beforeBulk(executionId, bulkRequest);
        // the semaphore caps the number of in-flight bulk requests at concurrentRequests
        semaphore.acquire();
        toRelease = semaphore::release;
        CountDownLatch latch = new CountDownLatch(1);
        retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse response) {
                try {
                    listener.afterBulk(executionId, bulkRequest, response);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    listener.afterBulk(executionId, bulkRequest, e);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }
        });
        bulkRequestSetupSuccessful = true;
        // with concurrentRequests == 0 the call degrades to fully synchronous
        if (concurrentRequests == 0) {
            latch.await();
        }
    } catch (InterruptedException e) {
    } finally {
        if (bulkRequestSetupSuccessful == false) {
            toRelease.run();
        }
    }
}
```
Retry#RetryHandler#execute
```java
public void execute(BulkRequest bulkRequest) {
    this.currentBulkRequest = bulkRequest;
    consumer.accept(bulkRequest, this);
}
```
This looks like only a couple of lines, but it is quite clever. After all the layers of wrapping, what ultimately runs is the consumer handed in from the outside. The benefit is that the caller decides how the data actually gets submitted, while BulkProcessor takes care of the threshold checks and everything around them; a neat bit of functional-style composition.
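To make that inversion of control concrete, here is a hypothetical consumer that routes the assembled BulkRequest through the synchronous client call instead of `bulkAsync` (reusing `restClient` and `listener` from the usage example); BulkProcessor itself never talks to ES:

```java
// BulkProcessor hands us the fully assembled BulkRequest; we choose the transport.
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, bulkListener) -> {
    try {
        // blocking call; BulkProcessor only sees the listener callbacks
        bulkListener.onResponse(restClient.bulk(request, RequestOptions.DEFAULT));
    } catch (Exception e) {
        bulkListener.onFailure(e);
    }
};
BulkProcessor processor = BulkProcessor.builder(consumer, listener).build();
```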
So now we know: when the count or size threshold is reached, submission is triggered lazily, in the sense that the thresholds are only checked when data is added. If this were the only trigger, data that never reaches a threshold might wait a very long time to be submitted, or never be submitted at all. That is where the remaining threshold, time, comes in to guarantee a minimum submission frequency.
Scheduled flush
Earlier, in the BulkProcessor constructor, we saw the line `this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);`. This is the core of the scheduled flush.
startFlushTask
```java
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
    // no flush interval configured: return a no-op Cancellable and schedule nothing
    if (flushInterval == null) {
        return new Scheduler.Cancellable() {
            @Override
            public boolean cancel() {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return true;
            }
        };
    }
    final Runnable flushRunnable = scheduler.preserveContext(new Flush());
    return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
}
```
Flush
```java
class Flush implements Runnable {

    @Override
    public void run() {
        synchronized (BulkProcessor.this) {
            if (closed) {
                return;
            }
            if (bulkRequest.numberOfActions() == 0) {
                return;
            }
            execute();
        }
    }
}
```
At this point BulkProcessor's magic is clear. We configure a count threshold and a size threshold; every `add` checks whether a submission condition is met and triggers a submission if so. In parallel, a fixed-interval task also triggers submissions, guaranteeing that buffered data is submitted at least once per interval. Since the add path and the flush task could try to submit at the same time, `synchronized` is used to keep them mutually exclusive.