Source code for flambe.cluster.instance.instance

"""This module includes base Instance classes to represent machines.

All Instance objects will be managed by Cluster objects.

This base implementation is independent of the type of instance used.

Any new instance that flambe should support should inherit from the
classes that are defined in this module.

"""
# from __future__ import annotations

import time
import os
import paramiko
from paramiko.client import SSHClient
import socket
import contextlib
import subprocess

import logging
import uuid

from configparser import ConfigParser
from typing import Optional, Type, Generator, TypeVar, List, Dict
from types import TracebackType

import flambe
from flambe.cluster import const
from flambe.cluster.utils import RemoteCommand
from flambe.cluster.instance import errors

from flambe.runnable.utils import get_flambe_repo_location
from flambe.logging import coloredlogs as cl

logger = logging.getLogger(__name__)
InsT = TypeVar("InsT", bound="Instance")
class Instance(object):
def fix_relpaths_in_config(self) -> None:
def __enter__(self):
def __exit__(self,
                 exc_type: Optional[Type[BaseException]],
                 exc_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> Optional[bool]:
def prepare(self) -> None:
def wait_until_accessible(self) -> None:
def is_up(self) -> bool:
def _get_cli(self) -> paramiko.SSHClient:
"Double check information provided in the secrets file")
def _run_cmd(self, cmd: str, retries: int = 1, wd: str = None) -> RemoteCommand:
def _run_script(self, fname: str, desc: str) -> RemoteCommand:
def _remote_script(self, host_fname: str, desc: str) -> Generator[str, None, None]:
def run_cmds(self, setup_cmds: List[str]) -> None:
def send_rsync(self, host_path: str, remote_path: str, filter_param: str = "") -> None:
def get_home_path(self) -> str:
def clean_containers(self) -> None:
def clean_container_by_image(self, image_name: str) -> None:
def clean_container_by_command(self, command: str) -> None:
def install_docker(self) -> None:
def install_extensions(self, extensions: Dict[str, str]) -> None:
def install_flambe(self) -> None:
def is_docker_installed(self) -> bool:
def is_flambe_installed(self, version: bool = True) -> bool:
def is_docker_running(self) -> bool:
def start_docker(self) -> None:
def is_node_running(self) -> bool:
def is_flambe_running(self) -> bool:
def existing_dir(self, _dir: str) -> bool:
def shutdown_node(self) -> None:
def shutdown_flambe(self) -> None:
def create_dirs(self, relative_dirs: List[str]) -> None:
def remove_dir(self, _dir: str, content_only: bool = True) -> None:
def contains_gpu(self) -> bool:
class CPUFactoryInstance(Instance):
def prepare(self) -> None:
def launch_node(self, redis_address: str) -> None:
def num_cpus(self) -> int:
def num_gpus(self) -> int:
class GPUFactoryInstance(CPUFactoryInstance):
def prepare(self) -> None:
def install_cuda(self) -> None:
def is_cuda_installed(self) -> bool:
class OrchestratorInstance(Instance):
def prepare(self) -> None:
def launch_report_site(self, progress_file: str, port: int, output_log: str,
                           output_dir: str, tensorboard_port: int) -> None:
def is_tensorboard_running(self) -> bool:
def is_report_site_running(self) -> bool:
def remove_tensorboard(self) -> None:
def remove_report_site(self) -> None:
def launch_tensorboard(self, logs_dir: str, tensorboard_port: int) -> None:
def existing_tmux_session(self, session_name: str) -> bool:
def kill_tmux_session(self, session_name: str) -> None:
def launch_flambe(self, config_file: str, secrets_file: str, force: bool) -> None:
def launch_node(self, port: int) -> None:
def worker_nodes(self) -> List[str]:
def rsync_folder(self, _from, _to, exclude=None):