Skip to content

Commit

Permalink
Merge pull request #443 from dimastbk/wrap-zeebe-responses
Browse files Browse the repository at this point in the history
feat: wrap zeebe responses, return dataclass in zeebe adapter
  • Loading branch information
dimastbk authored Jul 23, 2024
2 parents 21ed649 + 33777bb commit 1e5c725
Show file tree
Hide file tree
Showing 17 changed files with 585 additions and 101 deletions.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,4 @@ Table Of Contents
Channels <channels>
Decorators <decorators>
Exceptions <errors>
Zeebe Adapter <zeebe_adapter>
8 changes: 8 additions & 0 deletions docs/zeebe_adapter.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
=============
Zeebe Adapter
=============

.. toctree::
:name: zeebe_adapter

Reference <zeebe_adapter_reference>
56 changes: 56 additions & 0 deletions docs/zeebe_adapter_reference.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
==========================
Zeebe Adapter Reference
==========================

.. autoclass:: pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter
:members:
:undoc-members:

==========================
Zeebe GRPC Responses
==========================

.. autoclass:: pyzeebe.grpc_internals.types.CreateProcessInstanceResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.CreateProcessInstanceWithResultResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.CancelProcessInstanceResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.DeployProcessResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.DeployResourceResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.PublishMessageResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.CompleteJobResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.FailJobResponse
:members:
:undoc-members:
:member-order: bysource

.. autoclass:: pyzeebe.grpc_internals.types.ThrowErrorResponse
:members:
:undoc-members:
:member-order: bysource
50 changes: 34 additions & 16 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
from typing import Any, Dict, Iterable, Optional, Tuple
from typing import Iterable, Optional

import grpc
from typing_extensions import deprecated

from pyzeebe.grpc_internals.types import (
CancelProcessInstanceResponse,
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployProcessResponse,
DeployResourceResponse,
PublishMessageResponse,
)
from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter
from pyzeebe.types import Variables

Expand All @@ -25,7 +33,7 @@ async def run_process(
variables: Optional[Variables] = None,
version: int = -1,
tenant_id: Optional[str] = None,
) -> int:
) -> CreateProcessInstanceResponse:
"""
Run process
Expand All @@ -36,7 +44,7 @@ async def run_process(
tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3.
Returns:
int: process_instance_key, the unique id of the running process generated by Zeebe.
CreateProcessInstanceResponse: response from Zeebe.
Raises:
ProcessDefinitionNotFoundError: No process with bpmn_process_id exists
Expand All @@ -60,7 +68,7 @@ async def run_process_with_result(
timeout: int = 0,
variables_to_fetch: Optional[Iterable[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict[str, Any]]:
) -> CreateProcessInstanceWithResultResponse:
"""
Run process and wait for the result.
Expand All @@ -73,7 +81,7 @@ async def run_process_with_result(
tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3.
Returns:
tuple: (The process instance key, A dictionary of the end state of the process instance)
CreateProcessInstanceWithResultResponse: response from Zeebe.
Raises:
ProcessDefinitionNotFoundError: No process with bpmn_process_id exists
Expand All @@ -95,15 +103,15 @@ async def run_process_with_result(
tenant_id=tenant_id,
)

async def cancel_process_instance(self, process_instance_key: int) -> int:
async def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse:
"""
Cancel a running process instance
Args:
process_instance_key (int): The key of the running process to cancel
Returns:
int: The process_instance_key
CancelProcessInstanceResponse: response from Zeebe.
Raises:
ProcessInstanceNotFoundError: If no process instance with process_instance_key exists
Expand All @@ -113,17 +121,19 @@ async def cancel_process_instance(self, process_instance_key: int) -> int:
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key)
return process_instance_key
return await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key)

@deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead")
async def deploy_process(self, *process_file_path: str) -> None:
async def deploy_process(self, *process_file_path: str) -> DeployProcessResponse:
"""
Deploy one or more processes
Args:
process_file_path (str): The file path to a process definition file (bpmn/yaml)
Returns:
DeployProcessResponse: response from Zeebe.
Raises:
ProcessInvalidError: If one of the process file definitions is invalid
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
Expand All @@ -132,9 +142,11 @@ async def deploy_process(self, *process_file_path: str) -> None:
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
await self.zeebe_adapter.deploy_process(*process_file_path)
return await self.zeebe_adapter.deploy_process(*process_file_path)

async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None:
async def deploy_resource(
self, *resource_file_path: str, tenant_id: Optional[str] = None
) -> DeployResourceResponse:
"""
Deploy one or more processes
Expand All @@ -144,6 +156,9 @@ async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[st
resource_file_path (str): The file path to a resource definition file (bpmn/dmn/form)
tenant_id (str): The tenant ID of the resources to deploy. New in Zeebe 8.3.
Returns:
DeployResourceResponse: response from Zeebe.
Raises:
ProcessInvalidError: If one of the process file definitions is invalid
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
Expand All @@ -152,7 +167,7 @@ async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[st
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)
return await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)

async def publish_message(
self,
Expand All @@ -162,7 +177,7 @@ async def publish_message(
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
) -> None:
) -> PublishMessageResponse:
"""
Publish a message
Expand All @@ -175,15 +190,18 @@ async def publish_message(
active, a MessageAlreadyExists will be raised.
tenant_id (str): The tenant ID of the message. New in Zeebe 8.3.
Returns:
PublishMessageResponse: response from Zeebe.
Raises:
MessageAlreadyExistError: If a message with message_id already exists
MessageAlreadyExistsError: If a message with message_id already exists
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnknownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
await self.zeebe_adapter.publish_message(
return await self.zeebe_adapter.publish_message(
name=name,
correlation_key=correlation_key,
time_to_live_in_milliseconds=time_to_live_in_milliseconds,
Expand Down
22 changes: 15 additions & 7 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import asyncio
from typing import Any, Dict, List, Optional, Tuple
from typing import List, Optional

import grpc
from typing_extensions import deprecated

from pyzeebe import ZeebeClient
from pyzeebe.grpc_internals.types import (
CancelProcessInstanceResponse,
CreateProcessInstanceResponse,
CreateProcessInstanceWithResultResponse,
DeployProcessResponse,
DeployResourceResponse,
PublishMessageResponse,
)
from pyzeebe.types import Variables


Expand All @@ -19,7 +27,7 @@ def run_process(
variables: Optional[Variables] = None,
version: int = -1,
tenant_id: Optional[str] = None,
) -> int:
) -> CreateProcessInstanceResponse:
return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id))

def run_process_with_result(
Expand All @@ -30,21 +38,21 @@ def run_process_with_result(
timeout: int = 0,
variables_to_fetch: Optional[List[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict[str, Any]]:
) -> CreateProcessInstanceWithResultResponse:
return self.loop.run_until_complete(
self.client.run_process_with_result(
bpmn_process_id, variables, version, timeout, variables_to_fetch, tenant_id
)
)

def cancel_process_instance(self, process_instance_key: int) -> int:
def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse:
return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key))

@deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead")
def deploy_process(self, *process_file_path: str) -> None:
def deploy_process(self, *process_file_path: str) -> DeployProcessResponse:
return self.loop.run_until_complete(self.client.deploy_process(*process_file_path))

def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None:
def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> DeployResourceResponse:
return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id))

def publish_message(
Expand All @@ -55,7 +63,7 @@ def publish_message(
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
) -> None:
) -> PublishMessageResponse:
return self.loop.run_until_complete(
self.client.publish_message(
name,
Expand Down
Loading

0 comments on commit 1e5c725

Please sign in to comment.