001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.messaging.rsocket;
017
018import java.util.ArrayList;
019import java.util.Arrays;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.function.BiConsumer;
026
027import io.netty.buffer.ByteBuf;
028import io.netty.buffer.ByteBufAllocator;
029import io.rsocket.Payload;
030import io.rsocket.metadata.CompositeMetadata;
031import io.rsocket.metadata.RoutingMetadata;
032import io.rsocket.metadata.WellKnownMimeType;
033
034import org.springframework.core.ParameterizedTypeReference;
035import org.springframework.core.ResolvableType;
036import org.springframework.core.codec.Decoder;
037import org.springframework.core.io.buffer.NettyDataBuffer;
038import org.springframework.core.io.buffer.NettyDataBufferFactory;
039import org.springframework.lang.Nullable;
040import org.springframework.util.MimeType;
041
042/**
043 * Default {@link MetadataExtractor} implementation that relies on
044 * {@link Decoder}s to deserialize the content of metadata entries.
045 * <p>By default only {@code "message/x.rsocket.routing.v0""} is extracted and
046 * saved under {@link MetadataExtractor#ROUTE_KEY}. Use {@code metadataToExtract}
047 * methods to specify other metadata mime types of interest to extract.
048 *
049 * @author Rossen Stoyanchev
050 * @since 5.2
051 */
052public class DefaultMetadataExtractor implements MetadataExtractor, MetadataExtractorRegistry {
053
054        private final List<Decoder<?>> decoders;
055
056        private final Map<String, EntryExtractor<?>> registrations = new HashMap<>();
057
058
059        /**
060         * Constructor with decoders for de-serializing metadata entries.
061         */
062        public DefaultMetadataExtractor(Decoder<?>... decoders) {
063                this(Arrays.asList(decoders));
064        }
065
066        /**
067         * Constructor with list of decoders for de-serializing metadata entries.
068         */
069        public DefaultMetadataExtractor(List<Decoder<?>> decoders) {
070                this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders));
071        }
072
073
074        /**
075         * Return a read-only list with the configured decoders.
076         */
077        public List<? extends Decoder<?>> getDecoders() {
078                return this.decoders;
079        }
080
081        @Override
082        public <T> void metadataToExtract(
083                        MimeType mimeType, Class<T> targetType, BiConsumer<T, Map<String, Object>> mapper) {
084
085                registerMetadata(mimeType, ResolvableType.forClass(targetType), mapper);
086        }
087
088        @Override
089        public <T> void metadataToExtract(
090                        MimeType mimeType, ParameterizedTypeReference<T> type, BiConsumer<T, Map<String, Object>> mapper) {
091
092                registerMetadata(mimeType, ResolvableType.forType(type), mapper);
093        }
094
095        @SuppressWarnings("unchecked")
096        private <T> void registerMetadata(
097                        MimeType mimeType, ResolvableType targetType, BiConsumer<T, Map<String, Object>> mapper) {
098
099                for (Decoder<?> decoder : this.decoders) {
100                        if (decoder.canDecode(targetType, mimeType)) {
101                                this.registrations.put(mimeType.toString(),
102                                                new EntryExtractor<>((Decoder<T>) decoder, mimeType, targetType, mapper));
103                                return;
104                        }
105                }
106                throw new IllegalArgumentException("No decoder for " + mimeType + " and " + targetType);
107        }
108
109
110        @Override
111        public Map<String, Object> extract(Payload payload, MimeType metadataMimeType) {
112                Map<String, Object> result = new HashMap<>();
113                if (metadataMimeType.toString().equals(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString())) {
114                        for (CompositeMetadata.Entry entry : new CompositeMetadata(payload.metadata(), false)) {
115                                extractEntry(entry.getContent(), entry.getMimeType(), result);
116                        }
117                }
118                else {
119                        extractEntry(payload.metadata().slice(), metadataMimeType.toString(), result);
120                }
121                return result;
122        }
123
124        private void extractEntry(ByteBuf content, @Nullable String mimeType, Map<String, Object> result) {
125                if (content.readableBytes() == 0) {
126                        return;
127                }
128                EntryExtractor<?> extractor = this.registrations.get(mimeType);
129                if (extractor != null) {
130                        extractor.extract(content, result);
131                        return;
132                }
133                if (mimeType != null && mimeType.equals(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())) {
134                        Iterator<String> iterator = new RoutingMetadata(content).iterator();
135                        if (iterator.hasNext()) {
136                                result.put(MetadataExtractor.ROUTE_KEY, iterator.next());
137                        }
138                }
139        }
140
141
142        private static class EntryExtractor<T> {
143
144                // We only need this to wrap ByteBufs
145                private final static NettyDataBufferFactory bufferFactory =
146                                new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
147
148
149                private final Decoder<T> decoder;
150
151                private final MimeType mimeType;
152
153                private final ResolvableType targetType;
154
155                private final BiConsumer<T, Map<String, Object>> accumulator;
156
157
158                EntryExtractor(Decoder<T> decoder, MimeType mimeType, ResolvableType targetType,
159                                BiConsumer<T, Map<String, Object>> accumulator) {
160
161                        this.decoder = decoder;
162                        this.mimeType = mimeType;
163                        this.targetType = targetType;
164                        this.accumulator = accumulator;
165                }
166
167
168                public void extract(ByteBuf content, Map<String, Object> result) {
169                        NettyDataBuffer dataBuffer = bufferFactory.wrap(content.retain());
170                        T value = this.decoder.decode(dataBuffer, this.targetType, this.mimeType, Collections.emptyMap());
171                        this.accumulator.accept(value, result);
172                }
173
174
175                @Override
176                public String toString() {
177                        return "mimeType=" + this.mimeType + ", targetType=" + this.targetType;
178                }
179        }
180
181}