更新了写真预约特殊产品处理

This commit is contained in:
chen-xin-zhi 2025-03-05 19:01:56 +08:00
parent 626da3f5b7
commit 1a9e565cd7
11 changed files with 732 additions and 732 deletions

View File

@ -14,7 +14,7 @@ spring:
port: 5672
username: chenxinzhi
password: yuanteng
virtual-host: vhost1
virtual-host: vhost
listener:
simple:
prefetch: 1

View File

@ -1,44 +1,44 @@
package com.cultural.heritage.rabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Consumer {
@RabbitListener(queues = "myQueue")
public void receive(String message) {
System.out.println("接收时间:" + new Date());
System.out.println("消费者收到消息:" + message);
}
// @RabbitListener(queues = "myQueue1")
// public void receive2(String message) {
// System.out.println("消费者接收消息:" + message);
// }
// @RabbitListener(queues = "myQueue1")
// public void receive1(String productId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long msgId) throws IOException {
// System.out.println(productId + "开始减少库存");
// if (productId.equals("产品编号3")) {
// int num = 10 / 0;
// }
// // 手动应答
// long deliveryTag = msgId;
// channel.basicAck(deliveryTag, false);
// System.out.println(productId + "减少库存成功!");
// }
// @RabbitListener(queues = "myQueue2")
// public void receive2(String message) {
// System.out.println("消费者2接收消息" + message);
//package com.cultural.heritage.rabbit;
//
//
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.stereotype.Component;
//
//import java.util.Date;
//
//@Component
//public class Consumer {
//
// @RabbitListener(queues = "myQueue")
// public void receive(String message) {
// System.out.println("接收时间:" + new Date());
// System.out.println("消费者收到消息:" + message);
// }
//
// @RabbitListener(queues = "myQueue3")
// public void receive3(String message) {
// System.out.println("消费者3接收消息" + message);
// }
}
//// @RabbitListener(queues = "myQueue1")
//// public void receive2(String message) {
//// System.out.println("消费者接收消息:" + message);
//// }
//
//// @RabbitListener(queues = "myQueue1")
//// public void receive1(String productId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long msgId) throws IOException {
//// System.out.println(productId + "开始减少库存");
//// if (productId.equals("产品编号3")) {
//// int num = 10 / 0;
//// }
//// // 手动应答
//// long deliveryTag = msgId;
//// channel.basicAck(deliveryTag, false);
//// System.out.println(productId + "减少库存成功!");
//// }
//
//// @RabbitListener(queues = "myQueue2")
//// public void receive2(String message) {
//// System.out.println("消费者2接收消息" + message);
//// }
////
//// @RabbitListener(queues = "myQueue3")
//// public void receive3(String message) {
//// System.out.println("消费者3接收消息" + message);
//// }
//}

View File

@ -1,47 +1,47 @@
package com.cultural.heritage.rabbit;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Producer {
@Resource
private RabbitTemplate rabbitTemplate;
private String exchange = "myExchange";
public void send(String routingKey, String message) {
Message msg = MessageBuilder
.withBody(message.getBytes())
.setHeader("x-delay", 5000)
.build();
System.out.println("发送时间:" + new Date());
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
}
// public void send(String routingKey, String message, MessageDeliveryMode mode) {
//package com.cultural.heritage.rabbit;
//
//import jakarta.annotation.Resource;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.core.MessageBuilder;
//import org.springframework.amqp.rabbit.core.RabbitTemplate;
//import org.springframework.stereotype.Component;
//
//import java.util.Date;
//
//@Component
//public class Producer {
//
// @Resource
// private RabbitTemplate rabbitTemplate;
//
// private String exchange = "myExchange";
//
//
// public void send(String routingKey, String message) {
// Message msg = MessageBuilder
// .withBody(message.getBytes())
// .setDeliveryMode(mode) // 指定消息是否持久化
// .build();
// .withBody(message.getBytes())
// .setHeader("x-delay", 5000)
// .build();
// System.out.println("发送时间:" + new Date());
// rabbitTemplate.convertAndSend(exchange, routingKey, msg);
// }
// public void send(String message, String routingKey) {
//
// CorrelationData correlationData = new CorrelationData();
// correlationData.setId("101"); //UUID.randomUUID().toString()
// rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
//
// //rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
// }
}
//// public void send(String routingKey, String message, MessageDeliveryMode mode) {
//// Message msg = MessageBuilder
//// .withBody(message.getBytes())
//// .setDeliveryMode(mode) // 指定消息是否持久化
//// .build();
//// rabbitTemplate.convertAndSend(exchange, routingKey, msg);
//// }
//
//// public void send(String message, String routingKey) {
////
//// CorrelationData correlationData = new CorrelationData();
//// correlationData.setId("101"); //UUID.randomUUID().toString()
//// rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
////
//// //rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
//// }
//
//}

View File

@ -1,27 +1,27 @@
package com.cultural.heritage.rabbit;
import jakarta.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestRabbit {
@Resource
private Producer producer;
@Test
public void test() {
String routingKey = "myKey";
String message = "Hello, RabbitMQ";
producer.send(routingKey, message);
try {
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//package com.cultural.heritage.rabbit;
//
//import jakarta.annotation.Resource;
//import org.junit.Test;
//import org.junit.runner.RunWith;
//import org.springframework.boot.test.context.SpringBootTest;
//import org.springframework.test.context.junit4.SpringRunner;
//
//@SpringBootTest
//@RunWith(SpringRunner.class)
//public class TestRabbit {
//
// @Resource
// private Producer producer;
//
// @Test
// public void test() {
// String routingKey = "myKey";
// String message = "Hello, RabbitMQ";
// producer.send(routingKey, message);
// try {
// Thread.sleep(30 * 1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
//}

View File

@ -1,94 +1,94 @@
package com.cultural.heritage.rabbitmq;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@RestController
public class AppTest {
// 服务器IP
private String host = "123.249.108.160";
// RabbitMQ端口
private int port = 5672;
// 用户名
private String username = "chenxinzhi";
// 密码
private String password = "yuanteng";
// 虚拟机名字
private String vhost = "vhost1";
// 交换机名字
private String exchangeName = "myExchange1";
// 队列名字
private String queueName = "myQueue1";
// 路由Key
private String routingKey = "myRouting1";
@Test
public void testProducer() throws IOException, TimeoutException {
// 创建连接工厂连接信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
// 连接
Connection connection = factory.newConnection();
// 信道
Channel channel = connection.createChannel();
// 创建交换机:直连交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,
true, false, null);
// 创建队列
channel.queueDeclare(queueName, true, false, false, null);
// 交换机绑定
channel.queueBind(queueName, exchangeName, routingKey);
// 发送消息
String msg = "Hello, RabbitMQ";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 关闭
channel.close();
connection.close();
}
@Test
public void testConsumer() throws IOException, TimeoutException {
// 创建连接工厂连接信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
// 连接
Connection connection = factory.newConnection();
// 信道
Channel channel = connection.createChannel();
// 获取消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
//package com.cultural.heritage.rabbitmq;
//
//import com.rabbitmq.client.*;
//import org.junit.Test;
//import org.springframework.web.bind.annotation.RestController;
//
//import java.io.IOException;
//import java.util.concurrent.TimeoutException;
//
//@RestController
//public class AppTest {
//
// // 服务器IP
// private String host = "123.249.108.160";
//
// // RabbitMQ端口
// private int port = 5672;
//
// // 用户名
// private String username = "chenxinzhi";
//
// // 密码
// private String password = "yuanteng";
//
// // 虚拟机名字
// private String vhost = "vhost1";
//
// // 交换机名字
// private String exchangeName = "myExchange1";
//
// // 队列名字
// private String queueName = "myQueue1";
//
// // 路由Key
// private String routingKey = "myRouting1";
//
//
// @Test
// public void testProducer() throws IOException, TimeoutException {
//
// // 创建连接工厂连接信道
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost(host);
// factory.setPort(port);
// factory.setUsername(username);
// factory.setPassword(password);
// factory.setVirtualHost(vhost);
// // 连接
// Connection connection = factory.newConnection();
// // 信道
// Channel channel = connection.createChannel();
// // 创建交换机:直连交换机
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,
// true, false, null);
// // 创建队列
// channel.queueDeclare(queueName, true, false, false, null);
// // 交换机绑定
// channel.queueBind(queueName, exchangeName, routingKey);
// // 发送消息
// String msg = "Hello, RabbitMQ";
// channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// // 关闭
// channel.close();
// connection.close();
//
// }
//
//
// @Test
// public void testConsumer() throws IOException, TimeoutException {
// // 创建连接工厂连接信道
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost(host);
// factory.setPort(port);
// factory.setUsername(username);
// factory.setPassword(password);
// factory.setVirtualHost(vhost);
// // 连接
// Connection connection = factory.newConnection();
// // 信道
// Channel channel = connection.createChannel();
// // 获取消息
// DeliverCallback deliverCallback = (consumerTag, message) -> {
// String msg = new String(message.getBody());
// System.out.println(msg);
// };
// CancelCallback cancelCallback = consumerTag -> {
//
// };
// channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
// }
//
//
//}

View File

@ -1,119 +1,119 @@
package com.cultural.heritage.rabbitmq;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestDirect {
// 服务器IP
private String host = "123.249.108.160";
// RabbitMQ端口
private int port = 5672;
// 用户名
private String username = "chenxinzhi";
// 密码
private String password = "yuanteng";
// 虚拟机名字
private String vhost = "vhost1";
// 交换机名字
private String exchangeName = "myExchange1";
// 队列名字
private String queueName1 = "myQueue1";
private String queueName2 = "myQueue2";
private String queueName3 = "myQueue3";
// 路由Key
private String routingKey1 = "myRouting1";
private String routingKey2 = "myRouting2";
// 连接对象
private Connection connection;
// 信道对象
private Channel channel;
public void setUp() throws IOException, TimeoutException {
// 创建连接工厂连接信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
// 连接
connection = factory.newConnection();
// 信道
channel = connection.createChannel();
}
@Test
public void testProducer() throws IOException, TimeoutException {
// 创建交换机:直连交换机
setUp();
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,
true, false, null);
// 创建队列
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueDeclare(queueName3, true, false, false, null);
// 交换机绑定
channel.queueBind(queueName1, exchangeName, routingKey1);
channel.queueBind(queueName2, exchangeName, routingKey2);
channel.queueBind(queueName3, exchangeName, routingKey1);
// 发送消息
String msg = "Hello, RabbitMQ";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
String msg2 = "RabbitMQ, Hello";
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
// 关闭
channel.close();
connection.close();
}
@Test
public void testConsumer() throws IOException, TimeoutException {
// 获取消息
setUp();
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName3, true, deliverCallback, cancelCallback);
}
}
//package com.cultural.heritage.rabbitmq;
//
//import com.rabbitmq.client.*;
//import org.junit.Test;
//import org.junit.runner.RunWith;
//import org.springframework.boot.test.context.SpringBootTest;
//import org.springframework.test.context.junit4.SpringRunner;
//
//import java.io.IOException;
//import java.util.concurrent.TimeoutException;
//
//@SpringBootTest
//@RunWith(SpringRunner.class)
//public class TestDirect {
//
// // 服务器IP
// private String host = "123.249.108.160";
//
// // RabbitMQ端口
// private int port = 5672;
//
// // 用户名
// private String username = "chenxinzhi";
//
// // 密码
// private String password = "yuanteng";
//
// // 虚拟机名字
// private String vhost = "vhost1";
//
// // 交换机名字
// private String exchangeName = "myExchange1";
//
// // 队列名字
// private String queueName1 = "myQueue1";
//
// private String queueName2 = "myQueue2";
//
// private String queueName3 = "myQueue3";
//
// // 路由Key
// private String routingKey1 = "myRouting1";
//
// private String routingKey2 = "myRouting2";
//
// // 连接对象
// private Connection connection;
//
// // 信道对象
// private Channel channel;
//
//
// public void setUp() throws IOException, TimeoutException {
//
// // 创建连接工厂连接信道
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost(host);
// factory.setPort(port);
// factory.setUsername(username);
// factory.setPassword(password);
// factory.setVirtualHost(vhost);
// // 连接
// connection = factory.newConnection();
// // 信道
// channel = connection.createChannel();
// }
//
//
// @Test
// public void testProducer() throws IOException, TimeoutException {
// // 创建交换机:直连交换机
// setUp();
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,
// true, false, null);
// // 创建队列
// channel.queueDeclare(queueName1, true, false, false, null);
// channel.queueDeclare(queueName2, true, false, false, null);
// channel.queueDeclare(queueName3, true, false, false, null);
// // 交换机绑定
// channel.queueBind(queueName1, exchangeName, routingKey1);
// channel.queueBind(queueName2, exchangeName, routingKey2);
// channel.queueBind(queueName3, exchangeName, routingKey1);
// // 发送消息
// String msg = "Hello, RabbitMQ";
// channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
// String msg2 = "RabbitMQ, Hello";
// channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
// // 关闭
// channel.close();
// connection.close();
//
// }
//
//
// @Test
// public void testConsumer() throws IOException, TimeoutException {
// // 获取消息
// setUp();
// DeliverCallback deliverCallback = (consumerTag, message) -> {
// String msg = new String(message.getBody());
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// };
// CancelCallback cancelCallback = consumerTag -> {
//
// };
// channel.basicConsume(queueName3, true, deliverCallback, cancelCallback);
// }
//
//
//}

View File

@ -1,107 +1,107 @@
package com.cultural.heritage.rabbitmq;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@SpringBootTest
public class TestFanout {
// 服务器IP
private String host = "123.249.108.160";
// RabbitMQ端口
private int port = 5672;
// 用户名
private String username = "chenxinzhi";
// 密码
private String password = "yuanteng";
// 虚拟机名字
private String vhost = "vhost1";
// 交换机名字
private String exchangeName = "myExchange2";
// 队列名字
private String queueName1 = "myQueue1";
private String queueName2 = "myQueue2";
private String queueName3 = "myQueue3";
// 路由Key
private String routingKey1 = "myRouting1";
private String routingKey2 = "myRouting2";
// 连接对象
private Connection connection;
// 信道对象
private Channel channel;
public void setUp() throws IOException, TimeoutException {
// 创建连接工厂连接信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
// 连接
connection = factory.newConnection();
// 信道
channel = connection.createChannel();
}
@Test
public void testProducer() throws IOException, TimeoutException {
setUp();
// 创建交换机:直连交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,
true, false, null);
// 创建队列
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueDeclare(queueName3, true, false, false, null);
// 交换机绑定
channel.queueBind(queueName1, exchangeName, "");
channel.queueBind(queueName2, exchangeName, "");
channel.queueBind(queueName3, exchangeName, "");
// 发送消息
String msg = "Hello, RabbitMQ";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
// 关闭
channel.close();
connection.close();
}
@Test
public void testConsumer() throws IOException, TimeoutException {
setUp();
// 获取消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName1, true, deliverCallback, cancelCallback);
}
}
//package com.cultural.heritage.rabbitmq;
//
//import com.rabbitmq.client.*;
//import org.junit.Test;
//import org.springframework.boot.test.context.SpringBootTest;
//
//import java.io.IOException;
//import java.util.concurrent.TimeoutException;
//
//@SpringBootTest
//public class TestFanout {
//
// // 服务器IP
// private String host = "123.249.108.160";
//
// // RabbitMQ端口
// private int port = 5672;
//
// // 用户名
// private String username = "chenxinzhi";
//
// // 密码
// private String password = "yuanteng";
//
// // 虚拟机名字
// private String vhost = "vhost1";
//
// // 交换机名字
// private String exchangeName = "myExchange2";
//
// // 队列名字
// private String queueName1 = "myQueue1";
//
// private String queueName2 = "myQueue2";
//
// private String queueName3 = "myQueue3";
//
// // 路由Key
// private String routingKey1 = "myRouting1";
//
// private String routingKey2 = "myRouting2";
//
// // 连接对象
// private Connection connection;
//
// // 信道对象
// private Channel channel;
//
//
// public void setUp() throws IOException, TimeoutException {
//
// // 创建连接工厂连接信道
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost(host);
// factory.setPort(port);
// factory.setUsername(username);
// factory.setPassword(password);
// factory.setVirtualHost(vhost);
// // 连接
// connection = factory.newConnection();
// // 信道
// channel = connection.createChannel();
// }
//
//
// @Test
// public void testProducer() throws IOException, TimeoutException {
// setUp();
// // 创建交换机:直连交换机
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,
// true, false, null);
// // 创建队列
// channel.queueDeclare(queueName1, true, false, false, null);
// channel.queueDeclare(queueName2, true, false, false, null);
// channel.queueDeclare(queueName3, true, false, false, null);
// // 交换机绑定
// channel.queueBind(queueName1, exchangeName, "");
// channel.queueBind(queueName2, exchangeName, "");
// channel.queueBind(queueName3, exchangeName, "");
// // 发送消息
// String msg = "Hello, RabbitMQ";
// channel.basicPublish(exchangeName, "", null, msg.getBytes());
// // 关闭
// channel.close();
// connection.close();
//
// }
//
//
// @Test
// public void testConsumer() throws IOException, TimeoutException {
// setUp();
// // 获取消息
// DeliverCallback deliverCallback = (consumerTag, message) -> {
// String msg = new String(message.getBody());
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// };
// CancelCallback cancelCallback = consumerTag -> {
//
// };
// channel.basicConsume(queueName1, true, deliverCallback, cancelCallback);
// }
//
//
//}

View File

@ -1,118 +1,118 @@
package com.cultural.heritage.rabbitmq;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestHeader {
// 服务器IP
private String host = "123.249.108.160";
// RabbitMQ端口
private int port = 5672;
// 用户名
private String username = "chenxinzhi";
// 密码
private String password = "yuanteng";
// 虚拟机名字
private String vhost = "vhost1";
// 交换机名字
private String exchangeName = "myExchange4";
// 队列名字
private String queueName1 = "myQueue1";
// 连接对象
private Connection connection;
// 信道对象
private Channel channel;
public void setUp() throws IOException, TimeoutException {
// 创建连接工厂连接信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
// 连接
connection = factory.newConnection();
// 信道
channel = connection.createChannel();
}
@Test
public void testProducer() throws IOException, TimeoutException {
setUp();
// 创建交换机:直连交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS,
true, false, null);
// 创建队列
channel.queueDeclare(queueName1, true, false, false, null);
// 发送消息
String msg = "Hello, RabbitMQ";
Map map = new HashMap();
map.put("one", "one2");
map.put("two", "two");
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().headers(map).build();
channel.basicPublish(exchangeName, "", props, msg.getBytes());
// 关闭
channel.close();
connection.close();
}
@Test
public void testConsumer() throws IOException, TimeoutException {
setUp();
// 获取消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
// 绑定队列和交换机
Map map = new HashMap();
map.put("x-match", "any");
map.put("one", "one");
map.put("two", "two");
channel.queueBind(queueName1, exchangeName, "", map);
// 接收消息
channel.basicConsume(queueName1, true, deliverCallback, cancelCallback);
}
@Test
public void test() throws IOException, TimeoutException, InterruptedException {
testProducer();
Thread.sleep(5000);
testConsumer();
}
}
//package com.cultural.heritage.rabbitmq;
//
//import com.rabbitmq.client.*;
//import org.junit.Test;
//import org.junit.runner.RunWith;
//import org.springframework.boot.test.context.SpringBootTest;
//import org.springframework.test.context.junit4.SpringRunner;
//
//import java.io.IOException;
//import java.util.HashMap;
//import java.util.Map;
//import java.util.concurrent.TimeoutException;
//
//@SpringBootTest
//@RunWith(SpringRunner.class)
//public class TestHeader {
//
// // 服务器IP
// private String host = "123.249.108.160";
//
// // RabbitMQ端口
// private int port = 5672;
//
// // 用户名
// private String username = "chenxinzhi";
//
// // 密码
// private String password = "yuanteng";
//
// // 虚拟机名字
// private String vhost = "vhost1";
//
// // 交换机名字
// private String exchangeName = "myExchange4";
//
// // 队列名字
// private String queueName1 = "myQueue1";
//
//
// // 连接对象
// private Connection connection;
//
// // 信道对象
// private Channel channel;
//
//
// public void setUp() throws IOException, TimeoutException {
//
// // 创建连接工厂连接信道
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost(host);
// factory.setPort(port);
// factory.setUsername(username);
// factory.setPassword(password);
// factory.setVirtualHost(vhost);
// // 连接
// connection = factory.newConnection();
// // 信道
// channel = connection.createChannel();
// }
//
//
// @Test
// public void testProducer() throws IOException, TimeoutException {
// setUp();
// // 创建交换机:直连交换机
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS,
// true, false, null);
// // 创建队列
// channel.queueDeclare(queueName1, true, false, false, null);
//
// // 发送消息
// String msg = "Hello, RabbitMQ";
// Map map = new HashMap();
// map.put("one", "one2");
// map.put("two", "two");
// AMQP.BasicProperties props = new AMQP.BasicProperties().builder().headers(map).build();
// channel.basicPublish(exchangeName, "", props, msg.getBytes());
// // 关闭
// channel.close();
// connection.close();
//
// }
//
//
// @Test
// public void testConsumer() throws IOException, TimeoutException {
// setUp();
// // 获取消息
// DeliverCallback deliverCallback = (consumerTag, message) -> {
// String msg = new String(message.getBody());
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// };
// CancelCallback cancelCallback = consumerTag -> {
//
// };
// // 绑定队列和交换机
// Map map = new HashMap();
// map.put("x-match", "any");
// map.put("one", "one");
// map.put("two", "two");
// channel.queueBind(queueName1, exchangeName, "", map);
// // 接收消息
// channel.basicConsume(queueName1, true, deliverCallback, cancelCallback);
// }
//
//
// @Test
// public void test() throws IOException, TimeoutException, InterruptedException {
// testProducer();
// Thread.sleep(5000);
// testConsumer();
// }
//
//
//}

View File

@ -1,117 +1,117 @@
package com.cultural.heritage.rabbitmq;
import com.rabbitmq.client.*;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@SpringBootTest
public class TestTopic {
// 服务器IP
private String host = "123.249.108.160";
// RabbitMQ端口
private int port = 5672;
// 用户名
private String username = "chenxinzhi";
// 密码
private String password = "yuanteng";
// 虚拟机名字
private String vhost = "vhost1";
// 交换机名字
private String exchangeName = "myExchange3";
// 队列名字
private String queueName1 = "myQueue1";
private String queueName2 = "myQueue2";
private String queueName3 = "myQueue3";
// 路由Key
private String routingKey1 = "myRouting.*";
private String routingKey2 = "myRouting.b";
private String routingKey3 = "myRouting.#";
// 连接对象
private Connection connection;
// 信道对象
private Channel channel;
public void setUp() throws IOException, TimeoutException {
// 创建连接工厂连接信道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
// 连接
connection = factory.newConnection();
// 信道
channel = connection.createChannel();
}
@Test
public void testProducer() throws IOException, TimeoutException {
setUp();
// 创建交换机:直连交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,
true, false, null);
// 创建队列
channel.queueDeclare(queueName1, true, false, false, null);
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueDeclare(queueName3, true, false, false, null);
// 交换机绑定
channel.queueBind(queueName1, exchangeName, routingKey1);
channel.queueBind(queueName2, exchangeName, routingKey2);
channel.queueBind(queueName3, exchangeName, routingKey3);
// 发送消息
String msg = "Hello, RabbitMQ";
channel.basicPublish(exchangeName, "myRouting.a.b", null, msg.getBytes());
// 关闭
channel.close();
connection.close();
}
@Test
public void testConsumer() throws IOException, TimeoutException {
setUp();
// 获取消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
System.out.println(msg);
System.out.println(msg);
};
CancelCallback cancelCallback = consumerTag -> {
};
channel.basicConsume(queueName3, true, deliverCallback, cancelCallback);
}
@Test
public void test() throws IOException, TimeoutException, InterruptedException {
testProducer();
Thread.sleep(5000);
testConsumer();
}
}
//package com.cultural.heritage.rabbitmq;
//
//import com.rabbitmq.client.*;
//import org.junit.Test;
//import org.springframework.boot.test.context.SpringBootTest;
//
//import java.io.IOException;
//import java.util.concurrent.TimeoutException;
//
//@SpringBootTest
//public class TestTopic {
//
// // 服务器IP
// private String host = "123.249.108.160";
//
// // RabbitMQ端口
// private int port = 5672;
//
// // 用户名
// private String username = "chenxinzhi";
//
// // 密码
// private String password = "yuanteng";
//
// // 虚拟机名字
// private String vhost = "vhost1";
//
// // 交换机名字
// private String exchangeName = "myExchange3";
//
// // 队列名字
// private String queueName1 = "myQueue1";
//
// private String queueName2 = "myQueue2";
//
// private String queueName3 = "myQueue3";
//
// // 路由Key
// private String routingKey1 = "myRouting.*";
//
// private String routingKey2 = "myRouting.b";
//
// private String routingKey3 = "myRouting.#";
//
// // 连接对象
// private Connection connection;
//
// // 信道对象
// private Channel channel;
//
//
// public void setUp() throws IOException, TimeoutException {
//
// // 创建连接工厂连接信道
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost(host);
// factory.setPort(port);
// factory.setUsername(username);
// factory.setPassword(password);
// factory.setVirtualHost(vhost);
// // 连接
// connection = factory.newConnection();
// // 信道
// channel = connection.createChannel();
// }
//
//
// @Test
// public void testProducer() throws IOException, TimeoutException {
// setUp();
// // 创建交换机:直连交换机
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,
// true, false, null);
// // 创建队列
// channel.queueDeclare(queueName1, true, false, false, null);
// channel.queueDeclare(queueName2, true, false, false, null);
// channel.queueDeclare(queueName3, true, false, false, null);
// // 交换机绑定
// channel.queueBind(queueName1, exchangeName, routingKey1);
// channel.queueBind(queueName2, exchangeName, routingKey2);
// channel.queueBind(queueName3, exchangeName, routingKey3);
// // 发送消息
// String msg = "Hello, RabbitMQ";
// channel.basicPublish(exchangeName, "myRouting.a.b", null, msg.getBytes());
// // 关闭
// channel.close();
// connection.close();
//
// }
//
//
// @Test
// public void testConsumer() throws IOException, TimeoutException {
// setUp();
// // 获取消息
// DeliverCallback deliverCallback = (consumerTag, message) -> {
// String msg = new String(message.getBody());
// System.out.println(msg);
// System.out.println(msg);
// System.out.println(msg);
// };
// CancelCallback cancelCallback = consumerTag -> {
//
// };
// channel.basicConsume(queueName3, true, deliverCallback, cancelCallback);
// }
//
//
// @Test
// public void test() throws IOException, TimeoutException, InterruptedException {
// testProducer();
// Thread.sleep(5000);
// testConsumer();
// }
//
//
//}

View File

@ -1,21 +1,21 @@
package com.cultural.heritage.rabbitmqtest;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerTest {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("myQueue"),
exchange = @Exchange(name = "myExchange", delayed = "true"),
key = "myKey"
))
public void listenDelayMessage(String msg) {
System.out.println("接收延时消息:" + msg);
}
}
//package com.cultural.heritage.rabbitmqtest;
//
//import org.springframework.amqp.rabbit.annotation.Exchange;
//import org.springframework.amqp.rabbit.annotation.Queue;
//import org.springframework.amqp.rabbit.annotation.QueueBinding;
//import org.springframework.amqp.rabbit.annotation.RabbitListener;
//import org.springframework.stereotype.Component;
//
//@Component
//public class ConsumerTest {
//
// @RabbitListener(bindings = @QueueBinding(
// value = @Queue("myQueue"),
// exchange = @Exchange(name = "myExchange", delayed = "true"),
// key = "myKey"
// ))
// public void listenDelayMessage(String msg) {
// System.out.println("接收延时消息:" + msg);
// }
//
//}

View File

@ -1,44 +1,44 @@
package com.cultural.heritage.rabbitmqtest;
import jakarta.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMQTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testDelayMessage() {
String exchange = "myExchange";
String message = "Hello, delay message!";
rabbitTemplate.convertAndSend(exchange, "myKey", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
try {
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//package com.cultural.heritage.rabbitmqtest;
//
//
//import jakarta.annotation.Resource;
//import org.junit.Test;
//import org.junit.runner.RunWith;
//import org.springframework.amqp.AmqpException;
//import org.springframework.amqp.core.Message;
//import org.springframework.amqp.core.MessagePostProcessor;
//import org.springframework.amqp.rabbit.core.RabbitTemplate;
//import org.springframework.boot.test.context.SpringBootTest;
//import org.springframework.test.context.junit4.SpringRunner;
//
//@SpringBootTest
//@RunWith(SpringRunner.class)
//public class RabbitMQTest {
//
// @Resource
// private RabbitTemplate rabbitTemplate;
//
//
// @Test
// public void testDelayMessage() {
// String exchange = "myExchange";
// String message = "Hello, delay message!";
//
// rabbitTemplate.convertAndSend(exchange, "myKey", message, new MessagePostProcessor() {
// @Override
// public Message postProcessMessage(Message message) throws AmqpException {
// // 添加延迟消息属性
// message.getMessageProperties().setDelay(5000);
// return message;
// }
// });
// try {
// Thread.sleep(30 * 1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
//
//
//
//}