Scaling with Clusters

So far, we have looked at distributed systems where a single process runs each piece of the compute graph -- compute parallelism (like pipelining). However, we can also use Hydroflow+ to run the same computation on multiple processes -- achieving data parallelism (like replication and partitioning). This is done by creating a cluster of processes that all run the same subgraph.

Creating Clusters

Just like we use the flow.process() method to create a handle to an individual process, we can use the flow.cluster() method to create a handle to a cluster of processes that all run the same piece of the graph. Let's create a simple application where a leader process broadcasts data to a cluster of workers.

We start with the standard architecture, with a flow graph function and a runtime entrypoint, but the function now returns a cluster handle in addition to a process handle.

tip

If you have been following along with the Hydroflow+ template, you'll now need to declare a new module for this example. Create a new file at src/broadcast.rs and add the following to src/lib.rs:

src/lib.rs
pub mod broadcast;
src/broadcast.rs
use hydroflow_plus::*;
use stageleft::*;

pub struct Leader {}
pub struct Workers {}

pub fn broadcast(
    flow: &FlowBuilder,
) -> (Process<Leader>, Cluster<Workers>) {
    let leader = flow.process();
    let workers = flow.cluster();

    // ...

    (leader, workers)
}

Broadcasting Data

When sending data between individual processes, we used the send_bincode operator. When sending data from a process to a cluster, we can use the broadcast_bincode operator instead, which sends a copy of each element to every member of the cluster.

let data = flow.source_iter(&leader, q!(0..10));
data
    .broadcast_bincode(&workers)
    .for_each(q!(|n| println!("{}", n)));

The Stream returned by broadcast_bincode represents the data received on each process in the cluster. Because all processes in a cluster run the exact same computation, we can then use the for_each operator directly on that stream to print the data on each process.
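
Putting the pieces together, the completed src/broadcast.rs looks like this -- a sketch that simply combines the skeleton and the broadcast snippet above, with the // ... filled in:

src/broadcast.rs
use hydroflow_plus::*;
use stageleft::*;

pub struct Leader {}
pub struct Workers {}

pub fn broadcast(
    flow: &FlowBuilder,
) -> (Process<Leader>, Cluster<Workers>) {
    let leader = flow.process();
    let workers = flow.cluster();

    // Generate 0..10 on the leader, broadcast each element to every worker,
    // and print the received values on each worker.
    let data = flow.source_iter(&leader, q!(0..10));
    data
        .broadcast_bincode(&workers)
        .for_each(q!(|n| println!("{}", n)));

    (leader, workers)
}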

Deploying Graphs with Clusters

To deploy this application, we must set up the Hydro Deploy configuration as before. Our deployment script (examples/broadcast.rs) instantiates one service for the leader process and several services for the workers. Since this script defines the physical deployment, it is also where we choose the cluster's size: we pass with_cluster a Vec of hosts, one per cluster member. We also set a display name for each service so that we can tell them apart in the logs.

examples/broadcast.rs
use hydro_deploy::Deployment;
use hydroflow_plus_deploy::TrybuildHost;

#[tokio::main]
async fn main() {
    let mut deployment = Deployment::new();

    let builder = hydroflow_plus::FlowBuilder::new();
    let (leader, workers) = flow::broadcast::broadcast(&builder);

    builder.with_default_optimize()
        .with_process(&leader, TrybuildHost::new(deployment.Localhost())
            .display_name("leader"))
        .with_cluster(&workers, (0..2)
            .map(|idx| TrybuildHost::new(deployment.Localhost())
                .display_name(format!("worker/{}", idx)))
            .collect::<Vec<_>>()
        )
        .deploy(&mut deployment);

    deployment.run_ctrl_c().await.unwrap();
}

If we run this script, we should see the following output:

cargo run --example broadcast
[worker/0] 0
[worker/1] 0
[worker/0] 1
[worker/1] 1
...
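
Note that the number of workers is a property of the deployment script, not the flow graph. To scale out, we only need to change the range passed to with_cluster; for example, a four-worker deployment on localhost would look like this (a sketch based on the deployment script above):

.with_cluster(&workers, (0..4)
    .map(|idx| TrybuildHost::new(deployment.Localhost())
        .display_name(format!("worker/{}", idx)))
    .collect::<Vec<_>>()
)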