5 changes: 4 additions & 1 deletion .gitignore
@@ -172,4 +172,7 @@ _*
results/

# remove background jobs
*nohup.out
nohup.out

# remove log
*.logs
8 changes: 8 additions & 0 deletions notebooks/0.download-data/1.download-data.ipynb
@@ -326,6 +326,14 @@
" print(\"shape: \", cfret_df.shape)\n",
" cfret_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "da89481a",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
196 changes: 181 additions & 15 deletions notebooks/0.download-data/2.preprocessing.ipynb
@@ -23,7 +23,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 2,
"id": "0387feba",
"metadata": {},
"outputs": [],
@@ -51,27 +51,31 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 3,
"id": "d0f8b798",
"metadata": {},
"outputs": [],
"source": [
"def load_and_concat_profiles(\n",
" profile_dir: str | pathlib.Path,\n",
" shared_features: Optional[list[str]] = None,\n",
" shared_contains_meta: bool = False,\n",
" specific_plates: Optional[list[pathlib.Path]] = None,\n",
") -> pl.DataFrame:\n",
" \"\"\"\n",
" Load all profile files from a directory and concatenate them into a single Polars DataFrame.\n",
" Load all profile files from a directory and concatenate them into a single Polars\n",
" DataFrame.\n",
"\n",
" Parameters\n",
" ----------\n",
" profile_dir : str or pathlib.Path\n",
" Directory containing the profile files (.parquet).\n",
" shared_features : Optional[list[str]], optional\n",
" List of shared feature names to filter the profiles. If None, all features are loaded.\n",
" List of shared feature names to filter the profiles. If None, all features are\n",
" loaded.\n",
" specific_plates : Optional[list[pathlib.Path]], optional\n",
" List of specific plate file paths to load. If None, all profiles in the directory are loaded.\n",
" List of specific plate file paths to load. If None, all profiles in the\n",
" directory are loaded.\n",
"\n",
" Returns\n",
" -------\n",
@@ -93,13 +97,24 @@
" \"All elements in specific_plates must be pathlib.Path objects\"\n",
" )\n",
"\n",
" def load_profile(file: pathlib.Path) -> pl.DataFrame:\n",
" def load_profile(profile_path: pathlib.Path) -> pl.DataFrame:\n",
" \"\"\"internal function to load a single profile file.\"\"\"\n",
" profile_df = pl.read_parquet(file)\n",
" meta_cols, _ = split_meta_and_features(profile_df)\n",
"\n",
" # load profiles\n",
" profile_df = pl.read_parquet(profile_path)\n",
"\n",
" # print shape\n",
" print(f\"Loaded profile {profile_path.name} with shape {profile_df.shape}\")\n",
"\n",
" # if provided shared feature list does not contain meta, split and select\n",
" # then get it from the profile, if it does, just select the shared features\n",
" # directly\n",
" if shared_features is not None:\n",
" # Only select metadata and shared features\n",
" return profile_df.select(meta_cols + shared_features)\n",
" if not shared_contains_meta:\n",
" meta_cols, _ = split_meta_and_features(profile_df)\n",
" return profile_df.select(meta_cols + shared_features)\n",
"\n",
" return profile_df.select(shared_features)\n",
" return profile_df\n",
"\n",
" # Use specific_plates if provided, otherwise gather all .parquet files\n",
@@ -179,7 +194,61 @@
" pl.DataFrame\n",
" DataFrame with cleaned column names\n",
" \"\"\"\n",
" return df.rename(lambda x: x.replace(prefix, \"\") if prefix in x else x)"
" return df.rename(lambda x: x.replace(prefix, \"\") if prefix in x else x)\n",
"\n",
"\n",
"def find_shared_features_across_parquets(\n",
" profile_paths: list[str | pathlib.Path],\n",
") -> list[str]:\n",
" \"\"\"\n",
" Finds the intersection of column names across multiple parquet files.\n",
"\n",
" This function returns the list of column names that are present in every provided parquet file.\n",
" The order of columns is preserved from the first file. Uses LazyFrame.collect_schema().names()\n",
" to avoid expensive full reads and the PerformanceWarning.\n",
"\n",
" Parameters\n",
" ----------\n",
" profile_paths : list of str or pathlib.Path\n",
" List of paths to parquet files.\n",
"\n",
" Returns\n",
" -------\n",
" list of str\n",
" List of shared column names present in all files, in the order from the first file.\n",
"\n",
" Raises\n",
" ------\n",
" FileNotFoundError\n",
" If no parquet files are provided or any file does not exist.\n",
" \"\"\"\n",
" if not profile_paths:\n",
" raise FileNotFoundError(\"No parquet files provided\")\n",
"\n",
" # check if they are all strings if so, convert to pathlib.Path\n",
" if all(isinstance(p, str) for p in profile_paths):\n",
" profile_paths = [pathlib.Path(p) for p in profile_paths]\n",
"\n",
" for p in profile_paths:\n",
" if not p.exists():\n",
" raise FileNotFoundError(f\"Profile file not found: {p}\")\n",
"\n",
" # set the first file columns as the initial set\n",
" first_cols = pl.scan_parquet(profile_paths[0]).collect_schema().names()\n",
" common = set(first_cols)\n",
"\n",
" # iterate through the rest of the files and find shared columns\n",
" # of the rest of the profiles\n",
" for p in profile_paths[1:]:\n",
" cols = pl.scan_parquet(p).collect_schema().names()\n",
" common &= set(cols)\n",
" if not common:\n",
" # Early exit if no shared columns remain\n",
" return []\n",
"\n",
" # Preserve first file ordering (Meta and features order)\n",
" shared_features = [c for c in first_cols if c in common]\n",
" return shared_features"
]
},
{
Expand Down Expand Up @@ -216,6 +285,9 @@
" cfret_profiles_dir / \"localhost230405150001_sc_feature_selected.parquet\"\n",
").resolve(strict=True)\n",
"\n",
"# cfret-screen profiles path\n",
"cfret_screen_profiles_path = profiles_dir / \"cfret-screen\"\n",
"\n",
"# Setting feature selection path\n",
"shared_features_config_path = (\n",
" profiles_dir / \"cpjump1\" / \"feature_selected_sc_qc_features.json\"\n",
@@ -227,6 +299,12 @@
" profiles_dir / \"mitocheck\" / \"normalized_data\"\n",
").resolve(strict=True)\n",
"\n",
"# seting cfret-screen profiles paths\n",
"cfret_screen_profiles_paths = [\n",
" path.resolve(strict=True)\n",
" for path in cfret_screen_profiles_path.glob(\"*_sc_feature_selected.parquet\")\n",
"]\n",
"\n",
"# output directories\n",
"cpjump1_output_dir = (profiles_dir / \"cpjump1\").resolve()\n",
"cpjump1_output_dir.mkdir(exist_ok=True)\n",
@@ -313,10 +391,8 @@
"\n",
"# Saving metadata and features of the concat profile into a json file\n",
"meta_features_dict = {\n",
" \"concat-profiles\": {\n",
" \"meta-features\": meta_cols,\n",
" \"shared-features\": features_cols,\n",
" }\n",
" \"metadata-features\": meta_cols,\n",
" \"morphology-features\": features_cols,\n",
"}\n",
"with open(cpjump1_output_dir / \"concat_profiles_meta_features.json\", \"w\") as f:\n",
" json.dump(meta_features_dict, f, indent=4)\n",
@@ -565,6 +641,96 @@
"# overwrite dataset with cell\n",
"cfret_profiles.select(meta_cols + features_cols).write_parquet(cfret_profiles_path)"
]
},
{
"cell_type": "markdown",
"id": "ea8f7f65",
"metadata": {},
"source": [
"## Preprocessing CFReT Screen Dataset\n",
"\n",
"This section preprocesses the CFReT Screen dataset by concatenating all plate profiles into a single unified dataframe. This represents the first batch of plates, which are technical replicates containing identical treatment conditions and dosages across all plates.\n",
"\n",
"**Dataset characteristics:**\n",
"- Each plate contains both positive (n=3) and negative (n=3) controls\n",
"- All treatment plates share the same experimental conditions\n",
"- Technical replicates is at the plate level\n",
"\n",
"**Preprocessing steps:**\n",
"\n",
"1. **Feature alignment**: Identify shared features across all CFReT Screen plates to ensure consistent feature space\n",
"2. **Profile concatenation**: Merge all plate profiles into a single comprehensive dataframe using the shared feature set\n",
"3. **Unique cell identification**: Add `Metadata_cell_id` column with unique hash values to enable precise single-cell tracking "
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "83e0411f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"total shared features in cfret-screen profiles: 494\n",
"Loaded profile localhost240927060001_sc_feature_selected.parquet with shape (12397, 652)\n",
"Loaded profile localhost240928120001_sc_feature_selected.parquet with shape (12745, 641)\n",
"Loaded profile localhost240926150001_sc_feature_selected.parquet with shape (16566, 657)\n",
"Loaded profile localhost240927120001_sc_feature_selected.parquet with shape (12902, 684)\n",
"'Metadata_cell_id' column already exists in the DataFrame. Set force=True to overwrite the existing column.\n"
]
}
],
"source": [
"# find shared features across cfret-screen profiles and load and concat them\n",
"cfret_screen_shared_features = find_shared_features_across_parquets(\n",
" cfret_screen_profiles_paths\n",
")\n",
"print(\n",
" \"total shared features in cfret-screen profiles:\", len(cfret_screen_shared_features)\n",
")\n",
"\n",
"cfret_screen_concat_profiles = load_and_concat_profiles(\n",
" profile_dir=cfret_screen_profiles_path,\n",
" shared_features=cfret_screen_shared_features,\n",
" shared_contains_meta=True,\n",
")\n",
"\n",
"# add unique cell ID as a string type\n",
"cfret_screen_concat_profiles = cfret_screen_concat_profiles.with_columns(\n",
" cfret_screen_concat_profiles.hash_rows(seed=0)\n",
" .alias(\"Metadata_cell_id\")\n",
" .cast(pl.Utf8)\n",
")\n",
"\n",
"# split the metadata and features and reorganize features in the concat profile\n",
"cfret_screen_meta_cols, cfret_screen_features_cols = split_meta_and_features(\n",
" cfret_screen_concat_profiles\n",
")\n",
"cfret_screen_concat_profiles = cfret_screen_concat_profiles.select(\n",
" cfret_screen_meta_cols + cfret_screen_features_cols\n",
")\n",
"\n",
"# save feature space config to json file\n",
"with open(cfret_profiles_dir / \"cfret_screen_feature_space_configs.json\", \"w\") as f:\n",
" json.dump(\n",
" {\n",
" \"metadata-features\": cfret_screen_meta_cols,\n",
" \"morphology-features\": cfret_screen_features_cols,\n",
" },\n",
" f,\n",
" indent=4,\n",
" )\n",
"\n",
"# add cell id hash\n",
"cfret_screen_concat_profiles = add_cell_id_hash(cfret_screen_concat_profiles)\n",
"\n",
"# save concatenated cfret-screen profiles\n",
"cfret_screen_concat_profiles.write_parquet(\n",
" cfret_screen_profiles_path / \"cfret_screen_concat_profiles.parquet\"\n",
")"
]
}
],
"metadata": {