SpringBatchで複数ファイルをマルチスレッドでDBに突っ込む

■ INPUT FILE(hoge1~5.csv) hoge1.csv

1,aaa
2,bbb
・・・

hoge5.csv

51,aaa
52,bbb
・・・

~hoge2.csvからhoge4.csv同じような感じなので省略~   ■ OUTPUT DB(hogeテーブル)

mysql> create database hoge;
Query OK, 1 row affected (0.00 sec)
mysql> use hoge
Database changed
mysql> create table hoge (
 -> hoge_id int unsigned not null,
 -> hoge_name varchar(20) not null,
 -> hoge_value varchar(20) not null
 -> );
Query OK, 0 rows affected (0.11 sec)

  ■ Job定義XML(長いので抜粋…) DBまわり

<import resource="HogeDB.xml"/>

Job。Stepにparent付けてマスターを指定

<batch:job id="hogeMultiJob" job-repository="jobRepository">
 <batch:step id="hogeStep" parent="hogeStepMaster" />
</batch:job>

parentで指定したマスター。 パーティショニングするところで結構複雑なので中身の詳細は追って。。。

<bean id="hogeStepMaster" class="org.springframework.batch.core.partition.support.PartitionStep">
 <property name="jobRepository" ref="jobRepository" />
 <property name="stepExecutionSplitter">
  <bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter">
   <constructor-arg ref="jobRepository" />
   <constructor-arg ref="step1" />
   <constructor-arg>
    <bean class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
     <property name="resources" value="file:/home/eshinohara/springsource/~略~/input/hoge*.csv" />
    </bean>
   </constructor-arg>
  </bean>
 </property>
 <property name="partitionHandler">
  <bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
   <property name="taskExecutor" ref="asyncTaskExecutor" />
   <property name="step" ref="step1" />
   <property name="gridSize" value="5"/>
  </bean>
 </property>
</bean>

非同期なTaskExecutorのbean定義

<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

パーティションハンドラから呼ばれるマルチスレッドで動くStep。

<batch:step id="step1">
 <batch:tasklet transaction-manager="jobRepository-transactionManager">
  <batch:chunk reader="hogeFileItemReader"  processor="hogeProcessor" writer="hogeDBWriter"  commit-interval="10" />
 </batch:tasklet>
</batch:step>

CSVをカンマ区切りで読込み。 scopeをstepにしてLate Binding→Late Binding of Job and Step Attributes

<bean id="hogeFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
 <property name="resource" value="#{stepExecutionContext[fileName]}" />
 <property name="strict" value="false" />
 <property name="lineMapper">
  <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
   <property name="lineTokenizer">
    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
     <property name="delimiter" value=","/>
     <property name="names" value="ID,name" />
    </bean>
   </property>
   <property name="fieldSetMapper">
    <bean class="org.springframework.sample.batch.example.HogeFieldSetMapper" />
   </property>
  </bean>
 </property>
</bean>

DBに書き出し

<bean id="hogeDBWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
 <property name="assertUpdates" value="false" />
  <property name="itemSqlParameterSourceProvider">
    <bean name="sqlParameterSourceProvider" id="sqlParameterSourceProvider"
      class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
  </property>
  <property name="sql" value="insert into hoge (hoge_id, hoge_name, hoge_value) values (:id, :hogeName, :hogeValue)"/>
  <property name="dataSource" ref="hoge-dataSource" />
</bean>

  ■プログラム Hoge.java … setter,getter持ってるアレ。 HogeFieldSetMapper ... ファイルから読み込んだデータをオブジェクト(上のHoge)にセット。 HogeProcessor ... なんとなく使ってみたかっただけ。ファイルから取得した値に文字列追加して別カラムにセット。   ■ 実行結果ログ(log4j.xmlに%tでスレッドを記載) ジョブ定義XML読み込み

2010-11-19 21:16:38,202 INFO main [org.springframework.beans.factory.xml.XmlBeanDefinitionReader] - <Loading XML bean definitions from class path resource [FileToTableMultiJob.xml]>

マルチスレッドで実行

2010-11-19 21:16:39,803 DEBUG SimpleAsyncTaskExecutor-1 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=2, name=step1:partition3, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,808 DEBUG SimpleAsyncTaskExecutor-4 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=4, name=step1:partition4, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,811 DEBUG SimpleAsyncTaskExecutor-3 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=3, name=step1:partition2, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,815 DEBUG SimpleAsyncTaskExecutor-2 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=6, name=step1:partition0, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
2010-11-19 21:16:39,822 DEBUG SimpleAsyncTaskExecutor-5 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=5, name=step1:partition1, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>

処理完了

2010-11-19 21:16:39,893 DEBUG main [org.springframework.batch.core.job.flow.support.SimpleFlow] - <Completed state=hogeMultiJob.end1 with status=COMPLETED>

  ====   バージョンによってJob定義の記述とかもルールがチョコチョコ変わってたり、 そもそも中身よくわかってなくて、英語のフォーラムみならが、 パズルみたいにbeanを組み合わせていったのであんまり自信ないですが、 それっぽく動いてるように見えるので、参考までに載せておきました。。   来週からはもっと中身を深堀して、レコードをスキップしたり、バリデーションしたり、 リランどうするんだ?とか、その辺みていこうと思います。。   にしても、確かに、自分でコードはあんまり書かなくてもよいけどXML地獄だな。。。汗