From 9306ab6f6372cfeec25db72c22c2c0a520020c90 Mon Sep 17 00:00:00 2001 From: hsrong Date: Fri, 22 Dec 2017 23:39:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E9=9A=90=E5=BC=8F=E5=8F=82?= =?UTF-8?q?=E6=95=B0=E7=9A=84=E5=9B=9E=E4=BC=A0=EF=BC=88=E4=BB=8E=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=AB=AF=E4=BC=A0=E5=88=B0=E6=B6=88=E8=B4=B9=E7=AB=AF?= =?UTF-8?q?=EF=BC=89=EF=BC=8C#889=E3=80=81#895?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../protocol/dubbo/DecodeableRpcResult.java | 276 ++++++------ .../dubbo/rpc/protocol/dubbo/DubboCodec.java | 412 +++++++++--------- 2 files changed, 344 insertions(+), 344 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java index c85c0e5bcfe..ec690cb15b1 100644 --- a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java +++ b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DecodeableRpcResult.java @@ -1,138 +1,138 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.dubbo.rpc.protocol.dubbo; - -import com.alibaba.dubbo.common.logger.Logger; -import com.alibaba.dubbo.common.logger.LoggerFactory; -import com.alibaba.dubbo.common.serialize.ObjectInput; -import com.alibaba.dubbo.common.utils.Assert; -import com.alibaba.dubbo.common.utils.StringUtils; -import com.alibaba.dubbo.remoting.Channel; -import com.alibaba.dubbo.remoting.Codec; -import com.alibaba.dubbo.remoting.Decodeable; -import com.alibaba.dubbo.remoting.exchange.Response; -import com.alibaba.dubbo.remoting.transport.CodecSupport; -import com.alibaba.dubbo.rpc.Invocation; -import com.alibaba.dubbo.rpc.RpcResult; -import com.alibaba.dubbo.rpc.support.RpcUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.Type; -import java.util.HashMap; - -public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable { - - private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class); - - private Channel channel; - - private byte serializationType; - - private InputStream inputStream; - - private Response response; - - private Invocation invocation; - - private volatile boolean hasDecoded; - - public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) { - Assert.notNull(channel, "channel == null"); - Assert.notNull(response, "response == null"); - Assert.notNull(is, "inputStream == null"); - this.channel = channel; - this.response = response; - this.inputStream = is; - this.invocation = invocation; - this.serializationType = id; - } - - public void encode(Channel channel, OutputStream output, Object message) throws IOException { - throw new UnsupportedOperationException(); - } - - public Object decode(Channel channel, InputStream input) throws IOException { - ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) - .deserialize(channel.getUrl(), input); - - byte flag = in.readByte(); - switch (flag) { - case DubboCodec.RESPONSE_NULL_VALUE: - break; - case DubboCodec.RESPONSE_VALUE: - try { - Type[] returnType = RpcUtils.getReturnTypes(invocation); - setValue(returnType == null || returnType.length == 0 ? in.readObject() : - (returnType.length == 1 ? in.readObject((Class) returnType[0]) - : in.readObject((Class) returnType[0], returnType[1]))); - } catch (ClassNotFoundException e) { - throw new IOException(StringUtils.toString("Read response data failed.", e)); - } - break; - case DubboCodec.RESPONSE_WITH_EXCEPTION: - try { - Object obj = in.readObject(); - if (obj instanceof Throwable == false) - throw new IOException("Response data error, expect Throwable, but get " + obj); - setException((Throwable) obj); - } catch (ClassNotFoundException e) { - throw new IOException(StringUtils.toString("Read response data failed.", e)); - } - break; - default: - throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); - } - byte attachmentFlag = in.readByte(); - switch (attachmentFlag) { - case DubboCodec.RESPONSE_ATTACHMENTS_NO_EXIST: - break; - case DubboCodec.RESPONSE_ATTACHMENTS_EXIST: - try { - HashMap attachments = in.readObject(HashMap.class); - if(attachments != null - && attachments.size() > 0){ - this.getAttachments().putAll(attachments); - } - } catch (ClassNotFoundException e) { - throw new IOException(StringUtils.toString("Read response data failed.", e)); - } - break; - default: - throw new IOException("Unknown result attachments flag, expect '3' '4', get " + attachmentFlag); - } - return this; - } - - public void decode() throws Exception { - if (!hasDecoded && channel != null && inputStream != null) { - try { - decode(channel, inputStream); - } catch (Throwable e) { - if (log.isWarnEnabled()) { - log.warn("Decode rpc result failed: " + e.getMessage(), e); - } - response.setStatus(Response.CLIENT_ERROR); - response.setErrorMessage(StringUtils.toString(e)); - } finally { - hasDecoded = true; - } - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.dubbo.rpc.protocol.dubbo; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Type; +import java.util.HashMap; + +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.serialize.ObjectInput; +import com.alibaba.dubbo.common.utils.Assert; +import com.alibaba.dubbo.common.utils.StringUtils; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.Codec; +import com.alibaba.dubbo.remoting.Decodeable; +import com.alibaba.dubbo.remoting.exchange.Response; +import com.alibaba.dubbo.remoting.transport.CodecSupport; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.RpcResult; +import com.alibaba.dubbo.rpc.support.RpcUtils; + +public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable { + + private static final Logger log = LoggerFactory.getLogger(DecodeableRpcResult.class); + + private Channel channel; + + private byte serializationType; + + private InputStream inputStream; + + private Response response; + + private Invocation invocation; + + private volatile boolean hasDecoded; + + public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) { + Assert.notNull(channel, "channel == null"); + Assert.notNull(response, "response == null"); + Assert.notNull(is, "inputStream == null"); + this.channel = channel; + this.response = response; + this.inputStream = is; + this.invocation = invocation; + this.serializationType = id; + } + + public void encode(Channel channel, OutputStream output, Object message) throws IOException { + throw new UnsupportedOperationException(); + } + + public Object decode(Channel channel, InputStream input) throws IOException { + ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) + .deserialize(channel.getUrl(), input); + + byte flag = in.readByte(); + switch (flag) { + case DubboCodec.RESPONSE_NULL_VALUE: + break; + case DubboCodec.RESPONSE_VALUE: + try { + Type[] returnType = RpcUtils.getReturnTypes(invocation); + setValue(returnType == null || returnType.length == 0 ? in.readObject() : + (returnType.length == 1 ? in.readObject((Class) returnType[0]) + : in.readObject((Class) returnType[0], returnType[1]))); + } catch (ClassNotFoundException e) { + throw new IOException(StringUtils.toString("Read response data failed.", e)); + } + break; + case DubboCodec.RESPONSE_WITH_EXCEPTION: + try { + Object obj = in.readObject(); + if (obj instanceof Throwable == false) + throw new IOException("Response data error, expect Throwable, but get " + obj); + setException((Throwable) obj); + } catch (ClassNotFoundException e) { + throw new IOException(StringUtils.toString("Read response data failed.", e)); + } + break; + default: + throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag); + } + byte attachmentFlag = in.readByte(); + switch (attachmentFlag) { + case DubboCodec.RESPONSE_ATTACHMENTS_NO_EXIST: + break; + case DubboCodec.RESPONSE_ATTACHMENTS_EXIST: + try { + HashMap attachments = in.readObject(HashMap.class); + if(attachments != null + && attachments.size() > 0){ + this.getAttachments().putAll(attachments); + } + } catch (ClassNotFoundException e) { + throw new IOException(StringUtils.toString("Read response data failed.", e)); + } + break; + default: + throw new IOException("Unknown result attachments flag, expect '3' '4', get " + attachmentFlag); + } + return this; + } + + public void decode() throws Exception { + if (!hasDecoded && channel != null && inputStream != null) { + try { + decode(channel, inputStream); + } catch (Throwable e) { + if (log.isWarnEnabled()) { + log.warn("Decode rpc result failed: " + e.getMessage(), e); + } + response.setStatus(Response.CLIENT_ERROR); + response.setErrorMessage(StringUtils.toString(e)); + } finally { + hasDecoded = true; + } + } + } + +} diff --git a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java index 2350727d2c5..3bb2735962d 100644 --- a/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java +++ b/dubbo-rpc/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboCodec.java @@ -1,207 +1,207 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.dubbo.rpc.protocol.dubbo; - -import com.alibaba.dubbo.common.Constants; -import com.alibaba.dubbo.common.URL; -import com.alibaba.dubbo.common.Version; -import com.alibaba.dubbo.common.io.Bytes; -import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream; -import com.alibaba.dubbo.common.logger.Logger; -import com.alibaba.dubbo.common.logger.LoggerFactory; -import com.alibaba.dubbo.common.serialize.ObjectInput; -import com.alibaba.dubbo.common.serialize.ObjectOutput; -import com.alibaba.dubbo.common.serialize.Serialization; -import com.alibaba.dubbo.common.utils.ReflectUtils; -import com.alibaba.dubbo.common.utils.StringUtils; -import com.alibaba.dubbo.remoting.Channel; -import com.alibaba.dubbo.remoting.Codec2; -import com.alibaba.dubbo.remoting.exchange.Request; -import com.alibaba.dubbo.remoting.exchange.Response; -import com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec; -import com.alibaba.dubbo.remoting.transport.CodecSupport; -import com.alibaba.dubbo.rpc.Invocation; -import com.alibaba.dubbo.rpc.Result; -import com.alibaba.dubbo.rpc.RpcInvocation; - -import java.io.IOException; -import java.io.InputStream; - -import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument; - -/** - * Dubbo codec. - */ -public class DubboCodec extends ExchangeCodec implements Codec2 { - - public static final String NAME = "dubbo"; - public static final String DUBBO_VERSION = Version.getVersion(DubboCodec.class, Version.getVersion()); - public static final byte RESPONSE_WITH_EXCEPTION = 0; - public static final byte RESPONSE_VALUE = 1; - public static final byte RESPONSE_NULL_VALUE = 2; - public static final byte RESPONSE_ATTACHMENTS_NO_EXIST = 3; - public static final byte RESPONSE_ATTACHMENTS_EXIST = 4; - public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; - public static final Class[] EMPTY_CLASS_ARRAY = new Class[0]; - private static final Logger log = LoggerFactory.getLogger(DubboCodec.class); - - protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { - byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); - Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); - // get request id. - long id = Bytes.bytes2long(header, 4); - if ((flag & FLAG_REQUEST) == 0) { - // decode response. - Response res = new Response(id); - if ((flag & FLAG_EVENT) != 0) { - res.setEvent(Response.HEARTBEAT_EVENT); - } - // get status. - byte status = header[3]; - res.setStatus(status); - if (status == Response.OK) { - try { - Object data; - if (res.isHeartbeat()) { - data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); - } else if (res.isEvent()) { - data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); - } else { - DecodeableRpcResult result; - if (channel.getUrl().getParameter( - Constants.DECODE_IN_IO_THREAD_KEY, - Constants.DEFAULT_DECODE_IN_IO_THREAD)) { - result = new DecodeableRpcResult(channel, res, is, - (Invocation) getRequestData(id), proto); - result.decode(); - } else { - result = new DecodeableRpcResult(channel, res, - new UnsafeByteArrayInputStream(readMessageData(is)), - (Invocation) getRequestData(id), proto); - } - data = result; - } - res.setResult(data); - } catch (Throwable t) { - if (log.isWarnEnabled()) { - log.warn("Decode response failed: " + t.getMessage(), t); - } - res.setStatus(Response.CLIENT_ERROR); - res.setErrorMessage(StringUtils.toString(t)); - } - } else { - res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); - } - return res; - } else { - // decode request. - Request req = new Request(id); - req.setVersion("2.0.0"); - req.setTwoWay((flag & FLAG_TWOWAY) != 0); - if ((flag & FLAG_EVENT) != 0) { - req.setEvent(Request.HEARTBEAT_EVENT); - } - try { - Object data; - if (req.isHeartbeat()) { - data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); - } else if (req.isEvent()) { - data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); - } else { - DecodeableRpcInvocation inv; - if (channel.getUrl().getParameter( - Constants.DECODE_IN_IO_THREAD_KEY, - Constants.DEFAULT_DECODE_IN_IO_THREAD)) { - inv = new DecodeableRpcInvocation(channel, req, is, proto); - inv.decode(); - } else { - inv = new DecodeableRpcInvocation(channel, req, - new UnsafeByteArrayInputStream(readMessageData(is)), proto); - } - data = inv; - } - req.setData(data); - } catch (Throwable t) { - if (log.isWarnEnabled()) { - log.warn("Decode request failed: " + t.getMessage(), t); - } - // bad request - req.setBroken(true); - req.setData(t); - } - return req; - } - } - - private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) - throws IOException { - return serialization.deserialize(url, is); - } - - private byte[] readMessageData(InputStream is) throws IOException { - if (is.available() > 0) { - byte[] result = new byte[is.available()]; - is.read(result); - return result; - } - return new byte[]{}; - } - - @Override - protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { - RpcInvocation inv = (RpcInvocation) data; - - out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION)); - out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); - out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); - - out.writeUTF(inv.getMethodName()); - out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); - Object[] args = inv.getArguments(); - if (args != null) - for (int i = 0; i < args.length; i++) { - out.writeObject(encodeInvocationArgument(channel, inv, i)); - } - out.writeObject(inv.getAttachments()); - } - - @Override - protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException { - Result result = (Result) data; - - Throwable th = result.getException(); - if (th == null) { - Object ret = result.getValue(); - if (ret == null) { - out.writeByte(RESPONSE_NULL_VALUE); - } else { - out.writeByte(RESPONSE_VALUE); - out.writeObject(ret); - } - } else { - out.writeByte(RESPONSE_WITH_EXCEPTION); - out.writeObject(th); - } - if (result.getAttachments() != null - && result.getAttachments().size() > 0) { - out.writeByte(RESPONSE_ATTACHMENTS_EXIST); - out.writeObject(result.getAttachments()); - } else { - out.writeByte(RESPONSE_ATTACHMENTS_NO_EXIST); - } - } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.dubbo.rpc.protocol.dubbo; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.Version; +import com.alibaba.dubbo.common.io.Bytes; +import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream; +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.serialize.ObjectInput; +import com.alibaba.dubbo.common.serialize.ObjectOutput; +import com.alibaba.dubbo.common.serialize.Serialization; +import com.alibaba.dubbo.common.utils.ReflectUtils; +import com.alibaba.dubbo.common.utils.StringUtils; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.Codec2; +import com.alibaba.dubbo.remoting.exchange.Request; +import com.alibaba.dubbo.remoting.exchange.Response; +import com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec; +import com.alibaba.dubbo.remoting.transport.CodecSupport; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.Result; +import com.alibaba.dubbo.rpc.RpcInvocation; + +import java.io.IOException; +import java.io.InputStream; + +import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.encodeInvocationArgument; + +/** + * Dubbo codec. + */ +public class DubboCodec extends ExchangeCodec implements Codec2 { + + public static final String NAME = "dubbo"; + public static final String DUBBO_VERSION = Version.getVersion(DubboCodec.class, Version.getVersion()); + public static final byte RESPONSE_WITH_EXCEPTION = 0; + public static final byte RESPONSE_VALUE = 1; + public static final byte RESPONSE_NULL_VALUE = 2; + public static final byte RESPONSE_ATTACHMENTS_NO_EXIST = 3; + public static final byte RESPONSE_ATTACHMENTS_EXIST = 4; + public static final Object[] EMPTY_OBJECT_ARRAY = new Object[0]; + public static final Class[] EMPTY_CLASS_ARRAY = new Class[0]; + private static final Logger log = LoggerFactory.getLogger(DubboCodec.class); + + protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException { + byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); + Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); + // get request id. + long id = Bytes.bytes2long(header, 4); + if ((flag & FLAG_REQUEST) == 0) { + // decode response. + Response res = new Response(id); + if ((flag & FLAG_EVENT) != 0) { + res.setEvent(Response.HEARTBEAT_EVENT); + } + // get status. + byte status = header[3]; + res.setStatus(status); + if (status == Response.OK) { + try { + Object data; + if (res.isHeartbeat()) { + data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + } else if (res.isEvent()) { + data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + } else { + DecodeableRpcResult result; + if (channel.getUrl().getParameter( + Constants.DECODE_IN_IO_THREAD_KEY, + Constants.DEFAULT_DECODE_IN_IO_THREAD)) { + result = new DecodeableRpcResult(channel, res, is, + (Invocation) getRequestData(id), proto); + result.decode(); + } else { + result = new DecodeableRpcResult(channel, res, + new UnsafeByteArrayInputStream(readMessageData(is)), + (Invocation) getRequestData(id), proto); + } + data = result; + } + res.setResult(data); + } catch (Throwable t) { + if (log.isWarnEnabled()) { + log.warn("Decode response failed: " + t.getMessage(), t); + } + res.setStatus(Response.CLIENT_ERROR); + res.setErrorMessage(StringUtils.toString(t)); + } + } else { + res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); + } + return res; + } else { + // decode request. + Request req = new Request(id); + req.setVersion("2.0.0"); + req.setTwoWay((flag & FLAG_TWOWAY) != 0); + if ((flag & FLAG_EVENT) != 0) { + req.setEvent(Request.HEARTBEAT_EVENT); + } + try { + Object data; + if (req.isHeartbeat()) { + data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); + } else if (req.isEvent()) { + data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); + } else { + DecodeableRpcInvocation inv; + if (channel.getUrl().getParameter( + Constants.DECODE_IN_IO_THREAD_KEY, + Constants.DEFAULT_DECODE_IN_IO_THREAD)) { + inv = new DecodeableRpcInvocation(channel, req, is, proto); + inv.decode(); + } else { + inv = new DecodeableRpcInvocation(channel, req, + new UnsafeByteArrayInputStream(readMessageData(is)), proto); + } + data = inv; + } + req.setData(data); + } catch (Throwable t) { + if (log.isWarnEnabled()) { + log.warn("Decode request failed: " + t.getMessage(), t); + } + // bad request + req.setBroken(true); + req.setData(t); + } + return req; + } + } + + private ObjectInput deserialize(Serialization serialization, URL url, InputStream is) + throws IOException { + return serialization.deserialize(url, is); + } + + private byte[] readMessageData(InputStream is) throws IOException { + if (is.available() > 0) { + byte[] result = new byte[is.available()]; + is.read(result); + return result; + } + return new byte[]{}; + } + + @Override + protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { + RpcInvocation inv = (RpcInvocation) data; + + out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION)); + out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); + out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); + + out.writeUTF(inv.getMethodName()); + out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); + Object[] args = inv.getArguments(); + if (args != null) + for (int i = 0; i < args.length; i++) { + out.writeObject(encodeInvocationArgument(channel, inv, i)); + } + out.writeObject(inv.getAttachments()); + } + + @Override + protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException { + Result result = (Result) data; + + Throwable th = result.getException(); + if (th == null) { + Object ret = result.getValue(); + if (ret == null) { + out.writeByte(RESPONSE_NULL_VALUE); + } else { + out.writeByte(RESPONSE_VALUE); + out.writeObject(ret); + } + } else { + out.writeByte(RESPONSE_WITH_EXCEPTION); + out.writeObject(th); + } + if (result.getAttachments() != null + && result.getAttachments().size() > 0) { + out.writeByte(RESPONSE_ATTACHMENTS_EXIST); + out.writeObject(result.getAttachments()); + } else { + out.writeByte(RESPONSE_ATTACHMENTS_NO_EXIST); + } + } } \ No newline at end of file