diff --git a/README.md b/README.md index a5fc755..e631d80 100644 --- a/README.md +++ b/README.md @@ -54,17 +54,30 @@ query = stream.writeStream.format("console").start() ## Available Data Sources -| Data Source | Type | Description | Install | -|-------------|------|-------------|---------| -| `fake` | Batch/Stream | Generate synthetic test data using Faker | `pip install pyspark-data-sources[faker]` | -| `github` | Batch | Read GitHub pull requests | Built-in | -| `googlesheets` | Batch | Read public Google Sheets | Built-in | +### Sources (Read) + +| Data Source | Type | Description | Dependency | +|-------------|------|-------------|------------| +| `fake` | Batch/Stream | Generate synthetic test data using Faker | `[faker]` | +| `github` | Batch | Read GitHub pull requests | None | +| `googlesheets` | Batch | Read public Google Sheets | None | | `huggingface` | Batch | Load Hugging Face datasets | `[huggingface]` | -| `stock` | Batch | Fetch stock market data (Alpha Vantage) | Built-in | -| `opensky` | Batch/Stream | Live flight tracking data | Built-in | +| `stock` | Batch | Fetch stock market data (Alpha Vantage) | None | +| `opensky` | Batch/Stream | Live flight tracking data | None | | `kaggle` | Batch | Load Kaggle datasets | `[kaggle]` | | `arrow` | Batch | Read Apache Arrow files | `[arrow]` | +| `robinhood` | Batch | Read cryptocurrency market data from Robinhood API | `[robinhood]` | +| `jsonplaceholder` | Batch | Read JSON data for testing | None | +| `weather` | Batch | Read current weather data (OpenWeatherMap) | None | + +### Sinks (Write) + +| Data Source | Type | Description | Dependency | +|-------------|------|-------------|------------| | `lance` | Batch Write | Write Lance vector format | `[lance]` | +| `salesforce` | Stream Write | Write to Salesforce objects | `[salesforce]` | +| `meta_capi` | Batch/Stream Write | Write to Meta Conversions API | None | + 📚 **[See detailed examples for all data sources →](docs/data-sources-guide.md)** diff --git a/docs/data-sources-guide.md b/docs/data-sources-guide.md index 08113ca..36bc2f2 100644 --- a/docs/data-sources-guide.md +++ b/docs/data-sources-guide.md @@ -494,5 +494,4 @@ df = spark.read.format("fake") \ - Use partitioning for large datasets - Consider sampling: `df.sample(0.1)` - Increase Spark executor memory - -For more help, see the [Development Guide](../contributing/DEVELOPMENT.md) or open an issue on GitHub. \ No newline at end of file +For more help, see the [Development Guide](../contributing/DEVELOPMENT.md) or open an issue on GitHub. diff --git a/docs/datasources/meta_capi.md b/docs/datasources/meta_capi.md new file mode 100644 index 0000000..e2993e9 --- /dev/null +++ b/docs/datasources/meta_capi.md @@ -0,0 +1,7 @@ +# MetaCapiDataSource + +> Requires the `requests` library. You can install it manually: `pip install requests` +> or use `pip install pyspark-data-sources`. 
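+
+A minimal batch-write sketch, assuming an existing `SparkSession` named `spark` and
+placeholder credentials (`your-access-token`, `your-pixel-id`). Flat-mode columns such as
+`email`, `value`, and `currency` are mapped to Conversions API fields as described in the
+reference below; see `examples/meta_capi_example.py` for a fuller walkthrough.
+
+```python
+from pyspark_datasources import MetaCapiDataSource
+
+spark.dataSource.register(MetaCapiDataSource)
+
+df = spark.createDataFrame(
+    [("Purchase", 1700000001, "user1@example.com", 120.50, "USD")],
+    ["event_name", "event_time", "email", "value", "currency"],
+)
+
+(
+    df.write.format("meta_capi")
+    .option("access_token", "your-access-token")  # placeholder credential
+    .option("pixel_id", "your-pixel-id")          # placeholder pixel / dataset ID
+    .option("test_event_code", "TEST12345")       # optional: surface under Test Events
+    .save()
+)
+```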
+ +::: pyspark_datasources.meta_capi.MetaCapiDataSource + diff --git a/docs/index.md b/docs/index.md index ed53cd7..a75bf10 100644 --- a/docs/index.md +++ b/docs/index.md @@ -42,4 +42,5 @@ spark.readStream.format("fake").load().writeStream.format("console").start() | [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` | | [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None | | [RobinhoodDataSource](./datasources/robinhood.md) | `robinhood` | Read cryptocurrency market data from Robinhood API | `pynacl` | -| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | \ No newline at end of file +| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | +| [MetaCapiDataSource](./datasources/meta_capi.md) | `meta_capi` | Write event data to Meta Conversions API | None | \ No newline at end of file diff --git a/examples/meta_capi_example.py b/examples/meta_capi_example.py new file mode 100644 index 0000000..297ae17 --- /dev/null +++ b/examples/meta_capi_example.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Meta Conversions API (CAPI) Datasource Example + +This example demonstrates how to use the MetaCapiDataSource as a datasource +to write event data to Meta for ad optimization. + +Requirements: +- PySpark +- requests +- Valid Meta System User Access Token and Pixel ID + +Setup: + pip install pyspark requests + +Environment Variables: + export META_ACCESS_TOKEN="your-access-token" + export META_PIXEL_ID="your-pixel-id" +""" + +import os +import tempfile +import time +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, lit, current_timestamp, unix_timestamp +from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType + +def check_credentials(): + """Check if Meta credentials are available""" + token = os.getenv("META_ACCESS_TOKEN") + pixel_id = os.getenv("META_PIXEL_ID") + + if not all([token, pixel_id]): + print("❌ Missing Meta credentials!") + print("Please set the following environment variables:") + print(" export META_ACCESS_TOKEN='your-access-token'") + print(" export META_PIXEL_ID='your-pixel-id'") + return False, None, None + + print(f"✅ Using Pixel ID: {pixel_id}") + return True, token, pixel_id + +def example_1_rate_source_to_capi(): + """Example 1: Stream simulated purchases to Meta CAPI""" + print("\n" + "=" * 60) + print("EXAMPLE 1: Simulated Purchases → Meta CAPI (Streaming)") + print("=" * 60) + + has_creds, token, pixel_id = check_credentials() + if not has_creds: + return + + spark = SparkSession.builder.appName("MetaCapiExample1").getOrCreate() + + try: + from pyspark_datasources.meta_capi import MetaCapiDataSource + spark.dataSource.register(MetaCapiDataSource) + print("✅ Meta CAPI datasource registered") + + # Create streaming data (simulating 1 purchase per second) + streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load() + + # Transform to CAPI format (Flat Mode) + # We simulate user data. In production, this comes from your tables. 
+ events_df = streaming_df.select( + lit("Purchase").alias("event_name"), + col("timestamp").alias("event_time"), + lit("test@example.com").alias("email"), # Will be auto-hashed + lit("website").alias("action_source"), + (col("value") * 10.0 + 5.0).alias("value"), + lit("USD").alias("currency"), + lit("TEST12345").alias("test_event_code") # For testing in Events Manager + ) + + print("📊 Starting streaming write to Meta CAPI...") + print(" Check your Events Manager 'Test Events' tab!") + + # Write to Meta CAPI + query = ( + events_df.writeStream.format("meta_capi") + .option("access_token", token) + .option("pixel_id", pixel_id) + .option("test_event_code", "TEST12345") # Optional: direct test code option + .option("batch_size", "10") + .option("checkpointLocation", "/tmp/meta_capi_example1_checkpoint") + .trigger(processingTime="10 seconds") + .start() + ) + + # Run for 30 seconds then stop + time.sleep(30) + query.stop() + print("✅ Streaming stopped") + + except Exception as e: + print(f"❌ Error: {e}") + finally: + spark.stop() + +def example_2_batch_dataframe_to_capi(): + """Example 2: Batch write a static DataFrame to Meta CAPI""" + print("\n" + "=" * 60) + print("EXAMPLE 2: Static DataFrame → Meta CAPI (Batch)") + print("=" * 60) + + has_creds, token, pixel_id = check_credentials() + if not has_creds: + return + + spark = SparkSession.builder.appName("MetaCapiExample2").getOrCreate() + + try: + from pyspark_datasources.meta_capi import MetaCapiDataSource + spark.dataSource.register(MetaCapiDataSource) + print("✅ Meta CAPI datasource registered") + + # Create sample data + data = [ + ("Purchase", 1700000001, "user1@example.com", 120.50, "USD"), + ("Purchase", 1700000002, "user2@example.com", 85.00, "USD"), + ("AddToCart", 1700000003, "user3@example.com", 25.99, "USD"), + ] + + columns = ["event_name", "event_time", "email", "value", "currency"] + df = spark.createDataFrame(data, columns) + + # Add optional fields + df = df.withColumn("action_source", lit("website")) \ + .withColumn("test_event_code", lit("TEST12345")) + + print(f"📊 Writing {df.count()} records to Meta CAPI in batch mode...") + print(" Check your Events Manager 'Test Events' tab!") + + # Write to Meta CAPI (Batch) + df.write.format("meta_capi") \ + .option("access_token", token) \ + .option("pixel_id", pixel_id) \ + .option("test_event_code", "TEST12345") \ + .option("batch_size", "50") \ + .save() + + print("✅ Batch write completed") + + except Exception as e: + print(f"❌ Error: {e}") + finally: + spark.stop() + +def main(): + print("🚀 Meta CAPI Datasource Example") + example_1_rate_source_to_capi() + example_2_batch_dataframe_to_capi() + +if __name__ == "__main__": + main() diff --git a/mkdocs.yml b/mkdocs.yml index cdace9f..45a6217 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -37,6 +37,8 @@ nav: - datasources/lance.md - datasources/opensky.md - datasources/weather.md + - datasources/meta_capi.md + markdown_extensions: - pymdownx.highlight: diff --git a/pyspark_datasources/__init__.py b/pyspark_datasources/__init__.py index 75016e5..53ed7a3 100644 --- a/pyspark_datasources/__init__.py +++ b/pyspark_datasources/__init__.py @@ -10,3 +10,4 @@ from .simplejson import SimpleJsonDataSource from .stock import StockDataSource from .jsonplaceholder import JSONPlaceholderDataSource +from .meta_capi import MetaCapiDataSource diff --git a/pyspark_datasources/meta_capi.py b/pyspark_datasources/meta_capi.py new file mode 100644 index 0000000..da7054c --- /dev/null +++ b/pyspark_datasources/meta_capi.py @@ -0,0 +1,263 @@ +import 
hashlib +import json +import logging +import time +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +import requests +from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, DataSourceWriter, WriterCommitMessage +from pyspark.sql.types import StructType + +logger = logging.getLogger(__name__) + +@dataclass +class MetaCapiCommitMessage(WriterCommitMessage): + batch_id: int + events_processed: int + events_succeeded: int + events_failed: int + +class MetaCapiDataSource(DataSource): + """ + A Meta Conversions API (CAPI) data source for PySpark. + + This data source enables writing data to Meta/Facebook via the Conversions API. + It supports both streaming (writeStream) and batch (write) execution. + + Name: `meta_capi` + + Parameters + ---------- + access_token : str + Meta System User Access Token (Required). + pixel_id : str + Meta Pixel ID / Dataset ID (Required). + api_version : str, optional + Graph API version, e.g., "v19.0" (Default: "v19.0"). + batch_size : str, optional + Number of events to send in one API call (Default: "1000"). + test_event_code : str, optional + Code for testing events in Events Manager. + + Schema Mapping + -------------- + The data source maps Spark DataFrame columns to CAPI fields. + + 1. **Structured Mode**: If a `user_data` (Struct) column exists, it is used directly. + 2. **Flat Mode**: If no `user_data` column exists, the writer looks for these columns: + - `email` -> `user_data.em` (auto-hashed if not SHA256) + - `phone` -> `user_data.ph` (auto-hashed if not SHA256) + - `client_ip_address` -> `user_data.client_ip_address` + - `client_user_agent` -> `user_data.client_user_agent` + - `event_name` -> `event_name` + - `event_time` -> `event_time` (Timestamp or Long) + - `event_id` -> `event_id` (deduplication key) + - `action_source` -> `action_source` (default: "website") + - `value` -> `custom_data.value` + - `currency` -> `custom_data.currency` + """ + + @classmethod + def name(cls) -> str: + return "meta_capi" + + def streamWriter(self, schema: StructType, overwrite: bool) -> "MetaCapiStreamWriter": + return MetaCapiStreamWriter(schema, self.options) + + def writer(self, schema: StructType, overwrite: bool) -> "MetaCapiBatchWriter": + return MetaCapiBatchWriter(schema, self.options) + +class _MetaCapiWriterCommon: + """Common logic for Meta CAPI writers.""" + def __init__(self, schema: StructType, options: Dict[str, str]): + self.schema = schema + self.options = options + + self.access_token = options.get("access_token") + self.pixel_id = options.get("pixel_id") + self.api_version = options.get("api_version", "v19.0") + self.batch_size = int(options.get("batch_size", "1000")) + self.test_event_code = options.get("test_event_code") + + if not self.access_token or not self.pixel_id: + raise ValueError("Meta CAPI requires 'access_token' and 'pixel_id' options.") + + self.api_url = f"https://graph.facebook.com/{self.api_version}/{self.pixel_id}/events" + + def write(self, iterator) -> MetaCapiCommitMessage: + from pyspark import TaskContext + context = TaskContext.get() + batch_id = context.taskAttemptId() if context else 0 + + events_buffer = [] + stats = {"processed": 0, "succeeded": 0, "failed": 0} + + for row in iterator: + try: + event = self._transform_row_to_event(row) + if event: + events_buffer.append(event) + stats["processed"] += 1 + + if len(events_buffer) >= self.batch_size: + self._send_batch(events_buffer, stats) + events_buffer = [] + except Exception as e: + logger.error(f"Error processing row: 
{e}") + # We count conversion failures as failed processing but don't stop the stream + pass + + if events_buffer: + self._send_batch(events_buffer, stats) + + return MetaCapiCommitMessage( + batch_id=batch_id, + events_processed=stats["processed"], + events_succeeded=stats["succeeded"], + events_failed=stats["failed"] + ) + + def _send_batch(self, events: List[Dict[str, Any]], stats: Dict[str, int]): + if not events: + return + + payload = { + "access_token": self.access_token, + "data": events + } + + if self.test_event_code: + payload["test_event_code"] = self.test_event_code + + try: + response = requests.post( + self.api_url, + json=payload, + headers={"Content-Type": "application/json"}, + timeout=30 + ) + + if response.status_code == 200: + # CAPI returns { "events_received": N, "fbtrace_id": "..." } + stats["succeeded"] += len(events) + else: + logger.error(f"CAPI Batch Failed: {response.status_code} - {response.text}") + stats["failed"] += len(events) + + except Exception as e: + logger.error(f"Network error sending batch: {e}") + stats["failed"] += len(events) + + def _transform_row_to_event(self, row) -> Optional[Dict[str, Any]]: + # Helper to safely get field + def get_val(field): + return getattr(row, field, None) + + # 1. Base Event Data + event_name = get_val("event_name") + if not event_name: + # Skip rows without event_name + return None + + event_time = get_val("event_time") + # Handle TimestampType (datetime) or Long/Int (unix timestamp) + if hasattr(event_time, "timestamp"): + event_time = int(event_time.timestamp()) + elif event_time is None: + event_time = int(time.time()) + else: + event_time = int(event_time) + + event = { + "event_name": event_name, + "event_time": event_time, + "action_source": get_val("action_source") or "website" + } + + if get_val("event_id"): + event["event_id"] = str(get_val("event_id")) + + # 2. User Data + user_data = {} + row_user_data = get_val("user_data") + + if row_user_data: + # Trusted input: user provided a struct + # recursive conversion to dict + user_data = row_user_data.asDict(recursive=True) + else: + # Flat mode mapping + email = get_val("email") or get_val("em") + if email: + user_data["em"] = self._hash_if_needed(str(email).strip().lower()) + + phone = get_val("phone") or get_val("ph") + if phone: + user_data["ph"] = self._hash_if_needed(str(phone).strip()) + + ip = get_val("client_ip_address") or get_val("ip") + if ip: + user_data["client_ip_address"] = str(ip) + + agent = get_val("client_user_agent") or get_val("user_agent") + if agent: + user_data["client_user_agent"] = str(agent) + + # Add other common user_data fields if present + for field in ["fbc", "fbp", "external_id"]: + val = get_val(field) + if val: + user_data[field] = str(val) + + if user_data: + event["user_data"] = user_data + + # 3. 
Custom Data + custom_data = {} + row_custom_data = get_val("custom_data") + if row_custom_data: + custom_data = row_custom_data.asDict(recursive=True) + else: + # Flat mapping for value/currency + val = get_val("value") + if val is not None: + custom_data["value"] = val + + curr = get_val("currency") + if curr: + custom_data["currency"] = str(curr) + + if custom_data: + event["custom_data"] = custom_data + + return event + + def _hash_if_needed(self, val: str) -> str: + # Simple heuristic: SHA256 hex digest is 64 chars + if len(val) == 64 and all(c in "0123456789abcdef" for c in val): + return val + return hashlib.sha256(val.encode("utf-8")).hexdigest() + +class MetaCapiStreamWriter(_MetaCapiWriterCommon, DataSourceStreamWriter): + """Stream writer for Meta CAPI.""" + pass + +class MetaCapiBatchWriter(_MetaCapiWriterCommon, DataSourceWriter): + """Batch writer for Meta CAPI.""" + + def commit(self, messages: List[MetaCapiCommitMessage]) -> None: + """ + Handle job completion. + Since CAPI is stateless/API-based, we just log the summary. + """ + total_succeeded = sum(m.events_succeeded for m in messages) + total_failed = sum(m.events_failed for m in messages) + logger.info(f"Meta CAPI Batch Write Complete: {total_succeeded} sent, {total_failed} failed.") + + def abort(self, messages: List[MetaCapiCommitMessage]) -> None: + """ + Handle job failure. + Events sent to API cannot be rolled back. + """ + logger.warning("Meta CAPI Batch Write Aborted. Some events may have been sent.") diff --git a/tests/test_meta_capi.py b/tests/test_meta_capi.py new file mode 100644 index 0000000..1e87c5c --- /dev/null +++ b/tests/test_meta_capi.py @@ -0,0 +1,102 @@ +import pytest +from unittest.mock import patch, MagicMock +from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType +from pyspark_datasources.meta_capi import MetaCapiStreamWriter, MetaCapiBatchWriter, MetaCapiCommitMessage + +@pytest.fixture +def writers(): + schema = StructType([ + StructField("event_name", StringType()), + StructField("email", StringType()), + StructField("value", DoubleType()), + StructField("event_time", LongType()) + ]) + options = { + "access_token": "fake_token", + "pixel_id": "12345", + "batch_size": "2" + } + stream_writer = MetaCapiStreamWriter(schema, options) + batch_writer = MetaCapiBatchWriter(schema, options) + return {"stream": stream_writer, "batch": batch_writer} + +def test_transform_row_to_event(writers): + stream_writer = writers["stream"] + + # Mock a Row object + row = MagicMock() + row.event_name = "Purchase" + row.event_time = 1600000000 + row.email = "test@example.com" + row.value = 50.0 + # Missing fields return None + row.phone = None + row.user_data = None + row.custom_data = None + + # Test with stream writer (uses common logic) + event = stream_writer._transform_row_to_event(row) + + assert event["event_name"] == "Purchase" + assert event["event_time"] == 1600000000 + # Check hashing + expected_hash = "973dfe463ec85785f5f95af5ba3906eedb2d931c24e69824a89ea65dba4e813b" # sha256("test@example.com") + assert event["user_data"]["em"] == expected_hash + assert event["custom_data"]["value"] == 50.0 + +@patch("requests.post") +def test_send_batch(mock_post, writers): + batch_writer = writers["batch"] + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"events_received": 2} + mock_post.return_value = mock_response + + events = [{"event_name": "Test1"}, {"event_name": "Test2"}] + stats = {"succeeded": 0, "failed": 0} + + # Test 
with batch writer (uses common logic) + batch_writer._send_batch(events, stats) + + assert stats["succeeded"] == 2 + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + + assert "https://graph.facebook.com/v19.0/12345/events" in args[0] + assert kwargs["json"]["access_token"] == "fake_token" + assert len(kwargs["json"]["data"]) == 2 + +def test_batch_writer_commit(writers): + batch_writer = writers["batch"] + # Ensure commit doesn't raise error + messages = [ + MetaCapiCommitMessage(batch_id=1, events_processed=10, events_succeeded=9, events_failed=1), + MetaCapiCommitMessage(batch_id=2, events_processed=5, events_succeeded=5, events_failed=0) + ] + try: + batch_writer.commit(messages) + except Exception as e: + pytest.fail(f"commit() raised Exception: {e}") + +def test_transform_with_structs(writers): + stream_writer = writers["stream"] + # Test explicit structs for user_data + row = MagicMock() + row.event_name = "PageView" + row.event_time = 1234567890 + + # user_data struct mock + user_data_mock = MagicMock() + user_data_mock.asDict.return_value = {"em": "hashed_already"} + row.user_data = user_data_mock + + # custom_data struct mock + custom_data_mock = MagicMock() + custom_data_mock.asDict.return_value = {"status": "gold"} + row.custom_data = custom_data_mock + + event = stream_writer._transform_row_to_event(row) + + assert event["user_data"]["em"] == "hashed_already" + assert event["custom_data"]["status"] == "gold"
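+
+def test_hash_if_needed(writers):
+    # Sketch of a direct check on the _hash_if_needed helper: a 64-character hex string
+    # is treated as an already-computed SHA-256 digest and passed through unchanged,
+    # while raw values are hashed. Reuses the digest constant asserted in
+    # test_transform_row_to_event above for "test@example.com".
+    stream_writer = writers["stream"]
+    already_hashed = "973dfe463ec85785f5f95af5ba3906eedb2d931c24e69824a89ea65dba4e813b"
+    assert stream_writer._hash_if_needed(already_hashed) == already_hashed
+    assert stream_writer._hash_if_needed("test@example.com") == already_hashed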