001/* 002 * Copyright 2006-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.file; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.List; 022import org.springframework.batch.item.ExecutionContext; 023import org.springframework.batch.item.ItemStreamException; 024import org.springframework.batch.item.support.AbstractItemStreamItemWriter; 025import org.springframework.core.io.FileSystemResource; 026import org.springframework.core.io.Resource; 027import org.springframework.util.Assert; 028import org.springframework.util.ClassUtils; 029 030/** 031 * Wraps a {@link ResourceAwareItemWriterItemStream} and creates a new output 032 * resource when the count of items written in current resource exceeds 033 * {@link #setItemCountLimitPerResource(int)}. Suffix creation can be customized 034 * with {@link #setResourceSuffixCreator(ResourceSuffixCreator)}. 035 * 036 * Note that new resources are created only at chunk boundaries i.e. the number 037 * of items written into one resource is between the limit set by 038 * {@link #setItemCountLimitPerResource(int)} and (limit + chunk size). 039 * 040 * @param <T> item type 041 * 042 * @author Robert Kasanicky 043 */ 044public class MultiResourceItemWriter<T> extends AbstractItemStreamItemWriter<T> { 045 046 final static private String RESOURCE_INDEX_KEY = "resource.index"; 047 048 final static private String CURRENT_RESOURCE_ITEM_COUNT = "resource.item.count"; 049 050 private Resource resource; 051 052 private ResourceAwareItemWriterItemStream<? super T> delegate; 053 054 private int itemCountLimitPerResource = Integer.MAX_VALUE; 055 056 private int currentResourceItemCount = 0; 057 058 private int resourceIndex = 1; 059 060 private ResourceSuffixCreator suffixCreator = new SimpleResourceSuffixCreator(); 061 062 private boolean saveState = true; 063 064 private boolean opened = false; 065 066 public MultiResourceItemWriter() { 067 this.setExecutionContextName(ClassUtils.getShortName(MultiResourceItemWriter.class)); 068 } 069 070 @Override 071 public void write(List<? extends T> items) throws Exception { 072 if (!opened) { 073 File file = setResourceToDelegate(); 074 // create only if write is called 075 file.createNewFile(); 076 Assert.state(file.canWrite(), "Output resource " + file.getAbsolutePath() + " must be writable"); 077 delegate.open(new ExecutionContext()); 078 opened = true; 079 } 080 delegate.write(items); 081 currentResourceItemCount += items.size(); 082 if (currentResourceItemCount >= itemCountLimitPerResource) { 083 delegate.close(); 084 resourceIndex++; 085 currentResourceItemCount = 0; 086 setResourceToDelegate(); 087 opened = false; 088 } 089 } 090 091 /** 092 * Allows customization of the suffix of the created resources based on the 093 * index. 094 * 095 * @param suffixCreator {@link ResourceSuffixCreator} to be used by the writer. 096 */ 097 public void setResourceSuffixCreator(ResourceSuffixCreator suffixCreator) { 098 this.suffixCreator = suffixCreator; 099 } 100 101 /** 102 * After this limit is exceeded the next chunk will be written into newly 103 * created resource. 104 * 105 * @param itemCountLimitPerResource int item threshold used to determine when a new 106 * resource should be created. 107 */ 108 public void setItemCountLimitPerResource(int itemCountLimitPerResource) { 109 this.itemCountLimitPerResource = itemCountLimitPerResource; 110 } 111 112 /** 113 * Delegate used for actual writing of the output. 114 * 115 * @param delegate {@link ResourceAwareItemWriterItemStream} that will be used 116 * to write the output. 117 */ 118 public void setDelegate(ResourceAwareItemWriterItemStream<? super T> delegate) { 119 this.delegate = delegate; 120 } 121 122 /** 123 * Prototype for output resources. Actual output files will be created in 124 * the same directory and use the same name as this prototype with appended 125 * suffix (according to 126 * {@link #setResourceSuffixCreator(ResourceSuffixCreator)}. 127 * 128 * @param resource The prototype resource. 129 */ 130 public void setResource(Resource resource) { 131 this.resource = resource; 132 } 133 134 135 /** 136 * Indicates that the state of the reader will be saved after each commit. 137 * 138 * @param saveState true the state is saved. 139 */ 140 public void setSaveState(boolean saveState) { 141 this.saveState = saveState; 142 } 143 144 @Override 145 public void close() throws ItemStreamException { 146 super.close(); 147 resourceIndex = 1; 148 currentResourceItemCount = 0; 149 if (opened) { 150 delegate.close(); 151 } 152 } 153 154 @Override 155 public void open(ExecutionContext executionContext) throws ItemStreamException { 156 super.open(executionContext); 157 resourceIndex = executionContext.getInt(getExecutionContextKey(RESOURCE_INDEX_KEY), 1); 158 currentResourceItemCount = executionContext.getInt(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT), 0); 159 160 try { 161 setResourceToDelegate(); 162 } 163 catch (IOException e) { 164 throw new ItemStreamException("Couldn't assign resource", e); 165 } 166 167 if (executionContext.containsKey(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT))) { 168 // It's a restart 169 delegate.open(executionContext); 170 opened = true; 171 } 172 else { 173 opened = false; 174 } 175 } 176 177 @Override 178 public void update(ExecutionContext executionContext) throws ItemStreamException { 179 super.update(executionContext); 180 if (saveState) { 181 if (opened) { 182 delegate.update(executionContext); 183 } 184 executionContext.putInt(getExecutionContextKey(CURRENT_RESOURCE_ITEM_COUNT), currentResourceItemCount); 185 executionContext.putInt(getExecutionContextKey(RESOURCE_INDEX_KEY), resourceIndex); 186 } 187 } 188 189 /** 190 * Create output resource (if necessary) and point the delegate to it. 191 */ 192 private File setResourceToDelegate() throws IOException { 193 String path = resource.getFile().getAbsolutePath() + suffixCreator.getSuffix(resourceIndex); 194 File file = new File(path); 195 delegate.setResource(new FileSystemResource(file)); 196 return file; 197 } 198}