flambe.cluster

Package Contents

class flambe.cluster.Cluster(name: str, factories_num: int, key: str, username: str, setup_cmds: Optional[List[str]] = None)[source]

Bases: flambe.runnable.Runnable

Basic implementation of a Cluster.

The cluster is in charge of creating the cluster of instances where one host is the Orchestrator while the other ones are Factories.

This implementation should not be used by an end user. In order to give support to a cloud service provider (ex: AWS), a child class must be implemented inheriting from the Cluster class.

Important: when possible, Clusters should be used as context managers.

Parameters:
  • name (str) – The name of the cluster, used to name the remote instances.
  • factories_num (int) – The number of factories to use. Note that this differs from the number of workers, as each factory can contain multiple GPUs and therefore multiple workers.
  • key (str) – The path to the ssh key used to communicate to all instances. IMPORTANT: all instances must be accessible with the same key.
  • username (str) – The username of the instances the cluster will handle. IMPORTANT: for now all instances need to have the same username.
  • setup_cmds (Optional[List[str]]) – A list of commands to be run on all hosts for setup purposes. These commands can be used to mount volumes, install software, etc. Defaults to None. IMPORTANT: the commands need to be idempotent and they shouldn’t expect user input.
__enter__(self)

A Cluster should be used as a context manager to handle all possible errors in a clear way.

Examples

>>> with cluster as cl:
>>>     cl.launch_orchestrator()
>>>     cl.build_cluster()
>>>     ...
__exit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], tb: Optional[TracebackType])

Exit method for the context manager.

This method will catch any exception, log it and return True. This means that exceptions raised inside a Cluster used as a context manager are suppressed and do not propagate.
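
The suppression behavior described above can be sketched with a stand-in class. This is a minimal illustration of the Python context manager protocol, not the flambe implementation: `SketchCluster` and `run_failing_block` are hypothetical names, and the logging call is an assumption about how the error might be recorded.

```python
import logging
from types import TracebackType
from typing import Optional, Type

logger = logging.getLogger(__name__)


class SketchCluster:
    """Hypothetical stand-in; not the real flambe Cluster."""

    def __enter__(self) -> "SketchCluster":
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 tb: Optional[TracebackType]) -> bool:
        if exc_type is not None:
            # Log the error instead of letting it propagate.
            logger.error("Cluster error: %s", exc_value)
        # Returning True tells Python to suppress the exception.
        return True


def run_failing_block() -> str:
    with SketchCluster():
        raise RuntimeError("remote command failed")
    # Reached only because __exit__ returned True.
    return "continued"
```

Because `__exit__` returns True, code after the `with` block keeps running even when the body raised, so callers that need to react to failures have to rely on the logged output.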

get_orchestrator_name(self)

Get the orchestrator name.

The name is given by name with the ‘_orchestrator’ suffix. For example, if name is ‘seq2seq-en-fr’, then the orchestrator name will be ‘seq2seq-en-fr_orchestrator’.

This is an auxiliary method that can be used in child classes.

Returns:The orchestrator name
Return type:str
get_factory_basename(self)

Get the factory base name.

The name is name with the ‘_factory’ suffix. For example, if name is ‘seq2seq-en-fr’, then the factory basename will be ‘seq2seq-en-fr_factory’.

The base name can be used to generate all the factories’ names (for example, by also appending an index to the basename).

This is an auxiliary method that can be used in child classes.

Returns:The factory basename
Return type:str
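
The naming scheme described for the orchestrator and the factories can be sketched as plain string helpers. The function names below are hypothetical, and the `_<index>` suffix for individual factories is an assumption based on the "append an index" remark above.

```python
from typing import List


def orchestrator_name(name: str) -> str:
    # Cluster name plus the '_orchestrator' suffix.
    return f"{name}_orchestrator"


def factory_basename(name: str) -> str:
    # Cluster name plus the '_factory' suffix.
    return f"{name}_factory"


def factory_names(name: str, factories_num: int) -> List[str]:
    # Assumption: each factory gets the basename plus "_<index>".
    base = factory_basename(name)
    return [f"{base}_{i}" for i in range(factories_num)]
```

For a cluster named 'seq2seq-en-fr', `orchestrator_name` yields 'seq2seq-en-fr_orchestrator' and `factory_basename` yields 'seq2seq-en-fr_factory', matching the examples in the text.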
load_all_instances(self)

Method to make all hosts accessible.

Depending on the Cluster type, it behaves differently. For example, AWSCluster or GCPCluster can create the instances in this step. The SSHCluster does nothing (the machines are already created).

_get_all_hosts(self)

Auxiliary method to get all the hosts in a list.

create_dirs(self, relative_dirs: List[str])

Create folders in all hosts.

If some of them already exist, it will do nothing.

Parameters:relative_dirs (List[str]) – The directories to create. They should be relative paths and $HOME of each host will be used to add the prefix.
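
One way to get the "do nothing if it already exists" behavior is `mkdir -p` with a `$HOME` prefix. The sketch below only builds the shell commands; `mkdir_commands` is a hypothetical helper, and whether flambe uses `mkdir -p` internally is an assumption.

```python
import shlex
from typing import List


def mkdir_commands(relative_dirs: List[str]) -> List[str]:
    """Build one idempotent shell command per relative directory."""
    cmds = []
    for rel in relative_dirs:
        if rel.startswith("/"):
            raise ValueError(f"expected a relative path, got {rel!r}")
        # `mkdir -p` succeeds even when the directory already exists.
        cmds.append(f'mkdir -p "$HOME"/{shlex.quote(rel)}')
    return cmds
```

Each command could then be executed on every host; running it twice leaves the host in the same state.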
prepare_all_instances(self)

Prepare all the instances (both orchestrator and factories).

This method assumes that the hosts are running and accessible. It will call the ‘prepare’ method from all hosts.

run(self, force: bool = False, **kwargs)

Run a cluster and load all the instances.

After this method runs, the orchestrator and factories objects will be populated.

If a runnable is provided, then the cluster will execute the runnable remotely in the cluster. Currently, only ClusterRunnable is supported.

This method should be idempotent (i.e. if called N times with the same configuration, only one cluster will be created).

Parameters:force (bool, defaults to False) – If true, current executions of the same runnable in the cluster will be overridden by a new execution.
run_cmds(self, setup_cmds: List[str])

Run setup commands in all hosts

Parameters:setup_cmds (List[str]) – The list of commands
Raises:errors.RemoteCommandError – If at least one command is not successful in at least one host.
get_orchestrator(self, ip: str, private_ip: str = None, use_public: bool = True)

Get an orchestrator instance

get_orch_home_path(self)

Return the orchestrator home path

Returns:
Return type:str
get_factory(self, ip: str, private_ip: str = None, use_public: bool = True)

Get a CPU factory instance

get_gpu_factory(self, ip: str, private_ip: str = None, use_public: bool = True)

Get a GPU factory instance

launch_ray_cluster(self)

Create a ray cluster.

The main node is going to be located in the orchestrator machine and all other nodes in the factories.

The main node is executed with the --num-cpus=0 flag so that it doesn’t do any work and all work is done by the factories.

check_ray_cluster(self)

Check if the ray cluster was built successfully.

Compares the number of workers available with the requested ones.

Returns:Whether the number of workers in the node matches the number of factories
Return type:bool
shutdown_ray_cluster(self)

Shut down the ray cluster.

Shut down the main node running in the orchestrator.

existing_ray_cluster(self)

Return a list of the nodes in the Ray cluster.

Returns:The list of nodes
Return type:List[Instance]
existing_flambe_execution(self)

Return a list of the hosts that are running flambe.

Returns:The list of nodes
Return type:List[Instance]
shutdown_flambe_execution(self)

Shut down any flambe execution in the hosts.

existing_dir(self, _dir: str)

Determine if _dir exists in at least one host

is_ray_cluster_up(self)

Return whether the ray cluster is running.

Returns:
Return type:bool
rollback_env(self)

Rollback the environment.

When an error occurs during the local stage of the remote runnable (i.e. creating the cluster, sending the data and submitting jobs), this method may be used to destroy the cluster that has been built.

parse(self)

Parse the cluster object.

Look for configuration mistakes that don’t allow the remote runnable to run. Each different cluster will have its own policies. For example, AWSCluster could check the instance types that are allowed. By default, checks nothing.

Raises:man_errors.ClusterConfigurationError – In case the Runnable is not able to run.
send_local_content(self, content: Dict[str, str], dest: str, all_hosts: bool = False)

Send local content to the cluster

Parameters:
  • content (Dict[str, str]) – The dict of resources key -> local path
  • dest (str) – The orchestrator’s destination folder
  • all_hosts (bool) – If False, only send the content to the orchestrator. If True, send to all factories.
Returns:

The new dict of content with orchestrator’s paths.

Return type:

Dict[str, str]

rsync_orch(self, folder)

Rsync the orchestrator’s folder with all factories

Parameters:folder (str) – The folder to rsync. It should be a relative path. $HOME value will be automatically added.
send_secrets(self, whitelist: List[str] = None)

Send the secrets file to the orchestrator.

This file will be located in $HOME/secrets.ini. The injected secrets file will be used.

Parameters:whitelist (List[str]) – A list of sections to filter. For example: [“AWS”, “GITHUB”]
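
The whitelist filtering can be illustrated with the standard library's `configparser`. This is a sketch under assumptions: `filter_secrets` is a hypothetical helper, and it assumes the whitelist names the sections to keep, which the parameter name suggests but the text does not state outright.

```python
import configparser
import io
from typing import List, Optional


def filter_secrets(ini_text: str, whitelist: Optional[List[str]] = None) -> str:
    """Return the ini content restricted to the whitelisted sections."""
    parser = configparser.ConfigParser()
    parser.optionxform = str  # keep option names case-sensitive
    parser.read_string(ini_text)
    if whitelist is not None:
        # sections() returns a new list, so removing while looping is safe.
        for section in parser.sections():
            if section not in whitelist:
                parser.remove_section(section)
    out = io.StringIO()
    parser.write(out)
    return out.getvalue()
```

With a whitelist of [“AWS”, “GITHUB”], any other section in the local secrets file would be dropped before the file is sent.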
execute(self, cluster_runnable, extensions: Dict[str, str], new_secrets: str, force: bool)

Execute a ClusterRunnable in the cluster.

It will first upload the runnable file + extensions to the orchestrator (under $HOME/flambe.yaml) and then it will execute it based on the provided secrets

Parameters:
  • cluster_runnable (ClusterRunnable) – The ClusterRunnable to run in the cluster
  • extensions (Dict[str, str]) – The extensions for the ClusterRunnable
  • new_secrets (str) – The path (relative to the orchestrator) where the secrets are located. IMPORTANT: previous to calling this method, the secrets should have been uploaded to the orchestrator
  • force (bool) – The force parameter provided when running flambe locally
remove_dir(self, _dir: str, content_only: bool = True, all_hosts: bool = True)

Remove a directory in the cluster

Parameters:
  • _dir (str) – The directory to remove
  • content_only (bool) – To remove the content only or the folder also. Defaults to True.
  • all_hosts (bool) – To remove it in all hosts or only in the Orchestrator. Defaults to True (in all hosts).
cluster_has_key(self)

Whether the cluster already contains a valid common key.

The key must be in all hosts.

Returns:If the cluster has a key in all hosts.
Return type:bool
distribute_keys(self)

Create a new key pair and distribute it to all hosts.

This ensures that the hosts can communicate securely. The name of the key is the cluster’s name.

contains_gpu_factories(self)

Return whether the factories contain GPUs.

For now, all factories are the same machine type, so as soon as a GPU is found, this method returns.

get_max_resources(self)

Return the max common CPU/GPU devices in the factories

For example, if one factory contains 32 CPU + 1 GPU and the other factory contains 16 CPU + 2 GPU, this method will return {“cpu”: 16, “gpu”: 1}.

Returns:The devices, in {“cpu”: N, “gpu”: M} format
Return type:Dict[str, int]
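
The "max common" computation amounts to taking the minimum of each device count across factories. A minimal sketch, assuming each factory's resources are already available as a dict (`max_common_resources` is a hypothetical helper, and the empty-list result is an assumption):

```python
from typing import Dict, List


def max_common_resources(factories: List[Dict[str, int]]) -> Dict[str, int]:
    """Largest CPU/GPU counts that every factory can provide."""
    if not factories:
        # Assumption: no factories means no resources.
        return {"cpu": 0, "gpu": 0}
    return {
        "cpu": min(f.get("cpu", 0) for f in factories),
        "gpu": min(f.get("gpu", 0) for f in factories),
    }
```

Calling it with the two factories from the example, `[{"cpu": 32, "gpu": 1}, {"cpu": 16, "gpu": 2}]`, gives `{"cpu": 16, "gpu": 1}`.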
install_extensions_in_orchestrator(self, extensions: Dict[str, str])

Install local + pypi extensions in the orchestrator

Parameters:

extensions (Dict[str, str]) – The extensions, as a dict from module_name to location

Raises:
  • errors.RemoteCommandError – If an extension could not be installed.
  • man_errors.ClusterError – If the orchestrator was not loaded.
install_extensions_in_factories(self, extensions: Dict[str, str])

Install local + pypi extensions in all the factories.

Parameters:extensions (Dict[str, str]) – The extensions, as a dict from module_name to location
Raises:errors.RemoteCommandError – If an extension could not be installed.
get_remote_env(self, user_provider: Callable[[], str])

Get the RemoteEnvironment for this cluster.

The IPs stored will be the private IPs

Returns:The RemoteEnvironment with information about this cluster.
Return type:RemoteEnvironment
class flambe.cluster.AWSCluster(name: str, factories_num: int, factories_type: str, orchestrator_type: str, key_name: str, security_group: str, subnet_id: str, creator: str, key: str, volume_type: str = 'gp2', region_name: Optional[str] = None, username: str = 'ubuntu', tags: Dict[str, str] = None, orchestrator_ami: str = None, factory_ami: str = None, dedicated: bool = False, orchestrator_timeout: int = -1, factories_timeout: int = 1, volume_size: int = 100, setup_cmds: Optional[List[str]] = None)[source]

Bases: flambe.cluster.cluster.Cluster

This Cluster implementation uses AWS EC2 as the cloud provider.

This cluster works with AWS Instances that are defined in: flambe.remote.instance.aws

Parameters:
  • name (str) – The unique name for the cluster
  • factories_num (int) – The number of factories to use. This is not the number of workers, as each factory can contain multiple GPUs and therefore multiple workers.
  • factories_type (str) – The type of instance to use for the Factory Instances. GPU instances are required for the AWSCluster. “p2” and “p3” instances are recommended.
  • factory_ami (str) – The AMI to be used for the Factory instances. Custom Flambe AMI are provided based on Ubuntu 18.04 distribution.
  • orchestrator_type (str) – The type of instance to use for the Orchestrator instance. This does not need to be a GPU instance. At least a “t2.small” instance is recommended.
  • key_name (str) – The key name that will be used to connect into the instance.
  • creator (str) – The creator should be a user identifier for the instances. This information will create a tag called ‘creator’ and it will also be used to retrieve existing hosts owned by the user.
  • key (str) – The path to the ssh key used to communicate to all instances. IMPORTANT: all instances must be accessible with the same key.
  • volume_type (str) – The type of volume in AWS to use. Only ‘gp2’ and ‘io1’ are currently available. If ‘io1’ is used, then IOPS will be fixed to 5000. IMPORTANT: ‘io1’ volumes are significantly more expensive than ‘gp2’ volumes. Defaults to ‘gp2’.
  • region_name (Optional[str]) – The region name to use. If not specified, it uses the locally configured region name or ‘us-east-1’ in case it’s not configured.
  • username (str) – The username of the instances the cluster will handle. Defaults to ‘ubuntu’. IMPORTANT: for now all instances need to have the same username.
  • tags (Dict[str, str]) – A dictionary with tags that will be added to all created hosts.
  • security_group (str) – The security group to use to create the instances.
  • subnet_id (str) – The subnet ID to use.
  • orchestrator_ami (str) – The AMI to be used for the Orchestrator instance. Custom Flambe AMI are provided based on Ubuntu 18.04 distribution.
  • dedicated (bool) – Whether all created instances are dedicated instances or shared.
  • orchestrator_timeout (int) – Number of consecutive hours before terminating the orchestrator once the experiment is over (either success or failure). Specify -1 to disable automatic shutdown (the orchestrator will stay on until manually terminated) and 0 to shutdown when the experiment is over. For example, if specifying 24, then the orchestrator will be shut down one day after the experiment is over. ATTENTION: This also applies when the experiment ends with an error. Default is -1.
  • factories_timeout (int) – Number of consecutive hours to automatically terminate factories once the experiment is over (either success or failure). Specify -1 to disable automatic shutdown (the factories will stay on until manually terminated) and 0 to shutdown when the experiment is over. For example, if specifying 10, then the factories will be shut down 10 hours after the experiment is over. ATTENTION: This also applies when the experiment ends with an error. Default is 1.
  • volume_size (int) – The disk size in GB that all hosts will contain. Defaults to 100 GB.
  • setup_cmds (Optional[List[str]]) – A list of commands to be run on all hosts for setup purposes. These commands can be used to mount volumes, install software, etc. Defaults to None. IMPORTANT: the commands need to be idempotent and they shouldn’t expect user input.
_get_boto_session(self, region_name: Optional[str])

Get the boto3 Session from which the resources and clients will be created.

This method is called by the constructor.

Parameters:region_name (Optional[str]) – The region to use. If None, boto3 will resolve to the locally configured region_name or ‘us-east-1’ if not configured.
Returns:The boto3 Session to use
Return type:boto3.Session
load_all_instances(self)

Launch all instances for the experiment.

This method launches both the orchestrator and the factories.

_existing_cluster(self)

Whether there is an existing cluster that matches name.

The cluster should also match all other tags, including Creator.

Returns:The (boto_orchestrator, [boto_factories]) that match the experiment’s name.
Return type:Tuple[Any, List[Any]]
_get_existing_tags(self, boto_instance: boto3.resources.factory.ec2.Instance)

Gets the tags of an EC2 instance

Parameters:boto_instance (BotoIns) – The EC2 instance to access the tags.
Returns:Key, Value for the specified tags.
Return type:Dict[str, str]
flambe_own_running_instances(self)

Get running instances with matching tags.

Yields:Tuple[‘boto3.resources.factory.ec2.Instance’, str] – A tuple with the instance and the name of the EC2 instance.
name_hosts(self)

Name the orchestrator and factories.

_get_all_tags(self)

Get user tags + default tags to add to the instances and volumes.

update_tags(self)

Update user provided tags to all hosts.

In case there is an existing cluster that does not contain all the tags, executing this method ensures that all hosts have the user specified tags.

This won’t remove existing tags in the hosts.

_update_tags(self, boto_instance: boto3.resources.factory.ec2.Instance, tags: Dict[str, str])

Create/Overwrite tags on an EC2 instance and its volumes.

Parameters:
  • boto_instance ('boto3.resources.factory.ec2.Instance') – The EC2 instance
  • tags (Dict[str, str]) – The tags to create/overwrite
name_instance(self, boto_instance: boto3.resources.factory.ec2.Instance, name: str)

Renames an EC2 instance

Parameters:
  • boto_instance ('boto3.resources.factory.ec2.Instance') – The EC2 instance
  • name (str) – The new name
_create_orchestrator(self)

Create a new EC2 instance to be the Orchestrator instance.

This new machine receives all tags defined in the *.ini file.

Returns:The new orchestrator instance.
Return type:instance.AWSOrchestratorInstance
_create_factories(self, number: int = 1)

Creates new AWS EC2 instances to be the Factory instances.

These new machines receive all tags defined in the *.ini file. Factory instances will be named using the factory basename plus an index. For example, “seq2seq_factory_0”, “seq2seq_factory_1”.

Parameters:number (int) – The number of factories to be created.
Returns:The new factory instances.
Return type:List[instance.AWSGPUFactoryInstance]
_generic_launch_instances(self, instance_class: Type[T], number: int, instance_type: str, instance_ami: str, role: str)

Generic method to launch instances in AWS EC2 using boto3.

This method should not be used outside this module.

Parameters:
  • instance_class (Type[T]) – The instance class. It can be AWSOrchestratorInstance or AWSGPUFactoryInstance.
  • number (int) – The amount of instances to create
  • instance_type (str) – The instance type
  • instance_ami (str) – The AMI to be used. Should be an Ubuntu 18.04 based AMI.
  • role (str) – Whether it is ‘Orchestrator’ or ‘Factory’
Returns:

The new Instances.

Return type:

List[Union[AWSOrchestratorInstance, AWSGPUFactoryInstance]]

_get_boto_public_host(self, boto_ins: boto3.resource.factory.ec2.Instance)

Return the boto instance IP or DNS that will be used by the local process to reach the current instance.

This method abstracts the way the local process will access the instances in the case it’s not the public IP.

Parameters:boto_ins ('boto3.resources.factory.ec2.Instance') – The boto instance
Returns:The host information.
Return type:str
_get_boto_private_host(self, boto_ins: boto3.resource.factory.ec2.Instance)

Return the boto instance IP or DNS that will be used by the other instances to reach the current instance.

This method abstracts the way the other instances will access the instance in the case it’s not the private IP.

Parameters:boto_ins ('boto3.resources.factory.ec2.Instance') – The boto instance
Returns:The host information.
Return type:str
terminate_instances(self)

Terminates all instances.

rollback_env(self)

Rollback the environment.

This occurs when an error is caught during the local stage of the remote experiment (i.e. creating the cluster, sending the data and submitting jobs). This method handles the cleanup stages.

parse(self)

Checks if the AWSCluster configuration is valid.

This checks that the factories are never terminated after the orchestrator is. It avoids the scenario where the cluster has only factories and no orchestrator, which is useless.

Raises:errors.ClusterConfigurationError – If configuration is not valid.
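
The timeout check can be sketched as a small predicate over `orchestrator_timeout` and `factories_timeout`, where -1 disables automatic shutdown. The exact rule flambe applies is not spelled out here, so the comparison below is an assumption and `timeouts_are_valid` is a hypothetical helper.

```python
def timeouts_are_valid(orchestrator_timeout: int, factories_timeout: int) -> bool:
    """True if the factories cannot outlive the orchestrator."""
    if orchestrator_timeout == -1:
        # The orchestrator is never auto-terminated, so any value works.
        return True
    if factories_timeout == -1:
        # Factories would stay up after the orchestrator is terminated.
        return False
    # Assumption: equal timeouts are acceptable.
    return factories_timeout <= orchestrator_timeout
```

Under this sketch the documented defaults (-1 for the orchestrator, 1 for the factories) pass, while an orchestrator timeout of 1 hour with a factories timeout of 5 hours would be rejected.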
_get_boto_instance_by_host(self, public_host: str)

Returns the boto instance given the public host

This method will use _get_boto_public_host to search for the given host.

Parameters:public_host (str) – The host. Depending on how the host was set, it can be an IP or DNS.
Returns:The instance if found else None
Return type:Optional[boto3.resources.factory.ec2.Instance]
_get_instance_id_by_host(self, public_host: str)

Returns the instance id given the public host

Parameters:public_host (str) – The host. Depending on how the host was set, it can be an IP or DNS.
Returns:The id if found else None
Return type:Optional[str]
_get_alarm_name(self, instance_id: str)

Get the alarm name to be used for the given instance.

Parameters:instance_id (str) – The id of the instance
Returns:The name of the corresponding alarm
Return type:str
has_alarm(self, instance_id: str)

Whether the instance has an alarm set.

Parameters:instance_id (str) – The id of the instance
Returns:True if an alarm is set. False otherwise.
Return type:bool
remove_existing_events(self)

Remove the current alarm.

In case the orchestrator or factories had an alarm, we remove it to reset the new policies.

create_cloudwatch_events(self)

Creates cloudwatch events for orchestrator and factories.

_delete_cloudwatch_event(self, instance_id: str)

Deletes the alarm related to the instance.

_put_fake_cloudwatch_data(self, instance_id: str, value: int = 100, points: int = 10)

Put fake CPU Usage metric in an instance.

This method is useful to avoid triggering alarms when they are created. For example, if an instance was idle for 10 hours and a termination alarm is set for 5 hours, it will be triggered immediately. Adding fake points allows the alarm to start its timer from the current moment.

Parameters:
  • instance_id (str) – The ID of the EC2 instance
  • value (int) – The CPU percent value to use. Defaults to 100
  • points (int) – The amount of past minutes from the current time to generate metric points. For example, if points is 10, then 10 data metrics will be generated for the past 10 minutes, one per minute.
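
The per-minute points described by `value` and `points` can be sketched as a list of metric entries. The dict keys follow the shape of `MetricData` entries accepted by boto3's CloudWatch `put_metric_data`; the metric name, the unit, the `now` parameter, and the helper name `fake_cpu_datapoints` are assumptions made for illustration, and no AWS call is made.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional


def fake_cpu_datapoints(instance_id: str, value: int = 100, points: int = 10,
                        now: Optional[datetime] = None) -> List[Dict[str, Any]]:
    """Build one fake CPU data point per minute, going back `points` minutes."""
    now = now or datetime.now(timezone.utc)
    return [
        {
            "MetricName": "CPUUtilization",  # assumed metric name
            "Dimensions": [{"Name": "InstanceId", "Value": instance_id}],
            "Timestamp": now - timedelta(minutes=i),
            "Value": value,
            "Unit": "Percent",
        }
        for i in range(points)
    ]
```

The resulting list spans the most recent `points` minutes, which is what lets a newly created alarm see recent activity instead of a long idle history.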
_create_cloudwatch_event(self, instance_id: str, mins: int = 60, cpu_thresh: float = 0.1)

Create CloudWatch alarm.

The alarm is used to terminate an instance based on CPU usage.

Parameters:
  • instance_id (str) – The ID of the EC2 instance
  • mins (int) – Number of minutes to trigger the termination event. The evaluation period will always be one minute.
  • cpu_thresh (float) – Percentage specifying upper bound for triggering event. If mins is 60 and cpu_thresh is 0.1, then this instance will be deleted after 1 hour of average CPU below 0.1.
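
How `mins` and `cpu_thresh` might map onto a CloudWatch alarm can be sketched by building the keyword arguments for boto3's `put_metric_alarm` without calling AWS. The parameter names are those of the boto3 CloudWatch client; the choice of statistic, comparison operator, and the helper name `termination_alarm_kwargs` are assumptions, not confirmed details of flambe's implementation.

```python
from typing import Any, Dict


def termination_alarm_kwargs(alarm_name: str, instance_id: str, region: str,
                             mins: int = 60, cpu_thresh: float = 0.1) -> Dict[str, Any]:
    """Keyword arguments for a CPU-based EC2 termination alarm."""
    return {
        "AlarmName": alarm_name,
        "Namespace": "AWS/EC2",
        "MetricName": "CPUUtilization",
        "Dimensions": [{"Name": "InstanceId", "Value": instance_id}],
        "Statistic": "Average",                     # assumed statistic
        "Period": 60,                               # one-minute evaluation period
        "EvaluationPeriods": mins,                  # consecutive minutes below threshold
        "Threshold": cpu_thresh,
        "ComparisonOperator": "LessThanThreshold",  # assumed operator
        # Built-in EC2 action that terminates the instance when the alarm fires.
        "AlarmActions": [f"arn:aws:automate:{region}:ec2:terminate"],
    }
```

The dict could be passed as `client.put_metric_alarm(**kwargs)` on a boto3 CloudWatch client; with the defaults, the alarm fires after 60 consecutive one-minute periods of average CPU below 0.1.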
_get_images(self)

Get the official AWS public AMIs created by Flambe.

ATTENTION: why not just search the tags? We need to make sure the AMIs we pick were created by the Flambe team. Because tag values are not unique, anyone can create a public AMI with ‘Creator: flambe@asapp.com’ as a tag. If we picked that AMI, we could be creating instances from unknown AMIs, causing potential security issues. By filtering by our account id (which can be public), we can make sure that all AMIs being scanned were created by the Flambe team.

Returns:The boto3 API response
Return type:Dict
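
Filtering AMIs by owner rather than by tag can be sketched with the `Owners` parameter of the EC2 `describe_images` call. The helper `images_owned_by`, the stub client, and the account ids used with it are hypothetical; the stub only exists so the sketch runs without AWS credentials.

```python
from typing import Any, Dict, List


def images_owned_by(ec2_client: Any, owner_id: str) -> Dict[str, Any]:
    """Return the describe_images response limited to one owner account."""
    # Owners restricts the result to AMIs published by that account,
    # which tags alone cannot guarantee.
    return ec2_client.describe_images(Owners=[owner_id])


class FakeEC2Client:
    """Offline stub mimicking the part of the EC2 client used above."""

    def __init__(self, images: List[Dict[str, str]]) -> None:
        self._images = images

    def describe_images(self, Owners: List[str]) -> Dict[str, Any]:
        return {"Images": [i for i in self._images if i["OwnerId"] in Owners]}
```

With a real boto3 EC2 client in place of the stub, an AMI that copies the 'Creator' tag but belongs to another account would not appear in the response.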
_get_ami(self, _type: str, version: str)

Given a type and a version, get the correct Flambe AMI.

IMPORTANT: we keep the version logic in case we add versioned AMIs in the future.

Parameters:
  • _type (str) – It can be either ‘factory’ or ‘orchestrator’. Note that the type is lowercase in the AMI tag.
  • version (str) – For example, “0.2.1” or “2.0”.
Returns:The ImageId if it’s found. None if not.

_find_default_ami(self, _type: str)

Returns an AMI with version 0.0.0, which is the default. This means that it doesn’t contain flambe itself but it has some heavy dependencies already installed (like pytorch).

Parameters:_type (str) – Whether it is “orchestrator” or “factory”
Returns:The ImageId or None if not found.
Return type:Optional[str]
_get_creation_name(self, role: str)

Get an initial name the instance will receive at creation time.

This name can be updated later using the ‘name_hosts’ method or the ‘name_instance’ method.

Parameters:role (str) – ‘Orchestrator’ or ‘Factory’
Returns:The initial instance name.
Return type:str
class flambe.cluster.SSHCluster(name: str, orchestrator_ip: Union[str, List[str]], factories_ips: Union[List[str], List[List[str]]], key: str, username: str, remote_context=None, use_public: bool = True, setup_cmds: Optional[List[str]] = None)[source]

Bases: flambe.cluster.cluster.Cluster

This Cluster implementation should be used when the instances are already running.

For example, with on-prem hardware or a couple of AWS EC2 instances that are already running.

When using this cluster, the user needs to specify the IPs of the machines to use, both the public one and private one.

load_all_instances(self, exp_name: str = None, force: bool = False)

This method assumes that the instances are already running.

It loads the Python objects into the cluster’s variables.

Parameters:
  • exp_name (str) – The name of the experiment
  • force (bool) – Whether to override the current experiment of the same name
rollback_env(self)
rsync_hosts(self)

Rsyncs the hosts’ result folders.

First, it rsyncs all worker folders to the orchestrator main folder. After that, so that every worker gets the latest changes, the orchestrator rsyncs with all of them.