diff --git a/.github/workflows/issue-solver.yml b/.github/workflows/issue-solver.yml new file mode 100644 index 0000000..9f4e120 --- /dev/null +++ b/.github/workflows/issue-solver.yml @@ -0,0 +1,72 @@ + +name: Entelligence-AI +permissions: + contents: read + issues: write +on: + issues: + types: [opened, edited] +jobs: + handle_issues: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Setup Node.js 20 + uses: actions/setup-node@v4 + with: + node-version: '20' + + - name: Extract Repository and Username + id: extract-repo-info + run: | + repo_name=$(basename $GITHUB_REPOSITORY) + username=$(dirname $GITHUB_REPOSITORY | cut -d'/' -f1) + echo "REPO_NAME=$repo_name" >> $GITHUB_ENV + echo "USERNAME=$username" >> $GITHUB_ENV + + - name: Sanitize Issue Body + id: sanitize-body + run: | + sanitized_body=$(echo "${{ github.event.issue.body }}" | tr -d '\r' | tr '\n' ' ') + echo "SANITIZED_BODY=${sanitized_body}" >> $GITHUB_ENV + + - name: Debug Sanitized Body + run: | + echo "Sanitized Body: ${{ env.SANITIZED_BODY }}" + + - name: Call API + id: call-api + env: + API_URL: ${{ secrets.ENTELLIGENCE_AI_ISSUE_API }} + ISSUE_TITLE: ${{ github.event.issue.title }} + ISSUE_BODY: ${{ env.SANITIZED_BODY }} + REPO_NAME: ${{ env.REPO_NAME }} + USERNAME: ${{ env.USERNAME }} + run: | + set +e + response=$(curl -s -X POST ${{env.API_URL}} \ + -H "Content-Type: application/json" \ + -d "{\"vectorDBUrl\": \"${{env.USERNAME}}&${{env.REPO_NAME}}\", \"title\": \"${{env.ISSUE_TITLE}}\", \"summary\": \"${{env.ISSUE_BODY}}\", \"repoName\": \"${{env.USERNAME}}/${{env.REPO_NAME}}\"}") + body=$(echo "$response" | sed '$d') + echo "$response" + echo "API_RESPONSE<> $GITHUB_ENV + echo $(printf "%s" "$body" | base64) >> $GITHUB_ENV + echo "EOF" >> $GITHUB_ENV + set -e + + - name: Post Comment on Issue + uses: actions/github-script@v6 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + script: | + const issueNumber = context.issue.number; + const apiResponse = Buffer.from(process.env.API_RESPONSE, 'base64').toString('utf-8'); + + github.rest.issues.createComment({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: issueNumber, + body: apiResponse + }); diff --git a/README.md b/README.md index cbc72cb..591e9fa 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,14 @@ -# Deploy your AI Application in Production +# STAPRO: AI Interpretace Laboratorních Výsledků (Boilerplate pro Produkční Nasazení) ## Overview -🚀 **New: Updated deployment to match Foundry release at Build 2025!** +Tento repozitář původně sloužil jako základní šablona (**Deploy your AI Application in Production**) pro nasazení AI Foundry účtu a projektu do izolovaného prostředí v Azure, s důrazem na Well-Architected Framework (WAF). Využívá Azure Verified Modules (AVM) a Azure Developer CLI (AZD). + +V rámci projektu **STAPRO (Interpretace Laboratorního Výsledku)** byla tato šablona rozšířena o specifickou AI aplikaci. Cílem projektu STAPRO je automatická analýza a interpretace laboratorních výsledků pomocí LLM a RAG technik. + +🚀 **Poznámka k původní šabloně: Updated deployment to match Foundry release at Build 2025!** This new update has been tested in the EastUS2 region successfully. This is a foundational solution for deploying an AI Foundry account ([Cognitive Services accountKind = 'AIServices'](https://review.learn.microsoft.com/en-us/azure/templates/microsoft.cognitiveservices/2025-04-01-preview/accounts?branch=main&pivots=deployment-language-bicep)) and project ([cognitiveServices/projects](https://review.learn.microsoft.com/en-us/azure/templates/microsoft.cognitiveservices/2025-04-01-preview/accounts/projects?branch=main&pivots=deployment-language-bicep)) into an isolated environment (vNet) within Azure. The deployed features follow Microsoft's Well-Architected Framework [WAF](https://learn.microsoft.com/en-us/azure/well-architected/) to establish an isolated infrastructure for AI Foundry, intended to assist in moving from a Proof of Concept state to a production-ready application. @@ -16,7 +20,116 @@ This repository will automate: 1. Configuring the virtual network, private end points and private link services to isolate resources connecting to the account and project in a secure way. [Secure Data Playground](https://learn.microsoft.com/en-us/azure/ai-foundry/how-to/secure-data-playground) 2. Deploying and configuring the network isolation of the Azure AI Foundry account and project sub-resource within the virtual network, and with all services configured behind private end points. - +## Projekt STAPRO: Interpretace Laboratorního Výsledku + +Tento projekt má za cíl vyvinout AI aplikaci pro automatickou analýzu a interpretaci laboratorních výsledků. Aplikace bude využívat pokročilé metody umělé inteligence, včetně velkých jazykových modelů (LLM) a techniky Retrieval-Augmented Generation (RAG), k poskytování klinicky relevantních interpretací a generování standardizovaných zpráv. + +### Klíčové Cíle Projektu STAPRO +- **Zrychlení diagnostiky:** Poskytovat automatizované a přesné interpretace. +- **Optimalizace workflow:** Automatizovat tvorbu standardizovaných laboratorních zpráv. +- **Klinická relevance:** Zajistit, aby interpretace byly užitečné pro lékaře. +- **Standardizace:** Generovat zprávy v jednotném formátu. + +### Roadmapa a Postup Projektu + +Následující roadmapa popisuje klíčové fáze a kroky implementace projektu STAPRO. + +**Fáze 1: Inicializace a Plánování** +- [x] Analýza zadání a požadavků +- [x] Průzkum existujícího boilerplate kódu +- [x] Vytvoření detailního plánu implementace +- [x] Úprava `README.md` a vytvoření této roadmapy + +**Fáze 2: Příprava Infrastruktury a Vývojového Prostředí** +- [ ] Analýza a konfigurace Bicep šablon (`infra/main.bicep`) pro potřeby STAPRO + - [ ] Identifikace potřebných Azure služeb (Azure OpenAI, AI Search, CosmosDB/SQL, atd.) + - [ ] Nastavení parametrů pro nasazení (např. `main.parameters.json` nebo interaktivně přes `azd up`) + - [ ] Ověření konfigurace Azure OpenAI pro medicínské LLM + - [ ] Ověření konfigurace Azure AI Search pro vektorové vyhledávání (RAG) +- [ ] Nasazení základní infrastruktury pomocí `azd up` + +**Fáze 3: Vývoj Jádra AI Enginu (LangChain)** +- [ ] Vytvoření adresářové struktury pro AI Engine (např. `src/ai_engine/`) +- [ ] Implementace Nástrojů (Tools): + - [ ] `LabDataNormalizerTool` (normalizace vstupních JSON dat) + - [ ] `PredictiveAnalysisTool` (placeholder/maketa) + - [ ] `RAGRetrievalTool` (načítání znalostí, integrace s vektorovou DB) +- [ ] Implementace Promptů (`ChatPromptTemplate` dle specifikace) +- [ ] Výběr a konfigurace LLM (integrace `ChatOpenAI` s Azure OpenAI) +- [ ] Sestavení LangChain Agenta/Chainu (LCEL preferováno) +- [ ] Implementace formátování výstupu (`StrOutputParser`) + +**Fáze 4: Vývoj API Vrstvy** +- [ ] Návrh a implementace API endpointu (FastAPI / Azure Functions) + - [ ] Příjem JSON dat z OpenLIMS + - [ ] Volání AI Enginu + - [ ] Vracení JSON odpovědi dle specifikace +- [ ] Základní logování a error handling + +**Fáze 5: Implementace RAG (Retrieval-Augmented Generation)** +- [ ] Příprava a zpracování znalostní báze: + - [ ] Shromáždění ukázkových klinických směrnic + - [ ] Skripty pro načtení, rozdělení textu, generování embeddingů + - [ ] Uložení do vektorové databáze (Azure AI Search) +- [ ] Integrace `RAGRetrievalTool` s vektorovou databází + +**Fáze 6: Testování a Ladění** +- [ ] Vytvoření sady testovacích vstupních JSONů +- [ ] Manuální a (volitelně) automatizované testování API a AI Enginu +- [ ] Iterativní ladění promptů, nástrojů a celého toku +- [ ] Zaměření na kvalitu interpretací a minimalizaci "halucinací" + +**Fáze 7: Nasazení a Integrace** +- [ ] Příprava konfigurace pro nasazení AI Enginu (API) na Azure (Azure Functions, App Service, ACA) +- [ ] Nasazení aplikace pomocí `azd up` (nebo jiných CI/CD pipeline) +- [ ] Nastavení konfiguračních proměnných v Azure +- [ ] Simulace integrace s OpenLIMS (testovací klientský skript) + +**Fáze 8: Dokumentace a Finalizace** +- [ ] Průběžná dokumentace kódu a architektury +- [ ] Aktualizace `README.md` s instrukcemi pro nasazení a použití +- [ ] Celkové přezkoumání a příprava k předání + +## Komponenty Projektu STAPRO + +Aplikace STAPRO se skládá z několika klíčových komponent: + +### AI Engine (LangChain) +- **Umístění kódu:** `src/ai_engine/` +- **Popis:** Jádro aplikace, implementované pomocí frameworku LangChain. Využívá model LCEL (LangChain Expression Language) pro orchestraci komplexního řetězce zpracování. +- **Hlavní součásti:** + - **LLM (Large Language Model):** Využívá `AzureChatOpenAI` pro generování textových interpretací. Konfigurace se nachází v `src/ai_engine/core/llm.py` a spoléhá na proměnné prostředí pro Azure OpenAI (endpoint, klíč, název nasazení). + - **Prompty:** Strukturované prompty (`ChatPromptTemplate`) definující roli LLM a formát vstupu/výstupu. Nachází se v `src/ai_engine/core/prompts.py`. Obsahuje dynamické vkládání dat a specifické instrukce pro různé typy požadovaných popisů. + - **Nástroje (Tools):** + - `LabDataNormalizerTool` (`src/ai_engine/tools/lab_data_normalizer.py`): Normalizuje a validuje vstupní JSON data z OpenLIMS. + - `PredictiveAnalysisTool` (`src/ai_engine/tools/predictive_analysis.py`): Placeholder pro budoucí integraci prediktivních modelů. Aktuálně vrací mockovaná data. + - `RAGRetrievalTool` (`src/ai_engine/tools/rag_retrieval.py`): Zajišťuje Retrieval-Augmented Generation. Vyhledává relevantní informace v znalostní bázi (Azure AI Search) na základě dotazu odvozeného z laboratorních dat. + - **Řetězec (Chain):** Hlavní LCEL řetězec v `src/ai_engine/core/chains.py` spojuje jednotlivé kroky: normalizace dat -> prediktivní analýza (mock) -> RAG vyhledávání -> příprava promptu -> volání LLM. +- **Vstupní bod:** `src/ai_engine/main.py` obsahuje funkci `get_lab_interpretation(raw_json_input_string)`, která přijímá JSON string a vrací textovou interpretaci nebo chybu. + +### API Vrstva (FastAPI) +- **Umístění kódu:** `src/api/` +- **Popis:** Poskytuje REST API rozhraní pro komunikaci s externími systémy (např. OpenLIMS). +- **Hlavní součásti (`src/api/main.py`):** + - **FastAPI aplikace:** Instance FastAPI. + - **Endpoint `/interpret` (POST):** + - Přijímá JSON data s laboratorními výsledky (dle Pydantic modelu `InterpretationRequest`). + - Volá AI Engine (`get_lab_interpretation`) pro zpracování dat. + - Vrací odpověď ve formátu `InterpretationResponse` (obsahuje `request_id` a buď `interpretation_text` nebo `error`). + - **Endpoint `/health` (GET):** Pro ověření stavu API. +- **Spuštění:** API server se spouští pomocí Uvicorn, např.: `uvicorn src.api.main:app --reload --host 0.0.0.0 --port 8000` (z kořenového adresáře projektu). + +### RAG Data Pipeline +- **Umístění kódu:** `src/rag_pipeline/` +- **Popis:** Sada skriptů pro přípravu a nahrání znalostní báze do vektorové databáze (Azure AI Search) pro potřeby RAG. +- **Hlavní součásti:** + - `document_loader.py`: Načítá dokumenty (např. `.txt`, `.pdf`) z adresáře `data/knowledge_base/`. + - `text_splitter.py`: Dělí načtené dokumenty na menší textové chunky. + - `embedding_generator.py`: Generuje vektorové embeddingy pro chunky pomocí Azure OpenAI embedding modelu (např. `text-embedding-ada-002`). + - `vectorstore_updater.py`: Vytváří/aktualizuje index v Azure AI Search a nahrává do něj chunky spolu s jejich embeddingy. Definuje schéma indexu včetně vektorových polí a sémantické konfigurace. + - `main_pipeline.py`: Orchestruje celý proces (načtení -> dělení -> embedding -> nahrání). +- **Spuštění:** Pipeline se spouští skriptem `src/rag_pipeline/main_pipeline.py` (např. `python -m src.rag_pipeline.main_pipeline`). Vyžaduje nastavené proměnné prostředí pro Azure OpenAI a Azure AI Search. +- **Znalostní báze:** Ukázkové dokumenty se nacházejí v `data/knowledge_base/`. ## Architecture The diagram below illustrates the capabilities included in the template. @@ -75,11 +188,123 @@ For additional documentation of the default enabled services of this solution ac 11. [Azure Log Analytics](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-analytics-overview) 12. [Azure Application Insights](https://learn.microsoft.com/en-us/azure/azure-monitor/app/app-insights-overview) -## Getting Started +## Getting Started (Projekt STAPRO) + +Tato sekce popisuje kroky pro spuštění a lokální testování aplikace STAPRO. Pro nasazení na Azure pomocí `azd` se řiďte původní dokumentací šablony (viz níže a složka `docs/`). + +### 1. Příprava Prostředí a Závislostí + +- **Klonování Repozitáře:** + ```bash + git clone + cd + ``` +- **Vytvoření Virtuálního Prostředí (doporučeno):** + ```bash + python -m venv .venv + source .venv/bin/activate # Linux/macOS + # .venv\Scripts\activate # Windows + ``` +- **Instalace Závislostí:** + Všechny potřebné Python knihovny jsou definovány v souboru `requirements.txt`. + ```bash + pip install -r requirements.txt + ``` +- **Nastavení Proměnných Prostředí (`.env` soubor):** + Vytvořte soubor `.env` v kořenovém adresáři projektu. Tento soubor bude obsahovat citlivé údaje a konfiguraci pro připojení k Azure službám. **Tento soubor by neměl být commitován do Git repozitáře!** (Ujistěte se, že je v `.gitignore`). + + Obsah souboru `.env` by měl vypadat následovně (nahraďte placeholder hodnoty `<...>` vašimi skutečnými údaji): + ```env + # Azure OpenAI Configuration + AZURE_OPENAI_ENDPOINT="https://.openai.azure.com/" + AZURE_OPENAI_API_KEY="" + AZURE_OPENAI_CHAT_DEPLOYMENT_NAME="gpt-4o" # Nebo název vašeho nasazení LLM (např. gpt-35-turbo) + AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME="textembed" # Název vašeho nasazení embedding modelu (např. text-embedding-ada-002) + AZURE_OPENAI_API_VERSION="2024-02-01" # Nebo aktuální podporovaná verze + + # Azure AI Search Configuration + AZURE_AI_SEARCH_ENDPOINT="https://.search.windows.net" + AZURE_AI_SEARCH_ADMIN_KEY="" # Admin klíč pro vytváření/aktualizaci indexů + AZURE_AI_SEARCH_QUERY_KEY="" # Query klíč (volitelný, pokud se liší od admin) + AZURE_AI_SEARCH_INDEX_NAME="staprolab-knowledgebase-index" # Výchozí název indexu + + # Původní proměnné pro AZD (pokud plánujete nasazení přes AZD) + # AZURE_LOCATION="" # např. eastus2, westeurope + # AZURE_SUBSCRIPTION_ID="" + # AZURE_ENV_NAME="staprolabai" # Příklad názvu prostředí pro AZD + # AZURE_PRINCIPAL_ID="" # Pro RBAC + # AZURE_VM_ADMIN_PASSWORD="" # Pokud používáte VM pro jumpbox + ``` + +### 2. Spuštění RAG Data Pipeline +Tato pipeline zpracuje dokumenty z `data/knowledge_base/`, vygeneruje pro ně embeddingy a nahraje je do Azure AI Search. Spouští se z kořenového adresáře projektu: +```bash +python -m src.rag_pipeline.main_pipeline +``` +- **Kontrola:** Po úspěšném dokončení ověřte v Azure Portal, že byl vytvořen/aktualizován index v Azure AI Search a že obsahuje data. + +### 3. Spuštění API Serveru (FastAPI) +API server poskytuje endpoint pro interpretaci laboratorních výsledků. Spouští se z kořenového adresáře projektu: +```bash +uvicorn src.api.main:app --reload --host 0.0.0.0 --port 8000 +``` +- Aplikace bude dostupná na `http://localhost:8000`. +- Endpoint `/docs` (např. `http://localhost:8000/docs`) poskytuje automaticky generovanou Swagger/OpenAPI dokumentaci. +- Endpoint `/health` (např. `http://localhost:8000/health`) ověří stav API. + +### 4. Testování API +Pro odeslání požadavku na API můžete použít nástroje jako Postman, Insomnia, nebo `curl`, případně Python skript s knihovnou `requests`. + +**Příklad POST požadavku na `/interpret`:** +- **URL:** `http://localhost:8000/interpret` +- **Metoda:** `POST` +- **Headers:** `Content-Type: application/json` +- **Body (raw JSON):** + ```json + { + "request_id": "TEST-001", + "evaluation_method": "NEHRAZENY_POPIS_ABNORMITA", + "patient_metadata": { + "gender": "muz", + "age": 55 + }, + "current_lab_results": [ + { + "parameter_code": "CRP", + "parameter_name": "S_CRP", + "value": "25.0", + "unit": "mg/L", + "reference_range_raw": "<5", + "interpretation_status": "HIGH" + }, + { + "parameter_code": "GLUK", + "parameter_name": "S_Glukóza", + "value": "5.0", + "unit": "mmol/L", + "reference_range_raw": "3.9-5.6", + "interpretation_status": "NORMAL" + } + ], + "dasta_text_sections": { + "doctor_description": "Pacient si stěžuje na zvýšenou teplotu a kašel." + }, + "diagnoses": [], + "anamnesis_and_medication": { + "anamnesis_text": "Hypertenze, jinak zdráv.", + "medication_text": "Prestarium Neo" + } + } + ``` + +Očekávaná odpověď bude JSON objekt obsahující textovou interpretaci. + +--- +Původní sekce "Getting Started" pro nasazení pomocí Azure Developer CLI (AZD):


-QUICK DEPLOY +QUICK DEPLOY (via AZD)

| [![Open in GitHub Codespaces](https://github.com/codespaces/badge.svg)](https://codespaces.new/microsoft/Deploy-Your-AI-Application-In-Production) | [![Open in Dev Containers](https://img.shields.io/static/v1?style=for-the-badge&label=Dev%20Containers&message=Open&color=blue&logo=visualstudiocode)](https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/microsoft/Deploy-Your-AI-Application-In-Production) | diff --git a/data/knowledge_base/smernice_crp.txt b/data/knowledge_base/smernice_crp.txt new file mode 100644 index 0000000..bcbffa8 --- /dev/null +++ b/data/knowledge_base/smernice_crp.txt @@ -0,0 +1,27 @@ +Název: Klinické doporučení pro interpretaci C-reaktivního proteinu (CRP) + +Datum vydání: 2024-01-15 +Verze: 1.1 + +Úvod: +C-reaktivní protein (CRP) je protein akutní fáze zánětu, syntetizovaný v játrech. Jeho hladina v krvi stoupá v reakci na zánět, infekci nebo tkáňové poškození. + +Referenční hodnoty: +- Dospělí: < 5 mg/L +- Novorozenci: < 10 mg/L (může být fyziologicky vyšší) + +Interpretace zvýšených hodnot: +- Mírné zvýšení (5 - 10 mg/L): Může být přítomno u mírných lokálních zánětů, virových infekcí, po fyzické námaze, v těhotenství nebo u kuřáků. Vyžaduje zvážení klinického stavu. +- Střední zvýšení (10 - 50 mg/L): Často doprovází bakteriální infekce (např. bronchitida, cystitida), aktivní autoimunitní onemocnění, nebo pooperační stavy. +- Výrazné zvýšení (50 - 200 mg/L): Typické pro závažné bakteriální infekce (např. pneumonie, pyelonefritida), rozsáhlé trauma, popáleniny, akutní infarkt myokardu. +- Extrémní zvýšení (> 200 mg/L): Může signalizovat sepsi, závažné systémové infekce nebo rozsáhlé popáleniny. Vyžaduje urgentní řešení. + +Dynamika CRP: +Hladina CRP stoupá rychle (během 6-8 hodin od začátku podnětu) a také rychle klesá po odeznění příčiny (poločas cca 19 hodin). Sledování dynamiky CRP je užitečné pro monitorování průběhu onemocnění a odpovědi na léčbu. + +Upozornění: +- Izolované mírné zvýšení CRP bez klinických příznaků by mělo být hodnoceno s opatrností. +- Chronicky mírně zvýšené CRP (hs-CRP) je spojováno se zvýšeným kardiovaskulárním rizikem. + +Doporučení: +Interpretace CRP musí vždy probíhat v kontextu klinického stavu pacienta, anamnézy a dalších laboratorních nálezů. V případě nejasností nebo výrazně zvýšených hodnot je indikováno další došetření. diff --git a/data/knowledge_base/smernice_psa.txt b/data/knowledge_base/smernice_psa.txt new file mode 100644 index 0000000..f5da1b2 --- /dev/null +++ b/data/knowledge_base/smernice_psa.txt @@ -0,0 +1,38 @@ +Název: Doporučení pro interpretaci Prostatického Specifického Antigenu (PSA) + +Datum vydání: 2023-11-20 +Verze: 1.0 + +Úvod: +Prostatický specifický antigen (PSA) je glykoprotein produkovaný epiteliálními buňkami prostaty. Jeho hladina v séru se využívá jako tumorový marker pro karcinom prostaty, ale může být zvýšena i u nezhoubných stavů. + +Referenční hodnoty a věková specifika: +Referenční rozmezí pro PSA se liší v závislosti na věku: +- Muži 40-49 let: obvykle do 2.5 µg/L (ng/mL) +- Muži 50-59 let: obvykle do 3.5 µg/L +- Muži 60-69 let: obvykle do 4.5 µg/L +- Muži >70 let: obvykle do 6.5 µg/L +Je důležité používat věkově specifické referenční meze poskytované konkrétní laboratoří. + +Příčiny zvýšené hladiny PSA: +1. Karcinom prostaty: PSA je klíčovým markerem, ale ne všechny karcinomy prostaty produkují vysoké hladiny PSA. +2. Benigní hyperplazie prostaty (BHP): Častá příčina zvýšení PSA u starších mužů. Zvětšená prostata produkuje více PSA. +3. Prostatitida (zánět prostaty): Akutní i chronický zánět může vést k výraznému zvýšení PSA. +4. Mechanické podráždění prostaty: Např. po digitálním rektálním vyšetření, cystoskopii, biopsii prostaty, katetrizaci nebo ejakulaci. Doporučuje se odběr krve na PSA před těmito výkony nebo s odstupem. +5. Ischemie nebo infarkt prostaty. + +Interpretace a doplňující faktory: +- Rychlost PSA (PSA velocity): Změna hladiny PSA v čase. Rychlý nárůst může být podezřelý i při hodnotách v referenčním rozmezí. +- Poměr volného a celkového PSA (f/t PSA ratio): Užitečné při hodnotách PSA v tzv. "šedé zóně" (např. 4-10 µg/L). Nižší poměr (<0.15 nebo <15%) zvyšuje podezření na karcinom. +- Hustota PSA (PSA density): Hladina PSA vztažená k objemu prostaty. +- Anamnéza pacienta: Rodinná zátěž karcinomem prostaty, symptomy dolních močových cest, předchozí operace prostaty, probíhající léčba. +- Léky: Některé léky (např. inhibitory 5-alfa-reduktázy jako finasterid, dutasterid) mohou snižovat hladinu PSA. + +Doporučení: +- Zvýšená hladina PSA by měla být vždy hodnocena urologem. +- Samotná hodnota PSA není diagnostická pro karcinom prostaty. Je nutné komplexní posouzení zahrnující anamnézu, fyzikální vyšetření (včetně digitálního rektálního vyšetření) a případně další zobrazovací metody (např. transrektální ultrasonografie, MRI prostaty) a biopsii prostaty. +- U pacientů s diagnostikovaným karcinomem prostaty se PSA používá k monitorování úspěšnosti léčby a detekci recidivy. +- Screening PSA u asymptomatických mužů je kontroverzní téma a měl by být diskutován individuálně s lékařem s ohledem na potenciální přínosy a rizika. + +Kumulativní hodnocení: +Při hodnocení aktuální hodnoty PSA je klíčové srovnání s předchozími hodnotami pacienta (pokud jsou dostupné) pro posouzení trendu. AI systém by měl mít přístup k historickým datům PSA pro daného pacienta, pokud je to možné. diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3d3e7b5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +langchain +langchain-openai +langchain-community +openai +fastapi +uvicorn[standard] +azure-search-documents +azure-identity +python-dotenv +pypdf +requests +tiktoken +# Pro testování PDF generace v document_loader.py, pokud by bylo odkomentováno: +# reportlab diff --git a/src/ai_engine/__init__.py b/src/ai_engine/__init__.py new file mode 100644 index 0000000..34b4448 --- /dev/null +++ b/src/ai_engine/__init__.py @@ -0,0 +1 @@ +# src/ai_engine/__init__.py diff --git a/src/ai_engine/core/__init__.py b/src/ai_engine/core/__init__.py new file mode 100644 index 0000000..d29e4b5 --- /dev/null +++ b/src/ai_engine/core/__init__.py @@ -0,0 +1 @@ +# src/ai_engine/core/__init__.py diff --git a/src/ai_engine/core/chains.py b/src/ai_engine/core/chains.py new file mode 100644 index 0000000..124b35e --- /dev/null +++ b/src/ai_engine/core/chains.py @@ -0,0 +1,256 @@ +# src/ai_engine/core/chains.py +import json +from typing import Dict, Any, Optional, Union +from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableParallel +from langchain_core.output_parsers import StrOutputParser +from ..tools.lab_data_normalizer import normalize_lab_data_func +from ..tools.predictive_analysis import run_predictive_analysis_func +from ..tools.rag_retrieval import retrieve_clinical_guidelines_func +from .prompts import interpretation_prompt, format_lab_results_for_prompt, get_specific_instructions +from .llm import get_llm + +def prepare_llm_input(processed_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Formátuje data pro vložení do hlavního interpretačního promptu LLM. + """ + normalized_data = processed_data.get("normalized_data", {}) + if isinstance(normalized_data, str): # Může se stát, pokud normalizace selže a vrátí string error + normalized_data = {"error_in_normalization": normalized_data} + + patient_metadata = normalized_data.get("patient_metadata", {}) + current_lab_results = normalized_data.get("current_lab_results", []) + dasta_sections = normalized_data.get("dasta_text_sections", {}) + diagnoses = normalized_data.get("diagnoses", []) + anamnesis = normalized_data.get("anamnesis_and_medication", {}) + + llm_input = { + "evaluation_method": normalized_data.get("evaluation_method", "Neznámá metoda vyhodnocení"), + "patient_age": patient_metadata.get("age", "Nezadáno"), + "patient_gender": patient_metadata.get("gender", "Nezadáno"), + "anamnesis_and_medication": f"Anamnéza: {anamnesis.get('anamnesis_text', 'Nezadáno')}, Medikace: {anamnesis.get('medication_text', 'Nezadáno')}", + "diagnoses": ", ".join(diagnoses) if diagnoses else "Nezadáno", + "doctor_description": dasta_sections.get("doctor_description", "Nezadáno"), + "memo_to_request": dasta_sections.get("memo_to_request", "Nezadáno"), # TODO: Zvážit i memo k metodám + "current_lab_results_formatted": format_lab_results_for_prompt(current_lab_results), + "rag_context": processed_data.get("rag_context", "Nebyly nalezeny žádné relevantní klinické směrnice."), + "predictive_outputs": json.dumps(processed_data.get("predictive_outputs", {"status": "Prediktivní analýza nebyla provedena."}), indent=2, ensure_ascii=False), + "specific_instructions": get_specific_instructions( + normalized_data.get("evaluation_method", ""), + patient_metadata, # patient_data + current_lab_results # lab_results + ) + } + return llm_input + +def generate_rag_query(processed_data: Dict[str, Any]) -> str: + """ + Generuje dotaz pro RAG na základě normalizovaných dat a prediktivních výstupů. + """ + normalized_data = processed_data.get("normalized_data", {}) + predictive_outputs = processed_data.get("predictive_outputs", {}) + + # Extrahujeme názvy parametrů s abnormálními hodnotami + abnormal_params = [ + res.get("parameter_name") + for res in normalized_data.get("current_lab_results", []) + if res.get("interpretation_status", "NORMAL").upper() != "NORMAL" and res.get("parameter_name") + ] + + # Extrahujeme identifikovaná rizika z prediktivní analýzy + identified_risks = predictive_outputs.get("identified_risks", []) + + # Sestavíme dotaz + query_parts = [] + if abnormal_params: + query_parts.append(f"Interpretace pro abnormální parametry: {', '.join(abnormal_params)}") + if identified_risks: + query_parts.append(f"Klinické informace k rizikům: {', '.join(identified_risks)}") + + # Fallback, pokud nic specifického není + if not query_parts: + # Můžeme vzít první parametr nebo obecný dotaz + first_param_name = normalized_data.get("current_lab_results", [{}])[0].get("parameter_name", "laboratorní výsledky") + query_parts.append(f"Obecné informace k {first_param_name}") + + return " ".join(query_parts) + + +# Hlavní LCEL řetězec pro AI Engine +# Používáme RunnableParallel pro souběžné (kde to dává smysl) nebo sekvenční zpracování +# s jasným předáváním dat mezi kroky. + +# Krok 1: Normalizace vstupních dat +# Vstup: {"raw_json_input": "..."} +# Výstup: {"raw_json_input": "...", "normalized_data": {...}} +chain_normalize = RunnablePassthrough.assign( + normalized_data=RunnableLambda(lambda x: normalize_lab_data_func(x["raw_json_input"])) +) + +# Krok 2: Prediktivní analýza (na základě normalizovaných dat) +# Vstup: Výstup z chain_normalize +# Výstup: {"raw_json_input": "...", "normalized_data": {...}, "predictive_outputs": {...}} +chain_predict = RunnablePassthrough.assign( + predictive_outputs=RunnableLambda(lambda x: run_predictive_analysis_func(x["normalized_data"])) +) + +# Krok 3: Generování dotazu pro RAG a samotné RAG vyhledávání +# Vstup: Výstup z chain_predict +# Výstup: {"raw_json_input": "...", "normalized_data": {...}, "predictive_outputs": {...}, "rag_query": "...", "rag_context": "..."} +chain_rag = RunnablePassthrough.assign( + rag_query=RunnableLambda(generate_rag_query), + # rag_context se přidá v dalším kroku, protože potřebuje rag_query +) +chain_rag = chain_rag.assign( + rag_context=RunnableLambda(lambda x: retrieve_clinical_guidelines_func(x["rag_query"])) +) + + +# Krok 4: Příprava finálního vstupu pro LLM a volání LLM +# Vstup: Výstup z chain_rag +# Výstup: String (finální interpretace) +chain_llm = ( + RunnableLambda(prepare_llm_input) # Připraví slovník pro prompt template + | interpretation_prompt # Vloží hodnoty do promptu + | get_llm() # Získá instanci LLM + | StrOutputParser() # Převede výstup LLM na string +) + +# Celkový AI Engine řetězec +# Tento řetězec spojuje všechny předchozí kroky. +# Pořadí je důležité: normalizace -> predikce -> RAG -> LLM +ai_engine_chain = ( + chain_normalize + | chain_predict + | chain_rag + | chain_llm +) + + +# Pro testování +if __name__ == "__main__": + sample_raw_json_input = """ + { + "request_id": "GUID_TEST_123", + "evaluation_method": "HRAZENY_POPIS_BALICEK_TEST", + "patient_metadata": { + "gender": "žena", + "age": 35, + "historical_data_access_key": "pid_hash_789" + }, + "current_lab_results": [ + { + "parameter_code": "01001", + "parameter_name": "S_CRP", + "value": "8.0", + "unit": "mg/L", + "reference_range_raw": "<5", + "interpretation_status": "HIGH", + "raw_dasta_skala": "| | |H" + }, + { + "parameter_code": "GLUC", + "parameter_name": "S_Glukóza", + "value": "6.5", + "unit": "mmol/L", + "reference_range_raw": "3.9-5.6", + "interpretation_status": "HIGH", + "raw_dasta_skala": "| | |H" + }, + { + "parameter_code": "HCG", + "parameter_name": "S_HCG", + "value": "1500", + "unit": "mIU/mL", + "reference_range_raw": "<5", + "interpretation_status": "HIGH", + "raw_dasta_skala": "HCG_HIGH" + } + ], + "dasta_text_sections": { + "doctor_description": "Pacientka přichází na preventivní prohlídku, udává únavu.", + "memo_to_request": "Prosím o komplexní zhodnocení v rámci balíčku 'Zdraví ženy'." + }, + "diagnoses": [ + "Mírná anémie v minulosti" + ], + "anamnesis_and_medication": { + "anamnesis_text": "OA: bez vážnějších onemocnění, občasné migrény. GA: 1x porod.", + "medication_text": "FA: Magnosolv při migréně." + } + } + """ + + print("--- Testování ai_engine_chain ---") + + # Pro spuštění tohoto testu je potřeba mít nastavené proměnné prostředí pro Azure OpenAI + # (AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_OPENAI_CHAT_DEPLOYMENT_NAME) + # a také nainstalované potřebné knihovny (langchain, langchain-openai, pydantic, python-dotenv). + + # Pokud nemáte nastavené Azure OpenAI, můžete testovat části chainu samostatně + # nebo mockovat LLM. Pro jednoduchost zde předpokládáme, že je LLM dostupné. + + try: + final_interpretation = ai_engine_chain.invoke({"raw_json_input": sample_raw_json_input}) + print("\n--- Finální Interpretace ---") + print(final_interpretation) + except Exception as e: + print(f"\nChyba při spouštění ai_engine_chain: {e}") + print("Ujistěte se, že máte správně nastavené proměnné prostředí pro Azure OpenAI.") + print("Např. AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_OPENAI_CHAT_DEPLOYMENT_NAME.") + + # Příklad testování dílčí části (např. normalizace a predikce) + print("\n--- Testování dílčí části (normalizace + predikce) ---") + partial_chain = chain_normalize | chain_predict + try: + intermediate_output = partial_chain.invoke({"raw_json_input": sample_raw_json_input}) + print(json.dumps(intermediate_output, indent=2, ensure_ascii=False)) + except Exception as e: + print(f"Chyba při spouštění dílčího řetězce: {e}") + + print("\n--- Testování dílčí části (normalizace + predikce + RAG) ---") + partial_chain_rag = chain_normalize | chain_predict | chain_rag + try: + intermediate_output_rag = partial_chain_rag.invoke({"raw_json_input": sample_raw_json_input}) + print(json.dumps(intermediate_output_rag, indent=2, ensure_ascii=False)) + except Exception as e: + print(f"Chyba při spouštění dílčího řetězce s RAG: {e}") + + print("\n--- Testování přípravy vstupu pro LLM ---") + # Krok 1, 2, 3 + processed_data_for_llm_prep = (chain_normalize | chain_predict | chain_rag).invoke({"raw_json_input": sample_raw_json_input}) + # Krok 4a - příprava vstupu + llm_input_data = prepare_llm_input(processed_data_for_llm_prep) + print(json.dumps(llm_input_data, indent=2, ensure_ascii=False)) + + # Test get_specific_instructions (samostatně) + print("\n--- Testování get_specific_instructions ---") + normalized_sample = normalize_lab_data_func(sample_raw_json_input) + instr = get_specific_instructions( + normalized_sample.get("evaluation_method"), + normalized_sample.get("patient_metadata"), + normalized_sample.get("current_lab_results") + ) + print(instr) + + instr_nehrazeny_normal = get_specific_instructions( + "NEHRAZENY_POPIS_NORMAL", + {"age": 30, "gender": "muz"}, + [{"parameter_name": "Cholesterol", "value": "5.0", "interpretation_status": "NORMAL"}] + ) + print(f"\nNehrazený normální:\n{instr_nehrazeny_normal}") + + instr_b1 = get_specific_instructions( + "HRAZENY_POPIS_INDIVIDUALNI_TEST", + {"age": 50, "gender": "žena"}, + [{"parameter_name": "S_Glukóza", "value": "8.0", "interpretation_status": "HIGH"}] + ) + print(f"\nHrazený B1:\n{instr_b1}") + + instr_b2_psa = get_specific_instructions( + "HRAZENY_POPIS_BALICEK_PSA_MUZ", + {"age": 65, "gender": "muz"}, + [{"parameter_name": "S_PSA", "value": "4.5", "interpretation_status": "HIGH"}] + ) + print(f"\nHrazený B2 (PSA):\n{instr_b2_psa}") + + pass diff --git a/src/ai_engine/core/llm.py b/src/ai_engine/core/llm.py new file mode 100644 index 0000000..faf4e13 --- /dev/null +++ b/src/ai_engine/core/llm.py @@ -0,0 +1,49 @@ +# src/ai_engine/core/llm.py +import os +from langchain_openai import AzureChatOpenAI +from dotenv import load_dotenv + +# Načtení proměnných prostředí (pokud používáte .env soubor lokálně) +load_dotenv() + +# Konfigurace Azure OpenAI LLM +# Tyto hodnoty by měly být bezpečně spravovány, např. pomocí Azure Key Vault v produkci +# Pro lokální vývoj mohou být v .env souboru nebo přímo nastaveny jako proměnné prostředí + +AZURE_OPENAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01") # Doporučeno použít nejnovější stabilní verzi +AZURE_OPENAI_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") # Např. https://.openai.azure.com/ +AZURE_OPENAI_API_KEY = os.getenv("AZURE_OPENAI_API_KEY") +AZURE_OPENAI_CHAT_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME") # Název vašeho nasazení modelu GPT (např. gpt-4o) + +def get_llm(temperature: float = 0.1, max_tokens: int = 2000): + """ + Vrací instanci AzureChatOpenAI nakonfigurovanou pro projekt. + """ + if not all([AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_OPENAI_CHAT_DEPLOYMENT_NAME]): + raise ValueError("Chybí jedna nebo více konfiguračních proměnných pro Azure OpenAI: " + "AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_OPENAI_CHAT_DEPLOYMENT_NAME") + + llm = AzureChatOpenAI( + azure_deployment=AZURE_OPENAI_CHAT_DEPLOYMENT_NAME, + openai_api_version=AZURE_OPENAI_API_VERSION, + azure_endpoint=AZURE_OPENAI_ENDPOINT, + api_key=AZURE_OPENAI_API_KEY, + temperature=temperature, + max_tokens=max_tokens, + # model_kwargs={"response_format": {"type": "json_object"}}, # Pokud byste chtěli JSON output, ale pro textovou interpretaci to není nutné + ) + return llm + +if __name__ == '__main__': + # Příklad použití a test připojení (vyžaduje nastavené proměnné prostředí) + try: + llm_instance = get_llm() + print("Úspěšně vytvořena instance AzureChatOpenAI.") + # Příklad jednoduchého volání (může vyžadovat prompt) + # from langchain_core.messages import HumanMessage + # response = llm_instance.invoke([HumanMessage(content="Ahoj, jak se máš?")]) + # print("Odpověď od LLM:", response.content) + except ValueError as e: + print(f"Chyba při inicializaci LLM: {e}") + except Exception as e: + print(f"Neočekávaná chyba: {e}") diff --git a/src/ai_engine/core/prompts.py b/src/ai_engine/core/prompts.py new file mode 100644 index 0000000..d12b71d --- /dev/null +++ b/src/ai_engine/core/prompts.py @@ -0,0 +1,165 @@ +# src/ai_engine/core/prompts.py +from langchain_core.prompts import ChatPromptTemplate + +# System message definuje roli a základní instrukce pro LLM +SYSTEM_MESSAGE = """Jsi zkušený lékařský diagnostik specializující se na interpretaci laboratorních výsledků. +Tvým úkolem je analyzovat poskytnutá laboratorní data, zohlednit kontext pacienta (pokud je dostupný) a relevantní klinické směrnice. +Generuj srozumitelné, fakticky správné a klinicky relevantní interpretace určené pro vložení do bloku "ME-PopisyNaVL" v lékařské zprávě. +Nikdy si nevymýšlej hodnoty ani fakta, která nejsou podložena v poskytnutých datech nebo znalostní bázi. +Neposkytuj diagnózu, ale pouze interpretaci výsledků a doporučení na základě dat a směrnic. +Dodržuj striktně formát a typ požadovaného popisu (nehrazený, hrazený individuální, hrazený balíčkový). +""" + +# Human message obsahuje dynamické části, které budou naplněny konkrétními daty +HUMAN_MESSAGE_TEMPLATE = """Analyzuj následující informace a vygeneruj laboratorní interpretaci. + +**Typ požadované interpretace:** {evaluation_method} + +**Informace o pacientovi (pokud relevantní a dostupné):** +Věk: {patient_age} +Pohlaví: {patient_gender} +Anamnéza a medikace: {anamnesis_and_medication} +Diagnózy: {diagnoses} +Poznámky lékaře: {doctor_description} +Poznámky k žádance/metodám: {memo_to_request} + +**Aktuální laboratorní výsledky:** +{current_lab_results_formatted} + +**Kontext z klinických směrnic (RAG):** +{rag_context} + +**Výstupy z prediktivních modelů (pokud relevantní):** +{predictive_outputs} + +**Specifické instrukce pro typ '{evaluation_method}':** +{specific_instructions} + +Poskytni strukturovanou a srozumitelnou interpretaci. Zaměř se na: +1. Identifikaci a hodnocení abnormálních hodnot (včetně závažnosti). +2. Vysvětlení klinického významu jednotlivých metod a jejich patologií. +3. Popis vzájemných vztahů mezi výsledky a jejich relevanci k možným onemocněním nebo stavům. +4. Zohlednění informací z dotazníku klienta (pokud jsou dostupné a relevantní). +5. Formulaci jasných doporučení pro další postup (např. konzultace s lékařem, doplňující vyšetření). +6. Pro výjimky (HCG, PSA, KO, M+S) vždy generuj popis dle specifikace, i když jsou hodnoty v normě. +7. Pro kumulativní hodnocení (HCG, PSA, hrazené popisy) zohledni historická data, pokud jsou dostupná (v této fázi simulováno nebo naznačeno v `rag_context` či `predictive_outputs`). + +Výsledná interpretace by měla být přímo použitelná pro vložení do bloku "ME-PopisyNaVL". +""" + +interpretation_prompt = ChatPromptTemplate.from_messages([ + ("system", SYSTEM_MESSAGE), + ("human", HUMAN_MESSAGE_TEMPLATE) +]) + +# Doplňkové funkce pro formátování vstupů pro prompt +def format_lab_results_for_prompt(lab_results: list[dict]) -> str: + if not lab_results: + return "Žádné laboratorní výsledky nebyly poskytnuty." + + formatted_str = "" + for result in lab_results: + line = f"- {result.get('parameter_name', 'N/A')} ({result.get('parameter_code', 'N/A')}): {result.get('value', 'N/A')} {result.get('unit', '')}" + line += f" (Ref. rozsah: {result.get('reference_range_raw', 'N/A')})" + line += f" - Interpretace: {result.get('interpretation_status', 'N/A')}" + if result.get('raw_dasta_skala'): + line += f" (DASTA škála: {result.get('raw_dasta_skala')})" + formatted_str += line + "\n" + return formatted_str + +def get_specific_instructions(evaluation_method: str, patient_data: dict, lab_results: list[dict]) -> str: + """ + Generuje specifické instrukce pro LLM na základě typu požadovaného popisu. + """ + # TODO: Rozšířit logiku pro různé typy popisů (nehrazený, B1, B2) a výjimky. + # Tato funkce by měla dynamicky sestavit text instrukcí. + # Například, pro nehrazený popis: "Generuj popis pouze v případě klinicky významné abnormity, jinak vrať 'Všechny výsledky v normě.'" + # Pro HCG: "Vždy generuj popis pro HCG, zohledni možné stavy (gravidita, nerozvíjející se gravidita)." + # Pro PSA: "Vždy generuj popis pro PSA, zohledni věk pacienta a jeho stav." + # Pro KO a M+S: "Vždy generuj popis, i když jsou výsledky v normě (např. 'Krevní obraz v normě.')." + + instructions = [] + if "NEHRAZENY" in evaluation_method.upper(): + instructions.append("Toto je NEHRAZENÝ popis. Generuj plný popis POUZE v případě záchytu klinicky významné abnormity (mimo explicitní výjimky HCG, PSA, KO, M+S). Pokud jsou všechny hodnoty (mimo výjimek) v normě, uveď pouze stručné konstatování, např. 'Všechny sledované parametry jsou v referenčním rozmezí.'") + # Detekce abnormit pro nehrazený popis + has_abnormality = any(res.get("interpretation_status") not in ["NORMAL", "N/A"] for res in lab_results if res.get("parameter_code") not in ["HCG_CODE", "PSA_CODE", "KO_GROUP_CODE", "MS_GROUP_CODE"]) # Nahradit skutečnými kódy + # if not has_abnormality and not any(code in evaluation_method for code in ["HCG", "PSA", "KO", "MS"]): # Zjednodušená kontrola + # return "NEHRAZENÝ POPIS: Všechny sledované parametry (mimo explicitních výjimek) jsou v referenčním rozmezí. Popis se negeneruje." + + # Výjimky, které se vždy popisují + # TODO: Identifikovat HCG, PSA, KO, M+S na základě parameter_code nebo evaluation_method + if "HCG" in evaluation_method.upper() or any(res.get("parameter_name", "").upper() == "HCG" for res in lab_results): + instructions.append("HCG: Vždy generuj popis – 'svědčí / nesvědčí pro graviditu', nebo 'svědčí pro nerozvíjející se graviditu'. Zohledni kumulativní hodnocení, pokud jsou dostupná historická data.") + if "PSA" in evaluation_method.upper() or any(res.get("parameter_name", "").upper() == "PSA" for res in lab_results): + instructions.append("PSA: Vždy generuj popis. Vyjádři se k výsledku vzhledem k věku pacienta a jeho stavu. Zohledni kumulativní hodnocení.") + if "KO" in evaluation_method.upper() or any("KREVNÍ OBRAZ" in res.get("parameter_name", "").upper() for res in lab_results): # Zjednodušená detekce + instructions.append("KREVNÍ OBRAZ (KO): Vždy doplň text, i když je v normě (např. 'Krevní obraz v normě.').") + if "M+S" in evaluation_method.upper() or any("MOČ + SEDIMENT" in res.get("parameter_name", "").upper() for res in lab_results): # Zjednodušená detekce + instructions.append("MOČ + SEDIMENT (M+S): Vždy doplň text, i když je v normě (např. 'Moč a sediment v normě, bez známek močové infekce.').") + + if "HRAZENY_POPIS_INDIVIDUALNI" in evaluation_method.upper(): # B1 + instructions.append("Toto je HRAZENÝ INDIVIDUÁLNÍ popis (B1):") + instructions.append("- Vždy popiš VŠECHNY metody s patologií (hodnocení, závažnost, vysvětlení, vztah).") + instructions.append("- Zohledni informace z dotazníku klienta (pokud jsou strukturovaně dostupné).") + instructions.append("- Proveď kumulativní hodnocení DB klienta (celý historický profil).") + instructions.append("- Stručně zhodnoť i metody v NORMĚ s jejich klinickým významem.") + instructions.append("- Stanov jednoznačnou 'laboratorní Diagnózu'.") + instructions.append("- Poskytni detailní doporučení.") + + if "HRAZENY_POPIS_BALICEK" in evaluation_method.upper(): # B2 + instructions.append("Toto je HRAZENÝ BALÍČKOVÝ popis (B2) - NEJPODROBNĚJŠÍ:") + instructions.append("- Vždy popiš VŠECHNY vyšetřené metody v balíčku.") + instructions.append("- U metod v referenčním rozmezí (RM) uveď nejen 'v normě', ale také např. 'nesvědčí pro…'.") + instructions.append("- U metod s patologií zahrň detailní hodnocení trendu patologie (pokud jsou data).") + instructions.append("- Detailně vysvětli metody/skupiny metod a jejich klinické vazby.") + instructions.append("- Popiš vztah patologií k uvažovaným onemocněním, vliv interferencí, preanalytické fáze, trendy.") + instructions.append("- Zohledni informace z dotazníku klienta.") + instructions.append("- Proveď kumulativní hodnocení DB klienta (včetně jiných metod v historii).") + instructions.append("- Hodnoť rizikové faktory (AS, OP, DM atd.).") + instructions.append("- Stanov 'laboratorní Diagnózu' i ve vztahu k terapii.") + instructions.append("- Poskytni VELMI PODROBNÁ doporučení (lékař, specialista, životospráva, dieta, pohyb, suplementace).") + + if not instructions: + return "Nebyly poskytnuty žádné specifické instrukce pro tento typ vyhodnocení. Postupuj podle obecných pokynů." + + return "\n".join(instructions) + +if __name__ == '__main__': + # Příklad použití + sample_patient_data = { + "gender": "muz", + "age": 45, + "historical_data_access_key": "patient_ID_hash_pro_historii" + } + sample_lab_results = [ + { "parameter_code": "01001", "parameter_name": "S_CRP", "value": "35.0", "unit": "mg/L", "reference_range_raw": "<5", "interpretation_status": "HIGH", "raw_dasta_skala": "| | ||" } + ] + sample_dasta = { + "doctor_description": "Pacient si stěžuje na únavu.", + "memo_to_request": "Prosím o zrychlené vyhodnocení.", + "memo_to_method_01001": None + } + sample_diagnoses = ["hypercholesterolemie"] + sample_anamnesis = {"anamnesis_text": "OA: Art. hypertenze", "medication_text": "FA: Ylpio"} + + formatted_results = format_lab_results_for_prompt(sample_lab_results) + specific_instr = get_specific_instructions("NEHRAZENY_POPIS_ABNORMITA", sample_patient_data, sample_lab_results) + + prompt_input = { + "evaluation_method": "NEHRAZENY_POPIS_ABNORMITA", + "patient_age": sample_patient_data.get("age"), + "patient_gender": sample_patient_data.get("gender"), + "anamnesis_and_medication": f"Anamnéza: {sample_anamnesis.get('anamnesis_text')}, Medikace: {sample_anamnesis.get('medication_text')}", + "diagnoses": ", ".join(sample_diagnoses) if sample_diagnoses else "Nezadáno", + "doctor_description": sample_dasta.get("doctor_description") or "Nezadáno", + "memo_to_request": sample_dasta.get("memo_to_request") or "Nezadáno", + "current_lab_results_formatted": formatted_results, + "rag_context": "Pro CRP > 10 mg/L u dospělých zvažte bakteriální infekci nebo jiný zánětlivý stav.", + "predictive_outputs": "Nezadáno", + "specific_instructions": specific_instr + } + + # Vygenerování promptu + # compiled_prompt = interpretation_prompt.invoke(prompt_input) + # print(compiled_prompt.to_string()) + pass diff --git a/src/ai_engine/main.py b/src/ai_engine/main.py new file mode 100644 index 0000000..0a49f8c --- /dev/null +++ b/src/ai_engine/main.py @@ -0,0 +1,109 @@ +# src/ai_engine/main.py +import json +from .core.chains import ai_engine_chain # Importujeme hlavní chain + +def get_lab_interpretation(raw_json_input_string: str) -> str: + """ + Hlavní funkce pro získání interpretace laboratorních výsledků. + Přijímá surový JSON string a vrací textovou interpretaci. + """ + if not isinstance(raw_json_input_string, str): + return json.dumps({ + "error": "Vstup musí být JSON string.", + "status_code": 400 + }) + + try: + # Ověření, zda je vstup validní JSON (i když normalizer to také dělá) + json.loads(raw_json_input_string) + except json.JSONDecodeError as e: + return json.dumps({ + "error": f"Nevalidní JSON vstup: {str(e)}", + "status_code": 400 + }) + + try: + # Spuštění hlavního LangChain řetězce + # Vstup pro řetězec je slovník s klíčem 'raw_json_input' + result_interpretation = ai_engine_chain.invoke({"raw_json_input": raw_json_input_string}) + return result_interpretation # Toto by měl být finální textový výstup od LLM + except Exception as e: + # Logování chyby by zde bylo vhodné v produkčním kódu + print(f"Došlo k chybě v AI enginu: {e}") # Základní logování na konzoli + # Vrátíme strukturovanou chybovou odpověď, kterou může API vrstva dále zpracovat + # V produkci by se neměly vracet detailní interní chyby koncovému uživateli přímo. + return json.dumps({ + "error": "Došlo k interní chybě při generování interpretace.", + "detail": str(e), # Pro debug, v produkci opatrně + "status_code": 500 + }) + +if __name__ == '__main__': + # Příklad použití z příkazové řádky nebo pro jednoduché testování + sample_input_json_string = """ + { + "request_id": "GUID_MAIN_TEST_001", + "evaluation_method": "NEHRAZENY_POPIS_ABNORMITA", + "patient_metadata": { + "gender": "muz", + "age": 52 + }, + "current_lab_results": [ + { + "parameter_code": "CHOL", + "parameter_name": "S_Cholesterol", + "value": "7.2", + "unit": "mmol/L", + "reference_range_raw": "<5.2", + "interpretation_status": "HIGH" + }, + { + "parameter_code": "TRIG", + "parameter_name": "S_Triacylglyceroly", + "value": "2.5", + "unit": "mmol/L", + "reference_range_raw": "<1.7", + "interpretation_status": "HIGH" + } + ], + "dasta_text_sections": {}, + "diagnoses": ["Esenciální hypertenze"], + "anamnesis_and_medication": { + "anamnesis_text": "Kuřák, stres v práci.", + "medication_text": "Concor COR" + } + } + """ + print("--- Volání get_lab_interpretation ---") + + # Pro spuštění tohoto testu je potřeba mít nastavené proměnné prostředí pro Azure OpenAI + # (AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_OPENAI_CHAT_DEPLOYMENT_NAME) + # a také nainstalované potřebné knihovny. + + # Načtení .env, pokud existuje (pro lokální testování, pokud spouštíte tento soubor přímo) + from dotenv import load_dotenv + load_dotenv() + + interpretation = get_lab_interpretation(sample_input_json_string) + + print("\n--- Výsledná interpretace (z main.py) ---") + # Výstup může být buď text interpretace nebo JSON s chybou + try: + # Zkusíme parsovat jako JSON, pokud je to chyba + error_data = json.loads(interpretation) + if "error" in error_data: + print(json.dumps(error_data, indent=2, ensure_ascii=False)) + else: # Pokud to není JSON chyba, je to text interpretace + print(interpretation) + except json.JSONDecodeError: + # Pokud to není validní JSON, je to přímo text interpretace + print(interpretation) + + print("\n--- Test s nevalidním JSON vstupem ---") + invalid_json = '{"key": "value", "unterminated_string: "test' + error_interpretation = get_lab_interpretation(invalid_json) + print(json.dumps(json.loads(error_interpretation), indent=2, ensure_ascii=False)) + + print("\n--- Test s None vstupem ---") + none_interpretation = get_lab_interpretation(None) + print(json.dumps(json.loads(none_interpretation), indent=2, ensure_ascii=False)) diff --git a/src/ai_engine/tools/__init__.py b/src/ai_engine/tools/__init__.py new file mode 100644 index 0000000..78fceb3 --- /dev/null +++ b/src/ai_engine/tools/__init__.py @@ -0,0 +1,13 @@ +# src/ai_engine/tools/__init__.py +from .lab_data_normalizer import LabDataNormalizerTool, normalize_lab_data_func +from .predictive_analysis import PredictiveAnalysisTool, run_predictive_analysis_func +from .rag_retrieval import RAGRetrievalTool, retrieve_clinical_guidelines_func + +__all__ = [ + "LabDataNormalizerTool", + "normalize_lab_data_func", + "PredictiveAnalysisTool", + "run_predictive_analysis_func", + "RAGRetrievalTool", + "retrieve_clinical_guidelines_func" +] diff --git a/src/ai_engine/tools/lab_data_normalizer.py b/src/ai_engine/tools/lab_data_normalizer.py new file mode 100644 index 0000000..bd9f017 --- /dev/null +++ b/src/ai_engine/tools/lab_data_normalizer.py @@ -0,0 +1,88 @@ +# src/ai_engine/tools/lab_data_normalizer.py +import json +from typing import Type, Dict, Any +from langchain_core.tools import BaseTool, tool +from pydantic import BaseModel, Field + +class LabDataNormalizerInput(BaseModel): + raw_json_data: str = Field(description="Surová laboratorní data ve formátu JSON string, jak byla přijata z OpenLIMS.") + +@tool("lab_data_normalizer_tool", args_schema=LabDataNormalizerInput, return_direct=False) +def normalize_lab_data_func(raw_json_data: str) -> Dict[str, Any]: + """ + Normalizuje a validuje surová laboratorní data z JSON vstupu. + Převede vstupní JSON string do standardizovaného Python slovníku. + V této fázi provádí základní parsování JSON. + Rozšíření mohou zahrnovat detailnější validaci struktury, kontrolu typů, + převod jednotek, doplnění referenčních rozsahů dle věku/pohlaví atd. + """ + try: + normalized_data = json.loads(raw_json_data) + except json.JSONDecodeError as e: + return {"error": f"Chyba při parsování JSON: {str(e)}", "original_data": raw_json_data} + + # Příklad jednoduché validace nebo transformace (lze rozšířit) + if not isinstance(normalized_data, dict): + return {"error": "Očekáván JSON objekt (slovník).", "parsed_data": normalized_data} + + # Můžeme přidat kontrolu povinných polí, pokud je to nutné + # např. if "request_id" not in normalized_data: + # return {"error": "Chybí povinné pole 'request_id'.", "data": normalized_data} + + # Prozatím vrací parsovaná data; v budoucnu zde bude více logiky + return normalized_data + +# Alternativní způsob definice nástroje jako třídy, pokud preferujete OOP přístup +class LabDataNormalizerTool(BaseTool): + name: str = "lab_data_normalizer_tool" + description: str = ( + "Normalizuje a validuje surová laboratorní data z JSON vstupu. " + "Převede vstupní JSON string do standardizovaného Python slovníku. " + "Použij tento nástroj jako první krok pro zpracování vstupních dat z OpenLIMS." + ) + args_schema: Type[BaseModel] = LabDataNormalizerInput + return_direct: bool = False # Agent rozhodne, co dál + + def _run(self, raw_json_data: str) -> Dict[str, Any]: + return normalize_lab_data_func(raw_json_data) + + async def _arun(self, raw_json_data: str) -> Dict[str, Any]: + # Pro asynchronní operace, pokud by byly potřeba + # V tomto případě můžeme volat synchronní verzi + return self._run(raw_json_data) + +if __name__ == '__main__': + # Příklad použití + sample_raw_json = """ + { + "request_id": "GUID12345", + "evaluation_method": "NEHRAZENY_POPIS_PSA", + "patient_metadata": { + "gender": "muz", + "age": 45 + }, + "current_lab_results": [ + { + "parameter_code": "01001", + "parameter_name": "S_CRP", + "value": "35.0", + "unit": "mg/L", + "reference_range_raw": "<5", + "interpretation_status": "HIGH" + } + ] + } + """ + + # Použití funkce přímo + result_func = normalize_lab_data_func(sample_raw_json) + print("Výsledek z funkce:", result_func) + + # Použití nástroje + tool_instance = LabDataNormalizerTool() + result_tool = tool_instance.invoke({"raw_json_data": sample_raw_json}) + print("Výsledek z nástroje:", result_tool) + + error_json = '{"key": "value", "unterminated_string: "test' + result_error = normalize_lab_data_func(error_json) + print("Výsledek chyby:", result_error) diff --git a/src/ai_engine/tools/predictive_analysis.py b/src/ai_engine/tools/predictive_analysis.py new file mode 100644 index 0000000..736de81 --- /dev/null +++ b/src/ai_engine/tools/predictive_analysis.py @@ -0,0 +1,143 @@ +# src/ai_engine/tools/predictive_analysis.py +from typing import Type, Dict, Any, List +from langchain_core.tools import BaseTool, tool +from pydantic import BaseModel, Field + +class PredictiveAnalysisInput(BaseModel): + normalized_lab_data: Dict[str, Any] = Field(description="Normalizovaná a validovaná laboratorní data ve formátu Python slovníku, výstup z LabDataNormalizerTool.") + # Mohou zde být další relevantní vstupy, např. patient_history_summary + +@tool("predictive_analysis_tool", args_schema=PredictiveAnalysisInput, return_direct=False) +def run_predictive_analysis_func(normalized_lab_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Placeholder pro nástroj spouštějící prediktivní AI modely. + V této fázi vrací pouze mockovaná (ukázková) data, protože skutečné + prediktivní modely zatím nejsou implementovány ani specifikovány. + Identifikuje potenciální patologické stavy, rizikové faktory nebo anomálie + na základě kombinace parametrů v `normalized_lab_data`. + """ + + # Příklad: Extrakce relevantních hodnot pro "predikci" + # Toto je velmi zjednodušené a pouze pro ilustraci + identified_risks: List[str] = [] + potential_diagnoses: List[str] = [] + + # Příklad jednoduché logiky na základě CRP (C-reaktivní protein) + # Skutečná logika by byla mnohem komplexnější a založená na trénovaných modelech + for result in normalized_lab_data.get("current_lab_results", []): + param_name = result.get("parameter_name", "").lower() + value_str = result.get("value", "") + interpretation = result.get("interpretation_status", "").upper() + + if "crp" in param_name: + try: + value = float(value_str) + if interpretation == "HIGH": + if value > 100: + identified_risks.append("Vysoké riziko závažné bakteriální infekce nebo rozsáhlého zánětu (na základě CRP).") + potential_diagnoses.append("Možná sepse / závažná infekce (na základě CRP).") + elif value > 10: + identified_risks.append("Zvýšené riziko zánětlivého onemocnění (na základě CRP).") + potential_diagnoses.append("Možný zánětlivý proces (na základě CRP).") + except ValueError: + # Hodnota není číslo, ignorovat pro tuto jednoduchou logiku + pass + + # Příklad pro glukózu + if "glukóza" in param_name or "glucose" in param_name: + try: + value = float(value_str) + if interpretation == "HIGH": + if value > 11.1: # Hodnota typická pro diabetes + identified_risks.append("Vysoké riziko Diabetes Mellitus (na základě glykémie).") + potential_diagnoses.append("Pravděpodobný Diabetes Mellitus (na základě glykémie).") + elif value > 7.0: # Hraniční hodnota + identified_risks.append("Zvýšené riziko poruchy glukózové tolerance nebo prediabetes (na základě glykémie).") + except ValueError: + pass + + + if not identified_risks and not potential_diagnoses: + output = { + "status": "No significant risks or conditions identified by predictive models.", + "identified_risks": [], + "potential_diagnoses_based_on_models": [], + "model_version": "mock_v0.1" + } + else: + output = { + "status": "Potential risks or conditions identified.", + "identified_risks": identified_risks, + "potential_diagnoses_based_on_models": potential_diagnoses, + "model_version": "mock_v0.1" + } + + # V reálném scénáři by zde bylo volání interního API prediktivních modelů + # nebo přímé spuštění modelů (např. scikit-learn, TensorFlow, PyTorch). + # Výstup by měl být strukturovaný, např. seznam identifikovaných rizik, + # pravděpodobnosti onemocnění, doporučení pro další testy atd. + + return output + +# Alternativní třídní implementace (pro konzistenci s předchozím, ale lze použít jen @tool) +class PredictiveAnalysisTool(BaseTool): + name: str = "predictive_analysis_tool" + description: str = ( + "Spouští (aktuálně mockované) prediktivní AI modely na normalizovaných laboratorních datech. " + "Identifikuje potenciální patologické stavy, rizikové faktory nebo anomálie. " + "Použij tento nástroj po normalizaci dat, pokud chceš získat dodatečný vhled z prediktivních modelů." + ) + args_schema: Type[BaseModel] = PredictiveAnalysisInput + return_direct: bool = False + + def _run(self, normalized_lab_data: Dict[str, Any]) -> Dict[str, Any]: + return run_predictive_analysis_func(normalized_lab_data) + + async def _arun(self, normalized_lab_data: Dict[str, Any]) -> Dict[str, Any]: + return self._run(normalized_lab_data) + +if __name__ == '__main__': + sample_data_normalized = { + "request_id": "GUID12345", + "evaluation_method": "NEHRAZENY_POPIS_PSA", + "patient_metadata": { + "gender": "muz", + "age": 45 + }, + "current_lab_results": [ + { + "parameter_code": "01001", + "parameter_name": "S_CRP", + "value": "35.0", # Středně zvýšené CRP + "unit": "mg/L", + "reference_range_raw": "<5", + "interpretation_status": "HIGH" + }, + { + "parameter_code": "GLUC", + "parameter_name": "S_Glukóza", + "value": "12.5", # Vysoká glukóza + "unit": "mmol/L", + "reference_range_raw": "3.9-5.6", + "interpretation_status": "HIGH" + } + ] + } + + # Použití funkce + predictions_func = run_predictive_analysis_func(sample_data_normalized) + print("Výstup z prediktivní analýzy (funkce):", json.dumps(predictions_func, indent=2, ensure_ascii=False)) + + # Použití nástroje + tool_instance_pred = PredictiveAnalysisTool() + predictions_tool = tool_instance_pred.invoke({"normalized_lab_data": sample_data_normalized}) + print("Výstup z prediktivní analýzy (nástroj):", json.dumps(predictions_tool, indent=2, ensure_ascii=False)) + + sample_data_normal = { + "current_lab_results": [ + {"parameter_name": "S_CRP", "value": "2.0", "interpretation_status": "NORMAL"}, + {"parameter_name": "S_Glukóza", "value": "5.0", "interpretation_status": "NORMAL"} + ] + } + predictions_normal = run_predictive_analysis_func(sample_data_normal) + print("Výstup (normální hodnoty):", json.dumps(predictions_normal, indent=2, ensure_ascii=False)) diff --git a/src/ai_engine/tools/rag_retrieval.py b/src/ai_engine/tools/rag_retrieval.py new file mode 100644 index 0000000..3080b6d --- /dev/null +++ b/src/ai_engine/tools/rag_retrieval.py @@ -0,0 +1,158 @@ +# src/ai_engine/tools/rag_retrieval.py +import os +import json +from typing import Type, Dict, Any, List +from langchain_core.tools import BaseTool, tool +from pydantic import BaseModel, Field +from dotenv import load_dotenv + +from langchain_openai import AzureOpenAIEmbeddings +from azure.core.credentials import AzureKeyCredential +from azure.search.documents import SearchClient +from azure.search.documents.models import VectorizedQuery + +# Načtení proměnných prostředí +load_dotenv() + +# Konfigurace pro Azure AI Search a Embeddings (měly by být již načteny díky load_dotenv) +AZURE_AI_SEARCH_ENDPOINT = os.getenv("AZURE_AI_SEARCH_ENDPOINT") +AZURE_AI_SEARCH_API_KEY = os.getenv("AZURE_AI_SEARCH_QUERY_KEY", os.getenv("AZURE_AI_SEARCH_ADMIN_KEY")) # Preferujeme query key, pokud je definován +AZURE_AI_SEARCH_INDEX_NAME = os.getenv("AZURE_AI_SEARCH_INDEX_NAME", "staprolab-knowledgebase-index") + +AZURE_OPENAI_API_VERSION_EMBEDDINGS = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01") +AZURE_OPENAI_ENDPOINT_EMBEDDINGS = os.getenv("AZURE_OPENAI_ENDPOINT") +AZURE_OPENAI_API_KEY_EMBEDDINGS = os.getenv("AZURE_OPENAI_API_KEY") +AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME", "textembed") + +# Globální instance pro klienty, aby se neinicializovaly při každém volání +# Toto je zjednodušení; v produkční aplikaci by se správa klientů řešila robustněji (např. dependency injection) +_search_client_instance = None +_embedding_model_instance = None + +def get_embedding_model_instance() -> AzureOpenAIEmbeddings: + global _embedding_model_instance + if _embedding_model_instance is None: + if not all([AZURE_OPENAI_ENDPOINT_EMBEDDINGS, AZURE_OPENAI_API_KEY_EMBEDDINGS, AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME]): + raise ValueError("Chybí konfigurace pro Azure OpenAI Embeddings v RAG tool.") + _embedding_model_instance = AzureOpenAIEmbeddings( + azure_deployment=AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME, + openai_api_version=AZURE_OPENAI_API_VERSION_EMBEDDINGS, + azure_endpoint=AZURE_OPENAI_ENDPOINT_EMBEDDINGS, + api_key=AZURE_OPENAI_API_KEY_EMBEDDINGS, + chunk_size=16 # Výchozí pro Langchain klienta, lze upravit + ) + return _embedding_model_instance + +def get_search_client_instance() -> SearchClient: + global _search_client_instance + if _search_client_instance is None: + if not all([AZURE_AI_SEARCH_ENDPOINT, AZURE_AI_SEARCH_API_KEY, AZURE_AI_SEARCH_INDEX_NAME]): + raise ValueError("Chybí konfigurace pro Azure AI Search v RAG tool.") + _search_client_instance = SearchClient( + endpoint=AZURE_AI_SEARCH_ENDPOINT, + index_name=AZURE_AI_SEARCH_INDEX_NAME, + credential=AzureKeyCredential(AZURE_AI_SEARCH_API_KEY) + ) + return _search_client_instance + + +class RAGInput(BaseModel): + query: str = Field(description="Dotaz nebo klíčová slova pro vyhledávání v znalostní bázi (např. název parametru, identifikované riziko).") + top_k: int = Field(default=3, description="Počet nejrelevantnějších dokumentů k vrácení.") + +@tool("rag_retrieval_tool", args_schema=RAGInput, return_direct=False) +def retrieve_clinical_guidelines_func(query: str, top_k: int = 3) -> str: + """ + Vyhledává relevantní klinické směrnice a odborné informace z vektorové databáze + (Azure AI Search) na základě zadaného dotazu. + Nejprve převede dotaz na embedding a poté provede vektorové vyhledávání. + Vrací spojený text nalezených dokumentů. + """ + try: + embedding_model = get_embedding_model_instance() + search_client = get_search_client_instance() + except ValueError as e: + print(f"Chyba inicializace klientů v RAG tool: {e}") + return f"Chyba konfigurace RAG: {e}. Zkontrolujte proměnné prostředí." + + if not query: + return "Nebyl poskytnut žádný dotaz pro RAG." + + try: + print(f"RAG Tool: Přijat dotaz: '{query}', top_k: {top_k}") + query_embedding = embedding_model.embed_query(query) + print(f"RAG Tool: Dotaz převeden na embedding (dim: {len(query_embedding)}).") + + vector_query = VectorizedQuery( + vector=query_embedding, + k_nearest_neighbors=top_k, + fields="content_vector" # Pole obsahující vektory + ) + + results = search_client.search( + search_text=None, # Můžeme kombinovat s full-text: query, ale pro čistý RAG stačí vektorové + vector_queries=[vector_query], + select=["source", "content", "start_index"], # Pole, která chceme vrátit + # query_type="semantic", # Pro sémantické reranking, pokud je index takto konfigurován + # semantic_configuration_name="my-semantic-config", + top=top_k + ) + + found_docs_content = [] + print(f"RAG Tool: Nalezeno výsledků (před zpracováním):") + for i, result in enumerate(results): + # Logování detailů každého výsledku + # print(f" Výsledek {i+1}: ID={result.get('id', 'N/A')}, Zdroj={result.get('source', 'N/A')}, Skóre={result.get('@search.score', 'N/A')}") + # print(f" Obsah (část): {result.get('content', '')[:100]}...") + + # Přidání obsahu dokumentu do seznamu + # Můžeme přidat i metadata, pokud chceme, aby LLM viděl např. zdroj + doc_info = f"Zdroj: {result.get('source', 'N/A')}" + # doc_info += f" (Pozice v dokumentu: {result.get('start_index', 'N/A')})" # Volitelné + doc_info += f"\nObsah: {result.get('content', '')}" + found_docs_content.append(doc_info) + + if not found_docs_content: + print(f"RAG Tool: Pro dotaz '{query}' nebyly nalezeny žádné relevantní dokumenty v Azure AI Search.") + return f"Pro dotaz '{query}' nebyly ve znalostní bázi nalezeny žádné specifické informace." + + print(f"RAG Tool: Počet nalezených a formátovaných dokumentů: {len(found_docs_content)}") + return "\n\n---\n\n".join(found_docs_content) # Oddělení dokumentů + + except Exception as e: + print(f"Chyba při provádění RAG vyhledávání: {e}") + import traceback + traceback.print_exc() + return f"Došlo k chybě při vyhledávání ve znalostní bázi: {str(e)}" + +class RAGRetrievalTool(BaseTool): + name: str = "rag_retrieval_tool" + description: str = ( + "Vyhledává relevantní klinické směrnice, medicínské informace a odborné články " + "ze znalostní báze na základě zadaného dotazu (např. název laboratorního testu, " + "symptom, identifikované riziko). Použij pro získání kontextu k interpretaci výsledků." + ) + args_schema: Type[BaseModel] = RAGInput + return_direct: bool = False + + def _run(self, query: str) -> str: + return retrieve_clinical_guidelines_func(query) + + async def _arun(self, query: str) -> str: + return self._run(query) + +if __name__ == '__main__': + # Použití funkce + retrieved_info_crp = retrieve_clinical_guidelines_func("S_CRP") + print(f"Info pro CRP:\n{retrieved_info_crp}\n") + + retrieved_info_psa = retrieve_clinical_guidelines_func("Prostatický specifický antigen (PSA) a jeho význam") + print(f"Info pro PSA:\n{retrieved_info_psa}\n") + + retrieved_info_unknown = retrieve_clinical_guidelines_func("NeznámýTermín") + print(f"Info pro neznámý termín:\n{retrieved_info_unknown}\n") + + # Použití nástroje + tool_instance_rag = RAGRetrievalTool() + retrieved_tool_hcg = tool_instance_rag.invoke({"query": "hCG v těhotenství"}) + print(f"Info pro hCG (nástroj):\n{retrieved_tool_hcg}\n") diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..a0cb9fc --- /dev/null +++ b/src/api/__init__.py @@ -0,0 +1 @@ +# src/api/__init__.py diff --git a/src/api/main.py b/src/api/main.py new file mode 100644 index 0000000..9ec5f2e --- /dev/null +++ b/src/api/main.py @@ -0,0 +1,213 @@ +# src/api/main.py +import json +from fastapi import FastAPI, HTTPException, Body +from pydantic import BaseModel, Field, Json +from typing import Dict, Any, Union + +# Importujeme logiku AI enginu +# Předpokládáme, že repozitář je strukturován tak, aby tento import fungoval. +# Možná bude potřeba upravit PYTHONPATH nebo strukturu projektu, pokud by nastal problém s importem. +# Pro jednoduchost předpokládám, že `src` je v PYTHONPATHu nebo FastAPI spouštíme z kořene projektu. +try: + from src.ai_engine.main import get_lab_interpretation +except ImportError: + # Fallback pro případ, že by import selhal (např. při testování mimo plný kontext projektu) + # Toto by se v produkci nemělo dít, pokud je projekt správně strukturován. + print("WARN: Nepodařilo se importovat 'get_lab_interpretation' z 'src.ai_engine.main'. Používám mock funkci.") + def get_lab_interpretation(raw_json_input_string: str) -> str: + return json.dumps({"mock_interpretation": "Toto je mockovaná odpověď, AI engine nebyl správně načten.", "input": raw_json_input_string}) + +# Pydantic modely pro vstup a výstup API + +class InterpretationRequest(BaseModel): + # Očekáváme, že tělo POST požadavku bude přímo JSON string, + # jak je specifikováno ("OpenLIMS bude ... odesílat jeden JSON objekt jako stringový atribut v těle HTTP požadavku") + # FastAPI však typicky parsuje JSON tělo automaticky do Pydantic modelu. + # Abychom přijali raw JSON string v těle, můžeme použít `Body(..., media_type="application/json")` + # nebo jednodušeji definovat model, který očekává tento string. + # Problém je, že specifikace říká "jeden JSON objekt jako stringový atribut v těle HTTP požadavku". + # To je trochu nejednoznačné. Pokud by to znamenalo {"json_payload": "stringified_json_here"}, + # pak by model byl: json_payload: str. + # Pokud to znamená, že celé tělo je ten stringified JSON, pak je to složitější s FastAPI, + # které by se ho snažilo parsovat. + # Prozatím předpokládám, že OpenLIMS pošle JSON objekt, který FastAPI zparsuje do slovníku. + # A my tento slovník převedeme zpět na JSON string pro náš AI engine, který očekává string. + # Toto je potřeba vyjasnit! + # + # AKTUALIZACE PO ÚVAZE: Zadání říká: + # "OpenLIMS bude pro každé volání AI sestavovat a odesílat jeden JSON objekt jako stringový atribut v těle HTTP požadavku." + # To zní, jako by tělo POST bylo např.: {"request_data_as_string": "{ \"request_id\": \"...\", ... }"} + # Nebo, že celé tělo je ten string, ale pak by Content-Type měl být text/plain nebo application/octet-stream, + # a FastAPI by ho muselo číst manuálně. + # + # Zvolím kompromis: API bude očekávat standardní JSON tělo (FastAPI ho zparsuje). + # Pokud OpenLIMS posílá string v atributu, bude to např. {"data": "escapovaný JSON string"} + # Pokud OpenLIMS posílá přímo JSON objekt, je to ideální. + # Náš `get_lab_interpretation` očekává JSON string, takže data z požadavku převedeme na string. + + # Pro jednoduchost nyní budeme očekávat přímo ten JSON objekt, který pak převedeme na string. + # FastAPI defaultně očekává `application/json` a parsuje ho. + # Pokud by OpenLIMS posílalo `Content-Type: text/plain` s JSON stringem, museli bychom to řešit jinak. + # Původní specifikace JSON vstupu pro AI engine: + # { + # "request_id": "GUID_unikatni_identifikator_zadanky", + # "evaluation_method": "NEHRAZENY_POPIS_PSA", + # "patient_metadata": { ... }, + # "current_lab_results": [ ... ], + # ... + # } + # Toto bude tělo požadavku. + request_id: str = Field(..., examples=["GUID_unikatni_identifikator_zadanky"]) + evaluation_method: str = Field(..., examples=["NEHRAZENY_POPIS_PSA"]) + patient_metadata: Dict[str, Any] = Field(default_factory=dict) + current_lab_results: list[Dict[str, Any]] = Field(default_factory=list) + dasta_text_sections: Optional[Dict[str, Any]] = Field(default_factory=dict) + diagnoses: Optional[list[str]] = Field(default_factory=list) + anamnesis_and_medication: Optional[Dict[str, Any]] = Field(default_factory=dict) + + # Přidáme metodu pro konverzi na JSON string, který očekává AI engine + def to_engine_input_string(self) -> str: + return self.model_dump_json() + + +class InterpretationResponse(BaseModel): + request_id: str = Field(examples=["GUID_unikatni_identifikator_zadanky"]) + interpretation_text: Optional[str] = None + error: Optional[str] = None + # Můžeme přidat další metadata, např. verze modelu, timestamp z AI enginu, pokud je poskytne + +# Inicializace FastAPI aplikace +app = FastAPI( + title="STAPRO AI Laboratorní Interpretace API", + version="0.1.0", + description="API pro automatickou interpretaci laboratorních výsledků pomocí AI." +) + +@app.post("/interpret", response_model=InterpretationResponse, tags=["Interpretace"]) +async def interpret_lab_results(request_data: InterpretationRequest): + """ + Přijme laboratorní data ve formátu JSON a vrátí textovou interpretaci. + + - **request_id**: Unikátní ID žádanky pro sledování. + - **evaluation_method**: Určuje typ interpretace/metodu AI agenta. + - **patient_metadata**: Metadata o pacientovi (pohlaví, věk, atd.). + - **current_lab_results**: Aktuální laboratorní výsledky. + - **dasta_text_sections**: Textové sekce z DASTA. + - **diagnoses**: Diagnózy z RES bloku. + - **anamnesis_and_medication**: Anamnéza a medikace. + """ + input_json_string = request_data.to_engine_input_string() + + # Volání AI enginu + ai_response_str = get_lab_interpretation(input_json_string) + + try: + # AI engine může vrátit buď přímo text interpretace (úspěch) + # nebo JSON string s chybou. + ai_response_data = json.loads(ai_response_str) + + if "error" in ai_response_data: + # Chyba z AI enginu + # Logování chyby by zde bylo vhodné + print(f"API: Chyba z AI enginu: {ai_response_data.get('error')}, Detail: {ai_response_data.get('detail')}") + raise HTTPException( + status_code=ai_response_data.get("status_code", 500), + detail=ai_response_data.get("error", "Neznámá chyba v AI enginu") + ) + + # Pokud AI engine vrátí JSON, který není chyba, ale obsahuje interpretaci + # (dle původní specifikace výstupu AI enginu) + # { "request_id": "...", "interpretation_text": "..." } + # To by znamenalo, že get_lab_interpretation vrací JSON string i v úspěšném případě. + # Aktuálně je navržen tak, že vrací přímo text interpretace. + # Pokud by se to změnilo, musela by se tato část upravit. + # Prozatím předpokládáme, že pokud to není JSON s 'error', je to text. + # Tento scénář by nastal, pokud by get_lab_interpretation vracel JSON i pro úspěch. + # Pro jednoduchost nyní předpokládám, že pokud je to validní JSON a nemá 'error', + # tak je to struktura { "request_id": "...", "interpretation_text": "..." } + # Ale náš `get_lab_interpretation` vrací přímo string interpretace. + # Takže tato větev se pravděpodobně neuplatní, pokud `ai_response_str` není chyba. + + # Tento blok by se uplatnil, pokud by AI engine vracel strukturovaný JSON i pro úspěch: + # return InterpretationResponse( + # request_id=ai_response_data.get("request_id", request_data.request_id), + # interpretation_text=ai_response_data.get("interpretation_text") + # ) + # Jelikož get_lab_interpretation vrací přímo string interpretace (pokud není chyba), + # tak tento případ nenastane. Chyba json.loads() nastane, pokud je to čistý text. + + except json.JSONDecodeError: + # Předpokládáme, že `ai_response_str` je přímo text interpretace (úspěšný případ) + return InterpretationResponse( + request_id=request_data.request_id, + interpretation_text=ai_response_str + ) + except HTTPException: + raise # Znovu vyvoláme HTTPException, aby ji FastAPI zpracovalo + except Exception as e: + # Jakákoliv jiná neočekávaná chyba + print(f"API: Neočekávaná chyba při zpracování odpovědi z AI enginu: {e}") + raise HTTPException(status_code=500, detail="Interní chyba serveru při zpracování AI odpovědi.") + + +@app.get("/health", tags=["Stav"]) +async def health_check(): + """Jednoduchý endpoint pro ověření stavu API.""" + return {"status": "ok", "message": "STAPRO AI Interpretation API je funkční."} + +# Pro spuštění FastAPI aplikace lokálně (např. pro testování): +# Použijte příkaz: uvicorn src.api.main:app --reload +# (Spusťte z kořenového adresáře projektu) + +if __name__ == "__main__": + # Tento blok se typicky nepoužívá pro FastAPI, spouští se přes uvicorn. + # Ale pro rychlý test můžeme: + # import uvicorn + # uvicorn.run(app, host="0.0.0.0", port=8000) + print("FastAPI aplikace definována. Spusťte pomocí Uvicorn, např.:") + print("uvicorn src.api.main:app --reload --host 0.0.0.0 --port 8000") + + # Příklad, jak by OpenLIMS mohlo volat API (pomocí `requests`): + """ + import requests + import json + + api_url = "http://localhost:8000/interpret" + + payload = { + "request_id": "GUID_API_TEST_002", + "evaluation_method": "NEHRAZENY_POPIS_NORMAL", + "patient_metadata": { + "gender": "žena", + "age": 28 + }, + "current_lab_results": [ + { + "parameter_code": "CRP", + "parameter_name": "S_CRP", + "value": "1.5", + "unit": "mg/L", + "reference_range_raw": "<5", + "interpretation_status": "NORMAL" + } + ] + } + + try: + response = requests.post(api_url, json=payload) + response.raise_for_status() # Vyvolá chybu pro HTTP kódy 4xx/5xx + + response_data = response.json() + print("Odpověď z API:") + print(json.dumps(response_data, indent=2, ensure_ascii=False)) + + except requests.exceptions.HTTPError as http_err: + print(f"HTTP chyba: {http_err}") + try: + print(f"Detail chyby z API: {response.json()}") + except json.JSONDecodeError: + print(f"Detail chyby z API (raw): {response.text}") + except Exception as err: + print(f"Jiná chyba: {err}") + """ + pass diff --git a/src/medila_integration_simulation/__init__.py b/src/medila_integration_simulation/__init__.py new file mode 100644 index 0000000..0134022 --- /dev/null +++ b/src/medila_integration_simulation/__init__.py @@ -0,0 +1 @@ +# src/medila_integration_simulation/__init__.py diff --git a/src/medila_integration_simulation/client_simulator.py b/src/medila_integration_simulation/client_simulator.py new file mode 100644 index 0000000..7de6fa7 --- /dev/null +++ b/src/medila_integration_simulation/client_simulator.py @@ -0,0 +1,256 @@ +# src/medila_integration_simulation/client_simulator.py +import requests +import json +import os +from dotenv import load_dotenv + +# Načtení .env pro případné API URL z proměnných prostředí, pokud není lokální +load_dotenv() + +# URL API endpointu - předpokládáme lokálně běžící API, pokud není specifikováno jinak +API_BASE_URL = os.getenv("STAPRO_API_URL", "http://localhost:8000") +INTERPRET_ENDPOINT_URL = f"{API_BASE_URL}/interpret" +HEALTH_ENDPOINT_URL = f"{API_BASE_URL}/health" + +# --- Definice ukázkových JSON payloadů --- + +payload_nehrazeny_abnormita_crp = { + "request_id": "SIM_REQ_001_CRP_HIGH", + "evaluation_method": "NEHRAZENY_POPIS_ABNORMITA", # Aktivace popisu jen při abnormitě + "patient_metadata": { + "gender": "muz", + "age": 45 + }, + "current_lab_results": [ + { + "parameter_code": "01001", # Kód pro CRP + "parameter_name": "S_CRP", + "value": "35.0", + "unit": "mg/L", + "reference_range_raw": "<5", + "interpretation_status": "HIGH", + "raw_dasta_skala": "| | ||" + }, + { + "parameter_code": "GLUC", + "parameter_name": "S_Glukóza", + "value": "5.1", + "unit": "mmol/L", + "reference_range_raw": "3.9-5.6", + "interpretation_status": "NORMAL" + } + ], + "dasta_text_sections": { + "doctor_description": "Pacient si stěžuje na horečku a bolest v krku.", + "memo_to_request": None + }, + "diagnoses": ["Probíhající respirační infekt?"], + "anamnesis_and_medication": { + "anamnesis_text": "OA: Zdráv. FA: Žádná pravidelná medikace.", + "medication_text": None + } +} + +payload_nehrazeny_vse_normal = { + "request_id": "SIM_REQ_002_ALL_NORMAL", + "evaluation_method": "NEHRAZENY_POPIS_NORMAL", # Popis se nemá generovat (mimo výjimek) + "patient_metadata": { + "gender": "žena", + "age": 30 + }, + "current_lab_results": [ + { + "parameter_code": "CHOL", + "parameter_name": "S_Cholesterol", + "value": "4.5", + "unit": "mmol/L", + "reference_range_raw": "<5.2", + "interpretation_status": "NORMAL" + }, + { + "parameter_code": "ALT", + "parameter_name": "S_ALT", + "value": "0.4", + "unit": "ukat/L", + "reference_range_raw": "<0.58", + "interpretation_status": "NORMAL" + } + ], + "dasta_text_sections": {}, "diagnoses": [], "anamnesis_and_medication": {} +} + +payload_vyjimka_psa_normal = { + "request_id": "SIM_REQ_003_PSA_NORMAL", + "evaluation_method": "NEHRAZENY_POPIS_PSA", # PSA se popisuje vždy + "patient_metadata": { + "gender": "muz", + "age": 62 + }, + "current_lab_results": [ + { + "parameter_code": "PSA01", # Kód pro PSA + "parameter_name": "S_PSA celkový", + "value": "3.1", + "unit": "ug/L", + "reference_range_raw": "Věkově specifické, např. <4.5", # Toto by mělo být přesnější + "interpretation_status": "NORMAL" # I když je v normě, má se popsat + } + ], + "dasta_text_sections": {}, "diagnoses": [], "anamnesis_and_medication": {} +} + +payload_hrazeny_individualni_b1 = { + "request_id": "SIM_REQ_004_HRAZ_B1", + "evaluation_method": "HRAZENY_POPIS_INDIVIDUALNI", # B1 + "patient_metadata": { + "gender": "žena", + "age": 58, + "historical_data_access_key": "patient_history_token_b1" + }, + "current_lab_results": [ + { + "parameter_code": "TSH", "parameter_name": "S_TSH", "value": "6.5", "unit": "mIU/L", + "reference_range_raw": "0.27-4.2", "interpretation_status": "HIGH" + }, + { + "parameter_code": "FT4", "parameter_name": "S_FT4", "value": "12.0", "unit": "pmol/L", + "reference_range_raw": "12-22", "interpretation_status": "NORMAL" + }, + { # Přidáno pro demonstraci popisu normální hodnoty + "parameter_code": "VITD", "parameter_name": "S_Vitamin D (25-OH)", "value": "75", "unit": "nmol/L", + "reference_range_raw": "50-250", "interpretation_status": "NORMAL" + } + ], + "dasta_text_sections": {"doctor_description": "Kontrola štítné žlázy, pacientka udává únavu a zimomřivost."}, + "diagnoses": ["Hypothyreosis subclinica susp."], + "anamnesis_and_medication": {"anamnesis_text": "Rodinná anamnéza onemocnění štítné žlázy."} +} + +payload_hrazeny_balicek_b2 = { + "request_id": "SIM_REQ_005_HRAZ_B2", + "evaluation_method": "HRAZENY_POPIS_BALICEK_PREVENCE_MUZ", # B2 + "patient_metadata": { + "gender": "muz", + "age": 50, + "historical_data_access_key": "patient_history_token_b2" + }, + "current_lab_results": [ + {"parameter_code": "GLUC", "parameter_name": "S_Glukóza", "value": "5.8", "unit": "mmol/L", "reference_range_raw": "3.9-5.6", "interpretation_status": "HIGH"}, + {"parameter_code": "CHOLT", "parameter_name": "S_Cholesterol celkový", "value": "6.1", "unit": "mmol/L", "reference_range_raw": "<5.2", "interpretation_status": "HIGH"}, + {"parameter_code": "HDLCH", "parameter_name": "S_HDL Cholesterol", "value": "1.1", "unit": "mmol/L", "reference_range_raw": ">1.0", "interpretation_status": "NORMAL"}, + {"parameter_code": "LDLCH", "parameter_name": "S_LDL Cholesterol", "value": "4.0", "unit": "mmol/L", "reference_range_raw": "<3.0", "interpretation_status": "HIGH"}, + {"parameter_code": "TRIG", "parameter_name": "S_Triacylglyceroly", "value": "2.1", "unit": "mmol/L", "reference_range_raw": "<1.7", "interpretation_status": "HIGH"}, + {"parameter_code": "KOCRB", "parameter_name": "B_Erytrocyty (KO)", "value": "4.9", "unit": "10^12/L", "reference_range_raw": "4.2-5.4", "interpretation_status": "NORMAL"}, + {"parameter_code": "PSA01", "parameter_name": "S_PSA celkový", "value": "1.5", "unit": "ug/L", "reference_range_raw": "<4.0", "interpretation_status": "NORMAL"} + ], + "dasta_text_sections": {"doctor_description": "Preventivní prohlídka v 50 letech."}, + "diagnoses": ["Mírná arteriální hypertenze"], + "anamnesis_and_medication": {"anamnesis_text": "Kuřák 10 cig/den. Otec IM v 60 letech.", "medication_text": "Agen 5mg"} +} + +payloads_to_test = [ + payload_nehrazeny_abnormita_crp, + payload_nehrazeny_vse_normal, + payload_vyjimka_psa_normal, + payload_hrazeny_individualni_b1, + payload_hrazeny_balicek_b2 +] + +def call_interpret_api(payload: dict) -> dict: + """ + Odešle požadavek na /interpret endpoint a vrátí odpověď jako slovník. + """ + print(f"\n--- Volání API pro Request ID: {payload.get('request_id', 'N/A')} ---") + print(f"Evaluation method: {payload.get('evaluation_method')}") + print(f"Odesílaná data (část): {json.dumps(payload, indent=2, ensure_ascii=False)[:500]}...") + + try: + response = requests.post(INTERPRET_ENDPOINT_URL, json=payload, timeout=120) # Timeout 120s pro LLM + response.raise_for_status() # Vyvolá HTTPError pro chybové status kódy (4xx, 5xx) + + response_data = response.json() + print("\nOdpověď z API (JSON):") + print(json.dumps(response_data, indent=2, ensure_ascii=False)) + return response_data + + except requests.exceptions.HTTPError as http_err: + print(f"HTTP chyba: {http_err}") + try: + error_detail = response.json() + print(f"Detail chyby z API: {json.dumps(error_detail, indent=2, ensure_ascii=False)}") + return {"error_type": "HTTPError", "status_code": response.status_code, "detail": error_detail} + except json.JSONDecodeError: + print(f"Detail chyby z API (raw text): {response.text}") + return {"error_type": "HTTPError", "status_code": response.status_code, "raw_text_detail": response.text} + + except requests.exceptions.RequestException as req_err: + print(f"Chyba spojení nebo požadavku: {req_err}") + return {"error_type": "RequestException", "message": str(req_err)} + except Exception as e: + print(f"Neočekávaná chyba při volání API: {e}") + return {"error_type": "Unknown", "message": str(e)} + +def simulate_medila_processing(api_response: dict): + """ + Simuluje, jak by Medila (OpenLIMS) mohla zpracovat odpověď z API. + """ + print("\n--- Simulace zpracování v Medile ---") + if "error_type" in api_response or api_response.get("error"): + print("API vrátilo chybu, interpretace nebude vložena.") + print(f"Detail chyby pro Medilu: {api_response}") + # Zde by Medila logovala chybu, případně upozornila uživatele + elif api_response.get("interpretation_text"): + interpretation_text = api_response["interpretation_text"] + request_id = api_response.get("request_id", "N/A") + print(f"Pro žádanku ID: {request_id}, byla přijata interpretace:") + print("----------------------------------------------------") + print(interpretation_text) + print("----------------------------------------------------") + print("Tato interpretace by se nyní vložila do bloku 'ME-PopisyNaVL' v Medile.") + # Zde by následovala logika pro uložení textu do příslušného bloku v OpenLIMS. + # Např. aktualizace databáze, zobrazení v UI pro editaci lékařem atd. + else: + print("API nevrátilo očekávaný formát interpretace.") + print(f"Přijatá data: {api_response}") + +def check_api_health(): + print("\n--- Kontrola stavu API (/health) ---") + try: + response = requests.get(HEALTH_ENDPOINT_URL, timeout=10) + response.raise_for_status() + print(f"Stav API: {response.status_code}") + print(f"Odpověď: {response.json()}") + return True + except requests.exceptions.RequestException as e: + print(f"Chyba při kontrole stavu API: {e}") + return False + +if __name__ == "__main__": + print("Simulátor klientských volání pro STAPRO AI Interpretace API") + + # Nejprve zkontrolujeme stav API + if not check_api_health(): + print("\nAPI není dostupné. Ukončuji simulaci.") + print(f"Ujistěte se, že FastAPI server běží na {API_BASE_URL}") + exit() + + for i, payload_example in enumerate(payloads_to_test): + print(f"\n\n===== Testovací scénář {i+1} =====") + api_result = call_interpret_api(payload_example) + simulate_medila_processing(api_result) + + if i < len(payloads_to_test) - 1: + # Malá pauza mezi voláními, aby se předešlo zahlcení (pokud by LLM byl pomalý) + # import time + # time.sleep(2) + pass + + print("\n\n===== Simulace dokončena =====") + + # Poznámka pro uživatele: + # Pro spuštění tohoto skriptu: + # 1. Ujistěte se, že máte nastavené proměnné prostředí v .env souboru + # (zejména pro Azure OpenAI a Azure AI Search, pokud je RAG plně integrován). + # 2. Spusťte FastAPI server: uvicorn src.api.main:app --reload --host 0.0.0.0 --port 8000 + # 3. Spusťte tento skript: python -m src.medila_integration_simulation.client_simulator + # (nebo python src/medila_integration_simulation/client_simulator.py pokud je src v PYTHONPATH) diff --git a/src/rag_pipeline/__init__.py b/src/rag_pipeline/__init__.py new file mode 100644 index 0000000..b2bf689 --- /dev/null +++ b/src/rag_pipeline/__init__.py @@ -0,0 +1,2 @@ +# src/rag_pipeline/__init__.py +# Tento soubor může zůstat prázdný nebo obsahovat importy pro snadnější přístup. diff --git a/src/rag_pipeline/document_loader.py b/src/rag_pipeline/document_loader.py new file mode 100644 index 0000000..ad7a5b3 --- /dev/null +++ b/src/rag_pipeline/document_loader.py @@ -0,0 +1,116 @@ +# src/rag_pipeline/document_loader.py +import os +from typing import List +from langchain_community.document_loaders import DirectoryLoader, TextLoader, PyPDFLoader +from langchain_core.documents import Document + +# Adresář, kde jsou uloženy dokumenty znalostní báze +# Cesta je relativní ke kořenovému adresáři projektu +DEFAULT_KNOWLEDGE_BASE_DIR = "data/knowledge_base" + +def load_documents(directory_path: str = DEFAULT_KNOWLEDGE_BASE_DIR) -> List[Document]: + """ + Načte dokumenty z určeného adresáře. + Podporuje .txt a .pdf soubory. + + Args: + directory_path (str): Cesta k adresáři s dokumenty. + + Returns: + List[Document]: Seznam načtených dokumentů (objekty LangChain Document). + """ + if not os.path.isdir(directory_path): + print(f"Chyba: Adresář '{directory_path}' nebyl nalezen.") + return [] + + # Konfigurace pro DirectoryLoader + # Načte všechny .txt soubory pomocí TextLoaderu + # a všechny .pdf soubory pomocí PyPDFLoaderu. + # Globbing pattern '**/' znamená rekurzivní prohledávání podadresářů. + loader = DirectoryLoader( + directory_path, + glob="**/*.*", # Načte všechny soubory, pak filtrujeme podle typu loaderu + loader_map={ + ".txt": TextLoader, + ".pdf": PyPDFLoader, # Vyžaduje `pip install pypdf` + # Lze přidat další loadery pro .docx, .md atd. + # ".md": UnstructuredMarkdownLoader, + # ".docx": UnstructuredWordDocumentLoader, + }, + show_progress=True, + use_multithreading=True, # Může zrychlit načítání velkého množství souborů + silent_errors=True # Potlačí chyby při načítání jednotlivých souborů (např. poškozený PDF) + # a pokusí se načíst ostatní. + ) + + try: + loaded_docs = loader.load() + print(f"Úspěšně načteno {len(loaded_docs)} dokumentů z '{directory_path}'.") + + # Příklad metadat, která LangChain loadery typicky přidávají: + # for doc in loaded_docs: + # print(f" Zdroj: {doc.metadata.get('source')}, obsah (část): {doc.page_content[:100]}...") + + return loaded_docs + except Exception as e: + print(f"Došlo k chybě při načítání dokumentů z '{directory_path}': {e}") + return [] + +if __name__ == '__main__': + print("Testování načítání dokumentů...") + + # Vytvoření dočasných souborů pro test (pokud nejsou přítomny) + # V reálném běhu budou soubory v data/knowledge_base + temp_dir = "temp_kb_test" + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + + with open(os.path.join(temp_dir, "test1.txt"), "w", encoding="utf-8") as f: + f.write("Toto je první testovací textový dokument.") + with open(os.path.join(temp_dir, "test2.txt"), "w", encoding="utf-8") as f: + f.write("Toto je druhý testovací dokument s dalším obsahem.") + + # Pro test PDF by bylo potřeba mít ukázkový PDF soubor a nainstalovaný pypdf + # Např. vytvořit dummy.pdf v temp_dir + # try: + # from reportlab.pdfgen import canvas + # c = canvas.Canvas(os.path.join(temp_dir, "dummy.pdf")) + # c.drawString(100, 750, "Toto je testovací PDF dokument.") + # c.save() + # print("Vytvořen dummy.pdf pro testování.") + # except ImportError: + # print("Knihovna reportlab není nainstalována, PDF test bude přeskočen.") + # pass + + + documents = load_documents(temp_dir) + + if documents: + print(f"\nCelkem načteno dokumentů: {len(documents)}") + for i, doc in enumerate(documents): + print(f"\nDokument {i+1}:") + print(f" Zdroj: {doc.metadata.get('source')}") + # print(f" Obsah: {doc.page_content}") # Může být dlouhé + print(f" Obsah (prvních 50 znaků): {doc.page_content[:50]}...") + else: + print("Nebyly načteny žádné dokumenty.") + + # Úklid dočasných souborů + import shutil + if os.path.exists(temp_dir): + try: + shutil.rmtree(temp_dir) + print(f"\nDočasný adresář '{temp_dir}' byl smazán.") + except Exception as e: + print(f"Chyba při mazání dočasného adresáře '{temp_dir}': {e}") + + print("\nTestování s výchozím adresářem (data/knowledge_base):") + # Tento test předpokládá, že soubory smernice_crp.txt a smernice_psa.txt existují + # v data/knowledge_base + kb_documents = load_documents() # Použije DEFAULT_KNOWLEDGE_BASE_DIR + if kb_documents: + print(f"\nCelkem načteno dokumentů z KB: {len(kb_documents)}") + for doc in kb_documents: + print(f" Zdroj z KB: {doc.metadata.get('source')}, obsah (část): {doc.page_content[:50]}...") + else: + print("Nebyly načteny žádné dokumenty z výchozího KB adresáře.") diff --git a/src/rag_pipeline/embedding_generator.py b/src/rag_pipeline/embedding_generator.py new file mode 100644 index 0000000..3bdac4f --- /dev/null +++ b/src/rag_pipeline/embedding_generator.py @@ -0,0 +1,157 @@ +# src/rag_pipeline/embedding_generator.py +import os +from typing import List +from langchain_core.documents import Document +from langchain_openai import AzureOpenAIEmbeddings +from dotenv import load_dotenv + +# Načtení proměnných prostředí (pro lokální vývoj) +load_dotenv() + +# Konfigurace pro Azure OpenAI Embeddings +# Tyto hodnoty by měly odpovídat nasazené službě Azure OpenAI +AZURE_OPENAI_API_VERSION_EMBEDDINGS = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-01") # Použijte stejnou nebo kompatibilní verzi jako pro LLM +AZURE_OPENAI_ENDPOINT_EMBEDDINGS = os.getenv("AZURE_OPENAI_ENDPOINT") # Např. https://.openai.azure.com/ +AZURE_OPENAI_API_KEY_EMBEDDINGS = os.getenv("AZURE_OPENAI_API_KEY") +AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME", "textembed") # Název nasazení embedding modelu (např. text-embedding-ada-002) + # V main.parameters.json je to "textembed" + +# Maximální počet textů (chunků), které se pošlou najednou do embedding API. +# Pro text-embedding-ada-002 je limit 2048 tokenů na text a Azure API může mít limity na velikost requestu. +# LangChain AzureOpenAIEmbeddings by měl interně řešit batching, ale chunk_size zde +# určuje, kolik dokumentů (textů) se zpracuje v jednom volání metody embed_documents. +# Pro text-embedding-ada-002 je doporučený limit 16 dokumentů na request, pokud jsou krátké. +# Pokud jsou chunky delší (blízko 2048 tokenů), může být potřeba menší batch size. +# Azure OpenAI API má limit 2048 tokenů na vstupní text a max 1MB na request. +# Langchain client pro Azure OpenAI embeddings má `max_retries` a `chunk_size` (počet dokumentů na API call). +# Výchozí chunk_size v Langchain klientovi je často 16. +DEFAULT_EMBEDDING_BATCH_SIZE = 16 + + +def get_embedding_model(batch_size: int = DEFAULT_EMBEDDING_BATCH_SIZE) -> AzureOpenAIEmbeddings: + """ + Vrací instanci AzureOpenAIEmbeddings nakonfigurovanou pro projekt. + """ + if not all([AZURE_OPENAI_ENDPOINT_EMBEDDINGS, AZURE_OPENAI_API_KEY_EMBEDDINGS, AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME]): + raise ValueError("Chybí jedna nebo více konfiguračních proměnných pro Azure OpenAI Embeddings: " + "AZURE_OPENAI_ENDPOINT, AZURE_OPENAI_API_KEY, AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME.") + + embedding_model = AzureOpenAIEmbeddings( + azure_deployment=AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME, + openai_api_version=AZURE_OPENAI_API_VERSION_EMBEDDINGS, + azure_endpoint=AZURE_OPENAI_ENDPOINT_EMBEDDINGS, + api_key=AZURE_OPENAI_API_KEY_EMBEDDINGS, + chunk_size=batch_size, # Kolik dokumentů se pošle v jednom API volání + # model="text-embedding-ada-002" # Název modelu je určen nasazením (`azure_deployment`) + ) + return embedding_model + +def generate_embeddings_for_documents( + documents: List[Document], + embedding_model: AzureOpenAIEmbeddings +) -> List[List[float]]: + """ + Generuje vektorové embeddingy pro seznam dokumentů (chunků). + + Args: + documents (List[Document]): Seznam dokumentů (chunků), pro které se mají generovat embeddingy. + embedding_model (AzureOpenAIEmbeddings): Instance nakonfigurovaného embedding modelu. + + Returns: + List[List[float]]: Seznam embeddingů, kde každý embedding je seznam float čísel. + Pořadí odpovídá vstupním dokumentům. + Vrací prázdný seznam v případě chyby nebo prázdného vstupu. + """ + if not documents: + print("Nebyly poskytnuty žádné dokumenty pro generování embeddingů.") + return [] + + texts_to_embed = [doc.page_content for doc in documents] + + try: + print(f"Generování embeddingů pro {len(texts_to_embed)} textových chunků...") + embeddings = embedding_model.embed_documents(texts_to_embed) + print(f"Úspěšně vygenerováno {len(embeddings)} embeddingů.") + # Každý embedding pro text-embedding-ada-002 by měl mít dimenzi 1536 + # if embeddings: + # print(f" Dimenze prvního embeddingu: {len(embeddings[0])}") + return embeddings + except Exception as e: + print(f"Došlo k chybě při generování embeddingů: {e}") + # Zde by mohlo být detailnější logování chyby, např. e.response pokud jde o API error + return [] + +if __name__ == '__main__': + print("Testování generování embeddingů...") + + # Pro tento test je nutné mít nastavené proměnné prostředí pro Azure OpenAI + # (ENDPOINT, API_KEY, název nasazení pro embedding model) + # Např. AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME="textembed" + + # Vytvoření ukázkových dokumentů (chunků) + sample_chunks = [ + Document(page_content="Toto je první chunk textu určený pro embedding.", metadata={"source": "doc1.txt", "chunk_id": 1}), + Document(page_content="Druhý chunk obsahuje jiné informace a také bude převeden na vektor.", metadata={"source": "doc1.txt", "chunk_id": 2}), + Document(page_content="Krátký text.", metadata={"source": "doc2.txt", "chunk_id": 1}) + ] + + try: + model = get_embedding_model() + print("Úspěšně inicializován embedding model.") + + # Test generování embeddingu pro jeden dokument (embed_query) + # query_embedding = model.embed_query("Testovací dotaz pro embedding.") + # print(f"Embedding pro testovací dotaz (dimenze: {len(query_embedding)}): {query_embedding[:5]}...") # Jen prvních 5 dimenzí + + # Test generování embeddingů pro více dokumentů + document_embeddings = generate_embeddings_for_documents(sample_chunks, model) + + if document_embeddings and len(document_embeddings) == len(sample_chunks): + print(f"\nÚspěšně vygenerováno {len(document_embeddings)} embeddingů pro dokumenty.") + for i, emb in enumerate(document_embeddings): + print(f" Embedding pro chunk {i+1} (zdroj: {sample_chunks[i].metadata['source']}):") + print(f" Dimenze: {len(emb)}") + print(f" Prvních 5 hodnot: {emb[:5]}") + elif not document_embeddings and sample_chunks: + print("Nepodařilo se vygenerovat embeddingy, ale dokumenty byly poskytnuty. Zkontrolujte chybové hlášky a konfiguraci Azure OpenAI.") + else: + print("Nebyly vygenerovány žádné embeddingy (nebo počet nesouhlasí).") + + except ValueError as ve: + print(f"Chyba konfigurace: {ve}") + print("Ujistěte se, že máte správně nastavené proměnné prostředí pro Azure OpenAI (ENDPOINT, API_KEY, název deploymentu pro embeddings).") + except Exception as e: + print(f"Neočekávaná chyba při testování embeddingů: {e}") + print("Zkontrolujte připojení k Azure a kvóty pro embedding model.") + + print("\nTestování s reálnými, rozdělenými dokumenty (vyžaduje předchozí kroky):") + try: + from .document_loader import load_documents + from .text_splitter import split_documents + + real_docs = load_documents() # Načte z data/knowledge_base + if real_docs: + real_chunks = split_documents(real_docs) + if real_chunks: + print(f"Načteno a rozděleno {len(real_chunks)} reálných chunků.") + emb_model_for_real = get_embedding_model() + real_embeddings = generate_embeddings_for_documents(real_chunks, emb_model_for_real) + if real_embeddings and len(real_embeddings) == len(real_chunks): + print(f"Úspěšně vygenerováno {len(real_embeddings)} embeddingů pro reálné chunky.") + # Můžeme k dokumentům přidat jejich embeddingy pro další krok + for doc, emb in zip(real_chunks, real_embeddings): + doc.metadata["embedding"] = emb # Toto je jen pro ukázku, neukládáme takto do Document objektu typicky + # print(f"První reálný chunk s embeddingem (prvních 5 dimenzí): {real_chunks[0].metadata['embedding'][:5]}") + else: + print("Nepodařilo se vygenerovat embeddingy pro reálné chunky.") + else: + print("Reálné dokumenty nebyly rozděleny.") + else: + print("Nebyly načteny žádné reálné dokumenty pro test embeddingů.") + + except ImportError: + print("Nepodařilo se importovat document_loader nebo text_splitter. Spusťte testy jednotlivě.") + except ValueError as ve: + print(f"Chyba konfigurace při testu s reálnými dokumenty: {ve}") + except Exception as e: + print(f"Chyba při testu s reálnými dokumenty: {e}") diff --git a/src/rag_pipeline/main_pipeline.py b/src/rag_pipeline/main_pipeline.py new file mode 100644 index 0000000..6a8c2d1 --- /dev/null +++ b/src/rag_pipeline/main_pipeline.py @@ -0,0 +1,129 @@ +# src/rag_pipeline/main_pipeline.py +import os +from dotenv import load_dotenv + +# Importy z ostatních modulů v tomto balíčku +from .document_loader import load_documents, DEFAULT_KNOWLEDGE_BASE_DIR +from .text_splitter import split_documents, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_OVERLAP +from .embedding_generator import get_embedding_model, generate_embeddings_for_documents +from .vectorstore_updater import ( + get_search_index_client, + get_search_client, + create_or_update_index, + upload_documents_to_vector_store, + AZURE_AI_SEARCH_INDEX_NAME # Importujeme výchozí název indexu +) + +def run_rag_data_pipeline( + knowledge_base_dir: str = DEFAULT_KNOWLEDGE_BASE_DIR, + chunk_size: int = DEFAULT_CHUNK_SIZE, + chunk_overlap: int = DEFAULT_CHUNK_OVERLAP, + index_name: str = AZURE_AI_SEARCH_INDEX_NAME +): + """ + Spustí kompletní RAG data pipeline: + 1. Načte dokumenty z adresáře. + 2. Rozdělí dokumenty na chunky. + 3. Vygeneruje embeddingy pro chunky. + 4. Vytvoří/aktualizuje index v Azure AI Search. + 5. Nahraje chunky a jejich embeddingy do Azure AI Search. + """ + print("--- Spouštění RAG Data Pipeline ---") + + # Načtení .env souboru pro případ, že skript běží samostatně + # V produkčním nasazení by proměnné prostředí měly být nastaveny systémově. + load_dotenv() + + # Ověření, zda jsou nastaveny potřebné proměnné prostředí pro Azure služby + # (Endpointy, klíče pro OpenAI a AI Search) + # Toto je základní kontrola, detailnější kontroly jsou v jednotlivých modulech. + required_env_vars = [ + "AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_KEY", "AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME", + "AZURE_AI_SEARCH_ENDPOINT", "AZURE_AI_SEARCH_ADMIN_KEY" + ] + missing_vars = [var for var in required_env_vars if not os.getenv(var)] + if missing_vars: + print(f"Chyba: Chybí následující povinné proměnné prostředí: {', '.join(missing_vars)}") + print("Pipeline nemůže pokračovat. Nastavte prosím tyto proměnné.") + return False + + try: + # Krok 1: Načtení dokumentů + print(f"\n[Krok 1/5] Načítání dokumentů z '{knowledge_base_dir}'...") + documents = load_documents(knowledge_base_dir) + if not documents: + print("Nebyly nalezeny žádné dokumenty. Pipeline končí.") + return False + print(f"Načteno {len(documents)} dokumentů.") + + # Krok 2: Dělení dokumentů na chunky + print(f"\n[Krok 2/5] Dělení dokumentů na chunky (velikost: {chunk_size}, překryv: {chunk_overlap})...") + chunks = split_documents(documents, chunk_size=chunk_size, chunk_overlap=chunk_overlap) + if not chunks: + print("Nepodařilo se rozdělit dokumenty na chunky. Pipeline končí.") + return False + print(f"Dokumenty rozděleny na {len(chunks)} chunků.") + + # Krok 3: Generování embeddingů + print("\n[Krok 3/5] Generování embeddingů pro chunky...") + embedding_model = get_embedding_model() # Použije výchozí batch size + embeddings_list = generate_embeddings_for_documents(chunks, embedding_model) + if not embeddings_list or len(embeddings_list) != len(chunks): + print("Nepodařilo se vygenerovat embeddingy pro všechny chunky. Pipeline končí.") + return False + print(f"Vygenerováno {len(embeddings_list)} embeddingů.") + + # Krok 4: Příprava Azure AI Search (vytvoření/aktualizace indexu) + print(f"\n[Krok 4/5] Příprava Azure AI Search indexu '{index_name}'...") + index_client = get_search_index_client() + create_or_update_index(index_client, index_name) + # Poznámka: get_search_client() vrací klienta s index_name z proměnné prostředí, + # pokud chceme použít `index_name` z argumentu funkce, museli bychom ho předat. + # Pro konzistenci, pokud je index_name parametr, měl by se použít i pro search_client. + search_client = get_search_client() # Použije AZURE_AI_SEARCH_INDEX_NAME, pokud index_name není předán explicitně + # Pokud chceme dynamický název indexu i pro search_client: + if index_name != AZURE_AI_SEARCH_INDEX_NAME: + search_client = SearchClient(endpoint=os.getenv("AZURE_AI_SEARCH_ENDPOINT"), + index_name=index_name, + credential=AzureKeyCredential(os.getenv("AZURE_AI_SEARCH_ADMIN_KEY"))) + + + # Krok 5: Nahrání dokumentů a embeddingů do Azure AI Search + print(f"\n[Krok 5/5] Nahrávání chunků a embeddingů do indexu '{search_client.index_name}'...") + upload_documents_to_vector_store(search_client, chunks, embeddings_list) + + print("\n--- RAG Data Pipeline byla úspěšně dokončena. ---") + return True + + except ValueError as ve: + print(f"Chyba konfigurace v pipeline: {ve}") + return False + except Exception as e: + print(f"Došlo k neočekávané chybě během RAG Data Pipeline: {e}") + import traceback + traceback.print_exc() + return False + +if __name__ == '__main__': + # Tento skript lze spustit pro naplnění/aktualizaci vektorové databáze. + # Ujistěte se, že máte .env soubor s potřebnými klíči a endpointy, + # nebo že jsou proměnné prostředí nastaveny jinak. + + print("Spouštění RAG Data Pipeline z __main__...") + + # Příklad spuštění s výchozími hodnotami + success = run_rag_data_pipeline() + + if success: + print("\nPipeline proběhla úspěšně.") + else: + print("\nPipeline selhala nebo byla přerušena kvůli chybě.") + + # Příklad spuštění s vlastním názvem indexu (pokud by to bylo potřeba) + # print("\nSpouštění RAG Data Pipeline s vlastním názvem indexu...") + # custom_index_name = "staprolab-kb-test-custom" + # success_custom = run_rag_data_pipeline(index_name=custom_index_name) + # if success_custom: + # print(f"Pipeline pro index '{custom_index_name}' proběhla úspěšně.") + # else: + # print(f"Pipeline pro index '{custom_index_name}' selhala.") diff --git a/src/rag_pipeline/text_splitter.py b/src/rag_pipeline/text_splitter.py new file mode 100644 index 0000000..534b15e --- /dev/null +++ b/src/rag_pipeline/text_splitter.py @@ -0,0 +1,138 @@ +# src/rag_pipeline/text_splitter.py +from typing import List +from langchain_core.documents import Document +from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter # Přidán CharacterTextSplitter pro jednoduchost + +# Doporučené hodnoty pro chunk_size a chunk_overlap se mohou lišit +# v závislosti na povaze textů a použitém embedding modelu. +# Pro modely jako text-embedding-ada-002 je dobré mít chunky, +# které nejsou příliš krátké ani příliš dlouhé. +DEFAULT_CHUNK_SIZE = 1000 # Počet znaků na chunk +DEFAULT_CHUNK_OVERLAP = 200 # Počet znaků překryvu mezi chunky + +def split_documents( + documents: List[Document], + chunk_size: int = DEFAULT_CHUNK_SIZE, + chunk_overlap: int = DEFAULT_CHUNK_OVERLAP +) -> List[Document]: + """ + Rozdělí seznam LangChain dokumentů na menší části (chunky). + + Args: + documents (List[Document]): Seznam dokumentů k rozdělení. + chunk_size (int): Maximální velikost jednoho chunku (v počtu znaků). + chunk_overlap (int): Počet znaků překryvu mezi sousedními chunky. + + Returns: + List[Document]: Seznam rozdělených dokumentů (chunků). + Každý chunk je také LangChain Document objekt, + který si typicky zachovává metadata původního dokumentu. + """ + if not documents: + print("Nebyly poskytnuty žádné dokumenty k rozdělení.") + return [] + + # Použijeme RecursiveCharacterTextSplitter, který se snaží dělit text + # na základě sady oddělovačů (např. "\n\n", "\n", " ", "") a udržovat + # sémanticky související části textu pohromadě. + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + length_function=len, # Funkce pro měření délky textu (standardně len) + add_start_index=True, # Přidá do metadat pozici začátku chunku v původním dokumentu + separators=["\n\n", "\n", ". ", ", ", " ", ""] # Preferované oddělovače + ) + + # Alternativně, pro velmi jednoduché texty nebo specifické případy: + # text_splitter = CharacterTextSplitter( + # separator="\n\n", # Například dělení podle odstavců + # chunk_size=chunk_size, + # chunk_overlap=chunk_overlap, + # length_function=len, + # add_start_index=True, + # ) + + try: + split_docs = text_splitter.split_documents(documents) + print(f"Původních {len(documents)} dokumentů bylo rozděleno na {len(split_docs)} chunků.") + + # Příklad informací o chuncích + # for i, chunk_doc in enumerate(split_docs[:3]): # Jen prvních pár + # print(f" Chunk {i+1} (zdroj: {chunk_doc.metadata.get('source', 'N/A')}):") + # print(f" Začátek na indexu: {chunk_doc.metadata.get('start_index', 'N/A')}") + # print(f" Obsah (část): {chunk_doc.page_content[:100]}...") + + return split_docs + except Exception as e: + print(f"Došlo k chybě při dělení dokumentů: {e}") + return [] + +if __name__ == '__main__': + print("Testování dělení dokumentů...") + + # Vytvoření ukázkových dokumentů pro test + doc1_content = "Toto je první dokument. Má několik vět. Bude rozdělen na menší části. " * 50 + doc2_content = "Druhý dokument je kratší. Ale také obsahuje důležité informace. " * 30 + + sample_documents = [ + Document(page_content=doc1_content, metadata={"source": "doc1.txt", "category": "test"}), + Document(page_content=doc2_content, metadata={"source": "doc2.txt", "category": "test"}), + Document(page_content="Krátký dokument.", metadata={"source": "doc3.txt"}) # Tento by neměl být moc dělen + ] + + print(f"Počet vstupních dokumentů: {len(sample_documents)}") + for i, doc in enumerate(sample_documents): + print(f" Dokument {i+1} (zdroj: {doc.metadata['source']}), délka: {len(doc.page_content)} znaků.") + + # Test s výchozími parametry + print("\nTest s výchozími parametry (chunk_size=1000, chunk_overlap=200):") + chunks_default = split_documents(sample_documents) + if chunks_default: + print(f"Celkem vytvořeno chunků: {len(chunks_default)}") + # for i, chunk in enumerate(chunks_default): + # print(f" Chunk {i+1} (zdroj: {chunk.metadata['source']}, start_index: {chunk.metadata.get('start_index')}), délka: {len(chunk.page_content)}") + # print(f" Obsah: {chunk.page_content[:80]}...") + # if i > 4 : break # Jen prvních pár + else: + print("Nevytvořeny žádné chunky (výchozí parametry).") + + # Test s menší velikostí chunku + print("\nTest s menší velikostí chunku (chunk_size=200, chunk_overlap=50):") + chunks_small = split_documents(sample_documents, chunk_size=200, chunk_overlap=50) + if chunks_small: + print(f"Celkem vytvořeno chunků: {len(chunks_small)}") + # for i, chunk in enumerate(chunks_small): + # print(f" Chunk {i+1} (zdroj: {chunk.metadata['source']}, start_index: {chunk.metadata.get('start_index')}), délka: {len(chunk.page_content)}") + # print(f" Obsah: {chunk.page_content[:80]}...") + # if i > 4 : break + else: + print("Nevytvořeny žádné chunky (malé parametry).") + + # Test s prázdným vstupem + print("\nTest s prázdným vstupem:") + chunks_empty = split_documents([]) + if not chunks_empty: + print("Správně vrácen prázdný seznam pro prázdný vstup.") + + # Načtení reálných dokumentů a jejich rozdělení + print("\nTest s reálnými dokumenty z data/knowledge_base:") + # Předpokládáme, že document_loader.py je ve stejném adresáři nebo je Python path správně nastavena + try: + from .document_loader import load_documents # Relativní import pro __main__ + real_docs = load_documents() # Načte z data/knowledge_base + if real_docs: + print(f"Načteno {len(real_docs)} reálných dokumentů.") + real_chunks = split_documents(real_docs) + if real_chunks: + print(f"Reálné dokumenty rozděleny na {len(real_chunks)} chunků.") + # for i, chunk in enumerate(real_chunks[:5]): + # print(f" Chunk {i+1} (zdroj: {chunk.metadata['source']}), délka: {len(chunk.page_content)}") + # print(f" Obsah: {chunk.page_content[:80]}...") + else: + print("Reálné dokumenty nebyly rozděleny.") + else: + print("Nebyly načteny žádné reálné dokumenty pro test dělení.") + except ImportError: + print("Nepodařilo se importovat document_loader pro test s reálnými dokumenty. Spusťte testy jednotlivě.") + except Exception as e: + print(f"Chyba při testu s reálnými dokumenty: {e}") diff --git a/src/rag_pipeline/vectorstore_updater.py b/src/rag_pipeline/vectorstore_updater.py new file mode 100644 index 0000000..6f363b5 --- /dev/null +++ b/src/rag_pipeline/vectorstore_updater.py @@ -0,0 +1,244 @@ +# src/rag_pipeline/vectorstore_updater.py +import os +from typing import List +from langchain_core.documents import Document +from langchain_openai import AzureOpenAIEmbeddings # Použito pro typovou anotaci a případně pro získání dimenze +from azure.core.credentials import AzureKeyCredential +from azure.search.documents import SearchClient +from azure.search.documents.indexes import SearchIndexClient +from azure.search.documents.models import VectorizedQuery +from azure.search.documents.indexes.models import ( + SearchIndex, + SearchField, + SearchFieldDataType, + SimpleField, + SearchableField, + VectorSearch, + VectorSearchProfile, + HnswAlgorithmConfiguration, + SemanticSearch, + SemanticField, + SemanticConfiguration, + SemanticPrioritizedFields +) +from dotenv import load_dotenv + +# Načtení proměnných prostředí +load_dotenv() + +# Konfigurace pro Azure AI Search +AZURE_AI_SEARCH_ENDPOINT = os.getenv("AZURE_AI_SEARCH_ENDPOINT") # Např. https://.search.windows.net +AZURE_AI_SEARCH_API_KEY = os.getenv("AZURE_AI_SEARCH_ADMIN_KEY") # Admin klíč pro vytváření/aktualizaci indexů +AZURE_AI_SEARCH_INDEX_NAME = os.getenv("AZURE_AI_SEARCH_INDEX_NAME", "staprolab-knowledgebase-index") + +# Dimenze embeddingů - pro text-embedding-ada-002 je to 1536 +# Měla by být konzistentní s modelem použitým v embedding_generator.py +EMBEDDING_DIMENSION = 1536 # text-embedding-ada-002 + +def get_search_index_client() -> SearchIndexClient: + if not all([AZURE_AI_SEARCH_ENDPOINT, AZURE_AI_SEARCH_API_KEY]): + raise ValueError("Chybí konfigurace pro Azure AI Search: ENDPOINT nebo ADMIN_KEY.") + return SearchIndexClient(endpoint=AZURE_AI_SEARCH_ENDPOINT, credential=AzureKeyCredential(AZURE_AI_SEARCH_API_KEY)) + +def get_search_client() -> SearchClient: + if not all([AZURE_AI_SEARCH_ENDPOINT, AZURE_AI_SEARCH_API_KEY, AZURE_AI_SEARCH_INDEX_NAME]): + raise ValueError("Chybí konfigurace pro Azure AI Search: ENDPOINT, ADMIN_KEY nebo INDEX_NAME.") + return SearchClient(endpoint=AZURE_AI_SEARCH_ENDPOINT, index_name=AZURE_AI_SEARCH_INDEX_NAME, credential=AzureKeyCredential(AZURE_AI_SEARCH_API_KEY)) + +def create_or_update_index(index_client: SearchIndexClient, index_name: str): + """ + Vytvoří nebo aktualizuje index v Azure AI Search pro ukládání dokumentů a jejich embeddingů. + """ + fields = [ + SimpleField(name="id", type=SearchFieldDataType.String, key=True, filterable=True), # Unikátní ID dokumentu/chunku + SearchableField(name="content", type=SearchFieldDataType.String, searchable=True, analyzable=True), # Textový obsah chunku + SearchField(name="content_vector", type=SearchFieldDataType.Collection(SearchFieldDataType.Single), + searchable=True, vector_search_dimensions=EMBEDDING_DIMENSION, vector_search_profile_name="my-hnsw-profile"), + SimpleField(name="source", type=SearchFieldDataType.String, filterable=True, facetable=True), # Zdrojový soubor + SimpleField(name="category", type=SearchFieldDataType.String, filterable=True, facetable=True, default_value="general"), # Kategorie (pokud je) + SimpleField(name="start_index", type=SearchFieldDataType.Int32, filterable=True, sortable=True, default_value=0), # Pozice v původním dokumentu + # Další metadata lze přidat podle potřeby + ] + + vector_search = VectorSearch( + algorithms=[HnswAlgorithmConfiguration(name="my-hnsw-algo")], # Můžeme mít více algoritmů + profiles=[VectorSearchProfile(name="my-hnsw-profile", algorithm_configuration_name="my-hnsw-algo")] + ) + + # Sémantické vyhledávání (volitelné, ale doporučené pro lepší relevanci) + # Vyžaduje, aby služba AI Search byla na úrovni Basic nebo vyšší a v podporovaném regionu. + semantic_search_config = SemanticSearch(configurations=[ + SemanticConfiguration( + name="my-semantic-config", + prioritized_fields=SemanticPrioritizedFields( + title_field=None, # Nemáme explicitní titulkové pole pro chunky + content_fields=[SemanticField(field_name="content")] + ) + ) + ]) + + index = SearchIndex( + name=index_name, + fields=fields, + vector_search=vector_search, + semantic_search=semantic_search_config # Přidání sémantické konfigurace + ) + + try: + print(f"Vytváření/aktualizace indexu '{index_name}'...") + index_client.create_or_update_index(index) + print(f"Index '{index_name}' je připraven.") + except Exception as e: + print(f"Chyba při vytváření/aktualizaci indexu '{index_name}': {e}") + raise + +def upload_documents_to_vector_store( + search_client: SearchClient, + documents: List[Document], + embeddings: List[List[float]] +): + """ + Nahraje dokumenty (chunky) a jejich embeddingy do Azure AI Search. + Předpokládá, že index již existuje a má správnou strukturu. + """ + if len(documents) != len(embeddings): + raise ValueError("Počet dokumentů a embeddingů se neshoduje.") + if not documents: + print("Nebyly poskytnuty žádné dokumenty k nahrání.") + return + + docs_to_upload = [] + for i, (doc, emb) in enumerate(zip(documents, embeddings)): + # Vytvoření unikátního ID pro každý chunk, např. kombinací zdroje a indexu/hashe + # Pro jednoduchost použijeme source a start_index, pokud je dostupné + source_name = doc.metadata.get("source", f"unknown_source_{i}") + start_idx = doc.metadata.get("start_index", i) + doc_id = f"{os.path.basename(source_name)}-{start_idx}" + # Nahrazení nevalidních znaků pro ID + doc_id = "".join(c if c.isalnum() or c in ['-', '_'] else '_' for c in doc_id) + + + docs_to_upload.append({ + "id": doc_id, + "content": doc.page_content, + "content_vector": emb, + "source": doc.metadata.get("source", "N/A"), + "category": doc.metadata.get("category", "general"), + "start_index": doc.metadata.get("start_index", 0) + }) + + try: + print(f"Nahrávání {len(docs_to_upload)} dokumentů do indexu '{search_client.index_name}'...") + # upload_documents může přijmout seznam slovníků + result = search_client.upload_documents(documents=docs_to_upload) + + successful_uploads = sum(1 for r in result if r.succeeded) + print(f"Úspěšně nahráno {successful_uploads} z {len(docs_to_upload)} dokumentů.") + + for item_result in result: + if not item_result.succeeded: + print(f" Chyba při nahrávání dokumentu ID {item_result.key}: {item_result.error_message}") + + except Exception as e: + print(f"Došlo k chybě při nahrávání dokumentů: {e}") + # Zde by mohlo být detailnější logování + raise + +# Funkce pro vyhledávání (pro testování a pro RAGRetrievalTool) +def perform_vector_search(query_text: str, query_embedding: List[float], search_client: SearchClient, top_k: int = 3) -> List[Dict[str, Any]]: + """ + Provede vektorové vyhledávání v Azure AI Search. + """ + vector_query = VectorizedQuery(vector=query_embedding, k_nearest_neighbors=top_k, fields="content_vector") + + try: + results = search_client.search( + search_text=None, # Můžeme kombinovat s full-text vyhledáváním: query_text, + vector_queries=[vector_query], + select=["id", "source", "content", "start_index"], # Která pole chceme vrátit + # query_type="semantic", # Pokud chceme použít sémantické reranking + # semantic_configuration_name="my-semantic-config", # Název sémantické konfigurace + top=top_k + ) + + found_docs = [] + for result in results: + found_docs.append({ + "id": result.get("id"), + "score": result.get("@search.score"), # Relevance score pro full-text + "vector_score": result.get("@search.score"), # Prozatím, AI Search vrací jedno skóre, v budoucnu se může lišit + "reranker_score": result.get("@search.reranker_score"), # Pokud je použit semantic search + "content": result.get("content"), + "source": result.get("source"), + "start_index": result.get("start_index") + }) + return found_docs + except Exception as e: + print(f"Chyba při vektorovém vyhledávání: {e}") + return [] + + +if __name__ == '__main__': + print("Testování Azure AI Search Vector Store Updater...") + + # Pro tento test je nutné mít nastavené proměnné prostředí: + # AZURE_AI_SEARCH_ENDPOINT, AZURE_AI_SEARCH_ADMIN_KEY + # a také pro embedding model (AZURE_OPENAI_ENDPOINT, atd.) + + # Krok 1: Načtení, rozdělení a embeddování dokumentů (z předchozích skriptů) + try: + from .document_loader import load_documents + from .text_splitter import split_documents + from .embedding_generator import get_embedding_model, generate_embeddings_for_documents + + print("\n--- Fáze 1: Načítání a příprava dokumentů ---") + docs = load_documents() # Z data/knowledge_base + if not docs: + raise Exception("Nebyly načteny žádné dokumenty z data/knowledge_base. Ukončuji test.") + + chunks = split_documents(docs) + if not chunks: + raise Exception("Dokumenty nebyly rozděleny na chunky. Ukončuji test.") + + emb_model = get_embedding_model() + embeddings_list = generate_embeddings_for_documents(chunks, emb_model) + if not embeddings_list or len(embeddings_list) != len(chunks): + raise Exception("Nepodařilo se vygenerovat embeddingy pro všechny chunky. Ukončuji test.") + + print(f"Připraveno {len(chunks)} chunků s embeddingy.") + + # Krok 2: Vytvoření/aktualizace indexu a nahrání dokumentů + print("\n--- Fáze 2: Práce s Azure AI Search ---") + idx_client = get_search_index_client() + s_client = get_search_client() # Pro nahrávání a vyhledávání + + create_or_update_index(idx_client, AZURE_AI_SEARCH_INDEX_NAME) + upload_documents_to_vector_store(s_client, chunks, embeddings_list) + + print("\n--- Fáze 3: Testovací vyhledávání ---") + test_query = "Jaké jsou referenční hodnoty pro CRP u dospělých?" + print(f"Testovací dotaz: {test_query}") + + query_vector = emb_model.embed_query(test_query) + + search_results = perform_vector_search(test_query, query_vector, s_client, top_k=2) + + if search_results: + print(f"Nalezeno {len(search_results)} výsledků pro dotaz:") + for res_doc in search_results: + print(f" ID: {res_doc['id']}, Zdroj: {res_doc['source']}") + print(f" Skóre: {res_doc.get('score', 'N/A')}, Vektorové skóre: {res_doc.get('vector_score', 'N/A')}, Reranker skóre: {res_doc.get('reranker_score', 'N/A')}") + print(f" Obsah (část): {res_doc['content'][:150]}...") + print("-" * 20) + else: + print("Pro testovací dotaz nebyly nalezeny žádné výsledky.") + + except ValueError as ve: + print(f"Chyba konfigurace: {ve}") + print("Ujistěte se, že máte správně nastavené proměnné prostředí pro Azure AI Search a Azure OpenAI.") + except ImportError: + print("Chyba importu. Ujistěte se, že všechny potřebné skripty jsou dostupné.") + except Exception as e: + print(f"Neočekávaná chyba v testovacím scénáři: {e}") + import traceback + traceback.print_exc()