diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 738775c..f94c526 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -14,7 +14,7 @@ spring: port: 5672 username: chenxinzhi password: yuanteng - virtual-host: vhost1 + virtual-host: vhost listener: simple: prefetch: 1 diff --git a/src/test/java/com/cultural/heritage/rabbit/Consumer.java b/src/test/java/com/cultural/heritage/rabbit/Consumer.java index b607c1e..d3e1536 100644 --- a/src/test/java/com/cultural/heritage/rabbit/Consumer.java +++ b/src/test/java/com/cultural/heritage/rabbit/Consumer.java @@ -1,44 +1,44 @@ -package com.cultural.heritage.rabbit; - - -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -import java.util.Date; - -@Component -public class Consumer { - - @RabbitListener(queues = "myQueue") - public void receive(String message) { - System.out.println("接收时间:" + new Date()); - System.out.println("消费者收到消息:" + message); - } - -// @RabbitListener(queues = "myQueue1") -// public void receive2(String message) { -// System.out.println("消费者接收消息:" + message); -// } - -// @RabbitListener(queues = "myQueue1") -// public void receive1(String productId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long msgId) throws IOException { -// System.out.println(productId + "开始减少库存"); -// if (productId.equals("产品编号3")) { -// int num = 10 / 0; -// } -// // 手动应答 -// long deliveryTag = msgId; -// channel.basicAck(deliveryTag, false); -// System.out.println(productId + "减少库存成功!"); -// } - -// @RabbitListener(queues = "myQueue2") -// public void receive2(String message) { -// System.out.println("消费者2接收消息:" + message); +//package com.cultural.heritage.rabbit; +// +// +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.stereotype.Component; +// +//import java.util.Date; +// +//@Component +//public class Consumer { +// +// @RabbitListener(queues = "myQueue") +// public void receive(String message) { +// System.out.println("接收时间:" + new Date()); +// System.out.println("消费者收到消息:" + message); // } // -// @RabbitListener(queues = "myQueue3") -// public void receive3(String message) { -// System.out.println("消费者3接收消息:" + message); -// } -} +//// @RabbitListener(queues = "myQueue1") +//// public void receive2(String message) { +//// System.out.println("消费者接收消息:" + message); +//// } +// +//// @RabbitListener(queues = "myQueue1") +//// public void receive1(String productId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long msgId) throws IOException { +//// System.out.println(productId + "开始减少库存"); +//// if (productId.equals("产品编号3")) { +//// int num = 10 / 0; +//// } +//// // 手动应答 +//// long deliveryTag = msgId; +//// channel.basicAck(deliveryTag, false); +//// System.out.println(productId + "减少库存成功!"); +//// } +// +//// @RabbitListener(queues = "myQueue2") +//// public void receive2(String message) { +//// System.out.println("消费者2接收消息:" + message); +//// } +//// +//// @RabbitListener(queues = "myQueue3") +//// public void receive3(String message) { +//// System.out.println("消费者3接收消息:" + message); +//// } +//} diff --git a/src/test/java/com/cultural/heritage/rabbit/Producer.java b/src/test/java/com/cultural/heritage/rabbit/Producer.java index 55e14c1..4b48380 100644 --- a/src/test/java/com/cultural/heritage/rabbit/Producer.java +++ b/src/test/java/com/cultural/heritage/rabbit/Producer.java @@ -1,47 +1,47 @@ -package com.cultural.heritage.rabbit; - -import jakarta.annotation.Resource; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.core.MessageBuilder; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.stereotype.Component; - -import java.util.Date; - -@Component -public class Producer { - - @Resource - private RabbitTemplate rabbitTemplate; - - private String exchange = "myExchange"; - - - public void send(String routingKey, String message) { - Message msg = MessageBuilder - .withBody(message.getBytes()) - .setHeader("x-delay", 5000) - .build(); - System.out.println("发送时间:" + new Date()); - rabbitTemplate.convertAndSend(exchange, routingKey, msg); - } - - -// public void send(String routingKey, String message, MessageDeliveryMode mode) { +//package com.cultural.heritage.rabbit; +// +//import jakarta.annotation.Resource; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.core.MessageBuilder; +//import org.springframework.amqp.rabbit.core.RabbitTemplate; +//import org.springframework.stereotype.Component; +// +//import java.util.Date; +// +//@Component +//public class Producer { +// +// @Resource +// private RabbitTemplate rabbitTemplate; +// +// private String exchange = "myExchange"; +// +// +// public void send(String routingKey, String message) { // Message msg = MessageBuilder -// .withBody(message.getBytes()) -// .setDeliveryMode(mode) // 指定消息是否持久化 -// .build(); +// .withBody(message.getBytes()) +// .setHeader("x-delay", 5000) +// .build(); +// System.out.println("发送时间:" + new Date()); // rabbitTemplate.convertAndSend(exchange, routingKey, msg); // } - -// public void send(String message, String routingKey) { // -// CorrelationData correlationData = new CorrelationData(); -// correlationData.setId("101"); //UUID.randomUUID().toString() -// rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData); // -// //rabbitTemplate.convertAndSend(exchangeName, routingKey, message); -// } - -} +//// public void send(String routingKey, String message, MessageDeliveryMode mode) { +//// Message msg = MessageBuilder +//// .withBody(message.getBytes()) +//// .setDeliveryMode(mode) // 指定消息是否持久化 +//// .build(); +//// rabbitTemplate.convertAndSend(exchange, routingKey, msg); +//// } +// +//// public void send(String message, String routingKey) { +//// +//// CorrelationData correlationData = new CorrelationData(); +//// correlationData.setId("101"); //UUID.randomUUID().toString() +//// rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData); +//// +//// //rabbitTemplate.convertAndSend(exchangeName, routingKey, message); +//// } +// +//} diff --git a/src/test/java/com/cultural/heritage/rabbit/TestRabbit.java b/src/test/java/com/cultural/heritage/rabbit/TestRabbit.java index cf1d53c..31c648e 100644 --- a/src/test/java/com/cultural/heritage/rabbit/TestRabbit.java +++ b/src/test/java/com/cultural/heritage/rabbit/TestRabbit.java @@ -1,27 +1,27 @@ -package com.cultural.heritage.rabbit; - -import jakarta.annotation.Resource; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -@SpringBootTest -@RunWith(SpringRunner.class) -public class TestRabbit { - - @Resource - private Producer producer; - - @Test - public void test() { - String routingKey = "myKey"; - String message = "Hello, RabbitMQ"; - producer.send(routingKey, message); - try { - Thread.sleep(30 * 1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} +//package com.cultural.heritage.rabbit; +// +//import jakarta.annotation.Resource; +//import org.junit.Test; +//import org.junit.runner.RunWith; +//import org.springframework.boot.test.context.SpringBootTest; +//import org.springframework.test.context.junit4.SpringRunner; +// +//@SpringBootTest +//@RunWith(SpringRunner.class) +//public class TestRabbit { +// +// @Resource +// private Producer producer; +// +// @Test +// public void test() { +// String routingKey = "myKey"; +// String message = "Hello, RabbitMQ"; +// producer.send(routingKey, message); +// try { +// Thread.sleep(30 * 1000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } +//} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/AppTest.java b/src/test/java/com/cultural/heritage/rabbitmq/AppTest.java index eedcefa..6037960 100644 --- a/src/test/java/com/cultural/heritage/rabbitmq/AppTest.java +++ b/src/test/java/com/cultural/heritage/rabbitmq/AppTest.java @@ -1,94 +1,94 @@ -package com.cultural.heritage.rabbitmq; - -import com.rabbitmq.client.*; -import org.junit.Test; -import org.springframework.web.bind.annotation.RestController; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -@RestController -public class AppTest { - - // 服务器IP - private String host = "123.249.108.160"; - - // RabbitMQ端口 - private int port = 5672; - - // 用户名 - private String username = "chenxinzhi"; - - // 密码 - private String password = "yuanteng"; - - // 虚拟机名字 - private String vhost = "vhost1"; - - // 交换机名字 - private String exchangeName = "myExchange1"; - - // 队列名字 - private String queueName = "myQueue1"; - - // 路由Key - private String routingKey = "myRouting1"; - - - @Test - public void testProducer() throws IOException, TimeoutException { - - // 创建连接工厂、连接、信道 - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); - factory.setVirtualHost(vhost); - // 连接 - Connection connection = factory.newConnection(); - // 信道 - Channel channel = connection.createChannel(); - // 创建交换机:直连交换机 - channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, - true, false, null); - // 创建队列 - channel.queueDeclare(queueName, true, false, false, null); - // 交换机绑定 - channel.queueBind(queueName, exchangeName, routingKey); - // 发送消息 - String msg = "Hello, RabbitMQ"; - channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); - // 关闭 - channel.close(); - connection.close(); - - } - - - @Test - public void testConsumer() throws IOException, TimeoutException { - // 创建连接工厂、连接、信道 - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); - factory.setVirtualHost(vhost); - // 连接 - Connection connection = factory.newConnection(); - // 信道 - Channel channel = connection.createChannel(); - // 获取消息 - DeliverCallback deliverCallback = (consumerTag, message) -> { - String msg = new String(message.getBody()); - System.out.println(msg); - }; - CancelCallback cancelCallback = consumerTag -> { - - }; - channel.basicConsume(queueName, true, deliverCallback, cancelCallback); - } - - -} +//package com.cultural.heritage.rabbitmq; +// +//import com.rabbitmq.client.*; +//import org.junit.Test; +//import org.springframework.web.bind.annotation.RestController; +// +//import java.io.IOException; +//import java.util.concurrent.TimeoutException; +// +//@RestController +//public class AppTest { +// +// // 服务器IP +// private String host = "123.249.108.160"; +// +// // RabbitMQ端口 +// private int port = 5672; +// +// // 用户名 +// private String username = "chenxinzhi"; +// +// // 密码 +// private String password = "yuanteng"; +// +// // 虚拟机名字 +// private String vhost = "vhost1"; +// +// // 交换机名字 +// private String exchangeName = "myExchange1"; +// +// // 队列名字 +// private String queueName = "myQueue1"; +// +// // 路由Key +// private String routingKey = "myRouting1"; +// +// +// @Test +// public void testProducer() throws IOException, TimeoutException { +// +// // 创建连接工厂、连接、信道 +// ConnectionFactory factory = new ConnectionFactory(); +// factory.setHost(host); +// factory.setPort(port); +// factory.setUsername(username); +// factory.setPassword(password); +// factory.setVirtualHost(vhost); +// // 连接 +// Connection connection = factory.newConnection(); +// // 信道 +// Channel channel = connection.createChannel(); +// // 创建交换机:直连交换机 +// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, +// true, false, null); +// // 创建队列 +// channel.queueDeclare(queueName, true, false, false, null); +// // 交换机绑定 +// channel.queueBind(queueName, exchangeName, routingKey); +// // 发送消息 +// String msg = "Hello, RabbitMQ"; +// channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); +// // 关闭 +// channel.close(); +// connection.close(); +// +// } +// +// +// @Test +// public void testConsumer() throws IOException, TimeoutException { +// // 创建连接工厂、连接、信道 +// ConnectionFactory factory = new ConnectionFactory(); +// factory.setHost(host); +// factory.setPort(port); +// factory.setUsername(username); +// factory.setPassword(password); +// factory.setVirtualHost(vhost); +// // 连接 +// Connection connection = factory.newConnection(); +// // 信道 +// Channel channel = connection.createChannel(); +// // 获取消息 +// DeliverCallback deliverCallback = (consumerTag, message) -> { +// String msg = new String(message.getBody()); +// System.out.println(msg); +// }; +// CancelCallback cancelCallback = consumerTag -> { +// +// }; +// channel.basicConsume(queueName, true, deliverCallback, cancelCallback); +// } +// +// +//} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestDirect.java b/src/test/java/com/cultural/heritage/rabbitmq/TestDirect.java index 333ff13..f0c6523 100644 --- a/src/test/java/com/cultural/heritage/rabbitmq/TestDirect.java +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestDirect.java @@ -1,119 +1,119 @@ -package com.cultural.heritage.rabbitmq; - -import com.rabbitmq.client.*; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -@SpringBootTest -@RunWith(SpringRunner.class) -public class TestDirect { - - // 服务器IP - private String host = "123.249.108.160"; - - // RabbitMQ端口 - private int port = 5672; - - // 用户名 - private String username = "chenxinzhi"; - - // 密码 - private String password = "yuanteng"; - - // 虚拟机名字 - private String vhost = "vhost1"; - - // 交换机名字 - private String exchangeName = "myExchange1"; - - // 队列名字 - private String queueName1 = "myQueue1"; - - private String queueName2 = "myQueue2"; - - private String queueName3 = "myQueue3"; - - // 路由Key - private String routingKey1 = "myRouting1"; - - private String routingKey2 = "myRouting2"; - - // 连接对象 - private Connection connection; - - // 信道对象 - private Channel channel; - - - public void setUp() throws IOException, TimeoutException { - - // 创建连接工厂、连接、信道 - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); - factory.setVirtualHost(vhost); - // 连接 - connection = factory.newConnection(); - // 信道 - channel = connection.createChannel(); - } - - - @Test - public void testProducer() throws IOException, TimeoutException { - // 创建交换机:直连交换机 - setUp(); - channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, - true, false, null); - // 创建队列 - channel.queueDeclare(queueName1, true, false, false, null); - channel.queueDeclare(queueName2, true, false, false, null); - channel.queueDeclare(queueName3, true, false, false, null); - // 交换机绑定 - channel.queueBind(queueName1, exchangeName, routingKey1); - channel.queueBind(queueName2, exchangeName, routingKey2); - channel.queueBind(queueName3, exchangeName, routingKey1); - // 发送消息 - String msg = "Hello, RabbitMQ"; - channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes()); - String msg2 = "RabbitMQ, Hello"; - channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes()); - // 关闭 - channel.close(); - connection.close(); - - } - - - @Test - public void testConsumer() throws IOException, TimeoutException { - // 获取消息 - setUp(); - DeliverCallback deliverCallback = (consumerTag, message) -> { - String msg = new String(message.getBody()); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - }; - CancelCallback cancelCallback = consumerTag -> { - - }; - channel.basicConsume(queueName3, true, deliverCallback, cancelCallback); - } - - -} +//package com.cultural.heritage.rabbitmq; +// +//import com.rabbitmq.client.*; +//import org.junit.Test; +//import org.junit.runner.RunWith; +//import org.springframework.boot.test.context.SpringBootTest; +//import org.springframework.test.context.junit4.SpringRunner; +// +//import java.io.IOException; +//import java.util.concurrent.TimeoutException; +// +//@SpringBootTest +//@RunWith(SpringRunner.class) +//public class TestDirect { +// +// // 服务器IP +// private String host = "123.249.108.160"; +// +// // RabbitMQ端口 +// private int port = 5672; +// +// // 用户名 +// private String username = "chenxinzhi"; +// +// // 密码 +// private String password = "yuanteng"; +// +// // 虚拟机名字 +// private String vhost = "vhost1"; +// +// // 交换机名字 +// private String exchangeName = "myExchange1"; +// +// // 队列名字 +// private String queueName1 = "myQueue1"; +// +// private String queueName2 = "myQueue2"; +// +// private String queueName3 = "myQueue3"; +// +// // 路由Key +// private String routingKey1 = "myRouting1"; +// +// private String routingKey2 = "myRouting2"; +// +// // 连接对象 +// private Connection connection; +// +// // 信道对象 +// private Channel channel; +// +// +// public void setUp() throws IOException, TimeoutException { +// +// // 创建连接工厂、连接、信道 +// ConnectionFactory factory = new ConnectionFactory(); +// factory.setHost(host); +// factory.setPort(port); +// factory.setUsername(username); +// factory.setPassword(password); +// factory.setVirtualHost(vhost); +// // 连接 +// connection = factory.newConnection(); +// // 信道 +// channel = connection.createChannel(); +// } +// +// +// @Test +// public void testProducer() throws IOException, TimeoutException { +// // 创建交换机:直连交换机 +// setUp(); +// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, +// true, false, null); +// // 创建队列 +// channel.queueDeclare(queueName1, true, false, false, null); +// channel.queueDeclare(queueName2, true, false, false, null); +// channel.queueDeclare(queueName3, true, false, false, null); +// // 交换机绑定 +// channel.queueBind(queueName1, exchangeName, routingKey1); +// channel.queueBind(queueName2, exchangeName, routingKey2); +// channel.queueBind(queueName3, exchangeName, routingKey1); +// // 发送消息 +// String msg = "Hello, RabbitMQ"; +// channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes()); +// String msg2 = "RabbitMQ, Hello"; +// channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes()); +// // 关闭 +// channel.close(); +// connection.close(); +// +// } +// +// +// @Test +// public void testConsumer() throws IOException, TimeoutException { +// // 获取消息 +// setUp(); +// DeliverCallback deliverCallback = (consumerTag, message) -> { +// String msg = new String(message.getBody()); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// }; +// CancelCallback cancelCallback = consumerTag -> { +// +// }; +// channel.basicConsume(queueName3, true, deliverCallback, cancelCallback); +// } +// +// +//} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestFanout.java b/src/test/java/com/cultural/heritage/rabbitmq/TestFanout.java index 5990b86..deb6487 100644 --- a/src/test/java/com/cultural/heritage/rabbitmq/TestFanout.java +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestFanout.java @@ -1,107 +1,107 @@ -package com.cultural.heritage.rabbitmq; - -import com.rabbitmq.client.*; -import org.junit.Test; -import org.springframework.boot.test.context.SpringBootTest; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -@SpringBootTest -public class TestFanout { - - // 服务器IP - private String host = "123.249.108.160"; - - // RabbitMQ端口 - private int port = 5672; - - // 用户名 - private String username = "chenxinzhi"; - - // 密码 - private String password = "yuanteng"; - - // 虚拟机名字 - private String vhost = "vhost1"; - - // 交换机名字 - private String exchangeName = "myExchange2"; - - // 队列名字 - private String queueName1 = "myQueue1"; - - private String queueName2 = "myQueue2"; - - private String queueName3 = "myQueue3"; - - // 路由Key - private String routingKey1 = "myRouting1"; - - private String routingKey2 = "myRouting2"; - - // 连接对象 - private Connection connection; - - // 信道对象 - private Channel channel; - - - public void setUp() throws IOException, TimeoutException { - - // 创建连接工厂、连接、信道 - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); - factory.setVirtualHost(vhost); - // 连接 - connection = factory.newConnection(); - // 信道 - channel = connection.createChannel(); - } - - - @Test - public void testProducer() throws IOException, TimeoutException { - setUp(); - // 创建交换机:直连交换机 - channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, - true, false, null); - // 创建队列 - channel.queueDeclare(queueName1, true, false, false, null); - channel.queueDeclare(queueName2, true, false, false, null); - channel.queueDeclare(queueName3, true, false, false, null); - // 交换机绑定 - channel.queueBind(queueName1, exchangeName, ""); - channel.queueBind(queueName2, exchangeName, ""); - channel.queueBind(queueName3, exchangeName, ""); - // 发送消息 - String msg = "Hello, RabbitMQ"; - channel.basicPublish(exchangeName, "", null, msg.getBytes()); - // 关闭 - channel.close(); - connection.close(); - - } - - - @Test - public void testConsumer() throws IOException, TimeoutException { - setUp(); - // 获取消息 - DeliverCallback deliverCallback = (consumerTag, message) -> { - String msg = new String(message.getBody()); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - }; - CancelCallback cancelCallback = consumerTag -> { - - }; - channel.basicConsume(queueName1, true, deliverCallback, cancelCallback); - } - - -} +//package com.cultural.heritage.rabbitmq; +// +//import com.rabbitmq.client.*; +//import org.junit.Test; +//import org.springframework.boot.test.context.SpringBootTest; +// +//import java.io.IOException; +//import java.util.concurrent.TimeoutException; +// +//@SpringBootTest +//public class TestFanout { +// +// // 服务器IP +// private String host = "123.249.108.160"; +// +// // RabbitMQ端口 +// private int port = 5672; +// +// // 用户名 +// private String username = "chenxinzhi"; +// +// // 密码 +// private String password = "yuanteng"; +// +// // 虚拟机名字 +// private String vhost = "vhost1"; +// +// // 交换机名字 +// private String exchangeName = "myExchange2"; +// +// // 队列名字 +// private String queueName1 = "myQueue1"; +// +// private String queueName2 = "myQueue2"; +// +// private String queueName3 = "myQueue3"; +// +// // 路由Key +// private String routingKey1 = "myRouting1"; +// +// private String routingKey2 = "myRouting2"; +// +// // 连接对象 +// private Connection connection; +// +// // 信道对象 +// private Channel channel; +// +// +// public void setUp() throws IOException, TimeoutException { +// +// // 创建连接工厂、连接、信道 +// ConnectionFactory factory = new ConnectionFactory(); +// factory.setHost(host); +// factory.setPort(port); +// factory.setUsername(username); +// factory.setPassword(password); +// factory.setVirtualHost(vhost); +// // 连接 +// connection = factory.newConnection(); +// // 信道 +// channel = connection.createChannel(); +// } +// +// +// @Test +// public void testProducer() throws IOException, TimeoutException { +// setUp(); +// // 创建交换机:直连交换机 +// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, +// true, false, null); +// // 创建队列 +// channel.queueDeclare(queueName1, true, false, false, null); +// channel.queueDeclare(queueName2, true, false, false, null); +// channel.queueDeclare(queueName3, true, false, false, null); +// // 交换机绑定 +// channel.queueBind(queueName1, exchangeName, ""); +// channel.queueBind(queueName2, exchangeName, ""); +// channel.queueBind(queueName3, exchangeName, ""); +// // 发送消息 +// String msg = "Hello, RabbitMQ"; +// channel.basicPublish(exchangeName, "", null, msg.getBytes()); +// // 关闭 +// channel.close(); +// connection.close(); +// +// } +// +// +// @Test +// public void testConsumer() throws IOException, TimeoutException { +// setUp(); +// // 获取消息 +// DeliverCallback deliverCallback = (consumerTag, message) -> { +// String msg = new String(message.getBody()); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// }; +// CancelCallback cancelCallback = consumerTag -> { +// +// }; +// channel.basicConsume(queueName1, true, deliverCallback, cancelCallback); +// } +// +// +//} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestHeader.java b/src/test/java/com/cultural/heritage/rabbitmq/TestHeader.java index 81109b8..89da5b7 100644 --- a/src/test/java/com/cultural/heritage/rabbitmq/TestHeader.java +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestHeader.java @@ -1,118 +1,118 @@ -package com.cultural.heritage.rabbitmq; - -import com.rabbitmq.client.*; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeoutException; - -@SpringBootTest -@RunWith(SpringRunner.class) -public class TestHeader { - - // 服务器IP - private String host = "123.249.108.160"; - - // RabbitMQ端口 - private int port = 5672; - - // 用户名 - private String username = "chenxinzhi"; - - // 密码 - private String password = "yuanteng"; - - // 虚拟机名字 - private String vhost = "vhost1"; - - // 交换机名字 - private String exchangeName = "myExchange4"; - - // 队列名字 - private String queueName1 = "myQueue1"; - - - // 连接对象 - private Connection connection; - - // 信道对象 - private Channel channel; - - - public void setUp() throws IOException, TimeoutException { - - // 创建连接工厂、连接、信道 - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); - factory.setVirtualHost(vhost); - // 连接 - connection = factory.newConnection(); - // 信道 - channel = connection.createChannel(); - } - - - @Test - public void testProducer() throws IOException, TimeoutException { - setUp(); - // 创建交换机:直连交换机 - channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, - true, false, null); - // 创建队列 - channel.queueDeclare(queueName1, true, false, false, null); - - // 发送消息 - String msg = "Hello, RabbitMQ"; - Map map = new HashMap(); - map.put("one", "one2"); - map.put("two", "two"); - AMQP.BasicProperties props = new AMQP.BasicProperties().builder().headers(map).build(); - channel.basicPublish(exchangeName, "", props, msg.getBytes()); - // 关闭 - channel.close(); - connection.close(); - - } - - - @Test - public void testConsumer() throws IOException, TimeoutException { - setUp(); - // 获取消息 - DeliverCallback deliverCallback = (consumerTag, message) -> { - String msg = new String(message.getBody()); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - }; - CancelCallback cancelCallback = consumerTag -> { - - }; - // 绑定队列和交换机 - Map map = new HashMap(); - map.put("x-match", "any"); - map.put("one", "one"); - map.put("two", "two"); - channel.queueBind(queueName1, exchangeName, "", map); - // 接收消息 - channel.basicConsume(queueName1, true, deliverCallback, cancelCallback); - } - - - @Test - public void test() throws IOException, TimeoutException, InterruptedException { - testProducer(); - Thread.sleep(5000); - testConsumer(); - } - - -} +//package com.cultural.heritage.rabbitmq; +// +//import com.rabbitmq.client.*; +//import org.junit.Test; +//import org.junit.runner.RunWith; +//import org.springframework.boot.test.context.SpringBootTest; +//import org.springframework.test.context.junit4.SpringRunner; +// +//import java.io.IOException; +//import java.util.HashMap; +//import java.util.Map; +//import java.util.concurrent.TimeoutException; +// +//@SpringBootTest +//@RunWith(SpringRunner.class) +//public class TestHeader { +// +// // 服务器IP +// private String host = "123.249.108.160"; +// +// // RabbitMQ端口 +// private int port = 5672; +// +// // 用户名 +// private String username = "chenxinzhi"; +// +// // 密码 +// private String password = "yuanteng"; +// +// // 虚拟机名字 +// private String vhost = "vhost1"; +// +// // 交换机名字 +// private String exchangeName = "myExchange4"; +// +// // 队列名字 +// private String queueName1 = "myQueue1"; +// +// +// // 连接对象 +// private Connection connection; +// +// // 信道对象 +// private Channel channel; +// +// +// public void setUp() throws IOException, TimeoutException { +// +// // 创建连接工厂、连接、信道 +// ConnectionFactory factory = new ConnectionFactory(); +// factory.setHost(host); +// factory.setPort(port); +// factory.setUsername(username); +// factory.setPassword(password); +// factory.setVirtualHost(vhost); +// // 连接 +// connection = factory.newConnection(); +// // 信道 +// channel = connection.createChannel(); +// } +// +// +// @Test +// public void testProducer() throws IOException, TimeoutException { +// setUp(); +// // 创建交换机:直连交换机 +// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, +// true, false, null); +// // 创建队列 +// channel.queueDeclare(queueName1, true, false, false, null); +// +// // 发送消息 +// String msg = "Hello, RabbitMQ"; +// Map map = new HashMap(); +// map.put("one", "one2"); +// map.put("two", "two"); +// AMQP.BasicProperties props = new AMQP.BasicProperties().builder().headers(map).build(); +// channel.basicPublish(exchangeName, "", props, msg.getBytes()); +// // 关闭 +// channel.close(); +// connection.close(); +// +// } +// +// +// @Test +// public void testConsumer() throws IOException, TimeoutException { +// setUp(); +// // 获取消息 +// DeliverCallback deliverCallback = (consumerTag, message) -> { +// String msg = new String(message.getBody()); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// }; +// CancelCallback cancelCallback = consumerTag -> { +// +// }; +// // 绑定队列和交换机 +// Map map = new HashMap(); +// map.put("x-match", "any"); +// map.put("one", "one"); +// map.put("two", "two"); +// channel.queueBind(queueName1, exchangeName, "", map); +// // 接收消息 +// channel.basicConsume(queueName1, true, deliverCallback, cancelCallback); +// } +// +// +// @Test +// public void test() throws IOException, TimeoutException, InterruptedException { +// testProducer(); +// Thread.sleep(5000); +// testConsumer(); +// } +// +// +//} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestTopic.java b/src/test/java/com/cultural/heritage/rabbitmq/TestTopic.java index 21216a7..8d8a838 100644 --- a/src/test/java/com/cultural/heritage/rabbitmq/TestTopic.java +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestTopic.java @@ -1,117 +1,117 @@ -package com.cultural.heritage.rabbitmq; - -import com.rabbitmq.client.*; -import org.junit.Test; -import org.springframework.boot.test.context.SpringBootTest; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -@SpringBootTest -public class TestTopic { - - // 服务器IP - private String host = "123.249.108.160"; - - // RabbitMQ端口 - private int port = 5672; - - // 用户名 - private String username = "chenxinzhi"; - - // 密码 - private String password = "yuanteng"; - - // 虚拟机名字 - private String vhost = "vhost1"; - - // 交换机名字 - private String exchangeName = "myExchange3"; - - // 队列名字 - private String queueName1 = "myQueue1"; - - private String queueName2 = "myQueue2"; - - private String queueName3 = "myQueue3"; - - // 路由Key - private String routingKey1 = "myRouting.*"; - - private String routingKey2 = "myRouting.b"; - - private String routingKey3 = "myRouting.#"; - - // 连接对象 - private Connection connection; - - // 信道对象 - private Channel channel; - - - public void setUp() throws IOException, TimeoutException { - - // 创建连接工厂、连接、信道 - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); - factory.setPort(port); - factory.setUsername(username); - factory.setPassword(password); - factory.setVirtualHost(vhost); - // 连接 - connection = factory.newConnection(); - // 信道 - channel = connection.createChannel(); - } - - - @Test - public void testProducer() throws IOException, TimeoutException { - setUp(); - // 创建交换机:直连交换机 - channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, - true, false, null); - // 创建队列 - channel.queueDeclare(queueName1, true, false, false, null); - channel.queueDeclare(queueName2, true, false, false, null); - channel.queueDeclare(queueName3, true, false, false, null); - // 交换机绑定 - channel.queueBind(queueName1, exchangeName, routingKey1); - channel.queueBind(queueName2, exchangeName, routingKey2); - channel.queueBind(queueName3, exchangeName, routingKey3); - // 发送消息 - String msg = "Hello, RabbitMQ"; - channel.basicPublish(exchangeName, "myRouting.a.b", null, msg.getBytes()); - // 关闭 - channel.close(); - connection.close(); - - } - - - @Test - public void testConsumer() throws IOException, TimeoutException { - setUp(); - // 获取消息 - DeliverCallback deliverCallback = (consumerTag, message) -> { - String msg = new String(message.getBody()); - System.out.println(msg); - System.out.println(msg); - System.out.println(msg); - }; - CancelCallback cancelCallback = consumerTag -> { - - }; - channel.basicConsume(queueName3, true, deliverCallback, cancelCallback); - } - - - @Test - public void test() throws IOException, TimeoutException, InterruptedException { - testProducer(); - Thread.sleep(5000); - testConsumer(); - } - - -} +//package com.cultural.heritage.rabbitmq; +// +//import com.rabbitmq.client.*; +//import org.junit.Test; +//import org.springframework.boot.test.context.SpringBootTest; +// +//import java.io.IOException; +//import java.util.concurrent.TimeoutException; +// +//@SpringBootTest +//public class TestTopic { +// +// // 服务器IP +// private String host = "123.249.108.160"; +// +// // RabbitMQ端口 +// private int port = 5672; +// +// // 用户名 +// private String username = "chenxinzhi"; +// +// // 密码 +// private String password = "yuanteng"; +// +// // 虚拟机名字 +// private String vhost = "vhost1"; +// +// // 交换机名字 +// private String exchangeName = "myExchange3"; +// +// // 队列名字 +// private String queueName1 = "myQueue1"; +// +// private String queueName2 = "myQueue2"; +// +// private String queueName3 = "myQueue3"; +// +// // 路由Key +// private String routingKey1 = "myRouting.*"; +// +// private String routingKey2 = "myRouting.b"; +// +// private String routingKey3 = "myRouting.#"; +// +// // 连接对象 +// private Connection connection; +// +// // 信道对象 +// private Channel channel; +// +// +// public void setUp() throws IOException, TimeoutException { +// +// // 创建连接工厂、连接、信道 +// ConnectionFactory factory = new ConnectionFactory(); +// factory.setHost(host); +// factory.setPort(port); +// factory.setUsername(username); +// factory.setPassword(password); +// factory.setVirtualHost(vhost); +// // 连接 +// connection = factory.newConnection(); +// // 信道 +// channel = connection.createChannel(); +// } +// +// +// @Test +// public void testProducer() throws IOException, TimeoutException { +// setUp(); +// // 创建交换机:直连交换机 +// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, +// true, false, null); +// // 创建队列 +// channel.queueDeclare(queueName1, true, false, false, null); +// channel.queueDeclare(queueName2, true, false, false, null); +// channel.queueDeclare(queueName3, true, false, false, null); +// // 交换机绑定 +// channel.queueBind(queueName1, exchangeName, routingKey1); +// channel.queueBind(queueName2, exchangeName, routingKey2); +// channel.queueBind(queueName3, exchangeName, routingKey3); +// // 发送消息 +// String msg = "Hello, RabbitMQ"; +// channel.basicPublish(exchangeName, "myRouting.a.b", null, msg.getBytes()); +// // 关闭 +// channel.close(); +// connection.close(); +// +// } +// +// +// @Test +// public void testConsumer() throws IOException, TimeoutException { +// setUp(); +// // 获取消息 +// DeliverCallback deliverCallback = (consumerTag, message) -> { +// String msg = new String(message.getBody()); +// System.out.println(msg); +// System.out.println(msg); +// System.out.println(msg); +// }; +// CancelCallback cancelCallback = consumerTag -> { +// +// }; +// channel.basicConsume(queueName3, true, deliverCallback, cancelCallback); +// } +// +// +// @Test +// public void test() throws IOException, TimeoutException, InterruptedException { +// testProducer(); +// Thread.sleep(5000); +// testConsumer(); +// } +// +// +//} diff --git a/src/test/java/com/cultural/heritage/rabbitmqtest/ConsumerTest.java b/src/test/java/com/cultural/heritage/rabbitmqtest/ConsumerTest.java index 2dd6760..8ced7dc 100644 --- a/src/test/java/com/cultural/heritage/rabbitmqtest/ConsumerTest.java +++ b/src/test/java/com/cultural/heritage/rabbitmqtest/ConsumerTest.java @@ -1,21 +1,21 @@ -package com.cultural.heritage.rabbitmqtest; - -import org.springframework.amqp.rabbit.annotation.Exchange; -import org.springframework.amqp.rabbit.annotation.Queue; -import org.springframework.amqp.rabbit.annotation.QueueBinding; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.stereotype.Component; - -@Component -public class ConsumerTest { - - @RabbitListener(bindings = @QueueBinding( - value = @Queue("myQueue"), - exchange = @Exchange(name = "myExchange", delayed = "true"), - key = "myKey" - )) - public void listenDelayMessage(String msg) { - System.out.println("接收延时消息:" + msg); - } - -} +//package com.cultural.heritage.rabbitmqtest; +// +//import org.springframework.amqp.rabbit.annotation.Exchange; +//import org.springframework.amqp.rabbit.annotation.Queue; +//import org.springframework.amqp.rabbit.annotation.QueueBinding; +//import org.springframework.amqp.rabbit.annotation.RabbitListener; +//import org.springframework.stereotype.Component; +// +//@Component +//public class ConsumerTest { +// +// @RabbitListener(bindings = @QueueBinding( +// value = @Queue("myQueue"), +// exchange = @Exchange(name = "myExchange", delayed = "true"), +// key = "myKey" +// )) +// public void listenDelayMessage(String msg) { +// System.out.println("接收延时消息:" + msg); +// } +// +//} diff --git a/src/test/java/com/cultural/heritage/rabbitmqtest/RabbitMQTest.java b/src/test/java/com/cultural/heritage/rabbitmqtest/RabbitMQTest.java index c5ff622..af0c47a 100644 --- a/src/test/java/com/cultural/heritage/rabbitmqtest/RabbitMQTest.java +++ b/src/test/java/com/cultural/heritage/rabbitmqtest/RabbitMQTest.java @@ -1,44 +1,44 @@ -package com.cultural.heritage.rabbitmqtest; - - -import jakarta.annotation.Resource; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.amqp.AmqpException; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.core.MessagePostProcessor; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; - -@SpringBootTest -@RunWith(SpringRunner.class) -public class RabbitMQTest { - - @Resource - private RabbitTemplate rabbitTemplate; - - - @Test - public void testDelayMessage() { - String exchange = "myExchange"; - String message = "Hello, delay message!"; - - rabbitTemplate.convertAndSend(exchange, "myKey", message, new MessagePostProcessor() { - @Override - public Message postProcessMessage(Message message) throws AmqpException { - // 添加延迟消息属性 - message.getMessageProperties().setDelay(5000); - return message; - } - }); - try { - Thread.sleep(30 * 1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - - -} +//package com.cultural.heritage.rabbitmqtest; +// +// +//import jakarta.annotation.Resource; +//import org.junit.Test; +//import org.junit.runner.RunWith; +//import org.springframework.amqp.AmqpException; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.core.MessagePostProcessor; +//import org.springframework.amqp.rabbit.core.RabbitTemplate; +//import org.springframework.boot.test.context.SpringBootTest; +//import org.springframework.test.context.junit4.SpringRunner; +// +//@SpringBootTest +//@RunWith(SpringRunner.class) +//public class RabbitMQTest { +// +// @Resource +// private RabbitTemplate rabbitTemplate; +// +// +// @Test +// public void testDelayMessage() { +// String exchange = "myExchange"; +// String message = "Hello, delay message!"; +// +// rabbitTemplate.convertAndSend(exchange, "myKey", message, new MessagePostProcessor() { +// @Override +// public Message postProcessMessage(Message message) throws AmqpException { +// // 添加延迟消息属性 +// message.getMessageProperties().setDelay(5000); +// return message; +// } +// }); +// try { +// Thread.sleep(30 * 1000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } +// +// +// +//}