Java

Fork/Join 프레임워크와 CompletableFuture

AlbertIm 2025. 1. 6. 15:07

시작

동시성 프로그래밍은 프로그램에서 매우 중요한 개념입니다. 이 Post에서는 Java에서 동시성 프로그래밍을 위해 사용하는 Fork/Join 프레임워크와 CompletableFuture에 대해 알아보겠습니다.

본문

Fork/Join 프레임워크

Fork/Join 프레임워크는 보이지 않은 스레드 풀을 사용하여 작업을 분할하고 자동으로 스케줄링하고 병렬로 실행하는 프레임워크입니다.

 

코드로 살펴보겠습니다.

// RecursiveAction은 ForkJoinTask<Void>의 하위 클래스입니다.
// RecursiveAction은 compute() 메서드를 사용하여 작업을 수행합니다.
public class TransactionSorter extends RecursiveAction {

    // 작업을 분할할 때, 작업의 크기가 이 값보다 작으면 작업을 분할하지 않고 작업을 수행합니다.
    private static final int SMALL_ENOUGH = 32;
    private final Transaction[] transactions;
    private final int start;
    private final int end;
    private final Transaction[] result;

    public TransactionSorter(List<Transaction> transactions) {
        this(transactions.toArray(new Transaction[0]), 0, transactions.size());
    }

    public TransactionSorter(Transaction[] transactions) {
        this(transactions, 0, transactions.length);
    }

    private TransactionSorter(Transaction[] transactions, int start, int end) {
        this.transactions = transactions;
        this.start = start;
        this.end = end;
        this.result = new Transaction[size()];
    }

    public int size() {
        return end - start;
    }

    public Transaction[] getResult() {
        return result;
    }

    @Override
    protected void compute() {
        if (size() < SMALL_ENOUGH) {
            System.arraycopy(transactions, start, result, 0, size());
            Arrays.sort(result, 0, size());
            // 작업이 킅난 쓰레드의 이름과 정렬된 결과를 출력합니다.
            System.out.println(Thread.currentThread().getName() + " : " + Arrays.toString(result));
        } else {
            // 작업을 분할합니다.
            int mid = start + size() / 2;
            TransactionSorter left = new TransactionSorter(transactions, start, mid);
            TransactionSorter right = new TransactionSorter(transactions, mid, end);

            // 작업을 병렬로 실행합니다.
            invokeAll(left, right);
            // 작업을 병합합니다.
            merge(left, right);
        }
    }

    private void merge(TransactionSorter left, TransactionSorter right) {
        int i = 0;
        int l = 0;
        int r = 0;

        while (l < left.size() && r < right.size()) {
            int compare = left.result[l].compareTo(right.result[r]);
            result[i++] = compare < 0 ? left.result[l++] : right.result[r++];
        }

        while (l < left.size()) {
            result[i++] = left.result[l++];
        }

        while (r < right.size()) {
            result[i++] = right.result[r++];
        }
    }
}

// ----------------------------
public class Transaction implements Comparable<Transaction> {

    private static final AtomicLong counter = new AtomicLong(1);
    private final Account sender;
    private final Account receiver;
    private final int amount;
    private final long id;
    private final LocalDateTime time;

    public Transaction(Account sender, Account receiver, int amount, LocalDateTime time) {
        this.sender = sender;
        this.receiver = receiver;
        this.amount = amount;
        this.id = counter.getAndIncrement();
        this.time = time;
    }

    public static Transaction of(Account sender, Account receiver, int amount) {
        return new Transaction(sender, receiver, amount, LocalDateTime.now());
    }

    @Override
    public int compareTo(Transaction o) {
        return Comparator.nullsFirst(LocalDateTime::compareTo).compare(this.time, o.time);
    }

    public Account getSender() {
        return sender;
    }

    public Account getReceiver() {
        return receiver;
    }

    public int getAmount() {
        return amount;
    }

    public long getId() {
        return id;
    }

    public LocalDateTime getTime() {
        return time;
    }

    @Override
    public int hashCode() {
        return Objects.hash(getSender(), getReceiver(), getAmount(), getId(), getTime());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Transaction)) {
            return false;
        }
        Transaction that = (Transaction) o;
        return getAmount() == that.getAmount() && getId() == that.getId() &&
               Objects.equals(getSender(), that.getSender()) &&
               Objects.equals(getReceiver(), that.getReceiver()) &&
               Objects.equals(getTime(), that.getTime());
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", Transaction.class.getSimpleName() + "[", "]")
                .add("id=" + id)
                .toString();
    }
}
// ----------------------------

public class Account {

    private int balance;

    public Account(int balance) {
        this.balance = balance;
    }

    public int getBalance() {
        return balance;
    }

    public void deposit(int amount) {
        balance += amount;
    }

    public void withdraw(int amount) {
        balance -= amount;
    }
}

// ----------------------------

public static void main(String[] args) {
    var transactions = new ArrayList<Transaction>();
    var accounts = List.of(new Account(1000), new Account(2000), new Account(3000));

    for (int i = 0; i < 100; i++) {
        var sender = accounts.get(i % accounts.size());
        var receiver = accounts.get((i + 1) % accounts.size());
        var amount = (i + 1) * 10;
        transactions.add(Transaction.of(sender, receiver, amount));
    }

    Collections.shuffle(transactions);
    var sorter = new TransactionSorter(transactions);

    ForkJoinPool.commonPool().invoke(sorter);

    for (Transaction transaction : sorter.getResult()) {
        System.out.println(transaction);
    }
}

 

위 코드는 Fork/Join 프레임워크를 사용하여 Transaction 객체를 정렬하는 코드입니다. 출력 결과는 다음과 같습니다.

main : [Transaction[id=1], Transaction[id=4], Transaction[id=7], Transaction[id=10], Transaction[id=16], Transaction[id=29], Transaction[id=35], Transaction[id=37], Transaction[id=41], Transaction[id=42], Transaction[id=45], Transaction[id=47], Transaction[id=49], Transaction[id=50], Transaction[id=54], Transaction[id=58], Transaction[id=61], Transaction[id=67], Transaction[id=72], Transaction[id=78], Transaction[id=83], Transaction[id=84], Transaction[id=86], Transaction[id=91], Transaction[id=99]]
ForkJoinPool.commonPool-worker-5 : [Transaction[id=9], Transaction[id=12], Transaction[id=13], Transaction[id=14], Transaction[id=17], Transaction[id=22], Transaction[id=24], Transaction[id=25], Transaction[id=26], Transaction[id=27], Transaction[id=30], Transaction[id=32], Transaction[id=33], Transaction[id=40], Transaction[id=60], Transaction[id=62], Transaction[id=63], Transaction[id=68], Transaction[id=69], Transaction[id=75], Transaction[id=85], Transaction[id=93], Transaction[id=95], Transaction[id=97], Transaction[id=100]]
ForkJoinPool.commonPool-worker-3 : [Transaction[id=11], Transaction[id=15], Transaction[id=20], Transaction[id=21], Transaction[id=31], Transaction[id=36], Transaction[id=39], Transaction[id=44], Transaction[id=46], Transaction[id=48], Transaction[id=56], Transaction[id=59], Transaction[id=66], Transaction[id=70], Transaction[id=73], Transaction[id=74], Transaction[id=76], Transaction[id=77], Transaction[id=79], Transaction[id=80], Transaction[id=82], Transaction[id=87], Transaction[id=94], Transaction[id=96], Transaction[id=98]]
ForkJoinPool.commonPool-worker-7 : [Transaction[id=2], Transaction[id=3], Transaction[id=5], Transaction[id=6], Transaction[id=8], Transaction[id=18], Transaction[id=19], Transaction[id=23], Transaction[id=28], Transaction[id=34], Transaction[id=38], Transaction[id=43], Transaction[id=51], Transaction[id=52], Transaction[id=53], Transaction[id=55], Transaction[id=57], Transaction[id=64], Transaction[id=65], Transaction[id=71], Transaction[id=81], Transaction[id=88], Transaction[id=89], Transaction[id=90], Transaction[id=92]]
Transaction[id=1]
Transaction[id=2]
Transaction[id=3]
Transaction[id=4]
Transaction[id=5]
....
Transaction[id=100]

 

결과에서 볼 수 있듯이 작업이 분할되어 병렬로 스레드에서 실행되고 병합되어 정렬된 결과가 출력됩니다.

Fork/Join 프레임워크는 work-stealing 알고리즘을 사용하여 작업을 분할하고 병렬로 실행합니다. work-stealing 알고리즘은 스레드가 작업을 끝내면 다른 스레드의 시작하지 않은 작업을 가져와서 실행합니다.

CompletableFuture

CompletableFuture는 Future 인터페이스를 확장한 인터페이스로 비동기 작업을 쉽게 처리할 수 있도록 도와줍니다.

 

코드를 통해 살펴보겠습니다.

public static void main(String[] args) {
    var future = CompletableFuture.supplyAsync(() -> {
        System.out.println("Starting on: " + Thread.currentThread().getName());
        return "Starting!";
    });

    var future2 = future.thenApply(s -> {
        System.out.println("ThenApply on: " + Thread.currentThread().getName());
        return s + "\nThenApply!";
    });

    var future3 = future2.thenApplyAsync(s -> {
        System.out.println("ThenApplyAsync on: " + Thread.currentThread().getName());
        return s + "\nThenApplyAsync!";
    });

    try {
        System.out.println(future3.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

 

위 코드는 CompletableFuture를 사용하여 비동기 작업을 처리하는 코드입니다. 출력 결과는 다음과 같습니다.

Starting on: ForkJoinPool.commonPool-worker-3
ThenApply on: ForkJoinPool.commonPool-worker-3
ThenApplyAsync on: ForkJoinPool.commonPool-worker-5
Starting!
ThenApply!
ThenApplyAsync!

결과에서 볼 수 있듯이 supplyAsync() 메서드를 사용하여 비동기 작업을 시작하고 thenApply() 메서드를 사용하여 작업을 처리하고 thenApplyAsync() 메서드를 사용하여 비동기로 작업을 처리합니다. async 메서드를 사용하면 작업이 다른 스레드에서 실행됩니다.

마무리

이 Post에서는 Java의 동시성 프로그래밍에 사용하는 Fork/Join 프레임워크와 CompletableFuture에 대해 정리해 보았습니다. Fork/Join 프레임워크는 작업을 분할하고 병렬로 실행한 후 병합하는 프레임워크이며 CompletableFuture는 비동기 작업을 쉽게 처리할 수 있도록 도와주는 인터페이스입니다.

참조