001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.repository.dao;
018
019import java.io.ByteArrayInputStream;
020import java.io.ByteArrayOutputStream;
021import java.io.IOException;
022import java.sql.PreparedStatement;
023import java.sql.ResultSet;
024import java.sql.SQLException;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Map.Entry;
031
032import org.springframework.batch.core.JobExecution;
033import org.springframework.batch.core.StepExecution;
034import org.springframework.batch.core.repository.ExecutionContextSerializer;
035import org.springframework.batch.item.ExecutionContext;
036import org.springframework.core.serializer.Serializer;
037import org.springframework.jdbc.core.BatchPreparedStatementSetter;
038import org.springframework.jdbc.core.PreparedStatementSetter;
039import org.springframework.jdbc.core.RowMapper;
040import org.springframework.jdbc.support.lob.DefaultLobHandler;
041import org.springframework.jdbc.support.lob.LobHandler;
042import org.springframework.util.Assert;
043
044/**
045 * JDBC DAO for {@link ExecutionContext}.
046 *
047 * Stores execution context data related to both Step and Job using
048 * a different table for each.
049 *
050 * @author Lucas Ward
051 * @author Robert Kasanicky
052 * @author Thomas Risberg
053 * @author Michael Minella
054 * @author David Turanski
055 * @author Mahmoud Ben Hassine
056 */
057public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implements ExecutionContextDao {
058
059        private static final String FIND_JOB_EXECUTION_CONTEXT = "SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT "
060                        + "FROM %PREFIX%JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID = ?";
061
062        private static final String INSERT_JOB_EXECUTION_CONTEXT = "INSERT INTO %PREFIX%JOB_EXECUTION_CONTEXT "
063                        + "(SHORT_CONTEXT, SERIALIZED_CONTEXT, JOB_EXECUTION_ID) " + "VALUES(?, ?, ?)";
064
065        private static final String UPDATE_JOB_EXECUTION_CONTEXT = "UPDATE %PREFIX%JOB_EXECUTION_CONTEXT "
066                        + "SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? " + "WHERE JOB_EXECUTION_ID = ?";
067
068        private static final String FIND_STEP_EXECUTION_CONTEXT = "SELECT SHORT_CONTEXT, SERIALIZED_CONTEXT "
069                        + "FROM %PREFIX%STEP_EXECUTION_CONTEXT WHERE STEP_EXECUTION_ID = ?";
070
071        private static final String INSERT_STEP_EXECUTION_CONTEXT = "INSERT INTO %PREFIX%STEP_EXECUTION_CONTEXT "
072                        + "(SHORT_CONTEXT, SERIALIZED_CONTEXT, STEP_EXECUTION_ID) " + "VALUES(?, ?, ?)";
073
074        private static final String UPDATE_STEP_EXECUTION_CONTEXT = "UPDATE %PREFIX%STEP_EXECUTION_CONTEXT "
075                        + "SET SHORT_CONTEXT = ?, SERIALIZED_CONTEXT = ? " + "WHERE STEP_EXECUTION_ID = ?";
076
077        private static final int DEFAULT_MAX_VARCHAR_LENGTH = 2500;
078
079        private int shortContextLength = DEFAULT_MAX_VARCHAR_LENGTH;
080
081        private LobHandler lobHandler = new DefaultLobHandler();
082
083        private ExecutionContextSerializer serializer;
084
085        /**
086         * Setter for {@link Serializer} implementation
087         *
088         * @param serializer {@link ExecutionContextSerializer} instance to use.
089         */
090        public void setSerializer(ExecutionContextSerializer serializer) {
091                Assert.notNull(serializer, "Serializer must not be null");
092                this.serializer = serializer;
093        }
094
095        /**
096         * The maximum size that an execution context can have and still be stored
097         * completely in short form in the column <code>SHORT_CONTEXT</code>.
098         * Anything longer than this will overflow into large-object storage, and
099         * the first part only will be retained in the short form for readability.
100         * Default value is 2500. Clients using multi-bytes charsets on the database
101         * server may need to reduce this value to as little as half the value of
102         * the column size.
103         * @param shortContextLength int max length of the short context.
104         */
105        public void setShortContextLength(int shortContextLength) {
106                this.shortContextLength = shortContextLength;
107        }
108
109        @Override
110        public ExecutionContext getExecutionContext(JobExecution jobExecution) {
111                Long executionId = jobExecution.getId();
112                Assert.notNull(executionId, "ExecutionId must not be null.");
113
114                List<ExecutionContext> results = getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTION_CONTEXT),
115                                new ExecutionContextRowMapper(), executionId);
116                if (results.size() > 0) {
117                        return results.get(0);
118                }
119                else {
120                        return new ExecutionContext();
121                }
122        }
123
124        @Override
125        public ExecutionContext getExecutionContext(StepExecution stepExecution) {
126                Long executionId = stepExecution.getId();
127                Assert.notNull(executionId, "ExecutionId must not be null.");
128
129                List<ExecutionContext> results = getJdbcTemplate().query(getQuery(FIND_STEP_EXECUTION_CONTEXT),
130                                new ExecutionContextRowMapper(), executionId);
131                if (results.size() > 0) {
132                        return results.get(0);
133                }
134                else {
135                        return new ExecutionContext();
136                }
137        }
138
139        @Override
140        public void updateExecutionContext(final JobExecution jobExecution) {
141                Long executionId = jobExecution.getId();
142                ExecutionContext executionContext = jobExecution.getExecutionContext();
143                Assert.notNull(executionId, "ExecutionId must not be null.");
144                Assert.notNull(executionContext, "The ExecutionContext must not be null.");
145
146                String serializedContext = serializeContext(executionContext);
147
148                persistSerializedContext(executionId, serializedContext, UPDATE_JOB_EXECUTION_CONTEXT);
149        }
150
151        @Override
152        public void updateExecutionContext(final StepExecution stepExecution) {
153                // Attempt to prevent concurrent modification errors by blocking here if
154                // someone is already trying to do it.
155                synchronized (stepExecution) {
156                        Long executionId = stepExecution.getId();
157                        ExecutionContext executionContext = stepExecution.getExecutionContext();
158                        Assert.notNull(executionId, "ExecutionId must not be null.");
159                        Assert.notNull(executionContext, "The ExecutionContext must not be null.");
160
161                        String serializedContext = serializeContext(executionContext);
162
163                        persistSerializedContext(executionId, serializedContext, UPDATE_STEP_EXECUTION_CONTEXT);
164                }
165        }
166
167        @Override
168        public void saveExecutionContext(JobExecution jobExecution) {
169
170                Long executionId = jobExecution.getId();
171                ExecutionContext executionContext = jobExecution.getExecutionContext();
172                Assert.notNull(executionId, "ExecutionId must not be null.");
173                Assert.notNull(executionContext, "The ExecutionContext must not be null.");
174
175                String serializedContext = serializeContext(executionContext);
176
177                persistSerializedContext(executionId, serializedContext, INSERT_JOB_EXECUTION_CONTEXT);
178        }
179
180        @Override
181        public void saveExecutionContext(StepExecution stepExecution) {
182                Long executionId = stepExecution.getId();
183                ExecutionContext executionContext = stepExecution.getExecutionContext();
184                Assert.notNull(executionId, "ExecutionId must not be null.");
185                Assert.notNull(executionContext, "The ExecutionContext must not be null.");
186
187                String serializedContext = serializeContext(executionContext);
188
189                persistSerializedContext(executionId, serializedContext, INSERT_STEP_EXECUTION_CONTEXT);
190        }
191
192        @Override
193        public void saveExecutionContexts(Collection<StepExecution> stepExecutions) {
194                Assert.notNull(stepExecutions, "Attempt to save an null collection of step executions");
195                Map<Long, String> serializedContexts = new HashMap<Long, String>(stepExecutions.size());
196                for (StepExecution stepExecution : stepExecutions) {
197                        Long executionId = stepExecution.getId();
198                        ExecutionContext executionContext = stepExecution.getExecutionContext();
199                        Assert.notNull(executionId, "ExecutionId must not be null.");
200                        Assert.notNull(executionContext, "The ExecutionContext must not be null.");
201                        serializedContexts.put(executionId, serializeContext(executionContext));
202                }
203                persistSerializedContexts(serializedContexts, INSERT_STEP_EXECUTION_CONTEXT);
204        }
205
206        public void setLobHandler(LobHandler lobHandler) {
207                this.lobHandler = lobHandler;
208        }
209
210        @Override
211        public void afterPropertiesSet() throws Exception {
212                super.afterPropertiesSet();
213                Assert.state(serializer != null, "ExecutionContextSerializer is required");
214        }
215
216        /**
217         * @param executionId
218         * @param serializedContext
219         * @param sql with parameters (shortContext, longContext, executionId)
220         */
221        private void persistSerializedContext(final Long executionId, String serializedContext, String sql) {
222
223                final String shortContext;
224                final String longContext;
225                if (serializedContext.length() > shortContextLength) {
226                        // Overestimate length of ellipsis to be on the safe side with
227                        // 2-byte chars
228                        shortContext = serializedContext.substring(0, shortContextLength - 8) + " ...";
229                        longContext = serializedContext;
230                }
231                else {
232                        shortContext = serializedContext;
233                        longContext = null;
234                }
235
236                getJdbcTemplate().update(getQuery(sql), new PreparedStatementSetter() {
237                        @Override
238                        public void setValues(PreparedStatement ps) throws SQLException {
239                                ps.setString(1, shortContext);
240                                if (longContext != null) {
241                                        lobHandler.getLobCreator().setClobAsString(ps, 2, longContext);
242                                }
243                                else {
244                                        ps.setNull(2, getClobTypeToUse());
245                                }
246                                ps.setLong(3, executionId);
247                        }
248                });
249        }
250
251        /**
252         * @param serializedContexts
253         * @param sql with parameters (shortContext, longContext, executionId)
254         */
255        private void persistSerializedContexts(final Map<Long, String> serializedContexts, String sql) {
256        if (!serializedContexts.isEmpty()) {
257            final Iterator<Long> executionIdIterator = serializedContexts.keySet().iterator();
258
259            getJdbcTemplate().batchUpdate(getQuery(sql), new BatchPreparedStatementSetter() {
260                @Override
261                public void setValues(PreparedStatement ps, int i) throws SQLException {
262                    Long executionId = executionIdIterator.next();
263                    String serializedContext = serializedContexts.get(executionId);
264                    String shortContext;
265                    String longContext;
266                    if (serializedContext.length() > shortContextLength) {
267                        // Overestimate length of ellipsis to be on the safe side with
268                        // 2-byte chars
269                        shortContext = serializedContext.substring(0, shortContextLength - 8) + " ...";
270                        longContext = serializedContext;
271                    } else {
272                        shortContext = serializedContext;
273                        longContext = null;
274                    }
275                    ps.setString(1, shortContext);
276                    if (longContext != null) {
277                        lobHandler.getLobCreator().setClobAsString(ps, 2, longContext);
278                    } else {
279                        ps.setNull(2, getClobTypeToUse());
280                    }
281                    ps.setLong(3, executionId);
282                }
283
284                @Override
285                public int getBatchSize() {
286                    return serializedContexts.size();
287                }
288            });
289        }
290    }
291
292        private String serializeContext(ExecutionContext ctx) {
293                Map<String, Object> m = new HashMap<String, Object>();
294                for (Entry<String, Object> me : ctx.entrySet()) {
295                        m.put(me.getKey(), me.getValue());
296                }
297
298                ByteArrayOutputStream out = new ByteArrayOutputStream();
299                String results = "";
300
301                try {
302                        serializer.serialize(m, out);
303                        results = new String(out.toByteArray(), "ISO-8859-1");
304                }
305                catch (IOException ioe) {
306                        throw new IllegalArgumentException("Could not serialize the execution context", ioe);
307                }
308
309                return results;
310        }
311
312        private class ExecutionContextRowMapper implements RowMapper<ExecutionContext> {
313
314                @Override
315                public ExecutionContext mapRow(ResultSet rs, int i) throws SQLException {
316                        ExecutionContext executionContext = new ExecutionContext();
317                        String serializedContext = rs.getString("SERIALIZED_CONTEXT");
318                        if (serializedContext == null) {
319                                serializedContext = rs.getString("SHORT_CONTEXT");
320                        }
321
322                        Map<String, Object> map;
323                        try {
324                                ByteArrayInputStream in = new ByteArrayInputStream(serializedContext.getBytes("ISO-8859-1"));
325                                map = serializer.deserialize(in);
326                        }
327                        catch (IOException ioe) {
328                                throw new IllegalArgumentException("Unable to deserialize the execution context", ioe);
329                        }
330                        for (Map.Entry<String, Object> entry : map.entrySet()) {
331                                executionContext.put(entry.getKey(), entry.getValue());
332                        }
333                        return executionContext;
334                }
335        }
336
337}