Commit 58f695c
Changed files (2)
pkg
pkg/event/aggregator.go
@@ -17,6 +17,15 @@ func (a *Aggregator) Subscribe(event Event, f Subscription) {
a.subscriptions[event] = append(a.subscriptions[event], f)
}
+func (a *Aggregator) SubscribeTo[T any](event Event, f func(T)) {
+ wrapper := func(message any) {
+ if typedMessage, ok := message.(T); ok {
+ f(typedMessage)
+ }
+ }
+ a.Subscribe(event, wrapper)
+}
+
func (a *Aggregator) Publish(event Event, message any) {
for _, subscription := range a.subscriptions[event] {
subscription(message)
pkg/event/aggregator_test.go
@@ -9,44 +9,58 @@ import (
func TestEventAggregator(t *testing.T) {
t.Run("Publish", func(t *testing.T) {
t.Run("without any subscribers", func(t *testing.T) {
- events := New()
+ aggregator := New()
- events.Publish("announcements.engineering", "Business, Business, Business... Numbers!")
+ aggregator.Publish("announcements.engineering", "Business, Business, Business... Numbers!")
})
t.Run("with a single subscriber", func(t *testing.T) {
- events := New()
+ aggregator := New()
called := false
- events.Subscribe("announcement", func(message any) {
+ aggregator.Subscribe("announcement", func(message any) {
called = true
assert.Equal(t, "Hello", message)
})
- events.Publish("announcement", "Hello")
+ aggregator.Publish("announcement", "Hello")
assert.True(t, called)
})
t.Run("with multiple subscribers", func(t *testing.T) {
- events := New()
+ aggregator := New()
called := map[int]bool{}
- events.Subscribe("announcement", func(message any) {
+ aggregator.Subscribe("announcement", func(message any) {
called[0] = true
assert.Equal(t, "Greetings", message)
})
- events.Subscribe("announcement", func(message any) {
+ aggregator.Subscribe("announcement", func(message any) {
called[1] = true
assert.Equal(t, "Greetings", message)
})
- events.Publish("announcement", "Greetings")
+ aggregator.Publish("announcement", "Greetings")
assert.Equal(t, 2, len(called))
assert.True(t, called[0])
assert.True(t, called[1])
})
+
+ t.Run("with a strongly typed payload", func(t *testing.T) {
+ aggregator := New()
+
+ type announcement struct {
+ message string
+ }
+
+ aggregator.SubscribeTo("announcement", func(payload announcement) {
+ assert.Equal(t, "Hello", payload.message)
+ })
+
+ aggregator.Publish("announcement", announcement{message: "Hello"})
+ })
})
}