diff --git a/README.md b/README.md index 5a63f18..3cf57c1 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,7 @@ The functions include a built-in encryption mechanism for sensitive information: - Enables interaction with **Azure OpenAI** and other **Azure AI** models. - Supports Azure Search integration for enhanced document retrieval. +- **Native OpenWebUI Citations Support** 🎯: Rich citation cards, source previews, and inline citation correlations for Azure AI Search responses (Azure OpenAI only). - Supports multiple Azure AI models selection via the `AZURE_AI_MODEL` environment variable (e.g. `gpt-4o;gpt-4o-mini`). - Customizable pipeline display with configurable prefix via `AZURE_AI_PIPELINE_PREFIX`. - Azure AI Search / RAG integration with enhanced collapsible citation display (Azure OpenAI only). @@ -112,6 +113,8 @@ The functions include a built-in encryption mechanism for sensitive information: 🔗 [Learn More About Azure AI](https://azure.microsoft.com/en-us/solutions/ai) +📖 [Azure AI Citations Documentation](./docs/azure-ai-citations.md) + ### **2. [N8N Pipeline](./pipelines/n8n/n8n.py)** > [!TIP] diff --git a/docs/azure-ai-citations.md b/docs/azure-ai-citations.md new file mode 100644 index 0000000..26401a6 --- /dev/null +++ b/docs/azure-ai-citations.md @@ -0,0 +1,202 @@ +# Azure AI Foundry Pipeline - Native OpenWebUI Citations + +This document describes the native OpenWebUI citation support in the Azure AI Foundry Pipeline, which enables rich citation cards and source previews in the OpenWebUI frontend. + +## Overview + +The Azure AI Foundry Pipeline supports **native OpenWebUI citations** for Azure AI Search (RAG) responses. This feature is **automatically enabled** when you configure Azure AI Search data sources (`AZURE_AI_DATA_SOURCES`). The OpenWebUI frontend will display: + +- **Citation cards** with source information and relevance scores +- **Source previews** with content snippets +- **Relevance percentage** displayed on citation cards (requires `AZURE_AI_INCLUDE_SEARCH_SCORES=true`) +- **Clickable `[docX]` references** that link directly to document URLs +- **Interactive citation UI** with expandable source details + +## Features + +### Automatic Citation Support + +When Azure AI Search is configured, the pipeline automatically: + +1. Emits citation events via `__event_emitter__` for the OpenWebUI frontend +2. Converts `[docX]` references in the response to clickable markdown links +3. Filters citations to only show documents actually referenced in the response +4. Extracts relevance scores from Azure Search when available + +### Configuration Options + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `AZURE_AI_DATA_SOURCES` | `""` | JSON configuration for Azure AI Search (required for citations) | +| `AZURE_AI_INCLUDE_SEARCH_SCORES` | `true` | Enable relevance score extraction from Azure Search | + +### How It Works + +#### Streaming Responses + +When Azure AI Search returns citations in a streaming response: + +1. The pipeline detects citations in the SSE (Server-Sent Events) stream +2. `[docX]` references in each chunk are converted to markdown links with document URLs +3. After the stream ends, citation events are emitted via `__event_emitter__` +4. Citations are filtered to only include documents referenced in the response + +#### Non-Streaming Responses + +When Azure AI Search returns citations in a non-streaming response: + +1. The pipeline extracts citations from the response context +2. `[docX]` references in the content are converted to markdown links +3. Individual citation events are emitted via `__event_emitter__` for each referenced source + +## Citation Format + +### OpenWebUI Citation Event Structure + +Each citation is emitted as a separate event to ensure all sources appear in the UI. Citation events follow the official OpenWebUI specification (see [OpenWebUI Events Documentation](https://docs.openwebui.com/features/plugin/development/events#source-or-citation-and-code-execution)): + +```python +{ + "type": "citation", + "data": { + "document": ["Document content..."], # Content from this citation + "metadata": [{"source": "https://..."}], # Metadata with source URL + "source": { + "name": "[doc1] Document Title", # Unique name with index + "url": "https://..." # Source URL if available + }, + "distances": [0.95] # Relevance score (displayed as percentage) + } +} +``` + +Key points: +- Each source document gets its own citation event +- The `source.name` includes the doc index (`[doc1]`, `[doc2]`, etc.) to prevent grouping +- The `distances` array contains relevance scores from Azure AI Search, which OpenWebUI displays as a percentage on the citation cards + +### Azure Citation Format (Input) + +Azure AI Search returns citations in this format: + +```python +{ + "title": "Document Title", + "content": "Full or partial content", + "url": "https://...", + "filepath": "/path/to/file", + "chunk_id": "chunk-123", + "score": 0.95, + "metadata": {} +} +``` + +The pipeline automatically converts Azure citations to OpenWebUI format. + +## Usage + +### Basic Setup + +Configure Azure AI Search to enable citation support: + +```bash +# Azure AI Search configuration (required for citations) +AZURE_AI_DATA_SOURCES='[{"type":"azure_search","parameters":{"endpoint":"https://YOUR-SEARCH-SERVICE.search.windows.net","index_name":"YOUR-INDEX-NAME","authentication":{"type":"api_key","key":"YOUR-SEARCH-API-KEY"}}}]' + +# Enable relevance scores (default: true) +AZURE_AI_INCLUDE_SEARCH_SCORES=true +``` + +### Clickable Document Links + +The pipeline automatically converts `[docX]` references to clickable markdown links: + +```markdown +# Input from Azure AI +The answer can be found in [doc1] and [doc2]. + +# Output (converted by pipeline) +The answer can be found in [[doc1]](https://example.com/doc1.pdf) and [[doc2]](https://example.com/doc2.pdf). +``` + +This works for both streaming and non-streaming responses. + +### Relevance Scores + +When `AZURE_AI_INCLUDE_SEARCH_SCORES=true` (default), the pipeline: + +1. Automatically adds `include_contexts: ["citations", "all_retrieved_documents"]` to Azure Search requests +2. Extracts scores based on the `filter_reason` field: + - `filter_reason="rerank"` → uses `rerank_score` + - `filter_reason="score"` or not present → uses `original_search_score` +3. Displays the score as a percentage on citation cards + +## Implementation Details + +### Helper Functions + +The pipeline includes these helper functions for citation processing: + +1. **`_extract_citations_from_response()`**: Extracts citations from Azure responses +2. **`_normalize_citation_for_openwebui()`**: Converts Azure citations to OpenWebUI format +3. **`_emit_openwebui_citation_events()`**: Emits citation events via `__event_emitter__` +4. **`_merge_score_data()`**: Matches citations with score data from `all_retrieved_documents` +5. **`_build_citation_urls_map()`**: Builds mapping of citation indices to URLs +6. **`_format_citation_link()`**: Creates markdown links for `[docX]` references +7. **`_convert_doc_refs_to_links()`**: Converts all `[docX]` references in content to markdown links + +### Title Fallback Logic + +The pipeline uses intelligent title fallback: + +1. Use `title` field if available +2. Fallback to filename extracted from `filepath` or `url` +3. Fallback to `"Unknown Document"` if all are empty + +This ensures every citation has a meaningful display name. + +### Citation Filtering + +Citations are filtered to only show documents that are actually referenced in the response content. For example, if Azure returns 5 citations but the response only references `[doc1]` and `[doc3]`, only those 2 citations will appear in the UI. + +## Troubleshooting + +### Citations Not Appearing + +**Problem**: Citations don't appear in the OpenWebUI frontend + +**Solutions**: +1. Check that Azure AI Search is properly configured (`AZURE_AI_DATA_SOURCES`) +2. Ensure you're using an Azure OpenAI endpoint (not a generic Azure AI endpoint) +3. Verify the response contains `[docX]` references +4. Check browser console and server logs for errors + +### Relevance Scores Showing 0% + +**Problem**: All citation cards show 0% relevance + +**Solutions**: +1. Verify `AZURE_AI_INCLUDE_SEARCH_SCORES=true` is set +2. Check that your Azure Search index supports scoring +3. Enable DEBUG logging to see the raw score values from Azure + +### Links Not Working + +**Problem**: `[docX]` references are not clickable + +**Solutions**: +1. Ensure citations have valid `url` or `filepath` fields +2. Check that the document URL is accessible +3. Verify the markdown link format is being generated correctly + +## References + +- [OpenWebUI Pipelines Citation Feature Discussion](https://github.com/open-webui/pipelines/issues/229) +- [OpenWebUI Event Emitter Documentation](https://docs.openwebui.com/features/plugin/development/events) +- [Azure AI Search Documentation](https://learn.microsoft.com/en-us/azure/search/) +- [Azure On Your Data API Reference](https://learn.microsoft.com/en-us/azure/ai-foundry/openai/references/on-your-data) + +## Version History + +- **v2.6.0**: Major refactor - removed `AZURE_AI_ENHANCE_CITATIONS` and `AZURE_AI_OPENWEBUI_CITATIONS` valves; citation support is now always enabled when `AZURE_AI_DATA_SOURCES` is configured; added clickable `[docX]` markdown links; improved score extraction using `filter_reason` field +- **v2.5.x**: Dual citation modes (OpenWebUI events + markdown/HTML) diff --git a/docs/azure-ai-integration.md b/docs/azure-ai-integration.md index 6d0782b..1f9766b 100644 --- a/docs/azure-ai-integration.md +++ b/docs/azure-ai-integration.md @@ -60,8 +60,9 @@ AZURE_AI_ENDPOINT="https://.openai.azure.com/openai/deployments/ -📚 Sources and References - -
-[doc1] - README.md - -📁 **File:** `README.md` -📄 **Chunk ID:** 0 -**Content:** -> environment variable. The token can be used to authenticate the workflow when accessing GitHub resources... - -
- -
-[doc2] - Documentation.md +# Enhanced response (with clickable links) +**Docker container actions** are a type of GitHub Actions [[doc1]](https://example.com/README.md)... +``` -📁 **File:** `Documentation.md` -📄 **Chunk ID:** 1 -**Content:** -> Docker container actions contain all their dependencies in the container and are therefore very consistent... +**Citation Card Features:** -
+- **Source information** with `[docX]` prefix for easy identification +- **Relevance percentage** displayed on citation cards (requires `AZURE_AI_INCLUDE_SEARCH_SCORES=true`) +- **Document preview** with content snippets +- **Clickable links** to source documents when URLs are available +- **Streaming support** with links converted inline as content streams - -``` +**Relevance Score Selection:** -**Enhanced Citation Features:** +The pipeline uses the `filter_reason` field from Azure Search to select the appropriate score: +- `filter_reason="rerank"` → uses `rerank_score` +- `filter_reason="score"` or not present → uses `original_search_score` -- **Collapsible interface** with expandable sections for clean presentation -- **Two-level organization** - main sources section and individual document details -- **Complete content display** - full document content, not just previews -- **Document references** with clear [doc1], [doc2] labels for easy cross-referencing -- **Source metadata** including file paths, URLs, and chunk IDs for precise tracking -- **Streaming support** with citations properly formatted for both streaming and non-streaming responses -- **Space efficient** - collapsed by default to avoid overwhelming the main response +For more details, see the [Azure AI Citations Documentation](azure-ai-citations.md). > [!TIP] > To use **Azure OpenAI** and other **Azure AI** models **simultaneously**, you can use the following URL: `https://.services.ai.azure.com/models/chat/completions?api-version=2024-05-01-preview` diff --git a/pipelines/azure/azure_ai_foundry.py b/pipelines/azure/azure_ai_foundry.py index b8a841c..0a87638 100644 --- a/pipelines/azure/azure_ai_foundry.py +++ b/pipelines/azure/azure_ai_foundry.py @@ -4,7 +4,7 @@ author_url: https://github.com/owndev/ project_url: https://github.com/owndev/Open-WebUI-Functions funding_url: https://github.com/sponsors/owndev -version: 2.5.2 +version: 2.6.0 license: Apache License 2.0 description: A pipeline for interacting with Azure AI services, enabling seamless communication with various AI models via configurable headers and robust error handling. This includes support for Azure OpenAI models as well as other Azure AI models by dynamically managing headers and request configurations. Azure AI Search (RAG) integration is only supported with Azure OpenAI endpoints. features: @@ -15,7 +15,9 @@ - Compatible with Azure OpenAI and other Azure AI models. - Predefined models for easy access. - Encrypted storage of sensitive API keys - - Azure AI Search / RAG integration with enhanced citation display (Azure OpenAI only) + - Azure AI Search / RAG integration with native OpenWebUI citations (Azure OpenAI only) + - Automatic [docX] to markdown link conversion for clickable citations + - Relevance scores from Azure AI Search displayed in citation cards """ from typing import ( @@ -28,6 +30,7 @@ Any, AsyncIterator, Set, + Callable, ) from urllib.parse import urlparse from fastapi.responses import StreamingResponse @@ -123,6 +126,8 @@ def __get_pydantic_core_schema__( # Helper functions + + async def cleanup_response( response: Optional[aiohttp.ClientResponse], session: Optional[aiohttp.ClientSession], @@ -141,6 +146,9 @@ async def cleanup_response( class Pipe: + # Regex pattern for matching [docX] citation references + DOC_REF_PATTERN = re.compile(r"\[doc(\d+)\]") + # Environment variables for API key, endpoint, and optional model class Valves(BaseModel): # Custom prefix for pipeline display name @@ -174,19 +182,26 @@ class Valves(BaseModel): # Switch for sending model name in request body AZURE_AI_MODEL_IN_BODY: bool = Field( - default=os.getenv("AZURE_AI_MODEL_IN_BODY", False), + default=bool( + os.getenv("AZURE_AI_MODEL_IN_BODY", "false").lower() == "true" + ), description="If True, include the model name in the request body instead of as a header.", ) # Flag to indicate if predefined Azure AI models should be used USE_PREDEFINED_AZURE_AI_MODELS: bool = Field( - default=os.getenv("USE_PREDEFINED_AZURE_AI_MODELS", False), + default=bool( + os.getenv("USE_PREDEFINED_AZURE_AI_MODELS", "false").lower() == "true" + ), description="Flag to indicate if predefined Azure AI models should be used.", ) # If True, use Authorization header with Bearer token instead of api-key header. USE_AUTHORIZATION_HEADER: bool = Field( - default=bool(os.getenv("AZURE_AI_USE_AUTHORIZATION_HEADER", False)), + default=bool( + os.getenv("AZURE_AI_USE_AUTHORIZATION_HEADER", "false").lower() + == "true" + ), description="Set to True to use Authorization header with Bearer token instead of api-key header.", ) @@ -197,10 +212,30 @@ class Valves(BaseModel): description='JSON configuration for data_sources field (for Azure AI Search / RAG). Example: \'[{"type":"azure_search","parameters":{"endpoint":"https://xxx.search.windows.net","index_name":"your-index","authentication":{"type":"api_key","key":"your-key"}}}]\'', ) - # Enable enhanced citation display for Azure AI Search responses - AZURE_AI_ENHANCE_CITATIONS: bool = Field( - default=bool(os.getenv("AZURE_AI_ENHANCE_CITATIONS", True)), - description="If True, enhance Azure AI Search responses with better citation formatting and source content display.", + # Enable relevance scores from Azure AI Search + AZURE_AI_INCLUDE_SEARCH_SCORES: bool = Field( + default=bool( + os.getenv("AZURE_AI_INCLUDE_SEARCH_SCORES", "true").lower() == "true" + ), + description="If True, automatically add 'include_contexts' with 'all_retrieved_documents' to Azure AI Search requests to get relevance scores (original_search_score and rerank_score). This enables relevance percentage display in citation cards.", + ) + + # BM25 score normalization factor for relevance percentage display + # BM25 scores are unbounded and vary by collection. This value is used to normalize + # scores to 0-1 range: normalized = min(score / BM25_SCORE_MAX, 1.0) + # See: https://learn.microsoft.com/en-us/azure/search/index-ranking-similarity + BM25_SCORE_MAX: float = Field( + default=float(os.getenv("AZURE_AI_BM25_SCORE_MAX", "100.0")), + description="Normalization divisor for BM25 search scores (0-1 range). Adjust based on your index characteristics. Default 100.0 is suitable for typical collections; higher values (e.g., 200.0) reduce saturation for large documents.", + ) + + # Rerank score normalization factor for relevance percentage display + # Cohere rerankers via Azure return 0-4, most others 0-1. This value normalizes + # scores above 1.0 to 0-1 range: normalized = min(score / RERANK_SCORE_MAX, 1.0) + # See: https://learn.microsoft.com/en-us/azure/search/semantic-ranking + RERANK_SCORE_MAX: float = Field( + default=float(os.getenv("AZURE_AI_RERANK_SCORE_MAX", "4.0")), + description="Normalization divisor for rerank scores (0-1 range). Use 4.0 for Cohere rerankers, 1.0 for standard semantic rerankers.", ) def __init__(self): @@ -316,34 +351,619 @@ def get_azure_ai_data_sources(self) -> Optional[List[Dict[str, Any]]]: Builds Azure AI data sources configuration from the AZURE_AI_DATA_SOURCES environment variable. Only works with Azure OpenAI endpoints: https://.openai.azure.com/openai/deployments//chat/completions?api-version=2025-01-01-preview + If AZURE_AI_INCLUDE_SEARCH_SCORES is enabled, automatically adds 'include_contexts' + with 'all_retrieved_documents' to get relevance scores from Azure AI Search. + Returns: List containing Azure AI data source configuration, or None if not configured. """ if not self.valves.AZURE_AI_DATA_SOURCES: return None + log = logging.getLogger("azure_ai.get_azure_ai_data_sources") + try: data_sources = json.loads(self.valves.AZURE_AI_DATA_SOURCES) - if isinstance(data_sources, list): - return data_sources - else: + if not isinstance(data_sources, list): # If it's a single object, wrap it in a list - return [data_sources] + data_sources = [data_sources] + + # If AZURE_AI_INCLUDE_SEARCH_SCORES is enabled, add include_contexts + if self.valves.AZURE_AI_INCLUDE_SEARCH_SCORES: + for source in data_sources: + if ( + isinstance(source, dict) + and source.get("type") == "azure_search" + and "parameters" in source + ): + params = source["parameters"] + # Get or create include_contexts list + include_contexts = params.get("include_contexts", []) + if not isinstance(include_contexts, list): + include_contexts = [include_contexts] + + # Add 'citations' and 'all_retrieved_documents' if not present + if "citations" not in include_contexts: + include_contexts.append("citations") + if "all_retrieved_documents" not in include_contexts: + include_contexts.append("all_retrieved_documents") + + params["include_contexts"] = include_contexts + log.debug( + f"Added include_contexts to Azure Search: {include_contexts}" + ) + + return data_sources except json.JSONDecodeError as e: # Log error and return None if JSON parsing fails - log = logging.getLogger("azure_ai.get_azure_ai_data_sources") log.error(f"Error parsing AZURE_AI_DATA_SOURCES: {e}") return None + def _extract_citations_from_response( + self, response_data: Dict[str, Any] + ) -> Optional[List[Dict[str, Any]]]: + """ + Extract citations from an Azure AI response (streaming or non-streaming). + + Supports both 'citations' and 'all_retrieved_documents' response structures. + When include_contexts includes 'all_retrieved_documents', the response contains + additional score fields like 'original_search_score' and 'rerank_score'. + + Args: + response_data: Response data from Azure AI (can be a delta or full message) + + Returns: + List of citation objects, or None if no citations found + """ + log = logging.getLogger("azure_ai._extract_citations_from_response") + + if not isinstance(response_data, dict): + log.debug(f"Response data is not a dict: {type(response_data)}") + return None + + # Try multiple possible locations for citations + citations = None + + # Check in choices[0].delta.context or choices[0].message.context + if "choices" in response_data and response_data["choices"]: + choice = response_data["choices"][0] + context = None + + # Get context from delta (streaming) or message (non-streaming) + if "delta" in choice and isinstance(choice["delta"], dict): + context = choice["delta"].get("context") + elif "message" in choice and isinstance(choice["message"], dict): + context = choice["message"].get("context") + + if context and isinstance(context, dict): + # Try citations first + if "citations" in context: + citations = context["citations"] + log.info( + f"Found {len(citations) if citations else 0} citations in context.citations" + ) + + # If all_retrieved_documents is present, merge score data into citations + if "all_retrieved_documents" in context: + all_docs = context["all_retrieved_documents"] + log.debug( + f"Found {len(all_docs) if all_docs else 0} all_retrieved_documents" + ) + + # If we have both citations and all_retrieved_documents, + # try to merge score data from all_retrieved_documents into citations + if citations and all_docs: + self._merge_score_data(citations, all_docs, log) + elif all_docs and not citations: + # Use all_retrieved_documents as citations if no citations found + citations = all_docs + log.info( + f"Using {len(citations)} all_retrieved_documents as citations" + ) + else: + log.debug( + f"No context found in response. Choice keys: {choice.keys() if isinstance(choice, dict) else 'not a dict'}" + ) + else: + log.debug(f"No choices in response. Response keys: {response_data.keys()}") + + if citations and isinstance(citations, list): + log.info(f"Extracted {len(citations)} citations from response") + # Log first citation structure for debugging (only if INFO logging is enabled) + if citations and log.isEnabledFor(logging.INFO): + log.info( + f"First citation structure: {json.dumps(citations[0], default=str)[:500]}" + ) + return citations + + log.debug("No valid citations found in response") + return None + + def _merge_score_data( + self, + citations: List[Dict[str, Any]], + all_docs: List[Dict[str, Any]], + log: logging.Logger, + ) -> None: + """ + Merge score data from all_retrieved_documents into citations. + + When include_contexts includes 'all_retrieved_documents', Azure returns + additional documents with score fields. This method attempts to match + them with citations and copy over the score data. + + Copies: + - original_search_score: BM25/keyword search score + - rerank_score: Semantic reranker score (if enabled) + - filter_reason: Indicates which score is relevant ("score" or "rerank") + + Args: + citations: List of citation objects to update (modified in place) + all_docs: List of all_retrieved_documents with score data + log: Logger instance + """ + # Build multiple lookup maps to maximize matching chances + # all_retrieved_documents may have different keys than citations + doc_data_by_title = {} + doc_data_by_filepath = {} + doc_data_by_content = {} + doc_data_by_chunk_id = {} + + for doc in all_docs: + doc_data = { + "original_search_score": doc.get("original_search_score"), + "rerank_score": doc.get("rerank_score"), + "filter_reason": doc.get("filter_reason"), + } + + log.debug( + f"Processing all_retrieved_document: title='{doc.get('title')}', " + f"chunk_id='{doc.get('chunk_id')}', " + f"original_search_score={doc_data['original_search_score']}, " + f"rerank_score={doc_data['rerank_score']}, " + f"filter_reason={doc_data['filter_reason']}" + ) + + # Only store if we have at least one score + if ( + doc_data["original_search_score"] is None + and doc_data["rerank_score"] is None + ): + log.debug(f"Skipping doc with no scores: {doc.get('title')}") + continue + + # Index by title + if doc.get("title"): + doc_data_by_title[doc["title"]] = doc_data + + # Index by filepath + if doc.get("filepath"): + doc_data_by_filepath[doc["filepath"]] = doc_data + + # Index by chunk_id (may include title as prefix for uniqueness) + if doc.get("chunk_id") is not None: + # Store by plain chunk_id + doc_data_by_chunk_id[str(doc["chunk_id"])] = doc_data + # Also store by title-prefixed chunk_id for uniqueness + if doc.get("title"): + chunk_key_with_title = f"{doc['title']}_{doc['chunk_id']}" + doc_data_by_chunk_id[chunk_key_with_title] = doc_data + + # Index by content prefix (first 100 chars) + if doc.get("content"): + content_key = ( + doc["content"][:100] + if len(doc.get("content", "")) > 100 + else doc.get("content") + ) + doc_data_by_content[content_key] = doc_data + + log.debug( + f"Built score lookup: by_title={len(doc_data_by_title)}, " + f"by_filepath={len(doc_data_by_filepath)}, " + f"by_chunk_id={len(doc_data_by_chunk_id)}, " + f"by_content={len(doc_data_by_content)}" + ) + + # Match citations with score data using multiple strategies + matched = 0 + for citation in citations: + doc_data = None + + # Try matching by title first (most reliable) + if not doc_data and citation.get("title"): + doc_data = doc_data_by_title.get(citation["title"]) + if doc_data: + log.debug(f"Matched citation by title: {citation['title']}") + + # Try matching by filepath + if not doc_data and citation.get("filepath"): + doc_data = doc_data_by_filepath.get(citation["filepath"]) + if doc_data: + log.debug(f"Matched citation by filepath: {citation['filepath']}") + + # Try matching by chunk_id with title prefix + if not doc_data and citation.get("chunk_id") is not None: + chunk_key = str(citation["chunk_id"]) + if citation.get("title"): + chunk_key_with_title = f"{citation['title']}_{citation['chunk_id']}" + doc_data = doc_data_by_chunk_id.get(chunk_key_with_title) + if not doc_data: + doc_data = doc_data_by_chunk_id.get(chunk_key) + if doc_data: + log.debug(f"Matched citation by chunk_id: {citation['chunk_id']}") + + # Try matching by content prefix + if not doc_data and citation.get("content"): + content_key = ( + citation["content"][:100] + if len(citation.get("content", "")) > 100 + else citation.get("content") + ) + doc_data = doc_data_by_content.get(content_key) + if doc_data: + log.debug("Matched citation by content prefix") + + if doc_data: + if doc_data.get("original_search_score") is not None: + citation["original_search_score"] = doc_data[ + "original_search_score" + ] + if doc_data.get("rerank_score") is not None: + citation["rerank_score"] = doc_data["rerank_score"] + if doc_data.get("filter_reason") is not None: + citation["filter_reason"] = doc_data["filter_reason"] + matched += 1 + log.debug( + f"Citation scores: original={doc_data.get('original_search_score')}, " + f"rerank={doc_data.get('rerank_score')}, " + f"filter_reason={doc_data.get('filter_reason')}" + ) + + log.info(f"Merged score data for {matched}/{len(citations)} citations") + + def _normalize_citation_for_openwebui( + self, citation: Dict[str, Any], index: int + ) -> Dict[str, Any]: + """ + Normalize an Azure citation object to OpenWebUI citation event format. + + The format follows OpenWebUI's official citation event structure: + https://docs.openwebui.com/features/plugin/development/events#source-or-citation-and-code-execution + + Args: + citation: Azure citation object + index: Citation index (1-based) + + Returns: + Complete citation event object with type and data fields + """ + log = logging.getLogger("azure_ai._normalize_citation_for_openwebui") + + # Get title with fallback chain: title → filepath → url → "Unknown Document" + # Handle None values explicitly since dict.get() returns None if key exists but value is None + title_raw = citation.get("title") or "" + filepath_raw = citation.get("filepath") or "" + url_raw = citation.get("url") or "" + + base_title = ( + title_raw.strip() + or filepath_raw.strip() + or url_raw.strip() + or "Unknown Document" + ) + # Include [docX] prefix in OpenWebUI citation card titles for document identification + title = f"[doc{index}] - {base_title}" + + # Build source URL for metadata + source_url = url_raw or filepath_raw + + # Build metadata with source information + # Use title with [docX] prefix as metadata source for OpenWebUI display + # The UI may extract display name from metadata.source rather than source.name + metadata_entry = {"source": title, "url": source_url} + if citation.get("metadata"): + metadata_entry.update(citation.get("metadata", {})) + + # Get document content (handle None values) + content = citation.get("content") or "" + + # Build normalized citation data structure matching OpenWebUI format exactly + citation_data = { + "document": [content], + "metadata": [metadata_entry], + "source": {"name": title}, + } + + # Add URL to source if available + if source_url: + citation_data["source"]["url"] = source_url + + # Add distances array for relevance score (OpenWebUI uses this for percentage display) + # Azure AI Search returns filter_reason to indicate which score type is relevant: + # - filter_reason not present or "score": use original_search_score (BM25/keyword) + # - filter_reason "rerank": use rerank_score (semantic reranker) + # Reference: https://learn.microsoft.com/en-us/azure/ai-foundry/openai/references/on-your-data + filter_reason = citation.get("filter_reason") + rerank_score = citation.get("rerank_score") + original_search_score = citation.get("original_search_score") + legacy_score = citation.get("score") + + normalized_score = 0.0 + + # Select score based on filter_reason as per Azure documentation: + # - filter_reason="rerank": Document filtered by rerank score threshold, use rerank_score + # - filter_reason="score" or not present: Document filtered by/passed original search score, use original_search_score + if filter_reason == "rerank" and rerank_score is not None: + # Document filtered by rerank score - use rerank_score + # Cohere rerankers via Azure AI return scores in 0-4 range (source: Azure AI Search documentation) + # Most semantic rerankers return 0-1, so we normalize 0-4 range down to 0-1 for consistency. + # Reference: https://learn.microsoft.com/en-us/azure/search/semantic-ranking + score_val = float(rerank_score) + if score_val > 1.0: + normalized_score = min(score_val / self.valves.RERANK_SCORE_MAX, 1.0) + else: + normalized_score = score_val + log.debug( + f"Using rerank_score (filter_reason=rerank): {rerank_score} -> {normalized_score} " + f"(normalized via {self.valves.RERANK_SCORE_MAX})" + ) + elif ( + filter_reason is None or filter_reason == "score" + ) and original_search_score is not None: + # filter_reason is "score" or not present - use original_search_score + # BM25 scores are unbounded and vary by collection size and term distribution. + # We normalize by dividing by BM25_SCORE_MAX to produce a value in 0-1 range. + # This preserves relative ranking without hard-capping high-relevance documents. + # Reference: https://learn.microsoft.com/en-us/azure/search/index-ranking-similarity + score_val = float(original_search_score) + if score_val > 1.0: + normalized_score = min(score_val / self.valves.BM25_SCORE_MAX, 1.0) + else: + normalized_score = score_val + log.debug( + f"Using original_search_score (filter_reason={filter_reason}): {original_search_score} -> {normalized_score} " + f"(normalized via {self.valves.BM25_SCORE_MAX})" + ) + elif original_search_score is not None: + # Fallback for unknown filter_reason values - use original_search_score + score_val = float(original_search_score) + if score_val > 1.0: + normalized_score = min(score_val / self.valves.BM25_SCORE_MAX, 1.0) + else: + normalized_score = score_val + log.debug( + f"Using original_search_score (fallback, filter_reason={filter_reason}): {original_search_score} -> {normalized_score} " + f"(normalized via {self.valves.BM25_SCORE_MAX})" + ) + elif rerank_score is not None: + # Fallback to rerank_score if available but filter_reason doesn't match + score_val = float(rerank_score) + if score_val > 1.0: + normalized_score = min(score_val / self.valves.RERANK_SCORE_MAX, 1.0) + else: + normalized_score = score_val + log.debug( + f"Using rerank_score (fallback): {rerank_score} -> {normalized_score} " + f"(normalized via {self.valves.RERANK_SCORE_MAX})" + ) + elif legacy_score is not None: + normalized_score = float(legacy_score) + log.debug(f"Using legacy score: {legacy_score}") + else: + log.debug("No score available, using default 0.0") + + citation_data["distances"] = [normalized_score] + + # Build complete citation event structure + citation_event = { + "type": "citation", + "data": citation_data, + } + + # Log the normalized citation for debugging (only if INFO logging is enabled) + if log.isEnabledFor(logging.INFO): + log.info( + f"Normalized citation {index}: title='{title}', " + f"content_length={len(content)}, " + f"url='{source_url}', " + f"filter_reason={filter_reason}, " + f"rerank_score={rerank_score}, original_search_score={original_search_score}, " + f"distances={citation_data['distances']}, " + f"event={json.dumps(citation_event, default=str)[:500]}" + ) + + return citation_event + + def _build_citation_urls_map( + self, citations: Optional[List[Dict[str, Any]]] + ) -> Dict[int, Optional[str]]: + """ + Build a mapping of citation indices to document URLs. + + Args: + citations: List of citation objects with title, filepath, url, etc. + + Returns: + Dict mapping 1-based citation index to URL (or None if no URL available) + """ + citation_urls: Dict[int, Optional[str]] = {} + if not citations: + return citation_urls + + for i, citation in enumerate(citations, 1): + if isinstance(citation, dict): + # Get URL with fallback to filepath + url = citation.get("url") or "" + filepath = citation.get("filepath") or "" + + citation_url = url.strip() or filepath.strip() or None + citation_urls[i] = citation_url + + return citation_urls + + def _format_citation_link(self, doc_num: int, url: Optional[str] = None) -> str: + """ + Format a markdown link for a [docX] reference. + + If a URL is available, creates a clickable markdown link. + Otherwise, returns the original [docX] reference. + + Args: + doc_num: The document number (1-based) + url: Optional URL for the document + + Returns: + Formatted markdown link string or original [docX] reference + """ + if url: + # Create markdown link: [[doc1]](url) + return f"[[doc{doc_num}]]({url})" + else: + # No URL available, keep original reference + return f"[doc{doc_num}]" + + def _convert_doc_refs_to_links( + self, content: str, citations: List[Dict[str, Any]] + ) -> str: + """ + Convert [docX] references in content to markdown links with document URLs. + + If a citation has a URL, [doc1] becomes [[doc1]](url). This creates clickable + links to the source documents in the response. + + Args: + content: The response content containing [docX] references + citations: List of citation objects with title, url, etc. + + Returns: + Content with [docX] references converted to markdown links + """ + if not content or not citations: + return content + + log = logging.getLogger("azure_ai._convert_doc_refs_to_links") + + # Build a mapping of citation index to URL + citation_urls = self._build_citation_urls_map(citations) + + def replace_doc_ref(match): + """Replace [docX] with [[docX]](url) if URL available""" + doc_num = int(match.group(1)) + url = citation_urls.get(doc_num) + return self._format_citation_link(doc_num, url) + + # Replace all [docX] references + converted = re.sub(self.DOC_REF_PATTERN, replace_doc_ref, content) + + # Count conversions for logging + original_count = len(re.findall(self.DOC_REF_PATTERN, content)) + linked_count = sum( + 1 for i in range(1, len(citations) + 1) if citation_urls.get(i) + ) + if original_count > 0: + log.info( + f"Converted {original_count} [docX] references to markdown links ({linked_count} with URLs)" + ) + + return converted + + async def _emit_openwebui_citation_events( + self, + citations: List[Dict[str, Any]], + __event_emitter__: Optional[Callable[..., Any]], + content: str = "", + ) -> None: + """ + Emit OpenWebUI citation events for citations. + + Emits one citation event per source document, following the OpenWebUI + citation event format. Each citation is emitted separately to ensure + all sources appear in the UI. + + Only emits citations that are actually referenced in the content (e.g., [doc1], [doc2]). + + Args: + citations: List of Azure citation objects + __event_emitter__: Event emitter callable for sending citation events + content: The response content (used to filter only referenced citations) + """ + log = logging.getLogger("azure_ai._emit_openwebui_citation_events") + + if not __event_emitter__: + log.warning("No __event_emitter__ provided, cannot emit citation events") + return + + if not citations: + log.info("No citations to emit") + return + + # Extract which citations are actually referenced in the content + referenced_indices = self._extract_referenced_citations(content) + + # If we couldn't find any references, include all citations (backward compatibility) + if not referenced_indices: + referenced_indices = set(range(1, len(citations) + 1)) + log.debug( + f"No [docX] references found in content, including all {len(citations)} citations" + ) + else: + log.info( + f"Found {len(referenced_indices)} referenced citations: {sorted(referenced_indices)}" + ) + + log.info( + f"Emitting citation events for {len(referenced_indices)} referenced citations via __event_emitter__" + ) + + emitted_count = 0 + for i, citation in enumerate(citations, 1): + # Skip citations that are not referenced in the content + if i not in referenced_indices: + log.debug(f"Skipping citation {i} - not referenced in content") + continue + + if not isinstance(citation, dict): + log.warning(f"Citation {i} is not a dict, skipping: {type(citation)}") + continue + + try: + normalized = self._normalize_citation_for_openwebui(citation, i) + + # Log the full citation JSON for debugging + # log.debug( + # f"Full citation event JSON for doc{i}: {json.dumps(normalized, default=str)}" + # ) + + # Emit citation event for this individual source + source_name = ( + normalized.get("data", {}).get("source", {}).get("name", "unknown") + ) + log.info( + f"Emitting citation event {i}/{len(citations)} with source.name='{source_name}'" + ) + await __event_emitter__(normalized) + emitted_count += 1 + + log.info(f"Successfully emitted citation event for doc{i}") + + except Exception as e: + log.exception(f"Failed to emit citation event for citation {i}: {e}") + + log.info( + f"Finished emitting {emitted_count}/{len(referenced_indices)} citation events" + ) + def enhance_azure_search_response(self, response: Dict[str, Any]) -> Dict[str, Any]: """ - Enhances Azure AI Search responses by improving citation display and adding source content. + Enhance Azure AI Search responses by converting [docX] references to markdown links. + Modifies the response in-place and returns it. Args: - response: The original response from Azure AI + response: The original response from Azure AI (modified in-place) Returns: - Enhanced response with better citation formatting + The enhanced response with markdown links for citations """ if not isinstance(response, dict): return response @@ -365,37 +985,12 @@ def enhance_azure_search_response(self, response: Dict[str, Any]) -> Dict[str, A citations = context["citations"] content = message["content"] - # Create citation mappings - citation_details = {} - for i, citation in enumerate(citations, 1): - if not isinstance(citation, dict): - continue - - doc_ref = f"[doc{i}]" - citation_details[doc_ref] = { - "title": citation.get("title", "Unknown Document"), - "content": citation.get("content", ""), - "url": citation.get("url"), - "filepath": citation.get("filepath"), - "chunk_id": citation.get("chunk_id", "0"), - } - - # Enhance the content with better citation display - enhanced_content = content - - # Add citation section at the end - if citation_details: - citation_section = self._format_citation_section( - citations, content, for_streaming=False - ) - enhanced_content += citation_section + # Convert [docX] references to markdown links + enhanced_content = self._convert_doc_refs_to_links(content, citations) # Update the message content message["content"] = enhanced_content - # Add enhanced citation info to context for API consumers - context["enhanced_citations"] = citation_details - return response except Exception as e: @@ -587,16 +1182,20 @@ async def stream_processor_with_citations( full_response_buffer = "" response_content = "" # Track the actual response content citations_data = None - citations_added = False - all_chunks = [] + citation_urls = {} # Pre-allocate citation URLs map + + # Pre-define the replacement function outside the loop to avoid repeated creation + def replace_ref(m, urls_map): + doc_num = int(m.group(1)) + url = urls_map.get(doc_num) + return self._format_citation_link(doc_num, url) async for chunk in content: chunk_str = chunk.decode("utf-8", errors="ignore") full_response_buffer += chunk_str - all_chunks.append(chunk) # Log chunk for debugging (only first 200 chars to avoid spam) - log.debug(f"Processing chunk: {chunk_str[:200]}...") + # log.debug(f"Processing chunk: {chunk_str[:200]}...") # Extract content from delta messages to build the full response content try: @@ -625,9 +1224,14 @@ async def stream_processor_with_citations( except Exception as e: log.debug(f"Exception while processing chunk: {e}") - # Look for citations in any part of the response - if "citations" in chunk_str.lower() and not citations_data: - log.debug("Found 'citations' in chunk, attempting to parse...") + # Look for citations or all_retrieved_documents in any part of the response + if ( + "citations" in chunk_str.lower() + or "all_retrieved_documents" in chunk_str.lower() + ) and not citations_data: + log.debug( + "Found 'citations' or 'all_retrieved_documents' in chunk, attempting to parse..." + ) # Try to extract citation data from the current buffer try: @@ -645,54 +1249,83 @@ async def stream_processor_with_citations( # Check multiple possible locations for citations citations_found = None + all_docs_found = None if ( isinstance(response_data, dict) and "choices" in response_data ): for choice in response_data["choices"]: - # Check in delta.context.citations - if ( - "delta" in choice - and isinstance( - choice["delta"], dict - ) - and "context" in choice["delta"] - and "citations" - in choice["delta"]["context"] + context = None + # Get context from delta or message + if "delta" in choice and isinstance( + choice["delta"], dict ): - citations_found = choice["delta"][ + context = choice["delta"].get( "context" - ]["citations"] - log.debug( - f"Found citations in delta.context: {len(citations_found)} citations" - ) - break - - # Check in message.context.citations - elif ( - "message" in choice - and isinstance( - choice["message"], dict ) - and "context" in choice["message"] - and "citations" - in choice["message"]["context"] + elif "message" in choice and isinstance( + choice["message"], dict ): - citations_found = choice["message"][ + context = choice["message"].get( "context" - ]["citations"] - log.debug( - f"Found citations in message.context: {len(citations_found)} citations" ) + + if context and isinstance( + context, dict + ): + # Check for citations + if "citations" in context: + citations_found = context[ + "citations" + ] + log.debug( + f"Found citations in context: {len(citations_found)} citations" + ) + # Check for all_retrieved_documents + if ( + "all_retrieved_documents" + in context + ): + all_docs_found = context[ + "all_retrieved_documents" + ] + log.debug( + f"Found all_retrieved_documents in context: {len(all_docs_found)} docs" + ) break - # Store the first valid citations we find + # Merge score data if we have both + if citations_found and all_docs_found: + self._merge_score_data( + citations_found, all_docs_found, log + ) + + # Use citations if found, otherwise use all_retrieved_documents if citations_found and not citations_data: citations_data = citations_found + # Build citation URLs map once when citations are found + citation_urls = ( + self._build_citation_urls_map( + citations_data + ) + ) log.info( f"Successfully extracted {len(citations_data)} citations from stream" ) + elif all_docs_found and not citations_data: + citations_data = all_docs_found + # Build citation URLs map once when citations are found + citation_urls = ( + self._build_citation_urls_map( + citations_data + ) + ) + log.info( + f"Using {len(citations_data)} all_retrieved_documents as citations" + ) + # Note: OpenWebUI citation events are emitted after the stream ends + # to filter only citations referenced in the response content except json.JSONDecodeError: # Skip invalid JSON @@ -701,7 +1334,91 @@ async def stream_processor_with_citations( except Exception as parse_error: log.debug(f"Error parsing citations from chunk: {parse_error}") - # Always yield the original chunk first + # Convert [docX] references to markdown links in the chunk content + # This creates clickable links to source documents in streaming responses + chunk_modified = False + if "[doc" in chunk_str and citation_urls: + try: + # Parse and modify each SSE data line + modified_lines = [] + chunk_lines = chunk_str.split("\n") + + for line in chunk_lines: + # Early exit: skip lines without [doc references + if "[doc" not in line: + modified_lines.append(line) + continue + + # Process only SSE data lines + if ( + line.startswith("data: ") + and line.strip() != "data: [DONE]" + ): + json_str = line[6:].strip() + if json_str and json_str != "[DONE]": + try: + data = json.loads(json_str) + if ( + isinstance(data, dict) + and "choices" in data + and data["choices"] + ): + line_modified = False + # Process choices until we find and modify content + for choice in data["choices"]: + if ( + "delta" in choice + and "content" in choice["delta"] + ): + content_val = choice["delta"][ + "content" + ] + if "[doc" in content_val: + # Convert [docX] to markdown link using pre-compiled pattern + # Use lambda to pass citation_urls to pre-defined function + choice["delta"]["content"] = ( + self.DOC_REF_PATTERN.sub( + lambda m: replace_ref( + m, citation_urls + ), + content_val, + ) + ) + line_modified = True + # Early exit: content found and modified + break + + if line_modified: + modified_lines.append( + f"data: {json.dumps(data)}" + ) + chunk_modified = True + else: + modified_lines.append(line) + else: + modified_lines.append(line) + except json.JSONDecodeError: + modified_lines.append(line) + else: + modified_lines.append(line) + else: + modified_lines.append(line) + + # Reconstruct the chunk only if something was modified + if chunk_modified: + modified_chunk_str = "\n".join(modified_lines) + log.debug( + "Converted [docX] references to markdown links in streaming chunk" + ) + chunk = modified_chunk_str.encode("utf-8") + + except Exception as convert_err: + log.debug( + f"Error converting [docX] to markdown links: {convert_err}" + ) + # Fall through to yield original chunk + + # Yield the (possibly modified) chunk yield chunk # Check if this is the end of the stream @@ -709,42 +1426,13 @@ async def stream_processor_with_citations( log.debug("End of stream detected") break - # After the stream ends, add citations if we found any - if citations_data and not citations_added: - log.info("Adding citation summary at end of stream...") - - # Pass the accumulated response content to filter citations - citation_section = self._format_citation_section( - citations_data, response_content, for_streaming=True + # After the stream ends, emit OpenWebUI citation events + if citations_data and __event_emitter__: + log.info("Emitting OpenWebUI citation events at end of stream...") + # Filter to only citations referenced in the response content + await self._emit_openwebui_citation_events( + citations_data, __event_emitter__, response_content ) - if citation_section: - # Convert escaped newlines to actual newlines for display - display_section = citation_section.replace("\\n", "\n") - - # Send the citation section in smaller, safer chunks - # Split by lines and send each as a separate SSE event - lines = display_section.split("\n") - - for line in lines: - # Escape quotes and backslashes for JSON - safe_line = line.replace("\\", "\\\\").replace('"', '\\"') - # Create a simple SSE event - sse_event = f'data: {{"choices":[{{"delta":{{"content":"{safe_line}\\n"}}}}]}}\n\n' - yield sse_event.encode("utf-8") - - citations_added = True - log.info("Citation summary successfully added to stream") - - # If we didn't find citations in the stream but detected citation references, - # try one more time with the full buffer - elif not citations_data and "[doc" in full_response_buffer: - log.warning( - "Found [doc] references but no citation data - attempting final parse..." - ) - # This is a fallback for cases where citation detection failed - fallback_message = "\\n\\n
\\n⚠️ Citations Processing Issue\\n\\nThe response contains citation references [doc1], [doc2], etc., but the citation details could not be extracted from the streaming response.\\n\\n
\\n" - fallback_sse = f'data: {{"choices":[{{"delta":{{"content":"{fallback_message}"}}}}]}}\n\n' - yield fallback_sse.encode("utf-8") # Send completion status update when streaming is done if __event_emitter__: @@ -790,131 +1478,12 @@ def _extract_referenced_citations(self, content: str) -> Set[int]: Returns: Set of citation indices that are referenced (e.g., {1, 2, 7, 8, 9}) """ - # Find all [docN] references in the content - pattern = r"\[doc(\d+)\]" - matches = re.findall(pattern, content) + # Find all [docN] references in the content using class constant + matches = re.findall(self.DOC_REF_PATTERN, content) # Convert to integers and return as a set return {int(match) for match in matches} - def _format_citation_section( - self, - citations: List[Dict[str, Any]], - content: str = "", - for_streaming: bool = False, - ) -> str: - """ - Creates a formatted citation section using collapsible details elements. - Only includes citations that are actually referenced in the content. - - Args: - citations: List of citation objects - content: The response content (used to filter only referenced citations) - for_streaming: If True, format for streaming (with escaping), else for regular response - - Returns: - Formatted citation section with HTML details elements - """ - if not citations: - return "" - - # Extract which citations are actually referenced in the content - referenced_indices = self._extract_referenced_citations(content) - - # If we couldn't find any references, include all citations (backward compatibility) - if not referenced_indices: - referenced_indices = set(range(1, len(citations) + 1)) - - # Collect only referenced citation details - citation_entries = [] - - for i, citation in enumerate(citations, 1): - # Skip citations that are not referenced in the content - if i not in referenced_indices: - continue - - if not isinstance(citation, dict): - continue - - doc_ref = f"[doc{i}]" - - # Get title with fallback to filepath or url - title = citation.get("title", "") - # Check if title is empty (not just None) and use alternatives - if not title or not title.strip(): - # Try filepath first - filepath = citation.get("filepath", "") - if filepath and filepath.strip(): - title = filepath - else: - # Try url next - url = citation.get("url", "") - if url and url.strip(): - title = url - else: - # Final fallback - title = "Unknown Document" - - content_text = citation.get("content", "") - filepath = citation.get("filepath", "") - url = citation.get("url", "") - chunk_id = citation.get("chunk_id", "") - - # Build individual citation details - citation_info = [] - - # Show filepath if available and not empty - if filepath and filepath.strip(): - citation_info.append(f"📁 **File:** `{filepath}`") - # Show URL if available, not empty, and no filepath was shown - elif url and url.strip(): - citation_info.append(f"🔗 **URL:** {url}") - - # Show chunk_id if available and not empty - if chunk_id is not None and str(chunk_id).strip(): - citation_info.append(f"📄 **Chunk ID:** {chunk_id}") - - # Add full content if available - if content_text and str(content_text).strip(): - try: - # Clean content for display - clean_content = str(content_text).strip() - if for_streaming: - # Additional escaping for streaming - clean_content = clean_content.replace("\\", "\\\\").replace( - '"', '\\"' - ) - - citation_info.append("**Content:**") - citation_info.append(f"> {clean_content}") - except Exception: - citation_info.append("**Content:** [Content unavailable]") - - # Create collapsible details for individual citation - if for_streaming: - # For streaming, we need to escape newlines - citation_content = "\\n".join(citation_info) - citation_entry = f"
\\n{doc_ref} - {title}\\n\\n{citation_content}\\n\\n
" - else: - citation_content = "\n".join(citation_info) - citation_entry = f"
\n{doc_ref} - {title}\n\n{citation_content}\n\n
" - - citation_entries.append(citation_entry) - - # Only create the section if we have citations to show - if not citation_entries: - return "" - - # Combine all citations into main collapsible section - if for_streaming: - all_citations = "\\n\\n".join(citation_entries) - result = f"\\n\\n
\\n📚 Sources and References\\n\\n{all_citations}\\n\\n
\\n" - else: - all_citations = "\n\n".join(citation_entries) - result = f"\n\n
\n📚 Sources and References\n\n{all_citations}\n\n
\n" - - return result - async def stream_processor( self, content: aiohttp.StreamReader, @@ -1126,11 +1695,8 @@ async def pipe( sse_headers["Content-Type"] = "text/event-stream" sse_headers.pop("Content-Length", None) - # Use enhanced stream processor if Azure AI Search is configured and citations are enabled - if ( - self.valves.AZURE_AI_DATA_SOURCES - and self.valves.AZURE_AI_ENHANCE_CITATIONS - ): + # Use enhanced stream processor if Azure AI Search is configured + if self.valves.AZURE_AI_DATA_SOURCES: stream_processor = self.stream_processor_with_citations else: stream_processor = self.stream_processor @@ -1160,14 +1726,27 @@ async def pipe( request.raise_for_status() - # Enhance Azure Search responses with better citation display - if ( - isinstance(response, dict) - and self.valves.AZURE_AI_DATA_SOURCES - and self.valves.AZURE_AI_ENHANCE_CITATIONS - ): + # Enhance Azure Search responses with citation linking and emit citation events + if isinstance(response, dict) and self.valves.AZURE_AI_DATA_SOURCES: response = self.enhance_azure_search_response(response) + # Emit OpenWebUI citation events for non-streaming responses + if __event_emitter__: + citations = self._extract_citations_from_response(response) + if citations: + # Get response content for filtering + response_content = "" + if ( + isinstance(response, dict) + and "choices" in response + and response["choices"] + ): + message = response["choices"][0].get("message", {}) + response_content = message.get("content", "") + await self._emit_openwebui_citation_events( + citations, __event_emitter__, response_content + ) + # Send completion status update if __event_emitter__: await __event_emitter__(