Source code for morpho.utilities.toolbox

#!/bin/python

'''
Toolbox class: create, configure and run processors
Authors: M. Guigue
Date: 06/26/18
'''
import os
import importlib

from morpho.utilities import morphologging, parser
logger = morphologging.getLogger(__name__)


[docs]class ToolBox: ''' Manages processors requested by the user at run-time. Via a configuration file, the user defines which processor to use, how to configure them and how to connect them. ''' def __init__(self, args): self._ReadConfigFile(args.config) self._UpdateConfigFromCLI(args) self._processors_dict = dict() self._chain_processors = [] def _ReadConfigFile(self, filename): if os.path.exists(filename): if filename.endswith(".json"): my_module = importlib.import_module("json") elif filename.endswith(".yaml"): my_module = importlib.import_module("yaml") else: logger.warning( "Unknown format: {}; trying json".format(filename)) my_module = importlib.import_module("json") with open(filename, 'r') as json_file: try: self.config_dict = my_module.load(json_file) except Exception as err: logger.error( "Error while reading {}:\n{}".format(filename, err)) raise else: logger.error("File {} does not exist".format(filename)) raise FileNotFoundError(filename) def _UpdateConfigFromCLI(self, args): if "param" in args and args.param: self.config_dict = parser.update_from_arguments( self.config_dict, args.param) def _CreateAndConfigureProcessors(self): for a_dict in self.config_dict["processors-toolbox"]["processors"]: if not self._CreateOneProcessor(a_dict["name"], a_dict["type"]): logger.error( "Could not create processor <{}>; exiting".format(a_dict["name"])) return False for _, processor in self._processors_dict.items(): procName = processor["object"].name if procName in self.config_dict.keys(): config_dict = self.config_dict[procName] else: config_dict = dict() try: processor["object"].Configure(config_dict) except Exception as err: logger.error( "Configuration of <{}> failed: \n{}".format(procName, err)) return False return True def _CreateOneProcessor(self, procName, procClass): # Parsing procClass if ":" in procClass: (module_name, processor_name) = procClass.split(":") else: module_name = "morpho" processor_name = procClass # importing module (morpho is default) try: module = importlib.import_module(module_name) except: logger.error("Cannot import module {}".format(module_name)) return False try: self._processors_dict.update({procName: { "object": getattr(module, processor_name)(procName), "variableToGive": [], # -> variable to give after execution # -> which processor need to give its output to this processor "procToBeConnectedTo": [], "varToBeConnectedTo": [], # -> which variable of the connected processor to be set "deleted": False } }) logger.info("Processor <{}> ({}:{}) created".format( procName, module_name, processor_name)) return True except: logger.error("Cannot import {} from {}".format( processor_name, "morpho")) return False def _ConnectProcessors(self, nameProc): proc_object = self._processors_dict[nameProc]['object'] nConnections = len(self._processors_dict[nameProc]['variableToGive']) for i in range(nConnections): proc_name_to_update = self._processors_dict[nameProc]['procToBeConnectedTo'][i] var_to_give = self._processors_dict[nameProc]['variableToGive'][i] var_to_be_connected_to = self._processors_dict[nameProc]['varToBeConnectedTo'][i] proc_object_to_update = self._processors_dict[proc_name_to_update]['object'] logger.debug("Connection {}:{} -> {}:{}".format(nameProc, var_to_give, proc_name_to_update, var_to_be_connected_to)) try: val = getattr(proc_object, var_to_give) setattr(proc_object_to_update, var_to_be_connected_to, val) except Exception as err: logger.error("Connection {}:{} -> {}:{} failed:\n{}".format(nameProc, var_to_give, proc_name_to_update, var_to_be_connected_to, err)) return False return True def _DefineChain(self): ''' Defines the connections between the processors and place the processors into a ordered list. ''' for a_connection in self.config_dict['processors-toolbox']['connections']: if a_connection['slot'].split(":")[0] not in self._processors_dict.keys(): logger.error("Processor <{}> not defined but used as signal emitter".format( a_connection['slot'].split(":")[0])) if a_connection['signal'].split(":")[0] not in self._processors_dict.keys(): logger.error("Processor <{}> not defined but used as connection".format( a_connection['signal'].split(":")[0])) proc_name = a_connection['signal'].split(":")[0] new_proc_name = a_connection['slot'].split(":")[0] self._processors_dict[proc_name]["variableToGive"].append( a_connection['signal'].split(":")[1]) self._processors_dict[proc_name]["procToBeConnectedTo"].append( a_connection['slot'].split(":")[0]) if proc_name not in self._chain_processors: self._chain_processors.append(proc_name) if new_proc_name not in self._chain_processors: self._chain_processors.append(new_proc_name) self._processors_dict[proc_name]["varToBeConnectedTo"].append( a_connection['slot'].split(":")[1]) for a_processor in self._processors_dict.keys(): if a_processor not in self._chain_processors: self._chain_processors.append(a_processor) logger.debug("Sequence of processors: {}".format( self._sequenceProcessors())) return True def _sequenceProcessors(self): seqWithArrows = self._chain_processors[0] for item in self._chain_processors[1:]: seqWithArrows = seqWithArrows + " -> " + item return seqWithArrows def _RunChain(self): ''' Execute the chain of processors ''' for a_processor in self._chain_processors: try: if not self._processors_dict[a_processor]['object'].Run(): logger.error("Result <{}> incorrect".format(a_processor)) return False except Exception as err: logger.error( "Error while running <{}>:\n{}".format(a_processor, err)) raise err self._ConnectProcessors(a_processor) if self._processors_dict[a_processor]['object'].delete: self._processors_dict[a_processor]['deleted'] = True logger.info("Deleting <{}>".format(a_processor)) del self._processors_dict[a_processor]['object'] return True
[docs] def Run(self): import json logger.debug("Configuration:\n{}".format( json.dumps(self.config_dict, indent=4))) if not self._CreateAndConfigureProcessors(): logger.error("Error while creating and configuring processors!") return False if not self._DefineChain(): logger.error("Error while defining processors chain!") return False if not self._RunChain(): logger.error("Error while running processors!") return False
[docs] def GetProcessor(procName): if self._processors_dict[str(procName)]['deleted']: logger.warning("Processor {} has been deleted!".format(procName)) return 0 return self._processors_dict[str(procName)]['object']
[docs] def GetProcAttr(procName, varName): if self._processors_dict[str(procName)]['deleted']: logger.warning("Processor {} has been deleted!".format(procName)) return 0 value = 0 try: value = getattr(self._processors_dict[str(procName)]['object'], str(varName)) except: logger.warning("Attribute {} does not exist in {}".format(procValue, procName)) return value