<aside> 💡 이 문서는 Elastic Horovod에서 Autoscaling과 관련된 구조 및 역할을 설명합니다.
</aside>

위 그림은 전체적인 구조를 나타냅니다. 각 부분에 대한 자세한 설명은 아래를 참고해 주십시오.
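horovodrun 명령을 실행하면 horovod/runner/launch.py 파일이 실행되고, run_commandline()을 거쳐 _run() 함수가 호출됩니다. _run()은 호스트 관련 인자를 정리한 뒤 elastic 모드이면 _run_elastic()을, 그렇지 않으면 _run_static()을 호출합니다.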
if __name__ == "__main__":
    # Executed only when launch.py is run as a script (the horovodrun entry point).
    run_commandline()
def _run(args):
    """Normalize host-related arguments, then dispatch to the elastic or static runner.

    ``args`` is mutated in place:
      * under LSF, a missing ``np`` and a missing host source are filled from
        the LSF job configuration;
      * with neither ``hosts`` nor a discovery script, ``hosts`` is read from
        ``hostfile`` or falls back to ``localhost:<np>``;
      * ``nics`` is converted from a comma-separated string into a set (or None).

    Returns whatever ``_run_elastic`` or ``_run_static`` returns.
    """
    if lsf.LSFUtils.using_lsf():
        # LSF job: take unspecified values from the job configuration.
        if not args.np:
            args.np = lsf.LSFUtils.get_num_processes()
        if not (args.hosts or args.hostfile or args.host_discovery_script):
            host_specs = ['{host}:{np}'.format(host=node, np=lsf.LSFUtils.get_num_gpus())
                          for node in lsf.LSFUtils.get_compute_hosts()]
            args.hosts = ','.join(host_specs)

    if not (args.hosts or args.host_discovery_script):
        # No explicit host list: read the hostfile, or run everything on localhost.
        if args.hostfile:
            args.hosts = hosts.parse_host_files(args.hostfile)
        else:
            args.hosts = 'localhost:{np}'.format(np=args.np)

    nic_list = args.nics
    args.nics = set(nic_list.split(',')) if nic_list else None

    runner = _run_elastic if _is_elastic(args) else _run_static
    return runner(args)
def _run_elastic(args):
    """Build ElasticSettings from parsed CLI ``args`` and launch an elastic Gloo job.

    Hosts are discovered with ``args.host_discovery_script`` when it is set;
    otherwise the fixed ``args.hosts`` list is used.

    Raises:
        ValueError: if a fixed host list names fewer than 2 hosts, if no host
            source is available, or if Horovod was built without Gloo support.
    """
    # Construct the host discovery component.
    if args.host_discovery_script:
        discover_hosts = discovery.HostDiscoveryScript(args.host_discovery_script, args.slots)
    elif args.hosts:
        _, available_host_slots = hosts.parse_hosts_and_slots(args.hosts)
        if len(available_host_slots) < 2:
            raise ValueError('Cannot run in fault tolerance mode with fewer than 2 hosts.')
        discover_hosts = discovery.FixedHosts(available_host_slots)
    else:
        # horovodrun accepts hosts via --hosts, --hostfile or --host-discovery-script
        # (there is no --hostnames option).
        raise ValueError('One of --host-discovery-script, --hosts, or --hostfile must be provided')

    # horovodrun has to finish all the checks before this timeout runs out.
    if args.start_timeout:
        start_timeout = args.start_timeout
    else:
        # Look up the default timeout from the environment variable.
        start_timeout = int(os.getenv('HOROVOD_START_TIMEOUT', '30'))

    # '{activity}' is left unformatted here; it is passed to Timeout as-is.
    tmout = timeout.Timeout(start_timeout,
                            message='Timed out waiting for {activity}. Please '
                                    'check connectivity between servers. You '
                                    'may need to increase the --start-timeout '
                                    'parameter if you have too many servers.')
    settings = elastic_settings.ElasticSettings(discovery=discover_hosts,
                                                min_np=args.min_np or args.np,
                                                max_np=args.max_np,
                                                elastic_timeout=args.elastic_timeout,
                                                reset_limit=args.reset_limit,
                                                num_proc=args.np,
                                                verbose=2 if args.verbose else 0,
                                                ssh_port=args.ssh_port,
                                                ssh_identity_file=args.ssh_identity_file,
                                                extra_mpi_args=args.mpi_args,
                                                key=secret.make_secret_key(),
                                                start_timeout=tmout,
                                                output_filename=args.output_filename,
                                                run_func_mode=args.run_func is not None,
                                                nics=args.nics,
                                                prefix_output_with_timestamp=args.prefix_output_with_timestamp)

    if not gloo_built(verbose=(settings.verbose >= 2)):
        raise ValueError('Gloo support is required to use elastic training, but has not been built. Ensure CMake is '
                         'installed and reinstall Horovod with HOROVOD_WITH_GLOO=1 to debug the build error.')

    # Workers inherit the launcher environment plus any settings derived from args.
    env = os.environ.copy()
    config_parser.set_env_from_args(env, args)
    gloo_run_elastic(settings, env, args.command)
def gloo_run_elastic(settings, env, command):
    """Create the rendezvous server and hand everything to ``launch_gloo_elastic``.

    Args:
        settings: elastic settings (``num_proc``, ``verbose`` and others are read).
        env: environment mapping forwarded to the workers.
        command: the user command to run on each worker.
    """
    def _detect_common_nics(driver):
        # Host-to-host common interface detection requires at least 2 hosts in an elastic job.
        available = driver.wait_for_available_slots(settings.num_proc,
                                                    min_hosts=_get_min_start_hosts(settings))
        return driver_service.get_common_interfaces(settings, available.host_assignment_order)

    launch_gloo_elastic(command,
                        _exec_command_fn(settings),
                        settings,
                        env,
                        _detect_common_nics,
                        RendezvousServer(settings.verbose))
gloo_run_elastic() 함수에서는 RendezvousServer 객체를 생성하고, 호스트 간 공통 네트워크 인터페이스를 찾는 get_common_interfaces() 콜백 및 Worker 실행 함수(exec_command)와 함께 launch_gloo_elastic() 함수를 호출합니다.
def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous):
    """Run an elastic Gloo job and raise if it did not finish cleanly.

    Creates an ElasticDriver, starts ``rendezvous`` with a handler built from the
    driver, waits for slots, resolves the driver IP from the common NICs, then
    starts the workers and collects their results.

    Args:
        command: the user command to run on each worker.
        exec_command: callable used to execute a worker command.
        settings: elastic settings (output_filename, discovery, min_np, max_np,
            elastic_timeout, reset_limit, verbose, num_proc are read).
        env: environment mapping forwarded to each worker.
        get_common_interfaces: callable(driver) returning the NICs to use.
        rendezvous: server whose ``start(handler)`` returns the rendezvous port.

    Raises:
        RuntimeError: if the driver reports an error message, or if any worker
            exited with a non-zero code (the earliest one by timestamp is reported).
    """
    # Make the output directory if it does not exist.
    if settings.output_filename:
        _mkdir_p(settings.output_filename)

    driver = ElasticDriver(rendezvous, settings.discovery,
                           settings.min_np, settings.max_np,
                           timeout=settings.elastic_timeout,
                           reset_limit=settings.reset_limit,
                           verbose=settings.verbose)

    handler = create_rendezvous_handler(driver)
    global_rendezv_port = rendezvous.start(handler)
    driver.wait_for_available_slots(settings.num_proc)

    nics = get_common_interfaces(driver)
    server_ip = network.get_driver_ip(nics)

    event = register_shutdown_event()
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)
    create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)

    try:
        driver.start(settings.num_proc, create_worker)
        res = driver.get_results()
    finally:
        # Stop the driver even if starting workers or collecting results raises.
        driver.stop()

    if res.error_message is not None:
        raise RuntimeError(res.error_message)

    # worker_results values are (exit_code, timestamp); scan in timestamp order so
    # the first worker to fail is the one reported.
    for name, (exit_code, _timestamp) in sorted(res.worker_results.items(),
                                                key=lambda item: item[1][1]):
        if exit_code != 0:
            raise RuntimeError('Horovod detected that one or more processes exited with non-zero '
                               'status, thus causing the job to be terminated. The first process '
                               'to do so was:\nProcess name: {name}\nExit code: {code}\n'
                               .format(name=name, code=exit_code))
launch_gloo_elastic() 함수는 먼저 ElasticDriver를 생성하고, create_rendezvous_handler()로 ElasticRendezvousHandler를 만들어 RendezvousServer를 시작합니다. 이어서 get_run_command()로 Worker 실행 커맨드를 만들고, _create_elastic_worker_fn()으로 Worker 생성 함수를 만듭니다. 마지막으로 ElasticDriver.start()가 호출되면 Worker가 생성되어 학습이 시작됩니다.
class RendezvousServer:
    """HTTP rendezvous server run by the launcher.

    ``start`` binds an available port, runs the HTTP server's ``serve_forever``
    loop in a background thread and returns the port. ``init`` forwards a host
    allocation plan to the server. ``stop`` ends the loop and releases the socket.
    """

    def __init__(self, verbose=0):
        self._httpd = None          # HTTP server instance; created by start()
        self._listen_thread = None  # thread running serve_forever; created by start()
        self._verbose = verbose

    def start(self, handler_cls=RendezvousHandler):
        """Find an available port, create the HTTP server on it and start its listening loop.

        ``init`` must be called after the server has been started.

        Args:
            handler_cls: request handler class given to the HTTP server.

        Returns:
            The port the server is listening on.
        """
        # find_port receives a factory that builds the server for a given address
        # and returns the created server together with its port.
        self._httpd, port = find_port(
            lambda addr: RendezvousHTTPServer(
                addr, handler_cls, self._verbose))
        if self._verbose:
            logging.info('Rendezvous INFO: HTTP rendezvous server started.')

        # Start the listening loop.
        self._listen_thread = in_thread(target=self._httpd.serve_forever)
        return port

    def init(self, host_alloc_plan):
        """Pass the host allocation plan to the running HTTP server (start() must be called first)."""
        self._httpd.init(host_alloc_plan)

    def stop(self):
        """Stop the listening loop, wait for its thread and close the listening socket.

        Does nothing if the server was never started or has already been stopped.
        """
        if self._httpd is None:
            return
        httpd, listen_thread = self._httpd, self._listen_thread
        self._httpd = None
        self._listen_thread = None

        httpd.shutdown()
        if listen_thread is not None:
            listen_thread.join()
        # shutdown() only ends serve_forever(); server_close() releases the bound
        # socket (assumes the server follows the socketserver.BaseServer API).
        httpd.server_close()
RendezvousServer는 horovodrun을 실행한 launcher(driver) 프로세스에서 동작하는 HTTP 서버로, 클러스터 전체에 1개만 존재합니다. ElasticRendezvousHandler를 이용해 Worker(client)의 요청을 처리합니다.