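As the target workload, consider a simple aggregation: summarizing how often each time of day occurs in a plain visit log like the one below. The log files are sent in from multiple sites, and all of them are to be aggregated together.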
15:28
08:36
18:57
01:58
14:43
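Before bringing jBatch into the picture, the aggregation itself can be sketched in plain Java. This is only an illustration under assumptions: the class `HourCount` and the method `countByHour` are hypothetical names that do not appear in the article, and every line is assumed to be in `HH:MM` format.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the article's job.
class HourCount {

    /** Counts how often each hour ("00" to "23") occurs in lines of the form HH:MM. */
    static Map<String, Integer> countByHour(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        // Pre-fill every hour with 0 so that hours without visits still appear.
        for (int h = 0; h < 24; h++) {
            counts.put(String.format("%02d", h), 0);
        }
        for (String line : lines) {
            String hour = line.split(":")[0];
            counts.merge(hour, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("15:28", "08:36", "18:57", "01:58", "14:43");
        System.out.println(countByHour(log));
    }
}
```

The partitioned job described in the rest of the article does the same counting, but once per input file in parallel, with a final merge step.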
Environment
Source code
Job XML
The directory that holds the input files is specified as a job-level property.
The job has a single step, which defines a chunk and a reducer. Both are described in detail later.
A partition can be configured in two ways, statically or dynamically. The static form comes first: the XML below statically sets the number of partitions, the number of threads, and the properties of each partition (here, the input file).
<?xml version="1.0" encoding="UTF-8"?>
<job id="sample-job-partition" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <properties>
        <property name="input_dir" value="C:\\Java\\sampleinput" />
        <property name="output_dir" value="C:\\Java\\sampleoutput" />
    </properties>
    <step id="myStep">
        <chunk>
            <reader ref="mySampleReader"></reader>
            <processor ref="mySampleProcessor"></processor>
            <writer ref="mySampleWriter"></writer>
        </chunk>
        <partition>
            <plan partitions="2" threads="2">
                <properties partition="0">
                    <property name="file" value="input1.txt" />
                </properties>
                <properties partition="1">
                    <property name="file" value="input2.txt" />
                </properties>
            </plan>
            <reducer ref="myReducer" />
        </partition>
        <end on="COMPLETED"/>
    </step>
</job>
To configure the partition dynamically, define a mapper instead of a plan. The referenced myPartitionMapper is described below.
<partition> <mapper ref="myPartitionMapper"/> </partition>
PartitionMapper
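To set the number of partitions, the number of threads, and the per-partition properties dynamically, implement PartitionMapper and reference the implementation from the mapper element.
In real code, getPartitions and the other methods would return values computed at run time. Here the mapper simply returns the same settings as the static configuration.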
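import java.util.Properties;

import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionPlanImpl;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

@Dependent
@Named("myPartitionMapper")
public class SamplePartitionMapper implements PartitionMapper {

    @Override
    public PartitionPlan mapPartitions() throws Exception {
        // Returns the same plan as the static XML: 2 partitions on 2 threads.
        return new PartitionPlanImpl() {
            @Override
            public int getPartitions() {
                return 2;
            }

            @Override
            public int getThreads() {
                return 2;
            }

            @Override
            public Properties[] getPartitionProperties() {
                // One Properties object per partition: input1.txt, input2.txt, ...
                int partitions = getPartitions();
                Properties[] props = new Properties[partitions];
                for (int i = 0; i < partitions; i++) {
                    props[i] = new Properties();
                    props[i].setProperty("file", "input" + (i + 1) + ".txt");
                }
                return props;
            }
        };
    }
}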
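A mapper that is dynamic in practice would probably derive its plan from whatever input files exist at run time. The following is a minimal sketch of only the property-building part, in plain Java. `PartitionProps` and `forFiles` are hypothetical names, and how the list of file names is obtained (for example by listing `input_dir`) is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the article's job.
class PartitionProps {

    /** Builds one Properties object per input file, using the same "file" key as the job XML. */
    static Properties[] forFiles(List<String> fileNames) {
        Properties[] props = new Properties[fileNames.size()];
        for (int i = 0; i < props.length; i++) {
            props[i] = new Properties();
            props[i].setProperty("file", fileNames.get(i));
        }
        return props;
    }

    public static void main(String[] args) {
        Properties[] props = forFiles(Arrays.asList("input1.txt", "input2.txt", "input3.txt"));
        for (int i = 0; i < props.length; i++) {
            System.out.println("partition " + i + " -> " + props[i].getProperty("file"));
        }
    }
}
```

With such a helper, getPartitions could return the length of the array and getPartitionProperties the array itself, so the number of partitions follows the number of files.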
ItemReader
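The reader obtains the file name from the properties specified in the job XML or by the mapper, and reads that file.
The code that fetches the per-partition parameter (here, the input file) is the same whether the partition is configured statically or dynamically.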
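import java.io.BufferedReader;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import javax.batch.api.chunk.ItemReader;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.context.JobContext;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;

@Dependent
@Named("mySampleReader")
public class SampleReader implements ItemReader {

    private BufferedReader br;

    @Inject
    JobContext jobCtx;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        long execID = jobCtx.getExecutionId();
        // The per-partition property ("file") is looked up via this execution's parameters.
        Properties parameters = jobOperator.getParameters(execID);
        // The input directory is a job-level property.
        String inputDir = jobCtx.getProperties().getProperty("input_dir");
        String inputFile = parameters.getProperty("file");
        System.out.println("## execID=" + execID + " " + inputDir + " " + inputFile);
        br = Files.newBufferedReader(Paths.get(inputDir, inputFile), Charset.defaultCharset());
    }

    @Override
    public Object readItem() throws Exception {
        // Returning null signals the end of input; an empty line also ends reading.
        String line = br.readLine();
        if (line == null || line.length() <= 0) {
            return null;
        }
        return line;
    }

    @Override
    public void close() throws Exception {
        if (br != null) {
            br.close();
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}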
ItemProcessor
Each line is in HH:MM format, so the processor just splits it on : and returns the hour part.
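import javax.batch.api.chunk.ItemProcessor;
import javax.enterprise.context.Dependent;
import javax.inject.Named;

@Dependent
@Named("mySampleProcessor")
public class SampleProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        // "15:28" -> "15"
        return ((String) item).split(":")[0];
    }
}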
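The split above assumes every line is well formed; a line without a colon would be passed through whole. As a hedged sketch of a stricter variant, the hypothetical helper below (`HourExtractor.extractHour`, not part of the article) returns null for anything that is not a valid HH:MM value. In a chunk step, a processor that returns null is generally understood to filter the item out before it reaches the writer, so a processor could delegate to such a helper, but treat that wiring as an assumption to check against the runtime in use.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the article's job.
class HourExtractor {

    // Hours 00-23, minutes 00-59.
    private static final Pattern HH_MM = Pattern.compile("([01]\\d|2[0-3]):[0-5]\\d");

    /** Returns the two-digit hour of a valid HH:MM line, or null if the line is malformed. */
    static String extractHour(String line) {
        if (line == null || !HH_MM.matcher(line).matches()) {
            return null;
        }
        return line.substring(0, 2);
    }

    public static void main(String[] args) {
        System.out.println(extractHour("15:28"));
        System.out.println(extractHour("25:00"));
        System.out.println(extractHour("garbage"));
    }
}
```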
ItemWriter
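The writer counts up the occurrences of each hour in the file that was read and, when it finishes, writes the intermediate result for its partition to a file.
Strictly speaking, writeItems is presumably meant to write to some kind of persistence mechanism, but here the result is simply written out to a file.
For a job this small there is no real need to produce intermediate results at all, but this is a sample.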
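import java.io.BufferedWriter;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.batch.api.chunk.ItemWriter;
import javax.batch.runtime.context.JobContext;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;

@Dependent
@Named("mySampleWriter")
public class SampleWriter implements ItemWriter {

    @Inject
    JobContext jobCtx;

    private Map<String, Integer> result;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        // Start every hour ("00" to "23") at zero.
        result = new HashMap<>();
        for (int i = 0; i < 24; i++) {
            result.put(String.format("%02d", i), 0);
        }
    }

    @Override
    public void writeItems(List<Object> items) throws Exception {
        for (Object i : items) {
            String time = (String) i;
            // merge avoids a NullPointerException if an unexpected key shows up.
            result.merge(time, 1, Integer::sum);
        }
        System.out.println("## execID=" + jobCtx.getExecutionId() + " writes:" + items.size());
    }

    @Override
    public void close() throws Exception {
        // Write this partition's intermediate result to <output_dir>/<execID>.txt.
        long execID = jobCtx.getExecutionId();
        try (BufferedWriter bw = Files.newBufferedWriter(
                Paths.get(jobCtx.getProperties().getProperty("output_dir"), execID + ".txt"),
                Charset.defaultCharset())) {
            for (Map.Entry<String, Integer> e : result.entrySet()) {
                bw.write(e.getKey() + ":" + e.getValue());
                bw.newLine();
            }
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}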
PartitionReducer
The reducer combines the intermediate results of all partitions into the final aggregate. The result is simply printed to standard output. It also deletes every file in the intermediate result directory.
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;

import javax.batch.api.partition.PartitionReducer;
import javax.batch.runtime.context.JobContext;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;

@Dependent
@Named("myReducer")
public class SamplePartitionReducer implements PartitionReducer {

    @Inject
    JobContext jobCtx;

    @Override
    public void beginPartitionedStep() throws Exception {
        System.out.println("beginPartitionedStep");
    }

    @Override
    public void beforePartitionedStepCompletion() throws Exception {
        System.out.println("beforePartitionedStepCompletion");
        final HashMap<String, Integer> summary = new HashMap<>();
        for (int i = 0; i < 24; i++) {
            summary.put(String.format("%02d", i), 0);
        }
        // Add up the "HH:count" lines of every intermediate file.
        FileVisitor<Path> visitor = new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                try (BufferedReader br = Files.newBufferedReader(file, Charset.defaultCharset())) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        String[] r = line.split(":");
                        summary.put(r[0], summary.get(r[0]) + Integer.parseInt(r[1]));
                    }
                }
                return FileVisitResult.CONTINUE;
            }
        };
        Files.walkFileTree(Paths.get(jobCtx.getProperties().getProperty("output_dir")), visitor);
        System.out.println(summary);
    }

    @Override
    public void afterPartitionedStepCompletion(PartitionStatus status) throws Exception {
        // Clean up: delete every intermediate file.
        FileVisitor<Path> visitor = new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }
        };
        Files.walkFileTree(Paths.get(jobCtx.getProperties().getProperty("output_dir")), visitor);
        System.out.println("afterPartitionedStepCompletion");
    }

    @Override
    public void rollbackPartitionedStep() throws Exception {
        System.out.println("rollbackPartitionedStep");
    }
}
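The core of the reduce step is just adding "HH:count" lines into a running total. Stripped of the file walking and the batch API, it can be sketched as follows. `CountMerger` and `mergeInto` are hypothetical names used only for illustration, and the input is assumed to be well-formed "HH:count" lines like those the writer produces.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the article's job.
class CountMerger {

    /** Adds one partition's intermediate result ("HH:count" lines) into the running summary. */
    static void mergeInto(Map<String, Integer> summary, List<String> lines) {
        for (String line : lines) {
            String[] r = line.split(":");
            summary.merge(r[0], Integer.parseInt(r[1]), Integer::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> summary = new TreeMap<>();
        mergeInto(summary, Arrays.asList("08:2", "15:1")); // partition 0
        mergeInto(summary, Arrays.asList("08:1", "23:4")); // partition 1
        System.out.println(summary);
    }
}
```

Using Map.merge means hours that are missing from the summary are simply created, so the map does not need to be pre-filled for the merge to work.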
What a run looks like
The log shows each partition aggregating its own file into an intermediate result, and the reducer then combining those intermediate results.
情報: id = 155
情報: beginPartitionedStep
情報: ## execID=156 C:\\Java\\sampleinput input1.txt
情報: ## execID=157 C:\\Java\\sampleinput input2.txt
情報: ## execID=156 writes:10
情報: ## execID=157 writes:10
情報: ## execID=156 writes:8
情報: ## execID=157 writes:10
情報: ## execID=157 writes:4
情報: beforePartitionedStepCompletion
情報: {08=3, 09=0, 19=3, 22=3, 17=1, 04=1, 23=2, 18=5, 05=2, 15=3, 06=2, 16=0, 07=2, 13=1, 00=1, 14=1, 01=3, 02=1, 11=3, 12=0, 03=1, 21=2, 20=2, 10=0}
情報: afterPartitionedStepCompletion
Lessons learned
Passing data with setTransientUserData only works within the same thread
What I struggled with when using jBatch was how to pass data between steps.
In this case JobContext#setTransientUserData exists, so I assumed I could just use that. However, a JobContext instance exists per thread, and the partition threads are different from the thread on which the PartitionReducer runs, so setTransientUserData cannot be used to share an instance between them.
That is why the data goes through intermediate files instead.
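The effect can be illustrated without any batch runtime. The sketch below is a simulation under a stated assumption: it models the per-thread transient data with a plain `ThreadLocal`, which is not how any particular jBatch implementation is known to work internally. `TransientDataDemo` and `handOver` are hypothetical names. A value set on a "partition" thread is not visible from the calling thread, whereas a value written to a file is.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical simulation, not part of the article's job.
class TransientDataDemo {

    // Stand-in for per-thread transient user data (an assumption for illustration).
    static final ThreadLocal<String> perThreadData = new ThreadLocal<>();

    /** Returns what the calling thread sees: [value via thread-local, value via file]. */
    static String[] handOver(String value) throws Exception {
        Path file = Files.createTempFile("partition-", ".txt");
        ExecutorService partitionThread = Executors.newSingleThreadExecutor();
        try {
            // The "partition" stores the value both ways on its own thread.
            partitionThread.submit(() -> {
                perThreadData.set(value);
                Files.write(file, value.getBytes(StandardCharsets.UTF_8));
                return null;
            }).get();
            // The "reducer" (the calling thread) tries to read both back.
            String viaThreadLocal = perThreadData.get();
            String viaFile = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            return new String[] { viaThreadLocal, viaFile };
        } finally {
            partitionThread.shutdown();
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = handOver("08:3");
        System.out.println("thread-local: " + seen[0] + ", file: " + seen[1]);
    }
}
```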