Threadpool là gì

Giới thiệu ThreadPool

Xét về hiệu suất, tạo ra một Thread mới là một hoạt động tốn kém bởi vì nó đòi hỏi hệ điều hành cung cấp tài nguyên để có thể thực thi task (tác vụ). Trên thực tế, ThreadPool được sử dụng cho các ứng dụng quy mô lớn khởi chạy rất nhiều luồng ngắn ngủi để sử dụng hiệu quả các tài nguyên và tăng hiệu suất.

Bạn đang xem: Thread pool là gì

Trong Java, ThreadPool được dùng để giới hạn số lượng Thread được chạy bên trong ứng dụng của chúng ta trong cùng một thời điểm. Nếu chúng ta không có sự giới hạn này, mỗi khi có một Thread mới được tạo ra và được cấp phát bộ nhớ bằng từ khóa new thì sẽ có vấn đề về bộ nhớ và hiệu suất, có thể dẫn đến lỗi crash chương trình.

Ví dụ: Khi chúng ta viết chương trình tải các tập tin từ Internet, mỗi tập tin cần 1 Thread để thực hiện quá trình tải, giả sử cần tải 100 tệp hình ảnh thì chúng ta phải cần tới 100 Thread hoạt động cùng một thời điểm trong cùng một chương trình. Điều này sẽ dễ dẫn đến lỗi quá tải của chương trình, làm ảnh hưởng đến hiệu suất và có thể dẫn đến gây lỗi (crash) chương trình.

Vì vậy, thay vì tạo các luồng mới khi các task (nhiệm vụ) mới đến, một ThreadPool sẽ giữ một số luồng nhàn rỗi (no task) đã sẵn sàng để thực hiện tác vụ nếu cần. Sau khi một thread hoàn thành việc thực thi một tác vụ, nó sẽ không chết. Thay vào đó nó vẫn không hoạt động trong ThreadPool và chờ đợi được lựa chọn để thực hiện nhiệm vụ mới.

Chúng ta có thể giới hạn một số lượng nhất định các Thread đồng thời trong ThreadPool, rất hữu ích để ngăn chặn quá tải. Nếu tất cả các Thread đang bận rộn thực hiện nhiệm vụ, nhiệm vụ mới được đặt trong một hàng đợi (BlockingQueue), chờ đợi một Thread trở nên có sẵn.

Threadpool là gì

Java Concurrency API định nghĩa 3 interfaces cơ bản sau cho các Executor:

Executor: là interface cha của tất cả Executor. Nó xác định chỉ một phương thực excute(Runnable).ExecutorService: là một Executor cho phép theo dõi tiến trình của các tác vụ trả về giá trị (Callable) thông qua đối tượng Future, và quản lý việc kết thúc các luồng. Các phương thức chính của nó bao gồm submit()shutdown().

Xem thêm: Tài Trợ Thương Mại Trong Ngân Hàng Là Gì, Những Lợi Ích Từ Tài Trợ Thương Mại

ScheduledExecutorService: là một ExecutorService có thể lên lịch cho các tác vụ để thực thi sau một khoảng thời gian nhất định, hoặc để thực hiện định kỳ. Các phương thức chính của nó là schedule(), scheduleAtFixedRate() and scheduleWithFixedDelay().

Chúng có thể tạo một Executor bằng cách sử dụng một trong các phương thức được cung cấp bởi lớp tiện ích Executors như sau:

newSingleThreadExecutor(): trong ThreadPool chỉ có 1 Thread và các task (nhiệm vụ) sẽ được xử lý một cách tuần tự.newCachedThreadPool(): trong ThreadPool sẽ có nhiều Thread và các nhiệm vụ sẽ được xử lý một cách song song. Các Thread cũ sau khi xử lý xong sẽ được sử dụng lại cho nhiệm vụ mới. Mặc định nếu một Thread không được sử dụng trong vòng 60 giây thì Thread đó sẽ bị tắt.newFixedThreadPool(int n): trong ThreadPool sẽ được cố định các Thread. Nếu một nhiệm vụ mới được đưa vào mà các Thread đều đang “bận rộn” thì nhiệm vụ đó sẽ được gửi vào Blocking Queue và sau đó nếu có một Thread đã thực thi xong nhiệm vụ của nó thì nhiệm vụ đang ở trong Queue đó sẽ được push ra khỏi Queue và được Thread đó xử lý tiếp.newScheduledThreadPool(int corePoolSize): tương tự như newCachedThreadPool() nhưng sẽ có thời gian delay giữa các Thread.newSingleThreadScheduledExecutor(): tương tự như newSingleThreadExecutor() nhưng sẽ có khoảng thời gian delay giữa các Thread.

Xem thêm: Sinh Năm 1951 Mệnh Gì? Tuổi Tân Mão Hợp Tuổi Nào, Màu Gì, Hướng Nào?

Các ví dụ minh họa tạo và sử dụng ThreadPool

Tạo WorkerThread

Trước khi đi vào chi tiết cách sử dụng các phương thức của lớp Executors, hãy xem một task (tác vụ) sẽ được sử dụng để minh họa cho các ví dụ tiếp theo.

package com.hanic.com.vn.threadpool;public class WorkerThread implements Runnable {private String task;public WorkerThread(String s) {this.task = s;}

Mục lục

  • 1 Thread Pool hoạt động như thế nào
    • 1.1 Executors, Executor and ExecutorService
    • 1.2 ThreadPoolExecutor
    • 1.3 ScheduledThreadPoolExecutor
    • 1.4 ForkJoinPool
  • 2 Tóm lược

Trong Java, khởi tạo một thread sẽ chiếm một phần tài nguyên của hệ thống, dùng để cấp phát cho thread mới, cho phép nó thực thi các nhiệm vụ được giao.

Nếu chúng ta tạo thread một cách không kiểm soát, có thể dẫn đến tình trạng cạn kiệt tài nguyên hệ thống.

Thread Pool được sinh giúp chúng ta quản lý các thread, ngăn chặn tối đa tình trạng cạn kiệt tài nguyên từ việc khởi tạo thread trong chương trình.

Thread Pool hoạt động như thế nào

Threadpool là gì

Thread Pool là 1 collection các thread được khởi trước đó. Thông thường kích thước của collection này là cố định. Nó cho việc thực hiện N task trên các thread được quản lý bởi Thread Pool. Nếu cho nhiều task hơn số Thread hiện hành, những task vào sau sẽ được xếp vào hàng đợi. 

Khi một thread hoàn thành task được giao, nó sẽ đợi để được giao một task khác từ hàng đợi và thực thi. Khi tất cả các task được hoàn thành, những thread này sẽ chờ các task mới để thực thi từ Thread Pool.

Executors, Executor and ExecutorService

Executors class cung cấp một số method cho phép tạo ra các thread pool instance cấu hình sẵn, chúng ta không cần phải tự cấu hình cho các thread pool này, đây là một lợi thế khi mới làm quen với thread pool.

Executor và ExecutorService là các interface được sử dụng trong quá trình thực thi của các thread pool. Executor chứa duy nhất 1 execute() method dùng gửi các Runnable instance để thực thi.

Ví dụ tạo một Executor instance trong thread pool với số lượng thread đối đa là 1, execute() method nhận một implement của Runnable interface dưới dạng lambda expression.

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Hello World"));

ExecutorService interface implement từ Exectuor interface, chứa số lượng lớn method dùng để kiểm soát tiến độ của các task cho đến khi hoàn thành. Sử dụng service này, chúng ta cũng có thể gửi các Runnable instance để thực thi cũng như kiểm soát việc thực hiện của chúng với Future instance.

Ví dụ sử dụng ExecutorService để gửi các task cần thực thi, nó sẽ trả về Future instance. Sử dụng Future.get() method để chờ cho đến khi task được gửi đi hoàn tất và trả về giá trị.

ExecutorService executorService = Executors.newFixedThreadPool(10);
Future future = executorService.submit(() -> "Hello World");
// some operations
String result = future.get();

Trong thực tế, thì chúng ta không cần gọi get() ngay lập tức, mà chỉ nên gọi khi thực sự cần thiết.

ThreadPoolExecutor

Từ Java 5 trở đi, Java concurrency API cung cấp Executor framework. Dựa trêm Executor interface, và các sub-interface của ExecutorService, và ThreadPoolExecutor class implement cả 2 interface này.

ThreadPoolExecutor tách công việc tạo task và thực thi chúng thành 2 phần. Khi làm việc với ThreadPoolExecutor chúng ta chỉ cần triển khai task bằng cách implement Runnable interface và gửi chúng vào executor, nó sẽ chịu trách nhiệm cho việc thực thi, khởi tạo và chạy các thread cần thiết để hoàn thành các task.

Chúng ta sẽ tìm hiểu về 3 tham số chính sử dụng trong ThreadPoolExecutor: corePoolSize, maximunPoolSize và keepAliveTime.

Mỗi thread pool sẽ chứa một số lượng thread nhất định, chúng sẽ được duy trì mọi lúc trong thread pool, khi số lượng task nhiều quá mức, thread pool có thể khởi tạo thêm một số thread tối đa lên đến maximunPoolSize để tăng hiệu suất làm việc. Các thread được tạo thêm này sẽ được chấm dứt khi không còn cần đến, và giữa lại số corePoolSize thread.

Tham số keepAliveTime là khoảng thời gian cho phép các thread được thread pool tạo thêm được phép tồn tại khi ở trạng thái không hoạt động

Ví dụ sử dụng newFixedThreadPool() method để khởi tạo 1 ThreadPoolExecutor với corePoolSize và maximumPoolSize bằng nhau, và keepAliveTime là 0. Như vậy, tại mọi thời thời điểm thì thread pool luôn chứa 2 thread .

executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
System.out.println("Pool size: " + executor.getPoolSize());
System.out.println("Queue size: "executor.getQueue().size());

Output: 

Pool size: 2
Queue size: 1

Ở ví dụ trên, ThreadPoolExecutor được tạo ra với 2 thread bên trong, nghĩa là chúng ta có thể thực thi nhiều nhất là 2 task cùng lúc. Ở trên, chúng ta đã thực thi 3 task cùng lúc nên một task sẽ được đưa vào hàng đợi và 2 task đang được thực thi.

Một cách khác để khởi tạo ThreadPoolExecutor từ Executors.newCachedThreadPool() method, method này sẽ không nhận bất kỳ tham số nào, corePoolSize sẽ được gán bằng 0, maximunPoolSize sẽ là Interger.MAX, keepAliveTime là 60 seconds.

Với các thông số trên, newCachedThreadPool() sẽ tạo ra thread pool không giới hạn số lượng thread được tạo ra, số lượng thread sẽ tăng lên theo số lượng task, nhưng khi không còn cần đến, chúng sẽ được chấm dứt khi không hoạt động trong 60s.

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
executor.submit(() -> {
    Thread.sleep(1000);
    return null;
});
System.out.println("Pool size: " + executor.getPoolSize());
System.out.println("Queue size: "executor.getQueue().size());

Output: 

Pool size: 3
Queue size: 0

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor extends ThreadPoolExecutor và implement ScheduledExecutorService. Ngoài các tính năng tương tự ThreadPoolExecutor, ScheduledThreadPoolExecutor còn có một số chức năng khác như:

  • schedule() method cho phép thực thi một task sau một khoảng thời gian delay xác định.
  • scheduleAtFixedRate() method cho phép thực thi một task sau một khoảng thời gian delay xác định tính từ thời điểm khởi tạo, sau đó thực thi task lặp đi lặp lại trong một khoảng thời gian định trước, khoảng thời gian này được tính từ lúc bắt đầu task trước đó đến thời gian bắt đầu task tiếp theo.
  • scheduleWithFixedDelay() method tương tự scheduleAtFixedRate() nhưng khoảng thời gian thực thi giữa 2 task được tính đừ lúc kết thúc task trước đi đến khi bắt đầu task kế tiếp.

Sử dụng Executors.newScheduledThreadPool() method để tạo một ScheduledThreadPoolExecutor với corePoolSize là tham số truyền vào, không giới hạn maximumPoolSize, và keepAliveTime là 0.

Ví dụ tạo một ScheduledThreadPoolExecutor, schedule() sẽ thực thi sau khoảng thời gian 500 milliseconds

ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
    System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

Ví dụ tạo một ScheduledThreadPoolExecutor, sử dụng scheduleAtFixedRate() để thực thi task sau khoảng thời gian delay 500 milliseconds từ khi khởi tạo, khoảng thời gian thực thi giữa 2 lần kể tiếp là 100 milliseconds; Chúng ta sẽ thực thi task 3 lần liên tiếp sau đó dừng với Future.cancel(), và CountDownLatch.

CountDownLatch lock = new CountDownLatch(3);
 
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
ScheduledFuture future = executor.scheduleAtFixedRate(() -> {
    System.out.println("Hello World");
    lock.countDown();
}, 500, 100, TimeUnit.MILLISECONDS);
 
lock.await(1000, TimeUnit.MILLISECONDS);
future.cancel(true);

Output:

Hello World
Hello World
Hello World

ForkJoinPool

Các thuật toán đệ quy sinh ra rất nhiều task trong một lúc, nếu sử dụng ThreasPoolExecutor sẽ dẫn đến cạn kiệt nguồn tài nguyên của hệ thống nhanh chóng, vì mỗi task hoặc các subtask của nó đều yêu cầu một thread riêng để chạy. ForkJoinPool là một thành phần quan trọng trong fork/join framework được giới thiệu trong Java 7 để giải quyết vấn đề trên.

Trong fork/join framework, mỗi task có thể chia thành các subtask và chờ chúng hoàn thành với join() method. Một điểm đáng chú ý là fork/join framework sẽ không tạo ra các thread mới cho mỗi task và subtask của nó.

Chúng ta đi đến một ví dụ đơn giản sử dụng ForkJoinPool để duyệt qua tất cả các node của một cây và tính tổng giá trị của các node.

class TreeNode {

    int value;

    List children;

    TreeNode(int value, List children) {
        this.value = value;
        this.children = children;
    }
}

Để tính tổng giá trị của các node, chúng ta cần tạo ra một CountingTask class implement RecursiveTask interface. Mỗi task sẽ nhận một node và tính tổng giá trị các node con của nó bằng cách:

  • Dùng Stream API để duyệt qua các node con của mỗi node
  • Các node con tương ứng tạo một CountingTask thông qua map()method
  • Gọi fork() để tính tổng giá trị các node con của một node.
  • Tập hợp tất cả giá trị với join() method
class CountingTask extends RecursiveTask {

    private final TreeNode node;

    public CountingTask(TreeNode node) {
        this.node = node;
    }

    @Override
    protected Integer compute() {
        return node.value + node.children.stream()
                .map(childNode -> new CountingTask(childNode).fork())
                .mapToInt(ForkJoinTask::join).sum();
    }
}
class Main {
    public static void main(String[] agrs) throws InterruptedException {
        TreeNode tree = new TreeNode(5,
                Arrays.asList(new TreeNode(3, Collections.emptyList()), new TreeNode(2,
                        Arrays.asList(new TreeNode(2, Collections.emptyList()), new TreeNode(8, Collections.emptyList())))));

        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        int sum = forkJoinPool.invoke(new CountingTask(tree));
        System.out.println("Sum: " + sum);
    }
}

Output: Sum: 20

Tóm lược

Như vậy Thread Pool quả là một công cụ hiệu quả khi làm việc với thread phải không nào. Chúng ta sẽ giảm thiểu đáng kể việc cạn kiệt tài nguyên hệ thống khi mà các thread được tạo ra vô số mà không được quản lý chặt chẽ. 

Nguồn tham khảo

https://www.baeldung.com/thread-pool-java-and-guava

https://howtodoinjava.com/java/multi-threading/java-thread-pool-executor-example/