kagamihogeの日記

kagamihogeの日記です。

jBatchのpartitionを使ってみる

想定する処理としては、下記のようなごく単純な訪問ログにおける時刻の出現回数のサマリを集計処理を考える。このログファイルが複数拠点から送られるので、それらすべてを集計するもんとする。

15:28
08:36
18:57
01:58
14:43

環境

ソースコードとか

ジョブXML

入力ファイルとかが置かれるディレクトリはジョブレベルプロパティで指定する。

一つのstepが存在し、chunkとreducerが定義してある。これらの詳細は後述。

partitionの設定は静的・動的の二種類ある。まず、静的な指定方法を試す。下記は、静的にパーティーション数・スレッド数・各パーティーションごとのプロパティ(ここでは入力ファイル)を設定している。

<?xml version="1.0" encoding="UTF-8"?>
<job id="sample-job-partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
    version="1.0">

    <properties>
        <property name="input_dir" value="C:\\Java\\sampleinput" />
        <property name="output_dir" value="C:\\Java\\sampleoutput" />
    </properties>

    <step id="myStep">
        <chunk>
            <reader ref="mySampleReader"></reader>
            <processor ref="mySampleProcessor"></processor>
            <writer ref="mySampleWriter"></writer>
        </chunk>
        <partition>
            <plan partitions="2" threads="2">
                <properties partition="0">
                    <property name="file" value="input1.txt" />
                </properties>
                <properties partition="1">
                    <property name="file" value="input2.txt" />
                </properties>
            </plan>

            <reducer ref="myReducer" />
        </partition>
        
        <end on="COMPLETED"/>
    </step>
</job>

partitionの設定を動的にする場合、mapperを定義する。参照先のmyPartitionMapperについては後述。

<partition>
    <mapper ref="myPartitionMapper"/>
</partition>

PartitionMapper

パーティーション数やスレッド数、パーティションごとのプロパティを動的に設定する場合はPartitionMapperを実装し、それをmapperに指定する。

実際にはgetPartitions等で動的に値を返すコードを書くことになると思われる。ここでは単純に、静的の場合と同等な設定を返すようにしておく。

@Dependent
@Named("myPartitionMapper")
public class SamplePartitionMapper implements PartitionMapper {

    @Override
    public PartitionPlan mapPartitions() throws Exception {
        return new PartitionPlanImpl() {
            
            @Override
            public int getPartitions() {
                return 2;
            }
            
            @Override
            public int getThreads() {
                return 2;
            }
            
            @Override
            public Properties[] getPartitionProperties() {
                int partitions = getPartitions();
                
                Properties[] props = new Properties[partitions];
                for (int i = 0; i < partitions; i++) {
                    props[i] = new Properties();
                    props[i].put("file", "input" + (i+1) + ".txt");
                }
                
                return props;
            }
        };
    }

}

ItemReader

ジョブXMLまたはmapperで指定されるプロパティからファイル名を取得し、そこからの読み込み処理を書く。

各パーティションごとのパラメータ(ここでは入力ファイル)をする取得コードは、静的・動的に関わらず同じコードでOK

@Dependent
@Named("mySampleReader")
public class SampleReader implements ItemReader {

    private BufferedReader br;

    @Inject
    JobContext jobCtx;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        long execID = jobCtx.getExecutionId();
        Properties parameters = jobOperator.getParameters(execID);
        String inputDir = jobCtx.getProperties().getProperty("input_dir");;
        String inputFile = parameters.getProperty("file");
        System.out.println("## execID" + execID + " " +  inputDir + " " + inputFile);
        
        br = Files.newBufferedReader(Paths.get(inputDir, inputFile), Charset.defaultCharset());
    }

    @Override
    public Object readItem() throws Exception {
        String line = br.readLine();
        if (line == null || line.length() <= 0) {
            return null;
        }
        return line;
    }

    @Override
    public void close() throws Exception {
        br.close();
    }
    
    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }

}

ItemProcessor

各行がHH:MMフォーマットなので:でスプリットするだけ。

@Dependent
@Named("mySampleProcessor")
public class SampleProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        return ((String) item).split(":")[0];
    }
}

ItemWriter

読み込んだファイルの時刻の出現回数をカウントアップし、終了時にパーティションごとの中間結果をファイルへ出力する。

本来的には、writeItemsは何らかの永続化機構に書き込むことを想定されていると思われる。が、ここではただ単純にファイルに書き出すだけである。

今回くらいの場合、ワザワザ中間結果作らなくても良いんだけど、サンプルなので。

@Dependent
@Named("mySampleWriter")
public class SampleWriter implements ItemWriter {

    @Inject
    JobContext jobCtx;
    
    private Map<String, Integer> result;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        result = new HashMap<>();
        for (int i = 0; i < 24; i++) {
            result.put(String.format("%02d", i), 0);
        }
    }

    @Override
    public void writeItems(List<Object> items) throws Exception {
        for (Object i : items) {
            String time = (String) i;
            Integer newCount = result.get(time) + 1;

            result.put(time, newCount);
        }
        System.out.println("## execID=" + jobCtx.getExecutionId() + " writes:" + items.size());
    }
    
    @Override
    public void close() throws Exception {
        long execID = jobCtx.getExecutionId();
        try (BufferedWriter bw = Files.newBufferedWriter(Paths.get(jobCtx.getProperties().getProperty("output_dir"), execID + ".txt"),
                Charset.defaultCharset())) {
            for (Map.Entry<String, Integer> e : result.entrySet()) {
                bw.write(e.getKey() + ":" + e.getValue());
                bw.newLine();
            }
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}

PartitionReducer

各パーティションごとの中間結果をまとめて最終的な集計結果を作成する。結果はただ単に標準出力に出すだけ。また、中間結果ディレクトリ内のファイルをすべて削除する。

@Dependent
@Named("myReducer")
public class SamplePartitionReducer implements PartitionReducer {

    @Inject
    JobContext jobCtx;

    @Override
    public void beginPartitionedStep() throws Exception {
        System.out.println("beginPartitionedStep");
    }

    @Override
    public void beforePartitionedStepCompletion() throws Exception {
        System.out.println("beforePartitionedStepCompletion");

        final HashMap<String, Integer> summary = new HashMap<>();
        for (int i = 0; i < 24; i++) {
            summary.put(String.format("%02d", i), 0);
        }

        FileVisitor<Path> visitor = new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                try (BufferedReader br = Files.newBufferedReader(file, Charset.defaultCharset());) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        String[] r = line.split(":");
                        summary.put(r[0], summary.get(r[0]) + Integer.parseInt(r[1]));
                    }
                }
                return FileVisitResult.CONTINUE;
            }
        };
        Files.walkFileTree(Paths.get(jobCtx.getProperties().getProperty("output_dir")), visitor);

        System.out.println(summary);
    }

    @Override
    public void afterPartitionedStepCompletion(PartitionStatus status)
            throws Exception {
        FileVisitor<Path> visitor = new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        };
        
        Files.walkFileTree(Paths.get(jobCtx.getProperties().getProperty("output_dir")), visitor);
        System.out.println("afterPartitionedStepCompletion");
    }

    @Override
    public void rollbackPartitionedStep() throws Exception {
        System.out.println("rollbackPartitionedStep");
    }

}

実行の様子

各パーティーションがそれぞれのファイルの集計を中間結果にまとめ、reducerでその中間結果を集約しているログが表示されている。

情報: id = 155
情報: beginPartitionedStep
情報: ## execID=156 C:\\Java\\sampleinput input1.txt
情報: ## execID=157 C:\\Java\\sampleinput input2.txt
情報: ## execID=156 writes:10
情報: ## execID=157 writes:10
情報: ## execID=156 writes:8
情報: ## execID=157 writes:10
情報: ## execID=157 writes:4
情報: beforePartitionedStepCompletion
情報: {08=3, 09=0, 19=3, 22=3, 17=1, 04=1, 23=2, 18=5, 05=2, 15=3, 06=2, 16=0, 07=2, 13=1, 00=1, 14=1, 01=3, 02=1, 11=3, 12=0, 03=1, 21=2, 20=2, 10=0}
情報: afterPartitionedStepCompletion

ハマったこととか

setTransientUserDataによるデータの受け渡しは同一スレッド間

jBatchを使用して悩んだのはstep間でどうデータを受け渡すのか、だった。

今回のケースではJobContext#setTransientUserDataがあるのでコレを使えばよいかと思いきや、JobContextのインスタンスはスレッドごとである。パーティションのスレッドと、PartitionReducerが動作するスレッドは異なるので、setTransientUserDataではインスタンスを共有できない。

なので、中間ファイルを経由させることにした。

ソースコード

https://github.com/kagamihoge/jbatchsample/tree/master/src/main/java/kagamihoge/glassfish4/javaee7/jbatchsample/partition