From bd3f870bb53bdafd546f884b64a3a57807e965b9 Mon Sep 17 00:00:00 2001 From: Martin Zink Date: Mon, 28 Mar 2022 11:40:57 +0200 Subject: [PATCH] MINIFICPP-1743 Added PutGCSObject processor Closes #1268 Signed-off-by: Marton Szasz --- .github/workflows/ci.yml | 8 +- CMakeLists.txt | 5 + CONTROLLERS.md | 25 +- LICENSE | 89 +++++ NOTICE | 3 + PROCESSORS.md | 59 ++++ README.md | 59 ++-- bootstrap.sh | 2 + bstrp_functions.sh | 6 +- cmake/Abseil.cmake | 27 ++ cmake/DockerConfig.cmake | 1 + cmake/GoogleCloudCpp.cmake | 50 +++ cmake/Nlohmann.cmake | 24 ++ docker/Dockerfile | 3 +- .../MiNiFi_integration_test_driver.py | 6 + .../features/google_cloud_storage.feature | 19 ++ .../GCPCredentialsControllerService.py | 17 + .../minifi/core/DockerTestCluster.py | 10 + .../minifi/core/FakeGcsServerContainer.py | 27 ++ .../minifi/core/SingleNodeDockerCluster.py | 3 + .../minifi/processors/PutGCSObject.py | 14 + .../test-bucket/test-file | 1 + docker/test/integration/steps/steps.py | 27 ++ extensions/gcp/CMakeLists.txt | 30 ++ extensions/gcp/GCPAttributes.h | 79 +++++ .../GCPCredentialsControllerService.cpp | 114 +++++++ .../GCPCredentialsControllerService.h | 73 ++++ extensions/gcp/processors/PutGCSObject.cpp | 300 +++++++++++++++++ extensions/gcp/processors/PutGCSObject.h | 92 +++++ extensions/gcp/tests/CMakeLists.txt | 47 +++ .../GCPCredentialsControllerServiceTests.cpp | 145 ++++++++ extensions/gcp/tests/PutGCSObjectTests.cpp | 316 ++++++++++++++++++ run_flake8.sh | 2 +- .../nlohmann_lib_as_interface.patch | 13 + .../remove-find_package.patch | 11 + win_build_vs.bat | 3 +- 36 files changed, 1669 insertions(+), 41 deletions(-) create mode 100644 cmake/Abseil.cmake create mode 100644 cmake/GoogleCloudCpp.cmake create mode 100644 cmake/Nlohmann.cmake create mode 100644 docker/test/integration/features/google_cloud_storage.feature create mode 100644 docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py create mode 100644 
docker/test/integration/minifi/core/FakeGcsServerContainer.py create mode 100644 docker/test/integration/minifi/processors/PutGCSObject.py create mode 100644 docker/test/integration/resources/fake-gcs-server-data/test-bucket/test-file create mode 100644 extensions/gcp/CMakeLists.txt create mode 100644 extensions/gcp/GCPAttributes.h create mode 100644 extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp create mode 100644 extensions/gcp/controllerservices/GCPCredentialsControllerService.h create mode 100644 extensions/gcp/processors/PutGCSObject.cpp create mode 100644 extensions/gcp/processors/PutGCSObject.h create mode 100644 extensions/gcp/tests/CMakeLists.txt create mode 100644 extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp create mode 100644 extensions/gcp/tests/PutGCSObjectTests.cpp create mode 100644 thirdparty/google-cloud-cpp/nlohmann_lib_as_interface.patch create mode 100644 thirdparty/google-cloud-cpp/remove-find_package.patch diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1593dae48..575ad1af09 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: export LDFLAGS="-L/usr/local/opt/flex/lib" export CPPFLAGS="-I/usr/local/opt/flex/include" # CPPFLAGS are not recognized by cmake, so we have to force them to CFLAGS and CXXFLAGS to have flex 2.6 working - ./bootstrap.sh -e -t && cd build && cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="${CPPFLAGS} ${CFLAGS}" -DCMAKE_CXX_FLAGS="${CPPFLAGS} ${CXXFLAGS}" -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_SQL=ON -DUSE_REAL_ODBC_TEST_DRIVER=ON -DENABLE_AZURE=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . 
--parallel 4 + ./bootstrap.sh -e -t && cd build && cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="${CPPFLAGS} ${CFLAGS}" -DCMAKE_CXX_FLAGS="${CPPFLAGS} ${CXXFLAGS}" -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_SQL=ON -DUSE_REAL_ODBC_TEST_DRIVER=ON -DENABLE_AZURE=ON -DENABLE_GCP=ON -DCMAKE_VERBOSE_MAKEFILE=ON -DCMAKE_RULE_MESSAGES=OFF -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON .. && cmake --build . --parallel 4 - name: test run: cd build && make test ARGS="--timeout 300 -j4 --output-on-failure" - name: linter @@ -81,7 +81,7 @@ jobs: run: | PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.19041.0\x64 PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn - win_build_vs.bat ..\b /64 /CI /S /A /PDH /SPLUNK /K /L /R /Z /N /RO + win_build_vs.bat ..\b /64 /CI /S /A /PDH /SPLUNK /GCP /K /L /R /Z /N /RO shell: cmd - name: test run: cd ..\b && ctest --timeout 300 --parallel 8 -C Release --output-on-failure @@ -119,7 +119,7 @@ jobs: cmake -DUSE_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DENABLE_AWS=ON -DENABLE_AZURE=ON -DENABLE_BUSTACHE=ON -DENABLE_COAP=ON \ -DENABLE_ENCRYPT_CONFIG=ON -DENABLE_GPS=ON -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_LINTER=ON -DENABLE_MQTT=ON -DENABLE_NANOFI=ON -DENABLE_OPC=ON -DENABLE_OPENCV=ON \ -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON -DENABLE_PYTHON=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON -DENABLE_SYSTEMD=ON -DENABLE_TENSORFLOW=OFF \ - -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON .. + -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON .. 
make -j$(nproc) VERBOSE=1 - name: test run: cd build && make test ARGS="--timeout 300 -j2 --output-on-failure" @@ -204,7 +204,7 @@ jobs: if [ -d ~/.ccache ]; then mv ~/.ccache .; fi mkdir build cd build - cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache .. + cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_GCP=ON -DENABLE_OPC=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache .. make docker - id: install_deps run: | diff --git a/CMakeLists.txt b/CMakeLists.txt index 5284a66c8a..99ebfd10e5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -618,6 +618,11 @@ if (NOT SKIP_TESTS OR ENABLE_TEST_PROCESSORS) createExtension(TEST-PROCESSORS-EXTENSION "TEST-PROCESSORS EXTENSION" "This enables processors purely for testing purposes" "extensions/test-processors") endif() +## Google Cloud Platform Extension +if (ENABLE_ALL OR ENABLE_GCP) + createExtension(GCP-EXTENSIONS "GCP EXTENSIONS" "This enables Google Cloud Platform support" "extensions/gcp" "extensions/gcp/tests") +endif() + ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN add_subdirectory(main) diff --git a/CONTROLLERS.md b/CONTROLLERS.md index 3f864eba18..84aa57eead 100644 --- a/CONTROLLERS.md +++ b/CONTROLLERS.md @@ -17,11 +17,11 @@ ## Table of Contents -- [AzureStorageCredentialsService](#azureStorageCredentialsService) -- [AWSCredentialsService](#awsCredentialsService) +- 
[AWSCredentialsService](#AWSCredentialsService) +- [AzureStorageCredentialsService](#AzureStorageCredentialsService) +- [GCPCredentialsControllerService](#GCPCredentialsControllerService) - [KubernetesControllerService](#kubernetesControllerService) - ## AWSCredentialsService ### Description @@ -64,6 +64,25 @@ properties (not in bold) are considered optional. |Connection String|||Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.| |**Use Managed Identity Credentials**|false||Connection string used to connect to Azure Storage service. This overrides all other set credential properties.| +## GCPCredentialsControllerService + +### Description + +Manages the credentials for Google Cloud Platform. This allows for multiple Google Cloud Platform related processors to reference this single +controller service so that Google Cloud Platform credentials can be managed and controlled in a central location. + +### Properties + +In the list below, the names of required properties appear in bold. Any other +properties (not in bold) are considered optional. + + +| Name | Default Value | Allowable Values | Description | +|---------------------------|----------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------| +| **Credentials Location** | Google Application Default Credentials | Google Application Default Credentials
Use Compute Engine Credentials
Service Account JSON File
Service Account JSON
Use Anonymous credentials | The location of the credentials. | +| Service Account JSON File | | | Path to a file containing a Service Account key file in JSON format. | +| Service Account JSON | | | The raw JSON containing a Service Account keyfile. | + ## KubernetesControllerService diff --git a/LICENSE b/LICENSE index 2d547f0f69..8c3a9a21d2 100644 --- a/LICENSE +++ b/LICENSE @@ -208,6 +208,8 @@ This product bundles 'Simple-Windows-Posix-Semaphore' which is available under a This project bundles 'mbedTLS' which is available under an ALv2 license This project bundles 'RocksDB' which is available under an ALv2 license This project bundles 'AWS SDK for C++' which is available under an ALv2 license +This project bundles 'C++ Client Libraries for Google Cloud Services' which is available under an ALv2 license +This project bundles 'Abseil Common Libraries (C++)' which is available under an ALv2 license The Apache NiFi - MiNiFi C++ project contains subcomponents with separate copyright notices and license terms. Your use of the source code for the these @@ -244,6 +246,69 @@ This product bundles 'cpplint.py' which is available under a 3-Clause BSD Licen (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +This product bundles 'crc32c' which is available under a 3-Clause BSD License. + + Copyright 2017, The CRC32C Authors. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + + * Neither the name of Google Inc. 
nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +This product bundles 'GoogleTest - Google Testing and Mocking Framework' which is available under a 3-Clause BSD License. + + Copyright 2008, Google Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + * Neither the name of Google Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. 
+ + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + This product bundles 'spdlog' which is available under an MIT license. Copyright (c) 2016 Alexander Dalshov. @@ -441,6 +506,30 @@ COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +This product bundles 'nlohmann/json' which is available under The MIT License. + + MIT License + + Copyright (c) 2013-2022 Niels Lohmann + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. 
+ + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. + This product bundles 'pybind11' which is available under a BSD-style license. Copyright (c) 2016 Wenzel Jakob , All rights reserved. diff --git a/NOTICE b/NOTICE index d3d76449f4..ce26b31538 100644 --- a/NOTICE +++ b/NOTICE @@ -63,6 +63,9 @@ This software includes third party software subject to the following copyrights: - libyaml - Copyright (c) 2006-2016 Kirill Simonov, Copyright (c) 2017-2020 Ingy döt Net - libwebsockets - Copyright (C) 2010 - 2020 Andy Green - kubernetes-client/c - Brendan Burns, Hui Yu and other contributors +- nlohmann json - Copyright (c) 2013-2022 Niels Lohmann +- abseil-cpp - Google Inc. +- crc32c - Google Inc., Fangming Fang, Vadim Skipin, Rodrigo Tobar, Harry Mallon The licenses for these third party components are included in LICENSE.txt diff --git a/PROCESSORS.md b/PROCESSORS.md index 54caa873ad..4290a8b254 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -50,6 +50,7 @@ - [PublishMQTT](#publishmqtt) - [PutAzureBlobStorage](#putazureblobstorage) - [PutAzureDataLakeStorage](#putazuredatalakestorage) +- [PutGCSObject](#putgcsobject) - [PutFile](#putfile) - [PutOPCProcessor](#putopcprocessor) - [PutS3Object](#puts3object) @@ -1477,6 +1478,64 @@ In the list below, the names of required properties appear in bold. 
Any other pr |success|Files that have been successfully written to Azure storage are transferred to this relationship| +## PutGCSObject + +### Description + +Puts content into a Google Cloud Storage bucket +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|--------------------------------------|---------------|-----------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Bucket** | ${gcs.bucket} | | Bucket of the object.
**Supports Expression Language: true** | +| **Key** | ${filename} | | Name of the object.
**Supports Expression Language: true** | +| **Number Of retries** | 6 | integers | How many retry attempts should be made before routing to the failure relationship. | +| **GCP Credentials Provider Service** | | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService) | The Controller Service used to obtain Google Cloud Platform credentials. | +| Object ACL | | authenticatedRead
bucketOwnerFullControl
bucketOwnerRead
private
projectPrivate
publicRead | Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults. For more information please visit [Google Cloud Access control lists](https://cloud.google.com/storage/docs/access-control/lists#predefined-acl) | +| Server Side Encryption Key | | | An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.
**Supports Expression Language: true** | +| CRC32 Checksum | | | The name of the attribute where the crc32 checksum is stored for server-side validation.
**Supports Expression Language: true** | +| MD5 Hash | | | The name of the attribute where the md5 hash is stored for server-side validation.
**Supports Expression Language: true** | +| Content Type | ${mime.type} | | Content Type for the file, i.e. text/plain
**Supports Expression Language: true** | +| Endpoint Override URL | | | Overrides the default Google Cloud Storage endpoints | + +### Relationships + +| Name | Description | +|---------|----------------------------------------------------------------------------------------| +| success | FlowFiles that are sent successfully to the destination are sent to this relationship. | +| failure | FlowFiles that failed to be sent to the destination are sent to this relationship. | + +### Output Attributes + +| Attribute | Relationship | Description | +|----------------------------|--------------|--------------------------------------------------------------------| +| _gcs.error.reason_ | failure | The description of the error that occurred during upload. | +| _gcs.error.domain_ | failure | The domain of the error that occurred during upload. | +| _gcs.bucket_ | success | Bucket of the object. | +| _gcs.key_ | success | Name of the object. | +| _gcs.size_ | success | Size of the object. | +| _gcs.crc32c_ | success | The CRC32C checksum of object's data, encoded in base64 | +| _gcs.md5_ | success | The MD5 hash of the object's data encoded in base64. | +| _gcs.owner.entity_ | success | The owner entity, in the form "user-emailAddress". | +| _gcs.owner.entity.id_ | success | The ID for the entity. | +| _gcs.content.encoding_ | success | The content encoding of the object. | +| _gcs.content.language_ | success | The content language of the object. | +| _gcs.content.disposition_ | success | The data content disposition of the object. | +| _gcs.media.link_ | success | The media download link to the object. | +| _gcs.self.link_ | success | The link to this object. | +| _gcs.etag_ | success | The HTTP 1.1 Entity tag for the object. | +| _gcs.generated.id_ | success | The service-generated ID for the object | +| _gcs.generation_ | success | The content generation of this object. Used for object versioning. | +| _gcs.metageneration_ | success | The metageneration of the object. 
| +| _gcs.create.time_ | success | The creation time of the object (milliseconds) | +| _gcs.update.time_ | success | The last modification time of the object (milliseconds) | +| _gcs.delete.time_ | success | The deletion time of the object (milliseconds) | +| _gcs.encryption.algorithm_ | success | The algorithm used to encrypt the object. | +| _gcs.encryption.sha256_ | success | The SHA256 hash of the key used to encrypt the object | + + ## PutFile ### Description diff --git a/README.md b/README.md index 74de4a0533..f4b92c018a 100644 --- a/README.md +++ b/README.md @@ -63,41 +63,42 @@ MiNiFi - C++ supports the following C++ processors: The following table lists the base set of processors. -| Extension Set | Processors | -| ------------- |:-------------| -| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[ListenSyslog](PROCESSORS.md#listensyslog)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) +| Extension Set | Processors | +|---------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[ListenSyslog](PROCESSORS.md#listensyslog)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) | The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line. Through JNI extensions you can run NiFi processors using NARs. The JNI extension set allows you to run these Java processors. MiNiFi C++ will favor C++ implementations over Java implements. In the case where a processor is implemented in either language, the one in C++ will be selected; however, will remain transparent to the consumer. -| Extension Set | Processors and Controller Services | CMAKE Flag | -| ------------- |:-------------| :-----| -| Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)
[CompressContent](PROCESSORS.md#compresscontent)
[ManipulateArchive](PROCESSORS.md#manipulatearchive)
[MergeContent](PROCESSORS.md#mergecontent)
[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)
[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON | -| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)
[PutS3Object](PROCESSORS.md#puts3object)
[DeleteS3Object](PROCESSORS.md#deletes3object)
[FetchS3Object](PROCESSORS.md#fetchs3object)
[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON | -| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)
[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)
[DeleteAzureBlobStorage](#deleteazureblobstorage)
[FetchAzureBlobStorage](#fetchazureblobstorage)
[PutAzureDataLakeStorage](#putazuredatalakestorage)
[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)
[FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
[ListAzureDataLakeStorage](#listazuredatalakestorage) | -DENABLE_AZURE=ON | -| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON | -| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON | -| GPS | GetGPS | -DENABLE_GPS=ON | -| Kafka | [PublishKafka](PROCESSORS.md#publishkafka) | -DENABLE_LIBRDKAFKA=ON | -| Kubernetes | [KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService) | -DENABLE_KUBERNETES=ON | -| JNI | **NiFi Processors** | -DENABLE_JNI=ON | -| MQTT | [ConsumeMQTT](PROCESSORS.md#consumeMQTT)
[PublishMQTT](PROCESSORS.md#publishMQTT) | -DENABLE_MQTT=ON | -| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor) | -DENABLE_OPC=ON | -| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#captureRTSPFrame) | -DENABLE_OPENCV=ON | -| OpenWSMAN | SourceInitiatedSubscriptionListener | -DENABLE_OPENWSMAN=ON | -| PCAP | [CapturePacket](PROCESSORS.md#capturepacket) | -DENABLE_PCAP=ON | -| PDH (Windows only) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON | -| Scripting | [ExecuteScript](PROCESSORS.md#executescript)
**Custom Python Processors** | -DENABLE_SCRIPTING=ON | -| Sensors | GetEnvironmentalSensors
GetMovementSensors | -DENABLE_SENSORS=ON | -| SFTP | [FetchSFTP](PROCESSORS.md#fetchsftp)
[ListSFTP](PROCESSORS.md#listsftp)
[PutSFTP](PROCESSORS.md#putsftp) | -DENABLE_SFTP=ON | -| SQL | [ExecuteSQL](PROCESSORS.md#executesql)
[PutSQL](PROCESSORS.md#putsql)
[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)
| -DENABLE_SQL=ON | -| Splunk | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)
[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus)| -DENABLE_SPLUNK=ON | -| Systemd | [ConsumeJournald](PROCESSORS.md#consumejournald) | -DENABLE_SYSTEMD=ON | -| Tensorflow | TFApplyGraph
TFConvertImageToTensor
TFExtractTopLabels
| -DENABLE_TENSORFLOW=ON | -| USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera) | -DENABLE_USB_CAMERA=ON | -| Windows Event Log (Windows only) | CollectorInitiatedSubscription
ConsumeWindowsEventLog
TailEventLog | -DENABLE_WEL=ON | +| Extension Set | Processors and Controller Services | CMAKE Flag | +|----------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------------| +| Archive Extensions | [ApplyTemplate](PROCESSORS.md#applytemplate)
[CompressContent](PROCESSORS.md#compresscontent)
[ManipulateArchive](PROCESSORS.md#manipulatearchive)
[MergeContent](PROCESSORS.md#mergecontent)
[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)
[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry) | -DBUILD_LIBARCHIVE=ON | +| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)
[PutS3Object](PROCESSORS.md#puts3object)
[DeleteS3Object](PROCESSORS.md#deletes3object)
[FetchS3Object](PROCESSORS.md#fetchs3object)
[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON | +| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)
[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)
[DeleteAzureBlobStorage](PROCESSORS.md#deleteazureblobstorage)
[FetchAzureBlobStorage](PROCESSORS.md#fetchazureblobstorage)
[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)
[DeleteAzureDataLakeStorage](PROCESSORS.md#deleteazuredatalakestorage)
[FetchAzureDataLakeStorage](PROCESSORS.md#fetchazuredatalakestorage)
[ListAzureDataLakeStorage](PROCESSORS.md#listazuredatalakestorage) | -DENABLE_AZURE=ON | +| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON | +| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON | +| GPS | GetGPS | -DENABLE_GPS=ON | +| Google Cloud Platform | [GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService)
[PutGCSObject](PROCESSORS.md#putgcsobject) | -DENABLE_GCP=ON | +| Kafka | [PublishKafka](PROCESSORS.md#publishkafka) | -DENABLE_LIBRDKAFKA=ON | +| Kubernetes | [KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService) | -DENABLE_KUBERNETES=ON | +| JNI | **NiFi Processors** | -DENABLE_JNI=ON | +| MQTT | [ConsumeMQTT](PROCESSORS.md#consumeMQTT)
[PublishMQTT](PROCESSORS.md#publishMQTT) | -DENABLE_MQTT=ON | +| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor) | -DENABLE_OPC=ON | +| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#captureRTSPFrame) | -DENABLE_OPENCV=ON | +| OpenWSMAN | SourceInitiatedSubscriptionListener | -DENABLE_OPENWSMAN=ON | +| PCAP | [CapturePacket](PROCESSORS.md#capturepacket) | -DENABLE_PCAP=ON | +| PDH (Windows only) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON | +| Scripting | [ExecuteScript](PROCESSORS.md#executescript)
**Custom Python Processors** | -DENABLE_SCRIPTING=ON | +| Sensors | GetEnvironmentalSensors
GetMovementSensors | -DENABLE_SENSORS=ON | +| SFTP | [FetchSFTP](PROCESSORS.md#fetchsftp)
[ListSFTP](PROCESSORS.md#listsftp)
[PutSFTP](PROCESSORS.md#putsftp) | -DENABLE_SFTP=ON | +| SQL | [ExecuteSQL](PROCESSORS.md#executesql)
[PutSQL](PROCESSORS.md#putsql)
[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)
| -DENABLE_SQL=ON | +| Splunk | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)
[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus) | -DENABLE_SPLUNK=ON | +| Systemd | [ConsumeJournald](PROCESSORS.md#consumejournald) | -DENABLE_SYSTEMD=ON | +| Tensorflow | TFApplyGraph
TFConvertImageToTensor
TFExtractTopLabels
| -DENABLE_TENSORFLOW=ON | +| USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera) | -DENABLE_USB_CAMERA=ON | +| Windows Event Log (Windows only) | CollectorInitiatedSubscription
ConsumeWindowsEventLog
TailEventLog | -DENABLE_WEL=ON | Please see our [Python guide](extensions/script/README.md) on how to write Python processors and use them within MiNiFi C++. diff --git a/bootstrap.sh b/bootstrap.sh index 6494ae3af7..4977b36ee9 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -336,6 +336,8 @@ set_dependency PYTHON_ENABLED NANOFI_ENABLED add_disabled_option SPLUNK_ENABLED ${FALSE} "ENABLE_SPLUNK" +add_disabled_option GCP_ENABLED ${FALSE} "ENABLE_GCP" + USE_SHARED_LIBS=${TRUE} ASAN_ENABLED=${FALSE} FAIL_ON_WARNINGS=${FALSE} diff --git a/bstrp_functions.sh b/bstrp_functions.sh index 93e4c7d1e2..501ddc39c5 100755 --- a/bstrp_functions.sh +++ b/bstrp_functions.sh @@ -393,6 +393,7 @@ show_supported_features() { echo "Z. NanoFi Support ..............$(print_feature_status NANOFI_ENABLED)" echo "AA. Splunk Support .............$(print_feature_status SPLUNK_ENABLED)" echo "AB. Kubernetes Support .........$(print_feature_status KUBERNETES_ENABLED)" + echo "AC. Google Cloud Support .......$(print_feature_status GCP_ENABLED)" echo "****************************************" echo " Build Options." 
echo "****************************************" @@ -415,7 +416,7 @@ show_supported_features() { read_feature_options(){ local choice - echo -n "Enter choice [A-Z or AA-AB or 1-7] " + echo -n "Enter choice [A-Z or AA-AC or 1-7] " read -r choice choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]') case $choice in @@ -449,6 +450,7 @@ read_feature_options(){ z) ToggleFeature NANOFI_ENABLED ;; aa) ToggleFeature SPLUNK_ENABLED ;; ab) ToggleFeature KUBERNETES_ENABLED ;; + ac) ToggleFeature GCP_ENABLED ;; 1) ToggleFeature TESTS_ENABLED ;; 2) EnableAllFeatures ;; 3) ToggleFeature JNI_ENABLED;; @@ -467,7 +469,7 @@ read_feature_options(){ fi ;; q) exit 0;; - *) echo -e "${RED}Please enter an option A-Z or AA-AB or 1-7...${NO_COLOR}" && sleep 2 + *) echo -e "${RED}Please enter an option A-Z or AA-AC or 1-7...${NO_COLOR}" && sleep 2 esac } diff --git a/cmake/Abseil.cmake b/cmake/Abseil.cmake new file mode 100644 index 0000000000..5df8063bac --- /dev/null +++ b/cmake/Abseil.cmake @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +include(FetchContent) +set(ABSL_PROPAGATE_CXX_STD ON CACHE INTERNAL absl-propagate-cxx) +set(ABSL_ENABLE_INSTALL ON CACHE INTERNAL "") +FetchContent_Declare( + absl + URL https://github.com/abseil/abseil-cpp/archive/refs/tags/20211102.0.tar.gz + URL_HASH SHA256=dcf71b9cba8dc0ca9940c4b316a0c796be8fab42b070bb6b7cab62b48f0e66c4 +) +FetchContent_MakeAvailable(absl) diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake index d49bccfa24..884c4e4c69 100644 --- a/cmake/DockerConfig.cmake +++ b/cmake/DockerConfig.cmake @@ -45,6 +45,7 @@ add_custom_target( -c ENABLE_ENCRYPT_CONFIG=${ENABLE_ENCRYPT_CONFIG} -c ENABLE_NANOFI=${ENABLE_NANOFI} -c ENABLE_SPLUNK=${ENABLE_SPLUNK} + -c ENABLE_GCP=${ENABLE_GCP} -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING} -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING} -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES} diff --git a/cmake/GoogleCloudCpp.cmake b/cmake/GoogleCloudCpp.cmake new file mode 100644 index 0000000000..9312647801 --- /dev/null +++ b/cmake/GoogleCloudCpp.cmake @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +include(FetchContent) +include(Nlohmann) +include(Abseil) + +set(GOOGLE_CLOUD_CPP_NLOHMANN_JSON_HEADER ${NLOHMANN_JSON_INCLUDE_DIR}) +set(CRC32C_USE_GLOG OFF CACHE INTERNAL crc32c-glog-off) +set(CRC32C_BUILD_TESTS OFF CACHE INTERNAL crc32c-gtest-off) +set(CRC32C_BUILD_BENCHMARKS OFF CACHE INTERNAL crc32-benchmarks-off) +set(CRC32C_INSTALL ON CACHE INTERNAL crc32-install-on) +FetchContent_Declare( + crc32c + URL https://github.com/google/crc32c/archive/refs/tags/1.1.1.tar.gz + URL_HASH SHA256=a6533f45b1670b5d59b38a514d82b09c6fb70cc1050467220216335e873074e8 +) +FetchContent_MakeAvailable(crc32c) +add_library(Crc32c::crc32c ALIAS crc32c) + +set(PATCH_FILE_1 "${CMAKE_SOURCE_DIR}/thirdparty/google-cloud-cpp/remove-find_package.patch") +set(PATCH_FILE_2 "${CMAKE_SOURCE_DIR}/thirdparty/google-cloud-cpp/nlohmann_lib_as_interface.patch") +set(PC ${Bash_EXECUTABLE} -c "set -x &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\") &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_2}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_2}\\\")") + +set(GOOGLE_CLOUD_CPP_ENABLE storage CACHE INTERNAL storage-api) +set(GOOGLE_CLOUD_CPP_ENABLE_MACOS_OPENSSL_CHECK OFF CACHE INTERNAL macos-openssl-check) +set(BUILD_TESTING OFF CACHE INTERNAL testing-off) +FetchContent_Declare(google-cloud-cpp + URL https://github.com/googleapis/google-cloud-cpp/archive/refs/tags/v1.37.0.tar.gz + URL_HASH SHA256=a7269b21d5e95bebff7833ebb602bcd5bcc79e82a59449cc5d5b350ff2f50bbc + PATCH_COMMAND "${PC}") +add_compile_definitions(_SILENCE_CXX20_REL_OPS_DEPRECATION_WARNING _SILENCE_CXX17_CODECVT_HEADER_DEPRECATION_WARNING CURL_STATICLIB) +FetchContent_MakeAvailable(google-cloud-cpp) diff --git a/cmake/Nlohmann.cmake b/cmake/Nlohmann.cmake new file mode 100644 index 0000000000..9ad44c56cf --- /dev/null +++ b/cmake/Nlohmann.cmake @@ -0,0 +1,24 @@ +# +# Licensed to the 
Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +set(NLOHMANN_JSON_INCLUDE_DIR "${CMAKE_BINARY_DIR}/_deps/nlohmann/" CACHE STRING "" FORCE) +if(NOT EXISTS "${NLOHMANN_JSON_INCLUDE_DIR}/nlohmann/json.hpp") + file(DOWNLOAD "https://github.com/nlohmann/json/releases/download/v3.10.5/json.hpp" "${NLOHMANN_JSON_INCLUDE_DIR}/nlohmann/json.hpp" + EXPECTED_HASH SHA256=e832d339d9e0c042e7dff807754769d778cf5d6ae9730ce21eed56de99cb5e86) +endif() diff --git a/docker/Dockerfile b/docker/Dockerfile index 6eea15d236..4e35c6374d 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -51,6 +51,7 @@ ARG ENABLE_AZURE=OFF ARG ENABLE_ENCRYPT_CONFIG=ON ARG ENABLE_NANOFI=OFF ARG ENABLE_SPLUNK=OFF +ARG ENABLE_GCP=OFF ARG ENABLE_TEST_PROCESSORS=OFF ARG DISABLE_CURL=OFF ARG DISABLE_JEMALLOC=ON @@ -123,7 +124,7 @@ RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL="${ENABLE_ALL}" -DENABL -DENABLE_TEST_PROCESSORS="${ENABLE_TEST_PROCESSORS}" -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \ -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \ -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" 
-DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \ - -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" \ + -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" \ -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \ make -j "$(nproc)" package && \ tar -xzvf "${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}.tar.gz" -C "${MINIFI_BASE_DIR}" diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py index 0f2d4b0803..138e4fe48a 100644 --- a/docker/test/integration/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/MiNiFi_integration_test_driver.py @@ -219,6 +219,12 @@ def check_splunk_event(self, splunk_container_name, query): def check_splunk_event_with_attributes(self, splunk_container_name, query, attributes): assert self.cluster.check_splunk_event_with_attributes(splunk_container_name, query, attributes) + def check_google_cloud_storage(self, gcs_container_name, content): + assert self.cluster.check_google_cloud_storage(gcs_container_name, content) + + def check_empty_gcs_bucket(self, gcs_container_name): + assert self.cluster.is_gcs_bucket_empty(gcs_container_name) + def check_minifi_log_contents(self, line, timeout_seconds=60, count=1): self.check_container_log_contents("minifi-cpp", line, timeout_seconds, count) diff --git a/docker/test/integration/features/google_cloud_storage.feature b/docker/test/integration/features/google_cloud_storage.feature new file mode 100644 index 0000000000..18af9b1402 --- /dev/null +++ b/docker/test/integration/features/google_cloud_storage.feature @@ -0,0 +1,19 @@ +Feature: Sending data to Google Cloud Storage using PutGCSObject + + Background: + Given the content of "/tmp/output" is monitored + + Scenario: A MiNiFi instance can upload data to Google Cloud storage + Given a GetFile 
processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "hello_gcs" is present in "/tmp/input" + And a Google Cloud storage server is set up + And a PutGCSObject processor + And the PutGCSObject processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GetFile processor is connected to the PutGCSObject + And the "success" relationship of the PutGCSObject processor is connected to the PutFile + + When all instances start up + + Then a flowfile with the content "hello_gcs" is placed in the monitored directory in less than 45 seconds + And an object with the content "hello_gcs" is present in the Google Cloud storage diff --git a/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py b/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py new file mode 100644 index 0000000000..56b260f263 --- /dev/null +++ b/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py @@ -0,0 +1,17 @@ +from ..core.ControllerService import ControllerService + + +class GCPCredentialsControllerService(ControllerService): + def __init__(self, name=None, credentials_location=None, json_path=None, raw_json=None): + super(GCPCredentialsControllerService, self).__init__(name=name) + + self.service_class = 'GCPCredentialsControllerService' + + if credentials_location is not None: + self.properties['Credentials Location'] = credentials_location + + if json_path is not None: + self.properties['Service Account JSON File'] = json_path + + if raw_json is not None: + self.properties['Service Account JSON'] = raw_json diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py index be2a32b568..1a8a7e5e4d 100644 --- 
a/docker/test/integration/minifi/core/DockerTestCluster.py +++ b/docker/test/integration/minifi/core/DockerTestCluster.py @@ -264,6 +264,16 @@ def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, "-auth", "admin:splunkadmin"]) return code == 0 + @retry_check() + def check_google_cloud_storage(self, gcs_container_name, content): + (code, output) = self.client.containers.get(gcs_container_name).exec_run(["grep", "-r", content, "/storage"]) + return code == 0 + + @retry_check() + def is_gcs_bucket_empty(self, container_name): + (code, output) = self.client.containers.get(container_name).exec_run(["ls", "/storage/test-bucket"]) + return code == 0 and output == b'' + def query_postgres_server(self, postgresql_container_name, query, number_of_rows): (code, output) = self.client.containers.get(postgresql_container_name).exec_run(["psql", "-U", "postgres", "-c", query]) output = output.decode(self.get_stdout_encoding()) diff --git a/docker/test/integration/minifi/core/FakeGcsServerContainer.py b/docker/test/integration/minifi/core/FakeGcsServerContainer.py new file mode 100644 index 0000000000..41af004063 --- /dev/null +++ b/docker/test/integration/minifi/core/FakeGcsServerContainer.py @@ -0,0 +1,27 @@ +import logging +import os +from .Container import Container + + +class FakeGcsServerContainer(Container): + def __init__(self, name, vols, network, image_store, command=None): + super().__init__(name, 'fake-gcs-server', vols, network, image_store, command) + + def get_startup_finished_log_entry(self): + return "server started at http" + + def deploy(self): + if not self.set_deployed(): + return + + logging.info('Creating and running google cloud storage server docker container...') + self.client.containers.run( + "fsouza/fake-gcs-server:latest", + detach=True, + name=self.name, + network=self.network.name, + entrypoint=self.command, + ports={'4443/tcp': 4443}, + volumes=[os.environ['TEST_DIRECTORY'] + "/resources/fake-gcs-server-data:/data"], + 
command='-scheme http -host fake-gcs-server') + logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py index a72e972ed8..17dd50bc53 100644 --- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py +++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py @@ -27,6 +27,7 @@ from .KafkaBrokerContainer import KafkaBrokerContainer from .S3ServerContainer import S3ServerContainer from .AzureStorageServerContainer import AzureStorageServerContainer +from .FakeGcsServerContainer import FakeGcsServerContainer from .HttpProxyContainer import HttpProxyContainer from .PostgreSQLServerContainer import PostgreSQLServerContainer from .MqttBrokerContainer import MqttBrokerContainer @@ -103,6 +104,8 @@ def acquire_container(self, name, engine='minifi-cpp', command=None): return self.containers.setdefault(name, S3ServerContainer(name, self.vols, self.network, self.image_store, command)) elif engine == 'azure-storage-server': return self.containers.setdefault(name, AzureStorageServerContainer(name, self.vols, self.network, self.image_store, command)) + elif engine == 'fake-gcs-server': + return self.containers.setdefault(name, FakeGcsServerContainer(name, self.vols, self.network, self.image_store, command)) elif engine == 'postgresql-server': return self.containers.setdefault(name, PostgreSQLServerContainer(name, self.vols, self.network, self.image_store, command)) elif engine == 'mqtt-broker': diff --git a/docker/test/integration/minifi/processors/PutGCSObject.py b/docker/test/integration/minifi/processors/PutGCSObject.py new file mode 100644 index 0000000000..3dd5c7c514 --- /dev/null +++ b/docker/test/integration/minifi/processors/PutGCSObject.py @@ -0,0 +1,14 @@ +from ..core.Processor import Processor + + +class PutGCSObject(Processor): + def __init__( + self): + super(PutGCSObject, self).__init__( + 'PutGCSObject', + 
properties={ + 'Bucket': 'test-bucket', + 'Endpoint Override URL': 'fake-gcs-server:4443', + 'Number of retries': 2 + }, + auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/resources/fake-gcs-server-data/test-bucket/test-file b/docker/test/integration/resources/fake-gcs-server-data/test-bucket/test-file new file mode 100644 index 0000000000..1aee3ef7f2 --- /dev/null +++ b/docker/test/integration/resources/fake-gcs-server-data/test-bucket/test-file @@ -0,0 +1 @@ +preloaded data diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py index 31dbf5f026..ac005066a9 100644 --- a/docker/test/integration/steps/steps.py +++ b/docker/test/integration/steps/steps.py @@ -20,6 +20,7 @@ from minifi.core.Funnel import Funnel from minifi.controllers.SSLContextService import SSLContextService +from minifi.controllers.GCPCredentialsControllerService import GCPCredentialsControllerService from minifi.controllers.ODBCService import ODBCService from minifi.controllers.KubernetesControllerService import KubernetesControllerService @@ -389,6 +390,13 @@ def step_impl(context): context.test.acquire_container("azure-storage-server", "azure-storage-server") +# google cloud storage setup +@given("a Google Cloud storage server is set up with some test data") +@given("a Google Cloud storage server is set up") +def step_impl(context): + context.test.acquire_container("fake-gcs-server", "fake-gcs-server") + + # splunk hec @given("a Splunk HEC is set up and running") def step_impl(context): @@ -417,6 +425,14 @@ def step_impl(context): context.test.cluster.enable_splunk_hec_ssl('splunk', dump_certificate(splunk_cert), dump_privatekey(splunk_key), dump_certificate(root_ca_cert)) +@given(u'the {processor_one} processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server') +def step_impl(context, processor_one): + gcp_controller_service = 
GCPCredentialsControllerService(credentials_location="Use Anonymous credentials") + p1 = context.test.get_node_by_name(processor_one) + p1.controller_services.append(gcp_controller_service) + p1.set_property("GCP Credentials Provider Service", gcp_controller_service.name) + + @given("the kafka broker is started") def step_impl(context): context.test.start_kafka_broker() @@ -742,6 +758,17 @@ def step_impl(context): context.test.start() +# Google Cloud Storage +@then('an object with the content \"{content}\" is present in the Google Cloud storage') +def step_imp(context, content): + context.test.check_google_cloud_storage("fake-gcs-server", content) + + +@then("the test bucket of Google Cloud Storage is empty") +def step_impl(context): + context.test.check_empty_gcs_bucket("fake-gcs-server") + + # Splunk @then('an event is registered in Splunk HEC with the content \"{content}\"') def step_imp(context, content): diff --git a/extensions/gcp/CMakeLists.txt b/extensions/gcp/CMakeLists.txt new file mode 100644 index 0000000000..5907a89f7a --- /dev/null +++ b/extensions/gcp/CMakeLists.txt @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +include(GoogleCloudCpp) +include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) +file(GLOB SOURCES "*.cpp" "controllerservices/*.cpp" "processors/*.cpp") + +add_library(minifi-gcp SHARED ${SOURCES}) + +target_link_libraries(minifi-gcp ${LIBMINIFI} google-cloud-cpp::storage) +target_include_directories(minifi-gcp PUBLIC ${google-cloud-cpp_INCLUDE_DIRS}) + +register_extension(minifi-gcp) + +register_extension_linter(minifi-gcp-extensions-linter) diff --git a/extensions/gcp/GCPAttributes.h b/extensions/gcp/GCPAttributes.h new file mode 100644 index 0000000000..92548fd04e --- /dev/null +++ b/extensions/gcp/GCPAttributes.h @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "google/cloud/storage/object_metadata.h" +#include "core/FlowFile.h" + +namespace org::apache::nifi::minifi::extensions::gcp { + +constexpr const char* GCS_ERROR_REASON = "gcs.error.reason"; +constexpr const char* GCS_ERROR_DOMAIN = "gcs.error.domain"; +constexpr const char* GCS_BUCKET_ATTR = "gcs.bucket"; +constexpr const char* GCS_OBJECT_NAME_ATTR = "gcs.key"; +constexpr const char* GCS_SIZE_ATTR = "gcs.size"; +constexpr const char* GCS_CRC32C_ATTR = "gcs.crc32c"; +constexpr const char* GCS_MD5_ATTR = "gcs.md5"; +constexpr const char* GCS_OWNER_ENTITY_ATTR = "gcs.owner.entity"; +constexpr const char* GCS_OWNER_ENTITY_ID_ATTR = "gcs.owner.entity.id"; +constexpr const char* GCS_MEDIA_LINK_ATTR = "gcs.media.link"; +constexpr const char* GCS_ETAG_ATTR = "gcs.etag"; +constexpr const char* GCS_GENERATED_ID = "gcs.generated.id"; +constexpr const char* GCS_GENERATION = "gcs.generation"; +constexpr const char* GCS_META_GENERATION = "gcs.metageneration"; +constexpr const char* GCS_STORAGE_CLASS = "gcs.storage.class"; +constexpr const char* GCS_CONTENT_ENCODING_ATTR = "gcs.content.encoding"; +constexpr const char* GCS_CONTENT_LANGUAGE_ATTR = "gcs.content.language"; +constexpr const char* GCS_CONTENT_DISPOSITION_ATTR = "gcs.content.disposition"; +constexpr const char* GCS_CREATE_TIME_ATTR = "gcs.create.time"; +constexpr const char* GCS_DELETE_TIME_ATTR = "gcs.delete.time"; +constexpr const char* GCS_UPDATE_TIME_ATTR = "gcs.update.time"; +constexpr const char* GCS_SELF_LINK_ATTR = "gcs.self.link"; +constexpr const char* GCS_ENCRYPTION_ALGORITHM_ATTR = "gcs.encryption.algorithm"; +constexpr const char* GCS_ENCRYPTION_SHA256_ATTR = "gcs.encryption.sha256"; + +inline void setAttributesFromObjectMetadata(core::FlowFile& flow_file, const ::google::cloud::storage::ObjectMetadata& object_metadata) { + flow_file.setAttribute(GCS_BUCKET_ATTR, object_metadata.bucket()); + flow_file.setAttribute(GCS_OBJECT_NAME_ATTR, object_metadata.name()); + 
flow_file.setAttribute(GCS_SIZE_ATTR, std::to_string(object_metadata.size())); + flow_file.setAttribute(GCS_CRC32C_ATTR, object_metadata.crc32c()); + flow_file.setAttribute(GCS_MD5_ATTR, object_metadata.md5_hash()); + flow_file.setAttribute(GCS_CONTENT_ENCODING_ATTR, object_metadata.content_encoding()); + flow_file.setAttribute(GCS_CONTENT_LANGUAGE_ATTR, object_metadata.content_language()); + flow_file.setAttribute(GCS_CONTENT_DISPOSITION_ATTR, object_metadata.content_disposition()); + flow_file.setAttribute(GCS_CREATE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.time_created().time_since_epoch()).count())); + flow_file.setAttribute(GCS_UPDATE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.updated().time_since_epoch()).count())); + flow_file.setAttribute(GCS_DELETE_TIME_ATTR, std::to_string(std::chrono::duration_cast(object_metadata.time_deleted().time_since_epoch()).count())); + flow_file.setAttribute(GCS_MEDIA_LINK_ATTR, object_metadata.media_link()); + flow_file.setAttribute(GCS_SELF_LINK_ATTR, object_metadata.self_link()); + flow_file.setAttribute(GCS_ETAG_ATTR, object_metadata.etag()); + flow_file.setAttribute(GCS_GENERATED_ID, object_metadata.id()); + flow_file.setAttribute(GCS_META_GENERATION, std::to_string(object_metadata.metageneration())); + flow_file.setAttribute(GCS_GENERATION, std::to_string(object_metadata.generation())); + flow_file.setAttribute(GCS_STORAGE_CLASS, object_metadata.storage_class()); + if (object_metadata.has_customer_encryption()) { + flow_file.setAttribute(GCS_ENCRYPTION_ALGORITHM_ATTR, object_metadata.customer_encryption().encryption_algorithm); + flow_file.setAttribute(GCS_ENCRYPTION_SHA256_ATTR, object_metadata.customer_encryption().key_sha256); + } + if (object_metadata.has_owner()) { + flow_file.setAttribute(GCS_OWNER_ENTITY_ATTR, object_metadata.owner().entity); + flow_file.setAttribute(GCS_OWNER_ENTITY_ID_ATTR, object_metadata.owner().entity_id); + } +} + +} // namespace 
org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp b/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp new file mode 100644 index 0000000000..860a43afd8 --- /dev/null +++ b/extensions/gcp/controllerservices/GCPCredentialsControllerService.cpp @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +#include "GCPCredentialsControllerService.h" + +#include "core/Resource.h" +#include "google/cloud/storage/client.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { + +const core::Property GCPCredentialsControllerService::CredentialsLoc( + core::PropertyBuilder::createProperty("Credentials Location") + ->withDescription("The location of the credentials.") + ->withAllowableValues(CredentialsLocation::values()) + ->withDefaultValue(toString(CredentialsLocation::USE_DEFAULT_CREDENTIALS)) + ->isRequired(true) + ->build()); + +const core::Property GCPCredentialsControllerService::JsonFilePath( + core::PropertyBuilder::createProperty("Service Account JSON File") + ->withDescription("Path to a file containing a Service Account key file in JSON format.") + ->isRequired(false) + ->build()); + +const core::Property GCPCredentialsControllerService::JsonContents( + core::PropertyBuilder::createProperty("Service Account JSON") + ->withDescription("The raw JSON containing a Service Account keyfile.") + ->isRequired(false) + ->build()); + +void GCPCredentialsControllerService::initialize() { + setSupportedProperties({CredentialsLoc, JsonFilePath, JsonContents}); +} + +std::shared_ptr<gcs::oauth2::Credentials> GCPCredentialsControllerService::createDefaultCredentials() const { + auto default_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromDefaultPaths(); + if (!default_credentials.ok()) { + logger_->log_error("%s", default_credentials.status().message().c_str()); + return nullptr; + } + return *default_credentials; +} + +std::shared_ptr<gcs::oauth2::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonPath() const { + std::string json_path; + if (!getProperty(JsonFilePath.getName(), json_path)) { + logger_->log_error("Missing or invalid %s", JsonFilePath.getName()); + return nullptr; + } + + auto json_path_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromJsonFilePath(json_path); + if (!json_path_credentials.ok()) { + 
logger_->log_error("%s", json_path_credentials.status().message().c_str()); + return nullptr; + } + return *json_path_credentials; +} + +std::shared_ptr<gcs::oauth2::Credentials> GCPCredentialsControllerService::createCredentialsFromJsonContents() const { + std::string json_contents; + if (!getProperty(JsonContents.getName(), json_contents)) { + logger_->log_error("Missing or invalid %s", JsonContents.getName()); + return nullptr; + } + + auto json_path_credentials = gcs::oauth2::CreateServiceAccountCredentialsFromJsonContents(json_contents); + if (!json_path_credentials.ok()) { + logger_->log_error("%s", json_path_credentials.status().message().c_str()); + return nullptr; + } + return *json_path_credentials; +} + +void GCPCredentialsControllerService::onEnable() { + CredentialsLocation credentials_location; + if (!getProperty(CredentialsLoc.getName(), credentials_location)) { + logger_->log_error("Invalid Credentials Location, defaulting to %s", toString(CredentialsLocation::USE_DEFAULT_CREDENTIALS)); + credentials_location = CredentialsLocation::USE_DEFAULT_CREDENTIALS; + } + if (credentials_location == CredentialsLocation::USE_DEFAULT_CREDENTIALS) { + credentials_ = createDefaultCredentials(); + } else if (credentials_location == CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS) { + credentials_ = gcs::oauth2::CreateComputeEngineCredentials(); + } else if (credentials_location == CredentialsLocation::USE_JSON_FILE) { + credentials_ = createCredentialsFromJsonPath(); + } else if (credentials_location == CredentialsLocation::USE_JSON_CONTENTS) { + credentials_ = createCredentialsFromJsonContents(); + } else if (credentials_location == CredentialsLocation::USE_ANONYMOUS_CREDENTIALS) { + credentials_ = gcs::oauth2::CreateAnonymousCredentials(); + } + if (!credentials_) + logger_->log_error("Couldn't create valid credentials"); +} + +REGISTER_RESOURCE(GCPCredentialsControllerService, "Google Cloud Platform Credentials Controller Service"); +} // namespace org::apache::nifi::minifi::extensions::gcp + 
diff --git a/extensions/gcp/controllerservices/GCPCredentialsControllerService.h b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h new file mode 100644 index 0000000000..cc7a628c45 --- /dev/null +++ b/extensions/gcp/controllerservices/GCPCredentialsControllerService.h @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once +#include +#include +#include + +#include "core/controller/ControllerService.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Enum.h" + +#include "google/cloud/storage/oauth2/credentials.h" + +namespace org::apache::nifi::minifi::extensions::gcp { + +class GCPCredentialsControllerService : public core::controller::ControllerService { + public: + SMART_ENUM(CredentialsLocation, + (USE_DEFAULT_CREDENTIALS, "Google Application Default Credentials"), + (USE_COMPUTE_ENGINE_CREDENTIALS, "Use Compute Engine Credentials"), + (USE_JSON_FILE, "Service Account JSON File"), + (USE_JSON_CONTENTS, "Service Account JSON"), + (USE_ANONYMOUS_CREDENTIALS, "Use Anonymous credentials")); + + EXTENSIONAPI static const core::Property CredentialsLoc; + EXTENSIONAPI static const core::Property JsonFilePath; + EXTENSIONAPI static const core::Property JsonContents; + + using ControllerService::ControllerService; + + void initialize() override; + + void yield() override { + } + + bool isWorkAvailable() override { + return false; + } + + bool isRunning() override { + return getState() == core::controller::ControllerServiceState::ENABLED; + } + + void onEnable() override; + + [[nodiscard]] const auto& getCredentials() const { return credentials_; } + + protected: + [[nodiscard]] std::shared_ptr createDefaultCredentials() const; + [[nodiscard]] std::shared_ptr createCredentialsFromJsonPath() const; + [[nodiscard]] std::shared_ptr createCredentialsFromJsonContents() const; + + + std::shared_ptr credentials_; + std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); +}; +} // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/PutGCSObject.cpp b/extensions/gcp/processors/PutGCSObject.cpp new file mode 100644 index 0000000000..45d898d9b1 --- /dev/null +++ b/extensions/gcp/processors/PutGCSObject.cpp @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "PutGCSObject.h" + +#include +#include + +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "io/StreamPipe.h" +#include "utils/OptionalUtils.h" +#include "../GCPAttributes.h" + +namespace gcs = ::google::cloud::storage; + +namespace org::apache::nifi::minifi::extensions::gcp { +const core::Property PutGCSObject::GCPCredentials( + core::PropertyBuilder::createProperty("GCP Credentials Provider Service") + ->withDescription("The Controller Service used to obtain Google Cloud Platform credentials.") + ->isRequired(true) + ->asType() + ->build()); + +const core::Property PutGCSObject::Bucket( + core::PropertyBuilder::createProperty("Bucket") + ->withDescription("Bucket of the object.") + ->withDefaultValue("${gcs.bucket}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::Key( + core::PropertyBuilder::createProperty("Name of the object.") + ->withDescription("Name of the object.") + ->withDefaultValue("${filename}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::NumberOfRetries( + core::PropertyBuilder::createProperty("Number of retries") + 
->withDescription("How many retry attempts should be made before routing to the failure relationship.") + ->withDefaultValue(6) + ->isRequired(true) + ->supportsExpressionLanguage(false) + ->build()); + +const core::Property PutGCSObject::ContentType( + core::PropertyBuilder::createProperty("Content Type") + ->withDescription("Content Type for the file, i.e. text/plain ") + ->isRequired(false) + ->withDefaultValue("${mime.type}") + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::MD5Hash( + core::PropertyBuilder::createProperty("MD5 Hash") + ->withDescription("MD5 Hash (encoded in Base64) of the file for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::Crc32cChecksum( + core::PropertyBuilder::createProperty("CRC32C Checksum") + ->withDescription("CRC32C Checksum (encoded in Base64, big-Endian order) of the file for server-side validation.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::EncryptionKey( + core::PropertyBuilder::createProperty("Server Side Encryption Key") + ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Property PutGCSObject::ObjectACL( + core::PropertyBuilder::createProperty("Object ACL") + ->withDescription("Access Control to be attached to the object uploaded. 
Not providing this will revert to bucket defaults.") + ->isRequired(false) + ->withAllowableValues(PredefinedAcl::values()) + ->build()); + +const core::Property PutGCSObject::OverwriteObject( + core::PropertyBuilder::createProperty("Overwrite Object") + ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") + ->withDefaultValue(true) + ->build()); + +const core::Property PutGCSObject::EndpointOverrideURL( + core::PropertyBuilder::createProperty("Endpoint Override URL") + ->withDescription("Overrides the default Google Cloud Storage endpoints") + ->isRequired(false) + ->supportsExpressionLanguage(true) + ->build()); + +const core::Relationship PutGCSObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); +const core::Relationship PutGCSObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); + + +namespace { +class UploadToGCSCallback : public InputStreamCallback { + public: + UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) + : bucket_(std::move(bucket)), + key_(std::move(key)), + client_(client) { + } + + int64_t process(const std::shared_ptr& stream) override { + std::string content; + content.resize(stream->size()); + const auto read_ret = stream->read(gsl::make_span(content).as_span()); + if (io::isError(read_ret)) { + return -1; + } + auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); + writer << content; + writer.Close(); + result_ = writer.metadata(); + return read_ret; + } + + [[nodiscard]] const google::cloud::StatusOr& getResult() const noexcept { + return result_; + } + + void setHashValue(const std::string& hash_value_str) { + hash_value_ = gcs::MD5HashValue(hash_value_str); + } + + void setCrc32CChecksumValue(const 
std::string& crc32c_checksum_str) { + crc32c_checksum_ = gcs::Crc32cChecksumValue(crc32c_checksum_str); + } + + void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { + encryption_key_ = encryption_key; + } + + void setPredefinedAcl(PutGCSObject::PredefinedAcl predefined_acl) { + predefined_acl_ = gcs::PredefinedAcl(predefined_acl.toString()); + } + + void setContentType(const std::string& content_type_str) { + content_type_ = gcs::ContentType(content_type_str); + } + + void setIfGenerationMatch(std::optional overwrite) { + if (overwrite.has_value() && overwrite.value() == false) { + if_generation_match_ = gcs::IfGenerationMatch(0); + } else { + if_generation_match_ = gcs::IfGenerationMatch(); + } + } + + private: + std::string bucket_; + std::string key_; + gcs::Client& client_; + + gcs::MD5HashValue hash_value_; + gcs::Crc32cChecksumValue crc32c_checksum_; + gcs::EncryptionKey encryption_key_; + gcs::PredefinedAcl predefined_acl_; + gcs::ContentType content_type_; + gcs::IfGenerationMatch if_generation_match_; + + google::cloud::StatusOr result_; +}; + +std::shared_ptr getCredentials(core::ProcessContext& context) { + std::string service_name; + if (context.getProperty(PutGCSObject::GCPCredentials.getName(), service_name) && !IsNullOrEmpty(service_name)) { + auto gcp_credentials_controller_service = std::dynamic_pointer_cast(context.getControllerService(service_name)); + if (!gcp_credentials_controller_service) + return nullptr; + return gcp_credentials_controller_service->getCredentials(); + } + return nullptr; +} +} // namespace + + +void PutGCSObject::initialize() { + setSupportedProperties({GCPCredentials, + Bucket, + Key, + NumberOfRetries, + ContentType, + MD5Hash, + Crc32cChecksum, + EncryptionKey, + ObjectACL, + OverwriteObject, + EndpointOverrideURL}); + setSupportedRelationships({Success, Failure}); +} + +gcs::Client PutGCSObject::getClient(const gcs::ClientOptions& options) const { + return gcs::Client(options, *retry_policy_); +} + + +void 
PutGCSObject::onSchedule(const std::shared_ptr& context, const std::shared_ptr&) { + gsl_Expects(context); + if (auto number_of_retries = context->getProperty(NumberOfRetries)) { + retry_policy_ = std::make_shared(*number_of_retries); + } + if (auto encryption_key = context->getProperty(EncryptionKey)) { + try { + encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); + } catch (const google::cloud::RuntimeStatusError&) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + EncryptionKey.getName()); + } + } + gcp_credentials_ = getCredentials(*context); + if (!gcp_credentials_) { + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing GCP Credentials"); + } +} + +void PutGCSObject::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { + gsl_Expects(context && session && gcp_credentials_); + + auto flow_file = session->get(); + if (!flow_file) { + context->yield(); + return; + } + + auto bucket = context->getProperty(Bucket, flow_file); + if (!bucket || bucket->empty()) { + logger_->log_error("Missing bucket name"); + session->transfer(flow_file, Failure); + return; + } + auto object_name = context->getProperty(Key, flow_file); + if (!object_name || object_name->empty()) { + logger_->log_error("Missing object name"); + session->transfer(flow_file, Failure); + return; + } + + auto options = gcs::ClientOptions(gcp_credentials_); + if (auto endpoint_override_url = context->getProperty(EndpointOverrideURL)) { + options.set_endpoint(*endpoint_override_url); + logger_->log_debug("Endpoint override url %s", *endpoint_override_url); + } + + gcs::Client client = getClient(options); + UploadToGCSCallback callback(client, *bucket, *object_name); + + if (auto crc32_checksum = context->getProperty(Crc32cChecksum, flow_file)) { + callback.setCrc32CChecksumValue(*crc32_checksum); + } + + if (auto md5_hash = context->getProperty(MD5Hash, 
flow_file)) { + callback.setHashValue(*md5_hash); + } + + auto content_type = context->getProperty(ContentType, flow_file); + if (content_type && !content_type->empty()) + callback.setContentType(*content_type); + + if (auto predefined_acl = context->getProperty(ObjectACL)) + callback.setPredefinedAcl(*predefined_acl); + callback.setIfGenerationMatch(context->getProperty(OverwriteObject)); + + callback.setEncryptionKey(encryption_key_); + + session->read(flow_file, &callback); + auto& result = callback.getResult(); + if (!result.ok()) { + flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason()); + flow_file->setAttribute(GCS_ERROR_DOMAIN, result.status().error_info().domain()); + logger_->log_error("Failed to upload to Google Cloud Storage %s %s", result.status().message(), result.status().error_info().reason()); + session->transfer(flow_file, Failure); + } else { + setAttributesFromObjectMetadata(*flow_file, *result); + session->transfer(flow_file, Success); + } +} + +REGISTER_RESOURCE(PutGCSObject, "Puts flow files to a Google Cloud Storage Bucket."); +} // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/processors/PutGCSObject.h b/extensions/gcp/processors/PutGCSObject.h new file mode 100644 index 0000000000..4539622449 --- /dev/null +++ b/extensions/gcp/processors/PutGCSObject.h @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "core/Processor.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "../controllerservices/GCPCredentialsControllerService.h" +#include "google/cloud/storage/client.h" +#include "google/cloud/storage/retry_policy.h" +#include "utils/Enum.h" + +namespace org::apache::nifi::minifi::extensions::gcp { + +class PutGCSObject : public core::Processor { + public: + SMART_ENUM(PredefinedAcl, + (AUTHENTICATED_READ, "authenticatedRead"), + (BUCKET_OWNER_FULL_CONTROL, "bucketOwnerFullControl"), + (BUCKET_OWNER_READ_ONLY, "bucketOwnerRead"), + (PRIVATE, "private"), + (PROJECT_PRIVATE, "projectPrivate"), + (PUBLIC_READ_ONLY, "publicRead"), + (PUBLIC_READ_WRITE, "publicReadWrite")); + + explicit PutGCSObject(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid) { + } + PutGCSObject(const PutGCSObject&) = delete; + PutGCSObject(PutGCSObject&&) = delete; + PutGCSObject& operator=(const PutGCSObject&) = delete; + PutGCSObject& operator=(PutGCSObject&&) = delete; + ~PutGCSObject() override = default; + + EXTENSIONAPI static const core::Property GCPCredentials; + EXTENSIONAPI static const core::Property Bucket; + EXTENSIONAPI static const core::Property Key; + EXTENSIONAPI static const core::Property NumberOfRetries; + EXTENSIONAPI static const core::Property ContentType; + EXTENSIONAPI static const core::Property MD5Hash; + EXTENSIONAPI static const core::Property Crc32cChecksum; + EXTENSIONAPI static const core::Property EncryptionKey; 
+ EXTENSIONAPI static const core::Property ObjectACL; + EXTENSIONAPI static const core::Property OverwriteObject; + EXTENSIONAPI static const core::Property EndpointOverrideURL; + + EXTENSIONAPI static const core::Relationship Success; + EXTENSIONAPI static const core::Relationship Failure; + + void initialize() override; + void onSchedule(const std::shared_ptr &context, const std::shared_ptr &sessionFactory) override; + void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) override; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_REQUIRED; + } + + bool isSingleThreaded() const override { + return true; + } + + private: + virtual google::cloud::storage::Client getClient(const google::cloud::storage::ClientOptions& options) const; + + google::cloud::storage::EncryptionKey encryption_key_; + + std::shared_ptr gcp_credentials_; + std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); + + protected: + google::cloud::storage::RetryPolicyOption::Type retry_policy_ = std::make_shared(6); +}; + +} // namespace org::apache::nifi::minifi::extensions::gcp diff --git a/extensions/gcp/tests/CMakeLists.txt b/extensions/gcp/tests/CMakeLists.txt new file mode 100644 index 0000000000..f12204d460 --- /dev/null +++ b/extensions/gcp/tests/CMakeLists.txt @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Builds one test executable per *.cpp file in this directory.
+# GoogleTest/GoogleMock 1.11.0 is downloaded at configure time, pinned by SHA256.
+include(GoogleTest)
+include(FetchContent)
+FetchContent_Declare(
+  googletest
+  URL https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz
+  URL_HASH SHA256=b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5
+)
+# Forced on so that the gmock target linked below is built.
+set(BUILD_GMOCK ON CACHE BOOL "" FORCE)
+FetchContent_MakeAvailable(googletest)
+
+file(GLOB GCS_TESTS "*.cpp")
+FOREACH(testfile ${GCS_TESTS})
+  # The executable is named after the source file, without directory or extension.
+  get_filename_component(testfilename "${testfile}" NAME_WE)
+  add_executable("${testfilename}" "${testfile}")
+  # Each BEFORE call prepends, so the directory added last is searched first.
+  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/expression-language")
+  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/gcp")
+  target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+
+  target_include_directories(${testfilename} PRIVATE BEFORE ${googletest_INCLUDE_DIRS} ${googletest_SOURCE_DIR}/googletest/include ${googletest_SOURCE_DIR}/googlemock/include)
+  # createTests is defined elsewhere in the project - presumably the common test target setup; confirm there.
+  createTests("${testfilename}")
+
+  target_link_libraries(${testfilename} minifi-gcp)
+  target_link_libraries(${testfilename} minifi-standard-processors)
+  target_link_libraries(${testfilename} minifi-expression-language-extensions)
+  target_link_libraries(${testfilename} gtest_main gmock)
+
+  # Registers the GoogleTest cases of this target with CTest.
+  gtest_add_tests(TARGET "${testfilename}")
+ENDFOREACH()
diff --git a/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp
b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp new file mode 100644 index 0000000000..4eb97cddd5 --- /dev/null +++ b/extensions/gcp/tests/GCPCredentialsControllerServiceTests.cpp @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#define EXTENSION_LIST "minifi-gcp" + +#include "TestBase.h" +#include "gtest/gtest.h" +#include "../controllerservices/GCPCredentialsControllerService.h" +#include "core/Resource.h" +#include "core/Processor.h" +#include "core/controller/ControllerServiceNode.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" +#include "google/cloud/internal/setenv.h" + + +namespace gcs = ::google::cloud::storage; + +using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService; + +namespace { + +std::string create_mock_service_json() { + rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType); + root.AddMember("type", "service_account", root.GetAllocator()); + root.AddMember("project_id", "mock_project_id", root.GetAllocator()); + root.AddMember("private_key_id", "my_private_key_id", root.GetAllocator()); + root.AddMember("private_key", "-----BEGIN RSA PRIVATE KEY-----\n" + "MIIBOgIBAAJBAKNhMsOin3GDkg9A9e3MN/jP0JL+cV8GscR5QGZPgbHI5Jtkt+4k\n" + "MJvO5i54cq+55wFF2Ux7Eekg6wYY+/cbzNUCAwEAAQJAdUUyS+xxp+f5sgs9uyeH\n" + "0YdRPgF23O+QL+ecixNsk2/qsH195ngMNHWQCEUPGbl8O7Lu9W1tmMgeN0cFzpxl\n" + "rQIhANM6NG7uofCzwCvcuuc8PCjbhQPkYn+fuytdrr9V4AlrAiEAxgKkh+iU6CO8\n" + "hzrqeO1zNQijJ4wQ3uuxMST7gx9x0r8CIACPkQASZh+reoEjTuO6RzacjpfaDWDl\n" + "XavZzdL4OWYNAiEAxA41zoPRfJB1yQfixV2Gpsooka2zvqVrypctmUT3NI0CIH2c\n" + "h9CRoySmUy6INeKMMgAPiVJB04exJvKPXr/DHi6D\n" + "-----END RSA PRIVATE KEY-----", root.GetAllocator()); + root.AddMember("client_email", "my_client_email", root.GetAllocator()); + root.AddMember("client_id", "my_client_id", root.GetAllocator()); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + root.Accept(writer); + return buffer.GetString(); +} + +std::optional create_mock_json_file(const std::filesystem::path& dir_path) { + std::filesystem::path path = dir_path / "mock_credentials.json"; + std::ofstream p{path}; + if (!p) + return std::nullopt; + p 
<< create_mock_service_json(); + p.close(); + return path; +} + +class DummyProcessor : public org::apache::nifi::minifi::core::Processor { + using minifi::core::Processor::Processor; +}; + +REGISTER_RESOURCE(DummyProcessor, "A processor that does nothing."); +} // namespace + +class GCPCredentialsTests : public ::testing::Test { + protected: + void SetUp() override { + ASSERT_TRUE(gcp_credentials_node_); + ASSERT_TRUE(gcp_credentials_); + plan_->addProcessor("DummyProcessor", "dummy_processor"); + } + TestController test_controller_{}; + std::shared_ptr plan_ = test_controller_.createPlan(); + std::shared_ptr gcp_credentials_node_ = plan_->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service"); + std::shared_ptr gcp_credentials_ = std::dynamic_pointer_cast(gcp_credentials_node_->getControllerServiceImplementation()); +}; + +TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithoutEnv) { + google::cloud::internal::UnsetEnv("GOOGLE_APPLICATION_CREDENTIALS"); + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_DEFAULT_CREDENTIALS)); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); +} + +TEST_F(GCPCredentialsTests, DefaultGCPCredentialsWithEnv) { + auto temp_directory = test_controller_.createTempDirectory(); + auto path = create_mock_json_file(temp_directory); + ASSERT_TRUE(path.has_value()); + google::cloud::internal::SetEnv("GOOGLE_APPLICATION_CREDENTIALS", path->string()); + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_DEFAULT_CREDENTIALS)); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); +} + +TEST_F(GCPCredentialsTests, CredentialsFromJsonWithoutProperty) { + 
plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_JSON_FILE)); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); +} + +TEST_F(GCPCredentialsTests, CredentialsFromJsonWithProperty) { + auto temp_directory = test_controller_.createTempDirectory(); + auto path = create_mock_json_file(temp_directory); + ASSERT_TRUE(path.has_value()); + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_JSON_FILE)); + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonFilePath.getName(), path->string()); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); +} + +TEST_F(GCPCredentialsTests, CredentialsFromComputeEngineVM) { + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_COMPUTE_ENGINE_CREDENTIALS)); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); +} + +TEST_F(GCPCredentialsTests, AnonymousCredentials) { + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); +} + +TEST_F(GCPCredentialsTests, CredentialsFromJsonContentsWithoutProperty) { + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_JSON_CONTENTS)); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + 
EXPECT_EQ(nullptr, gcp_credentials_->getCredentials()); +} + +TEST_F(GCPCredentialsTests, CredentialsFromJsonContentsWithProperty) { + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::CredentialsLoc.getName(), toString(GCPCredentialsControllerService::CredentialsLocation::USE_JSON_CONTENTS)); + plan_->setProperty(gcp_credentials_node_, GCPCredentialsControllerService::JsonContents.getName(), create_mock_service_json()); + ASSERT_NO_THROW(test_controller_.runSession(plan_)); + EXPECT_NE(nullptr, gcp_credentials_->getCredentials()); +} diff --git a/extensions/gcp/tests/PutGCSObjectTests.cpp b/extensions/gcp/tests/PutGCSObjectTests.cpp new file mode 100644 index 0000000000..97773f79a0 --- /dev/null +++ b/extensions/gcp/tests/PutGCSObjectTests.cpp @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "../processors/PutGCSObject.h" +#include "GCPAttributes.h" +#include "core/Resource.h" +#include "SingleInputTestController.h" +#include "ProcessContextExpr.h" +#include "google/cloud/storage/testing/mock_client.h" +#include "google/cloud/storage/internal/object_metadata_parser.h" +#include "google/cloud/storage/retry_policy.h" +#include "google/cloud/storage/testing/canonical_errors.h" + +namespace gcs = ::google::cloud::storage; +namespace minifi_gcp = org::apache::nifi::minifi::extensions::gcp; + +using PutGCSObject = org::apache::nifi::minifi::extensions::gcp::PutGCSObject; +using GCPCredentialsControllerService = org::apache::nifi::minifi::extensions::gcp::GCPCredentialsControllerService; +using ResumableUploadRequest = gcs::internal::ResumableUploadRequest; +using ResumableUploadResponse = gcs::internal::ResumableUploadResponse; +using ResumableUploadSession = gcs::internal::ResumableUploadSession; +using ::google::cloud::storage::testing::canonical_errors::TransientError; +using ::google::cloud::storage::testing::canonical_errors::PermanentError; + +namespace { +class PutGCSObjectMocked : public PutGCSObject { + using org::apache::nifi::minifi::extensions::gcp::PutGCSObject::PutGCSObject; + public: + gcs::Client getClient(const gcs::ClientOptions&) const override { + return gcs::testing::ClientFromMock(mock_client_, *retry_policy_); + } + std::shared_ptr mock_client_ = std::make_shared(); +}; +REGISTER_RESOURCE(PutGCSObjectMocked, "PutGCSObjectMocked"); +} // namespace + +class PutGCSObjectTests : public ::testing::Test { + public: + void SetUp() override { + gcp_credentials_node_ = test_controller_.plan->addController("GCPCredentialsControllerService", "gcp_credentials_controller_service"); + test_controller_.plan->setProperty(gcp_credentials_node_, + GCPCredentialsControllerService::CredentialsLoc.getName(), + toString(GCPCredentialsControllerService::CredentialsLocation::USE_ANONYMOUS_CREDENTIALS)); + 
test_controller_.plan->setProperty(put_gcs_object_, + PutGCSObject::GCPCredentials.getName(), + "gcp_credentials_controller_service"); + } + std::shared_ptr put_gcs_object_ = std::make_shared("PutGCSObjectMocked"); + org::apache::nifi::minifi::test::SingleInputTestController test_controller_{put_gcs_object_}; + std::shared_ptr gcp_credentials_node_; + + static auto return_upload_in_progress() { + return testing::Return(google::cloud::make_status_or(ResumableUploadResponse{"fake-url", ResumableUploadResponse::kInProgress, 0, {}, {}})); + } + + static auto return_upload_done(const ResumableUploadRequest& request) { + using ObjectMetadataParser = gcs::internal::ObjectMetadataParser; + nlohmann::json metadata_json; + metadata_json["name"] = request.object_name(); + metadata_json["bucket"] = request.bucket_name(); + metadata_json["size"] = 10; + if (request.HasOption()) { + metadata_json["customerEncryption"]["encryptionAlgorithm"] = "AES256"; + metadata_json["customerEncryption"]["keySha256"] = "zkeXIcAB56dkHp0z1023TQZ+mzm+fZ5JRVgmAQ3bEVE="; + } + return testing::Return(google::cloud::make_status_or(ResumableUploadResponse{"fake-url", + ResumableUploadResponse::kDone, 0, + *ObjectMetadataParser::FromJson(metadata_json), {}})); + } +}; + +TEST_F(PutGCSObjectTests, MissingBucket) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession).Times(0); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "")); + const auto& result = test_controller_.trigger("hello world"); + EXPECT_EQ(0, result.at(PutGCSObject::Success).size()); + ASSERT_EQ(1, result.at(PutGCSObject::Failure).size()); + EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN)); + EXPECT_EQ(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON)); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0])); +} + 
+TEST_F(PutGCSObjectTests, BucketFromAttribute) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const ResumableUploadRequest& request) { + EXPECT_EQ("bucket-from-attribute", request.bucket_name()); + + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "${gcs.bucket}")); + const auto& result = test_controller_.trigger("hello world", {{minifi_gcp::GCS_BUCKET_ATTR, "bucket-from-attribute"}}); + ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0])); +} + +TEST_F(PutGCSObjectTests, ServerGivesTransientErrors) { + auto return_temp_error = [](ResumableUploadRequest const&) { + return google::cloud::StatusOr>( + TransientError()); + }; + + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce(return_temp_error) + .WillOnce(return_temp_error) + .WillOnce(return_temp_error); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::NumberOfRetries.getName(), "2")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world"); + EXPECT_EQ(0, 
result.at(PutGCSObject::Success).size()); + ASSERT_EQ(1, result.at(PutGCSObject::Failure).size()); + EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN)); + EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON)); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0])); +} + +TEST_F(PutGCSObjectTests, ServerGivesPermaError) { + auto return_permanent_error = [](ResumableUploadRequest const&) { + return google::cloud::StatusOr>( + PermanentError()); + }; + + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce(return_permanent_error); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world"); + EXPECT_EQ(0, result.at(PutGCSObject::Success).size()); + ASSERT_EQ(1, result.at(PutGCSObject::Failure).size()); + EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_DOMAIN)); + EXPECT_NE(std::nullopt, result.at(PutGCSObject::Failure)[0]->getAttribute(minifi_gcp::GCS_ERROR_REASON)); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Failure)[0])); +} + +TEST_F(PutGCSObjectTests, NonRequiredPropertiesAreMissing) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const ResumableUploadRequest& request) { + EXPECT_FALSE(request.HasOption()); + EXPECT_FALSE(request.HasOption()); + EXPECT_FALSE(request.HasOption()); + EXPECT_FALSE(request.HasOption()); + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, 
next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world"); + EXPECT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); +} + +TEST_F(PutGCSObjectTests, Crc32cMD5LocationTest) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ("yZRlqg==", request.GetOption().value()); + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ("XrY7u+Ae7tCTyyK7j1rNww==", request.GetOption().value()); + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::MD5Hash.getName(), "${md5}")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Crc32cChecksum.getName(), "${crc32c}")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), 
"bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}}); + EXPECT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); +} + +TEST_F(PutGCSObjectTests, DontOverwriteTest) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::OverwriteObject.getName(), "false")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world", {{"crc32c", "yZRlqg=="}, {"md5", "XrY7u+Ae7tCTyyK7j1rNww=="}}); + ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0])); +} + +TEST_F(PutGCSObjectTests, ValidServerSideEncryptionTest) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const 
ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "ZW5jcnlwdGlvbl9rZXk=")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world"); + ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); + EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_SHA256_ATTR)); + EXPECT_NE(std::nullopt, result.at(PutGCSObject::Success)[0]->getAttribute(minifi_gcp::GCS_ENCRYPTION_ALGORITHM_ATTR)); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0])); +} + +TEST_F(PutGCSObjectTests, InvalidServerSideEncryptionTest) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession).Times(0); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::EncryptionKey.getName(), "not_base64_key")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); 
+ EXPECT_THROW(test_controller_.trigger("hello world"), minifi::Exception); +} + +TEST_F(PutGCSObjectTests, NoContentType) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const ResumableUploadRequest& request) { + EXPECT_FALSE(request.HasOption()); + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world"); + ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0])); +} + +TEST_F(PutGCSObjectTests, ContentTypeFromAttribute) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ("text/attribute", request.GetOption().value()); + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, 
UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + const auto& result = test_controller_.trigger("hello world", {{"mime.type", "text/attribute"}}); + ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0])); +} + +TEST_F(PutGCSObjectTests, ObjectACLTest) { + EXPECT_CALL(*put_gcs_object_->mock_client_, CreateResumableSession) + .WillOnce([](const ResumableUploadRequest& request) { + EXPECT_TRUE(request.HasOption()); + EXPECT_EQ(gcs::PredefinedAcl::AuthenticatedRead().value(), request.GetOption().value()); + auto mock_upload_session = std::make_unique(); + EXPECT_CALL(*mock_upload_session, done()).WillRepeatedly(testing::Return(false)); + EXPECT_CALL(*mock_upload_session, next_expected_byte()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_upload_session, UploadChunk).WillRepeatedly(return_upload_in_progress()); + EXPECT_CALL(*mock_upload_session, UploadFinalChunk).WillOnce(return_upload_done(request)); + return google::cloud::make_status_or(std::unique_ptr(std::move(mock_upload_session))); + }); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Bucket.getName(), "bucket-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::Key.getName(), "object-name-from-property")); + EXPECT_TRUE(test_controller_.plan->setProperty(put_gcs_object_, PutGCSObject::ObjectACL.getName(), toString(PutGCSObject::PredefinedAcl::AUTHENTICATED_READ))); + const auto& result = test_controller_.trigger("hello 
world"); + ASSERT_EQ(1, result.at(PutGCSObject::Success).size()); + EXPECT_EQ(0, result.at(PutGCSObject::Failure).size()); + EXPECT_EQ("hello world", test_controller_.plan->getContent(result.at(PutGCSObject::Success)[0])); +} + +TEST_F(PutGCSObjectTests, PredefinedACLTests) { + EXPECT_EQ(toString(PutGCSObject::PredefinedAcl::AUTHENTICATED_READ), gcs::PredefinedAcl::AuthenticatedRead().value()); + EXPECT_EQ(toString(PutGCSObject::PredefinedAcl::BUCKET_OWNER_FULL_CONTROL), gcs::PredefinedAcl::BucketOwnerFullControl().value()); + EXPECT_EQ(toString(PutGCSObject::PredefinedAcl::BUCKET_OWNER_READ_ONLY), gcs::PredefinedAcl::BucketOwnerRead().value()); + EXPECT_EQ(toString(PutGCSObject::PredefinedAcl::PRIVATE), gcs::PredefinedAcl::Private().value()); + EXPECT_EQ(toString(PutGCSObject::PredefinedAcl::PROJECT_PRIVATE), gcs::PredefinedAcl::ProjectPrivate().value()); + EXPECT_EQ(toString(PutGCSObject::PredefinedAcl::PUBLIC_READ_ONLY), gcs::PredefinedAcl::PublicRead().value()); + EXPECT_EQ(toString(PutGCSObject::PredefinedAcl::PUBLIC_READ_WRITE), gcs::PredefinedAcl::PublicReadWrite().value()); +} diff --git a/run_flake8.sh b/run_flake8.sh index be29c001d9..b0577c4286 100755 --- a/run_flake8.sh +++ b/run_flake8.sh @@ -19,4 +19,4 @@ set -euo pipefail directory=${1:-.} -flake8 --exclude thirdparty,build,cmake-build-* --builtins log,REL_SUCCESS,REL_FAILURE,raw_input --ignore E501,W503 --per-file-ignores="steps.py:F811" "${directory}" +flake8 --exclude venv,thirdparty,build,cmake-build-* --builtins log,REL_SUCCESS,REL_FAILURE,raw_input --ignore E501,W503 --per-file-ignores="steps.py:F811" "${directory}" diff --git a/thirdparty/google-cloud-cpp/nlohmann_lib_as_interface.patch b/thirdparty/google-cloud-cpp/nlohmann_lib_as_interface.patch new file mode 100644 index 0000000000..56d951a858 --- /dev/null +++ b/thirdparty/google-cloud-cpp/nlohmann_lib_as_interface.patch @@ -0,0 +1,13 @@ +diff --git a/cmake/IncludeNlohmannJson.cmake b/cmake/IncludeNlohmannJson.cmake +index 
db8056ae0..613f18b97 100644 +--- a/cmake/IncludeNlohmannJson.cmake ++++ b/cmake/IncludeNlohmannJson.cmake +@@ -23,7 +23,7 @@ function (find_nlohmann_json) + # library that is all we need. + find_path(GOOGLE_CLOUD_CPP_NLOHMANN_JSON_HEADER "nlohmann/json.hpp" + REQUIRED) +- add_library(nlohmann_json::nlohmann_json UNKNOWN IMPORTED) ++ add_library(nlohmann_json::nlohmann_json INTERFACE IMPORTED) + set_property( + TARGET nlohmann_json::nlohmann_json + APPEND diff --git a/thirdparty/google-cloud-cpp/remove-find_package.patch b/thirdparty/google-cloud-cpp/remove-find_package.patch new file mode 100644 index 0000000000..74c16ee985 --- /dev/null +++ b/thirdparty/google-cloud-cpp/remove-find_package.patch @@ -0,0 +1,11 @@ +diff --git a/CMakeLists.txt b/CMakeLists.txt +--- a/CMakeLists.txt (revision 334e481f49c78c9e22aa48e59d044f6051ce99a7) ++++ b/CMakeLists.txt (date 1646227181483) +@@ -251,7 +251,6 @@ + # Each subproject adds dependencies to this target to have their docs generated. + add_custom_target(doxygen-docs) + +-find_package(absl CONFIG REQUIRED) + if (${GOOGLE_CLOUD_CPP_ENABLE_GRPC}) + find_package(gRPC REQUIRED QUIET) + find_package(ProtobufWithTargets REQUIRED QUIET) diff --git a/win_build_vs.bat b/win_build_vs.bat index f0fd8dfb96..e69bb14616 100755 --- a/win_build_vs.bat +++ b/win_build_vs.bat @@ -58,6 +58,7 @@ for %%x in (%*) do ( if [%%~x] EQU [/SFTP] set build_SFTP=ON if [%%~x] EQU [/PDH] set build_PDH=ON if [%%~x] EQU [/SPLUNK] set build_SPLUNK=ON + if [%%~x] EQU [/GCP] set build_GCP=ON if [%%~x] EQU [/M] set installer_merge_modules=ON if [%%~x] EQU [/Z] set build_azure=ON if [%%~x] EQU [/N] set build_nanofi=ON @@ -74,7 +75,7 @@ for %%x in (%*) do ( mkdir %builddir% pushd %builddir%\ -cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% 
-DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -DENABLE_SFTP=%build_SFTP% -DENABLE_SPLUNK=%build_SPLUNK% -DENABLE_NANOFI=%build_nanofi% -DENABLE_OPENCV=%build_opencv% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=OFF -DENABLE_SCRIPTING=OFF -DEXCLUDE_BOOST=ON -DENABLE_WEL=ON -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% %strict_gsl_checks% %redist% -DENABLE_LINTER=%build_linter% "%scriptdir%" && msbuild /m nifi-minifi-cpp.sln /property:Configuration=%cmake_build_type% /property:Platform=%build_platform% && copy bin\%cmake_build_type%\minifi.exe main\ +cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -DENABLE_SFTP=%build_SFTP% -DENABLE_SPLUNK=%build_SPLUNK% -DENABLE_GCP=%build_GCP% -DENABLE_NANOFI=%build_nanofi% -DENABLE_OPENCV=%build_opencv% -DUSE_SHARED_LIBS=OFF -DDISABLE_CONTROLLER=ON -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=OFF -DENABLE_SCRIPTING=OFF -DEXCLUDE_BOOST=ON -DENABLE_WEL=ON -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% %strict_gsl_checks% %redist% -DENABLE_LINTER=%build_linter% "%scriptdir%" && msbuild /m nifi-minifi-cpp.sln /property:Configuration=%cmake_build_type% /property:Platform=%build_platform% && copy bin\%cmake_build_type%\minifi.exe main\ IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL% if [%cpack%] EQU [ON] 
( cpack -C %cmake_build_type%