Source code for mantlebio.core.pipeline_run.client

from typing import Dict, Optional

from mantlebio.core.dataset.client import _IDatasetClient
from mantlebio.core.pipeline_run.helpers import MantlePipelineRunKickoff
from mantlebio.core.pipeline_run.mantle_pipeline_run import _IPipelineRun, MantlePipelineRun
from mantlebio.core.session.mantle_session import _ISession
from mantlebio.core.storage.client import _IStorageClient
from mantlebio.exceptions import MantleConfigurationError
from mantlebio.helpers.decorators import deprecated
from proto import data_type_pb2, pipeline_run_pb2


class PipelineRunClient:
    """Client for making pipeline run requests to the Mantle API."""

    def __init__(self, session: _ISession, storage_client: _IStorageClient, dataset_client: _IDatasetClient) -> None:
        """Store the collaborators used to create and fetch pipeline runs.

        Args:
            session (_ISession): Session used to issue API requests.
            storage_client (_IStorageClient): Client used to upload input files.
            dataset_client (_IDatasetClient): Client handed to every
                MantlePipelineRun this client builds.
        """
        self.session = session
        # Every pipeline run route is built by appending a run ID to this stem.
        self._route_stem = "/pipeline_run/"
        self.storage_client = storage_client
        self.dataset_client = dataset_client
        # Second name for the same dataset client object.
        # NOTE(review): presumably kept for callers using an older attribute
        # name -- confirm before removing.
        self.entity_client = self.dataset_client

    @deprecated("2.0.0", "use get() instead")
    def load_run(self, id: str) -> _IPipelineRun:
        """Load an existing pipeline run.

        Args:
            id (str): Pipeline run ID.

        Returns:
            _IPipelineRun: The pipeline run returned by get().

        Note:
            Will be deprecated in version 2.0.0 in favor of get().
        """
        return self.get(id)

    @deprecated("2.0.0", "use kickoff() instead")
    def kickoff_run(self, pipeline_id: str, version: str, input: Optional[Dict] = None, external: bool = False) -> _IPipelineRun:
        """Create a new pipeline run.

        Args:
            pipeline_id (str): Pipeline ID.
            version (str): Pipeline version.
            input (dict, optional): Pipeline run inputs.
            external (bool): Forwarded unchanged to kickoff().

        Returns:
            _IPipelineRun: The pipeline run returned by kickoff().

        Note:
            Will be deprecated in version 2.0.0 in favor of kickoff().
        """
        return self.kickoff(pipeline_id, version, input, external)

    def kickoff(self, pipeline_id: str, version: str, input: Optional[Dict] = None, external: bool = False) -> _IPipelineRun:
        """Create a new pipeline run and upload its file inputs to storage.

        Args:
            pipeline_id (str): Pipeline ID. Must be non-empty.
            version (str): Pipeline version. Must be non-empty.
            input (dict, optional): Pipeline run inputs keyed by field name.
                Values that are data_type_pb2.FileUpload instances are
                uploaded through the storage client; other values are only
                passed along in the kickoff request.
            external (bool): Passed to MantlePipelineRunKickoff as ``external``.

        Returns:
            _IPipelineRun: The newly created pipeline run.

        Raises:
            MantleConfigurationError: If pipeline_id or version is missing.
        """
        # Both values are required. The parentheses matter: without them
        # `not` binds only to pipeline_id and a missing version slips through.
        if not (pipeline_id and version):
            raise MantleConfigurationError("Cannot create new pipleine run without pipeline ID and version.")

        pipeline_kickoff = MantlePipelineRunKickoff(
            pipeline_id=pipeline_id,
            pipeline_version=version,
            external=external,
            inputs=input,
        )
        pipeline_run = MantlePipelineRun(pipeline_kickoff, self.session, self.storage_client, self.dataset_client)

        if not input:
            return pipeline_run

        # After kicking off the pipeline, upload the file inputs to storage.
        # The upload key embeds the run's unique_id, so the run has to exist
        # before any file is uploaded.
        for field, value in input.items():
            if not isinstance(value, data_type_pb2.FileUpload):
                continue
            self.storage_client.upload_file(
                path=value.path,
                upload_key=f"runs/{pipeline_run.unique_id}/input/{field}/{value.name}",
            )
        return pipeline_run

    @deprecated("2.0.0", "use get().update_status() instead")
    def update_status(self, id: str, status: str):
        """Fetch a pipeline run and update its status.

        Args:
            id (str): Pipeline run ID.
            status (str): Status value passed to the run's update_status().

        Note:
            Will be deprecated in version 2.0.0 in favor of
            get().update_status().
        """
        pipeline_run = self.get(id)
        pipeline_run.update_status(status)

    def get(self, id: str) -> _IPipelineRun:
        """Fetch an existing pipeline run by ID.

        Args:
            id (str): Pipeline run ID, appended to the pipeline run route.

        Returns:
            _IPipelineRun: Pipeline run built from the parsed API response.

        Raises:
            Exception: Whatever the response's raise_for_status() raises when
                the response is not ok (presumably an HTTP error -- depends on
                the session implementation).
        """
        res = self.session.make_request("GET", f"{self._route_stem}{id}")
        if not res.ok:
            res.raise_for_status()

        # The response body is a serialized PipelineRun protobuf message.
        pipeline_run_obj_pb2 = pipeline_run_pb2.PipelineRun()
        pipeline_run_obj_pb2.ParseFromString(res.content)
        return MantlePipelineRun(pipeline_run_obj_pb2, self.session, self.storage_client, self.dataset_client)