■ INPUT FILE(hoge1~5.csv) hoge1.csv
1,aaa 2,bbb ・・・
hoge5.csv
51,aaa 52,bbb ・・・
~hoge2.csvからhoge4.csv同じような感じなので省略~ ■ OUTPUT DB(hogeテーブル)
mysql> create database hoge; Query OK, 1 row affected (0.00 sec) mysql> use hoge Database changed mysql> create table hoge ( -> hoge_id int unsigned not null, -> hoge_name varchar(20) not null, -> hoge_value varchar(20) not null -> ); Query OK, 0 rows affected (0.11 sec)
■ Job定義XML(長いので抜粋…) DBまわり
<import resource="HogeDB.xml"/>
Job。Stepにparent付けてマスターを指定
<batch:job id="hogeMultiJob" job-repository="jobRepository"> <batch:step id="hogeStep" parent="hogeStepMaster" /> </batch:job>
parentで指定したマスター。 パーティショニングするところで結構複雑なので中身の詳細は追って。。。
<bean id="hogeStepMaster" class="org.springframework.batch.core.partition.support.PartitionStep"> <property name="jobRepository" ref="jobRepository" /> <property name="stepExecutionSplitter"> <bean class="org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter"> <constructor-arg ref="jobRepository" /> <constructor-arg ref="step1" /> <constructor-arg> <bean class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"> <property name="resources" value="file:/home/eshinohara/springsource/~略~/input/hoge*.csv" /> </bean> </constructor-arg> </bean> </property> <property name="partitionHandler"> <bean class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"> <property name="taskExecutor" ref="asyncTaskExecutor" /> <property name="step" ref="step1" /> <property name="gridSize" value="5"/> </bean> </property> </bean>
非同期なTaskExecutorのbean定義
<bean id="asyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
パーティションハンドラから呼ばれるマルチスレッドで動くStep。
<batch:step id="step1"> <batch:tasklet transaction-manager="jobRepository-transactionManager"> <batch:chunk reader="hogeFileItemReader" processor="hogeProcessor" writer="hogeDBWriter" commit-interval="10" /> </batch:tasklet> </batch:step>
CSVをカンマ区切りで読込み。 scopeをstepにしてLate Binding→Late Binding of Job and Step Attributes
<bean id="hogeFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"> <property name="resource" value="#{stepExecutionContext[fileName]}" /> <property name="strict" value="false" /> <property name="lineMapper"> <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> <property name="lineTokenizer"> <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> <property name="delimiter" value=","/> <property name="names" value="ID,name" /> </bean> </property> <property name="fieldSetMapper"> <bean class="org.springframework.sample.batch.example.HogeFieldSetMapper" /> </property> </bean> </property> </bean>
DBに書き出し
<bean id="hogeDBWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter"> <property name="assertUpdates" value="false" /> <property name="itemSqlParameterSourceProvider"> <bean name="sqlParameterSourceProvider" id="sqlParameterSourceProvider" class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" /> </property> <property name="sql" value="insert into hoge (hoge_id, hoge_name, hoge_value) values (:id, :hogeName, :hogeValue)"/> <property name="dataSource" ref="hoge-dataSource" /> </bean>
■プログラム Hoge.java … setter,getter持ってるアレ。 HogeFieldSetMapper ... ファイルから読み込んだデータをオブジェクト(上のHoge)にセット。 HogeProcessor ... なんとなく使ってみたかっただけ。ファイルから取得した値に文字列追加して別カラムにセット。 ■ 実行結果ログ(log4j.xmlに%tでスレッドを記載) ジョブ定義XML読み込み
2010-11-19 21:16:38,202 INFO main [org.springframework.beans.factory.xml.XmlBeanDefinitionReader] - <Loading XML bean definitions from class path resource [FileToTableMultiJob.xml]>
マルチスレッドで実行
2010-11-19 21:16:39,803 DEBUG SimpleAsyncTaskExecutor-1 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=2, name=step1:partition3, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,808 DEBUG SimpleAsyncTaskExecutor-4 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=4, name=step1:partition4, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,811 DEBUG SimpleAsyncTaskExecutor-3 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=3, name=step1:partition2, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,815 DEBUG SimpleAsyncTaskExecutor-2 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=6, name=step1:partition0, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=> 2010-11-19 21:16:39,822 DEBUG SimpleAsyncTaskExecutor-5 [org.springframework.batch.core.step.tasklet.TaskletStep] - <Saving step execution before commit: StepExecution: id=5, name=step1:partition1, status=STARTED, exitStatus=EXECUTING, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=>
処理完了
2010-11-19 21:16:39,893 DEBUG main [org.springframework.batch.core.job.flow.support.SimpleFlow] - <Completed state=hogeMultiJob.end1 with status=COMPLETED>
==== バージョンによってJob定義の記述とかもルールがチョコチョコ変わってたり、 そもそも中身よくわかってなくて、英語のフォーラムみならが、 パズルみたいにbeanを組み合わせていったのであんまり自信ないですが、 それっぽく動いてるように見えるので、参考までに載せておきました。。 来週からはもっと中身を深堀して、レコードをスキップしたり、バリデーションしたり、 リランどうするんだ?とか、その辺みていこうと思います。。 にしても、確かに、自分でコードはあんまり書かなくてもよいけどXML地獄だな。。。汗