This module contains the base implementation of a Cluster.

A Cluster is in charge of dealing with the different Instance objects that will be part of the remote runnable.

Module Contents

flambe.cluster.cluster.UPLOAD_WARN_LIMIT_MB = 10[source]
class flambe.cluster.cluster.Cluster(name: str, factories_num: int, key: str, username: str, setup_cmds: Optional[List[str]] = None)[source]

Bases: flambe.runnable.Runnable

Basic implementation of a Cluster.

The cluster is in charge of creating the cluster of instances where one host is the Orchestrator while the other ones are Factories.

This implementation should not be used by an end user. To support a cloud service provider (e.g., AWS), implement a child class that inherits from the Cluster class.

Important: when possible, Clusters should be used as context managers.

  • name (str) – The name of the cluster, used to name the remote instances.
  • factories_num (int) – The number of factories to use. Note that this differs from the number of workers, as each factory can contain multiple GPUs and, therefore, multiple workers.
  • key (str) – The path to the ssh key used to communicate to all instances. IMPORTANT: all instances must be accessible with the same key.
  • username (str) – The username of the instances the cluster will handle. IMPORTANT: for now all instances need to have the same username.
  • setup_cmds (Optional[List[str]]) – A list of commands to be run on all hosts for setup purposes. These commands can be used to mount volumes, install software, etc. Defaults to None. IMPORTANT: the commands need to be idempotent and they shouldn’t expect user input.
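To illustrate the idempotency requirement on setup_cmds, here is a minimal sketch. The specific commands, package and paths are hypothetical; they were chosen only because running them twice leaves a host in the same state and none of them waits for user input.

```python
from typing import List

# Hypothetical setup commands (not taken from flambe): each one can be
# re-run safely and none of them prompts for input.
setup_cmds: List[str] = [
    # -p: succeeds even if the directory already exists
    "mkdir -p $HOME/data",
    # -y: answers confirmation prompts automatically
    "sudo apt-get install -y htop",
    # Append the line only if it is not already present, so repeated runs
    # do not duplicate it (a bare `echo ... >> ~/.bashrc` would)
    "grep -qxF 'export FOO=1' ~/.bashrc || echo 'export FOO=1' >> ~/.bashrc",
]
```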

A Cluster should be used as a context manager to handle all possible errors in a clear way.


>>> with cluster as cl:
...     cl.launch_orchestrator()
...     cl.build_cluster()
...     ...
__exit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], tb: Optional[TracebackType])[source]

Exit method for the context manager.

This method catches any exception, logs it and returns True. As a result, exceptions raised inside a Cluster used as a context manager are not propagated.
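The suppression behavior described here follows from the standard Python context manager protocol: a truthy return value from `__exit__` stops the exception from propagating. The toy class below is a sketch of that mechanism only. `SwallowingContext` and `demo` are hypothetical names, and this is not the flambe implementation.

```python
import logging
from types import TracebackType
from typing import Optional, Type

logger = logging.getLogger(__name__)


class SwallowingContext:
    """Toy stand-in for a Cluster; hypothetical, not flambe code."""

    def __enter__(self) -> "SwallowingContext":
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 tb: Optional[TracebackType]) -> bool:
        if exc_type is not None:
            # Log the error instead of letting it propagate
            logger.error("Error inside the context: %s", exc_value)
        # A truthy return value tells Python to suppress the exception
        return True


def demo() -> str:
    with SwallowingContext():
        raise RuntimeError("boom")
    # Reached because __exit__ returned True
    return "reached"
```

One consequence of this design is that callers cannot rely on a `try`/`except` around the `with` block to detect failures; they have to inspect logs or state instead.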


Get the orchestrator name.

The name is given by name with the ‘_orchestrator’ suffix. For example, if name is ‘seq2seq-en-fr’, then the orchestrator name will be ‘seq2seq-en-fr_orchestrator’.

This is an auxiliary method that can be used in child classes.

Returns:The orchestrator name
Return type:str

Get the factory base name.

The name is name with the ‘_factory’ suffix. For example, if name is ‘seq2seq-en-fr’, then the factory basename will be ‘seq2seq-en-fr_factory’.

The base name can be used to generate all the factories’ names (for example, by also appending an index to the basename).

This is an auxiliary method that can be used in child classes.

Returns:The factory basename
Return type:str
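The naming scheme for the orchestrator and the factories can be sketched as follows. The function names and the `_{index}` suffix used for individual factories are assumptions made for illustration; the documentation only states that an index may be appended to the basename.

```python
from typing import List


def orchestrator_name(name: str) -> str:
    # Cluster name plus the '_orchestrator' suffix
    return f"{name}_orchestrator"


def factory_basename(name: str) -> str:
    # Cluster name plus the '_factory' suffix
    return f"{name}_factory"


def factory_names(name: str, factories_num: int) -> List[str]:
    # Hypothetical indexing scheme: append '_<index>' to the basename
    base = factory_basename(name)
    return [f"{base}_{i}" for i in range(factories_num)]
```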

Method to make all hosts accessible.

Depending on the Cluster type, it behaves differently. For example, AWSCluster or GCPCluster can create the instances in this step. The SSHCluster does nothing (the machines are already created).


Auxiliary method to get all the hosts in a list.

create_dirs(self, relative_dirs: List[str])[source]

Create folders in all hosts.

If some of them already exist, they are left untouched.

Parameters:relative_dirs (List[str]) – The directories to create. They should be relative paths and $HOME of each host will be used to add the prefix.
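One way to get the "create if missing, otherwise do nothing" behavior is `mkdir -p` with a `$HOME` prefix. The sketch below only builds the shell commands; `mkdir_commands` is a hypothetical helper, and how flambe actually creates the directories on each host is not shown in this page.

```python
import shlex
from typing import List


def mkdir_commands(relative_dirs: List[str]) -> List[str]:
    # `mkdir -p` is a no-op when the directory already exists.
    # "$HOME" is left for the remote shell to expand; the relative part
    # is quoted so that unusual characters are passed through safely.
    return [f'mkdir -p "$HOME"/{shlex.quote(d)}' for d in relative_dirs]
```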

Prepare all the instances (both orchestrator and factories).

This method assumes that the hosts are running and accessible. It will call the ‘prepare’ method on every host.

run(self, force: bool = False, **kwargs)[source]

Run a cluster and load all the instances.

After this method runs, the orchestrator and factories objects will be populated.

If a runnable is provided, then the cluster will execute the runnable remotely in the cluster. Currently, only ClusterRunnable is supported.

This method should be idempotent (i.e., if called N times with the same configuration, only one cluster will be created).

Parameters:force (bool, defaults to False) – If true, current executions of the same runnable in the cluster will be overridden by a new execution.
run_cmds(self, setup_cmds: List[str])[source]

Run setup commands in all hosts

Parameters:setup_cmds (List[str]) – The list of commands
Raises:errors.RemoteCommandError – If at least one command is not successful in at least one host.
get_orchestrator(self, ip: str, private_ip: str = None, use_public: bool = True)[source]

Get an orchestrator instance


Return the orchestrator home path

Return type:str
get_factory(self, ip: str, private_ip: str = None, use_public: bool = True)[source]

Get a CPU factory instance

get_gpu_factory(self, ip: str, private_ip: str = None, use_public: bool = True)[source]

Get a GPU factory instance


Create a ray cluster.

The main node is going to be located in the orchestrator machine and all other nodes in the factories.

The main node is executed with the --num-cpus=0 flag so that it doesn’t do any work and all work is done by the factories.
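The layout described above (a head node with zero CPUs on the orchestrator, worker nodes on the factories) can be sketched as a list of `ray start` commands. This is an assumption-laden illustration: `ray_start_commands` is a hypothetical helper, the port value is arbitrary, and the `--port` and `--address` flags follow recent Ray CLI releases, so the exact flags used by flambe may differ by Ray version.

```python
from typing import List


def ray_start_commands(orchestrator_ip: str,
                       factories_num: int,
                       port: int = 6379) -> List[str]:
    # Head node on the orchestrator; --num-cpus=0 keeps tasks off this machine
    head = f"ray start --head --port={port} --num-cpus=0"
    # Each factory joins the cluster by pointing at the head node's address
    worker = f"ray start --address={orchestrator_ip}:{port}"
    return [head] + [worker] * factories_num
```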


Check if the ray cluster was built successfully.

Compares the number of workers available with the number requested.

Returns:Whether the number of workers in the node matches the number of factories
Return type:bool

Shut down the ray cluster.

Shut down the main node running in the orchestrator.


Return a list of the nodes in the Ray cluster.

Returns:The list of nodes
Return type:List[Instance]

Return a list of the hosts that are running flambe.

Returns:The list of nodes
Return type:List[Instance]

Shut down any flambe execution in the hosts.

existing_dir(self, _dir: str)[source]

Determine if _dir exists in at least one host


Return whether the ray cluster is running.

Return type:bool

Roll back the environment.

When an error occurs during the local stage of the remote runnable (i.e. creating the cluster, sending the data and submitting jobs), this method may be used to destroy the cluster that has been built.


Parse the cluster object.

Look for configuration mistakes that would prevent the remote runnable from running. Each cluster type will have its own policies. For example, AWSCluster could check the instance types that are allowed. By default, checks nothing.

Raises:man_errors.ClusterConfigurationError – In case the Runnable is not able to run.
send_local_content(self, content: Dict[str, str], dest: str, all_hosts: bool = False)[source]

Send local content to the cluster

  • content (Dict[str, str]) – The dict of resources key -> local path
  • dest (str) – The orchestrator’s destination folder
  • all_hosts (bool) – If False, only send the content to the orchestrator. If True, send to all factories.

Returns:The new dict of content with orchestrator’s paths.
Return type:Dict[str, str]

rsync_orch(self, folder)[source]

Rsync the orchestrator’s folder with all factories

Parameters:folder (str) – The folder to rsync. It should be a relative path. $HOME value will be automatically added.
send_secrets(self, whitelist: List[str] = None)[source]

Send the secrets file to the orchestrator.

This file will be located in $HOME/secrets.ini. The injected secrets file will be used.

Parameters:whitelist (List[str]) – A list of sections to filter. For example: [“AWS”, “GITHUB”]
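A sketch of section filtering on an INI file, using the standard library `configparser`. It assumes that the whitelist names the sections to keep and that passing None keeps everything; the page does not state which way the filter works, and `filter_secrets` is a hypothetical helper rather than a flambe function.

```python
import configparser
import io
from typing import List, Optional


def filter_secrets(ini_text: str, whitelist: Optional[List[str]] = None) -> str:
    parser = configparser.ConfigParser()
    # Preserve the case of option names (the default lowercases them)
    parser.optionxform = str
    parser.read_string(ini_text)
    if whitelist is not None:
        # Assumption: sections not named in the whitelist are dropped
        for section in parser.sections():
            if section not in whitelist:
                parser.remove_section(section)
    out = io.StringIO()
    parser.write(out)
    return out.getvalue()
```

With a whitelist of [“AWS”], for instance, a file containing AWS and GITHUB sections would be reduced to the AWS section only under this assumption.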
execute(self, cluster_runnable, extensions: Dict[str, str], new_secrets: str, force: bool)[source]

Execute a ClusterRunnable in the cluster.

It will first upload the runnable file + extensions to the orchestrator (under $HOME/flambe.yaml) and then it will execute it based on the provided secrets.

  • cluster_runnable (ClusterRunnable) – The ClusterRunnable to run in the cluster
  • extensions (Dict[str, str]) – The extensions for the ClusterRunnable
  • new_secrets (str) – The path (relative to the orchestrator) where the secrets are located. IMPORTANT: the secrets must have been uploaded to the orchestrator before calling this method.
  • force (bool) – The force parameter provided when running flambe locally
remove_dir(self, _dir: str, content_only: bool = True, all_hosts: bool = True)[source]

Remove a directory in the cluster

  • _dir (str) – The directory to remove
  • content_only (bool) – To remove the content only or the folder also. Defaults to True.
  • all_hosts (bool) – To remove it in all hosts or only in the Orchestrator. Defaults to True (in all hosts).

Whether the cluster already contains a valid common key.

The key must be in all hosts.

Returns:If the cluster has a key in all hosts.
Return type:bool

Create a new key pair and distribute it to all hosts.

Ensures that the hosts can communicate securely. The key is named after the cluster.


Return whether the factories contain GPUs.

For now, all factories are the same machine type, so this method returns as soon as a GPU is found.


Return the max common CPU/GPU devices in the factories

For example, if one factory contains 32 CPUs + 1 GPU and the other factory contains 16 CPUs + 2 GPUs, this method will return {“cpu”: 16, “gpu”: 1}.

Returns:The devices, in {“cpu”: N, “gpu”: M} format
Return type:Dict[str, int]
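The "max common devices" rule amounts to taking, for each device type, the minimum count across all factories. A minimal sketch, assuming each factory's resources are already available as a dict (`max_common_devices` is a hypothetical standalone function, not the flambe method, which presumably queries the hosts itself):

```python
from typing import Dict, List


def max_common_devices(factories: List[Dict[str, int]]) -> Dict[str, int]:
    # The largest count every factory can provide is the minimum across factories.
    # Raises ValueError if `factories` is empty.
    return {
        "cpu": min(f["cpu"] for f in factories),
        "gpu": min(f["gpu"] for f in factories),
    }
```

Applied to the example above, `max_common_devices([{"cpu": 32, "gpu": 1}, {"cpu": 16, "gpu": 2}])` gives `{"cpu": 16, "gpu": 1}`.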
install_extensions_in_orchestrator(self, extensions: Dict[str, str])[source]

Install local + pypi extensions in the orchestrator


Parameters:extensions (Dict[str, str]) – The extensions, as a dict from module_name to location
Raises:
  • errors.RemoteCommandError – If an extension could not be installed.
  • man_errors.ClusterError – If the orchestrator was not loaded.
install_extensions_in_factories(self, extensions: Dict[str, str])[source]

Install local + pypi extensions in all the factories.

Parameters:extensions (Dict[str, str]) – The extensions, as a dict from module_name to location
Raises:errors.RemoteCommandError – If an extension could not be installed.
get_remote_env(self, user_provider: Callable[[], str])[source]

Get the RemoteEnvironment for this cluster.

The IPs stored will be the private IPs.

Returns:The RemoteEnvironment with information about this cluster.
Return type:RemoteEnvironment