From d8cccbe4e69e5ff358b0f75321fc7589e693dff9 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Fri, 25 Mar 2022 11:36:42 +0200 Subject: [PATCH] adds routing example with TaggingMetadata and CompositeMetadata (#1021) --- .../routing/CompositeMetadataExample.java | 102 ++++++++++++++++++ .../routing/RoutingMetadataExample.java | 83 ++++++++++++++ 2 files changed, 185 insertions(+) create mode 100644 rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/CompositeMetadataExample.java create mode 100644 rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/RoutingMetadataExample.java diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/CompositeMetadataExample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/CompositeMetadataExample.java new file mode 100644 index 000000000..a0a02a946 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/CompositeMetadataExample.java @@ -0,0 +1,102 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.examples.transport.tcp.metadata.routing; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.metadata.CompositeMetadata; +import io.rsocket.metadata.CompositeMetadataCodec; +import io.rsocket.metadata.RoutingMetadata; +import io.rsocket.metadata.TaggingMetadataCodec; +import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.ByteBufPayload; +import java.util.Collections; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +public class CompositeMetadataExample { + static final Logger logger = LoggerFactory.getLogger(CompositeMetadataExample.class); + + public static void main(String[] args) { + RSocketServer.create( + SocketAcceptor.forRequestResponse( + payload -> { + final String route = decodeRoute(payload.sliceMetadata()); + + logger.info("Received RequestResponse[route={}]", route); + + payload.release(); + + if ("my.test.route".equals(route)) { + return Mono.just(ByteBufPayload.create("Hello From My Test Route")); + } + + return Mono.error(new IllegalArgumentException("Route " + route + " not found")); + })) + .bindNow(TcpServerTransport.create("localhost", 7000)); + + RSocket socket = + RSocketConnector.create() + // here we specify that every metadata payload will be encoded using + // CompositeMetadata layout as specified in the following subspec + // https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md + .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()) + .connect(TcpClientTransport.create("localhost", 7000)) + .block(); + + final ByteBuf routeMetadata = + TaggingMetadataCodec.createTaggingContent( + ByteBufAllocator.DEFAULT, Collections.singletonList("my.test.route")); + final CompositeByteBuf compositeMetadata = ByteBufAllocator.DEFAULT.compositeBuffer(); + + CompositeMetadataCodec.encodeAndAddMetadata( + compositeMetadata, + ByteBufAllocator.DEFAULT, + WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, + routeMetadata); + + socket + .requestResponse( + ByteBufPayload.create( + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"), compositeMetadata)) + .log() + .block(); + } + + static String decodeRoute(ByteBuf metadata) { + final CompositeMetadata compositeMetadata = new CompositeMetadata(metadata, false); + + for (CompositeMetadata.Entry metadatum : compositeMetadata) { + if (Objects.requireNonNull(metadatum.getMimeType()) + .equals(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())) { + return new RoutingMetadata(metadatum.getContent()).iterator().next(); + } + } + + return null; + } +} diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/RoutingMetadataExample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/RoutingMetadataExample.java new file mode 100644 index 000000000..2aee18bf9 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/metadata/routing/RoutingMetadataExample.java @@ -0,0 +1,83 @@ +/* + * Copyright 2015-Present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.examples.transport.tcp.metadata.routing; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.metadata.RoutingMetadata; +import io.rsocket.metadata.TaggingMetadataCodec; +import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.ByteBufPayload; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +public class RoutingMetadataExample { + static final Logger logger = LoggerFactory.getLogger(RoutingMetadataExample.class); + + public static void main(String[] args) { + RSocketServer.create( + SocketAcceptor.forRequestResponse( + payload -> { + final String route = decodeRoute(payload.sliceMetadata()); + + logger.info("Received RequestResponse[route={}]", route); + + payload.release(); + + if ("my.test.route".equals(route)) { + return Mono.just(ByteBufPayload.create("Hello From My Test Route")); + } + + return Mono.error(new IllegalArgumentException("Route " + route + " not found")); + })) + .bindNow(TcpServerTransport.create("localhost", 7000)); + + RSocket socket = + RSocketConnector.create() + // here we specify that route will be encoded using + // Routing&Tagging Metadata layout specified at this + // subspec https://github.com/rsocket/rsocket/blob/master/Extensions/Routing.md + .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString()) + .connect(TcpClientTransport.create("localhost", 7000)) + .block(); + + final ByteBuf routeMetadata = + TaggingMetadataCodec.createTaggingContent( + ByteBufAllocator.DEFAULT, Collections.singletonList("my.test.route")); + socket + .requestResponse( + ByteBufPayload.create( + ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "HelloWorld"), routeMetadata)) + .log() + .block(); + } + + static String decodeRoute(ByteBuf metadata) { + final RoutingMetadata routingMetadata = new RoutingMetadata(metadata); + + return routingMetadata.iterator().next(); + } +}