From 92b28a36c0a2eb00d000998cda50b92dadc30900 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Sun, 22 Sep 2024 13:39:13 -0500 Subject: [PATCH] Add mongodb as alternate source name for documentdb source (#4969) Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../mongo/documentdb/DocumentDBSource.java | 2 +- .../transforms/rules/mongodb-rule.yaml | 4 + .../templates/mongodb-template.yaml | 81 +++++++++++++++++++ 3 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml create mode 100644 data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/mongodb-template.yaml diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java index b6ff1fbdf1..6a8e82a8a9 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/documentdb/DocumentDBSource.java @@ -23,7 +23,7 @@ import java.util.function.Function; -@DataPrepperPlugin(name = "documentdb", pluginType = Source.class, pluginConfigurationType = MongoDBSourceConfig.class) +@DataPrepperPlugin(name = "documentdb", alternateNames = "mongodb", pluginType = Source.class, pluginConfigurationType = MongoDBSourceConfig.class) public class DocumentDBSource implements Source>, UsesEnhancedSourceCoordination { private static final Logger LOG = LoggerFactory.getLogger(DocumentDBSource.class); diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml new file mode 100644 index 0000000000..33cc703072 --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/rules/mongodb-rule.yaml @@ -0,0 +1,4 @@ +plugin_name: "mongodb" +apply_when: + - "$..source.mongodb" + - "$..source.mongodb.s3_bucket" \ No newline at end of file diff --git a/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/mongodb-template.yaml b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/mongodb-template.yaml new file mode 100644 index 0000000000..88208e631f --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/resources/org/opensearch/dataprepper/transforms/templates/mongodb-template.yaml @@ -0,0 +1,81 @@ +"<>": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: + mongodb: "<<$.<>.source.mongodb>>" + routes: + - initial_load: 'getMetadata("ingestion_type") == "EXPORT"' + - stream_load: 'getMetadata("ingestion_type") == "STREAM"' + sink: + - s3: + routes: + - initial_load + aws: + region: "<<$.<>.source.mongodb.s3_region>>" + sts_role_arn: "<<$.<>.source.mongodb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.mongodb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.mongodb.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.mongodb.s3_bucket>>" + threshold: + event_collect_timeout: "120s" + maximum_size: "2mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<>.source.mongodb.aws.sts_role_arn>>" + - s3: + routes: + - stream_load + aws: + region: "<<$.<>.source.mongodb.s3_region>>" + sts_role_arn: "<<$.<>.source.mongodb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.mongodb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.mongodb.aws.sts_header_overrides>>" + bucket: "<<$.<>.source.mongodb.s3_bucket>>" + threshold: + event_collect_timeout: "15s" + maximum_size: "1mb" + aggregate_threshold: + maximum_size: "128mb" + flush_capacity_ratio: 0 + object_key: + path_prefix: "${getMetadata(\"s3_partition_key\")}" + codec: + event_json: + default_bucket_owner: "<>.source.mongodb.aws.sts_role_arn>>" +"<>-s3": + workers: "<<$.<>.workers>>" + delay: "<<$.<>.delay>>" + buffer: "<<$.<>.buffer>>" + source: + s3: + codec: + event_json: + compression: "none" + aws: + region: "<<$.<>.source.mongodb.s3_region>>" + sts_role_arn: "<<$.<>.source.mongodb.aws.sts_role_arn>>" + sts_external_id: "<<$.<>.source.mongodb.aws.sts_external_id>>" + sts_header_overrides: "<<$.<>.source.mongodb.aws.sts_header_overrides>>" + acknowledgments: true + delete_s3_objects_on_read: true + disable_s3_metadata_in_event: true + scan: + folder_partitions: + depth: "<>.source.mongodb.s3_prefix>>" + max_objects_per_ownership: 50 + buckets: + - bucket: + name: "<<$.<>.source.mongodb.s3_bucket>>" + filter: + include_prefix: ["<>.source.mongodb.s3_prefix>>"] + scheduling: + interval: "60s" + processor: "<<$.<>.processor>>" + sink: "<<$.<>.sink>>" + routes: "<<$.<>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel. \ No newline at end of file