diff --git a/CHANGELOG.md b/CHANGELOG.md index f28a683..7e10547 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,29 @@ > As of v1.4.0, release candidates will be published in an effort to get new features out faster while still allowing > time for full QA testing before moving the release candidate to a full release. +## v2.1.0-rc.0 [2025-01-27] + +__What's New:__ + +* `pybritive-aws-cred-process` can now prompt users for `otp` or `justification` when needed. +* `my_resource` profile checkouts can now specify a `response_template` by appending `/{template name}` to the profile. + +__Enhancements:__ + +* Added ITSM `--ticket-type` `--ticket-id` options. + +__Bug Fixes:__ + +* None + +__Dependencies:__ + +* `britive~=4.0` + +__Other:__ + +* Python 3.8 is EOL, so support is dropped. + ## v2.0.1 [2025-01-17] __What's New:__ diff --git a/LICENSE b/LICENSE index dd7bce9..bb1f706 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2022 Britive, Inc +Copyright (c) 2025 Britive, Inc Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/docs/index.md b/docs/index.md index f99faad..7dc764a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -6,7 +6,7 @@ PyBritive is intended to be used as a CLI application for communicating with the ## Requirements -* Python 3.8 or higher +* Python 3.9 or higher * Active Britive tenant (or nothing is really going to work) ## Installation diff --git a/mkdocs.yml b/mkdocs.yml index 4b6d82e..8310c14 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -7,12 +7,12 @@ nav: - GitHub Repo: https://github.com/britive/python-cli theme: readthedocs repo_url: https://github.com/britive/python-cli -edit_uri: '' -copyright: 2024 Britive, Inc. +edit_uri: "" +copyright: 2025 Britive, Inc. markdown_extensions: - attr_list - mkdocs-click - admonition watch: - src/pybritive/commands/ - - src/pybritive/ \ No newline at end of file + - src/pybritive/ diff --git a/pyproject.toml b/pyproject.toml index 98add40..f80e04a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,19 +14,19 @@ classifiers = [ "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", "Programming Language :: Python", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Topic :: Internet", "Topic :: Security", "Topic :: Utilities", ] license = {file = "LICENSE"} -requires-python = ">= 3.8" +requires-python = ">= 3.9" dependencies = [ - "britive~=3.1", + "britive~=4.0", "click>=8.1.7", "colored", "cryptography", @@ -92,7 +92,7 @@ select = [ [tool.ruff.lint.pylint] allow-magic-value-types = ["int", "str"] -max-args = 16 +max-args = 18 max-branches = 30 max-returns = 8 max-statements = 72 diff --git a/requirements.txt b/requirements.txt index c470e57..ee9965e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ beautifulsoup4 boto3 -britive>=3.0.0 +britive~=4.0 certifi charset-normalizer click>=8.1.7 diff --git a/src/pybritive/__init__.py b/src/pybritive/__init__.py index 3f39079..325ce53 100644 --- a/src/pybritive/__init__.py +++ b/src/pybritive/__init__.py @@ -1 +1 @@ -__version__ = '2.0.1' +__version__ = '2.1.0-rc.0' diff --git a/src/pybritive/britive_cli.py b/src/pybritive/britive_cli.py index fc13371..271cbc1 100644 --- a/src/pybritive/britive_cli.py +++ b/src/pybritive/britive_cli.py @@ -8,6 +8,7 @@ import uuid from datetime import datetime, timedelta, timezone from pathlib import Path +from typing import Optional import click import jmespath @@ -15,6 +16,8 @@ import yaml from britive import exceptions from britive.britive import Britive +from britive.helpers.utils import parse_tenant +from colored import Fore, Style from jwt.exceptions import PyJWTError from tabulate import tabulate @@ -25,11 +28,6 @@ from .helpers.credentials import EncryptedFileCredentialManager, FileCredentialManager from .helpers.split import profile_split -try: - from colored import Fore, Style -except ImportError: # colored in < python3.9 - from colored import fore, style - default_table_format = 'fancy_grid' debug_enabled = os.getenv('PYBRITIVE_DEBUG') default_browser = os.getenv('PYBRITIVE_BROWSER') @@ -38,11 +36,11 @@ class BritiveCli: def __init__( self, - tenant_name: str = None, - token: str = None, + tenant_name: Optional[str] = None, + token: Optional[str] = None, silent: bool = False, - passphrase: str = None, - federation_provider: str = None, + passphrase: Optional[str] = None, + federation_provider: Optional[str] = None, from_helper_console_script: bool = False, ): self.silent = silent @@ -124,7 +122,7 @@ def login(self, explicit: bool = False, browser: str = default_browser): self.b.my_access.whoami() # this is what may cause UnauthorizedRequest except exceptions.UnauthorizedRequest as e: raise click.ClickException('Invalid API token provided.') from e - except exceptions.InvalidRequest as e: + except exceptions.generic.BritiveGenericException as e: if '400 - e1000 - bad request' in str(e).lower(): # this is for SCIM token pass else: @@ -148,7 +146,7 @@ def login(self, explicit: bool = False, browser: str = default_browser): except exceptions.UnauthorizedRequest as e: if '401 - e0000' in str(e).lower(): self.debug(f'attempt {counter} of 3 - login failed') - self.debug(f'login error message was {str(e)}') + self.debug(f'login error message was {e!s}') # we know the token is invalid since we got that API response # so we don't need to actually logout, just clear the token from @@ -178,16 +176,10 @@ def _display_banner(self): if banner := self.b.banner(): banner_changed = Cache().save_banner(tenant=self.tenant_name, banner=banner) msg_type = banner.get('messageType', 'UNKNOWN') - try: - color = {'caution': Style.BOLD + Fore.red, 'warning': Style.BOLD + Fore.yellow}.get( - msg_type.lower(), Style.BOLD + Fore.blue - ) - style_reset = Style.reset - except NameError: # colored in < python3.9 - color = {'caution': style.BOLD + fore.RED, 'warning': style.BOLD + fore.YELLOW}.get( - msg_type.lower(), style.BOLD + fore.BLUE - ) - style_reset = style.RESET + color = {'caution': Style.BOLD + Fore.red, 'warning': Style.BOLD + Fore.yellow}.get( + msg_type.lower(), Style.BOLD + Fore.blue + ) + style_reset = Style.reset if banner_changed: self.print(f'{color}*** {msg_type}: {banner.get("message", "")} ***{style_reset}') @@ -245,7 +237,7 @@ def logout(self): if self._is_saml_user(token): params['type'] = 'sso' - b.delete(f'https://{Britive.parse_tenant(self.tenant_name)}/api/auth', params=params) + b.delete(f'https://{parse_tenant(self.tenant_name)}/api/auth', params=params) self._cleanup_credentials() def debug(self, data: object, ignore_silent: bool = False): @@ -330,7 +322,7 @@ def list_secrets(self): def list_approvals(self): self.login() approvals = [] - for approval in self.b.my_access.list_approvals(): + for approval in self.b.my_approvals.list(): approval.pop('resource', None) approval.pop('consumer', None) approval.pop('timeToApprove', None) @@ -360,7 +352,7 @@ def list_resources(self): found_resource_names.append(name) self.print(resources, ignore_silent=True) - def list_profiles(self, checked_out: bool = False, profile_type: str = None): + def list_profiles(self, checked_out: bool = False, profile_type: Optional[str] = None): self.login() self._set_available_profiles(profile_type=profile_type) data = [] @@ -466,7 +458,7 @@ def list_environments(self): data.append(row) self.print(data, ignore_silent=True) - def _set_available_profiles(self, from_cache_command=False, profile_type: str = None): + def _set_available_profiles(self, from_cache_command=False, profile_type: Optional[str] = None): if not self.available_profiles: data = [] if not profile_type or profile_type == 'my-access': @@ -549,7 +541,7 @@ def construct_kube_config(self, from_cache_command=False): profiles=profiles, config=self.config, username=self.b.my_access.whoami()['username'], cli=self ) except Exception as e: # do NOT fail the CLI invocation because of this - self.print(f'error auto-generating the Britive managed kube config file: {str(e)}') + self.print(f'error auto-generating the Britive managed kube config file: {e!s}') def _get_app_type(self, application_id): self._set_available_profiles() @@ -622,7 +614,7 @@ def __get_cloud_credential_printer( def _resource_checkin(self, profile): resource_name, profile_name = self._split_resource_profile_into_parts(profile=profile) self.login() - self.b.my_resources.checkin_by_name(profile_name=profile_name, resource_name=resource_name) + self.b.my_resources.checkin_by_name(profile_name=profile_name[0], resource_name=resource_name) def _access_checkin(self, profile, console): self.login() @@ -667,7 +659,20 @@ def checkin(self, profile, console, profile_type: str = 'my-access'): else: self._access_checkin(profile=profile, console=console) - def _checkout(self, profile_name, env_name, app_name, programmatic, blocktime, maxpolltime, justification, otp): + def _checkout( + self, + app_name, + blocktime, + env_name, + justification, + maxpolltime, + otp, + profile_name, + programmatic, + ticket_id, + ticket_type, + mode=None, + ): try: self.login() @@ -676,17 +681,21 @@ def _checkout(self, profile_name, env_name, app_name, programmatic, blocktime, m ) return self.b.my_access.checkout( - profile_id=ids['profile_id'], environment_id=ids['environment_id'], - programmatic=programmatic, include_credentials=True, - wait_time=blocktime, - max_wait_time=maxpolltime, justification=justification, + max_wait_time=maxpolltime, otp=otp, + profile_id=ids['profile_id'], + programmatic=programmatic, progress_func=self.checkout_callback_printer, # callback will handle silent, isatty, etc. + ticket_id=ticket_id, + ticket_type=ticket_type, + wait_time=blocktime, ) except exceptions.ApprovalRequiredButNoJustificationProvided as e: + if mode == 'awscredentialprocess': + raise e raise click.ClickException('approval required and no justification provided.') from e except ValueError as e: raise click.BadParameter(str(e)) from e @@ -696,7 +705,17 @@ def _checkout(self, profile_name, env_name, app_name, programmatic, blocktime, m # this is a cli only feature - not available in the sdk self.print('no programmatic access available - checking out console access instead') return self._checkout( - profile_name, env_name, app_name, False, blocktime, maxpolltime, justification, otp + app_name, + blocktime, + env_name, + justification, + maxpolltime, + otp, + profile_name, + False, + ticket_id, + ticket_type, + mode, ) raise e @@ -711,16 +730,15 @@ def _split_profile_into_parts(self, profile): parts = [parts[0], parts[0], parts[1]] if len(parts) != 3: raise click.ClickException('Provided profile string does not have the required parts.') - parts_dict = {'app': parts[0], 'env': parts[1], 'profile': parts[2]} - return parts_dict + return {'app': parts[0], 'env': parts[1], 'profile': parts[2]} def _extend_checkout(self, profile, console): self.login() parts = self._split_profile_into_parts(profile) self.b.my_access.extend_checkout_by_name( - profile_name=parts['profile'], - environment_name=parts['env'], application_name=parts['app'], + environment_name=parts['env'], + profile_name=parts['profile'], programmatic=not console, ) @@ -732,7 +750,9 @@ def _split_resource_profile_into_parts(self, profile): real_profile_name = self.config.profile_aliases.get(profile.lower(), profile).lower() if real_profile_name.startswith(self.resource_profile_prefix): real_profile_name = real_profile_name.replace(self.resource_profile_prefix, '') - return real_profile_name.split('/') + resource_name, profile_name = real_profile_name.split('/', maxsplit=1) + profile_name = profile_name.split('/') + return resource_name, profile_name def _profile_is_for_resource(self, profile, profile_type): if profile_type == 'my-resources': @@ -740,17 +760,20 @@ def _profile_is_for_resource(self, profile, profile_type): real_profile_name = self.config.profile_aliases.get(profile.lower(), profile).lower() return real_profile_name.startswith(f'{self.resource_profile_prefix}') - def _resource_checkout(self, blocktime, justification, maxpolltime, profile): + def _resource_checkout(self, blocktime, justification, maxpolltime, profile, ticket_id, ticket_type): self.login() resource_name, profile_name = self._split_resource_profile_into_parts(profile=profile) response = self.b.my_resources.checkout_by_name( - resource_name=resource_name, - profile_name=profile_name, include_credentials=True, justification=justification, - wait_time=blocktime, max_wait_time=maxpolltime, + profile_name=profile_name[0], progress_func=self.checkout_callback_printer, # callback will handle silent, isatty, etc. + resource_name=resource_name, + response_template=profile_name[1] if len(profile_name) > 1 else None, + ticket_id=ticket_id, + ticket_type=ticket_type, + wait_time=blocktime, ) return response['credentials'] @@ -759,20 +782,22 @@ def _access_checkout( alias, blocktime, console, + extend, + force_renew, justification, - otp, - mode, maxpolltime, - profile, + mode, + otp, passphrase, - force_renew, + profile, + ticket_id, + ticket_type, verbose, - extend, ): # handle this special use case and quit if extend: self._extend_checkout(profile, console) - return + return None credentials = None app_type = None @@ -820,14 +845,17 @@ def _access_checkout( # create this params once so we can use it multiple places params = { - 'profile_name': parts['profile'], - 'env_name': parts['env'], 'app_name': parts['app'], - 'programmatic': not console, 'blocktime': blocktime, - 'maxpolltime': maxpolltime, + 'env_name': parts['env'], 'justification': justification, + 'maxpolltime': maxpolltime, + 'mode': mode, 'otp': otp, + 'profile_name': parts['profile'], + 'programmatic': not console, + 'ticket_id': ticket_id, + 'ticket_type': ticket_type, } if not cached_credentials_found: # nothing found in cache, cache is expired, or not a cachable mode @@ -857,41 +885,50 @@ def _access_checkout( def checkout( self, alias, + aws_credentials_file, blocktime, console, + extend, + force_renew, + gcloud_key_file, justification, - otp, - mode, maxpolltime, - profile, + mode, + otp, passphrase, - force_renew, - aws_credentials_file, - gcloud_key_file, + profile, verbose, - extend, + ticket_id: Optional[str] = None, + ticket_type: Optional[str] = None, profile_type: str = 'my-access', ): if self._profile_is_for_resource(profile=profile, profile_type=profile_type): app_type = 'Resources' k8s_processor = None credentials = self._resource_checkout( - blocktime=blocktime, justification=justification, maxpolltime=maxpolltime, profile=profile + blocktime=blocktime, + justification=justification, + maxpolltime=maxpolltime, + profile=profile, + ticket_id=ticket_id, + ticket_type=ticket_type, ) else: app_type, credentials, k8s_processor = self._access_checkout( alias=alias, blocktime=blocktime, console=console, + extend=extend, + force_renew=force_renew, justification=justification, - otp=otp, - mode=mode, maxpolltime=maxpolltime, - profile=profile, + mode=mode, + otp=otp, passphrase=passphrase, - force_renew=force_renew, + profile=profile, + ticket_id=ticket_id, + ticket_type=ticket_type, verbose=verbose, - extend=extend, ) # do this down here, so we know that the profile is valid and a checkout was successful @@ -1017,7 +1054,7 @@ def clear_kubeconfig(): def configure_update(self, section, field, value): self.config.update(section=section, field=field, value=value) - def request_submit(self, profile, justification): + def request_submit(self, profile, justification, ticket_id, ticket_type): self._validate_justification(justification) self.login() parts = self._split_profile_into_parts(profile) @@ -1027,10 +1064,12 @@ def request_submit(self, profile, justification): ) self.b.my_access.request_approval( - profile_id=ids['profile_id'], - environment_id=ids['environment_id'], block_until_disposition=False, + environment_id=ids['environment_id'], justification=justification, + profile_id=ids['profile_id'], + ticket_id=ticket_id, + ticket_type=ticket_type, ) def request_withdraw(self, profile): @@ -1048,7 +1087,7 @@ def build_gcloud_key_file_for_gcloudauthexec(profile: str): profile_hash = hashlib.sha256(string=profile.encode('utf-8')).hexdigest() return f'gcloudauthexec-{profile_hash}.json' - def clear_gcloud_auth_key_files(self, profile: str = None): + def clear_gcloud_auth_key_files(self, profile: Optional[str] = None): if profile: # we want to attempt a gcloud cli command import subprocess # lazy load as this will not always be needed @@ -1078,7 +1117,7 @@ def clear_gcloud_auth_key_files(self, profile: str = None): self.debug(' '.join(commands)) subprocess.run(commands, check=True) except Exception as e: - self.print(f'could not reset gcloud CLI active account due to issue: {str(e)}') + self.print(f'could not reset gcloud CLI active account due to issue: {e!s}') self.config.clear_gcloud_auth_key_files(profile=profile) def api(self, method, parameters: dict, query=None): @@ -1178,7 +1217,7 @@ def _convert_names_to_ids(self, profile_name: str, environment_name: str, applic raise click.ClickException('multiple matching profiles found - cannot determine which profile to use') # and now we can check to ensure we have only 1 environment - found_profile_id = list(found_profiles)[0] + found_profile_id = next(iter(found_profiles)) possible_environments = found_profiles[found_profile_id] if len(possible_environments) == 0: raise click.ClickException('no profile found with the provided application, environment, and profile names') @@ -1265,7 +1304,7 @@ def _ssh_generate_key(self, username, hostname, key_source): if key_source == 'ssh-agent': # cleanup any old ssh keys that were randomly generated now = int(time.time()) - for key in glob.glob(f'{str(ssh_dir)}/random-*'): + for key in glob.glob(f'{ssh_dir!s}/random-*'): file = key.split('/')[-1].split('.')[0] expiration = int(file.split('-')[2]) if expiration < now: @@ -1334,7 +1373,7 @@ def ssh_aws_openssh_config(self, push_public_key, key_source): if key_source == 'static': ssh_dir = Path(self.config.path).parent.absolute() / 'ssh' - lines.append(f'\tIdentityFile {str(ssh_dir)}/%h.%r.pem') + lines.append(f'\tIdentityFile {ssh_dir!s}/%h.%r.pem') else: line = '\tProxyCommand eval $(pybritive ssh aws ssm-proxy --hostname %h --username %r --port-number %p)' lines.append(line) @@ -1404,9 +1443,9 @@ def request_disposition(self, request_id, decision): self.login() if decision == 'approve': - self.b.my_access.approve_request(request_id=request_id) + self.b.my_approvals.approve_request(request_id=request_id) if decision == 'reject': - self.b.my_access.reject_request(request_id=request_id) + self.b.my_approvals.reject_request(request_id=request_id) def clear_cached_aws_credentials(self, profile): # start with the profile name that was passed in from the command @@ -1502,7 +1541,7 @@ def ssh_gcp_identity_aware_proxy(self, username, hostname, push_public_key, port '--project', project, '--metadata-from-file', - f'ssh-keys={str(key_file)}', + f'ssh-keys={key_file!s}', '--zone', zone, '--verbosity=error', @@ -1538,7 +1577,7 @@ def ssh_gcp_openssh_config(self, push_public_key, key_source): if key_source == 'static': ssh_dir = Path(self.config.path).parent.absolute() / 'ssh' - lines.append(f'\tIdentityFile {str(ssh_dir)}/%h.%r.pem') + lines.append(f'\tIdentityFile {ssh_dir!s}/%h.%r.pem') else: line = ( '\tProxyCommand eval $(pybritive ssh gcp identity-aware-proxy --hostname %h --username %r ' diff --git a/src/pybritive/commands/api.py b/src/pybritive/commands/api.py index 492e8ab..ba1c402 100644 --- a/src/pybritive/commands/api.py +++ b/src/pybritive/commands/api.py @@ -1,17 +1,17 @@ import click from click import Command -from ..completers.api_command import command_api_patch_shell_complete -from ..helpers.api_method_argument_decorator import click_smart_api_method_argument -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.completers.api_command import command_api_patch_shell_complete +from pybritive.helpers.api_method_argument_decorator import click_smart_api_method_argument +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options # this holds all the click version logic to gracefully degrade functionality # depending on the click version command_api_patch_shell_complete(Command) -@click.command(context_settings=dict(ignore_unknown_options=True, allow_extra_args=True)) +@click.command(context_settings={'ignore_unknown_options': True, 'allow_extra_args': True}) @build_britive @britive_options(names='query,output_format,tenant,token,silent,passphrase,federation_provider') @click_smart_api_method_argument # need to gracefully handle older version of click diff --git a/src/pybritive/commands/aws.py b/src/pybritive/commands/aws.py index 55cd6d6..1f5c24c 100644 --- a/src/pybritive/commands/aws.py +++ b/src/pybritive/commands/aws.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.group() diff --git a/src/pybritive/commands/cache.py b/src/pybritive/commands/cache.py index 54ee85f..6f0c190 100644 --- a/src/pybritive/commands/cache.py +++ b/src/pybritive/commands/cache.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.group() diff --git a/src/pybritive/commands/checkin.py b/src/pybritive/commands/checkin.py index 9250f94..d748d1d 100644 --- a/src/pybritive/commands/checkin.py +++ b/src/pybritive/commands/checkin.py @@ -1,8 +1,8 @@ import click -from ..helpers.build_britive import build_britive -from ..helpers.profile_argument_decorator import click_smart_profile_argument -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.helpers.profile_argument_decorator import click_smart_profile_argument +from pybritive.options.britive_options import britive_options @click.command() diff --git a/src/pybritive/commands/checkout.py b/src/pybritive/commands/checkout.py index c270ef2..d994378 100644 --- a/src/pybritive/commands/checkout.py +++ b/src/pybritive/commands/checkout.py @@ -1,14 +1,14 @@ import click -from ..helpers.build_britive import build_britive -from ..helpers.profile_argument_decorator import click_smart_profile_argument -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.helpers.profile_argument_decorator import click_smart_profile_argument +from pybritive.options.britive_options import britive_options @click.command() @build_britive @britive_options( - names='alias,blocktime,console,justification,otp,mode,maxpolltime,silent,force_renew,aws_credentials_file,' + names='alias,blocktime,console,justification,ticket_type,ticket_id,otp,mode,maxpolltime,silent,force_renew,aws_credentials_file,' 'gcloud_key_file,verbose,extend,profile_type,tenant,token,passphrase,federation_provider' ) @click_smart_profile_argument @@ -18,6 +18,8 @@ def checkout( # noqa: PLR0913 blocktime, console, justification, + ticket_type, + ticket_id, otp, mode, maxpolltime, @@ -46,6 +48,8 @@ def checkout( # noqa: PLR0913 blocktime=blocktime, console=console, justification=justification, + ticket_type=ticket_type, + ticket_id=ticket_id, otp=otp, mode=mode, maxpolltime=maxpolltime, diff --git a/src/pybritive/commands/clear.py b/src/pybritive/commands/clear.py index 1de5a00..fd9f89f 100644 --- a/src/pybritive/commands/clear.py +++ b/src/pybritive/commands/clear.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..helpers.profile_argument_decorator import click_smart_profile_argument +from pybritive.helpers.build_britive import build_britive +from pybritive.helpers.profile_argument_decorator import click_smart_profile_argument @click.group() diff --git a/src/pybritive/commands/configure.py b/src/pybritive/commands/configure.py index 93caf72..ba9baaf 100644 --- a/src/pybritive/commands/configure.py +++ b/src/pybritive/commands/configure.py @@ -1,9 +1,9 @@ import click -from ..choices.backend import backend_choices -from ..choices.output_format import output_format_choices -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.choices.backend import backend_choices +from pybritive.choices.output_format import output_format_choices +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.group() diff --git a/src/pybritive/commands/login.py b/src/pybritive/commands/login.py index 6cb9896..4268820 100644 --- a/src/pybritive/commands/login.py +++ b/src/pybritive/commands/login.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.command() diff --git a/src/pybritive/commands/logout.py b/src/pybritive/commands/logout.py index 18b57a1..7490179 100644 --- a/src/pybritive/commands/logout.py +++ b/src/pybritive/commands/logout.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.command() diff --git a/src/pybritive/commands/ls.py b/src/pybritive/commands/ls.py index acd7de9..edc146a 100644 --- a/src/pybritive/commands/ls.py +++ b/src/pybritive/commands/ls.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.group() diff --git a/src/pybritive/commands/request.py b/src/pybritive/commands/request.py index fa5bd90..912ade6 100644 --- a/src/pybritive/commands/request.py +++ b/src/pybritive/commands/request.py @@ -1,8 +1,8 @@ import click -from ..helpers.build_britive import build_britive -from ..helpers.profile_argument_decorator import click_smart_profile_argument -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.helpers.profile_argument_decorator import click_smart_profile_argument +from pybritive.options.britive_options import britive_options @click.group() @@ -13,9 +13,9 @@ def request(): @request.command() @build_britive -@britive_options(names='justification,tenant,token,silent,passphrase,federation_provider') +@britive_options(names='ticket_type,ticket_id,justification,tenant,token,silent,passphrase,federation_provider') @click_smart_profile_argument -def submit(ctx, justification, tenant, token, silent, passphrase, federation_provider, profile): +def submit(ctx, ticket_type, ticket_id, justification, tenant, token, silent, passphrase, federation_provider, profile): """Submit a request to checkout a profile. Only applicable for profiles which require approval. This command will NOT block/wait until the request is @@ -26,7 +26,9 @@ def submit(ctx, justification, tenant, token, silent, passphrase, federation_pro that should be checked out. Format is `application name/environment name/profile name`. """ - ctx.obj.britive.request_submit(profile=profile, justification=justification) + ctx.obj.britive.request_submit( + profile=profile, justification=justification, ticket_id=ticket_id, ticket_type=ticket_type + ) @request.command() diff --git a/src/pybritive/commands/secret.py b/src/pybritive/commands/secret.py index 18adfd3..3e58a35 100644 --- a/src/pybritive/commands/secret.py +++ b/src/pybritive/commands/secret.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.group() diff --git a/src/pybritive/commands/ssh.py b/src/pybritive/commands/ssh.py index 19a97cd..29d95dd 100644 --- a/src/pybritive/commands/ssh.py +++ b/src/pybritive/commands/ssh.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.group() diff --git a/src/pybritive/commands/user.py b/src/pybritive/commands/user.py index 91d3396..24e7770 100644 --- a/src/pybritive/commands/user.py +++ b/src/pybritive/commands/user.py @@ -1,7 +1,7 @@ import click -from ..helpers.build_britive import build_britive -from ..options.britive_options import britive_options +from pybritive.helpers.build_britive import build_britive +from pybritive.options.britive_options import britive_options @click.command() diff --git a/src/pybritive/completers/api_command.py b/src/pybritive/completers/api_command.py index 0e88747..f447bb7 100644 --- a/src/pybritive/completers/api_command.py +++ b/src/pybritive/completers/api_command.py @@ -1,7 +1,6 @@ import contextlib import inspect from importlib.metadata import version -from typing import List from britive.britive import Britive @@ -97,10 +96,10 @@ def command_api_patch_shell_complete(cls): # https://stackoverflow.com/questions/43778914/python3-using-super-in-eq-methods-raises-runtimeerror-super-class __class__ = cls # provide closure cell for super() # noqa: F841 - def shell_complete(self, ctx: Context, incomplete: str) -> List[CompletionItem]: + def shell_complete(self, ctx: Context, incomplete: str) -> list[CompletionItem]: from click.shell_completion import CompletionItem # here since this method will be monkey patched in - results: List[CompletionItem] = [] + results: list[CompletionItem] = [] if incomplete and not incomplete[0].isalnum(): method = ctx.params.get('method') diff --git a/src/pybritive/completers/bash_gte_42.py b/src/pybritive/completers/bash_gte_42.py index cd98fd4..f4cf762 100644 --- a/src/pybritive/completers/bash_gte_42.py +++ b/src/pybritive/completers/bash_gte_42.py @@ -55,7 +55,7 @@ def _check_version(self) -> None: if match is not None: major, minor = match.groups() - if major < '4' or major == '4' and minor < '2': + if major < '4' or (major == '4' and minor < '2'): raise RuntimeError(_('Shell completion is not supported for Bash versions older than 4.2.')) else: raise RuntimeError(_("Couldn't detect Bash version, shell completion is not supported.")) diff --git a/src/pybritive/completers/powershell_completion.py b/src/pybritive/completers/powershell_completion.py index d8e74a7..246f33c 100644 --- a/src/pybritive/completers/powershell_completion.py +++ b/src/pybritive/completers/powershell_completion.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict +from typing import Any from click.parser import split_arg_string from click.shell_completion import CompletionItem, ShellComplete, add_completion_class @@ -69,7 +69,7 @@ def format_completion(self, item: CompletionItem) -> str: value = f'"{value}"' return f'{value}' - def source_vars(self) -> Dict[str, Any]: + def source_vars(self) -> dict[str, Any]: """Vars for formatting :attr:`source_template`. By default this provides ``complete_func``, ``complete_var``, and ``prog_name``. diff --git a/src/pybritive/completers/profile.py b/src/pybritive/completers/profile.py index 392eda5..eac1556 100644 --- a/src/pybritive/completers/profile.py +++ b/src/pybritive/completers/profile.py @@ -1,5 +1,5 @@ -from ..helpers.cache import Cache -from ..helpers.config import ConfigManager +from pybritive.helpers.cache import Cache +from pybritive.helpers.config import ConfigManager def profile_completer(ctx, param, incomplete): diff --git a/src/pybritive/helpers/api_method_argument_decorator.py b/src/pybritive/helpers/api_method_argument_decorator.py index 539c5d0..0c2dbb4 100644 --- a/src/pybritive/helpers/api_method_argument_decorator.py +++ b/src/pybritive/helpers/api_method_argument_decorator.py @@ -2,6 +2,6 @@ def click_smart_api_method_argument(func): - from ..completers.api import api_completer + from pybritive.completers.api import api_completer dec = click.argument('method', shell_complete=api_completer) return dec(func) diff --git a/src/pybritive/helpers/aws_credential_process.py b/src/pybritive/helpers/aws_credential_process.py index 7da363e..389930a 100644 --- a/src/pybritive/helpers/aws_credential_process.py +++ b/src/pybritive/helpers/aws_credential_process.py @@ -1,4 +1,117 @@ -from sys import argv, exit +import contextlib +import io +import os +import sys +from sys import argv + + +def _fallback_input(prompt='', stream=None): + if not stream: + stream = sys.stderr + return _raw_input(prompt, stream) + + +def _linput(prompt='', stream=None): + user_input = None + with contextlib.ExitStack() as stack: + try: + fd = os.open('/dev/tty', os.O_RDWR | os.O_NOCTTY) + tty = io.FileIO(fd, 'w+') + stack.enter_context(tty) + std_input = io.TextIOWrapper(tty) + stack.enter_context(std_input) + if not stream: + stream = std_input + except OSError: + stack.close() + try: + fd = sys.stdin.fileno() + except (AttributeError, ValueError): + fd = None + user_input = _fallback_input(prompt, stream) + std_input = sys.stdin + if not stream: + stream = sys.stderr + + if fd is not None: + try: + old = termios.tcgetattr(fd) + new = old[:] + new[3] |= termios.ECHO + tcsetattr_flags = termios.TCSAFLUSH + if hasattr(termios, 'TCSASOFT'): + tcsetattr_flags |= termios.TCSASOFT + try: + termios.tcsetattr(fd, tcsetattr_flags, new) + user_input = _raw_input(prompt, stream, std_input=std_input) + finally: + termios.tcsetattr(fd, tcsetattr_flags, old) + stream.flush() + except termios.error: + if user_input is not None: + raise + if stream is not std_input: + stack.close() + user_input = _fallback_input(prompt, stream) + + stream.write('\n') + return user_input + + +def _raw_input(prompt='', stream=None, std_input=None): + if not stream: + stream = sys.stderr + if not std_input: + std_input = sys.stdin + prompt = str(prompt) + if prompt: + try: + stream.write(prompt) + except UnicodeEncodeError: + prompt = prompt.encode(stream.encoding, 'replace') + prompt = prompt.decode(stream.encoding) + stream.write(prompt) + stream.flush() + line = std_input.readline() + if not line: + raise EOFError + if line[-1] == '\n': + line = line[:-1] + return line + + +def _winput(prompt='', stream=None): + if sys.stdin is not sys.__stdin__: + return _fallback_input(prompt, stream) + + for c in prompt: + msvcrt.putwch(c) + user_input = '' + while 1: + c = msvcrt.getwche() + if c in ('\r', '\n'): + break + if c == '\003': + raise KeyboardInterrupt + user_input = user_input[:-1] if c == '\x08' else user_input + c + msvcrt.putwch('\r') + msvcrt.putwch('\n') + return user_input + + +try: + import termios + + termios.tcgetattr, termios.tcsetattr # noqa: B018 +except (ImportError, AttributeError): + try: + import msvcrt + except ImportError: + get_input = _fallback_input + else: + get_input = _winput +else: + get_input = _linput def get_args(): @@ -40,7 +153,7 @@ def get_args(): cli_version = version('pybritive') print(f'pybritive: {cli_version} / platform: {platform()} / python: {python_version()}') - exit(0) + raise SystemExit return args @@ -50,7 +163,26 @@ def usage(): f'Usage : {argv[0]} --profile [-t/--tenant, -T/--token, -p/--passphrase, -f/--force-renew, ' f'-F/--federation-provider]' ) - exit(0) + raise SystemExit + + +def perform_checkout(b, args, otp=None, justification=None): + b.checkout( + alias=None, + blocktime=None, + console=False, + justification=justification, + mode='awscredentialprocess', + maxpolltime=None, + profile=args['profile'], + passphrase=args['passphrase'], + force_renew=args['force_renew'], + aws_credentials_file=None, + gcloud_key_file=None, + verbose=None, + extend=False, + otp=otp, + ) def main(): @@ -61,7 +193,7 @@ def main(): creds = None if not args['force_renew']: # if force renew let's defer to that the full package vs. this helper - from .cache import Cache # lazy load + from pybritive.helpers.cache import Cache # lazy load creds = Cache(passphrase=args['passphrase']).get_credentials( profile_name=args['profile'], mode='awscredentialprocess' @@ -81,9 +213,11 @@ def main(): json += f'"Expiration": "{creds["expirationTime"]}",' json += '"Version": 1}' print(json) - raise SystemExit() + raise SystemExit if not creds: - from ..britive_cli import BritiveCli # lazy load for performance purposes + from britive import exceptions + + from pybritive.britive_cli import BritiveCli # lazy load for performance purposes b = BritiveCli( tenant_name=args['tenant'], @@ -94,23 +228,22 @@ def main(): from_helper_console_script=True, ) b.config.get_tenant() # have to load the config here as that work is generally done - b.checkout( - alias=None, - blocktime=None, - console=False, - justification=None, - mode='awscredentialprocess', - maxpolltime=None, - profile=args['profile'], - passphrase=args['passphrase'], - force_renew=args['force_renew'], - aws_credentials_file=None, - gcloud_key_file=None, - verbose=None, - extend=False, - otp=None, - ) - raise SystemExit() + + try: + perform_checkout(b, args) + except exceptions.StepUpAuthRequiredButNotProvided: + perform_checkout(b, args, otp=get_input(prompt='(pybritive) Enter OTP:')) + except ( + exceptions.ApprovalRequiredButNoJustificationProvided, + exceptions.badrequest.MissingJustificationError, + ): + try: + perform_checkout(b, args, justification=get_input(prompt='(pybritive) Enter Justification: ')) + except exceptions.ProfileApprovalMaxBlockTimeExceeded as e: + b.request_withdraw(profile=args['profile']) + raise SystemExit('approval not settled before blocktime exceeded - request withdrawn') from e + + raise SystemExit if __name__ == '__main__': diff --git a/src/pybritive/helpers/build_britive.py b/src/pybritive/helpers/build_britive.py index 2e85d31..20751cf 100644 --- a/src/pybritive/helpers/build_britive.py +++ b/src/pybritive/helpers/build_britive.py @@ -5,7 +5,7 @@ from click import Context from merge_args import merge_args -from ..britive_cli import BritiveCli +from pybritive.britive_cli import BritiveCli @dataclass @@ -16,7 +16,7 @@ class Common: def should_set_output_format(ctx: Context) -> bool: parent_command = ctx.parent.command.name command = ctx.command.name - return not ((parent_command in ['configure', 'clear']) or parent_command == 'cache' and command in ['clear']) + return not ((parent_command in ['configure', 'clear']) or (parent_command == 'cache' and command in ['clear'])) # this wrapper exists to centralize all "common" CLI options (options that exist for all commands) diff --git a/src/pybritive/helpers/cache.py b/src/pybritive/helpers/cache.py index 2ff1434..dad472f 100644 --- a/src/pybritive/helpers/cache.py +++ b/src/pybritive/helpers/cache.py @@ -3,12 +3,13 @@ import os import time from pathlib import Path +from typing import Optional from .encryption import InvalidPassphraseException, StringEncryption class Cache: - def __init__(self, passphrase: str = None): + def __init__(self, passphrase: Optional[str] = None): self.passphrase = passphrase self.string_encryptor = StringEncryption(passphrase=self.passphrase) home = os.getenv('PYBRITIVE_HOME_DIR', str(Path.home())) diff --git a/src/pybritive/helpers/cloud_credential_printer.py b/src/pybritive/helpers/cloud_credential_printer.py index 23661bd..f81ce5a 100644 --- a/src/pybritive/helpers/cloud_credential_printer.py +++ b/src/pybritive/helpers/cloud_credential_printer.py @@ -205,7 +205,7 @@ def print_awscredentialprocess(self): class AzureCloudCredentialPrinter(CloudCredentialPrinter): def __init__(self, console, mode, profile, silent, credentials, cli): - key = list(credentials.keys())[0] + key = next(iter(credentials)) if key != 'url': # console url is handled differently than programmatic keys so account for it here credentials = json.loads(credentials[key]) super().__init__('Azure', console, mode, profile, silent, credentials, cli) @@ -245,7 +245,7 @@ def print_azps(self): class GcpCloudCredentialPrinter(CloudCredentialPrinter): def __init__(self, console, mode, profile, silent, credentials, cli, gcloud_key_file): - key = list(credentials.keys())[0] + key = next(iter(credentials)) credentials = json.loads(credentials[key]) if key != 'url' else credentials super().__init__('GCP', console, mode, profile, silent, credentials, cli) self.gcloud_key_file = gcloud_key_file @@ -271,7 +271,7 @@ def print_gcloudauth(self): path.write_text(json.dumps(self.credentials, indent=2), encoding='utf-8') self.cli.print( - f"gcloud auth activate-service-account {self.credentials['client_email']} --key-file {str(path)}", + f"gcloud auth activate-service-account {self.credentials['client_email']} --key-file {path!s}", ignore_silent=True, ) @@ -296,7 +296,7 @@ def print_gcloudauthexec(self): subprocess.run(commands, check=True) except Exception as e: - self.cli.print(f'error running `gcloud auth activate-service-account ...`: {str(e)}') + self.cli.print(f'error running `gcloud auth activate-service-account ...`: {e!s}') class KubernetesCredentialPrinter(CloudCredentialPrinter): @@ -398,10 +398,10 @@ def add_headers(self, request, **kwargs): if command: return command - else: - raise Exception('error: no `oc login` command found') + + raise Exception('error: no `oc login` command found') except Exception as e: - self.cli.print(f'error when attempting to perform oidc auth code grant flow: {str(e)}', ignore_silent=True) + self.cli.print(f'error when attempting to perform oidc auth code grant flow: {e!s}', ignore_silent=True) def print_os(self): if self.mode_modifier == 'oclogin': @@ -412,6 +412,6 @@ def print_os(self): try: subprocess.run(command.split(' '), check=True) except Exception as e: - self.cli.print(f'error running `gcloud auth activate-service-account ...`: {str(e)}') + self.cli.print(f'error running `gcloud auth activate-service-account ...`: {e!s}') else: raise ValueError(f'--mode modifier {self.mode_modifier} for mode {self.mode} not supported') diff --git a/src/pybritive/helpers/config.py b/src/pybritive/helpers/config.py index 25512b5..01c1e33 100644 --- a/src/pybritive/helpers/config.py +++ b/src/pybritive/helpers/config.py @@ -3,14 +3,15 @@ import os import shutil from pathlib import Path +from typing import Optional import click -from britive.britive import Britive +from britive.helpers.utils import parse_tenant -from ..choices.backend import backend_choices -from ..choices.mode import mode_choices -from ..choices.output_format import output_format_choices -from ..helpers.split import profile_split +from pybritive.choices.backend import backend_choices +from pybritive.choices.mode import mode_choices +from pybritive.choices.output_format import output_format_choices +from pybritive.helpers.split import profile_split def extract_tenant(tenant_key): @@ -55,7 +56,7 @@ def coalesce(*arg): class ConfigManager: - def __init__(self, cli: object, tenant_name: str = None): + def __init__(self, cli: object, tenant_name: Optional[str] = None): self.tenant_name = tenant_name self.home = os.getenv('PYBRITIVE_HOME_DIR', str(Path.home())) self.base_path = str(Path(self.home) / '.britive') @@ -82,7 +83,7 @@ def clear_gcloud_auth_key_files(self, profile=None): else: # otherwise we can remove all items in the directory and the directory itself shutil.rmtree(str(path), ignore_errors=True) - def get_output_format(self, output_format: str = None): + def get_output_format(self, output_format: Optional[str] = None): return coalesce( output_format, self.get_tenant().get('output_format'), @@ -144,7 +145,7 @@ def get_tenant(self): 'Tenant not provided, no default tenant set, and more than one tenant exists.' ) # nothing given but only 1 tenant so assume that is what should be used - provided_tenant_name = list(self.tenants)[0] + provided_tenant_name = next(iter(self.tenants)) # if we get here then we now have a tenant name we can check to ensure exists if provided_tenant_name not in self.aliases_and_names and not name: @@ -164,7 +165,7 @@ def save(self): with open(str(self.path), 'w', encoding='utf-8') as f: config.write(f, space_around_delimiters=False) - def save_tenant(self, tenant: str, alias: str = None, output_format: str = None): + def save_tenant(self, tenant: str, alias: Optional[str] = None, output_format: Optional[str] = None): self.load() if not alias: alias = tenant @@ -175,7 +176,12 @@ def save_tenant(self, tenant: str, alias: str = None, output_format: str = None) self.config[f'tenant-{alias}']['output_format'] = output_format self.save() - def save_global(self, default_tenant_name: str = None, output_format: str = None, backend: str = None): + def save_global( + self, + default_tenant_name: Optional[str] = None, + output_format: Optional[str] = None, + backend: Optional[str] = None, + ): self.load() if not default_tenant_name and not output_format and not backend: return @@ -194,8 +200,7 @@ def get_profile_aliases(self, reverse_keys: bool = False): aliases = self.config.get('profile-aliases', {}) if reverse_keys: return {v: k for k, v in aliases.items()} - else: - return aliases + return aliases def save_profile_alias(self, alias, profile): self.profile_aliases[alias] = profile @@ -274,10 +279,10 @@ def validate_global(self, section, fields): def validate_profile_aliases(self, section, fields): for field, value in fields.items(): - if len(profile_split(value)) not in [2, 3]: + if not (2 <= len(profile_split(value)) <= 4): error = ( - f'Invalid {section} field {field} value {value} provided. Value must be 2 or 3 parts ' - 'separated by a /' + f'Invalid {section} field {field} value {value} provided.' + ' Value must be between 2 and 4 parts separated by a /' ) self.validation_error_messages.append(error) @@ -300,9 +305,9 @@ def validate_tenant(self, section, fields): self.validation_error_messages.append(f'Invalid {section} field {field} provided.') if field == 'name': try: - Britive.parse_tenant(value) + parse_tenant(value) except Exception as e: - raise click.ClickException(f'Error validating tenant name: {str(e)}') from e + raise click.ClickException(f'Error validating tenant name: {e!s}') from e if field == 'output_format' and value not in output_format_choices.choices: error = f'Invalid {section} field {field} value {value} provided. Invalid value choice.' self.validation_error_messages.append(error) diff --git a/src/pybritive/helpers/credentials.py b/src/pybritive/helpers/credentials.py index 767c8c5..feaa0cb 100644 --- a/src/pybritive/helpers/credentials.py +++ b/src/pybritive/helpers/credentials.py @@ -7,11 +7,13 @@ import time import webbrowser from pathlib import Path +from typing import Optional import click import jwt import requests from britive.britive import Britive +from britive.helpers.utils import parse_tenant from dateutil import parser from requests.adapters import HTTPAdapter, Retry @@ -52,19 +54,19 @@ def __init__( tenant_name: str, tenant_alias: str, cli: any, - federation_provider: str = None, + federation_provider: Optional[str] = None, browser: str = os.getenv('PYBRITIVE_BROWSER'), ): self.cli = cli self.tenant = tenant_name self.alias = tenant_alias - self.base_url = f'https://{Britive.parse_tenant(tenant_name)}' + self.base_url = f'https://{parse_tenant(tenant_name)}' self.federation_provider = federation_provider self.browser = browser self.session = None # not sure if we really need 32 random bytes or if any random string would work, but it was carried forward from - # the now deprecated node.js CLI, so it remains for the time being. + # the now deprecated node.js CLI, so it remains for the time being. while True: # will break eventually when we get values that do not include -- self.verifier = b64_encode_url_safe(bytes([random.getrandbits(8) for _ in range(0, 32)])) self.auth_token = b64_encode_url_safe(bytes(hashlib.sha512(self.verifier.encode('utf-8')).digest())) @@ -117,7 +119,7 @@ def perform_interactive_login(self): num_tries = 1 while True: if num_tries > 60: - raise InteractiveLoginTimeout() + raise InteractiveLoginTimeout response = self.retrieve_tokens() if response.status_code >= 400: @@ -125,7 +127,6 @@ def perform_interactive_login(self): num_tries += 1 else: credentials = response.json()['authenticationResult'] - try: # attempt to pull the expiration time from the jwt expiration_time_ms = self._extract_exp_from_jwt( @@ -194,7 +195,7 @@ def perform_federation_provider_authentication(self): duration = int(helper[1]) except ValueError: self.cli.print( - f'Invalid federation provider duration {helper[1]} provided - defaulting ' f'to {duration} seconds.' + f'Invalid federation provider duration {helper[1]} provided - defaulting to {duration} seconds.' ) # generate the token @@ -260,8 +261,7 @@ def get_token(self): else: self.perform_interactive_login() - token = self._get_token() - return token + return self._get_token() def has_valid_credentials(self): if not self.credentials or self.credentials == {}: @@ -280,7 +280,7 @@ def __init__( tenant_name: str, tenant_alias: str, cli: any, - federation_provider: str = None, + federation_provider: Optional[str] = None, browser: str = os.getenv('PYBRITIVE_BROWSER'), ): home = os.getenv('PYBRITIVE_HOME_DIR', str(Path.home())) @@ -333,8 +333,8 @@ def __init__( tenant_name: str, tenant_alias: str, cli: any, - passphrase: str = None, - federation_provider: str = None, + passphrase: Optional[str] = None, + federation_provider: Optional[str] = None, browser: str = os.getenv('PYBRITIVE_BROWSER'), ): home = os.getenv('PYBRITIVE_HOME_DIR', str(Path.home())) diff --git a/src/pybritive/helpers/encryption.py b/src/pybritive/helpers/encryption.py index 325beb8..2c8a5e7 100644 --- a/src/pybritive/helpers/encryption.py +++ b/src/pybritive/helpers/encryption.py @@ -1,6 +1,7 @@ import base64 import os import uuid +from typing import Optional from cryptography.fernet import Fernet, InvalidToken from cryptography.hazmat.backends import default_backend @@ -13,7 +14,7 @@ class InvalidPassphraseException(Exception): class StringEncryption: - def __init__(self, passphrase: str = None): + def __init__(self, passphrase: Optional[str] = None): self.passphrase = passphrase or str(uuid.getnode()) # TODO change? @staticmethod @@ -42,4 +43,4 @@ def decrypt(self, ciphertext: str): key = self._key(b64salt) return Fernet(key).decrypt(base64.b64decode(ciphertext.encode())).decode('utf-8') except InvalidToken as e: - raise InvalidPassphraseException() from e + raise InvalidPassphraseException from e diff --git a/src/pybritive/helpers/k8s_exec.py b/src/pybritive/helpers/k8s_exec.py index 3ff4c09..0765ea4 100644 --- a/src/pybritive/helpers/k8s_exec.py +++ b/src/pybritive/helpers/k8s_exec.py @@ -59,7 +59,7 @@ def main(): exit() if not creds: - from ..britive_cli import BritiveCli # lazy load for performance purposes + from pybritive.britive_cli import BritiveCli # lazy load for performance purposes b = BritiveCli( tenant_name=args['tenant'], diff --git a/src/pybritive/helpers/kube_config_builder.py b/src/pybritive/helpers/kube_config_builder.py index a397930..30c4a0b 100644 --- a/src/pybritive/helpers/kube_config_builder.py +++ b/src/pybritive/helpers/kube_config_builder.py @@ -5,14 +5,14 @@ import yaml -from ..britive_cli import BritiveCli +from pybritive.britive_cli import BritiveCli + from .config import ConfigManager def sanitize(name: str): - name = name.lower() # name = name.replace(' ', '_').replace('/', "_").replace('\\', '_') - return name + return name.lower() def check_env_var(filename, cli: BritiveCli): @@ -95,9 +95,9 @@ def valid_cert(cert: str, profile: str, cli: BritiveCli): try: decoded_cert = base64.b64decode(cert).decode('utf-8') if not decoded_cert.startswith('-----BEGIN CERTIFICATE-----'): - raise ValueError() + raise ValueError if not decoded_cert.strip().endswith('-----END CERTIFICATE-----'): - raise ValueError() + raise ValueError return True except Exception: cli.print(f'could not properly decode certificate authority data for profile {profile} - skipping this cluster') diff --git a/src/pybritive/helpers/profile_argument_decorator.py b/src/pybritive/helpers/profile_argument_decorator.py index aaf3f38..6fe5860 100644 --- a/src/pybritive/helpers/profile_argument_decorator.py +++ b/src/pybritive/helpers/profile_argument_decorator.py @@ -3,7 +3,7 @@ import click -from ..completers.profile import profile_completer +from pybritive.completers.profile import profile_completer def validate_profile(ctx, param, value): diff --git a/src/pybritive/options/britive_options.py b/src/pybritive/options/britive_options.py index 854821e..ae31a9c 100644 --- a/src/pybritive/options/britive_options.py +++ b/src/pybritive/options/britive_options.py @@ -1,40 +1,42 @@ import click -from ..options.alias import option as alias -from ..options.aws_console_duration import option as aws_console_duration -from ..options.aws_credentials_file import option as aws_credentials_file -from ..options.aws_profile import option as aws_profile -from ..options.blocktime import option as blocktime -from ..options.browser import option as browser -from ..options.checked_out import option as checked_out -from ..options.configure_alias import option as configure_alias -from ..options.configure_backend import option as configure_backend -from ..options.configure_prompt import option as configure_prompt -from ..options.configure_tenant import option as configure_tenant -from ..options.console import option as console -from ..options.extend import option as extend -from ..options.federation_provider import option as federation_provider -from ..options.file import option as file -from ..options.force_renew import option as force_renew -from ..options.gcloud_key_file import option as gcloud_key_file -from ..options.justification import option as justification -from ..options.maxpolltime import option as maxpolltime -from ..options.mode import option as mode -from ..options.otp import option as otp -from ..options.output_format import option as output_format -from ..options.passphrase import option as passphrase -from ..options.profile_type import option as profile_type -from ..options.query import option as query -from ..options.silent import option as silent -from ..options.ssh_hostname import option as ssh_hostname -from ..options.ssh_key_source import option as ssh_key_source -from ..options.ssh_port import option as ssh_port -from ..options.ssh_push_public_key import option as ssh_push_public_key -from ..options.ssh_username import option as ssh_username -from ..options.tenant import option as tenant -from ..options.token import option as token -from ..options.verbose import option as verbose -from ..options.version import option as version +from pybritive.options.alias import option as alias +from pybritive.options.aws_console_duration import option as aws_console_duration +from pybritive.options.aws_credentials_file import option as aws_credentials_file +from pybritive.options.aws_profile import option as aws_profile +from pybritive.options.blocktime import option as blocktime +from pybritive.options.browser import option as browser +from pybritive.options.checked_out import option as checked_out +from pybritive.options.configure_alias import option as configure_alias +from pybritive.options.configure_backend import option as configure_backend +from pybritive.options.configure_prompt import option as configure_prompt +from pybritive.options.configure_tenant import option as configure_tenant +from pybritive.options.console import option as console +from pybritive.options.extend import option as extend +from pybritive.options.federation_provider import option as federation_provider +from pybritive.options.file import option as file +from pybritive.options.force_renew import option as force_renew +from pybritive.options.gcloud_key_file import option as gcloud_key_file +from pybritive.options.justification import option as justification +from pybritive.options.maxpolltime import option as maxpolltime +from pybritive.options.mode import option as mode +from pybritive.options.otp import option as otp +from pybritive.options.output_format import option as output_format +from pybritive.options.passphrase import option as passphrase +from pybritive.options.profile_type import option as profile_type +from pybritive.options.query import option as query +from pybritive.options.silent import option as silent +from pybritive.options.ssh_hostname import option as ssh_hostname +from pybritive.options.ssh_key_source import option as ssh_key_source +from pybritive.options.ssh_port import option as ssh_port +from pybritive.options.ssh_push_public_key import option as ssh_push_public_key +from pybritive.options.ssh_username import option as ssh_username +from pybritive.options.tenant import option as tenant +from pybritive.options.ticket_id import option as ticket_id +from pybritive.options.ticket_type import option as ticket_type +from pybritive.options.token import option as token +from pybritive.options.verbose import option as verbose +from pybritive.options.version import option as version options_map = { 'tenant': tenant, @@ -50,6 +52,8 @@ 'configure_alias': configure_alias, 'configure_prompt': configure_prompt, 'justification': justification, + 'ticket_type': ticket_type, + 'ticket_id': ticket_id, 'otp': otp, 'silent': silent, 'console': console, @@ -76,7 +80,7 @@ } -def britive_options(*args, **kwargs): +def britive_options(**kwargs): def inner(f): names = [n.strip() for n in kwargs['names'].split(',')] names.reverse() diff --git a/src/pybritive/options/browser.py b/src/pybritive/options/browser.py index 3bce8b6..bb8b7f3 100644 --- a/src/pybritive/options/browser.py +++ b/src/pybritive/options/browser.py @@ -1,6 +1,6 @@ import click -from ..choices.browser import browser_choices +from pybritive.choices.browser import browser_choices option = click.option( '--browser', diff --git a/src/pybritive/options/configure_backend.py b/src/pybritive/options/configure_backend.py index 964f3a9..ba29cc6 100644 --- a/src/pybritive/options/configure_backend.py +++ b/src/pybritive/options/configure_backend.py @@ -1,6 +1,6 @@ import click -from ..choices.backend import backend_choices +from pybritive.choices.backend import backend_choices option = click.option( '--backend', diff --git a/src/pybritive/options/mode.py b/src/pybritive/options/mode.py index 1893340..94702fe 100644 --- a/src/pybritive/options/mode.py +++ b/src/pybritive/options/mode.py @@ -1,6 +1,6 @@ import click -from ..choices.mode import mode_choices +from pybritive.choices.mode import mode_choices # as of v1.1.0 not setting a default value here on purpose as the config file now has an # aws section which provides a default value if the --mode option is omitted diff --git a/src/pybritive/options/output_format.py b/src/pybritive/options/output_format.py index 33ee4b5..8d5fa49 100644 --- a/src/pybritive/options/output_format.py +++ b/src/pybritive/options/output_format.py @@ -1,6 +1,6 @@ import click -from ..choices.output_format import output_format_choices +from pybritive.choices.output_format import output_format_choices option = click.option( '--format', diff --git a/src/pybritive/options/profile_type.py b/src/pybritive/options/profile_type.py index 6cf1be7..8a34687 100644 --- a/src/pybritive/options/profile_type.py +++ b/src/pybritive/options/profile_type.py @@ -1,6 +1,6 @@ import click -from ..choices.profile_type import profile_type_choices +from pybritive.choices.profile_type import profile_type_choices option = click.option( '--profile-type', diff --git a/src/pybritive/options/ssh_key_source.py b/src/pybritive/options/ssh_key_source.py index e9a3f43..768b912 100644 --- a/src/pybritive/options/ssh_key_source.py +++ b/src/pybritive/options/ssh_key_source.py @@ -1,6 +1,6 @@ import click -from ..choices.ssh_key_source import ssh_key_source_choices +from pybritive.choices.ssh_key_source import ssh_key_source_choices option = click.option( '--key-source', diff --git a/src/pybritive/options/ssh_push_public_key.py b/src/pybritive/options/ssh_push_public_key.py index dcc4221..37bc45c 100644 --- a/src/pybritive/options/ssh_push_public_key.py +++ b/src/pybritive/options/ssh_push_public_key.py @@ -1,6 +1,6 @@ import click -from ..choices.ssh_push_public_key import ssh_push_public_key_choices +from pybritive.choices.ssh_push_public_key import ssh_push_public_key_choices def validate(ctx, param, value): diff --git a/src/pybritive/options/ticket_id.py b/src/pybritive/options/ticket_id.py new file mode 100644 index 0000000..9ad3534 --- /dev/null +++ b/src/pybritive/options/ticket_id.py @@ -0,0 +1,8 @@ +import click + +option = click.option( + '--ticket-id', + default=None, + show_default=True, + help='Ticket ID for the ITSM process, if a profile requires a ticket.', +) diff --git a/src/pybritive/options/ticket_type.py b/src/pybritive/options/ticket_type.py new file mode 100644 index 0000000..aed87eb --- /dev/null +++ b/src/pybritive/options/ticket_type.py @@ -0,0 +1,8 @@ +import click + +option = click.option( + '--ticket-type', + default=None, + show_default=True, + help='Ticket type for the ITSM process, if a profile requires a ticket.', +) diff --git a/src/pybritive/options/version.py b/src/pybritive/options/version.py index 7c29e54..0f8743f 100644 --- a/src/pybritive/options/version.py +++ b/src/pybritive/options/version.py @@ -9,7 +9,7 @@ def version_callback(ctx, self, value): if value: cli_version = version('pybritive') click.echo(f'pybritive: {cli_version} / platform: {platform.platform()} / python: {platform.python_version()}') - raise click.exceptions.Exit() + raise click.exceptions.Exit option = click.option( diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py index 1a8056a..68d7341 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -54,5 +54,5 @@ def cli(): @pytest.fixture def unset_api_token_env_var(): - if os.getenv((name := 'BRITIVE_API_TOKEN')): + if os.getenv(name := 'BRITIVE_API_TOKEN'): del os.environ[name] diff --git a/tests/test_0200_configure.py b/tests/test_0200_configure.py index 3f19949..6731f46 100644 --- a/tests/test_0200_configure.py +++ b/tests/test_0200_configure.py @@ -1,13 +1,14 @@ import os +from typing import Optional def read_config(): local_home = os.getenv('PYBRITIVE_HOME_DIR') - with open(f'{local_home}/.britive/pybritive.config', 'r', encoding='utf-8') as f: + with open(f'{local_home}/.britive/pybritive.config', encoding='utf-8') as f: return f.read() -def common_asserts(result, substring: list = None, exit_code: int = 0): +def common_asserts(result, substring: Optional[list] = None, exit_code: int = 0): assert result.exit_code == exit_code if substring: if isinstance(substring, str): diff --git a/tests/test_0300_login.py b/tests/test_0300_login.py index a696b3b..b4024ec 100644 --- a/tests/test_0300_login.py +++ b/tests/test_0300_login.py @@ -7,12 +7,12 @@ def test_login_interactive(runner, cli): result = runner.invoke(cli, 'login') api_login_warning = 'Interactive login unavailable when an API token is provided.' if api_login_warning in result.output: - warnings.warn(api_login_warning.upper()) + warnings.warn(api_login_warning.upper(), stacklevel=2) return assert result.exit_code == 0 local_home = os.getenv('PYBRITIVE_HOME_DIR') path = Path(Path(local_home) / '.britive' / 'pybritive.credentials.encrypted') - with open(str(path), 'r', encoding='utf-8') as f: + with open(str(path), encoding='utf-8') as f: data = f.read() assert 'accessToken=' in data assert os.getenv('PYBRITIVE_TEST_TENANT') in data diff --git a/tests/test_0600_secret.py b/tests/test_0600_secret.py index 616fce6..7e2f758 100644 --- a/tests/test_0600_secret.py +++ b/tests/test_0600_secret.py @@ -19,7 +19,7 @@ def test_download(runner, cli): result = runner.invoke(cli, 'secret download /pybritive-test-file'.split(' ')) message = 'wrote contents of secret file to' common_asserts(result, message) - with open(filename, 'r', encoding='utf-8') as f: + with open(filename, encoding='utf-8') as f: assert 'test' in f.read() path = Path(filename) path.unlink(missing_ok=True) @@ -30,7 +30,7 @@ def test_download_filename_provided(runner, cli): result = runner.invoke(cli, f'secret download /pybritive-test-file -F {filename}'.split(' ')) message = 'wrote contents of secret file to' common_asserts(result, message) - with open(filename, 'r', encoding='utf-8') as f: + with open(filename, encoding='utf-8') as f: assert 'test' in f.read() path = Path(filename) path.unlink(missing_ok=True) diff --git a/tests/test_0700_cache.py b/tests/test_0700_cache.py index f971850..733c35f 100644 --- a/tests/test_0700_cache.py +++ b/tests/test_0700_cache.py @@ -7,7 +7,7 @@ def test_cache_profiles(runner, cli): result = runner.invoke(cli, 'cache profiles'.split(' ')) local_home = os.getenv('PYBRITIVE_HOME_DIR') path = Path(Path(local_home) / '.britive' / 'pybritive.cache') - with open(str(path), 'r', encoding='utf-8') as f: + with open(str(path), encoding='utf-8') as f: data = json.loads(f.read()) assert result.exit_code == 0 assert 'profiles' in data diff --git a/tests/test_1000_clear.py b/tests/test_1000_clear.py index 8b84ffb..42c0af1 100644 --- a/tests/test_1000_clear.py +++ b/tests/test_1000_clear.py @@ -7,7 +7,7 @@ def test_clear_cache(runner, cli): result = runner.invoke(cli, 'clear cache'.split(' ')) local_home = os.getenv('PYBRITIVE_HOME_DIR') path = Path(Path(local_home) / '.britive' / 'pybritive.cache') - with open(str(path), 'r', encoding='utf-8') as f: + with open(str(path), encoding='utf-8') as f: data = json.loads(f.read()) assert result.exit_code == 0 assert 'profiles' in data diff --git a/tests/test_1100_logout.py b/tests/test_1100_logout.py index 565eb95..b93cad5 100644 --- a/tests/test_1100_logout.py +++ b/tests/test_1100_logout.py @@ -7,6 +7,6 @@ def test_logout(runner, cli): assert result.exit_code == 0 local_home = os.getenv('PYBRITIVE_HOME_DIR') path = Path(Path(local_home) / '.britive' / 'pybritive.credentials.encrypted') - with open(str(path), 'r', encoding='utf-8') as f: + with open(str(path), encoding='utf-8') as f: data = f.read() assert len(data) == 0