juc

本文最后更新于:1 年前

线程池demo

简化版 Java 线程池

package cn.itcast.test;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demo driver for the hand-written {@code ThreadPool}: one worker, a queue of
 * capacity one, and three submitted tasks, so the third submission hits the
 * reject policy.
 */
@Slf4j(topic = "c.ThredPoolDemo")
public class ThredPoolDemo {

    public static void main(String[] args) {
        // Alternatives a reject policy could choose when the queue is full:
        //   1. wait forever:            queue.put(task);
        //   2. wait with a timeout:     queue.offer(task, 1500, TimeUnit.MILLISECONDS);   <- used here
        //   3. drop the task:           log.debug("放弃任务{}", task);
        //   4. throw:                   throw new RuntimeException("任务执行失败 " + task);
        //   5. run it on the caller:    task.run();
        RejectPolicy<Runnable> waitWithTimeout =
                (queue, task) -> queue.offer(task, 1500, TimeUnit.MILLISECONDS);

        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1, waitWithTimeout);

        int submitted = 0;
        while (submitted < 3) {
            final int id = submitted++;
            threadPool.execute(() -> sleepThenLog(id));
        }
    }

    // Simulates two seconds of work and then logs the task id.
    private static void sleepThenLog(int id) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("{}", id);
    }
}

/**
 * Strategy invoked when a task cannot be added because the queue is full;
 * the decision about what to do with the task is delegated to the caller.
 *
 * @param <T> type of the queued tasks
 */
@FunctionalInterface
interface RejectPolicy<T>{
    /**
     * Handles a task that the full queue could not accept.
     *
     * @param queue the queue that was full
     * @param task  the task that was not added
     */
    void reject(BlockingQueue<T> queue, T task);
}

@Slf4j(topic = "c.ThreadPool")
class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;

    // 线程集合
    private HashSet<Worker> workers = new HashSet();

    // 核心线程数
    private int coreSize;

    // 超时时间
    private long timeout;

    // 时间单位
    private TimeUnit timeUnit;

    // 拒绝策略
    private RejectPolicy<Runnable> rejectPolicy;

    //执行任务
    public void execute(Runnable task){
        synchronized (workers){
            // 当任务线程数没有超过核心线程数 直接交个worker对象执行
            if(workers.size() < coreSize){
                Worker worker = new Worker(task);
                log.debug("worker被创建{}, task is {}", worker,task);
                workers.add(worker);
                // 执行任务
                worker.start();
            } else {
                // 如果任务数超过coreSize 则加入任务队列暂存
//                taskQueue.put(task);
                taskQueue.tryPut(rejectPolicy,task);
            }
        }
    }

	// 构造函数
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapatiy, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueCapatiy);
        this.rejectPolicy = rejectPolicy;
    }
	// 任务处理对象
    class Worker extends Thread{
        private Runnable task;
        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 1.当task不为空时,直接执行任务
            // 2.当task为空时,需要到任务队列中拉去任务执行
//            while(task != null || (task = taskQueue.take()) != null){
            while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null){
                try{
                    log.debug("task正在被执行...{}", task);
                    task.run();
                }catch (Exception e){

                }finally {
                    task = null;
                }
            }
            // 退出循环说明任务执行完毕 需要从worker中移除
            synchronized (workers){
                log.debug("worker 被移除{}", this);
                workers.remove(this);
            }
        }
    }
}
/**
 * Bounded FIFO queue guarded by a single {@link ReentrantLock} with separate
 * conditions for producers (queue full) and consumers (queue empty).
 *
 * <p>Waiting methods do not abort on interruption: they keep waiting and
 * restore the thread's interrupt status before returning, so callers can
 * still observe the interrupt.
 *
 * @param <T> element type
 */
@Slf4j(topic = "c.BlockingQueue")
class BlockingQueue<T> {
    // Backing storage.
    private final Deque<T> queue = new ArrayDeque<>();
    // Guards every access to the backing storage.
    private final ReentrantLock lock = new ReentrantLock();
    // Producers wait here while the queue is full.
    private final Condition fullWaitSet = lock.newCondition();
    // Consumers wait here while the queue is empty.
    private final Condition emptyWaitSet = lock.newCondition();
    // Maximum number of queued elements.
    private final int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes the head of the queue, waiting up to the given time for an
     * element to become available.
     *
     * @return the head element, or {@code null} if the timeout elapsed first
     */
    public T poll(long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            // Absolute deadline, so an interrupted wait does not restart the full timeout.
            final long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (queue.isEmpty()) {
                long nanos = deadline - System.nanoTime();
                if (nanos <= 0) {
                    return null;
                }
                try {
                    emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            return dequeue();
        } finally {
            lock.unlock();
            restoreInterrupt(interrupted);
        }
    }

    /**
     * Removes the head of the queue, waiting as long as necessary for an
     * element to become available.
     */
    public T take() {
        boolean interrupted = false;
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            return dequeue();
        } finally {
            lock.unlock();
            restoreInterrupt(interrupted);
        }
    }

    /**
     * Appends a task, waiting up to the given time for free space.
     *
     * @return {@code true} if the task was added, {@code false} if the
     *         timeout elapsed while the queue was still full
     */
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
        boolean interrupted = false;
        lock.lock();
        try {
            // Absolute deadline, so an interrupted wait does not restart the full timeout.
            final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                long nanos = deadline - System.nanoTime();
                if (nanos <= 0) {
                    return false;
                }
                log.debug("等待加入任务队列{}", task);
                try {
                    fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            log.debug("加入任务队列{}", task);
            enqueue(task);
            return true;
        } finally {
            lock.unlock();
            restoreInterrupt(interrupted);
        }
    }

    /** Appends a task, waiting as long as necessary for free space. */
    public void put(T task) {
        boolean interrupted = false;
        lock.lock();
        try {
            while (queue.size() == capacity) {
                log.debug("等待加入任务队列{}", task);
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            log.debug("加入任务队列{}", task);
            enqueue(task);
        } finally {
            lock.unlock();
            restoreInterrupt(interrupted);
        }
    }

    /**
     * Appends a task if there is room; otherwise hands it to the reject policy.
     * The policy runs while this queue's lock is held; because the lock is
     * reentrant, the policy may call {@link #put} or {@link #offer} on this queue.
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                log.debug("加入任务队列 {}", task);
                enqueue(task);
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns the number of queued elements. */
    public int getSize() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    // Removes the head and wakes one waiting producer; caller must hold the lock.
    private T dequeue() {
        T head = queue.removeFirst();
        fullWaitSet.signal();
        return head;
    }

    // Appends the task and wakes one waiting consumer; caller must hold the lock.
    private void enqueue(T task) {
        queue.addLast(task);
        emptyWaitSet.signal();
    }

    // Re-asserts the interrupt status that was cleared when a wait was interrupted.
    private static void restoreInterrupt(boolean interrupted) {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

定时任务demo

能够在固定时间执行任务

package cn.itcast.n8;


import java.time.DayOfWeek;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Shows how to run a task at a fixed point in time every week
 * (Sunday 22:12) using a {@link ScheduledExecutorService}.
 */
public class ScheduleDemo {

    public static void main(String[] args) {
        LocalDateTime nowTime = LocalDateTime.now();
        LocalDateTime time = nextRunTime(nowTime);

        // One week, in milliseconds.
        long period = TimeUnit.DAYS.toMillis(7);

        System.out.println(nowTime);
        System.out.println(time);

        // Delay from now until the first run.
        long initDelay = Duration.between(nowTime, time).toMillis();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

        pool.scheduleAtFixedRate(() -> {
            System.out.println("running...");
        }, initDelay, period, TimeUnit.MILLISECONDS);
    }

    /**
     * Computes the next Sunday 22:12:00 that is not before {@code now}.
     *
     * @param now the reference time
     * @return this week's Sunday 22:12 if it has not passed yet, otherwise next week's
     */
    static LocalDateTime nextRunTime(LocalDateTime now) {
        LocalDateTime time = now.withHour(22).withMinute(12).withSecond(0).withNano(0).with(DayOfWeek.SUNDAY);
        if (now.isAfter(time)) {
            // LocalDateTime is immutable: the shifted value must be reassigned.
            time = time.plusWeeks(1);
        }
        return time;
    }
}

CountDownLatch 应用例子

模拟游戏开始之前用户的加载过程

/**
 * Simulates ten players loading before a game starts: each player's progress
 * goes from 0% to 100% on its own pool thread, and the main thread waits on a
 * {@link CountDownLatch} until all of them have finished.
 */
@Slf4j(topic = "c.CountDownLatchDemo")
public class CountDownLatchDemo {

    public static void main(String[] args) {
        final int players = 10;
        ExecutorService service = Executors.newFixedThreadPool(players);
        CountDownLatch latch = new CountDownLatch(players);
        Random random = new Random();
        String[] allUsers = new String[players];

        for (int slot = 0; slot < players; slot++) {
            final int k = slot;
            service.submit(() -> loadPlayer(allUsers, k, random, latch));
        }

        try {
            // Block until every player has counted down.
            latch.await();
            System.out.println("\r" + Arrays.toString(allUsers));
            log.debug("游戏开始...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdown();
    }

    // Advances one player's progress from 0% to 100%, redrawing the status line
    // after every step, then counts the latch down.
    private static void loadPlayer(String[] allUsers, int k, Random random, CountDownLatch latch) {
        int percent = 0;
        while (percent <= 100) {
            try {
                Thread.sleep(random.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            allUsers[k] = percent + "%";
            System.out.print("\r" + Arrays.toString(allUsers));
            percent++;
        }
        latch.countDown();
    }
}

image-20230511224925989


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

 目录

Copyright © 2020 my blog
载入天数... 载入时分秒...