diff --git a/linodecli/plugins/get_metrics.py b/linodecli/plugins/get_metrics.py new file mode 100644 index 00000000..ce36470e --- /dev/null +++ b/linodecli/plugins/get_metrics.py @@ -0,0 +1,457 @@ +""" +This plugin allows users to query metrics from the monitoring service for various services. + +""" + +import json +import os +import sys +from argparse import ArgumentParser +from typing import List, Optional + +import requests +import urllib3 + +from linodecli.exit_codes import ExitCodes +from linodecli.help_formatter import SortingHelpFormatter +from linodecli.helpers import register_debug_arg + +PLUGIN_BASE = "linode-cli get_metrics" + +# API Configuration +API_BASE_URL = "https://monitor-api.linode.com/v2/monitor/services" + + +def get_auth_token(): + """ + Get authentication token from JWE_TOKEN environment variable + Raises an error if the environment variable is not set + """ + token = os.getenv("JWE_TOKEN") + if not token: + raise ValueError( + "JWE_TOKEN environment variable is required but not set. " + "Please set it with: export JWE_TOKEN='your_token_here'" + ) + return token + + + +# Aggregate functions +AGGREGATE_FUNCTIONS = ["sum", "avg", "max", "min", "count"] + + +def make_api_request( + service_name: str, + endpoint: str, + method: str = "POST", + data: Optional[dict] = None, + token: Optional[str] = None, +) -> tuple: + """ + Make an API request to the monitoring service + + Args: + service_name: The service name (nodebalancer, netloadbalancer, etc.) + endpoint: The API endpoint to call + method: HTTP method + data: Request payload for POST requests + token: Bearer token for authentication + + Returns: + Tuple of (status_code, response_data) + """ + url = f"{API_BASE_URL}/{service_name}/{endpoint}" + + headers = { + "Authorization": f"Bearer {token or get_auth_token()}", + "Authentication-type": "jwe", + "Pragma": "akamai-x-get-extracted-values", + "Content-Type": "application/json", + } + + try: + if method.upper() == "POST": + response = requests.post( + url, headers=headers, json=data, timeout=30, verify=False + ) + else: + response = requests.get( + url, headers=headers, timeout=30, verify=False + ) + + # Try to parse JSON response, fallback to text if it fails + try: + response_data = response.json() if response.content else {} + except json.JSONDecodeError: + response_data = {"error": f"Non-JSON response: {response.text}"} + + return response.status_code, response_data + except requests.exceptions.RequestException as e: + print(f"Error making API request: {e}", file=sys.stderr) + return 500, {"error": str(e)} + + +def get_metrics( + service_name: str, + entity_ids: List, + duration: Optional[int], + duration_unit: Optional[str], + start_time: Optional[str], + end_time: Optional[str], + metrics: List[str], + granularity: Optional[int], + granularity_unit: Optional[str], + filters: Optional[List[str]] = None, + group_by: Optional[List[str]] = None, + entity_region: Optional[str] = None, + associated_entity_region: Optional[str] = None, + token: Optional[str] = None, +): + """ + Get metrics for specified service entities + """ + + # Parse metrics with mandatory aggregate functions + parsed_metrics = [] + for metric in metrics: + if ":" in metric: + metric_name, agg_func = metric.split(":", 1) + parsed_metrics.append( + {"aggregate_function": agg_func, "name": metric_name.strip()} + ) + else: + # No aggregate function specified - this is an error + print( + f"Aggregate function required for metric '{metric}'", + file=sys.stderr, + ) + print( + f"Format: 
'metric_name:function' where function is one of: {', '.join(AGGREGATE_FUNCTIONS)}", + file=sys.stderr, + ) + sys.exit(ExitCodes.REQUEST_FAILED) + + # Build request payload + payload = {"metrics": parsed_metrics} + if entity_ids: + payload["entity_ids"] = entity_ids + if group_by: + payload["group_by"] = group_by + if entity_region: + payload["entity_region"] = entity_region + if associated_entity_region: + payload["associated_entity_region"] = associated_entity_region + + # Add time duration - either relative or absolute + if start_time and end_time: + payload["absolute_time_duration"] = { + "start": start_time, + "end": end_time, + } + elif duration is not None and duration_unit is not None: + payload["relative_time_duration"] = { + "unit": duration_unit, + "value": duration, + } + else: + print( + "Either (--duration and --duration-unit) or (--start-time and --end-time) must be provided", + file=sys.stderr, + ) + sys.exit(ExitCodes.REQUEST_FAILED) + + # Add time_granularity only if both granularity and granularity_unit are provided + if granularity is not None and granularity_unit is not None: + payload["time_granularity"] = { + "unit": granularity_unit, + "value": granularity, + } + + # Add filters if provided + if filters: + parsed_filters = [] + for filter_str in filters: + parts = filter_str.split( + ":", 2 + ) # Split into max 3 parts: dimension, operator, value + if len(parts) != 3: + print( + f"Invalid filter format: '{filter_str}'. Expected format: 'dimension:operator:value'", + file=sys.stderr, + ) + sys.exit(ExitCodes.REQUEST_FAILED) + + dimension_label, operator, value = parts + parsed_filters.append( + { + "dimension_label": dimension_label.strip(), + "operator": operator.strip(), + "value": value.strip(), + } + ) + + payload["filters"] = parsed_filters + + if entity_ids: + print(f"Fetching metrics for {service_name} entities: {entity_ids}") + else: + print(f"Fetching metrics for {service_name} (all entities)") + print(f"Request payload: {json.dumps(payload, indent=2)}") + try: + status, response = make_api_request( + service_name, "metrics", "POST", payload, token + ) + except ValueError as e: + print(f"Authentication Error: {e}", file=sys.stderr) + sys.exit(ExitCodes.REQUEST_FAILED) + + if status != 200: + print(f"API request failed with status {status}", file=sys.stderr) + print( + f"Error response: {json.dumps(response, indent=2)}", file=sys.stderr + ) + print("Exiting due to API error...", file=sys.stderr) + sys.exit(ExitCodes.REQUEST_FAILED) + + print_metrics_response(response) + + +def print_metrics_response(data: dict): + """ + Print metrics data as formatted JSON + """ + if not data: + print("No response received") + return + + if data.get("status") == "success": + metrics_data = data.get("data", {}).get("result", []) + stats = data.get("stats", {}) + + if not metrics_data: + print("No metrics data found for the specified parameters") + print(f"Execution time: {stats.get('executionTimeMsec', 0)}ms") + print(f"Series fetched: {stats.get('seriesFetched', 0)}") + else: + print(f"Series fetched: {stats.get('seriesFetched', 0)}") + print("\nMetrics Data:") + print(json.dumps(data.get("data"), indent=2)) + else: + print(f"API returned error status: {data.get('status', 'unknown')}") + if "error" in data: + print(f"Error: {data['error']}") + + +def print_help(parser: ArgumentParser): + """ + Print help information + """ + parser.print_help() + + print("\nExamples:") + print(" # Get metrics with relative time duration") + print( + " linode-cli get_metrics dbaas --entity-ids 
123 --duration 15 --duration-unit min --metrics cpu_usage:avg" + ) + + print("\n # Get metrics for all entities (only allowed for objectstorage service)") + print( + " linode-cli get_metrics objectstorage --duration 15 --duration-unit min --metrics obj_requests_num:avg --entity-region us-east-1" + ) + + print("\n # Get metrics with absolute time duration") + print( + " linode-cli get_metrics dbaas --entity-ids 123 --start-time 2024-10-10T00:00:01Z --end-time 2024-10-10T23:59:59Z --metrics cpu_usage:avg,memory_usage:sum" + ) + + print("\n # Get metrics with filters") + print( + " linode-cli get_metrics dbaas --entity-ids 123 --duration 15 --duration-unit min --metrics cpu_usage:avg --filters 'node_type:in:primary,secondary'" + ) + + print("\n # Get metrics with multiple filters") + print( + " linode-cli get_metrics dbaas --entity-ids 123 --duration 15 --duration-unit min --metrics cpu_usage:avg --filters 'node_type:in:primary,secondary;status:eq:active'" + ) + + print("\n # Get metrics with granularity") + print( + " linode-cli get_metrics netloadbalancer --entity-ids 123 --duration 1 --duration-unit hour --metrics nlb_ingress_traffic:sum --granularity 10 --granularity-unit min" + ) + + print("\n # Get metrics with entity region (required ObjectStorage)") + print( + " linode-cli get_metrics objectstorage --entity-region us-east-1 --duration 15 --duration-unit min --metrics obj_requests_num:sum" + ) + + print("\n # Get metrics with associated entity region (mandatory for cloud firewall service)") + print( + " linode-cli get_metrics firewall --entity-region us-east-1 --associated-entity-region us-west-1 --duration 15 --duration-unit min --metrics fw_active_connections:sum" + ) + + +def get_metrics_parser(): + """ + Build argument parser for metrics plugin + """ + parser = ArgumentParser( + PLUGIN_BASE, add_help=False, formatter_class=SortingHelpFormatter + ) + + register_debug_arg(parser) + + # Service name as positional argument + parser.add_argument( + "service", + nargs="?", + help="Service name (Dbaas, Nodebalancer, NetLoadBalancer, Linode, Firewall, ObjectStorage, Blockstorage,LKE)", + ) + + # Optional arguments for get-metrics functionality + parser.add_argument( + "--entity-ids", + help="Comma-separated list of entity IDs (can be integers or strings depending on service type)", + required=False, + ) + + parser.add_argument( + "--entity-region", + help="Region for entities (required for services like ObjectStorage)", + required=False, + ) + + parser.add_argument( + "--associated-entity-region", + help="Associated region for entities (Required for cloud firewall service)", + required=False, + ) + + # Time duration options - either relative or absolute + parser.add_argument( + "--duration", + type=int, + help="Relative time duration to look back (e.g., 15 for 15 minutes)", + ) + parser.add_argument( + "--duration-unit", help="Unit for relative duration: min, hr, day" + ) + parser.add_argument( + "--start-time", + help="Absolute start time (ISO format: 2024-10-10T00:00:01Z)", + ) + parser.add_argument( + "--end-time", + help="Absolute end time (ISO format: 2024-10-10T23:59:59Z)", + ) + + parser.add_argument( + "--metrics", + help="Comma-separated list of metrics with mandatory aggregate functions. 
Format: 'metric1:function1,metric2:function2' (e.g., 'cpu_usage:avg,memory_usage:sum')", + ) + parser.add_argument( + "--granularity", + type=int, + help="Time granularity for data points (optional)", + ) + parser.add_argument( + "--granularity-unit", + help="Unit for granularity: min, hr, day (optional)", + ) + parser.add_argument( + "--filters", + help="Optional filters in format 'dimension:operator:value'. Multiple filters separated by semicolons. Example: 'node_type:in:primary,secondary;status:eq:active'", + ) + parser.add_argument( + "--group_by", + help="Comma-separated list of fields to group by (default: entity_id)", + ) + + return parser + + +def call(args, context): + """ + The entrypoint for this plugin + """ + parser = get_metrics_parser() + parsed, remaining_args = parser.parse_known_args(args) + + # Handle help cases + if not parsed.service or parsed.service == "help" or "--help" in args: + print_help(parser) + sys.exit(ExitCodes.SUCCESS) + + if remaining_args: + print(f"Unknown arguments: {' '.join(remaining_args)}", file=sys.stderr) + print_help(parser) + sys.exit(ExitCodes.REQUEST_FAILED) + + # Validate required arguments for get-metrics functionality + if not parsed.metrics: + print( + "Missing required arguments for metrics retrieval:", file=sys.stderr + ) + print(" --metrics: required", file=sys.stderr) + print_help(parser) + sys.exit(ExitCodes.REQUEST_FAILED) + + # Validate time duration arguments - either relative or absolute required + has_relative = ( + parsed.duration is not None and parsed.duration_unit is not None + ) + has_absolute = parsed.start_time is not None and parsed.end_time is not None + + if not has_relative and not has_absolute: + print("Time duration required:", file=sys.stderr) + print(" Either: --duration and --duration-unit", file=sys.stderr) + print(" Or: --start-time and --end-time", file=sys.stderr) + print_help(parser) + sys.exit(ExitCodes.REQUEST_FAILED) + + if has_relative and has_absolute: + print( + "Cannot specify both relative and absolute time duration", + file=sys.stderr, + ) + sys.exit(ExitCodes.REQUEST_FAILED) + + # Parse entity IDs (can be integers or strings depending on service type) + entity_ids = [] + if parsed.entity_ids: + for entity_id in parsed.entity_ids.split(","): + entity_id = entity_id.strip() + # Try to convert to int first, if that fails keep as string + try: + entity_ids.append(int(entity_id)) + except ValueError: + entity_ids.append(entity_id) + + # Parse metrics + metrics = [x.strip() for x in parsed.metrics.split(",")] + + # Parse group_by if provided + group_by = None + if parsed.group_by: + group_by = [x.strip() for x in parsed.group_by.split(",")] + + # Parse filters if provided + filters = None + if parsed.filters: + filters = [x.strip() for x in parsed.filters.split(";")] + + get_metrics( + service_name=parsed.service, + entity_ids=entity_ids, + duration=parsed.duration, + duration_unit=parsed.duration_unit, + start_time=parsed.start_time, + end_time=parsed.end_time, + metrics=metrics, + granularity=parsed.granularity, + granularity_unit=parsed.granularity_unit, + filters=filters, + group_by=group_by, + entity_region=parsed.entity_region, + associated_entity_region=parsed.associated_entity_region, + ) diff --git a/tests/integration/monitor/test_plugin_get_metrics.py b/tests/integration/monitor/test_plugin_get_metrics.py new file mode 100644 index 00000000..53722059 --- /dev/null +++ b/tests/integration/monitor/test_plugin_get_metrics.py @@ -0,0 +1,339 @@ +""" +Integration tests for the get_metrics plugin +""" 
+
+import os
+
+import pytest
+
+from tests.integration.helpers import (
+    exec_failing_test_command,
+    exec_test_command,
+)
+from linodecli.exit_codes import ExitCodes
+
+# Base command for get_metrics plugin
+BASE_CMD = ["linode-cli", "get_metrics"]
+
+
+def test_missing_required_args():
+    """Test error handling for missing required arguments"""
+    # Missing entity-ids
+    exec_failing_test_command(
+        BASE_CMD + [
+            "nodebalancer",
+            "--metrics", "cpu_usage:avg",
+            "--duration", "15",
+            "--duration-unit", "min"
+        ],
+        expected_code=ExitCodes.REQUEST_FAILED
+    )
+
+    # Missing metrics
+    exec_failing_test_command(
+        BASE_CMD + [
+            "nodebalancer",
+            "--entity-ids", "123",
+            "--duration", "15",
+            "--duration-unit", "min"
+        ],
+        expected_code=ExitCodes.REQUEST_FAILED
+    )
+
+    # Missing duration and time parameters
+    exec_failing_test_command(
+        BASE_CMD + [
+            "nodebalancer",
+            "--entity-ids", "123",
+            "--metrics", "cpu_usage:avg"
+        ],
+        expected_code=ExitCodes.REQUEST_FAILED
+    )
+
+
+def test_invalid_service():
+    """Test error handling for invalid service name"""
+    exec_failing_test_command(
+        BASE_CMD + [
+            "invalid_service",
+            "--entity-ids", "123",
+            "--metrics", "cpu_usage:avg",
+            "--duration", "15",
+            "--duration-unit", "min"
+        ],
+        expected_code=ExitCodes.REQUEST_FAILED
+    )
+
+
+def test_invalid_aggregate_function():
+    """Test error handling for metrics without aggregate functions"""
+    exec_failing_test_command(
+        BASE_CMD + [
+            "nodebalancer",
+            "--entity-ids", "123",
+            "--metrics", "cpu_usage",  # Missing :avg
+            "--duration", "15",
+            "--duration-unit", "min"
+        ],
+        expected_code=ExitCodes.REQUEST_FAILED
+    )
+
+
+def test_invalid_duration_unit():
+    """Test handling of invalid duration unit"""
+    exec_failing_test_command(
+        BASE_CMD + [
+            "nodebalancer",
+            "--entity-ids", "123",
+            "--metrics", "cpu_usage:avg",
+            "--duration", "15",
+            "--duration-unit", "invalid_unit"
+        ],
+        expected_code=ExitCodes.REQUEST_FAILED
+    )
+
+
+def test_conflicting_time_params():
+    """Test handling of conflicting time parameters"""
+    exec_failing_test_command(
+        BASE_CMD + [
+            "nodebalancer",
+            "--entity-ids", "123",
+            "--metrics", "cpu_usage:avg",
+            "--duration", "15",
+            "--duration-unit", "min",
+            "--start-time", "2025-12-22T00:00:00Z",
+            "--end-time", "2025-12-22T12:00:00Z"
+        ],
+        expected_code=ExitCodes.REQUEST_FAILED
+    )
+
+
+@pytest.mark.skipif(
+    not os.getenv('JWE_TOKEN'),
+    reason="JWE_TOKEN environment variable required for monitoring tests"
+)
+@pytest.mark.smoke
+def test_nodebalancer_metrics_basic():
+    """Test get_metrics with nodebalancer service (with authentication)"""
+    # Use a non-existent entity ID to avoid side effects
+    # This will test the complete command flow but fail gracefully
+    try:
+        output = exec_test_command(BASE_CMD + [
+            "nodebalancer",
+            "--entity-ids", "999999",
+            "--metrics", "nb_ingress_traffic_rate:sum",
+            "--duration", "15",
+            "--duration-unit", "min"
+        ])
+
+        # If it succeeds, check for expected output structure
+        assert "Fetching metrics" in output or "data" in output.lower()
+
+    except RuntimeError as e:
+        # Expected to fail with entity not found or similar API error
+        # Ensure it's not a command structure error
+        error_output = str(e)
assert "API request failed" in error_output or "entity" in error_output.lower() + # Should not be argument parsing errors + assert "unrecognized arguments" not in error_output + assert "invalid choice" not in error_output + + +@pytest.mark.skipif( + not os.getenv('JWE_TOKEN'), + reason="JWE_TOKEN environment variable required for monitoring tests" +) +def test_dbaas_metrics_with_filters(): + """Test get_metrics with dbaas service and filters""" + try: + output = exec_test_command(BASE_CMD + [ + "dbaas", + "--entity-ids", "999999", + "--metrics", "cpu_usage:avg,memory_usage:max", + "--duration", "30", + "--duration-unit", "min", + "--filters", "node_type:in:primary,secondary", + "--group-by", "entity_id,node_type" + ]) + + assert "Fetching metrics" in output or "data" in output.lower() + + except RuntimeError as e: + error_output = str(e) + assert "API request failed" in error_output or "entity" in error_output.lower() + assert "unrecognized arguments" not in error_output + + +@pytest.mark.skipif( + not os.getenv('JWE_TOKEN'), + reason="JWE_TOKEN environment variable required for monitoring tests" +) +def test_absolute_time_metrics(): + """Test get_metrics with absolute time range""" + try: + output = exec_test_command(BASE_CMD + [ + "linodes", + "--entity-ids", "999999", + "--metrics", "cpu_percent:avg", + "--start-time", "2025-12-22T00:00:00Z", + "--end-time", "2025-12-22T12:00:00Z", + "--granularity", "5", + "--granularity-unit", "min" + ]) + + assert "Fetching metrics" in output or "data" in output.lower() + + except RuntimeError as e: + error_output = str(e) + assert "API request failed" in error_output or "entity" in error_output.lower() + assert "unrecognized arguments" not in error_output + + +@pytest.mark.skipif( + not os.getenv('JWE_TOKEN'), + reason="JWE_TOKEN environment variable required for monitoring tests" +) +def test_multiple_entity_ids(): + """Test get_metrics with multiple entity IDs""" + try: + output = exec_test_command(BASE_CMD + [ + "nodebalancer", + "--entity-ids", "999999,888888,777777", + "--metrics", "nb_ingress_traffic_rate:sum,nb_egress_traffic_rate:avg", + "--duration", "1", + "--duration-unit", "hr", + "--granularity", "15", + "--granularity-unit", "min" + ]) + + assert "Fetching metrics" in output or "data" in output.lower() + + except RuntimeError as e: + error_output = str(e) + assert "API request failed" in error_output or "entity" in error_output.lower() + assert "unrecognized arguments" not in error_output + + +@pytest.mark.skipif( + not os.getenv('JWE_TOKEN'), + reason="JWE_TOKEN environment variable required for monitoring tests" +) +def test_complex_filters(): + """Test get_metrics with complex filter combinations""" + try: + output = exec_test_command(BASE_CMD + [ + "dbaas", + "--entity-ids", "999999", + "--metrics", "cpu_usage:avg,memory_usage:avg,connections:count", + "--duration", "2", + "--duration-unit", "hr", + "--filters", "node_type:in:primary,secondary;status:eq:active;environment:ne:test", + "--group-by", "entity_id,node_type,environment", + "--granularity", "30", + "--granularity-unit", "min" + ]) + + assert "Fetching metrics" in output or "data" in output.lower() + + except RuntimeError as e: + error_output = str(e) + assert "API request failed" in error_output or "entity" in error_output.lower() + assert "unrecognized arguments" not in error_output + + +def test_missing_token_error(): + """Test error handling when JWE_TOKEN is missing""" + # Temporarily remove token + original_token = os.getenv('JWE_TOKEN') + if 'JWE_TOKEN' in 
os.environ: + del os.environ['JWE_TOKEN'] + + try: + exec_failing_test_command( + BASE_CMD + [ + "nodebalancer", + "--entity-ids", "123", + "--metrics", "cpu_usage:avg", + "--duration", "15", + "--duration-unit", "min" + ], + expected_code=ExitCodes.REQUEST_FAILED + ) + finally: + # Restore token + if original_token: + os.environ['JWE_TOKEN'] = original_token + + +def test_empty_entity_ids(): + """Test handling of empty entity IDs""" + exec_failing_test_command( + BASE_CMD + [ + "nodebalancer", + "--entity-ids", "", + "--metrics", "cpu_usage:avg", + "--duration", "15", + "--duration-unit", "min" + ], + expected_code=ExitCodes.REQUEST_FAILED + ) + + +def test_malformed_filters(): + """Test handling of malformed filter syntax""" + exec_failing_test_command( + BASE_CMD + [ + "dbaas", + "--entity-ids", "123", + "--metrics", "cpu_usage:avg", + "--duration", "15", + "--duration-unit", "min", + "--filters", "invalid_filter_format" + ], + expected_code=ExitCodes.REQUEST_FAILED + ) + + +def test_service_validation(): + """Test that valid services are recognized correctly""" + valid_services = ["nodebalancer", "netloadbalancer", "linodes", "dbaas"] + + for service in valid_services: + # This should fail due to missing authentication, not service validation + try: + exec_failing_test_command( + BASE_CMD + [ + service, + "--entity-ids", "123", + "--metrics", "cpu_usage:avg", + "--duration", "15", + "--duration-unit", "min" + ], + expected_code=ExitCodes.REQUEST_FAILED + ) + except AssertionError as e: + # If it fails with wrong exit code, check it's not service validation error + error_msg = str(e).lower() + assert "invalid choice" not in error_msg + assert f"invalid choice: '{service}'" not in error_msg diff --git a/tests/unit/test_plugin_get_metrics.py b/tests/unit/test_plugin_get_metrics.py new file mode 100644 index 00000000..afd09a12 --- /dev/null +++ b/tests/unit/test_plugin_get_metrics.py @@ -0,0 +1,251 @@ +""" +Unit tests for the get_metrics plugin +""" + +import json +import os +from unittest.mock import MagicMock, Mock, patch + +import pytest +from pytest import CaptureFixture + +from linodecli.plugins.get_metrics import ( + call, + get_auth_token, + get_metrics, + get_metrics_parser, + make_api_request, + print_metrics_response, +) + + +class TestAuthToken: + """Test authentication token handling""" + + def test_get_auth_token_success(self): + """Test successful token retrieval from environment""" + with patch.dict(os.environ, {"JWE_TOKEN": "test_token"}): + token = get_auth_token() + assert token == "test_token" + + def test_get_auth_token_missing(self): + """Test error when token is missing""" + with patch.dict(os.environ, {}, clear=True): + with pytest.raises(ValueError) as excinfo: + get_auth_token() + assert "JWE_TOKEN environment variable is required" in str( + excinfo.value + ) + + +class TestAPIRequest: + """Test API request functionality""" + + @patch("linodecli.plugins.get_metrics.requests.post") + def test_make_api_request_success(self, mock_post): + """Test successful API request""" + # Mock successful response + mock_response = Mock() + mock_response.status_code = 200 + mock_response.content = b'{"data": {"test": "data"}}' + mock_response.json.return_value = {"data": {"test": "data"}} + mock_post.return_value = mock_response + + status_code, result = make_api_request( + "nodebalancer", "metrics", "POST", {"test": "data"}, "test_token" + ) + + assert status_code == 200 + assert result == {"data": {"test": "data"}} + mock_post.assert_called_once() + + 
@patch("linodecli.plugins.get_metrics.requests.post") + def test_make_api_request_http_error(self, mock_post): + """Test API request with HTTP error""" + mock_response = Mock() + mock_response.status_code = 401 + mock_response.content = b"Unauthorized" + mock_response.json.return_value = {"error": "Unauthorized"} + mock_post.return_value = mock_response + + status_code, result = make_api_request( + "nodebalancer", "metrics", "POST", {}, "test_token" + ) + assert status_code == 401 + + +class TestGetMetrics: + """Test get_metrics function""" + + @patch("linodecli.plugins.get_metrics.print_metrics_response") + @patch("linodecli.plugins.get_metrics.make_api_request") + def test_get_metrics_relative_time(self, mock_api_request, mock_print): + """Test get_metrics with relative time duration""" + mock_api_request.return_value = (200, {"data": {"test": "data"}}) + + # get_metrics doesn't return data, it calls print_metrics_response + get_metrics( + service_name="nodebalancer", + entity_ids=[123, 456], + duration=15, + duration_unit="min", + start_time=None, + end_time=None, + metrics=["cpu_usage:avg"], + granularity=None, + granularity_unit=None, + token="test_token", + ) + + # Verify that print_metrics_response was called with the response + mock_print.assert_called_once_with({"data": {"test": "data"}}) + mock_api_request.assert_called_once() + + @patch("linodecli.plugins.get_metrics.print_metrics_response") + @patch("linodecli.plugins.get_metrics.make_api_request") + def test_get_metrics_absolute_time(self, mock_api_request, mock_print): + """Test get_metrics with absolute time range""" + mock_api_request.return_value = (200, {"data": {"test": "data"}}) + + get_metrics( + service_name="dbaas", + entity_ids=[789], + duration=None, + duration_unit=None, + start_time="2025-12-22T00:00:00Z", + end_time="2025-12-22T12:00:00Z", + metrics=["memory_usage:max"], + granularity=None, + granularity_unit=None, + token="test_token", + ) + + # Verify print_metrics_response was called + mock_print.assert_called_once_with({"data": {"test": "data"}}) + + @patch("linodecli.plugins.get_metrics.print_metrics_response") + @patch("linodecli.plugins.get_metrics.make_api_request") + def test_get_metrics_with_filters_and_groupby( + self, mock_api_request, mock_print + ): + """Test get_metrics with filters and group_by""" + mock_api_request.return_value = (200, {"data": {"test": "data"}}) + + get_metrics( + service_name="dbaas", + entity_ids=[123], + duration=1, + duration_unit="hr", + start_time=None, + end_time=None, + metrics=["cpu_usage:avg"], + granularity=None, + granularity_unit=None, + filters=["node_type:in:primary,secondary", "status:eq:active"], + group_by=["entity_id", "node_type"], + token="test_token", + ) + + # Verify print_metrics_response was called + mock_print.assert_called_once_with({"data": {"test": "data"}}) + + @patch("linodecli.plugins.get_metrics.make_api_request") + @patch("builtins.print") + @patch("sys.exit") + def test_get_metrics_api_error( + self, mock_exit, mock_print, mock_api_request + ): + """Test get_metrics with API error response""" + mock_api_request.return_value = (401, {"error": "Unauthorized"}) + + get_metrics( + service_name="nodebalancer", + entity_ids=[123], + duration=15, + duration_unit="min", + start_time=None, + end_time=None, + metrics=["cpu_usage:avg"], + granularity=None, + granularity_unit=None, + token="test_token", + ) + + # Verify that sys.exit was called due to API error + mock_exit.assert_called_with(2) # ExitCodes.REQUEST_FAILED + + +class TestArgumentParsing: + 
"""Test argument parsing""" + + def test_get_metrics_parser(self): + """Test parser creation""" + parser = get_metrics_parser() + + # Test that parser has all expected arguments + args = parser.parse_args( + [ + "nodebalancer", + "--entity-ids", + "123,456", + "--metrics", + "cpu_usage:avg,memory_usage:max", + "--duration", + "15", + "--duration-unit", + "min", + ] + ) + + assert args.service == "nodebalancer" + assert args.entity_ids == "123,456" + assert args.metrics == "cpu_usage:avg,memory_usage:max" + assert args.duration == 15 + assert args.duration_unit == "min" + + +class TestPrintResponse: + """Test response printing""" + + def test_print_metrics_response_success(self, capsys: CaptureFixture): + """Test metrics response printing for successful response""" + response_data = { + "status": "success", + "data": { + "result": [ + { + "entity_id": 123, + "cpu_usage": [ + {"timestamp": "2025-12-22T10:00:00Z", "value": 45.2} + ], + } + ] + }, + "stats": {"executionTimeMsec": 150, "seriesFetched": 1}, + } + + print_metrics_response(response_data) + captured = capsys.readouterr() + + # Verify success output + assert "✓ Success" in captured.out + assert "150ms" in captured.out + assert "Metrics Data:" in captured.out + + def test_print_metrics_response_error(self, capsys: CaptureFixture): + """Test metrics response printing for error response""" + response_data = {"status": "error", "error": "Invalid parameters"} + + print_metrics_response(response_data) + captured = capsys.readouterr() + + # Verify error output + assert "API returned error status: error" in captured.out + assert "Error: Invalid parameters" in captured.out + + def test_print_metrics_response_empty(self, capsys: CaptureFixture): + """Test metrics response printing for empty response""" + print_metrics_response({}) + captured = capsys.readouterr() + + assert "No response received" in captured.out