# Source code for surround.assembler
# assembler.py
import logging
import os
import sys
from abc import ABC
from datetime import datetime
from .config import Config, has_config
from .run_modes import RunMode
from .stage import Stage, Estimator
# Logger for this module, named after the module itself via ``__name__``.
LOGGER = logging.getLogger(name=__name__)
class Assembler(ABC):
    """
    Class responsible for assembling and executing a Surround pipeline.

    Responsibilities:

    - Encapsulate the configuration data and pipeline stages
    - Load configuration from a specified module
    - Run the pipeline with input data in a given :class:`surround.run_modes.RunMode`

    For more information on this process, see the :ref:`about` page.

    The ``set_*`` and ``load_config`` methods return the assembler, so calls can be chained.

    Predict/Estimate mode::

        assembler = Assembler("Example pipeline")
        assembler.set_stages([PreFilter(), PredictStage(), PostFilter()])
        assembler.init_assembler()

        state = ...  # a surround.State instance holding the input data
        assembler.run(state, mode=RunMode.PREDICT)

    Training mode (requires an Estimator among the stages)::

        assembler.init_assembler()
        assembler.run(state, mode=RunMode.TRAIN)

    In any mode other than ``RunMode.PREDICT`` the metrics stage (if one was set with
    :meth:`set_metrics`) is also run after the pipeline stages.
    """
    # pylint: disable=too-many-instance-attributes

    # NOTE(review): ``has_config`` comes from surround.config; presumably it supplies a
    # default Config when ``config`` is not given - confirm against its implementation.
    @has_config
    def __init__(self, assembler_name="", config=None):
        """
        Constructor for an Assembler pipeline.

        :param assembler_name: The name of the pipeline
        :type assembler_name: str
        :param config: Surround Config object
        :type config: :class:`surround.config.Config`
        """
        self.assembler_name = assembler_name
        self.config = config
        self.stages = None
        self.batch_mode = False
        self.finaliser = None
        self.state = None
        self.metrics = None

    def init_assembler(self):
        """
        Initialise the assembler and all of its stages.

        Calls :meth:`surround.stage.Stage.initialise` on every pipeline stage (including
        the estimator), then on the finaliser and the metrics stage if they are set.

        .. note:: Should be called after :meth:`surround.assembler.Assembler.set_config`.

        If an exception occurs and ``surround.surface_exceptions`` is enabled in the
        config, it is re-raised; otherwise it is logged and ``False`` is returned.

        :returns: whether the initialisation was successful
        :rtype: bool
        :raises ValueError: (only when surfacing exceptions) if more than one Estimator
            is among the stages
        """
        try:
            if self.stages:
                # Validate first so an invalid pipeline doesn't leave some stages
                # initialised and others not.
                estimator_count = sum(1 for stage in self.stages if isinstance(stage, Estimator))
                if estimator_count > 1:
                    raise ValueError("Stages can only have one Estimator class")
                for stage in self.stages:
                    stage.initialise(self.config)
            if self.finaliser:
                self.finaliser.initialise(self.config)
            # run() executes the metrics stage, so it must be initialised like the others.
            if self.metrics:
                self.metrics.initialise(self.config)
        except Exception as e:
            if self.config.get_path("surround.surface_exceptions"):
                raise
            LOGGER.exception(e)
            return False
        return True

    def run(self, state=None, mode=RunMode.PREDICT):
        """
        Run the pipeline using the input data provided.

        Stages run in order; execution stops after the first stage that records an
        error in ``state.errors``. When ``mode`` is ``RunMode.TRAIN`` the estimator's
        :meth:`surround.stage.Estimator.fit` method is called instead of
        :meth:`surround.stage.Estimator.estimate`.

        If ``surround.enable_stage_output_dump`` is enabled in the Config instance then
        each stage's :meth:`surround.stage.Stage.dump_output` method is called after it
        runs.

        After the pipeline stages, the metrics stage (if set) runs for any mode other
        than ``RunMode.PREDICT``. The finaliser (if set) always runs last, even when an
        exception is surfaced. The state is frozen while the pipeline executes and is
        always thawed afterwards.

        This method doesn't return anything; results should be stored in the ``state``
        object passed in.

        :param state: Data passed between each stage in the pipeline
        :type state: :class:`surround.State`
        :param mode: The mode to run the pipeline in
        :type mode: :class:`surround.run_modes.RunMode`
        :raises ValueError: if there are no stages, no state is given, or ``mode`` is
            ``RunMode.TRAIN`` and no Estimator was added to the stages
        """
        is_training = mode == RunMode.TRAIN
        LOGGER.info("Starting '%s'", self.assembler_name)

        if not self.stages:
            raise ValueError("There are no stages to run!")
        if not state:
            raise ValueError("state is required to run an assembler")
        # Validate before freezing so a failed check doesn't leave the state frozen.
        if is_training and not any(isinstance(s, Estimator) for s in self.stages):
            raise ValueError("No Estimator class added to stages.")

        self.state = state
        state.freeze()

        def _run_stage_safe(a_stage):
            """Execute one stage, recording any error and its execution time on ``state``."""
            start_time = datetime.now()
            try:
                if isinstance(a_stage, Estimator):
                    if is_training:
                        a_stage.fit(state, self.config)
                    else:
                        a_stage.estimate(state, self.config)
                else:
                    a_stage.operate(state, self.config)

                if self.config["surround"]["enable_stage_output_dump"]:
                    a_stage.dump_output(state, self.config)
            except Exception as e:
                if self.config.get_path("surround.surface_exceptions"):
                    raise
                # Record the failure on the state so the pipeline loop can stop.
                state.errors.append(str(e))
                LOGGER.exception(e)

            execution_time = datetime.now() - start_time
            state.execution_time.append(str(execution_time))
            LOGGER.info("%s took %s secs", type(a_stage).__name__, execution_time.total_seconds())

        try:
            for stage in self.stages:
                _run_stage_safe(stage)
                if state.errors:
                    break

            if self.metrics and mode != RunMode.PREDICT:
                _run_stage_safe(self.metrics)
        finally:
            # The finaliser must run however the pipeline ended (including a surfaced
            # exception), and the state must never be left frozen.
            try:
                if self.finaliser:
                    _run_stage_safe(self.finaliser)
                if state.errors:
                    LOGGER.error(state.errors)
            finally:
                state.thaw()

    def load_config(self, module):
        """
        Load the pipeline configuration.

        Given the name of an already-imported module contained in the root of the
        project, create an instance of :class:`surround.config.Config`. The config loads
        its data from the project whose root is the parent directory of the module's
        package, and is used for the pipeline. The configured ``output_path`` directory is
        created if it doesn't exist. If ``module`` is empty, a default
        :class:`surround.config.Config` is used instead.

        .. note:: Should be called before :meth:`surround.assembler.Assembler.init_assembler`

        :param module: name of the module
        :type module: str
        :returns: this assembler, for chaining
        :rtype: :class:`Assembler`
        :raises ValueError: if ``module`` is not an imported module backed by a file
        """
        if not module:
            return self.set_config(Config())

        # The module must already be imported and have a real file on disk
        # (namespace packages may have no ``__file__`` or ``__file__ = None``).
        mod = sys.modules.get(module)
        module_file = getattr(mod, "__file__", None) if mod is not None else None
        if not module_file:
            raise ValueError("Invalid Python module %s" % module)

        package_path = os.path.dirname(os.path.abspath(module_file))
        root_path = os.path.dirname(package_path)
        self.set_config(Config(root_path))

        # exist_ok avoids the race between checking for and creating the directory.
        os.makedirs(self.config["output_path"], exist_ok=True)
        return self

    def set_config(self, config):
        """
        Set the configuration data to be used during pipeline execution.

        .. note:: Should be called before :meth:`surround.assembler.Assembler.init_assembler`.

        :param config: the configuration data
        :type config: :class:`surround.config.Config`
        :returns: this assembler, for chaining
        :rtype: :class:`Assembler`
        :raises TypeError: if ``config`` is not a :class:`surround.config.Config`
        """
        # isinstance alone also rejects None; a truthiness test would wrongly reject
        # a valid but empty Config.
        if not isinstance(config, Config):
            raise TypeError("config should be of class Config")
        self.config = config
        return self

    def set_stages(self, stages):
        """
        Set the stages to be executed one after the other in the pipeline.

        :param stages: list of stages to execute
        :type stages: list of :class:`surround.stage.Stage`
        :returns: this assembler, for chaining
        :rtype: :class:`Assembler`
        :raises ValueError: if ``stages`` is not a list of Stage instances
        """
        if not isinstance(stages, list) or not all(isinstance(x, Stage) for x in stages):
            raise ValueError("stages must be a list of Stages's only!")
        self.stages = stages
        return self

    def set_finaliser(self, finaliser):
        """
        Set the final stage that will be executed no matter how the pipeline runs.

        This will be executed even when the pipeline fails or throws an error.

        :param finaliser: the final stage instance
        :type finaliser: :class:`surround.stage.Stage`
        :returns: this assembler, for chaining
        :rtype: :class:`Assembler`
        :raises TypeError: if ``finaliser`` is not a Stage instance
        """
        # finaliser must be a type of Stage (this also rejects None)
        if not isinstance(finaliser, Stage):
            raise TypeError("finaliser should be of class Stage")
        self.finaliser = finaliser
        return self

    def set_metrics(self, metrics):
        """
        Set the stage used to calculate metrics.

        The metrics stage runs after the pipeline stages in every mode except
        ``RunMode.PREDICT``, including when a pipeline stage recorded an error.

        :param metrics: the metrics stage instance
        :type metrics: :class:`surround.stage.Stage`
        :returns: this assembler, for chaining
        :rtype: :class:`Assembler`
        :raises TypeError: if ``metrics`` is not a Stage instance
        """
        if not isinstance(metrics, Stage):
            raise TypeError("metrics should be of the Stage class")
        self.metrics = metrics
        return self