From 0d69604ae319610b9fde1b3a77fd8130f70b4ec2 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Mon, 16 Jun 2014 12:44:12 -0700
Subject: [PATCH] FLUME-1729. Better Flume-Spark integration.

Use readFully instead of read in EventTransformer.
---
 .../org/apache/spark/streaming/flume/EventTransformer.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
index 75b224afca39b..069a9a215675c 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
@@ -33,7 +33,7 @@ object EventTransformer extends Logging {
       Array[Byte]) = {
     val bodyLength = in.readInt()
     val bodyBuff = new Array[Byte](bodyLength)
-    in.read(bodyBuff)
+    in.readFully(bodyBuff)
 
     val numHeaders = in.readInt()
     val headers = new java.util.HashMap[CharSequence, CharSequence]
@@ -41,12 +41,12 @@
     for (i <- 0 until numHeaders) {
       val keyLength = in.readInt()
       val keyBuff = new Array[Byte](keyLength)
-      in.read(keyBuff)
+      in.readFully(keyBuff)
       val key: String = Utils.deserialize(keyBuff)
 
       val valLength = in.readInt()
       val valBuff = new Array[Byte](valLength)
-      in.read(valBuff)
+      in.readFully(valBuff)
       val value: String = Utils.deserialize(valBuff)
       headers.put(key, value)
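
For context, not part of the patch: a minimal, self-contained Scala sketch of why the change matters. InputStream.read(byte[]) may return after filling only part of the buffer, while DataInput.readFully keeps reading until the buffer is full or throws EOFException. ChunkedStream below is a hypothetical stream used only to force partial reads; it is not taken from the Flume or Spark code.

import java.io.{ByteArrayInputStream, DataInputStream, InputStream}

// Hypothetical stream that returns at most 4 bytes per read() call, mimicking a
// socket-like source where a single read() may deliver only part of the request.
class ChunkedStream(data: Array[Byte]) extends InputStream {
  private val inner = new ByteArrayInputStream(data)
  override def read(): Int = inner.read()
  override def read(b: Array[Byte], off: Int, len: Int): Int =
    inner.read(b, off, math.min(len, 4))
}

object ReadFullyDemo {
  def main(args: Array[String]): Unit = {
    val payload = Array.tabulate[Byte](16)(_.toByte)

    // read() stops after the first chunk; the rest of the buffer stays
    // zero-filled, so a later deserialization step would see corrupt data.
    val partial = new Array[Byte](16)
    val n = new DataInputStream(new ChunkedStream(payload)).read(partial)
    println(s"read() returned $n of ${partial.length} bytes")      // 4 of 16

    // readFully() keeps reading until the buffer is full, or throws
    // EOFException if the stream ends early, which is the behavior the patch relies on.
    val full = new Array[Byte](16)
    new DataInputStream(new ChunkedStream(payload)).readFully(full)
    println(s"readFully() filled all ${full.length} bytes")
  }
}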