001/*
002 * Copyright 2006-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.file;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.List;
022import org.springframework.batch.item.ExecutionContext;
023import org.springframework.batch.item.ItemStreamException;
024import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
025import org.springframework.core.io.FileSystemResource;
026import org.springframework.core.io.Resource;
027import org.springframework.util.Assert;
028import org.springframework.util.ClassUtils;
029
030/**
031 * Wraps a {@link ResourceAwareItemWriterItemStream} and creates a new output
032 * resource when the count of items written in current resource exceeds
033 * {@link #setItemCountLimitPerResource(int)}. Suffix creation can be customized
034 * with {@link #setResourceSuffixCreator(ResourceSuffixCreator)}.
035 * 
036 * Note that new resources are created only at chunk boundaries i.e. the number
037 * of items written into one resource is between the limit set by
038 * {@link #setItemCountLimitPerResource(int)} and (limit + chunk size).
039 * 
040 * @param <T> item type
041 * 
042 * @author Robert Kasanicky
043 */
044public class MultiResourceItemWriter<T> extends AbstractItemStreamItemWriter<T> {
045
046        final static private String RESOURCE_INDEX_KEY = "resource.index";
047
048        final static private String CURRENT_RESOURCE_ITEM_COUNT = "resource.item.count";
049
050        private Resource resource;
051
052        private ResourceAwareItemWriterItemStream<? super T> delegate;
053
054        private int itemCountLimitPerResource = Integer.MAX_VALUE;
055
056        private int currentResourceItemCount = 0;
057
058        private int resourceIndex = 1;
059
060        private ResourceSuffixCreator suffixCreator = new SimpleResourceSuffixCreator();
061
062        private boolean saveState = true;
063
064        private boolean opened = false;
065
066        public MultiResourceItemWriter() {
067                this.setExecutionContextName(ClassUtils.getShortName(MultiResourceItemWriter.class));
068        }
069
070    @Override
071        public void write(List<? extends T> items) throws Exception {
072                if (!opened) {
073                        File file = setResourceToDelegate();
074                        // create only if write is called
075                        file.createNewFile();
076                        Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable");
077                        delegate.open(new ExecutionContext());
078                        opened = true;
079                }
080                delegate.write(items);
081                currentResourceItemCount += items.size();
082                if (currentResourceItemCount >= itemCountLimitPerResource) {
083                        delegate.close();
084                        resourceIndex++;
085                        currentResourceItemCount = 0;
086                        setResourceToDelegate();
087                        opened = false;
088                }
089        }
090
091        /**
092         * Allows customization of the suffix of the created resources based on the
093         * index.
094         *
095         * @param suffixCreator {@link ResourceSuffixCreator} to be used by the writer.
096         */
097        public void setResourceSuffixCreator(ResourceSuffixCreator suffixCreator) {
098                this.suffixCreator = suffixCreator;
099        }
100
101        /**
102         * After this limit is exceeded the next chunk will be written into newly
103         * created resource.
104         *
105         * @param itemCountLimitPerResource int item threshold used to determine when a new
106         * resource should be created.
107         */
108        public void setItemCountLimitPerResource(int itemCountLimitPerResource) {
109                this.itemCountLimitPerResource = itemCountLimitPerResource;
110        }
111
112        /**
113         * Delegate used for actual writing of the output.
114         *
115         * @param delegate {@link ResourceAwareItemWriterItemStream} that will be used
116         * to write the output.
117         */
118        public void setDelegate(ResourceAwareItemWriterItemStream<? super T> delegate) {
119                this.delegate = delegate;
120        }
121
122        /**
123         * Prototype for output resources. Actual output files will be created in
124         * the same directory and use the same name as this prototype with appended
125         * suffix (according to
126         * {@link #setResourceSuffixCreator(ResourceSuffixCreator)}.
127         *
128         * @param resource The prototype resource.
129         */
130        public void setResource(Resource resource) {
131                this.resource = resource;
132        }
133
134
135        /**
136         * Indicates that the state of the reader will be saved after each commit.
137         *
138         * @param saveState true the state is saved.
139         */
140        public void setSaveState(boolean saveState) {
141                this.saveState = saveState;
142        }
143
144    @Override
145        public void close() throws ItemStreamException {
146                super.close();
147                resourceIndex = 1;
148                currentResourceItemCount = 0;
149                if (opened) {
150                        delegate.close();
151                }
152        }
153
154    @Override
155        public void open(ExecutionContext executionContext) throws ItemStreamException {
156                super.open(executionContext);
157                resourceIndex = executionContext.getInt(getExecutionContextKey(RESOURCE_INDEX_KEY), 1);
158                currentResourceItemCount = executionContext.getInt(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT), 0);
159
160                try {
161                        setResourceToDelegate();
162                }
163                catch (IOException e) {
164                        throw new ItemStreamException("Couldn't assign resource", e);
165                }
166
167                if (executionContext.containsKey(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT))) {
168                        // It's a restart
169                        delegate.open(executionContext);
170                        opened = true;
171                }
172                else {
173                        opened = false;
174                }
175        }
176
177    @Override
178        public void update(ExecutionContext executionContext) throws ItemStreamException {
179                super.update(executionContext);
180                if (saveState) {
181                        if (opened) {
182                                delegate.update(executionContext);
183                        }
184                        executionContext.putInt(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT), currentResourceItemCount);
185                        executionContext.putInt(getExecutionContextKey(RESOURCE_INDEX_KEY), resourceIndex);
186                }
187        }
188
189        /**
190         * Create output resource (if necessary) and point the delegate to it.
191         */
192        private File setResourceToDelegate() throws IOException {
193                String path = resource.getFile().getAbsolutePath() + suffixCreator.getSuffix(resourceIndex);
194                File file = new File(path);
195                delegate.setResource(new FileSystemResource(file));
196                return file;
197        }
198}