Go操作RabbitMQ
下面我们根据RabbitMQ的六种工作模式,依次学习Go操作RabbitMQ。
使用amqp库:github.com/streadway/amqp
函数说明
源码是最好的说明,不懂也可以多看源码。
QueueDeclare()
创建队列的函数QueueDeclare()
,函数签名:
1
| func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
|
name
:队列名
durable
:是否是持久队列,如果是,那么重启MQ之后,该队列不会被删除,否则,该队列会被删除
autoDelete
:是否自动删除。如果为true,当没有Consumer时,会被自动删除掉
exclusive
:是否独占。如果为true,只能有一个消费者监听这队列
noWait
:如果为True,那么其他连接尝试修改该队列,将会触发异常
args
:额外参数
args的Table类型:type Table map[string]interface{}
Publish()
Channel的发送消息的函数Publish()
,函数签名:
1
| func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
|
exchange
:交换机名称。Simple模式下交换机会使用默认的 ""
key
:路由名称。
mandatory
:如果为 true,那么当没有合适的 RoutingKey 时,将会触发Channel的NotifyReturn(把消息返还给发送者)
immediate
:如果为 true,那么当没有合适的消费着时,将会触发Channel的NotifyReturn(把消息返还给发送者)
msg
:具体要发送消息的结构体
Consume()
Chanel消费函数Consume()
,函数签名:
1
| func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
|
queue
:队列名称
consumer
:路由名称
autoAck
:是否自动确认
exclusive
:是否独占
noLocal
:未使用的参数
noWait
:如果为True,那么其他连接尝试修改该队列,将会触发异常
args
:额外参数
ExchangeDeclare()
创建交换机ExchangeDeclare(),函数签名:
1
| func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
|
主要说说kind
这个参数,有fanout、direct、topic三种:
- fanout:fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中(在Publish/Subscribe模式中使用)
- direct:direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中(在Routing模式中使用)
- topic:规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送(在Topics模式中使用)。它的约定是:
- routing key是一个由
.
分隔的字符串(我们将被.
分隔开的每一段独立的字符串称为一个单词),如stock.usd.nyse
、nyse.vmw
、quick.orange.rabbit
。
- binding key中可以存在两种特殊字符
*
与#
,用于做模糊匹配,其中*
用于匹配1个单词,#
用于匹配n个单词(可以是0个),binding key与routing key一样也是由.
分隔的字符串。
Simple
simple模式
简单来说就是1个生产者生产的消息由1个消费者消费掉。
我们实现生产者向 RabbitMQ 中写入一条消息,然后消费者获取该消息这么一个简单的功能,具体流程图如下:
在web管理页面上新建一个qingbo用户以及新建一个Virtual Hosts参考:Simple模式
目录结构:
在rabbitmq/simple/
下创建rabbitmq.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| package simple
import ( "fmt" "log"
"github.com/streadway/amqp" )
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"
type Rabbit struct { conn *amqp.Connection channel *amqp.Channel QueueName string Exchange string Key string MqUrl string }
func NewRabbitMQ(queueName, exchange, key string) *Rabbit { return &Rabbit{ QueueName: queueName, Exchange: exchange, Key: key, MqUrl: rmqURL, } }
func (r Rabbit) Destroy() error { err := r.channel.Close() err = r.conn.Close() return err }
func (r Rabbit) failOnErr(err error, msg string) { if err != nil { log.Fatal(msg, err) } }
func NewRabbitMQSimple(queueName string) *Rabbit { rabbitMQ := NewRabbitMQ(queueName, "", "") var err error rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) rabbitMQ.failOnErr(err, "failed to connect RabbitMQ") rabbitMQ.channel, err = rabbitMQ.conn.Channel() rabbitMQ.failOnErr(err, "failed to open a channel") return rabbitMQ }
func (r Rabbit) PublishSimple(msg string) { _, err := r.channel.QueueDeclare( r.QueueName, false, false, false, false, nil, ) if err != nil { log.Println(err) } err = r.channel.Publish( r.Exchange, r.QueueName, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) if err != nil { log.Println(err) } }
func (r Rabbit) ConsumeSimple() { queue, err := r.channel.QueueDeclare( r.QueueName, false, false, false, false, nil, ) if err != nil { log.Println(err) } msgs, err := r.channel.Consume( queue.Name, "", true, false, false, false, nil, ) if err != nil { log.Println(err) }
forever := make(chan bool) go func() { for delivery := range msgs { fmt.Println("Received a message:", string(delivery.Body)) } }() fmt.Println(" [*] Waiting for messages.") <-forever }
|
在rabbitmq/simple/publisher/
下创建main.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12
| package main
import ( "fmt" "mq/rabbitmq/simple" )
func main() { rabbitMQSimple := simple.NewRabbitMQSimple("test1") rabbitMQSimple.PublishSimple("hello,world!") fmt.Println("发送成功!") }
|
在rabbitmq/simple/consumer/
下创建main.go
,代码如下:
1 2 3 4 5 6 7 8
| package main
import "mq/rabbitmq/simple"
func main() { rabbitMQSimple := simple.NewRabbitMQSimple("test1") rabbitMQSimple.ConsumeSimple() }
|
运行生产者下的main和消费者下的main:
在RabbitMQ的Web管理界面中也可以看到:
Work queues
Work queues
简单来说就是1个生产者生产消息由多个消费者消费,其中每个消息只会被1个消费者消费!
Work模式和Simple其实是很好理解的两种模式。Work和Simple相比无非就是Work模式有多个消费者。这样可以起到负载均衡的效果。
所以在代码层面Work和Simple是完全一样的,只不过Work有多个消费者罢了。
目录结构:
在rabbitmq/work/
创建rabbit.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| package work
import ( "fmt" "log"
"github.com/streadway/amqp" )
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"
type Rabbit struct { conn *amqp.Connection channel *amqp.Channel QueueName string Exchange string Key string MqUrl string }
func NewRabbitMQ(queueName, exchange, key string) *Rabbit { return &Rabbit{ QueueName: queueName, Exchange: exchange, Key: key, MqUrl: rmqURL, } }
func (r Rabbit) Destroy() error { err := r.channel.Close() err = r.conn.Close() return err }
func (r Rabbit) failOnErr(err error, msg string) { if err != nil { log.Fatal(msg, err) } }
func NewRabbitMQWork(queueName string) *Rabbit { rabbitMQ := NewRabbitMQ(queueName, "", "") var err error rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) rabbitMQ.failOnErr(err, "failed to connect RabbitMQ") rabbitMQ.channel, err = rabbitMQ.conn.Channel() rabbitMQ.failOnErr(err, "failed to open a channel") return rabbitMQ }
func (r Rabbit) PublishWork(msg string) { _, err := r.channel.QueueDeclare( r.QueueName, false, false, false, false, nil, ) if err != nil { log.Println(err) } err = r.channel.Publish( r.Exchange, r.QueueName, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) if err != nil { log.Println(err) } }
func (r Rabbit) ConsumeWork() { queue, err := r.channel.QueueDeclare( r.QueueName, false, false, false, false, nil, ) if err != nil { log.Println(err) } msgs, err := r.channel.Consume( queue.Name, "", true, false, false, false, nil, ) if err != nil { log.Println(err) }
forever := make(chan bool) go func() { for delivery := range msgs { fmt.Println("Received a message:", string(delivery.Body)) } }() fmt.Println(" [*] Waiting for messages.") <-forever }
|
在rabbitmq/work/publisher/
下创建publish.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package main
import ( "fmt" "mq/rabbitmq/work" "strconv" "time" )
func main() { rabbitMQWork := work.NewRabbitMQWork("test") for i := 0; i < 100; i++ { rabbitMQWork.PublishWork(strconv.Itoa(i) + " : hello,world!") time.Sleep(time.Second * 2) fmt.Println(i) } }
|
在rabbitmq/work/consumer/
下创建consume1.go
、consume2.go
、consume3.go
。代码是一样的:
1 2 3 4 5 6 7 8
| package main
import "mq/rabbitmq/work"
func main() { rabbitMQWork := work.NewRabbitMQWork("test") rabbitMQWork.ConsumeWork() }
|
运行效果如下:
可以看到每个消息只会被消费一次。
Publish/Subscribe
Publish/Subscribe
简单来说就是1个生产者生产消息,通过路由投递给多个队列,一个消息能被多个消费者获取!
目录结构:
在rabbitmq/pub-sub/
下创建rabbit.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| package pub_sub
import ( "fmt" "log"
"github.com/streadway/amqp" )
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"
type Rabbit struct { conn *amqp.Connection channel *amqp.Channel QueueName string Exchange string Key string MqUrl string }
func NewRabbitMQ(queueName, exchange, key string) *Rabbit { return &Rabbit{ QueueName: queueName, Exchange: exchange, Key: key, MqUrl: rmqURL, } }
func (r Rabbit) Destroy() error { err := r.channel.Close() err = r.conn.Close() return err }
func (r Rabbit) failOnErr(err error, msg string) { if err != nil { log.Fatal(msg, err) } }
func NewRabbitMQPubSub(exchangeName string) *Rabbit { rabbitMQ := NewRabbitMQ("", exchangeName, "") var err error rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) rabbitMQ.failOnErr(err, "failed to connect rabbitmq!") rabbitMQ.channel, err = rabbitMQ.conn.Channel() rabbitMQ.failOnErr(err, "failed to open a channel") return rabbitMQ }
func (r Rabbit) PublishPub(msg string) { err := r.channel.ExchangeDeclare( r.Exchange, "fanout", true, false, false, false, nil, ) r.failOnErr(err, "Failed to declare an exchange") err = r.channel.Publish( r.Exchange, "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) if err != nil { log.Println(err) } }
func (r Rabbit) ConsumeSub() { err := r.channel.ExchangeDeclare( r.Exchange, "fanout", true, false, false, false, nil, ) r.failOnErr(err, "Failed to declare an exchange") queue, err := r.channel.QueueDeclare( "", false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") err = r.channel.QueueBind( queue.Name, "", r.Exchange, false, nil, ) msgs, err := r.channel.Consume( queue.Name, "", true, false, false, false, nil, ) r.failOnErr(err, "Failed to Consume") forever := make(chan bool) go func() { for delivery := range msgs { fmt.Println("Received a message:", string(delivery.Body)) } }() fmt.Println(" [*] Waiting for messages.") <-forever }
|
在rabbitmq/pub-sub/publishe/
下创建publish.go
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package main
import ( "fmt" "mq/rabbitmq/pub-sub" "strconv" "time" )
func main() { rabbitMQPubSub := pub_sub.NewRabbitMQPubSub("exchange1") for i := 0; i < 100; i++ { rabbitMQPubSub.PublishPub("订阅模式生产的第" + strconv.Itoa(i) + "条数据") fmt.Println("订阅模式生产第" + strconv.Itoa(i) + "条数据") time.Sleep(time.Second * 1) } }
|
在rabbitmq/pub-sub/subscribe/
下创建subscribe1.go
、subscribe2.go
、subscribe3.go
。代码是一样的:
1 2 3 4 5 6 7 8
| package main
import "mq/rabbitmq/pub-sub"
func main() { rabbitMQPubSub := pub_sub.NewRabbitMQPubSub("exchange1") rabbitMQPubSub.ConsumeSub() }
|
运行效果如下:
可以看到每个消息都会被消费者消费。
Routing
Routing
简单来说就是1个消息可以被多个消费者获取,并且消息的目标队列可以被生产者指定。
在上面的Publish/Subscribe模式中,消息的是被随机发放到队列的,而Routing模式则可以指定消息的目标队列。
目录结构:
在rabbitmq/routing/
下创建rabbit.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| package routing
import ( "fmt" "log"
"github.com/streadway/amqp" )
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"
type Rabbit struct { conn *amqp.Connection channel *amqp.Channel QueueName string Exchange string Key string MqUrl string }
func NewRabbitMQ(queueName, exchange, key string) *Rabbit { return &Rabbit{ QueueName: queueName, Exchange: exchange, Key: key, MqUrl: rmqURL, } }
func (r Rabbit) Destroy() error { err := r.channel.Close() err = r.conn.Close() return err }
func (r Rabbit) failOnErr(err error, msg string) { if err != nil { log.Fatal(msg, err) } }
func NewRabbitMQRouting(exchangeName, routingKey string) *Rabbit { rabbitMQ := NewRabbitMQ("", exchangeName, routingKey) var err error rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) rabbitMQ.failOnErr(err, "failed to connect rabbitmq!") rabbitMQ.channel, err = rabbitMQ.conn.Channel() rabbitMQ.failOnErr(err, "failed to open a channel") return rabbitMQ }
func (r Rabbit) PublishRouting(msg string) { err := r.channel.ExchangeDeclare( r.Exchange, "direct", true, false, false, false, nil, ) r.failOnErr(err, "Failed to declare an exchange") err = r.channel.Publish( r.Exchange, r.Key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) if err != nil { log.Println(err) } }
func (r Rabbit) ConsumeRouting() { err := r.channel.ExchangeDeclare( r.Exchange, "direct", true, false, false, false, nil, ) r.failOnErr(err, "Failed to declare an exchange") queue, err := r.channel.QueueDeclare( "", false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") err = r.channel.QueueBind( queue.Name, r.Key, r.Exchange, false, nil, ) msgs, err := r.channel.Consume( queue.Name, "", true, false, false, false, nil, ) r.failOnErr(err, "Failed to Consume") forever := make(chan bool) go func() { for delivery := range msgs { fmt.Println("Received a message:", string(delivery.Body)) } }() fmt.Println(" [*] Waiting for messages.") <-forever }
|
在rabbitmq/routing/publisher/
下创建publishe.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package main
import ( "fmt" "mq/rabbitmq/routing" "strconv" "time" )
func main() { rabbitMQRouting1 := routing.NewRabbitMQRouting("exchange2", "qingbo1") rabbitMQRouting2 := routing.NewRabbitMQRouting("exchange2", "qingbo2") for i := 0; i < 100; i++ { rabbitMQRouting1.PublishRouting(strconv.Itoa(i) + " : Hello qingbo1!") rabbitMQRouting2.PublishRouting(strconv.Itoa(i) + " : Hello qingbo2!") time.Sleep(time.Second * 1) fmt.Println(i) } }
|
在rabbitmq/routing/consumer/
下创建consume1.go
、consume2.go
。这两者的区别在于:一个指定routeKey为qingbo1,一个为qingbo2。
1 2 3 4 5 6 7 8
| package main
import "mq/rabbitmq/routing"
func main() { rabbitMQRouting1 := routing.NewRabbitMQRouting("exchange2", "qingbo1") rabbitMQRouting1.ConsumeRouting() }
|
1 2 3 4 5 6 7 8
| package main
import "mq/rabbitmq/routing"
func main() { rabbitMQRouting2 := routing.NewRabbitMQRouting("exchange2", "qingbo2") rabbitMQRouting2.ConsumeRouting() }
|
运行效果如下:
可以看到消息被投放到指定的队列并被消费。这里当然可以被多个消费者消费,以qingbo1为例,只要再创建一个消费者指定routeKey为qingbo1去消费即可。
Topics
Topics
Topics模式和Routing模式类似。只不过Routing模式是明确指定消息的目标队列;而Topics模式是模糊匹配消息的目标队列。(消息的目标队列可用BindingKey以通配符的方式指定,其中*
用于匹配1个单词,#
用于匹配n个单词(可以是0个))
目录结构:
在rabbitmq/topics/
下创建rabbit.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| package topics
import ( "fmt" "log"
"github.com/streadway/amqp" )
const rmqURL = "amqp://qingbo:qingbo@43.138.57.192:5672/qingbo"
type Rabbit struct { conn *amqp.Connection channel *amqp.Channel QueueName string Exchange string Key string MqUrl string }
func NewRabbitMQ(queueName, exchange, key string) *Rabbit { return &Rabbit{ QueueName: queueName, Exchange: exchange, Key: key, MqUrl: rmqURL, } }
func (r Rabbit) Destroy() error { err := r.channel.Close() err = r.conn.Close() return err }
func (r Rabbit) failOnErr(err error, msg string) { if err != nil { log.Fatal(msg, err) } }
func NewRabbitMQTopics(exchangeName, routingKey string) *Rabbit { rabbitMQ := NewRabbitMQ("", exchangeName, routingKey) var err error rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MqUrl) rabbitMQ.failOnErr(err, "failed to connect rabbitmq!") rabbitMQ.channel, err = rabbitMQ.conn.Channel() rabbitMQ.failOnErr(err, "failed to open a channel") return rabbitMQ }
func (r Rabbit) PublishTopics(msg string) { err := r.channel.ExchangeDeclare( r.Exchange, "topic", true, false, false, false, nil, ) r.failOnErr(err, "Failed to declare an exchange") err = r.channel.Publish( r.Exchange, r.Key, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }, ) if err != nil { log.Println(err) } }
func (r Rabbit) ConsumeTopics() { err := r.channel.ExchangeDeclare( r.Exchange, "topic", true, false, false, false, nil, ) r.failOnErr(err, "Failed to declare an exchange") queue, err := r.channel.QueueDeclare( "", false, false, true, false, nil, ) r.failOnErr(err, "Failed to declare a queue") err = r.channel.QueueBind( queue.Name, r.Key, r.Exchange, false, nil, ) msgs, err := r.channel.Consume( queue.Name, "", true, false, false, false, nil, ) r.failOnErr(err, "Failed to Consume") forever := make(chan bool) go func() { for delivery := range msgs { fmt.Println("Received a message:", string(delivery.Body)) } }() fmt.Println(" [*] Waiting for messages.") <-forever }
|
在rabbitmq/topics/publisher/
下创建publish.go
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| package main
import ( "fmt" "mq/rabbitmq/topics" "strconv" "time" )
func main() { rabbitMQTopics1 := topics.NewRabbitMQTopics("exchangeTopics", "qingbo.1011.top") rabbitMQTopics2 := topics.NewRabbitMQTopics("exchangeTopics", "qingbo.xyz.top") for i := 0; i < 100; i++ { rabbitMQTopics1.PublishTopics(strconv.Itoa(i) + "rabbitMQTopics1生产的消息") rabbitMQTopics2.PublishTopics(strconv.Itoa(i) + "rabbitMQTopics2生产的消息") time.Sleep(time.Second * 1) fmt.Println(i) } }
|
在rabbitmq/topics/consumer/
下创建consume1.go
、consume2.go
。同样的这两者只有routeKey有区别。
要注意匹配规则。其中*
用于匹配一个单词,#
用于匹配N个单词(可以是0个)。
例如qingbo.*
就可以匹配qingbo.123
、qingbo.hello
等等;而qingbo.#
则能匹配qingbo.123.456.789
等等。
1 2 3 4 5 6 7 8
| package main
import "mq/rabbitmq/topics"
func main() { rabbitMQTopics := topics.NewRabbitMQTopics("exchangeTopics", "#") rabbitMQTopics.ConsumeTopics() }
|
1 2 3 4 5 6 7 8
| package main
import "mq/rabbitmq/topics"
func main() { rabbitMQTopics := topics.NewRabbitMQTopics("exchangeTopics", "*.1011.*") rabbitMQTopics.ConsumeTopics() }
|
在这个例子中,#
可以匹配到qingbo.1011.top
和qingbo.xyz.top
;*.1011.*
能匹配到qingbo.1011.top
。
运行效果如下:
可以看到运行效果和我们预料的匹配结果是一致的。