Source code for flambe.experiment.experiment

# from __future__ import annotations
import os
import re
import logging
from copy import deepcopy
from typing import Dict, Optional, Union, Sequence, cast, Callable
from collections import OrderedDict
import shutil
import tempfile

from tqdm import tqdm
from io import StringIO
import ray
from ray.tune.suggest import SearchAlgorithm
from ray.tune.schedulers import TrialScheduler
from ray.tune.logger import DEFAULT_LOGGERS, TFLogger, tf2_compat_logger

from flambe.compile import Schema, Component, yaml
from flambe.runnable import ClusterRunnable
from flambe.compile.downloader import download_manager
from flambe.cluster import errors as man_errors
from flambe.cluster import const
from flambe.cluster import Cluster
from flambe.experiment import utils, wording
from flambe.experiment.options import ClusterResource
from flambe.runnable import RemoteEnvironment
from flambe.runnable import error
from flambe.runnable import utils as run_utils
from flambe.runner.utils import get_files
from flambe.experiment.progress import ProgressState
from flambe.experiment.tune_adapter import TuneAdapter
from flambe.logging import coloredlogs as cl
from flambe.experiment.utils import get_default_devices

[docs]logger = logging.getLogger(__name__)
[docs]OptionalSearchAlgorithms = Optional[Dict[str, Union[SearchAlgorithm, Schema]]]
[docs]OptionalTrialSchedulers = Optional[Dict[str, Union[TrialScheduler, Schema]]]
[docs]class Experiment(ClusterRunnable): """A Experiment object. The Experiment object is the top level module in the Flambé workflow. The object is responsible for starting workers, assiging the orchestrator machine, as well as converting the input blocks into Ray Tune Experiment objects. Parameters ---------- name: str A name for the experiment pipeline: OrderedDict[str, Schema[Component]] Ordered mapping from block id to a schema of the block force: bool When running a local experiment this flag will make flambe override existing results from previous experiments. When running remote experiments this flag will reuse an existing cluster (in case of any) that is running an experiment with the same name in the same cloud service. The use of this flag is discouraged as you may lose useful data. resume: Union[str, List[str]] If a string is given, resume all blocks up until the given block_id. If a list is given, resume all blocks in that list. save_path: Optional[str] A directory where to save the experiment. devices: Dict[str, int] Tune's resources per trial. For example: {"cpu": 12, "gpu": 2}. resources: Optional[Dict[str, Dict[str, Any]]] Variables to use in the pipeline section with !@ notation. This section is splitted into 2 sections: local and remote. search : Mapping[str, SearchAlgorithm], optional Map from block id to hyperparameter search space generator. May have Schemas of SearchAlgorithm as well. schedulers : Mapping[str, TrialScheduler], optional Map from block id to search scheduler. May have Schemas of TrialScheduler as well. reduce: Mapping[str, int], optional Map from block to number of trials to reduce to. env: RemoteEnvironment Contains remote information about the cluster. This object will be received in case this Experiment is running remotely. max_failures: int Number of times to retry running the pipeline if it hits some type of failure, defaults to one. merge_plot: bool Display all tensorboard logs in the same plot (per block type). Defaults to True. user_provider: Callable[[], str] The logic for specifying the user triggering this Runnable. If not passed, by default it will pick the computer's user. """ def __init__(self, name: str, pipeline: Dict[str, Schema], resume: Optional[Union[str, Sequence[str]]] = None, devices: Dict[str, int] = None, save_path: Optional[str] = None, resources: Optional[Dict[str, Union[str, ClusterResource]]] = None, search: OptionalSearchAlgorithms = None, schedulers: OptionalTrialSchedulers = None, reduce: Optional[Dict[str, int]] = None, env: RemoteEnvironment = None, max_failures: int = 1, stop_on_failure: bool = True, merge_plot: bool = True, user_provider: Callable[[], str] = None) -> None: super().__init__(env=env, user_provider=user_provider) self.name = name self.original_save_path = save_path if save_path is None or len(save_path) == 0: save_path = os.path.join(os.getcwd(), "flambe-output") else: save_path = os.path.abspath(os.path.expanduser(save_path)) # Prepending 'output' to the name in the output folder # is a basic security mechanism to avoid removing user # folders when using --force (if for example, save_path # is '/$HOME/Desktop' and name is "nlp", and user has folder # '$HOME/Desktop/nlp', then there is a risk of accidentally # removing it when using --force) self.output_folder_name = f"output__{name}" self.full_save_path = os.path.join( save_path, self.output_folder_name ) self.resume = resume self.devices = devices self.resources = resources or dict() self.pipeline = pipeline # Compile search algorithms if needed self.search = search or dict() for stage_name, search_alg in self.search.items(): if isinstance(search_alg, Schema): self.search[stage_name] = search_alg() # Compile schedulers if needed self.schedulers = schedulers or dict() for stage_name, scheduler in self.schedulers.items(): if isinstance(scheduler, Schema): self.schedulers[stage_name] = scheduler() self.reduce = reduce or dict() self.max_failures = max_failures self.stop_on_failure = stop_on_failure self.merge_plot = merge_plot if pipeline is None or not isinstance(pipeline, (Dict, OrderedDict)): raise TypeError("Pipeline argument is not of type Dict[str, Schema]. " f"Got {type(pipeline).__name__} instead") self.pipeline = pipeline
[docs] def process_resources( self, resources: Dict[str, Union[str, ClusterResource]], folder: str ) -> Dict[str, Union[str, ClusterResource]]: """Download resources that are not tagged with '!cluster' into a given directory. Parameters ---------- resources: Dict[str, Union[str, ClusterResource]] The resources dict folder: str The directory where the remote resources will be downloaded. Returns ------- Dict[str, Union[str, ClusterResource]] The resources dict where the remote urls that don't contain '!cluster' point now to the local path where the resource was downloaded. """ # Keep the resources temporary dict for later cleanup ret = {} for k, v in resources.items(): if not isinstance(v, ClusterResource): with download_manager(v, os.path.join(folder, k)) as path: ret[k] = path else: ret[k] = v return ret
[docs] def run(self, force: bool = False, verbose: bool = False, debug: bool = False, **kwargs): """Run an Experiment""" logger.info(cl.BL("Launching local experiment")) # Check if save_path/name already exists + is not empty # + force and resume are False if not self.resume and not force and os.path.exists(self.full_save_path) \ and list(get_files(self.full_save_path)): raise error.ParsingRunnableError( f"Results from an experiment with the same name were located in the save path " + f"{self.full_save_path}. To overide this results, please use '--force' " + "To use these results and resume the experiment, pick 'resume: True' " + "If not, just pick another save_path/name." ) full_save_path = self.full_save_path if not self.env: wording.print_useful_local_info(full_save_path) # If running remotely then all folders were already created. # in the 'setup' method. if not self.env: if os.path.exists(full_save_path) and force: shutil.rmtree(full_save_path) # This deleted the folder also logger.info( cl.RE(f"Removed previous existing from {full_save_path} " + "results as --force was specified")) if not os.path.exists(full_save_path): os.makedirs(full_save_path) logger.debug(f"{full_save_path} created to store output") self._dump_experiment_file() if any(map(lambda x: isinstance(x, ClusterResource), self.resources.values())): raise ValueError( f"Local experiments doesn't support resources with '!cluster' tags. " + "The '!cluster' tag is used for those resources that need to be handled " + "in the cluster when running remote experiments.") if not self.env: self.tmp_resources_dir = tempfile.TemporaryDirectory() resources_folder = self.tmp_resources_dir.name else: resources_folder = f"{self.full_save_path}/_resources" resources = self.process_resources(self.resources, resources_folder) # rsync downloaded resources if self.env: run_utils.rsync_hosts(self.env.orchestrator_ip, self.env.factories_ips, self.env.user, self.full_save_path, self.env.key, exclude=["state.pkl"]) # Check that links are in order (i.e topologically in pipeline) utils.check_links(self.pipeline, resources) # Check that only computable blocks are given # search algorithms and schedulers utils.check_search(self.pipeline, self.search, self.schedulers) # Initialize ray cluster kwargs = {"logging_level": logging.ERROR, "include_webui": False} if debug: kwargs['local_mode'] = True if self.env: ray.init(redis_address=f"{self.env.orchestrator_ip}:{const.RAY_REDIS_PORT}", **kwargs) else: ray.init(**kwargs) logger.debug(f"Ray cluster up") # Initialize map from block to list of checkpoints # This is used whe resolving links over other computable blocks # TODO: in python 3.7 we can replace these with dict() or {} checkpoints: OrderedDict = OrderedDict() schemas: OrderedDict = OrderedDict() success: OrderedDict = OrderedDict() # By default use all CPUs if no GPU is present devices = self.devices if self.devices else None if devices is None: devices = get_default_devices(debug=debug) to_resume = None if isinstance(self.resume, str): index = list(self.pipeline.keys()).index(self.resume) to_resume = list(self.pipeline.keys())[:index + 1] elif isinstance(self.resume, Sequence): to_resume = list(self.resume) # Make experiment_tag easier to extract def trial_name_creator(trial): identifier = "" if "env" in trial.config: env = trial.config["env"] if isinstance(env, type): env = env.__name__ identifier += f"{env}" if trial.experiment_tag: hyper_params = {} if "_" in trial.experiment_tag: num, tunable_params = trial.experiment_tag.split("_", 1) identifier += tunable_params param_list = [p.split("=") for p in tunable_params.split(",")] hyper_params = {p[0]: p[1] for p in param_list} else: identifier += trial.experiment_tag trial.config['hyper_params'] = hyper_params return identifier.replace("/", "_") trial_name_creator = ray.tune.function(trial_name_creator) # Compute depedencies DAG dependency_dag = {} schemas_dag: OrderedDict = OrderedDict() for block_id, schema_block in self.pipeline.items(): schemas_dag[block_id] = schema_block relevant_ids = utils.extract_needed_blocks(schemas_dag, block_id, resources) dependencies = deepcopy(relevant_ids) dependencies.discard(block_id) dependency_dag[block_id] = list(dependencies) if self.env: self.progress_state = ProgressState( self.name, full_save_path, dependency_dag, self.content, len(self.env.factories_ips)) else: self.progress_state = ProgressState(self.name, full_save_path, dependency_dag, self.content) for block_id, schema_block in tqdm(self.pipeline.items()): schema_block.add_extensions_metadata(self.extensions) logger.debug(f"Starting {block_id}") # Add the block to the configuration so far schemas[block_id] = schema_block success[block_id] = True self.progress_state.checkpoint_start(block_id) relevant_ids = utils.extract_needed_blocks(schemas, block_id, resources) relevant_schemas = {k: v for k, v in deepcopy(schemas).items() if k in relevant_ids} # Set resume resume = False if to_resume is None else (block_id in to_resume) # If computable, convert to tune.Trainable # Each Component block is an Experiment in ray.tune if not isinstance(schema_block, Schema): raise ValueError('schema block not of correct type Schema') if issubclass(schema_block.component_subclass, Component): # Returns is a list non-nested configuration divided_schemas = list(utils.divide_nested_grid_search_options(relevant_schemas)) divided_dict = [utils.extract_dict(x) for x in divided_schemas] # Convert options and links divided_dict_tune = [utils.convert_tune(x) for x in divided_dict] # Execute block tune_experiments = [] for param_dict, schemas_dict in zip(divided_dict_tune, divided_schemas): config = {'name': block_id, 'merge_plot': self.merge_plot, 'params': param_dict, 'schemas': Schema.serialize(schemas_dict), 'checkpoints': checkpoints, 'to_run': block_id, 'global_vars': resources, 'verbose': verbose, 'custom_modules': list(self.extensions.keys()), 'debug': debug} # Filter out the tensorboard logger as we handle # general and tensorboard-specific logging ourselves tune_loggers = list(filter(lambda l: l != tf2_compat_logger and # noqa: E741 not issubclass(l, TFLogger), DEFAULT_LOGGERS)) tune_experiment = ray.tune.Experiment(name=block_id, run=TuneAdapter, trial_name_creator=trial_name_creator, config=deepcopy(config), local_dir=full_save_path, checkpoint_freq=1, checkpoint_at_end=True, max_failures=self.max_failures, resources_per_trial=devices, loggers=tune_loggers) logger.debug(f"Created tune.Experiment for {param_dict}") tune_experiments.append(tune_experiment) trials = ray.tune.run_experiments(tune_experiments, search_alg=self.search.get(block_id, None), scheduler=self.schedulers.get(block_id, None), queue_trials=True, verbose=False, resume=resume, raise_on_failed_trial=False) logger.debug(f"Finish running all tune.Experiments for {block_id}") any_error = False for t in trials: if t.status == t.ERROR: logger.error(cl.RE(f"Variant {t} of '{block_id}' ended with ERROR status.")) success[block_id] = False any_error = True if any_error and self.stop_on_failure: self.teardown() self.progress_state.checkpoint_end(block_id, success[block_id]) raise error.UnsuccessfulRunnableError( f"Stopping experiment at block '{block_id}' " "because there was an error and stop_on_failure == True." ) # Save checkpoint location # It should point from: # block_id -> hash(variant) -> checkpoint hashes = [] for t in trials: schema_with_params: Dict = OrderedDict() for b in schemas_dict: schema_copy = deepcopy(schemas_dict[b]) utils.update_schema_with_params(schema_copy, t.config['params'][b]) schema_with_params[b] = schema_copy hashes.append(repr(schema_with_params)) paths = [t._checkpoint.value for t in trials] # Mask out error trials mask = [True] * len(trials) for i, trial in enumerate(trials): if trial.status == ray.tune.trial.Trial.ERROR: mask[i] = False # Mask out on reduce reduce_k = self.reduce.get(block_id, None) if reduce_k is not None and int(reduce_k) > 0: # Get best best_trials = utils.get_best_trials(trials, topk=int(reduce_k)) best_trial_ids = set([t.trial_id for t in best_trials]) # Mask out for i, trial in enumerate(trials): if trial.trial_id not in best_trial_ids: mask[i] = False trial_checkpoints = {t_hash: path for t_hash, path in zip(hashes, paths)} trial_mask = {t_hash: mask_value for t_hash, mask_value in zip(hashes, mask)} checkpoints[block_id] = {'paths': trial_checkpoints, 'mask': trial_mask} # Rsync workers to main machine and back to all workers # TODO specify callbacks. If not remote will not work if self.env: run_utils.rsync_hosts(self.env.orchestrator_ip, self.env.factories_ips, self.env.user, self.full_save_path, self.env.key, exclude=["state.pkl"]) self.progress_state.checkpoint_end(block_id, success[block_id]) logger.debug(f"Done running {block_id}") self.teardown() if all(success.values()): logger.info(cl.GR("Experiment ended successfully")) else: raise error.UnsuccessfulRunnableError( "Not all trials were successful. Check the logs for more information"
)
[docs] def teardown(self): # Disconnect process from ray cluster ray.shutdown() # Shutdown ray cluster. if self.env: ret = utils.shutdown_ray_node() logger.debug(f"Node shutdown {'successful' if ret == 0 else 'failed'} in Orchestrator") for f in self.env.factories_ips: ret = utils.shutdown_remote_ray_node(f, "ubuntu", self.env.key) logger.debug(f"Node shutdown {'successful' if ret == 0 else 'failed'} in {f}") self.progress_state.finish() if hasattr(self, 'tmp_resources_dir'): self.tmp_resources_dir.cleanup()
[docs] def setup(self, cluster: Cluster, extensions: Dict[str, str], force: bool, **kwargs) -> None: """Prepare the cluster for the Experiment remote execution. This involves: 1) [Optional] Kill previous flambe execution 2) [Optional] Remove existing results 3) Create supporting dirs (exp/synced_results, exp/resources) 4) Install extensions in all factories 5) Launch ray cluster 6) Send resources 7) Launch Tensorboard + Report site Parameters ---------- cluster: Cluster The cluster where this Runnable will be running extensions: Dict[str, str] The ClusterRunnable extensions force: bool The force value provided to Flambe """ if cluster.existing_flambe_execution() or cluster.existing_ray_cluster(): if not force: raise man_errors.ClusterError("This cluster is currently used by other " + "experiment. Use --force flag to reuse it. Aborting.") else: cluster.shutdown_flambe_execution() cluster.shutdown_ray_cluster() logger.info(cl.YE("Forced resource to become available...")) output_dir_remote = f"{self.name}/{self.output_folder_name}" if cluster.existing_dir(output_dir_remote): logger.debug("This cluster already ran an experiment " + "with the same name.") if self.resume: logger.info(cl.YE("Resuming previous experiment...")) elif force: cluster.remove_dir(output_dir_remote, content_only=True, all_hosts=True) else: raise man_errors.ClusterError( "This cluster already has results for the same experiment name. " + "If you wish to reuse them, use resume: True or if you want to override them " + "use --force. Aborting." ) cluster.install_extensions_in_factories(extensions) logger.info(cl.YE("Extensions installed in all factories")) # Add redundant check for typing if not cluster.orchestrator: raise man_errors.ClusterError("The orchestrator needs to exist at this point") cluster.create_dirs([self.name, f"{self.name}/{self.output_folder_name}", f"{self.name}/{self.output_folder_name}/_resources"]) logger.info(cl.YE("Created supporting directories")) cluster.launch_ray_cluster() if not cluster.check_ray_cluster(): raise man_errors.ClusterError("Ray cluster not launched correctly.") local_resources = {k: v for k, v in self.resources.items() if not isinstance(v, ClusterResource)} tmp_resources_dir = tempfile.TemporaryDirectory() # This will download remote resources. local_resources = self.process_resources( local_resources, tmp_resources_dir.name) # type: ignore local_resources = cast(Dict[str, str], local_resources) if local_resources: new_resources = cluster.send_local_content( local_resources, os.path.join(cluster.orchestrator.get_home_path(), self.name, self.output_folder_name, "_resources"), all_hosts=True ) else: new_resources = dict() tmp_resources_dir.cleanup() # Add the cluster resources without the tag new_resources.update({k: v.location for k, v in self.resources.items() if isinstance(v, ClusterResource)}) if cluster.orchestrator.is_tensorboard_running(): if force: cluster.orchestrator.remove_tensorboard() else: raise man_errors.ClusterError("Tensorboard was running on the orchestrator.") cluster.orchestrator.launch_tensorboard(output_dir_remote, const.TENSORBOARD_PORT) if cluster.orchestrator.is_report_site_running(): if force: cluster.orchestrator.remove_report_site() else: raise man_errors.ClusterError("Report site was running on the orchestrator") cluster.orchestrator.launch_report_site( f"{output_dir_remote}/state.pkl", port=const.REPORT_SITE_PORT, output_log=f"output.log", output_dir=output_dir_remote, tensorboard_port=const.TENSORBOARD_PORT ) self.set_serializable_attr("resources", new_resources) self.set_serializable_attr( "save_path", f"{cluster.orchestrator.get_home_path()}/{self.name}")
[docs] def parse(self) -> None: """Parse the experiment. Parse the Experiment in search of errors that won't allow the experiment to run successfully. If it finds any error, then it raises an ParsingExperimentError. Raises ------ ParsingExperimentError In case a parsing error is found. """ # Check if name is None: if self.name is None or len(self.name) == 0: raise error.ParsingRunnableError( "Experiment should declare a name and it must not be empty" ) # Check if name is valid else: if re.match('^([a-zA-Z0-9]+[_-]*)+$', self.name) is None: raise error.ParsingRunnableError( "Experiment name should contain only alphanumeric characters " + "(with optional - or _ in between)"
)
[docs] def get_user(self) -> str: """Get the user that triggered this experiment. Returns ------- str: The user as a string. """ return self.env.local_user if self.env else self.user_provider()
[docs] def _dump_experiment_file(self) -> None: """Dump the experiment YAML representation to the output folder. """ destination = os.path.join(self.full_save_path, "experiment.yaml") with open(destination, 'w') as f: with StringIO() as s: yaml.dump_all([self.extensions, self], s) f.write(s.getvalue()) f.flush() logger.debug(f"Saved experiment file in {destination}")