Source code for mantlebio.core.pipeline.client
from mantlebio.core.pipeline.mantle_pipeline import _IPipeline, MantlePipeline
from mantlebio.core.session.mantle_session import _ISession
from mantlebio.exceptions import MantleMissingParameterError
from mantlebio.helpers.decorators import deprecated
from proto import pipeline_pb2
from google.protobuf.message import DecodeError
[docs]
class PipelineClient:
"""PipelineClient object for making requests to the Mantle API"""
def __init__(self, session: _ISession) -> None:
self.session = session
self.route_stem = f"/pipeline/"
pass
[docs]
@deprecated("2.0.0", "use get() instead")
def get_pipeline(self, id: str, version: str = "")->_IPipeline:
"""get an existing Pipeline
Args:
id (str): Pipeline ID
Returns:
Pipeline: Pipeline object
Note:
This method will be deprecated in version 2.0.0. Use get() instead.
"""
return self.get(id, version)
[docs]
def get(self, id: str, version: str = "")->_IPipeline:
"""get an existing Pipeline
Args:
id (str): Pipeline ID
version (str, optional): Pipeline Version. Defaults to "". If not provided, the latest version will be returned.
Returns:
Pipeline: Pipeline object
"""
if not id:
raise MantleMissingParameterError("id is required")
endpoint = f"{self.route_stem}{id}/{version}"
if not version:
# By default, the latest version will be returned so we dont want to raise an error but we want to log a warning since it could lead to unexpected behavior
Warning("No version provided. The latest version will be returned.")
endpoint = f"{self.route_stem}{id}"
res = self.session.make_request("GET", endpoint)
if not res.ok:
res.raise_for_status()
try:
pipeline_proto = pipeline_pb2.Pipeline()
pipeline_proto.ParseFromString(res.content)
except DecodeError as e:
raise
return MantlePipeline(pipeline_proto)