delay_task

本文最后更新于:1 年前

在Springboot中使用RabbitMQ实现倒计时执行任务

死信顾名思义,就是死掉的信息,英文是Dead Letter。死信交换机(Dead-Letter-Exchange)和普通交换机没有区别,都是可以接受信息并转发到与之绑定并能路由到的队列,区别在于死信交换机是转发死信的,而和该死信交换机绑定的队列就是死信队列。说的再通俗一点,死信交换机和死信队列其实都只是普通的交换机和队列,只不过接受、转发的信息是死信,其他操作并没有区别。

1.配置RabbitMQ

1.1 创建交换机

交换机的存在可以将生产者发送的消息,路由到不同的queue

image-20230413095908823

1.2 创建死信队列

  • x-message-ttl: 消息的存活时间,单位是ms
  • x-dead-letter-exchange: 指定死信队列的交换机为之前创建的delay_exchange
  • x-dead-letter-routing-key: 当消息过期之后转发到交换机key(k2)对应的队列queue

image-20230413100411042

1.3 创建普通队列

image-20230413153741198

1.4 绑定交换机和队列

image-20230413100759703

1.5 代码实现

以上步骤也能使用代码实现

2 springboot整合RabbitMQ

2.1导入依赖

<!--raabbitmq客户端-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 添加配置

rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: root
  password: 123456
  virtual-host: host1
  listener:
    type: simple
    simple:
      default-requeue-rejected: false # 消息被拒绝
      acknowledge-mode: manual		# 手动确认消息

2.3 创建交换机、队列并绑定

第一节的实现,也可以使用代码实现

队列名、交换机名与key

public class RabbitConstants {
    public static final String DLX_EXCHANGE = "delay_exchange";
    public static final String DLX_ROUTING_KEY = "k1";
    public static final String DLX_QUEUE = "delay_queue1";
    public static final String QUEUE = "delay_queue2";
    public static final String ROUTING_KEY = "k2";
}
import com.sunzy.vulfocus.common.RabbitConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

import java.util.HashMap;
import java.util.Map;
//@Configuration
public class RabbitConfig {

    /**
     * 死信队列
     * @return {@link Queue}
     */
    @Bean
    public Queue Queue1() {
        Map<String, Object> argments = new HashMap<>();
        argments.put("x-message-ttl", 60000);// 一分钟的延时
        argments.put("x-dead-letter-exchange", RabbitConstants.DLX_EXCHANGE);
        argments.put("x-dead-letter-routing-key", RabbitConstants.ROUTING_KEY);
        return new Queue(RabbitConstants.DLX_QUEUE, true, false, false, argments);
    }


    @Bean
    public Queue Queue2(){
        return new Queue(RabbitConstants.QUEUE, true, false, false);
    }

    /**
     * 交换机
     *
     * @return {@link Exchange}
     */
    @Bean
    public Exchange orderExchange() {
        return new DirectExchange(RabbitConstants.DLX_EXCHANGE, true, false, null);
    }

    /**
     * 路由键
     *
     * @return {@link Binding}
     */
    @Bean
    public Binding Routing() {
        return BindingBuilder.bind(Queue2()).to(orderExchange()).with(RabbitConstants.ROUTING_KEY).noargs();
    }


    /**
     * 死信路由键
     *
     * @return {@link Binding}
     */
    @Bean
    public Binding DlxRouting() {
        return BindingBuilder.bind(Queue1()).to(orderExchange()).with(RabbitConstants.DLX_ROUTING_KEY).noargs();
    }
}

2.4 测试代码

生产者向交换机delay_exchange发送消息,并且key为k1,交换机接收消息后,会将该消息路由到delay_queue1,该消息会在死信队列中待60s后,转发到delay_queue2

@Test
void testSendAmqp() {
    String taskId = "cfdf21cd13f248d3885ec47329bbb3bf";
    amqpTemplate.convertAndSend(RabbitConstants.DLX_EXCHANGE,
            RabbitConstants.DLX_ROUTING_KEY,
            taskId);
    System.out.println("发送时间:" + LocalDateTime.now());
    System.out.println("send message success");
}

消费者这边监听delay_queue2中的消息

@RabbitListener(queues = RabbitConstants.QUEUE, ackMode = "MANUAL")
public void getMessage(Message message, Channel channel) throws IOException {
    System.out.println("接收死信队列传递的消息...");
    System.out.println("时间:" + LocalDateTime.now());
    String taskId = new String(message.getBody());
    System.out.println(taskId);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

测试效果如下

image-20230413160512237

image-20230413160457446

从测试结果可以看出来,时间正好相差60s,已经实现了延时执行任务的操作。

3 实现倒计时关闭docker容器

上面的延时消息的实现,主要服务于这个倒计时执行任务。

有了上面的实现,再实现倒计时关闭docker容器就简单很多

3.1 启动容器

首先在启动容器时,如果容器启动成功,此时需要创建一个关闭容器的任务,并将关闭容器的任务发送到死信队列中。

@Async
public void runContainer(String containerId, UserDTO user, String taskId, int countdown) throws Exception {
    ContainerVul containerVul = containerService.query().eq("container_id", containerId).one();
    if (containerVul == null) {
        throw ErrorClass.ContainerNotExistsException;
    }
    String dockerContainerId = null;
    UserUserprofile userInfo = userService.getById(user.getId());
    dockerContainerId = containerVul.getDockerContainerId();
    ImageInfo imageInfo = imageService.query().eq("image_id", containerVul.getImageIdId()).one();
    String imageName = imageInfo.getImageName();
    String imagePort = imageInfo.getImagePort();
    Integer userId = userInfo.getId();

    Result msg = null;
    /**
         * 创建启动任务
         */
    HashMap<String, Object> args = new HashMap<>();
    args.put("imageName", imageName);
    args.put("userId", userId);
    args.put("imagePort", imagePort);
    TaskInfo taskInfo = query().eq("task_id", taskId).one();
    String command = "";
    String vulFlag = containerVul.getContainerFlag();
    String containerPort = containerVul.getContainerPort();

    String vulPort = "";
    if (containerVul.getVulPort() != null) {
        vulPort = containerVul.getVulPort();
    }
    String vulHost = containerVul.getVulHost();
    Container container = null;
    if (!StrUtil.isBlank(dockerContainerId)) {
        CheckResp checkResp = checkContainer(dockerContainerId);
        if (checkResp.isFlag()) {
            container = checkResp.getContainer();
            vulFlag = containerVul.getContainerFlag();
        }
    }

    // 容器被删除,此时要创建一个容器
    if (container == null) {
        String[] portList = imagePort.split(",");
        ArrayList<String> randomList = new ArrayList<>();
        HashMap<String, Integer> portDict = new HashMap<>();
        for (String port : portList) {
            String randomPort = "";
            for (int i = 0; i < 20; i++) {
                randomPort = DockerTools.getRandomPort();
                if (randomList.contains(randomPort) || containerService.query().eq("container_port", randomPort).one() != null) {
                    continue;
                }
                break;
            }
            if (StrUtil.isBlank(randomPort)) {
                msg = Result.fail("端口无效");
                break;
            }
            randomList.add(randomPort);
            portDict.put(port + "/tcp", Integer.valueOf(randomPort));
        }
        // 端口重复无法创建
        if (msg != null) {
            taskInfo.setTaskMsg(JSON.toJSONString(msg));
            taskInfo.setUpdateDate(LocalDateTime.now());
            taskInfo.setTaskStatus(4);
            save(taskInfo);
            //                return taskInfo.getTaskId();
        }
        // 记录端口映射
        // {"3306": "24471", "80": "29729"}
        // {"3306":"22113","8080":"12345"}
        vulPort = JSON.toJSONString(portDict);
        HashMap<String, Integer> vulPorts = new HashMap<>();
        Set<Map.Entry<String, Integer>> entries = portDict.entrySet();
        for (Map.Entry<String, Integer> entry : entries) {
            String port = entry.getKey().split("/")[0];
            Integer tmpRandomPort = entry.getValue();
            vulPorts.put(port, tmpRandomPort);
        }
        try {
            // 只创建不启动
            dockerContainerId = DockerTools.runContainerWithPorts(imageName, vulPorts);
            container = DockerTools.getContainerById(dockerContainerId);
        } catch (Exception e) {
            // 修改任务状态
            msg = Result.build("镜像不存在", null);
            taskInfo.setTaskMsg(JSON.toJSONString(msg));
            taskInfo.setUpdateDate(LocalDateTime.now());
            taskInfo.setTaskStatus(4);
            save(taskInfo);
            throw new RuntimeException(e);
        }
        vulFlag = "flag{" + UUID.randomUUID().toString() + "}";
        if (!StrUtil.isBlank(containerVul.getContainerFlag())) {
            vulFlag = containerVul.getContainerFlag();
        }
        command = "touch /tmp/" + vulFlag;
        vulHost = DockerTools.getLocalIp();
    }   // 容器存在
    LocalDateTime taskStartDate = LocalDateTime.now();
    LocalDateTime taskEndDate = null;
    if (countdown >= 60) {
        taskEndDate = taskStartDate.plusSeconds(countdown);
    } else if (countdown == 0) {

    } else {
        countdown = SystemConstants.DOCKER_CONTAINER_TIME;
        taskEndDate = taskStartDate.plusSeconds(countdown);
    }
    assert container != null;
    if (container.getState().equals("running")) {
        HashMap<String, Object> data = new HashMap<>();
        data.put("host", vulHost);
        data.put("port", vulPort);
        data.put("id", containerId);
        data.put("status", "running");
        data.put("start_data", taskStartDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
        data.put("end_data", taskEndDate != null ? taskEndDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli() : 0);
        LambdaQueryWrapper<TaskInfo> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(true, TaskInfo::getUserId, userId);
        wrapper.eq(true, TaskInfo::getTaskMsg, JSON.toJSONString(data));
        wrapper.eq(true, TaskInfo::getOperationType, 2);
        wrapper.eq(true, TaskInfo::getOperationArgs, JSON.toJSONString(args));
        wrapper.eq(true, TaskInfo::getTaskEndDate, taskEndDate);
        wrapper.eq(true, TaskInfo::getTaskName, "运行容器:" + imageName);
        TaskInfo searchTaskInfo = getOne(wrapper);
        if (searchTaskInfo == null) {
            taskInfo.setTaskId(taskId);
            taskInfo.setTaskStatus(3);
            taskInfo.setTaskMsg(JSON.toJSONString(data));
            taskInfo.setOperationArgs(JSON.toJSONString(args));
            taskInfo.setUpdateDate(LocalDateTime.now());
            taskId = taskInfo.getTaskId();
            LambdaQueryWrapper<TaskInfo> updateWrapperTask = new LambdaQueryWrapper<>();
            updateWrapperTask.eq(true, TaskInfo::getTaskId, taskId);
            update(taskInfo, updateWrapperTask);
        } else {
            LambdaQueryWrapper<TaskInfo> removeWapper = new LambdaQueryWrapper<>();
            removeWapper.eq(true, TaskInfo::getTaskId, taskId);
            remove(removeWapper);
            searchTaskInfo.setTaskId(taskId);
            searchTaskInfo.setUpdateDate(LocalDateTime.now());
            searchTaskInfo.setTaskStatus(3);
            save(searchTaskInfo);
        }

    } else {
        DockerTools.startContainer(dockerContainerId);
        // 写入flag
        msg = dockerContainerRun(container, command);
        if (msg != null && msg.getStatus() == SystemConstants.HTTP_ERROR) {
            try {
                DockerTools.deleteContainer(container.getId());
            } catch (Exception ignored) {
                log.info("删除容器失败!");
            }
            taskInfo.setTaskStatus(4);
        } else {
            HashMap<String, Object> data = (HashMap<String, Object>) msg.getData();
            String status = (String) data.get("status");
            data.put("host", vulHost);
            data.put("port", vulPort);
            data.put("id", containerId);
            data.put("start_data", taskStartDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
            data.put("end_data", taskEndDate != null ? taskEndDate.toInstant(ZoneOffset.ofHours(8)).toEpochMilli() : 0);

            containerVul.setContainerStatus(status);
            containerVul.setDockerContainerId(dockerContainerId);
            containerVul.setVulHost(vulHost);
            containerVul.setVulPort(vulPort);
            containerVul.setContainerFlag(vulFlag);
            containerVul.setContainerPort(containerPort);
            containerVul.setCreateDate(LocalDateTime.now());
            containerVul.setUserId(userId);
            containerVul.setIScheck(false);
            LambdaQueryWrapper<ContainerVul> updateWrapper = new LambdaQueryWrapper<>();
            updateWrapper.eq(true, ContainerVul::getContainerId, containerVul.getContainerId());
            containerService.update(containerVul, updateWrapper);

            taskInfo.setTaskStartDate(LocalDateTime.now());
            taskInfo.setTaskEndDate(LocalDateTime.now());
            taskInfo.setTaskStatus(3);
        }

        taskInfo.setTaskMsg(JSON.toJSONString(msg));
        taskInfo.setUpdateDate(LocalDateTime.now());

        LambdaQueryWrapper<TaskInfo> updateWrapperTask = new LambdaQueryWrapper<>();
        updateWrapperTask.eq(true, TaskInfo::getTaskId, taskId);
        update(taskInfo, updateWrapperTask);
    }
    log.info("启动漏洞容器成功,任务ID:" + taskId);
    // 创建关闭容器任务 半小时后关闭
    String stopContainerTaskId = createStopContainerTask(containerVul, user);
    // 发送到RabbitMQ的死信队列
    log.info("向死信队列发送任务id");
    amqpTemplate.convertAndSend(RabbitConstants.DLX_EXCHANGE,
                                RabbitConstants.DLX_ROUTING_KEY,
                                stopContainerTaskId);
}

重点代码就是,就很简单。

// 发送到RabbitMQ的死信队列
log.info("向死信队列发送任务id");
amqpTemplate.convertAndSend(RabbitConstants.DLX_EXCHANGE,
                            RabbitConstants.DLX_ROUTING_KEY,
                            stopContainerTaskId);

3.2 关闭容器

在consumer的监听器中执行关闭容器的步骤

import com.rabbitmq.client.Channel;
import com.sunzy.vulfocus.common.RabbitConstants;
import com.sunzy.vulfocus.service.impl.TaskInfoServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;

@Component
public class StopContainerListener {

    @Resource
    @Lazy // 加这个注释,是因为在启动时,出现了Bean循环依赖的错误
    private TaskInfoServiceImpl taskInfoService;

    @RabbitListener(queues = RabbitConstants.QUEUE, ackMode = "MANUAL")
    public void getMessage(Message message, Channel channel) throws IOException {
        System.out.println("接收死信队列传递的消息...");
        System.out.println("时间:" + LocalDateTime.now());
        String taskId = new String(message.getBody());
        System.out.println(taskId);
        //
        taskInfoService.stopContainer(taskId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

一开始启动时出现报错,类似如下信息,增加@Lazy解决Bean循环依赖的错误

org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'AService': Bean with name 'AService' has been injected into other beans [BService] in its raw version as part of a circular reference, but has eventually been wrapped. This means that said other beans do not use the final version of the bean. This is often the result of over-eager type matching - consider using 'getBeanNamesForType' with the 'allowEagerInit' flag turned off, for example.

完整代码在vulfocus_java


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

 目录

Copyright © 2020 my blog
载入天数... 载入时分秒...