001/* 002 * Copyright 2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.jsr.item; 017 018import java.io.Serializable; 019 020import javax.batch.api.chunk.ItemReader; 021import javax.batch.api.chunk.ItemWriter; 022 023import org.apache.commons.logging.Log; 024import org.apache.commons.logging.LogFactory; 025import org.springframework.batch.item.ExecutionContext; 026import org.springframework.batch.item.ItemStreamException; 027import org.springframework.batch.item.ItemStreamSupport; 028import org.springframework.util.Assert; 029import org.springframework.util.SerializationUtils; 030 031/** 032 * Provides support for JSR-352 checkpointing. Checkpoint objects are copied prior 033 * to being added to the {@link ExecutionContext} for persistence by the framework. 034 * If the checkpoint object cannot be copied and further changes occur to the same 035 * instance, side effects may occur. In cases like this, it is recommended that a 036 * copy of the object being acted upon in the reader/writer is returned via the 037 * {@link ItemReader#checkpointInfo()} or {@link ItemWriter#checkpointInfo()} calls. 038 * 039 * @author Michael Minella 040 * @since 3.0 041 */ 042public abstract class CheckpointSupport extends ItemStreamSupport{ 043 044 private final Log logger = LogFactory.getLog(this.getClass()); 045 046 private final String checkpointKey; 047 048 /** 049 * @param checkpointKey key to store the checkpoint object with in the {@link ExecutionContext} 050 */ 051 public CheckpointSupport(String checkpointKey) { 052 Assert.hasText(checkpointKey, "checkpointKey is required"); 053 this.checkpointKey = checkpointKey; 054 } 055 056 /* (non-Javadoc) 057 * @see org.springframework.batch.item.ItemStreamSupport#open(org.springframework.batch.item.ExecutionContext) 058 */ 059 @Override 060 public void open(ExecutionContext executionContext) 061 throws ItemStreamException { 062 try { 063 String executionContextKey = getExecutionContextKey(checkpointKey); 064 Serializable checkpoint = (Serializable) executionContext.get(executionContextKey); 065 doOpen(checkpoint); 066 } catch (Exception e) { 067 throw new ItemStreamException(e); 068 } 069 } 070 071 /** 072 * Used to open a batch artifact with previously saved checkpoint information. 073 * 074 * @param checkpoint previously saved checkpoint object 075 * @throws Exception thrown by the implementation 076 */ 077 protected abstract void doOpen(Serializable checkpoint) throws Exception; 078 079 /* (non-Javadoc) 080 * @see org.springframework.batch.item.ItemStreamSupport#update(org.springframework.batch.item.ExecutionContext) 081 */ 082 @Override 083 public void update(ExecutionContext executionContext) 084 throws ItemStreamException { 085 try { 086 executionContext.put(getExecutionContextKey(checkpointKey), deepCopy(doCheckpoint())); 087 } catch (Exception e) { 088 throw new ItemStreamException(e); 089 } 090 } 091 092 /** 093 * Used to provide a {@link Serializable} representing the current state of the 094 * batch artifact. 095 * 096 * @return the current state of the batch artifact 097 * @throws Exception thrown by the implementation 098 */ 099 protected abstract Serializable doCheckpoint() throws Exception; 100 101 /* (non-Javadoc) 102 * @see org.springframework.batch.item.ItemStreamSupport#close() 103 */ 104 @Override 105 public void close() throws ItemStreamException { 106 try { 107 doClose(); 108 } catch (Exception e) { 109 throw new ItemStreamException(e); 110 } 111 } 112 113 /** 114 * Used to close the underlying batch artifact 115 * 116 * @throws Exception thrown by the underlying implementation 117 */ 118 protected abstract void doClose() throws Exception; 119 120 private Object deepCopy(Serializable orig) { 121 Object obj = orig; 122 123 try { 124 obj = SerializationUtils.deserialize(SerializationUtils.serialize(orig)); 125 } catch (Exception e) { 126 logger.warn("Unable to copy checkpoint object. Updating the instance passed may cause side effects"); 127 } 128 129 return obj; 130 } 131 132}