后端 – Wasting_Misaka.Blog https://forelink.top Hso! Tue, 08 Oct 2024 13:27:00 +0000 zh-Hans hourly 1 https://wordpress.org/?v=6.7.1 SnowFlake算法 https://forelink.top/index.php/2024/10/08/snowflake%e7%ae%97%e6%b3%95/ https://forelink.top/index.php/2024/10/08/snowflake%e7%ae%97%e6%b3%95/#respond Tue, 08 Oct 2024 13:26:58 +0000 https://forelink.top/?p=600 简介:

本文介绍了snowflake算法,优缺点,以及应用场景。

传统的集中式系统中,ID 生成通常由一个单一的服务器完成,导致服务存在单点故障的问题。 当系统规模扩大后,单一服务器可能无法处理高并发请求。而分布式 ID 生成算法旨在解决这一问题,其中每个节点都能并行工作独立生成ID,是去中心化的。 分布式系统中,有需要使用全局唯一 ID 的场景。 UUID ,Snowflake 或 Leaf 算法独立生成ID

其中雪花算法,能按照时间生成有序的ID。

雪花算法

Snowflake 算法是 Twitter开源的分布式 ID 生成算法 算法生成一个64bit 的long型数值,组成部分引入了时间戳,基本保持了自增。

优点

  1. 高性能高可用: 可在内存生成ID,不依赖数据库。
  2. 高吞吐:雪花算法生成ID主要依赖简单的位移操作,速度极快。
  3. 有序性:生成的ID的高位是时间戳,可以作为索引存入缓存中。

缺点

  1. 对机器时间依赖较强:系统时钟的精度和一致性非常关键,如果服务器时钟出现回调或不同步,可能会导致ID 重复或生成顺序混乱
  2. 有时间限制:41位时间戳只能使用约69年,超过这个时间可能需要手动调整纪元时间。

雪花 ID 结构

默认的雪花算法生成64bit的ID

| 1位符号位 | 41位时间戳 | 10位机器标识 | 12位序列号 |

符号位:最高位是符号位,0表示正,1表示负,该位固定为0,用于保持生成的ID为正数。

时间戳:毫秒级的时间戳,由一个纪元时间(Epoch)开始计算到当前时间的毫秒数。从定义的纪元时间开始,最多支持69年的时间。

机器标识:用于区分分布式系统中不同的机器或者节点,10位可以支持 2^10 = 1024个不同机器或节点。通常,10位的机器标识被分为两部分,5位的数据中心ID 和 5位机器ID。这样可以支持32个数据中心中的32台机器。

序列号:用于在同一毫秒中生成多个ID,避免时间戳出现冲突。12位可以支持每台机器每毫秒中生成 2^12 = 4096个唯一ID。(当同一台机器在同一毫秒中生成的ID超过4096个,机器会等待下一毫秒继续生成新的ID。

雪花ID的长度是可以自行配置的,可以根据具体场景进行设计。 希望服务运行更久,可以增加时间戳的位数。 想要支持更多节点,增加标识位长度。 高并发场景,增加序列号位数。

雪花ID适用场景

因为雪花算法有序自增,在MYSQL中 B+树 索引可以高性能插入,所以在日常使用中,雪花算法更多是被应用在数据库的主键 ID 和业务关联主键 且雪花ID每秒能生成 4096 * 1000 = 409w 个ID,适用于高并发场景

雪花ID常见问题

生成ID重复问题

  1. 服务器通过集群的方式部署,其中部分机器标识位一致
  2. 业务存在一定的并发量,没有并发量无法触发问题
  3. 同一毫秒下生成

标识位重复定义

开源框架中如MyBatis和Hutool使用雪花算法,都实现了不重复标识位的构造方法。 MyBatis-Plus v3.4.2 雪花算法实现类 Sequence,提供了两种构造方法: 无参构造 -> 自动生成dataCenterId 和 workerId 有参构造 -> 创建Sequence时明确指定标识位

Sequence默认的无参构造,生成了dataCenterId和Iworkerd

public static long getDataCenterId(long maxDatacenterId) {
    long id = 1L;
    final byte[] mac = NetUtil.getLocalHardwareAddress();
    if (null != mac) {
        id = ((0x000000FF & (long) mac[mac.length - 2])
                | (0x0000FF00 & (((long) mac[mac.length - 1]) << 8))) >> 6;
        id = id % (maxDatacenterId + 1);
    }

    return id;
}

DatacenterId的取值与设备Mac地址有关

public static long getWorkerId(long datacenterId, long maxWorkerId) {
    final StringBuilder mpid = new StringBuilder();
    mpid.append(datacenterId);
    try {
    // 获取进程PID
        mpid.append(RuntimeUtil.getPid());
    } catch (UtilException igonre) {
        //ignore
    }
    return (mpid.toString().hashCode() & 0xffff) % (maxWorkerId + 1);
}

WorkerId是MAC+PID的hashcode,获取16个低位

标识符分配

依赖MAC地址和进程PID的标识符仍然有小几率重复,为了解决这个问题,可以用 预分配和动态分配 的方法。

预分配 是在应用上线前,统计当前服务的节点数,人工的申请标识位。但是无法解决服务节点动态扩展的问题。

动态分配 将标识位存放在Redis、Zookeeper、MySQL等中间件,在服务启动时请求标识位。 但要注意生成的ID是 服务内唯一 还是 全局唯一 全局唯一 可能会导致单点故障问题。

常见的实现是 Redis + Lua 脚本,在应用启动时,通过Lua脚本获取标识位。

]]>
https://forelink.top/index.php/2024/10/08/snowflake%e7%ae%97%e6%b3%95/feed/ 0
安装Clash-for-Linux科学上网 https://forelink.top/index.php/2024/09/15/%e5%ae%89%e8%a3%85clash-for-linux%e7%a7%91%e5%ad%a6%e4%b8%8a%e7%bd%91/ https://forelink.top/index.php/2024/09/15/%e5%ae%89%e8%a3%85clash-for-linux%e7%a7%91%e5%ad%a6%e4%b8%8a%e7%bd%91/#respond Sat, 14 Sep 2024 16:54:28 +0000 https://forelink.top/?p=581 简介:
主要介绍了Clash-for-Linux在debian系统上的的安装和配置方法

Github仓库链接

https://github.com/Elegycloud/clash-for-linux-backup

安装方法

在仓库README写的很详尽了。 先克隆仓库项目到本地

$ git clone https://github.com/Elegycloud/clash-for-linux-backup.git

编辑项目目录中的.env 文件,将CLASH_URL修改为机场的clash订阅地址,保存退出。

运行启动脚本,如果订阅地址能够正确访问,会提示加载环境变量和开启关闭系统代理的命令提示

./start.sh

正在检测订阅地址...
Clash订阅地址可访问!                                      [  OK  ]

正在下载Clash配置文件...
配置文件config.yaml下载成功!                              [  OK  ]

正在启动Clash服务...
服务启动成功!                                             [  OK  ]

Clash Dashboard 访问地址:http://<ip>:9090/ui
Secret:xxxxxxxxxxxxx

请执行以下命令加载环境变量: source /etc/profile.d/clash.sh

请执行以下命令开启系统代理: proxy_on

若要临时关闭系统代理,请执行: proxy_off

加载环境变量和,并开启系统代理

source /etc/profile.d/clash.sh
proxy_on

检查服务端口(需要 net-tools软件包)

$ netstat -tln | grep -E '9090|789.'
tcp        0      0 127.0.0.1:9090          0.0.0.0:*               LISTEN     
tcp6       0      0 :::7890                 :::*                    LISTEN     
tcp6       0      0 :::7891                 :::*                    LISTEN     
tcp6       0      0 :::7892                 :::*                    LISTEN

检查环境变量

$ env | grep -E 'http_proxy|https_proxy'
http_proxy=http://127.0.0.1:7890
https_proxy=http://127.0.0.1:7890

如果返回了后两行信息说明配置成功。 接下来设置代理。

如果使用Debian / Ubuntu ,需要在gnome桌面 设置中打开网络设置-代理设置,并修改 HTTP/HTTPS /SOCKS 为手动代理,并将 端口号修改为软件默认使用的端口号。

截图 2024-09-14 17-39-04.png

修改节点和端口号

提供了 yacd控制面板 可视化的管理 clash,访问

localhost:9090/ui
或
127.0.0.1:9090/ui
Snipaste_2024-09-14_17-50-56.png

URL填入如图中的信息,Secret在启动Clash脚本的终端获取,可以打开yacd控制面板的登陆界面,能够查看当前clash状态,修改节点,规则,查看连接配置和日志。

有Issue提到了存在流量激增的问题。

关闭代理后,应该将设置中的代理信息改回为自动。

]]>
https://forelink.top/index.php/2024/09/15/%e5%ae%89%e8%a3%85clash-for-linux%e7%a7%91%e5%ad%a6%e4%b8%8a%e7%bd%91/feed/ 0
将宿主机文件拷贝进容器 https://forelink.top/index.php/2024/09/15/%e5%b0%86%e5%ae%bf%e4%b8%bb%e6%9c%ba%e6%96%87%e4%bb%b6%e6%8b%b7%e8%b4%9d%e8%bf%9b%e5%ae%b9%e5%99%a8/ https://forelink.top/index.php/2024/09/15/%e5%b0%86%e5%ae%bf%e4%b8%bb%e6%9c%ba%e6%96%87%e4%bb%b6%e6%8b%b7%e8%b4%9d%e8%bf%9b%e5%ae%b9%e5%99%a8/#respond Sat, 14 Sep 2024 16:52:05 +0000 https://forelink.top/?p=579 简介:
使用 docker cp 命令将宿主机中的文件拷贝到Docker容器中

大多数 Docker 容器 内的目录结构与标准的 Linux 文件系统非常相似,包括常见的目录如 /bin/etc/sys 等。这是因为容器通常基于某种 Linux 操作系统镜像(如 Ubuntu、Debian、Alpine Linux 等)创建的,这些镜像本身就包含了一些基本的 Linux 目录和工具。

以bash方式启动可以打开容器的命令行。

拷贝文件

使用docker cp指令 例如:

docker cp /home/user/sql_files/file1.sql mysql-container:/tmp/file1.sql 
docker cp /home/user/sql_files/file2.sql mysql-container:/tmp/file2.sql

此命令将两个 sql 文件拷贝进了mysql容器的临时文件夹中 image.png

]]>
https://forelink.top/index.php/2024/09/15/%e5%b0%86%e5%ae%bf%e4%b8%bb%e6%9c%ba%e6%96%87%e4%bb%b6%e6%8b%b7%e8%b4%9d%e8%bf%9b%e5%ae%b9%e5%99%a8/feed/ 0
Mysql部署 https://forelink.top/index.php/2024/09/15/mysql%e9%83%a8%e7%bd%b2/ https://forelink.top/index.php/2024/09/15/mysql%e9%83%a8%e7%bd%b2/#respond Sat, 14 Sep 2024 16:51:38 +0000 https://forelink.top/?p=577 简介

docker 部署 mysql容器。

创建容器

docker run --name mysql_mycontainer -p 3307:3306 -e MYSQL_ROOT_HOST='%' -e MYSQL_ROOT_PASSWORD=root -d mysql:5.7.36

–name 指定容器的别名 -p 宿主机端口:容器端口 外部连接3307端口,即可连接容器 -e 设置环境变量的值 PATHVAR = VAR 此处将密码设置为了 root -d 以守护模式运行该容器(后台运行)

查看容器状态

docker ps -a
image.png

可以检查容器在docker中的状态。

外部连接容器

mysql -h [ip] -p [port] -u [username] -p
*输入密码*

image.png (如果不成功,可能需要开放对应的端口) 如图,在其设备上成功访问了mysql。

]]>
https://forelink.top/index.php/2024/09/15/mysql%e9%83%a8%e7%bd%b2/feed/ 0
数据卷概念和配置 https://forelink.top/index.php/2024/09/15/%e6%95%b0%e6%8d%ae%e5%8d%b7%e6%a6%82%e5%bf%b5%e5%92%8c%e9%85%8d%e7%bd%ae/ https://forelink.top/index.php/2024/09/15/%e6%95%b0%e6%8d%ae%e5%8d%b7%e6%a6%82%e5%bf%b5%e5%92%8c%e9%85%8d%e7%bd%ae/#respond Sat, 14 Sep 2024 16:51:01 +0000 https://forelink.top/?p=575 简介

包括了docker 数据卷的概念、作用和配置。

概念:

数据卷式宿主机的一个目录或文件,当容器目录和数据卷目录绑定后,对方修改会立即同步。 宿主机的目录能把数据卷中的数据在容器中同步挂载。 外部机器和容器虽不互通,但可以和宿主机互通。 一个数据卷可以被多个容器同时挂载。 一个容器也可以挂载多个数据卷。

作用:

数据卷的作用包括 容器数据持久化 外部机器和容器间接通信 容器之间数据交换

配置:

创建启动容器时,使用 – v 参数 设置数据卷 docker run … -v [宿主机目录(文件)]:[容器内目录(文件)]

注意事项: 目录必须是绝对路径 如果目录不存在,会自动创建 可以挂载多个数据卷

在挂载的目录中创建文件,能同步给容器和宿主机。 对文件的修改操作也能实时同步。

一个容器可以挂载多个数据卷目录 -v 宿主机目录1:数据卷1 -v 宿主机目录2:数据卷2

数据卷容器:是一种特殊的容器 多容器进行数据交换: 多个容器挂载同一个数据卷(麻烦) 数据卷容器(方便) 使用时让其他容器继承该容器即可。 创建数据卷容器: -v /volume(容器名?)

创建容器:
--volumes-from [数据卷容器名]
]]>
https://forelink.top/index.php/2024/09/15/%e6%95%b0%e6%8d%ae%e5%8d%b7%e6%a6%82%e5%bf%b5%e5%92%8c%e9%85%8d%e7%bd%ae/feed/ 0
Docker简介和安装方法 https://forelink.top/index.php/2024/09/15/docker%e7%ae%80%e4%bb%8b%e5%92%8c%e5%ae%89%e8%a3%85%e6%96%b9%e6%b3%95/ https://forelink.top/index.php/2024/09/15/docker%e7%ae%80%e4%bb%8b%e5%92%8c%e5%ae%89%e8%a3%85%e6%96%b9%e6%b3%95/#respond Sat, 14 Sep 2024 16:50:22 +0000 https://forelink.top/?p=573 简介:

Docker 是基于Go语言实现,轻量级,可移植的开源应用容器引擎 使用沙箱机制运行,容器开销低且相互隔离

可在MAC Windows Linux上安装运行

Linux Docker安装

官网下载文档 Debian | Docker Docs

Debian安装

配置官方apt源

# Add Docker's official GPG key:
sudo apt-get update
sudo apt-get install ca-certificates curl
sudo install -m 0755 -d /etc/apt/keyrings
sudo curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc
sudo chmod a+r /etc/apt/keyrings/docker.asc

# Add the repository to Apt sources:
echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
sudo apt-get update

安装 Docker

验证是否安装成功

sudo docker run hello-world

该命令会下载一个hello-world测试镜像,并分配一个容器运行。 容器运行后,如果docker安装成功,会打印出确认信息,随后自动退出。

image.png 看到该信息,说明docker服务正常,安装成功。

设置Docker自启(可选)

sudo systemlctl enable docker

安装 本地Debian Deb包

sudo apt-get update
sudo apt-get install ./docker-desktop-<arch>.deb

Docker Desktop 默认安装在 /opt/docker-desktop 中

配置 Docker配置文件

Debian 桌面版安装

在Debian 上推荐安装使用Docker Desktop。

image.png 右键复制链接 使用 wget命令下载。

wget https://desktop.docker.com/linux/main/amd64/docker-desktop-amd64.deb

镜像和容器

镜像

查看镜像
    docker image [-q]
    [-q] 返回所有镜像的标识符/ID
    REPOSITORY 镜像名
    TAG 版本号
    IMAGE ID 标识符
    CREATED 创建时间
    SIZE 镜像大小

搜索镜像(需联网)
    docker search [something]

拉取镜像(默认下载最新版本)
    docker pull [下载内容:版本号]
    在docker.hub可以查找image的信息

删除镜像
    docker rmi [image id/镜像名]

容器

运行容器
    docker run [options] [镜像名:版本号] [COMMANDS] [其他参数]
    options:
        -i 保持容器运行
        -t 能给容器分配一个终端,接受后续的命令(通常与-i同时使用)
        -d 以守护模式运行容器,创建一个后台容器,退出后容器不会自动关闭
        -i -t -d可以合并为 -dit
        -p 端口映射 将容器的端口映射到宿主机的端口
            格式为 -p 宿主机端口:容器端口
        -v 卷挂载,将宿主机的目录挂载到容器的目录
            格式为 -v /宿主机/目录:/容器/目录
        -e 环境变量 设置容器内部使用的环境变量
            格式为 -e "MY_VAR=my_value" 常用于系统容器中
        --rm 自动删除 容器停止后自动删除,释放资源。
        --name能给容器指定一个别名
            指定运行系统版本
            运行后能进入容器内部进行操作
            exit退出后容器会被关闭
        还有其他启动选项,不一一列出
    (以下为可选)
    COMMANDS:
        容器启动时调用的命令。
        如果运行一个 ubuntu 容器,默认命令是/bin/bash
        会进入一个交互式 Bash shell
    其他参数 是传递给命令的参数
    
进入容器内部
    docker exec [options] [容器别名name]

查看正在运行的容器
    docker ps [options]
    查看容器的创建命令
    -a 查看所有创建过的容器
    -q 返回容器的识别号

启动容器
    docker start [容器别名]
关闭容器
    docker stop [容器别名]
删除容器
    docker rm [容器别名]
    不能删除正在运行的容器

查看容器信息
    docker inspect [容器别名]

复合命令
    docker rm `docker ps -aq`
    删除所有容器
]]>
https://forelink.top/index.php/2024/09/15/docker%e7%ae%80%e4%bb%8b%e5%92%8c%e5%ae%89%e8%a3%85%e6%96%b9%e6%b3%95/feed/ 0
春靴+兔 TopicMode https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-topicmode/ https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-topicmode/#respond Fri, 13 Sep 2024 15:47:35 +0000 https://forelink.top/?p=570 简介
image.png

使用SpringBoot + RabbitMQ 实现需求: 通过 在HTML页面中填写表单并提交 发起 HTTP 请求来调用生产者方法发送消息 格式如 test.orange.test 并将Message输出打印到控制台上。

Topic 程序

配置类

在 spring框架的amcq中,提供了 队列,交换机和绑定关系的构建方法(Builder) 在使用 SpringBoot 提供 RabbitMQ 服务时,活用构建器可以简化开发流程。

Spring框架的amqp.core包提供了如下构建器: QueueBuilder ExchangeBuilder BindindBuilder 以点分隔接收参数,队列与交换机以 build() 结尾 绑定关系Binding以 noargs() 或者 and() 结尾

在构建过程中可以查看后续方法的类型。 image.png

import org.springframework.amqp.core.*;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
/**  
 * */  
@Configuration  
public class TopicConfig {  
    // 1.交换机 标识使用@Qualify注解调用这个bean  
    @Bean("topicExchange")  
    public Exchange topicExchange(){  
        // 创建一个 topic类型 的交换机  
        return ExchangeBuilder.topicExchange("topicExchange").durable(false).build();  
    }  
    // 2.队列  
    @Bean("topicQueue1")  
    public Queue topicQueue1(){  
        return QueueBuilder.durable("topicQueue1").build();  
    }  
    @Bean("topicQueue2")  
    public Queue topicQueue2(){  
        return QueueBuilder.durable("topicQueue2").build();  
    }  
    // 3.绑定Topic关系  
    @Bean  
    public Binding Binding1(@Qualifier("topicQueue1") Queue queue,@Qualifier("topicExchange") Exchange exchange){  
        return BindingBuilder.bind(queue).to(exchange).with("*.orange.*").noargs();  
    }  
  
    @Bean  
    public Binding binding2(@Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") Exchange exchange){  
        return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit").noargs();  
    }  
  
    @Bean  
    public Binding binding3(@Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") Exchange exchange){  
        return BindingBuilder.bind(queue).to(exchange).with("lazy.#").noargs();  
    }  
  
}

HTML 页面

准备一个 HTML 页面,包含routingKey Message 的表单。

<!DOCTYPE html>  
<html lang="en">  
<head>  
    <meta charset="UTF-8">  
    <title>Topic Test</title>  
</head>  
<body>  
    <form action="http://localhost:8080/topic" method="get" >  
        routing key: <input type="text" name="routingKey">  
        message : <input type="text" name="message"></input>  
        <input type="submit" value="提交">  
    </form></body>  
</html>
<!--
    form 表单
        action发送表单中的数据到action的url里,
               url为空时发送到当前页面
        method
            get 有url长度限制
            post发送到f12中网络请求中,无网络限制
    input
        type
            text 文本类型,可操作
            submit 按钮类型,可点击
        name
            是必须的属性
        value
            框框中的值
-->

在浏览器中打开的效果如下

image.png

生产者

新建生产者类 ropicProducer 先创建一个Spring框架中的 RabbitTemplate 实现类 该类已装载在 Bean 容器中,设置AutoWired自动获取该对象

然后提供一个 sendMessage 方法,包含 routingKeymessage 形参,方便外部传参调用。

import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  
  
@Component  
public class TopicProducer {  
    // 创建一个rabbitTemplate对象  
    private RabbitTemplate rabbitTemplate;  
    private String exchangeName = "topicExchange";  
  
    @Autowired  
    public TopicProducer(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
    // sendMessage 方法  
    public void sendMessage(String routingKey,String message) {  
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);  
    }  
}

控制器

新建控制器 TopicController 用于响应HTTP请求,并执行生产者发送消息的 sendMessage 方法 创建一个生产者对象 topicProducer 用以调用发送消息的方法

并提供一个 GetMapping 方法, 在接收到HTTP页面发来的请求后, 将表单中包含的 routingKeyMessage 传给 sendMessage() 方法

import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
public class TopicController {  
  
    private final TopicProducer topicProducer;  
  
    public TopicController(TopicProducer topicProducer) {  
        this.topicProducer = topicProducer;  
    }  

    @GetMapping("/topic")  
    public String sendMessage(String routingKey,String message){  
        topicProducer.sendMessage(routingKey,message);  
        return "routingKey: "+routingKey+"\nmessage: "+message;  
    }  
}

测试效果

将主程序运行起来, 打开 html 页面, 通过表单提交三条测试信息和一条无关信息.

image.png

在控制台中可以大致确认信息达到了队列

image.png

消费者

新建两个消费者类, 分别用以处理两个队列的消息 并通过@RabbitListener 注解监听队列, 当取到队列的信息时, 会将参数传入注解下的方法并自动调用

import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
@Component  
public class TopicConsumer1 {  
    private final String queueName = "topicQueue1";  
    @RabbitListener(queues = queueName)  
    public void recv(String message) {  
        System.out.println("C1 receive message : " + message);  
    }  
}

消费者2中的 QueueName 改为 topicQueue2 控制台输出信息中的 C1 改为 C2 加以区分

运行

运行代码,拿到并处理了队列中积压的信息 image.png

打开 html 页面再测试几组数据,返回了正确的结果 image.png

]]>
https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-topicmode/feed/ 0
春靴+兔 入门程序 https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-%e5%85%a5%e9%97%a8%e7%a8%8b%e5%ba%8f/ https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-%e5%85%a5%e9%97%a8%e7%a8%8b%e5%ba%8f/#respond Fri, 13 Sep 2024 15:46:27 +0000 https://forelink.top/?p=568 简介:

将RabbitMQ 整合到 SpringBoot中。 并实现一个入门程序。

入门程序

导入依赖

在IDEA 创建SpringBoot工程前勾选以下依赖 image.png

修改配置文件

修改 application.properties 或 applications.yml 文件, 加入RabbitMQ 的配置信息

image.png

1. 生产者(发送消息)

创建一个 MessageProducer 类,用于发送消息

package com.wastingmisaka.springbootrabbitmq;  
  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.context.annotation.Bean;  
import org.springframework.stereotype.Component;  
  
/**  
 * 生产者类  
 */  
@Component  
public class MessageProducer {  
    private final RabbitTemplate rabbitTemplate;  
  
    @Autowired  
    public MessageProducer(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
  
    public void sendMessage(String message) {  
        System.out.println("Sending message: " + message);  
        rabbitTemplate.convertAndSend("helloQueue", message);  
    }  
}

2. 配置队列

创建一个 RabbitMQConfig 类,用于配置队列、交换器和绑定等。

package com.wastingmisaka.springbootrabbitmq;  

import org.springframework.amqp.core.Queue;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
/**  
 * 配置类  
 * @Congiuration 告知Spring这是一个配置类
 */  
@Configuration  
public class RabbitMQConfig {  
    @Bean  
    public Queue helloQueue() {  
        return new Queue("helloQueue", false);  
        // 创建一个 helloQueue 的队列,且不需要持久化。
    }  
}

此处仅配置了队列。

3. 消费者(接收消息)

package com.wastingmisaka.springbootrabbitmq;  
  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  
  
/**  
 * 消费者类  
 */  
  
@Component  
public class MessageConsumer {  
    @RabbitListener(queues = "helloQueue")  
    public void receiveMessage(String message) {  
        System.out.println("Received message: " + message);  
    }  
}

@RabbitListener 注解用来监听指定队列的消息,当有消息到达时,携带信息参数自动触发方法执行。

4. 控制器(发起请求)

创建一个用于发起HTTP请求的控制器

package com.wastingmisaka.springbootrabbitmq;  
  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.RequestParam;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
public class MessageController {  
  
    private final MessageProducer messageProducer;  
  
    public MessageController(MessageProducer messageProducer) {  
        this.messageProducer = messageProducer;  
    }  
  
    @GetMapping("/test")  
    public String test(){  
        String msg = "hello,world";  
        messageProducer.sendMessage(msg);  
        return "Message sent: "+msg;  
    }  
  
  
    @GetMapping("/send")  
    public String sendMessage(@RequestParam String message) {  
        messageProducer.sendMessage(message);  
        return "Message sent: "+message;  
    }  
}

其中 /test 请求路径包含了一个测试方法,向RabbitMQ发送 hello,world 消息。 /send 请求路径可以添加参数。

测试结果:

image.png

控制台对应的方法被调用。 且RabbitMQ管理台中存在helloQueue 队列

image.png

页面也返回了正确的信息

image.png

]]>
https://forelink.top/index.php/2024/09/13/%e6%98%a5%e9%9d%b4%e5%85%94-%e5%85%a5%e9%97%a8%e7%a8%8b%e5%ba%8f/feed/ 0
RabbitMQ 6种工作模式 https://forelink.top/index.php/2024/09/13/rabbitmq-6%e7%a7%8d%e5%b7%a5%e4%bd%9c%e6%a8%a1%e5%bc%8f/ https://forelink.top/index.php/2024/09/13/rabbitmq-6%e7%a7%8d%e5%b7%a5%e4%bd%9c%e6%a8%a1%e5%bc%8f/#respond Fri, 13 Sep 2024 15:45:11 +0000 https://forelink.top/?p=566 简介:主要介绍了兔的4种工作模式,先前的入门程序已经介绍了简单模式的运作模式,还有一种RPC模式不在这里介绍和演示。

工作模式

先前介绍过,兔有6种不同的工作模式, 除去先前介绍的简单模式外,还有其他5种工作模式。

Working Queue 工作队列

image.png

与入门程序的简单模式相比,工作队列方式是简单模式的拓展,多个消费端共同处理一个队列中的信息。 RabbitMQ 通过轮询等方式,将消息分发给每个消费者

改造生产者代码,让其发送多条消息到工作队列中 image.png

并新建一个消费者,运行代码 image.png

一个队列中多个消费者从同一个队列中竞争消息 在多个消费者之间进行任务分配,适用于在工作负载较重的场景

Pub/Sub 订阅

image.png

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化 交换机一方面接收生产者发送的消息。另一方面,能够根据自己的类型处理消息 交换机有 3种 常见类型:

Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定 routing key 的队列 Topic :通配符,把消息交给符合 routing pattern 的队列

交换机只符合转发消息,不具备存储消息的能力!

需要使用 Channel 类中二的方法 创建一个交换机。

// 创建交换机
channel.exchangeDeclare("test_fanout",
                        BuiltinExchangeType.FANOUT,
                        true,false,false,null);
image.png

Exchange:交换机名称 Type:交换机类型 durable:是否持久化 autoDelete:是否自动删除 internal:是否内部使用 arguments:参数列表

创建队列

//创建队列  
channel.queueDeclare("test_fanout_queue1",true,false,false,null);  
channel.queueDeclare("test_fanout_queue2",true,false,false,null);

绑定交换机

//绑定交换机  
channel.queueBind("test_fanout_queue1","test_fanout","",null);  
channel.queueBind("test_fanout_queue2","test_fanout","",null);

Channel类中绑定的方法 image.png

运行代码,可以看到创建的交换机,并绑定成功了两个队列。

image.png

image.png

并可以看到 fanout 模式下,所有queue都会收到这个信息 适用于消息需要广播给多个消费者的场景,如系统日志、新闻分发等。

Routing Mode 路由模式

image.png

生产者将消息发送给 交换机 (Exchange),并附带 路由键 (RoutingKey) 交换机根据路由键,将消息发送到与路由键匹配的队列。

使用 定向类型(direct)的交换机,允许生产者通过路由键有选择性的发送消息。 消息会根据路由键,发送到指定的一个或多个队列。

绑定队列时,要在 routingKey 中加入指定的路由键,交换机Direct接收到消息后,会根据路由键将消息发送到指定的队列中去

channel.queueBind("test_direct_queue1","test_direct","orange",null);  
channel.queueBind("test_direct_queue2","test_direct","black",null);  
channel.queueBind("test_direct_queue2","test_direct","green",null);

运行代码,在管理台交换机面板查看绑定结果

image.png

说明绑定成功。 此时创建并运行消费者代码,可以收到队列中的消息

image.png

队列与交换机的绑定,需要指定一个 RoutingKey 消息向交换机发送消息时,也必须要指定消息的 RoutingKey 交换机不再把消息交给每一个绑定的队列,而是根据消息的 RoutingKey 进行判断,只有队列绑定的 RoutingKey 与消息的 RoutingKey 完全一致,才能拿到消息

适用于不同类型的消费者订阅不同类型的消息的场景,如不同服务接收不同的日志类型。

Topic Mode 通配符模式

image.png

生产者将消息发送给 交换机,并附带 主题(Topic) 交换机会根据主题模式,将消息发送到符合条件的队列。

使用 topic 类型的 交换机,允许生产者通过复杂的模式匹配(类似通配符的方式)发送消息 队列可以根据绑定的主题模式接收响应的消息。 路由键通常是点分隔的词汇,比如 log.info 或 log.error , 可以使用 *# 来匹配部分路由键。

绑定队列时,使用通配符来匹配路由键

channel.queueBind("test_topic_queue1","test_topic","*.orange.*",null);  
channel.queueBind("test_topic_queue2","test_topic","*.*.rabbit",null);  
channel.queueBind("test_topic_queue2","test_topic","lazy.#",null);

运行代码,查看管理台中的交换机绑定信息 image.png

已经绑定了队列,并且指定了topic

模拟发送信息

String msg1 = "q1";  
String msg2 = "q2";  
String msg3 = "q2 but msg3";  
channel.basicPublish("test_topic","eat.orange.apple",null,msg1.getBytes());  
channel.basicPublish("test_topic","jumpy.jumpy.rabbit",null,msg2.getBytes());  
channel.basicPublish("test_topic","lazy.me",null,msg3.getBytes());

检查管理台队列收到信息 image.png

创建消费者并运行,收到消息。 image.png

适用于消息分类更加灵活、多样化的场景,比如日志分发系统,处理不同优先级、不同级别的日志。

RPC 模式

Remote Procedure Call Mode 生产者发送消息(请求)到队列,并等待消费者处理后返回结果(响应) 消费者从队列中接收请求,处理后将结果发送回生产者。

  • 用于模拟远程过程调用(Remote Procedure Call, RPC)的模式,生产者等待一个结果。
  • 使用专门的回复队列来接收结果。

适用于需要请求-响应模型的场景,如远程服务调用、异步任务处理等。

]]>
https://forelink.top/index.php/2024/09/13/rabbitmq-6%e7%a7%8d%e5%b7%a5%e4%bd%9c%e6%a8%a1%e5%bc%8f/feed/ 0
RabbitMQ+Java入门 https://forelink.top/index.php/2024/09/13/5-rabbitmqjava%e5%85%a5%e9%97%a8/ https://forelink.top/index.php/2024/09/13/5-rabbitmqjava%e5%85%a5%e9%97%a8/#respond Thu, 12 Sep 2024 16:58:55 +0000 https://forelink.top/?p=558 简介:

使用简单模式完成消息传递,作为RabbitMQ的入门程序。

步骤

创建工程 (生产者、消费者)

在项目中创建两个Maven模块(Modules) image.png

添加依赖

<!--  RabbitMQ Java 客户端  -->  
<dependency>  
    <groupId>com.rabbitmq</groupId>  
    <artifactId>amqp-client</artifactId>  
    <version>5.6.0</version>  
</dependency>

两个模块都需要在 pom.xml 中添加该依赖

模拟发送消息代码

1、建立连接:

ConnectionFactory factory = new ConnectionFactory();

2、设置参数:

//2.设置参数  
factory.setHost("192.168.8.202");// ip 默认值localhost  
factory.setPort(5672);// 端口,默认值5672  
factory.setVirtualHost("vhost1");// 虚拟机,默认值/  
factory.setUsername("hso");// 用户名,默认guest
factory.setPassword("123");// 密码,默认 guest

3、建立连接 Connection

//3.创建连接 
ConnectionConnection connection = factory.newConnection();

4、建立Channel

//4.创建Channel
Channel channel = connection.createChannel();

5、创建队列:

//5.创建队列Queue  
// 如果没有一个名字叫 hello_comsumer 的队列,则会创建一个  
channel.queueDeclare("hello_comsumer",true,false,false,null);

队列创建有很多带参数构造器 进入到所在的类中,点击此处可以查看源码 image.png

查看开发者的注释信息 image.png

queue:队列名称 durable:是否持久化 exclusive: exclusive 为 true 时。只能有一个消费者监听该队列。 且Connection关闭时,会删除队列 autodelete:为 true 时,当没有Consumer时,会自动删除掉。 arguments:其他参数

6、发送消息:

//6.发送消息  
String msg = "hello comsumer!";  
channel.basicPublish("","hello_comsumer",null,msg.getBytes());

使用Channel类中的 用同样的方法查看源码中的参数

image.png exchange:交换机名称。简单模式下交换机会使用默认的”“ routingKey:路由名称 props:配置信息 body:字节数组(发送的消息)

7、释放资源

//7.释放资源
channel.close();
connection.close();

测试生产者代码

image.png

运行程序,然后打开RabbitMQ管理台 查看服务器中的Queues,可以看到队列的信息 如果注释掉第七步释放资源的部分,在Connections和Channels中也可以看到相关内容。 image.png

查看和打开Consumer Count image.png

可以发现当前队列存在已准备消息,但还没有消费者(还没有写消费者的代码) image.png

模拟接受消息代码

1~5 同上

从第1步到第4步建立连接,代码可以复用。 不过要注意的是队列和生产者创建的队列是不同的。

6、接受消息

接收消息的实现,使用 channel 中的 basicConsume 方法

查看源码查看参数的注释 image.png queue:队列名称 autoAck:是否自动确认 callback:回调的函数,可以自动执行一些方法

需要创建一个 Consumer 对象,Consumer 是一个接口,包含一个接收 Channel 对象作为参数的 DefaultConsumer 实现类,是空的实现类。 需要重写其中的回调函数方法。 image.png

//6.接收消息  
Consumer consumer = new DefaultConsumer(channel){  
    /**  
     * 回调方法,接收到消息后,会自动执行此方法  
     * @param consumerTag 标识  
     * @param envelope 获取一些信息,交换机,路由key...  
     * @param properties 配置信息  
     * @param body 数据(内容)  
     * @throws IOException  
     */    @Override  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        System.out.println("consumerTag : "+consumerTag);  
        System.out.println("envelope.getExchange() : "+envelope.getExchange());  
        System.out.println("envelope.getRoutingKey() : "+envelope.getRoutingKey());  
        System.out.println("properties" + properties);  
        System.out.println("body : "+new String(body));  
    }  
};  
channel.basicConsume("Q2",true,consumer);

消费者不需要关闭资源

消费者代码如下

image.png

测试消费者代码

运行消费者主函数 可以看到回调函数会被调用,并得到 Q1 队列中留存的 Message 结果

image.png

同时在 RabbitMQ中可以看到Q1队列中增加了一个消费者,并处理掉了消息 image.png

小结:

使用了简单模式,Producer 生产出要发送给 Consumer 处理的消息,Queue队列就相当于一个邮箱,存放着 P -> C 的资源。

]]>
https://forelink.top/index.php/2024/09/13/5-rabbitmqjava%e5%85%a5%e9%97%a8/feed/ 0