Skip to content

Commit 174188e

Browse files
authored
Updated Writer and Reader docs to show use of multiple brokers. (#959)
Based on the discussion [here](#894 (comment))
1 parent f5c0e30 commit 174188e

File tree

3 files changed

+37
-33
lines changed

3 files changed

+37
-33
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,8 @@ _testmain.go
3333
# Goland
3434
.idea
3535

36+
#IntelliJ
37+
*.iml
38+
3639
# govendor
3740
/vendor/*/

README.md

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ process shutdown.
222222
```go
223223
// make a new reader that consumes from topic-A, partition 0, at offset 42
224224
r := kafka.NewReader(kafka.ReaderConfig{
225-
Brokers: []string{"localhost:9092"},
225+
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
226226
Topic: "topic-A",
227227
Partition: 0,
228228
MinBytes: 10e3, // 10KB
@@ -253,7 +253,7 @@ ReadMessage automatically commits offsets when using consumer groups.
253253
```go
254254
// make a new reader that consumes from topic-A
255255
r := kafka.NewReader(kafka.ReaderConfig{
256-
Brokers: []string{"localhost:9092"},
256+
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
257257
GroupID: "consumer-group-id",
258258
Topic: "topic-A",
259259
MinBytes: 10e3, // 10KB
@@ -317,7 +317,7 @@ by setting CommitInterval on the ReaderConfig.
317317
```go
318318
// make a new reader that consumes from topic-A
319319
r := kafka.NewReader(kafka.ReaderConfig{
320-
Brokers: []string{"localhost:9092"},
320+
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
321321
GroupID: "consumer-group-id",
322322
Topic: "topic-A",
323323
MinBytes: 10e3, // 10KB
@@ -342,7 +342,7 @@ to use in most cases as it provides additional features:
342342
```go
343343
// make a writer that produces to topic-A, using the least-bytes distribution
344344
w := &kafka.Writer{
345-
Addr: kafka.TCP("localhost:9092"),
345+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
346346
Topic: "topic-A",
347347
Balancer: &kafka.LeastBytes{},
348348
}
@@ -376,7 +376,7 @@ if err := w.Close(); err != nil {
376376
// Make a writer that publishes messages to topic-A.
377377
// The topic will be created if it is missing.
378378
w := &Writer{
379-
Addr: TCP("localhost:9092"),
379+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
380380
Topic: "topic-A",
381381
AllowAutoTopicCreation: true,
382382
}
@@ -427,7 +427,7 @@ the topic on a per-message basis by setting `Message.Topic`.
427427

428428
```go
429429
w := &kafka.Writer{
430-
Addr: kafka.TCP("localhost:9092"),
430+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
431431
// NOTE: When Topic is not defined here, each Message must define it instead.
432432
Balancer: &kafka.LeastBytes{},
433433
}
@@ -478,7 +478,7 @@ aforementioned Sarama partitioners would route them to.
478478

479479
```go
480480
w := &kafka.Writer{
481-
Addr: kafka.TCP("localhost:9092"),
481+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
482482
Topic: "topic-A",
483483
Balancer: &kafka.Hash{},
484484
}
@@ -491,7 +491,7 @@ default ```consistent_random``` partition strategy.
491491

492492
```go
493493
w := &kafka.Writer{
494-
Addr: kafka.TCP("localhost:9092"),
494+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
495495
Topic: "topic-A",
496496
Balancer: kafka.CRC32Balancer{},
497497
}
@@ -505,7 +505,7 @@ the partition which is not permitted.
505505

506506
```go
507507
w := &kafka.Writer{
508-
Addr: kafka.TCP("localhost:9092"),
508+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
509509
Topic: "topic-A",
510510
Balancer: kafka.Murmur2Balancer{},
511511
}
@@ -517,7 +517,7 @@ Compression can be enabled on the `Writer` by setting the `Compression` field:
517517

518518
```go
519519
w := &kafka.Writer{
520-
Addr: kafka.TCP("localhost:9092"),
520+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
521521
Topic: "topic-A",
522522
Compression: kafka.Snappy,
523523
}
@@ -559,7 +559,7 @@ dialer := &kafka.Dialer{
559559
}
560560

561561
r := kafka.NewReader(kafka.ReaderConfig{
562-
Brokers: []string{"localhost:9093"},
562+
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
563563
GroupID: "consumer-group-id",
564564
Topic: "topic-A",
565565
Dialer: dialer,
@@ -568,6 +568,20 @@ r := kafka.NewReader(kafka.ReaderConfig{
568568

569569
### Writer
570570

571+
572+
Direct Writer creation
573+
574+
```go
575+
w := kafka.Writer{
576+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
577+
Topic: "topic-A",
578+
Balancer: &kafka.Hash{},
579+
Transport: &kafka.Transport{
580+
TLS: &tls.Config{},
581+
},
582+
}
583+
```
584+
571585
Using `kafka.NewWriter`
572586

573587
```go
@@ -578,26 +592,13 @@ dialer := &kafka.Dialer{
578592
}
579593

580594
w := kafka.NewWriter(kafka.WriterConfig{
581-
Brokers: []string{"localhost:9093"},
595+
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
582596
Topic: "topic-A",
583597
Balancer: &kafka.Hash{},
584598
Dialer: dialer,
585599
})
586600
```
587-
588-
Direct Writer creation
589-
590-
```go
591-
w := kafka.Writer{
592-
Addr: kafka.TCP("localhost:9093"),
593-
Topic: "topic-A",
594-
Balancer: &kafka.Hash{},
595-
Transport: &kafka.Transport{
596-
TLS: &tls.Config{},
597-
},
598-
}
599-
600-
```
601+
Note that `kafka.NewWriter` and `kafka.WriterConfig` are deprecated and will be removed in a future release.
601602

602603
## SASL Support
603604

@@ -654,7 +655,7 @@ dialer := &kafka.Dialer{
654655
}
655656

656657
r := kafka.NewReader(kafka.ReaderConfig{
657-
Brokers: []string{"localhost:9093"},
658+
Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"},
658659
GroupID: "consumer-group-id",
659660
Topic: "topic-A",
660661
Dialer: dialer,
@@ -677,7 +678,7 @@ sharedTransport := &kafka.Transport{
677678
}
678679

679680
w := kafka.Writer{
680-
Addr: kafka.TCP("localhost:9092"),
681+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
681682
Topic: "topic-A",
682683
Balancer: &kafka.Hash{},
683684
Transport: sharedTransport,
@@ -700,7 +701,7 @@ sharedTransport := &kafka.Transport{
700701
}
701702

702703
client := &kafka.Client{
703-
Addr: kafka.TCP("localhost:9092"),
704+
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
704705
Timeout: 10 * time.Second,
705706
Transport: sharedTransport,
706707
}
@@ -714,7 +715,7 @@ endTime := time.Now()
714715
batchSize := int(10e6) // 10MB
715716

716717
r := kafka.NewReader(kafka.ReaderConfig{
717-
Brokers: []string{"localhost:9092"},
718+
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
718719
Topic: "my-topic1",
719720
Partition: 0,
720721
MinBytes: batchSize,
@@ -756,7 +757,7 @@ func logf(msg string, a ...interface{}) {
756757
}
757758

758759
r := kafka.NewReader(kafka.ReaderConfig{
759-
Brokers: []string{"localhost:9092"},
760+
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
760761
Topic: "my-topic1",
761762
Partition: 0,
762763
Logger: kafka.LoggerFunc(logf),

writer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
//
3030
// // Construct a synchronous writer (the default mode).
3131
// w := &kafka.Writer{
32-
// Addr: kafka.TCP("localhost:9092"),
32+
// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
3333
// Topic: "topic-A",
3434
// RequiredAcks: kafka.RequireAll,
3535
// }
@@ -55,7 +55,7 @@ import (
5555
// writer to receive notifications of messages being written to kafka:
5656
//
5757
// w := &kafka.Writer{
58-
// Addr: kafka.TCP("localhost:9092"),
58+
// Addr: Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
5959
// Topic: "topic-A",
6060
// RequiredAcks: kafka.RequireAll,
6161
// Async: true, // make the writer asynchronous

0 commit comments

Comments
 (0)