diff --git a/lib/em-kafka/client.rb b/lib/em-kafka/client.rb index 6e3672e..7b6a6c7 100644 --- a/lib/em-kafka/client.rb +++ b/lib/em-kafka/client.rb @@ -1,17 +1,14 @@ module EventMachine module Kafka class Client - include EventMachine::Kafka::EventEmitter - include EM::Deferrable - def initialize(host, port) @host = host || 'localhost' @port = port || 9092 - @closing_connection = false @callback = nil end def send_data(data) + connect if @connection.nil? || @connection.disconnected? @connection.send_data(data) end @@ -20,58 +17,16 @@ def on_data(&block) end def connect - @connection = EM.connect(@host, @port, EM::Kafka::Connection, @host, @port) - - @connection.on(:closed) do - if @connected - @deferred_status = nil - @connected = false - unless @closing_connection - @reconnecting = true - reconnect - end - else - unless @closing_connection - EM.add_timer(1) { reconnect } - end - end - end - - @connection.on(:connected) do - @connected = true - succeed - - if @reconnecting - @reconnecting = false - emit(:reconnected) - end - end - + @connection = EM.connect(@host, @port, EM::Kafka::Connection) @connection.on(:message) do |message| - @callback.call(message) + @callback.call(message) if @callback end - - @connected = false - @reconnecting = false - - return self - end - - def connected? - @connected + @connection end def close_connection - @closing_connection = true @connection.close_connection_after_writing end - - private - - def reconnect - EventMachine::Kafka.logger.debug("Trying to reconnect to Kafka") - @connection.reconnect @host, @port - end end end end diff --git a/lib/em-kafka/connection.rb b/lib/em-kafka/connection.rb index c55c7df..632b632 100644 --- a/lib/em-kafka/connection.rb +++ b/lib/em-kafka/connection.rb @@ -2,9 +2,13 @@ module EventMachine::Kafka class Connection < EM::Connection include EventMachine::Kafka::EventEmitter - def initialize(host, port) + def initialize(*args) super - @host, @port = host, port + @disconnected = false + end + + def disconnected? + @disconnected end def connection_completed @@ -17,6 +21,7 @@ def receive_data(data) end def unbind + @disconnected = true EventMachine::Kafka.logger.info("Disconnected from Kafka") emit(:closed) end