001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.messaging.rsocket.annotation.support;
017
018import java.util.Arrays;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.LinkedHashSet;
023import java.util.Map;
024import java.util.Set;
025
026import io.rsocket.frame.FrameType;
027
028import org.springframework.lang.Nullable;
029import org.springframework.messaging.Message;
030import org.springframework.messaging.handler.AbstractMessageCondition;
031import org.springframework.util.Assert;
032
033/**
034 * A condition to assist with mapping onto handler methods based on the RSocket
035 * frame type. This helps to separate the handling of connection-level frame
036 * types, i.e. {@code SETUP} and {@code METADATA_PUSH}, from the handling of
037 * stream requests.
038 *
039 * @author Rossen Stoyanchev
040 * @since 5.2
041 */
042public class RSocketFrameTypeMessageCondition extends AbstractMessageCondition<RSocketFrameTypeMessageCondition> {
043
044        /** The name of the header that contains the RSocket frame type being processed. */
045        public static final String FRAME_TYPE_HEADER = "rsocketFrameType";
046
047
048        /** Match connection-level frames "SETUP" or "METADATA_PUSH". */
049        public static final RSocketFrameTypeMessageCondition CONNECT_CONDITION =
050                        new RSocketFrameTypeMessageCondition(FrameType.SETUP, FrameType.METADATA_PUSH);
051
052        /** Match RSocket frames "REQUEST_FNF" or "REQUEST_RESPONSE". */
053        public static final RSocketFrameTypeMessageCondition REQUEST_FNF_OR_RESPONSE_CONDITION =
054                        new RSocketFrameTypeMessageCondition(FrameType.REQUEST_FNF, FrameType.REQUEST_RESPONSE);
055
056        /** Match RSocket frame "REQUEST_RESPONSE". */
057        public static final RSocketFrameTypeMessageCondition REQUEST_RESPONSE_CONDITION =
058                        new RSocketFrameTypeMessageCondition(FrameType.REQUEST_RESPONSE);
059
060        /** Match RSocket frame "REQUEST_STREAM". */
061        public static final RSocketFrameTypeMessageCondition REQUEST_STREAM_CONDITION =
062                        new RSocketFrameTypeMessageCondition(FrameType.REQUEST_STREAM);
063
064        /** Match RSocket frame "REQUEST_CHANNEL". */
065        public static final RSocketFrameTypeMessageCondition REQUEST_CHANNEL_CONDITION =
066                        new RSocketFrameTypeMessageCondition(FrameType.REQUEST_CHANNEL);
067
068        /** Empty condition that does not match to any RSocket frames (e.g. for type-level mappings) */
069        public static final RSocketFrameTypeMessageCondition EMPTY_CONDITION = new RSocketFrameTypeMessageCondition();
070
071
072        /**
073         * Condition to match "REQUEST_FNF", "REQUEST_RESPONSE", "REQUEST_STREAM",
074         * and "REQUEST_CHANNEL".
075         * @deprecated as of 5.2.2 because matching to all interaction types is too
076         * flexible. Please use one of the other constants in this class that match
077         * to specific frames.
078         */
079        @Deprecated
080        public static final RSocketFrameTypeMessageCondition REQUEST_CONDITION =
081                        new RSocketFrameTypeMessageCondition(
082                                        FrameType.REQUEST_FNF,
083                                        FrameType.REQUEST_RESPONSE,
084                                        FrameType.REQUEST_STREAM,
085                                        FrameType.REQUEST_CHANNEL);
086
087        /** Per FrameType cache to return ready instances from getMatchingCondition. */
088        private static final Map<String, RSocketFrameTypeMessageCondition> frameTypeConditionCache;
089
090        static {
091                frameTypeConditionCache = new HashMap<>(FrameType.values().length);
092                for (FrameType type : FrameType.values()) {
093                        frameTypeConditionCache.put(type.name(), new RSocketFrameTypeMessageCondition(type));
094                }
095        }
096
097
098        private final Set<FrameType> frameTypes;
099
100
101        public RSocketFrameTypeMessageCondition(FrameType... frameType) {
102                this(Arrays.asList(frameType));
103        }
104
105        public RSocketFrameTypeMessageCondition(Collection<FrameType> frameTypes) {
106                Assert.notEmpty(frameTypes, "`frameTypes` are required");
107                this.frameTypes = Collections.unmodifiableSet(new LinkedHashSet<>(frameTypes));
108        }
109
110        private RSocketFrameTypeMessageCondition() {
111                this.frameTypes = Collections.emptySet();
112        }
113
114
115        public Set<FrameType> getFrameTypes() {
116                return this.frameTypes;
117        }
118
119        @Override
120        protected Collection<?> getContent() {
121                return this.frameTypes;
122        }
123
124        @Override
125        protected String getToStringInfix() {
126                return " || ";
127        }
128
129        /**
130         * Find the RSocket frame type in the message headers.
131         * @param message the current message
132         * @return the frame type or {@code null} if not found
133         */
134        @SuppressWarnings("ConstantConditions")
135        @Nullable
136        public static FrameType getFrameType(Message<?> message) {
137                return (FrameType) message.getHeaders().get(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER);
138        }
139
140
141        @Override
142        public RSocketFrameTypeMessageCondition combine(RSocketFrameTypeMessageCondition other) {
143                if (this.frameTypes.equals(other.frameTypes)) {
144                        return other;
145                }
146                Set<FrameType> set = new LinkedHashSet<>(this.frameTypes);
147                set.addAll(other.frameTypes);
148                return new RSocketFrameTypeMessageCondition(set);
149        }
150
151        @Override
152        public RSocketFrameTypeMessageCondition getMatchingCondition(Message<?> message) {
153                FrameType actual = message.getHeaders().get(FRAME_TYPE_HEADER, FrameType.class);
154                if (actual != null) {
155                        for (FrameType type : this.frameTypes) {
156                                if (actual == type) {
157                                        return frameTypeConditionCache.get(type.name());
158                                }
159                        }
160                }
161                return  null;
162        }
163
164        @Override
165        public int compareTo(RSocketFrameTypeMessageCondition other, Message<?> message) {
166                return other.frameTypes.size() - this.frameTypes.size();
167        }
168
169
170        /**
171         * Return a condition for matching the RSocket request interaction type with
172         * that is selected based on the delcared request and response cardinality
173         * of some handler method.
174         * <p>The table below shows the selections made:
175         * <table>
176         * <tr>
177         * <th>Request Cardinality</th>
178         * <th>Response Cardinality</th>
179         * <th>Interaction Types</th>
180         * </tr>
181         * <tr>
182         * <td>0,1</td>
183         * <td>0</td>
184         * <td>Fire-And-Forget, Request-Response</td>
185         * </tr>
186         * <tr>
187         * <td>0,1</td>
188         * <td>1</td>
189         * <td>Request-Response</td>
190         * </tr>
191         * <tr>
192         * <td>0,1</td>
193         * <td>2</td>
194         * <td>Request-Stream</td>
195         * </tr>
196         * <tr>
197         * <td>2</td>
198         * <td>Any</td>
199         * <td>Request-Channel</td>
200         * </tr>
201         * </table>
202         * @param cardinalityIn -- the request cardinality: 1 for a single payload,
203         * 2 for many payloads, and 0 if input is not handled.
204         * @param cardinalityOut -- the response cardinality: 0 for no output
205         * payloads, 1 for a single payload, and 2 for many payloads.
206         * @return a condition to use for matching the interaction type
207         * @since 5.2.2
208         */
209        public static RSocketFrameTypeMessageCondition getCondition(int cardinalityIn, int cardinalityOut) {
210                switch (cardinalityIn) {
211                        case 0:
212                        case 1:
213                                switch (cardinalityOut) {
214                                        case 0: return REQUEST_FNF_OR_RESPONSE_CONDITION;
215                                        case 1: return REQUEST_RESPONSE_CONDITION;
216                                        case 2: return REQUEST_STREAM_CONDITION;
217                                        default: throw new IllegalStateException("Invalid cardinality: " + cardinalityOut);
218                                }
219                        case 2:
220                                return REQUEST_CHANNEL_CONDITION;
221                        default:
222                                throw new IllegalStateException("Invalid cardinality: " + cardinalityIn);
223                }
224        }
225
226}