Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ We currently provide SDKs for the following platforms:
* [Node.js](./node)
* [PHP](./php)
* [DotNet](./dotnet)
* [Python](./python)
7 changes: 7 additions & 0 deletions python/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.pytest_cache/*
tests/.pytest_cache/*

.eggs/*
*.egg-info/*
dist/*
build/*
50 changes: 50 additions & 0 deletions python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Phenix EdgeAuth Digest Tokens for Python

Easily generate secure digest tokens to use with the Phenix platform without requiring any networking activity.

## Installation

To install Phenix Edge Authorization Digest Token with pip:

```shell script
$ pip install phenix-edge-auth
```

## Testing

```shell script
$ pytest -vv
```

## Updating PyPi

```shell script
$ python setup.py bdist_whe
$ twine upload dist/*
```

## Example

```python
from edgeauth.token_builder import TokenBuilder

# Create a token to access a channel
token = TokenBuilder()
.with_application_id('my-application-id') \
.with_secret('my-secret') \
.expires_in_seconds(3600) \
.for_channel('us-northeast#my-application-id#my-channel.1345') \
.build()
```

## Command Line Examples

Display the help information:
```shell script
$ edgeauth_cli --help
```

Create a token for channel access:
```shell script
$ edgeauth_cli --application_id "my-application-id" --secret "my-secret" --expires_in_seconds 3600 --channel "us-northeast#my-application-id#my-channel.1345"
```
7 changes: 7 additions & 0 deletions python/edgeauth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Phenix EdgeAuth
"""

__version__ = '0.1.1'
__author__ = 'Brandon Drake'
__credits__ = 'Phenix Real Time Solutions, Inc.'
100 changes: 100 additions & 0 deletions python/edgeauth/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""CLI for EdgeAuth
"""

import argparse
import sys

from sys import version_info

from .token_builder import TokenBuilder

PY3_MIN_VERSION = (3, 6)

def main():
if version_info < PY3_MIN_VERSION:
raise Exception('Please use Python >= 3.6')

parser = argparse.ArgumentParser(description='EdgeAuth token generator')


parser.add_argument('-u', '--application_id', required=True, help='The application ID')
parser.add_argument('-w', '--secret', required=True, help='The application secret')

parser.add_argument('-l', '--expires_in_seconds', type=int, default=3600, help='Token life time in seconds')
parser.add_argument('-e', '--expires_at', help='Token expires at timestamp measured in milliseconds since UNIX epoch')

parser.add_argument('-b', '--capabilities', help='Comma separated list of capabilities, e.g. for publishing')

parser.add_argument('-z', '--session_id', help='Token is limited to the given session')
parser.add_argument('-x', '--remote_address', help='Token is limited to the given remote address')
parser.add_argument('-o', '--origin_stream_id', help='[STREAMING] Token is limited to the given origin stream')

parser.add_argument('-c', '--channel', help='[STREAMING] Token is limited to the given channel')
parser.add_argument('-i', '--channel_alias', help='[STREAMING] Token is limited to the given channel alias')

parser.add_argument('-m', '--room', help='[STREAMING] Token is limited to the given room')
parser.add_argument('-n', '--room_alias', help='[STREAMING] Token is limited to the given room alias')

parser.add_argument('-t', '--tag', help='[STREAMING] Token is limited to the given origin stream tag')
parser.add_argument('-r', '--apply_tag', help='[REPORTING] Apply tag to the new stream')

parser.add_argument('-a', '--authentication_only', action="store_true", help='Token can be used for authentication only')
parser.add_argument('-s', '--streaming_only', action="store_true", help='Token can be used for streaming only')
parser.add_argument('-p', '--publishing_only', action="store_true", help='Token can be used for publishing only')


args = parser.parse_args()

token = TokenBuilder() \
.with_application_id(args.application_id) \
.with_secret(args.secret)

if args.expires_at is not None:
token.expires_at(args.expires_at)
else:
token.expires_in_seconds(args.expires_in_seconds)

if args.authentication_only:
token.for_authenticate_only()

if args.streaming_only:
token.for_streaming_only()

if args.publishing_only:
token.for_publishing_only()

if args.capabilities:
for capability in args.capabilities.split(','):
token.with_capability(capability.strip())

if args.session_id:
token.for_session(args.session_id)

if args.remote_address:
token.for_remote_address(args.remote_address)

if args.origin_stream_id:
token.for_origin_stream(args.origin_stream_id)

if args.channel:
token.for_channel(args.channel)

if args.channel_alias:
token.for_channel_alias(args.channel_alias)

if args.room:
token.for_channel(args.room)

if args.room_alias:
token.for_channel_alias(args.room_alias)

if args.tag:
token.for_tag(args.tag)

if args.apply_tag:
token.apply_tag(args.apply_tag)

return token.build()

if __name__ == '__main__':
sys.exit(main())
172 changes: 172 additions & 0 deletions python/edgeauth/digest_tokens.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import hashlib
import hmac
import base64
import json
from io import StringIO


DIGEST_TOKEN_PREFIX = 'DIGEST:'
ENCODING = 'utf-8'


class BadToken(Exception):
pass


class BadDigest(Exception):
pass


class DigestTokens:
def is_digest_token(self, encoded_token):
"""Check if a value is a valid digest token.

Keyword arguments:
encodedToken -- an encoded token
"""
return encoded_token and isinstance(encoded_token, str) and encoded_token.startswith(DIGEST_TOKEN_PREFIX)

def verify_and_decode(self, secret, encoded_token):
"""Verifies and decodes a digest token.

Keywork arguments:
secret -- the shared secret used to sign the token
encoded_token -- the encoded token
"""
try:
if not isinstance(secret, str):
raise TypeError('Secret must be a string')

if not isinstance(encoded_token, str):
raise TypeError('Encoded token must be a string')

if not self.is_digest_token(encoded_token):
return {
'verified': False,
'code': 'not-a-digest-token',
}

encoded_digest_token = encoded_token[len(DIGEST_TOKEN_PREFIX):]
decoded_digest_token_as_string = base64 \
.b64decode(encoded_digest_token)

info = None

try:
info = json.loads(decoded_digest_token_as_string)
except Exception:
raise BadToken()

if not info['applicationId'] or not isinstance(info['applicationId'], str):
raise BadToken()

if not info['digest'] or not isinstance(info['digest'], str):
raise BadToken()

if not info['token'] or not isinstance(info['token'], str):
raise BadToken()

digest_as_string = self.calculate_digest(
info['applicationId'],
secret,
info['token']
)

digest = info['digest']

if not digest_as_string == digest:
raise BadDigest()

value = json.loads(info['token'])

value['applicationId'] = info['applicationId']

return {
'code': 'verified',
'value': value,
'verified': True,
}
except BadToken as ex:
return {
'code': 'bad-token',
# 'message': str(ex),
'verified': False,
}
except BadDigest:
return {
'code': 'bad-digest',
'verified': False,
}
except Exception as ex:
return {
'code': 'server-error',
'message': str(ex),
'verified': False,
}

def sign_and_encode(self, application_id, secret, token):
"""Signs and encodes a digest token.

Keyword arguments:
application_id -- the application ID used to sign the token
secret -- the shared secret used to sign the token
token -- the raw token object to sign
"""
if not isinstance(secret, str):
raise TypeError('Secret must be a string')

if not isinstance(token, dict):
raise TypeError('Token must be a dictionary')

if 'expires' not in token or not type(token['expires']) in [float, int]:
raise ValueError('Token must have an expiration (milliseconds since UNIX epoch)')

if 'application_id' in token:
raise ValueError('Token should not have an application_id property')

# io = StringIO()
# json.dump(token, io)
# token_as_string = io.getvalue()

token_as_string = json.dumps(token, separators=(',', ':'))

digest = self.calculate_digest(application_id, secret, token_as_string)

info = {
'applicationId': application_id,
'digest': digest,
'token': token_as_string,
}

decoded_digest_token_as_string = json.dumps(info, separators=(',', ':'))
encoded_digest_token = base64 \
.b64encode(decoded_digest_token_as_string.encode(ENCODING)) \
.decode(ENCODING)

return DIGEST_TOKEN_PREFIX + encoded_digest_token

def calculate_digest(self, application_id, secret, token):
"""Calculates the digest for a token.

Keyword arguments:
application_id -- the application ID used to sign the token
secret -- the shared secret used to sign the token
token -- encoded token
"""
if not isinstance(application_id, str):
raise TypeError('Application Id must be a string')

if not isinstance(secret, str):
raise TypeError('Secret must be a string')

if not isinstance(token, str):
raise TypeError('Token must be a string')

salt = (application_id + secret).encode(ENCODING)
verify = hmac.new(salt, digestmod=hashlib.sha512)

verify.update(token.encode(ENCODING))

digest = base64.b64encode(verify.digest()).decode(ENCODING)

return digest
Loading