Go操作RabbitMQ

下面我们根据RabbitMQ的六种工作模式,依次学习Go操作RabbitMQ。

使用amqp库:github.com/streadway/amqp

函数说明

源码是最好的说明,不懂也可以多看源码。

QueueDeclare()

创建队列的函数QueueDeclare(),函数签名:

1
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
  • name:队列名
  • durable:是否是持久队列,如果是,那么重启MQ之后,该队列不会被删除,否则,该队列会被删除
  • autoDelete:是否自动删除。如果为true,当没有Consumer时,会被自动删除掉
  • exclusive:是否独占。如果为true,只能有一个消费者监听这队列
  • noWait:如果为True,那么其他连接尝试修改该队列,将会触发异常
  • args:额外参数

args的Table类型:type Table map[string]interface{}

Publish()

Channel的发送消息的函数Publish(),函数签名:

1
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
  • exchange:交换机名称。Simple模式下交换机会使用默认的 ""
  • key:路由名称。
  • mandatory:如果为 true,那么当没有合适的 RoutingKey 时,将会触发Channel的NotifyReturn(把消息返还给发送者)
  • immediate:如果为 true,那么当没有合适的消费着时,将会触发Channel的NotifyReturn(把消息返还给发送者)
  • msg:具体要发送消息的结构体

Consume()

Chanel消费函数Consume(),函数签名:

1
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
  • queue:队列名称
  • consumer:路由名称
  • autoAck:是否自动确认
  • exclusive:是否独占
  • noLocal:未使用的参数
  • noWait:如果为True,那么其他连接尝试修改该队列,将会触发异常
  • args:额外参数

ExchangeDeclare()

创建交换机ExchangeDeclare(),函数签名:

1
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error

主要说说kind这个参数,有fanout、direct、topic三种:

  • fanout:fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中(在Publish/Subscribe模式中使用)
  • direct:direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中(在Routing模式中使用)
  • topic:规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送(在Topics模式中使用)。它的约定是:
    • routing key是一个由.分隔的字符串(我们将被.分隔开的每一段独立的字符串称为一个单词),如stock.usd.nysenyse.vmwquick.orange.rabbit
    • binding key中可以存在两种特殊字符*#,用于做模糊匹配,其中*用于匹配1个单词,#用于匹配n个单词(可以是0个),binding key与routing key一样也是由.分隔的字符串。

Simple

simple模式

简单来说就是1个生产者生产的消息由1个消费者消费掉。

我们实现生产者向 RabbitMQ 中写入一条消息,然后消费者获取该消息这么一个简单的功能,具体流程图如下:

在web管理页面上新建一个qingbo用户以及新建一个Virtual Hosts参考:Simple模式

目录结构:

rabbitmq/simple/下创建rabbitmq.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package simple

import (
"fmt"
"log"

"github.com/streadway/amqp"
)

// 连接信息amqp://用户名:密码@ip/Virtual Hosts
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"

// Rabbit RabbitMQ结构体
type Rabbit struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机名称
Key string // bind Key 名称
MqUrl string // 连接信息
}

// NewRabbitMQ 创建Rabbit结构体实例
func NewRabbitMQ(queueName, exchange, key string) *Rabbit {
return &Rabbit{
QueueName: queueName,
Exchange: exchange,
Key: key,
MqUrl: rmqURL,
}
}

// Destroy 断开channel和connection
func (r Rabbit) Destroy() error {
err := r.channel.Close()
err = r.conn.Close()
return err
}

// 错误处理函数
func (r Rabbit) failOnErr(err error, msg string) {
if err != nil {
log.Fatal(msg, err)
}
}

// NewRabbitMQSimple 创建Simple模式下RabbitMQ实例
func NewRabbitMQSimple(queueName string) *Rabbit {
rabbitMQ := NewRabbitMQ(queueName, "", "") // 创建RabbitMQ实例
var err error
rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) // 获取connection
rabbitMQ.failOnErr(err, "failed to connect RabbitMQ")
rabbitMQ.channel, err = rabbitMQ.conn.Channel() // 获取channel
rabbitMQ.failOnErr(err, "failed to open a channel")
return rabbitMQ
}

// PublishSimple Simple模式 生产者
func (r Rabbit) PublishSimple(msg string) {
// 1.申请队列,如果队列不存在会自动创建,存在则跳过创建
_, err := r.channel.QueueDeclare(
r.QueueName, // 队列名
false, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性
false, // 是否阻塞处理
nil, // 其他额外的属性
)
if err != nil {
log.Println(err)
}
// 2.调用channel 发送消息到队列中
err = r.channel.Publish(
r.Exchange,
r.QueueName,
false, // 如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
},
)
if err != nil {
log.Println(err)
}
}

// ConsumeSimple Simple模式 消费者
func (r Rabbit) ConsumeSimple() {
// 1.申请队列,如果队列不存在会自动创建,存在则跳过创建
queue, err := r.channel.QueueDeclare(
r.QueueName, // 队列名
false, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
log.Println(err)
}
// 2.消费消息
msgs, err := r.channel.Consume(
queue.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否独有
false, // 设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // 队列是否阻塞
nil, // 额外的属性
)
if err != nil {
log.Println(err)
}

// 3.启用协程处理消息
forever := make(chan bool) // 开个channel阻塞住,让开启的协程能一直跑着
go func() {
for delivery := range msgs {
// 消息逻辑处理,可以自行设计逻辑
fmt.Println("Received a message:", string(delivery.Body))
}
}()
fmt.Println(" [*] Waiting for messages.")
<-forever
}

rabbitmq/simple/publisher/下创建main.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import (
"fmt"
"mq/rabbitmq/simple"
)

func main() {
rabbitMQSimple := simple.NewRabbitMQSimple("test1")
rabbitMQSimple.PublishSimple("hello,world!")
fmt.Println("发送成功!")
}

rabbitmq/simple/consumer/下创建main.go,代码如下:

1
2
3
4
5
6
7
8
package main

import "mq/rabbitmq/simple"

func main() {
rabbitMQSimple := simple.NewRabbitMQSimple("test1")
rabbitMQSimple.ConsumeSimple()
}

运行生产者下的main和消费者下的main:

在RabbitMQ的Web管理界面中也可以看到:

Work queues

Work queues

简单来说就是1个生产者生产消息由多个消费者消费,其中每个消息只会被1个消费者消费!

Work模式和Simple其实是很好理解的两种模式。Work和Simple相比无非就是Work模式有多个消费者。这样可以起到负载均衡的效果。

所以在代码层面Work和Simple是完全一样的,只不过Work有多个消费者罢了。

目录结构:

rabbitmq/work/创建rabbit.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package work

import (
"fmt"
"log"

"github.com/streadway/amqp"
)

// 连接信息amqp://用户名:密码@ip/Virtual Hosts
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"

// Rabbit RabbitMQ结构体
type Rabbit struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机名称
Key string // bind Key 名称
MqUrl string // 连接信息
}

// NewRabbitMQ 创建Rabbit结构体实例
func NewRabbitMQ(queueName, exchange, key string) *Rabbit {
return &Rabbit{
QueueName: queueName,
Exchange: exchange,
Key: key,
MqUrl: rmqURL,
}
}

// Destroy 断开channel和connection
func (r Rabbit) Destroy() error {
err := r.channel.Close()
err = r.conn.Close()
return err
}

// 错误处理函数
func (r Rabbit) failOnErr(err error, msg string) {
if err != nil {
log.Fatal(msg, err)
}
}

// NewRabbitMQWork 创建Work模式下RabbitMQ实例
func NewRabbitMQWork(queueName string) *Rabbit {
rabbitMQ := NewRabbitMQ(queueName, "", "") // 创建RabbitMQ实例
var err error
rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) // 获取connection
rabbitMQ.failOnErr(err, "failed to connect RabbitMQ")
rabbitMQ.channel, err = rabbitMQ.conn.Channel() // 获取channel
rabbitMQ.failOnErr(err, "failed to open a channel")
return rabbitMQ
}

// PublishWork Work模式 生产者
func (r Rabbit) PublishWork(msg string) {
// 1.申请队列,如果队列不存在会自动创建,存在则跳过创建
_, err := r.channel.QueueDeclare(
r.QueueName, // 队列名
false, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性
false, // 是否阻塞处理
nil, // 其他额外的属性
)
if err != nil {
log.Println(err)
}
// 2.调用channel 发送消息到队列中
err = r.channel.Publish(
r.Exchange,
r.QueueName,
false, // 如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
},
)
if err != nil {
log.Println(err)
}
}

// ConsumeWork Work模式 消费者
func (r Rabbit) ConsumeWork() {
// 1.申请队列,如果队列不存在会自动创建,存在则跳过创建
queue, err := r.channel.QueueDeclare(
r.QueueName, // 队列名
false, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性
false, // 是否阻塞处理
nil, // 额外的属性
)
if err != nil {
log.Println(err)
}
// 2.消费消息
msgs, err := r.channel.Consume(
queue.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否独有
false, // 设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // 队列是否阻塞
nil, // 额外的属性
)
if err != nil {
log.Println(err)
}

// 3.启用协程处理消息
forever := make(chan bool) // 开个channel阻塞住,让开启的协程能一直跑着
go func() {
for delivery := range msgs {
// 消息逻辑处理,可以自行设计逻辑
fmt.Println("Received a message:", string(delivery.Body))
}
}()
fmt.Println(" [*] Waiting for messages.")
<-forever
}

rabbitmq/work/publisher/下创建publish.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
"mq/rabbitmq/work"
"strconv"
"time"
)

func main() {
rabbitMQWork := work.NewRabbitMQWork("test")
for i := 0; i < 100; i++ {
rabbitMQWork.PublishWork(strconv.Itoa(i) + " : hello,world!")
time.Sleep(time.Second * 2)
fmt.Println(i)
}
}

rabbitmq/work/consumer/下创建consume1.goconsume2.goconsume3.go。代码是一样的:

1
2
3
4
5
6
7
8
package main

import "mq/rabbitmq/work"

func main() {
rabbitMQWork := work.NewRabbitMQWork("test")
rabbitMQWork.ConsumeWork()
}

运行效果如下:

可以看到每个消息只会被消费一次。

Publish/Subscribe

Publish/Subscribe

简单来说就是1个生产者生产消息,通过路由投递给多个队列,一个消息能被多个消费者获取!

目录结构:

rabbitmq/pub-sub/下创建rabbit.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package pub_sub

import (
"fmt"
"log"

"github.com/streadway/amqp"
)

// 连接信息amqp://用户名:密码@ip/Virtual Hosts
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"

// Rabbit RabbitMQ结构体
type Rabbit struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机名称
Key string // bind Key 名称
MqUrl string // 连接信息
}

// NewRabbitMQ 创建Rabbit结构体实例
func NewRabbitMQ(queueName, exchange, key string) *Rabbit {
return &Rabbit{
QueueName: queueName,
Exchange: exchange,
Key: key,
MqUrl: rmqURL,
}
}

// Destroy 断开channel和connection
func (r Rabbit) Destroy() error {
err := r.channel.Close()
err = r.conn.Close()
return err
}

// 错误处理函数
func (r Rabbit) failOnErr(err error, msg string) {
if err != nil {
log.Fatal(msg, err)
}
}

// NewRabbitMQPubSub 创建Publish/Subscribe模式下RabbitMQ实例
func NewRabbitMQPubSub(exchangeName string) *Rabbit {
rabbitMQ := NewRabbitMQ("", exchangeName, "")
var err error
rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) // 获取connection
rabbitMQ.failOnErr(err, "failed to connect rabbitmq!")
rabbitMQ.channel, err = rabbitMQ.conn.Channel() // 获取channel
rabbitMQ.failOnErr(err, "failed to open a channel")
return rabbitMQ
}

// PublishPub Publish/Subscribe模式 生产者
func (r Rabbit) PublishPub(msg string) {
// 1.尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名字
"fanout", // 交换机类型,这里使用fanout类型,即: 发布订阅模式
true, // 是否持久化
false, // 是否自动删除
false, // true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2.发送消息
err = r.channel.Publish(
r.Exchange,
"",
false, // 如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
},
)
if err != nil {
log.Println(err)
}
}

// ConsumeSub Publish/Subscribe模式 消费者
func (r Rabbit) ConsumeSub() {
// 1.试探性创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名字
"fanout", // 交换机类型,这里使用fanout类型,即: 发布订阅模式
true, // 是否持久化
false, // 是否自动删除
false, // true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2.试探性创建队列,这里注意队列名称不要写
queue, err := r.channel.QueueDeclare(
"", // 随机生产队列名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否具有排他性
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare a queue")
// 3.绑定队列到exchange中
err = r.channel.QueueBind(
queue.Name, // 队列名
"", // 路由参数,fanout类型交换机,自动忽略路由参数(在pub/sub模式下,这里的key要为空)
r.Exchange, // 交换机名字,需要跟消息发送端定义的交换器保持一致
false, // 是否阻塞处理
nil, // 额外的属性
)
// 4.消费消息
msgs, err := r.channel.Consume(
queue.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否独有
false, // 设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // 队列是否阻塞
nil, // 额外的属性
)
r.failOnErr(err, "Failed to Consume")
// 5.启用协程处理消息
forever := make(chan bool) // 开个channel阻塞住,让开启的协程能一直跑着
go func() {
for delivery := range msgs {
// 消息逻辑处理,可以自行设计逻辑
fmt.Println("Received a message:", string(delivery.Body))
}
}()
fmt.Println(" [*] Waiting for messages.")
<-forever
}

rabbitmq/pub-sub/publishe/下创建publish.go代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import (
"fmt"
"mq/rabbitmq/pub-sub"
"strconv"
"time"
)

func main() {
rabbitMQPubSub := pub_sub.NewRabbitMQPubSub("exchange1")
for i := 0; i < 100; i++ {
rabbitMQPubSub.PublishPub("订阅模式生产的第" + strconv.Itoa(i) + "条数据")
fmt.Println("订阅模式生产第" + strconv.Itoa(i) + "条数据")
time.Sleep(time.Second * 1)
}
}

rabbitmq/pub-sub/subscribe/下创建subscribe1.gosubscribe2.gosubscribe3.go。代码是一样的:

1
2
3
4
5
6
7
8
package main

import "mq/rabbitmq/pub-sub"

func main() {
rabbitMQPubSub := pub_sub.NewRabbitMQPubSub("exchange1")
rabbitMQPubSub.ConsumeSub()
}

运行效果如下:

可以看到每个消息都会被消费者消费。

Routing

Routing

简单来说就是1个消息可以被多个消费者获取,并且消息的目标队列可以被生产者指定。

在上面的Publish/Subscribe模式中,消息的是被随机发放到队列的,而Routing模式则可以指定消息的目标队列。

目录结构:

rabbitmq/routing/下创建rabbit.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package routing

import (
"fmt"
"log"

"github.com/streadway/amqp"
)

// 连接信息amqp://用户名:密码@ip/Virtual Hosts
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"

// Rabbit RabbitMQ结构体
type Rabbit struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机名称
Key string // bind Key 名称
MqUrl string // 连接信息
}

// NewRabbitMQ 创建Rabbit结构体实例
func NewRabbitMQ(queueName, exchange, key string) *Rabbit {
return &Rabbit{
QueueName: queueName,
Exchange: exchange,
Key: key,
MqUrl: rmqURL,
}
}

// Destroy 断开channel和connection
func (r Rabbit) Destroy() error {
err := r.channel.Close()
err = r.conn.Close()
return err
}

// 错误处理函数
func (r Rabbit) failOnErr(err error, msg string) {
if err != nil {
log.Fatal(msg, err)
}
}

// NewRabbitMQRouting 创建Routing模式下RabbitMQ实例
func NewRabbitMQRouting(exchangeName, routingKey string) *Rabbit {
rabbitMQ := NewRabbitMQ("", exchangeName, routingKey) // 创建RabbitMQ实例
var err error
rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) // 获取connection
rabbitMQ.failOnErr(err, "failed to connect rabbitmq!")
rabbitMQ.channel, err = rabbitMQ.conn.Channel() // 获取channel
rabbitMQ.failOnErr(err, "failed to open a channel")
return rabbitMQ
}

// PublishRouting Routing模式 生产者
func (r Rabbit) PublishRouting(msg string) {
// 1.尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名字
"direct", // 交换机类型,这里使用direct类型,即: Routing 路由模式
true, // 是否持久化
false, // 是否自动删除
false, // true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2.发送消息
err = r.channel.Publish(
r.Exchange,
r.Key, // Routing模式这里要指定key
false, // 如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
},
)
if err != nil {
log.Println(err)
}
}

// ConsumeRouting Routing模式 消费者
func (r Rabbit) ConsumeRouting() {
// 1.试探性创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名字
"direct", // 交换机类型,这里使用direct类型,即: Routing路由模式
true, // 是否持久化
false, // 是否自动删除
false, // true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2.试探性创建队列,这里注意队列名称不要写
queue, err := r.channel.QueueDeclare(
"", // 随机生产队列名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否具有排他性
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare a queue")
// 3.绑定队列到exchange中
err = r.channel.QueueBind(
queue.Name, // 队列名
r.Key, // 路由参数,如果匹配消息发送的时候指定的路由参数,消息就投递到当前队列(在Routing模式下,这里的key要指定)
r.Exchange, // 交换机名字,需要跟消息发送端定义的交换器保持一致
false, // 是否阻塞处理
nil, // 额外的属性
)
// 4.消费消息
msgs, err := r.channel.Consume(
queue.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否独有
false, // 设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // 队列是否阻塞
nil, // 额外的属性
)
r.failOnErr(err, "Failed to Consume")
// 5.启用协程处理消息
forever := make(chan bool) // 开个channel阻塞住,让开启的协程能一直跑着
go func() {
for delivery := range msgs {
// 消息逻辑处理,可以自行设计逻辑
fmt.Println("Received a message:", string(delivery.Body))
}
}()
fmt.Println(" [*] Waiting for messages.")
<-forever
}

rabbitmq/routing/publisher/下创建publishe.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"mq/rabbitmq/routing"
"strconv"
"time"
)

func main() {
rabbitMQRouting1 := routing.NewRabbitMQRouting("exchange2", "qingbo1")
rabbitMQRouting2 := routing.NewRabbitMQRouting("exchange2", "qingbo2")
for i := 0; i < 100; i++ {
rabbitMQRouting1.PublishRouting(strconv.Itoa(i) + " : Hello qingbo1!")
rabbitMQRouting2.PublishRouting(strconv.Itoa(i) + " : Hello qingbo2!")
time.Sleep(time.Second * 1)
fmt.Println(i)
}
}

rabbitmq/routing/consumer/下创建consume1.goconsume2.go。这两者的区别在于:一个指定routeKey为qingbo1,一个为qingbo2。

1
2
3
4
5
6
7
8
package main

import "mq/rabbitmq/routing"

func main() {
rabbitMQRouting1 := routing.NewRabbitMQRouting("exchange2", "qingbo1")
rabbitMQRouting1.ConsumeRouting()
}
1
2
3
4
5
6
7
8
package main

import "mq/rabbitmq/routing"

func main() {
rabbitMQRouting2 := routing.NewRabbitMQRouting("exchange2", "qingbo2")
rabbitMQRouting2.ConsumeRouting()
}

运行效果如下:

可以看到消息被投放到指定的队列并被消费。这里当然可以被多个消费者消费,以qingbo1为例,只要再创建一个消费者指定routeKey为qingbo1去消费即可。

Topics

Topics

Topics模式和Routing模式类似。只不过Routing模式是明确指定消息的目标队列;而Topics模式是模糊匹配消息的目标队列。(消息的目标队列可用BindingKey以通配符的方式指定,其中*用于匹配1个单词,#用于匹配n个单词(可以是0个))

目录结构:

rabbitmq/topics/下创建rabbit.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package topics

import (
"fmt"
"log"

"github.com/streadway/amqp"
)

// 连接信息amqp://用户名:密码@ip/Virtual Hosts
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"

// Rabbit RabbitMQ结构体
type Rabbit struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机名称
Key string // bind Key 名称
MqUrl string // 连接信息
}

// NewRabbitMQ 创建Rabbit结构体实例
func NewRabbitMQ(queueName, exchange, key string) *Rabbit {
return &Rabbit{
QueueName: queueName,
Exchange: exchange,
Key: key,
MqUrl: rmqURL,
}
}

// Destroy 断开channel和connection
func (r Rabbit) Destroy() error {
err := r.channel.Close()
err = r.conn.Close()
return err
}

// 错误处理函数
func (r Rabbit) failOnErr(err error, msg string) {
if err != nil {
log.Fatal(msg, err)
}
}

// NewRabbitMQTopics 创建Topics模式下RabbitMQ实例
func NewRabbitMQTopics(exchangeName, routingKey string) *Rabbit {
rabbitMQ := NewRabbitMQ("", exchangeName, routingKey) // 创建RabbitMQ实例
var err error
rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) // 获取connection
rabbitMQ.failOnErr(err, "failed to connect rabbitmq!")
rabbitMQ.channel, err = rabbitMQ.conn.Channel() // 获取channel
rabbitMQ.failOnErr(err, "failed to open a channel")
return rabbitMQ
}

// PublishTopics Topics模式 生产者
func (r Rabbit) PublishTopics(msg string) {
// 1.尝试创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名字
"topic", // 交换机类型,这里使用topic类型,即: Topics模式
true, // 是否持久化
false, // 是否自动删除
false, // true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2.发送消息
err = r.channel.Publish(
r.Exchange,
r.Key, // Topics模式这里要指定key
false, // 如果为true,根据自身exchange类型和routekey规则无法找到符合条件的队列会把消息返还给发送者
false, // 如果为true,当exchange发送消息到队列后发现队列上没有消费者,则会把消息返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
},
)
if err != nil {
log.Println(err)
}
}

// ConsumeTopics Topics模式 消费者
func (r Rabbit) ConsumeTopics() {
// 1.试探性创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名字
"topic", // 交换机类型,这里使用topic类型,即: Topics模式
true, // 是否持久化
false, // 是否自动删除
false, // true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2.试探性创建队列,这里注意队列名称不要写
queue, err := r.channel.QueueDeclare(
"", // 随机生产队列名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否具有排他性
false, // 是否阻塞处理
nil, // 额外的属性
)
r.failOnErr(err, "Failed to declare a queue")
// 3.绑定队列到exchange中
err = r.channel.QueueBind(
queue.Name, // 队列名
r.Key, // 路由参数,如果匹配消息发送的时候指定的路由参数,消息就投递到当前队列(在Topics模式下,这里的key要指定)
r.Exchange, // 交换机名字,需要跟消息发送端定义的交换器保持一致
false, // 是否阻塞处理
nil, // 额外的属性
)
// 4.消费消息
msgs, err := r.channel.Consume(
queue.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否独有
false, // 设置为true,表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者
false, // 队列是否阻塞
nil, // 额外的属性
)
r.failOnErr(err, "Failed to Consume")
// 5.启用协程处理消息
forever := make(chan bool) // 开个channel阻塞住,让开启的协程能一直跑着
go func() {
for delivery := range msgs {
// 消息逻辑处理,可以自行设计逻辑
fmt.Println("Received a message:", string(delivery.Body))
}
}()
fmt.Println(" [*] Waiting for messages.")
<-forever
}

rabbitmq/topics/publisher/下创建publish.go,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"mq/rabbitmq/topics"
"strconv"
"time"
)

func main() {
rabbitMQTopics1 := topics.NewRabbitMQTopics("exchangeTopics", "qingbo.1011.top")
rabbitMQTopics2 := topics.NewRabbitMQTopics("exchangeTopics", "qingbo.xyz.top")
for i := 0; i < 100; i++ {
rabbitMQTopics1.PublishTopics(strconv.Itoa(i) + "rabbitMQTopics1生产的消息")
rabbitMQTopics2.PublishTopics(strconv.Itoa(i) + "rabbitMQTopics2生产的消息")
time.Sleep(time.Second * 1)
fmt.Println(i)
}
}

rabbitmq/topics/consumer/下创建consume1.goconsume2.go。同样的这两者只有routeKey有区别。

要注意匹配规则。其中*用于匹配一个单词,#用于匹配N个单词(可以是0个)。

例如qingbo.*就可以匹配qingbo.123qingbo.hello等等;而qingbo.#则能匹配qingbo.123.456.789等等。

1
2
3
4
5
6
7
8
package main

import "mq/rabbitmq/topics"

func main() {
rabbitMQTopics := topics.NewRabbitMQTopics("exchangeTopics", "#")
rabbitMQTopics.ConsumeTopics()
}
1
2
3
4
5
6
7
8
package main

import "mq/rabbitmq/topics"

func main() {
rabbitMQTopics := topics.NewRabbitMQTopics("exchangeTopics", "*.1011.*")
rabbitMQTopics.ConsumeTopics()
}

在这个例子中,#可以匹配到qingbo.1011.topqingbo.xyz.top*.1011.*能匹配到qingbo.1011.top

运行效果如下:

可以看到运行效果和我们预料的匹配结果是一致的。