001/*
002 * Copyright 2013-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.configuration.xml;
017
018import java.util.List;
019import java.util.concurrent.ConcurrentLinkedQueue;
020import java.util.concurrent.locks.ReentrantLock;
021
022import org.springframework.batch.core.jsr.configuration.support.BatchArtifactType;
023import org.springframework.batch.core.jsr.partition.JsrPartitionHandler;
024import org.springframework.beans.MutablePropertyValues;
025import org.springframework.beans.factory.config.RuntimeBeanReference;
026import org.springframework.beans.factory.support.AbstractBeanDefinition;
027import org.springframework.beans.factory.support.BeanDefinitionBuilder;
028import org.springframework.beans.factory.support.BeanDefinitionRegistry;
029import org.springframework.beans.factory.xml.ParserContext;
030import org.springframework.util.StringUtils;
031import org.springframework.util.xml.DomUtils;
032import org.w3c.dom.Element;
033
034/**
035 * Parser for the <partition> element as defined by JSR-352.
036 *
037 * @author Michael Minella
038 * @author Mahmoud Ben Hassine
039 * @since 3.0
040 */
041public class PartitionParser {
042
043        private static final String REF = "ref";
044        private static final String MAPPER_ELEMENT = "mapper";
045        private static final String PLAN_ELEMENT = "plan";
046        private static final String PARTITIONS_ATTRIBUTE = "partitions";
047        private static final String THREADS_ATTRIBUTE = "threads";
048        private static final String PROPERTIES_ELEMENT = "properties";
049        private static final String ANALYZER_ELEMENT = "analyzer";
050        private static final String COLLECTOR_ELEMENT = "collector";
051        private static final String REDUCER_ELEMENT = "reducer";
052        private static final String PARTITION_CONTEXT_PROPERTY = "propertyContext";
053        private static final String PARTITION_MAPPER_PROPERTY = "partitionMapper";
054        private static final String PARTITION_ANALYZER_PROPERTY = "partitionAnalyzer";
055        private static final String PARTITION_REDUCER_PROPERTY = "partitionReducer";
056        private static final String PARTITION_QUEUE_PROPERTY = "partitionDataQueue";
057        private static final String LISTENERS_PROPERTY = "listeners";
058        private static final String THREADS_PROPERTY = "threads";
059        private static final String PARTITIONS_PROPERTY = "partitions";
060        private static final String PARTITION_LOCK_PROPERTY = "partitionLock";
061
062        private final String name;
063        private boolean allowStartIfComplete = false;
064
065        /**
066         * @param stepName the name of the step that is being partitioned
067         * @param allowStartIfComplete  boolean to establish the allowStartIfComplete property for partition properties.
068         */
069        public PartitionParser(String stepName, boolean allowStartIfComplete) {
070                this.name = stepName;
071                this.allowStartIfComplete = allowStartIfComplete;
072        }
073
074        public void parse(Element element, AbstractBeanDefinition bd, ParserContext parserContext, String stepName) {
075                BeanDefinitionRegistry registry = parserContext.getRegistry();
076                MutablePropertyValues factoryBeanProperties = bd.getPropertyValues();
077
078                AbstractBeanDefinition partitionHandlerDefinition = BeanDefinitionBuilder.genericBeanDefinition(JsrPartitionHandler.class)
079                                .getBeanDefinition();
080
081                MutablePropertyValues properties = partitionHandlerDefinition.getPropertyValues();
082                properties.addPropertyValue(PARTITION_CONTEXT_PROPERTY, new RuntimeBeanReference("batchPropertyContext"));
083                properties.addPropertyValue("jobRepository", new RuntimeBeanReference("jobRepository"));
084                properties.addPropertyValue("allowStartIfComplete", allowStartIfComplete);
085
086                parseMapperElement(element, parserContext, properties);
087                parsePartitionPlan(element, parserContext, stepName, properties);
088                parseAnalyzerElement(element, parserContext, properties);
089                parseReducerElement(element, parserContext, factoryBeanProperties);
090                parseCollectorElement(element, parserContext, factoryBeanProperties,
091                                properties);
092
093                String partitionHandlerBeanName = name + ".partitionHandler";
094                registry.registerBeanDefinition(partitionHandlerBeanName, partitionHandlerDefinition);
095                factoryBeanProperties.add("partitionHandler", new RuntimeBeanReference(partitionHandlerBeanName));
096        }
097
098        private void parseCollectorElement(Element element,
099                        ParserContext parserContext,
100                        MutablePropertyValues factoryBeanProperties,
101                        MutablePropertyValues properties) {
102                Element collectorElement = DomUtils.getChildElementByTagName(element, COLLECTOR_ELEMENT);
103
104                if(collectorElement != null) {
105                        // Only needed if a collector is used
106                        registerCollectorAnalyzerQueue(parserContext);
107                        properties.add(PARTITION_QUEUE_PROPERTY, new RuntimeBeanReference(name + "PartitionQueue"));
108                        properties.add(PARTITION_LOCK_PROPERTY, new RuntimeBeanReference(name + "PartitionLock"));
109                        factoryBeanProperties.add("partitionQueue", new RuntimeBeanReference(name + "PartitionQueue"));
110                        factoryBeanProperties.add("partitionLock", new RuntimeBeanReference(name + "PartitionLock"));
111                        String collectorName = collectorElement.getAttribute(REF);
112                        factoryBeanProperties.add(LISTENERS_PROPERTY, new RuntimeBeanReference(collectorName));
113                        new PropertyParser(collectorName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(collectorElement);
114                }
115        }
116
117        private void parseReducerElement(Element element,
118                        ParserContext parserContext,
119                        MutablePropertyValues factoryBeanProperties) {
120                Element reducerElement = DomUtils.getChildElementByTagName(element, REDUCER_ELEMENT);
121
122                if(reducerElement != null) {
123                        String reducerName = reducerElement.getAttribute(REF);
124                        factoryBeanProperties.add(PARTITION_REDUCER_PROPERTY, new RuntimeBeanReference(reducerName));
125                        new PropertyParser(reducerName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(reducerElement);
126                }
127        }
128
129        private void parseAnalyzerElement(Element element,
130                        ParserContext parserContext, MutablePropertyValues properties) {
131                Element analyzerElement = DomUtils.getChildElementByTagName(element, ANALYZER_ELEMENT);
132
133                if(analyzerElement != null) {
134                        String analyzerName = analyzerElement.getAttribute(REF);
135                        properties.add(PARTITION_ANALYZER_PROPERTY, new RuntimeBeanReference(analyzerName));
136                        new PropertyParser(analyzerName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(analyzerElement);
137                }
138        }
139
140        private void parseMapperElement(Element element,
141                        ParserContext parserContext, MutablePropertyValues properties) {
142                Element mapperElement = DomUtils.getChildElementByTagName(element, MAPPER_ELEMENT);
143
144                if(mapperElement != null) {
145                        String mapperName = mapperElement.getAttribute(REF);
146                        properties.add(PARTITION_MAPPER_PROPERTY, new RuntimeBeanReference(mapperName));
147                        new PropertyParser(mapperName, parserContext, BatchArtifactType.STEP_ARTIFACT, name).parseProperties(mapperElement);
148                }
149        }
150
151        private void registerCollectorAnalyzerQueue(ParserContext parserContext) {
152                AbstractBeanDefinition partitionQueueDefinition = BeanDefinitionBuilder.genericBeanDefinition(ConcurrentLinkedQueue.class)
153                                .getBeanDefinition();
154                AbstractBeanDefinition partitionLockDefinition = BeanDefinitionBuilder.genericBeanDefinition(ReentrantLock.class)
155                                .getBeanDefinition();
156
157                parserContext.getRegistry().registerBeanDefinition(name + "PartitionQueue", partitionQueueDefinition);
158                parserContext.getRegistry().registerBeanDefinition(name + "PartitionLock", partitionLockDefinition);
159        }
160
161        protected void parsePartitionPlan(Element element,
162                        ParserContext parserContext, String stepName,
163                        MutablePropertyValues properties) {
164                Element planElement = DomUtils.getChildElementByTagName(element, PLAN_ELEMENT);
165
166                if(planElement != null) {
167                        String partitions = planElement.getAttribute(PARTITIONS_ATTRIBUTE);
168                        String threads = planElement.getAttribute(THREADS_ATTRIBUTE);
169
170                        if(!StringUtils.hasText(threads)) {
171                                threads = partitions;
172                        }
173
174                        List<Element> partitionProperties = DomUtils.getChildElementsByTagName(planElement, PROPERTIES_ELEMENT);
175
176                        if(partitionProperties != null) {
177                                for (Element partition : partitionProperties) {
178                                        String partitionStepName = stepName + ":partition" + partition.getAttribute("partition");
179                                        new PropertyParser(partitionStepName, parserContext, BatchArtifactType.STEP, partitionStepName).parseProperty(partition);
180                                }
181                        }
182
183                        properties.add(THREADS_PROPERTY, threads);
184                        properties.add(PARTITIONS_PROPERTY, partitions);
185                }
186        }
187}