[Spring Batch] CompositeItemWriter
하나의 Step에서 여러개의 ItemWriter를 사용하는 방법
Issue
CompositeItemWriter에서 사용할 두 개의 writer 가 데이터를 받는 형식이 달라 문제가 발생하였다.
이전에 겪었던 문제에서 들었던 예시로 설명해보면 다음과 같다.
- Sentence를 Reader에서 읽고
- 띄어쓰기를 기준으로 Processor에서 Split해서 Word 들을 생성하고
- Writer에서는 List<Word>를 받는 상황
이때, Writer를 두 개 생성하여 하나는 word를 저장하고 나머지는 문장이 처리 완료되었다고 sentence_id를 기준으로 상태값을 업데이트 한다고 해보자. (하나의 sentence에서 split된 word는 같은 sentence_id를 가지고 있다.)
jdbcBatchItemListWriter는 앞서 재정의했던 JdbcBatchItemListWriter를 사용해서 Processor로 부터 List<Word> 값을 받아서 insert 쿼리를 실행하고,
jdbcBatchListOfFirstItemWriter는 sentence_id를 기준으로 값을 업데이트한다. 문제는 writer가 받는 데이터 포맷을 통일해야 하기 때문에 jdbcBatchListOfFirstItemWriter에서도 List<Word>를 받아야한다.
즉, JdbcBatchItemWriter로 jdbcBatchListOfFirstItemWriter를 만들 수 없다.
JdbcBatchItemListWriter는 아래 링크 참고.
https://gayeon-cs.tistory.com/97
[Error 해결] JdbcBatchItemWriter로 리스트 처리하기
Issue Spring Batch에서 기본적으로 제공하는 Writer인 JdbcBatchItemWriter로 리스트 값을 처리하려고 하자 문제가 생겼습니다. ItemReader와 ItemProcessor는 하나의 item을 처리하고 chunk단위로 item을 ItemW..
gayeon-cs.tistory.com
Solution
JdbcBatchItemListWriter처럼 List<T> 형태로 Processor에서 데이터를 받는 JdbcBatchItemWriter를 상속받는 JdbcBatchListOfFirstItemWriter를 생성했다. 그리고, 해당 리스트의 첫번째 인덱스의 값만 update하도록 write 함수를 재정의 하였다.
코드는 아래와 같다.
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/*
- Processor에서 List로 Entity를 전달하지만 첫번째 value만 처리하기 위해 write 함수 재정의
*/
public class JdbcBatchListOfFirstItemWriter<T> extends JdbcBatchItemWriter<List<T>> {
private final JdbcBatchItemWriter<T> jdbcBatchItemWriter;
public JdbcBatchListOfFirstItemWriter(JdbcBatchItemWriter<T> jdbcBatchItemWriter){
this.jdbcBatchItemWriter = jdbcBatchItemWriter;
}
@Override
public void write(List<? extends List<T>> items) {
List<T> totalList = new ArrayList<>();
for(List<T> list : items){
totalList.addAll(Collections.singleton(list.get(0)));
}
try {
jdbcBatchItemWriter.write(totalList);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
이용방법은 아래와 같다.
@Bean
public Job job(){
return jobBuilderFactory.get("job")
.start(convertStep())
.build();
}
@Bean
public Step convertStep(){
return stepBuilderFactory.get("convertStep")
.<Sentence, List<Word>>chunk(chunkSize)
.reader(jdbcCursorItemReader())
.processor(processor)
.writer(jdbcBatchItemListWriter())
.build();
}
@Bean
public CompositeItemWriter compositeItemWriter(){
final CompositeItemWriter compositeItemWriter = new CompositeItemWriter<>()
compositeItemWriter.setDelegates(Arrays.asList(jdbcBatchItemListWriter(), jdbcBatchListOfFirstItemWriter()));
return compositeItemWriter;
}
public JdbcBatchItemListWriter<Word> jdbcBatchItemListWriter() {
String insertWordQuery = "INSERT INTO word_tb(word) VALUES (:word)";
JdbcBatchItemWriter<Word> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql(insertWordQuery);
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.afterPropertiesSet();
return new JdbcBatchItemListWriter<>(writer);
}
public JdbcBatchListOfFirstItemWriter<Word> jdbcBatchListOfFirstItemWriter() {
String insertWordQuery =
"UPDATE status SET work_state = 'C' WHERE id = :sentence_id";
JdbcBatchItemWriter<Word> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql(insertWordQuery);
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.afterPropertiesSet();
return new JdbcBatchListOfFirstItemWriter<>(writer);
}