"""This modules includes base Instance classes to represent machines.
All Instance objects will be managed by Cluster objects
(`flambe.cluster.cluster.Cluster`).
This base implementation is independant to the type of instance used.
Any new instance that flambe should support should inherit from the
classes that are defined in this module.
"""
# from __future__ import annotations
import time
import os
import paramiko
from paramiko.client import SSHClient
import socket
import contextlib
import subprocess
import logging
import uuid
from configparser import ConfigParser
from typing import Optional, Type, Generator, TypeVar, List, Dict
from types import TracebackType
import flambe
from flambe.cluster import const
from flambe.cluster.utils import RemoteCommand
from flambe.cluster.instance import errors
from flambe.runnable.utils import get_flambe_repo_location
from flambe.logging import coloredlogs as cl
# Module-level logger shared by all instance classes in this module.
logger = logging.getLogger(__name__)

# Type variable bound to `Instance` (forward reference, since the class
# is defined below).
InsT = TypeVar("InsT", bound="Instance")
class Instance(object):
    """Encapsulates remote instances.

    In this context, the instance is a running computer.

    All instances used by flambe remote mode will inherit
    `Instance`. This class provides high-level methods to deal with
    remote instances (for example, sending a shell command over SSH).

    *Important: Instance objects should be picklable.* Make sure that
    all child classes can be pickled. The cached SSH client is dropped
    when pickling (see `__getstate__`) and re-created on demand.

    The flambe local process will communicate with the remote instances
    using SSH. The authentication mechanism will be using private keys.

    Parameters
    ----------
    host : str
        The public DNS host of the remote machine.
    private_host : str
        The private DNS host of the remote machine.
    username : str
        The machine's username.
    key: str
        The path to the ssh key used to communicate to the instance.
    config : ConfigParser
        The config object that contains useful information for the
        instance. For example, `config['SSH']['SSH_KEY']` should
        contain the path of the ssh key to login the remote instance.
    debug : bool
        True in case flambe was installed in dev mode, False otherwise.
    use_public : bool
        Whether this instance should use public or private IP. By
        default, the public IP is used. Private host is used when
        inside a private LAN.

    """

    def __init__(self,
                 host: str,
                 private_host: str,
                 username: str,
                 key: str,
                 config: ConfigParser,
                 debug: bool,
                 use_public: bool = True) -> None:
        self.host = host
        self.private_host = private_host
        self.username = username
        self.key = key

        self.config = config
        self.fix_relpaths_in_config()

        self.use_public = use_public
        self.debug = debug

        # Uses only one ssh client per instance object (see `_get_cli`)
        self._cli: SSHClient = None

    def __getstate__(self) -> Dict:
        """Return the state used when pickling this instance.

        The cached `SSHClient` holds a live network connection and
        cannot be pickled, so it is dropped from the state.
        `_get_cli` re-creates it lazily on the next command.

        Returns
        -------
        Dict
            A shallow copy of the instance's ``__dict__`` with the
            cached SSH client set to None.

        """
        state = self.__dict__.copy()
        state['_cli'] = None
        return state

    def fix_relpaths_in_config(self) -> None:
        """Updates all paths in `self.config` to be absolute.

        Every value of every section is inspected:

        - if it names an existing local path that is relative, it is
          replaced by `os.path.abspath` of it.
        - if it starts with "~" it is expanded with
          `os.path.expanduser` (for example "~/a/b/c" becomes
          /home/user/a/b/c, using the appropriate $HOME value).

        """
        for section in self.config:
            for k, v in self.config[section].items():
                if os.path.exists(v) and not os.path.isabs(v):
                    self.config[section][k] = os.path.abspath(v)
                if v.startswith("~"):
                    self.config[section][k] = os.path.expanduser(v)

    def __enter__(self):
        """Method to use `Instance` instances with context managers

        Returns
        -------
        Instance
            The current instance

        """
        return self

    def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> Optional[bool]:
        """Exit method for the context manager.

        Any exception raised inside the `with` block is printed and
        then propagated (this method always returns False).

        Returns
        -------
        Optional[bool]
            If true, exceptions will not be raised.

        """
        if exc_value is not None:
            print(f"{exc_value}")

        return False

    def prepare(self) -> None:
        """Runs all necessary processes to prepare the instances.

        The child classes should implement this method
        according to the type of instance.

        """
        raise NotImplementedError()

    def wait_until_accessible(self) -> None:
        """Waits until the instance is accessible through SSHClient

        It pings the SSH port (via `is_up`) to see if it's listening
        for incoming connections, making up to `const.RETRIES + 1`
        attempts and waiting `const.RETRY_DELAY` seconds after each
        failed attempt.

        Raises
        ------
        ConnectionError
            If the instance is inaccessible through SSH

        """
        retry_count = 0

        while retry_count <= const.RETRIES:
            if self.is_up():
                logger.debug(f"Instance {self.host} is UP & accessible on port 22")
                return

            time.sleep(const.RETRY_DELAY)
            retry_count += 1
            logger.debug(f"Instance {self.host} not accesible. Retrying")

        raise ConnectionError(f"{self.host} is unreachable through ssh.")

    def is_up(self) -> bool:
        """Tests whether port 22 is open to incoming SSH connections

        Returns
        -------
        bool
            True if instance is listening in port 22. False otherwise
            (including when the host cannot be resolved yet).

        """
        hostname = self.host if self.use_public else self.private_host
        try:
            # `with` guarantees the probe socket is closed on every call.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(const.RETRY_DELAY)
                return sock.connect_ex((hostname, 22)) == 0
        except OSError:
            # connect_ex still raises for problems other than the
            # connect() call itself, e.g. an unresolvable hostname.
            return False

    def _get_cli(self) -> paramiko.SSHClient:
        """Get an `SSHClient` in order to execute commands.

        This will cache an existing SSHClient to optimize resource.
        A new client is created if none is cached or if the cached
        one's transport is missing or inactive.

        This is a private method and should only be used in this module.

        Returns
        -------
        paramiko.SSHClient
            The client for later use.

        Raises
        ------
        SSHConnectingError
            In case opening an SSH connection fails.

        """
        # Computed up-front so it is always bound in the error message.
        hostname = self.host if self.use_public else self.private_host

        transport = self._cli.get_transport() if self._cli is not None else None
        if transport is not None and transport.is_active():
            return self._cli

        # Set cli in case it was not set or if it was closed
        cli = paramiko.SSHClient()
        cli.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            cli.connect(hostname=hostname,
                        username=self.username, key_filename=self.key,
                        allow_agent=False, look_for_keys=False)
        except (paramiko.ssh_exception.SSHException, OSError) as e:
            # OSError covers socket-level failures (refused, timeout,
            # unresolvable host) which are not SSHException subclasses.
            cli.close()
            raise errors.SSHConnectingError(
                f"Error opening SSH connection with {hostname}. "
                "Double check information provided in the secrets file"
            ) from e

        self._cli = cli
        return self._cli

    def _run_cmd(self, cmd: str, retries: int = 1, wd: str = None) -> RemoteCommand:
        """Runs a single shell command in the instance through SSH.

        The command will be executed in one ssh connection.
        Don't expect calling several times `_run_cmd` to keep state
        between commands. To use multiple commands, use:
        `_run_script`

        *Important: when running docker containers, don't use -it flag!*

        This is a private method and should only be used in this module.

        Parameters
        ----------
        cmd : str
            The command to execute.
        retries : int
            The amount of attempts to run the command if it fails.
            Default to 1.
        wd : str
            The working directory to 'cd' before running the command

        Returns
        -------
        RemoteCommand
            A `RemoteCommand` instance with success boolean and message.
            The message is the raw stdout (on success) or stderr (on
            failure) as bytes.

        Examples
        --------
        To get $HOME env

        >>> instance._run_cmd("echo $HOME")
        RemoteCommand(True, b"/home/ubuntu\\n")

        This will not work

        >>> instance._run_cmd("export var=10")
        >>> instance._run_cmd("echo $var")
        RemoteCommand(False, b"")

        This will work

        >>> instance._run_cmd("export var=10; echo $var")
        RemoteCommand(True, b"10\\n")

        Raises
        ------
        ValueError
            If `retries` is not positive.
        SSHConnectingError
            If the SSH connection cannot be opened.
        RemoteCommandError
            If executing the command raises an unexpected error.

        """
        if retries <= 0:
            raise ValueError("'retries' parameter should be > 0")

        # Prefix once: doing this inside the retry loop would stack
        # "cd {wd};" on every attempt and break relative `wd`s.
        if wd:
            cmd = f"cd {wd}; {cmd}"

        for i in range(retries):
            cli = self._get_cli()
            try:
                _, stdout, stderr = cli.exec_command(cmd)
                # Drain the output first, then block for the exit status.
                # Calling recv_exit_status before reading can hang when
                # the output exceeds the channel window.
                out, err = stdout.read(), stderr.read()
                status = stdout.channel.recv_exit_status()
            except Exception as e:
                raise errors.RemoteCommandError(e) from e

            success = status == 0
            if success:
                logger.debug(f"'{cmd}' ran successfully")
                return RemoteCommand(success, out)

            logger.debug(f"Retry {i}. {cmd} failed with message: {err}")

        logger.debug(f"'{cmd}' returning after {retries} intents returning != 0")
        return RemoteCommand(success, err)

    def _run_script(self, fname: str, desc: str) -> RemoteCommand:
        """Runs a script by copying the script to the instance and
        executing it.

        This is a private method and should only be used in this module.

        Parameters
        ----------
        fname : str
            The script filename
        desc : str
            A description for the script purpose. This will be used
            for the copied filename

        Returns
        -------
        RemoteCommand
            A `RemoteCommand` instance with success boolean and message.

        Raises
        ------
        RemoteCommandError
            In case the script fails.

        """
        # TODO it can exist
        with self._remote_script(fname, desc) as rs:
            return self._run_cmd(f"./{rs}")

    @contextlib.contextmanager
    def _remote_script(self, host_fname: str, desc: str) -> Generator[str, None, None]:
        """Sends a local file containing a script to the instance
        using Paramiko SFTP.

        It should be used as a context manager for later execution of
        the script. See `_run_script` on how to use it.

        After the context manager exits, the file is removed from
        the instance.

        This is a private method and should only be used in this module.

        Parameters
        ----------
        host_fname : str
            The local script filename
        desc : str
            A description for the script purpose. This will be used
            for the copied filename

        Yields
        -------
        str
            The remote filename of the copied local file.

        Raises
        ------
        SSHConnectingError
            If the SSH connection cannot be opened.
        RemoteCommandError
            In case sending the script fails.

        """
        random_fname = f"{desc}_{uuid.uuid4().hex}.sh"

        # Reuse the cached client so the same host (public/private) and
        # auth settings as every other command are used.
        sftp = self._get_cli().open_sftp()
        try:
            try:
                sftp.put(host_fname, random_fname)
            except OSError as e:
                raise errors.RemoteCommandError(f"Error sending local script. {e}") from e

            cmd = self._run_cmd(f"chmod +x {random_fname}")
            if not cmd.success:
                raise errors.RemoteCommandError(f"Error sending local script. {cmd.msg}")

            yield random_fname
        finally:
            # Best-effort cleanup: the file does not exist if the upload
            # failed, and a failed removal must not mask the real error.
            with contextlib.suppress(OSError):
                sftp.remove(random_fname)
            sftp.close()

    def run_cmds(self, setup_cmds: List[str]) -> None:
        """Execute a list of sequential commands

        Each command is run with up to 3 attempts.

        Parameters
        ----------
        setup_cmds: List[str]
            The list of commands

        Raises
        ------
        RemoteCommandError
            In case at least one command is not successful

        """
        for s in setup_cmds:
            ret = self._run_cmd(s, retries=3)
            if not ret.success:
                raise errors.RemoteCommandError(f"Error executing {s} in {self.host}. " +
                                                f"{ret.msg}")

    def send_rsync(self, host_path: str, remote_path: str, params: List[str] = None) -> None:
        """Send a local file or folder to a remote instance with rsync.

        Parameters
        ----------
        host_path : str
            The local filename or folder
        remote_path : str
            The remote filename or folder to use
        params : List[str], optional
            Extra parameters to be passed to rsync.
            For example, ["--filter=':- .gitignore'"]

        Raises
        ------
        RemoteFileTransferError
            In case sending the file fails.

        """
        if not os.path.exists(host_path):
            raise errors.RemoteFileTransferError(f"{host_path} does not exist.")

        _from = host_path
        # A trailing separator makes rsync copy the folder's contents
        # rather than the folder itself.
        if os.path.isdir(host_path) and not host_path.endswith(os.sep):
            _from = f"{host_path}{os.sep}"

        _to = f"{self.username}@{self.host if self.use_public else self.private_host}:{remote_path}"

        rsync_params = ""
        if params:
            rsync_params = " ".join(params)

        cmd = (
            f'rsync {rsync_params} -ae "ssh -i {self.key} -o StrictHostKeyChecking=no" '
            f'{_from} {_to}'
        )
        try:
            subprocess.check_call(cmd, stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL,
                                  shell=True)
            logger.debug(f"rsync {host_path} -> {remote_path} successful")
        except subprocess.CalledProcessError as e:
            raise errors.RemoteFileTransferError(e)

    def get_home_path(self) -> str:
        """Return the $HOME value of the instance.

        Returns
        -------
        str
            The $HOME env value.

        Raises
        ------
        RemoteCommandError
            If after 3 retries it is not able to get $HOME.

        """
        cmd = self._run_cmd("echo $HOME", retries=3)
        if cmd.success:
            return cmd.msg.decode("utf-8").strip()

        raise errors.RemoteCommandError(f"Could not access $HOME env variable. {cmd.msg}")

    def clean_containers(self) -> None:
        """Stop and remove all containers running

        Raises
        ------
        RemoteCommandError
            If command fails

        """
        cmd = "docker stop $(docker ps -a -q); docker rm $(docker ps -a -q);"
        ret = self._run_cmd(cmd)

        if not ret.success:
            raise errors.RemoteCommandError("Could not clean containers")

    def clean_container_by_image(self, image_name: str) -> None:
        """Stop and remove all containers given an image name.

        Parameters
        ----------
        image_name : str
            The name of the image for which all containers
            should be stopped and removed.

        Raises
        ------
        RemoteCommandError
            If command fails

        """
        cmd = f"docker rm $(docker stop "\
              f"$(docker ps -a -q --filter ancestor={image_name} --format='{{{{.ID}}}}'))"
        res = self._run_cmd(cmd)

        if not res.success:
            raise errors.RemoteCommandError(f"Could not clean container {image_name}. {res.msg}")

    def clean_container_by_command(self, command: str) -> None:
        """Stop and remove all containers whose command matches `command`.

        Parameters
        ----------
        command : str
            Text grepped against each container's configured command;
            matching containers are force-removed.

        Raises
        ------
        RemoteCommandError
            If command fails

        """
        cmd = f"docker rm -f $(docker inspect -f '{{{{.ID}}}} "\
              f"{{{{.Config.Cmd}}}}' $(docker ps -a -q) | grep {command} | awk '{{print $1}}')"
        res = self._run_cmd(cmd)

        if not res.success:
            raise errors.RemoteCommandError(
                f"Could not clean container with cmd {command}. {res.msg}")

    def install_docker(self) -> None:
        """Install docker in a Ubuntu 18.04 distribution.

        Raises
        ------
        RemoteCommandError
            If it's not able to install docker.
            ie. then the installation script fails

        """
        fname = os.path.join(os.path.dirname(__file__), "scripts/install_docker.sh")
        cmd = self._run_script(fname, "install_docker")
        if not cmd.success:
            raise errors.RemoteCommandError(f"Could not install docker. {cmd.msg}")

    def _get_pip_flags(self) -> List[str]:
        """Return the extra pip flags configured in the 'PIP' section.

        Returns
        -------
        List[str]
            `--trusted-host HOST` and/or `--extra-index-url HOST_URL`
            tokens, in that order, when configured; empty otherwise.

        """
        flags: List[str] = []
        if 'PIP' in self.config:
            host = self.config['PIP'].get('HOST', None)
            if host:
                flags.extend(["--trusted-host", host])

            host_url = self.config['PIP'].get('HOST_URL', None)
            if host_url:
                flags.extend(["--extra-index-url", host_url])
        return flags

    def install_extensions(self, extensions: Dict[str, str]) -> None:
        """Install local + pypi extensions.

        Parameters
        ----------
        extensions: Dict[str, str]
            The extensions, as a dict from module_name to location

        Raises
        ------
        errors.RemoteCommandError
            If could not install an extension

        """
        base_cmd = ['python3', '-m', 'pip', 'install', '-U', '--user'] + self._get_pip_flags()

        for resource in extensions.values():
            if os.path.exists(resource):
                # Package is local
                if os.sep not in resource:
                    resource = f"./{resource}"
            else:
                # Package follows pypi notation: "torch>=0.4.1,<1.1".
                # Quote it so the remote shell does not treat `<` / `>`
                # as redirections.
                resource = f"'{resource}'"

            ret = self._run_cmd(" ".join(base_cmd + [resource]))
            if not ret.success:
                raise errors.RemoteCommandError(
                    f"Could not install package {resource} in {self.host}. {ret.msg}"
                )

    def install_flambe(self) -> None:
        """Pip install Flambe.

        If dev mode is activated, then it rsyncs the local flambe
        folder and installs that version. If not, downloads from pypi.
        The `[cuda]` extra is used when `contains_gpu` reports a GPU.

        Raises
        ------
        RemoteCommandError
            If it's not able to install flambe.

        """
        flags = " ".join(self._get_pip_flags())

        if not self.debug:
            pip_flambe = "flambe" if not self.contains_gpu() else "flambe[cuda]"
            logger.debug(f"Installing flambe in {self.host} using pypi")
            # Quoted so the remote shell does not glob-expand "[cuda]".
            ret = self._run_cmd(
                f"python3 -m pip install --user --upgrade "
                f"{flags} '{pip_flambe}=={flambe.__version__}'",
                retries=3
            )

        else:
            origin = get_flambe_repo_location()
            destination = os.path.join(self.get_home_path(), "extensions", "flambe")

            self.send_rsync(origin, destination, params=["--exclude='.*'", "--exclude='docs/*'"])
            logger.debug(f"Sent flambe {origin} -> {destination}")
            pip_destination = destination if not self.contains_gpu() else f"{destination}[cuda]"
            ret = self._run_cmd(
                f"python3 -m pip install --user --upgrade {flags} '{pip_destination}'",
                retries=3
            )

        if not ret.success:
            raise errors.RemoteCommandError(f"Could not install flambe. {ret.msg}")
        else:
            logger.debug(f"Installed flambe in {self.host} successfully")

    def is_docker_installed(self) -> bool:
        """Check if docker is installed in the instance.

        Executes command "docker --version" and expect it not to fail.

        Returns
        -------
        bool
            True if docker is installed. False otherwise.

        """
        cmd = self._run_cmd("docker --version")
        return cmd.success

    def is_flambe_installed(self, version: bool = True) -> bool:
        """Check if flambe is installed and if it matches version.

        Parameters
        ----------
        version: bool
            If True, also the version will be used. That is, if flag
            is True and the remote flambe version is different from the
            local flambe version, then this method will return False.
            If they match, then True. If version is False this method
            will return if there is ANY flambe version in the host.

        Returns
        -------
        bool

        Raises
        ------
        RemoteCommandError
            If the `flambe` binary exists but importing flambe in
            python3 fails.

        """
        # First check if a version of flambe is installed
        ret = self._run_cmd("bash -lc 'flambe --help'")
        if not ret.success:
            return False

        if version:
            cmd = "python3 -c 'import flambe; print(flambe.__version__)'"
            ret = self._run_cmd(cmd)

            if not ret.success:
                raise errors.RemoteCommandError(
                    f"Could not run flambe in python at {self.host} even if binary was found."
                )

            return ret.msg.strip() == bytes(flambe.__version__, 'utf-8')

        return True

    def is_docker_running(self) -> bool:
        """Check if docker is running in the instance.

        Executes the command "docker ps" and expects it not to fail.

        Returns
        -------
        bool
            True if docker is running. False otherwise.

        """
        cmd = self._run_cmd("docker ps")
        return cmd.success

    def start_docker(self) -> None:
        """Restart docker.

        Raises
        ------
        RemoteCommandError
            If it's not able to restart docker.

        """
        cmd = self._run_cmd("sudo systemctl restart docker")
        if not cmd.success:
            raise errors.RemoteCommandError(f"Could not start docker. {cmd.msg}")

    def is_node_running(self) -> bool:
        """Return if the host is running a ray node

        Returns
        -------
        bool

        """
        cmd = "ps -e | grep ray"
        ret = self._run_cmd(cmd)
        return ret.success

    def is_flambe_running(self) -> bool:
        """Return if the host is running flambe

        Returns
        -------
        bool

        """
        cmd = "ps axco command | grep -P ^flambe$"
        ret = self._run_cmd(cmd)
        return ret.success

    def existing_dir(self, _dir: str) -> bool:
        """Return if a directory exists in the host

        Parameters
        ----------
        _dir: str
            The name of the directory. It needs to be relative to $HOME

        Returns
        -------
        bool
            True if exists. Otherwise, False.

        """
        cmd = f"[ -d {self.get_home_path()}/{_dir} ]"
        ret = self._run_cmd(cmd)
        return ret.success

    def shutdown_node(self) -> None:
        """Shut down the ray node in the host.

        If the node is also the main node, then the entire
        cluster will shut down

        Raises
        ------
        RemoteCommandError
            If `ray stop` fails.

        """
        if not self.is_node_running():
            logger.debug("Tried to shutdown a non existing node")
            return

        cmd = self._run_cmd(
            "bash -lc 'ray stop'")

        if cmd.success:
            logger.debug(f"Ray node stopped at {self.host}")
        else:
            raise errors.RemoteCommandError(f"Ray node failed to stop. {cmd.msg}")

    def shutdown_flambe(self) -> None:
        """Shut down flambe in the host

        Raises
        ------
        RemoteCommandError
            If `killall -9 flambe` fails.

        """
        if not self.is_flambe_running():
            logger.debug("Tried to shutdown flambe in a host that it's not runing flambe")
            return

        cmd = self._run_cmd("killall -9 flambe")

        if cmd.success:
            logger.debug(f"Flambe killed in {self.host}")
        else:
            raise errors.RemoteCommandError(f"Flambe failed to be shutdown. {cmd.msg}")

    def create_dirs(self, relative_dirs: List[str]) -> None:
        """Create the necessary folders in the host.

        Commands run in the SSH session's working directory, so
        relative paths resolve against it (normally $HOME).
        Failures to create a folder are logged, not raised.

        Parameters
        ----------
        relative_dirs: List[str]
            The directories to create. They should be relative paths
            and $HOME of each host will be used to add the prefix.

        """
        # Create user's folders to deal with the experiment
        for d in relative_dirs:
            ret = self._run_cmd(f"[ -d {d} ]")
            if ret.success:
                logger.debug(f"Existing folder {d} in {self.host}")
                continue

            ret = self._run_cmd(f"mkdir -p {d}")
            if ret.success:
                logger.debug(f"Folder {d} created in {self.host}")
            else:
                logger.debug(f"Could not create folder {d} in {self.host}. {ret.msg}")

    def remove_dir(self, _dir: str, content_only: bool = True) -> None:
        """Delete the specified dir result folder.

        Parameters
        ----------
        _dir: str
            The directory. It needs to be relative to the $HOME path as
            it will be prepended as a prefix.
        content_only: bool
            If True, the folder itself will not be erased (only the
            entries matched by `*`, so hidden files are kept).

        Raises
        ------
        RemoteCommandError
            If the `rm` command fails.

        """
        path = f"{self.get_home_path()}/{_dir}"
        if content_only:
            cmd = self._run_cmd(f"rm -rf {path}/*")
        else:
            cmd = self._run_cmd(f"rm -rf {path}/")
        if cmd.success:
            logger.debug(f"Removed {path} at {self.host}.")
        else:
            raise errors.RemoteCommandError(
                f"Failed to remove {path} on {self.host}. {cmd.msg}"
            )

    def contains_gpu(self) -> bool:
        """Return if this machine contains GPU.

        Uses `torch.cuda.is_available()` on the remote python3.
        This method will be used to possibly upgrade
        this factory to a GPUFactoryInstance.

        Raises
        ------
        RemoteCommandError
            If torch cannot be imported in the remote python3.

        """
        cmd = "python3 -c 'import torch; print(torch.cuda.is_available())'"
        ret = self._run_cmd(cmd)

        if not ret.success:
            raise errors.RemoteCommandError("Factory does not contain torch installed")

        return ret.msg.strip() == b"True"
class CPUFactoryInstance(Instance):
    """This class represents a CPU Instance in the Ray cluster.

    CPU Factories are instances that can run only one worker
    (no GPUs available). This class is mostly useful for debugging.

    Factory instances will not keep any important information.
    All information is going to be sent to an orchestrator machine.

    """

    def prepare(self) -> None:
        """Prepare a CPU machine to be a worker node.

        Installs (or upgrades) flambe through `install_flambe`. No
        check for an existing installation is done here.

        Raises
        ------
        RemoteCommandError
            In case any step of the preparing process fails.

        """
        self.install_flambe()

    def launch_node(self, redis_address: str) -> None:
        """Launch the ray worker node.

        Parameters
        ----------
        redis_address : str
            The URL of the main node. Must be IP:port

        Raises
        ------
        RemoteCommandError
            If not able to run node.

        """
        # https://stackoverflow.com/a/18665363.
        # `ray` is in ~/.local/bin that is not in $PATH in paramiko.
        # For this, use bash and -lc flags
        cmd = f"bash -lc 'ray start --redis-address {redis_address}'"
        ret = self._run_cmd(cmd)

        if ret.success:
            logger.debug(f"Ray worker node launched at {self.host}")
        else:
            raise errors.RemoteCommandError(f"Could not launch worker node. {ret.msg}")

    def num_cpus(self) -> int:
        """Return the number of CPUs this host contains.

        Returns
        -------
        int
            `multiprocessing.cpu_count()` as reported by remote python3.

        Raises
        ------
        RemoteCommandError
            If the command to get the number of CPUs fails.

        """
        cmd = self._run_cmd("python3 -c 'import multiprocessing; "
                            "print(multiprocessing.cpu_count())'")
        if cmd.success:
            # int() accepts bytes and ignores the trailing newline.
            return int(cmd.msg)

        raise errors.RemoteCommandError(f"Could not find out the number of CPUs. {cmd.msg}")

    def num_gpus(self) -> int:
        """Get the number of GPUs this host contains

        Returns
        -------
        int
            The number of GPUs (`torch.cuda.device_count()`).

        Raises
        ------
        RemoteCommandError
            If command to get the number of GPUs fails.

        """
        cmd = self._run_cmd("python3 -c 'import torch; print(torch.cuda.device_count())'")
        if cmd.success:
            return int(cmd.msg)

        raise errors.RemoteCommandError(f"Could not find out how many GPUs available. {cmd.msg}")
class GPUFactoryInstance(CPUFactoryInstance):
    """This class represents an Nvidia GPU Factory Instance.

    Factory instances will not keep any important information.
    All information is going to be sent to an Orchestrator machine.

    """

    def prepare(self) -> None:
        """Prepare a GPU instance to run a ray worker node.

        Installs CUDA if `nvidia-smi` is not available, then runs the
        CPU factory preparation (flambe installation).

        Raises
        ------
        RemoteCommandError
            In case any step of the preparing process fails.

        """
        if not self.is_cuda_installed():
            logger.debug(f"Installing CUDA at {self.host}")
            self.install_cuda()

        super().prepare()

    def install_cuda(self) -> None:
        """Install CUDA 10.0 drivers in an Ubuntu 18.04 distribution.

        Raises
        ------
        RemoteCommandError
            If it's not able to install drivers. ie if script fails

        """
        fname = os.path.join(os.path.dirname(__file__), "scripts/install_cuda_ubuntu1804.sh")
        cmd = self._run_script(fname, "install_cuda")

        if not cmd.success:
            raise errors.RemoteCommandError(f"Could not install CUDA. {cmd.msg}")

    def is_cuda_installed(self) -> bool:
        """Check if CUDA is installed trying to execute `nvidia-smi`

        Returns
        -------
        bool
            True if CUDA is installed. False otherwise.

        """
        cmd = self._run_cmd("nvidia-smi")
        return cmd.success
class OrchestratorInstance(Instance):
    """The orchestrator instance will be the main machine in a cluster.

    It is going to be the main node in the ray cluster and it will
    also host other services. TODO: complete

    All services besides ray will run in docker containers.

    This instance does not need to be a GPU machine.

    """

    def prepare(self) -> None:
        """Install docker and flambe

        Docker is installed and (re)started only when needed; flambe
        is always installed/upgraded.

        Raises
        ------
        RemoteCommandError
            In case any step of the preparing process fails.

        """
        if not self.is_docker_installed():
            self.install_docker()

        if not self.is_docker_running():
            self.start_docker()

        self.install_flambe()

    def launch_report_site(self, progress_file: str,
                           port: int,
                           output_log: str,
                           output_dir: str,
                           tensorboard_port: int) -> None:
        """Launch the report site.

        The report site is a Flask web app, started inside a detached
        tmux session named 'flambe-site'.

        Parameters
        ----------
        progress_file : str
            The progress file passed to `flambe-site`.
        port : int
            The port where the report site will listen.
        output_log : str
            The log file passed as `--output-log`.
        output_dir : str
            The directory passed as `--output-dir`.
        tensorboard_port : int
            The port where tensorboard is available (linked from the site).

        Raises
        ------
        RemoteCommandError
            In case the launch process fails

        """
        tensorboard_url = f"http://{self.host}:{tensorboard_port}"

        cmd = (
            f"tmux new-session -d -s 'flambe-site' 'bash -lc \"flambe-site {progress_file} "
            f"--tensorboard_url {tensorboard_url} "
            f"--host 0.0.0.0 --port {port} "
            f"--output-dir {output_dir} "
            f"--output-log {output_log} &>> outputsite.log\"'"
        )

        res = self._run_cmd(cmd)

        # Sometimes tmux command returns failure (because of some
        # timeout) but website is running.
        # Adding this extra check in that case.
        if res.success or self.is_report_site_running():
            logger.info(cl.BL(f"Report site at http://{self.host}:{port}"))
        else:
            raise errors.RemoteCommandError(f"Report site failed to run. {res.msg}")

    def is_tensorboard_running(self) -> bool:
        """Return whether tensorboard is running in the host as docker.

        Returns
        -------
        bool
            True if Tensorboard is running, False otherwise.

        """
        cmd = "docker ps | grep tensorboard"
        ret = self._run_cmd(cmd)
        return ret.success

    def is_report_site_running(self) -> bool:
        """Return whether the report site is running in the host

        Returns
        -------
        bool

        """
        cmd = "ps axco command | grep -P ^flambe-site$"
        ret = self._run_cmd(cmd)
        return ret.success

    def remove_tensorboard(self) -> None:
        """Removes tensorboard from the orchestrator.

        """
        self.clean_container_by_command("tensorboard")

    def remove_report_site(self) -> bool:
        """Remove report site from the orchestrator.

        Kills the `flambe-site` process and its tmux session, if any.

        Returns
        -------
        bool
            Whether the `pkill flambe-site` command succeeded.

        """
        cmd = "pkill flambe-site"
        ret = self._run_cmd(cmd)

        if self.existing_tmux_session("flambe-site"):
            self.kill_tmux_session("flambe-site")

        return ret.success

    def launch_tensorboard(self,
                           logs_dir: str,
                           tensorboard_port: int) -> None:
        """Launch tensorboard.

        Parameters
        ----------
        logs_dir : str
            Tensorboard logs directory, relative to $HOME
        tensorboard_port: int
            The port where tensorboard will be available

        Raises
        ------
        RemoteCommandError
            In case the launch process fails

        """
        if not self.is_docker_installed():
            logger.error("Can't run tensorboard. Docker not installed.")
            return

        cmd = self._run_cmd(
            f"docker run -d -p {tensorboard_port}:6006 -v " +
            f"{os.path.join(self.get_home_path(), logs_dir)}:" +
            f"/tensorboard_logs {const.TENSORBOARD_IMAGE} tensorboard --logdir /tensorboard_logs")

        if cmd.success:
            logger.debug(f"Tensorboard running at http://{self.host}:{tensorboard_port} . " +
                         "Be aware that it can take a while until it starts showing results.")
        else:
            raise errors.RemoteCommandError(f"Tensorboard stable failed to run. {cmd.msg}")

    def existing_tmux_session(self, session_name: str) -> bool:
        """Return if there is an existing tmux session with the same
        name

        Parameters
        ----------
        session_name: str
            The exact name of the searched tmux session

        Returns
        -------
        bool

        """
        cmd = f'tmux ls -F "#{{session_name}}" | grep -P ^{session_name}$'
        ret = self._run_cmd(cmd)
        return ret.success

    def kill_tmux_session(self, session_name: str) -> None:
        """Kill an existing tmux session

        Parameters
        ----------
        session_name: str
            The exact name of the tmux session to be removed

        Raises
        ------
        RemoteCommandError
            If `tmux kill-session` fails.

        """
        cmd = f'tmux kill-session -t {session_name}'
        ret = self._run_cmd(cmd)

        if ret.success:
            logger.debug(f"Remove existing tmux session {session_name}")
        else:
            raise errors.RemoteCommandError(f"Tried to remove a session. {ret.msg}")

    def launch_flambe(self,
                      config_file: str,
                      secrets_file: str,
                      force: bool) -> None:
        """Launch flambe execution in the remote host

        Flambe runs inside a detached tmux session named 'flambe',
        with its output redirected to `output.log`.

        Parameters
        ----------
        config_file: str
            The config filename relative to the orchestrator
        secrets_file: str
            The filepath containing the secrets for the orchestrator
        force: bool
            The force parameters that was originally passed to flambe

        Raises
        ------
        RemoteCommandError
            If flambe could not be started.

        """
        force_params = "--force" if force else ""
        cmd = (
            f"tmux new-session -d -s 'flambe' " +
            f"'bash -lc \"flambe {config_file} --secrets {secrets_file} " +
            f"{force_params} &> output.log\"'"
        )

        ret = self._run_cmd(cmd)

        # Sometimes tmux command returns failure (because of some
        # timeout) but flambe is running.
        # Adding this extra check in that case.
        if ret.success or self.is_flambe_running():
            logger.info(cl.GR("Running flambe in Orchestrator"))
        else:
            raise errors.RemoteCommandError(f"Not able to run flambe. {ret.msg}")

    def launch_node(self, port: int) -> None:
        """Launch the main (head) ray node, with its redis DB on `port`.

        The head node is started with `--num-cpus=0`.

        Parameters
        ----------
        port: int
            Available port to launch the redis DB of the main ray node

        Raises
        ------
        RemoteCommandError
            In case the launch process fails

        """
        # https://stackoverflow.com/a/18665363.
        # `ray` is in ~/.local/bin that is not in $PATH in paramiko.
        # For this, use bash and -lc flags
        cmd = self._run_cmd(
            f"bash -lc 'ray start --head --num-cpus=0 --redis-port={port}'")

        if cmd.success:
            logger.debug(f"Ray main node running in {self.host}")
        else:
            raise errors.RemoteCommandError(f"Ray main node failed to run. {cmd.msg}")

    def worker_nodes(self) -> List[str]:
        """Returns the list of worker nodes

        A small ray job is run on the cluster and the set of node IPs
        that executed its tasks is collected.

        Returns
        -------
        List[str]
            The list of worker nodes identified by their hostname

        Raises
        ------
        RemoteCommandError
            If the remote Python script fails.

        """
        redis_address = f"\"{self.private_host}:{const.RAY_REDIS_PORT}\""

        cmd = (
            "python3 -c '\n"
            "import time\n"
            "import ray\n"
            f"ray.init(redis_address={redis_address})\n"
            "@ray.remote\n"
            "def f():\n"
            "    time.sleep(0.01)\n"
            "    return ray.services.get_node_ip_address()\n"
            "print(set(ray.get([f.remote() for _ in range(1000)])))\n'"
        )

        ret = self._run_cmd(cmd)

        if not ret.success:
            raise errors.RemoteCommandError(f"Failed to run Python script. {ret.msg}")

        # The script's last output line is the repr of a set of IPs,
        # e.g. "{'10.0.0.1', '10.0.0.2'}".
        lines = ret.msg.decode("utf-8").strip().splitlines()
        last = lines[-1].strip() if lines else ""
        if last == "set()":
            return []
        if last.startswith("{") and last.endswith("}"):
            last = last[1:-1]

        # Strip whitespace *and* quotes per element: every element after
        # the first is preceded by a space in the set repr.
        return [ip.strip().strip("'\"") for ip in last.split(",") if ip.strip()]

    def rsync_folder(self,
                     _from: str,
                     _to: str,
                     exclude: Optional[List[str]] = None) -> None:
        """Rsyncs folders or files, running rsync on the orchestrator.

        One of the folders NEEDS to be local. The remaining one can
        be remote if needed. Failures are logged, not raised.

        Parameters
        ----------
        _from: str
            Source path (a trailing separator is added so the folder's
            contents are copied).
        _to: str
            Destination path, possibly remote (`user@host:path`).
        exclude: Optional[List[str]]
            Patterns passed to rsync as `--exclude`.

        """
        # -o StrictHostKeyChecking=no if not rsync asks for accepting
        # key and it fails (the cmd is not providing the 'yes' answer)
        # with message Host keys verification failed. \r\nrsync:
        # connection unexpectedly closed (0 bytes received so far)
        exc = ""
        if exclude:
            for x in exclude:
                exc += f" --exclude {x} "

        if not _from.endswith(os.sep):
            _from = f"{_from}{os.sep}"

        cmd = (
            f"rsync -ae 'ssh -i {self.get_home_path()}/{const.PRIVATE_KEY} "
            f"-o StrictHostKeyChecking=no' {exc} "
            f"{_from} {_to}"
        )

        ret = self._run_cmd(cmd)

        if not ret.success:
            logger.debug(f"Could not rsync between {self.private_host} and {_to}")
        else:
            logger.debug(f"Rsync successful between {_from} -> {_to}")