001/* 002 * Copyright 2013-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.jsr.configuration.xml; 017 018import java.util.List; 019import java.util.concurrent.ConcurrentLinkedQueue; 020import java.util.concurrent.locks.ReentrantLock; 021 022import org.springframework.batch.core.jsr.configuration.support.BatchArtifactType; 023import org.springframework.batch.core.jsr.partition.JsrPartitionHandler; 024import org.springframework.beans.MutablePropertyValues; 025import org.springframework.beans.factory.config.RuntimeBeanReference; 026import org.springframework.beans.factory.support.AbstractBeanDefinition; 027import org.springframework.beans.factory.support.BeanDefinitionBuilder; 028import org.springframework.beans.factory.support.BeanDefinitionRegistry; 029import org.springframework.beans.factory.xml.ParserContext; 030import org.springframework.util.StringUtils; 031import org.springframework.util.xml.DomUtils; 032import org.w3c.dom.Element; 033 034/** 035 * Parser for the <partition> element as defined by JSR-352. 036 * 037 * @author Michael Minella 038 * @author Mahmoud Ben Hassine 039 * @since 3.0 040 */ 041public class PartitionParser { 042 043 private static final String REF = "ref"; 044 private static final String MAPPER_ELEMENT = "mapper"; 045 private static final String PLAN_ELEMENT = "plan"; 046 private static final String PARTITIONS_ATTRIBUTE = "partitions"; 047 private static final String THREADS_ATTRIBUTE = "threads"; 048 private static final String PROPERTIES_ELEMENT = "properties"; 049 private static final String ANALYZER_ELEMENT = "analyzer"; 050 private static final String COLLECTOR_ELEMENT = "collector"; 051 private static final String REDUCER_ELEMENT = "reducer"; 052 private static final String PARTITION_CONTEXT_PROPERTY = "propertyContext"; 053 private static final String PARTITION_MAPPER_PROPERTY = "partitionMapper"; 054 private static final String PARTITION_ANALYZER_PROPERTY = "partitionAnalyzer"; 055 private static final String PARTITION_REDUCER_PROPERTY = "partitionReducer"; 056 private static final String PARTITION_QUEUE_PROPERTY = "partitionDataQueue"; 057 private static final String LISTENERS_PROPERTY = "listeners"; 058 private static final String THREADS_PROPERTY = "threads"; 059 private static final String PARTITIONS_PROPERTY = "partitions"; 060 private static final String PARTITION_LOCK_PROPERTY = "partitionLock"; 061 062 private final String name; 063 private boolean allowStartIfComplete = false; 064 065 /** 066 * @param stepName the name of the step that is being partitioned 067 * @param allowStartIfComplete boolean to establish the allowStartIfComplete property for partition properties. 068 */ 069 public PartitionParser(String stepName, boolean allowStartIfComplete) { 070 this.name = stepName; 071 this.allowStartIfComplete = allowStartIfComplete; 072 } 073 074 public void parse(Element element, AbstractBeanDefinition bd, ParserContext parserContext, String stepName) { 075 BeanDefinitionRegistry registry = parserContext.getRegistry(); 076 MutablePropertyValues factoryBeanProperties = bd.getPropertyValues(); 077 078 AbstractBeanDefinition partitionHandlerDefinition = BeanDefinitionBuilder.genericBeanDefinition(JsrPartitionHandler.class) 079 .getBeanDefinition(); 080 081 MutablePropertyValues properties = partitionHandlerDefinition.getPropertyValues(); 082 properties.addPropertyValue(PARTITION_CONTEXT_PROPERTY, new RuntimeBeanReference("batchPropertyContext")); 083 properties.addPropertyValue("jobRepository", new RuntimeBeanReference("jobRepository")); 084 properties.addPropertyValue("allowStartIfComplete", allowStartIfComplete); 085 086 parseMapperElement(element, parserContext, properties); 087 parsePartitionPlan(element, parserContext, stepName, properties); 088 parseAnalyzerElement(element, parserContext, properties); 089 parseReducerElement(element, parserContext, factoryBeanProperties); 090 parseCollectorElement(element, parserContext, factoryBeanProperties, 091 properties); 092 093 String partitionHandlerBeanName = name + ".partitionHandler"; 094 registry.registerBeanDefinition(partitionHandlerBeanName, partitionHandlerDefinition); 095 factoryBeanProperties.add("partitionHandler", new RuntimeBeanReference(partitionHandlerBeanName)); 096 } 097 098 private void parseCollectorElement(Element element, 099 ParserContext parserContext, 100 MutablePropertyValues factoryBeanProperties, 101 MutablePropertyValues properties) { 102 Element collectorElement = DomUtils.getChildElementByTagName(element, COLLECTOR_ELEMENT); 103 104 if(collectorElement != null) { 105 // Only needed if a collector is used 106 registerCollectorAnalyzerQueue(parserContext); 107 properties.add(PARTITION_QUEUE_PROPERTY, new RuntimeBeanReference(name + "PartitionQueue")); 108 properties.add(PARTITION_LOCK_PROPERTY, new RuntimeBeanReference(name + "PartitionLock")); 109 factoryBeanProperties.add("partitionQueue", new RuntimeBeanReference(name + "PartitionQueue")); 110 factoryBeanProperties.add("partitionLock", new RuntimeBeanReference(name + "PartitionLock")); 111 String collectorName = collectorElement.getAttribute(REF); 112 factoryBeanProperties.add(LISTENERS_PROPERTY, new RuntimeBeanReference(collectorName)); 113 new PropertyParser(collectorName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(collectorElement); 114 } 115 } 116 117 private void parseReducerElement(Element element, 118 ParserContext parserContext, 119 MutablePropertyValues factoryBeanProperties) { 120 Element reducerElement = DomUtils.getChildElementByTagName(element, REDUCER_ELEMENT); 121 122 if(reducerElement != null) { 123 String reducerName = reducerElement.getAttribute(REF); 124 factoryBeanProperties.add(PARTITION_REDUCER_PROPERTY, new RuntimeBeanReference(reducerName)); 125 new PropertyParser(reducerName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(reducerElement); 126 } 127 } 128 129 private void parseAnalyzerElement(Element element, 130 ParserContext parserContext, MutablePropertyValues properties) { 131 Element analyzerElement = DomUtils.getChildElementByTagName(element, ANALYZER_ELEMENT); 132 133 if(analyzerElement != null) { 134 String analyzerName = analyzerElement.getAttribute(REF); 135 properties.add(PARTITION_ANALYZER_PROPERTY, new RuntimeBeanReference(analyzerName)); 136 new PropertyParser(analyzerName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(analyzerElement); 137 } 138 } 139 140 private void parseMapperElement(Element element, 141 ParserContext parserContext, MutablePropertyValues properties) { 142 Element mapperElement = DomUtils.getChildElementByTagName(element, MAPPER_ELEMENT); 143 144 if(mapperElement != null) { 145 String mapperName = mapperElement.getAttribute(REF); 146 properties.add(PARTITION_MAPPER_PROPERTY, new RuntimeBeanReference(mapperName)); 147 new PropertyParser(mapperName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(mapperElement); 148 } 149 } 150 151 private void registerCollectorAnalyzerQueue(ParserContext parserContext) { 152 AbstractBeanDefinition partitionQueueDefinition = BeanDefinitionBuilder.genericBeanDefinition(ConcurrentLinkedQueue.class) 153 .getBeanDefinition(); 154 AbstractBeanDefinition partitionLockDefinition = BeanDefinitionBuilder.genericBeanDefinition(ReentrantLock.class) 155 .getBeanDefinition(); 156 157 parserContext.getRegistry().registerBeanDefinition(name + "PartitionQueue", partitionQueueDefinition); 158 parserContext.getRegistry().registerBeanDefinition(name + "PartitionLock", partitionLockDefinition); 159 } 160 161 protected void parsePartitionPlan(Element element, 162 ParserContext parserContext, String stepName, 163 MutablePropertyValues properties) { 164 Element planElement = DomUtils.getChildElementByTagName(element, PLAN_ELEMENT); 165 166 if(planElement != null) { 167 String partitions = planElement.getAttribute(PARTITIONS_ATTRIBUTE); 168 String threads = planElement.getAttribute(THREADS_ATTRIBUTE); 169 170 if(!StringUtils.hasText(threads)) { 171 threads = partitions; 172 } 173 174 List<Element> partitionProperties = DomUtils.getChildElementsByTagName(planElement, PROPERTIES_ELEMENT); 175 176 if(partitionProperties != null) { 177 for (Element partition : partitionProperties) { 178 String partitionStepName = stepName + ":partition" + partition.getAttribute("partition"); 179 new PropertyParser(partitionStepName, parserContext, BatchArtifactType.STEP, partitionStepName).parseProperty(partition); 180 } 181 } 182 183 properties.add(THREADS_PROPERTY, threads); 184 properties.add(PARTITIONS_PROPERTY, partitions); 185 } 186 } 187}