RabbitMQ编程模型

2023-11-05

"Hello World"

在本教程的这一部分中,我们将用 Java 编写两个程序;发送单个消息的生产者和接收消息并将其打印出来的消费者。我们将忽略 Java API 中的一些细节,专注于这个非常简单的事情,以便开始。这是一个“Hello World”消息传递。

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的框是一个队列 - RabbitMQ 代表消费者保留的消息缓冲区。

                                  

发送

                                                    

我们将调用消息发布者(发送者)Send和消息消费者(接收者) Recv。发布者将连接到 RabbitMQ,发送一条消息,然后退出。

在 Send.java中,我们需要导入一些类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

设置类并命名队列:

public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
      ...
  }
}

然后我们可以创建到服务器的连接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

}

Connection对socket连接进行了抽象,并为我们处理协议版本协商、认证等工作。在这里,我们连接到本地计算机上的 RabbitMQ 节点 - 因此是 localhost。如果我们想连接到另一台机器上的节点,我们只需在此处指定其主机名或 IP 地址即可。

接下来,我们创建一个通道,这是大多数用于完成任务的 API 所在的位置。请注意,我们可以使用 try-with-resources 语句,因为ConnectionChannel都实现了java.lang.AutoCloseable。这样我们就不需要在代码中显式关闭它们。

为了发送,我们必须声明一个队列供我们发送;然后我们可以将消息发布到队列,所有这些都在 try-with-resources 语句中:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

声明队列是幂等的 - 仅当队列尚不存在时才会创建它。消息内容是一个字节数组,因此您可以在那里编码任何您喜欢的内容。

这是整个 Send.java 类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

接收

这就是我们的出版商的工作。我们的消费者监听来自 RabbitMQ 的消息,因此与发布单个消息的发布者不同,我们将保持消费者运行以监听消息并将其打印出来。

                                             

代码(在Recv.java中)与Send具有几乎相同的导入:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

我们将使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。

设置与发布者相同;我们打开一个连接和一个通道,并声明我们要从中消费的队列。请注意,这与发送发布到的队列相匹配。

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

请注意,我们也在这里声明了队列。因为我们可能会在发布者之前启动消费者,所以我们希望在尝试使用队列中的消息之前确保队列存在。

为什么我们不使用 try-with-resource 语句来自动关闭通道和连接?通过这样做,我们只需让程序继续运行,关闭所有内容,然后退出!这会很尴尬,因为我们希望进程在消费者异步侦听消息到达时保持活动状态。

我们即将通知服务器将队列中的消息传递给我们。由于它将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。这就是DeliverCallback子类的作用。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

这是整个 Recv.java 类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

Work Queues

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务并必须等待其完成。相反,我们安排稍后完成的任务。我们将 任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工作人员时,任务将在他们之间共享。

这个概念在 Web 应用程序中特别有用,因为在 Web 应用程序中不可能在较短的 HTTP 请求窗口内处理复杂的任务。

在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整图像大小或要渲染 pdf 文件,所以让我们通过使用 Thread.sleep() 函数假装我们很忙来伪造。我们将字符串中点数作为其复杂度;每个点将占一秒钟的“工作”。例如,Hello...描述的一个假任务 将需要三秒钟。

我们将稍微修改前面示例中的Send.java代码,以允许从命令行发送任意消息。该程序会将任务调度到我们的工作队列中,因此我们将其命名为 NewTask.java

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

我们旧的Recv.java程序还需要一些更改:它需要为消息正文中的每个点伪造一秒钟的工作。它将处理传递的消息并执行任务,所以我们将其称为Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

我们的假任务来模拟执行时间:

private  static  void  doWork (String task)  throws InterruptedException {
     for ( char ch: task.toCharArray()) {
         if (ch == '.' ) Thread.sleep( 1000 ); }
    }
}

默认情况下,RabbitMQ 会将每条消息按顺序发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环法。与三名或更多工人一起尝试此操作。

消息确认

执行一项任务可能需要几秒钟的时间,您可能想知道如果消费者启动一项长任务并在完成之前终止会发生什么。使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为删除。在这种情况下,如果终止一个工作线程,它刚刚处理的消息就会丢失。已发送给该特定工作人员但尚未处理的消息也会丢失。

但我们不想失去任何任务。如果一名工人死亡,我们希望将任务交付给另一名工人。

为了确保消息永远不会丢失,RabbitMQ 支持 消息确认。消费者发回确认消息,告诉 RabbitMQ 已收到并处理特定消息,并且 RabbitMQ 可以自由删除该消息。

如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或 TCP 连接丢失),RabbitMQ 将了解消息未完全处理并将重新排队。如果同时有其他消费者在线,那么它会快速将其重新传递给另一个消费者。这样你就可以确保不会丢失任何消息,即使工人偶尔会死亡。

消费者交付确认时强制执行超时(默认为 30 分钟)。这有助于检测从不确认交付的有问题(卡住)的消费者。您可以按照传送确认超时中所述增加此超时 。

默认情况下,手动消息确认处于打开状态。在前面的示例中,我们通过autoAck=true标志显式关闭它们 。当我们完成任务后,是时候将此标志设置为false并向工作人员发送适当的确认。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。要确保消息不丢失,需要做两件事:我们需要将队列和消息标记为持久的。

首先,我们需要确保队列能够在 RabbitMQ 节点重新启动后继续存在。为此,我们需要将其声明为持久的

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

尽管这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列 ,它是不持久的。RabbitMQ 不允许您使用不同的参数重新定义现有队列,并将向任何尝试执行此操作的程序返回错误。但有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如task_queue

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare更改需要应用于生产者和消费者代码。

此时我们就可以确定,即使RabbitMQ重启, task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
关于消息持久化的注意事项

将消息标记为持久并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 已接受消息但尚未保存的时间窗口仍然很短。此外,RabbitMQ 不会对每条消息执行fsync(2) —— 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证并不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用 publisher recognizes

公平调度

您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两名工作人员的情况下,当所有奇数消息都很重而偶数消息都很轻时,一名工作人员将一直忙碌,而另一名工作人员几乎不会做任何工作。好吧,RabbitMQ 对此一无所知,并且仍然会均匀地分发消息。

发生这种情况是因为 RabbitMQ 只是在消息进入队列时才调度该消息。它不会查看消费者未确认消息的数量。它只是盲目地将每条第 n 条消息分派给第 n 个消费者。

                          

为了解决这个问题,我们可以使用basicQos方法并 设置prefetchCount = 1。这告诉 RabbitMQ 不要一次给一个工作线程多于一条消息。或者,换句话说,在工作人员处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将其分派给下一个不忙的工作人员。

int prefetchCount = 1;
channel.basicQos(prefetchCount);
关于队列大小的注意事项

如果所有工作人员都很忙,您的队列可能会被填满。您需要密切关注这一点,也许添加更多的工人,或者制定其他策略。

NewTask.java类的最终代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

Publish/Subscribe

我们将向多个消费者传递消息。这种模式称为“发布/订阅”。

为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序将接收并打印它们。

在我们的日志系统中,接收程序的每个正在运行的副本都会收到消息。这样我们就能够运行一个接收器并将日志定向到磁盘;同时我们将能够运行另一个接收器并在屏幕上查看日志。

本质上,发布的日志消息将广播给所有接收者。

交流

在本教程的前面部分中,我们向队列发送消息和从队列接收消息。现在是时候介绍 Rabbit 中完整的消息传递模型了。

让我们快速回顾一下之前教程中介绍的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。

相反,生产者只能将消息发送到交换器。交换是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,它将消息推送到队列。交换机必须确切地知道如何处理它收到的消息。是否应该将其附加到特定队列?是否应该将其附加到许多队列中?或者应该将其丢弃。其规则由 交换类型定义。

                                       

有几种可用的交换类型:directtopicheaders 和fanout。我们将重点关注最后一个——扇出。让我们创建一个这种类型的交换,并将其称为日志

channel.exchangeDeclare("logs", "fanout");

临时排队

您可能还记得之前我们使用具有特定名称的队列(还记得hellotask_queue吗?)。能够命名队列对我们来说至关重要——我们需要将工作人员指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名非常重要。

但我们的记录器并非如此。我们希望了解所有日志消息,而不仅仅是其中的一部分。我们也只对当前流动的消息感兴趣,而不是旧的消息。为了解决这个问题,我们需要两件事。

首先,每当我们连接到 Rabbit 时,我们都需要一个新鲜的空队列。为此,我们可以创建一个具有随机名称的队列,或者更好 - 让服务器为我们选择一个随机队列名称。

其次,一旦我们断开消费者的连接,队列应该被自动删除。

在 Java 客户端中,当我们不向queueDeclare()提供任何参数时 ,我们会创建一个具有生成名称的非持久、独占、自动删除队列:

String queueName = channel.queueDeclare().getQueue();

绑定

                                   

我们已经创建了扇出交换和队列。现在我们需要告诉交换器将消息发送到我们的队列。交换器和队列之间的关系称为绑定

channel.queueBind(queueName, "logs", "");

从现在开始,日志交换会将消息附加到我们的队列中。

                                      

发出日志消息的生产者程序看起来与之前的教程没有太大不同。最重要的变化是我们现在想要将消息发布到我们的日志交换而不是无名的交换。发送时我们需要提供routingKey ,但对于扇出交换,它的值将被忽略。这是EmitLog.java程序的代码 :

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

如果还没有队列绑定到交换器,消息将会丢失,但这对我们来说没关系;如果还没有消费者在监听,我们可以安全地丢弃该消息。

ReceiveLogs.java的代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

Routing

 我们将使其能够仅订阅消息的子集。例如,我们将能够仅将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

  在前面的示例中,我们已经创建了绑定。您可能还记得这样的代码:

Channel.queueBind(queueName, EXCHANGE_NAME, "" );

绑定是交换器和队列之间的关系。这可以简单地理解为:队列对来自此交换的消息感兴趣。

绑定可以采用额外的routingKey参数。为了避免与basic_publish参数混淆,我们将其称为 绑定键。这就是我们如何创建带有键的绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black" );

绑定密钥的含义取决于交换类型。我们之前使用的扇出交换完全忽略了它的价值 

直接兑换

上一篇教程中的日志系统将所有消息广播给所有消费者。我们希望扩展它以允许根据消息的严重性过滤消息。例如,我们可能希望一个将日志消息写入磁盘的程序仅接收关键错误,而不是在警告或信息日志消息上浪费磁盘空间。

我们使用的是扇出交换,这并没有给我们带来太大的灵活性——它只能进行无意识的广播。

我们将改用直接交换。直接交换背后的路由算法很简单 - 消息进入其 绑定键与消息的路由键完全匹配的队列。

为了说明这一点,请考虑以下设置:

                                  

在此设置中,我们可以看到直接交换器X绑定了两个队列。第一个队列使用绑定键Orange进行绑定,第二个队列有两个绑定,一个使用绑定键black,另一个使用green

在这样的设置中,使用路由键橙色发布到交换器的消息 将被路由到队列Q1。路由键为黑色 或绿色的消息将发送到Q2。所有其他消息将被丢弃。

多重绑定

                        

使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用绑定键black在XQ1之间添加绑定。在这种情况下,直接交换的行为将类似于扇出,并将消息广播到所有匹配的队列。带有路由密钥black 的消息将被传递到 Q1Q2

发出日志

我们将在我们的日志系统中使用这个模型。我们将把消息发送到直接交换器,而不是扇出。我们将提供日志严重性作为路由键。这样接收程序将能够选择它想要接收的严重性。让我们首先关注发出日志。

与往常一样,我们需要首先创建一个交换:

channel.exchangeDeclare(EXCHANGE_NAME, "direct" );

我们准备发送一条消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化事情,我们假设“严重性”可以是“信息”、“警告”、“错误”之一。

订阅中

接收消息的工作方式与上一篇教程类似,但有一个例外 - 我们将为我们感兴趣的每个严重性创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

                        

The code for EmitLogDirect.java class:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

The code for ReceiveLogsDirect.java:

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

Topics

话题交流

发送到主题交换的消息不能有任意的 routing_key - 它必须是一个由点分隔的单词列表。这些单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由密钥中可以有任意多个单词,最多 255 个字节。

绑定密钥也必须采用相同的形式。主题交换背后的逻辑 与直接交换类似- 使用特定路由键发送的消息将被传递到与匹配的绑定键绑定的所有队列。然而,绑定键有两种重要的特殊情况:

  • *(星号)可以恰好替代一个单词。
  • #(散列)可以替代零个或多个单词。

通过一个例子来解释这一点是最简单的:

                                      

在此示例中,我们将发送所有描述动物的消息。消息将使用由三个单词(两个点)组成的路由密钥发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:“ <speed>.<colour>.<species> ”。

我们创建了三个绑定:Q1 与绑定键“ *.orange.* ”绑定,Q2 与“ *.*.rabbit ”和“ lazy.# ”绑定。

这些绑定可以概括为:

  • Q1 对所有橙色动物都感兴趣。
  • Q2 想听听关于兔子的一切,以及关于懒惰动物的一切。

路由键设置为“ quick.orange.rabbit ”的消息将被传递到两个队列。消息“ lazy.orange.elephant ”也将发送给他们两人。另一方面,“ quick.orange.fox ”只会进入第一个队列,而“ lazy.brown.fox ”只会进入第二个队列。“ lazy.pink.rabbit ”只会被传递到第二个队列一次,即使它匹配两个绑定。“ quick.brown.fox ”不匹配任何绑定,因此它将被丢弃。

如果我们违反合同并发送包含一到四个单词(例如“ orange ”或“ quick.orange.new.rabbit ” )的消息,会发生什么情况?那么,这些消息不会与任何绑定匹配,并且将会丢失。

另一方面,“ lazy.orange.new.rabbit ”,即使它有四个单词,也会匹配最后一个绑定,并将被传递到第二个队列。

The code for EmitLogTopic.java:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String routingKey = getRouting(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    }
  }
  //..
}

ReceiveLogsTopic.java的代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogsTopic {

  private static final String EXCHANGE_NAME = "topic_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
        System.exit(1);
    }

    for (String bindingKey : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    }

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

RPC

Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束。

这里生产者作为客户端来调用,消费者作为服务端接收请求然后响应给生产者。
1、同步调用
1.1、绑定队列

@Configuration public class RPCRabbitConfig {   
@Bean   
public Queue RPCQueue() {      
 return new Queue("RPCQueue", true, false, false);   
} ​   
@Bean   
public DirectExchange RPCExchange() {       
return new DirectExchange("RPCExchange", true, false);  
 } ​  
 @Bean   
public Binding bindingRPC() {      
 return BindingBuilder.bind(RPCQueue()).to(RPCExchange()).with("RPC");   
} 
}

1.2、消费者(服务端)
@Component @RabbitListener(queues = "RPCQueue") @Slf4j public class RPCReceiver {   @RabbitHandler   public String process(String message) {       log.info("接收远程调用请求消息:[{}]", message);       return "remote procedure call success!";   } }

1.3、生产者(客户端)
``` @RestController @Slf4j public class RPCController { @Autowired private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
    // 同步调用设置远程调用响应超时时间,单位:毫秒
    rabbitTemplate.setReplyTimeout(60000);
}
 
@PostMapping("/syncRPC")
public String syncRPC() {
    Object response = rabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC同步调用");
    String respMsg = response.toString();
    log.info("远程调用响应:[{}]", respMsg);
    return respMsg;
}
} ```

可以通过setReplyTimeout(long milliseconds)函数设置超时时间。

1.4、结果
接收远程调用请求消息:[RPC同步调用] 远程调用响应:[remote procedure call success!]

2、异步调用
2.1、配置Bean
/** * 配置AsyncRabbitTemplate SpringBoot 没有默认的AsyncRabbitTemplate注入,所以这里需要自己配置 * * @param rabbitTemplate * @return */ @Bean public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) { return new AsyncRabbitTemplate(rabbitTemplate); }2.2、生产者(客户端)

 @RestController @Slf4j public class RPCController { @Autowired private AsyncRabbitTemplate asyncRabbitTemplate;

@PostMapping("/asyncRPC")
public String asyncRPC() {
    AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("RPCExchange", "RPC", "RPC异步调用");
    future.addCallback(new ListenableFutureCallback<Object>() {
        @Override
        public void onFailure(Throwable throwable) {
            log.error("异步调用失败", throwable);
        }
 
        @Override
        public void onSuccess(Object o) {
            log.info("异步调用响应:[{}}", o.toString());
        }
    });
    return "ok";
}

} 

2.3、结果
SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-nHw71SucAmOUHb6hGVjaZA identity=5fbed23f] started 接收远程调用请求消息:[RPC异步调用] 异步调用响应:[remote procedure call success!}
 

Publisher Confirms 发送者消息确认

RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了 MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是 没有保证的。我们可以回顾下,发送者发送消息的基础API: Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功, 应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者 提供一些确认机制,来保证这个消息发送的过程是成功的
发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动 在channel中进行声明。
channel . confirmSelect ();
在官网的示例中,重点解释了三种策略:

1、发布单条消息

即发布一条消息就确认一条消息。核心代码:
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
channel.basicPublish("", queue, null, body.getBytes());
channel.waitForConfirmsOrDie(5_000);
}

channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待 RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务 端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能 再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。
官方说明了,其实channel底层是异步工作的,会将channel阻塞住,然 后异步等待服务端发送一个确认消息,才解除阻塞。但是我们在使用 时,可以把他当作一个同步工具来看待。
然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异 常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息, 但是尝试重发时一定要注意不要使程序陷入死循环。

2、发送批量消息

之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方 式就是发送一批消息后,再一起确认。
核心代码:
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问 题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认 具体是哪一条消息出了问题。所以接下来就需要增加一个机制能够具体对每一条发 送出错的消息进行处理。

3、异步确认消息

实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。
核心代码就是一个:
channel . addConfirmListener ( ConfirmCallback var1 , ConfirmCallback var2 );
按说监听只要注册一个就可以了,那为什么这里要注册两个呢?如果对照下 RocketMQ的事务消息机制,这就很容易理解了。发送者在发送完消息后,就会执 行第一个监听器callback1,然后等服务端发过来的反馈后,再执行第二个监听器 callback2。
然后关于这个ConfirmCallback,这是个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法 中的两个参数,
sequenceNumer:这个是一个唯一的序列号,代表一个唯一的消息。在 RabbitMQ中,他的消息体只是一个二进制数组,并不像RocketMQ一样有一个 封装的对象,所以默认消息是没有序列号的。而RabbitMQ提供了一个方法int sequenceNumber = channel.getNextPublishSeqNo());来生成一个全局 递增的序列号。然后应用程序需要自己来将这个序列号与消息对应起来。没错! 是的!需要客户端自己去做对应!
multiple:这个是一个Boolean型的参数。如果是true,就表示这一次只确认了 当前一条消息。如果是false,就表示RabbitMQ这一次确认了一批消息,在 sequenceNumber之前的所有消息都已经确认完成了。
对比下RocketMQ的事务消息机制,有没有觉得很熟悉,但是又很别 扭?当然,考虑到这个对于RabbitMQ来说还是个新鲜玩意,所以有理 由相信这个机制在未来会越来越完善。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ编程模型 的相关文章

  • 使用spring-amqp和rabbitmq实现带退避的非阻塞重试

    我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法 但要求是侦听器不应被阻止 因此可以自由地处理其他消息 我在这里看到了类似的问题 但它不包括 后退 的解决方案 RabbitMQ 和 Sprin
  • 为什么 Celery 工作人员给出“OSError:套接字已关闭”

    我的与rabbitMQ一起工作的celery工作人员在工作几分钟后不断给我一个套接字错误 见下文 我想知道问题的主要原因是什么 我认为这可能是防火墙 但是 禁用防火墙并没有解决问题 我正在 Windows 10 机器上工作 C Users
  • 如何覆盖 MassTransit 默认交换和队列拓扑约定?

    正如 在我关于SO的一个问题中 所指出的 为什么 MassTransit 中的简单配置会创建 2 个队列和 3 个交换机 https stackoverflow com questions 56064182 why a simple con
  • 如何在 celery 内为每个用户生成队列?

    因此 我尝试将 Web 请求中的阻塞内容移至后台任务并利用队列 我对消息传递和发布 订阅也很陌生 用户将数据推送到那里并进行处理 稍后用户会收到相关通知 我为此做了一个 celery 设置 发现它不能满足我为每个用户分配自己的任务的专用队列
  • 在点网核心应用程序中使用 RabbitMQ 跳过 MassTransit 中的队列

    我有三个项目 一个是Dot net core MVC 两个是API项目 MVC 正在调用一个 API 来获取用户详细信息 当询问用户详细信息时 我通过 MassTransit 向队列发送消息 我看到跳过队列 第三个项目中有消费者 即API项
  • 如何禁用 RabbitMQ 默认 tcp 监听端口 - 5672

    我已经配置了RabbitMQrabbitmq config具有新端口号的文件 即带有 SSL 的 5671 现在我想禁用默认端口 即 5672 配置文件如下 rabbit ssl listeners 5671 ssl options cac
  • 当我为rabbitmq-management创建用户时,发生了错误

    当我为rabbitmq创建用户时 root localhost rabbitmqctl add user admin admin 发生错误 消息 Creating user admin Error undef crypto hash sha
  • 使用AWS SQS作为Aurora数据库的写入队列来提高系统性能是否有效

    我正在 AWS 上开发一个 Web 应用程序服务器 需要支持高吞吐量的读写 我的老板给了我这样的高级设计 我被困在 写入队列 上 团队告诉我 我们需要它来提高写入性能 因为我们只能有 1 个可以写入的主副本 我对 SQS 和 RabbitM
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • rabbitmq 通道因 PRECONDITION_FAILED 关闭 - 快速回复消费者不存在

    当我们从 Spring Boot 服务向rabbitmq 发布消息时 出现以下错误 而且这是间歇性的 我们无法重现这一点 AMQP 连接 123 11 xxx xx 5672 错误 org springframework amqp rabb
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • 如何使用 Celery、RabbitMQ 和 Django 确保每个用户的任务执行顺序?

    我正在运行 Django Celery 和 RabbitMQ 我想要实现的是确保与一个用户相关的任务按顺序执行 具体来说 一次执行一个 我不希望每个用户执行任务并发 每当为用户添加新任务时 它应该取决于最近添加的任务 如果此类型的任务已为此
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • 更改 RabbitMQ 队列中的参数

    我有一个 RabbitMQ 队列 最初声明如下 var result channel QueueDeclare NewQueue true false false null 我正在尝试添加死信交换 因此我将代码更改为 channel Exc
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • RabbitMQ - 如何死信/处理过期队列中的消息?

    我有一个队列x expires放 我遇到的问题是我需要对队列中的消息进行进一步处理IF队列过期 我最初的想法是设置x dead letter exchange在队列中 但是 当队列过期时 消息就会消失而不会进入死信交换 如何处理死信或以其他
  • Amazon EC2 实例上和本地的 RabbitMQ?

    是否可以设置一个RabbitMQ服务器上的Amazon EC2 instance 并将我办公室的机器连接到此RabbitMQ服务器并向其发送 接收消息 我会被收取费用吗Amazon对于流入 流出我的带宽 消息RabbitMQ EC2 ins
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • RabbitMQ 3.1.3 和丢失的时间戳头

    如果消息中缺少时间戳头 是否可以将代理配置为插入时间戳头 因此 如果发布客户端没有添加时间戳标头 代理是否可以插入与交易所收到消息的时刻相匹配的时间戳值 我应该在哪里寻找该配置 或者这是一个坏主意 截至2015年 原来的问题有了新的答案 这
  • 公共交通错误队列正在消耗,但仍然不为空

    我正在使用 Mastransit 3 5 0 和 RabbitMq 如果队列消费者抛出异常 则默认由 MoveExceptionToTransportFilter 处理异常并移至 error 队列 对于 error 队列 我有单独的消费者

随机推荐

  • ListView与适配器

    ListView与适配器 ListView是什么 ListView是一个以垂直方式在项目中显示视图的列表 即在一个窗口里可以滚动查看数据 比如说查看聊天记录 是一种不能实现确定视图中的内容的适配器视图 adapter view 数据和视图的
  • $nextTick实现原理详解

    vue中有一个较为特殊的API nextTick 根据官方文档的解释 它可以在DOM更新完毕之后执行一个回调 用法如下 修改数据 vm msg Hello DOM 还没有更新 Vue nextTick function DOM 更新了 复制
  • 5 个免费开源的 3D 建模/渲染工具。

    5 个开源 3D 建模 渲染工具 3八 2011 作者 riku 本文采用 CC BY NC SA 2 5协议授权 转载请注明 本文链接 5 个免费开源的 3D 建模 渲染工具 1 Art of Illusion 跨平台 支持 Window
  • 15000cd是多少流明_光通量(lm)发光强度(cd)照度单位(lux)之间的关系

    光通量 lm 发光强度 cd 照度单位 lux 之间的关系 光通量 lm 由于人眼对不同波长的电磁波具有不同的灵敏度 我们不能直接用光源的辐 射功率或辐射通量来衡量光能量 必须采用以人眼对光的感觉量为基准的单位 光通量来衡量 光通量的用符号
  • SetUnhandledExceptionFilter处理未捕获异常

    一 首先看下百度上的对此函数的解释 设置异常捕获函数 当异常没有处理的时候 系统就会调用SetUnhandledExceptionFilter所设置异常处理函数 例如一些程序在出错的时候 会向用户报告说程序那出错就是利用这个 例如QQ 二
  • github时好时坏连接不上的问题

    1 找到自己的hosts文件 直接百度 hosts文件地址 一般都是C Windows System32 drivers etc 2 用ip在线查询工具查询github网站的ip地址 3 用记事本打开hosts文件 如图添加内容 我下载有的
  • 【Python】本地版 Whisper 自动转录器(附源码网址)

    目 录 一 实时自动语音转录器简介 二 开源Whisper实时转录器 三 pyinstaller 打包发布exe应用程序 四 修改版源代码 一 实时自动语音转录器简介 实时自动语音转录器是一种能够自动将语音信号转换为文字的应用程序 它通常具
  • 服务器被攻击怎么办?如何防止服务器被攻击?

    目前 服务器遭受攻击已经成为屡见不鲜的事情了 而且大部分企业都发生过服务器被攻击的情况 从而导致业务无法正常运行 造成严重的损失和影响 那么服务器被攻击怎么办 如何有效应对服务器被攻击呢 跟着小编来看看吧 1 换高防IP或切换高防服务器 流
  • 【华为OD机试真题 Java】创建二叉树

    前言 本专栏将持续更新华为OD机试题目 并进行详细的分析与解答 包含完整的代码实现 希望可以帮助到正在努力的你 关于OD机试流程 面经 面试指导等 如有任何疑问 欢迎联系我 wechat steven moda email nansun09
  • Binder机制详解(二)

    系列章节 Binder机制详解 一 Binder机制详解 三 文章目录 前言 一 什么是MMU 二 发展历史 三 相关概念 四 分页机制 1 页表的概念 2 页式内存管理 总结 前言 上一章通过一个例子让我们认识了Binder通信机制不同于
  • HbuilderX微信小程序uniapp分包小白教程&趟坑【伸手党福利】【干货】

    本教程为小白教程 主管操作 具体原理讲解欢迎评论区补充 微信小程序分包原因 1 多人开发 2 引入了大型js 3 单项目多模块需要分包 官方资料 https developers weixin qq com miniprogram dev
  • 扫描指定路径下有多少行代码

    import java io BufferedReader import java io File import java io FileReader import java io IOException Created by qiaoju
  • 使用蓝牙耳机听群晖ds218play中的音乐(audio station)

    缘起 有时需要欣赏nas中的音乐而又不影响家人 有什么方法呢 思路 研究了一下 发现新版的群晖dms支持蓝牙usb蓝牙适配器 可以使用audio station播放 蓝牙耳机收听 步骤 1 购买CSR USB蓝牙适配器 2 插入ds218p
  • 大数据CDC技术

    1 简介 CDC全称是Change Data Capture 是一种捕获增量数据的技术统称 目前主要应用在捕获数据库数据变更的技术 其中数据库变更包括DDL DML DCL等语句触发的变更 在数据备份容灾 数据分发 面向数仓的数据集成等场景
  • JavaScript实现WebService的http的Post请求

    javascript 这个脚本实现Webservice调用 function AjaxFunc var url http localhost MyService Service asmx var method DollarConvertTo
  • 使用Jmeter做压力测试,参数化

    1 首先在工作台下添加一个线程组 测试计划右键 添加 线程 用户 线程组 根据需求填写线程组信息 根据测试数据量填写 线程数也就是并发数 下面的调度时间代表规定的时间内完成并发 2 添加HTTP请求 在线程组下右键 添加 取样器 HTTP请
  • 微信小程序image组件的mode总结+介绍(包含heightFix)

    2 10 3版本后 微信小程序的图片即image组件新增了heightFix属性 mode 总共具有14种属性 满足各种情况的放置需要 14种属性可以分为两大类 一种是完全保留的缩放属性 一种是裁剪属性 原图 缩放属性 scaleToFil
  • 常见的List接口的实现类

    常见的List接口的实现类 ArrayList 数组实现 查询快 增删慢 轻量级 线程不安全 LinkedList 双向链表实现 增删快 查询慢 线程不安全 Vector 数组实现 重量级 线程安全 使用少 ArrayList实现类 pub
  • cesium-添加点线面可以动可编辑

    使用 const drawEntities new CesiumEntityDraw viewer drawEntities startDraw 需要绘制的类型 CesiumEntityDraw ts文件 import Cesium fro
  • RabbitMQ编程模型

    Hello World 在本教程的这一部分中 我们将用 Java 编写两个程序 发送单个消息的生产者和接收消息并将其打印出来的消费者 我们将忽略 Java API 中的一些细节 专注于这个非常简单的事情 以便开始 这是一个 Hello Wo