RATSENO

CompletableFuture<T> -1 본문

DEV/JAVA

CompletableFuture<T> -1

RATSENO 2021. 3. 14. 21:31

CompletableFuture를 간단하게 정리하기 전에, 속성으로 동기, 비동기에 대해서 정리하겠습니다.

저도 이해하기 편하도록 식당을 예를 들어 설명해보겠습니다.

 

동기 : 식당에 갔습니다. 음식을 점원분께 주문하고 저는 음식이 나오면 제가 직접 세팅을 해야 되기 때문에 나올 때까지 기다립니다.

           아무것도 하지않고

 

비동기  : 식당에 갔습니다. 음식을 점원분께 주문하고 식탁 위에 세팅을 해달라고 미리 말씀드려놨습니다.(callback)

                 음식이 나올 때까지 저는 기다리지 않고 다른 일 할 수 있었습니다.

 

이어서 블로킹, 논블로킹도 정리해보겠습니다. 음식을 가져다주시는 점원분의 입장에서 보시면 편합니다.

 

 [동기, 비동기] + 블로킹 :  점원분은 제가 주문을 한 순간, 음식이 다 나올 때까지 저를 붙잡고 아무것도 하지 못하게 합니다.  점원분이                                                 음식을 주실 때까지 저는 아무것도 할 수 없습니다.

 

 [동기, 비동기] + 논블로킹 :  점원분은 제가 주문을 한 순간, 주문을 전달하러 떠나셨습니다.

                                              그때까지 저는 핸드폰을 하며 다른 행동을 할 수 있습니다. 점원분에 음식을 저에게 전달해 주시고 떠납니다.

 

CompletableFutureFuture<T>, CompletionStage<T>의 구현체입니다.

CompletableFuture를 이용하여 비동기 처리와 블로킹, 논블로킹 방식의 프로세스를 구성할 수 있습니다.

 

간단하게 샘플 코드로 사용법을 정리하겠습니다.

샘플 코드는 회원 객체와 회원 리파지토리에서 회원명으로 회원 ID와 회원의 나이를 조회하는 서비스를 구성하였습니다.

여기서 회원을 조회하는 메서드를 호출하는 방식을 동기, 비동기식으로. 결괏값을 처리하는 방식을 블로킹, 논블로킹 방식으로 여러 가지 작성해 보도록 하겠습니다.

 

package com.ratseno.model;

public class MemberVO {
    //회원 번호
    private int id;
    //회원 명
    private String name;
    //회원 나이
    private int age;

    public MemberVO(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

 

package com.ratseno.repository;

import com.ratseno.model.MemberVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import org.springframework.util.StopWatch;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

@Repository
public class MemberRepository {

    private static final Logger log = LoggerFactory.getLogger(MemberRepository.class);

    private Map<String, MemberVO> memberVOMap = new HashMap<>();

    @PostConstruct
    public void init(){
        memberVOMap.put("철수", new MemberVO(1, "철수", 20));
        memberVOMap.put("영희", new MemberVO(2, "영희", 21));
        memberVOMap.put("미애", new MemberVO(3, "미애", 22));
    }

    public int getMemberIdByMemberName(String name){
        try {
        	//조회하는데 3초걸림
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return memberVOMap.get(name).getId();
    }

    public int getMemberAgeByMemberName(String name){
        try {
            //조회하는데 7초 걸림
            Thread.sleep(7000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return memberVOMap.get(name).getId();
    }
}

 

간단한 샘플이기 때문에, 샘플 데이터를 초기 세팅하여 사용하도록 하겠습니다.

getMemberIdByMemberName 메서드는 회원명으로 회원 ID를 return 합니다. 소요시간은 3초입니다.

getMemberAgeByMemberName 메서드는 회원명으로 회원 나이를 return 합니다. 소요시간은 6초입니다.

public interface MemberService {
	//동기 호출방식
    int getMemberId(String name);
}
package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Service
public class MemberServiceImpl implements MemberService{

    private final static Logger log = LoggerFactory.getLogger(MemberServiceImpl.class);

    @Autowired
    private MemberRepository memberRepository;

    @Override
    public int getMemberId(String name) {
        return memberRepository.getMemberIdByMemberName(name);
    }
}

 먼저 동기식 호출입니다. 흔히 볼 수 있는 방식입니다. 이 메서드를 junit를 이용하여 테스트를 진행하겠습니다.

 

package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {
        MemberServiceImpl.class,
        MemberRepository.class
})
public class MemberServiceImplTest {

    private static final Logger log = LoggerFactory.getLogger(MemberServiceImplTest.class);

    @Autowired
    private MemberService memberService;

    @Test
    public void 동기_방식으로_ID조회하기(){

        log.info("시작");
        int memberId = memberService.getMemberId("철수");
        log.info("끝");

        Assert.assertEquals(1, memberId);
    }
}

해당 메서드를 호출하고 3초가 지나고 결괏값을 return 받고 나서야 "끝"log가 출력됩니다.

 

새로운 스레드를 이용하는  비동기 호출 방식 메서드를 생성하겠습니다.( 편의상 service구현체만 보여드립니다.)

package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Service
public class MemberServiceImpl implements MemberService{

    private final static Logger log = LoggerFactory.getLogger(MemberServiceImpl.class);

    @Autowired
    private MemberRepository memberRepository;

    @Override
    public int getMemberId(String name) {
        return memberRepository.getMemberIdByMemberName(name);
    }
    
    @Override
    public CompletableFuture<Integer> getMemberIdAsync(String name) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        new Thread(() -> {
            log.info("새로운 쓰레드로 작업 시작");
            int memberAgeByMemberName = memberRepository.getMemberAgeByMemberName(name);
            future.complete(memberAgeByMemberName);
        }).start();
        //결과값이 나오기 전에 먼저 CompletableFuture<Integer>를 return하여
        //메서드를 호출한곳에 주도권을 넘겨준다.
        return future;
    }
}

먼저 선언되어있는 CompletableFuture를 return 하고, 3초 후에 해당 결괏값을 future.complete(memberAgeByMemberName);를 이용하여 완료처리합니다.

 

package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {
        MemberServiceImpl.class,
        MemberRepository.class
})
public class MemberServiceImplTest {

    private static final Logger log = LoggerFactory.getLogger(MemberServiceImplTest.class);

    @Autowired
    private MemberService memberService;

    @Test
    public void 동기_방식으로_ID조회하기(){

        log.info("시작");
        int memberId = memberService.getMemberId("철수");
        log.info("끝");

        Assert.assertEquals(1, memberId);
    }
    
    @Test
    public void 비동기_방식_잠시_블록킹(){
        CompletableFuture<Integer> completableFuture = memberService.getMemberIdAsync("철수");
        log.info("아직 최종 데이터를 전달 받지는 않았지만, 다른작업 수행가능");
        Integer join = completableFuture.join();
        if(join>0){
            log.info("/*메인 스레드가 끝나는것을 방지하기 위한 코드*/");
        }
    }
}

비동기로 호출하였기 때문에 "아직 최종 데이터를 전달받지는 않았지만, 다른 작업 수행가능"이" 출력됩니다.

하지만 결괏값을 얻기 위한 행위를 메인 스레드에서 completableFuture.join() 호출하였기 때문에, 해당 소요시간인 7초가 지

나서(이미 7초는 지나는 중) 결과가 나올 때까지 블로킹 상태에 놓이게 됩니다. 결과를 호출할 쪽에서 직접 관리하려 하기 때문입니다.

이 내용은 먼저 다양한 비동기 호출 방식을 설명하고 정리하겠습니다.

 

runAsync()

지금까지는 new Thread를 이용하는 방식 대신 다른 방식을 사용할 것입니다.

먼저 runAsync 메서드입니다. CompletableFuture 클래스에 직접 들어가 살펴보면,

runAsync를 호출하는 방식은 두 가지가 있습니다.

 public static CompletableFuture<Void> runAsync(Runnable runnable)
 public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

하나는 Executor를 파라미터로 받고 있습니다. Executor를 사용하지 않는 방식은 기본적으로 ForkJoinPool#commonPool()를 사용하여 실행될 task를 실행한다고 합니다. 검색해보니 ForkJoinPool의 commonPool을 사용하는 것은 바람직하지 않다고 하는데.. 이유는 좀 더 공부해야겠지만 아무튼 Executor를 넘겨주어 사용하도록 하겠습니다.

runAsync는 CompletableFuture <Void> 반환 값이 없습니다.(void)

package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Service
public class MemberServiceImpl implements MemberService{

    private final static Logger log = LoggerFactory.getLogger(MemberServiceImpl.class);

    Executor executor = Executors.newFixedThreadPool(10);

    @Autowired
    private MemberRepository memberRepository;

    @Override
    public int getMemberId(String name) {
        return memberRepository.getMemberIdByMemberName(name);
    }
    
    @Override
    public CompletableFuture<Integer> getMemberIdAsync(String name) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        new Thread(() -> {
            log.info("새로운 쓰레드로 작업 시작");
            int memberAgeByMemberName = memberRepository.getMemberAgeByMemberName(name);
            future.complete(memberAgeByMemberName);
        }).start();
        //결과값이 나오기 전에 먼저 CompletableFuture<Integer>를 return하여
        //메서드를 호출한곳에 주도권을 넘겨준다.
        return future;
    }
    
    @Override
    public CompletableFuture<Void> getMemberIdAsync_runAsync(String name) {
        log.info("비동기 호출 방식으로 회원 ID 조회 시작");

        //runAsync는 CompletableFuture<Void>를 return 한다.
        return CompletableFuture.runAsync(() -> {
            log.info("새로운 쓰레드로 작업 시작");
            int memberIdByMemberName = memberRepository.getMemberIdByMemberName(name);
            log.info("회원 ID:"+memberIdByMemberName);
        }, executor);
    }
}
package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {
        MemberServiceImpl.class,
        MemberRepository.class
})
public class MemberServiceImplTest {

    private static final Logger log = LoggerFactory.getLogger(MemberServiceImplTest.class);

    @Autowired
    private MemberService memberService;

    @Test
    public void 동기_방식으로_ID조회하기(){

        log.info("시작");
        int memberId = memberService.getMemberId("철수");
        log.info("끝");

        Assert.assertEquals(1, memberId);
    }
    
    @Test
    public void 비동기_방식_잠시_블록킹(){
        CompletableFuture<Integer> completableFuture = memberService.getMemberIdAsync("철수");
        log.info("아직 최종 데이터를 전달 받지는 않았지만, 다른작업 수행가능");
        Integer join = completableFuture.join();
        if(join>0){
            log.info("/*메인 스레드가 끝나는것을 방지하기 위한 코드*/");
        }
    }
    
    @Test
    public void 비동기_방식_블록킹_되는_ID조회하기_반환값_없는_runAsync(){

        CompletableFuture<Void> completableFuture = memberService.getMemberIdAsync_runAsync("철수");
        log.info("아직 최종 데이터를 전달 받지는 않았지만, 다른작업 수행가능");
        completableFuture.join();
    }
}

supplyAsync()

앞에서 살펴본 runAsync() 메서드는 리턴 값이 없었습니다.

suplyAsync()는 리턴값이 있습니다.(Supplier <U> supplier) , CompletableFuture <U>

suplyAsync()도 runAsync()와 동일하게 Executor를 사용하는 메서드, 사용하지 않는 메서드가 있습니다.

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Service
public class MemberServiceImpl implements MemberService{

    private final static Logger log = LoggerFactory.getLogger(MemberServiceImpl.class);

    Executor executor = Executors.newFixedThreadPool(10);

    @Autowired
    private MemberRepository memberRepository;

    @Override
    public int getMemberId(String name) {
        return memberRepository.getMemberIdByMemberName(name);
    }
    
    @Override
    public CompletableFuture<Integer> getMemberIdAsync(String name) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        new Thread(() -> {
            log.info("새로운 쓰레드로 작업 시작");
            int memberAgeByMemberName = memberRepository.getMemberAgeByMemberName(name);
            future.complete(memberAgeByMemberName);
        }).start();
        //결과값이 나오기 전에 먼저 CompletableFuture<Integer>를 return하여
        //메서드를 호출한곳에 주도권을 넘겨준다.
        return future;
    }
    
    @Override
    public CompletableFuture<Void> getMemberIdAsync_runAsync(String name) {
        log.info("비동기 호출 방식으로 회원 ID 조회 시작");

        //runAsync는 CompletableFuture<Void>를 return 한다.
        return CompletableFuture.runAsync(() -> {
            log.info("새로운 쓰레드로 작업 시작");
            int memberIdByMemberName = memberRepository.getMemberIdByMemberName(name);
            log.info("회원 ID:"+memberIdByMemberName);
        }, executor);
    }
    
    @Override
    public CompletableFuture<Integer> getMemberIdAsync_supplyAsync(String name) {
        log.info("비동기 호출 방식으로 회원 ID 조회 시작");

        return CompletableFuture.supplyAsync(() -> {
            log.info("새로운 쓰레드로 작업 시작");
            int memberIdByMemberName = memberRepository.getMemberIdByMemberName(name);
            log.info("회원 ID:"+memberIdByMemberName);
            return memberIdByMemberName;
        },executor);
    }  
}
package com.ratseno.service;

import com.ratseno.repository.MemberRepository;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {
        MemberServiceImpl.class,
        MemberRepository.class
})
public class MemberServiceImplTest {

    private static final Logger log = LoggerFactory.getLogger(MemberServiceImplTest.class);

    @Autowired
    private MemberService memberService;

    @Test
    public void 동기_방식으로_ID조회하기(){

        log.info("시작");
        int memberId = memberService.getMemberId("철수");
        log.info("끝");

        Assert.assertEquals(1, memberId);
    }
    
    @Test
    public void 비동기_방식_잠시_블록킹(){
        CompletableFuture<Integer> completableFuture = memberService.getMemberIdAsync("철수");
        log.info("아직 최종 데이터를 전달 받지는 않았지만, 다른작업 수행가능");
        Integer join = completableFuture.join();
        if(join>0){
            log.info("/*메인 스레드가 끝나는것을 방지하기 위한 코드*/");
        }
    }
    
    @Test
    public void 비동기_방식_블록킹_되는_ID조회하기_반환값_없는_runAsync(){

        CompletableFuture<Void> completableFuture = memberService.getMemberIdAsync_runAsync("철수");
        log.info("아직 최종 데이터를 전달 받지는 않았지만, 다른작업 수행가능");
        completableFuture.join();
    }
    
    @Test
    public void 비동기_방식_블록킹_되는_ID조회하기_반환값_있는_supplyAsync(){

        log.info("비동기 방식으로 호출했기 때문에 3초동안 다른것 할수있다.");
        CompletableFuture<Integer> completableFuture = memberService.getMemberIdAsync_supplyAsync("철수");

        log.info("아직 최종 데이터를 전달 받지는 않았지만, 다른작업 수행가능");

        Integer memberId = completableFuture.join();
        log.info("completableFuture.join()하는 순간 블록킹 발생");

        Assert.assertEquals(1, memberId.intValue());
    }    
}

 

지금까지는 비동기 방식으로 호출하는 방법에 대해서 알아보았습니다.

하지만 get(),  join() 메서드를 호출함으로써 블로킹되는 순간이 발생하였기 때문에, 먼가 찜찜합니다.

이를 해결하여 논블로킹 방식으로 사용하기 위한 콜백 방식에 대해서 정리해보도록 하겠습니다.

 

잘못된 부분은 언제나 지적해주시면 열심히 공부하여 수정하도록 하겠습니다..!

 

참고 : www.baeldung.com/java-completablefuture

        brunch.co.kr/@springboot/267

        docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

github : github.com/RATSENO/CompletableFuture-Test

 

 

'DEV > JAVA' 카테고리의 다른 글

CompletableFuture<T> -2  (0) 2021.03.22
[Lombok]"is"prefix가 붙은 boolean, Boolean  (2) 2021.03.16
[JAVA]TreeSet  (0) 2020.02.05
[JAVA]Map 컬렉션 - HashMap  (0) 2020.02.05
[JAVA]Map 컬렉션  (0) 2020.02.05
Comments