diff --git a/CHANGELOG.md b/CHANGELOG.md index e0e8d5f..0a3587e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.32.0] - 2026-01-02 +### Added +- Support for classify log method + ## [0.31.0] - 2025-12-29 ### Added - Support for list featured content rules method diff --git a/CLI.md b/CLI.md index ffe7455..caadf2d 100644 --- a/CLI.md +++ b/CLI.md @@ -249,10 +249,15 @@ secops log types --search "windows" # Fetch specific page using token secops log types --page-size 50 --page-token "next_page_token" -# Search for log types -secops log types --search "firewall" +# Classify logs to predict log type: +secops log classify --log '{"eventType": "user.session.start", "actor": {"alternateId": "user@example.com"}}' + +# Classify a log from a file +secops log classify --log /path/to/log_file.json ``` +> **Note:** The classify command returns predictions sorted by confidence score. Confidence scores are provided by the API as guidance only and may not always accurately reflect classification certainty. Use scores for relative ranking rather than absolute confidence. + > **Note:** Chronicle uses parsers to process and normalize raw log data into UDM format. If you're ingesting logs for a custom format, you may need to create or configure parsers. See the [Parser Management](#parser-management) section for details on managing parsers. ### Forwarder Management diff --git a/README.md b/README.md index 559c683..d904d76 100644 --- a/README.md +++ b/README.md @@ -389,7 +389,20 @@ else: print("Invalid log type") ``` -4. Use custom forwarders: +4. 
Classify logs to predict log type: +```python +# Classify a raw log to determine its type +okta_log = '{"eventType": "user.session.start", "actor": {"alternateId": "user@example.com"}}' +predictions = chronicle.classify_logs(log_data=okta_log) + +# Display predictions sorted by confidence score +for prediction in predictions: + print(f"Log Type: {prediction['logType']}, Score: {prediction['score']}") +``` + +> **Note:** Confidence scores are provided by the API as guidance only and may not always accurately reflect classification certainty. Use scores for relative ranking rather than absolute confidence. + +5. Use custom forwarders: ```python # Create or get a custom forwarder forwarder = chronicle.get_or_create_forwarder(display_name="MyCustomForwarder") diff --git a/api_module_mapping.md b/api_module_mapping.md index 21207be..7684b74 100644 --- a/api_module_mapping.md +++ b/api_module_mapping.md @@ -311,7 +311,7 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/ |logProcessingPipelines.list |v1alpha|chronicle.log_processing_pipelines.list_log_processing_pipelines|secops log-processing list | |logProcessingPipelines.patch |v1alpha|chronicle.log_processing_pipelines.update_log_processing_pipeline|secops log-processing update | |logProcessingPipelines.testPipeline |v1alpha|chronicle.log_processing_pipelines.test_pipeline |secops log-processing test | -|logs.classify |v1alpha| | | +|logs.classify |v1alpha|chronicle.log_types.classify_logs |secops log classify | | nativeDashboards.addChart | v1alpha |chronicle.dashboard.add_chart |secops dashboard add-chart | | nativeDashboards.create | v1alpha |chronicle.dashboard.create_dashboard |secops dashboard create | | nativeDashboards.delete | v1alpha |chronicle.dashboard.delete_dashboard |secops dashboard delete | diff --git a/examples/classify_logs_example.py b/examples/classify_logs_example.py new file mode 100644 index 0000000..2b38703 --- /dev/null +++ b/examples/classify_logs_example.py 
def create_sample_okta_log(username: str = "jdoe@example.com") -> str:
    """Build a sample Okta session-start event and serialize it to JSON.

    The ``published`` field carries the current UTC time in ISO-8601 form
    with a trailing ``Z``; all other fields are fixed placeholder values.

    Args:
        username: Value stored as the actor's ``alternateId``.

    Returns:
        A JSON string representing an OKTA log.
    """
    now_utc = datetime.now(timezone.utc)
    # isoformat() renders UTC as "+00:00"; swap it for the "Z" suffix.
    published_at = now_utc.isoformat().replace("+00:00", "Z")

    actor = {"displayName": "Joe Doe", "alternateId": username}
    client_info = {
        "ipAddress": "192.168.1.100",
        "userAgent": {"os": "Mac OS X", "browser": "SAFARI"},
    }

    # Key order is kept stable so the serialized output does not change.
    event = {
        "actor": actor,
        "client": client_info,
        "displayMessage": "User login to Okta",
        "eventType": "user.session.start",
        "outcome": {"result": "SUCCESS"},
        "published": published_at,
    }
    return json.dumps(event)
+ """ + current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + return f""" + + + 4624 + 1 + 0 + 12544 + 0 + 0x8020000000000000 + + 202117513 + + + Security + WIN-SERVER.xyz.net + + + + S-1-0-0 + - + {username} + CLIENT-PC + 3 + +""" + + +def create_sample_aws_cloudtrail_log() -> str: + """Create a sample AWS CloudTrail log. + + Returns: + A JSON string representing an AWS CloudTrail log. + """ + current_time = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + cloudtrail_log = { + "eventVersion": "1.05", + "userIdentity": { + "type": "IAMUser", + "principalId": "AIDAI1234EXAMPLE", + "arn": "arn:aws:iam::123456789012:user/admin", + "accountId": "123456789012", + "accessKeyId": "AKIAI1234EXAMPLE", + "userName": "admin", + }, + "eventTime": current_time, + "eventSource": "s3.amazonaws.com", + "eventName": "GetObject", + "awsRegion": "us-east-1", + "sourceIPAddress": "192.0.2.1", + "userAgent": "aws-cli/2.1.0", + "requestParameters": { + "bucketName": "my-bucket", + "key": "example-file.txt", + }, + "responseElements": None, + "requestID": "1234567890ABCDEF", + "eventID": "abcd1234-5678-90ef-ghij-klmnopqrstuv", + "eventType": "AwsApiCall", + "recipientAccountId": "123456789012", + } + + return json.dumps(cloudtrail_log) + + +def log_classification(chronicle_client): + """Raw log classification.""" + print("\n=== Log Type Classification Example ===\n") + + okta_log = create_sample_okta_log() + print(f"Classifying OKTA log...") + print(f"Raw log sample: {okta_log[:100]}...\n") + + try: + log_type_predictions = chronicle_client.classify_logs(log_data=okta_log) + + if log_type_predictions: + print("Predictions (sorted by confidence):") + for idx, pred in enumerate(log_type_predictions[:5], 1): + log_type = pred.get("logType", "Unknown") + score = pred.get("score", 0) + print(f" {idx}. 
def main():
    """Parse command-line options, build a Chronicle client, run the demo.

    Both dashed and underscored spellings of the customer and project
    options are accepted. ``--region`` defaults to ``us``.
    """
    arg_parser = argparse.ArgumentParser(
        description="Example of log type classification with Chronicle"
    )
    arg_parser.add_argument(
        "--customer-id",
        "--customer_id",
        required=True,
        help="Chronicle instance ID",
    )
    arg_parser.add_argument(
        "--project-id",
        "--project_id",
        required=True,
        help="GCP project ID",
    )
    arg_parser.add_argument(
        "--region",
        default="us",
        help="Chronicle API region",
    )
    options = arg_parser.parse_args()

    # SecOpsClient() is created without explicit credentials here.
    chronicle = SecOpsClient().chronicle(
        customer_id=options.customer_id,
        project_id=options.project_id,
        region=options.region,
    )

    log_classification(chronicle)
def classify_logs(
    client: "ChronicleClient",
    log_data: str,
) -> list[dict[str, Any]]:
    """Classify a raw log to predict its log type.

    The log is UTF-8 encoded, base64 encoded, and POSTed as the single
    element of ``logData`` to the instance's ``logs:classify`` endpoint.

    Args:
        client: ChronicleClient instance.
        log_data: Raw log string.

    Returns:
        The ``predictions`` list from the API response, returned in the
        order the API provides it (no re-sorting is done here). An empty
        list is returned when the field is missing or null.
        Example:
            [
                {"logType": "OKTA", "score": 0.95},
                {"logType": "ONELOGIN", "score": 0.03}
            ]

    Note:
        Confidence scores are provided by the API as guidance only and
        may not always accurately reflect classification certainty.
        Use scores for relative ranking rather than absolute confidence.

    Raises:
        SecOpsError: If log_data is empty or not a string.
        APIError: If the API request fails, or the response body is not
            a JSON object.
    """
    # Emptiness is checked before the type so that None (and other falsy
    # values) keep reporting "cannot be empty", as existing callers expect.
    if not log_data:
        raise SecOpsError("log data cannot be empty")

    if not isinstance(log_data, str):
        raise SecOpsError("log data must be a string")

    url = f"{client.base_url}/{client.instance_id}/logs:classify"

    encoded_log = base64.b64encode(log_data.encode("utf-8")).decode("utf-8")
    payload = {"logData": [encoded_log]}

    response = client.session.post(url, json=payload)

    if response.status_code != 200:
        raise APIError(f"Failed to classify log: {response.text}")

    # A 200 with a non-JSON body would otherwise surface as a bare
    # ValueError; report it as APIError like every other API failure.
    try:
        data = response.json()
    except ValueError as e:
        raise APIError(
            f"Failed to parse classify log response: {e}"
        ) from e

    if not isinstance(data, dict):
        raise APIError(
            f"Unexpected classify log response format: {response.text}"
        )

    # `or []` also covers an explicit `"predictions": null`.
    return data.get("predictions") or []
def handle_log_classify_command(args, chronicle):
    """Handle the ``log classify`` CLI command.

    Resolves ``args.log`` through ``load_string_or_file``, prints a
    disclaimer about confidence scores to stderr, classifies the log and
    renders the predictions with ``output_formatter`` using ``args.output``.
    Any exception is reported on stderr and the process exits with status 1.

    Args:
        args: Parsed command-line arguments (reads ``log`` and ``output``).
        chronicle: Chronicle client exposing ``classify_logs``.
    """
    disclaimer = (
        "Note: Confidence scores are for relative ranking, "
        "not absolute certainty.\n"
    )
    try:
        raw_log = load_string_or_file(args.log)

        # Sent to stderr so it does not mix with the formatted result.
        print(disclaimer, file=sys.stderr)

        predictions = chronicle.classify_logs(log_data=raw_log)
        output_formatter(predictions, args.output)
    except Exception as err:  # pylint: disable=broad-exception-caught
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
+ """ + client = SecOpsClient(service_account_info=SERVICE_ACCOUNT_JSON) + chronicle = client.chronicle(**CHRONICLE_CONFIG) + + log_samples = { + "OKTA": json.dumps( + { + "eventType": "user.session.start", + "displayMessage": "User login to Okta", + "actor": {"alternateId": "user@example.com"}, + "outcome": {"result": "SUCCESS"}, + } + ), + "Windows": "4624", + "AWS_CloudTrail": json.dumps( + { + "eventName": "GetObject", + "eventSource": "s3.amazonaws.com", + "userIdentity": {"type": "IAMUser"}, + } + ), + } + + try: + results = {} + + for log_name, log_data in log_samples.items(): + print(f"\nClassifying {log_name} log...") + result = chronicle.classify_logs(log_data=log_data) + + assert isinstance(result, list) + results[log_name] = result + + if len(result) > 0: + print(f"Top prediction: {result[0]['logType']}") + print(f"Score: {result[0]['score']}") + + print(f"\nSuccessfully classified {len(results)} log types") + assert len(results) == 3 + + except APIError as e: + print(f"\nAPI Error details: {str(e)}") + if "permission" in str(e).lower(): + pytest.skip("Insufficient permissions to classify logs") + raise diff --git a/tests/chronicle/test_log_types.py b/tests/chronicle/test_log_types.py index 461be2c..c9e9a9a 100644 --- a/tests/chronicle/test_log_types.py +++ b/tests/chronicle/test_log_types.py @@ -20,12 +20,14 @@ from secops.chronicle import log_types from secops.chronicle.log_types import ( + classify_logs, get_all_log_types, get_log_type_description, is_valid_log_type, load_log_types, search_log_types, ) +from secops.exceptions import APIError, SecOpsError @pytest.fixture @@ -415,3 +417,150 @@ def test_api_response_missing_fields(mock_chronicle_client): for log_type in result if log_type.get("name") ) + + +def test_classify_logs_success(mock_chronicle_client): + """Test successful log classification.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "predictions": [ + {"logType": "OKTA", "score": 
0.95}, + {"logType": "ONELOGIN", "score": 0.03}, + ] + } + mock_chronicle_client.session.post.return_value = mock_response + + log_data = '{"eventType": "user.session.start"}' + result = classify_logs(client=mock_chronicle_client, log_data=log_data) + + assert isinstance(result, list) + assert len(result) == 2 + assert result[0]["logType"] == "OKTA" + assert result[0]["score"] == 0.95 + assert result[1]["logType"] == "ONELOGIN" + assert result[1]["score"] == 0.03 + + mock_chronicle_client.session.post.assert_called_once() + call_args = mock_chronicle_client.session.post.call_args + assert "logs:classify" in call_args[0][0] + assert "logData" in call_args[1]["json"] + + +def test_classify_logs_empty_predictions(mock_chronicle_client): + """Test classification with empty predictions.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"predictions": []} + mock_chronicle_client.session.post.return_value = mock_response + + log_data = "unknown log format" + result = classify_logs(client=mock_chronicle_client, log_data=log_data) + + assert isinstance(result, list) + assert len(result) == 0 + + +def test_classify_logs_missing_predictions_key(mock_chronicle_client): + """Test classification when API response missing predictions key.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {} + mock_chronicle_client.session.post.return_value = mock_response + + log_data = "test log" + result = classify_logs(client=mock_chronicle_client, log_data=log_data) + + assert isinstance(result, list) + assert len(result) == 0 + + +def test_classify_logs_empty_log_data(mock_chronicle_client): + """Test classification with empty log data.""" + with pytest.raises(SecOpsError, match="log data cannot be empty"): + classify_logs(client=mock_chronicle_client, log_data="") + + mock_chronicle_client.session.post.assert_not_called() + + +def test_classify_logs_none_log_data(mock_chronicle_client): + 
"""Test classification with None log data.""" + with pytest.raises(SecOpsError, match="log data cannot be empty"): + classify_logs(client=mock_chronicle_client, log_data=None) + + mock_chronicle_client.session.post.assert_not_called() + + +def test_classify_logs_non_string_log_data(mock_chronicle_client): + """Test classification with non-string log data.""" + with pytest.raises(SecOpsError, match="log data must be a string"): + classify_logs(client=mock_chronicle_client, log_data=123) + + mock_chronicle_client.session.post.assert_not_called() + + with pytest.raises(SecOpsError, match="log data must be a string"): + classify_logs(client=mock_chronicle_client, log_data=["log"]) + + with pytest.raises(SecOpsError, match="log data must be a string"): + classify_logs(client=mock_chronicle_client, log_data={"log": "data"}) + + +def test_classify_logs_api_error(mock_chronicle_client): + """Test classification with API error response.""" + mock_response = Mock() + mock_response.status_code = 400 + mock_response.text = "Invalid request" + mock_chronicle_client.session.post.return_value = mock_response + + log_data = "test log" + with pytest.raises(APIError, match="Failed to classify log"): + classify_logs(client=mock_chronicle_client, log_data=log_data) + + +def test_classify_logs_special_characters(mock_chronicle_client): + """Test classification with special characters in log data.""" + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "predictions": [{"logType": "WINDOWS", "score": 0.88}] + } + mock_chronicle_client.session.post.return_value = mock_response + + log_data = "\n \n 4624\n \n" + result = classify_logs(client=mock_chronicle_client, log_data=log_data) + + assert len(result) == 1 + assert result[0]["logType"] == "WINDOWS" + + +def test_classify_logs_unicode_characters(mock_chronicle_client): + """Test classification with unicode characters.""" + mock_response = Mock() + mock_response.status_code = 200 + 
def test_classify_logs_large_log(mock_chronicle_client):
    """Test classification with large log data.

    Sends a JSON fragment repeated 1000 times and checks that the single
    prediction from the stubbed response is returned.
    """
    api_reply = Mock()
    api_reply.status_code = 200
    api_reply.json.return_value = {
        "predictions": [{"logType": "AWS_CLOUDTRAIL", "score": 0.92}]
    }
    mock_chronicle_client.session.post.return_value = api_reply

    fragment = '{"eventName": "GetObject"}'
    big_log = fragment * 1000

    predictions = classify_logs(
        client=mock_chronicle_client,
        log_data=big_log,
    )

    assert len(predictions) == 1
    assert predictions[0]["logType"] == "AWS_CLOUDTRAIL"
+# +"""Integration tests for Chronicle log classification CLI functionality.""" +import json +import subprocess +import tempfile +import pytest +from pathlib import Path + + +@pytest.mark.integration +def test_cli_classify_windows_log_from_file(cli_env, common_args): + """Test classifying Windows XML log from file.""" + windows_log = """ + + + 4624 + 2 + 0 + 12544 + + 12345 + Security + DESKTOP-TEST + + + S-1-5-18 + SYSTEM + testuser + 2 + +""" + + with tempfile.NamedTemporaryFile( + mode="w", suffix=".xml", delete=False + ) as tmp_file: + tmp_file.write(windows_log) + tmp_file_path = tmp_file.name + + try: + cmd = ( + ["secops"] + + common_args + + ["log", "classify", "--log", tmp_file_path] + ) + + result = subprocess.run( + cmd, env=cli_env, capture_output=True, text=True + ) + + assert result.returncode == 0 + assert result.stdout.strip(), "Expected non-empty output" + + try: + output = json.loads(result.stdout.strip()) + assert isinstance(output, list) + if len(output) > 0: + assert "logType" in output[0] + assert "score" in output[0] + except json.JSONDecodeError: + pytest.fail(f"Expected JSON output, got: {result.stdout}") + + print(f"\nCLI Output:\n{result.stdout}") + + finally: + Path(tmp_file_path).unlink(missing_ok=True) + + +@pytest.mark.integration +def test_cli_classify_multiple_logs_workflow(cli_env, common_args): + """Test workflow of classifying multiple different log types. + + This test demonstrates the complete workflow of classifying various + log formats using both inline strings and files. 
+ """ + test_logs = [ + { + "name": "OKTA", + "data": json.dumps( + { + "eventType": "user.session.start", + "actor": {"alternateId": "user@example.com"}, + } + ), + "use_file": False, + }, + { + "name": "Windows", + "data": "4624", + "use_file": True, + }, + ] + + results = [] + temp_files = [] + + try: + for log_info in test_logs: + print(f"\nClassifying {log_info['name']} log...") + + if log_info["use_file"]: + tmp_file = tempfile.NamedTemporaryFile( + mode="w", suffix=".log", delete=False + ) + tmp_file.write(log_info["data"]) + tmp_file.close() + temp_files.append(tmp_file.name) + log_arg = tmp_file.name + else: + log_arg = log_info["data"] + + cmd = ( + ["secops"] + common_args + ["log", "classify", "--log", log_arg] + ) + + result = subprocess.run( + cmd, env=cli_env, capture_output=True, text=True + ) + + assert result.returncode == 0 + results.append({"name": log_info["name"], "output": result.stdout}) + + print(f"\nSuccessfully classified {len(results)} log types via CLI") + assert len(results) == len(test_logs) + + for result in results: + assert result["output"].strip(), "Expected non-empty output" + try: + output = json.loads(result["output"].strip()) + assert isinstance(output, list) + if len(output) > 0: + assert "logType" in output[0] + assert "score" in output[0] + except json.JSONDecodeError: + pytest.fail( + f"Expected JSON output for {result['name']}, " + f"got: {result['output']}" + ) + + finally: + for temp_file in temp_files: + Path(temp_file).unlink(missing_ok=True)