본문 바로가기

카테고리 없음

TransactionManager 까보기 - DataSourceTransactionManager? JpaTransactionManager?

😥 문제

Batch 개발 도중 Batch 모듈에서 JPA를 이용해, insert 작업을 진행했는데

Transaction이 없다고 에러가 발생했다

 

🤔 원인은?

insert 작업을 진행하는 메서드가 @Trasnactional(propagation=REQUIRE_NEW)로 

새로운 transaction을 생성해서 작업을 수행하는 부분이라서

새로운 transaction 생성에 문제가 있겠구나 하고 찾아봤었는데, 

애초에 TransactionManager에 문제가 있음을 확인했다

 

Batch 모듈에서는 TransacationManager에 대한 Config가 커스텀되어있었는데

@Configuration
public class TransactionManagerConfig {

    private static final String DB1_DATA_SOURCE = "db1_dataSource";
    private static final String DB2_DATA_SOURCE = "db2_dataSource";

    @Bean(name = "db1TransactionManager")
    public PlatformTransactionManager db1TransactionManager(
            @Qualifier(DB1_DATA_SOURCE) DataSource db1DataSource) {
        return new DataSourceTransactionManager(db1DataSource);
    }

    @Primary
    @Bean(name = "db2TransactionManager")
    public PlatformTransactionManager db2TransactionManager(
            @Qualifier(DB2_DATA_SOURCE) DataSource db2DataSource) {
        return new DataSourceTransactionManager(db2DataSource);
    }
}

- 회사 프로젝트 config라서 코드는 전체적으로 수정했다

 

Batch 모듈에서 접근하는 DB는 아래와 같다

  • DB1 - Mybatis 만을 사용하여 데이터 접근
  • DB2 - Mybatis와 JPA를 같이 사용하여 데이터 접근

 

그래서 각각의 DataSource에 따라서 PlatformTransactionManager 구현체인

DataSourceTransactionManager를 각 DB에 대한 TransactionManager로 Bean으로 등록해주었다

 

문제는 insert가 발생하는 메서드는 DB2를 사용하고 있었고

해당 메서드는 JPA를 사용해서 insert를 진행한다는 점이었다

 

@Primary
    @Bean(name = "db2TransactionManager")
    public PlatformTransactionManager db2TransactionManager(
            @Qualifier(DB2_DATA_SOURCE) DataSource db2DataSource) {
        return new JpaTransactionManager(db2DataSource);
    }

 

따라서 db2DataSource에 대한 TransactionManager를 생성할 때

DataSourceTransactionManger가 아닌 JPA 영속성 컨텍스트를 사용할 수 있는 (즉, EntityManager을 고려하여 Transaction을 생성하는) JpaTransactionManager를 구현체로 Bean으로 등록해야하는 것이다

 

그럼 PlatformTransactionManager는 어떻게 구성되어있고, 어떻게 Transaction 생성에 관여하는지 궁금해져서 

해당 코드들을 까보기 시작했다

 

 

🔎 PlatformTransactionManager 구조

 

DataSourceTransactionManager와 JpaTransactionManager는 

 

  • PlatformTransactionManager를 상속한 ResourceTransactionManager를 구현하는 구현체들중 하나
  • PlatformTransactionManager를 구현한 추상 클래스인 AbstractPlatformTransactionManager를 상속 받은 구현체들중 하나

 

PlatformTransactionManager interface

PlatformTransactionManager는 3개의 메서드만 가지고 있는데

각 메서드가 2개의 구현체(ChainedTransactionManager, AbstractPlatformTransactionManager)를 가지고 있는 것을 볼 수 있듯이 

플랫폼(JDBC든, JPA 든..)에 상관 없이, 세개의 메서드를 충분히 사용해서 transaction을 시작하고 commit하고 rollback할 수 있다

 

그리고 이러한 인터페이스를 추상 클래스인 AbstractPlatformTransactionManager가 구현하고 있다

 

🔬 AbstractPlatformTransactionManager

AbstractPlatformTransactionManager 는 다양한 트랜잭션 플랫폼(JDBC, JPA 등)을 처리하기위한 주요 흐름을 정의해놓았다

 

그렇다면? DataSourceTransactionManager, JpaTransactionManager 같은 PlatformTransactionManager의 구현체들은 무슨 메서드들을 오버라이딩하고 있는 것일까??

 

코드를 더 까보니,

 PlatformTransactionManager 인터페이스를 구현하고 있는

AbstractPlatformTransactionManager의 내부 abstract 메서드들을 오버라이딩 하고 있었다

 

그래서 호출되는 플로우를 한번 보자면

  1. PlatformTransactionManager.getTransaction이 호출
  2. AbstractPlatformTransactionManager.getTransaction 이 구현 메서드이므로 호출됨
  3. getTransaction 내부에서 doGetTransaction 추상화 메서드 호출
  4. 해당 메서드는 DB 플랫폼에 맞게 각 XXXXXTransactionManager에서 구현되어 위임

즉,

  • AbstractPlatformTransactionManager의 getTransaction은 플랫폼 상관없이 공통되는 흐름제어
  • 각 플랫폼에 해당하는 로직은 AbstractPlatformTransactionManager를 상속받은 각 XXXXTransactionManager에서 구현된 메서드에 위임

으로 정리할 수 있겠다

 

하나씩 까보면서, 어떻게 DB와 통신하고 Transaction 생성 -> Commit / Rollback 되는지 살펴본다면

 

1️⃣ (Abstarct) PlatformTransactionManager.getTransaction()

AbstractPlatformTransactionManager getTransaction()에서 startTransaction()을 호출한다

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    boolean newSynchronization = this.getTransactionSynchronization() != 2;
    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    this.doBegin(transaction, definition);
    this.prepareSynchronization(status, definition);
    return status;
}

 

여기에서 doBegin 메서드 추상 메서드로,

각 XXXXXTransacactionManager 구현체에서 구현한다

 

DataSourceTransactionManager.doBegin() 에서는

  • JDBC를 이용해 DB와 커넥션을 맺고
  • JDBC를 이용해 Transaction이 시작될 수 있는 환경을 구성
  • transcation이 readonly인지 / timeout 등 transaction 관련 추가 설정
if (con.getAutoCommit()) {
    txObject.setMustRestoreAutoCommit(true);
    if (this.logger.isDebugEnabled()) {
        this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    }

    con.setAutoCommit(false);
}

 

Connection의 autoCommit을 false로 세팅하면서 (default는 true라고 한다)

각 데이터베이스들은 Transaction을 수행할 수 있게 된다

SET AUTOCOMMIT = 0;

 

  • MySQL - auto commit이 꺼지면서, 내부적으로 START TRANSACTION을 실행
  • PostgreSQL - auto commit이 꺼지면서 트래잭션이 시작되고, BEGIN 명령어가 내부적으로 실행
  • Oracle - auto commit이 꺼진 상태에서 SQL이 실행되면 암묵적으로 트랜젝션 시작

JpaTransactionManager.doBegin()에서는

코드를 이해하기 위해서, JPA와 Hibernate의 관계를 아는 것이 중요할거 같아서 서칭 후 정리해보았다

 

Hibernate

  • 자바에서 정의한 ORM 표준인 JPA를 구현한 구현체
    • Hibernate 어플리케이션과 DB간의 연결을 담당하는 Session 객체를 중심으로 Entity CRUD + Transaction 같은 작업을 영속성컨텍스트(1차 캐시)를 끼고 진행한다

 

Hibernate의 Session

  • Thread-Safe로 Request마다 생성
  • 하나의 DB Connection으로 Transaction 종료시에 연결이 같이 해제

 

Hibernate는 결국에 JPA를 구현한 것으로 Hibernate의 Session은 JPA의 EntityManager로 Wrapping한 대응되는 개념이다

 

    protected void doBegin(Object transaction, TransactionDefinition definition) {
        JpaTransactionObject txObject = (JpaTransactionObject)transaction;
        if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            throw new IllegalTransactionStateException("Pre-bound JDBC Connection found! JpaTransactionManager does not support running within DataSourceTransactionManager if told to manage the DataSource itself. It is recommended to use a single JpaTransactionManager for all transactions on a single DataSource, no matter whether JPA or JDBC access.");
        } else {
            try {
                EntityManager em;
                if (!txObject.hasEntityManagerHolder() || txObject.getEntityManagerHolder().isSynchronizedWithTransaction()) {
                    em = this.createEntityManagerForTransaction();
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Opened new EntityManager [" + em + "] for JPA transaction");
                    }

                    txObject.setEntityManagerHolder(new EntityManagerHolder(em), true);
                }

                em = txObject.getEntityManagerHolder().getEntityManager();
                int timeoutToUse = this.determineTimeout(definition);
                Object transactionData = this.getJpaDialect().beginTransaction(em, new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder()));
                txObject.setTransactionData(transactionData);
                txObject.setReadOnly(definition.isReadOnly());
                if (timeoutToUse != -1) {
                    txObject.getEntityManagerHolder().setTimeoutInSeconds(timeoutToUse);
                }
                ... 생략

 

transaction이 새로 시작되면 

createEntityManagerForTransaction() 내부 메서드에서 EntityManagerFactory는 Hibernate의 SessionFactoryImpl에 

 

Session 즉, EntityManager 생성을 위임한다

 

Transaction을 시작시킬 때에는

jpaDialect 즉, DB의 종류에 따라 entityManager를 가지고 transaction을 시작하는 작업을 시작한다

 

Hibernate 구현체

HibernateJpaDialect는 beginTransaction에서 

  • entityManager를 Unwraping해서 즉 JPA를 걷어내서 -> Session을 얻어내고, Session을 가지고 진행할 수 있는 DB연결 / 영속성컨텍스트 세팅 등 작업 진행
  • entityManager를 가지고 Transaction 시작

을 위임받아 작업한다

2️⃣ (Abstract)PlatformTransactionManager.commit()

 

processCommit은 TransactionStatus를 파라미터로 넘겨주며 

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                boolean unexpectedRollback = false;
                this.prepareForCommit(status);
                this.triggerBeforeCommit(status);
                this.triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        this.logger.debug("Releasing transaction savepoint");
                    }

                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                } else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        this.logger.debug("Initiating transaction commit");
                    }

                    unexpectedRollback = status.isGlobalRollbackOnly();
                    this.doCommit(status);
                } else if (this.isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }
                ... 생략

 

  • commit 전에 Transaction 동기화 작업을 진행하고 (추후 TransactionSynchronization에 대해서 정리해보고자 한다..)
  • Save Point 들이 있으면, 해당 Save Point들을 해제해주고
  • 새로운 Transaction이면, processCommit 안에 doCommit protected 메서드를 호출하고 있다

 

여기에서 doCommit은 추상 메서드로 

각 XXXXXTransacactionManager 구현체에서 구현한다

 

DataSourceTransactionManager.doCommit() 에서는 

    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject)status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            this.logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }

        try {
            con.commit();
        } catch (SQLException var5) {
            throw this.translateException("JDBC commit", var5);
        }
    }

 

Thread 별로 만들어진 DB Connection을 가져와서

commit을 DB로 날려준다

 

JpaTransactionManager.doCommit() 에서는

    protected void doCommit(DefaultTransactionStatus status) {
        JpaTransactionObject txObject = (JpaTransactionObject)status.getTransaction();
        if (status.isDebug()) {
            this.logger.debug("Committing JPA transaction on EntityManager [" + txObject.getEntityManagerHolder().getEntityManager() + "]");
        }

        try {
            EntityTransaction tx = txObject.getEntityManagerHolder().getEntityManager().getTransaction();
            tx.commit();
        } catch (RollbackException var6) {
            Throwable var5 = var6.getCause();
            if (var5 instanceof RuntimeException runtimeException) {
                DataAccessException dae = this.getJpaDialect().translateExceptionIfPossible(runtimeException);
                if (dae != null) {
                    throw dae;
                }
            }

            throw new TransactionSystemException("Could not commit JPA transaction", var6);
        } catch (RuntimeException var7) {
            throw DataAccessUtils.translateIfNecessary(var7, this.getJpaDialect());
        }
    }

 

status에서 EntityManager의 EntityTransaction을 가져와서, commit을 진행한다

 

Connection에서 바로 commit하는 DataSourceTransactionManager와는 달리,

EntityTransaction을 이용해서 commit을 하는데 이는 JPA의 영속성 컨텍스트랑 엮여 있기 때문에 그렇다

 

JPA에서 Transaction이 종료될때는

  1. 영속성 컨텍스트에서 Dirty 상태인 Entity들을 변경감지(Dirty Checking)하여 flush해준다
    • 변경된 엔티티에 대해서는 UPDATE 쿼리 생성
    • 생성된 엔티티는 INSERT 쿼리 생성
    • 삭제된 엔티티는 DELETE 쿼리 생성
  2. Commit 이전에 위의 변경감지에서 생성된 SQL을 실행 및 Transaction Log에 기록
  3. 데이터베이스와 영속성컨텍스트 동기화
  4. Commit 발생하고 -> SQL 실행 / Rollback 발생시, Transaction Log 보면서 rollback
  5. Commit 성공 후 영속성 컨텍스트 초기화

가 순서대로 발생된다. 영속성 컨텍스트가 엮여있기 때문에 DataSourceTransactionManager의 Commit보다 당연히 복잡한 프로세스가 요구되는데

 

JtaTransactionCoordinatorImpl

 

이러한 프로세스의 관리를 Hibernate가 호출한 JtaTransactionCoordinatorImpl 이 위임받아서 진행하게 된다

flushBeforeTransactionCompletion을 통해 확인할 수 있듯이,

JDBC로 commit 전에 flush를 진행하는 것을 확인 할 수 있다

 

 

💡TransactionManager를 찾아보면서, TransactionManager 모듈이 어떻게 구성되어있고, Transaction이 ORM과 JDBC를 사용했을 때 어떻게 생성되는지 확인해볼 수 있었다 

다음에는 JTA에 대해서도 알아보고, 다중 데이터소스가 있을 때 transaction은 어떻게 관리되는지도 공부하면 좋을거 같다