From ec3754bc4a88c9377db9b30bce759fe31bd8231e Mon Sep 17 00:00:00 2001 From: inetufo Date: Mon, 27 Feb 2017 19:30:07 +0800 Subject: [PATCH 1/2] missing the first doc record bug missing the first doc record bug --- lib/logstash/inputs/mongodb.rb | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/mongodb.rb b/lib/logstash/inputs/mongodb.rb index 615ec88..42c9927 100644 --- a/lib/logstash/inputs/mongodb.rb +++ b/lib/logstash/inputs/mongodb.rb @@ -112,7 +112,7 @@ def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) if x[:place].nil? || x[:place] == 0 first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name) @logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}") - return first_entry_id + return [first_entry_id, first_entry_id] else @logger.debug("placeholder already exists, it is #{x[:place]}") return x[:place][:place] @@ -144,10 +144,15 @@ def get_collection_names(mongodb, collection) end public - def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size) + def get_cursor_for_collection(mongodb, mongo_collection_name, first_entry_id_object, last_id_object, batch_size) collection = mongodb.collection(mongo_collection_name) # Need to make this sort by date in object id then get the first of the series # db.events_20150320.find().limit(1).sort({ts:1}) + if first_entry_id_object == last_id_object + @logger.info("get_cursor_for_collection first entry:#{first_entry_id_object}") + return collection.find({:_id => {:$eq => first_entry_id_object}}).limit(1) + end + return collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size) end @@ -157,7 +162,7 @@ def update_watched_collections(mongodb, collection, sqlitedb) collection_data = {} collections.each do |my_collection| init_placeholder_table(sqlitedb) - last_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection) + last_id, first_entry_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection) if !collection_data[my_collection] collection_data[my_collection] = { :name => my_collection, :last_id => last_id } end @@ -240,6 +245,18 @@ def run(queue) # get batch of events starting at the last_place if it is set + first_entry_id = @collection_data[index][:first_entry_id] + first_entry_id_object = first_entry_id + if since_type == 'id' + if !first_entry_id.nil? + first_entry_id_object = BSON::ObjectId(first_entry_id) + end + elsif since_type == 'time' + if first_entry_id != '' && !first_entry_id.nil? + first_entry_id_object = Time.at(first_entry_id) + end + end + last_id_object = last_id if since_type == 'id' last_id_object = BSON::ObjectId(last_id) @@ -248,7 +265,8 @@ def run(queue) last_id_object = Time.at(last_id) end end - cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size) + + cursor = get_cursor_for_collection(@mongodb, collection_name, first_entry_id_object, last_id_object, batch_size) cursor.each do |doc| logdate = DateTime.parse(doc['_id'].generation_time.to_s) event = LogStash::Event.new("host" => @host) From ee25f27decdb1c9e1b34cd9b442cbabd3cea1f19 Mon Sep 17 00:00:00 2001 From: inetufo Date: Mon, 27 Feb 2017 19:35:17 +0800 Subject: [PATCH 2/2] missing the first doc record missing the first doc record --- lib/logstash/inputs/mongodb.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/inputs/mongodb.rb b/lib/logstash/inputs/mongodb.rb index 42c9927..7a12d23 100644 --- a/lib/logstash/inputs/mongodb.rb +++ b/lib/logstash/inputs/mongodb.rb @@ -164,7 +164,7 @@ def update_watched_collections(mongodb, collection, sqlitedb) init_placeholder_table(sqlitedb) last_id, first_entry_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection) if !collection_data[my_collection] - collection_data[my_collection] = { :name => my_collection, :last_id => last_id } + collection_data[my_collection] = { :name => my_collection, :last_id => last_id, :first_entry_id => first_entry_id} end end return collection_data