Study/Spring

[Spring Batch] CompositeItemWriter

_gayeon 2022. 6. 26. 20:06

하나의 Step에서 여러개의 ItemWriter를 사용하는 방법

 

Issue

CompositeItemWriter에서 사용할 두 개의 writer 가 데이터를 받는 형식이 달라 문제가 발생하였다.

이전에 겪었던 문제에서 들었던 예시로 설명해보면 다음과 같다.

  • Sentence를 Reader에서 읽고
  • 띄어쓰기를 기준으로 Processor에서 Split해서 Word 들을 생성하고
  • Writer에서는 List<Word>를 받는 상황

이때, Writer를 두 개 생성하여 하나는 word를 저장하고 나머지는 문장이 처리 완료되었다고 sentence_id를 기준으로 상태값을 업데이트 한다고 해보자. (하나의 sentence에서 split된 word는 같은 sentence_id를 가지고 있다.)

jdbcBatchItemListWriter는 앞서 재정의했던 JdbcBatchItemListWriter를 사용해서 Processor로 부터 List<Word> 값을 받아서 insert 쿼리를 실행하고,

jdbcBatchListOfFirstItemWriter는 sentence_id를 기준으로 값을 업데이트한다. 문제는 writer가 받는 데이터 포맷을 통일해야 하기 때문에 jdbcBatchListOfFirstItemWriter에서도 List<Word>를 받아야한다.

즉, JdbcBatchItemWriter로 jdbcBatchListOfFirstItemWriter를 만들 수 없다.

 

JdbcBatchItemListWriter는 아래 링크 참고.

https://gayeon-cs.tistory.com/97

 

[Error 해결] JdbcBatchItemWriter로 리스트 처리하기

Issue Spring Batch에서 기본적으로 제공하는 Writer인 JdbcBatchItemWriter로 리스트 값을 처리하려고 하자 문제가 생겼습니다. ItemReader와 ItemProcessor는 하나의 item을 처리하고 chunk단위로 item을 ItemW..

gayeon-cs.tistory.com

 

Solution

JdbcBatchItemListWriter처럼 List<T> 형태로 Processor에서 데이터를 받는 JdbcBatchItemWriter를 상속받는 JdbcBatchListOfFirstItemWriter를 생성했다. 그리고, 해당 리스트의 첫번째 인덱스의 값만 update하도록 write 함수를 재정의 하였다.

코드는 아래와 같다.

import org.springframework.batch.item.database.JdbcBatchItemWriter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/*
- Processor에서 List로 Entity를 전달하지만 첫번째 value만 처리하기 위해 write 함수 재정의
 */
public class JdbcBatchListOfFirstItemWriter<T> extends JdbcBatchItemWriter<List<T>>  {
    private final JdbcBatchItemWriter<T> jdbcBatchItemWriter;

    public JdbcBatchListOfFirstItemWriter(JdbcBatchItemWriter<T> jdbcBatchItemWriter){
        this.jdbcBatchItemWriter = jdbcBatchItemWriter;
    }

    @Override
    public void write(List<? extends List<T>> items) {
        List<T> totalList = new ArrayList<>();

        for(List<T> list : items){
            totalList.addAll(Collections.singleton(list.get(0)));
        }

        try {
            jdbcBatchItemWriter.write(totalList);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

이용방법은 아래와 같다.

 

@Bean		
public Job job(){
    return jobBuilderFactory.get("job")
            .start(convertStep())
            .build();
}

@Bean
public Step convertStep(){
    return stepBuilderFactory.get("convertStep")
            .<Sentence, List<Word>>chunk(chunkSize)
            .reader(jdbcCursorItemReader())
            .processor(processor)
            .writer(jdbcBatchItemListWriter())
            .build();
}

@Bean
public CompositeItemWriter compositeItemWriter(){
    final CompositeItemWriter compositeItemWriter = new CompositeItemWriter<>()
    compositeItemWriter.setDelegates(Arrays.asList(jdbcBatchItemListWriter(), jdbcBatchListOfFirstItemWriter()));
    return compositeItemWriter;
}

public JdbcBatchItemListWriter<Word> jdbcBatchItemListWriter() {
    String insertWordQuery = "INSERT INTO word_tb(word) VALUES (:word)";

    JdbcBatchItemWriter<Word> writer = new JdbcBatchItemWriter<>();
    writer.setDataSource(dataSource);
    writer.setSql(insertWordQuery);
    writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
    writer.afterPropertiesSet();
    return new JdbcBatchItemListWriter<>(writer);
}

	public JdbcBatchListOfFirstItemWriter<Word> jdbcBatchListOfFirstItemWriter() {
        String insertWordQuery =
                "UPDATE status SET work_state = 'C' WHERE id = :sentence_id";

        JdbcBatchItemWriter<Word> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql(insertWordQuery);
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();
        return new JdbcBatchListOfFirstItemWriter<>(writer);
    }