From fbc7664324625068bc4453d2abb8f7a955cd5467 Mon Sep 17 00:00:00 2001 From: Chanukya Pekala Date: Tue, 22 Jul 2025 07:42:20 +0300 Subject: [PATCH 1/6] Jsonplaceholder data source addition --- docs/datasources/jsonplaceholder.md | 3 + docs/index.md | 1 + mkdocs.yml | 1 + pyspark_datasources/__init__.py | 1 + pyspark_datasources/jsonplaceholder.py | 201 +++++++++++++++++++++++++ tests/test_data_sources.py | 15 +- 6 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 docs/datasources/jsonplaceholder.md create mode 100644 pyspark_datasources/jsonplaceholder.py diff --git a/docs/datasources/jsonplaceholder.md b/docs/datasources/jsonplaceholder.md new file mode 100644 index 0000000..a175dd9 --- /dev/null +++ b/docs/datasources/jsonplaceholder.md @@ -0,0 +1,3 @@ +# JSONPlaceholderDataSource + +::: pyspark_datasources.jsonplaceholder.JSONPlaceholderDataSource \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index bec6116..5ad350d 100644 --- a/docs/index.md +++ b/docs/index.md @@ -41,3 +41,4 @@ spark.readStream.format("fake").load().writeStream.format("console").start() | [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | | [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None | | [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` | +| [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None | diff --git a/mkdocs.yml b/mkdocs.yml index 8fde0b5..16b3ceb 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -26,6 +26,7 @@ nav: - datasources/salesforce.md - datasources/googlesheets.md - datasources/kaggle.md + - datasources/jsonplaceholder.md markdown_extensions: - pymdownx.highlight: diff --git a/pyspark_datasources/__init__.py b/pyspark_datasources/__init__.py index 3076200..e1d1f18 100644 --- a/pyspark_datasources/__init__.py +++ b/pyspark_datasources/__init__.py @@ -8,3 +8,4 @@ from .salesforce import SalesforceDataSource from .simplejson import SimpleJsonDataSource from .stock import StockDataSource +from .jsonplaceholder import JSONPlaceholderDataSource diff --git a/pyspark_datasources/jsonplaceholder.py b/pyspark_datasources/jsonplaceholder.py new file mode 100644 index 0000000..9605be1 --- /dev/null +++ b/pyspark_datasources/jsonplaceholder.py @@ -0,0 +1,201 @@ +# pyspark_datasources/jsonplaceholder.py + +from typing import Dict, Any, List, Iterator +import requests +from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition +from pyspark.sql.types import StructType +from pyspark.sql import Row + + +class JSONPlaceholderDataSource(DataSource): + """ + A PySpark data source for JSONPlaceholder API. + + JSONPlaceholder is a free fake REST API for testing and prototyping. + This data source provides access to posts, users, todos, comments, albums, and photos. 
+ + Supported endpoints: + - posts: Blog posts with userId, id, title, body + - users: User profiles with complete information + - todos: Todo items with userId, id, title, completed + - comments: Comments with postId, id, name, email, body + - albums: Albums with userId, id, title + - photos: Photos with albumId, id, title, url, thumbnailUrl + + Name: `jsonplaceholder` + + Examples + -------- + Register the data source: + + >>> spark.dataSource.register(JSONPlaceholderDataSource) + + Read posts (default): + + >>> spark.read.format("jsonplaceholder").load().show() + + Read users: + + >>> spark.read.format("jsonplaceholder").option("endpoint", "users").load().show() + + Read with limit: + + >>> spark.read.format("jsonplaceholder").option("endpoint", "todos").option("limit", "5").load().show() + + Read specific item: + + >>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show() + """ + + @classmethod + def name(cls) -> str: + return "jsonplaceholder" + + def __init__(self, options=None): + self.options = options or {} + + def schema(self) -> str: + endpoint = self.options.get("endpoint", "posts") + + if endpoint == "posts": + return "userId INT, id INT, title STRING, body STRING" + elif endpoint == "users": + return ("id INT, name STRING, username STRING, email STRING, phone STRING, " + "website STRING, address_street STRING, address_suite STRING, " + "address_city STRING, address_zipcode STRING, address_geo_lat STRING, " + "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, " + "company_bs STRING") + elif endpoint == "todos": + return "userId INT, id INT, title STRING, completed BOOLEAN" + elif endpoint == "comments": + return "postId INT, id INT, name STRING, email STRING, body STRING" + elif endpoint == "albums": + return "userId INT, id INT, title STRING" + elif endpoint == "photos": + return "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING" + else: + return "userId INT, id INT, title STRING, body STRING" + + def reader(self, schema: StructType) -> DataSourceReader: + return JSONPlaceholderReader(self.options) + + +class JSONPlaceholderReader(DataSourceReader): + """Reader implementation for JSONPlaceholder API""" + + def __init__(self, options: Dict[str, str]): + self.options = options + self.base_url = "https://jsonplaceholder.typicode.com" + + self.endpoint = self.options.get("endpoint", "posts") + self.limit = self.options.get("limit") + self.id = self.options.get("id") + + def partitions(self) -> List[InputPartition]: + return [InputPartition(0)] + + def read(self, partition: InputPartition) -> Iterator[Row]: + url = f"{self.base_url}/{self.endpoint}" + + if self.id: + url += f"/{self.id}" + + params = {} + if self.limit and not self.id: + params["_limit"] = self.limit + + try: + response = requests.get(url, params=params, timeout=30) + response.raise_for_status() + + data = response.json() + + if isinstance(data, dict): + data = [data] + elif not isinstance(data, list): + data = [] + + processed_data = [] + for item in data: + processed_item = self._process_item(item) + processed_data.append(processed_item) + + return iter(processed_data) + + except Exception: + return iter([]) + + def _process_item(self, item: Dict[str, Any]) -> Row: + """Process individual items based on endpoint type""" + + if self.endpoint == "posts": + return Row( + userId=item.get("userId"), + id=item.get("id"), + title=item.get("title", ""), + body=item.get("body", "") + ) + + elif self.endpoint == "users": + address = 
item.get("address", {}) + geo = address.get("geo", {}) + company = item.get("company", {}) + + return Row( + id=item.get("id"), + name=item.get("name", ""), + username=item.get("username", ""), + email=item.get("email", ""), + phone=item.get("phone", ""), + website=item.get("website", ""), + address_street=address.get("street", ""), + address_suite=address.get("suite", ""), + address_city=address.get("city", ""), + address_zipcode=address.get("zipcode", ""), + address_geo_lat=geo.get("lat", ""), + address_geo_lng=geo.get("lng", ""), + company_name=company.get("name", ""), + company_catchPhrase=company.get("catchPhrase", ""), + company_bs=company.get("bs", "") + ) + + elif self.endpoint == "todos": + return Row( + userId=item.get("userId"), + id=item.get("id"), + title=item.get("title", ""), + completed=item.get("completed", False) + ) + + elif self.endpoint == "comments": + return Row( + postId=item.get("postId"), + id=item.get("id"), + name=item.get("name", ""), + email=item.get("email", ""), + body=item.get("body", "") + ) + + elif self.endpoint == "albums": + return Row( + userId=item.get("userId"), + id=item.get("id"), + title=item.get("title", "") + ) + + elif self.endpoint == "photos": + return Row( + albumId=item.get("albumId"), + id=item.get("id"), + title=item.get("title", ""), + url=item.get("url", ""), + thumbnailUrl=item.get("thumbnailUrl", "") + ) + + else: + return Row( + userId=item.get("userId"), + id=item.get("id"), + title=item.get("title", ""), + body=item.get("body", "") + ) \ No newline at end of file diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index 69924f4..f4359e2 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -72,7 +72,6 @@ def test_opensky_datasource_stream(spark): assert len(result.columns) == 18 # Check schema has expected number of fields assert result.count() > 0 # Verify we got some data - def test_salesforce_datasource_registration(spark): """Test that Salesforce DataSource can be registered and validates required options.""" spark.dataSource.register(SalesforceDataSource) @@ -176,3 +175,17 @@ def test_arrow_datasource_multiple_files(spark): rows = df.collect() names = {row["name"] for row in rows} assert names == {"Alice", "Bob", "Charlie", "Diana"} + +def test_jsonplaceholder_posts(): + from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource + spark.dataSource.register(JSONPlaceholderDataSource) + posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load() + assert posts_df.count() > 0 # Ensure we have some posts + + +def test_jsonplaceholder_users(): + from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource + spark.dataSource.register(JSONPlaceholderDataSource) + users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load() + assert users_df.count() > 0 # Ensure we have some users + From 564a729bd21e5d326dbea62b92c63545c4da5f67 Mon Sep 17 00:00:00 2001 From: Chanukya Pekala Date: Tue, 22 Jul 2025 08:03:15 +0300 Subject: [PATCH 2/6] Add more tests and update docstring --- pyspark_datasources/jsonplaceholder.py | 25 +++++++++++++++++++++++-- tests/test_data_sources.py | 6 ++++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/pyspark_datasources/jsonplaceholder.py b/pyspark_datasources/jsonplaceholder.py index 9605be1..ffb78d8 100644 --- a/pyspark_datasources/jsonplaceholder.py +++ b/pyspark_datasources/jsonplaceholder.py @@ -1,5 +1,3 @@ -# pyspark_datasources/jsonplaceholder.py - from typing import Dict, Any, 
List, Iterator import requests from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition @@ -45,6 +43,29 @@ class JSONPlaceholderDataSource(DataSource): Read specific item: >>> spark.read.format("jsonplaceholder").option("endpoint", "posts").option("id", "1").load().show() + + Referential Integrity + ------------------- + The data source supports joining related datasets: + + 1. Posts and Users relationship: + posts.userId = users.id + >>> posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load() + >>> users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load() + >>> posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id) + + 2. Posts and Comments relationship: + comments.postId = posts.id + >>> comments_df = spark.read.format("jsonplaceholder").option("endpoint", "comments").load() + >>> posts_with_comments = posts_df.join(comments_df, posts_df.id == comments_df.postId) + + 3. Users, Albums and Photos relationship: + albums.userId = users.id + photos.albumId = albums.id + >>> albums_df = spark.read.format("jsonplaceholder").option("endpoint", "albums").load() + >>> photos_df = spark.read.format("jsonplaceholder").option("endpoint", "photos").load() + >>> user_albums = users_df.join(albums_df, users_df.id == albums_df.userId) + >>> user_photos = user_albums.join(photos_df, albums_df.id == photos_df.albumId) """ @classmethod diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index f4359e2..13a1a6f 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -183,9 +183,11 @@ def test_jsonplaceholder_posts(): assert posts_df.count() > 0 # Ensure we have some posts -def test_jsonplaceholder_users(): +def test_jsonplaceholder_referential_integrity(): from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource spark.dataSource.register(JSONPlaceholderDataSource) users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load() assert users_df.count() > 0 # Ensure we have some users - + posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load() + posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id) + assert posts_with_authors.count() > 0 # Ensure join is valid and we have posts with authors From 494bab1333f5b755cfcf04f7ab592b85cc739888 Mon Sep 17 00:00:00 2001 From: Chanukya Pekala Date: Wed, 23 Jul 2025 07:21:34 +0300 Subject: [PATCH 3/6] PR review fixes --- pyspark_datasources/jsonplaceholder.py | 81 +++++++++++++------------- tests/test_data_sources.py | 2 - 2 files changed, 42 insertions(+), 41 deletions(-) diff --git a/pyspark_datasources/jsonplaceholder.py b/pyspark_datasources/jsonplaceholder.py index ffb78d8..6ce104b 100644 --- a/pyspark_datasources/jsonplaceholder.py +++ b/pyspark_datasources/jsonplaceholder.py @@ -1,5 +1,6 @@ from typing import Dict, Any, List, Iterator import requests +import logging from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition from pyspark.sql.types import StructType from pyspark.sql import Row @@ -76,26 +77,22 @@ def __init__(self, options=None): self.options = options or {} def schema(self) -> str: - endpoint = self.options.get("endpoint", "posts") + """ Returns the schema for the selected endpoint.""" + schemas = { + "posts": "userId INT, id INT, title STRING, body STRING", + "users": ("id INT, name STRING, username STRING, email STRING, phone STRING, " + "website STRING, address_street STRING, address_suite STRING, " + 
"address_city STRING, address_zipcode STRING, address_geo_lat STRING, " + "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, " + "company_bs STRING"), + "todos": "userId INT, id INT, title STRING, completed BOOLEAN", + "comments": "postId INT, id INT, name STRING, email STRING, body STRING", + "albums": "userId INT, id INT, title STRING", + "photos": "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING" + } - if endpoint == "posts": - return "userId INT, id INT, title STRING, body STRING" - elif endpoint == "users": - return ("id INT, name STRING, username STRING, email STRING, phone STRING, " - "website STRING, address_street STRING, address_suite STRING, " - "address_city STRING, address_zipcode STRING, address_geo_lat STRING, " - "address_geo_lng STRING, company_name STRING, company_catchPhrase STRING, " - "company_bs STRING") - elif endpoint == "todos": - return "userId INT, id INT, title STRING, completed BOOLEAN" - elif endpoint == "comments": - return "postId INT, id INT, name STRING, email STRING, body STRING" - elif endpoint == "albums": - return "userId INT, id INT, title STRING" - elif endpoint == "photos": - return "albumId INT, id INT, title STRING, url STRING, thumbnailUrl STRING" - else: - return "userId INT, id INT, title STRING, body STRING" + endpoint = self.options.get("endpoint", "posts") + return schemas.get(endpoint, schemas["posts"]) def reader(self, schema: StructType) -> DataSourceReader: return JSONPlaceholderReader(self.options) @@ -136,20 +133,22 @@ def read(self, partition: InputPartition) -> Iterator[Row]: elif not isinstance(data, list): data = [] - processed_data = [] - for item in data: - processed_item = self._process_item(item) - processed_data.append(processed_item) - - return iter(processed_data) + return iter([self._process_item(item) for item in data]) - except Exception: + except requests.RequestException as e: + logging.warning(f"Failed to fetch data from {url}: {e}") + return iter([]) + except ValueError as e: + logging.warning(f"Failed to parse JSON from {url}: {e}") + return iter([]) + except Exception as e: + logging.error(f"Unexpected error while reading data: {e}") return iter([]) def _process_item(self, item: Dict[str, Any]) -> Row: """Process individual items based on endpoint type""" - if self.endpoint == "posts": + def _process_posts(item): return Row( userId=item.get("userId"), id=item.get("id"), @@ -157,7 +156,7 @@ def _process_item(self, item: Dict[str, Any]) -> Row: body=item.get("body", "") ) - elif self.endpoint == "users": + def _process_users(item): address = item.get("address", {}) geo = address.get("geo", {}) company = item.get("company", {}) @@ -180,7 +179,7 @@ def _process_item(self, item: Dict[str, Any]) -> Row: company_bs=company.get("bs", "") ) - elif self.endpoint == "todos": + def _process_todos(item): return Row( userId=item.get("userId"), id=item.get("id"), @@ -188,7 +187,7 @@ def _process_item(self, item: Dict[str, Any]) -> Row: completed=item.get("completed", False) ) - elif self.endpoint == "comments": + def _process_comments(item): return Row( postId=item.get("postId"), id=item.get("id"), @@ -197,14 +196,14 @@ def _process_item(self, item: Dict[str, Any]) -> Row: body=item.get("body", "") ) - elif self.endpoint == "albums": + def _process_albums(item): return Row( userId=item.get("userId"), id=item.get("id"), title=item.get("title", "") ) - elif self.endpoint == "photos": + def _process_photos(item): return Row( albumId=item.get("albumId"), id=item.get("id"), @@ -213,10 +212,14 
@@ def _process_item(self, item: Dict[str, Any]) -> Row: thumbnailUrl=item.get("thumbnailUrl", "") ) - else: - return Row( - userId=item.get("userId"), - id=item.get("id"), - title=item.get("title", ""), - body=item.get("body", "") - ) \ No newline at end of file + processors = { + "posts": _process_posts, + "users": _process_users, + "todos": _process_todos, + "comments": _process_comments, + "albums": _process_albums, + "photos": _process_photos + } + + processor = processors.get(self.endpoint, _process_posts) + return processor(item) \ No newline at end of file diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index 13a1a6f..147c3b2 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -177,14 +177,12 @@ def test_arrow_datasource_multiple_files(spark): assert names == {"Alice", "Bob", "Charlie", "Diana"} def test_jsonplaceholder_posts(): - from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource spark.dataSource.register(JSONPlaceholderDataSource) posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load() assert posts_df.count() > 0 # Ensure we have some posts def test_jsonplaceholder_referential_integrity(): - from pyspark_datasources.jsonplaceholder import JSONPlaceholderDataSource spark.dataSource.register(JSONPlaceholderDataSource) users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load() assert users_df.count() > 0 # Ensure we have some users From fd8e7e73bf64e367409fb4a9ddedff93fa9b7633 Mon Sep 17 00:00:00 2001 From: Chanukya Pekala Date: Sun, 3 Aug 2025 22:18:41 +0300 Subject: [PATCH 4/6] Address review feedback and final updates --- docs/index.md | 1 + poetry.lock | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index 5ad350d..9aa6888 100644 --- a/docs/index.md +++ b/docs/index.md @@ -42,3 +42,4 @@ spark.readStream.format("fake").load().writeStream.format("console").start() | [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None | | [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` | | [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None | +| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | diff --git a/poetry.lock b/poetry.lock index 5fcc9e3..56b452f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" From d0b9c1996b780659e334b0f64dfbaaf26809a5c1 Mon Sep 17 00:00:00 2001 From: Chanukya Pekala Date: Wed, 13 Aug 2025 22:45:22 +0300 Subject: [PATCH 5/6] Replace logging with print --- pyspark_datasources/jsonplaceholder.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pyspark_datasources/jsonplaceholder.py b/pyspark_datasources/jsonplaceholder.py index 6ce104b..42edf17 100644 --- a/pyspark_datasources/jsonplaceholder.py +++ b/pyspark_datasources/jsonplaceholder.py @@ -1,6 +1,5 @@ from typing import Dict, Any, List, Iterator import requests -import logging from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition from pyspark.sql.types import StructType from pyspark.sql import Row @@ -136,13 +135,13 @@ def read(self, partition: InputPartition) -> Iterator[Row]: return iter([self._process_item(item) for item in data]) except requests.RequestException as e: - logging.warning(f"Failed to fetch data from {url}: {e}") + print(f"Failed to fetch data from {url}: {e}") return iter([]) except ValueError as e: - logging.warning(f"Failed to parse JSON from {url}: {e}") + print(f"Failed to parse JSON from {url}: {e}") return iter([]) except Exception as e: - logging.error(f"Unexpected error while reading data: {e}") + print(f"Unexpected error while reading data: {e}") return iter([]) def _process_item(self, item: Dict[str, Any]) -> Row: From 18316ed19eeff168978ffaffe46bbffa5972ccce Mon Sep 17 00:00:00 2001 From: Chanukya Pekala Date: Thu, 14 Aug 2025 23:24:37 +0300 Subject: [PATCH 6/6] fix unit tests --- tests/test_data_sources.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index 147c3b2..94c382b 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -176,13 +176,13 @@ def test_arrow_datasource_multiple_files(spark): names = {row["name"] for row in rows} assert names == {"Alice", "Bob", "Charlie", "Diana"} -def test_jsonplaceholder_posts(): +def test_jsonplaceholder_posts(spark): spark.dataSource.register(JSONPlaceholderDataSource) posts_df = spark.read.format("jsonplaceholder").option("endpoint", "posts").load() assert posts_df.count() > 0 # Ensure we have some posts -def test_jsonplaceholder_referential_integrity(): +def test_jsonplaceholder_referential_integrity(spark): spark.dataSource.register(JSONPlaceholderDataSource) users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load() assert users_df.count() > 0 # Ensure we have some users
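
A minimal usage sketch, assuming this series is applied, `pyspark_datasources` is
installed, and `requests` is available at runtime (jsonplaceholder.py imports it);
the SparkSession setup below is illustrative boilerplate rather than part of the
patches, and the options and the posts/users join mirror the examples in the
JSONPlaceholderDataSource docstring.

    from pyspark.sql import SparkSession
    from pyspark_datasources import JSONPlaceholderDataSource

    spark = SparkSession.builder.appName("jsonplaceholder-demo").getOrCreate()
    spark.dataSource.register(JSONPlaceholderDataSource)

    # Read the first five posts and all users; "endpoint" and "limit" are the
    # options handled by JSONPlaceholderReader.
    posts_df = (
        spark.read.format("jsonplaceholder")
        .option("endpoint", "posts")
        .option("limit", "5")
        .load()
    )
    users_df = spark.read.format("jsonplaceholder").option("endpoint", "users").load()

    # Join on the documented relationship posts.userId == users.id.
    posts_with_authors = posts_df.join(users_df, posts_df.userId == users_df.id)
    posts_with_authors.select("title", "name", "email").show(truncate=False)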