diff --git a/tests/test_bidstream_client_e2e.py b/tests/test_bidstream_client_e2e.py
new file mode 100644
index 0000000..6ab4914
--- /dev/null
+++ b/tests/test_bidstream_client_e2e.py
@@ -0,0 +1,105 @@
+import os
+import unittest
+
+from uid2_client import BidstreamClient, Uid2PublisherClient, TokenGenerateInput, DecryptionStatus
+
+
+@unittest.skipIf(
+ os.getenv("UID2_BASE_URL") is None
+ or os.getenv("UID2_API_KEY") is None
+ or os.getenv("UID2_SECRET_KEY") is None,
+ "Environment variables UID2_BASE_URL, UID2_API_KEY, and UID2_SECRET_KEY must be set",
+)
+class BidstreamClientIntegrationTests(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.UID2_BASE_URL = os.getenv("UID2_BASE_URL")
+ cls.UID2_API_KEY = os.getenv("UID2_API_KEY")
+ cls.UID2_SECRET_KEY = os.getenv("UID2_SECRET_KEY")
+
+ if cls.UID2_BASE_URL and cls.UID2_API_KEY and cls.UID2_SECRET_KEY:
+ cls.bidstream_client = BidstreamClient(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY)
+ cls.publisher_client = Uid2PublisherClient(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY)
+ else:
+ raise Exception("set the required UID2_BASE_URL/UID2_API_KEY/UID2_SECRET_KEY environment variables first")
+
+ def test_bidstream_client_key_refresh(self):
+ refresh_response = self.bidstream_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ def test_bidstream_client_with_generated_token(self):
+ token_response = self.publisher_client.generate_token(
+ TokenGenerateInput.from_email("hopefully-not-opted-out@example.com").do_not_generate_tokens_for_opted_out()
+ )
+ identity = token_response.get_identity()
+
+ advertising_token = identity.get_advertising_token()
+ self.assertIsNotNone(advertising_token)
+
+ refresh_response = self.bidstream_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ decryption_response = self.bidstream_client.decrypt_token_into_raw_uid(
+ advertising_token, "example.com"
+ )
+
+ self.assertTrue(decryption_response.success)
+ self.assertIsNotNone(decryption_response.uid)
+ self.assertIsNotNone(decryption_response.established)
+ self.assertIsNotNone(decryption_response.site_id)
+
+ def test_bidstream_client_with_invalid_token(self):
+ refresh_response = self.bidstream_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ invalid_token = "invalid-token"
+ decryption_response = self.bidstream_client.decrypt_token_into_raw_uid(
+ invalid_token, "example.com"
+ )
+ self.assertFalse(decryption_response.success)
+
+ def test_bidstream_client_without_refresh(self):
+ token_response = self.publisher_client.generate_token(
+ TokenGenerateInput.from_email("hopefully-not-opted-out@example.com").do_not_generate_tokens_for_opted_out()
+ )
+ identity = token_response.get_identity()
+ advertising_token = identity.get_advertising_token()
+
+ fresh_client = BidstreamClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)
+
+ decryption_response = fresh_client.decrypt_token_into_raw_uid(
+ advertising_token, "example.com"
+ )
+ self.assertFalse(decryption_response.success)
+
+ def test_bidstream_client_error_handling(self):
+ bad_client = BidstreamClient(self.UID2_BASE_URL, "bad-api-key", self.UID2_SECRET_KEY)
+ refresh_response = bad_client.refresh()
+ self.assertFalse(refresh_response.success)
+
+ bad_client = BidstreamClient(self.UID2_BASE_URL, self.UID2_API_KEY, "bad-secret-key")
+ refresh_response = bad_client.refresh()
+ self.assertFalse(refresh_response.success)
+
+ def test_bidstream_client_phone_token_decryption(self):
+ token_response = self.publisher_client.generate_token(
+ TokenGenerateInput.from_phone("+12345678901").do_not_generate_tokens_for_opted_out()
+ )
+ self.assertFalse(token_response.is_optout())
+
+ identity = token_response.get_identity()
+ advertising_token = identity.get_advertising_token()
+
+ refresh_response = self.bidstream_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ decryption_response = self.bidstream_client.decrypt_token_into_raw_uid(
+ advertising_token, "example.com"
+ )
+
+ self.assertTrue(decryption_response.success)
+ self.assertIsNotNone(decryption_response.uid)
+
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
diff --git a/tests/test_identity_map_client.py b/tests/test_identity_map_client.py
index 9cdd850..1f34cce 100644
--- a/tests/test_identity_map_client.py
+++ b/tests/test_identity_map_client.py
@@ -6,7 +6,6 @@
from uid2_client import IdentityMapClient, IdentityMapInput, normalize_and_hash_email, normalize_and_hash_phone
-
@unittest.skipIf(
os.getenv("UID2_BASE_URL") == None
or os.getenv("UID2_API_KEY") == None
diff --git a/tests/test_identity_map_client_unit_tests.py b/tests/test_identity_map_client_unit_tests.py
index 7820969..09179b5 100644
--- a/tests/test_identity_map_client_unit_tests.py
+++ b/tests/test_identity_map_client_unit_tests.py
@@ -1,10 +1,9 @@
import base64
-import json
import unittest
import datetime as dt
from unittest.mock import patch, MagicMock
-from uid2_client import IdentityMapClient, get_datetime_utc_iso_format
+from uid2_client import IdentityMapClient, get_datetime_utc_iso_format, Uid2Response, Envelope
class IdentityMapUnitTests(unittest.TestCase):
@@ -34,10 +33,10 @@ def test_get_datetime_utc_iso_format_timestamp(self):
iso_format_timestamp = get_datetime_utc_iso_format(timestamp)
self.assertEqual(expected_timestamp, iso_format_timestamp)
- @patch('uid2_client.identity_map_client.make_v2_request')
- @patch('uid2_client.identity_map_client.post')
- @patch('uid2_client.identity_map_client.parse_v2_response')
- def test_identity_buckets_request(self, mock_parse_v2_response, mock_post, mock_make_v2_request):
+ @patch('uid2_client.identity_map_client.create_envelope')
+ @patch('uid2_client.identity_map_client.make_request')
+ @patch('uid2_client.identity_map_client.parse_response')
+ def test_identity_buckets_request(self, mock_parse_response, mock_make_request, mock_create_envelope):
expected_req = b'{"since_timestamp": "2024-07-02T14:30:15.123456"}'
test_cases = ["2024-07-02T14:30:15.123456+00:00", "2024-07-02 09:30:15.123456-05:00",
"2024-07-02T08:30:15.123456-06:00", "2024-07-02T10:30:15.123456-04:00",
@@ -45,12 +44,11 @@ def test_identity_buckets_request(self, mock_parse_v2_response, mock_post, mock_
"2024-07-03T00:30:15.123456+10:00", "2024-07-02T20:00:15.123456+05:30"]
mock_req = b'mocked_request_data'
mock_nonce = 'mocked_nonce'
- mock_make_v2_request.return_value = (mock_req, mock_nonce)
- mock_response = MagicMock()
- mock_response.read.return_value = b'{"mocked": "response"}'
- mock_post.return_value = mock_response
- mock_parse_v2_response.return_value = b'{"body":[],"status":"success"}'
+ mock_create_envelope.return_value = Envelope(mock_req, mock_nonce)
+ mock_response = '{"mocked": "response"}'
+ mock_make_request.return_value = Uid2Response.from_string(mock_response)
+ mock_parse_response.return_value = b'{"body":[],"status":"success"}'
for timestamp in test_cases:
self.identity_map_client.get_identity_buckets(dt.datetime.fromisoformat(timestamp))
- called_args, called_kwargs = mock_make_v2_request.call_args
+ called_args, called_kwargs = mock_create_envelope.call_args
self.assertEqual(expected_req, called_args[2])
diff --git a/tests/test_identity_map_v3_client.py b/tests/test_identity_map_v3_client.py
new file mode 100644
index 0000000..bec0ead
--- /dev/null
+++ b/tests/test_identity_map_v3_client.py
@@ -0,0 +1,234 @@
+import os
+import unittest
+
+from datetime import datetime, timedelta, timezone
+from urllib.error import URLError, HTTPError
+
+from uid2_client import IdentityMapV3Client, IdentityMapV3Input, IdentityMapV3Response, normalize_and_hash_email, normalize_and_hash_phone
+from uid2_client.unmapped_identity_reason import UnmappedIdentityReason
+
+@unittest.skipIf(
+ os.getenv("UID2_BASE_URL") == None
+ or os.getenv("UID2_API_KEY") == None
+ or os.getenv("UID2_SECRET_KEY") == None,
+ reason="Environment variables UID2_BASE_URL, UID2_API_KEY, and UID2_SECRET_KEY must be set",
+)
+class IdentityMapV3IntegrationTests(unittest.TestCase):
+ UID2_BASE_URL = None
+ UID2_API_KEY = None
+ UID2_SECRET_KEY = None
+
+ identity_map_client = None
+
+ @classmethod
+ def setUpClass(cls):
+ cls.UID2_BASE_URL = os.getenv("UID2_BASE_URL")
+ cls.UID2_API_KEY = os.getenv("UID2_API_KEY")
+ cls.UID2_SECRET_KEY = os.getenv("UID2_SECRET_KEY")
+
+ if cls.UID2_BASE_URL and cls.UID2_API_KEY and cls.UID2_SECRET_KEY:
+ cls.identity_map_client = IdentityMapV3Client(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY)
+ else:
+ raise Exception("set the required UID2_BASE_URL/UID2_API_KEY/UID2_SECRET_KEY environment variables first")
+
+ def test_identity_map_emails(self):
+ identity_map_input = IdentityMapV3Input.from_emails(
+ ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assert_mapped(response, "hopefully-not-opted-out@example.com")
+ self.assert_mapped(response, "somethingelse@example.com")
+
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, "optout@example.com")
+
+ def test_identity_map_nothing_unmapped(self):
+ identity_map_input = IdentityMapV3Input.from_emails(
+ ["hopefully-not-opted-out@example.com", "somethingelse@example.com"])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assert_mapped(response, "hopefully-not-opted-out@example.com")
+ self.assert_mapped(response, "somethingelse@example.com")
+
+ def test_identity_map_nothing_mapped(self):
+ identity_map_input = IdentityMapV3Input.from_emails(["optout@example.com"])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, "optout@example.com")
+
+ def test_identity_map_invalid_email(self):
+ self.assertRaises(ValueError, IdentityMapV3Input.from_emails,
+ ["email@example.com", "this is not an email"])
+
+ def test_identity_map_invalid_phone(self):
+ self.assertRaises(ValueError, IdentityMapV3Input.from_phones,
+ ["+12345678901", "this is not a phone number"])
+
+ def test_identity_map_invalid_hashed_email(self):
+ identity_map_input = IdentityMapV3Input.from_hashed_emails(["this is not a hashed email"])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assert_unmapped(response, UnmappedIdentityReason.INVALID_IDENTIFIER, "this is not a hashed email")
+
+ def test_identity_map_invalid_hashed_phone(self):
+ identity_map_input = IdentityMapV3Input.from_hashed_phones(["this is not a hashed phone"])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assert_unmapped(response, UnmappedIdentityReason.INVALID_IDENTIFIER, "this is not a hashed phone")
+
+ def test_identity_map_hashed_emails(self):
+ hashed_email1 = normalize_and_hash_email("hopefully-not-opted-out@example.com")
+ hashed_email2 = normalize_and_hash_email("somethingelse@example.com")
+ hashed_opted_out_email = normalize_and_hash_email("optout@example.com")
+ identity_map_input = IdentityMapV3Input.from_hashed_emails([hashed_email1, hashed_email2, hashed_opted_out_email])
+
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+
+ self.assert_mapped(response, hashed_email1)
+ self.assert_mapped(response, hashed_email2)
+
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, hashed_opted_out_email)
+
+ def test_identity_map_duplicate_emails(self):
+ identity_map_input = IdentityMapV3Input.from_emails(
+ ["JANE.SAOIRSE@gmail.com", "Jane.Saoirse@gmail.com", "JaneSaoirse+UID2@gmail.com", "janesaoirse@gmail.com",
+ "JANE.SAOIRSE@gmail.com"])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+
+ mapped_identities = response.mapped_identities
+ self.assertEqual(4, len(mapped_identities))
+
+ raw_uid = mapped_identities.get("JANE.SAOIRSE@gmail.com").current_raw_uid
+ self.assertEqual(raw_uid, mapped_identities.get("Jane.Saoirse@gmail.com").current_raw_uid)
+ self.assertEqual(raw_uid, mapped_identities.get("JaneSaoirse+UID2@gmail.com").current_raw_uid)
+ self.assertEqual(raw_uid, mapped_identities.get("janesaoirse@gmail.com").current_raw_uid)
+
+ def test_identity_map_duplicate_hashed_emails(self):
+ hashed_email = normalize_and_hash_email("hopefully-not-opted-out@example.com")
+ duplicate_hashed_email = hashed_email
+ hashed_opted_out_email = normalize_and_hash_email("optout@example.com")
+ duplicate_hashed_opted_out_email = hashed_opted_out_email
+
+ identity_map_input = IdentityMapV3Input.from_hashed_emails(
+ [hashed_email, duplicate_hashed_email, hashed_opted_out_email, duplicate_hashed_opted_out_email])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+
+ self.assert_mapped(response, hashed_email)
+ self.assert_mapped(response, duplicate_hashed_email)
+
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, hashed_opted_out_email)
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, duplicate_hashed_opted_out_email)
+
+ def test_identity_map_empty_input(self):
+ identity_map_input = IdentityMapV3Input.from_emails([])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assertTrue(len(response.mapped_identities) == 0)
+ self.assertTrue(len(response.unmapped_identities) == 0)
+
+ def test_identity_map_phones(self):
+ identity_map_input = IdentityMapV3Input.from_phones(["+12345678901", "+98765432109", "+00000000000"])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assert_mapped(response, "+12345678901")
+ self.assert_mapped(response, "+98765432109")
+
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, "+00000000000")
+
+ def test_identity_map_hashed_phones(self):
+ hashed_phone1 = normalize_and_hash_phone("+12345678901")
+ hashed_phone2 = normalize_and_hash_phone("+98765432109")
+ hashed_opted_out_phone = normalize_and_hash_phone("+00000000000")
+ identity_map_input = IdentityMapV3Input.from_hashed_phones([hashed_phone1, hashed_phone2, hashed_opted_out_phone])
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+ self.assert_mapped(response, hashed_phone1)
+ self.assert_mapped(response, hashed_phone2)
+
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, hashed_opted_out_phone)
+
+ def test_identity_map_all_identity_types_in_one_request(self):
+ mapped_email = "hopefully-not-opted-out@example.com"
+ optout_email = "optout@example.com"
+ mapped_phone = "+12345678901"
+ optout_phone = "+00000000000"
+
+ mapped_email_hash = normalize_and_hash_email("mapped-email@example.com")
+ optout_email_hash = normalize_and_hash_email(optout_email)
+ mapped_phone_hash = normalize_and_hash_phone(mapped_phone)
+ optout_phone_hash = normalize_and_hash_phone(optout_phone)
+
+ identity_map_input = (IdentityMapV3Input.from_emails([mapped_email, optout_email])
+ .with_hashed_emails([mapped_email_hash, optout_email_hash])
+ .with_phones([mapped_phone, optout_phone])
+ .with_hashed_phones([mapped_phone_hash, optout_phone_hash]))
+
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+
+ # Test mapped identities
+ self.assert_mapped(response, mapped_email)
+ self.assert_mapped(response, mapped_email_hash)
+ self.assert_mapped(response, mapped_phone)
+ self.assert_mapped(response, mapped_phone_hash)
+
+ # Test unmapped identities
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_email)
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_email_hash)
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_phone)
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_phone_hash)
+
+ def test_identity_map_all_identity_types_added_one_by_one(self):
+ mapped_email = "hopefully-not-opted-out@example.com"
+ optout_phone = "+00000000000"
+ mapped_phone_hash = normalize_and_hash_phone("+12345678901")
+ optout_email_hash = normalize_and_hash_email("optout@example.com")
+
+ identity_map_input = IdentityMapV3Input()
+ identity_map_input.with_email(mapped_email)
+ identity_map_input.with_phone(optout_phone)
+ identity_map_input.with_hashed_phone(mapped_phone_hash)
+ identity_map_input.with_hashed_email(optout_email_hash)
+
+ response = self.identity_map_client.generate_identity_map(identity_map_input)
+
+ # Test mapped identities
+ self.assert_mapped(response, mapped_email)
+ self.assert_mapped(response, mapped_phone_hash)
+
+ # Test unmapped identities
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_phone)
+ self.assert_unmapped(response, UnmappedIdentityReason.OPTOUT, optout_email_hash)
+
+ def test_identity_map_client_bad_url(self):
+ identity_map_input = IdentityMapV3Input.from_emails(
+ ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"])
+ client = IdentityMapV3Client("https://operator-bad-url.uidapi.com", os.getenv("UID2_API_KEY"), os.getenv("UID2_SECRET_KEY"))
+ self.assertRaises(URLError, client.generate_identity_map, identity_map_input)
+
+ def test_identity_map_client_bad_api_key(self):
+ identity_map_input = IdentityMapV3Input.from_emails(
+ ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"])
+ client = IdentityMapV3Client(os.getenv("UID2_BASE_URL"), "bad-api-key", os.getenv("UID2_SECRET_KEY"))
+ self.assertRaises(HTTPError, client.generate_identity_map, identity_map_input)
+
+ def test_identity_map_client_bad_secret(self):
+ identity_map_input = IdentityMapV3Input.from_emails(
+ ["hopefully-not-opted-out@example.com", "somethingelse@example.com", "optout@example.com"])
+
+ client = IdentityMapV3Client(os.getenv("UID2_BASE_URL"), os.getenv("UID2_API_KEY"), "wJ0hP19QU4hmpB64Y3fV2dAed8t/mupw3sjN5jNRFzg=")
+ self.assertRaises(HTTPError, client.generate_identity_map, identity_map_input)
+
+ def assert_mapped(self, response: IdentityMapV3Response, dii):
+ mapped_identity = response.mapped_identities.get(dii)
+ self.assertIsNotNone(mapped_identity)
+ self.assertIsNotNone(mapped_identity.current_raw_uid)
+
+ # Refresh from should be now or in the future, allow some slack for time between request and this assertion
+ one_minute_ago = datetime.now(timezone.utc) - timedelta(seconds=60)
+ self.assertTrue(mapped_identity.refresh_from > one_minute_ago)
+
+ unmapped_identity = response.unmapped_identities.get(dii)
+ self.assertIsNone(unmapped_identity)
+
+ def assert_unmapped(self, response, reason, dii):
+ unmapped_identity = response.unmapped_identities.get(dii)
+ self.assertEqual(reason, unmapped_identity.reason)
+
+ mapped_identity = response.mapped_identities.get(dii)
+ self.assertIsNone(mapped_identity)
+
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tests/test_identity_map_v3_response.py b/tests/test_identity_map_v3_response.py
new file mode 100644
index 0000000..28d65b5
--- /dev/null
+++ b/tests/test_identity_map_v3_response.py
@@ -0,0 +1,161 @@
+import unittest
+from datetime import datetime
+from uid2_client import IdentityMapV3Input, IdentityMapV3Response
+from uid2_client.unmapped_identity_reason import UnmappedIdentityReason
+
+
+class IdentityMapV3ResponseTest(unittest.TestCase):
+ SOME_EMAIL = "email1@example.com"
+
+ def test_mapped_identity(self):
+ email1 = "email1@example.com"
+ email2 = "email2@example.com"
+ phone1 = "+1234567890"
+ phone2 = "+0987654321"
+ hashed_email1 = "email 1 hash"
+ hashed_email2 = "email 2 hash"
+ hashed_phone1 = "phone 1 hash"
+ hashed_phone2 = "phone 2 hash"
+
+ email1_refresh_from = datetime.fromisoformat('2025-01-01T00:00:01+00:00')
+ email2_refresh_from = datetime.fromisoformat('2025-06-30T00:00:20+00:00')
+ phone1_refresh_from = datetime.fromisoformat('2025-01-01T00:05:00+00:00')
+ phone2_refresh_from = datetime.fromisoformat('2025-06-30T00:00:22+00:00')
+ hashed_email1_refresh_from = datetime.fromisoformat('2025-01-01T00:00:33+00:00')
+ hashed_email2_refresh_from = datetime.fromisoformat('2025-06-30T00:00:00+00:00')
+ hashed_phone1_refresh_from = datetime.fromisoformat('2025-01-01T00:00:11+00:00')
+ hashed_phone2_refresh_from = datetime.fromisoformat('2025-06-30T00:00:01+00:00')
+
+ email_hash_entries = [
+ self._mapped_response_payload_entry("email 1 current uid", "email 1 previous uid", email1_refresh_from),
+ self._mapped_response_payload_entry("email 2 current uid", "email 2 previous uid", email2_refresh_from),
+ self._mapped_response_payload_entry("hashed email 1 current uid", "hashed email 1 previous uid", hashed_email1_refresh_from),
+ self._mapped_response_payload_entry("hashed email 2 current uid", "hashed email 2 previous uid", hashed_email2_refresh_from)
+ ]
+
+ phone_hash_entries = [
+ self._mapped_response_payload_entry("phone 1 current uid", "phone 1 previous uid", phone1_refresh_from),
+ self._mapped_response_payload_entry("phone 2 current uid", "phone 2 previous uid", phone2_refresh_from),
+ self._mapped_response_payload_entry("hashed phone 1 current uid", "hashed phone 1 previous uid", hashed_phone1_refresh_from),
+ self._mapped_response_payload_entry("hashed phone 2 current uid", "hashed phone 2 previous uid", hashed_phone2_refresh_from)
+ ]
+
+ response_payload = self._mapped_response_payload(email_hash_entries, phone_hash_entries)
+
+ input_obj = (IdentityMapV3Input()
+ .with_emails([email1, email2])
+ .with_hashed_emails([hashed_email1, hashed_email2])
+ .with_phones([phone1, phone2])
+ .with_hashed_phones([hashed_phone1, hashed_phone2]))
+
+ response = IdentityMapV3Response(response_payload, input_obj)
+
+ self.assertTrue(response.is_success())
+ self.assertEqual(8, len(response.mapped_identities))
+ self.assertEqual(0, len(response.unmapped_identities))
+
+ # Email
+ raw_email_mapping1 = response.mapped_identities.get(email1)
+ self.assertEqual("email 1 current uid", raw_email_mapping1.current_raw_uid)
+ self.assertEqual("email 1 previous uid", raw_email_mapping1.previous_raw_uid)
+ self.assertEqual(email1_refresh_from, raw_email_mapping1.refresh_from)
+
+ raw_email_mapping2 = response.mapped_identities.get(email2)
+ self.assertEqual("email 2 current uid", raw_email_mapping2.current_raw_uid)
+ self.assertEqual("email 2 previous uid", raw_email_mapping2.previous_raw_uid)
+ self.assertEqual(email2_refresh_from, raw_email_mapping2.refresh_from)
+
+ # Phone
+ raw_phone_mapping1 = response.mapped_identities.get(phone1)
+ self.assertEqual("phone 1 current uid", raw_phone_mapping1.current_raw_uid)
+ self.assertEqual("phone 1 previous uid", raw_phone_mapping1.previous_raw_uid)
+ self.assertEqual(phone1_refresh_from, raw_phone_mapping1.refresh_from)
+
+ raw_phone_mapping2 = response.mapped_identities.get(phone2)
+ self.assertEqual("phone 2 current uid", raw_phone_mapping2.current_raw_uid)
+ self.assertEqual("phone 2 previous uid", raw_phone_mapping2.previous_raw_uid)
+ self.assertEqual(phone2_refresh_from, raw_phone_mapping2.refresh_from)
+
+ # Hashed Email
+ hashed_email_mapping1 = response.mapped_identities.get(hashed_email1)
+ self.assertEqual("hashed email 1 current uid", hashed_email_mapping1.current_raw_uid)
+ self.assertEqual("hashed email 1 previous uid", hashed_email_mapping1.previous_raw_uid)
+ self.assertEqual(hashed_email1_refresh_from, hashed_email_mapping1.refresh_from)
+
+ hashed_email_mapping2 = response.mapped_identities.get(hashed_email2)
+
+ self.assertEqual("hashed email 2 current uid", hashed_email_mapping2.current_raw_uid)
+ self.assertEqual("hashed email 2 previous uid", hashed_email_mapping2.previous_raw_uid)
+ self.assertEqual(hashed_email2_refresh_from, hashed_email_mapping2.refresh_from)
+
+ # Hashed Phone
+ hashed_phone_mapping1 = response.mapped_identities.get(hashed_phone1)
+ self.assertEqual("hashed phone 1 current uid", hashed_phone_mapping1.current_raw_uid)
+ self.assertEqual("hashed phone 1 previous uid", hashed_phone_mapping1.previous_raw_uid)
+ self.assertEqual(hashed_phone1_refresh_from, hashed_phone_mapping1.refresh_from)
+
+ hashed_phone_mapping2 = response.mapped_identities.get(hashed_phone2)
+ self.assertEqual("hashed phone 2 current uid", hashed_phone_mapping2.current_raw_uid)
+ self.assertEqual("hashed phone 2 previous uid", hashed_phone_mapping2.previous_raw_uid)
+ self.assertEqual(hashed_phone2_refresh_from, hashed_phone_mapping2.refresh_from)
+
+ def test_unmapped_identity_reason_unknown(self):
+ input_obj = IdentityMapV3Input.from_emails([self.SOME_EMAIL])
+
+ response = IdentityMapV3Response(self._unmapped_response_payload("some new unmapped reason"), input_obj)
+ self.assertTrue(response.is_success())
+
+ unmapped_identity = response.unmapped_identities.get(self.SOME_EMAIL)
+ self.assertEqual(UnmappedIdentityReason.UNKNOWN, unmapped_identity.reason)
+ self.assertEqual("some new unmapped reason", unmapped_identity.raw_reason)
+
+ def test_unmapped_identity_reason_optout(self):
+ input_obj = IdentityMapV3Input.from_emails([self.SOME_EMAIL])
+
+ response = IdentityMapV3Response(self._unmapped_response_payload("optout"), input_obj)
+ self.assertTrue(response.is_success())
+
+ unmapped_identity = response.unmapped_identities.get(self.SOME_EMAIL)
+ self.assertEqual(UnmappedIdentityReason.OPTOUT, unmapped_identity.reason)
+ self.assertEqual("optout", unmapped_identity.raw_reason)
+
+ def test_unmapped_identity_reason_invalid(self):
+ input_obj = IdentityMapV3Input.from_emails([self.SOME_EMAIL])
+
+ response = IdentityMapV3Response(self._unmapped_response_payload("invalid identifier"), input_obj)
+ self.assertTrue(response.is_success())
+
+ unmapped_identity = response.unmapped_identities.get(self.SOME_EMAIL)
+ self.assertEqual(UnmappedIdentityReason.INVALID_IDENTIFIER, unmapped_identity.reason)
+ self.assertEqual("invalid identifier", unmapped_identity.raw_reason)
+
+ def test_response_status_not_success(self):
+ input_obj = IdentityMapV3Input.from_emails([self.SOME_EMAIL])
+
+ failure_response_payload = '{"status":"error","body":{}}'
+
+ with self.assertRaises(ValueError) as context:
+ IdentityMapV3Response(failure_response_payload, input_obj)
+
+ self.assertEqual("Got unexpected identity map status: error", str(context.exception))
+
+ @staticmethod
+ def _unmapped_response_payload(reason: str) -> str:
+ return f'{{"status":"success","body":{{"email_hash":[{{"e":"{reason}"}}]}}}}'
+
+ @staticmethod
+ def _mapped_response_payload(email_hash_entries: list, phone_hash_entries: list) -> str:
+ email_entries_str = ",".join(email_hash_entries)
+ phone_entries_str = ",".join(phone_hash_entries)
+ return (f'{{"status":"success","body":{{'
+ f'"email_hash":[{email_entries_str}],'
+ f'"phone_hash":[{phone_entries_str}]'
+ f'}}}}')
+
+ @staticmethod
+ def _mapped_response_payload_entry(current_uid: str, previous_uid: str, refresh_from: datetime) -> str:
+ return f'{{"u":"{current_uid}","p":"{previous_uid}","r":{refresh_from.timestamp()}}}'
+
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
diff --git a/tests/test_publisher_client.py b/tests/test_publisher_client.py
index 6b06f1b..903891d 100644
--- a/tests/test_publisher_client.py
+++ b/tests/test_publisher_client.py
@@ -7,7 +7,6 @@
from uid2_client.identity_tokens import IdentityTokens
from urllib.request import HTTPError
-
@unittest.skipIf(
os.getenv("EUID_BASE_URL") == None
or os.getenv("EUID_API_KEY") == None
@@ -98,7 +97,8 @@ def setUpClass(cls):
def test_integration_generate_and_refresh(self):
token_generate_response = self.publisher_client.generate_token(
- TokenGenerateInput.from_email("test@example.com"))
+ TokenGenerateInput.from_email("hopefully-not-opted-out@example.com").do_not_generate_tokens_for_opted_out()
+ )
self.assertFalse(token_generate_response.is_optout())
@@ -146,9 +146,9 @@ def test_integration_optout(self):
# this test requires these env vars to be configured: UID2_BASE_URL, UID2_API_KEY, UID2_SECRET_KEY
def test_integration_phone(self):
-
token_generate_response = self.publisher_client.generate_token(
- TokenGenerateInput.from_phone("+12345678901"))
+ TokenGenerateInput.from_phone("+12345678901").do_not_generate_tokens_for_opted_out()
+ )
self.assertFalse(token_generate_response.is_optout())
identity = token_generate_response.get_identity()
diff --git a/tests/test_refresh_keys_util.py b/tests/test_refresh_keys_util.py
index b6273db..0ca0059 100644
--- a/tests/test_refresh_keys_util.py
+++ b/tests/test_refresh_keys_util.py
@@ -1,20 +1,13 @@
import json
import unittest
-from unittest.mock import patch
+from unittest.mock import patch, MagicMock
-from uid2_client import refresh_keys_util
+from uid2_client import refresh_keys_util, Uid2Response
from test_utils import *
from uid2_client.encryption import _encrypt_gcm, _decrypt_gcm
class TestRefreshKeysUtil(unittest.TestCase):
- class MockPostResponse:
- def __init__(self, return_value):
- self.return_value = return_value
-
- def read(self):
- return base64.b64encode(self.return_value)
-
def _make_post_response(self, request_data, response_payload):
d = base64.b64decode(request_data)[1:]
d = _decrypt_gcm(d, client_secret_bytes)
@@ -23,13 +16,13 @@ def _make_post_response(self, request_data, response_payload):
payload = int.to_bytes(int(now.timestamp() * 1000), 8, 'big')
payload += nonce
payload += response_payload
- envelope = _encrypt_gcm(payload, None, client_secret_bytes)
-
- return self.MockPostResponse(envelope)
+ encrypted_payload = _encrypt_gcm(payload, None, client_secret_bytes)
+ encrypted_string = base64.b64encode(encrypted_payload)
+ return Uid2Response.from_string(encrypted_string)
- def _get_post_refresh_keys_response(self, base_url, path, headers, data):
+ def _get_post_refresh_keys_response(self, base_url, path, headers, envelope):
response_payload = key_set_to_json_for_sharing([master_key, site_key]).encode()
- return self._make_post_response(data, response_payload)
+ return self._make_post_response(envelope.envelope, response_payload)
def _validate_master_and_site_key(self, keys):
self.assertEqual(len(keys.values()), 2)
@@ -55,23 +48,23 @@ def _validate_master_and_site_key(self, keys):
self.assertEqual(master_secret, master.secret)
self.assertEqual(1, master.keyset_id)
- @patch('uid2_client.refresh_keys_util.post')
- def test_refresh_sharing_keys(self, mock_post):
- mock_post.side_effect = self._get_post_refresh_keys_response
+ @patch('uid2_client.refresh_keys_util.make_request')
+ def test_refresh_sharing_keys(self, mock_make_request):
+ mock_make_request.side_effect = self._get_post_refresh_keys_response
refresh_response = refresh_keys_util.refresh_sharing_keys("base_url", "auth_key", base64.b64decode(client_secret))
self.assertTrue(refresh_response.success)
self._validate_master_and_site_key(refresh_response.keys)
- mock_post.assert_called_once()
- self.assertEqual(mock_post.call_args[0], ('base_url', '/v2/key/sharing'))
+ mock_make_request.assert_called_once()
+ self.assertEqual(mock_make_request.call_args[0], ('base_url', '/v2/key/sharing'))
- @patch('uid2_client.refresh_keys_util.post')
- def test_refresh_bidstream_keys(self, mock_post):
- mock_post.side_effect = self._get_post_refresh_keys_response
+ @patch('uid2_client.refresh_keys_util.make_request')
+ def test_refresh_bidstream_keys(self, mock_make_request):
+ mock_make_request.side_effect = self._get_post_refresh_keys_response
refresh_response = refresh_keys_util.refresh_bidstream_keys("base_url", "auth_key", base64.b64decode(client_secret))
self.assertTrue(refresh_response.success)
self._validate_master_and_site_key(refresh_response.keys)
- mock_post.assert_called_once()
- self.assertEqual(mock_post.call_args[0], ('base_url', '/v2/key/bidstream'))
+ mock_make_request.assert_called_once()
+ self.assertEqual(mock_make_request.call_args[0], ('base_url', '/v2/key/bidstream'))
def test_parse_keys_json_identity(self):
response_body_str = key_set_to_json_for_sharing([master_key, site_key])
diff --git a/tests/test_sharing_client.py b/tests/test_sharing_client.py
index 38d42ab..a9e2d3b 100644
--- a/tests/test_sharing_client.py
+++ b/tests/test_sharing_client.py
@@ -111,7 +111,7 @@ def test_token_lifetime_too_long_for_sharing_but_remaining_lifetime_allowed(self
self._test_bidstream_client.assert_fails(result, expected_version, expected_scope)
def test_token_lifetime_too_long_for_sharing(self): # TokenLifetimeTooLongForSharing
- expires_in_sec = now + dt.timedelta(days=30) + dt.timedelta(minutes=1)
+ expires_in_sec = dt.datetime.now(tz=timezone.utc) + dt.timedelta(days=30) + dt.timedelta(minutes=1)
for expected_scope, expected_version in test_cases_all_scopes_all_versions:
with self.subTest(expected_scope=expected_scope, expected_version=expected_version):
token = generate_uid_token(expected_scope, expected_version, expires_at=expires_in_sec)
diff --git a/tests/test_sharing_client_e2e.py b/tests/test_sharing_client_e2e.py
new file mode 100644
index 0000000..777c527
--- /dev/null
+++ b/tests/test_sharing_client_e2e.py
@@ -0,0 +1,104 @@
+import os
+import unittest
+from urllib.error import HTTPError
+
+from uid2_client import SharingClient, Uid2PublisherClient, TokenGenerateInput, EncryptionStatus, BidstreamClient
+
+
+@unittest.skipIf(
+ os.getenv("UID2_BASE_URL") is None
+ or os.getenv("UID2_API_KEY") is None
+ or os.getenv("UID2_SECRET_KEY") is None,
+ "Environment variables UID2_BASE_URL, UID2_API_KEY, and UID2_SECRET_KEY must be set",
+)
+class SharingClientIntegrationTests(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.UID2_BASE_URL = os.getenv("UID2_BASE_URL")
+ cls.UID2_API_KEY = os.getenv("UID2_API_KEY")
+ cls.UID2_SECRET_KEY = os.getenv("UID2_SECRET_KEY")
+
+ if cls.UID2_BASE_URL and cls.UID2_API_KEY and cls.UID2_SECRET_KEY:
+ cls.sharing_client = SharingClient(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY)
+ cls.publisher_client = Uid2PublisherClient(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY)
+ cls.bidstream_client = BidstreamClient(cls.UID2_BASE_URL, cls.UID2_API_KEY, cls.UID2_SECRET_KEY)
+ else:
+ raise Exception("set the required UID2_BASE_URL/UID2_API_KEY/UID2_SECRET_KEY environment variables first")
+
+ def test_sharing_client_key_refresh(self):
+ refresh_response = self.sharing_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ def test_sharing_client_encrypt_decrypt_raw_uid(self):
+ # Get raw uid
+ token_response = self.publisher_client.generate_token(
+ TokenGenerateInput.from_email("hopefully-not-opted-out@example.com").do_not_generate_tokens_for_opted_out()
+ )
+ identity = token_response.get_identity()
+
+ self.bidstream_client.refresh()
+ decrypted_token = self.bidstream_client.decrypt_token_into_raw_uid(identity.get_advertising_token(), "example.com")
+ self.assertTrue(decrypted_token.success)
+ raw_uid = decrypted_token.uid
+
+ # Refresh keys first
+ refresh_response = self.sharing_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ # Encrypt the raw UID
+ encryption_response = self.sharing_client.encrypt_raw_uid_into_token(raw_uid)
+ self.assertTrue(encryption_response.success)
+ self.assertIsNotNone(encryption_response.encrypted_data)
+
+ # Now decrypt the encrypted token
+ decryption_response = self.sharing_client.decrypt_token_into_raw_uid(
+ encryption_response.encrypted_data
+ )
+ self.assertTrue(decryption_response.success)
+ self.assertEqual(decryption_response.uid, raw_uid)
+
+ def test_sharing_client_encrypt_with_invalid_raw_uid(self):
+ refresh_response = self.sharing_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ invalid_raw_uid = "invalid_raw_uid"
+ encryption_response = self.sharing_client.encrypt_raw_uid_into_token(invalid_raw_uid)
+
+ self.assertFalse(encryption_response.success)
+
+ def test_sharing_client_decrypt_with_invalid_token(self):
+ refresh_response = self.sharing_client.refresh()
+ self.assertTrue(refresh_response.success)
+
+ invalid_token = "invalid-token"
+ decryption_response = self.sharing_client.decrypt_token_into_raw_uid(invalid_token)
+
+ self.assertFalse(decryption_response.success)
+
+ def test_sharing_client_without_refresh(self):
+ fresh_client = SharingClient(self.UID2_BASE_URL, self.UID2_API_KEY, self.UID2_SECRET_KEY)
+
+ token_response = self.publisher_client.generate_token(
+ TokenGenerateInput.from_email("hopefully-not-opted-out@example.com").do_not_generate_tokens_for_opted_out()
+ )
+ identity = token_response.get_identity()
+ self.bidstream_client.refresh()
+ decrypted_token = self.bidstream_client.decrypt_token_into_raw_uid(identity.get_advertising_token(), "example.com")
+ self.assertTrue(decrypted_token.success)
+ raw_uid = decrypted_token.uid
+
+ encryption_response = fresh_client.encrypt_raw_uid_into_token(raw_uid)
+
+ self.assertFalse(encryption_response.success)
+
+ def test_sharing_client_error_handling(self):
+ bad_client = SharingClient(self.UID2_BASE_URL, "bad-api-key", self.UID2_SECRET_KEY)
+ refresh_response = bad_client.refresh()
+ self.assertFalse(refresh_response.success)
+
+ bad_client = SharingClient(self.UID2_BASE_URL, self.UID2_API_KEY, "bad-secret-key")
+ refresh_response = bad_client.refresh()
+ self.assertFalse(refresh_response.success)
+
+if __name__ == '__main__':
+ unittest.main()
\ No newline at end of file
diff --git a/uid2_client/__init__.py b/uid2_client/__init__.py
index 608a265..2a145ee 100644
--- a/uid2_client/__init__.py
+++ b/uid2_client/__init__.py
@@ -27,3 +27,8 @@
from .identity_map_client import *
from .identity_map_input import *
from .identity_map_response import *
+from .identity_map_v3_client import *
+from .identity_map_v3_input import *
+from .identity_map_v3_response import *
+from .unmapped_identity_reason import *
+from .uid2_response import *
diff --git a/uid2_client/encryption_data_response.py b/uid2_client/encryption_data_response.py
index 53c3a48..ac11700 100644
--- a/uid2_client/encryption_data_response.py
+++ b/uid2_client/encryption_data_response.py
@@ -15,6 +15,10 @@ def make_success(encrypted_data):
def make_error(encryption_status):
return EncryptionDataResponse(encryption_status, None)
+ @property
+ def success(self):
+ return self._encryption_status == EncryptionStatus.SUCCESS
+
@property
def encrypted_data(self):
return self._encrypted_data
diff --git a/uid2_client/envelope.py b/uid2_client/envelope.py
new file mode 100644
index 0000000..87b623d
--- /dev/null
+++ b/uid2_client/envelope.py
@@ -0,0 +1,23 @@
+import base64
+
+
+class Envelope:
+ def __init__(self, envelope, nonce):
+ self._binary_envelope = envelope
+ self._nonce = nonce
+
+ @property
+ def envelope(self):
+ """
+ Returns an encrypted request envelope which can be used in the POST body of a UID2 endpoint.
+ See Encrypted Request Envelope
+ """
+ return base64.b64encode(self._binary_envelope)
+
+ @property
+ def nonce(self):
+ return self._nonce
+
+ @property
+ def binary_envelope(self):
+ return self._binary_envelope
diff --git a/uid2_client/identity_map_client.py b/uid2_client/identity_map_client.py
index d050456..2ec42ab 100644
--- a/uid2_client/identity_map_client.py
+++ b/uid2_client/identity_map_client.py
@@ -1,12 +1,11 @@
-import base64
-import datetime as dt
import json
+import datetime as dt
from datetime import timezone
-from .identity_buckets_response import IdentityBucketsResponse
+from .request_response_util import *
+from .input_util import get_datetime_utc_iso_format
from .identity_map_response import IdentityMapResponse
-
-from uid2_client import auth_headers, make_v2_request, post, parse_v2_response, get_datetime_utc_iso_format
+from .identity_buckets_response import IdentityBucketsResponse
class IdentityMapClient:
@@ -35,15 +34,15 @@ def __init__(self, base_url, api_key, client_secret):
self._client_secret = base64.b64decode(client_secret)
def generate_identity_map(self, identity_map_input):
- req, nonce = make_v2_request(self._client_secret, dt.datetime.now(tz=timezone.utc),
+ envelope = create_envelope(self._client_secret, dt.datetime.now(tz=timezone.utc),
identity_map_input.get_identity_map_input_as_json_string().encode())
- resp = post(self._base_url, '/v2/identity/map', headers=auth_headers(self._api_key), data=req)
- resp_body = parse_v2_response(self._client_secret, resp.read(), nonce)
+ resp = make_request(self._base_url, '/v2/identity/map', headers=auth_headers(self._api_key), envelope=envelope)
+ resp_body = parse_response(self._client_secret, resp, envelope.nonce)
return IdentityMapResponse(resp_body, identity_map_input)
def get_identity_buckets(self, since_timestamp):
- req, nonce = make_v2_request(self._client_secret, dt.datetime.now(tz=timezone.utc),
+ envelope = create_envelope(self._client_secret, dt.datetime.now(tz=timezone.utc),
json.dumps({"since_timestamp": get_datetime_utc_iso_format(since_timestamp)}).encode())
- resp = post(self._base_url, '/v2/identity/buckets', headers=auth_headers(self._api_key), data=req)
- resp_body = parse_v2_response(self._client_secret, resp.read(), nonce)
+ resp = make_request(self._base_url, '/v2/identity/buckets', headers=auth_headers(self._api_key), envelope=envelope)
+ resp_body = parse_response(self._client_secret, resp, envelope.nonce)
return IdentityBucketsResponse(resp_body)
diff --git a/uid2_client/identity_map_v3_client.py b/uid2_client/identity_map_v3_client.py
new file mode 100644
index 0000000..e955e10
--- /dev/null
+++ b/uid2_client/identity_map_v3_client.py
@@ -0,0 +1,42 @@
+import datetime as dt
+from datetime import timezone
+
+from .request_response_util import *
+from .identity_map_v3_input import IdentityMapV3Input
+from .identity_map_v3_response import IdentityMapV3Response
+
+
+class IdentityMapV3Client:
+ """Client for interacting with UID2 Identity Map v3 services
+
+ You will need to have the base URL of the endpoint and a client API key
+ and secret to consume web services.
+
+ Methods:
+ generate_identity_map: Generate identity map
+ """
+
+ def __init__(self, base_url: str, api_key: str, client_secret: str):
+ """Create a new IdentityMapV3Client.
+
+ Args:
+ base_url (str): base URL for all requests to UID services (e.g. 'https://prod.uidapi.com')
+ api_key (str): api key for consuming the UID services
+ client_secret (str): client secret for consuming the UID services
+
+ Note:
+ Your authorization key will determine which UID2 services you are allowed to use.
+ """
+ self._base_url = base_url
+ self._api_key = api_key
+ self._client_secret = base64.b64decode(client_secret)
+
+ def generate_identity_map(self, identity_map_input: IdentityMapV3Input) -> IdentityMapV3Response:
+ envelope = create_envelope(
+ self._client_secret,
+ dt.datetime.now(tz=timezone.utc),
+ identity_map_input.get_identity_map_input_as_json_string().encode()
+ )
+ uid2_response = make_binary_request(self._base_url, '/v3/identity/map', headers=auth_headers(self._api_key), envelope=envelope)
+ decrypted_response = parse_response(self._client_secret, uid2_response, envelope.nonce)
+ return IdentityMapV3Response(decrypted_response, identity_map_input)
diff --git a/uid2_client/identity_map_v3_input.py b/uid2_client/identity_map_v3_input.py
new file mode 100644
index 0000000..5fbf5ef
--- /dev/null
+++ b/uid2_client/identity_map_v3_input.py
@@ -0,0 +1,95 @@
+import json
+from typing import List, Dict, Literal
+
+from uid2_client import normalize_and_hash_email, normalize_and_hash_phone
+
+
+class IdentityMapV3Input:
+ """Input for IdentityMapV3Client, representing emails and/or phone numbers to be mapped"""
+
+ def __init__(self):
+ self._hashed_dii_to_raw_diis: Dict[str, List[str]] = {}
+ self._hashed_emails: List[str] = []
+ self._hashed_phones: List[str] = []
+
+ @staticmethod
+ def from_emails(emails: List[str]) -> 'IdentityMapV3Input':
+ return IdentityMapV3Input().with_emails(emails)
+
+ @staticmethod
+ def from_phones(phones: List[str]) -> 'IdentityMapV3Input':
+ return IdentityMapV3Input().with_phones(phones)
+
+ @staticmethod
+ def from_hashed_emails(hashed_emails: List[str]) -> 'IdentityMapV3Input':
+ return IdentityMapV3Input().with_hashed_emails(hashed_emails)
+
+ @staticmethod
+ def from_hashed_phones(hashed_phones: List[str]) -> 'IdentityMapV3Input':
+ return IdentityMapV3Input().with_hashed_phones(hashed_phones)
+
+ def with_emails(self, emails: List[str]) -> 'IdentityMapV3Input':
+ for email in emails:
+ self.with_email(email)
+ return self
+
+ def with_email(self, email: str) -> 'IdentityMapV3Input':
+ hashed_email = normalize_and_hash_email(email)
+ self._hashed_emails.append(hashed_email)
+ self._add_to_dii_mappings(hashed_email, email)
+ return self
+
+ def with_phones(self, phones: List[str]) -> 'IdentityMapV3Input':
+ for phone in phones:
+ self.with_phone(phone)
+ return self
+
+ def with_phone(self, phone: str) -> 'IdentityMapV3Input':
+ hashed_phone = normalize_and_hash_phone(phone)
+ self._hashed_phones.append(hashed_phone)
+ self._add_to_dii_mappings(hashed_phone, phone)
+ return self
+
+ def with_hashed_emails(self, hashed_emails: List[str]) -> 'IdentityMapV3Input':
+ for hashed_email in hashed_emails:
+ self.with_hashed_email(hashed_email)
+ return self
+
+ def with_hashed_email(self, hashed_email: str) -> 'IdentityMapV3Input':
+ self._hashed_emails.append(hashed_email)
+ self._add_to_dii_mappings(hashed_email, hashed_email)
+ return self
+
+ def with_hashed_phones(self, hashed_phones: List[str]) -> 'IdentityMapV3Input':
+ for hashed_phone in hashed_phones:
+ self.with_hashed_phone(hashed_phone)
+ return self
+
+ def with_hashed_phone(self, hashed_phone: str) -> 'IdentityMapV3Input':
+ self._hashed_phones.append(hashed_phone)
+ self._add_to_dii_mappings(hashed_phone, hashed_phone)
+ return self
+
+ def get_input_diis(self, identity_type: Literal['email_hash', 'phone_hash'], index: int) -> List[str]:
+ hashed_dii = self._get_hashed_dii(identity_type, index)
+ return self._hashed_dii_to_raw_diis.get(hashed_dii, [])
+
+ def _add_to_dii_mappings(self, hashed_dii: str, raw_dii: str) -> None:
+ if hashed_dii not in self._hashed_dii_to_raw_diis:
+ self._hashed_dii_to_raw_diis[hashed_dii] = []
+ self._hashed_dii_to_raw_diis[hashed_dii].append(raw_dii)
+
+ def _get_hashed_dii(self, identity_type: Literal['email_hash', 'phone_hash'], index: int) -> str:
+ if identity_type == "email_hash":
+ return self._hashed_emails[index]
+ elif identity_type == "phone_hash":
+ return self._hashed_phones[index]
+ else:
+ raise ValueError(f"Unexpected identity type: {identity_type}")
+
+ def get_identity_map_input_as_json_string(self) -> str:
+ json_object = {
+ "email_hash": self._hashed_emails,
+ "phone_hash": self._hashed_phones
+ }
+ return json.dumps({k: v for k, v in json_object.items() if v is not None})
diff --git a/uid2_client/identity_map_v3_response.py b/uid2_client/identity_map_v3_response.py
new file mode 100644
index 0000000..0c507d8
--- /dev/null
+++ b/uid2_client/identity_map_v3_response.py
@@ -0,0 +1,134 @@
+import json
+from datetime import datetime, timezone
+from typing import Dict, List, Optional, Any, Literal
+
+from .identity_map_v3_input import IdentityMapV3Input
+from .unmapped_identity_reason import UnmappedIdentityReason
+
+
+class IdentityMapV3Response:
+ def __init__(self, response: str, identity_map_input: IdentityMapV3Input):
+ self._mapped_identities: Dict[str, MappedIdentity] = {}
+ self._unmapped_identities: Dict[str, UnmappedIdentity] = {}
+
+ response_json = json.loads(response)
+ api_response = ApiResponse.from_json(response_json)
+ self._status = api_response.status
+
+ if not self.is_success():
+ raise ValueError("Got unexpected identity map status: " + self._status)
+
+ self._populate_identities(api_response.body, identity_map_input)
+
+ def _populate_identities(self, api_response: Dict[Literal['email_hash', 'phone_hash'], List['ApiIdentity']], identity_map_input: IdentityMapV3Input) -> None:
+ for identity_type, identities in api_response.items():
+ self._populate_identities_for_type(identity_map_input, identity_type, identities)
+
+ def _populate_identities_for_type(self, identity_map_input: IdentityMapV3Input, identity_type: Literal['email_hash', 'phone_hash'], identities: List['ApiIdentity']) -> None:
+ for i, api_identity in enumerate(identities):
+ input_diis = identity_map_input.get_input_diis(identity_type, i)
+
+ for input_dii in input_diis:
+ if api_identity.error is None:
+ self._mapped_identities[input_dii] = MappedIdentity.from_api_identity(api_identity)
+ else:
+ self._unmapped_identities[input_dii] = UnmappedIdentity(api_identity.error)
+
+ def is_success(self) -> bool:
+ return self._status == "success"
+
+ @property
+ def mapped_identities(self) -> Dict[str, 'MappedIdentity']:
+ return self._mapped_identities.copy()
+
+ @property
+ def unmapped_identities(self) -> Dict[str, 'UnmappedIdentity']:
+ return self._unmapped_identities.copy()
+
+ @property
+ def status(self) -> str:
+ return self._status
+
+
+class ApiResponse:
+ def __init__(self, status: str, body: Dict[Literal['email_hash', 'phone_hash'], List['ApiIdentity']]):
+ self.status = status
+ self.body = body
+
+ @classmethod
+ def from_json(cls, data) -> 'ApiResponse':
+ if not set(data['body'].keys()).issubset(['email', 'phone', 'email_hash', 'phone_hash']):
+ raise ValueError("api response body does not contain correct keys")
+
+ api_body: Dict[Literal['email_hash', 'phone_hash'], List['ApiIdentity']] = {
+ 'email_hash': [ApiIdentity.from_json(item) for item in data['body']['email_hash']] if data['body'].get('email_hash') else [],
+ 'phone_hash': [ApiIdentity.from_json(item) for item in data['body']['phone_hash']] if data['body'].get('phone_hash') else [],
+ }
+ return cls(
+ status=data['status'],
+ body=api_body
+ )
+
+
+class ApiIdentity:
+ def __init__(self, current_uid: Optional[str], previous_uid: Optional[str],
+ refresh_from_seconds: Optional[int], error: Optional[str]):
+ self.current_uid = current_uid
+ self.previous_uid = previous_uid
+ self.refresh_from_seconds = refresh_from_seconds
+ self.error = error
+
+ @classmethod
+ def from_json(cls, data: Dict[str, Any]) -> 'ApiIdentity':
+ mapped_identity = data.keys().__contains__("u") and data.keys().__contains__("p") and data.keys().__contains__("r")
+ unmapped_identity = data.keys().__contains__("e")
+ if not mapped_identity and not unmapped_identity:
+ raise ValueError("api identity does not contain the correct keys")
+
+ return cls(
+ current_uid=data.get("u"),
+ previous_uid=data.get("p"),
+ refresh_from_seconds=data.get("r"),
+ error=data.get("e")
+ )
+
+
+class MappedIdentity:
+ def __init__(self, current_uid: str, previous_uid: Optional[str], refresh_from_seconds: datetime):
+ self._current_uid = current_uid
+ self._previous_uid = previous_uid
+ self._refresh_from = refresh_from_seconds
+
+ @classmethod
+ def from_api_identity(cls, api_identity: ApiIdentity) -> 'MappedIdentity':
+ if api_identity.current_uid is None or api_identity.refresh_from_seconds is None:
+ raise ValueError("Mapped identity cannot be created from API identity with missing current_uid or refresh_from_seconds")
+ return cls(api_identity.current_uid,
+ api_identity.previous_uid,
+ datetime.fromtimestamp(api_identity.refresh_from_seconds, tz=timezone.utc))
+
+ @property
+ def current_raw_uid(self) -> str:
+ return self._current_uid
+
+ @property
+ def previous_raw_uid(self) -> Optional[str]:
+ return self._previous_uid
+
+ @property
+ def refresh_from(self) -> datetime:
+ return self._refresh_from
+
+
+class UnmappedIdentity:
+ def __init__(self, reason: str):
+ self._reason = UnmappedIdentityReason.from_string(reason)
+ self._raw_reason = reason
+
+ @property
+ def reason(self) -> UnmappedIdentityReason:
+ return self._reason
+
+ @property
+ def raw_reason(self) -> str:
+ return self._raw_reason
diff --git a/uid2_client/publisher_client.py b/uid2_client/publisher_client.py
index 80c4496..b4be3fc 100644
--- a/uid2_client/publisher_client.py
+++ b/uid2_client/publisher_client.py
@@ -47,10 +47,10 @@ def __init__(self, base_url, auth_key, secret_key):
self._secret_key = base64.b64decode(secret_key)
def generate_token(self, token_generate_input):
- req, nonce = make_v2_request(self._secret_key, dt.datetime.now(tz=timezone.utc),
+ envelope = create_envelope(self._secret_key, dt.datetime.now(tz=timezone.utc),
token_generate_input.get_as_json_string().encode())
- resp = post(self._base_url, '/v2/token/generate', headers=auth_headers(self._auth_key), data=req)
- resp_body = parse_v2_response(self._secret_key, resp.read(), nonce)
+ uid2_response = make_request(self._base_url, '/v2/token/generate', headers=auth_headers(self._auth_key), envelope=envelope)
+ resp_body = parse_response(self._secret_key, uid2_response, envelope.nonce)
return TokenGenerateResponse(resp_body)
def refresh_token(self, current_identity):
diff --git a/uid2_client/refresh_keys_util.py b/uid2_client/refresh_keys_util.py
index 791d89b..7e5854e 100644
--- a/uid2_client/refresh_keys_util.py
+++ b/uid2_client/refresh_keys_util.py
@@ -36,9 +36,10 @@ def _parse_keys_json(resp_body):
def _fetch_keys(base_url, path, auth_key, secret_key):
try:
- req, nonce = make_v2_request(secret_key, dt.datetime.now(tz=timezone.utc))
- resp = post(base_url, path, headers=auth_headers(auth_key), data=req)
- resp_body = json.loads(parse_v2_response(secret_key, resp.read(), nonce)).get('body')
+ envelope = create_envelope(secret_key, dt.datetime.now(tz=timezone.utc))
+ uid2_response = make_request(base_url, path, headers=auth_headers(auth_key), envelope=envelope)
+ decrypted_response = parse_response(secret_key, uid2_response, envelope.nonce)
+ resp_body = json.loads(decrypted_response).get('body')
keys = _parse_keys_json(resp_body)
return RefreshResponse.make_success(keys)
except Exception as exc:
diff --git a/uid2_client/request_response_util.py b/uid2_client/request_response_util.py
index 0caddc3..94fe2d3 100644
--- a/uid2_client/request_response_util.py
+++ b/uid2_client/request_response_util.py
@@ -1,17 +1,21 @@
import base64
import os
+from datetime import datetime
+from typing import Dict, Optional
from urllib import request
import pkg_resources
from uid2_client.encryption import _encrypt_gcm, _decrypt_gcm
+from .envelope import Envelope
+from .uid2_response import Uid2Response
+BINARY = 'application/octet-stream'
-def _make_url(base_url, path):
+def _make_url(base_url: str, path: str) -> str:
return base_url + path
-
-def auth_headers(auth_key):
+def auth_headers(auth_key: str) -> Dict[str, str]:
try:
version = pkg_resources.get_distribution("uid2_client").version
except Exception:
@@ -20,8 +24,7 @@ def auth_headers(auth_key):
return {'Authorization': 'Bearer ' + auth_key,
"X-UID2-Client-Version": "uid2-client-python-" + version}
-
-def make_v2_request(secret_key, now, data=None):
+def create_envelope(secret_key: bytes, now: datetime, data: Optional[bytes] = None) -> 'Envelope':
payload = int.to_bytes(int(now.timestamp() * 1000), 8, 'big')
nonce = os.urandom(8)
payload += nonce
@@ -30,17 +33,36 @@ def make_v2_request(secret_key, now, data=None):
envelope = int.to_bytes(1, 1, 'big')
envelope += _encrypt_gcm(payload, None, secret_key)
+ return Envelope(envelope, nonce)
+
+def make_request(base_url: str, path: str, headers: Dict[str, str], envelope: Envelope) -> 'Uid2Response':
+ resp = post(base_url, path, headers, envelope.envelope)
+ return Uid2Response.from_string(resp.read())
- return base64.b64encode(envelope), nonce
+def make_binary_request(base_url: str, path: str, headers: Dict[str, str], envelope: Envelope) -> 'Uid2Response':
+ headers['Content-Type'] = BINARY
+ resp = post(base_url, path, headers, envelope.binary_envelope)
+ return Uid2Response.from_bytes(resp.read())
+
+def post(base_url: str, path: str, headers: Dict[str, str], data: bytes):
+ req = request.Request(_make_url(base_url, path), headers=headers, method='POST', data=data)
+ return request.urlopen(req)
+def parse_response(secret_key: bytes, uid2_response: Uid2Response, nonce: bytes) -> str:
+ if uid2_response.is_binary():
+ as_bytes = uid2_response.as_bytes
+ if as_bytes is None:
+ raise ValueError("Binary response has no bytes data")
+ return _decrypt_payload(secret_key, as_bytes, nonce)
+ else:
+ as_string = uid2_response.as_string
+ if as_string is None:
+ raise ValueError("String response has no string data")
+ encrypted_string = base64.b64decode(as_string)
+ return _decrypt_payload(secret_key, encrypted_string, nonce)
-def parse_v2_response(secret_key, encrypted, nonce):
- payload = _decrypt_gcm(base64.b64decode(encrypted), secret_key)
+def _decrypt_payload(secret_key: bytes, encrypted: bytes, nonce: bytes) -> str:
+ payload = _decrypt_gcm(encrypted, secret_key)
if nonce != payload[8:16]:
raise ValueError("nonce mismatch")
return payload[16:]
-
-
-def post(base_url, path, headers, data):
- req = request.Request(_make_url(base_url, path), headers=headers, method='POST', data=data)
- return request.urlopen(req)
diff --git a/uid2_client/uid2_response.py b/uid2_client/uid2_response.py
new file mode 100644
index 0000000..61e9314
--- /dev/null
+++ b/uid2_client/uid2_response.py
@@ -0,0 +1,26 @@
+from typing import Optional
+
+
+class Uid2Response:
+ def __init__(self, as_string: Optional[str], as_bytes: Optional[bytes]):
+ self._as_string = as_string
+ self._as_bytes = as_bytes
+
+ @classmethod
+ def from_string(cls, as_string: str) -> 'Uid2Response':
+ return cls(as_string, None)
+
+ @classmethod
+ def from_bytes(cls, as_bytes: bytes) -> 'Uid2Response':
+ return cls(None, as_bytes)
+
+ @property
+ def as_string(self) -> Optional[str]:
+ return self._as_string
+
+ @property
+ def as_bytes(self) -> Optional[bytes]:
+ return self._as_bytes
+
+ def is_binary(self) -> bool:
+ return self._as_bytes is not None
\ No newline at end of file
diff --git a/uid2_client/unmapped_identity_reason.py b/uid2_client/unmapped_identity_reason.py
new file mode 100644
index 0000000..46d6c3a
--- /dev/null
+++ b/uid2_client/unmapped_identity_reason.py
@@ -0,0 +1,14 @@
+from enum import Enum
+
+
+class UnmappedIdentityReason(Enum):
+ OPTOUT = "optout"
+ INVALID_IDENTIFIER = "invalid identifier"
+ UNKNOWN = "unknown"
+
+ @classmethod
+ def from_string(cls, reason_str: str) -> 'UnmappedIdentityReason':
+ try:
+ return cls(reason_str)
+ except ValueError:
+ return cls.UNKNOWN
\ No newline at end of file