@@ -3,7 +3,7 @@ package go_rabbitmq
3
3
import (
4
4
"errors"
5
5
"fmt"
6
- "github.com/streadway/amqp "
6
+ amqp "github.com/rabbitmq/amqp091-go "
7
7
"net/url"
8
8
"time"
9
9
)
@@ -55,10 +55,11 @@ var (
55
55
errAlreadyClosed = errors .New ("already closed: not connected to the server" )
56
56
errShutdown = errors .New ("session is shutting down" )
57
57
errFailedToPush = errors .New ("failed to push: not connected" )
58
+ err error
58
59
)
59
60
60
61
// New 创建一个新的消费者状态实例,并自动尝试连接到服务器
61
- func New (config * Config , queueName , exchange , routeKey string , exchangeType , prefetchCount int , durable bool ) * RabbitMQ {
62
+ func New (config * Config , queueName , exchange , routeKey string , exchangeType , prefetchCount int , durable bool ) ( * RabbitMQ , error ) {
62
63
// amqp 出现url.Parse导致的错误 是因为特殊字符需要进行urlencode编码
63
64
password := url .QueryEscape (config .Password )
64
65
// amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
@@ -89,8 +90,12 @@ func New(config *Config, queueName, exchange, routeKey string, exchangeType, pre
89
90
PrefetchCount : prefetchCount ,
90
91
Durable : durable ,
91
92
}
93
+ rabbitmq .conn , err = rabbitmq .connect (addr )
94
+ if err := rabbitmq .init (rabbitmq .conn ); err != nil {
95
+ return nil , err
96
+ }
92
97
go rabbitmq .handleReconnect (rabbitmq .Addr )
93
- return rabbitmq
98
+ return rabbitmq , nil
94
99
}
95
100
96
101
// handleReconnect 将在notifyConnClose上等待连接错误,然后不断尝试重新连接。
0 commit comments