001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.codec.multipart;
018
019import java.io.IOException;
020import java.nio.channels.Channels;
021import java.nio.channels.FileChannel;
022import java.nio.channels.ReadableByteChannel;
023import java.nio.charset.Charset;
024import java.nio.charset.StandardCharsets;
025import java.nio.file.OpenOption;
026import java.nio.file.Path;
027import java.nio.file.StandardOpenOption;
028import java.util.List;
029import java.util.Map;
030import java.util.Optional;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.function.Consumer;
033
034import org.synchronoss.cloud.nio.multipart.DefaultPartBodyStreamStorageFactory;
035import org.synchronoss.cloud.nio.multipart.Multipart;
036import org.synchronoss.cloud.nio.multipart.MultipartContext;
037import org.synchronoss.cloud.nio.multipart.MultipartUtils;
038import org.synchronoss.cloud.nio.multipart.NioMultipartParser;
039import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener;
040import org.synchronoss.cloud.nio.multipart.PartBodyStreamStorageFactory;
041import org.synchronoss.cloud.nio.stream.storage.StreamStorage;
042import reactor.core.publisher.BaseSubscriber;
043import reactor.core.publisher.Flux;
044import reactor.core.publisher.FluxSink;
045import reactor.core.publisher.Mono;
046import reactor.core.publisher.SignalType;
047
048import org.springframework.core.ResolvableType;
049import org.springframework.core.codec.DecodingException;
050import org.springframework.core.codec.Hints;
051import org.springframework.core.io.buffer.DataBuffer;
052import org.springframework.core.io.buffer.DataBufferFactory;
053import org.springframework.core.io.buffer.DataBufferLimitException;
054import org.springframework.core.io.buffer.DataBufferUtils;
055import org.springframework.core.io.buffer.DefaultDataBufferFactory;
056import org.springframework.core.log.LogFormatUtils;
057import org.springframework.http.HttpHeaders;
058import org.springframework.http.MediaType;
059import org.springframework.http.ReactiveHttpInputMessage;
060import org.springframework.http.codec.HttpMessageReader;
061import org.springframework.http.codec.LoggingCodecSupport;
062import org.springframework.lang.Nullable;
063import org.springframework.util.Assert;
064
065/**
 * {@code HttpMessageReader} for parsing {@code "multipart/form-data"} requests
 * to a stream of {@link Part}s using the Synchronoss NIO Multipart library.
068 *
069 * <p>This reader can be provided to {@link MultipartHttpMessageReader} in order
070 * to aggregate all parts into a Map.
071 *
072 * @author Sebastien Deleuze
073 * @author Rossen Stoyanchev
074 * @author Arjen Poutsma
075 * @author Brian Clozel
076 * @since 5.0
077 * @see <a href="https://github.com/synchronoss/nio-multipart">Synchronoss NIO Multipart</a>
078 * @see MultipartHttpMessageReader
079 */
080public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implements HttpMessageReader<Part> {
081
        // Shared DataBufferFactory used to copy from an InputStream or to wrap a byte[].
083        private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
084
085
086        private int maxInMemorySize = 256 * 1024;
087
088        private long maxDiskUsagePerPart = -1;
089
090        private int maxParts = -1;
091
092
093        /**
094         * Configure the maximum amount of memory that is allowed to use per part.
095         * When the limit is exceeded:
096         * <ul>
097         * <li>file parts are written to a temporary file.
098         * <li>non-file parts are rejected with {@link DataBufferLimitException}.
099         * </ul>
100         * <p>By default this is set to 256K.
101         * @param byteCount the in-memory limit in bytes; if set to -1 this limit is
102         * not enforced, and all parts may be written to disk and are limited only
103         * by the {@link #setMaxDiskUsagePerPart(long) maxDiskUsagePerPart} property.
104         * @since 5.1.11
105         */
106        public void setMaxInMemorySize(int byteCount) {
107                this.maxInMemorySize = byteCount;
108        }
109
110        /**
111         * Get the {@link #setMaxInMemorySize configured} maximum in-memory size.
112         * @since 5.1.11
113         */
114        public int getMaxInMemorySize() {
115                return this.maxInMemorySize;
116        }
117
118        /**
119         * Configure the maximum amount of disk space allowed for file parts.
120         * <p>By default this is set to -1.
121         * @param maxDiskUsagePerPart the disk limit in bytes, or -1 for unlimited
122         * @since 5.1.11
123         */
124        public void setMaxDiskUsagePerPart(long maxDiskUsagePerPart) {
125                this.maxDiskUsagePerPart = maxDiskUsagePerPart;
126        }
127
128        /**
129         * Get the {@link #setMaxDiskUsagePerPart configured} maximum disk usage.
130         * @since 5.1.11
131         */
132        public long getMaxDiskUsagePerPart() {
133                return this.maxDiskUsagePerPart;
134        }
135
136        /**
137         * Specify the maximum number of parts allowed in a given multipart request.
138         * @since 5.1.11
139         */
140        public void setMaxParts(int maxParts) {
141                this.maxParts = maxParts;
142        }
143
144        /**
145         * Return the {@link #setMaxParts configured} limit on the number of parts.
146         * @since 5.1.11
147         */
148        public int getMaxParts() {
149                return this.maxParts;
150        }
151
152
153        @Override
154        public List<MediaType> getReadableMediaTypes() {
155                return MultipartHttpMessageReader.MIME_TYPES;
156        }
157
158        @Override
159        public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
160                if (Part.class.equals(elementType.toClass())) {
161                        if (mediaType == null) {
162                                return true;
163                        }
164                        for (MediaType supportedMediaType : getReadableMediaTypes()) {
165                                if (supportedMediaType.isCompatibleWith(mediaType)) {
166                                        return true;
167                                }
168                        }
169                }
170                return false;
171        }
172
173        @Override
174        public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
175                return Flux.create(new SynchronossPartGenerator(message))
176                                .doOnNext(part -> {
177                                        if (!Hints.isLoggingSuppressed(hints)) {
178                                                LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
179                                                                (isEnableLoggingRequestDetails() ?
180                                                                                LogFormatUtils.formatValue(part, !traceOn) :
181                                                                                "parts '" + part.name() + "' (content masked)"));
182                                        }
183                                });
184        }
185
186        @Override
187        public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
188                return Mono.error(new UnsupportedOperationException("Cannot read multipart request body into single Part"));
189        }
190
191
192        /**
193         * Subscribe to the input stream and feed the Synchronoss parser. Then listen
194         * for parser output, creating parts, and pushing them into the FluxSink.
195         */
196        private class SynchronossPartGenerator extends BaseSubscriber<DataBuffer> implements Consumer<FluxSink<Part>> {
197
198                private final ReactiveHttpInputMessage inputMessage;
199
200                private final LimitedPartBodyStreamStorageFactory storageFactory = new LimitedPartBodyStreamStorageFactory();
201
202                @Nullable
203                private NioMultipartParserListener listener;
204
205                @Nullable
206                private NioMultipartParser parser;
207
208                public SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage) {
209                        this.inputMessage = inputMessage;
210                }
211
212                @Override
213                public void accept(FluxSink<Part> sink) {
214                        HttpHeaders headers = this.inputMessage.getHeaders();
215                        MediaType mediaType = headers.getContentType();
216                        Assert.state(mediaType != null, "No content type set");
217
218                        int length = getContentLength(headers);
219                        Charset charset = Optional.ofNullable(mediaType.getCharset()).orElse(StandardCharsets.UTF_8);
220                        MultipartContext context = new MultipartContext(mediaType.toString(), length, charset.name());
221
222                        this.listener = new FluxSinkAdapterListener(sink, context, this.storageFactory);
223
224                        this.parser = Multipart
225                                        .multipart(context)
226                                        .usePartBodyStreamStorageFactory(this.storageFactory)
227                                        .forNIO(this.listener);
228
229                        this.inputMessage.getBody().subscribe(this);
230                }
231
232                @Override
233                protected void hookOnNext(DataBuffer buffer) {
234                        Assert.state(this.parser != null && this.listener != null, "Not initialized yet");
235
236                        int size = buffer.readableByteCount();
237                        this.storageFactory.increaseByteCount(size);
238                        byte[] resultBytes = new byte[size];
239                        buffer.read(resultBytes);
240
241                        try {
242                                this.parser.write(resultBytes);
243                        }
244                        catch (IOException ex) {
245                                cancel();
246                                int index = this.storageFactory.getCurrentPartIndex();
247                                this.listener.onError("Parser error for part [" + index + "]", ex);
248                        }
249                        finally {
250                                DataBufferUtils.release(buffer);
251                        }
252                }
253
254                @Override
255                protected void hookOnError(Throwable ex) {
256                        if (this.listener != null) {
257                                int index = this.storageFactory.getCurrentPartIndex();
258                                this.listener.onError("Failure while parsing part[" + index + "]", ex);
259                        }
260                }
261
262                @Override
263                protected void hookOnComplete() {
264                        if (this.listener != null) {
265                                this.listener.onAllPartsFinished();
266                        }
267                }
268
269                @Override
270                protected void hookFinally(SignalType type) {
271                        try {
272                                if (this.parser != null) {
273                                        this.parser.close();
274                                }
275                        }
276                        catch (IOException ex) {
277                                // ignore
278                        }
279                }
280
281                private int getContentLength(HttpHeaders headers) {
282                        // Until this is fixed https://github.com/synchronoss/nio-multipart/issues/10
283                        long length = headers.getContentLength();
284                        return (int) length == length ? (int) length : -1;
285                }
286        }
287
288
289        private class LimitedPartBodyStreamStorageFactory implements PartBodyStreamStorageFactory {
290
291                private final PartBodyStreamStorageFactory storageFactory = (maxInMemorySize > 0 ?
292                                new DefaultPartBodyStreamStorageFactory(maxInMemorySize) :
293                                new DefaultPartBodyStreamStorageFactory());
294
295                private int index = 1;
296
297                private boolean isFilePart;
298
299                private long partSize;
300
301                public int getCurrentPartIndex() {
302                        return this.index;
303                }
304
305                @Override
306                public StreamStorage newStreamStorageForPartBody(Map<String, List<String>> headers, int index) {
307                        this.index = index;
308                        this.isFilePart = (MultipartUtils.getFileName(headers) != null);
309                        this.partSize = 0;
310                        if (maxParts > 0 && index > maxParts) {
311                                throw new DecodingException("Too many parts (" + index + " allowed)");
312                        }
313                        return this.storageFactory.newStreamStorageForPartBody(headers, index);
314                }
315
316                public void increaseByteCount(long byteCount) {
317                        this.partSize += byteCount;
318                        if (maxInMemorySize > 0 && !this.isFilePart && this.partSize >= maxInMemorySize) {
319                                throw new DataBufferLimitException("Part[" + this.index + "] " +
320                                                "exceeded the in-memory limit of " + maxInMemorySize + " bytes");
321                        }
322                        if (maxDiskUsagePerPart > 0 && this.isFilePart && this.partSize > maxDiskUsagePerPart) {
323                                throw new DecodingException("Part[" + this.index + "] " +
324                                                "exceeded the disk usage limit of " + maxDiskUsagePerPart + " bytes");
325                        }
326                }
327
328                public void partFinished() {
329                        this.index++;
330                        this.isFilePart = false;
331                        this.partSize = 0;
332                }
333        }
334
335
336        /**
337         * Listen for parser output and adapt to {@code Flux<Sink<Part>>}.
338         */
339        private static class FluxSinkAdapterListener implements NioMultipartParserListener {
340
341                private final FluxSink<Part> sink;
342
343                private final MultipartContext context;
344
345                private final LimitedPartBodyStreamStorageFactory storageFactory;
346
347                private final AtomicInteger terminated = new AtomicInteger(0);
348
349                FluxSinkAdapterListener(
350                                FluxSink<Part> sink, MultipartContext context, LimitedPartBodyStreamStorageFactory factory) {
351
352                        this.sink = sink;
353                        this.context = context;
354                        this.storageFactory = factory;
355                }
356
357                @Override
358                public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) {
359                        HttpHeaders httpHeaders = new HttpHeaders();
360                        httpHeaders.putAll(headers);
361                        this.storageFactory.partFinished();
362                        this.sink.next(createPart(storage, httpHeaders));
363                }
364
365                private Part createPart(StreamStorage storage, HttpHeaders httpHeaders) {
366                        String filename = MultipartUtils.getFileName(httpHeaders);
367                        if (filename != null) {
368                                return new SynchronossFilePart(httpHeaders, filename, storage);
369                        }
370                        else if (MultipartUtils.isFormField(httpHeaders, this.context)) {
371                                String value = MultipartUtils.readFormParameterValue(storage, httpHeaders);
372                                return new SynchronossFormFieldPart(httpHeaders, value);
373                        }
374                        else {
375                                return new SynchronossPart(httpHeaders, storage);
376                        }
377                }
378
379                @Override
380                public void onError(String message, Throwable cause) {
381                        if (this.terminated.getAndIncrement() == 0) {
382                                this.sink.error(new DecodingException(message, cause));
383                        }
384                }
385
386                @Override
387                public void onAllPartsFinished() {
388                        if (this.terminated.getAndIncrement() == 0) {
389                                this.sink.complete();
390                        }
391                }
392
393                @Override
394                public void onNestedPartStarted(Map<String, List<String>> headersFromParentPart) {
395                }
396
397                @Override
398                public void onNestedPartFinished() {
399                }
400        }
401
402
403        private abstract static class AbstractSynchronossPart implements Part {
404
405                private final String name;
406
407                private final HttpHeaders headers;
408
409                AbstractSynchronossPart(HttpHeaders headers) {
410                        Assert.notNull(headers, "HttpHeaders is required");
411                        this.name = MultipartUtils.getFieldName(headers);
412                        this.headers = headers;
413                }
414
415                @Override
416                public String name() {
417                        return this.name;
418                }
419
420                @Override
421                public HttpHeaders headers() {
422                        return this.headers;
423                }
424
425                @Override
426                public String toString() {
427                        return "Part '" + this.name + "', headers=" + this.headers;
428                }
429        }
430
431
432        private static class SynchronossPart extends AbstractSynchronossPart {
433
434                private final StreamStorage storage;
435
436                SynchronossPart(HttpHeaders headers, StreamStorage storage) {
437                        super(headers);
438                        Assert.notNull(storage, "StreamStorage is required");
439                        this.storage = storage;
440                }
441
442                @Override
443                public Flux<DataBuffer> content() {
444                        return DataBufferUtils.readInputStream(getStorage()::getInputStream, bufferFactory, 4096);
445                }
446
447                protected StreamStorage getStorage() {
448                        return this.storage;
449                }
450        }
451
452
453        private static class SynchronossFilePart extends SynchronossPart implements FilePart {
454
455                private static final OpenOption[] FILE_CHANNEL_OPTIONS =
456                                {StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE};
457
458                private final String filename;
459
460                SynchronossFilePart(HttpHeaders headers, String filename, StreamStorage storage) {
461                        super(headers, storage);
462                        this.filename = filename;
463                }
464
465                @Override
466                public String filename() {
467                        return this.filename;
468                }
469
470                @Override
471                public Mono<Void> transferTo(Path dest) {
472                        ReadableByteChannel input = null;
473                        FileChannel output = null;
474                        try {
475                                input = Channels.newChannel(getStorage().getInputStream());
476                                output = FileChannel.open(dest, FILE_CHANNEL_OPTIONS);
477                                long size = (input instanceof FileChannel ? ((FileChannel) input).size() : Long.MAX_VALUE);
478                                long totalWritten = 0;
479                                while (totalWritten < size) {
480                                        long written = output.transferFrom(input, totalWritten, size - totalWritten);
481                                        if (written <= 0) {
482                                                break;
483                                        }
484                                        totalWritten += written;
485                                }
486                        }
487                        catch (IOException ex) {
488                                return Mono.error(ex);
489                        }
490                        finally {
491                                if (input != null) {
492                                        try {
493                                                input.close();
494                                        }
495                                        catch (IOException ignored) {
496                                        }
497                                }
498                                if (output != null) {
499                                        try {
500                                                output.close();
501                                        }
502                                        catch (IOException ignored) {
503                                        }
504                                }
505                        }
506                        return Mono.empty();
507                }
508
509                @Override
510                public String toString() {
511                        return "Part '" + name() + "', filename='" + this.filename + "'";
512                }
513        }
514
515
516        private static class SynchronossFormFieldPart extends AbstractSynchronossPart implements FormFieldPart {
517
518                private final String content;
519
520                SynchronossFormFieldPart(HttpHeaders headers, String content) {
521                        super(headers);
522                        this.content = content;
523                }
524
525                @Override
526                public String value() {
527                        return this.content;
528                }
529
530                @Override
531                public Flux<DataBuffer> content() {
532                        byte[] bytes = this.content.getBytes(getCharset());
533                        return Flux.just(bufferFactory.wrap(bytes));
534                }
535
536                private Charset getCharset() {
537                        String name = MultipartUtils.getCharEncoding(headers());
538                        return (name != null ? Charset.forName(name) : StandardCharsets.UTF_8);
539                }
540
541                @Override
542                public String toString() {
543                        return "Part '" + name() + "=" + this.content + "'";
544                }
545        }
546
547}