Skip to content

Commit

Permalink
feat: Output record count metric from batch files insert (#267)
Browse files Browse the repository at this point in the history
Bit of an opinionated change since it might be conflating
batches/records, but it has been helpful for us to know how many rows
(records) were created/updated from batch processing.
  • Loading branch information
ReubenFrankel authored Oct 4, 2024
1 parent 7c9a1fb commit 46df975
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
6 changes: 4 additions & 2 deletions target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,8 @@ def merge_from_stage(
key_properties=key_properties,
)
self.logger.debug("Merging with SQL: %s", merge_statement)
conn.execute(merge_statement, **kwargs)
result = conn.execute(merge_statement, **kwargs)
return result.rowcount

def copy_from_stage(
self,
Expand All @@ -578,7 +579,8 @@ def copy_from_stage(
file_format=file_format,
)
self.logger.debug("Copying with SQL: %s", copy_statement)
conn.execute(copy_statement, **kwargs)
result = conn.execute(copy_statement, **kwargs)
return result.rowcount

def drop_file_format(self, file_format: str) -> None:
"""Drop a file format in the schema.
Expand Down
13 changes: 9 additions & 4 deletions target_snowflake/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def insert_batch_files_via_internal_stage(
self,
full_table_name: str,
files: t.Sequence[str],
) -> None:
) -> int:
"""Process a batch file with the given batch context.
Args:
Expand All @@ -190,7 +190,7 @@ def insert_batch_files_via_internal_stage(

if self.key_properties:
# merge into destination table
self.connector.merge_from_stage(
record_count = self.connector.merge_from_stage(
full_table_name=full_table_name,
schema=self.schema,
sync_id=sync_id,
Expand All @@ -199,7 +199,7 @@ def insert_batch_files_via_internal_stage(
)

else:
self.connector.copy_from_stage(
record_count = self.connector.copy_from_stage(
full_table_name=full_table_name,
schema=self.schema,
sync_id=sync_id,
Expand All @@ -217,6 +217,8 @@ def insert_batch_files_via_internal_stage(
if os.path.exists(file_path): # noqa: PTH110
os.remove(file_path) # noqa: PTH107

return record_count

def process_batch_files(
self,
encoding: BaseBatchFileEncoding,
Expand All @@ -232,7 +234,7 @@ def process_batch_files(
NotImplementedError: If the batch file encoding is not supported.
"""
if encoding.format == BatchFileFormat.JSONL:
self.insert_batch_files_via_internal_stage(
record_count = self.insert_batch_files_via_internal_stage(
full_table_name=self.full_table_name,
files=files,
)
Expand All @@ -242,6 +244,9 @@ def process_batch_files(
msg,
)

with self.record_counter_metric as counter:
counter.increment(record_count)

# TODO: remove after https://github.com/meltano/sdk/issues/1819 is fixed
def _singer_validate_message(self, record: dict) -> None:
"""Ensure record conforms to Singer Spec.
Expand Down

0 comments on commit 46df975

Please sign in to comment.