In this section, we will dive into the Mono publisher implementation in Reactor Java and understand how it represents a stream with zero or one element. We'll explore real-time examples where Mono is commonly used and demonstrate its usage in asynchronous or delayed result scenarios.
Example Scenario: Fetching User Details
In this scenario, imagine a web application where we need to fetch details of a user from a remote database. Let's see how Mono can be used for such an asynchronous operation.
Step 1: Creating a Mono We can create a Mono using the just() method and provide the user ID to fetch the details.
Mono<User> fetchUserDetails(String userId) {
// Simulating asynchronous operation to fetch user details
return Mono.fromCallable(() -> userRepository.findById(userId))
.subscribeOn(Schedulers.parallel());
}
When you use subscribeOn(Schedulers.parallel()), you're indicating that the subscription (the part of the reactive stream where the data producer and data consumer are connected) should run on a scheduler that is designed for parallel execution. This can be useful in scenarios where you have a CPU-bound or computationally intensive task and want to take advantage of multiple threads for parallelism.
Step 2: Subscribing to the Mono To retrieve the user details, we subscribe to the Mono and specify the actions to perform when the result is available or when an error occurs.
fetchUserDetails("123")
.doOnSuccess(user -> System.out.println("User details: " + user))
.doOnError(error -> System.err.println("Error occurred: " + error.getMessage()))
.subscribe();
Real-time Use Cases:
- Handling HTTP requests and responses asynchronously.
- Performing database operations that return a single result.
- Processing events or messages from a message queue.
Understanding Flux in Reactor Java
In this section, we will explore the Flux publisher implementation in Reactor Java, which represents a stream of zero to many items. We'll discuss how Flux is commonly used for handling streams of data where multiple items are emitted over time. Real-time examples will help illustrate Flux usage in scenarios involving event processing and data streaming.
Example Scenario: Streaming Sensor Readings
Imagine a real-time monitoring system that receives sensor readings from various devices and processes them. Let's see how Flux can be utilized in such a streaming scenario.
Step 1: Creating a Flux We can create a Flux using the range() method to generate a stream of simulated sensor readings.
Flux<SensorReading> streamSensorReadings() {
// Simulating streaming sensor readings
return Flux.range(1, 100)
.map(this::generateSensorReading)
.delayElements(Duration.ofSeconds(1));
}
Step 2: Subscribing to the Flux To process the sensor readings, we subscribe to the Flux and define the actions to perform for each item emitted.
streamSensorReadings()
.doOnNext(reading -> processSensorReading(reading))
.doOnError(error -> System.err.println("Error occurred: " + error.getMessage()))
.subscribe();
Real-time Use Cases:
- Handling real-time event streams, such as social media updates or stock market data.
- Processing messages from a message broker, like Apache Kafka.
- Managing continuous data streams, such as log processing or IoT telemetry.
Parallel Processing with ParallelFlux in Reactor Java
In this section, we will explore the ParallelFlux publisher implementation in Reactor Java, which allows for parallel processing of a stream. We'll discuss how ParallelFlux can be used to distribute the workload across multiple threads or CPU cores, providing faster execution times for computationally intensive operations.
Example Scenario: Image Processing in Parallel
Let's consider an image processing application that applies filters to a set of images. We'll demonstrate how ParallelFlux can be leveraged to parallelize the image processing, improving performance.
Step 1: Creating a ParallelFlux We can create a ParallelFlux using the parallel() method on a Flux and specify the number of parallel workers.
ParallelFlux<Image> parallelImageProcessing() {
// Simulating parallel image processing
return Flux.fromIterable(images)
.parallel()
.runOn(Schedulers.parallel())
.map(this::applyFilters);
}
Step 2: Subscribing to the ParallelFlux To initiate the parallel image processing, we subscribe to the ParallelFlux and define the actions to perform on the processed images.
parallelImageProcessing()
.doOnNext(processedImage -> saveProcessedImage(processedImage))
.doOnError(error -> System.err.println("Error occurred: " + error.getMessage()))
.sequential()
.subscribe();
Real-time Use Cases:
- Processing computationally intensive tasks, such as image or video rendering.
- Parallelizing data transformation or aggregation operations.
- Handling large-scale data processing and analytics.
Leveraging ConnectableFlux in Reactor Java
In this section, we will explore the ConnectableFlux publisher implementation in Reactor Java, which allows multiple subscribers to receive the same stream of data simultaneously. We'll discuss how ConnectableFlux can be used to share a single stream among multiple subscribers and ensure they all receive the same sequence of elements.
Example Scenario: Real-time Notifications
Consider a real-time notification system where multiple clients subscribe to receive notifications about various events. Let's see how ConnectableFlux can be used to broadcast notifications to multiple subscribers.
Step 1: Creating a ConnectableFlux We can create a ConnectableFlux using the publish() method on a Flux.
ConnectableFlux<Notification> broadcastNotifications() {
// Simulating real-time notifications
return Flux.interval(Duration.ofSeconds(1))
.map(this::generateNotification)
.publish();
}
Step 2: Connecting and Subscribing to the ConnectableFlux To start broadcasting notifications, we connect the ConnectableFlux and define the actions for each subscribed client.
ConnectableFlux<Notification> connectableFlux = broadcastNotifications();
connectableFlux.connect();
connectableFlux.subscribe(notification -> sendNotificationToClientA(notification));
connectableFlux.subscribe(notification -> sendNotificationToClientB(notification));
//Full Example (This is just a demoable code, I kept it simple for learning purpuse)
public class ConnectableFluxDemo {
public void connectableFluxExample() {
ConnectableFlux<Notification> connectableFlux = broadcastNotifications();
connectableFlux.connect();
connectableFlux.subscribe(notification -> sendNotificationToClientA(notification));
connectableFlux.subscribe(notification -> sendNotificationToClientB(notification));
}
private void sendNotificationToClientA(Notification notification) {
NotificationSender.send("A", notification);
}
private void sendNotificationToClientB(Notification notification) {
NotificationSender.send("B", notification);
}
private ConnectableFlux<Notification> broadcastNotifications() {
return ConnectableFlux.from(Flux.interval(Duration.ofSeconds(1)))
.map(this::generateNotification)
.publish();
}
private Notification generateNotification(Long id) {
return Notification.builder()
.id(id)
.message("Message for id " + id)
.build();
}
}
@Builder
public class Notification {
private long id;
private String message;
}
public class NotificationSender {
public static void send(String client, Notification notification) {
//Implement notification sender
}
}
Real-time Use Cases:
- Real-time chat applications, where multiple clients receive messages simultaneously.
- Broadcasting events or updates to subscribed clients.
- Building real-time dashboards or monitoring systems.