missing the first doc record. #60

Status: Open. Wants to merge 2 commits into base: master.

lib/logstash/inputs/mongodb.rb (28 changes: 23 additions & 5 deletions)
@@ -112,7 +112,7 @@ def get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
if x[:place].nil? || x[:place] == 0
first_entry_id = init_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
@logger.debug("FIRST ENTRY ID for #{mongo_collection_name} is #{first_entry_id}")
-return first_entry_id
+return [first_entry_id, first_entry_id]
Owner: Maybe this should return the placeholder or nil, and we can wrap it with a get_processing_start_id() or something that returns us the placeholder or the beginning of the db.
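A rough sketch of what that wrapper might look like. The name get_processing_start_id follows the comment above; the fallback query for "the beginning of the db" is an assumption, and none of this is code from the PR. It assumes get_placeholder is changed to return the stored placeholder or nil.

```ruby
# Hypothetical helper sketched from the comment above; not part of this PR.
def get_processing_start_id(sqlitedb, since_table, mongodb, mongo_collection_name)
  placeholder = get_placeholder(sqlitedb, since_table, mongodb, mongo_collection_name)
  return placeholder unless placeholder.nil?

  # No placeholder stored yet: fall back to the beginning of the db,
  # i.e. the oldest document by _id.
  collection = mongodb.collection(mongo_collection_name)
  first_doc = collection.find({}).sort(:_id => 1).limit(1).first
  first_doc.nil? ? nil : first_doc['_id']
end
```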

else
@logger.debug("placeholder already exists, it is #{x[:place]}")
return x[:place][:place]
@@ -144,10 +144,15 @@ def get_collection_names(mongodb, collection)
end

public
-def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
+def get_cursor_for_collection(mongodb, mongo_collection_name, first_entry_id_object, last_id_object, batch_size)
Owner: We should probably rename these variables as they're a bit confusing.

Maybe...
first_entry_id_object => processing_start_entry_id_object
last_id_object => last_unprocessed_entry_id_object (or maybe first_unprocessed_entry_id_object)

... or something like that.
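Purely for illustration, the method from this PR with those suggested renames applied might read as below (a sketch only, not a change proposed here; the logging line is omitted):

```ruby
def get_cursor_for_collection(mongodb, mongo_collection_name,
                              processing_start_entry_id_object,
                              last_unprocessed_entry_id_object, batch_size)
  collection = mongodb.collection(mongo_collection_name)
  # Start and last ids coincide: emit just that first document so it is not skipped.
  if processing_start_entry_id_object == last_unprocessed_entry_id_object
    return collection.find({:_id => {:$eq => processing_start_entry_id_object}}).limit(1)
  end
  # Otherwise continue from ids greater than the recorded one.
  collection.find({:_id => {:$gt => last_unprocessed_entry_id_object}}).limit(batch_size)
end
```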

collection = mongodb.collection(mongo_collection_name)
# Need to make this sort by date in object id then get the first of the series
# db.events_20150320.find().limit(1).sort({ts:1})
+if first_entry_id_object == last_id_object
+@logger.info("get_cursor_for_collection first entry:#{first_entry_id_object}")
Owner: This should probably be debug-level logging.

+return collection.find({:_id => {:$eq => first_entry_id_object}}).limit(1)
+end

return collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size)
end

@@ -157,9 +162,9 @@ def update_watched_collections(mongodb, collection, sqlitedb)
collection_data = {}
collections.each do |my_collection|
init_placeholder_table(sqlitedb)
-last_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection)
+last_id, first_entry_id = get_placeholder(sqlitedb, since_table, mongodb, my_collection)
if !collection_data[my_collection]
-collection_data[my_collection] = { :name => my_collection, :last_id => last_id }
+collection_data[my_collection] = { :name => my_collection, :last_id => last_id, :first_entry_id => first_entry_id}
end
end
return collection_data
@@ -240,6 +245,18 @@ def run(queue)
# get batch of events starting at the last_place if it is set


+first_entry_id = @collection_data[index][:first_entry_id]
+first_entry_id_object = first_entry_id
+if since_type == 'id'
+if !first_entry_id.nil?
+first_entry_id_object = BSON::ObjectId(first_entry_id)
+end
+elsif since_type == 'time'
+if first_entry_id != '' && !first_entry_id.nil?
+first_entry_id_object = Time.at(first_entry_id)
+end
+end

last_id_object = last_id
if since_type == 'id'
last_id_object = BSON::ObjectId(last_id)
@@ -248,7 +265,8 @@
last_id_object = Time.at(last_id)
end
end
-cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)

+cursor = get_cursor_for_collection(@mongodb, collection_name, first_entry_id_object, last_id_object, batch_size)
Owner: I'd rather only pass get_cursor_for_collection the starting point and batch_size, and use a separate method (as mentioned above) to work out what that should be.
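A sketch of that shape, assuming a helper along the lines of the get_processing_start_id() idea above works out the starting point. Whether the bound should be $gte or $gt would depend on whether the start id has already been emitted; none of this is code from the PR.

```ruby
# Hypothetical refactor sketch: the cursor method only builds the query.
def get_cursor_for_collection(mongodb, mongo_collection_name, start_id_object, batch_size)
  collection = mongodb.collection(mongo_collection_name)
  collection.find({:_id => {:$gte => start_id_object}}).limit(batch_size)
end

# Caller side (sketch): a separate method decides where processing should start.
# start_id_object = get_processing_start_id(sqlitedb, since_table, @mongodb, collection_name)
# cursor = get_cursor_for_collection(@mongodb, collection_name, start_id_object, batch_size)
```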


Comment: Hi, when are you going to merge this branch? I really need it, thank you.

cursor.each do |doc|
logdate = DateTime.parse(doc['_id'].generation_time.to_s)
event = LogStash::Event.new("host" => @host)