/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.messaging.rsocket.annotation.support;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import io.rsocket.frame.FrameType;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.AbstractMessageCondition;
import org.springframework.util.Assert;

/**
 * A condition to assist with mapping onto handler methods based on the RSocket
 * frame type. This helps to separate the handling of connection-level frame
 * types, i.e. {@code SETUP} and {@code METADATA_PUSH}, from the handling of
 * stream requests.
 *
 * @author Rossen Stoyanchev
 * @since 5.2
 */
public class RSocketFrameTypeMessageCondition extends AbstractMessageCondition<RSocketFrameTypeMessageCondition> {

	/** The name of the header that contains the RSocket frame type being processed. */
	public static final String FRAME_TYPE_HEADER = "rsocketFrameType";


	/** Match connection-level frames "SETUP" or "METADATA_PUSH". */
	public static final RSocketFrameTypeMessageCondition CONNECT_CONDITION =
			new RSocketFrameTypeMessageCondition(FrameType.SETUP, FrameType.METADATA_PUSH);

	/** Match RSocket frames "REQUEST_FNF" or "REQUEST_RESPONSE". */
	public static final RSocketFrameTypeMessageCondition REQUEST_FNF_OR_RESPONSE_CONDITION =
			new RSocketFrameTypeMessageCondition(FrameType.REQUEST_FNF, FrameType.REQUEST_RESPONSE);

	/** Match RSocket frame "REQUEST_RESPONSE". */
	public static final RSocketFrameTypeMessageCondition REQUEST_RESPONSE_CONDITION =
			new RSocketFrameTypeMessageCondition(FrameType.REQUEST_RESPONSE);

	/** Match RSocket frame "REQUEST_STREAM". */
	public static final RSocketFrameTypeMessageCondition REQUEST_STREAM_CONDITION =
			new RSocketFrameTypeMessageCondition(FrameType.REQUEST_STREAM);

	/** Match RSocket frame "REQUEST_CHANNEL". */
	public static final RSocketFrameTypeMessageCondition REQUEST_CHANNEL_CONDITION =
			new RSocketFrameTypeMessageCondition(FrameType.REQUEST_CHANNEL);

	/** Empty condition that does not match to any RSocket frames (e.g. for type-level mappings). */
	public static final RSocketFrameTypeMessageCondition EMPTY_CONDITION = new RSocketFrameTypeMessageCondition();


	/**
	 * Condition to match "REQUEST_FNF", "REQUEST_RESPONSE", "REQUEST_STREAM",
	 * and "REQUEST_CHANNEL".
	 * @deprecated as of 5.2.2 because matching to all interaction types is too
	 * flexible. Please use one of the other constants in this class that match
	 * to specific frames.
	 */
	@Deprecated
	public static final RSocketFrameTypeMessageCondition REQUEST_CONDITION =
			new RSocketFrameTypeMessageCondition(
					FrameType.REQUEST_FNF,
					FrameType.REQUEST_RESPONSE,
					FrameType.REQUEST_STREAM,
					FrameType.REQUEST_CHANNEL);

	/** Per FrameType cache to return ready instances from getMatchingCondition. */
	private static final Map<String, RSocketFrameTypeMessageCondition> frameTypeConditionCache;

	static {
		// One single-frame-type condition per FrameType constant, keyed by the constant's name.
		frameTypeConditionCache = new HashMap<>(FrameType.values().length);
		for (FrameType type : FrameType.values()) {
			frameTypeConditionCache.put(type.name(), new RSocketFrameTypeMessageCondition(type));
		}
	}


	private final Set<FrameType> frameTypes;


	/**
	 * Create a condition that matches any of the given frame types.
	 * <p>Delegates to {@link #RSocketFrameTypeMessageCondition(Collection)},
	 * so at least one frame type must be given.
	 * @param frameType the frame types to match; must not be empty
	 */
	public RSocketFrameTypeMessageCondition(FrameType... frameType) {
		this(Arrays.asList(frameType));
	}

	/**
	 * Create a condition that matches any of the given frame types.
	 * <p>The frame types are copied into an unmodifiable set that preserves
	 * the iteration order of the given collection and drops duplicates.
	 * @param frameTypes the frame types to match; must not be empty
	 */
	public RSocketFrameTypeMessageCondition(Collection<FrameType> frameTypes) {
		Assert.notEmpty(frameTypes, "`frameTypes` are required");
		this.frameTypes = Collections.unmodifiableSet(new LinkedHashSet<>(frameTypes));
	}

	/**
	 * Private constructor for {@link #EMPTY_CONDITION}: the only way to
	 * obtain a condition with no frame types.
	 */
	private RSocketFrameTypeMessageCondition() {
		this.frameTypes = Collections.emptySet();
	}


	/**
	 * Return the frame types this condition matches, as an unmodifiable set.
	 */
	public Set<FrameType> getFrameTypes() {
		return this.frameTypes;
	}

	@Override
	protected Collection<?> getContent() {
		return this.frameTypes;
	}

	@Override
	protected String getToStringInfix() {
		return " || ";
	}

	/**
	 * Find the RSocket frame type in the message headers.
	 * @param message the current message
	 * @return the frame type or {@code null} if not found
	 */
	@SuppressWarnings("ConstantConditions")
	@Nullable
	public static FrameType getFrameType(Message<?> message) {
		return (FrameType) message.getHeaders().get(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER);
	}


	/**
	 * Combine this condition with another one.
	 * <p>If both conditions hold equal sets of frame types, {@code other} is
	 * returned as-is. Otherwise a new condition is created with the union of
	 * both sets, listing the frame types of this condition first.
	 * @param other the condition to combine with
	 * @return {@code other}, or a new condition with the union of frame types
	 */
	@Override
	public RSocketFrameTypeMessageCondition combine(RSocketFrameTypeMessageCondition other) {
		if (this.frameTypes.equals(other.frameTypes)) {
			return other;
		}
		Set<FrameType> set = new LinkedHashSet<>(this.frameTypes);
		set.addAll(other.frameTypes);
		return new RSocketFrameTypeMessageCondition(set);
	}

	/**
	 * Check the {@link #FRAME_TYPE_HEADER} of the given message against the
	 * frame types of this condition.
	 * @param message the message to check
	 * @return the cached condition holding only the matched frame type, or
	 * {@code null} if the header is absent or its frame type is not one of
	 * those in this condition
	 */
	@Override
	@Nullable
	public RSocketFrameTypeMessageCondition getMatchingCondition(Message<?> message) {
		FrameType actual = message.getHeaders().get(FRAME_TYPE_HEADER, FrameType.class);
		if (actual != null) {
			for (FrameType type : this.frameTypes) {
				if (actual == type) {
					// Reuse the pre-built instance rather than allocating per message.
					return frameTypeConditionCache.get(type.name());
				}
			}
		}
		return null;
	}

	/**
	 * Compare by the number of frame types only; the message is not consulted.
	 * @return a negative value if this condition has more frame types than
	 * {@code other}, a positive value if it has fewer, and 0 if both have the
	 * same number
	 */
	@Override
	public int compareTo(RSocketFrameTypeMessageCondition other, Message<?> message) {
		return Integer.compare(other.frameTypes.size(), this.frameTypes.size());
	}


	/**
	 * Return a condition for matching the RSocket request interaction type
	 * that is selected based on the declared request and response cardinality
	 * of some handler method.
	 * <p>The table below shows the selections made:
	 * <table>
	 * <tr>
	 * <th>Request Cardinality</th>
	 * <th>Response Cardinality</th>
	 * <th>Interaction Types</th>
	 * </tr>
	 * <tr>
	 * <td>0,1</td>
	 * <td>0</td>
	 * <td>Fire-And-Forget, Request-Response</td>
	 * </tr>
	 * <tr>
	 * <td>0,1</td>
	 * <td>1</td>
	 * <td>Request-Response</td>
	 * </tr>
	 * <tr>
	 * <td>0,1</td>
	 * <td>2</td>
	 * <td>Request-Stream</td>
	 * </tr>
	 * <tr>
	 * <td>2</td>
	 * <td>Any</td>
	 * <td>Request-Channel</td>
	 * </tr>
	 * </table>
	 * @param cardinalityIn -- the request cardinality: 1 for a single payload,
	 * 2 for many payloads, and 0 if input is not handled.
	 * @param cardinalityOut -- the response cardinality: 0 for no output
	 * payloads, 1 for a single payload, and 2 for many payloads.
	 * @return a condition to use for matching the interaction type
	 * @throws IllegalStateException if {@code cardinalityIn} is not 0, 1, or 2,
	 * or if {@code cardinalityIn} is 0 or 1 and {@code cardinalityOut} is not
	 * 0, 1, or 2
	 * @since 5.2.2
	 */
	public static RSocketFrameTypeMessageCondition getCondition(int cardinalityIn, int cardinalityOut) {
		switch (cardinalityIn) {
			case 0:
			case 1:
				switch (cardinalityOut) {
					case 0: return REQUEST_FNF_OR_RESPONSE_CONDITION;
					case 1: return REQUEST_RESPONSE_CONDITION;
					case 2: return REQUEST_STREAM_CONDITION;
					default: throw new IllegalStateException("Invalid cardinality: " + cardinalityOut);
				}
			case 2:
				// Many request payloads always map to a channel; the output cardinality is not checked.
				return REQUEST_CHANNEL_CONDITION;
			default:
				throw new IllegalStateException("Invalid cardinality: " + cardinalityIn);
		}
	}

}