/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.codec.multipart;

import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.synchronoss.cloud.nio.multipart.DefaultPartBodyStreamStorageFactory;
import org.synchronoss.cloud.nio.multipart.Multipart;
import org.synchronoss.cloud.nio.multipart.MultipartContext;
import org.synchronoss.cloud.nio.multipart.MultipartUtils;
import org.synchronoss.cloud.nio.multipart.NioMultipartParser;
import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener;
import org.synchronoss.cloud.nio.multipart.PartBodyStreamStorageFactory;
import org.synchronoss.cloud.nio.stream.storage.StreamStorage;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.log.LogFormatUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.LoggingCodecSupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * {@code HttpMessageReader} for parsing {@code "multipart/form-data"} requests
 * to a stream of {@link Part}'s using the Synchronoss NIO Multipart library.
 *
 * <p>This reader can be provided to {@link MultipartHttpMessageReader} in order
 * to aggregate all parts into a Map.
 *
 * @author Sebastien Deleuze
 * @author Rossen Stoyanchev
 * @author Arjen Poutsma
 * @author Brian Clozel
 * @since 5.0
 * @see <a href="https://github.com/synchronoss/nio-multipart">Synchronoss NIO Multipart</a>
 * @see MultipartHttpMessageReader
 */
public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implements HttpMessageReader<Part> {

	// Static DataBufferFactory to copy from FileInputStream or wrap bytes[].
	private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();


	private int maxInMemorySize = 256 * 1024;

	private long maxDiskUsagePerPart = -1;

	private int maxParts = -1;


	/**
	 * Configure the maximum amount of memory that is allowed to use per part.
	 * When the limit is exceeded:
	 * <ul>
	 * <li>file parts are written to a temporary file.
	 * <li>non-file parts are rejected with {@link DataBufferLimitException}.
	 * </ul>
	 * <p>By default this is set to 256K.
	 * @param byteCount the in-memory limit in bytes; if set to -1 this limit is
	 * not enforced, and all parts may be written to disk and are limited only
	 * by the {@link #setMaxDiskUsagePerPart(long) maxDiskUsagePerPart} property.
	 * @since 5.1.11
	 */
	public void setMaxInMemorySize(int byteCount) {
		this.maxInMemorySize = byteCount;
	}

	/**
	 * Get the {@link #setMaxInMemorySize configured} maximum in-memory size.
	 * @since 5.1.11
	 */
	public int getMaxInMemorySize() {
		return this.maxInMemorySize;
	}

	/**
	 * Configure the maximum amount of disk space allowed for file parts.
	 * <p>By default this is set to -1.
	 * @param maxDiskUsagePerPart the disk limit in bytes, or -1 for unlimited
	 * @since 5.1.11
	 */
	public void setMaxDiskUsagePerPart(long maxDiskUsagePerPart) {
		this.maxDiskUsagePerPart = maxDiskUsagePerPart;
	}

	/**
	 * Get the {@link #setMaxDiskUsagePerPart configured} maximum disk usage.
	 * @since 5.1.11
	 */
	public long getMaxDiskUsagePerPart() {
		return this.maxDiskUsagePerPart;
	}

	/**
	 * Specify the maximum number of parts allowed in a given multipart request.
	 * <p>By default this is set to -1; the limit is only enforced when the
	 * configured value is greater than zero.
	 * @param maxParts the maximum number of parts, or -1 for unlimited
	 * @since 5.1.11
	 */
	public void setMaxParts(int maxParts) {
		this.maxParts = maxParts;
	}

	/**
	 * Return the {@link #setMaxParts configured} limit on the number of parts.
	 * @since 5.1.11
	 */
	public int getMaxParts() {
		return this.maxParts;
	}


	@Override
	public List<MediaType> getReadableMediaTypes() {
		return MultipartHttpMessageReader.MIME_TYPES;
	}

	@Override
	public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
		if (Part.class.equals(elementType.toClass())) {
			// No media type given: accept based on the element type alone.
			if (mediaType == null) {
				return true;
			}
			for (MediaType supportedMediaType : getReadableMediaTypes()) {
				if (supportedMediaType.isCompatibleWith(mediaType)) {
					return true;
				}
			}
		}
		return false;
	}

	@Override
	public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
		return Flux.create(new SynchronossPartGenerator(message))
				.doOnNext(part -> {
					if (!Hints.isLoggingSuppressed(hints)) {
						// Part content is only logged when request details logging is enabled.
						LogFormatUtils.traceDebug(logger, traceOn -> Hints.getLogPrefix(hints) + "Parsed " +
								(isEnableLoggingRequestDetails() ?
										LogFormatUtils.formatValue(part, !traceOn) :
										"parts '" + part.name() + "' (content masked)"));
					}
				});
	}

	@Override
	public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
		return Mono.error(new UnsupportedOperationException("Cannot read multipart request body into single Part"));
	}


	/**
	 * Subscribe to the input stream and feed the Synchronoss parser. Then listen
	 * for parser output, creating parts, and pushing them into the FluxSink.
	 */
	private class SynchronossPartGenerator extends BaseSubscriber<DataBuffer> implements Consumer<FluxSink<Part>> {

		private final ReactiveHttpInputMessage inputMessage;

		private final LimitedPartBodyStreamStorageFactory storageFactory = new LimitedPartBodyStreamStorageFactory();

		@Nullable
		private NioMultipartParserListener listener;

		@Nullable
		private NioMultipartParser parser;

		public SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage) {
			this.inputMessage = inputMessage;
		}

		/**
		 * Create the parser and its listener for the given sink, then subscribe
		 * to the request body so that incoming buffers are fed to the parser.
		 */
		@Override
		public void accept(FluxSink<Part> sink) {
			HttpHeaders headers = this.inputMessage.getHeaders();
			MediaType mediaType = headers.getContentType();
			Assert.state(mediaType != null, "No content type set");

			int length = getContentLength(headers);
			Charset charset = Optional.ofNullable(mediaType.getCharset()).orElse(StandardCharsets.UTF_8);
			MultipartContext context = new MultipartContext(mediaType.toString(), length, charset.name());

			this.listener = new FluxSinkAdapterListener(sink, context, this.storageFactory);

			this.parser = Multipart
					.multipart(context)
					.usePartBodyStreamStorageFactory(this.storageFactory)
					.forNIO(this.listener);

			// Subscribe last: parser and listener must be in place before the first buffer arrives.
			this.inputMessage.getBody().subscribe(this);
		}

		/**
		 * Copy the buffer content into the parser, enforcing the configured
		 * size limits first. The buffer is released on every exit path,
		 * including when a limit check or the state assertion fails.
		 */
		@Override
		protected void hookOnNext(DataBuffer buffer) {
			try {
				Assert.state(this.parser != null && this.listener != null, "Not initialized yet");

				int size = buffer.readableByteCount();
				// May throw if a per-part limit is exceeded; "finally" below still releases the buffer.
				this.storageFactory.increaseByteCount(size);
				byte[] resultBytes = new byte[size];
				buffer.read(resultBytes);

				this.parser.write(resultBytes);
			}
			catch (IOException ex) {
				cancel();
				int index = this.storageFactory.getCurrentPartIndex();
				this.listener.onError("Parser error for part [" + index + "]", ex);
			}
			finally {
				DataBufferUtils.release(buffer);
			}
		}

		@Override
		protected void hookOnError(Throwable ex) {
			if (this.listener != null) {
				int index = this.storageFactory.getCurrentPartIndex();
				this.listener.onError("Failure while parsing part[" + index + "]", ex);
			}
		}

		@Override
		protected void hookOnComplete() {
			if (this.listener != null) {
				this.listener.onAllPartsFinished();
			}
		}

		@Override
		protected void hookFinally(SignalType type) {
			try {
				if (this.parser != null) {
					this.parser.close();
				}
			}
			catch (IOException ex) {
				// Deliberately ignored: closing the parser is best-effort cleanup.
			}
		}

		private int getContentLength(HttpHeaders headers) {
			// Until this is fixed https://github.com/synchronoss/nio-multipart/issues/10
			long length = headers.getContentLength();
			// Pass -1 when the length does not fit into an int.
			return (int) length == length ? (int) length : -1;
		}
	}


	/**
	 * {@code PartBodyStreamStorageFactory} that delegates to the default
	 * Synchronoss factory while tracking the current part index and size in
	 * order to enforce the limits configured on the enclosing reader.
	 */
	private class LimitedPartBodyStreamStorageFactory implements PartBodyStreamStorageFactory {

		private final PartBodyStreamStorageFactory storageFactory = (maxInMemorySize > 0 ?
				new DefaultPartBodyStreamStorageFactory(maxInMemorySize) :
				new DefaultPartBodyStreamStorageFactory());

		private int index = 1;

		private boolean isFilePart;

		private long partSize;

		public int getCurrentPartIndex() {
			return this.index;
		}

		/**
		 * Reset the per-part state for the part that is starting and delegate
		 * storage creation to the underlying factory.
		 * @throws DecodingException if {@code maxParts} is positive and the
		 * given index exceeds it
		 */
		@Override
		public StreamStorage newStreamStorageForPartBody(Map<String, List<String>> headers, int index) {
			this.index = index;
			this.isFilePart = (MultipartUtils.getFileName(headers) != null);
			this.partSize = 0;
			if (maxParts > 0 && index > maxParts) {
				throw new DecodingException("Too many parts: Part[" + index + "] but maxParts=" + maxParts);
			}
			return this.storageFactory.newStreamStorageForPartBody(headers, index);
		}

		/**
		 * Add the given number of bytes to the size of the current part and
		 * check the result against the configured limits.
		 * @throws DataBufferLimitException if a non-file part reaches the
		 * in-memory limit
		 * @throws DecodingException if a file part exceeds the disk usage limit
		 */
		public void increaseByteCount(long byteCount) {
			this.partSize += byteCount;
			if (maxInMemorySize > 0 && !this.isFilePart && this.partSize >= maxInMemorySize) {
				throw new DataBufferLimitException("Part[" + this.index + "] " +
						"exceeded the in-memory limit of " + maxInMemorySize + " bytes");
			}
			if (maxDiskUsagePerPart > 0 && this.isFilePart && this.partSize > maxDiskUsagePerPart) {
				throw new DecodingException("Part[" + this.index + "] " +
						"exceeded the disk usage limit of " + maxDiskUsagePerPart + " bytes");
			}
		}

		/**
		 * Advance to the next part index and clear the per-part state.
		 */
		public void partFinished() {
			this.index++;
			this.isFilePart = false;
			this.partSize = 0;
		}
	}


	/**
	 * Listen for parser output and adapt to {@code FluxSink<Part>}.
	 */
	private static class FluxSinkAdapterListener implements NioMultipartParserListener {

		private final FluxSink<Part> sink;

		private final MultipartContext context;

		private final LimitedPartBodyStreamStorageFactory storageFactory;

		// Non-zero once an error or completion signal has been sent to the sink.
		private final AtomicInteger terminated = new AtomicInteger(0);

		FluxSinkAdapterListener(
				FluxSink<Part> sink, MultipartContext context, LimitedPartBodyStreamStorageFactory factory) {

			this.sink = sink;
			this.context = context;
			this.storageFactory = factory;
		}

		@Override
		public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) {
			HttpHeaders httpHeaders = new HttpHeaders();
			httpHeaders.putAll(headers);
			this.storageFactory.partFinished();
			this.sink.next(createPart(storage, httpHeaders));
		}

		/**
		 * Create a file part if a filename is present, a form field part if
		 * the headers denote a form field, or a generic part otherwise.
		 */
		private Part createPart(StreamStorage storage, HttpHeaders httpHeaders) {
			String filename = MultipartUtils.getFileName(httpHeaders);
			if (filename != null) {
				return new SynchronossFilePart(httpHeaders, filename, storage);
			}
			else if (MultipartUtils.isFormField(httpHeaders, this.context)) {
				String value = MultipartUtils.readFormParameterValue(storage, httpHeaders);
				return new SynchronossFormFieldPart(httpHeaders, value);
			}
			else {
				return new SynchronossPart(httpHeaders, storage);
			}
		}

		@Override
		public void onError(String message, Throwable cause) {
			// Only the first terminal signal is propagated to the sink.
			if (this.terminated.getAndIncrement() == 0) {
				this.sink.error(new DecodingException(message, cause));
			}
		}

		@Override
		public void onAllPartsFinished() {
			// Only the first terminal signal is propagated to the sink.
			if (this.terminated.getAndIncrement() == 0) {
				this.sink.complete();
			}
		}

		@Override
		public void onNestedPartStarted(Map<String, List<String>> headersFromParentPart) {
		}

		@Override
		public void onNestedPartFinished() {
		}
	}


	/**
	 * Base class for parts, holding the part name and headers.
	 */
	private abstract static class AbstractSynchronossPart implements Part {

		private final String name;

		private final HttpHeaders headers;

		AbstractSynchronossPart(HttpHeaders headers) {
			Assert.notNull(headers, "HttpHeaders is required");
			this.name = MultipartUtils.getFieldName(headers);
			this.headers = headers;
		}

		@Override
		public String name() {
			return this.name;
		}

		@Override
		public HttpHeaders headers() {
			return this.headers;
		}

		@Override
		public String toString() {
			return "Part '" + this.name + "', headers=" + this.headers;
		}
	}


	/**
	 * Part whose content is read from a Synchronoss {@link StreamStorage}.
	 */
	private static class SynchronossPart extends AbstractSynchronossPart {

		private final StreamStorage storage;

		SynchronossPart(HttpHeaders headers, StreamStorage storage) {
			super(headers);
			Assert.notNull(storage, "StreamStorage is required");
			this.storage = storage;
		}

		@Override
		public Flux<DataBuffer> content() {
			return DataBufferUtils.readInputStream(getStorage()::getInputStream, bufferFactory, 4096);
		}

		protected StreamStorage getStorage() {
			return this.storage;
		}
	}


	/**
	 * {@link FilePart} backed by a Synchronoss {@link StreamStorage}.
	 */
	private static class SynchronossFilePart extends SynchronossPart implements FilePart {

		private static final OpenOption[] FILE_CHANNEL_OPTIONS =
				{StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE};

		private final String filename;

		SynchronossFilePart(HttpHeaders headers, String filename, StreamStorage storage) {
			super(headers, storage);
			this.filename = filename;
		}

		@Override
		public String filename() {
			return this.filename;
		}

		/**
		 * Copy the stored content to the given destination, creating the file
		 * or truncating an existing one.
		 * <p>Note that the copy is performed when this method is invoked, not
		 * when the returned {@code Mono} is subscribed to.
		 * @return an empty {@code Mono}, or an error {@code Mono} carrying the
		 * {@link IOException} raised while opening or copying
		 */
		@Override
		public Mono<Void> transferTo(Path dest) {
			ReadableByteChannel input = null;
			FileChannel output = null;
			try {
				input = Channels.newChannel(getStorage().getInputStream());
				output = FileChannel.open(dest, FILE_CHANNEL_OPTIONS);
				// The total size is only known upfront for a FileChannel; otherwise copy until no more bytes.
				long size = (input instanceof FileChannel ? ((FileChannel) input).size() : Long.MAX_VALUE);
				long totalWritten = 0;
				while (totalWritten < size) {
					long written = output.transferFrom(input, totalWritten, size - totalWritten);
					if (written <= 0) {
						break;
					}
					totalWritten += written;
				}
			}
			catch (IOException ex) {
				return Mono.error(ex);
			}
			finally {
				if (input != null) {
					try {
						input.close();
					}
					catch (IOException ignored) {
						// Best-effort close; the outcome of the transfer is reported above.
					}
				}
				if (output != null) {
					try {
						output.close();
					}
					catch (IOException ignored) {
						// Best-effort close; the outcome of the transfer is reported above.
					}
				}
			}
			return Mono.empty();
		}

		@Override
		public String toString() {
			return "Part '" + name() + "', filename='" + this.filename + "'";
		}
	}


	/**
	 * {@link FormFieldPart} holding the form field value as a String.
	 */
	private static class SynchronossFormFieldPart extends AbstractSynchronossPart implements FormFieldPart {

		private final String content;

		SynchronossFormFieldPart(HttpHeaders headers, String content) {
			super(headers);
			this.content = content;
		}

		@Override
		public String value() {
			return this.content;
		}

		@Override
		public Flux<DataBuffer> content() {
			byte[] bytes = this.content.getBytes(getCharset());
			return Flux.just(bufferFactory.wrap(bytes));
		}

		private Charset getCharset() {
			// Fall back to UTF-8 when the part headers do not specify an encoding.
			String name = MultipartUtils.getCharEncoding(headers());
			return (name != null ? Charset.forName(name) : StandardCharsets.UTF_8);
		}

		@Override
		public String toString() {
			return "Part '" + name() + "=" + this.content + "'";
		}
	}

}