001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.rsocket; 018 019import java.nio.ByteBuffer; 020 021import io.netty.buffer.ByteBuf; 022import io.netty.buffer.Unpooled; 023import io.rsocket.Payload; 024import io.rsocket.util.ByteBufPayload; 025import io.rsocket.util.DefaultPayload; 026 027import org.springframework.core.io.buffer.DataBuffer; 028import org.springframework.core.io.buffer.DataBufferFactory; 029import org.springframework.core.io.buffer.DefaultDataBuffer; 030import org.springframework.core.io.buffer.NettyDataBuffer; 031import org.springframework.core.io.buffer.NettyDataBufferFactory; 032 033/** 034 * Static utility methods to create {@link Payload} from {@link DataBuffer}s 035 * and vice versa. 036 * 037 * @author Rossen Stoyanchev 038 * @since 5.2 039 */ 040public abstract class PayloadUtils { 041 042 /** 043 * Use this method to slice, retain and wrap the data portion of the 044 * {@code Payload}, and also to release the {@code Payload}. This assumes 045 * the Payload metadata has been read by now and ensures downstream code 046 * need only be aware of {@code DataBuffer}s. 047 * @param payload the payload to process 048 * @param bufferFactory the DataBufferFactory to wrap with 049 * @return the created {@code DataBuffer} instance 050 */ 051 public static DataBuffer retainDataAndReleasePayload(Payload payload, DataBufferFactory bufferFactory) { 052 try { 053 if (bufferFactory instanceof NettyDataBufferFactory) { 054 ByteBuf byteBuf = payload.sliceData().retain(); 055 return ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf); 056 } 057 else { 058 return bufferFactory.wrap(payload.getData()); 059 } 060 } 061 finally { 062 if (payload.refCnt() > 0) { 063 payload.release(); 064 } 065 } 066 } 067 068 /** 069 * Create a Payload from the given metadata and data. 070 * <p>If at least one is {@link NettyDataBuffer} then {@link ByteBufPayload} 071 * is created with either obtaining the underlying native {@link ByteBuf} 072 * or using {@link Unpooled#wrappedBuffer(ByteBuffer...)} if necessary. 073 * Otherwise, if both are {@link DefaultDataBuffer}, then 074 * {@link DefaultPayload} is created. 075 * @param data the data part for the payload 076 * @param metadata the metadata part for the payload 077 * @return the created payload 078 */ 079 public static Payload createPayload(DataBuffer data, DataBuffer metadata) { 080 return data instanceof NettyDataBuffer || metadata instanceof NettyDataBuffer ? 081 ByteBufPayload.create(asByteBuf(data), asByteBuf(metadata)) : 082 DefaultPayload.create(asByteBuffer(data), asByteBuffer(metadata)); 083 } 084 085 /** 086 * Create a Payload with data only. The created payload is 087 * {@link ByteBufPayload} if the input is {@link NettyDataBuffer} or 088 * otherwise it is {@link DefaultPayload}. 089 * @param data the data part for the payload 090 * @return created payload 091 */ 092 public static Payload createPayload(DataBuffer data) { 093 return data instanceof NettyDataBuffer ? 094 ByteBufPayload.create(asByteBuf(data)) : DefaultPayload.create(asByteBuffer(data)); 095 } 096 097 098 static ByteBuf asByteBuf(DataBuffer buffer) { 099 return buffer instanceof NettyDataBuffer ? 100 ((NettyDataBuffer) buffer).getNativeBuffer() : Unpooled.wrappedBuffer(buffer.asByteBuffer()); 101 } 102 103 private static ByteBuffer asByteBuffer(DataBuffer buffer) { 104 return buffer instanceof DefaultDataBuffer ? 105 ((DefaultDataBuffer) buffer).getNativeBuffer() : buffer.asByteBuffer(); 106 } 107 108}