简单的线程池及其实现

  |   0 评论   |   1,575 浏览

本文主要参考了<a href="">这篇http://www.cnblogs.com/hustcat/archive/2008/10/10/1308425.html文章,并在这个基础上做了一些修改。

进程,线程是现代操作系统两个比较重要的概念。正是由于它们的存在,使得程序和并发执行得了实现。通常,创建一个线程的代价远远小于创建一个进程,所以多线程是编写并发程序的首要选择。然而,即使有多线程,当线程数量太大时,不断的创建线程也会影响系统的性能,这时,我们可以创建线程池来达到重用线程的目的,从而尽可能有减小开消,从而大大提高系统性能,比如在网络爬虫heritrix中就使用了线程池。

以下是一个简单线程池的实现(java程序)。

本程序由4个类构成,TestThreadPool,用来测试的类,用来模拟客户端的请求。它会创建20个任务(Task),交给线程池(ThreadPoolManager)处理。

线程池默认维护10个线程,当客户请求一个任务时,它会获取一个空闲线程,然后
处理交给该线程(SimpleThread)处理。

TestThreadPool 测试线程

package threadpool;
public class TestThreadPool {
    public static void main(String[] args) {
        try {
            ThreadPoolManager manager = new ThreadPoolManager(); // 生成线程池
            for (int i = 0; i < 20; i++) {
                Task t = new Task(\"task\" + i);
                manager.ProcessTask(t); // 处理任务
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Task 用来添加任务

package threadpool;
public class Task {
    private String name; // 任务名称
    public Task(String name) {
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String toString() {
        return this.name;
    }
}

ThreadPoolManager 主要是线程管理

package threadpool;
import java.util.LinkedList;
public class ThreadPoolManager {
    private LinkedList<SimpleThread> threads; // 保存线程
    private int maxThreadCount; // 最大的线程数
    private final LinkedList<Task> stQueue = new LinkedList<Task>();
    public ThreadPoolManager() {
        this(10);
    }
    public ThreadPoolManager(int maxThreadCount) {
        this.maxThreadCount = maxThreadCount;
        threads = new LinkedList<SimpleThread>();
        for (int i = 0; i < maxThreadCount; i++) {
            SimpleThread thread = new SimpleThread(i);
            threads.offer(thread);
            thread.start();
        }
        new Thread(new Runnable() {
            @SuppressWarnings(\"static-access\")
            @Override
            public void run() {
                while(true){
                    while(!stQueue.isEmpty()){
                        ProcessTask(stQueue.poll());
                    }
                    try {
                        Thread.currentThread().sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(\"检查stQueue是否有元素,队列中大小:\"+stQueue.size());
                }
            }
        }).start();
        System.out.println(\"thread pool created\");
    }
    // 处理一个任务
    public void ProcessTask(Task task) {
        SimpleThread thread = getIdleThread(); // 获取一个空闲线程
        if (thread != null) {
            thread.setArgument(task);
            thread.setRunningFlag(true);
        } else {
            System.out.println(\"没有空闲线程!\");
            //线程没有空闲的时候, 需要存储未执行的数据
            stQueue.offer(task);
        }
    }
    // 获取一个空闲线程
    private synchronized SimpleThread getIdleThread() {
        for (int i = 0; i < maxThreadCount; i++) {
            SimpleThread thread = threads.get(i);
            if (!thread.isRunning()) {
                return thread;
            }
        }
        return null;
    }
}

SimpleThread 执行任务的处理

package threadpool;
public class SimpleThread extends Thread {
    private boolean runningFlag; // 线程状态
    private Task argument; // 一个任务
    public SimpleThread(int num) {
        this.runningFlag = false;
        System.out.println(\"thread \" + num + \" starting\");
    }
    // 线程执行
    public synchronized void run() {
        try {
            while (true) {
                if (!runningFlag) {
                    this.wait(); // 睡眠
                } else {
                    System.out.println(\"processing \" + argument.toString());
                    sleep(3000); // 阻塞3秒
                    System.out.println(argument.toString() + \" processed\");
                    setRunningFlag(false);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public boolean isRunning() {
        return runningFlag;
    }
    public synchronized void setRunningFlag(boolean flag) {
        this.runningFlag = flag;
        if (runningFlag) {
            this.notify(); // 唤醒线程
        }
    }
    public void setArgument(Task arg) {
        this.argument = arg;
    }
    public Task getArgument() {
        return argument;
    }
}

评论

发表评论

validate