Allow scheduling of CF completion#275
Conversation
| private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures) { | ||
| stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, keyContexts)); | ||
|
|
||
| BatchLoaderEnvironment environment = mkBatchLoaderEnv(keys, keyContexts); |
There was a problem hiding this comment.
We now create the BatchLoaderEnvironment earlier and send it down into the other methods earlier
| Try<V> tryValue = (Try<V>) value; | ||
| if (tryValue.isSuccess()) { | ||
| future.complete(tryValue.get()); | ||
| } else { |
There was a problem hiding this comment.
The above is now run in the runnable and hence can be scheduled onto some other thread
There was a problem hiding this comment.
This moved the CF completions to another thread but the for loop still invokes them sequentially which is a problem if there are many keys in the batch. Could we schedule the completion for each key in a separate thread instead?
There was a problem hiding this comment.
So are you saying we pass in
public CompleableFuture<Void> completeThese(List<CompletableFuture> cfs, List<Object> values)
And the the caller could sequentially run them as today OR choose to run them in parralel themselves say?
| if (value instanceof Throwable) { | ||
| stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); | ||
| future.completeExceptionally(tryValue.getThrowable()); | ||
| future.completeExceptionally((Throwable) value); |
There was a problem hiding this comment.
diff is whacky. This has not changed
| Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); | ||
| return BatchLoaderEnvironment.newBatchLoaderEnvironment() | ||
| .context(context).keyContexts(keys, keyContexts).build(); | ||
| } |
There was a problem hiding this comment.
Called in two places - common method
| import static org.hamcrest.Matchers.instanceOf; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
|
|
||
| class CompletableFutureKitTest { |
| return future; | ||
| } | ||
|
|
||
| public static <V> CompletableFuture<V> success(V v) { |
There was a problem hiding this comment.
Isn't this the same as CompletableFuture.completedFuture(v)?
There was a problem hiding this comment.
Yes - but here for "completeness" ;)
| Try<V> tryValue = (Try<V>) value; | ||
| if (tryValue.isSuccess()) { | ||
| future.complete(tryValue.get()); | ||
| } else { |
There was a problem hiding this comment.
This moved the CF completions to another thread but the for loop still invokes them sequentially which is a problem if there are many keys in the batch. Could we schedule the completion for each key in a separate thread instead?
The completion of CF promises can now be scheduled on another thread
A step in the batch loading process is the completion of the futures that have been promised earlier via
Dataloader.load(). This can also be scheduled on another thread if need be, allowing the dispatch callto return quicker.
You might want to schedule the completion of values on another thread if there is following work t do such as :
In the above example the
.doSomethingSlow(v)call will happen inside the completionof the
cfStage1code path sinceCompletableFutureby design eagerly runs dependent chained methods.Perhaps you want this completion to happen more asynchronously so that the
.dispatch()returns morequickly and is not bound to the
.doSomethingSlow(v)call.By default, the dispatch completion is done on the current thread in a synchronous manner, which will include
any extra
CompletableFuturechained methods.This is an example of running the completion step in an asynchronous manner :