본문 바로가기
Spring/Spring Batch

Spring Batch Chunk와 Transaction 설정

by clearinging 2022. 8. 9.
반응형
필자는 chunk 방식에서 transaction이 언제 설정이 되고 사용되는지 확인하고 싶어서 학습을 시작하게 되었습니다
조사하다 보니 chunk 관련 글로 변경이 되었습니다
일부 코드를 생략을 했지만 필요한 코드는 주석으로 추가 설명을 작성하였습니다.

Spring Batch Chunk

TaskletStep

정의

  • transactional 관리하는 객체와 실제 개발자가 작성한 item reader, processor, writer를 호출 하는 ChunkOrientedTasklet를 관리하는 객체
  • AbstractStep을 상속받은 구현체(전체 flow가 들어있는 Template Callback 패턴 abstract class)

Flow

 public class TaskletStep extends AbstractStep {

    private Tasklet tasklet; // ChunkOrientedTasklet 를 가지게 됩니다

    @Override
    protected void doExecute(StepExecution stepExecution) throws Exception {
        stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
        stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

        stream.update(stepExecution.getExecutionContext());
        getJobRepository().updateExecutionContext(stepExecution);

        // Shared semaphore per step execution, so other step executions can run
        // in parallel without needing the lock
        final Semaphore semaphore = createSemaphore();

        stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { // iterator 내부에서 doInChunkContext method를 호출 하게 됩니다
            @Override
            public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
                    throws Exception {

                StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

                // Before starting a new transaction, check for
                // interruption.
                interruptionPolicy.checkInterrupted(stepExecution);

                RepeatStatus result;
                try {
                    result = new TransactionTemplate(transactionManager, transactionAttribute) // transactional Template 호출 template call back 패턴으로 처리
                            .execute(new ChunkTransactionCallback(chunkContext, semaphore));
                } catch (UncheckedTransactionException e) {
                    // Allow checked exceptions to be thrown inside callback
                    throw (Exception) e.getCause();
                }

                chunkListener.afterChunk(chunkContext);

                // Check for interruption after transaction as well, so that
                // the interrupted exception is correctly propagated up to
                // caller
                interruptionPolicy.checkInterrupted(stepExecution);

                return result == null ? RepeatStatus.FINISHED : result;
            }

        });

    }

    private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback<RepeatStatus> {

        @Override
        public RepeatStatus doInTransaction(TransactionStatus status) { // transactional 내부의 biz 로직 호출
            // codes...

            try {

                try {
                    try {
                        result = tasklet.execute(contribution, chunkContext); // 한개의 chunk에 대해서 read process write 로직 호출
                        // TaskletStep(outer class)의 tasklet를 사용하게 됩니다
                        if (result == null) {
                            result = RepeatStatus.FINISHED;
                        }
                    } catch (Exception e) {
                        if (transactionAttribute.rollbackOn(e)) {
                            chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e); // 현재 transaction에 대해서 rollback 처리 할수 있도록 변경
                            throw e;
                        }
                    }
                } finally {

                    // If the step operations are asynchronous then we need
                    // to synchronize changes to the step execution (at a
                    // minimum). Take the lock *before* changing the step
                    // execution.
                    try {
                        semaphore.acquire();
                        locked = true;
                    } catch (InterruptedException e) {
                        logger.error("Thread interrupted while locking for repository update");
                        stepExecution.setStatus(BatchStatus.STOPPED);
                        stepExecution.setTerminateOnly();
                        Thread.currentThread().interrupt();
                    }

                    // Apply the contribution to the step
                    // even if unsuccessful
                    if (logger.isDebugEnabled()) {
                        logger.debug("Applying contribution: " + contribution);
                    }
                    stepExecution.apply(contribution);
                }

                stepExecutionUpdated = true;

                stream.update(stepExecution.getExecutionContext());

                try { // job repository에 관한 추가 처리 진행(성공 실패에 대한 정보를 DB에 적제 하는 로직
                    getJobRepository().updateExecutionContext(stepExecution); // step정보 저장
                    stepExecution.incrementCommitCount();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Saving step execution before commit: " + stepExecution);
                    }
                    getJobRepository().update(stepExecution);
                } catch (Exception e) { // job repository 데이터 저장 실패
                    String msg = "JobRepository failure forcing rollback";
                    logger.error(msg, e);
                    throw new FatalStepExecutionException(msg, e);
                }

            } catch (Error e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
                }
                rollback(stepExecution);
                throw e;
            } catch (RuntimeException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
                }
                rollback(stepExecution);
                throw e;
            } catch (Exception e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
                }
                rollback(stepExecution);
                // Allow checked exceptions
                throw new UncheckedTransactionException(e);
            }

            return result;

        }
    }
}
 public class TransactionTemplate extends DefaultTransactionDefinition
        implements TransactionOperations, InitializingBean {

    @Override
    @Nullable
    public <T> T execute(TransactionCallback<T> action) throws TransactionException {
        Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");

        if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
            return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
        } else {
            TransactionStatus status = this.transactionManager.getTransaction(this);
            T result;
            try {
                result = action.doInTransaction(status); // transactional 생성 시점
            } catch (RuntimeException | Error ex) {
                // Transactional code threw application exception -> rollback
                rollbackOnException(status, ex);
                throw ex;
            } catch (Throwable ex) {
                // Transactional code threw 예상치 못한 exception 발생 -> rollback 처리
                rollbackOnException(status, ex);
                throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
            }
            this.transactionManager.commit(status); // 정상 동작 commit 동작
            return result;
        }
    }
}
  • TaskletStepRepeateTemplateexecuteInternal method를 호출
  • RepeateTemplateexecuteInternal method는 chunk 단위의 로직을 반복적으로 호출 하는 전체 flow logic이 들어 있습니다
    • RepeateTemplate는 오직 반복적으로 chunk 단위로 나눈 횟수 만큼 StepContextRepeatCallbackdoInChunkContext method만 호출해준다고 보면 됩니다
    • RepeateTemplateexecuteInternal method는 내부적으로 transactional이 있지만 해당 부분은 외부에서 주입 받은 StepContextRepeatCallback class를 통해서 처리가 이뤄 집니다
    • StepContextRepeatCallbackdoInChunkContext 를 호출하게 되면, TransactionTemplate 객체를 생성하게 되는데 해당 객체가 transactional을 처리하는 class 입니다
  • StepContextRepeatCallbackdoInChunkContext method는 transactional 처리와 ChunkTransactionCallback 객체를 통해서 ChunkOrientedTasklet의 chunk 단위 로직을 호출하는 부분 입니다
    • ChunkTransactionCallbackTaskletStep의 내부 클래스 이기 때문에 TaskletStep이 가지고 있는 ChunkOrientedTasklet 객체에 접근을 할 수 있습니다
  • ChunkOrientedTasklet의 비즈니스 로직을 호출 후 처리 transaction 처리 완료 후 종료를 하게 됩니다

ChunkOrientedTasklet

정의

  • 스프링 배치에서 제공하는 tasklet의 구현체이고, Chunk 지향 프로세싱을 담당
  • read -> process -> write 작업을 진행
  • spring batch에서는 TaskletStep내부에 있는 ChunkTransactionCallback라는 객체가 TaskletStep class가 가지고 있는 ChunkOrientedTasklet의 execute를 호출해서 비즈니스 로직을 실행합니다
 public class ChunkOrientedTasklet<I> implements Tasklet {

    @Nullable
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        @SuppressWarnings("unchecked")
        Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY); // reader chunk size만큼 데이터 읽어들임
        if (inputs == null) {
            inputs = chunkProvider.provide(contribution);
            if (buffering) {
                chunkContext.setAttribute(INPUTS_KEY, inputs);
            }
        }

        chunkProcessor.process(contribution, inputs); // processor를 통해서 중간 연산 처리 후 writer를 통해 write logic 호출 하는 기능을 제공합니다
        chunkProvider.postProcess(contribution, inputs);

        // Allow a message coming back from the processor to say that we
        // are not done yet
        if (inputs.isBusy()) {
            logger.debug("Inputs still busy");
            return RepeatStatus.CONTINUABLE;
        }

        chunkContext.removeAttribute(INPUTS_KEY);
        chunkContext.setComplete();

        if (logger.isDebugEnabled()) {
            logger.debug("Inputs not busy, ended: " + inputs.isEnd());
        }
        return RepeatStatus.continueIf(!inputs.isEnd());

    }
}

특징

  • Reader, Processor, Writer를 사용해서 chunk 기반의 데이터 입출력 처리를 담당
  • TaskletStep에 의해서 반복적으로 실행, ChunkOrientedTasklet이 실행 될 때 마다 새로운 Transaction이 생성
  • exception 발생시 chunk 를 rollback, 이전 commit은 완료됩니다
  • 데이터 읽기 처리는(ItemReader) ChunkProvider(구현체: SimpleChunkProvider)가 담당
    • chunk size만큼 read 진행할때 repeatOperations는 chunk size만큼 iterate를 진행합니다
    • repeatOperations doRead라는 API를 호출해서 데이터 1 개씩 읽어 들이게 되고, input이라는 변수에 저장 후 반환
  • 데이터 가공(ItemProcessor) 및 데이터 변경 및 추가는(ItemWriter)는 ChunkProcessor가 담당 합니다
    • ChunkProcessortransform method를 통해서 processor 로직을 호출하게 됩니다
    • ChunkProcessorwrite method를 통해서 writer 로직을 호출하게 됩니다
    public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
    
      @Override
      public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
          // Allow temporary state to be stored in the user data field
          initializeUserData(inputs);
    
          if (isComplete(inputs)) { // input이 없을 경우 아무 것도 하지 않고 종료
              return;
          }
    
          Chunk<O> outputs = transform(contribution, inputs); // processor에 정의한 로직 실행, remove로 iterator의 데이터가 삭제되면 exception 발생
    
          // Adjust the filter count based on available data
          contribution.incrementFilterCount(getFilterCount(inputs, outputs));
    
          // Adjust the outputs if necessary for housekeeping purposes, and then
          // write them out...
          write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); // process에서 처리한 결과를 바탕으로 저장 하는 기능을 제공
    
      }
    }
반응형

'Spring > Spring Batch' 카테고리의 다른 글

Batch Test Code 작성 및 이슈 해결  (0) 2022.11.11