Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ select = [

[tool.ruff.lint.pylint]
allow-magic-value-types = ["int", "str"]
max-args = 12
max-args = 14
max-branches = 30
max-returns = 8
max-statements = 72
2 changes: 1 addition & 1 deletion src/britive/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '4.4.0'
__version__ = '4.5.0rc1'
23 changes: 20 additions & 3 deletions src/britive/access_broker/profiles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,25 @@ def __init__(self, britive) -> None:
self.permissions = Permissions(britive)
self.policies = Policies(britive)

def create(self, name: str, description: str = '', expiration_duration: int = 900000) -> dict:
def create(
self, name: str, description: str = '', expiration_duration: int = 900000, impersonation: bool = False
) -> dict:
"""
Create a new profile.

:param name: Name of the profile.
:param description: Description of the profile.
:param expiration_duration: Expiration duration of the profile.
:param impersonation: Allow impersonation of the profile.
:return: Created profile.
"""

params = {'name': name, 'description': description, 'expirationDuration': expiration_duration}
params = {
'name': name,
'delegationEnabled': impersonation,
'description': description,
'expirationDuration': expiration_duration,
}

return self.britive.post(f'{self.base_url}', json=params)

Expand All @@ -46,7 +54,12 @@ def list(self) -> list:
return self.britive.get(self.base_url, params={})

def update(
self, profile_id: str, name: str = None, description: str = None, expiration_duration: int = None
self,
profile_id: str,
name: str = None,
description: str = None,
expiration_duration: int = None,
impersonation: bool = None,
) -> dict:
"""
Update a profile.
Expand All @@ -55,6 +68,7 @@ def update(
:param name: Name of the profile.
:param description: Description of the profile.
:param expiration_duration: Expiration duration of the profile.
:param impersonation: Allow impersonation of the profile.
:return: Updated profile.
"""

Expand All @@ -66,6 +80,9 @@ def update(
params['description'] = description
if expiration_duration:
params['expirationDuration'] = expiration_duration
params['delegationEnabled'] = (
self.get(profile_id=profile_id).get('delegationEnabled') if impersonation is None else impersonation
)

return self.britive.patch(f'{self.base_url}/{profile_id}', json=params)

Expand Down
25 changes: 14 additions & 11 deletions src/britive/application_management/profiles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
from .session_attributes import SessionAttributes

creation_defaults = {
'delegationEnabled': False,
'description': '',
'destinationUrl': '',
'excludeAdminFromAdminRelatedCommunication': 'DEFAULT',
'expirationDuration': 3600000,
'extensionDuration': 1800000,
'notificationPriorToExpiration': 300000,
'extendable': False,
'extensionDuration': 1800000,
'extensionLimit': '1',
'notificationPriorToExpiration': 300000,
'status': 'active',
'destinationUrl': '',
'useDefaultAppUrl': True,
'description': '',
}

update_fields_to_keep: list = list(creation_defaults)
Expand All @@ -42,24 +44,25 @@ def create(self, application_id: str, name: str, **kwargs) -> dict:
:param kwargs: A key/value mapping consisting of the following fields. If any/all are omitted default values
will be used. The keys and default values are provided below.

- delegationEnabled: False
- description: ''
- destinationUrl: ''
- excludeAdminFromAdminRelatedCommunication: 'DEFAULT'
- expirationDuration: 3600000
- extensionDuration: 1800000
- notificationPriorToExpiration: 300000
- extendable: False
- extensionDuration: 1800000
- extensionLimit: '1'
- status: 'active'
- destinationUrl: ''
- useDefaultAppUrl: True
- description: ''
- notificationPriorToExpiration: 300000
- scope: if not provided, no scopes will be applied. If provided it must follow the
format listed below.

[
{
'type': 'EnvironmentGroup'|'Environment',
'value':'ID'
},
]
- status: 'active'
- useDefaultAppUrl: True

:return: Details of the newly created profile.
"""
Expand Down
32 changes: 18 additions & 14 deletions src/britive/britive.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,30 +181,30 @@ def features(self) -> dict:
def banner(self) -> dict:
return self.get(f'{self.base_url}/banner')

def get(self, url, params=None) -> dict:
def get(self, url, params: dict = None, headers: dict = None) -> dict:
"""Internal use only."""

return self.__request('get', url, params=params)
return self.__request('get', url, params=params, headers=headers)

def post(self, url, params=None, data=None, json=None) -> dict:
def post(self, url, params: dict = None, data: dict = None, json: dict = None, headers: dict = None) -> dict:
"""Internal use only."""

return self.__request('post', url, params=params, data=data, json=json)
return self.__request('post', url, params=params, data=data, json=json, headers=headers)

def patch(self, url, params=None, data=None, json=None) -> dict:
def patch(self, url, params: dict = None, data: dict = None, json: dict = None, headers: dict = None) -> dict:
"""Internal use only."""

return self.__request('patch', url, params=params, data=data, json=json)
return self.__request('patch', url, params=params, data=data, json=json, headers=headers)

def put(self, url, params=None, data=None, json=None) -> dict:
def put(self, url, params: dict = None, data: dict = None, json: dict = None, headers: dict = None) -> dict:
"""Internal use only."""

return self.__request('put', url, params=params, data=data, json=json)
return self.__request('put', url, params=params, data=data, json=json, headers=headers)

def delete(self, url, params=None, data=None, json=None) -> dict:
def delete(self, url, params: dict = None, data: dict = None, json: dict = None, headers: dict = None) -> dict:
"""Internal use only."""

return self.__request('delete', url, params=params, data=data, json=json)
return self.__request('delete', url, params=params, data=data, json=json, headers=headers)

# note - this method could be iffy in the future if the app changes the way it handles
# file uploads. As of 2022-01-26 it is working fine with the "Upload SAML Metadata" action
Expand All @@ -223,11 +223,13 @@ def post_upload(self, url, params=None, files=None) -> dict:
response = self.session.post(url, params=params, files=files, headers={'Content-Type': None})
return handle_response(response)

def __request_with_exponential_backoff_and_retry(self, method, url, params, data, json) -> dict:
def __request_with_exponential_backoff_and_retry(self, method, url, params, data, json, headers) -> dict:
num_retries = 0

while num_retries <= self.retry_max_times:
response = self.session.request(method, url, params=params, data=data, json=json)
response = self.session.request(
method, url, params=params, data=data, json=json, headers={**self.session.headers, **headers}
)

# handle the use case of a tenant being in maintenance mode
# which means we should break out of this loop early and
Expand All @@ -244,15 +246,17 @@ def __request_with_exponential_backoff_and_retry(self, method, url, params, data

return response

def __request(self, method, url, params=None, data=None, json=None) -> dict:
def __request(self, method, url, params=None, data=None, json=None, headers=None) -> dict:
return_data = []
_pagination_type = None

if params is None:
params = {}
if headers is None:
headers = {}

while True:
response = self.__request_with_exponential_backoff_and_retry(method, url, params, data, json)
response = self.__request_with_exponential_backoff_and_retry(method, url, params, data, json, headers)
if response_has_no_content(response):
return None

Expand Down
4 changes: 2 additions & 2 deletions src/britive/helpers/methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ def __init__(self, britive) -> None:
self.britive = britive

def get_profile_and_environment_ids_given_names(
self, profile_name: str, environment_name: str, application_name: str = None
self, profile_name: str, environment_name: str, application_name: str = None, headers: dict = None
) -> dict:
ids = None
environment_found = False
profile_found = False
for app in self.britive.get(f'{self.britive.base_url}/access'):
for app in self.britive.get(f'{self.britive.base_url}/access', headers=headers):
if application_name and app['appName'].lower() != application_name.lower():
continue
if not (
Expand Down
5 changes: 3 additions & 2 deletions src/britive/helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ def source_federation_token(provider: str, tenant: Optional[str] = None, duratio
* Azure System Assigned Managed Identities (azuresmi)
* Azure User Assigned Managed Identities (azureumi)
* Bitbucket Pipelines (bitbucket)
* GCP
* Github Actions (github)
* Gitlab (gitlab)
* spacelift.io (spacelift)

Any other OIDC federation provider can be used and tokens can be provided to this class for authentication
to a Britive tenant. Details of how to construct these tokens can be found at https://docs.britive.com.

:param provider: The name of the federation provider. Valid options are `aws`, `github`, `bitbucket`,
`azuresmi`, `azureumi`, `spacelift`, and `gitlab`.
:param provider: The name of the federation provider. Valid options are `aws`, `azuresmi`, `azureumi`, `bitbucket`,
`gcp`, `github`, `gitlab`, and `spacelift`.

For the AWS provider it is possible to provide a profile via value `aws-profile`. If no profile is provided
then the boto3 `Session.get_credentials()` method will be used to obtain AWS credentials, which follows
Expand Down
1 change: 1 addition & 0 deletions src/britive/identity_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

class IdentityManagement:
def __init__(self, britive) -> None:
self.ai_identities = ServiceIdentities(britive, identity_type='AIIdentity')
self.identity_attributes = IdentityAttributes(britive)
self.identity_providers = IdentityProviders(britive)
self.service_identities = ServiceIdentities(britive)
Expand Down
11 changes: 6 additions & 5 deletions src/britive/identity_management/service_identities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@


class ServiceIdentities:
def __init__(self, britive) -> None:
def __init__(self, britive, identity_type: str = 'ServiceIdentity') -> None:
self.britive = britive
self.base_url = f'{self.britive.base_url}/users'
self.custom_attributes = CustomAttributes(britive)
self.identity_type = identity_type

def list(self, filter_expression: str = None, include_tags: bool = False) -> list:
"""
Expand All @@ -19,7 +20,7 @@ def list(self, filter_expression: str = None, include_tags: bool = False) -> lis
:return: List of service identity records
"""

params = {'type': 'ServiceIdentity', 'page': 0, 'size': 100}
params = {'type': self.identity_type, 'page': 0, 'size': 100}
if filter_expression:
params['filter'] = filter_expression
if include_tags:
Expand All @@ -35,7 +36,7 @@ def get(self, service_identity_id: str) -> dict:
:return: Details of the specified user.
"""

params = {'type': 'ServiceIdentity'}
params = {'type': self.identity_type}
return self.britive.get(f'{self.base_url}/{service_identity_id}', params=params)

def get_by_name(self, name: str) -> list:
Expand Down Expand Up @@ -70,7 +71,7 @@ def search(self, search_string: str) -> list:
:return: List of user records
"""

params = {'type': 'ServiceIdentity', 'page': 0, 'size': 100, 'searchText': search_string}
params = {'type': self.identity_type, 'page': 0, 'size': 100, 'searchText': search_string}

return self.britive.get(self.base_url, params)

Expand All @@ -87,7 +88,7 @@ def create(self, **kwargs) -> dict:

required_fields = ['name']

kwargs['type'] = 'ServiceIdentity'
kwargs['type'] = self.identity_type
if 'status' not in kwargs:
kwargs['status'] = 'active'

Expand Down
Loading
Loading