001/*
002 * Copyright 2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.jsr.item;
017
018import java.io.Serializable;
019
020import javax.batch.api.chunk.ItemReader;
021import javax.batch.api.chunk.ItemWriter;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.springframework.batch.item.ExecutionContext;
026import org.springframework.batch.item.ItemStreamException;
027import org.springframework.batch.item.ItemStreamSupport;
028import org.springframework.util.Assert;
029import org.springframework.util.SerializationUtils;
030
031/**
032 * Provides support for JSR-352 checkpointing.  Checkpoint objects are copied prior
033 * to being added to the {@link ExecutionContext} for persistence by the framework.
034 * If the checkpoint object cannot be copied and further changes occur to the same
035 * instance, side effects may occur.  In cases like this, it is recommended that a
036 * copy of the object being acted upon in the reader/writer is returned via the
037 * {@link ItemReader#checkpointInfo()} or {@link ItemWriter#checkpointInfo()} calls.
038 *
039 * @author Michael Minella
040 * @since 3.0
041 */
042public abstract class CheckpointSupport extends ItemStreamSupport{
043
044        private final Log logger = LogFactory.getLog(this.getClass());
045
046        private final String checkpointKey;
047
048        /**
049         * @param checkpointKey key to store the checkpoint object with in the {@link ExecutionContext}
050         */
051        public CheckpointSupport(String checkpointKey) {
052                Assert.hasText(checkpointKey, "checkpointKey is required");
053                this.checkpointKey = checkpointKey;
054        }
055
056        /* (non-Javadoc)
057         * @see org.springframework.batch.item.ItemStreamSupport#open(org.springframework.batch.item.ExecutionContext)
058         */
059        @Override
060        public void open(ExecutionContext executionContext)
061                        throws ItemStreamException {
062                try {
063                        String executionContextKey = getExecutionContextKey(checkpointKey);
064                        Serializable checkpoint = (Serializable) executionContext.get(executionContextKey);
065                        doOpen(checkpoint);
066                } catch (Exception e) {
067                        throw new ItemStreamException(e);
068                }
069        }
070
071        /**
072         * Used to open a batch artifact with previously saved checkpoint information.
073         *
074         * @param checkpoint previously saved checkpoint object
075         * @throws Exception thrown by the implementation
076         */
077        protected abstract void doOpen(Serializable checkpoint) throws Exception;
078
079        /* (non-Javadoc)
080         * @see org.springframework.batch.item.ItemStreamSupport#update(org.springframework.batch.item.ExecutionContext)
081         */
082        @Override
083        public void update(ExecutionContext executionContext)
084                        throws ItemStreamException {
085                try {
086                        executionContext.put(getExecutionContextKey(checkpointKey), deepCopy(doCheckpoint()));
087                } catch (Exception e) {
088                        throw new ItemStreamException(e);
089                }
090        }
091
092        /**
093         * Used to provide a {@link Serializable} representing the current state of the
094         * batch artifact.
095         *
096         * @return the current state of the batch artifact
097         * @throws Exception thrown by the implementation
098         */
099        protected abstract Serializable doCheckpoint() throws Exception;
100
101        /* (non-Javadoc)
102         * @see org.springframework.batch.item.ItemStreamSupport#close()
103         */
104        @Override
105        public void close() throws ItemStreamException {
106                try {
107                        doClose();
108                } catch (Exception e) {
109                        throw new ItemStreamException(e);
110                }
111        }
112
113        /**
114         * Used to close the underlying batch artifact
115         *
116         * @throws Exception thrown by the underlying implementation
117         */
118        protected abstract void doClose() throws Exception;
119
120        private Object deepCopy(Serializable orig) {
121                Object obj = orig;
122
123                try {
124                        obj = SerializationUtils.deserialize(SerializationUtils.serialize(orig));
125                } catch (Exception e) {
126                        logger.warn("Unable to copy checkpoint object.  Updating the instance passed may cause side effects");
127                }
128
129                return obj;
130        }
131
132}