Skip to content

Commit bf8775e

Browse files
authored
add WriterData field to the message struct (#1059)
1 parent 172fe75 commit bf8775e

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

message.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ type Message struct {
2020
Value []byte
2121
Headers []Header
2222

23+
// This field is used to hold arbitrary data you wish to include, so it
24+
// will be available when handle it on the Writer's `Completion` method,
25+
// this support the application can do any post operation on each message.
26+
WriterData interface{}
27+
2328
// If not set at the creation, Time will be automatically set when
2429
// writing the message.
2530
Time time.Time

writer_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,10 @@ func TestWriter(t *testing.T) {
174174
scenario: "test default configuration values",
175175
function: testWriterDefaults,
176176
},
177+
{
178+
scenario: "test write message with writer data",
179+
function: testWriteMessageWithWriterData,
180+
},
177181
}
178182

179183
for _, test := range tests {
@@ -719,6 +723,45 @@ func testWriterUnexpectedMessageTopic(t *testing.T) {
719723
}
720724
}
721725

726+
func testWriteMessageWithWriterData(t *testing.T) {
727+
topic := makeTopic()
728+
createTopic(t, topic, 1)
729+
defer deleteTopic(t, topic)
730+
w := newTestWriter(WriterConfig{
731+
Topic: topic,
732+
Balancer: &RoundRobin{},
733+
})
734+
defer w.Close()
735+
736+
index := 0
737+
w.Completion = func(messages []Message, err error) {
738+
if err != nil {
739+
t.Errorf("unexpected error %v", err)
740+
}
741+
742+
for _, msg := range messages {
743+
meta := msg.WriterData.(int)
744+
if index != meta {
745+
t.Errorf("metadata is not correct, index = %d, writerData = %d", index, meta)
746+
}
747+
index += 1
748+
}
749+
}
750+
751+
msg := Message{Key: []byte("key"), Value: []byte("Hello World")}
752+
for i := 0; i < 5; i++ {
753+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
754+
defer cancel()
755+
756+
msg.WriterData = i
757+
err := w.WriteMessages(ctx, msg)
758+
if err != nil {
759+
t.Errorf("unexpected error %v", err)
760+
}
761+
}
762+
763+
}
764+
722765
func testWriterAutoCreateTopic(t *testing.T) {
723766
topic := makeTopic()
724767
// Assume it's going to get created.

0 commit comments

Comments
 (0)