반응형
필자는 chunk 방식에서 transaction이 언제 설정이 되고 사용되는지 확인하고 싶어서 학습을 시작하게 되었습니다
조사하다 보니 chunk 관련 글로 변경이 되었습니다
일부 코드를 생략을 했지만 필요한 코드는 주석으로 추가 설명을 작성하였습니다.
Spring Batch Chunk
TaskletStep
정의
- transactional 관리하는 객체와 실제 개발자가 작성한 item reader, processor, writer를 호출 하는
ChunkOrientedTasklet
를 관리하는 객체 - AbstractStep을 상속받은 구현체(전체 flow가 들어있는 Template Callback 패턴 abstract class)
Flow
public class TaskletStep extends AbstractStep {
private Tasklet tasklet; // ChunkOrientedTasklet 를 가지게 됩니다
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
stream.update(stepExecution.getExecutionContext());
getJobRepository().updateExecutionContext(stepExecution);
// Shared semaphore per step execution, so other step executions can run
// in parallel without needing the lock
final Semaphore semaphore = createSemaphore();
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) { // iterator 내부에서 doInChunkContext method를 호출 하게 됩니다
@Override
public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
throws Exception {
StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
// Before starting a new transaction, check for
// interruption.
interruptionPolicy.checkInterrupted(stepExecution);
RepeatStatus result;
try {
result = new TransactionTemplate(transactionManager, transactionAttribute) // transactional Template 호출 template call back 패턴으로 처리
.execute(new ChunkTransactionCallback(chunkContext, semaphore));
} catch (UncheckedTransactionException e) {
// Allow checked exceptions to be thrown inside callback
throw (Exception) e.getCause();
}
chunkListener.afterChunk(chunkContext);
// Check for interruption after transaction as well, so that
// the interrupted exception is correctly propagated up to
// caller
interruptionPolicy.checkInterrupted(stepExecution);
return result == null ? RepeatStatus.FINISHED : result;
}
});
}
private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback<RepeatStatus> {
@Override
public RepeatStatus doInTransaction(TransactionStatus status) { // transactional 내부의 biz 로직 호출
// codes...
try {
try {
try {
result = tasklet.execute(contribution, chunkContext); // 한개의 chunk에 대해서 read process write 로직 호출
// TaskletStep(outer class)의 tasklet를 사용하게 됩니다
if (result == null) {
result = RepeatStatus.FINISHED;
}
} catch (Exception e) {
if (transactionAttribute.rollbackOn(e)) {
chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e); // 현재 transaction에 대해서 rollback 처리 할수 있도록 변경
throw e;
}
}
} finally {
// If the step operations are asynchronous then we need
// to synchronize changes to the step execution (at a
// minimum). Take the lock *before* changing the step
// execution.
try {
semaphore.acquire();
locked = true;
} catch (InterruptedException e) {
logger.error("Thread interrupted while locking for repository update");
stepExecution.setStatus(BatchStatus.STOPPED);
stepExecution.setTerminateOnly();
Thread.currentThread().interrupt();
}
// Apply the contribution to the step
// even if unsuccessful
if (logger.isDebugEnabled()) {
logger.debug("Applying contribution: " + contribution);
}
stepExecution.apply(contribution);
}
stepExecutionUpdated = true;
stream.update(stepExecution.getExecutionContext());
try { // job repository에 관한 추가 처리 진행(성공 실패에 대한 정보를 DB에 적제 하는 로직
getJobRepository().updateExecutionContext(stepExecution); // step정보 저장
stepExecution.incrementCommitCount();
if (logger.isDebugEnabled()) {
logger.debug("Saving step execution before commit: " + stepExecution);
}
getJobRepository().update(stepExecution);
} catch (Exception e) { // job repository 데이터 저장 실패
String msg = "JobRepository failure forcing rollback";
logger.error(msg, e);
throw new FatalStepExecutionException(msg, e);
}
} catch (Error e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
} catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
throw e;
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
}
rollback(stepExecution);
// Allow checked exceptions
throw new UncheckedTransactionException(e);
}
return result;
}
}
}
public class TransactionTemplate extends DefaultTransactionDefinition
implements TransactionOperations, InitializingBean {
@Override
@Nullable
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
} else {
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
result = action.doInTransaction(status); // transactional 생성 시점
} catch (RuntimeException | Error ex) {
// Transactional code threw application exception -> rollback
rollbackOnException(status, ex);
throw ex;
} catch (Throwable ex) {
// Transactional code threw 예상치 못한 exception 발생 -> rollback 처리
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
this.transactionManager.commit(status); // 정상 동작 commit 동작
return result;
}
}
}
TaskletStep
이RepeateTemplate
의executeInternal
method를 호출RepeateTemplate
의executeInternal
method는 chunk 단위의 로직을 반복적으로 호출 하는 전체 flow logic이 들어 있습니다RepeateTemplate
는 오직 반복적으로 chunk 단위로 나눈 횟수 만큼StepContextRepeatCallback
의doInChunkContext
method만 호출해준다고 보면 됩니다RepeateTemplate
의executeInternal
method는 내부적으로 transactional이 있지만 해당 부분은 외부에서 주입 받은StepContextRepeatCallback
class를 통해서 처리가 이뤄 집니다StepContextRepeatCallback
의doInChunkContext
를 호출하게 되면,TransactionTemplate
객체를 생성하게 되는데 해당 객체가 transactional을 처리하는 class 입니다
StepContextRepeatCallback
의doInChunkContext
method는 transactional 처리와ChunkTransactionCallback
객체를 통해서ChunkOrientedTasklet
의 chunk 단위 로직을 호출하는 부분 입니다ChunkTransactionCallback
는TaskletStep
의 내부 클래스 이기 때문에TaskletStep
이 가지고 있는ChunkOrientedTasklet
객체에 접근을 할 수 있습니다
ChunkOrientedTasklet
의 비즈니스 로직을 호출 후 처리 transaction 처리 완료 후 종료를 하게 됩니다
ChunkOrientedTasklet
정의
- 스프링 배치에서 제공하는 tasklet의 구현체이고, Chunk 지향 프로세싱을 담당
- read -> process -> write 작업을 진행
- spring batch에서는
TaskletStep
내부에 있는ChunkTransactionCallback
라는 객체가TaskletStep
class가 가지고 있는ChunkOrientedTasklet
의 execute를 호출해서 비즈니스 로직을 실행합니다
public class ChunkOrientedTasklet<I> implements Tasklet {
@Nullable
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
@SuppressWarnings("unchecked")
Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY); // reader chunk size만큼 데이터 읽어들임
if (inputs == null) {
inputs = chunkProvider.provide(contribution);
if (buffering) {
chunkContext.setAttribute(INPUTS_KEY, inputs);
}
}
chunkProcessor.process(contribution, inputs); // processor를 통해서 중간 연산 처리 후 writer를 통해 write logic 호출 하는 기능을 제공합니다
chunkProvider.postProcess(contribution, inputs);
// Allow a message coming back from the processor to say that we
// are not done yet
if (inputs.isBusy()) {
logger.debug("Inputs still busy");
return RepeatStatus.CONTINUABLE;
}
chunkContext.removeAttribute(INPUTS_KEY);
chunkContext.setComplete();
if (logger.isDebugEnabled()) {
logger.debug("Inputs not busy, ended: " + inputs.isEnd());
}
return RepeatStatus.continueIf(!inputs.isEnd());
}
}
특징
- Reader, Processor, Writer를 사용해서 chunk 기반의 데이터 입출력 처리를 담당
- TaskletStep에 의해서 반복적으로 실행, ChunkOrientedTasklet이 실행 될 때 마다 새로운 Transaction이 생성
- exception 발생시 chunk 를 rollback, 이전 commit은 완료됩니다
- 데이터 읽기 처리는(ItemReader) ChunkProvider(구현체: SimpleChunkProvider)가 담당
- chunk size만큼 read 진행할때 repeatOperations는 chunk size만큼 iterate를 진행합니다
- repeatOperations doRead라는 API를 호출해서 데이터 1 개씩 읽어 들이게 되고, input이라는 변수에 저장 후 반환
- 데이터 가공(ItemProcessor) 및 데이터 변경 및 추가는(ItemWriter)는
ChunkProcessor
가 담당 합니다ChunkProcessor
는transform
method를 통해서 processor 로직을 호출하게 됩니다ChunkProcessor
는write
method를 통해서 writer 로직을 호출하게 됩니다
public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean { @Override public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception { // Allow temporary state to be stored in the user data field initializeUserData(inputs); if (isComplete(inputs)) { // input이 없을 경우 아무 것도 하지 않고 종료 return; } Chunk<O> outputs = transform(contribution, inputs); // processor에 정의한 로직 실행, remove로 iterator의 데이터가 삭제되면 exception 발생 // Adjust the filter count based on available data contribution.incrementFilterCount(getFilterCount(inputs, outputs)); // Adjust the outputs if necessary for housekeeping purposes, and then // write them out... write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); // process에서 처리한 결과를 바탕으로 저장 하는 기능을 제공 } }
반응형
'Spring > Spring Batch' 카테고리의 다른 글
Batch Test Code 작성 및 이슈 해결 (0) | 2022.11.11 |
---|