diff --git a/pom.xml b/pom.xml index 8c93ac0..5d26086 100644 --- a/pom.xml +++ b/pom.xml @@ -185,6 +185,11 @@ gson 2.8.8 + + + org.springframework.boot + spring-boot-starter-amqp + diff --git a/src/main/java/com/cultural/heritage/config/CorsConfig.java b/src/main/java/com/cultural/heritage/config/CorsConfig.java index 1722a06..d457140 100644 --- a/src/main/java/com/cultural/heritage/config/CorsConfig.java +++ b/src/main/java/com/cultural/heritage/config/CorsConfig.java @@ -7,8 +7,6 @@ import org.springframework.core.Ordered; import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.UrlBasedCorsConfigurationSource; import org.springframework.web.filter.CorsFilter; -import org.springframework.web.servlet.config.annotation.CorsRegistry; -import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; /** * 跨域配置 diff --git a/src/main/java/com/cultural/heritage/config/MyBatisPlusConfig.java b/src/main/java/com/cultural/heritage/config/MyBatisPlusConfig.java index 83df403..11a6254 100644 --- a/src/main/java/com/cultural/heritage/config/MyBatisPlusConfig.java +++ b/src/main/java/com/cultural/heritage/config/MyBatisPlusConfig.java @@ -3,7 +3,6 @@ package com.cultural.heritage.config; import com.baomidou.mybatisplus.annotation.DbType; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; -import org.mybatis.spring.annotation.MapperScan; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; diff --git a/src/main/java/com/cultural/heritage/config/RabbitMQConfig.java b/src/main/java/com/cultural/heritage/config/RabbitMQConfig.java new file mode 100644 index 0000000..58752a4 --- /dev/null +++ b/src/main/java/com/cultural/heritage/config/RabbitMQConfig.java @@ -0,0 +1,25 @@ +package com.cultural.heritage.config; + +import org.springframework.amqp.support.converter.DefaultClassMapper; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitMQConfig { + + @Bean + public MessageConverter jsonToMapMessageConverter() { + DefaultClassMapper defaultClassMapper = new DefaultClassMapper(); + defaultClassMapper.setTrustedPackages("com.cultural.heritage.utils.MultiDelayMessage"); // trusted packages + Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); + jackson2JsonMessageConverter.setClassMapper(defaultClassMapper); + return jackson2JsonMessageConverter; + } + + @Bean + public MessageConverter messageConverter(){ + return new Jackson2JsonMessageConverter(); + } +} diff --git a/src/main/java/com/cultural/heritage/constant/MqConstant.java b/src/main/java/com/cultural/heritage/constant/MqConstant.java new file mode 100644 index 0000000..54371f2 --- /dev/null +++ b/src/main/java/com/cultural/heritage/constant/MqConstant.java @@ -0,0 +1,10 @@ +package com.cultural.heritage.constant; + +public interface MqConstant { + + String DELAY_EXCHANGE = "delay.topic"; + + String DELAY_ORDER_QUEUE = "order.delay.queue"; + + String DELAY_ORDER_ROUTING_KEY = "order.key"; +} diff --git a/src/main/java/com/cultural/heritage/controller/good/FestivalController.java b/src/main/java/com/cultural/heritage/controller/good/FestivalController.java new file mode 100644 index 0000000..1cc9c9d --- /dev/null +++ b/src/main/java/com/cultural/heritage/controller/good/FestivalController.java @@ -0,0 +1,97 @@ +package com.cultural.heritage.controller.good; + + +import com.cultural.heritage.annotation.AuthCheck; +import com.cultural.heritage.common.BaseResponse; +import com.cultural.heritage.common.ErrorCode; +import com.cultural.heritage.common.ResultUtils; +import com.cultural.heritage.constant.UserConstant; +import com.cultural.heritage.exception.BusinessException; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * 节日管理接口 + */ +@RestController +@RequestMapping("/festival") +@Slf4j +@Tag(name = "节日管理模块") +public class FestivalController { + + + @Resource + private RedisTemplate redisTemplate; + + // 节日集合的键 + private static final String FESTIVAL_KEY = "festivalList"; + + + /** + * Web端管理员添加节日项 + * @param value 添加的节日项 + */ + @GetMapping("/add") + @Operation(summary = "Web端管理员添加节日项", description = "参数:节日项名称,权限:管理员(admin, boss),方法名:addFestivalElement") + @AuthCheck(mustRole = UserConstant.ADMIN_ROLE) + public BaseResponse addFestivalElement(@RequestParam String value) { + if (StringUtils.isBlank(value)) { + throw new BusinessException(ErrorCode.PARAMS_ERROR); + } + if (isElementExistInList(value)) { + throw new BusinessException(ErrorCode.OPERATION_ERROR, "该节日项已存在"); + } + redisTemplate.opsForList().rightPush(FESTIVAL_KEY, value); + return ResultUtils.success(true); + } + + + /** + * Web端管理员查询节日集合 + * @return 节日列表 + */ + @GetMapping("/get") + @Operation(summary = "Web端管理员查询节日集合", description = "参数:无,权限:管理员(admin, boss),方法名:getFestivalList") + @AuthCheck(mustRole = UserConstant.ADMIN_ROLE) + public BaseResponse> getFestivalList() { + List festivalList = redisTemplate.opsForList().range(FESTIVAL_KEY, 0, -1); + return ResultUtils.success(festivalList); + } + + + /** + * Web端管理员删除节日项 + * @param value 删除的节日项 + * @return + */ + @GetMapping("/delete") + @Operation(summary = "Web端管理员删除节日项", description = "参数:无,权限:管理员(admin, boss),方法名:deleteFestivalElement") + @AuthCheck(mustRole = UserConstant.ADMIN_ROLE) + public BaseResponse deleteFestivalElement(@RequestParam String value) { + if (!isElementExistInList(value)) { + throw new BusinessException(ErrorCode.OPERATION_ERROR, "该节日项不存在"); + } + redisTemplate.opsForList().remove(FESTIVAL_KEY, 1, value); + return ResultUtils.success(true); + } + + + + // 检查元素是否存在 + private boolean isElementExistInList(@RequestParam String value) { + List festivalList = redisTemplate.opsForList().range(FESTIVAL_KEY, 0, -1); + return festivalList != null && festivalList.contains(value); + } + + +} diff --git a/src/main/java/com/cultural/heritage/controller/order/OrderController.java b/src/main/java/com/cultural/heritage/controller/order/OrderController.java index dab5b67..a443467 100644 --- a/src/main/java/com/cultural/heritage/controller/order/OrderController.java +++ b/src/main/java/com/cultural/heritage/controller/order/OrderController.java @@ -7,6 +7,7 @@ import com.cultural.heritage.annotation.AuthCheck; import com.cultural.heritage.common.BaseResponse; import com.cultural.heritage.common.ErrorCode; import com.cultural.heritage.common.ResultUtils; +import com.cultural.heritage.constant.MqConstant; import com.cultural.heritage.constant.OrderStatusConstant; import com.cultural.heritage.constant.UserConstant; import com.cultural.heritage.exception.BusinessException; @@ -26,11 +27,13 @@ import com.cultural.heritage.service.good.GoodService; import com.cultural.heritage.service.order.OrderItemService; import com.cultural.heritage.service.order.OrderService; import com.cultural.heritage.service.user.UserService; +import com.cultural.heritage.utils.MultiDelayMessage; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.annotation.Resource; import jakarta.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeanUtils; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.PostMapping; @@ -39,7 +42,10 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.math.BigDecimal; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; @RestController @RequestMapping("/order") @@ -74,6 +80,11 @@ public class OrderController { + @Resource + private RabbitTemplate rabbitTemplate; + + + /** * 用户通过购物车创建订单 */ @@ -174,6 +185,18 @@ public class OrderController { orderMainInfoAddRequest.setTotalAmount(totalAmount); // 创建通用订单(常规类和服务类商品) Long orderId = orderService.createCommonOrder(orderMainInfoAddRequest, userId, false, null); + + // 延迟检查订单状态信息 + MultiDelayMessage msg = new MultiDelayMessage<>(orderId, + 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L, 60000L, 60000L, 120000L, 300000L, 600000L, 600000L); + // 10000L, 10000L, 10000L, 15000L, 15000L, 30000L, 30000L, 60000L, 60000L, 120000L, 300000L, 600000L, 600000L + int delayValue = msg.removeNextDelay().intValue(); + rabbitTemplate.convertAndSend(MqConstant.DELAY_EXCHANGE, + MqConstant.DELAY_ORDER_ROUTING_KEY, msg, message -> { + // 添加延迟消息属性 + message.getMessageProperties().setDelay(delayValue); + return message; + }); return ResultUtils.success(orderId); } diff --git a/src/main/java/com/cultural/heritage/controller/wx/WeChatLogisticsController.java b/src/main/java/com/cultural/heritage/controller/wx/WeChatLogisticsController.java index 9fe6f18..aaf0a04 100644 --- a/src/main/java/com/cultural/heritage/controller/wx/WeChatLogisticsController.java +++ b/src/main/java/com/cultural/heritage/controller/wx/WeChatLogisticsController.java @@ -84,10 +84,10 @@ public class WeChatLogisticsController { public BaseResponse getAccessToken(HttpServletRequest request) { userService.getLoginUser(request); String accessToken = (String) redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY); - if (accessToken == null) { - weChatLogisticsService.addAccessToken(); - accessToken = (String) redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY); - } +// if (accessToken == null) { +// weChatLogisticsService.addAccessToken(); +// accessToken = (String) redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY); +// } WxAccessToken wxAccessToken = WxAccessToken.builder() .access_token(accessToken) .expires_in("7200").build(); @@ -107,10 +107,10 @@ public class WeChatLogisticsController { User loginUser = userService.getLoginUser(request); String miniOpenId = loginUser.getMiniOpenId(); String accessToken = (String) redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY); - if (accessToken == null) { - weChatLogisticsService.addAccessToken(); - accessToken = (String) redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY); - } +// if (accessToken == null) { +// weChatLogisticsService.addAccessToken(); +// accessToken = (String) redisTemplate.opsForValue().get(ACCESS_TOKEN_KEY); +// } Long id = commonRequest.getId(); Order order = orderService.getById(id); ThrowUtils.throwIf(order == null, ErrorCode.SYSTEM_ERROR, "订单不存在"); diff --git a/src/main/java/com/cultural/heritage/listener/OrderStatusListener.java b/src/main/java/com/cultural/heritage/listener/OrderStatusListener.java new file mode 100644 index 0000000..ad72f1e --- /dev/null +++ b/src/main/java/com/cultural/heritage/listener/OrderStatusListener.java @@ -0,0 +1,68 @@ +package com.cultural.heritage.listener; + +import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; +import com.cultural.heritage.common.ErrorCode; +import com.cultural.heritage.constant.MqConstant; +import com.cultural.heritage.constant.OrderStatusConstant; +import com.cultural.heritage.exception.ThrowUtils; +import com.cultural.heritage.model.entity.Order; +import com.cultural.heritage.service.order.OrderService; +import com.cultural.heritage.utils.MultiDelayMessage; +import jakarta.annotation.Resource; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +@Component +public class OrderStatusListener { + + + @Resource + private OrderService orderService; + + + @Resource + private RabbitTemplate rabbitTemplate; + + + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(MqConstant.DELAY_ORDER_QUEUE), + exchange = @Exchange(name = MqConstant.DELAY_EXCHANGE, delayed = "true"), + key = MqConstant.DELAY_ORDER_ROUTING_KEY + )) + public void listenDelayMessage(MultiDelayMessage msg) { + System.out.println("\n\n\n\n\nOrderStatusListener.listenerDelayMessage msg-------------------------------->:" + msg); + //1.获取消息中的订单id + Long orderId = msg.getData(); + //2.查询订单,判断状态是否为待支付 + Order order = orderService.getById(orderId); + // 订单不存在或者订单已经支付 + if (order == null || !order.getOrderStatus().equals(OrderStatusConstant.PENDING_PAYMENT)) { + return ; + } + //3.订单未支付,判断是否还有剩余延时时间 + if (msg.hasNextDelay()) { + // 有延迟时间,需要重发延迟消息,先获取延迟时间的int值 + // 发送延时消息 + int delayValue = msg.removeNextDelay().intValue(); + rabbitTemplate.convertAndSend(MqConstant.DELAY_EXCHANGE, + MqConstant.DELAY_ORDER_ROUTING_KEY, msg, message -> { + // 添加延迟消息属性 + message.getMessageProperties().setDelay(delayValue); + return message; + }); + return ; + } + // 没有剩余延时时间,说明订单超时未支付,需取消订单 + UpdateWrapper updateWrapper = new UpdateWrapper<>(); + updateWrapper.eq("id", orderId).set("orderStatus", OrderStatusConstant.TRANSACTION_CLOSED); + boolean update = orderService.update(updateWrapper); + ThrowUtils.throwIf(!update, ErrorCode.SYSTEM_ERROR, "订单状态更新失败"); + } + + +} diff --git a/src/main/java/com/cultural/heritage/test/TestRedis.java b/src/main/java/com/cultural/heritage/test/TestRedis.java index 230298a..c053d16 100644 --- a/src/main/java/com/cultural/heritage/test/TestRedis.java +++ b/src/main/java/com/cultural/heritage/test/TestRedis.java @@ -1,18 +1,15 @@ package com.cultural.heritage.test; -import com.cultural.heritage.mapper.GoodMapper; -import jakarta.annotation.Resource; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Component public class TestRedis { - @Resource - private StringRedisTemplate stringRedisTemplate; - - @Resource - private GoodMapper goodMapper; +// @Resource +// private StringRedisTemplate stringRedisTemplate; +// +// @Resource +// private GoodMapper goodMapper; // // @PostConstruct // public void test() { diff --git a/src/main/java/com/cultural/heritage/utils/MultiDelayMessage.java b/src/main/java/com/cultural/heritage/utils/MultiDelayMessage.java new file mode 100644 index 0000000..ff75922 --- /dev/null +++ b/src/main/java/com/cultural/heritage/utils/MultiDelayMessage.java @@ -0,0 +1,59 @@ +package com.cultural.heritage.utils; + +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serial; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +@Data +@NoArgsConstructor +public class MultiDelayMessage implements Serializable { + + /** + * 消息体 + */ + private T data; + + + /** + * 记录延时时间的集合 + */ + private List delayMillis; + + public MultiDelayMessage(T data, List delayMillis) { + this.data = data; + this.delayMillis = delayMillis; + } + + public MultiDelayMessage(T data, Long...delayMillis) { + this.data = data; + this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis)); + } + + /** + * 获取并移除下一个延迟时间 + * @return 集合中第一个延迟时间 + */ + public Long removeNextDelay() { + return delayMillis.remove(0); + } + + + /** + * 是否有下一个延迟时间 + */ + public boolean hasNextDelay() { + return !delayMillis.isEmpty(); + } + + + + @Serial + private static final long serialVersionUID = 1L; + + +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 053ccb0..0cfdc9d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -5,15 +5,31 @@ spring: url: jdbc:mysql://154.8.193.216:3306/feiyi?serverTimezone=Asia/Shanghai username: feiyi password: 123456asd + hikari: + maximum-pool-size: 20 + max-lifetime: 120000 -# 测试环境 + rabbitmq: + host: 123.249.108.160 + port: 5672 + username: chenxinzhi + password: yuanteng + virtual-host: vhost1 + listener: + simple: + prefetch: 1 + + + + + + # 测试环境 # driver-class-name: com.mysql.cj.jdbc.Driver # url: jdbc:mysql://123.249.108.160:3306/feiyi?serverTimezone=Asia/Shanghai # username: feiyi # password: 123456asd - hikari: - max-lifetime: 120000 + data: redis: port: 6379 diff --git a/src/main/resources/mapper/AppointmentDateMapper.xml b/src/main/resources/mapper/AppointmentDateMapper.xml index b2c696d..23efc35 100644 --- a/src/main/resources/mapper/AppointmentDateMapper.xml +++ b/src/main/resources/mapper/AppointmentDateMapper.xml @@ -6,7 +6,7 @@ diff --git a/src/test/java/com/cultural/heritage/rabbit/Consumer.java b/src/test/java/com/cultural/heritage/rabbit/Consumer.java new file mode 100644 index 0000000..b607c1e --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbit/Consumer.java @@ -0,0 +1,44 @@ +package com.cultural.heritage.rabbit; + + +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.util.Date; + +@Component +public class Consumer { + + @RabbitListener(queues = "myQueue") + public void receive(String message) { + System.out.println("接收时间:" + new Date()); + System.out.println("消费者收到消息:" + message); + } + +// @RabbitListener(queues = "myQueue1") +// public void receive2(String message) { +// System.out.println("消费者接收消息:" + message); +// } + +// @RabbitListener(queues = "myQueue1") +// public void receive1(String productId, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long msgId) throws IOException { +// System.out.println(productId + "开始减少库存"); +// if (productId.equals("产品编号3")) { +// int num = 10 / 0; +// } +// // 手动应答 +// long deliveryTag = msgId; +// channel.basicAck(deliveryTag, false); +// System.out.println(productId + "减少库存成功!"); +// } + +// @RabbitListener(queues = "myQueue2") +// public void receive2(String message) { +// System.out.println("消费者2接收消息:" + message); +// } +// +// @RabbitListener(queues = "myQueue3") +// public void receive3(String message) { +// System.out.println("消费者3接收消息:" + message); +// } +} diff --git a/src/test/java/com/cultural/heritage/rabbit/Producer.java b/src/test/java/com/cultural/heritage/rabbit/Producer.java new file mode 100644 index 0000000..55e14c1 --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbit/Producer.java @@ -0,0 +1,47 @@ +package com.cultural.heritage.rabbit; + +import jakarta.annotation.Resource; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageBuilder; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.stereotype.Component; + +import java.util.Date; + +@Component +public class Producer { + + @Resource + private RabbitTemplate rabbitTemplate; + + private String exchange = "myExchange"; + + + public void send(String routingKey, String message) { + Message msg = MessageBuilder + .withBody(message.getBytes()) + .setHeader("x-delay", 5000) + .build(); + System.out.println("发送时间:" + new Date()); + rabbitTemplate.convertAndSend(exchange, routingKey, msg); + } + + +// public void send(String routingKey, String message, MessageDeliveryMode mode) { +// Message msg = MessageBuilder +// .withBody(message.getBytes()) +// .setDeliveryMode(mode) // 指定消息是否持久化 +// .build(); +// rabbitTemplate.convertAndSend(exchange, routingKey, msg); +// } + +// public void send(String message, String routingKey) { +// +// CorrelationData correlationData = new CorrelationData(); +// correlationData.setId("101"); //UUID.randomUUID().toString() +// rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData); +// +// //rabbitTemplate.convertAndSend(exchangeName, routingKey, message); +// } + +} diff --git a/src/test/java/com/cultural/heritage/rabbit/TestRabbit.java b/src/test/java/com/cultural/heritage/rabbit/TestRabbit.java new file mode 100644 index 0000000..cf1d53c --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbit/TestRabbit.java @@ -0,0 +1,27 @@ +package com.cultural.heritage.rabbit; + +import jakarta.annotation.Resource; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@SpringBootTest +@RunWith(SpringRunner.class) +public class TestRabbit { + + @Resource + private Producer producer; + + @Test + public void test() { + String routingKey = "myKey"; + String message = "Hello, RabbitMQ"; + producer.send(routingKey, message); + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/AppTest.java b/src/test/java/com/cultural/heritage/rabbitmq/AppTest.java new file mode 100644 index 0000000..eedcefa --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbitmq/AppTest.java @@ -0,0 +1,94 @@ +package com.cultural.heritage.rabbitmq; + +import com.rabbitmq.client.*; +import org.junit.Test; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@RestController +public class AppTest { + + // 服务器IP + private String host = "123.249.108.160"; + + // RabbitMQ端口 + private int port = 5672; + + // 用户名 + private String username = "chenxinzhi"; + + // 密码 + private String password = "yuanteng"; + + // 虚拟机名字 + private String vhost = "vhost1"; + + // 交换机名字 + private String exchangeName = "myExchange1"; + + // 队列名字 + private String queueName = "myQueue1"; + + // 路由Key + private String routingKey = "myRouting1"; + + + @Test + public void testProducer() throws IOException, TimeoutException { + + // 创建连接工厂、连接、信道 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + factory.setVirtualHost(vhost); + // 连接 + Connection connection = factory.newConnection(); + // 信道 + Channel channel = connection.createChannel(); + // 创建交换机:直连交换机 + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, + true, false, null); + // 创建队列 + channel.queueDeclare(queueName, true, false, false, null); + // 交换机绑定 + channel.queueBind(queueName, exchangeName, routingKey); + // 发送消息 + String msg = "Hello, RabbitMQ"; + channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); + // 关闭 + channel.close(); + connection.close(); + + } + + + @Test + public void testConsumer() throws IOException, TimeoutException { + // 创建连接工厂、连接、信道 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + factory.setVirtualHost(vhost); + // 连接 + Connection connection = factory.newConnection(); + // 信道 + Channel channel = connection.createChannel(); + // 获取消息 + DeliverCallback deliverCallback = (consumerTag, message) -> { + String msg = new String(message.getBody()); + System.out.println(msg); + }; + CancelCallback cancelCallback = consumerTag -> { + + }; + channel.basicConsume(queueName, true, deliverCallback, cancelCallback); + } + + +} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestDirect.java b/src/test/java/com/cultural/heritage/rabbitmq/TestDirect.java new file mode 100644 index 0000000..333ff13 --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestDirect.java @@ -0,0 +1,119 @@ +package com.cultural.heritage.rabbitmq; + +import com.rabbitmq.client.*; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@SpringBootTest +@RunWith(SpringRunner.class) +public class TestDirect { + + // 服务器IP + private String host = "123.249.108.160"; + + // RabbitMQ端口 + private int port = 5672; + + // 用户名 + private String username = "chenxinzhi"; + + // 密码 + private String password = "yuanteng"; + + // 虚拟机名字 + private String vhost = "vhost1"; + + // 交换机名字 + private String exchangeName = "myExchange1"; + + // 队列名字 + private String queueName1 = "myQueue1"; + + private String queueName2 = "myQueue2"; + + private String queueName3 = "myQueue3"; + + // 路由Key + private String routingKey1 = "myRouting1"; + + private String routingKey2 = "myRouting2"; + + // 连接对象 + private Connection connection; + + // 信道对象 + private Channel channel; + + + public void setUp() throws IOException, TimeoutException { + + // 创建连接工厂、连接、信道 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + factory.setVirtualHost(vhost); + // 连接 + connection = factory.newConnection(); + // 信道 + channel = connection.createChannel(); + } + + + @Test + public void testProducer() throws IOException, TimeoutException { + // 创建交换机:直连交换机 + setUp(); + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, + true, false, null); + // 创建队列 + channel.queueDeclare(queueName1, true, false, false, null); + channel.queueDeclare(queueName2, true, false, false, null); + channel.queueDeclare(queueName3, true, false, false, null); + // 交换机绑定 + channel.queueBind(queueName1, exchangeName, routingKey1); + channel.queueBind(queueName2, exchangeName, routingKey2); + channel.queueBind(queueName3, exchangeName, routingKey1); + // 发送消息 + String msg = "Hello, RabbitMQ"; + channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes()); + String msg2 = "RabbitMQ, Hello"; + channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes()); + // 关闭 + channel.close(); + connection.close(); + + } + + + @Test + public void testConsumer() throws IOException, TimeoutException { + // 获取消息 + setUp(); + DeliverCallback deliverCallback = (consumerTag, message) -> { + String msg = new String(message.getBody()); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + }; + CancelCallback cancelCallback = consumerTag -> { + + }; + channel.basicConsume(queueName3, true, deliverCallback, cancelCallback); + } + + +} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestFanout.java b/src/test/java/com/cultural/heritage/rabbitmq/TestFanout.java new file mode 100644 index 0000000..5990b86 --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestFanout.java @@ -0,0 +1,107 @@ +package com.cultural.heritage.rabbitmq; + +import com.rabbitmq.client.*; +import org.junit.Test; +import org.springframework.boot.test.context.SpringBootTest; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@SpringBootTest +public class TestFanout { + + // 服务器IP + private String host = "123.249.108.160"; + + // RabbitMQ端口 + private int port = 5672; + + // 用户名 + private String username = "chenxinzhi"; + + // 密码 + private String password = "yuanteng"; + + // 虚拟机名字 + private String vhost = "vhost1"; + + // 交换机名字 + private String exchangeName = "myExchange2"; + + // 队列名字 + private String queueName1 = "myQueue1"; + + private String queueName2 = "myQueue2"; + + private String queueName3 = "myQueue3"; + + // 路由Key + private String routingKey1 = "myRouting1"; + + private String routingKey2 = "myRouting2"; + + // 连接对象 + private Connection connection; + + // 信道对象 + private Channel channel; + + + public void setUp() throws IOException, TimeoutException { + + // 创建连接工厂、连接、信道 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + factory.setVirtualHost(vhost); + // 连接 + connection = factory.newConnection(); + // 信道 + channel = connection.createChannel(); + } + + + @Test + public void testProducer() throws IOException, TimeoutException { + setUp(); + // 创建交换机:直连交换机 + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, + true, false, null); + // 创建队列 + channel.queueDeclare(queueName1, true, false, false, null); + channel.queueDeclare(queueName2, true, false, false, null); + channel.queueDeclare(queueName3, true, false, false, null); + // 交换机绑定 + channel.queueBind(queueName1, exchangeName, ""); + channel.queueBind(queueName2, exchangeName, ""); + channel.queueBind(queueName3, exchangeName, ""); + // 发送消息 + String msg = "Hello, RabbitMQ"; + channel.basicPublish(exchangeName, "", null, msg.getBytes()); + // 关闭 + channel.close(); + connection.close(); + + } + + + @Test + public void testConsumer() throws IOException, TimeoutException { + setUp(); + // 获取消息 + DeliverCallback deliverCallback = (consumerTag, message) -> { + String msg = new String(message.getBody()); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + }; + CancelCallback cancelCallback = consumerTag -> { + + }; + channel.basicConsume(queueName1, true, deliverCallback, cancelCallback); + } + + +} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestHeader.java b/src/test/java/com/cultural/heritage/rabbitmq/TestHeader.java new file mode 100644 index 0000000..81109b8 --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestHeader.java @@ -0,0 +1,118 @@ +package com.cultural.heritage.rabbitmq; + +import com.rabbitmq.client.*; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +@SpringBootTest +@RunWith(SpringRunner.class) +public class TestHeader { + + // 服务器IP + private String host = "123.249.108.160"; + + // RabbitMQ端口 + private int port = 5672; + + // 用户名 + private String username = "chenxinzhi"; + + // 密码 + private String password = "yuanteng"; + + // 虚拟机名字 + private String vhost = "vhost1"; + + // 交换机名字 + private String exchangeName = "myExchange4"; + + // 队列名字 + private String queueName1 = "myQueue1"; + + + // 连接对象 + private Connection connection; + + // 信道对象 + private Channel channel; + + + public void setUp() throws IOException, TimeoutException { + + // 创建连接工厂、连接、信道 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + factory.setVirtualHost(vhost); + // 连接 + connection = factory.newConnection(); + // 信道 + channel = connection.createChannel(); + } + + + @Test + public void testProducer() throws IOException, TimeoutException { + setUp(); + // 创建交换机:直连交换机 + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, + true, false, null); + // 创建队列 + channel.queueDeclare(queueName1, true, false, false, null); + + // 发送消息 + String msg = "Hello, RabbitMQ"; + Map map = new HashMap(); + map.put("one", "one2"); + map.put("two", "two"); + AMQP.BasicProperties props = new AMQP.BasicProperties().builder().headers(map).build(); + channel.basicPublish(exchangeName, "", props, msg.getBytes()); + // 关闭 + channel.close(); + connection.close(); + + } + + + @Test + public void testConsumer() throws IOException, TimeoutException { + setUp(); + // 获取消息 + DeliverCallback deliverCallback = (consumerTag, message) -> { + String msg = new String(message.getBody()); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + }; + CancelCallback cancelCallback = consumerTag -> { + + }; + // 绑定队列和交换机 + Map map = new HashMap(); + map.put("x-match", "any"); + map.put("one", "one"); + map.put("two", "two"); + channel.queueBind(queueName1, exchangeName, "", map); + // 接收消息 + channel.basicConsume(queueName1, true, deliverCallback, cancelCallback); + } + + + @Test + public void test() throws IOException, TimeoutException, InterruptedException { + testProducer(); + Thread.sleep(5000); + testConsumer(); + } + + +} diff --git a/src/test/java/com/cultural/heritage/rabbitmq/TestTopic.java b/src/test/java/com/cultural/heritage/rabbitmq/TestTopic.java new file mode 100644 index 0000000..21216a7 --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbitmq/TestTopic.java @@ -0,0 +1,117 @@ +package com.cultural.heritage.rabbitmq; + +import com.rabbitmq.client.*; +import org.junit.Test; +import org.springframework.boot.test.context.SpringBootTest; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@SpringBootTest +public class TestTopic { + + // 服务器IP + private String host = "123.249.108.160"; + + // RabbitMQ端口 + private int port = 5672; + + // 用户名 + private String username = "chenxinzhi"; + + // 密码 + private String password = "yuanteng"; + + // 虚拟机名字 + private String vhost = "vhost1"; + + // 交换机名字 + private String exchangeName = "myExchange3"; + + // 队列名字 + private String queueName1 = "myQueue1"; + + private String queueName2 = "myQueue2"; + + private String queueName3 = "myQueue3"; + + // 路由Key + private String routingKey1 = "myRouting.*"; + + private String routingKey2 = "myRouting.b"; + + private String routingKey3 = "myRouting.#"; + + // 连接对象 + private Connection connection; + + // 信道对象 + private Channel channel; + + + public void setUp() throws IOException, TimeoutException { + + // 创建连接工厂、连接、信道 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + factory.setPort(port); + factory.setUsername(username); + factory.setPassword(password); + factory.setVirtualHost(vhost); + // 连接 + connection = factory.newConnection(); + // 信道 + channel = connection.createChannel(); + } + + + @Test + public void testProducer() throws IOException, TimeoutException { + setUp(); + // 创建交换机:直连交换机 + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, + true, false, null); + // 创建队列 + channel.queueDeclare(queueName1, true, false, false, null); + channel.queueDeclare(queueName2, true, false, false, null); + channel.queueDeclare(queueName3, true, false, false, null); + // 交换机绑定 + channel.queueBind(queueName1, exchangeName, routingKey1); + channel.queueBind(queueName2, exchangeName, routingKey2); + channel.queueBind(queueName3, exchangeName, routingKey3); + // 发送消息 + String msg = "Hello, RabbitMQ"; + channel.basicPublish(exchangeName, "myRouting.a.b", null, msg.getBytes()); + // 关闭 + channel.close(); + connection.close(); + + } + + + @Test + public void testConsumer() throws IOException, TimeoutException { + setUp(); + // 获取消息 + DeliverCallback deliverCallback = (consumerTag, message) -> { + String msg = new String(message.getBody()); + System.out.println(msg); + System.out.println(msg); + System.out.println(msg); + }; + CancelCallback cancelCallback = consumerTag -> { + + }; + channel.basicConsume(queueName3, true, deliverCallback, cancelCallback); + } + + + @Test + public void test() throws IOException, TimeoutException, InterruptedException { + testProducer(); + Thread.sleep(5000); + testConsumer(); + } + + +} diff --git a/src/test/java/com/cultural/heritage/rabbitmqtest/ConsumerTest.java b/src/test/java/com/cultural/heritage/rabbitmqtest/ConsumerTest.java new file mode 100644 index 0000000..2dd6760 --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbitmqtest/ConsumerTest.java @@ -0,0 +1,21 @@ +package com.cultural.heritage.rabbitmqtest; + +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +@Component +public class ConsumerTest { + + @RabbitListener(bindings = @QueueBinding( + value = @Queue("myQueue"), + exchange = @Exchange(name = "myExchange", delayed = "true"), + key = "myKey" + )) + public void listenDelayMessage(String msg) { + System.out.println("接收延时消息:" + msg); + } + +} diff --git a/src/test/java/com/cultural/heritage/rabbitmqtest/RabbitMQTest.java b/src/test/java/com/cultural/heritage/rabbitmqtest/RabbitMQTest.java new file mode 100644 index 0000000..c5ff622 --- /dev/null +++ b/src/test/java/com/cultural/heritage/rabbitmqtest/RabbitMQTest.java @@ -0,0 +1,44 @@ +package com.cultural.heritage.rabbitmqtest; + + +import jakarta.annotation.Resource; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@SpringBootTest +@RunWith(SpringRunner.class) +public class RabbitMQTest { + + @Resource + private RabbitTemplate rabbitTemplate; + + + @Test + public void testDelayMessage() { + String exchange = "myExchange"; + String message = "Hello, delay message!"; + + rabbitTemplate.convertAndSend(exchange, "myKey", message, new MessagePostProcessor() { + @Override + public Message postProcessMessage(Message message) throws AmqpException { + // 添加延迟消息属性 + message.getMessageProperties().setDelay(5000); + return message; + } + }); + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + +} diff --git a/src/test/java/com/cultural/heritage/temp/MessageConfirmConfig.java b/src/test/java/com/cultural/heritage/temp/MessageConfirmConfig.java new file mode 100644 index 0000000..d44459e --- /dev/null +++ b/src/test/java/com/cultural/heritage/temp/MessageConfirmConfig.java @@ -0,0 +1,52 @@ +//package com.cultural.heritage.config; +// +//import jakarta.annotation.PostConstruct; +//import jakarta.annotation.Resource; +//import org.springframework.amqp.core.Message; +//import org.springframework.amqp.core.ReturnedMessage; +//import org.springframework.amqp.rabbit.connection.CorrelationData; +//import org.springframework.amqp.rabbit.core.RabbitTemplate; +//import org.springframework.stereotype.Component; +// +//@Component +//public class MessageConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback { +// +// @Resource +// private RabbitTemplate rabbitTemplate; +// +// @PostConstruct +// public void init() { +// // 设置发布确认交当前对象处理-this +// rabbitTemplate.setConfirmCallback(this); +// +// // 设置消息退回交当前对象处理-this +// rabbitTemplate.setReturnsCallback(this); +// +// // 设置是否保留退回的消息(默认保留) +//// rabbitTemplate.setMandatory(true); +// } +// +// @Override +// public void confirm(CorrelationData correlationData, boolean ack, String cause) { +// if (ack) { +// System.out.println("mq收到消息: = " + correlationData.getId()); +// } else { +// System.out.println("mq未收到消息:" + cause); +// } +// } +// +// @Override +// public void returnedMessage(ReturnedMessage returned) { +// Message message = returned.getMessage(); +// String replyText = returned.getReplyText(); +// int replyCode = returned.getReplyCode(); +// String exchange = returned.getExchange(); +// String routingKey = returned.getRoutingKey(); +// System.out.println("消息被退回:"); +// System.out.println("消息:" + new String(message.getBody())); +// System.out.println("原因:" + replyText); +// System.out.println("应答码:" + replyCode); +// System.out.println("交换机:" + exchange); +// System.out.println("路由:" + routingKey); +// } +//} diff --git a/src/test/java/com/cultural/heritage/temp/RabbitConfig.java b/src/test/java/com/cultural/heritage/temp/RabbitConfig.java new file mode 100644 index 0000000..f1164b4 --- /dev/null +++ b/src/test/java/com/cultural/heritage/temp/RabbitConfig.java @@ -0,0 +1,77 @@ +//package com.cultural.heritage.config; +// +//import org.springframework.amqp.core.*; +//import org.springframework.beans.factory.annotation.Qualifier; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +// +//@Configuration +//public class RabbitConfig { +// +// private String exchange = "myExchange"; +// +// private String queue = "myQueue"; +//// +//// private String queueName2 = "myQueue2"; +//// +//// private String queueName3 = "myQueue3"; +//// +// private String routingKey = "myKey"; +//// +//// private String routingKey2 = "routingKey.b"; +//// +//// private String routingKey3 = "routingKey.#"; +// +// @Bean("myExchange") +// public org.springframework.amqp.core.Exchange exchange() { +// return ExchangeBuilder +// .directExchange(exchange) +// .delayed() +// .build(); +// } +// +// @Bean("myQueue") +// public Queue queue() { +// return QueueBuilder.durable(queue).build(); +//// return QueueBuilder.durable(queueName1).lazy().build(); +// } +// +//// @Bean("myQueue2") +//// public Queue queue2() { +//// return QueueBuilder.durable(queueName2).build(); +//// } +//// +//// @Bean("myQueue3") +//// public Queue queue3() { +//// return QueueBuilder.durable(queueName3).build(); +//// } +// +// @Bean +// public Binding binding(@Qualifier("myExchange") Exchange exchange, @Qualifier("myQueue") Queue queue) { +// return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs(); +// } +// +// +//// @Bean +//// public Binding binding1(@Qualifier("myExchange1") Exchange exchange, @Qualifier("myQueue1") Queue queue) { +//// Map map = new HashMap(); +//// map.put("x-match", "any"); +//// map.put("one", "one"); +//// map.put("two", "two"); +//// return BindingBuilder.bind(queue).to(exchange).with("").and(map); +//// } +//// +//// @Bean +//// public Binding binding2(@Qualifier("myExchange1") Exchange exchange, @Qualifier("myQueue2") Queue queue) { +//// return BindingBuilder.bind(queue).to(exchange).with(routingKey2).noargs(); +//// } +//// +//// @Bean +//// public Binding binding3(@Qualifier("myExchange1") Exchange exchange, @Qualifier("myQueue3") Queue queue) { +//// return BindingBuilder.bind(queue).to(exchange).with(routingKey3).noargs(); +//// } +// +// +// +// +//} diff --git a/src/test/java/com/cultural/heritage/test/TestHashCode.java b/src/test/java/com/cultural/heritage/test/TestHashCode.java new file mode 100644 index 0000000..6f0b8d9 --- /dev/null +++ b/src/test/java/com/cultural/heritage/test/TestHashCode.java @@ -0,0 +1,25 @@ +package com.cultural.heritage.test; + +public class TestHashCode { + private int num; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestHashCode that = (TestHashCode) o; + return num == that.num; + } + +// @Override +// public int hashCode() { +// return Objects.hash(num); +// } + + public static void main(String[] args) { + TestHashCode testHashCode1 = new TestHashCode(); + TestHashCode testHashCode2 = new TestHashCode(); + System.out.println(testHashCode1.equals(testHashCode2)); + testHashCode1.hashCode(); + } +}