Commit 63793f9

mo khan <mo@mokhan.ca>
2025-07-24 19:09:22
feat: use mutually exclusive lock for pub/sub
1 parent c383121
pkg/event/aggregator.go
@@ -1,11 +1,23 @@
 package event
 
-import "github.com/xlgmokha/x/pkg/x"
+import (
+	"sync"
+
+	"github.com/xlgmokha/x/pkg/x"
+)
 
 type Aggregator struct {
+	mu            sync.RWMutex
 	subscriptions map[Event][]Subscription
 }
 
+func WithDefaults() x.Option[*Aggregator] {
+	return x.With(func(item *Aggregator) {
+		item.mu = sync.RWMutex{}
+		item.subscriptions = map[Event][]Subscription{}
+	})
+}
+
 func WithoutSubscriptions() x.Option[*Aggregator] {
 	return WithSubscriptions(map[Event][]Subscription{})
 }
@@ -17,10 +29,16 @@ func WithSubscriptions(subscriptions map[Event][]Subscription) x.Option[*Aggrega
 }
 
 func (a *Aggregator) Subscribe(event Event, f Subscription) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
 	a.subscriptions[event] = append(a.subscriptions[event], f)
 }
 
 func (a *Aggregator) Publish(event Event, message any) {
+	a.mu.RLock()
+	defer a.mu.RUnlock()
+
 	for _, subscription := range a.subscriptions[event] {
 		subscription(message)
 	}
pkg/event/aggregator_test.go
@@ -10,13 +10,13 @@ import (
 func TestAggregator(t *testing.T) {
 	t.Run("Publish", func(t *testing.T) {
 		t.Run("without any subscribers", func(t *testing.T) {
-			aggregator := x.New(WithoutSubscriptions())
+			aggregator := x.New(WithDefaults())
 
 			aggregator.Publish("announcements.engineering", "Business, Business, Business... Numbers!")
 		})
 
 		t.Run("with a single subscriber", func(t *testing.T) {
-			aggregator := x.New(WithoutSubscriptions())
+			aggregator := x.New(WithDefaults())
 			called := false
 
 			aggregator.Subscribe("announcement", func(message any) {
@@ -30,7 +30,7 @@ func TestAggregator(t *testing.T) {
 		})
 
 		t.Run("with multiple subscribers", func(t *testing.T) {
-			aggregator := x.New(WithoutSubscriptions())
+			aggregator := x.New(WithDefaults())
 			called := map[int]bool{}
 
 			aggregator.Subscribe("announcement", func(message any) {
pkg/event/typed_aggregator.go
@@ -9,7 +9,7 @@ type TypedAggregator[T any] struct {
 func New[T any]() *TypedAggregator[T] {
 	return NewWith[T](
 		x.New(
-			WithoutSubscriptions(),
+			WithDefaults(),
 		),
 	)
 }