Interruptible Reading
Reading from blocking sources like InputStream isn't always avoidable. When you need one, you can
use the JDK's BodyHandlers::ofInputStream. However, reading from such a stream can block your thread
indefinitely, which causes trouble when you want to close the application or switch contexts in the
middle of a read. Methanol supports interruptible channels, which are asynchronously closeable and
respond to thread interrupts. With them, you can deliberately halt read operations once they're no
longer relevant.
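To make "asynchronously closeable" concrete, here is a minimal JDK-only sketch. It assumes a java.nio Pipe as a stand-in for a response body channel (it is not Methanol's implementation), and the class and method names are hypothetical. A reader thread blocks on an empty channel and is released when another thread closes that channel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class AsyncCloseDemo {
  /**
   * Parks a reader thread on an empty pipe, closes the channel from the calling thread,
   * and returns the exception the reader saw (null if the read was not released in time).
   */
  static IOException closeWhileReading() throws Exception {
    Pipe pipe = Pipe.open(); // assumption: JDK stand-in for a response body channel
    Pipe.SourceChannel source = pipe.source();
    var seen = new AtomicReference<IOException>();
    var aboutToRead = new CountDownLatch(1);

    Thread reader = new Thread(() -> {
      try {
        aboutToRead.countDown();
        source.read(ByteBuffer.allocate(16)); // nothing is ever written, so this blocks
      } catch (IOException e) {
        seen.set(e);
      }
    });
    reader.start();

    aboutToRead.await();
    Thread.sleep(100);  // give the reader a moment to actually enter read()
    source.close();     // asynchronous close from another thread
    reader.join(5_000);
    pipe.sink().close();
    return seen.get();
  }

  public static void main(String[] args) throws Exception {
    IOException e = closeWhileReading();
    // Typically AsynchronousCloseException; a plain ClosedChannelException if close() ran first
    System.out.println(e instanceof ClosedChannelException);
  }
}
```

Both possible exceptions are subclasses of ClosedChannelException, so catching that single type is enough to handle a read that was cut short by a close.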
MoreBodySubscribers has interruptible ReadableByteChannel and Reader implementations. Use the JDK's
Channels::newInputStream to get an InputStream from an interruptible ReadableByteChannel when an
input stream is what you need.
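The channel-to-stream adaptation can be sketched as follows. This is a JDK-only illustration: the Pipe stands in for a channel you would otherwise obtain from a response body, and readAll and roundTrip are hypothetical helper names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

class ChannelToStreamDemo {
  /** Drains the channel through an InputStream view and decodes the bytes as UTF-8. */
  static String readAll(ReadableByteChannel channel) throws IOException {
    // Closing the stream also closes the underlying channel
    try (InputStream in = Channels.newInputStream(channel)) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  /** Feeds a small payload through a pipe and reads it back with readAll. */
  static String roundTrip(String payload) throws IOException {
    Pipe pipe = Pipe.open(); // assumption: stand-in for a response body channel
    try (var sink = pipe.sink()) {
      ByteBuffer bytes = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
      while (bytes.hasRemaining()) {
        sink.write(bytes);
      }
    } // closing the sink signals end-of-stream to the reader
    return readAll(pipe.source());
  }

  public static void main(String[] args) throws IOException {
    System.out.println(roundTrip("hello"));
  }
}
```

The stream returned by Channels::newInputStream delegates its reads to the channel, so code that only accepts an InputStream can still be fed from a channel-based body.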
Example - Interruptible Body Processing
Here's an example of a hypothetical component that processes the response body from a
ReadableByteChannel. When the task is to be discarded, reader threads are interrupted by shutting
down the owning ExecutorService. This closes open channels and instructs them to halt blocking reads.
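import com.github.mizosoft.methanol.Methanol;
import com.github.mizosoft.methanol.MoreBodyHandlers;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class BodyProcessor {
  final ExecutorService executorService = Executors.newCachedThreadPool();
  final Methanol client = Methanol.create();

  CompletableFuture<Void> processAsync(HttpRequest request, Consumer<ByteBuffer> processAction) {
    return client.sendAsync(request, MoreBodyHandlers.ofByteChannel())
        .thenApplyAsync(res -> {
          var buffer = ByteBuffer.allocate(8 * 1024);
          try (var channel = res.body()) {
            // read() returns -1 at end of stream; clear() resets the buffer for each read
            while (channel.read(buffer.clear()) >= 0) {
              processAction.accept(buffer.flip());
            }
          } catch (ClosedChannelException ignored) {
            // The thread was interrupted due to ExecutorService shutdown.
            // ClosedByInterruptException is a subclass of ClosedChannelException, so listing
            // both in one multi-catch would not compile.
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
          return null;
        }, executorService);
  }

  // Interrupts all reader threads, which closes their channels
  void terminate() { executorService.shutdownNow(); }
}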
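The shutdown mechanism behind terminate() can be tried without a network call. The following is a JDK-only sketch under stated assumptions: a Pipe that never receives data plays the role of a stalled response body, and ShutdownInterruptDemo and readUntilShutdown are hypothetical names. It shows that shutdownNow() interrupts the worker and that an interruptible channel turns that interrupt into a ClosedByInterruptException.

```java
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ShutdownInterruptDemo {
  /** Blocks a pooled thread on an empty channel, shuts the pool down, and reports how the read ended. */
  static String readUntilShutdown() throws Exception {
    ExecutorService executor = Executors.newCachedThreadPool();
    Pipe pipe = Pipe.open(); // assumption: stands in for a stalled response body
    var started = new CountDownLatch(1);

    Future<String> outcome = executor.submit(() -> {
      var buffer = ByteBuffer.allocate(16);
      try (var channel = pipe.source()) {
        started.countDown();
        while (channel.read(buffer.clear()) >= 0) {
          // a real reader would process the buffer here
        }
        return "end of stream";
      } catch (ClosedByInterruptException e) {
        return "closed by interrupt";
      }
    });

    // Wait until the task is running: shutdownNow() drops tasks that have not started yet
    started.await();
    executor.shutdownNow(); // interrupts the worker thread
    try {
      return outcome.get(5, TimeUnit.SECONDS);
    } finally {
      pipe.sink().close();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(readUntilShutdown());
  }
}
```

The interrupt is honored whether it arrives before or during the read, because an interruptible channel checks the thread's interrupt status when a blocking operation begins.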