Skip to content

Commit

Permalink
feat: Support standard environment variables
Browse files Browse the repository at this point in the history
Closes #81
  • Loading branch information
PSanetra committed Oct 9, 2024
1 parent cbe7617 commit 58ae243
Show file tree
Hide file tree
Showing 8 changed files with 508 additions and 69 deletions.
4 changes: 2 additions & 2 deletions pyzeebe/channel/insecure_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import grpc

from pyzeebe.channel.channel_options import get_channel_options
from pyzeebe.channel.utils import create_address
from pyzeebe.channel.utils import get_zeebe_address
from pyzeebe.types import ChannelArgumentType


Expand All @@ -22,5 +22,5 @@ def create_insecure_channel(
Returns:
grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway.
"""
grpc_address = create_address(grpc_address=grpc_address)
grpc_address = get_zeebe_address(grpc_address=grpc_address)
return grpc.aio.insecure_channel(target=grpc_address, options=get_channel_options(channel_options))
128 changes: 84 additions & 44 deletions pyzeebe/channel/oauth_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,26 @@
import grpc

from pyzeebe.channel.channel_options import get_channel_options
from pyzeebe.channel.utils import (
get_camunda_client_id,
get_camunda_client_secret,
get_camunda_cloud_cluster_id,
get_camunda_cloud_cluster_region,
get_camunda_cloud_hostname,
get_camunda_credentials_scopes,
get_camunda_oauth_url,
get_camunda_token_audience,
get_zeebe_address,
)
from pyzeebe.credentials.oauth import Oauth2ClientCredentialsMetadataPlugin
from pyzeebe.types import ChannelArgumentType


def create_oauth2_client_credentials_channel(
grpc_address: str,
client_id: str,
client_secret: str,
authorization_server: str,
grpc_address: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
authorization_server: Optional[str] = None,
scope: Optional[str] = None,
audience: Optional[str] = None,
channel_credentials: grpc.ChannelCredentials = grpc.ssl_channel_credentials(),
Expand All @@ -26,14 +37,21 @@ def create_oauth2_client_credentials_channel(
https://datatracker.ietf.org/doc/html/rfc6749#section-11.2.2
Args:
grpc_address (str): Zeebe Gateway Address.
client_id (str): The client id.
client_secret (str): The client secret.
authorization_server (str): The authorization server issuing access tokens
grpc_address (Optional[str]): Zeebe Gateway Address.
Defaults to value from ZEEBE_ADDRESS environment variable
or "{CAMUNDA_CLUSTER_ID}.{CAMUNDA_CLUSTER_REGION}.zeebe.camunda.io:443"
or "localhost:26500".
client_id (Optional[str]): The client id.
Defaults to value from CAMUNDA_CLIENT_ID or ZEEBE_CLIENT_ID environment variable
client_secret (Optional[str]): The client secret.
Defaults to value from CAMUNDA_CLIENT_SECRET or ZEEBE_CLIENT_SECRET environment variable
authorization_server (Optional[str]): The authorization server issuing access tokens
to the client after successfully authenticating the client.
scope (Optional[str]): The scope of the access request. Defaults to None.
audience (Optional[str]): The audience for authentication. Defaults to None.
Defaults to value from CAMUNDA_OAUTH_URL or ZEEBE_AUTHORIZATION_SERVER_URL environment variable
scope (Optional[str]): The scope of the access request.
Defaults to value from CAMUNDA_CREDENTIALS_SCOPES environment variable.
audience (Optional[str]): The audience for authentication.
Defaults to value from CAMUNDA_TOKEN_AUDIENCE or ZEEBE_TOKEN_AUDIENCE environment variable
channel_credentials (grpc.ChannelCredentials): The gRPC channel credentials.
Defaults to grpc.ssl_channel_credentials().
Expand All @@ -53,12 +71,17 @@ def create_oauth2_client_credentials_channel(
InvalidOAuthCredentialsError: One of the provided camunda credentials is not correct
"""

authorization_server = get_camunda_oauth_url(authorization_server)

if not authorization_server:
raise ValueError('ZEEBE_AUTHORIZATION_SERVER_URL is not configured')

oauth2_client_credentials = Oauth2ClientCredentialsMetadataPlugin(
client_id=client_id,
client_secret=client_secret,
authorization_server=authorization_server,
scope=scope,
audience=audience,
client_id=get_camunda_client_id(client_id),
client_secret=get_camunda_client_secret(client_secret),
authorization_server=authorization_server or '',
scope=get_camunda_credentials_scopes(scope),
audience=get_camunda_token_audience(audience),
leeway=leeway,
expire_in=expire_in,
)
Expand All @@ -69,20 +92,22 @@ def create_oauth2_client_credentials_channel(
)

channel: grpc.aio.Channel = grpc.aio.secure_channel(
target=grpc_address, credentials=composite_credentials, options=get_channel_options(channel_options)
target=get_zeebe_address(grpc_address),
credentials=composite_credentials,
options=get_channel_options(channel_options),
)

return channel


def create_camunda_cloud_channel(
client_id: str,
client_secret: str,
cluster_id: str,
region: str = "bru-2",
scope: str = "Zeebe",
authorization_server: str = "https://login.cloud.camunda.io/oauth/token",
audience: str = "zeebe.camunda.io",
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
cluster_id: Optional[str] = None,
region: Optional[str] = None,
scope: Optional[str] = None,
authorization_server: Optional[str] = None,
audience: Optional[str] = None,
channel_credentials: grpc.ChannelCredentials = grpc.ssl_channel_credentials(),
channel_options: Optional[ChannelArgumentType] = None,
leeway: int = 60,
Expand All @@ -91,15 +116,26 @@ def create_camunda_cloud_channel(
"""Create a gRPC channel for connecting to Camunda 8 Cloud (SaaS).
Args:
client_id (str): The client id.
client_secret (str): The client secret.
cluster_id (str): The ID of the cluster to connect to.
region (Optional[str]): The region of the cluster. Defaults to "bru-2".
scope (Optional[str]): The scope of the access request. Defaults to "Zeebe".
client_id (Optional[str]): The client id.
Defaults to value from CAMUNDA_CLIENT_ID or ZEEBE_CLIENT_ID environment variable.
client_secret (Optional[str]): The client secret.
Defaults to value from CAMUNDA_CLIENT_SECRET or ZEEBE_CLIENT_SECRET environment variable.
cluster_id (Optional[str]): The ID of the cluster to connect to.
Defaults to value from CAMUNDA_CLUSTER_ID environment variable.
region (Optional[str]): The region of the cluster.
Defaults to value from CAMUNDA_CLUSTER_REGION environment variable or 'bru-2'.
scope (Optional[str]): The scope of the access request.
Defaults to value from CAMUNDA_CREDENTIALS_SCOPES environment variable or 'Zeebe'.
authorization_server (Optional[str]): The authorization server issuing access tokens
to the client after successfully authenticating the client.
Defaults to "https://login.cloud.camunda.io/oauth/token".
audience (Optional[str]): The audience for authentication. Defaults to "zeebe.camunda.io".
Defaults to value from CAMUNDA_OAUTH_URL
or ZEEBE_AUTHORIZATION_SERVER_URL environment variable
or "https://login.cloud.camunda.io/oauth/token".
audience (Optional[str]): The audience for authentication.
Defaults to value from CAMUNDA_TOKEN_AUDIENCE
or ZEEBE_TOKEN_AUDIENCE environment variable
or "{cluster_id}.{region}.zeebe.camunda.io"
or "zeebe.camunda.io".
channel_credentials (grpc.ChannelCredentials): The gRPC channel credentials.
Defaults to grpc.ssl_channel_credentials().
Expand All @@ -116,34 +152,38 @@ def create_camunda_cloud_channel(
grpc.aio.Channel: The gRPC channel for connecting to Camunda Cloud.
"""

grpc_address = f"{cluster_id}.{region}.zeebe.camunda.io:443"
cluster_id = get_camunda_cloud_cluster_id(cluster_id)
region = get_camunda_cloud_cluster_region(region)

oauth2_client_credentials = Oauth2ClientCredentialsMetadataPlugin(
client_id=client_id,
client_secret=client_secret,
authorization_server=authorization_server,
scope=scope,
audience=audience,
client_id=get_camunda_client_id(client_id),
client_secret=get_camunda_client_secret(client_secret),
authorization_server=get_camunda_oauth_url(authorization_server)
or "https://login.cloud.camunda.io/oauth/token",
scope=get_camunda_credentials_scopes(scope) or "Zeebe",
audience=get_camunda_token_audience(audience) or "zeebe.camunda.io",
leeway=leeway,
expire_in=expire_in,
)

# NOTE: Overwrite the _oauth.fetch_token method to include client_id, client_secret in the request body
func = partial(
oauth2_client_credentials._oauth.fetch_token,
# NOTE: Overwrite the _oauth.fetch_token method to include client_id in the request body
oauth2_client_credentials._func_retrieve_token = partial(
oauth2_client_credentials._func_retrieve_token,
include_client_id=True,
token_url=authorization_server,
client_secret=client_secret,
audience=audience,
)
oauth2_client_credentials._func_retrieve_token = func

call_credentials: grpc.CallCredentials = grpc.metadata_call_credentials(oauth2_client_credentials)
# channel_credentials: grpc.ChannelCredentials = channel_credentials or grpc.ssl_channel_credentials()
composite_credentials: grpc.ChannelCredentials = grpc.composite_channel_credentials(
channel_credentials, call_credentials
)

grpc_hostname = get_camunda_cloud_hostname(cluster_id, region)
if grpc_hostname:
grpc_address = f"{grpc_hostname}:443"
else:
grpc_address = get_zeebe_address(None)

channel: grpc.aio.Channel = grpc.aio.secure_channel(
target=grpc_address, credentials=composite_credentials, options=get_channel_options(channel_options)
)
Expand Down
4 changes: 2 additions & 2 deletions pyzeebe/channel/secure_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import grpc

from pyzeebe.channel.channel_options import get_channel_options
from pyzeebe.channel.utils import create_address
from pyzeebe.channel.utils import get_zeebe_address
from pyzeebe.types import ChannelArgumentType


Expand All @@ -26,7 +26,7 @@ def create_secure_channel(
Returns:
grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway.
"""
grpc_address = create_address(grpc_address=grpc_address)
grpc_address = get_zeebe_address(grpc_address=grpc_address)
credentials = channel_credentials or grpc.ssl_channel_credentials()
return grpc.aio.secure_channel(
target=grpc_address, credentials=credentials, options=get_channel_options(channel_options)
Expand Down
151 changes: 147 additions & 4 deletions pyzeebe/channel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,152 @@
DEFAULT_ZEEBE_ADDRESS = "localhost:26500"


def create_address(
grpc_address: Optional[str] = None,
) -> str:
def get_zeebe_address(grpc_address: Optional[str] = None) -> str:
"""
Args:
grpc_address (str, optional): zeebe grpc server address.
Returns:
str: The zeebe grpc server address.
Default: Value from ZEEBE_ADDRESS environment variable
or "{CAMUNDA_CLUSTER_ID}.{CAMUNDA_CLUSTER_REGION}.zeebe.camunda.io"
or "localhost:26500"
"""

if grpc_address:
return grpc_address
return os.getenv("ZEEBE_ADDRESS", DEFAULT_ZEEBE_ADDRESS)

camunda_cloud_address = None
camunda_cloud_hostname = get_camunda_cloud_hostname(None, None)

if camunda_cloud_hostname:
camunda_cloud_address = f"{camunda_cloud_hostname}:443"

return os.getenv("ZEEBE_ADDRESS", camunda_cloud_address or DEFAULT_ZEEBE_ADDRESS)


def get_camunda_oauth_url(oauth_url: Optional[str]) -> Optional[str]:
"""
Args:
oauth_url (str, optional): The camunda platform authorization server url provided as parameter.
Returns:
str: The camunda platform authorization server url.
Default: Value from CAMUNDA_OAUTH_URL or ZEEBE_AUTHORIZATION_SERVER_URL environment variable
"""
return oauth_url or os.getenv("CAMUNDA_OAUTH_URL", os.getenv("ZEEBE_AUTHORIZATION_SERVER_URL"))


def get_camunda_client_id(client_id: Optional[str]) -> str:
"""
Args:
client_id (str, optional): The client id provided as parameter.
Returns:
str: The client id.
Default: Value from CAMUNDA_CLIENT_ID or ZEEBE_CLIENT_ID environment variable
"""
ret = client_id or os.getenv("CAMUNDA_CLIENT_ID", os.getenv("ZEEBE_CLIENT_ID"))

if not ret:
raise ValueError(
"parameter client_id or one of the environment variables CAMUNDA_CLIENT_ID or ZEEBE_CLIENT_ID must be provided!"
)

return ret


def get_camunda_client_secret(client_secret: Optional[str]) -> str:
"""
Args:
client_secret (str, optional): The client secret provided as parameter.
Returns:
str: The client secret.
Default: Value from CAMUNDA_CLIENT_SECRET or ZEEBE_CLIENT_SECRET environment variable
"""
ret = client_secret or os.getenv("CAMUNDA_CLIENT_SECRET", os.getenv("ZEEBE_CLIENT_SECRET"))

if not ret:
raise ValueError(
"parameter client_secret or one of the environment variables CAMUNDA_CLIENT_SECRET or ZEEBE_CLIENT_SECRET must be provided!"
)

return ret


def get_camunda_cloud_cluster_id(cluster_id: Optional[str]) -> Optional[str]:
"""
Args:
cluster_id (str, optional): The camunda cluster id provided as parameter.
Returns:
str: The camunda cluster id.
Default: Value from CAMUNDA_CLUSTER_ID environment variable
"""

return cluster_id or os.getenv("CAMUNDA_CLUSTER_ID")


def get_camunda_cloud_cluster_region(cluster_region: Optional[str]) -> str:
"""
Args:
cluster_region (str, optional): The camunda cluster region provided as parameter.
Returns:
str: The camunda cluster region.
Default: Value from CAMUNDA_CLUSTER_REGION environment variable or 'bru-2'
"""

return cluster_region or os.getenv("CAMUNDA_CLUSTER_REGION") or "bru-2"


def get_camunda_token_audience(token_audience: Optional[str]) -> Optional[str]:
"""
Args:
token_audience (str, optional): The token audience provided as parameter.
Returns:
str: The token audience.
Default: Value from CAMUNDA_TOKEN_AUDIENCE
or ZEEBE_TOKEN_AUDIENCE environment variable
or camunda cloud token audience if camunda cluster_id is available
"""

return (
token_audience
or os.getenv("CAMUNDA_TOKEN_AUDIENCE", os.getenv("ZEEBE_TOKEN_AUDIENCE"))
or get_camunda_cloud_hostname(None, None)
)


def get_camunda_cloud_hostname(cluster_id: Optional[str], cluster_region: Optional[str]) -> Optional[str]:
"""
Args:
cluster_id (str, optional): The camunda cluster id provided as parameter.
cluster_region (str, optional): The camunda cluster region provided as parameter.
Returns:
str: The token audience for camunda cloud or none if cluster_id or cluster_region is not provided.
"""

cluster_id = get_camunda_cloud_cluster_id(cluster_id)
cluster_region = get_camunda_cloud_cluster_region(cluster_region)

if (not cluster_id) or (not cluster_region):
return None

return f"{cluster_id}.{cluster_region}.zeebe.camunda.io"


def get_camunda_credentials_scopes(scopes: Optional[str]) -> Optional[str]:
"""
Args:
scopes (str, optional): The camunda credentials scopes as single string.
Returns:
str: The camunda cedential scopes string.
Default: Value from CAMUNDA_CREDENTIALS_SCOPES
"""

return scopes or os.getenv("CAMUNDA_CREDENTIALS_SCOPES")
Loading

0 comments on commit 58ae243

Please sign in to comment.