diff --git a/docs/index.rst b/docs/index.rst index 9ae20821..9958cf85 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -66,3 +66,4 @@ Table Of Contents Channels Decorators Exceptions + Zeebe Adapter diff --git a/docs/zeebe_adapter.rst b/docs/zeebe_adapter.rst new file mode 100644 index 00000000..8b9b9dd3 --- /dev/null +++ b/docs/zeebe_adapter.rst @@ -0,0 +1,8 @@ +============= +Zeebe Adapter +============= + +.. toctree:: + :name: zeebe_adapter + + Reference diff --git a/docs/zeebe_adapter_reference.rst b/docs/zeebe_adapter_reference.rst new file mode 100644 index 00000000..1b94cc50 --- /dev/null +++ b/docs/zeebe_adapter_reference.rst @@ -0,0 +1,56 @@ +========================== +Zeebe Adapter Reference +========================== + +.. autoclass:: pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter + :members: + :undoc-members: + +========================== +Zeebe GRPC Responses +========================== + +.. autoclass:: pyzeebe.grpc_internals.types.CreateProcessInstanceResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.CreateProcessInstanceWithResultResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.CancelProcessInstanceResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.DeployProcessResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.DeployResourceResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.PublishMessageResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.CompleteJobResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.FailJobResponse + :members: + :undoc-members: + :member-order: bysource + +.. autoclass:: pyzeebe.grpc_internals.types.ThrowErrorResponse + :members: + :undoc-members: + :member-order: bysource diff --git a/pyzeebe/client/client.py b/pyzeebe/client/client.py index 61d02150..f128e8b7 100644 --- a/pyzeebe/client/client.py +++ b/pyzeebe/client/client.py @@ -1,8 +1,16 @@ -from typing import Any, Dict, Iterable, Optional, Tuple +from typing import Iterable, Optional import grpc from typing_extensions import deprecated +from pyzeebe.grpc_internals.types import ( + CancelProcessInstanceResponse, + CreateProcessInstanceResponse, + CreateProcessInstanceWithResultResponse, + DeployProcessResponse, + DeployResourceResponse, + PublishMessageResponse, +) from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter from pyzeebe.types import Variables @@ -25,7 +33,7 @@ async def run_process( variables: Optional[Variables] = None, version: int = -1, tenant_id: Optional[str] = None, - ) -> int: + ) -> CreateProcessInstanceResponse: """ Run process @@ -36,7 +44,7 @@ async def run_process( tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3. Returns: - int: process_instance_key, the unique id of the running process generated by Zeebe. + CreateProcessInstanceResponse: response from Zeebe. Raises: ProcessDefinitionNotFoundError: No process with bpmn_process_id exists @@ -60,7 +68,7 @@ async def run_process_with_result( timeout: int = 0, variables_to_fetch: Optional[Iterable[str]] = None, tenant_id: Optional[str] = None, - ) -> Tuple[int, Dict[str, Any]]: + ) -> CreateProcessInstanceWithResultResponse: """ Run process and wait for the result. @@ -73,7 +81,7 @@ async def run_process_with_result( tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3. Returns: - tuple: (The process instance key, A dictionary of the end state of the process instance) + CreateProcessInstanceWithResultResponse: response from Zeebe. Raises: ProcessDefinitionNotFoundError: No process with bpmn_process_id exists @@ -95,7 +103,7 @@ async def run_process_with_result( tenant_id=tenant_id, ) - async def cancel_process_instance(self, process_instance_key: int) -> int: + async def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse: """ Cancel a running process instance @@ -103,7 +111,7 @@ async def cancel_process_instance(self, process_instance_key: int) -> int: process_instance_key (int): The key of the running process to cancel Returns: - int: The process_instance_key + CancelProcessInstanceResponse: response from Zeebe. Raises: ProcessInstanceNotFoundError: If no process instance with process_instance_key exists @@ -113,17 +121,19 @@ async def cancel_process_instance(self, process_instance_key: int) -> int: UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code """ - await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key) - return process_instance_key + return await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key) @deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead") - async def deploy_process(self, *process_file_path: str) -> None: + async def deploy_process(self, *process_file_path: str) -> DeployProcessResponse: """ Deploy one or more processes Args: process_file_path (str): The file path to a process definition file (bpmn/yaml) + Returns: + DeployProcessResponse: response from Zeebe. + Raises: ProcessInvalidError: If one of the process file definitions is invalid ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) @@ -132,9 +142,11 @@ async def deploy_process(self, *process_file_path: str) -> None: UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code """ - await self.zeebe_adapter.deploy_process(*process_file_path) + return await self.zeebe_adapter.deploy_process(*process_file_path) - async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None: + async def deploy_resource( + self, *resource_file_path: str, tenant_id: Optional[str] = None + ) -> DeployResourceResponse: """ Deploy one or more processes @@ -144,6 +156,9 @@ async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[st resource_file_path (str): The file path to a resource definition file (bpmn/dmn/form) tenant_id (str): The tenant ID of the resources to deploy. New in Zeebe 8.3. + Returns: + DeployResourceResponse: response from Zeebe. + Raises: ProcessInvalidError: If one of the process file definitions is invalid ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) @@ -152,7 +167,7 @@ async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[st UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code """ - await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id) + return await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id) async def publish_message( self, @@ -162,7 +177,7 @@ async def publish_message( time_to_live_in_milliseconds: int = 60000, message_id: Optional[str] = None, tenant_id: Optional[str] = None, - ) -> None: + ) -> PublishMessageResponse: """ Publish a message @@ -175,15 +190,18 @@ async def publish_message( active, a MessageAlreadyExists will be raised. tenant_id (str): The tenant ID of the message. New in Zeebe 8.3. + Returns: + PublishMessageResponse: response from Zeebe. + Raises: - MessageAlreadyExistError: If a message with message_id already exists + MessageAlreadyExistsError: If a message with message_id already exists ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests) ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable ZeebeInternalError: If Zeebe experiences an internal error UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code """ - await self.zeebe_adapter.publish_message( + return await self.zeebe_adapter.publish_message( name=name, correlation_key=correlation_key, time_to_live_in_milliseconds=time_to_live_in_milliseconds, diff --git a/pyzeebe/client/sync_client.py b/pyzeebe/client/sync_client.py index edcadd45..89c7f9a5 100644 --- a/pyzeebe/client/sync_client.py +++ b/pyzeebe/client/sync_client.py @@ -1,10 +1,18 @@ import asyncio -from typing import Any, Dict, List, Optional, Tuple +from typing import List, Optional import grpc from typing_extensions import deprecated from pyzeebe import ZeebeClient +from pyzeebe.grpc_internals.types import ( + CancelProcessInstanceResponse, + CreateProcessInstanceResponse, + CreateProcessInstanceWithResultResponse, + DeployProcessResponse, + DeployResourceResponse, + PublishMessageResponse, +) from pyzeebe.types import Variables @@ -19,7 +27,7 @@ def run_process( variables: Optional[Variables] = None, version: int = -1, tenant_id: Optional[str] = None, - ) -> int: + ) -> CreateProcessInstanceResponse: return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id)) def run_process_with_result( @@ -30,21 +38,21 @@ def run_process_with_result( timeout: int = 0, variables_to_fetch: Optional[List[str]] = None, tenant_id: Optional[str] = None, - ) -> Tuple[int, Dict[str, Any]]: + ) -> CreateProcessInstanceWithResultResponse: return self.loop.run_until_complete( self.client.run_process_with_result( bpmn_process_id, variables, version, timeout, variables_to_fetch, tenant_id ) ) - def cancel_process_instance(self, process_instance_key: int) -> int: + def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse: return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key)) @deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead") - def deploy_process(self, *process_file_path: str) -> None: + def deploy_process(self, *process_file_path: str) -> DeployProcessResponse: return self.loop.run_until_complete(self.client.deploy_process(*process_file_path)) - def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None: + def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> DeployResourceResponse: return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id)) def publish_message( @@ -55,7 +63,7 @@ def publish_message( time_to_live_in_milliseconds: int = 60000, message_id: Optional[str] = None, tenant_id: Optional[str] = None, - ) -> None: + ) -> PublishMessageResponse: return self.loop.run_until_complete( self.client.publish_message( name, diff --git a/pyzeebe/grpc_internals/types.py b/pyzeebe/grpc_internals/types.py new file mode 100644 index 00000000..e0110fd5 --- /dev/null +++ b/pyzeebe/grpc_internals/types.py @@ -0,0 +1,165 @@ +from dataclasses import dataclass +from typing import List, Optional, Union + +from pyzeebe.types import Variables + + +@dataclass(frozen=True) +class CreateProcessInstanceResponse: + #: the key of the process definition which was used to create the process instance + process_definition_key: int + #: the BPMN process ID of the process definition which was used to create the process + #: instance + bpmn_process_id: str + #: the version of the process definition which was used to create the process instance + version: int + #: the unique identifier of the created process instance; to be used wherever a request + #: needs a process instance key (e.g. CancelProcessInstanceRequest) + process_instance_key: int + #: the tenant ID of the created process instance + tenant_id: Optional[str] + + +@dataclass(frozen=True) +class CreateProcessInstanceWithResultResponse: + #: the key of the process definition which was used to create the process instance + process_definition_key: int + #: the BPMN process ID of the process definition which was used to create the process + #: instance + bpmn_process_id: str + #: the version of the process definition which was used to create the process instance + version: int + #: the unique identifier of the created process instance; to be used wherever a request + #: needs a process instance key (e.g. CancelProcessInstanceRequest) + process_instance_key: int + #: consisting of all visible variables to the root scope + variables: Variables + #: the tenant ID of the process definition + tenant_id: Optional[str] + + +@dataclass(frozen=True) +class CancelProcessInstanceResponse: + pass + + +@dataclass(frozen=True) +class DeployProcessResponse: + @dataclass(frozen=True) + class ProcessMetadata: + #: the bpmn process ID, as parsed during deployment; together with the version forms a + #: unique identifier for a specific process definition + bpmn_process_id: str + #: the assigned process version + version: int + #: the assigned key, which acts as a unique identifier for this process + process_definition_key: int + #: the resource name (see: ProcessRequestObject.name) from which this process was + #: parsed + resource_name: str + + #: the unique key identifying the deployment + key: int + #: a list of deployed processes + processes: List[ProcessMetadata] + + +@dataclass(frozen=True) +class DeployResourceResponse: + @dataclass(frozen=True) + class ProcessMetadata: + #: the bpmn process ID, as parsed during deployment; together with the version forms a + #: unique identifier for a specific process definition + bpmn_process_id: str + #: the assigned process version + version: int + #: the assigned key, which acts as a unique identifier for this process + process_definition_key: int + #: the resource name (see: ProcessRequestObject.name) from which this process was + #: parsed + resource_name: str + #: the tenant ID of the deployed process + tenant_id: Optional[str] + + @dataclass(frozen=True) + class DecisionMetadata: + #: the dmn decision ID, as parsed during deployment; together with the + #: versions forms a unique identifier for a specific decision + dmn_decision_id: str + #: the dmn name of the decision, as parsed during deployment + dmn_decision_name: str + #: the assigned decision version + version: int + #: the assigned decision key, which acts as a unique identifier for this + #: decision + decision_key: int + #: the dmn ID of the decision requirements graph that this decision is part + #: of, as parsed during deployment + dmn_decision_requirements_id: str + #: the assigned key of the decision requirements graph that this decision is + #: part of + decision_requirements_key: int + #: the tenant ID of the deployed decision + tenant_id: Optional[str] + + @dataclass(frozen=True) + class DecisionRequirementsMetadata: + #: the dmn decision requirements ID, as parsed during deployment; together + #: with the versions forms a unique identifier for a specific decision + dmn_decision_requirements_id: str + #: the dmn name of the decision requirements, as parsed during deployment + dmn_decision_requirements_name: str + #: the assigned decision requirements version + version: int + #: the assigned decision requirements key, which acts as a unique identifier + #: for this decision requirements + decision_requirements_key: int + #: the resource name (see: Resource.name) from which this decision + #: requirements was parsed + resource_name: str + #: the tenant ID of the deployed decision requirements + tenant_id: Optional[str] + + @dataclass(frozen=True) + class FormMetadata: + #: the form ID, as parsed during deployment; together with the + #: versions forms a unique identifier for a specific form + form_id: str + #: the assigned form version + version: int + #: the assigned key, which acts as a unique identifier for this form + form_key: int + #: the resource name + resource_name: str + #: the tenant ID of the deployed form + tenant_id: Optional[str] + + #: the unique key identifying the deployment + key: int + #: a list of deployed resources, e.g. processes + deployments: List[Union[ProcessMetadata, DecisionMetadata, DecisionRequirementsMetadata, FormMetadata]] + #: the tenant ID of the deployed resources + tenant_id: Optional[str] + + +@dataclass(frozen=True) +class PublishMessageResponse: + #: the unique ID of the message that was published + key: int + #: the tenant ID of the message + tenant_id: Optional[str] + + +@dataclass(frozen=True) +class CompleteJobResponse: + pass + + +@dataclass(frozen=True) +class FailJobResponse: + pass + + +@dataclass(frozen=True) +class ThrowErrorResponse: + pass diff --git a/pyzeebe/grpc_internals/zeebe_job_adapter.py b/pyzeebe/grpc_internals/zeebe_job_adapter.py index f259ea77..7f8720c7 100644 --- a/pyzeebe/grpc_internals/zeebe_job_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_job_adapter.py @@ -7,11 +7,8 @@ ActivatedJob, ActivateJobsRequest, CompleteJobRequest, - CompleteJobResponse, FailJobRequest, - FailJobResponse, ThrowErrorRequest, - ThrowErrorResponse, ) from pyzeebe.errors import ( @@ -24,6 +21,8 @@ from pyzeebe.job.job import Job from pyzeebe.types import Variables +from .types import CompleteJobResponse, FailJobResponse, ThrowErrorResponse + logger = logging.getLogger(__name__) @@ -80,9 +79,7 @@ def _create_job_from_raw_job(self, response: ActivatedJob) -> Job: async def complete_job(self, job_key: int, variables: Variables) -> CompleteJobResponse: try: - return await self._gateway_stub.CompleteJob( - CompleteJobRequest(jobKey=job_key, variables=json.dumps(variables)) - ) + await self._gateway_stub.CompleteJob(CompleteJobRequest(jobKey=job_key, variables=json.dumps(variables))) except grpc.aio.AioRpcError as grpc_error: if is_error_status(grpc_error, grpc.StatusCode.NOT_FOUND): raise JobNotFoundError(job_key=job_key) from grpc_error @@ -90,11 +87,13 @@ async def complete_job(self, job_key: int, variables: Variables) -> CompleteJobR raise JobAlreadyDeactivatedError(job_key=job_key) from grpc_error await self._handle_grpc_error(grpc_error) + return CompleteJobResponse() + async def fail_job( self, job_key: int, retries: int, message: str, retry_back_off_ms: int, variables: Variables ) -> FailJobResponse: try: - return await self._gateway_stub.FailJob( + await self._gateway_stub.FailJob( FailJobRequest( jobKey=job_key, retries=retries, @@ -110,11 +109,13 @@ async def fail_job( raise JobAlreadyDeactivatedError(job_key=job_key) from grpc_error await self._handle_grpc_error(grpc_error) + return FailJobResponse() + async def throw_error( self, job_key: int, message: str, variables: Variables, error_code: str = "" ) -> ThrowErrorResponse: try: - return await self._gateway_stub.ThrowError( + await self._gateway_stub.ThrowError( ThrowErrorRequest( jobKey=job_key, errorMessage=message, @@ -128,3 +129,5 @@ async def throw_error( elif is_error_status(grpc_error, grpc.StatusCode.FAILED_PRECONDITION): raise JobAlreadyDeactivatedError(job_key=job_key) from grpc_error await self._handle_grpc_error(grpc_error) + + return ThrowErrorResponse() diff --git a/pyzeebe/grpc_internals/zeebe_message_adapter.py b/pyzeebe/grpc_internals/zeebe_message_adapter.py index ff6fb278..6cd5018d 100644 --- a/pyzeebe/grpc_internals/zeebe_message_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_message_adapter.py @@ -2,13 +2,15 @@ from typing import Optional import grpc -from zeebe_grpc.gateway_pb2 import PublishMessageRequest, PublishMessageResponse +from zeebe_grpc.gateway_pb2 import PublishMessageRequest from pyzeebe.errors import MessageAlreadyExistsError from pyzeebe.grpc_internals.grpc_utils import is_error_status from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase from pyzeebe.types import Variables +from .types import PublishMessageResponse + class ZeebeMessageAdapter(ZeebeAdapterBase): async def publish_message( @@ -21,7 +23,7 @@ async def publish_message( tenant_id: Optional[str] = None, ) -> PublishMessageResponse: try: - return await self._gateway_stub.PublishMessage( + response = await self._gateway_stub.PublishMessage( PublishMessageRequest( name=name, correlationKey=correlation_key, @@ -35,3 +37,5 @@ async def publish_message( if is_error_status(grpc_error, grpc.StatusCode.ALREADY_EXISTS): raise MessageAlreadyExistsError() from grpc_error await self._handle_grpc_error(grpc_error) + + return PublishMessageResponse(key=response.key, tenant_id=response.tenantId) diff --git a/pyzeebe/grpc_internals/zeebe_process_adapter.py b/pyzeebe/grpc_internals/zeebe_process_adapter.py index 83631827..4dcf1f0a 100644 --- a/pyzeebe/grpc_internals/zeebe_process_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_process_adapter.py @@ -1,6 +1,6 @@ import json import os -from typing import Any, Dict, Iterable, NoReturn, Optional, Tuple, cast +from typing import Any, Callable, Dict, Iterable, List, NoReturn, Optional, Union import aiofiles import grpc @@ -9,10 +9,12 @@ CancelProcessInstanceRequest, CreateProcessInstanceRequest, CreateProcessInstanceWithResultRequest, + DecisionMetadata, + DecisionRequirementsMetadata, DeployProcessRequest, - DeployProcessResponse, DeployResourceRequest, - DeployResourceResponse, + FormMetadata, + ProcessMetadata, ProcessRequestObject, Resource, ) @@ -29,6 +31,14 @@ from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase from pyzeebe.types import Variables +from .types import ( + CancelProcessInstanceResponse, + CreateProcessInstanceResponse, + CreateProcessInstanceWithResultResponse, + DeployProcessResponse, + DeployResourceResponse, +) + class ZeebeProcessAdapter(ZeebeAdapterBase): async def create_process_instance( @@ -37,7 +47,7 @@ async def create_process_instance( version: int, variables: Variables, tenant_id: Optional[str] = None, - ) -> int: + ) -> CreateProcessInstanceResponse: try: response = await self._gateway_stub.CreateProcessInstance( CreateProcessInstanceRequest( @@ -49,7 +59,14 @@ async def create_process_instance( ) except grpc.aio.AioRpcError as grpc_error: await self._create_process_errors(grpc_error, bpmn_process_id, version, variables) - return cast(int, response.processInstanceKey) + + return CreateProcessInstanceResponse( + process_definition_key=response.processDefinitionKey, + bpmn_process_id=response.bpmnProcessId, + version=response.version, + process_instance_key=response.processInstanceKey, + tenant_id=response.tenantId, + ) async def create_process_instance_with_result( self, @@ -59,7 +76,7 @@ async def create_process_instance_with_result( timeout: int, variables_to_fetch: Iterable[str], tenant_id: Optional[str] = None, - ) -> Tuple[int, Dict[str, Any]]: + ) -> CreateProcessInstanceWithResultResponse: try: response = await self._gateway_stub.CreateProcessInstanceWithResult( CreateProcessInstanceWithResultRequest( @@ -75,7 +92,15 @@ async def create_process_instance_with_result( ) except grpc.aio.AioRpcError as grpc_error: await self._create_process_errors(grpc_error, bpmn_process_id, version, variables) - return response.processInstanceKey, json.loads(response.variables) + + return CreateProcessInstanceWithResultResponse( + process_definition_key=response.processDefinitionKey, + bpmn_process_id=response.bpmnProcessId, + version=response.version, + process_instance_key=response.processInstanceKey, + variables=json.loads(response.variables), + tenant_id=response.tenantId, + ) async def _create_process_errors( self, grpc_error: grpc.aio.AioRpcError, bpmn_process_id: str, version: int, variables: Dict[str, Any] @@ -92,7 +117,7 @@ async def _create_process_errors( raise ProcessTimeoutError(bpmn_process_id) from grpc_error await self._handle_grpc_error(grpc_error) - async def cancel_process_instance(self, process_instance_key: int) -> None: + async def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse: try: await self._gateway_stub.CancelProcessInstance( CancelProcessInstanceRequest(processInstanceKey=process_instance_key) @@ -102,10 +127,12 @@ async def cancel_process_instance(self, process_instance_key: int) -> None: raise ProcessInstanceNotFoundError(process_instance_key=process_instance_key) from grpc_error await self._handle_grpc_error(grpc_error) + return CancelProcessInstanceResponse() + @deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead") async def deploy_process(self, *process_file_path: str) -> DeployProcessResponse: try: - return await self._gateway_stub.DeployProcess( + response = await self._gateway_stub.DeployProcess( DeployProcessRequest( processes=[await result for result in map(_create_process_request, process_file_path)] ) @@ -115,11 +142,24 @@ async def deploy_process(self, *process_file_path: str) -> DeployProcessResponse raise ProcessInvalidError() from grpc_error await self._handle_grpc_error(grpc_error) + return DeployProcessResponse( + key=response.key, + processes=[ + DeployProcessResponse.ProcessMetadata( + bpmn_process_id=process.bpmnProcessId, + version=process.version, + process_definition_key=process.processDefinitionKey, + resource_name=process.resourceName, + ) + for process in response.processes + ], + ) + async def deploy_resource( self, *resource_file_path: str, tenant_id: Optional[str] = None ) -> DeployResourceResponse: try: - return await self._gateway_stub.DeployResource( + response = await self._gateway_stub.DeployResource( DeployResourceRequest( resources=[await result for result in map(_create_resource_request, resource_file_path)], tenantId=tenant_id, @@ -130,6 +170,89 @@ async def deploy_resource( raise ProcessInvalidError() from grpc_error await self._handle_grpc_error(grpc_error) + deployments: List[ + Union[ + DeployResourceResponse.ProcessMetadata, + DeployResourceResponse.DecisionMetadata, + DeployResourceResponse.DecisionRequirementsMetadata, + DeployResourceResponse.FormMetadata, + ] + ] = [] + for deployment in response.deployments: + metadata_field = deployment.WhichOneof("Metadata") + metadata = getattr(deployment, metadata_field) + deployments.append(_METADATA_PARSERS[metadata_field](metadata)) + + return DeployResourceResponse( + key=response.key, + deployments=deployments, + tenant_id=response.tenantId, + ) + + @staticmethod + def _create_process_from_raw_process(response: ProcessMetadata) -> DeployResourceResponse.ProcessMetadata: + return DeployResourceResponse.ProcessMetadata( + bpmn_process_id=response.bpmnProcessId, + version=response.version, + process_definition_key=response.processDefinitionKey, + resource_name=response.resourceName, + tenant_id=response.tenantId, + ) + + @staticmethod + def _create_decision_from_raw_decision(response: DecisionMetadata) -> DeployResourceResponse.DecisionMetadata: + return DeployResourceResponse.DecisionMetadata( + dmn_decision_id=response.dmnDecisionId, + dmn_decision_name=response.dmnDecisionName, + version=response.version, + decision_key=response.decisionKey, + dmn_decision_requirements_id=response.dmnDecisionRequirementsId, + decision_requirements_key=response.decisionRequirementsKey, + tenant_id=response.tenantId, + ) + + @staticmethod + def _create_decision_requirements_from_raw_decision_requirements( + response: DecisionRequirementsMetadata, + ) -> DeployResourceResponse.DecisionRequirementsMetadata: + return DeployResourceResponse.DecisionRequirementsMetadata( + dmn_decision_requirements_id=response.dmnDecisionRequirementsId, + dmn_decision_requirements_name=response.dmnDecisionRequirementsName, + version=response.version, + decision_requirements_key=response.decisionRequirementsKey, + resource_name=response.resourceName, + tenant_id=response.tenantId, + ) + + @staticmethod + def _create_form_from_raw_form(response: FormMetadata) -> DeployResourceResponse.FormMetadata: + return DeployResourceResponse.FormMetadata( + form_id=response.formId, + version=response.version, + form_key=response.formKey, + resource_name=response.resourceName, + tenant_id=response.tenantId, + ) + + +_METADATA_PARSERS: Dict[ + str, + Callable[ + [Union[ProcessMetadata, DecisionMetadata, DecisionRequirementsMetadata, FormMetadata]], + Union[ + DeployResourceResponse.ProcessMetadata, + DeployResourceResponse.DecisionMetadata, + DeployResourceResponse.DecisionRequirementsMetadata, + DeployResourceResponse.FormMetadata, + ], + ], +] = { + "process": ZeebeProcessAdapter._create_process_from_raw_process, + "decision": ZeebeProcessAdapter._create_decision_from_raw_decision, + "decisionRequirements": ZeebeProcessAdapter._create_decision_requirements_from_raw_decision_requirements, + "form": ZeebeProcessAdapter._create_form_from_raw_form, +} + async def _create_process_request(process_file_path: str) -> ProcessRequestObject: async with aiofiles.open(process_file_path, "rb") as file: diff --git a/tests/integration/cancel_process_test.py b/tests/integration/cancel_process_test.py index a5f622ad..fbb7b645 100644 --- a/tests/integration/cancel_process_test.py +++ b/tests/integration/cancel_process_test.py @@ -4,6 +4,6 @@ async def test_cancel_process(zeebe_client: ZeebeClient, process_name: str, process_variables: Dict): - process_key = await zeebe_client.run_process(process_name, process_variables) + response = await zeebe_client.run_process(process_name, process_variables) - await zeebe_client.cancel_process_instance(process_key) + await zeebe_client.cancel_process_instance(response.process_instance_key) diff --git a/tests/integration/run_process_test.py b/tests/integration/run_process_test.py index 845ed68c..85cb9863 100644 --- a/tests/integration/run_process_test.py +++ b/tests/integration/run_process_test.py @@ -16,7 +16,7 @@ async def test_run_process( initial_amount_of_processes = process_stats.get_process_runs() process_instance_key = await zeebe_client.run_process(process_name, process_variables) - await wait_for_process(process_instance_key, process_stats) + await wait_for_process(process_instance_key.process_instance_key, process_stats) assert process_stats.get_process_runs() == initial_amount_of_processes + 1 @@ -27,8 +27,8 @@ async def test_non_existent_process(zeebe_client: ZeebeClient): async def test_run_process_with_result(zeebe_client: ZeebeClient, process_name: str, process_variables: Dict): - _, process_result = await zeebe_client.run_process_with_result( + response = await zeebe_client.run_process_with_result( process_name, process_variables, timeout=PROCESS_TIMEOUT_IN_MS ) - assert process_result["output"].startswith(process_variables["input"]) + assert response.variables["output"].startswith(process_variables["input"]) diff --git a/tests/unit/client/client_test.py b/tests/unit/client/client_test.py index fcb6ee9a..50c13e46 100644 --- a/tests/unit/client/client_test.py +++ b/tests/unit/client/client_test.py @@ -4,39 +4,56 @@ import pytest +from pyzeebe import ZeebeClient from pyzeebe.errors import ProcessDefinitionNotFoundError +from pyzeebe.grpc_internals.types import ( + CancelProcessInstanceResponse, + CreateProcessInstanceResponse, + CreateProcessInstanceWithResultResponse, +) +from tests.unit.utils.gateway_mock import GatewayMock @pytest.mark.asyncio -async def test_run_process(zeebe_client, grpc_servicer): +async def test_run_process(zeebe_client: ZeebeClient, grpc_servicer: GatewayMock): bpmn_process_id = str(uuid4()) version = randint(0, 10) grpc_servicer.mock_deploy_process(bpmn_process_id, version, []) assert isinstance( - await zeebe_client.run_process(bpmn_process_id=bpmn_process_id, variables={}, version=version), int + await zeebe_client.run_process(bpmn_process_id=bpmn_process_id, variables={}, version=version), + CreateProcessInstanceResponse, ) @pytest.mark.asyncio class TestRunProcessWithResult: - async def test_run_process_with_result_instance_key_is_int(self, zeebe_client, deployed_process): + async def test_run_process_with_result_type(self, zeebe_client: ZeebeClient, deployed_process): bpmn_process_id, version = deployed_process - process_instance_key, _ = await zeebe_client.run_process_with_result(bpmn_process_id, {}, version) + response = await zeebe_client.run_process_with_result(bpmn_process_id, {}, version) - assert isinstance(process_instance_key, int) + assert isinstance(response, CreateProcessInstanceWithResultResponse) - async def test_run_process_with_result_output_variables_are_as_expected(self, zeebe_client, deployed_process): + async def test_run_process_with_result_instance_key_is_int(self, zeebe_client: ZeebeClient, deployed_process): + bpmn_process_id, version = deployed_process + + response = await zeebe_client.run_process_with_result(bpmn_process_id, {}, version) + + assert isinstance(response.process_instance_key, int) + + async def test_run_process_with_result_output_variables_are_as_expected( + self, zeebe_client: ZeebeClient, deployed_process + ): expected = {} bpmn_process_id, version = deployed_process - _, output_variables = await zeebe_client.run_process_with_result(bpmn_process_id, {}, version) + response = await zeebe_client.run_process_with_result(bpmn_process_id, {}, version) - assert output_variables == expected + assert response.variables == expected @pytest.mark.asyncio -async def test_deploy_process(zeebe_client): +async def test_deploy_process(zeebe_client: ZeebeClient): zeebe_client.zeebe_adapter.deploy_process = AsyncMock() file_path = str(uuid4()) await zeebe_client.deploy_process(file_path) @@ -44,7 +61,7 @@ async def test_deploy_process(zeebe_client): @pytest.mark.asyncio -async def test_deploy_resource(zeebe_client): +async def test_deploy_resource(zeebe_client: ZeebeClient): zeebe_client.zeebe_adapter.deploy_resource = AsyncMock() file_path = str(uuid4()) await zeebe_client.deploy_resource(file_path) @@ -52,28 +69,29 @@ async def test_deploy_resource(zeebe_client): @pytest.mark.asyncio -async def test_run_non_existent_process(zeebe_client): +async def test_run_non_existent_process(zeebe_client: ZeebeClient): with pytest.raises(ProcessDefinitionNotFoundError): await zeebe_client.run_process(bpmn_process_id=str(uuid4())) @pytest.mark.asyncio -async def test_run_non_existent_process_with_result(zeebe_client): +async def test_run_non_existent_process_with_result(zeebe_client: ZeebeClient): with pytest.raises(ProcessDefinitionNotFoundError): await zeebe_client.run_process_with_result(bpmn_process_id=str(uuid4())) @pytest.mark.asyncio -async def test_cancel_process_instance(zeebe_client, grpc_servicer): +async def test_cancel_process_instance(zeebe_client: ZeebeClient, grpc_servicer: GatewayMock): bpmn_process_id = str(uuid4()) version = randint(0, 10) grpc_servicer.mock_deploy_process(bpmn_process_id, version, []) - process_instance_key = await zeebe_client.run_process( - bpmn_process_id=bpmn_process_id, variables={}, version=version + response = await zeebe_client.run_process(bpmn_process_id=bpmn_process_id, variables={}, version=version) + assert isinstance( + await zeebe_client.cancel_process_instance(process_instance_key=response.process_instance_key), + CancelProcessInstanceResponse, ) - assert isinstance(await zeebe_client.cancel_process_instance(process_instance_key=process_instance_key), int) @pytest.mark.asyncio -async def test_publish_message(zeebe_client): +async def test_publish_message(zeebe_client: ZeebeClient): await zeebe_client.publish_message(name=str(uuid4()), correlation_key=str(uuid4())) diff --git a/tests/unit/client/sync_client_test.py b/tests/unit/client/sync_client_test.py index 83a924a4..ba6cb6a3 100644 --- a/tests/unit/client/sync_client_test.py +++ b/tests/unit/client/sync_client_test.py @@ -6,6 +6,11 @@ from pyzeebe import SyncZeebeClient from pyzeebe.errors import ProcessDefinitionNotFoundError +from pyzeebe.grpc_internals.types import ( + CancelProcessInstanceResponse, + CreateProcessInstanceResponse, + CreateProcessInstanceWithResultResponse, +) @pytest.fixture @@ -17,12 +22,12 @@ def sync_zeebe_client(event_loop, aio_grpc_channel: grpc.aio.Channel) -> SyncZee class TestRunProcess: - def test_run_process_returns_int(self, sync_zeebe_client: SyncZeebeClient, deployed_process): + def test_run_process_returns(self, sync_zeebe_client: SyncZeebeClient, deployed_process): bpmn_process_id, version = deployed_process - process_instance_key = sync_zeebe_client.run_process(bpmn_process_id, version=version) + response = sync_zeebe_client.run_process(bpmn_process_id, version=version) - assert isinstance(process_instance_key, int) + assert isinstance(response, CreateProcessInstanceResponse) def test_raises_process_definition_not_found_error_for_invalid_process_id(self, sync_zeebe_client: SyncZeebeClient): with pytest.raises(ProcessDefinitionNotFoundError): @@ -30,12 +35,19 @@ def test_raises_process_definition_not_found_error_for_invalid_process_id(self, class TestRunProcessWithResult: - def test_run_process_with_result_instance_key_is_int(self, sync_zeebe_client: SyncZeebeClient, deployed_process): + def test_run_process_with_result_returns(self, sync_zeebe_client: SyncZeebeClient, deployed_process): + bpmn_process_id, version = deployed_process + + response = sync_zeebe_client.run_process_with_result(bpmn_process_id, {}, version) + + assert isinstance(response, CreateProcessInstanceWithResultResponse) + + def test_run_process_returns_int(self, sync_zeebe_client: SyncZeebeClient, deployed_process): bpmn_process_id, version = deployed_process - process_instance_key, _ = sync_zeebe_client.run_process_with_result(bpmn_process_id, {}, version) + response = sync_zeebe_client.run_process(bpmn_process_id, version=version) - assert isinstance(process_instance_key, int) + assert isinstance(response.process_instance_key, int) def test_run_process_with_result_output_variables_are_as_expected( self, sync_zeebe_client: SyncZeebeClient, deployed_process @@ -43,9 +55,9 @@ def test_run_process_with_result_output_variables_are_as_expected( expected = {} bpmn_process_id, version = deployed_process - _, output_variables = sync_zeebe_client.run_process_with_result(bpmn_process_id, {}, version) + response = sync_zeebe_client.run_process_with_result(bpmn_process_id, {}, version) - assert output_variables == expected + assert response.variables == expected def test_raises_process_definition_not_found_error_for_invalid_process_id(self, sync_zeebe_client: SyncZeebeClient): with pytest.raises(ProcessDefinitionNotFoundError): @@ -55,13 +67,11 @@ def test_raises_process_definition_not_found_error_for_invalid_process_id(self, class TestCancelProcessInstance: def test_cancel_process_instance(self, sync_zeebe_client: SyncZeebeClient, deployed_process): bpmn_process_id, version = deployed_process - process_instance_key = sync_zeebe_client.run_process( - bpmn_process_id=bpmn_process_id, variables={}, version=version - ) + response = sync_zeebe_client.run_process(bpmn_process_id=bpmn_process_id, variables={}, version=version) - returned_process_instance_key = sync_zeebe_client.cancel_process_instance(process_instance_key) + cancel_response = sync_zeebe_client.cancel_process_instance(response.process_instance_key) - assert returned_process_instance_key == process_instance_key + assert isinstance(cancel_response, CancelProcessInstanceResponse) class TestDeployProcess: diff --git a/tests/unit/grpc_internals/zeebe_job_adapter_test.py b/tests/unit/grpc_internals/zeebe_job_adapter_test.py index 604cd320..4d3510d5 100644 --- a/tests/unit/grpc_internals/zeebe_job_adapter_test.py +++ b/tests/unit/grpc_internals/zeebe_job_adapter_test.py @@ -3,17 +3,17 @@ from uuid import uuid4 import pytest -from zeebe_grpc.gateway_pb2 import ( - CompleteJobResponse, - FailJobResponse, - ThrowErrorResponse, -) from pyzeebe.errors import ( ActivateJobsRequestInvalidError, JobAlreadyDeactivatedError, JobNotFoundError, ) +from pyzeebe.grpc_internals.types import ( + CompleteJobResponse, + FailJobResponse, + ThrowErrorResponse, +) from pyzeebe.grpc_internals.zeebe_job_adapter import ZeebeJobAdapter from pyzeebe.job.job import Job from pyzeebe.task.task import Task diff --git a/tests/unit/grpc_internals/zeebe_message_adapter_test.py b/tests/unit/grpc_internals/zeebe_message_adapter_test.py index 3efd90c4..37640d33 100644 --- a/tests/unit/grpc_internals/zeebe_message_adapter_test.py +++ b/tests/unit/grpc_internals/zeebe_message_adapter_test.py @@ -2,9 +2,9 @@ from uuid import uuid4 import pytest -from zeebe_grpc.gateway_pb2 import PublishMessageResponse from pyzeebe.errors import MessageAlreadyExistsError +from pyzeebe.grpc_internals.types import PublishMessageResponse from pyzeebe.grpc_internals.zeebe_message_adapter import ZeebeMessageAdapter from tests.unit.utils.random_utils import RANDOM_RANGE diff --git a/tests/unit/grpc_internals/zeebe_process_adapter_test.py b/tests/unit/grpc_internals/zeebe_process_adapter_test.py index ca1a2294..97a970eb 100644 --- a/tests/unit/grpc_internals/zeebe_process_adapter_test.py +++ b/tests/unit/grpc_internals/zeebe_process_adapter_test.py @@ -13,6 +13,11 @@ ProcessInvalidError, ProcessTimeoutError, ) +from pyzeebe.grpc_internals.types import ( + CreateProcessInstanceResponse, + CreateProcessInstanceWithResultResponse, + DeployResourceResponse, +) from pyzeebe.grpc_internals.zeebe_process_adapter import ZeebeProcessAdapter from tests.unit.utils.gateway_mock import GatewayMock from tests.unit.utils.random_utils import RANDOM_RANGE @@ -38,7 +43,7 @@ async def test_response_is_of_correct_type(self, zeebe_adapter: ZeebeProcessAdap response = await zeebe_adapter.create_process_instance(bpmn_process_id, version, {}) - assert isinstance(response, int) + assert isinstance(response, CreateProcessInstanceResponse) async def test_raises_on_unkown_process(self, zeebe_adapter: ZeebeProcessAdapter): bpmn_process_id = str(uuid4()) @@ -69,6 +74,22 @@ async def test_raises_on_no_start_event(self, zeebe_adapter: ZeebeProcessAdapter @pytest.mark.asyncio class TestCreateProcessWithResult: + async def test_response_is_of_correct_type(self, zeebe_adapter: ZeebeProcessAdapter, grpc_servicer: GatewayMock): + bpmn_process_id = str(uuid4()) + version = randint(0, 10) + grpc_servicer.mock_deploy_process(bpmn_process_id, version, []) + + response = await zeebe_adapter.create_process_instance_with_result( + bpmn_process_id=bpmn_process_id, + variables={}, + version=version, + timeout=0, + variables_to_fetch=[], + tenant_id=None, + ) + + assert isinstance(response, CreateProcessInstanceWithResultResponse) + async def test_process_instance_key_type_is_int( self, zeebe_adapter: ZeebeProcessAdapter, grpc_servicer: GatewayMock ): @@ -76,7 +97,7 @@ async def test_process_instance_key_type_is_int( version = randint(0, 10) grpc_servicer.mock_deploy_process(bpmn_process_id, version, []) - process_instance_key, _ = await zeebe_adapter.create_process_instance_with_result( + response = await zeebe_adapter.create_process_instance_with_result( bpmn_process_id=bpmn_process_id, variables={}, version=version, @@ -85,14 +106,14 @@ async def test_process_instance_key_type_is_int( tenant_id=None, ) - assert isinstance(process_instance_key, int) + assert isinstance(response.process_instance_key, int) async def test_variables_type_is_dict(self, zeebe_adapter: ZeebeProcessAdapter, grpc_servicer: GatewayMock): bpmn_process_id = str(uuid4()) version = randint(0, 10) grpc_servicer.mock_deploy_process(bpmn_process_id, version, []) - _, response = await zeebe_adapter.create_process_instance_with_result( + response = await zeebe_adapter.create_process_instance_with_result( bpmn_process_id=bpmn_process_id, variables={}, version=version, @@ -101,7 +122,7 @@ async def test_variables_type_is_dict(self, zeebe_adapter: ZeebeProcessAdapter, tenant_id=None, ) - assert isinstance(response, dict) + assert isinstance(response.variables, dict) async def test_raises_on_process_timeout(self, zeebe_adapter: ZeebeProcessAdapter, grpc_servicer: GatewayMock): bpmn_process_id = str(uuid4()) @@ -128,13 +149,13 @@ async def test_cancels_the_process(self, zeebe_adapter: ZeebeProcessAdapter, grp bpmn_process_id = str(uuid4()) version = randint(0, 10) grpc_servicer.mock_deploy_process(bpmn_process_id, version, []) - process_instance_key = await zeebe_adapter.create_process_instance( + response = await zeebe_adapter.create_process_instance( bpmn_process_id=bpmn_process_id, variables={}, version=version ) - await zeebe_adapter.cancel_process_instance(process_instance_key) + await zeebe_adapter.cancel_process_instance(response.process_instance_key) - assert process_instance_key not in grpc_servicer.active_processes.keys() + assert response.process_instance_key not in grpc_servicer.active_processes.keys() async def test_raises_on_already_cancelled_process( self, zeebe_adapter: ZeebeProcessAdapter, grpc_servicer: GatewayMock @@ -167,6 +188,30 @@ async def test_calls_open_in_rb_mode(self, zeebe_adapter: ZeebeProcessAdapter, m @pytest.mark.asyncio class TestDeployResource: + async def test_deploy_process_response_type(self, zeebe_adapter: ZeebeProcessAdapter): + file_path = str(uuid4()) + ".bpmn" + + response = await zeebe_adapter.deploy_resource(file_path) + + assert isinstance(response, DeployResourceResponse) + assert isinstance(response.deployments[0], DeployResourceResponse.ProcessMetadata) + + async def test_deploy_decision_response_type(self, zeebe_adapter: ZeebeProcessAdapter): + file_path = str(uuid4()) + ".dmn" + + response = await zeebe_adapter.deploy_resource(file_path) + + assert isinstance(response, DeployResourceResponse) + assert isinstance(response.deployments[0], DeployResourceResponse.DecisionMetadata) + + async def test_deploy_form_response_type(self, zeebe_adapter: ZeebeProcessAdapter): + file_path = str(uuid4()) + ".form" + + response = await zeebe_adapter.deploy_resource(file_path) + + assert isinstance(response, DeployResourceResponse) + assert isinstance(response.deployments[0], DeployResourceResponse.FormMetadata) + async def test_raises_on_invalid_process(self, zeebe_adapter: ZeebeProcessAdapter): error = grpc.aio.AioRpcError(grpc.StatusCode.INVALID_ARGUMENT, None, None) diff --git a/tests/unit/utils/gateway_mock.py b/tests/unit/utils/gateway_mock.py index e8ae7581..3d16cf02 100644 --- a/tests/unit/utils/gateway_mock.py +++ b/tests/unit/utils/gateway_mock.py @@ -163,16 +163,41 @@ def DeployProcess(self, request, context): def DeployResource(self, request, context): resources = [] for resource in request.resources: - process_metadata = Deployment( - process=ProcessMetadata( - bpmnProcessId=str(uuid4()), - version=randint(0, 10), - processDefinitionKey=randint(0, RANDOM_RANGE), - resourceName=resource.name, - tenantId=request.tenantId, + if resource.name.endswith("bpmn"): + metadata = Deployment( + process=ProcessMetadata( + bpmnProcessId=str(uuid4()), + version=randint(0, 10), + processDefinitionKey=randint(0, RANDOM_RANGE), + resourceName=resource.name, + tenantId=request.tenantId, + ) ) - ) - resources.append(process_metadata) + resources.append(metadata) + elif resource.name.endswith("dmn"): + metadata = Deployment( + decision=DecisionMetadata( + dmnDecisionId=str(uuid4()), + dmnDecisionName=resource.name, + version=randint(0, 10), + decisionKey=randint(0, RANDOM_RANGE), + dmnDecisionRequirementsId=str(uuid4()), + decisionRequirementsKey=randint(0, RANDOM_RANGE), + tenantId=request.tenantId, + ) + ) + resources.append(metadata) + elif resource.name.endswith("form"): + metadata = Deployment( + form=FormMetadata( + formId=str(uuid4()), + version=randint(0, 10), + formKey=randint(0, RANDOM_RANGE), + resourceName=resource.name, + tenantId=request.tenantId, + ) + ) + resources.append(metadata) return DeployResourceResponse(key=randint(0, RANDOM_RANGE), deployments=resources, tenantId=request.tenantId)