From 3075fcc796e71b67f95d04433c9065cda07c6bc4 Mon Sep 17 00:00:00 2001 From: bbaker Date: Sun, 17 May 2026 12:47:31 +1000 Subject: [PATCH 1/3] Allow scheduling of CF completion --- README.md | 34 ++++++ .../java/org/dataloader/DataLoaderHelper.java | 108 ++++++++++-------- .../dataloader/impl/CompletableFutureKit.java | 25 ++++ .../scheduler/BatchLoaderScheduler.java | 42 ++++++- .../impl/CompletableFutureKitTest.java | 102 +++++++++++++++++ .../scheduler/BatchLoaderSchedulerTest.java | 71 ++++++++++++ 6 files changed, 336 insertions(+), 46 deletions(-) create mode 100644 src/test/java/org/dataloader/impl/CompletableFutureKitTest.java diff --git a/README.md b/README.md index a53e7668..1925be36 100644 --- a/README.md +++ b/README.md @@ -633,6 +633,40 @@ Do not assume that a single call to `dispatch()` results in a single call to `Ba This code is inspired from the scheduling code in the [reference JS implementation](https://github.com/graphql/dataloader#batch-scheduling) +### Scheduling DataLoader promise completion + +Another step in the batch loading process is the completion of the futures that have been promised earlier via +`Dataloader.load()`. This can also be scheduled on another thread if need be, allowing the dispatch call +to return quicker. + +You might want to schedule the completion of values on another thread if there is following work t do such as : + +```java +var cfStage1 = dataLoader.load(key); +var cfStage2 = cfStage1.thenApply(v -> doSomethingSlow(v)); +//... +var dispatchCF = dataLoader.dispatch(); +``` + +In the above example the `.doSomethingSlow(v)` call will happen inside the completion +of the `cfStage1` code path since `CompletableFuture` by design eagerly runs dependent chained methods. + +Perhaps you want this completion to happen more asynchronously so that the `.dispatch()` returns more +quickly and is not bound to the `.doSomethingSlow(v)` call. 
+ +By default, the dispatch completion is done on the current thread in a synchronous manner, which will include +any extra `CompletableFuture` chained methods. + +This is an example of running the completion step in an asynchronous manner : + +```java + +@Override +public CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.runAsync(completeValuesRunnable); +} +``` + ## Scheduled Registry Dispatching `ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index feb6184b..e1c77c00 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -19,7 +19,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -32,7 +31,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static org.dataloader.impl.Assertions.assertState; @@ -313,54 +311,56 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List< } // // now reassemble all the futures into one that is the complete set of results - return allOf(allBatches.toArray(new CompletableFuture[0])) - .thenApply(v -> allBatches.stream() - .map(CompletableFuture::join) - .flatMap(Collection::stream) - .collect(toList())); + return CompletableFutureKit.allOfFlatMap(allBatches); } @SuppressWarnings("unchecked") - private CompletableFuture> dispatchQueueBatch(List keys, List callContexts, List> queuedFutures) { - 
stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, callContexts)); - CompletableFuture> batchLoad = invokeLoader(keys, callContexts, queuedFutures, loaderOptions.cachingEnabled()); + private CompletableFuture> dispatchQueueBatch(List keys, List keyContexts, List> queuedFutures) { + stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, keyContexts)); + + BatchLoaderEnvironment environment = mkBatchLoaderEnv(keys, keyContexts); + + CompletableFuture> batchLoad = invokeLoader(environment, keys, keyContexts, queuedFutures, loaderOptions.cachingEnabled()); return batchLoad - .thenApply(values -> { + .thenCompose(values -> { assertResultSize(keys, values); if (isPublisher() || isMappedPublisher()) { // We have already completed the queued futures by the time the overall batchLoad future has completed. - return values; + return CompletableFutureKit.success(values); } - List clearCacheKeys = new ArrayList<>(); - for (int idx = 0; idx < queuedFutures.size(); idx++) { - K key = keys.get(idx); - V value = values.get(idx); - Object callContext = callContexts.get(idx); - CompletableFuture future = queuedFutures.get(idx); - if (value instanceof Throwable) { - stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); - future.completeExceptionally((Throwable) value); - clearCacheKeys.add(keys.get(idx)); - } else if (value instanceof Try) { - // we allow the batch loader to return a Try so we can better represent a computation - // that might have worked or not. 
- Try tryValue = (Try) value; - if (tryValue.isSuccess()) { - future.complete(tryValue.get()); - } else { + Runnable completeValuesRunnable = () -> { + List clearCacheKeys = new ArrayList<>(); + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + V value = values.get(idx); + Object callContext = keyContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + if (value instanceof Throwable) { stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); - future.completeExceptionally(tryValue.getThrowable()); + future.completeExceptionally((Throwable) value); clearCacheKeys.add(keys.get(idx)); + } else if (value instanceof Try) { + // we allow the batch loader to return a Try so we can better represent a computation + // that might have worked or not. + Try tryValue = (Try) value; + if (tryValue.isSuccess()) { + future.complete(tryValue.get()); + } else { + stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + future.completeExceptionally(tryValue.getThrowable()); + clearCacheKeys.add(keys.get(idx)); + } + } else { + future.complete(value); } - } else { - future.complete(value); } - } - possiblyClearCacheEntriesOnExceptions(clearCacheKeys); - return values; + possiblyClearCacheEntriesOnExceptions(clearCacheKeys); + }; + + return scheduleCompletion(environment, keys, values, completeValuesRunnable); }).exceptionally(ex -> { - stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); + stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, keyContexts)); if (ex instanceof CompletionException) { ex = ex.getCause(); } @@ -375,6 +375,26 @@ private CompletableFuture> dispatchQueueBatch(List keys, List }); } + private CompletableFuture> scheduleCompletion(BatchLoaderEnvironment environment, List keys, List values, Runnable completeValuesRunnable) { 
+ BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); + CompletionStage scheduledCompletion; + if (batchLoaderScheduler != null) { + scheduledCompletion = batchLoaderScheduler + .scheduleCompletion(completeValuesRunnable, keys, environment); + } else { + scheduledCompletion = CompletableFutureKit.run(completeValuesRunnable); + } + return scheduledCompletion + .thenApply(ignored -> values) + .toCompletableFuture(); + } + + private BatchLoaderEnvironment mkBatchLoaderEnv(List keys, List keyContexts) { + Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); + return BatchLoaderEnvironment.newBatchLoaderEnvironment() + .context(context).keyContexts(keys, keyContexts).build(); + } + private void assertResultSize(List keys, List values) { assertState(keys.size() == values.size(), () -> "The size of the promised values MUST be the same size as the key list"); @@ -396,15 +416,17 @@ private void possiblyClearCacheEntriesOnExceptions(List keys) { CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); List keyContexts = singletonList(keyContext); + BatchLoaderEnvironment environment = mkBatchLoaderEnv(keys, keyContexts); + List> queuedFutures = singletonList(new CompletableFuture<>()); - return invokeLoader(keys, keyContexts, queuedFutures, cachingEnabled) + return invokeLoader(environment, keys, keyContexts, queuedFutures, cachingEnabled) .thenApply(list -> list.get(0)) .toCompletableFuture(); } - CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures, boolean cachingEnabled) { + CompletableFuture> invokeLoader(BatchLoaderEnvironment environment, List keys, List keyContexts, List> queuedFutures, boolean cachingEnabled) { if (!cachingEnabled) { - return invokeLoader(keys, keyContexts, queuedFutures); + return invokeLoader(environment, keys, keyContexts, queuedFutures); } CompletableFuture>> cacheCallCF = 
getFromValueCache(keys); return cacheCallCF.thenCompose(cachedValues -> { @@ -453,7 +475,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, // we missed some keys from cache, so send them to the batch loader // and then fill in their values // - CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts, missedQueuedFutures); + CompletableFuture> batchLoad = invokeLoader(environment, missedKeys, missedKeyContexts, missedQueuedFutures); return batchLoad.thenCompose(missedValues -> { assertResultSize(missedKeys, missedValues); @@ -472,11 +494,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, }); } - CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures) { - Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); - BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment() - .context(context).keyContexts(keys, keyContexts).build(); - + CompletableFuture> invokeLoader(BatchLoaderEnvironment environment, List keys, List keyContexts, List> queuedFutures) { DataLoaderInstrumentationContext> instrCtx = ctxOrNoopCtx(instrumentation().beginBatchLoader(dataLoader, keys, environment)); CompletableFuture> batchLoad; diff --git a/src/main/java/org/dataloader/impl/CompletableFutureKit.java b/src/main/java/org/dataloader/impl/CompletableFutureKit.java index ebc35eca..e33fd967 100644 --- a/src/main/java/org/dataloader/impl/CompletableFutureKit.java +++ b/src/main/java/org/dataloader/impl/CompletableFutureKit.java @@ -2,6 +2,7 @@ import org.dataloader.annotations.Internal; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -22,6 +23,12 @@ public static CompletableFuture failedFuture(Exception e) { return future; } + public static CompletableFuture success(V v) { + CompletableFuture future = new CompletableFuture<>(); + future.complete(v); + return future; + } + public static Throwable 
cause(CompletableFuture completableFuture) { if (!completableFuture.isCompletedExceptionally()) { return null; @@ -67,4 +74,22 @@ public static CompletableFuture> allOf(Map CompletableFuture> allOfFlatMap(List>> cfs) { + + return CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])) + .thenApply(v -> cfs.stream() + .map(CompletableFuture::join) + .flatMap(Collection::stream) + .collect(toList())); + } + + public static CompletableFuture run(Runnable runnable) { + try { + runnable.run(); + return CompletableFutureKit.success(null); + } catch (Exception e) { + return CompletableFutureKit.failedFuture(e); + } + } } diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java index e7e95d9e..0a5d91ef 100644 --- a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -2,11 +2,12 @@ import org.dataloader.BatchLoader; import org.dataloader.BatchLoaderEnvironment; +import org.dataloader.BatchPublisher; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.MappedBatchLoader; import org.dataloader.MappedBatchPublisher; -import org.dataloader.BatchPublisher; +import org.dataloader.impl.CompletableFutureKit; import java.util.List; import java.util.Map; @@ -31,6 +32,7 @@ public interface BatchLoaderScheduler { * @param the value type */ interface ScheduledBatchLoaderCall { + CompletionStage> invoke(); } @@ -41,6 +43,7 @@ interface ScheduledBatchLoaderCall { * @param the value type */ interface ScheduledMappedBatchLoaderCall { + CompletionStage> invoke(); } @@ -48,6 +51,7 @@ interface ScheduledMappedBatchLoaderCall { * This represents a callback that will invoke a {@link BatchPublisher} or {@link MappedBatchPublisher} function under the covers */ interface ScheduledBatchPublisherCall { + void invoke(); } @@ -92,4 +96,40 @@ interface ScheduledBatchPublisherCall { 
* @param the key type */ void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment); + + /** + * This is called to schedule the "completion" of the {@link java.util.concurrent.CompletableFuture}s in the {@link org.dataloader.DataLoader} that map + * to values that have come back from the batch loader call. + *

+ * You might want to schedule the completion of values on another thread if there is following chained work such as : + *

+     * {@code
+     *   var cfStage1 = dataLoader.load(key);
+     *   var cfStage2 = cfStage1.thenApply(v -> doSomethingSlow(v));
+     *   //...
+     *   var dispatchCF = dataLoader.dispatch();
+     * }
+     * 
+ *

+ * In the above example the `.doSomethingSlow(v)` call will happen inside the completion + * of the dataloader code path when it tries to complete `cfStage1` since {@link java.util.concurrent.CompletableFuture} + * by design runs chained dependent methods eagerly. + *

+ * Perhaps you want this to happen more asynchronously so that the `.dispatch()` returns more + * quickly and is not bound to the `.doSomethingSlow(v)` call. + *

+ * By default, the dispatch completion is done on the current thread in a synchronous manner, which will include + * any extra {@link java.util.concurrent.CompletableFuture} dependent chained methods. + * + * @param completeValuesRunnable this is the runnable that the {@link DataLoader} engine code needs to be run + * @param keys this is the list of keys that will be passed to the {@link BatchPublisher}. + * This is provided only for informative reasons and, you can't change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * + * @return a {@link CompletionStage} representing this work is being scheduled + */ + default CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { + return CompletableFutureKit.run(completeValuesRunnable); + } + } diff --git a/src/test/java/org/dataloader/impl/CompletableFutureKitTest.java b/src/test/java/org/dataloader/impl/CompletableFutureKitTest.java new file mode 100644 index 00000000..22b30eec --- /dev/null +++ b/src/test/java/org/dataloader/impl/CompletableFutureKitTest.java @@ -0,0 +1,102 @@ +package org.dataloader.impl; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class CompletableFutureKitTest { + + @Test + void failedFuture() { + CompletableFuture cf = CompletableFutureKit.failedFuture(new RuntimeException("BANG")); + assertThat(cf.isCompletedExceptionally(), equalTo(true)); + CompletionException completionException = assertThrows(CompletionException.class, cf::join); + assertThat(completionException.getCause().getMessage(), 
equalTo("BANG")); + } + + @Test + void success() { + CompletableFuture cf = CompletableFutureKit.success("BANG"); + assertThat(cf.isCompletedExceptionally(), equalTo(false)); + assertThat(cf.join(), equalTo("BANG")); + } + + @Test + void cause() { + CompletableFuture cf = CompletableFutureKit.failedFuture(new RuntimeException("BANG")); + assertThat(CompletableFutureKit.cause(cf), instanceOf(RuntimeException.class)); + } + + @Test + void succeeded() { + CompletableFuture cf = CompletableFutureKit.success("BANG"); + assertThat(CompletableFutureKit.succeeded(cf), equalTo(true)); + assertThat(CompletableFutureKit.failed(cf), equalTo(false)); + } + + @Test + void failed() { + CompletableFuture cf = CompletableFutureKit.failedFuture(new RuntimeException("BANG")); + assertThat(CompletableFutureKit.failed(cf), equalTo(true)); + assertThat(CompletableFutureKit.succeeded(cf), equalTo(false)); + } + + @Test + void allOf() { + CompletableFuture> list = CompletableFutureKit.allOf(List.of( + CompletableFutureKit.success("1"), + CompletableFutureKit.success("2"), + CompletableFutureKit.success("3") + )); + + assertThat(list.join(), equalTo(List.of("1", "2", "3"))); + } + + @Test + void flatMapAllOf() { + + CompletableFuture> list1 = CompletableFutureKit.allOf(List.of( + CompletableFutureKit.success("1"), + CompletableFutureKit.success("2"), + CompletableFutureKit.success("3") + )); + CompletableFuture> list2 = CompletableFutureKit.allOf(List.of( + CompletableFutureKit.success("4"), + CompletableFutureKit.success("5"), + CompletableFutureKit.success("6") + )); + + CompletableFuture> list = CompletableFutureKit.allOfFlatMap(List.of(list1, list2)); + assertThat(list.join(), equalTo(List.of("1", "2", "3", "4", "5", "6"))); + + } + + @Test + void run() { + AtomicBoolean ran = new AtomicBoolean(false); + Runnable runnable = () -> ran.set(true); + + CompletableFuture runCF = CompletableFutureKit.run(runnable); + runCF.join(); + assertThat(ran.get(), equalTo(true)); + + runnable = () -> 
{ + throw new RuntimeException("BANG"); + }; + + runCF = CompletableFutureKit.run(runnable); + + CompletionException completionException = assertThrows(CompletionException.class, runCF::join); + assertThat(completionException.getCause().getMessage(), equalTo("BANG")); + + + } +} \ No newline at end of file diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java index b9a7c01d..b33cefe5 100644 --- a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -184,4 +184,75 @@ public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall } + @Test + void can_schedule_cf_completion() { + + AtomicBoolean useThreading = new AtomicBoolean(false); + BatchLoaderScheduler scheduler = new BatchLoaderScheduler() { + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + scheduledCall.invoke(); + } + + @Override + public CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { + if (useThreading.get()) { + snooze(500); + return CompletableFuture.runAsync(completeValuesRunnable); + } else { + return BatchLoaderScheduler.super.scheduleCompletion(completeValuesRunnable, keys, environment); + } + } + }; + + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(scheduler).build(); + + DataLoader identityLoader = 
newDataLoader(keysAsValues(), options); + + CompletableFuture cf1 = identityLoader.load(1); + CompletableFuture cf2 = identityLoader.load(2); + CompletableFuture> dispatchCF = identityLoader.dispatch(); + + await().until(dispatchCF::isDone); + assertThat(cf1.join(), equalTo(1)); + assertThat(cf2.join(), equalTo(2)); + + // switch mode to threading mdoe + + useThreading.set(true); + + cf1 = identityLoader.load(10); + cf2 = identityLoader.load(20); + dispatchCF = identityLoader.dispatch(); + + + await().until(dispatchCF::isDone); + assertThat(cf1.join(), equalTo(10)); + assertThat(cf2.join(), equalTo(20)); + } + + @Test + void no_scheduler_present_will_works() { + DataLoaderOptions options = DataLoaderOptions.newOptions().build(); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + CompletableFuture cf1 = identityLoader.load(1); + CompletableFuture cf2 = identityLoader.load(2); + CompletableFuture> dispatchCF = identityLoader.dispatch(); + + await().until(dispatchCF::isDone); + assertThat(cf1.join(), equalTo(1)); + assertThat(cf2.join(), equalTo(2)); + } } From 8eadcb235841b8767c626e89cd735f58103dfd4c Mon Sep 17 00:00:00 2001 From: bbaker Date: Sun, 17 May 2026 15:08:09 +1000 Subject: [PATCH 2/3] Allow scheduling of CF completion --- README.md | 2 +- src/main/java/org/dataloader/DataLoaderHelper.java | 2 +- src/main/java/org/dataloader/impl/CompletableFutureKit.java | 2 +- .../java/org/dataloader/scheduler/BatchLoaderScheduler.java | 2 +- .../java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 1925be36..461a6d16 100644 --- a/README.md +++ b/README.md @@ -662,7 +662,7 @@ This is an example of running the completion step in an asynchronous manner : ```java @Override -public CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { +public CompletionStage scheduleCompletion(Runnable 
completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { return CompletableFuture.runAsync(completeValuesRunnable); } ``` diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index e1c77c00..63e216a8 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -242,7 +242,7 @@ DispatchResult dispatch() { int queueSize = loaderQueueEntryHead.queueSize; // we copy the pre-loaded set of futures ready for dispatch Object[] keysArray = new Object[queueSize]; - CompletableFuture[] queuedFuturesArray = new CompletableFuture[queueSize]; + CompletableFuture[] queuedFuturesArray = new CompletableFuture[queueSize]; Object[] callContextsArray = new Object[queueSize]; int index = queueSize - 1; while (loaderQueueEntryHead != null) { diff --git a/src/main/java/org/dataloader/impl/CompletableFutureKit.java b/src/main/java/org/dataloader/impl/CompletableFutureKit.java index e33fd967..6750960a 100644 --- a/src/main/java/org/dataloader/impl/CompletableFutureKit.java +++ b/src/main/java/org/dataloader/impl/CompletableFutureKit.java @@ -84,7 +84,7 @@ public static CompletableFuture> allOfFlatMap(List run(Runnable runnable) { + public static CompletableFuture run(Runnable runnable) { try { runnable.run(); return CompletableFutureKit.success(null); diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java index 0a5d91ef..2088a273 100644 --- a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -128,7 +128,7 @@ interface ScheduledBatchPublisherCall { * * @return a {@link CompletionStage} representing this work is being scheduled */ - default CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { + default 
CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { return CompletableFutureKit.run(completeValuesRunnable); } diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java index b33cefe5..daf03189 100644 --- a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -205,7 +205,7 @@ public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall } @Override - public CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { + public CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { if (useThreading.get()) { snooze(500); return CompletableFuture.runAsync(completeValuesRunnable); From e04c76d677c88cce8481640350ccf09213de6dc3 Mon Sep 17 00:00:00 2001 From: Francois-Xavier Bonnet Date: Tue, 19 May 2026 07:52:48 +1000 Subject: [PATCH 3/3] Allow BatchLoaderScheduler process all CF completions as a batch and potentially in parallel --- README.md | 8 ++-- .../java/org/dataloader/DataLoaderHelper.java | 39 +++++++++++-------- .../dataloader/impl/CompletableFutureKit.java | 19 +++++++++ .../scheduler/BatchLoaderScheduler.java | 6 +-- .../impl/CompletableFutureKitTest.java | 29 +++++++++++++- .../scheduler/BatchLoaderSchedulerTest.java | 29 ++++++++++++-- 6 files changed, 102 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 461a6d16..da3f9227 100644 --- a/README.md +++ b/README.md @@ -655,15 +655,17 @@ Perhaps you want this completion to happen more asynchronously so that the `.dis quickly and is not bound to the `.doSomethingSlow(v)` call. 
By default, the dispatch completion is done on the current thread in a synchronous manner, which will include -any extra `CompletableFuture` chained methods. +any extra `CompletableFuture` chained methods. If multiple keys were loaded, the dispatch completion is done +sequentially. This is an example of running the completion step in an asynchronous manner : ```java @Override -public CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { - return CompletableFuture.runAsync(completeValuesRunnable); +public CompletionStage scheduleCompletion(List completeValueRunnables, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.allOf(completeValueRunnables.stream() + .map(CompletableFuture::runAsync).toArray(CompletableFuture[]::new)); } ``` diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 63e216a8..13d7ab9a 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -19,6 +19,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -27,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyList; @@ -329,17 +331,18 @@ private CompletableFuture> dispatchQueueBatch(List keys, List return CompletableFutureKit.success(values); } - Runnable completeValuesRunnable = () -> { - List clearCacheKeys = new ArrayList<>(); - for (int idx = 0; idx < queuedFutures.size(); idx++) { - K key = keys.get(idx); - V value = values.get(idx); - Object callContext = keyContexts.get(idx); - CompletableFuture 
future = queuedFutures.get(idx); + Collection clearCacheKeys = new ConcurrentLinkedQueue<>(); + List completeValueRunnables = new ArrayList<>(); + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + V value = values.get(idx); + Object callContext = keyContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + Runnable completeValueRunnable = () -> { if (value instanceof Throwable) { stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); future.completeExceptionally((Throwable) value); - clearCacheKeys.add(keys.get(idx)); + clearCacheKeys.add(key); } else if (value instanceof Try) { // we allow the batch loader to return a Try so we can better represent a computation // that might have worked or not. @@ -349,16 +352,18 @@ private CompletableFuture> dispatchQueueBatch(List keys, List } else { stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); future.completeExceptionally(tryValue.getThrowable()); - clearCacheKeys.add(keys.get(idx)); + clearCacheKeys.add(key); } } else { future.complete(value); } - } + }; + completeValueRunnables.add(completeValueRunnable); + } + return scheduleCompletion(environment, keys, values, completeValueRunnables).thenApply(ignored -> { possiblyClearCacheEntriesOnExceptions(clearCacheKeys); - }; - - return scheduleCompletion(environment, keys, values, completeValuesRunnable); + return values; + }); }).exceptionally(ex -> { stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, keyContexts)); if (ex instanceof CompletionException) { @@ -375,14 +380,14 @@ private CompletableFuture> dispatchQueueBatch(List keys, List }); } - private CompletableFuture> scheduleCompletion(BatchLoaderEnvironment environment, List keys, List values, Runnable completeValuesRunnable) { + private CompletableFuture> scheduleCompletion(BatchLoaderEnvironment environment, List keys, List values, 
List completeValueRunnables) { BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); CompletionStage scheduledCompletion; if (batchLoaderScheduler != null) { scheduledCompletion = batchLoaderScheduler - .scheduleCompletion(completeValuesRunnable, keys, environment); + .scheduleCompletion(completeValueRunnables, keys, environment); } else { - scheduledCompletion = CompletableFutureKit.run(completeValuesRunnable); + scheduledCompletion = CompletableFutureKit.runAll(completeValueRunnables); } return scheduledCompletion .thenApply(ignored -> values) @@ -400,7 +405,7 @@ private void assertResultSize(List keys, List values) { assertState(keys.size() == values.size(), () -> "The size of the promised values MUST be the same size as the key list"); } - private void possiblyClearCacheEntriesOnExceptions(List keys) { + private void possiblyClearCacheEntriesOnExceptions(Collection keys) { if (keys.isEmpty()) { return; } diff --git a/src/main/java/org/dataloader/impl/CompletableFutureKit.java b/src/main/java/org/dataloader/impl/CompletableFutureKit.java index 6750960a..eed47b11 100644 --- a/src/main/java/org/dataloader/impl/CompletableFutureKit.java +++ b/src/main/java/org/dataloader/impl/CompletableFutureKit.java @@ -84,6 +84,13 @@ public static CompletableFuture> allOfFlatMap(List run(Runnable runnable) { try { runnable.run(); @@ -92,4 +99,16 @@ public static CompletableFuture run(Runnable runnable) { return CompletableFutureKit.failedFuture(e); } } + + /** + * Runs all the {@link Runnable} from the list synchronously on the current thread, returning a + * {@link CompletableFuture} that is completed normally or exceptionally based on the outcome. 
+ * + * @param runnables the list of tasks to execute + * @return a completed future, or a failed future if any of the tasks throws + */ + public static CompletableFuture runAll(List runnables) { + return CompletableFuture.allOf(runnables.stream().map(CompletableFutureKit::run).toArray(CompletableFuture[]::new)); + } + } diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java index 2088a273..fe67545b 100644 --- a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -121,15 +121,15 @@ interface ScheduledBatchPublisherCall { * By default, the dispatch completion is done on the current thread in a synchronous manner, which will include * any extra {@link java.util.concurrent.CompletableFuture} dependent chained methods. * - * @param completeValuesRunnable this is the runnable that the {@link DataLoader} engine code needs to be run + * @param completeValueRunnables these are the runnable tasks that the {@link DataLoader} engine code needs to be run * @param keys this is the list of keys that will be passed to the {@link BatchPublisher}. 
* This is provided only for informative reasons and, you can't change the keys that are used * @param environment this is the {@link BatchLoaderEnvironment} in place, * * @return a {@link CompletionStage} representing this work is being scheduled */ - default CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { - return CompletableFutureKit.run(completeValuesRunnable); + default CompletionStage scheduleCompletion(List completeValueRunnables, List keys, BatchLoaderEnvironment environment) { + return CompletableFutureKit.runAll(completeValueRunnables); } } diff --git a/src/test/java/org/dataloader/impl/CompletableFutureKitTest.java b/src/test/java/org/dataloader/impl/CompletableFutureKitTest.java index 22b30eec..f13f3995 100644 --- a/src/test/java/org/dataloader/impl/CompletableFutureKitTest.java +++ b/src/test/java/org/dataloader/impl/CompletableFutureKitTest.java @@ -99,4 +99,31 @@ void run() { } -} \ No newline at end of file + + @Test + void runAll() { + AtomicBoolean ran1 = new AtomicBoolean(false); + AtomicBoolean ran2 = new AtomicBoolean(false); + Runnable runnable1 = () -> ran1.set(true); + Runnable runnable2 = () -> ran2.set(true); + + CompletableFuture runCF = CompletableFutureKit.runAll(List.of(runnable1, runnable2)); + runCF.join(); + assertThat(ran1.get(), equalTo(true)); + assertThat(ran2.get(), equalTo(true)); + + ran1.set(false); + ran2.set(false); + Runnable runnable3 = () -> { + throw new RuntimeException("BANG"); + }; + + runCF = CompletableFutureKit.runAll(List.of(runnable1, runnable2, runnable3)); + + CompletionException completionException = assertThrows(CompletionException.class, runCF::join); + assertThat(ran1.get(), equalTo(true)); + assertThat(ran2.get(), equalTo(true)); + assertThat(completionException.getCause().getMessage(), equalTo("BANG")); + } + +} diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java 
b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java index daf03189..1973e916 100644 --- a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -3,6 +3,7 @@ import org.dataloader.BatchLoaderEnvironment; import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; +import org.dataloader.impl.CompletableFutureKit; import org.junit.jupiter.api.Test; import java.util.List; @@ -11,6 +12,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.Collectors; import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; @@ -188,6 +190,7 @@ public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall void can_schedule_cf_completion() { AtomicBoolean useThreading = new AtomicBoolean(false); + AtomicBoolean parallelCompletion = new AtomicBoolean(false); BatchLoaderScheduler scheduler = new BatchLoaderScheduler() { @Override public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { @@ -205,12 +208,17 @@ public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall } @Override - public CompletionStage scheduleCompletion(Runnable completeValuesRunnable, List keys, BatchLoaderEnvironment environment) { + public CompletionStage scheduleCompletion(List completeValueRunnables, List keys, BatchLoaderEnvironment environment) { if (useThreading.get()) { snooze(500); - return CompletableFuture.runAsync(completeValuesRunnable); + if (!parallelCompletion.get()) { + return CompletableFutureKit.runAll(completeValueRunnables); + } else { + return CompletableFuture.allOf(completeValueRunnables.stream() + .map(CompletableFuture::runAsync).toArray(CompletableFuture[]::new)); + } } else { - return 
BatchLoaderScheduler.super.scheduleCompletion(completeValuesRunnable, keys, environment); + return BatchLoaderScheduler.super.scheduleCompletion(completeValueRunnables, keys, environment); } } }; @@ -227,7 +235,20 @@ public CompletionStage scheduleCompletion(Runnable completeValuesRunna assertThat(cf1.join(), equalTo(1)); assertThat(cf2.join(), equalTo(2)); - // switch mode to threading mdoe + // switch mode to threading mode + + useThreading.set(true); + + cf1 = identityLoader.load(10); + cf2 = identityLoader.load(20); + dispatchCF = identityLoader.dispatch(); + + + await().until(dispatchCF::isDone); + assertThat(cf1.join(), equalTo(10)); + assertThat(cf2.join(), equalTo(20)); + + // switch mode to parallel execution useThreading.set(true);