# from __future__ import annotations
import os
import re
import logging
from copy import deepcopy
from typing import Dict, Optional, Any, Union, Sequence
from collections import OrderedDict
import shutil

from tqdm import tqdm
import ray
from ray.tune.suggest import SearchAlgorithm
from ray.tune.schedulers import TrialScheduler
from ray.tune.logger import DEFAULT_LOGGERS, TFLogger

from flambe.compile import Schema, Component
from flambe.compile.utils import _is_url
from flambe.runnable import ClusterRunnable
from flambe.cluster import errors as man_errors
from flambe.cluster import const
from flambe.cluster import Cluster
from flambe.experiment import utils, wording
from flambe.runnable import RemoteEnvironment
from flambe.runnable import error
from flambe.runnable import utils as run_utils
from flambe.experiment.progress import ProgressState
from flambe.experiment.tune_adapter import TuneAdapter
from flambe.logging import coloredlogs as cl

[docs]logger = logging.getLogger(__name__)
[docs]OptionalSearchAlgorithms = Optional[Dict[str, Union[SearchAlgorithm, Schema]]]
[docs]OptionalTrialSchedulers = Optional[Dict[str, Union[TrialScheduler, Schema]]]
[docs]class Experiment(ClusterRunnable): """A Experiment object. The Experiment object is the top level module in the Flambé workflow. The object is responsible for starting workers, assiging the orchestrator machine, as well as converting the input blocks into Ray Tune Experiment objects. Parameters ---------- name: str A name for the experiment pipeline: OrderedDict[str, Schema[Component]] Ordered mapping from block id to a schema of the block force: bool When running a local experiment this flag will make flambe override existing results from previous experiments. When running remote experiments this flag will reuse an existing cluster (in case of any) that is running an experiment with the same name in the same cloud service. The use of this flag is discouraged as you may lose useful data. resume: Union[str, List[str]] If a string is given, resume all blocks up until the given block_id. If a list is given, resume all blocks in that list. debug: bool If debug is True, then a debugger will be available at the beginning of each Component block of the pipeline. Defaults to False. ATTENTION: debug only works when running locally. save_path: Optional[str] A directory where to save the experiment. devices: Dict[str, int] Tune's resources per trial. For example: {"cpu": 12, "gpu": 2}. resources: Optional[Dict[str, Dict[str, Any]]] Variables to use in the pipeline section with !@ notation. This section is splitted into 2 sections: local and remote. search : Mapping[str, SearchAlgorithm], optional Map from block id to hyperparameter search space generator. May have Schemas of SearchAlgorithm as well. schedulers : Mapping[str, TrialScheduler], optional Map from block id to search scheduler. May have Schemas of TrialScheduler as well. reduce: Mapping[str, int], optional Map from block to number of trials to reduce to. env: RemoteEnvironment Contains remote information about the cluster. This object will be received in case this Experiment is running remotely. max_failures: int number of times to retry running the pipeline if it hits some type of failure, defaults to retrying twice merge_plot: bool Display all tensorboard logs in the same plot (per block type). Defaults to True. """ def __init__(self, name: str, pipeline: Dict[str, Schema], resume: Optional[Union[str, Sequence[str]]] = None, debug: bool = False, devices: Dict[str, int] = None, save_path: Optional[str] = None, resources: Optional[Dict[str, Dict[str, Any]]] = None, search: OptionalSearchAlgorithms = None, schedulers: OptionalTrialSchedulers = None, reduce: Optional[Dict[str, int]] = None, env: RemoteEnvironment = None, max_failures: int = 2, merge_plot: bool = True) -> None: super().__init__(env) = name self.original_save_path = save_path if save_path is None or len(save_path) == 0: save_path = os.path.join(os.getcwd(), "flambe-output") else: save_path = os.path.abspath(os.path.expanduser(save_path)) # Prepending 'output' to the name in the output folder # is a basic security mechanism to avoid removing user # folders when using --force (if for example, save_path # is '/$HOME/Desktop' and name is "nlp", and user has folder # '$HOME/Desktop/nlp', then there is a risk of accidentally # removing it when using --force) self.output_folder_name = f"output__{name}" self.full_save_path = os.path.join( save_path, self.output_folder_name ) self.resume = resume self.debug = debug self.devices = devices self.pipeline = pipeline # Compile search algorithms if needed = search or dict() for stage_name, search_alg in if isinstance(search_alg, Schema):[stage_name] = search_alg() # Compile schedulers if needed self.schedulers = schedulers or dict() for stage_name, scheduler in self.schedulers.items(): if isinstance(scheduler, Schema): self.schedulers[stage_name] = scheduler() self.reduce = reduce or dict() self.resources = resources or dict() self.max_failures = max_failures self.merge_plot = merge_plot if pipeline is None or not isinstance(pipeline, (Dict, OrderedDict)): raise TypeError("Pipeline argument is not of type Dict[str, Schema]. " f"Got {type(pipeline).__name__} instead") self.pipeline = pipeline
[docs] def run(self, force: bool = False, verbose: bool = False, **kwargs): """Run an Experiment""""Launching local experiment")) # Check if save_path/name already exists + is not empty # + force and resume are False if ( os.path.exists(self.full_save_path) and os.listdir(self.full_save_path) and not self.resume and not force ): raise error.ParsingRunnableError( f"Results from an experiment with the same name were located in the save path " + f"{self.full_save_path}. To overide this results, please use '--force' " + "To use these results and resume the experiment, pick 'resume: True' " + "If not, just pick another save_path/name." ) full_save_path = self.full_save_path if not self.env: wording.print_useful_local_info(full_save_path) # If running remotely then all folders were already created. # in the 'setup' method. if not self.env: if os.path.exists(full_save_path) and force: shutil.rmtree(full_save_path) # This deleted the folder also cl.RE(f"Removed previous existing from {full_save_path} " + "results as --force was specified")) if not os.path.exists(full_save_path): os.makedirs(full_save_path) logger.debug(f"{full_save_path} created to store output") local_vars = self.resources.get('local', {}) or {} local_vars = utils.rel_to_abs_paths(local_vars) remote_vars = self.resources.get('remote', {}) or {} global_vars = dict(local_vars, **remote_vars) # Check that links are in order (i.e topologically in pipeline) utils.check_links(self.pipeline, global_vars) # Check that only computable blocks are given # search algorithms and schedulers utils.check_search(self.pipeline,, self.schedulers) # Initialize ray cluster kwargs = {"logging_level": logging.ERROR, "include_webui": False} if self.debug: cl.BL("Debugger activated")) cl.YE("Pipeline will begin executing all variants and all " + "computables serially. " + "Press 's' to step into the " + "run method of the Component once the ipdb console " + "shows up")) kwargs['local_mode'] = True if self.env: ray.init(redis_address=f"{self.env.orchestrator_ip}:{const.RAY_REDIS_PORT}", **kwargs) else: ray.init(**kwargs) logger.debug(f"Ray cluster up") # Initialize map from block to list of checkpoints # This is used whe resolving links over other computable blocks # TODO: in python 3.7 we can replace these with dict() or {} checkpoints: OrderedDict = OrderedDict() schemas: OrderedDict = OrderedDict() success: OrderedDict = OrderedDict() # By default use all CPUs if no GPU is present devices = self.devices if self.devices else None if devices is None and utils.local_has_gpu(): devices = {"cpu": 4, "gpu": 1} to_resume = None if isinstance(self.resume, str): index = list(self.pipeline.keys()).index(self.resume) to_resume = list(self.pipeline.keys())[:index + 1] elif isinstance(self.resume, Sequence): to_resume = list(self.resume) # Make experiment_tag easier to extract def trial_name_creator(trial): identifier = "" if "env" in trial.config: env = trial.config["env"] if isinstance(env, type): env = env.__name__ identifier += f"{env}" if trial.experiment_tag: hyper_params = {} if "_" in trial.experiment_tag: num, tunable_params = trial.experiment_tag.split("_", 1) identifier += tunable_params param_list = [p.split("=") for p in tunable_params.split(",")] hyper_params = {p[0]: p[1] for p in param_list} else: identifier += trial.experiment_tag trial.config['hyper_params'] = hyper_params return identifier.replace("/", "_") trial_name_creator = ray.tune.function(trial_name_creator) # Compute depedencies DAG dependency_dag = {} schemas_dag: OrderedDict = OrderedDict() for block_id, schema_block in self.pipeline.items(): schemas_dag[block_id] = schema_block relevant_ids = utils.extract_needed_blocks(schemas_dag, block_id, global_vars) dependencies = deepcopy(relevant_ids) dependencies.discard(block_id) dependency_dag[block_id] = list(dependencies) if self.env: self.progress_state = ProgressState(, full_save_path, dependency_dag, len(self.env.factories_ips)) else: self.progress_state = ProgressState(, full_save_path, dependency_dag) for block_id, schema_block in tqdm(self.pipeline.items()): schema_block.add_extensions_metadata(self.extensions) logger.debug(f"Starting {block_id}") # Add the block to the configuration so far schemas[block_id] = schema_block success[block_id] = True self.progress_state.checkpoint_start(block_id) relevant_ids = utils.extract_needed_blocks(schemas, block_id, global_vars) relevant_schemas = {k: v for k, v in deepcopy(schemas).items() if k in relevant_ids} # Set resume resume = False if to_resume is None else (block_id in to_resume) # If computable, convert to tune.Trainable # Each Component block is an Experiment in ray.tune if not isinstance(schema_block, Schema): raise ValueError('schema block not of correct type Schema') if issubclass(schema_block.component_subclass, Component): # Returns is a list non-nested configuration divided_schemas = list(utils.divide_nested_grid_search_options(relevant_schemas)) divided_dict = [utils.extract_dict(x) for x in divided_schemas] # Convert options and links divided_dict_tune = [utils.convert_tune(x) for x in divided_dict] # Execute block tune_experiments = [] for param_dict, schemas_dict in zip(divided_dict_tune, divided_schemas): config = {'name': block_id, 'merge_plot': self.merge_plot, 'params': param_dict, 'schemas': Schema.serialize(schemas_dict), 'checkpoints': checkpoints, 'to_run': block_id, 'global_vars': global_vars, 'verbose': verbose, 'custom_modules': list(self.extensions.keys()), 'debug': self.debug} # Filter out the tensorboard logger as we handle # general and tensorboard-specific logging ourselves tune_loggers = list(filter(lambda l: not issubclass(l, TFLogger), DEFAULT_LOGGERS)) tune_experiment = ray.tune.Experiment(name=block_id, run=TuneAdapter, trial_name_creator=trial_name_creator, config=deepcopy(config), local_dir=full_save_path, checkpoint_freq=1, checkpoint_at_end=True, max_failures=self.max_failures, resources_per_trial=devices, loggers=tune_loggers) logger.debug(f"Created tune.Experiment for {param_dict}") tune_experiments.append(tune_experiment) trials = ray.tune.run_experiments(tune_experiments,, None), scheduler=self.schedulers.get(block_id, None), queue_trials=True, verbose=False, resume=resume, raise_on_failed_trial=False) logger.debug(f"Finish running all tune.Experiments for {block_id}") for t in trials: if t.status == t.ERROR: logger.error(f"{t} ended with ERROR status.") success[block_id] = False # Save checkpoint location # It should point from: # block_id -> hash(variant) -> checkpoint hashes = [] for t in trials: schema_with_params: Dict = OrderedDict() for b in schemas_dict: schema_copy = deepcopy(schemas_dict[b]) utils.update_schema_with_params(schema_copy, t.config['params'][b]) schema_with_params[b] = schema_copy hashes.append(repr(schema_with_params)) paths = [t._checkpoint.value for t in trials] # Mask out error trials mask = [True] * len(trials) for i, trial in enumerate(trials): if trial.status == ray.tune.trial.Trial.ERROR: mask[i] = False # Mask out on reduce reduce_k = self.reduce.get(block_id, None) if reduce_k is not None and int(reduce_k) > 0: # Get best best_trials = utils.get_best_trials(trials, topk=int(reduce_k)) best_trial_ids = set([t.trial_id for t in best_trials]) # Mask out for i, trial in enumerate(trials): if trial.trial_id not in best_trial_ids: mask[i] = False trial_checkpoints = {t_hash: path for t_hash, path in zip(hashes, paths)} trial_mask = {t_hash: mask_value for t_hash, mask_value in zip(hashes, mask)} checkpoints[block_id] = {'paths': trial_checkpoints, 'mask': trial_mask} # Rsync workers to main machine and back to all workers # TODO specify callbacks. If not remote will not work if self.env: run_utils.rsync_hosts(self.env.orchestrator_ip, self.env.factories_ips, self.env.user, self.full_save_path, self.env.key, exclude=["state.pkl"]) self.progress_state.checkpoint_end(block_id, checkpoints, success[block_id]) logger.debug(f"Done running {block_id}") # Close ray experiment ray.shutdown() logger.debug("Shutted down ray cluster") self.progress_state.finish() if all(success.values()):"Experiment ended successfully")) else: raise error.UnsuccessfulRunnableError( "Not all trials were successful. Check the logs for more information"
[docs] def setup(self, cluster: Cluster, extensions: Dict[str, str], force: bool, **kwargs) -> None: """Prepare the cluster for the Experiment remote execution. This involves: 1) [Optional] Kill previous flambe execution 2) [Optional] Remove existing results 3) Create supporting dirs (exp/synced_results, exp/resources) 4) Install extensions in all factories 5) Launch ray cluster 6) Send resources 7) Launch Tensorboard + Report site Parameters ---------- cluster: Cluster The cluster where this Runnable will be running extensions: Dict[str, str] The ClusterRunnable extensions force: bool The force value provided to Flambe """ if self.debug: raise error.ParsingRunnableError( f"Remote experiments don't support debug mode. " + "Remove 'debug: True' for running this experiment in a cluster." ) if cluster.existing_flambe_execution() or cluster.existing_ray_cluster(): if not force: raise man_errors.ClusterError("This cluster is currently used by other " + "experiment. Use --force flag to reuse it. Aborting.") else: cluster.shutdown_flambe_execution() cluster.shutdown_ray_cluster()"Forced resource to become available...")) output_dir_remote = f"{}/{self.output_folder_name}" if cluster.existing_dir(output_dir_remote): logger.debug("This cluster already ran an experiment " + "with the same name.") if self.resume:"Resuming previous experiment...")) elif force: cluster.remove_dir(output_dir_remote, content_only=True, all_hosts=True) else: raise man_errors.ClusterError( "This cluster already has results for the same experiment name. " + "If you wish to reuse them, use resume: True or if you want to override them " + "use --force. Aborting." ) cluster.install_extensions_in_factories(extensions)"Extensions installed in all factories")) # Add redundant check for typing if not cluster.orchestrator: raise man_errors.ClusterError("The orchestrator needs to exist at this point") cluster.create_dirs([, f"{}/resources", f"{}/{self.output_folder_name}"])"Created supporting directories")) cluster.launch_ray_cluster() if not cluster.check_ray_cluster(): raise man_errors.ClusterError("Ray cluster not launched correctly.") local_resources = self.resources.get("local") new_resources = {"remote": self.resources.get("remote", dict())} if local_resources: new_resources['local'] = cluster.send_local_content( local_resources, os.path.join(cluster.orchestrator.get_home_path(),, "resources") ) else: new_resources['local'] = dict() if cluster.orchestrator.is_tensorboard_running(): if force: cluster.orchestrator.remove_tensorboard() else: raise man_errors.ClusterError("Tensorboard was running on the orchestrator.") cluster.orchestrator.launch_tensorboard(output_dir_remote, const.TENSORBOARD_PORT) if cluster.orchestrator.is_report_site_running(): if force: cluster.orchestrator.remove_report_site() else: raise man_errors.ClusterError("Report site was running on the orchestrator") cluster.orchestrator.launch_report_site( f"{output_dir_remote}/state.pkl", port=const.REPORT_SITE_PORT, output_log=f"output.log", output_dir=output_dir_remote, tensorboard_port=const.TENSORBOARD_PORT ) self.set_serializable_attr("resources", new_resources) self.set_serializable_attr("devices", cluster.get_max_resources()) self.set_serializable_attr( "save_path", f"{cluster.orchestrator.get_home_path()}/{}")
[docs] def parse(self) -> None: """Parse the experiment. Parse the Experiment in search of errors that won't allow the experiment to run successfully. If it finds any error, then it raises an ParsingExperimentError. Raises ------ ParsingExperimentError In case a parsing error is found. """ # Check if name is None: if is None or len( == 0: raise error.ParsingRunnableError( "Experiment should declare a name and it must not be empty" ) # Check if name is valid else: if re.match('^([a-zA-Z0-9]+[_-]*)+$', is None: raise error.ParsingRunnableError( "Experiment name should contain only alphanumeric characters " + "(with optional - or _ in between)" ) # Check if resources contains only local and remote if self.resources: if len(list(filter(lambda x: x not in ['local', 'remote'], self.resources.keys()))) > 0: raise error.ParsingRunnableError( f"'resources' section must contain only 'local' section and/or 'remote' keys" ) # Check if local resources exists: if self.resources and self.resources.get("local"): for v in self.resources["local"].values(): if not _is_url(v) and not os.path.exists(os.path.expanduser(v)): raise error.ParsingRunnableError( f"Local resource '{v}' does not exist."