Source code for mantlebio.core.pipeline.mantle_pipeline
from abc import abstractmethod
import inspect
from mantlebio.helpers.decorators import deprecated
from proto import pipeline_pb2
from mantlebio.exceptions import MantleAttributeError
[docs]
class _IPipeline:
@abstractmethod
def __init__(self,pipeline_instance: pipeline_pb2.Pipeline)->None:
pass
@abstractmethod
def __getattr__(self, name):
pass
@deprecated("2.0.0", "use base object instead")
@property
@abstractmethod
def pipeline_pb2(self)->pipeline_pb2.Pipeline:
pass
[docs]
class MantlePipeline(_IPipeline):
def __init__(self, pipeline_instance: pipeline_pb2.Pipeline) -> None:
self._pipeline_instance = pipeline_instance
[docs]
def _wrap_method(self, method):
def wrapper(*args, **kwargs):
return method(*args, **kwargs)
return wrapper
@property
def pipeline_pb2(self)->pipeline_pb2.Pipeline:
return self._pipeline_instance
def __getattr__(self, name):
# First, check if the object itself has the property
# TODO: this should be removed when we deprecate the proto property accessors
try:
return super().__getattribute__(name)
except AttributeError:
pass
# Dynamically route attribute access to the protobuf object
if hasattr(self._pipeline_instance, name):
attr = getattr(self._pipeline_instance, name)
if inspect.ismethod(attr):
return self._wrap_method(attr)
return attr
raise MantleAttributeError(f"'{type(self._pipeline_instance).__name__}' object has no attribute '{name}'")