Source code for pysmac.remote_smac

from __future__ import print_function, division, absolute_import

import sys
import os
import traceback
import socket
import subprocess
import resource
from pkg_resources import resource_filename
from math import ceil
import errno

import logging
import multiprocessing

import time

import pynisher




# Version identifier of the SMAC release this module refers to
# (presumably the bundled SMAC distribution -- confirm against packaging).
SMAC_VERSION = "smac-v2.10.03-master-778"

# Python 2 backward compatibility: rebind ``str`` to the unicode type so that
# every ``str(...)`` call in this module yields text on both Python 2 and 3.
try:
    unicode
except NameError:
    pass  # Python 3: the builtin ``str`` already is the text type
else:
    str = unicode



# takes a name and a tuple defining one parameter, registers that with the parser
# and returns the corresponding string for the SMAC pcs file and the type of the
# variable for later casting
[docs]def process_single_parameter_definition(name, specification): """ A helper function to process a single parameter definition for further communication with SMAC. """ data_type_mapping = {'integer': int, 'real': float} assert isinstance(specification, tuple), "The specification \"{}\" for {} is not valid".format(specification,name) assert len(specification)>1, "The specification \"{}\" for {} is too short".format(specification,name) if specification[0] not in {'real', 'integer', 'ordinal', 'categorical'}: raise ValueError("Type {} for {} not understood".format(specification[0], name)) string = '{} {}'.format(name, specification[0]) # numerical values if specification[0] in {'real', 'integer'}: dtype = data_type_mapping[specification[0]] if len(specification[1])!= 2: raise ValueError("Range {} for {} not valid for numerical parameter".format(specification[1], name)) if specification[1][0] >= specification[1][1]: raise ValueError("Interval {} not not understood.".format(specification[1])) if not (specification[1][0] <= specification[2] and specification[2] <= specification[1][1]): raise ValueError("Default value for {} has to be in the specified range".format(name)) if specification[0] == 'integer': if (type(specification[1][0]) != int) or (type(specification[1][1]) != int) or (type(specification[2]) != int): raise ValueError("Bounds and default value of integer parameter {} have to be integer types!".format(name)) string += " [{0[0]}, {0[1]}] [{1}]".format(specification[1], specification[2]) if ((len(specification) == 4) and specification[3] == 'log'): if specification[1][0] <= 0: raise ValueError("Range for {} cannot contain non-positive numbers.".format(name)) string += " log" # ordinal and categorical types if (specification[0] in {'ordinal', 'categorical'}): if specification[2] not in specification[1]: raise ValueError("Default value {} for {} is not valid.".format(specification[2], name)) # make sure all elements are of the same type if (len(set(map(type, 
specification[1]))) > 1): raise ValueError("Not all values of {} are of the same type!".format(name)) dtype = type(specification[1][0]) string += " {"+",".join(map(str, specification[1])) + '}' + ('[{}]'.format(specification[2])) return string, dtype
def process_parameter_definitions(parameter_dict):
    """
    Process all user parameter definitions with a single call.

    Each entry of the user's definitions is converted into one line of SMAC's
    PCS format via ``process_single_parameter_definition``, and the Python
    type of every parameter is recorded so that values received from SMAC can
    later be cast back during the communication with the SMAC process.

    :param parameter_dict: the user-defined parameter configuration space,
        mapping each parameter name to its specification tuple
    :returns: tuple ``(pcs_strings, parser_dict)`` -- the list of PCS lines
        and a dict mapping every parameter name to its type
    """
    converted = [(param_name, process_single_parameter_definition(param_name, param_spec))
                 for param_name, param_spec in list(parameter_dict.items())]

    pcs_strings = [line_and_type[0] for _, line_and_type in converted]
    parser_dict = dict((param_name, line_and_type[1])
                       for param_name, line_and_type in converted)
    return (pcs_strings, parser_dict)
class remote_smac(object):
    """
    The class responsible for the TCP/IP communication with a SMAC instance.

    SMAC is started as a Java subprocess in IPC mode. For every run it wants
    evaluated, SMAC connects to the TCP socket opened here and sends one line
    describing the run (read by ``next_configuration``); the result is sent
    back over that same connection by ``report_result``.
    """

    # Timeout (seconds) for each accept() on the listening socket in
    # next_configuration. Despite the name, the transport is TCP.
    udp_timeout = 5

    def __init__(self, scenario_fn, additional_options_fn, seed, class_path,
                 memory_limit, parser_dict, java_executable):
        """
        Starts SMAC as a subprocess in IPC mode (TCP mechanism). SMAC will
        then connect back to the socket opened here for every run.

        :param scenario_fn: path of the SMAC scenario file
        :param additional_options_fn: file with one ``name value`` pair per
            line, each passed to SMAC as ``--name value`` (blank lines are
            ignored; the value may contain spaces)
        :param seed: seed passed to SMAC
        :param class_path: Java class path containing SMAC
        :param memory_limit: Java heap limit in MB (``-Xmx``), or None
        :param parser_dict: maps parameter names to the types used to cast
            values received from SMAC
        :param java_executable: java command; split on whitespace
        """
        self.__parser = parser_dict
        # set everything __del__ touches first, so the destructor is safe even
        # if the constructor fails part-way through
        self.__subprocess = None
        self.__conn = None
        self.__sock = None
        self.__logger = multiprocessing.get_logger()

        # listening socket on a free port picked by the OS
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.__sock.settimeout(3)
        self.__sock.bind(('', 0))
        self.__sock.listen(1)

        self.__port = self.__sock.getsockname()[1]
        self.__logger.debug('picked port %i' % self.__port)

        # build the java command
        cmds = java_executable.split()
        if memory_limit is not None:
            cmds += ["-Xmx%im" % memory_limit]
        cmds += ["-XX:ParallelGCThreads=4",
                 "-cp", class_path,
                 "ca.ubc.cs.beta.smac.executors.SMACExecutor",
                 "--scenario-file", scenario_fn,
                 "--tae", "IPC",
                 "--ipc-mechanism", "TCP",
                 "--ipc-remote-port", str(self.__port),
                 "--seed", str(seed)
                 ]

        with open(additional_options_fn, 'r') as fh:
            for line in fh:
                stripped = line.strip()
                if not stripped:
                    continue  # tolerate blank lines
                # split only on the first whitespace run: the value may contain spaces
                name, value = stripped.split(None, 1)
                cmds += ['--%s' % name, '%s' % value]

        self.__logger.debug("SMAC command: %s" % (' '.join(cmds)))

        self.__logger.debug("Starting SMAC in ICP mode")

        # connect the output to the logger if the appropriate level has been set
        # (note: Logger.level is the level set on this logger itself, not the
        # effective level inherited from its ancestors)
        if self.__logger.level < logging.WARNING:
            self.__subprocess = subprocess.Popen(cmds, stdout=sys.stdout, stderr=sys.stderr)
        else:
            with open(os.devnull, "w") as fnull:
                self.__subprocess = subprocess.Popen(cmds, stdout=fnull, stderr=fnull)

    def __del__(self):
        """
        Destructor makes sure that the SMAC process is terminated if necessary
        and that the sockets held by this object are closed.
        """
        # shut the subprocess down on 'destruction'
        if self.__subprocess is not None:
            if self.__subprocess.poll() is None:
                self.__subprocess.kill()
                self.__subprocess.wait()  # reap the killed process
                self.__logger.debug('SMAC had to be terminated')
            else:
                self.__logger.debug('SMAC terminated with returncode %i', self.__subprocess.returncode)

        for sock in (self.__conn, self.__sock):
            if sock is not None:
                sock.close()
        self.__conn = None
        self.__sock = None

    def next_configuration(self):
        """
        Method that queries the next configuration from SMAC.

        Accepts a connection on the socket, reads one line from SMAC, and
        converts it into a Python dictionary (casting every parameter with the
        type from ``parser_dict``). While waiting, it also checks whether the
        SMAC subprocess is still alive. The accepted connection is kept open
        so that ``report_result`` can answer on it.

        The line is expected to contain: instance (its first three characters
        are stripped before ``int`` conversion -- presumably an ``id_`` prefix),
        instance info, cutoff time, cutoff length, seed, then ``-name value``
        pairs. Single quotes are removed before splitting.

        :returns: either a dictionary with a configuration, or None if SMAC
            has terminated
        :raises ValueError: if the message from SMAC is too short to parse
        """
        self.__logger.debug('trying to retrieve the next configuration from SMAC')
        self.__sock.settimeout(self.udp_timeout)
        while True:
            try:
                self.__conn, _ = self.__sock.accept()
                fconn = self.__conn.makefile('r')
                try:
                    config_str = fconn.readline()
                finally:
                    # closing the file wrapper leaves the connection itself open
                    fconn.close()
                break
            except socket.timeout:
                # if smac already terminated, there is nothing else to do
                if self.__subprocess.poll() is not None:
                    self.__logger.debug("SMAC subprocess is no longer alive!")
                    return None
                # otherwise SMAC is just slow; keep waiting
                self.__logger.debug("SMAC has not responded yet, but is still alive. Will keep waiting!")
            except socket.error as e:
                if e.args[0] != errno.EAGAIN:
                    raise
                self.__logger.debug("Socket to SMAC process was empty, will continue to wait.")
                time.sleep(1)

        self.__logger.debug("SMAC message: %s" % config_str)

        los = config_str.replace('\'', '').split()  # 'list of strings'
        if len(los) < 5:
            raise ValueError("Could not parse SMAC message: %r" % config_str)

        config_dict = {
            'instance': int(los[0][3:]),
            'instance_info': str(los[1]),
            'cutoff_time': float(los[2]),
            'cutoff_length': float(los[3]),
            'seed': int(los[4]),
        }

        # remaining tokens come in '-name value' pairs
        for i in range(5, len(los), 2):
            param_name = los[i][1:]
            config_dict[param_name] = self.__parser[param_name](los[i + 1])

        self.__logger.debug("Our interpretation: %s" % config_dict)
        return config_dict

    def report_result(self, result_dict):
        """
        Method to report the latest run results back to SMAC.

        Sends one newline-terminated result line over the connection accepted
        by the last ``next_configuration`` call, then closes that connection.
        ``result_dict`` itself is not modified.

        :param result_dict: dictionary with the keys 'value', 'status' (bytes
            or text), and 'runtime'.
        :type result_dict: dict
        :raises RuntimeError: if there is no open connection to SMAC
        """
        if self.__conn is None:
            raise RuntimeError("No open connection to SMAC; call next_configuration() first.")

        # for proper printing, the status has to be text, not bytes
        status = result_dict['status']
        if isinstance(status, bytes):
            status = status.decode()

        s = 'Result for SMAC: {0[status]}, {0[runtime]}, 0, {0[value]}, 0\n'.format(
            dict(result_dict, status=status))
        self.__logger.debug(s)
        try:
            self.__conn.sendall(s.encode())
        finally:
            self.__conn.close()
            self.__conn = None
def _call_with_retries(wrapped_function, config_dict, logger, max_tries=8):
    """
    Evaluate ``wrapped_function(**config_dict)``, retrying on EAGAIN.

    Workaround for the 'Resource temporarily not available' error on the BaWue
    cluster if too many processes were spawned in a short period: after such
    an error, wait one second and try again, up to ``max_tries`` attempts.

    :returns: tuple ``(res, wall_time, cpu_time)``; ``wall_time`` is the
        wall-clock duration of the successful call and ``cpu_time`` the user
        CPU time of child processes that terminated (and were waited for)
        during that call only. ``(None, 0.0, 0.0)`` if every attempt failed.
    :raises OSError: any OSError other than EAGAIN is propagated unchanged.
    """
    for num_try in range(1, max_tries + 1):
        wall_start = time.time()
        cpu_start = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime
        try:
            res = wrapped_function(**config_dict)
        except OSError as e:
            if e.errno != errno.EAGAIN:
                raise
            logger.warning('Resource temporarily not available. Trial {} of {}'.format(num_try, max_tries))
            time.sleep(1)
            continue
        wall_time = time.time() - wall_start
        # RUSAGE_CHILDREN accumulates over the whole lifetime of this process,
        # so take the difference to get the CPU time of this evaluation only
        cpu_time = resource.getrusage(resource.RUSAGE_CHILDREN).ru_utime - cpu_start
        return res, wall_time, cpu_time

    logger.warning('Configuration {} crashed {} times, giving up on it.'.format(config_dict, max_tries))
    return None, 0.0, 0.0


def _build_result_dict(res, cpu_time, wall_time, current_t_limit, timeout_quality):
    """
    Assemble the dictionary handed to ``remote_smac.report_result``.

    Status inference: no return value counts as 'CRASHED' (it either crashed
    or timed out), anything else as 'SAT'. A dict returned by the function is
    merged in and may override 'value', 'status' and 'runtime'; any other
    return value becomes 'value'. With an active time limit, a run whose
    runtime (nearly) reached the limit, or whose wall time reached 10x the
    limit, is marked 'TIMEOUT' and reports ``timeout_quality`` as its value
    when that is not None.

    :returns: dict with at least 'value', 'status' (bytes) and 'runtime'
    """
    result_dict = {
        'value': timeout_quality,
        'status': b'CRASHED' if res is None else b'SAT',
        'runtime': cpu_time,
    }
    if res is not None:
        if isinstance(res, dict):
            result_dict.update(res)
        else:
            result_dict['value'] = res

    # a user function may return its status as text; the comparison below and
    # report_result both work on bytes
    if not isinstance(result_dict['status'], bytes):
        result_dict['status'] = u'{}'.format(result_dict['status']).encode('utf-8')

    # account for timeouts
    if current_t_limit is not None:
        if (result_dict['runtime'] > current_t_limit - 2e-2
                or wall_time >= 10 * current_t_limit):
            result_dict['status'] = b'TIMEOUT'

    # set returned quality to default in case of a timeout
    if result_dict['status'] == b'TIMEOUT' and timeout_quality is not None:
        result_dict['value'] = timeout_quality
    return result_dict


def remote_smac_function(only_arg):
    """
    The function that every worker from the multiprocessing pool calls to
    perform a separate SMAC run.

    This function is not part of the API that users should access, but rather
    part of the internals of pysmac. Due to the limitations of the
    multiprocessing module, it can only take one argument, a sequence holding
    (in this order): scenario_file, additional_options_fn, seed, function,
    parser_dict, memory_limit_smac_mb, class_path, num_instances,
    mem_limit_function, t_limit_function, deterministic, java_executable,
    timeout_quality.

    Any exception is caught and its traceback printed, so that errors inside
    pool workers become visible.
    """
    try:
        (scenario_file, additional_options_fn, seed, function, parser_dict,
         memory_limit_smac_mb, class_path, num_instances, mem_limit_function,
         t_limit_function, deterministic, java_executable, timeout_quality) = only_arg

        logger = multiprocessing.get_logger()

        smac = remote_smac(scenario_file, additional_options_fn, seed,
                           class_path, memory_limit_smac_mb, parser_dict,
                           java_executable)
        logger.debug('Started SMAC subprocess')

        num_iterations = 0

        while True:
            config_dict = smac.next_configuration()

            # None means SMAC has finished (for whatever reason)
            if config_dict is None:
                break

            # delete the unused variables from the dict
            if num_instances is None:
                del config_dict['instance']
                del config_dict['instance_info']

            del config_dict['cutoff_length']
            if deterministic:
                del config_dict['seed']

            current_t_limit = int(ceil(config_dict.pop('cutoff_time')))
            # only restrict the runtime if an initial cutoff was defined
            current_t_limit = None if t_limit_function is None else current_t_limit
            current_wall_time_limit = None if current_t_limit is None else 10 * current_t_limit

            # execute the function under the resource limits
            wrapped_function = pynisher.enforce_limits(
                mem_in_mb=mem_limit_function,
                cpu_time_in_s=current_t_limit,
                wall_time_in_s=current_wall_time_limit,
                grace_period_in_s=1)(function)

            res, wall_time, cpu_time = _call_with_retries(wrapped_function, config_dict, logger)

            if res is not None:
                try:
                    logger.debug('iteration %i:function value %s, computed in %s seconds' % (num_iterations, str(res), str(res['runtime'])))
                except (TypeError, AttributeError, KeyError, IndexError):
                    logger.debug('iteration %i:function value %s, computed in %s seconds' % (num_iterations, str(res), cpu_time))
            else:
                logger.debug('iteration %i: did not return in time, so it probably timed out' % (num_iterations))

            result_dict = _build_result_dict(res, cpu_time, wall_time,
                                             current_t_limit, timeout_quality)
            smac.report_result(result_dict)
            num_iterations += 1
    except Exception:
        traceback.print_exc()  # to see the traceback of subprocesses