Study/Spring

[Error 해결] JdbcBatchItemWriter로 리스트 처리하기

_gayeon 2022. 6. 26. 14:48

Issue

Spring Batch에서 기본적으로 제공하는 Writer인 JdbcBatchItemWriter로 리스트 값을 처리하려고 하자 문제가 생겼습니다.

ItemReader와 ItemProcessor는 하나의 item을 처리하고 chunk단위로 item을 ItemWriter로 넘기게 됩니다. 즉, ItemProcessor는 처리한 데이터를 하나씩 return 하게 됩니다.

문제는 제가 처리하고 싶은 로직이 ItemProcessor에서 여러 건의 데이터를 return 하고 싶다는 것입니다. 예를 들어, Reader에서 문자열을 읽고 문자열을 한 건씩 ItemProcessor로 넘겼을 때 ItemProcessor는 해당 문자열을 띄어쓰기 기준으로 파싱해서 단어 리스트를 ItemWriter로 보내야 합니다. 기존 ItemWriter는 ItemProcessor가 보내는 Item을 chunk 사이즈만큼(List) 처리하는 것이지, List<List<>> 를 처리하고 있지 않기 때문에 정상적으로 동작하지 않습니다.

Solution

위와 같은 상황을 해결하기 위해,

다음과 같은 JdbcBatchItemWriter의 write 메소드를

public void write(final List<? extends T> items) throws Exception {
	...
}        

아래와 같이 overriding 해주는 새로운 JdbcBatchItemListWriter를 생성했습니다.

import org.springframework.batch.item.database.JdbcBatchItemWriter;

import java.util.ArrayList;
import java.util.List;

public class JdbcBatchItemListWriter<T> extends JdbcBatchItemWriter<List<T>>  {
    private final JdbcBatchItemWriter<T> jdbcBatchItemWriter;

    public JdbcBatchItemListWriter(JdbcBatchItemWriter<T> jdbcBatchItemWriter){
        this.jdbcBatchItemWriter = jdbcBatchItemWriter;
    }

    @Override
    public void write(List<? extends List<T>> items) {
        List<T> totalList = new ArrayList<>();

        for(List<T> list : items){
            totalList.addAll(list);
        }

        try {
            jdbcBatchItemWriter.write(totalList);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}

위에서 가정한 문자열을 단어로 저장하는 로직으로 정리된 Job 메인 코드는 다음과 같습니다.

public class JobConfig {

    @Qualifier("DataSource")
    private final DataSource dataSource;
    private static final int chunkSize = 1000;    //트랜잭션 범위

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final Processor processor;

		@Bean		
		public Job job(){
        return jobBuilderFactory.get("job")
                .start(convertStep())
                .build();
    }

		@Bean
    public Step convertStep(){
        return stepBuilderFactory.get("convertStep")
                .<Sentence, List<Word>>chunk(chunkSize)
                .reader(jdbcCursorItemReader())
                .processor(processor)
                .writer(jdbcBatchItemListWriter())
                .build();
    }

		@Bean
    public JdbcCursorItemReader<Sentence> jdbcCursorItemReader() {
        String getSentenceQuery =
                "SELECT sentece From sentence_tb"

        return new JdbcCursorItemReaderBuilder<Sentence>()
                .fetchSize(chunkSize)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Sentence.class))
                .sql(getSentenceQuery)
                .name("jdbcCursorItemReader")
                .build();
    }

		public JdbcBatchItemListWriter<Word> jdbcBatchItemListWriter() {
        String insertWordQuery =
                "INSERT INTO word_tb(word) VALUES (:word)";

        JdbcBatchItemWriter<Word> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql(insertWordQuery);
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();
        return new JdbcBatchItemListWriter<>(writer);
    }

참고

https://jojoldu.tistory.com/140