Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func benchmarkPublishWithXReceivers(count int, b *testing.B) {
var id uint64
var values []string
for {
id, values, _ = top.Receive(ctx, id)
id, values, _ = top.ReceiveSince(ctx, id)
if len(values) == 0 {
return
}
Expand Down Expand Up @@ -47,7 +47,7 @@ func benchmarkRetrieveBigTopic(count int, b *testing.B) {
b.ResetTimer()

for b.Loop() {
top.Receive(b.Context(), 0)
top.ReceiveSince(b.Context(), 0)
}
}

Expand All @@ -69,7 +69,7 @@ func benchmarkRetrieveLastBigTopic(count int, b *testing.B) {
b.ResetTimer()

for b.Loop() {
top.Receive(ctx, id-1)
top.ReceiveSince(ctx, id-1)
}
}

Expand Down
39 changes: 20 additions & 19 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ of being pushed.
It solves the problem, that you want to publish data to many goroutines. The
standard way in go uses channels to push values to the readers. But channels
have the problems, that either the goroutine sending the data has to wait for
the reader or has to discard messages,if the reader is too slow. A buffered
the reader or has to discard messages, if the reader is too slow. A buffered
channel can help to delay the problem, but eventually the buffer could be full.

The idea of pulling updates is inspired by Kafka or Redis-Streams. A subscriber
Expand Down Expand Up @@ -44,30 +44,30 @@ ignored.

# Receive messages

Messages can be received with the Receive()-method:
Messages can be received with the ReceiveAll()- or ReceiveSince()-method:

id, values, err := top.Receive(context.Background(), 0)
id, values := topic.ReceiveAll()
id, values, err := top.ReceiveSince(context.Background(), 42)

The first returned value is the id created by the last Publish()-call. The
second value is a slice of all messages that were published before.
The returned id is the number of values in the topic. It can only increase.

To receive newer values, Receive() can be called again with the id from the last
The returned values are a slice of the published messages.

To receive newer values, ReceiveSince() can be called again with the id from the last
call:

id, values, err := top.Receive(context.Background(), 0)
id, values, err := top.ReceiveAll()
...
id, values, err = top.Receive(context.Background(), id)
id, values, err = top.ReceiveSince(context.Background(), id)

If the given id is zero, then all messages are returned. If the id is greater
than zero, then only messages are returned that were published by the topic
after the id was created.
Only messages, that were published after the given id are returned.

When there are no new values in the topic, then the Receive()-call blocks until
When there are no new values in the topic, then the ReceiveSince()-call blocks until
there are new values. To add a timeout to the call, the context can be used:

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
id, values, err = top.Receive(ctx, id)
id, values, err = top.ReceiveSince(ctx, id)

If there are no new values before the context is canceled, the topic returns the error
of the context. For example `context.DeadlineExceeded` or `context.Canceled`.
Expand All @@ -81,7 +81,7 @@ The usual pattern to subscribe to a topic is:
var values []string
var err error
for {
id, values, err = top.Receive(ctx, id)
id, values, err = top.ReceiveSince(ctx, id)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Timeout
Expand All @@ -92,7 +92,7 @@ The usual pattern to subscribe to a topic is:
// Process values
}

The loop will process all values published by the topic for one minute.
The loop will process all values published to the topic for one minute.

# Get Last ID

Expand All @@ -101,18 +101,19 @@ should be processed that were published after the loop starts, the method
LastID() can be used:

id := top.LastID()
id, values, err = top.Receive(context.Background(), id)
id, values, err = top.ReceiveSince(context.Background(), id)

The return value of LastID() is the highest id in the topic. So a Receive() call
on top.LastID() will only return data that was published after the call.
The return value of LastID() is the highest id in the topic. So
a ReceiveSince() call on top.LastID() will only return data that
was published after the call.

A pattern to receive only new data is:

id := top.LastID()
var values []string
var err error
for {
id, values, err = top.Receive(context.Background(), id)
id, values, err = top.ReceiveSince(context.Background(), id)
if err != nil {
// Handle error
}
Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func ExampleTopic() {
var values []string
var err error
for {
id, values, err = top.Receive(ctx, id)
id, values, err = top.ReceiveSince(ctx, id)
if err != nil {
if errors.Is(err, context.Canceled) {
// shutdown was called.
Expand Down
25 changes: 15 additions & 10 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,28 @@ func (t *Topic[T]) Publish(value ...T) uint64 {
return t.lastID()
}

// Receive returns all values from the topic. If id is 0, all values are
// returned. Otherwise, all values that were inserted after the id are returned.
// ReceiveAll returns all values from the topic, that is not pruned.
//
// For performance reasons, this function returns the internal slice of the
// topic. It is not allowed to manipulate the values.
func (t *Topic[T]) ReceiveAll() (uint64, []T) {
t.mu.RLock()
defer t.mu.RUnlock()

return t.lastID(), t.data
}

// ReceiveSince returns all values from the topic after the given id.
//
// If the id is lower than the lowest id in the topic, an error of type
// UnknownIDError is returned.
//
// If there is no new data, Receive() blocks until there is new data or the
// given channel is done. The same happens with id 0, when there is no data at
// all in the topic.
// given channel is done.
//
// For performance reasons, this function returns the internal slice of the
// topic. It is not allowed to manipulate the values.
func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error) {
func (t *Topic[T]) ReceiveSince(ctx context.Context, id uint64) (uint64, []T, error) {
t.mu.RLock()
lastIDWhenStarted := t.lastID()

Expand All @@ -88,18 +97,14 @@ func (t *Topic[T]) Receive(ctx context.Context, id uint64) (uint64, []T, error)

select {
case <-c:
return t.Receive(ctx, lastIDWhenStarted)
return t.ReceiveSince(ctx, lastIDWhenStarted)
case <-ctx.Done():
return 0, nil, ctx.Err()
}
}

defer t.mu.RUnlock()

if id == 0 {
return t.lastID(), t.data, nil
}

if id < t.offset {
return 0, nil, UnknownIDError{ID: id, FirstID: t.offset + 1}
}
Expand Down
Loading
Loading