feat: Output record count metric from batch files insert #267

Merged
6 changes: 4 additions & 2 deletions target_snowflake/connector.py
@@ -553,7 +553,8 @@ def merge_from_stage(
                 key_properties=key_properties,
             )
             self.logger.debug("Merging with SQL: %s", merge_statement)
-            conn.execute(merge_statement, **kwargs)
+            result = conn.execute(merge_statement, **kwargs)
+            return result.rowcount
 
     def copy_from_stage(
         self,
@@ -578,7 +579,8 @@ def copy_from_stage(
                 file_format=file_format,
             )
             self.logger.debug("Copying with SQL: %s", copy_statement)
-            conn.execute(copy_statement, **kwargs)
+            result = conn.execute(copy_statement, **kwargs)
+            return result.rowcount
 
     def drop_file_format(self, file_format: str) -> None:
         """Drop a file format in the schema.
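Both merge_from_stage and copy_from_stage now return the rowcount of the executed statement. A minimal sketch of the SQLAlchemy behaviour this relies on, using SQLite purely so the example is runnable (the target itself executes Snowflake MERGE and COPY statements; their rowcount semantics are assumed here, not verified):

import sqlalchemy

engine = sqlalchemy.create_engine("sqlite://")

with engine.connect() as conn:
    conn.execute(sqlalchemy.text("CREATE TABLE t (id INTEGER)"))
    result = conn.execute(
        sqlalchemy.text("INSERT INTO t (id) VALUES (1), (2), (3)")
    )
    # CursorResult.rowcount reports how many rows the DML statement affected.
    print(result.rowcount)  # 3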
7 changes: 5 additions & 2 deletions target_snowflake/sinks.py
@@ -190,7 +190,7 @@ def insert_batch_files_via_internal_stage(
 
             if self.key_properties:
                 # merge into destination table
-                self.connector.merge_from_stage(
+                record_count = self.connector.merge_from_stage(
                     full_table_name=full_table_name,
                     schema=self.schema,
                     sync_id=sync_id,
@@ -199,13 +199,16 @@
                 )
 
             else:
-                self.connector.copy_from_stage(
+                record_count = self.connector.copy_from_stage(
                     full_table_name=full_table_name,
                     schema=self.schema,
                     sync_id=sync_id,
                     file_format=file_format,
                 )
 
+            with self.record_counter_metric as counter:
+                counter.increment(record_count)
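The added block uses the SDK's record counter, which (as I understand it) logs a Singer record_count metric point when the context manager exits. A simplified stand-in to show the shape of that pattern; ToyCounter and the stream tag are made up for illustration and are not the singer_sdk implementation:

import json


class ToyCounter:
    """A toy counter metric, not the singer_sdk implementation."""

    def __init__(self, metric: str, tags: dict | None = None):
        self.metric = metric
        self.tags = tags or {}
        self.value = 0

    def increment(self, value: int = 1) -> None:
        self.value += value

    def __enter__(self) -> "ToyCounter":
        return self

    def __exit__(self, *exc) -> None:
        # Emit a Singer-style METRIC log line when the block exits.
        point = {"type": "counter", "metric": self.metric, "value": self.value, "tags": self.tags}
        print(f"INFO METRIC: {json.dumps(point)}")


# Usage mirroring the change above: add the merged/copied row count in one go.
with ToyCounter("record_count", {"stream": "users"}) as counter:
    counter.increment(42)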
Member

Wouldn't this add records to the count twice? Once when the record is processed in https://github.com/meltano/sdk/blob/409a40b48c442e1382611d3a69b2f95df2e073d3/singer_sdk/target_base.py#L362 and again when they're batched in

self.insert_batch_files_via_internal_stage(
    full_table_name=full_table_name,
    files=files,
)

?

Contributor Author
@ReubenFrankel Oct 3, 2024

Hmm, I guess so. The intention of this change was to work with BATCH messages, but you're referring to when a user supplies batch_config on the target side specifically to batch together RECORD data into files for insert via internal stage?

Just to check my understanding, if batch_config is supplied on the tap side, it emits a BATCH message and then doesn't hit this issue?

Member

Maybe the full context of the function I linked is useful:

def bulk_insert_records(
    self,
    full_table_name: str,
    schema: dict,
    records: t.Iterable[dict[str, t.Any]],
) -> int | None:
    """Bulk insert records to an existing destination table.

    The default implementation uses a generic SQLAlchemy bulk insert operation.
    This method may optionally be overridden by developers in order to provide
    faster, native bulk uploads.

    Args:
        full_table_name: the target table name.
        schema: the JSON schema for the new table, to be used when inferring column
            names.
        records: the input records.

    Returns:
        True if table exists, False if not, None if unsure or undetectable.
    """
    # prepare records for serialization
    processed_records = (
        conform_record_data_types(
            stream_name=self.stream_name,
            record=rcd,
            schema=schema,
            level="RECURSIVE",
            logger=self.logger,
        )
        for rcd in records
    )
    # serialize to batch files and upload
    # TODO: support other batchers
    batcher = JSONLinesBatcher(
        tap_name=self.target.name,
        stream_name=self.stream_name,
        batch_config=self.batch_config,
    )
    batches = batcher.get_batches(records=processed_records)
    for files in batches:
        self.insert_batch_files_via_internal_stage(
            full_table_name=full_table_name,
            files=files,
        )
    # if records list, we can quickly return record count.
    return len(records) if isinstance(records, list) else None

RECORD messages are processed by this logic, even with the default batch_config.
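To make the double-counting concern concrete, here is a toy sketch (not SDK code; ToySink and its method names are made up) where records are counted once as they are processed and again when their batch files are inserted:

from dataclasses import dataclass, field


@dataclass
class ToySink:
    """Illustration only: counts records at process time and again at batch-insert time."""

    counted: int = 0
    buffered: list = field(default_factory=list)

    def process_record(self, record: dict) -> None:
        self.counted += 1  # count when the RECORD message is processed
        self.buffered.append(record)

    def insert_batch_files(self) -> None:
        self.counted += len(self.buffered)  # proposed count on batch insert
        self.buffered.clear()


sink = ToySink()
for r in ({"id": 1}, {"id": 2}, {"id": 3}):
    sink.process_record(r)
sink.insert_batch_files()
print(sink.counted)  # 6 -- each of the 3 records ends up counted twice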

Contributor Author
@ReubenFrankel Oct 3, 2024

I see - I wasn't aware there was a default batch_config here. So RECORD and BATCH messages both end up going through insert_batch_files_via_internal_stage, where I've made this change, except that RECORD messages are already counted by the SDK before they are batched, so they would now be counted twice.

Setting this issue aside, how do you feel in principle about emitting a record count when this target receives a BATCH message? Is that OK, or does it conflate two concepts?


         finally:
             self.logger.debug("Cleaning up after batch processing")
             self.connector.drop_file_format(file_format=file_format)