A lightweight Go application for capturing MQTT traffic from production brokers and saving it to JSONL files for later replay and analysis.
- Captures MQTT messages with full metadata (topic, payload, QoS, retained flag, timestamps)
- Saves to JSONL (JSON Lines) format for easy processing and replay
- Configurable via YAML file and environment variables
- Automatic reconnection with exponential backoff
- Graceful shutdown with message buffer flushing
- Real-time statistics reporting
- Support for binary payloads (base64 encoded)
- Configurable capture duration or continuous capture
- Non-blocking message handling for high-throughput scenarios
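
The reconnection behaviour listed above can be pictured with a small sketch. This is not the application's actual code: the function name `backoffDelay`, the 1 second base delay, and the 30 second cap are assumptions chosen only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before reconnect attempt n (0-based):
// the base delay doubled n times, never exceeding max.
// Hypothetical helper; the real tool's delays may differ.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max // stop early so large attempt counts cannot overflow
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Assumed values: 1s base delay, 30s cap.
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println(attempt, backoffDelay(attempt, time.Second, 30*time.Second))
	}
}
```

With these assumed values the delays grow as 1s, 2s, 4s, 8s, 16s and then stay at the 30s cap.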
- Go 1.19 or later
- Access to an MQTT broker
cd mqtt-capture
go mod tidy
go build -o mqtt-capture

The application can be configured using a YAML file and/or environment variables.
Copy the example configuration file and edit it:
cp config.yaml.example config.yaml

Edit config.yaml with your MQTT broker details and preferences.
Sensitive credentials should be provided via environment variables:
export MQTT_USERNAME="your_username"
export MQTT_PASSWORD="your_password"

All configuration options can be overridden using environment variables with the prefix MQTTCAP_:
export MQTTCAP_MQTT_BROKER="mqtt.example.com"
export MQTTCAP_MQTT_PORT=1883
export MQTTCAP_CAPTURE_OUTPUT_FILE="./capture.jsonl"
export MQTTCAP_DURATION_SECONDS=3600

| Option | Type | Description | Default |
|---|---|---|---|
| mqtt.broker | string | MQTT broker hostname or IP | Required |
| mqtt.port | int | MQTT broker port | 1883 |
| mqtt.client_id | string | Unique client identifier | Required |
| mqtt.username | string | MQTT username | "" |
| mqtt.password | string | MQTT password | "" |
| mqtt.use_tls | bool | Enable TLS/SSL | false |
| mqtt.keep_alive | int | Keep-alive interval (seconds) | 30 |
| mqtt.topics | []string | Topics to subscribe to | ["#"] |
| capture.output_file | string | Output JSONL file path | Required |
| capture.buffer_size | int | Message buffer size | 5000 |
| capture.flush_interval_s | int | Flush interval (seconds) | 5 |
| duration.seconds | int | Capture duration (0=unlimited) | 0 |
| duration.shutdown_timeout_s | int | Graceful shutdown timeout | 30 |
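
The MQTTCAP_ examples shown earlier suggest a simple naming rule: take the option name from the table, replace the dot with an underscore, upper-case it, and add the prefix. The sketch below illustrates that rule only. The helpers `envName` and `lookup` are hypothetical and are not taken from the application's source.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// envPrefix is the prefix documented in this README.
const envPrefix = "MQTTCAP_"

// envName derives the override variable for a config key,
// e.g. "capture.output_file" -> "MQTTCAP_CAPTURE_OUTPUT_FILE".
// Assumed mapping, inferred from the examples in this README.
func envName(key string) string {
	return envPrefix + strings.ToUpper(strings.ReplaceAll(key, ".", "_"))
}

// lookup returns the environment override for key if it is set,
// otherwise the fallback (for example the value read from config.yaml).
func lookup(key, fallback string) string {
	if v, ok := os.LookupEnv(envName(key)); ok {
		return v
	}
	return fallback
}

func main() {
	os.Setenv("MQTTCAP_MQTT_PORT", "8883") // simulate an override
	fmt.Println(envName("capture.output_file"))
	fmt.Println(lookup("mqtt.port", "1883"))      // override wins
	fmt.Println(lookup("mqtt.keep_alive", "30")) // falls back when unset
}
```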
# Using default config.yaml in current directory
./mqtt-capture
# Using config.yaml in specific directory
./mqtt-capture -config /path/to/config/dir
# Show version
./mqtt-capture -version

- Create your configuration:
mqtt:
  broker: "production-mqtt.example.com"
  port: 1883
  client_id: "mqtt-capture-prod"
  topics:
    - "#" # Capture all topics
capture:
  output_file: "./production_capture.jsonl"
  buffer_size: 10000
duration:
  seconds: 3600 # 1 hour

- Set credentials:
export MQTT_USERNAME="capture_user"
export MQTT_PASSWORD="secure_password"

- Run the capture:
./mqtt-capture

The application will capture for 1 hour and then shut down gracefully.

For continuous capture, set duration.seconds: 0 in the config, then run:
./mqtt-capture

Press Ctrl+C to stop gracefully.
Messages are saved in JSONL format (one JSON object per line). Each line contains:
{
  "topic": "sensors/temperature",
  "payload": "22.5",
  "payload_format": "text",
  "timestamp_unix_ms": 1699873245123,
  "timestamp_iso": "2023-11-13T11:00:45.123Z",
  "qos": 1,
  "retained": false,
  "duplicate": false,
  "message_id": 12345,
  "client_id": "mqtt-capture-001",
  "capture_version": "1.0"
}

- topic: MQTT topic
- payload: Message payload (text or base64 for binary)
- payload_format: "text" or "binary"
- timestamp_unix_ms: Capture timestamp in milliseconds since epoch
- timestamp_iso: ISO 8601 formatted timestamp
- qos: Quality of Service (0, 1, or 2)
- retained: Retained message flag
- duplicate: Duplicate message flag
- message_id: MQTT message ID (for QoS 1 and 2)
- client_id: Capture client identifier
- capture_version: Format version for future compatibility
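
To make the field list above concrete, here is a sketch of a Go struct whose JSON tags match the documented field names, together with one possible way to choose between "text" and "binary". The UTF-8 validity check is an assumption: this README does not say how the application decides, and the names `Record`, `encodePayload`, and `newRecord` are hypothetical.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"time"
	"unicode/utf8"
)

// Record mirrors the documented JSONL fields.
type Record struct {
	Topic           string `json:"topic"`
	Payload         string `json:"payload"`
	PayloadFormat   string `json:"payload_format"`
	TimestampUnixMs int64  `json:"timestamp_unix_ms"`
	TimestampISO    string `json:"timestamp_iso"`
	QoS             int    `json:"qos"`
	Retained        bool   `json:"retained"`
	Duplicate       bool   `json:"duplicate"`
	MessageID       int    `json:"message_id"`
	ClientID        string `json:"client_id"`
	CaptureVersion  string `json:"capture_version"`
}

// encodePayload keeps valid UTF-8 as text and base64-encodes anything else.
// Assumption: the real tool may use a different rule.
func encodePayload(raw []byte) (payload, format string) {
	if utf8.Valid(raw) {
		return string(raw), "text"
	}
	return base64.StdEncoding.EncodeToString(raw), "binary"
}

// newRecord fills the payload and timestamp fields for one message.
func newRecord(topic string, raw []byte, at time.Time) Record {
	payload, format := encodePayload(raw)
	return Record{
		Topic:           topic,
		Payload:         payload,
		PayloadFormat:   format,
		TimestampUnixMs: at.UnixMilli(),
		TimestampISO:    at.UTC().Format("2006-01-02T15:04:05.000Z07:00"),
		CaptureVersion:  "1.0",
	}
}

func main() {
	now := time.Now()
	for _, raw := range [][]byte{[]byte("22.5"), {0xff, 0xfe}} {
		line, err := json.Marshal(newRecord("sensors/temperature", raw, now))
		if err != nil {
			panic(err)
		}
		fmt.Println(string(line)) // one JSON object per line
	}
}
```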
Count total messages:
wc -l production_capture.jsonl

Filter by topic:
cat production_capture.jsonl | jq -c 'select(.topic | startswith("sensors/"))'

Get unique topics:
cat production_capture.jsonl | jq -r '.topic' | sort -u

Calculate messages per topic:
cat production_capture.jsonl | jq -r '.topic' | sort | uniq -c | sort -rn

You can use the captured data to replay messages on a different broker by writing a simple replay script that reads the JSONL file and publishes each message.
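
A replay script of the kind described above could look roughly like the following sketch. It uses only the standard library: the `publishFunc` callback is a hypothetical stand-in for whatever MQTT client you use, so no real client API is shown. Binary payloads are base64-decoded before publishing, following the payload_format field.

```go
package main

import (
	"bufio"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// capturedMessage holds the fields needed to republish a message.
type capturedMessage struct {
	Topic         string `json:"topic"`
	Payload       string `json:"payload"`
	PayloadFormat string `json:"payload_format"`
	QoS           byte   `json:"qos"`
	Retained      bool   `json:"retained"`
}

// publishFunc is a hypothetical hook; wire it to your MQTT client.
type publishFunc func(topic string, qos byte, retained bool, payload []byte) error

// replayLine parses one JSONL line and hands it to publish.
func replayLine(line []byte, publish publishFunc) error {
	var m capturedMessage
	if err := json.Unmarshal(line, &m); err != nil {
		return err
	}
	payload := []byte(m.Payload)
	if m.PayloadFormat == "binary" {
		decoded, err := base64.StdEncoding.DecodeString(m.Payload)
		if err != nil {
			return err
		}
		payload = decoded
	}
	return publish(m.Topic, m.QoS, m.Retained, payload)
}

// replay publishes every non-empty line and returns how many were sent.
func replay(r io.Reader, publish publishFunc) (int, error) {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) // allow long lines
	published, lineNo := 0, 0
	for sc.Scan() {
		lineNo++
		if len(sc.Bytes()) == 0 {
			continue
		}
		if err := replayLine(sc.Bytes(), publish); err != nil {
			return published, fmt.Errorf("line %d: %w", lineNo, err)
		}
		published++
	}
	return published, sc.Err()
}

func main() {
	capture := `{"topic":"sensors/temperature","payload":"22.5","payload_format":"text","qos":1,"retained":false}
{"topic":"sensors/raw","payload":"//4=","payload_format":"binary","qos":0,"retained":false}
`
	n, err := replay(strings.NewReader(capture), func(topic string, qos byte, retained bool, payload []byte) error {
		fmt.Printf("publish %s qos=%d retained=%t payload=%q\n", topic, qos, retained, payload)
		return nil
	})
	fmt.Println("replayed:", n, "err:", err)
}
```

This sketch publishes messages back to back. Reproducing the original timing would mean sleeping between messages based on the difference in timestamp_unix_ms.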
The application prints statistics every 10 seconds:
Stats: received=15234 written=15234 dropped=0 buffer=12.5% filesize=4523891 bytes
- received: Total messages received from broker
- written: Total messages written to file
- dropped: Messages dropped due to full buffer
- buffer: Current buffer usage percentage
- filesize: Current output file size
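
The relationship between the received, dropped, and buffer counters can be illustrated with a bounded channel and a non-blocking send. This is a sketch under assumptions, not the application's implementation: the `buffer` type and its methods are hypothetical, and it relies on the atomic.Uint64 type available since Go 1.19.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// buffer is a bounded queue that never blocks the MQTT callback.
type buffer struct {
	ch       chan []byte
	received atomic.Uint64
	dropped  atomic.Uint64
}

func newBuffer(size int) *buffer {
	return &buffer{ch: make(chan []byte, size)}
}

// offer enqueues msg without blocking. When the buffer is full the
// message is counted as dropped and false is returned.
func (b *buffer) offer(msg []byte) bool {
	b.received.Add(1)
	select {
	case b.ch <- msg:
		return true
	default:
		b.dropped.Add(1)
		return false
	}
}

// usagePercent reports how full the buffer is (size must be > 0).
func (b *buffer) usagePercent() float64 {
	return 100 * float64(len(b.ch)) / float64(cap(b.ch))
}

func main() {
	b := newBuffer(8)
	for i := 0; i < 10; i++ { // no consumer, so the last two are dropped
		b.offer([]byte("x"))
	}
	fmt.Printf("Stats: received=%d dropped=%d buffer=%.1f%%\n",
		b.received.Load(), b.dropped.Load(), b.usagePercent())
}
```

Dropping instead of blocking keeps the broker connection responsive under load, at the cost of losing messages when the writer cannot keep up.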
The application handles SIGINT (Ctrl+C) and SIGTERM gracefully:
- Stops accepting new messages
- Flushes all buffered messages to disk
- Disconnects from MQTT broker
- Closes output file
Default shutdown timeout is 30 seconds (configurable).
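
The shutdown sequence above can be sketched with signal.NotifyContext and a bounded flush. The `flush` helper and the in-memory channel are assumptions for illustration only. The 100 ms timer exists just so the example terminates without a signal.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"
	"time"
)

// flush writes buffered messages until the buffer is empty (returns true)
// or the timeout elapses first (returns false). Hypothetical helper.
func flush(buf <-chan string, write func(string), timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for {
		if time.Now().After(deadline) {
			return false // gave up: shutdown timeout reached
		}
		select {
		case m := <-buf:
			write(m)
		default:
			return true // nothing left to write
		}
	}
}

func main() {
	// ctx is cancelled on SIGINT (Ctrl+C) or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	buf := make(chan string, 16)
	buf <- "message 1"
	buf <- "message 2"

	select {
	case <-ctx.Done(): // signal received
	case <-time.After(100 * time.Millisecond): // demo only
	}

	// Steps from the list above: stop intake, flush, then disconnect and close.
	ok := flush(buf, func(m string) { fmt.Println("flushed:", m) }, 30*time.Second)
	fmt.Println("clean shutdown:", ok)
}
```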
Connection problems:
- Verify broker address and port
- Check firewall rules
- Verify credentials
- Try with use_tls: false for testing

Dropped messages:
- Increase buffer_size
- Decrease flush_interval_s
- Check disk write speed

High memory usage:
- Decrease buffer_size
- Decrease max_message_mb
- Use topic filters instead of the # wildcard

Security recommendations:
- Never commit credentials to version control
- Use environment variables for sensitive data
- Enable TLS in production (use_tls: true)
- Use strong, unique client IDs
- Limit topic subscriptions to necessary patterns
- Secure the output file with appropriate file permissions
Licensed under the Apache License 2.0.
Contributions are welcome! Please open an issue or submit a pull request.
For issues and questions, please open an issue on the GitHub repository.