001/*
002 * Copyright 2012-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.amqp;
018
019import org.springframework.amqp.core.AmqpTemplate;
020import org.springframework.amqp.core.Message;
021import org.springframework.batch.item.ItemReader;
022import org.springframework.util.Assert;
023
024/**
025 * <p>
026 * AMQP {@link ItemReader} implementation using an {@link AmqpTemplate} to
027 * receive and/or convert messages.
028 * </p>
029 *
030 * @author Chris Schaefer
031 * @author Mahmoud Ben Hassine
032 */
033public class AmqpItemReader<T> implements ItemReader<T> {
034        private final AmqpTemplate amqpTemplate;
035        private Class<? extends T> itemType;
036
037        /**
038         * Initialize the AmqpItemReader.
039         *
040         * @param amqpTemplate the template to be used.  Must not be null.
041         */
042        public AmqpItemReader(final AmqpTemplate amqpTemplate) {
043                Assert.notNull(amqpTemplate, "AmqpTemplate must not be null");
044
045                this.amqpTemplate = amqpTemplate;
046        }
047
048        @Override
049        @SuppressWarnings("unchecked")
050        public T read() {
051                if (itemType != null && itemType.isAssignableFrom(Message.class)) {
052                        return (T) amqpTemplate.receive();
053                }
054
055                Object result = amqpTemplate.receiveAndConvert();
056
057                if (itemType != null && result != null) {
058                        Assert.state(itemType.isAssignableFrom(result.getClass()),
059                                        "Received message payload of wrong type: expected [" + itemType + "]");
060                }
061
062                return (T) result;
063        }
064
065        /**
066         * Establish the itemType for the reader.
067         * 
068         * @param itemType class type that will be returned by the reader.
069         */
070        public void setItemType(Class<? extends T> itemType) {
071                Assert.notNull(itemType, "Item type cannot be null");
072                this.itemType = itemType;
073        }
074}