专注Java教育14年 全国咨询/投诉热线:400-8080-105
动力节点LOGO图
始于2009,口口相传的Java黄埔军校
首页 hot资讯 Rabbitmq消息中间件视频,入门学习

Rabbitmq消息中间件视频,入门学习

更新时间:2021-01-25 16:16:59 来源:动力节点 浏览1159次

RabbitMQ是使用Erlang语言开发的消息中间件,其遵循了高级消息队列协议(Advanced Message Queuing Protocol,AMQP)。

 

Rabbitmq消息中间件视频

 

与Kafka等消息队列相比,RabbitMQ最大的优势在于其较高的可靠性:

 

提供确认(ACK)和重传机制保证消息完成消费,消费者异常不会导致消息丢失

 

提供消息持久化机制,broker崩溃不会导致消息丢失

 

集群模式下工作,保证高可用

 

因为具有较高可靠性和一致性,RabbitMQ可以胜任订单处理、秒杀等一致性要求较高的业务场景。

 

RabbitMQ概念与机制

 

RabbitMQ中的概念模型:

 

Broker:消息中间件实例,可能是单个节点也可能是运行在多节点集群上的逻辑实体

 

消息(Message):消息由消息头和消息体两部分组成。消息头中包括routing-key、priority等标准消息头以及其它自定义消息头,用于定义RabbitMQ对消息行为。消息体是字节流,包含消息内容。

 

连接(Connection):客户端与Broker之间的TCP连接

 

信道(Channel):Channel是建立在TCP连接上的逻辑(虚拟)连接。多个Channel复用同一个TCP连接,以避免建立TCP连接的巨大开销。RabbitMQ官方要求每个线程使用独立的Channel,禁止多个线程共用Channel。

 

生产者(Publisher):发送消息的客户端线程

 

消费者(Consumer):处理消息的客户端线程

 

交换机(Exchange):交换机负责将消息投递到相应的队列

 

队列(Queue):接收并保存交换机投递的消息,直至被消费者成功消费。逻辑结构遵循先进先出FIFO。

 

绑定(Binding):将队列(Queue)注册到交换机(Exchange)的路由表

 

虚拟主机(Vhost):每个Broker下可建立多个vhost,每个vhost可建立独立的Exchange、Queue、绑定及权限系统。同一个Broker下的vhost共享Connection、Channel和用户系统,就是说可以使用同一个用户身份使用同一个Channel访问不同vhost。

 

交换机(Exchange)

 

生产者发送的消息会首先送到交换机(Exchange),交换机根据自身类型和消息的routing-key等信息将消息投递到绑定的消息队列中。

 

RabbitMQ中的四种标准交换机:

 

direct:如果消息的routing-key与队列的binding-key完全相同,direct类型的交换机则会将消息投递到该队列中。

 

多个队列可以使用相同的binding-key绑定到同一个direct交换机,direct交换机会把消息投递到所有binding-key与消息routing-key相同的队列

 

topic:允许队列的binding-key中包含通配符*和#,topic交换机会将消息投递到binding-key与routing-key匹配的队列中。

 

通配符按照关键字进行匹配,如news.cn.a中的关键字是news、cn和a,即关键字按照.分割

 

#通配符匹配0个或多个关键字,news.#.a可以匹配news.a,news.cn.a和news.asia.cn.a等

 

*通配符匹配一个关键字,news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a

 

fanout:fanout交换机不进行任何匹配,将消息投递到所有绑定的队列

 

header:header交换机根据消息头进行投递,现在已较少使用

 

我们可以使用RabbitMQ的插件机制使用第三方交换机或自行开发交换机。如实现延时投递的delayed-message-exchange。

 

消息头中的delivery-mode可以设置为persistent(持久化)或者transient(易失)。Exchange和Queue在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理,即使RabbitMQ崩溃也不会丢失。

 

消费者客户端通常使用的channel.basicConsume使用推(push)模式投递消息,即当有新消息时Broker通过channel主动向客户端发送消息。客户端也可以使用channel.basicGet从Broker拉取消息。

 

ACK机制

 

RabbitMQ提供了确认送达(acknowledge)机制保证消息被正确处理不会丢失。

 

确认送达的回执有三种:

 

ACK:消息已被成功处理

 

NACK:消息处理异常,需要重新投递

 

REJECT:消息非法,丢弃消息

 

RabbitMQ的Queue可以设置no_ack=true,则消息被投递后即删除不等待回执。

 

channel.basicConsume可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ACK回执,否则必须显式的发出回执。

 

Java代码示例

 

首先安装并启动RabbitMQ实例,Mac用户可以使用Homebrew进行安装:

 

brew install rabbitmq

 

启动服务:

 

brew services start rabbitmq

 

或者使用官方docker镜像:

 

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management

 

RabbitMQ官网提供了Ubuntu、RPM以及Windows等多种平台安装方式。

 

RabbitMQ默认TCP端口为5672,Web控制台默认端口15672。

 

在Maven中添加依赖:

 

<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>5.5.1</version>
</dependency>

 

编写生产者:

 

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author finley
 */
public class RabbitProducer {

 public static void main(String[] args) throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setUsername("guest");
  factory.setPassword("guest");
  factory.setHost("localhost");
  try (Connection conn = factory.newConnection();
    Channel channel = conn.createChannel()) {
   String exchangeName = "test-exchange";
   channel.exchangeDeclare(exchangeName, "direct", true);

   String routingKey = "hello";

   byte[] msg = "hello world".getBytes();
   AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
   propsBuilder.deliveryMode(2); // persistent
   propsBuilder.priority(0); // normal
   propsBuilder.contentType("text/plain");
   channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);
  }
 }
}

 

编写消费者:

 

package rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * @author finley
 */
public class RabbitConsumer {

 public static void main(String[] args) throws IOException, TimeoutException {
  ConnectionFactory factory = new ConnectionFactory();
  factory.setUsername("guest");
  factory.setPassword("guest");
  factory.setHost("localhost");
  try (Connection conn = factory.newConnection();
    Channel channel = conn.createChannel()) {
   String exchangeName = "test-exchange";
   channel.exchangeDeclare(exchangeName, "direct", true);

   String queueName = channel.queueDeclare().getQueue();
   String bindingKey = "hello";
   channel.queueBind(queueName, exchangeName, bindingKey);

   while(true) {
    channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      String routingKey = envelope.getRoutingKey();
      String contentType = properties.getContentType();
      String bodyStr = new String(body, "UTF-8");
      System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);
      long deliveryTag = envelope.getDeliveryTag();
      channel.basicAck(deliveryTag, false);
     }
    });
   }
  }
 }

}

 

RabbitMQ是流行的开源消息队列系统,用erlang语言开发,RabbitMQ是AMQP(高级消息队列协议)的标准实现。采用该技术,我们可以实现异步处理、流量削峰、系统解耦;动力节点RabbitMQ视频教程,课程将讲授RabbitMQ的环境搭建、消息的发送与接收、消息确认、与SpringBoot集成等,让大家快速掌握RabbitMQ技术,以适应项目开发的需要;

 

Rabbitmq消息中间件视频

 

以上就是动力节点Java培训机构的小编针对“Rabbitmq消息中间件视频,入门学习”的内容进行的回答,希望对大家有所帮助,如有疑问,请在线咨询,有专业老师随时为你服务。

 

提交申请后,顾问老师会电话与您沟通安排学习

免费课程推荐 >>
技术文档推荐 >>