NRIネットコム社員が様々な視点で、日々の気づきやナレッジを発信するメディアです

注目のタグ

    Spring Batchについて基礎からまとめてみた

    本記事は  ブログ書き初めウィーク  6日目の記事です。
    📝  5日目  â–¶â–¶ æœ¬è¨˜äº‹ â–¶â–¶  7日目  ðŸ“…

    はじめに

    こんにちは!入社1年目の池下です。 新入社員研修を終え、部署に配属されてから約半年が経ちました。 現在参画しているプロジェクトでは、開発でSpring Batchを使用しているのですが、私自身バッチについては知識がほぼゼロの状態からのスタートでした。 ですが、業務を通じてバッチがどんな役割を担いバックエンドではどう動いてるのかを知り、さらに実際にコードを書いてSpring Batchの構造を理解することができたので、アウトプットした内容をこちらの記事にまとめました!

    そもそも「バッチ」ってなに?

    バッチ処理という言葉を聞いたことはあるけど、具体的に何を指しているのかよくわからないという方も多いのではないでしょうか。 バッチ処理とは、あらかじめ実行される時間や処理条件を事前に登録しておき、自動的にその処理を一括で行うことです。 例えば、1日分の売上データなどを一括で集計し、データベースに登録するまでを自動化できます。

    このようにバッチ処理は一度にまとめて処理を行うため、効率的に大量のデータを扱うことができます。 そして、javaでバッチ処理を開発する際に一般的に使用されるフレームワークがSpring Batchになります。 以降でこのSpring Batchについての基本構成やコードを書いて実際にバッチを動かしてみた結果をまとめました。

    Spring Batchの基本構成とは?

    では、Spring Batchの全体概要について説明していきます。 下図のように、Spring Batchは、Job Rauncher、Job、Step、Tasklet、Chunk、Job Repository、Platform Transaction Managerを組み合わせてJobを定義します。 これらはバッチ処理を効率的に行うために提供される再利用可能なコンポーネントやツールのことで、これらの共通部品を使用することで、開発者は一からすべてを実装する必要がなくなります。

    JobRauncher

    Jobを起動するための機能です。ジョブ全体の成功または失敗を判定します。

    Job

    バッチ処理そのものを指します。売上データの取り込みやレポートの作成など1つのJobが1つのバッチ処理という単位になります。

    Step

    Job内で実行される処理の単位です。Stepから呼び出す実装方式として、TaskletモデルやChunkモデルがあります。

    • Taskletモデル

      具体的な処理内容を記述したり、実際に処理が実装される場所です。Taskletの1つに集約します。

    • Chunkモデル

      Taskletとは異なりChunkは下記3つを組み合わせてデータの処理を行う場所です。一連のフレームワークとして提供されており、これらのインターフェースを実装したクラスを作成することで、バッチ定型処理を作成することができます。

      • Item Reader : ファイルやデータベースの情報を読み込みます。

      • Item Processor : 読み込んだデータの処理を行います。

      • Item Writer : 処理したデータの書き込みを行います。

    Job Repository

    Jobの実行履歴を保存するためのものです。データベースには、Jobの実行開始時間や終了時間、Jobの判定結果、ステータス、実行パラメータなどを保存します。

    Platform Transaction Manager

    トランザクション管理を行う場所です。データの整合性を保つために、トランザクションの境界を定義してロールバックコミットを制御します。

    Spring Batchでバッチを実装していく

    では実際に、上記の構成を踏まえてバッチを実装していきます。 今回は、Springの公式ドキュメントを参考に、CSVファイルから売上データを読み込んでデータベースへ登録し、結果をログとして出力するバッチを実装しました。

    spring.pleiades.io

    実行環境
    • InteliJ IDEA
    • Java17 (coretto)
    • Spring Boot 3.4.1
    • Gradle8.8
    • mysql

    ① テーブルとCSVファイルを作成する

    アプリケーション起動時にCSVファイルを読み込むバッチを実装していきます。 まず、売上データが入ったCSVファイルを作成します。 さらに、読み込んだ売上データを格納する売上テーブルを作成しておきます。

    salesdata.csv

    りんご,180
    みかん,150
    ã‚‚ã‚‚,300
    ぶどう,500
    メロン,700

    table.sql

    CREATE TABLE sales  (
        sales_id INT IDENTITY NOT NULL PRIMARY KEY,
        fruit VARCHAR(20),
        price BIGINT
    );
    

    ② Modelを作成する

    テーブルに対応するモデルを作成します。

    SalesData.java

    package com.example.batchprocessing;
    
    public record SalesData(String fruit, int price) {
    
    }
    

    ③ ItemProcessorを実装する

    ItemProcessor.java

    package com.example.batchprocessing;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.item.ItemProcessor;
    
    public class SalesDataProcessor implements ItemProcessor<SalesData, SalesData> {
    
        private static final Logger log = LoggerFactory.getLogger(SalesDataProcessor.class);
        private int totalSales = 0;
    
        @Override
        public SalesData process(final SalesData data) {
            // 売上データの合計を計算
            totalSales += data.price();
            return data;
        }
    
        public int getTotalSales() {
            return totalSales;
        }
    }
    

    今回はChunkモデルを使用したバッチ処理を実装していきます。 前述の通り、Chunkモデルはデータの読み込み、加工、書き込みの3つの処理に分割されます。 データの処理を担うItemProcessorでは、ファイルから読み取った売上データの合計を計算するトランスフォーマーを作成しました。

    ④ ジョブをまとめるクラスを実装する

    BatchConfig.java

    package com.example.batchprocessing;
    
    import javax.sql.DataSource;
    
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.job.builder.JobBuilder;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.builder.StepBuilder;
    import org.springframework.batch.item.database.JdbcBatchItemWriter;
    import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
    import org.springframework.batch.item.file.FlatFileItemReader;
    import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    
    @Configuration
    public class BatchConfig {
        @Bean
        public FlatFileItemReader<SalesData> reader() {
            return new FlatFileItemReaderBuilder<SalesData>()
                    .name("salesDataReader")
                    .resource(new ClassPathResource("salesdata.csv"))
                    .delimited()
                    .names("fruit", "price")
                    .targetType(SalesData.class)
                    .build();
        }
        
        @Bean
        public SalesDataProcessor processor() {
            return new SalesDataProcessor();
        }
    
        @Bean
        public JdbcBatchItemWriter<SalesData> writer(DataSource dataSource) {
            return new JdbcBatchItemWriterBuilder<SalesData>()
                    .sql("INSERT INTO sales (fruit, price) VALUES (:fruit, :price)")
                    .dataSource(dataSource)
                    .beanMapped()
                    .build();
        }
    

    Configクラスは、ジョブをまとめるバッチ設定用クラスです。ItemReader、ItemProcessor、ItemWriterの処理を記述します。 reader()では①で作成したCSVファイルを読み込み、データベースに登録できるようSalesData型に変換します。 processor()では、③で定義したSalesDataProcessorインスタンスを作成します。 最後にwriter(Datasource dataSource)で、データベースへの書き込みを行います。 これらの処理は@Beanアノテーションを使用し定義されています。 @Beanアノテーションを使用することで、メソッドが返すオブジェクトがコンテナに登録されます。さらに、@Configurationアノテーションでアプリケーション全体において再利用可能な設定を一元管理できます。

        @Bean
        public Job importUserJob(JobRepository jobRepository, Step step, JobCompletionNotificationListener listener) {
            return new JobBuilder("importUserJob", jobRepository)
                    .listener(listener)
                    .start(step)
                    .build();
        }
    
        @Bean
        public Step step(JobRepository jobRepository, DataSourceTransactionManager transactionManager,
                FlatFileItemReader<SalesData> reader, SalesDataProcessor processor, JdbcBatchItemWriter<SalesData> writer) {
            return new StepBuilder("step1", jobRepository)
                    .<SalesData, SalesData>chunk(3, transactionManager)
                    .reader(reader)
                    .processor(processor)
                    .writer(writer)
                    .build();
        }
    }
    

    同一ファイル内でJobとStepも定義しています。 importUserJob()では、JobRepositoryがジョブの実行状態を管理し、ジョブの開始や終了時にリスナーを通じて通知を行います。 step()では、ファイルからデータを読み取り、処理し、データベースに書き込む一連のバッチ処理を行います。 さらに、DataSourceTransactionManagerがStepBuilderに渡され、ステップ内のすべてのデータベース操作がトランザクションとして管理されています。

    ⑤ 通知を受け取るリスナーを実装する

    JobCompletionNotificationListener.java

    package com.example.batchprocessing;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.BatchStatus;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.DataClassRowMapper;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class JobCompletionNotificationListener implements JobExecutionListener {
    
        private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
        private final SalesDataProcessor processor;
    
        private final JdbcTemplate jdbcTemplate;
    
        @Autowired
        public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate, SalesDataProcessor processor) {
            this.jdbcTemplate = jdbcTemplate;
            this.processor = processor;
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
            if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
                log.info("---- 20XX年XX月XX日の売上データを集計しました。----");
                jdbcTemplate
                        .query("SELECT fruit, price FROM sales", new DataClassRowMapper<>(SalesData.class))
                        .forEach(sales -> log.info("Found <{}> in the database.", sales));
                log.info("Total sales: " + processor.getTotalSales());
            }
        }
    }
    

    JobExecutionListenerとは、ジョブの開始前と終了後に特定の処理を実行するためのメソッドを提供するインターフェースです。 ジョブが正常に完了した場合(BatchStatus.COMPLETED)、以下の処理を行うよう実装しました。

    • JdbcTemplateを使ってデータベースから売上データをクエリし、各レコードをログに出力
    • SalesDataProcessorから総売上を取得し、ログに出力

    ⑥ メインクラスを実装する

    BatchProcessingApplication.java

    package com.example.batchprocessing;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class BatchProcessingApplication {
    
        public static void main(String[] args) {
            System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
        }
    }
    

    Spring Bootアプリケーションを起動するためのメインクラスです。 @SpringBootApplicationアノテーションは、@Configuration、@EnableAutoConfiguration、@ComponentScanの3つのアノテーションを組み合わせたものです。 @EnableAutoConfigurationとは、Spring Bootがアプリケーションの依存関係に基づいて適切な構成を自動的に行うためのアノテーションです。 @ComponentScanとは、指定されたパッケージ内のコンポーネントを自動的に検出し、Springコンテナに登録するためのもので、これにより、@Beanの定義が不要になります。

    ⑦ 実行結果を確認する

    2025-01-17T17:34:40.570+09:00  INFO 4684 --- [           main] c.e.b.BatchProcessingApplication         : Started BatchProcessingApplication in 2.282 seconds (process running for 2.884)
    2025-01-17T17:34:40.573+09:00  INFO 4684 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
    2025-01-17T17:34:40.633+09:00  INFO 4684 --- [           main] o.s.b.c.l.s.TaskExecutorJobLauncher      : Job: [SimpleJob: [name=importUserJob]] launched with the following parameters: [{}]
    2025-01-17T17:34:40.662+09:00  INFO 4684 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
    2025-01-17T17:34:40.705+09:00  INFO 4684 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step] executed in 40ms
    2025-01-17T17:34:40.712+09:00  INFO 4684 --- [           main] c.e.b.JobCompletionNotificationListener  : ---- 20XX年XX月XX日の売上データを集計しました。----
    2025-01-17T17:34:40.715+09:00  INFO 4684 --- [           main] c.e.b.JobCompletionNotificationListener  : Found <SalesData[fruit=りんご, price=180]> in the database.
    2025-01-17T17:34:40.720+09:00  INFO 4684 --- [           main] c.e.b.JobCompletionNotificationListener  : Found <SalesData[fruit=みかん, price=150]> in the database.
    2025-01-17T17:34:40.720+09:00  INFO 4684 --- [           main] c.e.b.JobCompletionNotificationListener  : Found <SalesData[fruit=ã‚‚ã‚‚, price=300]> in the database.
    2025-01-17T17:34:40.720+09:00  INFO 4684 --- [           main] c.e.b.JobCompletionNotificationListener  : Found <SalesData[fruit=ぶどう, price=500]> in the database.
    2025-01-17T17:34:40.720+09:00  INFO 4684 --- [           main] c.e.b.JobCompletionNotificationListener  : Found <SalesData[fruit=メロン, price=700]> in the database.
    2025-01-17T17:34:40.722+09:00  INFO 4684 --- [           main] c.e.b.JobCompletionNotificationListener  : Total sales: 1830
    2025-01-17T17:34:40.725+09:00  INFO 4684 --- [           main] o.s.b.c.l.s.TaskExecutorJobLauncher      : Job: [SimpleJob: [name=importUserJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 66ms
    2025-01-17T17:34:40.732+09:00  INFO 4684 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
    2025-01-17T17:34:40.735+09:00  INFO 4684 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
    

    コンソールに出力されたログを確認すると、CSVファイルの情報がデータベースに登録され集計結果も正確にログとして出力されているため、正常にバッチが実行されたことが分かります。

    おわりに

    今回はSpring Batchについての構成や実際にコードを書いてバッチを動かしていく流れをご紹介しました。 Spring Batchは、バッチ処理を効率的に行うために多くの共通部品が提供されており、これらを使用することで同じ処理を何度も書く必要がなくなるということが分かりました。 この記事を通して、皆さんがSpring Batchについて理解を深める一助となれば幸いです。 今回のバッチの実装やブログ執筆を通して、改めてアウトプットした内容を今後の業務にも活かしていきたいと思います。

    執筆者:池下絢香 システムエンジニア