diff --git a/benchmark_test.go b/benchmark_test.go index 49419ee..5d35d72 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -18,7 +18,7 @@ func benchmarkPublishWithXReceivers(count int, b *testing.B) { var id uint64 var values []string for { - id, values, _ = top.Receive(ctx, id) + id, values, _ = top.ReceiveSince(ctx, id) if len(values) == 0 { return } @@ -47,7 +47,7 @@ func benchmarkRetrieveBigTopic(count int, b *testing.B) { b.ResetTimer() for b.Loop() { - top.Receive(b.Context(), 0) + top.ReceiveSince(b.Context(), 0) } } @@ -69,7 +69,7 @@ func benchmarkRetrieveLastBigTopic(count int, b *testing.B) { b.ResetTimer() for b.Loop() { - top.Receive(ctx, id-1) + top.ReceiveSince(ctx, id-1) } } diff --git a/doc.go b/doc.go index 5d8a20f..ba311e6 100644 --- a/doc.go +++ b/doc.go @@ -5,7 +5,7 @@ of being pushed. It solves the problem, that you want to publish data to many goroutines. The standard way in go uses channels to push values to the readers. But channels have the problems, that either the goroutine sending the data has to wait for -the reader or has to discard messages,if the reader is too slow. A buffered +the reader or has to discard messages, if the reader is too slow. A buffered channel can help to delay the problem, but eventually the buffer could be full. The idea of pulling updates is inspired by Kafka or Redis-Streams. A subscriber @@ -44,30 +44,30 @@ ignored. # Receive messages -Messages can be received with the Receive()-method: +Messages can be received with the ReceiveAll()- or ReceiveSince()-method: - id, values, err := top.Receive(context.Background(), 0) + id, values := topic.ReceiveAll() + id, values, err := top.ReceiveSince(context.Background(), 42) -The first returned value is the id created by the last Publish()-call. The -second value is a slice of all messages that were published before. +The returned id is the number of values in the topic. It can only increase. -To receive newer values, Receive() can be called again with the id from the last +The returned values are a slice of the published messages. + +To receive newer values, ReceiveSince() can be called again with the id from the last call: - id, values, err := top.Receive(context.Background(), 0) + id, values, err := top.ReceiveAll() ... - id, values, err = top.Receive(context.Background(), id) + id, values, err = top.ReceiveSince(context.Background(), id) -If the given id is zero, then all messages are returned. If the id is greater -than zero, then only messages are returned that were published by the topic -after the id was created. +Only messages, that were published after the given id are returned. -When there are no new values in the topic, then the Receive()-call blocks until +When there are no new values in the topic, then the ReceiveSince()-call blocks until there are new values. To add a timeout to the call, the context can be used: ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, values, err = top.Receive(ctx, id) + id, values, err = top.ReceiveSince(ctx, id) If there are no new values before the context is canceled, the topic returns the error of the context. For example `context.DeadlineExceeded` or `context.Canceled`. @@ -81,7 +81,7 @@ The usual pattern to subscribe to a topic is: var values []string var err error for { - id, values, err = top.Receive(ctx, id) + id, values, err = top.ReceiveSince(ctx, id) if err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { // Timeout @@ -92,7 +92,7 @@ The usual pattern to subscribe to a topic is: // Process values } -The loop will process all values published by the topic for one minute. +The loop will process all values published to the topic for one minute. # Get Last ID @@ -101,10 +101,11 @@ should be processed that were published after the loop starts, the method LastID() can be used: id := top.LastID() - id, values, err = top.Receive(context.Background(), id) + id, values, err = top.ReceiveSince(context.Background(), id) -The return value of LastID() is the highest id in the topic. So a Receive() call -on top.LastID() will only return data that was published after the call. +The return value of LastID() is the highest id in the topic. So +a ReceiveSince() call on top.LastID() will only return data that +was published after the call. A pattern to receive only new data is: @@ -112,7 +113,7 @@ A pattern to receive only new data is: var values []string var err error for { - id, values, err = top.Receive(context.Background(), id) + id, values, err = top.ReceiveSince(context.Background(), id) if err != nil { // Handle error } diff --git a/example_test.go b/example_test.go index b8dc988..5c75d4b 100644 --- a/example_test.go +++ b/example_test.go @@ -24,7 +24,7 @@ func ExampleTopic() { var values []string var err error for { - id, values, err = top.Receive(ctx, id) + id, values, err = top.ReceiveSince(ctx, id) if err != nil { if errors.Is(err, context.Canceled) { // shutdown was called. diff --git a/topic.go b/topic.go index dd47fb8..bf6bef3 100644 --- a/topic.go +++ b/topic.go @@ -64,19 +64,28 @@ func (t *Topic[T]) Publish(value ...T) uint64 { return t.lastID() } -// Receive returns all values from the topic. If id is 0, all values are -// returned. Otherwise, all values that were inserted after the id are returned. +// ReceiveAll returns all values from the topic, that is not pruned. +// +// For performance reasons, this function returns the internal slice of the +// topic. It is not allowed to manipulate the values. +func (t *Topic[T]) ReceiveAll() (uint64, []T) { + t.mu.RLock() + defer t.mu.RUnlock() + + return t.lastID(), t.data +} + +// ReceiveSince returns all values from the topic after the given id. // // If the id is lower than the lowest id in the topic, an error of type // UnknownIDError is returned. // // If there is no new data, Receive() blocks until there is new data or the -// given channel is done. The same happens with id 0, when there is no data at -// all in the topic. +// given channel is done. // // For performance reasons, this function returns the internal slice of the // topic. It is not allowed to manipulate the values. -func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error) { +func (t *Topic[T]) ReceiveSince(ctx context.Context, id uint64) (uint64, []T, error) { t.mu.RLock() lastIDWhenStarted := t.lastID() @@ -88,7 +97,7 @@ func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error) select { case <-c: - return t.Receive(ctx, lastIDWhenStarted) + return t.ReceiveSince(ctx, lastIDWhenStarted) case <-ctx.Done(): return 0, nil, ctx.Err() } @@ -96,10 +105,6 @@ func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error) defer t.mu.RUnlock() - if id == 0 { - return t.lastID(), t.data, nil - } - if id < t.offset { return 0, nil, UnknownIDError{ID: id, FirstID: t.offset + 1} } diff --git a/topic_test.go b/topic_test.go index f086d68..fef4969 100644 --- a/topic_test.go +++ b/topic_test.go @@ -58,7 +58,7 @@ func TestPublishReceive(t *testing.T) { top := topic.New[string]() tt.f(top) - _, got, err := top.Receive(t.Context(), tt.receiveID) + _, got, err := top.ReceiveSince(t.Context(), tt.receiveID) if err != nil { t.Errorf("Did not expect an error, got: %v", err) @@ -92,7 +92,7 @@ func TestPublishWithoutValues(t *testing.T) { t.Errorf("Publish() without values returned %d, expected 1", id) } - _, data, err := top.Receive(t.Context(), 0) + _, data, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("Receive() error: %v", err) } @@ -109,7 +109,7 @@ func TestReceiveWithIDEqualLastID(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 10*time.Millisecond) defer cancel() - _, data, err := top.Receive(ctx, lastID) + _, data, err := top.ReceiveSince(ctx, lastID) if !errors.Is(err, context.DeadlineExceeded) { t.Errorf("Expected context deadline exceeded, got %v", err) } @@ -120,10 +120,9 @@ func TestReceiveWithIDEqualLastID(t *testing.T) { func TestPrune(t *testing.T) { for _, tt := range []struct { - name string - f func(*topic.Topic[string]) time.Time - receiveID uint64 - expect []string + name string + f func(*topic.Topic[string]) time.Time + expect []string }{ { "Prune after two values", @@ -135,7 +134,6 @@ func TestPrune(t *testing.T) { top.Publish("v4") return pruneTime }, - 0, values("v3", "v4"), }, { @@ -144,7 +142,6 @@ func TestPrune(t *testing.T) { top.Publish("v1") return time.Now() }, - 0, values(), }, { @@ -155,7 +152,6 @@ func TestPrune(t *testing.T) { top.Publish("v2") return t }, - 0, values("v2"), }, } { @@ -165,10 +161,7 @@ func TestPrune(t *testing.T) { top.Prune(pruneTime) - ctxCanceled, cancel := context.WithCancel(t.Context()) - cancel() - - _, got, _ := top.Receive(ctxCanceled, 0) + _, got := top.ReceiveAll() if !cmpSlice(got, tt.expect) { t.Errorf("Got %v, want %v", got, tt.expect) @@ -183,7 +176,7 @@ func TestPruneEmptyTopic(t *testing.T) { top.Prune(time.Now()) top.Publish("foo") - _, got, err := top.Receive(t.Context(), 0) + _, got, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Fatalf("Receive(): %v", err) } @@ -206,7 +199,7 @@ func TestPruneAllElements(t *testing.T) { ctxCanceled, cancel := context.WithCancel(t.Context()) cancel() - _, data, _ := top.Receive(ctxCanceled, 0) + _, data, _ := top.ReceiveSince(ctxCanceled, 0) if data != nil { t.Errorf("Expected nil data after pruning all, got %v", data) } @@ -218,7 +211,7 @@ func TestPruneUsedValue(t *testing.T) { top.Publish("val2") top.Publish("val3") ti := time.Now() - _, data, err := top.Receive(t.Context(), 0) + _, data, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Fatalf("Receive(): %v", err) } @@ -239,7 +232,7 @@ func TestPruneWithPastTime(t *testing.T) { pastTime := time.Now().Add(-1 * time.Hour) top.Prune(pastTime) - _, data, err := top.Receive(t.Context(), 0) + _, data, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("Receive() error: %v", err) } @@ -260,10 +253,7 @@ func TestMultiplePrunes(t *testing.T) { top.Prune(t1) top.Prune(t2) - _, data, err := top.Receive(t.Context(), 0) - if err != nil { - t.Errorf("Receive() error: %v", err) - } + _, data := top.ReceiveAll() if len(data) != 1 || data[0] != "v4" { t.Errorf("After multiple prunes got %v, expected [v4]", data) } @@ -278,7 +268,7 @@ func TestReceiveWithExactOffsetAfterPrune(t *testing.T) { top.Prune(ti) - _, data, err := top.Receive(t.Context(), 2) + _, data, err := top.ReceiveSince(t.Context(), 2) if err != nil { t.Errorf("Receive() error: %v", err) } @@ -297,7 +287,7 @@ func TestErrUnknownID(t *testing.T) { top.Prune(ti) - _, _, err := top.Receive(t.Context(), 1) + _, _, err := top.ReceiveSince(t.Context(), 1) topicErr, ok := err.(topic.UnknownIDError) if !ok { t.Errorf("Expected err to be a topic.ErrUnknownID, got: %v", err) @@ -320,7 +310,7 @@ func TestReceiveWithFutureID(t *testing.T) { done := make(chan struct{}) go func() { - _, _, err := top.Receive(t.Context(), 100) + _, _, err := top.ReceiveSince(t.Context(), 100) if err != nil { t.Errorf("Receive() returned unexpected error: %v", err) } @@ -349,7 +339,7 @@ func TestReceiveReturnsCorrectID(t *testing.T) { top.Publish("v1") top.Publish("v2") - id, _, err := top.Receive(t.Context(), 0) + id, _, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("Receive() error: %v", err) } @@ -357,7 +347,7 @@ func TestReceiveReturnsCorrectID(t *testing.T) { t.Errorf("Receive() returned id %d, expected 2", id) } - id, _, err = top.Receive(t.Context(), 1) + id, _, err = top.ReceiveSince(t.Context(), 1) if err != nil { t.Errorf("Receive() error: %v", err) } @@ -433,7 +423,7 @@ func TestReceiveBlocking(t *testing.T) { // Send values as soon as Receive() returnes. received := make(chan []string) go func() { - _, got, err := top.Receive(t.Context(), 0) + _, got, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("Receive() returned the unexpected error %v", err) } @@ -462,7 +452,7 @@ func TestBlockUntilContexDone(t *testing.T) { // Send values as soon as Receive() returnes. received := make(chan []string) go func() { - _, got, err := top.Receive(ctx, 0) + _, got, err := top.ReceiveSince(ctx, 0) if !errors.Is(err, context.Canceled) { t.Errorf("Receive() returned the unexpected error %v", err) } @@ -502,7 +492,7 @@ func TestBlockOnHighestID(t *testing.T) { // Close done channel, after Receive() unblocks. done := make(chan struct{}) go func() { - if _, _, err := top.Receive(t.Context(), highestID); err != nil { + if _, _, err := top.ReceiveSince(t.Context(), highestID); err != nil { t.Errorf("Receive() returned the unexpected error %v", err) } close(done) @@ -534,7 +524,7 @@ func TestPruneDuringBlockedReceive(t *testing.T) { done := make(chan struct{}) go func() { - top.Receive(t.Context(), 1) // Blocking + top.ReceiveSince(t.Context(), 1) // Blocking close(done) }() @@ -562,7 +552,7 @@ func TestReceiveOnCanceledChannel(t *testing.T) { cancel() // When context is canceled, Receive(0) should still return its data. - _, got, err := top.Receive(ctx, 0) + _, got, err := top.ReceiveSince(ctx, 0) if err != nil { t.Errorf("Receive() returned the unexpected error %v", err) } @@ -573,7 +563,7 @@ func TestReceiveOnCanceledChannel(t *testing.T) { // Send values as soon as Receive() returnes. received := make(chan []string) go func() { - _, got, err := top.Receive(ctx, highestID+100) + _, got, err := top.ReceiveSince(ctx, highestID+100) if !errors.Is(err, context.Canceled) { t.Errorf("Receive() returned the unexpected error %v", err) } @@ -610,7 +600,7 @@ func TestConcurrentPublishes(t *testing.T) { wg.Wait() - _, data, err := top.Receive(t.Context(), 0) + _, data, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("Receive() error: %v", err) } @@ -630,7 +620,7 @@ func TestMultipleConcurrentReceives(t *testing.T) { results := make([][]string, numReceivers) for i := range numReceivers { wg.Go(func() { - _, data, err := top.Receive(t.Context(), 0) + _, data, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("Receive() error: %v", err) return @@ -660,7 +650,7 @@ func TestTopicWithStruct(t *testing.T) { top.Publish(myType{5, "foobar"}) top.Publish(myType{5, "foobar"}) - _, values, err := top.Receive(t.Context(), 0) + _, values, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("receive: %v", err) } @@ -680,7 +670,7 @@ func TestTopicWithPointer(t *testing.T) { top.Publish(&myType{5, "foobar"}) top.Publish(&myType{5, "foobar"}) - _, values, err := top.Receive(t.Context(), 0) + _, values, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("receive: %v", err) } @@ -695,7 +685,7 @@ func TestTopicWithNilPointers(t *testing.T) { top := topic.New[*string]() top.Publish(nil, nil) - _, data, err := top.Receive(t.Context(), 0) + _, data, err := top.ReceiveSince(t.Context(), 0) if err != nil { t.Errorf("Receive() error: %v", err) }