diff --git a/cmd/github-actions-manager/modules.go b/cmd/github-actions-manager/modules.go index d06d827..d57982c 100644 --- a/cmd/github-actions-manager/modules.go +++ b/cmd/github-actions-manager/modules.go @@ -62,7 +62,7 @@ func initModules(logger *zap.Logger, config *Config) ([]cmd.Module, error) { runners := runners.NewSynchronizer(logger, &config.GitHub.Runners, target, registry) modules = append(modules, runners) - jobs, err := jobs.NewSynchronizer(logger, &config.GitHub.Jobs, client, kv, registry) + jobs, err := jobs.NewSynchronizer(logger, &config.GitHub.Jobs, client, kv, registry, jobs.RealClock{}) if err != nil { return nil, fmt.Errorf("cannot setup job sync: %w", err) } diff --git a/go.mod b/go.mod index 4708dfe..f9f1960 100644 --- a/go.mod +++ b/go.mod @@ -6,14 +6,17 @@ require ( github.com/BurntSushi/toml v1.1.0 github.com/Masterminds/sprig/v3 v3.2.2 github.com/bradleyfalzon/ghinstallation/v2 v2.0.4 + github.com/go-playground/assert/v2 v2.0.1 github.com/go-playground/validator/v10 v10.11.0 github.com/google/go-github/v45 v45.1.0 github.com/gorilla/mux v1.8.0 github.com/slack-go/slack v0.11.0 + github.com/stretchr/testify v1.7.0 go.uber.org/zap v1.21.0 golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/time v0.0.0-20220609170525-579cf78fd858 + gopkg.in/h2non/gock.v1 v1.1.2 k8s.io/api v0.24.2 k8s.io/apimachinery v0.24.2 k8s.io/client-go v0.24.2 @@ -22,7 +25,9 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/go.sum b/go.sum index 668b9a2..154d310 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,8 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= +github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -293,6 +295,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= +github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -738,6 +742,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/h2non/gock.v1 v1.1.2 h1:jBbHXgGBK/AoPVfJh5x4r/WxIrElvbLel8TCZkkZJoY= +gopkg.in/h2non/gock.v1 v1.1.2/go.mod h1:n7UGz/ckNChHiK05rDoiC4MYSunEC/lyaUm2WWaDva0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/pkg/github/jobs/clock.go b/pkg/github/jobs/clock.go new file mode 100644 index 0000000..5d00858 --- /dev/null +++ b/pkg/github/jobs/clock.go @@ -0,0 +1,18 @@ +package jobs + +import "time" + +type Clock interface { + Now() time.Time + After(d time.Duration) <-chan time.Time +} + +type RealClock struct{} + +func (RealClock) Now() time.Time { return time.Now() } +func (RealClock) After(d time.Duration) <-chan time.Time { return time.After(d) } + +type TestClock struct{} + +func (TestClock) Now() time.Time { return time.Now() } +func (TestClock) After(d time.Duration) <-chan time.Time { return time.After(0) } diff --git a/pkg/github/jobs/synchronizer.go b/pkg/github/jobs/synchronizer.go index 3240744..db2ab32 100644 --- a/pkg/github/jobs/synchronizer.go +++ b/pkg/github/jobs/synchronizer.go @@ -7,7 +7,6 @@ import ( "net/http" "strconv" "strings" - "time" gh "github.com/oursky/github-actions-manager/pkg/github" "github.com/oursky/github-actions-manager/pkg/kv" @@ -26,23 +25,32 @@ type Synchronizer struct { github *github.Client kv kv.Store - state *channels.Broadcaster[*State] - metrics *metrics + state *channels.Broadcaster[*State] + metrics *metrics + webhookRuns chan webhookObject[*github.WorkflowRun] + webhookJobs chan webhookObject[*github.WorkflowJob] + clock Clock } -func NewSynchronizer(logger *zap.Logger, config *Config, client *http.Client, kv kv.Store, registry *prometheus.Registry) (*Synchronizer, error) { +func NewSynchronizer(logger *zap.Logger, config *Config, client *http.Client, kv kv.Store, registry *prometheus.Registry, clock Clock) (*Synchronizer, error) { logger = logger.Named("jobs-sync") server := newWebhookServer(logger, config.GetWebhookServerAddr(), config.WebhookSecret) + runs := make(chan webhookObject[*github.WorkflowRun]) + jobs := make(chan webhookObject[*github.WorkflowJob]) + return &Synchronizer{ - logger: logger, - config: config, - server: server, - github: github.NewClient(client), - kv: kv, - state: channels.NewBroadcaster[*State](nil), - metrics: newMetrics(registry), + logger: logger, + config: config, + server: server, + github: github.NewClient(client), + kv: kv, + state: channels.NewBroadcaster[*State](nil), + metrics: newMetrics(registry), + webhookRuns: runs, + webhookJobs: jobs, + clock: clock, }, nil } @@ -51,14 +59,11 @@ func (s *Synchronizer) Start(ctx context.Context, g *errgroup.Group) error { return nil } - runs := make(chan webhookObject[*github.WorkflowRun]) - jobs := make(chan webhookObject[*github.WorkflowJob]) - - if err := s.server.Start(ctx, g, runs, jobs); err != nil { + if err := s.server.Start(ctx, g, s.webhookRuns, s.webhookJobs); err != nil { return fmt.Errorf("jobs: %w", err) } g.Go(func() error { - s.run(ctx, runs, jobs) + s.run(ctx, s.webhookRuns, s.webhookJobs) return nil }) return nil @@ -71,8 +76,7 @@ func (s *Synchronizer) State() *channels.Broadcaster[*State] { func (s *Synchronizer) run( ctx context.Context, webhookRuns <-chan webhookObject[*github.WorkflowRun], - webhookJobs <-chan webhookObject[*github.WorkflowJob], -) { + webhookJobs <-chan webhookObject[*github.WorkflowJob]) { runs := make(map[Key]cell[github.WorkflowRun]) jobs := make(map[Key]cell[github.WorkflowJob]) @@ -87,13 +91,13 @@ func (s *Synchronizer) run( case run := <-webhookRuns: runs[run.Key] = cell[github.WorkflowRun]{ - UpdatedAt: time.Now(), + UpdatedAt: s.clock.Now(), Object: run.Object, } case job := <-webhookJobs: jobs[job.Key] = cell[github.WorkflowJob]{ - UpdatedAt: time.Now(), + UpdatedAt: s.clock.Now(), Object: job.Object, } @@ -110,10 +114,10 @@ func (s *Synchronizer) run( } runs[runKey] = cell[github.WorkflowRun]{ - UpdatedAt: time.Now(), + UpdatedAt: s.clock.Now(), Object: run, } - case <-time.After(syncInterval): + case <-s.clock.After(syncInterval): if len(runs) == 0 { continue } @@ -161,7 +165,7 @@ func (s *Synchronizer) run( } } - retentionLimit := time.Now().Add(-s.config.GetRetentionPeriod()) + retentionLimit := s.clock.Now().Add(-s.config.GetRetentionPeriod()) runRefs := make(map[Key]int) for key, job := range jobs { @@ -222,7 +226,7 @@ func (s *Synchronizer) loadState( continue } runs[Key{RepoOwner: owner, RepoName: repo, ID: id}] = cell[github.WorkflowRun]{ - UpdatedAt: time.Now(), + UpdatedAt: s.clock.Now(), Object: wrun, } @@ -236,7 +240,7 @@ func (s *Synchronizer) loadState( } for _, job := range wjobs.Jobs { jobs[Key{RepoOwner: owner, RepoName: repo, ID: job.GetID()}] = cell[github.WorkflowJob]{ - UpdatedAt: time.Now(), + UpdatedAt: s.clock.Now(), Object: job, } } diff --git a/pkg/github/jobs/synchronizer_test.go b/pkg/github/jobs/synchronizer_test.go new file mode 100644 index 0000000..9ce529b --- /dev/null +++ b/pkg/github/jobs/synchronizer_test.go @@ -0,0 +1,150 @@ +package jobs + +import ( + "context" + "net/http" + "strings" + "testing" + "time" + + "github.com/go-playground/assert/v2" + "github.com/google/go-github/v45/github" + "github.com/oursky/github-actions-manager/pkg/kv" + "github.com/oursky/github-actions-manager/pkg/utils/tomltypes" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "gopkg.in/h2non/gock.v1" +) + +func ptr[T any](v T) *T { + return &v +} +func TestRun(t *testing.T) { + + logger, _ := zap.NewProduction() + sync_page_size := 5 + webhook_server_addr := "127.0.0.1:8001" + config := &Config{ + Disabled: false, + ReplayEnabled: true, + RetentionPeriod: &tomltypes.Duration{1 * time.Hour}, + SyncInterval: &tomltypes.Duration{5 * time.Second}, + SyncPageSize: &sync_page_size, + WebhookServerAddr: &webhook_server_addr, + WebhookSecret: "testing", + } + + kv := kv.NewInMemoryStore() + registry := prometheus.NewRegistry() + client := &http.Client{Transport: &http.Transport{}} + gock.InterceptClient(client) + defer gock.Off() + + testGithubWorkflowRun := &github.WorkflowRun{ + ID: ptr(int64(0)), + Status: ptr("in_progress"), + Conclusion: ptr(""), + WorkflowID: ptr(int64(0)), + HeadCommit: &github.HeadCommit{}, + HeadRepository: &github.Repository{}, + } + + testCommitMsg := testGithubWorkflowRun.GetHeadCommit().GetMessage() + testCommitMsgTitle, _, _ := strings.Cut(testCommitMsg, "\n") + testCommitURL := testGithubWorkflowRun.GetHeadRepository().GetHTMLURL() + "/commit/" + testGithubWorkflowRun.GetHeadCommit().GetID() + + testWorkflowRun := &WorkflowRun{ + Key: Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"}, + + Name: testGithubWorkflowRun.GetName(), + URL: testGithubWorkflowRun.GetHTMLURL(), + Status: "completed", + Conclusion: "success", + + StartedAt: testGithubWorkflowRun.GetRunStartedAt().Time, + CommitMessageTitle: testCommitMsgTitle, + CommitURL: testCommitURL, + } + + testGithubWorkflowJob := &github.WorkflowJob{ + ID: ptr(int64(0)), + HTMLURL: ptr("testing"), + Status: ptr("in_progress"), + Conclusion: ptr(""), + } + + var startedAt *time.Time + if gt := testGithubWorkflowJob.GetStartedAt(); !gt.IsZero() { + startedAt = >.Time + } + + var completedAt *time.Time + if gt := testGithubWorkflowJob.GetCompletedAt(); !gt.IsZero() { + completedAt = >.Time + } + + testWorkflowJob := &WorkflowJob{ + Key: Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"}, + + Name: testGithubWorkflowJob.GetName(), + URL: testGithubWorkflowJob.GetHTMLURL(), + Status: "completed", + Conclusion: "success", + + StartedAt: startedAt, + CompletedAt: completedAt, + RunnerID: testGithubWorkflowJob.RunnerID, + RunnerName: testGithubWorkflowJob.RunnerName, + RunnerLabels: testGithubWorkflowJob.Labels, + } + + testWorkflowRun.Jobs = append(testWorkflowRun.Jobs, testWorkflowJob) + + testWebhookJob := NewWebhookObject( + Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"}, + testGithubWorkflowJob, + ) + + testWebhookRun := NewWebhookObject( + Key{ID: int64(0), RepoOwner: "tester", RepoName: "testing"}, + testGithubWorkflowRun, + ) + + testUpdatedGithubWorkflowJob := &github.WorkflowJob{ + ID: ptr(int64(0)), + HTMLURL: ptr("testing"), + Status: ptr("completed"), + Conclusion: ptr("success"), + } + + testUpdatedGithubWorkflowRun := &github.WorkflowRun{ + ID: ptr(int64(0)), + Status: ptr("completed"), + Conclusion: ptr("success"), + WorkflowID: ptr(int64(0)), + HeadCommit: &github.HeadCommit{}, + HeadRepository: &github.Repository{}, + } + + gock.New("https://api.github.com/repos/(.*)/(.*)/actions/jobs/(.*)"). + Persist(). + Reply(200). + JSON(testUpdatedGithubWorkflowJob) + + gock.New("https://api.github.com/repos/(.*)/(.*)/actions/runs/(.*)"). + Persist(). + Reply(200). + JSON(testUpdatedGithubWorkflowRun) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + g, ctx := errgroup.WithContext(ctx) + s, _ := NewSynchronizer(logger, config, client, kv, registry, TestClock{}) + s.Start(ctx, g) + s.webhookRuns <- testWebhookRun + s.webhookJobs <- testWebhookJob + time.Sleep(1 * time.Second) + assert.Equal(t, testWorkflowRun, s.metrics.state.WorkflowRuns[0]) + +} diff --git a/pkg/github/jobs/webhook_server.go b/pkg/github/jobs/webhook_server.go index ae636be..9c9ce32 100644 --- a/pkg/github/jobs/webhook_server.go +++ b/pkg/github/jobs/webhook_server.go @@ -112,3 +112,10 @@ func (s *webhookServer) handle( }) } } + +func NewWebhookObject[T any](key Key, object T) webhookObject[T] { + return webhookObject[T]{ + key, + object, + } +}