diff --git a/README.md b/README.md index 88016a3..26de0ea 100644 --- a/README.md +++ b/README.md @@ -8,3 +8,4 @@ We currently provide SDKs for the following platforms: * [Node.js](./node) * [PHP](./php) * [DotNet](./dotnet) +* [Python](./python) diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000..7b88a56 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,7 @@ +.pytest_cache/* +tests/.pytest_cache/* + +.eggs/* +*.egg-info/* +dist/* +build/* \ No newline at end of file diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..2c552a4 --- /dev/null +++ b/python/README.md @@ -0,0 +1,50 @@ +# Phenix EdgeAuth Digest Tokens for Python + +Easily generate secure digest tokens to use with the Phenix platform without requiring any networking activity. + +## Installation + +To install Phenix Edge Authorization Digest Token with pip: + +```shell script +$ pip install phenix-edge-auth +``` + +## Testing + +```shell script +$ pytest -vv +``` + +## Updating PyPi + +```shell script +$ python setup.py bdist_whe +$ twine upload dist/* +``` + +## Example + +```python +from edgeauth.token_builder import TokenBuilder + +# Create a token to access a channel +token = TokenBuilder() + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_in_seconds(3600) \ + .for_channel('us-northeast#my-application-id#my-channel.1345') \ + .build() +``` + +## Command Line Examples + +Display the help information: +```shell script +$ edgeauth_cli --help +``` + +Create a token for channel access: +```shell script +$ edgeauth_cli --application_id "my-application-id" --secret "my-secret" --expires_in_seconds 3600 --channel "us-northeast#my-application-id#my-channel.1345" +``` diff --git a/python/edgeauth/__init__.py b/python/edgeauth/__init__.py new file mode 100644 index 0000000..ff95117 --- /dev/null +++ b/python/edgeauth/__init__.py @@ -0,0 +1,7 @@ +""" +Phenix EdgeAuth +""" + +__version__ = '0.1.1' +__author__ = 'Brandon Drake' +__credits__ = 'Phenix Real Time Solutions, Inc.' diff --git a/python/edgeauth/cli.py b/python/edgeauth/cli.py new file mode 100644 index 0000000..1ab14c9 --- /dev/null +++ b/python/edgeauth/cli.py @@ -0,0 +1,100 @@ +"""CLI for EdgeAuth +""" + +import argparse +import sys + +from sys import version_info + +from .token_builder import TokenBuilder + +PY3_MIN_VERSION = (3, 6) + +def main(): + if version_info < PY3_MIN_VERSION: + raise Exception('Please use Python >= 3.6') + + parser = argparse.ArgumentParser(description='EdgeAuth token generator') + + + parser.add_argument('-u', '--application_id', required=True, help='The application ID') + parser.add_argument('-w', '--secret', required=True, help='The application secret') + + parser.add_argument('-l', '--expires_in_seconds', type=int, default=3600, help='Token life time in seconds') + parser.add_argument('-e', '--expires_at', help='Token expires at timestamp measured in milliseconds since UNIX epoch') + + parser.add_argument('-b', '--capabilities', help='Comma separated list of capabilities, e.g. for publishing') + + parser.add_argument('-z', '--session_id', help='Token is limited to the given session') + parser.add_argument('-x', '--remote_address', help='Token is limited to the given remote address') + parser.add_argument('-o', '--origin_stream_id', help='[STREAMING] Token is limited to the given origin stream') + + parser.add_argument('-c', '--channel', help='[STREAMING] Token is limited to the given channel') + parser.add_argument('-i', '--channel_alias', help='[STREAMING] Token is limited to the given channel alias') + + parser.add_argument('-m', '--room', help='[STREAMING] Token is limited to the given room') + parser.add_argument('-n', '--room_alias', help='[STREAMING] Token is limited to the given room alias') + + parser.add_argument('-t', '--tag', help='[STREAMING] Token is limited to the given origin stream tag') + parser.add_argument('-r', '--apply_tag', help='[REPORTING] Apply tag to the new stream') + + parser.add_argument('-a', '--authentication_only', action="store_true", help='Token can be used for authentication only') + parser.add_argument('-s', '--streaming_only', action="store_true", help='Token can be used for streaming only') + parser.add_argument('-p', '--publishing_only', action="store_true", help='Token can be used for publishing only') + + + args = parser.parse_args() + + token = TokenBuilder() \ + .with_application_id(args.application_id) \ + .with_secret(args.secret) + + if args.expires_at is not None: + token.expires_at(args.expires_at) + else: + token.expires_in_seconds(args.expires_in_seconds) + + if args.authentication_only: + token.for_authenticate_only() + + if args.streaming_only: + token.for_streaming_only() + + if args.publishing_only: + token.for_publishing_only() + + if args.capabilities: + for capability in args.capabilities.split(','): + token.with_capability(capability.strip()) + + if args.session_id: + token.for_session(args.session_id) + + if args.remote_address: + token.for_remote_address(args.remote_address) + + if args.origin_stream_id: + token.for_origin_stream(args.origin_stream_id) + + if args.channel: + token.for_channel(args.channel) + + if args.channel_alias: + token.for_channel_alias(args.channel_alias) + + if args.room: + token.for_channel(args.room) + + if args.room_alias: + token.for_channel_alias(args.room_alias) + + if args.tag: + token.for_tag(args.tag) + + if args.apply_tag: + token.apply_tag(args.apply_tag) + + return token.build() + +if __name__ == '__main__': + sys.exit(main()) \ No newline at end of file diff --git a/python/edgeauth/digest_tokens.py b/python/edgeauth/digest_tokens.py new file mode 100644 index 0000000..d12d14b --- /dev/null +++ b/python/edgeauth/digest_tokens.py @@ -0,0 +1,172 @@ +import hashlib +import hmac +import base64 +import json +from io import StringIO + + +DIGEST_TOKEN_PREFIX = 'DIGEST:' +ENCODING = 'utf-8' + + +class BadToken(Exception): + pass + + +class BadDigest(Exception): + pass + + +class DigestTokens: + def is_digest_token(self, encoded_token): + """Check if a value is a valid digest token. + + Keyword arguments: + encodedToken -- an encoded token + """ + return encoded_token and isinstance(encoded_token, str) and encoded_token.startswith(DIGEST_TOKEN_PREFIX) + + def verify_and_decode(self, secret, encoded_token): + """Verifies and decodes a digest token. + + Keywork arguments: + secret -- the shared secret used to sign the token + encoded_token -- the encoded token + """ + try: + if not isinstance(secret, str): + raise TypeError('Secret must be a string') + + if not isinstance(encoded_token, str): + raise TypeError('Encoded token must be a string') + + if not self.is_digest_token(encoded_token): + return { + 'verified': False, + 'code': 'not-a-digest-token', + } + + encoded_digest_token = encoded_token[len(DIGEST_TOKEN_PREFIX):] + decoded_digest_token_as_string = base64 \ + .b64decode(encoded_digest_token) + + info = None + + try: + info = json.loads(decoded_digest_token_as_string) + except Exception: + raise BadToken() + + if not info['applicationId'] or not isinstance(info['applicationId'], str): + raise BadToken() + + if not info['digest'] or not isinstance(info['digest'], str): + raise BadToken() + + if not info['token'] or not isinstance(info['token'], str): + raise BadToken() + + digest_as_string = self.calculate_digest( + info['applicationId'], + secret, + info['token'] + ) + + digest = info['digest'] + + if not digest_as_string == digest: + raise BadDigest() + + value = json.loads(info['token']) + + value['applicationId'] = info['applicationId'] + + return { + 'code': 'verified', + 'value': value, + 'verified': True, + } + except BadToken as ex: + return { + 'code': 'bad-token', + # 'message': str(ex), + 'verified': False, + } + except BadDigest: + return { + 'code': 'bad-digest', + 'verified': False, + } + except Exception as ex: + return { + 'code': 'server-error', + 'message': str(ex), + 'verified': False, + } + + def sign_and_encode(self, application_id, secret, token): + """Signs and encodes a digest token. + + Keyword arguments: + application_id -- the application ID used to sign the token + secret -- the shared secret used to sign the token + token -- the raw token object to sign + """ + if not isinstance(secret, str): + raise TypeError('Secret must be a string') + + if not isinstance(token, dict): + raise TypeError('Token must be a dictionary') + + if 'expires' not in token or not type(token['expires']) in [float, int]: + raise ValueError('Token must have an expiration (milliseconds since UNIX epoch)') + + if 'application_id' in token: + raise ValueError('Token should not have an application_id property') + + # io = StringIO() + # json.dump(token, io) + # token_as_string = io.getvalue() + + token_as_string = json.dumps(token, separators=(',', ':')) + + digest = self.calculate_digest(application_id, secret, token_as_string) + + info = { + 'applicationId': application_id, + 'digest': digest, + 'token': token_as_string, + } + + decoded_digest_token_as_string = json.dumps(info, separators=(',', ':')) + encoded_digest_token = base64 \ + .b64encode(decoded_digest_token_as_string.encode(ENCODING)) \ + .decode(ENCODING) + + return DIGEST_TOKEN_PREFIX + encoded_digest_token + + def calculate_digest(self, application_id, secret, token): + """Calculates the digest for a token. + + Keyword arguments: + application_id -- the application ID used to sign the token + secret -- the shared secret used to sign the token + token -- encoded token + """ + if not isinstance(application_id, str): + raise TypeError('Application Id must be a string') + + if not isinstance(secret, str): + raise TypeError('Secret must be a string') + + if not isinstance(token, str): + raise TypeError('Token must be a string') + + salt = (application_id + secret).encode(ENCODING) + verify = hmac.new(salt, digestmod=hashlib.sha512) + + verify.update(token.encode(ENCODING)) + + digest = base64.b64encode(verify.digest()).decode(ENCODING) + + return digest diff --git a/python/edgeauth/token_builder.py b/python/edgeauth/token_builder.py new file mode 100644 index 0000000..5e6bc30 --- /dev/null +++ b/python/edgeauth/token_builder.py @@ -0,0 +1,244 @@ +from datetime import datetime, timezone + +from .digest_tokens import DigestTokens + + +class TokenBuilder: + application_id = None + secret = None + token = {} + + def __init__(self): + """Token builder helper class to create digest tokens that can be used with the Phenix platform. + """ + self.application_id = None + self.secret = None + self.token = {} + + def with_application_id(self, application_id): + """The application ID used to sign the token. (required) + + Keyword arguments: + application_id -- the application ID to sign the token + """ + if not isinstance(application_id, str): + raise TypeError('Application Id must be a string') + + self.application_id = application_id + + return self + + def with_secret(self, secret): + """The secret used to sign the token. (required) + + Keyword arguments: + secret -- the shared secret ro sigh the token + """ + if not isinstance(secret, str): + raise TypeError('Secret must be a string') + + self.secret = secret + + return self + + def with_capability(self, capability): + """Set a capability for the token, e.g. to publish a stream. (optional) + + Keyword arguments: + capability -- the valid capability + """ + if not isinstance(capability, str): + raise TypeError('Capability must be a string') + + token = self.token + capabilities = set(token['capabilities']) if 'capabilities' in token else set([]) + + capabilities.add(capability) + + self.token['capabilities'] = sorted(list(capabilities)) + + return self + + def expires_in_seconds(self, seconds): + """Expires the token in the given time. + NOTE: Your time must be synced with the atomic clock for expiration time to work properly. + + Keyword arguments: + seconds -- the time in seconds + """ + if not isinstance(seconds, int) and not isinstance(seconds, float): + raise TypeError('Seconds must be a float or an int') + + self.token['expires'] = (datetime.now().replace(tzinfo=timezone.utc).timestamp() + seconds) * 1000 + + return self + + def expires_at(self, timestamp): + """Expires the token at the given timestamp + NOTE: Your time must be synced with the atomic clock for expiration time to work properly. + + Keyword arguments: + timestamp -- the time as a timestamp + """ + if not isinstance(timestamp, int) and not isinstance(timestamp, float): + raise TypeError('Timestamp must be a float or an int') + + self.token['expires'] = timestamp + + return self + + def for_authenticate_only(self): + """Limit the token to authentication only. (optional) + """ + self.token['type'] = 'auth' + + return self + + def for_streaming_only(self): + """Limit the token to streaming only. (optional) + """ + self.token['type'] = 'stream' + + return self + + def for_publishing_only(self): + """Limit the token to publishing only. (optional) + """ + self.token['type'] = 'publish' + + return self + + def for_session(self, session_id): + """Limit the token to the specified session ID. (optional) + + Keyword arguments: + session_id -- the session id + """ + if not isinstance(session_id, str): + raise TypeError('Session Id must be a string') + + self.token['sessionId'] = session_id + + return self + + def for_remote_address(self, remote_address): + """Limit the token to the specified remote address. (optional) + + Keyword arguments: + remote_address -- the remote address + """ + if not isinstance(remote_address, str): + raise TypeError('Remote Address must be a string') + + self.token['remoteAddress'] = remote_address + + return self + + def for_origin_stream(self, origin_stream_id): + """Limit the token to the specified origin stream ID. (optional) + + Keyword arguments: + origin_stream_id -- the origin stream ID + """ + if not isinstance(origin_stream_id, str): + raise TypeError('Origin Stream Id must be a string') + + self.token['originStreamId'] = origin_stream_id + + return self + + def for_channel(self, channel_id): + """Limit the token to the specified channel ID. (optional) + + Keyword arguments: + channel_id -- the channel id + """ + if not isinstance(channel_id, str): + raise TypeError('Channel ID must be a string') + + self.for_tag('channelId:{}'.format(channel_id)) + + return self + + def for_channel_alias(self, channel_alias): + """Limit the token to the specified channel alias. (optional) + + Keyword arguments: + channel_alias -- the channel alias + """ + if not isinstance(channel_alias, str): + raise TypeError('Channel Alias must be a string') + + self.for_tag('channelAlias:{}'.format(channel_alias)) + + return self + + def for_room(self, room_id): + """Limit the token to the specified room ID. (optional) + + Keyword arguments: + room_id -- the room id + """ + if not isinstance(room_id, str): + raise TypeError('Room ID must be a string') + + self.for_tag('roomId:{}'.format(room_id)) + + return self + + def for_room_alias(self, room_alias): + """Limit the token to the specified room alias. (optional) + + Keyword arguments: + room_alias -- the room alias + """ + if not isinstance(room_alias, str): + raise TypeError('Room Alias must be a string') + + self.for_tag('roomAlias:{}'.format(room_alias)) + + return self + + def for_tag(self, tag): + """Limit the token to the specified tag on the origin stream. (optional) + + Keyword arguments: + tag -- the tag required on the origin stream + """ + if not isinstance(tag, str): + raise TypeError('Tag must be a string') + + self.token['requiredTag'] = tag + + return self + + def apply_tag(self, tag): + """Apply the tag to the stream when it is setup. (optional) + + Keyword arguments: + tag -- the tag added to the new stream + """ + if not isinstance(tag, str): + raise TypeError('Tag must be a string') + + token = self.token + apply_tags = set(token['applyTags']) if 'applyTags' in token else set() + + apply_tags.add(tag) + + self.token['applyTags'] = list(apply_tags) + + return self + + def build(self): + """Build the signed token + """ + token = DigestTokens() + + if not self.application_id: + raise ValueError('application_id must be set using the "with_application_id" method before calling "build"') + + if not self.secret: + raise ValueError('secret must be set using the "with_secret" method call before calling "build"') + + return token.sign_and_encode(self.application_id, self.secret, self.token) diff --git a/python/setup.cfg b/python/setup.cfg new file mode 100644 index 0000000..5e98a2a --- /dev/null +++ b/python/setup.cfg @@ -0,0 +1,25 @@ +[bumpversion] +current_version = 0.1.1 +commit = True +tag = True + +[bumpversion:file:setup.py] +search = version='{current_version}' +replace = version='{new_version}' + +[bumpversion:file:test_pkg/__init__.py] +search = __version__ = '{current_version}' +replace = __version__ = '{new_version}' + +[bdist_wheel] +universal = 1 + +[flake8] +exclude = docs + +[aliases] +# Define setup.py command aliases here +test = pytest + +[tool:pytest] +collect_ignore = ['setup.py'] diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 0000000..3b7f46d --- /dev/null +++ b/python/setup.py @@ -0,0 +1,45 @@ +from setuptools import setup, find_packages + +with open("README.md", "r") as fh: + long_description = fh.read() + +requirements = [ + # 'argparse', + # 'PrettyPrinter', +] + +setup_requirements = [ + 'pytest-runner', +] + +test_requirements = [ + 'pytest>=3', +] + +setup( + name='phenix-edge-auth', + version='0.1.1', + description='Easily generate secure digest tokens to use with the Phenix platform', + long_description=long_description, + long_description_content_type="text/markdown", + url='https://github.com/PhenixRTS/EdgeAuth/python', + author='Brandon Drake', + author_email='tomoguisuru@gmail.com', + license='Apache-2.0', + packages=find_packages(include=['edgeauth', 'edgeauth.*']), + install_requires=requirements, + setup_requires=setup_requirements, + test_suite='tests', + tests_require=test_requirements, + classifiers=[ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + ], + entry_points={ + 'console_scripts': [ + 'edgeauth_cli=edgeauth.cli:main', + ], + }, + python_requires='>=3.6', +) diff --git a/python/tests/__init__.py b/python/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/tests/test_when_verifying_a_bad_token.py b/python/tests/test_when_verifying_a_bad_token.py new file mode 100644 index 0000000..52e5f40 --- /dev/null +++ b/python/tests/test_when_verifying_a_bad_token.py @@ -0,0 +1,13 @@ +from edgeauth.digest_tokens import DigestTokens + + +class TestWhenVerifyingABadToken: + def test_the_token_fails_to_verify(self): + token = 'DIGEST:bad-token' + result = DigestTokens() \ + .verify_and_decode('bad-secret', token) + + assert result['verified'] is False + assert result['code'] == 'bad-token' + assert 'message' not in result + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_channel.py b/python/tests/test_when_verifying_a_token_for_a_channel.py new file mode 100644 index 0000000..108f87a --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_channel.py @@ -0,0 +1,41 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForAChannel: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_channel('us-northeast#my-application-id#my-channel.134566') \ + .for_streaming_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJZNGM3Tmp6eDVhalkzLzRWK3pwTVliNTBBU1ZCUXc0NlAvS2dwc3JrTnpDdFAzZWM5NzVzblorN3lJNzZiM0wrTmNtb2FoL3hOTUhQZ00vNEExaDI4UT09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJjaGFubmVsSWQ6dXMtbm9ydGhlYXN0I215LWFwcGxpY2F0aW9uLWlkI215LWNoYW5uZWwuMTM0NTY2XCIsXCJ0eXBlXCI6XCJzdHJlYW1cIn0ifQ==' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'channelId:us-northeast#my-application-id#my-channel.134566' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_channel_alias.py b/python/tests/test_when_verifying_a_token_for_a_channel_alias.py new file mode 100644 index 0000000..4a6e8c6 --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_channel_alias.py @@ -0,0 +1,41 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForAChannelAlias: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_channel_alias('my-channel') \ + .for_streaming_only() \ + .build() + + def test_the_token_matches_the_expected_value(self): + self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJPMk90R1ZBMlErTGlhRkdjSjZ0cnlXZWE4L2l2dWFQR2gzcFJpcVd3ZlJPVWdBSSs0dFdaYXdBc011Y2MyMHNRTjZpaGZtVGVDNFVubXVoWko5aHBxUT09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJjaGFubmVsQWxpYXM6bXktY2hhbm5lbFwiLFwidHlwZVwiOlwic3RyZWFtXCJ9In0=' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'channelAlias:my-channel' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_remote_address.py b/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_remote_address.py new file mode 100644 index 0000000..b88d086 --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_remote_address.py @@ -0,0 +1,43 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForAChannelAliasAndRemoteAddress: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_channel_alias('my-channel') \ + .for_remote_address('10.1.2.3') \ + .for_streaming_only() \ + .build() + + def test_the_token_matches_the_expected_value(self): + self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiI4MitYd1dITVRUc0xWYThKcnFPUmdjYlRXL2g2clFBTlF1MjgvRytQeHllQ09qSHEyb2xDYzVacUJ1MktqN0tGYmYyTC84TDZyaE9xTTZPMjNBR29HUT09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJjaGFubmVsQWxpYXM6bXktY2hhbm5lbFwiLFwicmVtb3RlQWRkcmVzc1wiOlwiMTAuMS4yLjNcIixcInR5cGVcIjpcInN0cmVhbVwifSJ9' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'channelAlias:my-channel' + assert value['remoteAddress'] == '10.1.2.3' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_session.py b/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_session.py new file mode 100644 index 0000000..42bfd86 --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_session.py @@ -0,0 +1,43 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForAChannelAliasAndSession: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_channel_alias('my-channel') \ + .for_session('session-id') \ + .for_streaming_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJBQi9Nanp2a1lnMGRTODF6aU1SVDZ3OUtwWmtjMU42U3VMTW56V09CQVJQZWJuenRHZTlmM2ZNS1FURXVqaHpVTkY0TWVsNkpMekFiWlZ3TFBSbEN4QT09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJjaGFubmVsQWxpYXM6bXktY2hhbm5lbFwiLFwic2Vzc2lvbklkXCI6XCJzZXNzaW9uLWlkXCIsXCJ0eXBlXCI6XCJzdHJlYW1cIn0ifQ==' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'channelAlias:my-channel' + assert value['sessionId'] == 'session-id' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_with_a_tag_added.py b/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_with_a_tag_added.py new file mode 100644 index 0000000..ea7368a --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_channel_alias_and_with_a_tag_added.py @@ -0,0 +1,43 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForAChannelAliasAndWithATagAdded: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_channel_alias('my-channel') \ + .for_streaming_only() \ + .apply_tag('customer1') \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJMU0VnS2dGTy9aRUdxdEFLazVZb0F6cFJuTnQ4enhwUjNsdEJ3cWtOR3E1VWdjWWZpcnZKTDk3NHhpangyNS9XbHpqaUg1dk5ZMHdaYklFSkE2MzJqdz09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJjaGFubmVsQWxpYXM6bXktY2hhbm5lbFwiLFwidHlwZVwiOlwic3RyZWFtXCIsXCJhcHBseVRhZ3NcIjpbXCJjdXN0b21lcjFcIl19In0=' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'channelAlias:my-channel' + assert 'customer1' in value['applyTags'] + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_room.py b/python/tests/test_when_verifying_a_token_for_a_room.py new file mode 100644 index 0000000..d066225 --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_room.py @@ -0,0 +1,41 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForARoomAlias: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_room_alias('my-room') \ + .for_streaming_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiI1UkN3a0FrdFdJTDNWNllXN0V0dE14ejhpZXJvMWZkcXF0dEdRVFdaUDVCZ1k0OFhIUGltYmx3dDl1QUgyQWI3bHVVcWs0OG1DQktveE10WkhpaHNoQT09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJyb29tQWxpYXM6bXktcm9vbVwiLFwidHlwZVwiOlwic3RyZWFtXCJ9In0=' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'roomAlias:my-room' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_room_alias.py b/python/tests/test_when_verifying_a_token_for_a_room_alias.py new file mode 100644 index 0000000..12f3fbf --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_room_alias.py @@ -0,0 +1,41 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForARoom: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_room('my-room.123456') \ + .for_streaming_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiI2WWdud09qWkx4Mk8zQXJjd29CUlVKU0UyYkRVNWVGY0FIYjI3OEJxVlMvcmplMXlsRU51bE5BSTVqakd2Mjc3VnZTTEtkYk1jTW1HenA3Nm9wNkNmZz09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJyb29tSWQ6bXktcm9vbS4xMjM0NTZcIixcInR5cGVcIjpcInN0cmVhbVwifSJ9' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'roomId:my-room.123456' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_a_tag.py b/python/tests/test_when_verifying_a_token_for_a_tag.py new file mode 100644 index 0000000..2ef3d01 --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_a_tag.py @@ -0,0 +1,41 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForATag: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_tag('my-tag=awesome') \ + .for_streaming_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJGUGRrTFFyVGlsS0toRDduc2QzeDZoNWV1aXVsaDVCYy9lNEtmQWY0THB5Qno4N2trK2lrQWN5ZUppcFk3alo4clpTN1N0bWw1aERMWEJIZXkrbmw2QT09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJteS10YWc9YXdlc29tZVwiLFwidHlwZVwiOlwic3RyZWFtXCJ9In0=' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['requiredTag'] == 'my-tag=awesome' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_publishing.py b/python/tests/test_when_verifying_a_token_for_publishing.py new file mode 100644 index 0000000..eebc6ee --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_publishing.py @@ -0,0 +1,40 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForPublishing: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_publishing_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJrVElBcDh4ZUlqRXBxU2p0R3Zha3JOR2FFWnl5S1hMdmRMdmpBTHpJYkhYQmtqVXg2eU9hOHNmTGVoMFJydnNHaDJFbHF5OE5MMVBFVG51QjdQR3Z6dz09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInR5cGVcIjpcInB1Ymxpc2hcIn0ifQ==' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['type'] == 'publish' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_publishing_to_a_channel.py b/python/tests/test_when_verifying_a_token_for_publishing_to_a_channel.py new file mode 100644 index 0000000..103b89b --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_publishing_to_a_channel.py @@ -0,0 +1,42 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForPublishingToAChannel: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_channel('us-northeast#my-application-id#my-channel.134566') \ + .for_publishing_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJVZ3hjTDVVMlAvZDVtTXI4N3NzM3M5ZDdNNHo1elNZRGZrN0duL1BHS1d4S3NRS2t0c2pkN0Y5QTlRRHVQNnRSaTMzTG00TlpDVTZvSDFjbzFIa2Nmdz09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJjaGFubmVsSWQ6dXMtbm9ydGhlYXN0I215LWFwcGxpY2F0aW9uLWlkI215LWNoYW5uZWwuMTM0NTY2XCIsXCJ0eXBlXCI6XCJwdWJsaXNoXCJ9In0=' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['type'] == 'publish' + assert value['requiredTag'] == 'channelId:us-northeast#my-application-id#my-channel.134566' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_publishing_to_a_channel_alias.py b/python/tests/test_when_verifying_a_token_for_publishing_to_a_channel_alias.py new file mode 100644 index 0000000..b1d2eb0 --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_publishing_to_a_channel_alias.py @@ -0,0 +1,42 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForPublishingToAChannelAlias: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_channel_alias('my-channel') \ + .for_publishing_only() \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJIREJPRzdiOFRuV0ZoNVMrR0Y5Z1lWQkNrM1J4WlhXNWh6UUN0bk9raXZLNlY0K1AxcDVKcHJ2TTNIVElyTUFBclUxMkY5bkltNGRvRm5TWXVjSzloUT09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInJlcXVpcmVkVGFnXCI6XCJjaGFubmVsQWxpYXM6bXktY2hhbm5lbFwiLFwidHlwZVwiOlwicHVibGlzaFwifSJ9' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['type'] == 'publish' + assert value['requiredTag'] == 'channelAlias:my-channel' + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result diff --git a/python/tests/test_when_verifying_a_token_for_publishing_with_capabilities.py b/python/tests/test_when_verifying_a_token_for_publishing_with_capabilities.py new file mode 100644 index 0000000..eda2d97 --- /dev/null +++ b/python/tests/test_when_verifying_a_token_for_publishing_with_capabilities.py @@ -0,0 +1,46 @@ +import pytest + +from edgeauth.digest_tokens import DigestTokens +from edgeauth.token_builder import TokenBuilder + + +class TestWhenVerifyingATokenForPublishingWithCapabilities: + token = None + + @pytest.fixture(autouse=True) + def before_each(self): + self.token = TokenBuilder() \ + .with_application_id('my-application-id') \ + .with_secret('my-secret') \ + .expires_at(1000) \ + .for_publishing_only() \ + .with_capability('multi-bitrate') \ + .with_capability('streaming') \ + .build() + + def test_token_matches_expected_value(self): + assert self.token == 'DIGEST:eyJhcHBsaWNhdGlvbklkIjoibXktYXBwbGljYXRpb24taWQiLCJkaWdlc3QiOiJFKytBK3EwWGhGQ09LT011RnZqcnRIOVNyeHpwZ0Q1VVZYb1B6Q1VPaGNLU3pHTGRQZmsyRVYzVkZOOWRyM2tBVGZtSWRUeCtSTlFodjJ3aVJGbUM1Zz09IiwidG9rZW4iOiJ7XCJleHBpcmVzXCI6MTAwMCxcInR5cGVcIjpcInB1Ymxpc2hcIixcImNhcGFiaWxpdGllc1wiOltcIm11bHRpLWJpdHJhdGVcIixcInN0cmVhbWluZ1wiXX0ifQ==' + + def test_the_token_successfully_verifies_with_the_correct_secret(self): + result = DigestTokens() \ + .verify_and_decode('my-secret', self.token) + + assert result['verified'] == True + assert result['code'] == 'verified' + assert 'value' in result + + value = result['value'] + + assert value['type'] == 'publish' + + assert len(value['capabilities']) == 2 + assert 'multi-bitrate' in value['capabilities'] + assert 'streaming' in value['capabilities'] + + def test_the_token_fails_to_verify_with_a_bad_secret(self): + result = DigestTokens() \ + .verify_and_decode('bad-secret', self.token) + + assert result['verified'] == False + assert result['code'] == 'bad-digest' + assert 'value' not in result