Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
/.release
/.tarballs
/vendor
cmd/alertmanager/alertmanager

!.golangci.yml
!/cli/testdata/*.yml
Expand Down
32 changes: 31 additions & 1 deletion cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func run() int {
allowInsecureAdvertise = kingpin.Flag("cluster.allow-insecure-public-advertise-address-discovery", "[EXPERIMENTAL] Allow alertmanager to discover and listen on a public IP address.").Bool()
label = kingpin.Flag("cluster.label", "The cluster label is an optional string to include on each packet and stream. It uniquely identifies the cluster and prevents cross-communication issues when sending gossip messages.").Default("").String()
featureFlags = kingpin.Flag("enable-feature", fmt.Sprintf("Comma-separated experimental features to enable. Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String()

alertPersistenceFile = kingpin.Flag("storage.alert-persistence-file", "Alert persistence filename (in the base folder). If set, Alertmanager will persist alerts to this file on shutdown and restore them on startup.").Default("persisted-alerts.json.gz").String()
)

promslogflag.AddFlags(kingpin.CommandLine, &promslogConfig)
Expand Down Expand Up @@ -347,7 +349,35 @@ func run() int {
logger.Error("error creating memory provider", "err", err)
return 1
}
defer alerts.Close()

// if alertPersistenceFile is set, we will use it to load persisted alerts
alertPersistenceFilePath := ""
if *alertPersistenceFile != "" {
alertPersistenceFilePath = filepath.Join(*dataDir, *alertPersistenceFile)
}

defer func() {
defer alerts.Close()

// if alertPersistenceFile is set, persist alerts
if alertPersistenceFilePath != "" {
if err := alerts.PersistAlerts(alertPersistenceFilePath); err != nil {
logger.Error("error persisting alerts", "file", alertPersistenceFilePath, "err", err)
return
}
logger.Info("persisted alerts to file", "file", alertPersistenceFilePath)
}
}()

// if alertPersistenceFile is set, we will use it to load persisted alerts
if alertPersistenceFilePath != "" {
loadStart := time.Now()
if err := alerts.LoadAlerts(alertPersistenceFilePath); err != nil {
logger.Error("error loading persisted alerts", "file", alertPersistenceFilePath, "err", err)
} else {
logger.Info("loaded alerts from file", "file", alertPersistenceFilePath, "duration", time.Since(loadStart))
}
}

var disp *dispatch.Dispatcher
defer func() {
Expand Down
22 changes: 8 additions & 14 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
d.mtx.Lock()
defer d.mtx.Unlock()

// store the configured repeat interval in the alert annotations
// so that it can be used by alert-handler to deduplicate alerts / notifications
if alert.Annotations == nil {
alert.Annotations = model.LabelSet{}
}
d.logger.Debug("Storing repeat interval in alert annotations", "alert_name", alert.Name(), "repeat_interval", route.RouteOpts.RepeatInterval.String())
alert.Annotations["repeat_interval"] = model.LabelValue(route.RouteOpts.RepeatInterval.String())

routeGroups, ok := d.aggrGroupsPerRoute[route]
if !ok {
routeGroups = map[model.Fingerprint]*aggrGroup{}
Expand Down Expand Up @@ -391,9 +399,6 @@ type aggrGroup struct {
done chan struct{}
next *time.Timer
timeout func(time.Duration) time.Duration

mtx sync.RWMutex
hasFlushed bool
}

// newAggrGroup returns a new aggregation group.
Expand Down Expand Up @@ -460,10 +465,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ctx = notify.WithRouteID(ctx, ag.routeID)

// Wait the configured interval before calling flush again.
ag.mtx.Lock()
ag.next.Reset(ag.opts.GroupInterval)
ag.hasFlushed = true
ag.mtx.Unlock()

ag.flush(func(alerts ...*types.Alert) bool {
return nf(ctx, alerts...)
Expand All @@ -489,14 +491,6 @@ func (ag *aggrGroup) insert(alert *types.Alert) {
if err := ag.alerts.Set(alert); err != nil {
ag.logger.Error("error on set alert", "err", err)
}

// Immediately trigger a flush if the wait duration for this
// alert is already over.
ag.mtx.Lock()
defer ag.mtx.Unlock()
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
ag.next.Reset(0)
}
}

func (ag *aggrGroup) empty() bool {
Expand Down
24 changes: 7 additions & 17 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,28 +189,18 @@ func TestAggrGroup(t *testing.T) {

ag.stop()

// Add an alert that started more than group_interval in the past. We expect
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
// Set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger())
go ag.run(ntfy)

ag.insert(a1)
ag.insert(a2)

// a2 lies way in the past so the initial group_wait should be skipped.
select {
case <-time.After(opts.GroupWait / 2):
t.Fatalf("expected immediate alert but received none")

case batch := <-alertsCh:
exp := removeEndsAt(types.AlertSlice{a1, a2})
sort.Sort(batch)
batch := <-alertsCh
exp := removeEndsAt(types.AlertSlice{a1, a2})
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alerts %v but got %v", exp, batch)
}

for i := 0; i < 3; i++ {
Expand Down Expand Up @@ -241,7 +231,7 @@ func TestAggrGroup(t *testing.T) {
a1r := *a1
a1r.EndsAt = time.Now()
ag.insert(&a1r)
exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)
exp = append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...)

select {
case <-time.After(2 * opts.GroupInterval):
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,16 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mdlayher/socket v0.4.1 // indirect
github.com/mdlayher/vsock v1.2.1 // indirect
github.com/miekg/dns v1.1.41 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
Expand Down Expand Up @@ -395,9 +396,11 @@ github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
Expand Down
14 changes: 14 additions & 0 deletions provider/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio
return a, nil
}

// PersistAlerts writes the alerts held by this provider to the file at
// alertPersistenceFilePath by delegating to the underlying alert store.
// The provider mutex is held for the duration of the call. Any error from
// the store is returned unchanged.
func (a *Alerts) PersistAlerts(alertPersistenceFilePath string) error {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	persistErr := a.alerts.PersistAlerts(alertPersistenceFilePath)
	return persistErr
}

// LoadAlerts reads previously persisted alerts from the file at
// alertPersistenceFilePath by delegating to the underlying alert store.
// The provider mutex is held for the duration of the call. Any error from
// the store is returned unchanged.
func (a *Alerts) LoadAlerts(alertPersistenceFilePath string) error {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	loadErr := a.alerts.LoadAlerts(alertPersistenceFilePath)
	return loadErr
}

func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
Expand Down
66 changes: 66 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@
package store

import (
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"os"
"sync"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"

"github.com/prometheus/alertmanager/types"
Expand Down Expand Up @@ -47,6 +52,67 @@ func NewAlerts() *Alerts {
return a
}

// PersistAlerts serializes the alerts currently held in the store to
// gzip-compressed JSON and writes them to alertPersistenceFilePath.
//
// The snapshot is first written to a temporary sibling file
// (alertPersistenceFilePath + ".tmp"), synced and closed, and only then
// renamed over the target. A failure part-way through therefore never
// truncates or corrupts a previously persisted file; the temporary file is
// removed on any error.
//
// The store lock is held for the duration of the call. A non-nil error wraps
// the underlying marshalling, file or compression error.
func (a *Alerts) PersistAlerts(alertPersistenceFilePath string) error {
	a.Lock()
	defer a.Unlock()

	data, err := jsoniter.Marshal(a.c)
	if err != nil {
		return fmt.Errorf("error marshalling alerts to persistence file %s: %w", alertPersistenceFilePath, err)
	}

	// Write next to the target so the final rename stays on one filesystem.
	tmpPath := alertPersistenceFilePath + ".tmp"
	file, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
	if err != nil {
		return fmt.Errorf("error creating persistence file %s: %w", tmpPath, err)
	}

	// Until the rename succeeds, every exit path must release the handle and
	// discard the partial temporary file.
	committed := false
	defer func() {
		if committed {
			return
		}
		// Best-effort cleanup: the file may already be closed, and the
		// original error is the one worth reporting.
		_ = file.Close()
		_ = os.Remove(tmpPath)
	}()

	// Write compressed data.
	gzipWriter := gzip.NewWriter(file)
	if _, err := gzipWriter.Write(data); err != nil {
		return fmt.Errorf("error writing compressed alerts to persistence file %s: %w", alertPersistenceFilePath, err)
	}
	if err := gzipWriter.Close(); err != nil {
		return fmt.Errorf("error closing gzip writer for persistence file %s: %w", alertPersistenceFilePath, err)
	}

	// Flush to stable storage and check Close: a deferred, unchecked Close
	// would hide write-back errors on a file that was just written.
	if err := file.Sync(); err != nil {
		return fmt.Errorf("error syncing persistence file %s: %w", alertPersistenceFilePath, err)
	}
	if err := file.Close(); err != nil {
		return fmt.Errorf("error closing persistence file %s: %w", alertPersistenceFilePath, err)
	}

	if err := os.Rename(tmpPath, alertPersistenceFilePath); err != nil {
		return fmt.Errorf("error replacing persistence file %s: %w", alertPersistenceFilePath, err)
	}
	committed = true

	return nil
}

// LoadAlerts reads gzip-compressed JSON from alertPersistenceFilePath and
// merges the decoded alerts into the store.
//
// The file is read and decoded into a scratch map before the store is
// touched, so a missing, truncated or malformed file leaves the store exactly
// as it was. Entries decoded as null are skipped, and a top-level JSON null
// loads nothing rather than replacing the store's map with nil. Existing
// entries with the same fingerprint are overwritten; other existing entries
// are kept.
//
// The store lock is taken only for the final merge, not during file I/O. A
// non-nil error wraps the underlying cause, so callers can detect a missing
// file with errors.Is(err, os.ErrNotExist).
func (a *Alerts) LoadAlerts(alertPersistenceFilePath string) error {
	// Open the file.
	file, err := os.Open(alertPersistenceFilePath)
	if err != nil {
		return fmt.Errorf("error opening alert persistence file %s: %w", alertPersistenceFilePath, err)
	}
	defer file.Close()

	// Create gzip reader.
	gzipReader, err := gzip.NewReader(file)
	if err != nil {
		return fmt.Errorf("error creating gzip reader for persistence file %s: %w", alertPersistenceFilePath, err)
	}

	// Read decompressed data.
	data, err := io.ReadAll(gzipReader)
	if err != nil {
		_ = gzipReader.Close() // the read error is the one worth reporting
		return fmt.Errorf("error reading decompressed data from persistence file %s: %w", alertPersistenceFilePath, err)
	}
	if err := gzipReader.Close(); err != nil {
		return fmt.Errorf("error closing gzip reader for persistence file %s: %w", alertPersistenceFilePath, err)
	}

	// Decode into a scratch map so a failed decode cannot leave the live map
	// half-populated.
	// NOTE(review): assumes a.c is map[model.Fingerprint]*types.Alert — confirm
	// against the Alerts struct definition.
	var loaded map[model.Fingerprint]*types.Alert
	if err := jsoniter.Unmarshal(data, &loaded); err != nil {
		return fmt.Errorf("error unmarshalling alerts from persistence file %s: %w", alertPersistenceFilePath, err)
	}

	a.Lock()
	defer a.Unlock()

	if a.c == nil {
		a.c = make(map[model.Fingerprint]*types.Alert, len(loaded))
	}
	for fp, alert := range loaded {
		// A null entry in the file would otherwise store a nil alert pointer.
		if alert == nil {
			continue
		}
		a.c[fp] = alert
	}

	return nil
}

// SetGCCallback sets a GC callback to be executed after each GC.
func (a *Alerts) SetGCCallback(cb func([]types.Alert)) {
a.Lock()
Expand Down
Loading