-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
dos-protection (Denial of service protection), vulnerability (Security vulnerabilities)
Milestone
Description
Security Issue
Signal handlers can be flooded causing denial of service. SIGHUP triggers database operations without rate limiting.
Location
tools/prometheus_exporter.py:276-283
def _handle_reload(self, signum, frame):
    """Handle a reload signal by collecting metrics immediately.

    This is the handler quoted in the report as the vulnerable code: every
    delivery calls ``self.collect_metrics()`` with no throttling, so the
    logic is intentionally reproduced unchanged here.

    Args:
        signum: Number of the delivered signal (unused).
        frame: Interrupted stack frame (unused).
    """
    # No rate limiting!
    self.collect_metrics()  # Expensive operation
# Attack Vector
# DoS attack
while true; do kill -HUP $PID; done
Solution
1. Add Rate Limiting
import time
from collections import deque
class RateLimiter:
    """Sliding-window limiter: at most ``max_calls`` accepted per window.

    Timestamps of accepted calls are kept in ``self.calls`` (oldest first).
    Rejected calls are not recorded, so they do not extend the window.

    Args:
        max_calls: Maximum number of calls accepted within one window.
        window_seconds: Length of the sliding window, in seconds.
        clock: Zero-argument callable returning the current time in seconds.
            Defaults to ``time.monotonic`` so that wall-clock adjustments
            (NTP steps, manual changes) can neither wedge the limiter nor
            reset it early. Injectable for deterministic tests.
    """

    def __init__(self, max_calls=5, window_seconds=60, *, clock=time.monotonic):
        self.max_calls = max_calls
        self.window = window_seconds
        self.calls = deque()
        self._clock = clock

    def allow(self) -> bool:
        """Return True and record the call if it fits in the current window.

        Returns:
            True if fewer than ``max_calls`` calls were accepted during the
            last ``window`` seconds (the call is then recorded), else False.
        """
        now = self._clock()
        cutoff = now - self.window

        # Drop timestamps that have slid out of the window (strictly older
        # than the cutoff, matching the original boundary behaviour).
        while self.calls and self.calls[0] < cutoff:
            self.calls.popleft()

        if len(self.calls) >= self.max_calls:
            return False

        self.calls.append(now)
        return True
class MetricsCollector:
    """Collector sketch whose reload handler is throttled by a rate limiter.

    Only the pieces relevant to rate limiting are shown; ``collect_metrics``
    and the module-level ``logger`` are not defined in this snippet and are
    expected to come from the real class/module.
    """

    def __init__(self):
        # Reloads: at most 3 accepted per 60-second window.
        self.reload_limiter = RateLimiter(max_calls=3, window_seconds=60)
        # Shutdowns: at most 1 per 10-second window. Created here but not
        # consulted by any method shown in this snippet.
        self.shutdown_limiter = RateLimiter(max_calls=1, window_seconds=10)

    def _handle_reload(self, signum, frame):
        """Run a metrics collection unless the reload rate limit is exceeded.

        Args:
            signum: Number of the delivered signal (only used for logging).
            frame: Interrupted stack frame (unused).
        """
        if not self.reload_limiter.allow():
            # Over the limit: drop the signal before the expensive work.
            logger.warning("Rate limit exceeded for signal %s", signum)
            return

        logger.info("Processing reload signal")
        self.collect_metrics()
# 2. Add Mutex for Critical Sections
import threading
class MetricsCollector:
    """Collector sketch that refuses overlapping metric collections.

    ``collect_metrics`` and the module-level ``logger`` are not defined in
    this snippet and are expected to come from the real class/module.
    """

    def __init__(self):
        self.collection_lock = threading.Lock()
        # Mirrors the lock state for observers; the lock itself is the
        # authoritative guard.
        self.collecting = False

    def _handle_reload(self, signum, frame):
        """Collect metrics unless a collection is already running.

        The lock is taken with a non-blocking acquire. Checking a separate
        flag and then blocking on the lock leaves a window in which a
        re-entered handler sees the flag still False and then waits forever
        on a non-reentrant lock that its own interrupted caller holds.

        Args:
            signum: Number of the delivered signal (unused).
            frame: Interrupted stack frame (unused).
        """
        if not self.collection_lock.acquire(blocking=False):
            logger.warning("Collection already in progress")
            return

        self.collecting = True
        try:
            self.collect_metrics()
        finally:
            # Always clear the flag and release, even if collection raised.
            self.collecting = False
            self.collection_lock.release()
# 3. Add Signal Queue
from queue import Queue
class SignalQueue:
    """Bounded queue that decouples signal receipt from signal processing.

    Handlers call ``add_signal`` (cheap, never blocks); the main loop calls
    ``process_signals`` to drain the queue. The module-level ``logger`` is
    not defined in this snippet and is expected to come from the module.

    Args:
        max_size: Maximum number of pending signals; extra ones are dropped.
    """

    def __init__(self, max_size=10):
        self.queue = Queue(maxsize=max_size)
        # Not read by any method shown in this snippet.
        self.processing = False

    def add_signal(self, signum):
        """Enqueue ``signum`` without blocking; drop it if the queue is full."""
        # Local import: only ``Queue`` is imported at module level, so the
        # original ``queue.Full`` reference raised NameError instead of
        # handling the overflow.
        from queue import Full

        try:
            self.queue.put_nowait(signum)
        except Full:
            logger.warning(f"Signal queue full, dropping signal {signum}")

    def process_signals(self):
        """Drain the queue, passing each signal number to ``_process_signal``."""
        from queue import Empty

        while True:
            # get_nowait instead of empty() followed by a blocking get(): the
            # queue can be emptied between the check and the get, which would
            # block forever.
            try:
                signum = self.queue.get_nowait()
            except Empty:
                break
            self._process_signal(signum)

    def _process_signal(self, signum):
        """Handle one dequeued signal; override with the real handling."""
        raise NotImplementedError("_process_signal must be provided")
# Validation
- Rate limiting prevents signal flooding
- Concurrent signals handled safely
- No deadlocks or race conditions
- Metrics remain accurate under load
Testing
def test_signal_flood():
    """Flood this process with SIGHUP and check reload processing is capped.

    NOTE(review): assumes ``MetricsCollector()`` installs its SIGHUP handler
    on construction; with the default disposition the first SIGHUP would
    terminate the test process. Confirm against the real class.
    NOTE(review): assumes the collector exposes a ``reload_count`` attribute;
    none of the snippets in this issue define one. Confirm or add it.
    """
    collector = MetricsCollector()

    # Send 100 signals rapidly
    for _ in range(100):
        os.kill(os.getpid(), signal.SIGHUP)

    # Should only process rate limit amount (3 reloads per window in the
    # proposed reload limiter).
    assert collector.reload_count <= 3
# Effort: 2 hours
References
- CWE-400: Uncontrolled Resource Consumption
- PR feat: Add comprehensive monitoring with Prometheus and Grafana #1
Metadata
Metadata
Assignees
Labels
dos-protection (Denial of service protection), vulnerability (Security vulnerabilities)