RabbitMQ – Wasting_Misaka.Blog https://forelink.top Hso! Sat, 14 Sep 2024 16:58:39 +0000 zh-Hans hourly 1 https://wordpress.org/?v=6.7.1 春靴+兔 TopicMode https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-topicmode/ https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-topicmode/#respond Fri, 13 Sep 2024 15:47:35 +0000 https://forelink.top/?p=570 简介
image.png

使用SpringBoot + RabbitMQ 实现需求: 通过 在HTML页面中填写表单并提交 发起 HTTP 请求来调用生产者方法发送消息 格式如 test.orange.test 并将Message输出打印到控制台上。

Topic 程序

配置类

在 spring框架的amcq中,提供了 队列,交换机和绑定关系的构建方法(Builder) 在使用 SpringBoot 提供 RabbitMQ 服务时,活用构建器可以简化开发流程。

Spring框架的amqp.core包提供了如下构建器: QueueBuilder ExchangeBuilder BindindBuilder 以点分隔接收参数,队列与交换机以 build() 结尾 绑定关系Binding以 noargs() 或者 and() 结尾

在构建过程中可以查看后续方法的类型。 image.png

import org.springframework.amqp.core.*;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
/**  
 * */  
@Configuration  
public class TopicConfig {  
    // 1.交换机 标识使用@Qualify注解调用这个bean  
    @Bean("topicExchange")  
    public Exchange topicExchange(){  
        // 创建一个 topic类型 的交换机  
        return ExchangeBuilder.topicExchange("topicExchange").durable(false).build();  
    }  
    // 2.队列  
    @Bean("topicQueue1")  
    public Queue topicQueue1(){  
        return QueueBuilder.durable("topicQueue1").build();  
    }  
    @Bean("topicQueue2")  
    public Queue topicQueue2(){  
        return QueueBuilder.durable("topicQueue2").build();  
    }  
    // 3.绑定Topic关系  
    @Bean  
    public Binding Binding1(@Qualifier("topicQueue1") Queue queue,@Qualifier("topicExchange") Exchange exchange){  
        return BindingBuilder.bind(queue).to(exchange).with("*.orange.*").noargs();  
    }  
  
    @Bean  
    public Binding binding2(@Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") Exchange exchange){  
        return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit").noargs();  
    }  
  
    @Bean  
    public Binding binding3(@Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") Exchange exchange){  
        return BindingBuilder.bind(queue).to(exchange).with("lazy.#").noargs();  
    }  
  
}

HTML 页面

准备一个 HTML 页面,包含routingKey Message 的表单。

<!DOCTYPE html>  
<html lang="en">  
<head>  
    <meta charset="UTF-8">  
    <title>Topic Test</title>  
</head>  
<body>  
    <form action="http://localhost:8080/topic" method="get" >  
        routing key: <input type="text" name="routingKey">  
        message : <input type="text" name="message"></input>  
        <input type="submit" value="提交">  
    </form></body>  
</html>
<!--
    form 表单
        action发送表单中的数据到action的url里,
               url为空时发送到当前页面
        method
            get 有url长度限制
            post发送到f12中网络请求中,无网络限制
    input
        type
            text 文本类型,可操作
            submit 按钮类型,可点击
        name
            是必须的属性
        value
            框框中的值
-->

在浏览器中打开的效果如下

image.png

生产者

新建生产者类 ropicProducer 先创建一个Spring框架中的 RabbitTemplate 实现类 该类已装载在 Bean 容器中,设置AutoWired自动获取该对象

然后提供一个 sendMessage 方法,包含 routingKeymessage 形参,方便外部传参调用。

import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  
  
@Component  
public class TopicProducer {  
    // 创建一个rabbitTemplate对象  
    private RabbitTemplate rabbitTemplate;  
    private String exchangeName = "topicExchange";  
  
    @Autowired  
    public TopicProducer(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
    // sendMessage 方法  
    public void sendMessage(String routingKey,String message) {  
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);  
    }  
}

控制器

新建控制器 TopicController 用于响应HTTP请求,并执行生产者发送消息的 sendMessage 方法 创建一个生产者对象 topicProducer 用以调用发送消息的方法

并提供一个 GetMapping 方法, 在接收到HTTP页面发来的请求后, 将表单中包含的 routingKeyMessage 传给 sendMessage() 方法

import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
public class TopicController {  
  
    private final TopicProducer topicProducer;  
  
    public TopicController(TopicProducer topicProducer) {  
        this.topicProducer = topicProducer;  
    }  

    @GetMapping("/topic")  
    public String sendMessage(String routingKey,String message){  
        topicProducer.sendMessage(routingKey,message);  
        return "routingKey: "+routingKey+"\nmessage: "+message;  
    }  
}

测试效果

将主程序运行起来, 打开 html 页面, 通过表单提交三条测试信息和一条无关信息.

image.png

在控制台中可以大致确认信息达到了队列

image.png

消费者

新建两个消费者类, 分别用以处理两个队列的消息 并通过@RabbitListener 注解监听队列, 当取到队列的信息时, 会将参数传入注解下的方法并自动调用

import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
@Component  
public class TopicConsumer1 {  
    private final String queueName = "topicQueue1";  
    @RabbitListener(queues = queueName)  
    public void recv(String message) {  
        System.out.println("C1 receive message : " + message);  
    }  
}

消费者2中的 QueueName 改为 topicQueue2 控制台输出信息中的 C1 改为 C2 加以区分

运行

运行代码,拿到并处理了队列中积压的信息 image.png

打开 html 页面再测试几组数据,返回了正确的结果 image.png

]]>
https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-topicmode/feed/ 0
春靴+兔 入门程序 https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-%e5%85%a5%e9%97%a8%e7%a8%8b%e5%ba%8f/ https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-%e5%85%a5%e9%97%a8%e7%a8%8b%e5%ba%8f/#respond Fri, 13 Sep 2024 15:46:27 +0000 https://forelink.top/?p=568 简介:

将RabbitMQ 整合到 SpringBoot中。 并实现一个入门程序。

入门程序

导入依赖

在IDEA 创建SpringBoot工程前勾选以下依赖 image.png

修改配置文件

修改 application.properties 或 applications.yml 文件, 加入RabbitMQ 的配置信息

image.png

1. 生产者(发送消息)

创建一个 MessageProducer 类,用于发送消息

package com.wastingmisaka.springbootrabbitmq;  
  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.context.annotation.Bean;  
import org.springframework.stereotype.Component;  
  
/**  
 * 生产者类  
 */  
@Component  
public class MessageProducer {  
    private final RabbitTemplate rabbitTemplate;  
  
    @Autowired  
    public MessageProducer(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
  
    public void sendMessage(String message) {  
        System.out.println("Sending message: " + message);  
        rabbitTemplate.convertAndSend("helloQueue", message);  
    }  
}

2. 配置队列

创建一个 RabbitMQConfig 类,用于配置队列、交换器和绑定等。

package com.wastingmisaka.springbootrabbitmq;  

import org.springframework.amqp.core.Queue;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
/**  
 * 配置类  
 * @Congiuration 告知Spring这是一个配置类
 */  
@Configuration  
public class RabbitMQConfig {  
    @Bean  
    public Queue helloQueue() {  
        return new Queue("helloQueue", false);  
        // 创建一个 helloQueue 的队列,且不需要持久化。
    }  
}

此处仅配置了队列。

3. 消费者(接收消息)

package com.wastingmisaka.springbootrabbitmq;  
  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
/**  
 * 消费者类  
 */  
  
@Component  
public class MessageConsumer {  
    @RabbitListener(queues = "helloQueue")  
    public void receiveMessage(String message) {  
        System.out.println("Received message: " + message);  
    }  
}

@RabbitListener 注解用来监听指定队列的消息,当有消息到达时,携带信息参数自动触发方法执行。

4. 控制器(发起请求)

创建一个用于发起HTTP请求的控制器

package com.wastingmisaka.springbootrabbitmq;  
  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.RequestParam;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
public class MessageController {  
  
    private final MessageProducer messageProducer;  
  
    public MessageController(MessageProducer messageProducer) {  
        this.messageProducer = messageProducer;  
    }  
  
    @GetMapping("/test")  
    public String test(){  
        String msg = "hello,world";  
        messageProducer.sendMessage(msg);  
        return "Message sent: "+msg;  
    }  
  
  
    @GetMapping("/send")  
    public String sendMessage(@RequestParam String message) {  
        messageProducer.sendMessage(message);  
        return "Message sent: "+message;  
    }  
}

其中 /test 请求路径包含了一个测试方法,向RabbitMQ发送 hello,world 消息。 /send 请求路径可以添加参数。

测试结果:

image.png

控制台对应的方法被调用。 且RabbitMQ管理台中存在helloQueue 队列

image.png

页面也返回了正确的信息

image.png

]]>
https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-%e5%85%a5%e9%97%a8%e7%a8%8b%e5%ba%8f/feed/ 0
RabbitMQ 6种工作模式 https://forelink.top/index.php/2024/09/13/rabbitmq-6%e7%a7%8d%e5%b7%a5%e4%bd%9c%e6%a8%a1%e5%bc%8f/ https://forelink.top/index.php/2024/09/13/rabbitmq-6%e7%a7%8d%e5%b7%a5%e4%bd%9c%e6%a8%a1%e5%bc%8f/#respond Fri, 13 Sep 2024 15:45:11 +0000 https://forelink.top/?p=566 简介:主要介绍了兔的4种工作模式,先前的入门程序已经介绍了简单模式的运作模式,还有一种RPC模式不在这里介绍和演示。

工作模式

先前介绍过,兔有6种不同的工作模式, 除去先前介绍的简单模式外,还有其他5种工作模式。

Working Queue 工作队列

image.png

与入门程序的简单模式相比,工作队列方式是简单模式的拓展,多个消费端共同处理一个队列中的信息。 RabbitMQ 通过轮询等方式,将消息分发给每个消费者

改造生产者代码,让其发送多条消息到工作队列中 image.png

并新建一个消费者,运行代码 image.png

一个队列中多个消费者从同一个队列中竞争消息 在多个消费者之间进行任务分配,适用于在工作负载较重的场景

Pub/Sub 订阅

image.png

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化 交换机一方面接收生产者发送的消息。另一方面,能够根据自己的类型处理消息 交换机有 3种 常见类型:

Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定 routing key 的队列 Topic :通配符,把消息交给符合 routing pattern 的队列

交换机只符合转发消息,不具备存储消息的能力!

需要使用 Channel 类中二的方法 创建一个交换机。

// 创建交换机
channel.exchangeDeclare("test_fanout",
                        BuiltinExchangeType.FANOUT,
                        true,false,false,null);
image.png

Exchange:交换机名称 Type:交换机类型 durable:是否持久化 autoDelete:是否自动删除 internal:是否内部使用 arguments:参数列表

创建队列

//创建队列  
channel.queueDeclare("test_fanout_queue1",true,false,false,null);  
channel.queueDeclare("test_fanout_queue2",true,false,false,null);

绑定交换机

//绑定交换机  
channel.queueBind("test_fanout_queue1","test_fanout","",null);  
channel.queueBind("test_fanout_queue2","test_fanout","",null);

Channel类中绑定的方法 image.png

运行代码,可以看到创建的交换机,并绑定成功了两个队列。

image.png

image.png

并可以看到 fanout 模式下,所有queue都会收到这个信息 适用于消息需要广播给多个消费者的场景,如系统日志、新闻分发等。

Routing Mode 路由模式

image.png

生产者将消息发送给 交换机 (Exchange),并附带 路由键 (RoutingKey) 交换机根据路由键,将消息发送到与路由键匹配的队列。

使用 定向类型(direct)的交换机,允许生产者通过路由键有选择性的发送消息。 消息会根据路由键,发送到指定的一个或多个队列。

绑定队列时,要在 routingKey 中加入指定的路由键,交换机Direct接收到消息后,会根据路由键将消息发送到指定的队列中去

channel.queueBind("test_direct_queue1","test_direct","orange",null);  
channel.queueBind("test_direct_queue2","test_direct","black",null);  
channel.queueBind("test_direct_queue2","test_direct","green",null);

运行代码,在管理台交换机面板查看绑定结果

image.png

说明绑定成功。 此时创建并运行消费者代码,可以收到队列中的消息

image.png

队列与交换机的绑定,需要指定一个 RoutingKey 消息向交换机发送消息时,也必须要指定消息的 RoutingKey 交换机不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列绑定的 RoutingKey 与消息的 RoutingKey 完全一致,才能拿到消息

适用于不同类型的消费者订阅不同类型的消息的场景,如不同服务接收不同的日志类型。

Topic Mode 通配符模式

image.png

生产者将消息发送给 交换机,并附带 主题(Topic) 交换机会根据主题模式,将消息发送到符合条件的队列。

使用 topic 类型的 交换机,允许生产者通过复杂的模式匹配(类似通配符的方式)发送消息 队列可以根据绑定的主题模式接收响应的消息。 路由键通常是点分隔的词汇,比如 log.info 或 log.error , 可以使用 *# 来匹配部分路由键。

绑定队列时,使用通配符来匹配路由键

channel.queueBind("test_topic_queue1","test_topic","*.orange.*",null);  
channel.queueBind("test_topic_queue2","test_topic","*.*.rabbit",null);  
channel.queueBind("test_topic_queue2","test_topic","lazy.#",null);

运行代码,查看管理台中的交换机绑定信息 image.png

已经绑定了队列,并且指定了topic

模拟发送信息

String msg1 = "q1";  
String msg2 = "q2";  
String msg3 = "q2 but msg3";  
channel.basicPublish("test_topic","eat.orange.apple",null,msg1.getBytes());  
channel.basicPublish("test_topic","jumpy.jumpy.rabbit",null,msg2.getBytes());  
channel.basicPublish("test_topic","lazy.me",null,msg3.getBytes());

检查管理台队列收到信息 image.png

创建消费者并运行,收到消息。 image.png

适用于消息分类更加灵活、多样化的场景,比如日志分发系统,处理不同优先级、不同级别的日志。

RPC 模式

Remote Procedure Call Mode 生产者发送消息(请求)到队列,并等待消费者处理后返回结果(响应) 消费者从队列中接收请求,处理后将结果发送回生产者。

  • 用于模拟远程过程调用(Remote Procedure Call, RPC)的模式,生产者等待一个结果。
  • 使用专门的回复队列来接收结果。

适用于需要请求-响应模型的场景,如远程服务调用、异步任务处理等。

]]>
https://forelink.top/index.php/2024/09/13/rabbitmq-6%e7%a7%8d%e5%b7%a5%e4%bd%9c%e6%a8%a1%e5%bc%8f/feed/ 0
RabbitMQ+Java入门 https://forelink.top/index.php/2024/09/13/5-rabbitmqjava%e5%85%a5%e9%97%a8/ https://forelink.top/index.php/2024/09/13/5-rabbitmqjava%e5%85%a5%e9%97%a8/#respond Thu, 12 Sep 2024 16:58:55 +0000 https://forelink.top/?p=558 简介:

使用简单模式完成消息传递,作为RabbitMQ的入门程序。

步骤

创建工程 (生产者、消费者)

在项目中创建两个Maven模块(Modules) image.png

添加依赖

<!--  RabbitMQ Java 客户端  -->  
<dependency>  
    <groupId>com.rabbitmq</groupId>  
    <artifactId>amqp-client</artifactId>  
    <version>5.6.0</version>  
</dependency>

两个模块都需要在 pom.xml 中添加该依赖

模拟发送消息代码

1、建立连接:

ConnectionFactory factory = new ConnectionFactory();

2、设置参数:

//2.设置参数  
factory.setHost("192.168.8.202");// ip 默认值localhost  
factory.setPort(5672);// 端口,默认值5672  
factory.setVirtualHost("vhost1");// 虚拟机,默认值/  
factory.setUsername("hso");// 用户名,默认guest
factory.setPassword("123");// 密码,默认 guest

3、建立连接 Connection

//3.创建连接 
ConnectionConnection connection = factory.newConnection();

4、建立Channel

//4.创建Channel
Channel channel = connection.createChannel();

5、创建队列:

//5.创建队列Queue  
// 如果没有一个名字叫 hello_comsumer 的队列,则会创建一个  
channel.queueDeclare("hello_comsumer",true,false,false,null);

队列创建有很多带参数构造器 进入到所在的类中,点击此处可以查看源码 image.png

查看开发者的注释信息 image.png

queue:队列名称 durable:是否持久化 exclusive: exclusive 为 true 时。只能有一个消费者监听该队列。 且Connection关闭时,会删除队列 autodelete:为 true 时,当没有Consumer时,会自动删除掉。 arguments:其他参数

6、发送消息:

//6.发送消息  
String msg = "hello comsumer!";  
channel.basicPublish("","hello_comsumer",null,msg.getBytes());

使用Channel类中的 用同样的方法查看源码中的参数

image.png exchange:交换机名称。简单模式下交换机会使用默认的”“ routingKey:路由名称 props:配置信息 body:字节数组(发送的消息)

7、释放资源

//7.释放资源
channel.close();
connection.close();

测试生产者代码

image.png

运行程序,然后打开RabbitMQ管理台 查看服务器中的Queues,可以看到队列的信息 如果注释掉第七步释放资源的部分,在Connections和Channels中也可以看到相关内容。 image.png

查看和打开Consumer Count image.png

可以发现当前队列存在已准备消息,但还没有消费者(还没有写消费者的代码) image.png

模拟接受消息代码

1~5 同上

从第1步到第4步建立连接,代码可以复用。 不过要注意的是队列和生产者创建的队列是不同的。

6、接受消息

接收消息的实现,使用 channel 中的 basicConsume 方法

查看源码查看参数的注释 image.png queue:队列名称 autoAck:是否自动确认 callback:回调的函数,可以自动执行一些方法

需要创建一个 Consumer 对象,Consumer 是一个接口,包含一个接收 Channel 对象作为参数的 DefaultConsumer 实现类,是空的实现类。 需要重写其中的回调函数方法。 image.png

//6.接收消息  
Consumer consumer = new DefaultConsumer(channel){  
    /**  
     * 回调方法,接收到消息后,会自动执行此方法  
     * @param consumerTag 标识  
     * @param envelope 获取一些信息,交换机,路由key...  
     * @param properties 配置信息  
     * @param body 数据(内容)  
     * @throws IOException  
     */    @Override  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        System.out.println("consumerTag : "+consumerTag);  
        System.out.println("envelope.getExchange() : "+envelope.getExchange());  
        System.out.println("envelope.getRoutingKey() : "+envelope.getRoutingKey());  
        System.out.println("properties" + properties);  
        System.out.println("body : "+new String(body));  
    }  
};  
channel.basicConsume("Q2",true,consumer);

消费者不需要关闭资源

消费者代码如下

image.png

测试消费者代码

运行消费者主函数 可以看到回调函数会被调用,并得到 Q1 队列中留存的 Message 结果

image.png

同时在 RabbitMQ中可以看到Q1队列中增加了一个消费者,并处理掉了消息 image.png

小结:

使用了简单模式,Producer 生产出要发送给 Consumer 处理的消息,Queue队列就相当于一个邮箱,存放着 P -> C 的资源。

]]>
https://forelink.top/index.php/2024/09/13/5-rabbitmqjava%e5%85%a5%e9%97%a8/feed/ 0
RabbitMQ 管控台使用WIP https://forelink.top/index.php/2024/09/13/rabbitmq-%e7%ae%a1%e6%8e%a7%e5%8f%b0%e4%bd%bf%e7%94%a8wip/ https://forelink.top/index.php/2024/09/13/rabbitmq-%e7%ae%a1%e6%8e%a7%e5%8f%b0%e4%bd%bf%e7%94%a8wip/#respond Thu, 12 Sep 2024 16:57:13 +0000 https://forelink.top/?p=556 控制台选项卡

image.png

Overview 概览

在此处能看到服务器的运行状态和信息

点击Nodes的名称,可以查看详细的信息,如配置文件和日志文件的位置,进程的数量和内存使用量。 image.png image.png

Admin 管理

image.png

创建虚拟机

点击右侧选项卡 Virtual Hosts (虚拟机) 可以创建新的虚拟机

image.png

创建的虚拟机可以通过点击名称来配置信息 在虚拟机页面中将用户添加进虚拟机Permissions中

image.png

]]>
https://forelink.top/index.php/2024/09/13/rabbitmq-%e7%ae%a1%e6%8e%a7%e5%8f%b0%e4%bd%bf%e7%94%a8wip/feed/ 0
RabbitMQ 安装与配置(Debian) https://forelink.top/index.php/2024/09/13/rabbitmq-%e5%ae%89%e8%a3%85%e4%b8%8e%e9%85%8d%e7%bd%ae%ef%bc%88debian%ef%bc%89/ https://forelink.top/index.php/2024/09/13/rabbitmq-%e5%ae%89%e8%a3%85%e4%b8%8e%e9%85%8d%e7%bd%ae%ef%bc%88debian%ef%bc%89/#respond Thu, 12 Sep 2024 16:56:13 +0000 https://forelink.top/?p=552 官网链接

RabbitMQ: One broker to queue them all | RabbitMQ

安装步骤

1、进入官网

进入官网,点击快速开始-下载

image.png

选择 Debian,Ubuntu

2、安装官方APT源

在新页面中找到Apt安装方式

# 安装官方的 apt源 和 官方签名密钥
sudo apt-get install curl gnupg apt-transport-https -y

curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null

将 RabbitMQ 官方源添加到 APT 源列表中

sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF

3、安装Erlang和RabbitMQ

更新软件包列表,并安装 Erlang语言 和 RabbitMQ 服务器

sudo apt-get update -y

sudo apt-get install -y erlang-base \  
erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets \  
erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key \  
erlang-runtime-tools erlang-snmp erlang-ssl \  
erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl

sudo apt-get install rabbitmq-server -y --fix-missing

4、启用 RabbitMQ

sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

查看 RabbitMQ 状态

systemctl status rabbitmq-server

下图为正确安装和配置 RabbitMQ 的返回结果

image.png

5、启用 RabbitMQ 管理插件

sudo rabbitmq-plugins enable rabbitmq_management

终端输出下图结果时成功 image.png

启用后,可以通过访问主机的15672端口访问Web界面。

image.png

虽然使用localhost访问该管理页面,可以用默认账号guest(密码:guest)登录。 但是此时在非主机是没有用户的,需要添加一个用户用于学习

6、创建新用户并登录管理面板

# 创建一个新的 RabbitMQ 用户
# 例如 创建用户名为 hso 密码为123 的用户
sudo rabbitmqctl add_user hso 123

# 为新用户设置权限
# 该条命令允许 hso 对虚拟主机执行 读、写和配置操作
sudo rabbitmqctl set_permissions -p / hso ".*" ".*" ".*"

# 为用户分配管理员权限
sudo rabbitmqctl set_user_tags hso administrator

使用新账户登录管理页面

image.png

如果看到此面板,说明至此安装和配置成功,耶(^▽^)。

在官网快速开始中有RabbitMQ 6中工作模式的介绍
]]>
https://forelink.top/index.php/2024/09/13/rabbitmq-%e5%ae%89%e8%a3%85%e4%b8%8e%e9%85%8d%e7%bd%ae%ef%bc%88debian%ef%bc%89/feed/ 0
RabbitMQ简介 https://forelink.top/index.php/2024/09/13/rabbitmq%e7%ae%80%e4%bb%8b/ https://forelink.top/index.php/2024/09/13/rabbitmq%e7%ae%80%e4%bb%8b/#respond Thu, 12 Sep 2024 16:53:40 +0000 https://forelink.top/?p=550 AMQP协议

AMQP, 即 Advanced Message Queuing Protocol (高级消息队列协议) 是2006年发布的网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

RabbitMQ

2007年,Rabbit 技术公司基于 AMQP 标准开发了 RabbitMQ 1.0。 RabbitMQ 采用 Erlang 语言开发,Erlang 是专门为开发高并发和分布式系统的语言,在电信领域使用广泛。

RabbitMQ 基础架构如下图

image.png

Broker : 接收和分发消息的应用,全称Message Broker,RabbitMQ Server 就是 Message Broker

Virtual host : 出于多租户和安全因素设计,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于 namepsace(命名空间)的概念。当多个用户使用不同服务时,划分出多个vhost,每个用户在自己的vhost创建 Exchange / Queue

Connection : Producer / Consumer 和 Broker 之间的 TCP连接

Channel : 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大时建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个线程创建单独的 Channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 Message broker 识别 Channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的连接,极大减少了操作系统建立 TCP Connection 的开销

Exchange : message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct(point-to-point)topic(publish-subscribe)fanout(multicast)

Queue : message 到达此处,等待被Consumer取走

Binding : exchange 和 queue 之间的虚拟连接,Binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

RabbitMQ 工作模式

RabbitMQ 提供了 6种工作模式: 简单模式 work queues Publish/Subscribe Routing Topics RPC

JMS

RabbitMQ 官方没有提供 JMS 的实现包, 开源社区有提供。 JMS 即 Java 消息服务(JavaMessage Service) 应用程序接口,是一个 Java 平台中关于面向消息中间件的API JMS 是 JavaEE 规范中的一种,类比JDBC 尽管RabbitMQ没有,很多消息中间件都实现了JMS规范。

]]>
https://forelink.top/index.php/2024/09/13/rabbitmq%e7%ae%80%e4%bb%8b/feed/ 0
MQ简介 https://forelink.top/index.php/2024/09/13/1-mq%e7%ae%80%e4%bb%8b/ https://forelink.top/index.php/2024/09/13/1-mq%e7%ae%80%e4%bb%8b/#respond Thu, 12 Sep 2024 16:52:50 +0000 https://forelink.top/?p=548 简介:

MQ 全称 Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间的通信。

image.png

图中A为消息生产者(Producer),B为消息消费者(Consumer),则MQ为中间件。

MQ 优劣

–在A系统与B系统中间加一层MQ。

优势:

· 应用解耦:
    降低系统间的耦合度,提高容错,便于维护。
    (如果系统异常,MQ会停止发送消息)
· 异步提速:
    接收到请求后,添加到消息队列中,不立即访问对应的服务
    提升用户体验 & 系统吞吐量
· 削峰填谷:
    高并发场景下,MQ不用访问具体的服务,仅接收消息
    并将请求限流发送给服务端可靠性高

劣势:

· 系统可用性降低
    系统引入的外部依赖越多,系统稳定性越差。
    (MQ的高可用?)
· 系统复杂度提高
    MQ 增加了系统的复杂度。
    之前系统间是同步的远程调用,加入 MQ 后通过 MQ 进行异步调用。
    (消息重复消费?消息丢失?消息传递的顺序?)
· 一致性问题
    A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据。
    如果B系统、C系统处理成功,D系统处理失败。
    (消息处理的一致性?)

MQ应用场景

1、生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,实现异步。 2、容许短暂的数据不一致性 3、解耦、提速、削峰的收益 > 加入MQ、管理MQ的成本

常见的 MQ产品:

image.png

也有使用 Redis 实现 MQ 的案例。

]]>
https://forelink.top/index.php/2024/09/13/1-mq%e7%ae%80%e4%bb%8b/feed/ 0