27 changes: 20 additions & 7 deletions README.md
@@ -54,17 +54,30 @@ query = stream.writeStream.format("console").start()

## Available Data Sources

| Data Source | Type | Description | Install |
|-------------|------|-------------|---------|
| `fake` | Batch/Stream | Generate synthetic test data using Faker | `pip install pyspark-data-sources[faker]` |
| `github` | Batch | Read GitHub pull requests | Built-in |
| `googlesheets` | Batch | Read public Google Sheets | Built-in |
### Sources (Read)

| Data Source | Type | Description | Dependency |
|-------------|------|-------------|------------|
| `fake` | Batch/Stream | Generate synthetic test data using Faker | `[faker]` |
| `github` | Batch | Read GitHub pull requests | None |
| `googlesheets` | Batch | Read public Google Sheets | None |
| `huggingface` | Batch | Load Hugging Face datasets | `[huggingface]` |
| `stock` | Batch | Fetch stock market data (Alpha Vantage) | Built-in |
| `opensky` | Batch/Stream | Live flight tracking data | Built-in |
| `stock` | Batch | Fetch stock market data (Alpha Vantage) | None |
| `opensky` | Batch/Stream | Live flight tracking data | None |
| `kaggle` | Batch | Load Kaggle datasets | `[kaggle]` |
| `arrow` | Batch | Read Apache Arrow files | `[arrow]` |
| `robinhood` | Batch | Read cryptocurrency market data from Robinhood API | `[robinhood]` |
| `jsonplaceholder` | Batch | Read JSON data for testing | None |
| `weather` | Batch | Read current weather data (OpenWeatherMap) | None |
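
Sources marked with an extra (for example `[faker]`) need that extra installed before use. A minimal sketch of the read pattern, using the `fake` source — this mirrors the quick start above and assumes `FakeDataSource` is exported from the package root like the other sources:

```python
from pyspark.sql import SparkSession
from pyspark_datasources import FakeDataSource

# Requires: pip install pyspark-data-sources[faker]
spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(FakeDataSource)

# Batch read of synthetic rows generated by Faker
spark.read.format("fake").load().show()
```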

### Sinks (Write)

| Data Source | Type | Description | Dependency |
|-------------|------|-------------|------------|
| `lance` | Batch Write | Write Lance vector format | `[lance]` |
| `salesforce` | Stream Write | Write to Salesforce objects | `[salesforce]` |
| `meta_capi` | Batch/Stream Write | Write to Meta Conversions API | None |
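
Sinks are used through the standard DataFrame write path. A minimal batch sketch using the `meta_capi` sink — the `access_token` and `pixel_id` options come from the example script in this PR, and the credential values are placeholders:

```python
from pyspark.sql import SparkSession
from pyspark_datasources import MetaCapiDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(MetaCapiDataSource)

events = spark.createDataFrame(
    [("Purchase", 1700000001, "user1@example.com", 120.50, "USD")],
    ["event_name", "event_time", "email", "value", "currency"],
)

# Placeholder credentials -- use your own System User token and Pixel ID.
(
    events.write.format("meta_capi")
    .option("access_token", "your-access-token")
    .option("pixel_id", "your-pixel-id")
    .save()
)
```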


📚 **[See detailed examples for all data sources →](docs/data-sources-guide.md)**

3 changes: 1 addition & 2 deletions docs/data-sources-guide.md
@@ -494,5 +494,4 @@ df = spark.read.format("fake") \
- Use partitioning for large datasets
- Consider sampling: `df.sample(0.1)`
- Increase Spark executor memory

For more help, see the [Development Guide](../contributing/DEVELOPMENT.md) or open an issue on GitHub.
7 changes: 7 additions & 0 deletions docs/datasources/meta_capi.md
@@ -0,0 +1,7 @@
# MetaCapiDataSource

> Requires the `requests` library. Install it directly with `pip install requests`,
> or install the package with `pip install pyspark-data-sources`.

::: pyspark_datasources.meta_capi.MetaCapiDataSource
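
A minimal streaming sketch, adapted from `examples/meta_capi_example.py` in this repository; the `access_token`, `pixel_id`, `test_event_code`, and `checkpointLocation` options are taken from that script, and the credential values are placeholders:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark_datasources import MetaCapiDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(MetaCapiDataSource)

# Simulate one purchase event per second with the built-in rate source.
events = (
    spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    .select(
        lit("Purchase").alias("event_name"),
        col("timestamp").alias("event_time"),
        lit("test@example.com").alias("email"),
        lit("USD").alias("currency"),
        (col("value") * 10.0).alias("value"),
    )
)

query = (
    events.writeStream.format("meta_capi")
    .option("access_token", "your-access-token")  # placeholder
    .option("pixel_id", "your-pixel-id")  # placeholder
    .option("test_event_code", "TEST12345")  # shows up in Events Manager "Test Events"
    .option("checkpointLocation", "/tmp/meta_capi_checkpoint")
    .start()
)
```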

3 changes: 2 additions & 1 deletion docs/index.md
@@ -42,4 +42,5 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None |
| [RobinhoodDataSource](./datasources/robinhood.md) | `robinhood` | Read cryptocurrency market data from Robinhood API | `pynacl` |
| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects | `simple-salesforce` |
| [MetaCapiDataSource](./datasources/meta_capi.md) | `meta_capi` | Write event data to Meta Conversions API | None |
156 changes: 156 additions & 0 deletions examples/meta_capi_example.py
@@ -0,0 +1,156 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Meta Conversions API (CAPI) Datasource Example

This example demonstrates how to use the MetaCapiDataSource as a datasource
to write event data to Meta for ad optimization.

Requirements:
- PySpark
- requests
- Valid Meta System User Access Token and Pixel ID

Setup:
pip install pyspark requests

Environment Variables:
export META_ACCESS_TOKEN="your-access-token"
export META_PIXEL_ID="your-pixel-id"
"""

import os
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit


def check_credentials():
    """Check if Meta credentials are available."""
    token = os.getenv("META_ACCESS_TOKEN")
    pixel_id = os.getenv("META_PIXEL_ID")

    if not all([token, pixel_id]):
        print("❌ Missing Meta credentials!")
        print("Please set the following environment variables:")
        print("  export META_ACCESS_TOKEN='your-access-token'")
        print("  export META_PIXEL_ID='your-pixel-id'")
        return False, None, None

    print(f"✅ Using Pixel ID: {pixel_id}")
    return True, token, pixel_id


def example_1_rate_source_to_capi():
    """Example 1: Stream simulated purchases to Meta CAPI."""
    print("\n" + "=" * 60)
    print("EXAMPLE 1: Simulated Purchases → Meta CAPI (Streaming)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample1").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource

        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create streaming data (simulating 1 purchase per second)
        streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

        # Transform to CAPI format (flat mode).
        # We simulate user data here; in production it comes from your tables.
        events_df = streaming_df.select(
            lit("Purchase").alias("event_name"),
            col("timestamp").alias("event_time"),
            lit("test@example.com").alias("email"),  # Will be auto-hashed
            lit("website").alias("action_source"),
            (col("value") * 10.0 + 5.0).alias("value"),
            lit("USD").alias("currency"),
            lit("TEST12345").alias("test_event_code"),  # For testing in Events Manager
        )

        print("📊 Starting streaming write to Meta CAPI...")
        print("   Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI
        query = (
            events_df.writeStream.format("meta_capi")
            .option("access_token", token)
            .option("pixel_id", pixel_id)
            .option("test_event_code", "TEST12345")  # Optional: direct test code option
            .option("batch_size", "10")
            .option("checkpointLocation", "/tmp/meta_capi_example1_checkpoint")
            .trigger(processingTime="10 seconds")
            .start()
        )

        # Run for 30 seconds, then stop
        time.sleep(30)
        query.stop()
        print("✅ Streaming stopped")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()


def example_2_batch_dataframe_to_capi():
    """Example 2: Batch write a static DataFrame to Meta CAPI."""
    print("\n" + "=" * 60)
    print("EXAMPLE 2: Static DataFrame → Meta CAPI (Batch)")
    print("=" * 60)

    has_creds, token, pixel_id = check_credentials()
    if not has_creds:
        return

    spark = SparkSession.builder.appName("MetaCapiExample2").getOrCreate()

    try:
        from pyspark_datasources.meta_capi import MetaCapiDataSource

        spark.dataSource.register(MetaCapiDataSource)
        print("✅ Meta CAPI datasource registered")

        # Create sample data
        data = [
            ("Purchase", 1700000001, "user1@example.com", 120.50, "USD"),
            ("Purchase", 1700000002, "user2@example.com", 85.00, "USD"),
            ("AddToCart", 1700000003, "user3@example.com", 25.99, "USD"),
        ]
        columns = ["event_name", "event_time", "email", "value", "currency"]
        df = spark.createDataFrame(data, columns)

        # Add optional fields
        df = (
            df.withColumn("action_source", lit("website"))
            .withColumn("test_event_code", lit("TEST12345"))
        )

        print(f"📊 Writing {df.count()} records to Meta CAPI in batch mode...")
        print("   Check your Events Manager 'Test Events' tab!")

        # Write to Meta CAPI (batch)
        (
            df.write.format("meta_capi")
            .option("access_token", token)
            .option("pixel_id", pixel_id)
            .option("test_event_code", "TEST12345")
            .option("batch_size", "50")
            .save()
        )

        print("✅ Batch write completed")

    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        spark.stop()


def main():
    print("🚀 Meta CAPI Datasource Example")
    example_1_rate_source_to_capi()
    example_2_batch_dataframe_to_capi()


if __name__ == "__main__":
    main()
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -37,6 +37,8 @@ nav:
- datasources/lance.md
- datasources/opensky.md
- datasources/weather.md
- datasources/meta_capi.md


markdown_extensions:
- pymdownx.highlight:
1 change: 1 addition & 0 deletions pyspark_datasources/__init__.py
@@ -10,3 +10,4 @@
from .simplejson import SimpleJsonDataSource
from .stock import StockDataSource
from .jsonplaceholder import JSONPlaceholderDataSource
from .meta_capi import MetaCapiDataSource