Class DataBufferUtils
DataBuffers.- Since:
- 5.0
- Author:
- Arjen Poutsma, Brian Clozel
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic interfaceContract to find delimiter(s) against one or more data buffers that can be passed one at a time to theDataBufferUtils.Matcher.match(DataBuffer)method. -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionstatic reactor.core.publisher.Mono<DataBuffer>join(Publisher<? extends DataBuffer> dataBuffers) Return a newDataBuffercomposed from joining together the givendataBufferselements.static reactor.core.publisher.Mono<DataBuffer>join(Publisher<? extends DataBuffer> buffers, int maxByteCount) Variant ofjoin(Publisher)that behaves the same way up until the specified max number of bytes to buffer.static DataBufferUtils.Matchermatcher(byte[] delimiter) Return aDataBufferUtils.Matcherfor the given delimiter.static DataBufferUtils.Matchermatcher(byte[]... delimiters) Return aDataBufferUtils.Matcherfor the given delimiters.static reactor.core.publisher.Flux<DataBuffer>read(Path path, DataBufferFactory bufferFactory, int bufferSize, OpenOption... options) Read bytes from the given filePathinto aFluxofDataBuffers.static reactor.core.publisher.Flux<DataBuffer>read(Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) Read the givenResourceinto aFluxofDataBuffers starting at the given position.static reactor.core.publisher.Flux<DataBuffer>read(Resource resource, DataBufferFactory bufferFactory, int bufferSize) Read the givenResourceinto aFluxofDataBuffers.static reactor.core.publisher.Flux<DataBuffer>readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, long position, DataBufferFactory bufferFactory, int bufferSize) Obtain anAsynchronousFileChannelfrom the given supplier, and read it into aFluxofDataBuffers, starting at the given position.static reactor.core.publisher.Flux<DataBuffer>readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) Obtain aAsynchronousFileChannelfrom the given supplier, and read it into aFluxofDataBuffers.static reactor.core.publisher.Flux<DataBuffer>readByteChannel(Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) static reactor.core.publisher.Flux<DataBuffer>readInputStream(Callable<InputStream> inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize) static booleanrelease(DataBuffer dataBuffer) Release the given data buffer, if it is aPooledDataBufferand has been allocated.static Consumer<DataBuffer>Return a consumer that callsrelease(DataBuffer)on all passed data buffers.static <T extends DataBuffer>
Tretain(T dataBuffer) Retain the given data buffer, if it is aPooledDataBuffer.static reactor.core.publisher.Flux<DataBuffer>skipUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) Skip buffers from the givenPublisheruntil the total byte count reaches the given maximum byte count, or until the publisher is complete.static reactor.core.publisher.Flux<DataBuffer>takeUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) Relay buffers from the givenPublisheruntil the total byte count reaches the given maximum byte count, or until the publisher is complete.static <T extends DataBuffer>
TAssociate the given hint with the data buffer if it is a pooled buffer and supports leak tracking.static reactor.core.publisher.Flux<DataBuffer>write(Publisher<? extends DataBuffer> source, AsynchronousFileChannel channel, long position) Write the given stream ofDataBuffersto the givenAsynchronousFileChannel.static reactor.core.publisher.Flux<DataBuffer>write(Publisher<DataBuffer> source, OutputStream outputStream) Write the given stream ofDataBuffersto the givenOutputStream.static reactor.core.publisher.Flux<DataBuffer>write(Publisher<DataBuffer> source, AsynchronousFileChannel channel) Write the given stream ofDataBuffersto the givenAsynchronousFileChannel.static reactor.core.publisher.Flux<DataBuffer>write(Publisher<DataBuffer> source, WritableByteChannel channel) Write the given stream ofDataBuffersto the givenWritableByteChannel.static reactor.core.publisher.Mono<Void>write(Publisher<DataBuffer> source, Path destination, OpenOption... options) Write the given stream ofDataBuffersto the given filePath.
-
Constructor Details
-
DataBufferUtils
public DataBufferUtils()
-
-
Method Details
-
readInputStream
public static reactor.core.publisher.Flux<DataBuffer> readInputStream(Callable<InputStream> inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize) Obtain anInputStreamfrom the given supplier, and read it into aFluxofDataBuffers. Closes the input stream when the Flux is terminated.- Parameters:
inputStreamSupplier- the supplier for the input stream to read frombufferFactory- the factory to create data buffers withbufferSize- the maximum size of the data buffers- Returns:
- a Flux of data buffers read from the given channel
-
readByteChannel
public static reactor.core.publisher.Flux<DataBuffer> readByteChannel(Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) Obtain aReadableByteChannelfrom the given supplier, and read it into aFluxofDataBuffers. Closes the channel when the Flux is terminated.- Parameters:
channelSupplier- the supplier for the channel to read frombufferFactory- the factory to create data buffers withbufferSize- the maximum size of the data buffers- Returns:
- a Flux of data buffers read from the given channel
-
readAsynchronousFileChannel
public static reactor.core.publisher.Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) Obtain aAsynchronousFileChannelfrom the given supplier, and read it into aFluxofDataBuffers. Closes the channel when the Flux is terminated.- Parameters:
channelSupplier- the supplier for the channel to read frombufferFactory- the factory to create data buffers withbufferSize- the maximum size of the data buffers- Returns:
- a Flux of data buffers read from the given channel
-
readAsynchronousFileChannel
public static reactor.core.publisher.Flux<DataBuffer> readAsynchronousFileChannel(Callable<AsynchronousFileChannel> channelSupplier, long position, DataBufferFactory bufferFactory, int bufferSize) Obtain anAsynchronousFileChannelfrom the given supplier, and read it into aFluxofDataBuffers, starting at the given position. Closes the channel when the Flux is terminated.- Parameters:
channelSupplier- the supplier for the channel to read fromposition- the position to start reading frombufferFactory- the factory to create data buffers withbufferSize- the maximum size of the data buffers- Returns:
- a Flux of data buffers read from the given channel
-
read
public static reactor.core.publisher.Flux<DataBuffer> read(Path path, DataBufferFactory bufferFactory, int bufferSize, OpenOption... options) Read bytes from the given filePathinto aFluxofDataBuffers. The method ensures that the file is closed when the flux is terminated.- Parameters:
path- the path to read bytes frombufferFactory- the factory to create data buffers withbufferSize- the maximum size of the data buffers- Returns:
- a Flux of data buffers read from the given channel
- Since:
- 5.2
-
read
public static reactor.core.publisher.Flux<DataBuffer> read(Resource resource, DataBufferFactory bufferFactory, int bufferSize) Read the givenResourceinto aFluxofDataBuffers.If the resource is a file, it is read into an
AsynchronousFileChanneland turned toFluxviareadAsynchronousFileChannel(Callable, DataBufferFactory, int)or else fall back toreadByteChannel(Callable, DataBufferFactory, int). Closes the channel when the flux is terminated.- Parameters:
resource- the resource to read frombufferFactory- the factory to create data buffers withbufferSize- the maximum size of the data buffers- Returns:
- a Flux of data buffers read from the given channel
-
read
public static reactor.core.publisher.Flux<DataBuffer> read(Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) Read the givenResourceinto aFluxofDataBuffers starting at the given position.If the resource is a file, it is read into an
AsynchronousFileChanneland turned toFluxviareadAsynchronousFileChannel(Callable, DataBufferFactory, int)or else fall back onreadByteChannel(Callable, DataBufferFactory, int). Closes the channel when the flux is terminated.- Parameters:
resource- the resource to read fromposition- the position to start reading frombufferFactory- the factory to create data buffers withbufferSize- the maximum size of the data buffers- Returns:
- a Flux of data buffers read from the given channel
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream outputStream) Write the given stream ofDataBuffersto the givenOutputStream. Does not close the output stream when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returnedFluxwith areleaseConsumer().Note that the writing process does not start until the returned
Fluxis subscribed to.- Parameters:
source- the stream of data buffers to be writtenoutputStream- the output stream to write to- Returns:
- a Flux containing the same buffers as in
source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) Write the given stream ofDataBuffersto the givenWritableByteChannel. Does not close the channel when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returnedFluxwith areleaseConsumer().Note that the writing process does not start until the returned
Fluxis subscribed to.- Parameters:
source- the stream of data buffers to be writtenchannel- the channel to write to- Returns:
- a Flux containing the same buffers as in
source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel) Write the given stream ofDataBuffersto the givenAsynchronousFileChannel. Does not close the channel when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returnedFluxwith areleaseConsumer().Note that the writing process does not start until the returned
Fluxis subscribed to.- Parameters:
source- the stream of data buffers to be writtenchannel- the channel to write to- Returns:
- a Flux containing the same buffers as in
source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal - Since:
- 5.0.10
-
write
public static reactor.core.publisher.Flux<DataBuffer> write(Publisher<? extends DataBuffer> source, AsynchronousFileChannel channel, long position) Write the given stream ofDataBuffersto the givenAsynchronousFileChannel. Does not close the channel when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returnedFluxwith areleaseConsumer().Note that the writing process does not start until the returned
Fluxis subscribed to.- Parameters:
source- the stream of data buffers to be writtenchannel- the channel to write toposition- the file position where writing is to begin; must be non-negative- Returns:
- a flux containing the same buffers as in
source, that starts the writing process when subscribed to, and that publishes any writing errors and the completion signal
-
write
public static reactor.core.publisher.Mono<Void> write(Publisher<DataBuffer> source, Path destination, OpenOption... options) Write the given stream ofDataBuffersto the given filePath. The optionaloptionsparameter specifies how the file is created or opened (defaults toCREATE,TRUNCATE_EXISTING, andWRITE).- Parameters:
source- the stream of data buffers to be writtendestination- the path to the fileoptions- the options specifying how the file is opened- Returns:
- a
Monothat indicates completion or error - Since:
- 5.2
-
takeUntilByteCount
public static reactor.core.publisher.Flux<DataBuffer> takeUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) Relay buffers from the givenPublisheruntil the total byte count reaches the given maximum byte count, or until the publisher is complete.- Parameters:
publisher- the publisher to filtermaxByteCount- the maximum byte count- Returns:
- a flux whose maximum byte count is
maxByteCount
-
skipUntilByteCount
public static reactor.core.publisher.Flux<DataBuffer> skipUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) Skip buffers from the givenPublisheruntil the total byte count reaches the given maximum byte count, or until the publisher is complete.- Parameters:
publisher- the publisher to filtermaxByteCount- the maximum byte count- Returns:
- a flux with the remaining part of the given publisher
-
retain
Retain the given data buffer, if it is aPooledDataBuffer.- Parameters:
dataBuffer- the data buffer to retain- Returns:
- the retained buffer
-
touch
Associate the given hint with the data buffer if it is a pooled buffer and supports leak tracking.- Parameters:
dataBuffer- the data buffer to attach the hint tohint- the hint to attach- Returns:
- the input buffer
- Since:
- 5.3.2
-
release
Release the given data buffer, if it is aPooledDataBufferand has been allocated.- Parameters:
dataBuffer- the data buffer to release- Returns:
trueif the buffer was released;falseotherwise.
-
releaseConsumer
Return a consumer that callsrelease(DataBuffer)on all passed data buffers. -
join
public static reactor.core.publisher.Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) Return a newDataBuffercomposed from joining together the givendataBufferselements. Depending on theDataBuffertype, the returned buffer may be a single buffer containing all data of the provided buffers, or it may be a zero-copy, composite with references to the given buffers.If
dataBuffersproduces an error or if there is a cancel signal, then all accumulated buffers will be released.Note that the given data buffers do not have to be released. They will be released as part of the returned composite.
- Parameters:
dataBuffers- the data buffers that are to be composed- Returns:
- a buffer that is composed from the
dataBuffersargument - Since:
- 5.0.3
-
join
public static reactor.core.publisher.Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) Variant ofjoin(Publisher)that behaves the same way up until the specified max number of bytes to buffer. Once the limit is exceeded,DataBufferLimitExceptionis raised.- Parameters:
buffers- the data buffers that are to be composedmaxByteCount- the max number of bytes to buffer, or -1 for unlimited- Returns:
- a buffer with the aggregated content, possibly an empty Mono if the max number of bytes to buffer is exceeded.
- Throws:
DataBufferLimitException- if maxByteCount is exceeded- Since:
- 5.1.11
-
matcher
Return aDataBufferUtils.Matcherfor the given delimiter. The matcher can be used to find the delimiters in a stream of data buffers.- Parameters:
delimiter- the delimiter bytes to find- Returns:
- the matcher
- Since:
- 5.2
-
matcher
Return aDataBufferUtils.Matcherfor the given delimiters. The matcher can be used to find the delimiters in a stream of data buffers.- Parameters:
delimiters- the delimiters bytes to find- Returns:
- the matcher
- Since:
- 5.2
-