delay_task
本文最后更新于:1 年前
在Springboot中使用RabbitMQ实现倒计时执行任务
死信
顾名思义,就是死掉的信息,英文是Dead Letter。死信交换机(Dead-Letter-Exchange)
和普通交换机没有区别,都是可以接受信息并转发到与之绑定并能路由到的队列,区别在于死信交换机
是转发死信
的,而和该死信交换机
绑定的队列就是死信队列
。说的再通俗一点,死信交换机和死信队列其实都只是普通的交换机和队列,只不过接受、转发的信息是死信
,其他操作并没有区别。
1.配置RabbitMQ
1.1 创建交换机
交换机的存在可以将生产者发送的消息,路由到不同的queue
1.2 创建死信队列
- x-message-ttl: 消息的存活时间,单位是ms
- x-dead-letter-exchange: 指定死信队列的交换机为之前创建的delay_exchange
- x-dead-letter-routing-key: 当消息过期之后转发到交换机key(k2)对应的队列queue
1.3 创建普通队列
1.4 绑定交换机和队列
1.5 代码实现
以上步骤也能使用代码实现
2 springboot整合RabbitMQ
2.1导入依赖
<!--raabbitmq客户端-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 添加配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: 123456
virtual-host: host1
listener:
type: simple
simple:
default-requeue-rejected: false # 消息被拒绝
acknowledge-mode: manual # 手动确认消息
2.3 创建交换机、队列并绑定
第一节的实现,也可以使用代码实现
队列名、交换机名与key
public class RabbitConstants {
public static final String DLX_EXCHANGE = "delay_exchange";
public static final String DLX_ROUTING_KEY = "k1";
public static final String DLX_QUEUE = "delay_queue1";
public static final String QUEUE = "delay_queue2";
public static final String ROUTING_KEY = "k2";
}
import com.sunzy.vulfocus.common.RabbitConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import java.util.HashMap;
import java.util.Map;
//@Configuration
public class RabbitConfig {
/**
* 死信队列
* @return {@link Queue}
*/
@Bean
public Queue Queue1() {
Map<String, Object> argments = new HashMap<>();
argments.put("x-message-ttl", 60000);// 一分钟的延时
argments.put("x-dead-letter-exchange", RabbitConstants.DLX_EXCHANGE);
argments.put("x-dead-letter-routing-key", RabbitConstants.ROUTING_KEY);
return new Queue(RabbitConstants.DLX_QUEUE, true, false, false, argments);
}
@Bean
public Queue Queue2(){
return new Queue(RabbitConstants.QUEUE, true, false, false);
}
/**
* 交换机
*
* @return {@link Exchange}
*/
@Bean
public Exchange orderExchange() {
return new DirectExchange(RabbitConstants.DLX_EXCHANGE, true, false, null);
}
/**
* 路由键
*
* @return {@link Binding}
*/
@Bean
public Binding Routing() {
return BindingBuilder.bind(Queue2()).to(orderExchange()).with(RabbitConstants.ROUTING_KEY).noargs();
}
/**
* 死信路由键
*
* @return {@link Binding}
*/
@Bean
public Binding DlxRouting() {
return BindingBuilder.bind(Queue1()).to(orderExchange()).with(RabbitConstants.DLX_ROUTING_KEY).noargs();
}
}
2.4 测试代码
生产者向交换机delay_exchange
发送消息,并且key为k1,交换机接收消息后,会将该消息路由到delay_queue1
,该消息会在死信队列中待60s后,转发到delay_queue2
中
@Test
void testSendAmqp() {
String taskId = "cfdf21cd13f248d3885ec47329bbb3bf";
amqpTemplate.convertAndSend(RabbitConstants.DLX_EXCHANGE,
RabbitConstants.DLX_ROUTING_KEY,
taskId);
System.out.println("发送时间:" + LocalDateTime.now());
System.out.println("send message success");
}
消费者这边监听delay_queue2
中的消息
@RabbitListener(queues = RabbitConstants.QUEUE, ackMode = "MANUAL")
public void getMessage(Message message, Channel channel) throws IOException {
System.out.println("接收死信队列传递的消息...");
System.out.println("时间:" + LocalDateTime.now());
String taskId = new String(message.getBody());
System.out.println(taskId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
测试效果如下
从测试结果可以看出来,时间正好相差60s,已经实现了延时执行任务的操作。
3 实现倒计时关闭docker容器
上面的延时消息的实现,主要服务于这个倒计时执行任务。
有了上面的实现,再实现倒计时关闭docker容器就简单很多
3.1 启动容器
首先在启动容器时,如果容器启动成功,此时需要创建一个关闭容器的任务,并将关闭容器的任务发送到死信队列中。
@Async
public void runContainer(String containerId, UserDTO user, String taskId, int countdown) throws Exception {
ContainerVul containerVul = containerService.query().eq("container_id", containerId).one();
if (containerVul == null) {
throw ErrorClass.ContainerNotExistsException;
}
String dockerContainerId = null;
UserUserprofile userInfo = userService.getById(user.getId());
dockerContainerId = containerVul.getDockerContainerId();
ImageInfo imageInfo = imageService.query().eq("image_id", containerVul.getImageIdId()).one();
String imageName = imageInfo.getImageName();
String imagePort = imageInfo.getImagePort();
Integer userId = userInfo.getId();
Result msg = null;
/**
* 创建启动任务
*/
HashMap<String, Object> args = new HashMap<>();
args.put("imageName", imageName);
args.put("userId", userId);
args.put("imagePort", imagePort);
TaskInfo taskInfo = query().eq("task_id", taskId).one();
String command = "";
String vulFlag = containerVul.getContainerFlag();
String containerPort = containerVul.getContainerPort();
String vulPort = "";
if (containerVul.getVulPort() != null) {
vulPort = containerVul.getVulPort();
}
String vulHost = containerVul.getVulHost();
Container container = null;
if (!StrUtil.isBlank(dockerContainerId)) {
CheckResp checkResp = checkContainer(dockerContainerId);
if (checkResp.isFlag()) {
container = checkResp.getContainer();
vulFlag = containerVul.getContainerFlag();
}
}
// 容器被删除,此时要创建一个容器
if (container == null) {
String[] portList = imagePort.split(",");
ArrayList<String> randomList = new ArrayList<>();
HashMap<String, Integer> portDict = new HashMap<>();
for (String port : portList) {
String randomPort = "";
for (int i = 0; i < 20; i++) {
randomPort = DockerTools.getRandomPort();
if (randomList.contains(randomPort) || containerService.query().eq("container_port", randomPort).one() != null) {
continue;
}
break;
}
if (StrUtil.isBlank(randomPort)) {
msg = Result.fail("端口无效");
break;
}
randomList.add(randomPort);
portDict.put(port + "/tcp", Integer.valueOf(randomPort));
}
// 端口重复无法创建
if (msg != null) {
taskInfo.setTaskMsg(JSON.toJSONString(msg));
taskInfo.setUpdateDate(LocalDateTime.now());
taskInfo.setTaskStatus(4);
save(taskInfo);
// return taskInfo.getTaskId();
}
// 记录端口映射
// {"3306": "24471", "80": "29729"}
// {"3306":"22113","8080":"12345"}
vulPort = JSON.toJSONString(portDict);
HashMap<String, Integer> vulPorts = new HashMap<>();
Set<Map.Entry<String, Integer>> entries = portDict.entrySet();
for (Map.Entry<String, Integer> entry : entries) {
String port = entry.getKey().split("/")[0];
Integer tmpRandomPort = entry.getValue();
vulPorts.put(port, tmpRandomPort);
}
try {
// 只创建不启动
dockerContainerId = DockerTools.runContainerWithPorts(imageName, vulPorts);
container = DockerTools.getContainerById(dockerContainerId);
} catch (Exception e) {
// 修改任务状态
msg = Result.build("镜像不存在", null);
taskInfo.setTaskMsg(JSON.toJSONString(msg));
taskInfo.setUpdateDate(LocalDateTime.now());
taskInfo.setTaskStatus(4);
save(taskInfo);
throw new RuntimeException(e);
}
vulFlag = "flag{" + UUID.randomUUID().toString() + "}";
if (!StrUtil.isBlank(containerVul.getContainerFlag())) {
vulFlag = containerVul.getContainerFlag();
}
command = "touch /tmp/" + vulFlag;
vulHost = DockerTools.getLocalIp();
} // 容器存在
LocalDateTime taskStartDate = LocalDateTime.now();
LocalDateTime taskEndDate = null;
if (countdown >= 60) {
taskEndDate = taskStartDate.plusSeconds(countdown);
} else if (countdown == 0) {
} else {
countdown = SystemConstants.DOCKER_CONTAINER_TIME;
taskEndDate = taskStartDate.plusSeconds(countdown);
}
assert container != null;
if (container.getState().equals("running")) {
HashMap<String, Object> data = new HashMap<>();
data.put("host", vulHost);
data.put("port", vulPort);
data.put("id", containerId);
data.put("status", "running");
data.put("start_data", taskStartDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
data.put("end_data", taskEndDate != null ? taskEndDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli() : 0);
LambdaQueryWrapper<TaskInfo> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(true, TaskInfo::getUserId, userId);
wrapper.eq(true, TaskInfo::getTaskMsg, JSON.toJSONString(data));
wrapper.eq(true, TaskInfo::getOperationType, 2);
wrapper.eq(true, TaskInfo::getOperationArgs, JSON.toJSONString(args));
wrapper.eq(true, TaskInfo::getTaskEndDate, taskEndDate);
wrapper.eq(true, TaskInfo::getTaskName, "运行容器:" + imageName);
TaskInfo searchTaskInfo = getOne(wrapper);
if (searchTaskInfo == null) {
taskInfo.setTaskId(taskId);
taskInfo.setTaskStatus(3);
taskInfo.setTaskMsg(JSON.toJSONString(data));
taskInfo.setOperationArgs(JSON.toJSONString(args));
taskInfo.setUpdateDate(LocalDateTime.now());
taskId = taskInfo.getTaskId();
LambdaQueryWrapper<TaskInfo> updateWrapperTask = new LambdaQueryWrapper<>();
updateWrapperTask.eq(true, TaskInfo::getTaskId, taskId);
update(taskInfo, updateWrapperTask);
} else {
LambdaQueryWrapper<TaskInfo> removeWapper = new LambdaQueryWrapper<>();
removeWapper.eq(true, TaskInfo::getTaskId, taskId);
remove(removeWapper);
searchTaskInfo.setTaskId(taskId);
searchTaskInfo.setUpdateDate(LocalDateTime.now());
searchTaskInfo.setTaskStatus(3);
save(searchTaskInfo);
}
} else {
DockerTools.startContainer(dockerContainerId);
// 写入flag
msg = dockerContainerRun(container, command);
if (msg != null && msg.getStatus() == SystemConstants.HTTP_ERROR) {
try {
DockerTools.deleteContainer(container.getId());
} catch (Exception ignored) {
log.info("删除容器失败!");
}
taskInfo.setTaskStatus(4);
} else {
HashMap<String, Object> data = (HashMap<String, Object>) msg.getData();
String status = (String) data.get("status");
data.put("host", vulHost);
data.put("port", vulPort);
data.put("id", containerId);
data.put("start_data", taskStartDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
data.put("end_data", taskEndDate != null ? taskEndDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli() : 0);
containerVul.setContainerStatus(status);
containerVul.setDockerContainerId(dockerContainerId);
containerVul.setVulHost(vulHost);
containerVul.setVulPort(vulPort);
containerVul.setContainerFlag(vulFlag);
containerVul.setContainerPort(containerPort);
containerVul.setCreateDate(LocalDateTime.now());
containerVul.setUserId(userId);
containerVul.setIScheck(false);
LambdaQueryWrapper<ContainerVul> updateWrapper = new LambdaQueryWrapper<>();
updateWrapper.eq(true, ContainerVul::getContainerId, containerVul.getContainerId());
containerService.update(containerVul, updateWrapper);
taskInfo.setTaskStartDate(LocalDateTime.now());
taskInfo.setTaskEndDate(LocalDateTime.now());
taskInfo.setTaskStatus(3);
}
taskInfo.setTaskMsg(JSON.toJSONString(msg));
taskInfo.setUpdateDate(LocalDateTime.now());
LambdaQueryWrapper<TaskInfo> updateWrapperTask = new LambdaQueryWrapper<>();
updateWrapperTask.eq(true, TaskInfo::getTaskId, taskId);
update(taskInfo, updateWrapperTask);
}
log.info("启动漏洞容器成功,任务ID:" + taskId);
// 创建关闭容器任务 半小时后关闭
String stopContainerTaskId = createStopContainerTask(containerVul, user);
// 发送到RabbitMQ的死信队列
log.info("向死信队列发送任务id");
amqpTemplate.convertAndSend(RabbitConstants.DLX_EXCHANGE,
RabbitConstants.DLX_ROUTING_KEY,
stopContainerTaskId);
}
重点代码就是,就很简单。
// 发送到RabbitMQ的死信队列
log.info("向死信队列发送任务id");
amqpTemplate.convertAndSend(RabbitConstants.DLX_EXCHANGE,
RabbitConstants.DLX_ROUTING_KEY,
stopContainerTaskId);
3.2 关闭容器
在consumer的监听器中执行关闭容器的步骤
import com.rabbitmq.client.Channel;
import com.sunzy.vulfocus.common.RabbitConstants;
import com.sunzy.vulfocus.service.impl.TaskInfoServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
@Component
public class StopContainerListener {
@Resource
@Lazy // 加这个注释,是因为在启动时,出现了Bean循环依赖的错误
private TaskInfoServiceImpl taskInfoService;
@RabbitListener(queues = RabbitConstants.QUEUE, ackMode = "MANUAL")
public void getMessage(Message message, Channel channel) throws IOException {
System.out.println("接收死信队列传递的消息...");
System.out.println("时间:" + LocalDateTime.now());
String taskId = new String(message.getBody());
System.out.println(taskId);
//
taskInfoService.stopContainer(taskId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
一开始启动时出现报错,类似如下信息,增加@Lazy
解决Bean循环依赖的错误
org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'AService': Bean with name 'AService' has been injected into other beans [BService] in its raw version as part of a circular reference, but has eventually been wrapped. This means that said other beans do not use the final version of the bean. This is often the result of over-eager type matching - consider using 'getBeanNamesForType' with the 'allowEagerInit' flag turned off, for example.
完整代码在vulfocus_java
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!