<aside> 💡 이 문서는 Elastic Horovod에서 Autoscaling과 관련된 구조 및 역할을 설명합니다.

</aside>

전체구조.png

위 그림은 전체적인 구조를 나타냅니다. 각 부분에 대한 자세한 설명은 아래를 참고해 주십시오.

horovodrun 명령을 수행하면 horovod/runner/launch.py 파일이 실행됩니다.

if __name__ == '__main__':
    run_commandline()
def _run(args):
    # If LSF is used, use default values from job config
    if lsf.LSFUtils.using_lsf():
        if not args.np:
            args.np = lsf.LSFUtils.get_num_processes()
        if not args.hosts and not args.hostfile and not args.host_discovery_script:
            args.hosts = ','.join('{host}:{np}'.format(host=host, np=lsf.LSFUtils.get_num_gpus())
                                  for host in lsf.LSFUtils.get_compute_hosts())

    # if hosts are not specified, either parse from hostfile, or default as
    # localhost
    if not args.hosts and not args.host_discovery_script:
        if args.hostfile:
            args.hosts = hosts.parse_host_files(args.hostfile)
        else:
            # Set hosts to localhost if not specified
            args.hosts = 'localhost:{np}'.format(np=args.np)

    # Convert nics into set
    args.nics = set(args.nics.split(',')) if args.nics else None

    if _is_elastic(args):
        return _run_elastic(args)
    else:
        return _run_static(args)
def _run_elastic(args):
    # construct host discovery component
    if args.host_discovery_script:
        discover_hosts = discovery.HostDiscoveryScript(args.host_discovery_script, args.slots)
    elif args.hosts:
        _, available_host_slots = hosts.parse_hosts_and_slots(args.hosts)
        if len(available_host_slots) < 2:
            raise ValueError('Cannot run in fault tolerance mode with fewer than 2 hosts.')
        discover_hosts = discovery.FixedHosts(available_host_slots)
    else:
        raise ValueError('One of --host-discovery-script, --hosts, or --hostnames must be provided')

    # horovodrun has to finish all the checks before this timeout runs out.
    if args.start_timeout:
        start_timeout = args.start_timeout
    else:
        # Lookup default timeout from the environment variable.
        start_timeout = int(os.getenv('HOROVOD_START_TIMEOUT', '30'))

    tmout = timeout.Timeout(start_timeout,
                            message='Timed out waiting for {activity}. Please '
                                    'check connectivity between servers. You '
                                    'may need to increase the --start-timeout '
                                    'parameter if you have too many servers.')
    settings = elastic_settings.ElasticSettings(discovery=discover_hosts,
                                                min_np=args.min_np or args.np,
                                                max_np=args.max_np,
                                                elastic_timeout=args.elastic_timeout,
                                                reset_limit=args.reset_limit,
                                                num_proc=args.np,
                                                verbose=2 if args.verbose else 0,
                                                ssh_port=args.ssh_port,
                                                ssh_identity_file=args.ssh_identity_file,
                                                extra_mpi_args=args.mpi_args,
                                                key=secret.make_secret_key(),
                                                start_timeout=tmout,
                                                output_filename=args.output_filename,
                                                run_func_mode=args.run_func is not None,
                                                nics=args.nics,
                                                prefix_output_with_timestamp=args.prefix_output_with_timestamp)

    if not gloo_built(verbose=(settings.verbose >= 2)):
        raise ValueError('Gloo support is required to use elastic training, but has not been built.  Ensure CMake is '
                         'installed and reinstall Horovod with HOROVOD_WITH_GLOO=1 to debug the build error.')

    env = os.environ.copy()
    config_parser.set_env_from_args(env, args)
    gloo_run_elastic(settings, env, args.command)
def gloo_run_elastic(settings, env, command):

    def get_common_interfaces(driver):
        # Host-to-host common interface detection requires at least 2 hosts in an elastic job.
        min_hosts = _get_min_start_hosts(settings)
        current_hosts = driver.wait_for_available_slots(settings.num_proc, min_hosts=min_hosts)
        return driver_service.get_common_interfaces(settings, current_hosts.host_assignment_order)

    exec_command = _exec_command_fn(settings)
    rendezvous = RendezvousServer(settings.verbose)
    launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous)

gloo_run_elastic() 함수에서는 RendezvousServer 객체를 생성하고 launch_gloo_elastic() 함수를 호출합니다.

def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous):
    # Make the output directory if it does not exist
    if settings.output_filename:
        _mkdir_p(settings.output_filename)

    driver = ElasticDriver(rendezvous, settings.discovery,
                           settings.min_np, settings.max_np,
                           timeout=settings.elastic_timeout,
                           reset_limit=settings.reset_limit,
                           verbose=settings.verbose)

    handler = create_rendezvous_handler(driver)
    global_rendezv_port = rendezvous.start(handler)
    driver.wait_for_available_slots(settings.num_proc)

    nics = get_common_interfaces(driver)
    server_ip = network.get_driver_ip(nics)

    event = register_shutdown_event()
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)

    create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)

    driver.start(settings.num_proc, create_worker)
    res = driver.get_results()
    driver.stop()

    if res.error_message is not None:
        raise RuntimeError(res.error_message)

    for name, value in sorted(res.worker_results.items(), key=lambda item: item[1][1]):
        exit_code, timestamp = value
        if exit_code != 0:
            raise RuntimeError('Horovod detected that one or more processes exited with non-zero '
                               'status, thus causing the job to be terminated. The first process '
                               'to do so was:\\nProcess name: {name}\\nExit code: {code}\\n'
                               .format(name=name, code=exit_code))

launch_gloo_elastic() 함수에서는 ElasticDriverElasticRendezvousHandler를 생성하고, create_rendezvous_handler() 함수를 이용해 Worker를 생성하는 커맨드를 만든 뒤 ElasticDriver.start() 함수가 실행되면서 Worker가 생성되어 학습이 시작됩니다.

RendezvousServer

class RendezvousServer:
    def __init__(self, verbose=0):
        self._httpd = None
        self._listen_thread = None
        self._verbose = verbose

    # Rendezvous function finds a available port, create http socket,
    # and start listening loop to handle request
    # self.httpd.init needs to be called after server start
    def start(self, handler_cls=RendezvousHandler):
        self._httpd, port = find_port(
            lambda addr: RendezvousHTTPServer(
                addr, handler_cls, self._verbose))
        if self._verbose:
            logging.info('Rendezvous INFO: HTTP rendezvous server started.')

        # start the listening loop
        self._listen_thread = in_thread(target=self._httpd.serve_forever)

        return port

    def init(self, host_alloc_plan):
        self._httpd.init(host_alloc_plan)

    def stop(self):
        self._httpd.shutdown()
        self._listen_thread.join()

RendezvousServer는 HTTP server로서 클러스터에 전역적으로 1개만 존재하며 ElasticRendezvousHandler를 이용해 client의 요청을 처리합니다.