From 9121a8147b1b528094c9f72382cbc3e264463097 Mon Sep 17 00:00:00 2001 From: Gem Lamont <106068376+gem-neo4j@users.noreply.github.com> Date: Mon, 28 Oct 2024 09:03:29 +0100 Subject: [PATCH] Cypher25 APOC proc func migration (#4208) * [lYfZxdRz] Migrate procedures and functions from Core to Cypher 25 --------- Co-authored-by: Omar Ahmad --- .../actions/test-gradle-project/action.yml | 8 +- .github/workflows/CI.yaml | 78 +++- .gitmodules | 2 +- build.gradle | 41 +- .../modules/ROOT/pages/ml/openai.adoc | 2 +- extended-it/build.gradle | 10 +- .../apoc/MissingExtraDependenciesTest.java | 23 - extended/build.gradle | 20 +- .../apoc/custom/CypherProceduresHandler.java | 6 +- .../main/java/apoc/cypher/CypherExtended.java | 9 +- .../src/main/java/apoc/es/ElasticSearch.java | 20 +- .../java/apoc/export/arrow/ArrowConfig.java | 44 ++ .../java/apoc/export/arrow/ArrowUtils.java | 41 ++ .../java/apoc/export/arrow/ExportArrow.java | 184 ++++++++ .../export/arrow/ExportArrowFileStrategy.java | 205 +++++++++ .../apoc/export/arrow/ExportArrowService.java | 75 ++++ .../export/arrow/ExportArrowStrategy.java | 291 +++++++++++++ .../arrow/ExportArrowStreamStrategy.java | 130 ++++++ .../export/arrow/ExportGraphFileStrategy.java | 123 ++++++ .../export/arrow/ExportGraphStrategy.java | 113 +++++ .../arrow/ExportGraphStreamStrategy.java | 115 +++++ .../arrow/ExportResultFileStrategy.java | 121 ++++++ .../export/arrow/ExportResultStrategy.java | 48 +++ .../arrow/ExportResultStreamStrategy.java | 92 ++++ .../java/apoc/export/arrow/ImportArrow.java | 10 +- .../apoc/export/parquet/ExportParquet.java | 12 +- .../parquet/ExportParquetFileStrategy.java | 16 +- .../ExportParquetGraphFileStrategy.java | 4 +- .../ExportParquetResultFileStrategy.java | 4 +- .../apoc/export/parquet/ImportParquet.java | 10 +- .../main/java/apoc/export/xls/ExportXls.java | 12 +- .../apoc/export/xls/ExportXlsHandler.java | 9 +- extended/src/main/java/apoc/gephi/Gephi.java | 6 +- extended/src/main/java/apoc/load/Gexf.java 
| 10 +- .../src/main/java/apoc/load/LoadArrow.java | 187 ++++++++ .../main/java/apoc/load/LoadJsonExtended.java | 96 +++++ .../main/java/apoc/log/Neo4jLogStream.java | 123 ++++++ .../src/main/java/apoc/systemdb/SystemDb.java | 7 +- .../java/apoc/export/arrow/ArrowTest.java | 402 ++++++++++++++++++ .../apoc/export/arrow/ImportArrowTest.java | 8 +- .../src/test/java/apoc/load/LoadJsonTest.java | 199 +++++++++ .../java/apoc/load/SimpleHttpHandler.java | 106 +++++ .../java/apoc/log/Neo4jLogStreamTest.java | 69 +++ .../src/test/kotlin/apoc/nlp/NodeMatcher.kt | 2 +- extended/src/test/resources/multi.json | 2 + extra-dependencies/nlp/build.gradle | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- 47 files changed, 2969 insertions(+), 130 deletions(-) create mode 100644 extended/src/main/java/apoc/export/arrow/ArrowConfig.java create mode 100644 extended/src/main/java/apoc/export/arrow/ArrowUtils.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportArrow.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportArrowFileStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportArrowService.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportArrowStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportArrowStreamStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportGraphFileStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportGraphStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportGraphStreamStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportResultFileStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportResultStrategy.java create mode 100644 extended/src/main/java/apoc/export/arrow/ExportResultStreamStrategy.java create mode 100644 extended/src/main/java/apoc/load/LoadArrow.java create mode 100644 
extended/src/main/java/apoc/load/LoadJsonExtended.java create mode 100644 extended/src/main/java/apoc/log/Neo4jLogStream.java create mode 100644 extended/src/test/java/apoc/export/arrow/ArrowTest.java create mode 100644 extended/src/test/java/apoc/load/LoadJsonTest.java create mode 100644 extended/src/test/java/apoc/load/SimpleHttpHandler.java create mode 100644 extended/src/test/java/apoc/log/Neo4jLogStreamTest.java create mode 100644 extended/src/test/resources/multi.json diff --git a/.github/actions/test-gradle-project/action.yml b/.github/actions/test-gradle-project/action.yml index df3ecc0fbb..0fec385d0a 100644 --- a/.github/actions/test-gradle-project/action.yml +++ b/.github/actions/test-gradle-project/action.yml @@ -10,16 +10,16 @@ runs: using: "composite" steps: - uses: ./.github/actions/setup-gradle-cache - - name: Run compile tests + - name: Run compile tests shell: bash - run: ./gradlew :${{inputs.project-name}}:compileJava :${{inputs.project-name}}:compileTestJava + run: ./gradlew --info -Pneo4jVersionOverride=$NEO4JVERSION -Pneo4jDockerEeOverride=$NEO4J_DOCKER_EE_OVERRIDE -Pneo4jDockerCeOverride=$NEO4J_DOCKER_CE_OVERRIDE :${{inputs.project-name}}:compileJava :${{inputs.project-name}}:compileTestJava - name: Run tests shell: bash - run: ./gradlew :${{inputs.project-name}}:check --parallel + run: ./gradlew --info -Pneo4jVersionOverride=$NEO4JVERSION -Pneo4jDockerEeOverride=$NEO4J_DOCKER_EE_OVERRIDE -Pneo4jDockerCeOverride=$NEO4J_DOCKER_CE_OVERRIDE :${{inputs.project-name}}:check --parallel - name: Archive test results uses: actions/upload-artifact@v3 if: always() with: name: ${{inputs.project-name}}-test-results path: | - ${{inputs.project-name}}/build/reports/tests/test/ + ${{inputs.project-name}}/build/reports/tests/test/ \ No newline at end of file diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml index 79f07ea6bf..c1b4ee7581 100644 --- a/.github/workflows/CI.yaml +++ b/.github/workflows/CI.yaml @@ -9,7 +9,8 @@ on: env: 
CODEARTIFACT_DOWNLOAD_URL: ${{ secrets.CODEARTIFACT_DOWNLOAD_URL }} CODEARTIFACT_USERNAME: ${{ secrets.CODEARTIFACT_USERNAME }} - + ECR_NEO4J_DOCKER_URL: ${{ secrets.ECR_NEO4J_DOCKER_URL }} + jobs: code-ql: @@ -23,23 +24,53 @@ jobs: matrix: language: [ 'java', 'javascript' ] steps: + - name: Configure AWS CLI + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + + - name: Configure CodeArtifact Authentication Token + run: | + CODEARTIFACT_TOKEN=`aws codeartifact get-authorization-token --domain build-service-live --domain-owner ${{ secrets.AWS_ACCOUNT_ID }} --query authorizationToken --output text` + echo "::add-mask::$CODEARTIFACT_TOKEN" + echo "CODEARTIFACT_TOKEN=$CODEARTIFACT_TOKEN" >> "$GITHUB_ENV" + + - name: Login in to AWS ECR + run: | + aws ecr get-login-password --region eu-west-1 | docker login --username AWS --password-stdin $ECR_NEO4J_DOCKER_URL + - uses: actions/checkout@v3 - uses: ./.github/actions/setup-jdk - uses: ./.github/actions/setup-gradle-cache - + + - name: Determine latest neo4j CI version and docker nightly images + run: | + neo4j_version_base=$(grep -e "neo4jVersion = .*" build.gradle | cut -d '=' -f 2 | tr -d \'\" | tr -d ' ') + echo "neo4j_version_base=$neo4j_version_base" + NEO4JVERSION=`aws codeartifact list-package-versions --domain build-service-live --domain-owner ${{ secrets.AWS_ACCOUNT_ID }} --repository ci-live --format maven --namespace org.neo4j --package neo4j --sort-by PUBLISHED_TIME --query "versions[?starts_with(version,'$neo4j_version_base')] | [0].version" | tr -d '" '` + echo "NEO4JVERSION=$NEO4JVERSION" >> "$GITHUB_ENV" + echo "Found NEO4JVERSION=$NEO4JVERSION" + NEO4J_DOCKER_EE_OVERRIDE="$ECR_NEO4J_DOCKER_URL:$neo4j_version_base-enterprise-debian-nightly" + echo "NEO4J_DOCKER_EE_OVERRIDE=$NEO4J_DOCKER_EE_OVERRIDE" >> "$GITHUB_ENV" + echo "Found 
NEO4J_DOCKER_EE_OVERRIDE=$NEO4J_DOCKER_EE_OVERRIDE" + NEO4J_DOCKER_CE_OVERRIDE="$ECR_NEO4J_DOCKER_URL:$neo4j_version_base-community-debian-nightly" + echo "NEO4J_DOCKER_CE_OVERRIDE=$NEO4J_DOCKER_CE_OVERRIDE" >> "$GITHUB_ENV" + echo "Found NEO4J_DOCKER_CE_OVERRIDE=$NEO4J_DOCKER_CE_OVERRIDE" + - name: Compile Java run: | chmod +x gradlew - ./gradlew --no-daemon --init-script init.gradle clean + ./gradlew --no-daemon --info -Pneo4jVersionOverride=$NEO4JVERSION --init-script init.gradle clean # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL uses: github/codeql-action/init@v2 with: languages: ${{ matrix.language }} - - # Autobuild attempts to build any compiled languages - - name: Autobuild - uses: github/codeql-action/autobuild@v2 + + - name: Compile + run: ./gradlew --info -Pneo4jVersionOverride=$NEO4JVERSION compileJava compileTestJava - name: Perform CodeQL Analysis uses: github/codeql-action/analyze@v2 @@ -53,6 +84,23 @@ jobs: project: [ 'extended', 'extended-it' ] runs-on: ubuntu-latest steps: + - name: Configure AWS CLI + uses: aws-actions/configure-aws-credentials@v2 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + + - name: Configure CodeArtifact Authentication Token + run: | + CODEARTIFACT_TOKEN=`aws codeartifact get-authorization-token --domain build-service-live --domain-owner ${{ secrets.AWS_ACCOUNT_ID }} --query authorizationToken --output text` + echo "::add-mask::$CODEARTIFACT_TOKEN" + echo "CODEARTIFACT_TOKEN=$CODEARTIFACT_TOKEN" >> "$GITHUB_ENV" + + - name: Login in to AWS ECR + run: | + aws ecr get-login-password --region eu-west-1 | docker login --username AWS --password-stdin $ECR_NEO4J_DOCKER_URL + - uses: actions/checkout@v2 - name: Set up JDK 17 uses: actions/setup-java@v2 @@ -66,10 +114,24 @@ jobs: ~/.gradle/wrapper key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }} + - name: 
Determine latest neo4j CI version and docker nightly images + run: | + neo4j_version_base=$(grep -e "neo4jVersion = .*" build.gradle | cut -d '=' -f 2 | tr -d \'\" | tr -d ' ') + echo "neo4j_version_base=$neo4j_version_base" + NEO4JVERSION=`aws codeartifact list-package-versions --domain build-service-live --domain-owner ${{ secrets.AWS_ACCOUNT_ID }} --repository ci-live --format maven --namespace org.neo4j --package neo4j --sort-by PUBLISHED_TIME --query "versions[?starts_with(version,'$neo4j_version_base')] | [0].version" | tr -d '" '` + echo "NEO4JVERSION=$NEO4JVERSION" >> "$GITHUB_ENV" + echo "Found NEO4JVERSION=$NEO4JVERSION" + NEO4J_DOCKER_EE_OVERRIDE="$ECR_NEO4J_DOCKER_URL/build-service/neo4j:$neo4j_version_base-enterprise-debian-nightly" + echo "NEO4J_DOCKER_EE_OVERRIDE=$NEO4J_DOCKER_EE_OVERRIDE" >> "$GITHUB_ENV" + echo "Found NEO4J_DOCKER_EE_OVERRIDE=$NEO4J_DOCKER_EE_OVERRIDE" + NEO4J_DOCKER_CE_OVERRIDE="$ECR_NEO4J_DOCKER_URL/build-service/neo4j:$neo4j_version_base-community-debian-nightly" + echo "NEO4J_DOCKER_CE_OVERRIDE=$NEO4J_DOCKER_CE_OVERRIDE" >> "$GITHUB_ENV" + echo "Found NEO4J_DOCKER_CE_OVERRIDE=$NEO4J_DOCKER_CE_OVERRIDE" + - name: Init gradle run: | chmod +x gradlew - ./gradlew --init-script init.gradle + ./gradlew --info -Pneo4jVersionOverride=$NEO4JVERSION -Pneo4jDockerEeOverride=$NEO4J_DOCKER_EE_OVERRIDE -Pneo4jDockerCeOverride=$NEO4J_DOCKER_CE_OVERRIDE --init-script init.gradle - name: Run ${{ matrix.project }} tests uses: ./.github/actions/test-gradle-project diff --git a/.gitmodules b/.gitmodules index 6f4a70ee54..ea53a5a847 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "apoc-core"] path = apoc-core url = https://github.com/neo4j/apoc - branch = 5.24 + branch = dev diff --git a/build.gradle b/build.gradle index 7b436d518d..3dedfb19eb 100644 --- a/build.gradle +++ b/build.gradle @@ -4,6 +4,7 @@ plugins { id 'maven-publish' id 'antlr' id "com.github.hierynomus.license-report" version"0.16.1" + id 
"org.jetbrains.kotlin.jvm" version "1.8.0" apply false } downloadLicenses { @@ -14,7 +15,7 @@ downloadLicenses { allprojects { group = 'org.neo4j.procedure' - version = '5.24.0' + version = '5.26.0' archivesBaseName = 'apoc' description = """neo4j-apoc-procedures""" } @@ -28,7 +29,17 @@ repositories { /*maven { // this contains the neo4j 4.0.0-beta jars url "https://neo4j.bintray.com/community/" }*/ - mavenCentral() + if (System.getenv("CODEARTIFACT_DOWNLOAD_URL") ?: "" != "") { + maven { + url System.getenv('CODEARTIFACT_DOWNLOAD_URL') + credentials { + username System.getenv('CODEARTIFACT_USERNAME') + password System.getenv('CODEARTIFACT_TOKEN') + } + } + } else { + mavenCentral() + } maven { url "https://repo.gradle.org/gradle/libs-releases" } @@ -43,7 +54,17 @@ subprojects { /*maven { // this contains the neo4j 4.0.0-beta jars url "https://neo4j.bintray.com/community/" }*/ - mavenCentral() + if (System.getenv("CODEARTIFACT_DOWNLOAD_URL") ?: "" != "") { + maven { + url System.getenv('CODEARTIFACT_DOWNLOAD_URL') + credentials { + username System.getenv('CODEARTIFACT_USERNAME') + password System.getenv('CODEARTIFACT_TOKEN') + } + } + } else { + mavenCentral() + } maven { url "https://repo.gradle.org/gradle/libs-releases" } @@ -58,7 +79,7 @@ subprojects { task mySourcesJar(type: Jar) { from sourceSets.main.allJava - classifier = 'sources' + archiveClassifier = 'sources' } task myJavadocJar(type: Jar) { @@ -71,8 +92,8 @@ subprojects { // neo4jDockerImage system property is used in TestContainerUtil systemProperties 'user.language' : 'en' , 'user.country' : 'US', - 'neo4jDockerImage' : System.getProperty("NEO4JVERSION") ? 'neo4j:' + System.getProperty("NEO4JVERSION") + '-enterprise-debian' : 'neo4j:5.24.1-enterprise', - 'neo4jCommunityDockerImage': System.getProperty("NEO4JVERSION") ? 'neo4j:' + System.getProperty("NEO4JVERSION") + '-debian' : 'neo4j:5.24.1', + 'neo4jDockerImage': project.hasProperty("neo4jDockerEeOverride") ? 
project.getProperty("neo4jDockerEeOverride") : 'neo4j:5.26.0-enterprise', + 'neo4jCommunityDockerImage': project.hasProperty("neo4jDockerCeOverride") ? project.getProperty("neo4jDockerCeOverride") : 'neo4j:5.26.0', 'coreDir': 'apoc-core/core', 'testDockerBundle': false @@ -131,9 +152,9 @@ subprojects { ext { // NB: due to version.json generation by parsing this file, the next line must not have any if/then/else logic - neo4jVersion = "5.24.1" + neo4jVersion = "5.26.0" // instead we apply the override logic here - neo4jVersionEffective = project.hasProperty("neo4jVersionOverride") ? project.getProperty("neo4jVersionOverride") : neo4jVersion - testContainersVersion = '1.18.3' + neo4jVersionEffective = project.hasProperty("neo4jVersionOverride") ? project.getProperty("neo4jVersionOverride") : neo4jVersion + "-SNAPSHOT" + testContainersVersion = '1.20.2' apacheArrowVersion = '15.0.0' -} +} \ No newline at end of file diff --git a/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc b/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc index 57e5068c46..95c23fb4ac 100644 --- a/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc +++ b/docs/asciidoc/modules/ROOT/pages/ml/openai.adoc @@ -18,7 +18,7 @@ All the following procedures can have the following APOC config, i.e. 
in `apoc.c | apoc.ml.openai.url | the OpenAI endpoint base url | `https://api.openai.com/v1` by default, `https://api.anthropic.com/v1` if `apoc.ml.openai.type=`, or empty string if `apoc.ml.openai.type=` -| apoc.ml.azure.api.version | in case of `apoc.ml.openai.type=AZURE`, indicates the `api-version` to be passed after the `?api-version=` url +| apoc.ml.azure.api.version | in case of `apoc.ml.openai.type=AZURE`, indicates the `api-version` to be passed after the `?api-version=` url | "" |=== diff --git a/extended-it/build.gradle b/extended-it/build.gradle index 0f78343ca0..c3e6c1b5ea 100644 --- a/extended-it/build.gradle +++ b/extended-it/build.gradle @@ -3,7 +3,7 @@ plugins { id 'com.github.johnrengelman.shadow' id 'maven-publish' id 'antlr' - id "org.jetbrains.kotlin.jvm" version "1.6.0" + id "org.jetbrains.kotlin.jvm" version "1.8.0" id "com.diffplug.spotless" version "6.7.2" } @@ -51,10 +51,10 @@ dependencies { } testImplementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers - testImplementation group: 'org.testcontainers', name: 'qdrant', version: '1.19.7' - testImplementation group: 'org.testcontainers', name: 'chromadb', version: '1.19.7' - testImplementation group: 'org.testcontainers', name: 'weaviate', version: '1.19.7' - testImplementation group: 'org.testcontainers', name: 'milvus', version: '1.19.7' + testImplementation group: 'org.testcontainers', name: 'qdrant', version: '1.20.2' + testImplementation group: 'org.testcontainers', name: 'chromadb', version: '1.20.2' + testImplementation group: 'org.testcontainers', name: 'weaviate', version: '1.20.2' + testImplementation group: 'org.testcontainers', name: 'milvus', version: '1.20.2' configurations.all { exclude group: 'org.slf4j', module: 'slf4j-nop' diff --git a/extended-it/src/test/java/apoc/MissingExtraDependenciesTest.java b/extended-it/src/test/java/apoc/MissingExtraDependenciesTest.java index 9cc96887f0..71ee4ff0f7 100644 --- 
a/extended-it/src/test/java/apoc/MissingExtraDependenciesTest.java +++ b/extended-it/src/test/java/apoc/MissingExtraDependenciesTest.java @@ -47,29 +47,6 @@ public static void tearDown() { neo4jContainer.close(); } - @Test - public void testParquet() { - // export file - assertParquetFails("CALL apoc.export.parquet.all('test.parquet')"); - assertParquetFails("CALL apoc.export.parquet.data([], [], 'test.parquet')"); - assertParquetFails("CALL apoc.export.parquet.graph({nodes: [], relationships: []}, 'test.parquet')"); - assertParquetFails("CALL apoc.export.parquet.query('MATCH (n:ParquetNode) RETURN n', 'test.parquet')"); - - // export stream - assertParquetFails("CALL apoc.export.parquet.all.stream()"); - assertParquetFails("CALL apoc.export.parquet.data.stream([], [])"); - assertParquetFails("CALL apoc.export.parquet.graph.stream({nodes: [], relationships: []})"); - assertParquetFails("CALL apoc.export.parquet.query.stream('MATCH (n:ParquetNode) RETURN n')"); - - // import and load - assertParquetFails("CALL apoc.import.parquet('test.parquet')"); - assertParquetFails("CALL apoc.load.parquet('test.parquet')"); - } - - private static void assertParquetFails(String query) { - assertFails(query, PARQUET_MISSING_DEPS_ERROR); - } - @Test public void testCouchbase() { assertCouchbaseFails("CALL apoc.couchbase.get('host', 'bucket', 'documentId')"); diff --git a/extended/build.gradle b/extended/build.gradle index 7f27d1507d..26d4f0f2b5 100644 --- a/extended/build.gradle +++ b/extended/build.gradle @@ -2,11 +2,11 @@ import org.gradle.api.internal.artifacts.DefaultExcludeRule plugins { id 'java' - id 'com.github.johnrengelman.shadow' + id 'com.github.johnrengelman.shadow' version '7.1.0' id 'maven-publish' id 'antlr' - id "org.jetbrains.kotlin.jvm" version "1.6.0" - id "com.diffplug.spotless" version "6.7.2" + id "org.jetbrains.kotlin.jvm" version "1.8.0" + id "com.diffplug.spotless" version "6.22.0" } spotless { @@ -26,6 +26,7 @@ jar { } compileKotlin { + 
dependsOn(generateGrammarSource) kotlinOptions.jvmTarget = "17" } @@ -33,6 +34,11 @@ generateGrammarSource { arguments += ["-package", "apoc.custom"] } +sourceSets.configureEach { + var generateGrammarSource = tasks.named(getTaskName("generate", "GrammarSource")) + java.srcDir(generateGrammarSource.map { files() }) +} + javadoc { failOnError = false options.addStringOption('Xdoclint:none', '-quiet') @@ -54,11 +60,13 @@ task gitSubmoduleLoad { dependencies { // to fix unresolved imports with IntelliJ from version 2024.1.x compileOnly sourceSets.main.java + + apt project(':processor') apt group: 'org.neo4j', name: 'neo4j', version: neo4jVersionEffective // mandatory to run @ServiceProvider based META-INF code generation - antlr "org.antlr:antlr4:4.13.1", { + antlr "org.antlr:antlr4:4.13.2", { exclude group: 'org.glassfish' exclude group: 'com.ibm.icu' exclude group: 'org.abego.treelayout' @@ -111,7 +119,7 @@ dependencies { compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270' compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-comprehend', version: '1.12.353' , withoutJacksons compileOnly group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0' - compileOnly group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '1.6.0' + compileOnly group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '1.8.0' compileOnly group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers // testImplementation analogous is not needed since is bundled via `test-utils` submodule @@ -133,7 +141,7 @@ dependencies { testImplementation group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0' testImplementation group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270' testImplementation group: 'org.reflections', name: 'reflections', version: '0.9.12' - testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '1.6.0' + testImplementation group: 
'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '1.8.0' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '1.3' testImplementation group: 'org.apache.derby', name: 'derby', version: '10.12.1.1' testImplementation group: 'org.mock-server', name: 'mockserver-netty', version: '5.6.0' diff --git a/extended/src/main/java/apoc/custom/CypherProceduresHandler.java b/extended/src/main/java/apoc/custom/CypherProceduresHandler.java index 7eb74724e3..288e33fd74 100644 --- a/extended/src/main/java/apoc/custom/CypherProceduresHandler.java +++ b/extended/src/main/java/apoc/custom/CypherProceduresHandler.java @@ -22,7 +22,7 @@ import org.neo4j.internal.kernel.api.procs.ProcedureSignature; import org.neo4j.internal.kernel.api.procs.QualifiedName; import org.neo4j.internal.kernel.api.procs.UserFunctionSignature; -import org.neo4j.kernel.api.CypherScope; +import org.neo4j.kernel.api.QueryLanguage; import org.neo4j.kernel.api.ResourceMonitor; import org.neo4j.kernel.api.procedure.CallableProcedure; import org.neo4j.kernel.api.procedure.CallableUserFunction; @@ -243,7 +243,7 @@ private long getLastUpdate() { public boolean registerProcedure(ProcedureSignature signature, String statement) { QualifiedName name = signature.name(); try { - boolean exists = globalProceduresRegistry.getCurrentView().getAllProcedures(CypherScope.CYPHER_5) + boolean exists = globalProceduresRegistry.getCurrentView().getAllProcedures(QueryLanguage.CYPHER_5) .anyMatch(s -> s.name().equals(name)); if (exists) { // we deregister and remove possible homonyms signatures overridden/overloaded @@ -293,7 +293,7 @@ public boolean registerFunction(UserFunctionSignature signature) { public boolean registerFunction(UserFunctionSignature signature, String statement, boolean forceSingle, boolean mapResult) { try { QualifiedName name = signature.name(); - boolean exists = globalProceduresRegistry.getCurrentView().getAllNonAggregatingFunctions(CypherScope.CYPHER_5) + boolean exists = 
globalProceduresRegistry.getCurrentView().getAllNonAggregatingFunctions(QueryLanguage.CYPHER_5) .anyMatch(s -> s.name().equals(name)); if (exists) { // we deregister and remove possible homonyms signatures overridden/overloaded diff --git a/extended/src/main/java/apoc/cypher/CypherExtended.java b/extended/src/main/java/apoc/cypher/CypherExtended.java index 6452d8153c..dd424de076 100644 --- a/extended/src/main/java/apoc/cypher/CypherExtended.java +++ b/extended/src/main/java/apoc/cypher/CypherExtended.java @@ -2,6 +2,7 @@ import apoc.Extended; import apoc.Pools; +import apoc.result.CypherStatementMapResult; import apoc.result.MapResult; import apoc.util.CompressionAlgo; import apoc.util.EntityUtil; @@ -368,7 +369,7 @@ public static String compiled(String fragment) { @Procedure @Description("apoc.cypher.parallel(fragment, `paramMap`, `keyList`) yield value - executes fragments in parallel through a list defined in `paramMap` with a key `keyList`") - public Stream parallel(@Name("fragment") String fragment, @Name("params") Map params, @Name("parallelizeOn") String key) { + public Stream parallel(@Name("fragment") String fragment, @Name("params") Map params, @Name("parallelizeOn") String key) { if (params == null) return runCypherQuery(tx, fragment, params); if (key == null || !params.containsKey(key)) throw new RuntimeException("Can't parallelize on key " + key + " available keys " + params.keySet()); @@ -382,7 +383,7 @@ public Stream parallel(@Name("fragment") String fragment, @Name("para terminationGuard.check(); Map parallelParams = new HashMap<>(params); parallelParams.replace(key, v); - return tx.execute(statement, parallelParams).stream().map(MapResult::new); + return tx.execute(statement, parallelParams).stream().map(CypherStatementMapResult::new); }); /* @@ -453,7 +454,7 @@ public Map parallelParams(@Name("params") Map pa @Procedure @Description("apoc.cypher.parallel2(fragment, `paramMap`, `keyList`) yield value - executes fragments in parallel batches through 
a list defined in `paramMap` with a key `keyList`") - public Stream parallel2(@Name("fragment") String fragment, @Name("params") Map params, @Name("parallelizeOn") String key) { + public Stream parallel2(@Name("fragment") String fragment, @Name("params") Map params, @Name("parallelizeOn") String key) { if (params == null) return runCypherQuery(tx, fragment, params); if (StringUtils.isEmpty(key) || !params.containsKey(key)) throw new RuntimeException("Can't parallelize on key " + key + " available keys " + params.keySet() + ". Note that parallelizeOn parameter must be not empty"); @@ -486,7 +487,7 @@ public Stream parallel2(@Name("fragment") String fragment, @Name("par } return futures.stream().flatMap(f -> { try { - return EntityUtil.anyRebind(tx, f.get()).stream().map(MapResult::new); + return EntityUtil.anyRebind(tx, f.get()).stream().map(CypherStatementMapResult::new); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException("Error executing in parallel " + statement, e); } diff --git a/extended/src/main/java/apoc/es/ElasticSearch.java b/extended/src/main/java/apoc/es/ElasticSearch.java index 7c8f31afa7..903716eaa8 100644 --- a/extended/src/main/java/apoc/es/ElasticSearch.java +++ b/extended/src/main/java/apoc/es/ElasticSearch.java @@ -2,7 +2,7 @@ import apoc.Extended; import apoc.load.LoadJsonUtils; -import apoc.result.MapResult; +import apoc.result.LoadDataMapResult; import apoc.util.Util; import org.neo4j.graphdb.security.URLAccessChecker; import org.neo4j.procedure.Context; @@ -36,7 +36,7 @@ protected String toPayload(Object payload) { @Procedure @Description("apoc.es.stats(host-or-key,$config) - elastic search statistics") - public Stream stats(@Name("host") String hostOrKey, @Name(value = "config", defaultValue = "{}") Map config) { + public Stream stats(@Name("host") String hostOrKey, @Name(value = "config", defaultValue = "{}") Map config) { ElasticSearchConfig conf = new ElasticSearchConfig(config); String url = 
conf.getVersion().getElasticSearchUrl(hostOrKey); return loadJsonStream(url + "/_stats", conf, null); @@ -44,7 +44,7 @@ public Stream stats(@Name("host") String hostOrKey, @Name(value = "co @Procedure @Description("apoc.es.get(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - perform a GET operation on elastic search") - public Stream get(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("id") String id, @Name("query") Object query, @Name("payload") Object payload, + public Stream get(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("id") String id, @Name("query") Object query, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { ElasticSearchConfig conf = new ElasticSearchConfig(config); String queryUrl = conf.getVersion().getQueryUrl(hostOrKey, index, type, id, query);//.replace("mytype/", ""); @@ -53,7 +53,7 @@ public Stream get(@Name("hostOrKey") String hostOrKey, @Name("index") @Procedure @Description("apoc.es.query(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - perform a SEARCH operation on elastic search") - public Stream query(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("query") Object query, @Name("payload") Object payload, + public Stream query(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("query") Object query, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { ElasticSearchConfig conf = new ElasticSearchConfig(config); String searchQueryUrl = conf.getVersion().getSearchQueryUrl(hostOrKey, index, type, query);//.replace("mytype/", ""); @@ -63,7 +63,7 @@ public Stream query(@Name("hostOrKey") String hostOrKey, @Name("index @Procedure 
@Description("apoc.es.getRaw(host-or-key,path,payload-or-null,$config) yield value - perform a raw GET operation on elastic search") - public Stream getRaw(@Name("hostOrKey") String hostOrKey, @Name("path") String suffix, @Name("payload") Object payload, + public Stream getRaw(@Name("hostOrKey") String hostOrKey, @Name("path") String suffix, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { ElasticSearchConfig conf = new ElasticSearchConfig(config); String url = conf.getVersion().getElasticSearchUrl(hostOrKey); @@ -72,7 +72,7 @@ public Stream getRaw(@Name("hostOrKey") String hostOrKey, @Name("path @Procedure @Description("apoc.es.postRaw(host-or-key,path,payload-or-null,$config) yield value - perform a raw POST operation on elastic search") - public Stream postRaw(@Name("hostOrKey") String hostOrKey, @Name("path") String suffix, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { + public Stream postRaw(@Name("hostOrKey") String hostOrKey, @Name("path") String suffix, @Name("payload") Object payload, @Name(value = "config", defaultValue = "{}") Map config) { ElasticSearchConfig conf = new ElasticSearchConfig(config, "POST"); String url = conf.getVersion().getElasticSearchUrl(hostOrKey); return loadJsonStream(url + "/" + suffix, conf, toPayload(payload)); @@ -80,7 +80,7 @@ public Stream postRaw(@Name("hostOrKey") String hostOrKey, @Name("pat @Procedure @Description("apoc.es.post(host-or-key,index-or-null,type-or-null,query-or-null,payload-or-null,$config) yield value - perform a POST operation on elastic search") - public Stream post(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("query") Object query, + public Stream post(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("query") Object query, @Name(value = "payload", defaultValue = "{}") Object payload, @Name(value = "config", 
defaultValue = "{}") Map config) { if (payload == null) @@ -94,7 +94,7 @@ public Stream post(@Name("hostOrKey") String hostOrKey, @Name("index" @Procedure @Description("apoc.es.put(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,payload-or-null,$config) yield value - perform a PUT operation on elastic search") - public Stream put(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("id") String id, @Name("query") Object query, + public Stream put(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("id") String id, @Name("query") Object query, @Name(value = "payload", defaultValue = "{}") Map payload, @Name(value = "config", defaultValue = "{}") Map config) { if (payload == null) @@ -109,7 +109,7 @@ public Stream put(@Name("hostOrKey") String hostOrKey, @Name("index") @Procedure @Description("apoc.es.delete(host-or-key,index-or-null,type-or-null,id-or-null,query-or-null,$config) yield value - perform a DELETE operation on elastic search") - public Stream delete(@Name("hostOrKey") String hostOrKey, + public Stream delete(@Name("hostOrKey") String hostOrKey, @Name("index") String index, @Name("type") String type, @Name("id") String id, @@ -126,7 +126,7 @@ public Stream delete(@Name("hostOrKey") String hostOrKey, return loadJsonStream(queryUrl, conf, payload); } - private Stream loadJsonStream(@Name("url") Object url, ElasticSearchConfig conf, @Name("payload") String payload) { + private Stream loadJsonStream(@Name("url") Object url, ElasticSearchConfig conf, @Name("payload") String payload) { return LoadJsonUtils.loadJsonStream(url, conf.getHeaders(), payload, "", true, null, null, null, urlAccessChecker); } } diff --git a/extended/src/main/java/apoc/export/arrow/ArrowConfig.java b/extended/src/main/java/apoc/export/arrow/ArrowConfig.java new file mode 100644 index 0000000000..8131e5a1d1 --- /dev/null +++ 
b/extended/src/main/java/apoc/export/arrow/ArrowConfig.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.export.arrow; + +import apoc.util.Util; + +import java.util.Collections; +import java.util.Map; + +public class ArrowConfig { + + private final int batchSize; + + private final Map config; + + public ArrowConfig(Map config) { + this.config = config == null ? Collections.emptyMap() : config; + this.batchSize = Util.toInteger(this.config.getOrDefault("batchSize", 2000)); + } + + public int getBatchSize() { + return batchSize; + } + + public Map getConfig() { + return config; + } +} diff --git a/extended/src/main/java/apoc/export/arrow/ArrowUtils.java b/extended/src/main/java/apoc/export/arrow/ArrowUtils.java new file mode 100644 index 0000000000..60e5906dcb --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ArrowUtils.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.export.arrow; + +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; + +import java.util.List; + +public class ArrowUtils { + + private ArrowUtils() {} + + public static Field FIELD_ID = new Field("", FieldType.nullable(Types.MinorType.BIGINT.getType()), null); + public static Field FIELD_LABELS = new Field( + "labels", + FieldType.nullable(Types.MinorType.LIST.getType()), + List.of(new Field("$data$", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null))); + public static Field FIELD_SOURCE_ID = + new Field("", FieldType.nullable(Types.MinorType.BIGINT.getType()), null); + public static Field FIELD_TARGET_ID = + new Field("", FieldType.nullable(Types.MinorType.BIGINT.getType()), null); + public static Field FIELD_TYPE = new Field("", FieldType.nullable(Types.MinorType.VARCHAR.getType()), null); +} diff --git a/extended/src/main/java/apoc/export/arrow/ExportArrow.java b/extended/src/main/java/apoc/export/arrow/ExportArrow.java new file mode 100644 index 0000000000..cb923e44c5 --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ExportArrow.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.export.arrow; + +import apoc.Pools; +import apoc.export.util.NodesAndRelsSubGraph; +import apoc.result.ByteArrayResult; +import apoc.result.ExportProgressInfo; +import apoc.result.VirtualGraph; +import org.neo4j.cypher.export.DatabaseSubGraph; +import org.neo4j.cypher.export.SubGraph; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.Result; +import org.neo4j.graphdb.Transaction; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.logging.Log; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.NotThreadSafe; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Stream; + +public class ExportArrow { + + @Context + public Transaction tx; + + @Context + public GraphDatabaseService db; + + @Context + public Pools pools; + + @Context + public Log logger; + + @Context + public TerminationGuard terminationGuard; + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.stream.all", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the full database as an arrow byte array.") + public Stream all( + @Name(value = "config", defaultValue = 
"{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + return new ExportArrowService(db, pools, terminationGuard, logger) + .stream(new DatabaseSubGraph(tx), new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure( + name = "apoc.export.arrow.stream.graph", + deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the given graph as an arrow byte array.") + public Stream graph( + @Name(value = "graph", description = "The graph to export.") Object graph, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + final SubGraph subGraph; + if (graph instanceof Map) { + Map mGraph = (Map) graph; + if (!mGraph.containsKey("nodes")) { + throw new IllegalArgumentException( + "Graph Map must contains `nodes` field and `relationships` optionally"); + } + subGraph = new NodesAndRelsSubGraph( + tx, (Collection) mGraph.get("nodes"), (Collection) mGraph.get("relationships")); + } else if (graph instanceof VirtualGraph) { + VirtualGraph vGraph = (VirtualGraph) graph; + subGraph = new NodesAndRelsSubGraph(tx, vGraph.nodes(), vGraph.relationships()); + } else { + throw new IllegalArgumentException("Supported inputs are VirtualGraph, Map"); + } + return new ExportArrowService(db, pools, terminationGuard, logger).stream(subGraph, new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure( + name = "apoc.export.arrow.stream.query", + deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the given Cypher query as an arrow byte array.") + public Stream query( + @Name(value = "query", description = "The query used to collect the data for export.") String query, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + Map params = config == 
null + ? Collections.emptyMap() + : (Map) config.getOrDefault("params", Collections.emptyMap()); + Result result = tx.execute(query, params); + return new ExportArrowService(db, pools, terminationGuard, logger).stream(result, new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.all", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the full database as an arrow file.") + public Stream all( + @Name(value = "file", description = "The name of the file to export the data to.") String fileName, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + return new ExportArrowService(db, pools, terminationGuard, logger) + .file(fileName, new DatabaseSubGraph(tx), new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.graph", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the given graph as an arrow file.") + public Stream graph( + @Name(value = "file", description = "The name of the file to export the data to.") String fileName, + @Name(value = "graph", description = "The graph to export.") Object graph, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + final SubGraph subGraph; + if (graph instanceof Map) { + Map mGraph = (Map) graph; + if (!mGraph.containsKey("nodes")) { + throw new IllegalArgumentException( + "Graph Map must contains `nodes` field and `relationships` optionally"); + } + subGraph = new NodesAndRelsSubGraph( + tx, (Collection) mGraph.get("nodes"), (Collection) mGraph.get("relationships")); + } else if (graph instanceof VirtualGraph) { + VirtualGraph vGraph = (VirtualGraph) graph; + subGraph = new NodesAndRelsSubGraph(tx, vGraph.nodes(), 
vGraph.relationships()); + } else { + throw new IllegalArgumentException("Supported inputs are VirtualGraph, Map"); + } + return new ExportArrowService(db, pools, terminationGuard, logger) + .file(fileName, subGraph, new ArrowConfig(config)); + } + + @NotThreadSafe + @Procedure(name = "apoc.export.arrow.query", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Exports the results from the given Cypher query as an arrow file.") + public Stream query( + @Name(value = "file", description = "The name of the file to which the data will be exported.") + String fileName, + @Name(value = "query", description = "The query to use to collect the data for export.") String query, + @Name(value = "config", defaultValue = "{}", description = "{ batchSize = 2000 :: INTEGER }") + Map config) { + Map params = config == null + ? Collections.emptyMap() + : (Map) config.getOrDefault("params", Collections.emptyMap()); + Result result = tx.execute(query, params); + return new ExportArrowService(db, pools, terminationGuard, logger) + .file(fileName, result, new ArrowConfig(config)); + } +} diff --git a/extended/src/main/java/apoc/export/arrow/ExportArrowFileStrategy.java b/extended/src/main/java/apoc/export/arrow/ExportArrowFileStrategy.java new file mode 100644 index 0000000000..183a59056c --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ExportArrowFileStrategy.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.export.arrow; + +import apoc.convert.ConvertUtils; +import apoc.export.util.ProgressReporter; +import apoc.meta.Types; +import apoc.result.ExportProgressInfo; +import apoc.util.FileUtils; +import apoc.util.Util; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.ipc.ArrowFileWriter; +import org.apache.arrow.vector.ipc.ArrowWriter; +import org.apache.arrow.vector.types.pojo.Schema; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Path; +import org.neo4j.graphdb.Relationship; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static apoc.util.Util.labelStrings; +import static apoc.util.Util.map; + +public interface ExportArrowFileStrategy extends ExportArrowStrategy> { + + Iterator> toIterator(ProgressReporter reporter, IN data); + + default Stream export(IN data, ArrowConfig config) { + final OutputStream out = FileUtils.getOutputStream(getFileName()); + ExportProgressInfo 
progressInfo = new ExportProgressInfo(getFileName(), getSource(data), "arrow"); + progressInfo.setBatchSize(config.getBatchSize()); + ProgressReporter reporter = new ProgressReporter(null, null, progressInfo); + int batchCount = 0; + List> rows = new ArrayList<>(config.getBatchSize()); + VectorSchemaRoot root = null; + ArrowWriter writer = null; + try { + Iterator> it = toIterator(reporter, data); + while (!Util.transactionIsTerminated(getTerminationGuard()) && it.hasNext()) { + rows.add(it.next()); + if (batchCount > 0 && batchCount % config.getBatchSize() == 0) { + if (root == null) { + root = VectorSchemaRoot.create(schemaFor(rows), getBufferAllocator()); + writer = newArrowWriter(root, out); + } + writeBatch(root, writer, rows); + rows.clear(); + } + ++batchCount; + } + if (!rows.isEmpty()) { + if (root == null) { + root = VectorSchemaRoot.create(schemaFor(rows), getBufferAllocator()); + writer = newArrowWriter(root, out); + } + writeBatch(root, writer, rows); + } + } catch (Exception e) { + getLogger().error("Exception while extracting Arrow data:", e); + } finally { + reporter.done(); + Util.close(root); + Util.close(writer); + } + + return Stream.of(progressInfo); + } + + String getSource(IN data); + + default void writeBatch(VectorSchemaRoot root, ArrowWriter writer, List> rows) { + AtomicInteger counter = new AtomicInteger(); + root.allocateNew(); + rows.forEach(row -> { + final int index = counter.getAndIncrement(); + root.getFieldVectors().forEach(fe -> { + Object value = convertValue(row.get(fe.getName())); + write(index, value, fe); + }); + }); + root.setRowCount(counter.get()); + try { + writer.writeBatch(); + } catch (IOException e) { + throw new RuntimeException(e); + } + root.clear(); + } + + String getFileName(); + + TerminationGuard getTerminationGuard(); + + BufferAllocator getBufferAllocator(); + + GraphDatabaseService getGraphDatabaseApi(); + + ExecutorService getExecutorService(); + + Log getLogger(); + + default Object convertValue(Object 
data) { + return data == null ? null : writeJsonResult(data); + } + + default ArrowWriter newArrowWriter(VectorSchemaRoot root, OutputStream out) { + return new ArrowFileWriter(root, new DictionaryProvider.MapDictionaryProvider(), Channels.newChannel(out)); + } + + Schema schemaFor(List> rows); + + // visible for testing + public static String NODE = "node"; + public static String RELATIONSHIP = "relationship"; + + public static Object writeJsonResult(Object value) { + Types type = Types.of(value); + switch (type) { + case NODE: + return nodeToMap((Node) value); + case RELATIONSHIP: + return relToMap((Relationship) value); + case PATH: + return writeJsonResult(StreamSupport.stream(((Path) value).spliterator(), false) + .map(i -> i instanceof Node ? nodeToMap((Node) i) : relToMap((Relationship) i)) + .collect(Collectors.toList())); + case LIST: + return ConvertUtils.convertToList(value).stream() + .map(j -> writeJsonResult(j)) + .collect(Collectors.toList()); + case MAP: + return ((Map) value) + .entrySet().stream() + .collect( + HashMap::new, // workaround for https://bugs.openjdk.java.net/browse/JDK-8148463 + (mapAccumulator, entry) -> + mapAccumulator.put(entry.getKey(), writeJsonResult(entry.getValue())), + HashMap::putAll); + default: + return value; + } + } + + private static Map relToMap(Relationship rel) { + Map mapRel = map( + "id", String.valueOf(rel.getId()), + "type", RELATIONSHIP, + "label", rel.getType().toString(), + "start", nodeToMap(rel.getStartNode()), + "end", nodeToMap(rel.getEndNode())); + + return mapWithOptionalProps(mapRel, rel.getAllProperties()); + } + + private static Map nodeToMap(Node node) { + Map mapNode = map("id", String.valueOf(node.getId())); + + mapNode.put("type", NODE); + + if (node.getLabels().iterator().hasNext()) { + mapNode.put("labels", labelStrings(node)); + } + return mapWithOptionalProps(mapNode, node.getAllProperties()); + } + + private static Map mapWithOptionalProps(Map mapEntity, Map props) { + if (!props.isEmpty()) 
{ + mapEntity.put("properties", props); + } + return mapEntity; + } +} diff --git a/extended/src/main/java/apoc/export/arrow/ExportArrowService.java b/extended/src/main/java/apoc/export/arrow/ExportArrowService.java new file mode 100644 index 0000000000..92b96ed544 --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ExportArrowService.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package apoc.export.arrow; + +import apoc.Pools; +import apoc.result.ByteArrayResult; +import apoc.result.ExportProgressInfo; +import org.neo4j.cypher.export.SubGraph; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Result; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.util.stream.Stream; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.EXPORT_NOT_ENABLED_ERROR; +import static apoc.ApocConfig.apocConfig; + +public class ExportArrowService { + + public static final String EXPORT_TO_FILE_ARROW_ERROR = EXPORT_NOT_ENABLED_ERROR + + "\nOtherwise, if you are running in a cloud environment without filesystem access, use the apoc.export.arrow.stream.* procedures to stream the export back to your client."; + private final GraphDatabaseService db; + private final Pools pools; + private final TerminationGuard terminationGuard; + private final Log logger; + + public ExportArrowService(GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger) { + this.db = db; + this.pools = pools; + this.terminationGuard = terminationGuard; + this.logger = logger; + } + + public Stream stream(Object data, ArrowConfig config) { + if (data instanceof Result) { + return new ExportResultStreamStrategy(db, pools, terminationGuard, logger).export((Result) data, config); + } else { + return new ExportGraphStreamStrategy(db, pools, terminationGuard, logger).export((SubGraph) data, config); + } + } + + public Stream file(String fileName, Object data, ArrowConfig config) { + // we cannot use apocConfig().checkWriteAllowed(..) because the error is confusing + // since it says "... 
use the `{stream:true}` config", but with arrow procedures the streaming mode is + // implemented via different procedures + if (!apocConfig().getBoolean(APOC_EXPORT_FILE_ENABLED)) { + throw new RuntimeException(EXPORT_TO_FILE_ARROW_ERROR); + } + if (data instanceof Result) { + return new ExportResultFileStrategy(fileName, db, pools, terminationGuard, logger) + .export((Result) data, config); + } else { + return new ExportGraphFileStrategy(fileName, db, pools, terminationGuard, logger) + .export((SubGraph) data, config); + } + } +} diff --git a/extended/src/main/java/apoc/export/arrow/ExportArrowStrategy.java b/extended/src/main/java/apoc/export/arrow/ExportArrowStrategy.java new file mode 100644 index 0000000000..cf966e16ad --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ExportArrowStrategy.java @@ -0,0 +1,291 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package apoc.export.arrow; + +import apoc.util.JsonUtil; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BaseVariableWidthVector; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.ipc.ArrowWriter; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; +import org.neo4j.values.storable.DurationValue; + +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +public interface ExportArrowStrategy { + + OUT export(IN data, ArrowConfig config); + + Object convertValue(Object data); + + ArrowWriter newArrowWriter(VectorSchemaRoot root, OutputStream out); + + Schema schemaFor(List> rows); + + TerminationGuard getTerminationGuard(); + + BufferAllocator getBufferAllocator(); + + GraphDatabaseService getGraphDatabaseApi(); + + ExecutorService getExecutorService(); + + Log getLogger(); + + static String fromMetaType(apoc.meta.Types type) { + switch (type) { + case INTEGER: + return "Long"; + case 
FLOAT: + return "Double"; + case LIST: + String inner = type.toString().substring("LIST OF ".length()).trim(); + final apoc.meta.Types innerType = apoc.meta.Types.from(inner); + if (innerType == apoc.meta.Types.LIST) { + return "AnyArray"; + } + return fromMetaType(innerType) + "Array"; + case BOOLEAN: + return "Boolean"; + case MAP: + return "Map"; + case RELATIONSHIP: + return "Relationship"; + case NODE: + return "Node"; + case PATH: + return "Path"; + case POINT: + return "Point"; + case DATE: + return "Date"; + case LOCAL_TIME: + case DATE_TIME: + case LOCAL_DATE_TIME: + return "DateTime"; + case TIME: + return "Time"; + case DURATION: + return "Duration"; + default: + return "String"; + } + } + + static Field toField(String fieldName, Set propertyTypes) { + if (propertyTypes.size() > 1) { + // return string type + return new Field(fieldName, FieldType.nullable(new ArrowType.Utf8()), null); + } else { + // convert to RelatedType + final String type = propertyTypes.iterator().next(); + switch (type) { + case "Boolean": + return new Field(fieldName, FieldType.nullable(Types.MinorType.BIT.getType()), null); + case "Long": + return new Field(fieldName, FieldType.nullable(Types.MinorType.BIGINT.getType()), null); + case "Double": + return new Field(fieldName, FieldType.nullable(Types.MinorType.FLOAT8.getType()), null); + case "DateTime": + case "LocalDateTime": + case "Date": + return new Field(fieldName, FieldType.nullable(Types.MinorType.DATEMILLI.getType()), null); + case "Duration": + case "Node": + case "Relationship": + // return new Field(fieldName, + // FieldType.nullable(Types.MinorType.STRUCT.getType()), null); + case "Point": + case "Map": + // return new Field(fieldName, FieldType.nullable(Types.MinorType.MAP.getType()), + // null); + case "DateTimeArray": + case "DateArray": + case "BooleanArray": + case "LongArray": + case "DoubleArray": + case "StringArray": + case "PointArray": + default: + return (type.endsWith("Array")) + ? 
new Field( + fieldName, + FieldType.nullable(Types.MinorType.LIST.getType()), + List.of(toField("$data$", Set.of(type.replace("Array", ""))))) + : new Field(fieldName, FieldType.nullable(Types.MinorType.VARCHAR.getType()), null); + } + } + } + + default void write(int index, Object value, FieldVector fieldVector) { + if (fieldVector instanceof BaseVariableWidthVector) { + writeBaseVariableWidthVector(index, value, (BaseVariableWidthVector) fieldVector); + } else if (fieldVector instanceof BigIntVector) { + writeBigIntVector(index, value, (BigIntVector) fieldVector); + } else if (fieldVector instanceof DateMilliVector) { + writeDateMilliVector(index, value, (DateMilliVector) fieldVector); + } else if (fieldVector instanceof Float8Vector) { + writeFloat8Vector(index, value, (Float8Vector) fieldVector); + } else if (fieldVector instanceof BitVector) { + writeBitVector(index, value, (BitVector) fieldVector); + } else if (fieldVector instanceof ListVector) { + writeListVector(index, value, fieldVector); + } + } + + private void writeListVector(int index, Object value, FieldVector fieldVector) { + ListVector listVector = (ListVector) fieldVector; + if (value == null) { + listVector.setNull(index); + return; + } + UnionListWriter listWriter = listVector.getWriter(); + Object[] array; + if (value instanceof Collection) { + final Collection collection = (Collection) value; + array = collection.toArray(new Object[collection.size()]); + } else { + array = (Object[]) value; + } + listWriter.setPosition(index); + listWriter.startList(); + FieldVector inner = listVector.getChildrenFromFields().get(0); + for (int i = 0; i < array.length; i++) { + Object val = convertValue(array[i]); + if (val == null) { + listWriter.writeNull(); + } else if (inner instanceof ListVector) { + write(i, val, inner); + } else if (inner instanceof BaseVariableWidthVector) { + final byte[] bytes; + if (val instanceof String) { + bytes = val.toString().getBytes(StandardCharsets.UTF_8); + } else { + bytes 
= JsonUtil.writeValueAsBytes(val); + } + ArrowBuf tempBuf = fieldVector.getAllocator().buffer(bytes.length); + tempBuf.setBytes(0, bytes); + listWriter.varChar().writeVarChar(0, bytes.length, tempBuf); + tempBuf.close(); + } else if (inner instanceof BigIntVector) { + long lng = (long) val; + listWriter.bigInt().writeBigInt(lng); + } else if (inner instanceof Float8Vector) { + double dbl = (double) val; + listWriter.float8().writeFloat8(dbl); + } else if (inner instanceof BitVector) { + boolean bool = (boolean) val; + listWriter.bit().writeBit(bool ? 1 : 0); + } // TODO datemilli + } + listWriter.endList(); + } + + private void writeBitVector(int index, Object value, BitVector fieldVector) { + BitVector baseVector = fieldVector; + if (value == null) { + baseVector.setNull(index); + } else { + baseVector.setSafe(index, (boolean) value ? 1 : 0); + } + } + + private void writeFloat8Vector(int index, Object value, Float8Vector fieldVector) { + Float8Vector baseVector = fieldVector; + if (value == null) { + baseVector.setNull(index); + } else { + baseVector.setSafe(index, (double) value); + } + } + + private void writeDateMilliVector(int index, Object value, DateMilliVector fieldVector) { + DateMilliVector baseVector = fieldVector; + final Long dateInMillis; + if (value == null) { + dateInMillis = null; + } else if (value instanceof Date) { + dateInMillis = ((Date) value).getTime(); + } else if (value instanceof LocalDateTime) { + dateInMillis = ((LocalDateTime) value).toInstant(ZoneOffset.UTC).toEpochMilli(); + } else if (value instanceof ZonedDateTime) { + dateInMillis = ((ZonedDateTime) value).toInstant().toEpochMilli(); + } else if (value instanceof OffsetDateTime) { + dateInMillis = ((OffsetDateTime) value).toInstant().toEpochMilli(); + } else { + dateInMillis = null; + } + if (dateInMillis == null) { + baseVector.setNull(index); + } else { + baseVector.setSafe(index, dateInMillis); + } + } + + private void writeBigIntVector(int index, Object value, BigIntVector 
fieldVector) { + BigIntVector baseVector = fieldVector; + if (value == null) { + baseVector.setNull(index); + } else { + baseVector.setSafe(index, (long) value); + } + } + + private void writeBaseVariableWidthVector(int index, Object value, BaseVariableWidthVector fieldVector) { + final BaseVariableWidthVector baseVector = fieldVector; + if (value == null) { + baseVector.setNull(index); + return; + } + if (value instanceof DurationValue) { + value = ((DurationValue) value).toString(); + } + if (value instanceof String) { + baseVector.setSafe(index, value.toString().getBytes(StandardCharsets.UTF_8)); + } else { + baseVector.setSafe(index, JsonUtil.writeValueAsBytes(value)); + } + } +} diff --git a/extended/src/main/java/apoc/export/arrow/ExportArrowStreamStrategy.java b/extended/src/main/java/apoc/export/arrow/ExportArrowStreamStrategy.java new file mode 100644 index 0000000000..ebd1a7270b --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ExportArrowStreamStrategy.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package apoc.export.arrow;

import apoc.result.ByteArrayResult;
import apoc.util.Util;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.ArrowWriter;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static apoc.export.arrow.ExportArrowFileStrategy.writeJsonResult;

/**
 * Arrow export strategy that serializes the input {@code IN} into a lazy
 * {@link Stream} of {@link ByteArrayResult}, one element per Arrow IPC batch.
 * Implementations supply {@link #toIterator(Object)} (row extraction) and
 * {@link #schemaFor(List)} (schema derivation); batching and serialization are
 * handled by the defaults below.
 *
 * @param <IN> source of the export (e.g. a subgraph or a Cypher result)
 */
public interface ExportArrowStreamStrategy<IN> extends ExportArrowStrategy<IN, Stream<ByteArrayResult>> {

    /** Converts the input into an iterator of flat column-name -> value rows. */
    Iterator<Map<String, Object>> toIterator(IN data);

    /**
     * Serializes one batch of rows into a self-contained Arrow IPC byte payload.
     * A fresh {@link VectorSchemaRoot} and writer are created per call and closed
     * by try-with-resources, so each returned byte[] is independently readable.
     *
     * @throws RuntimeException wrapping any {@link IOException} from the writer
     */
    default byte[] writeBatch(BufferAllocator bufferAllocator, List<Map<String, Object>> rows) {
        try (final VectorSchemaRoot root = VectorSchemaRoot.create(schemaFor(rows), bufferAllocator);
                final ByteArrayOutputStream out = new ByteArrayOutputStream();
                final ArrowWriter writer = newArrowWriter(root, out)) {
            AtomicInteger counter = new AtomicInteger();
            root.allocateNew();
            rows.forEach(row -> {
                final int index = counter.getAndIncrement();
                // Write each schema field's value for this row; missing keys yield null.
                root.getFieldVectors().forEach(fe -> {
                    Object value = convertValue(row.get(fe.getName()));
                    write(index, value, fe);
                });
            });
            root.setRowCount(counter.get());
            writer.writeBatch();
            root.clear();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Exports the data as a lazy stream of serialized Arrow batches. Batches are
     * produced on demand in {@code computeBatch()}, honouring the termination
     * guard between rows.
     */
    default Stream<ByteArrayResult> export(IN data, ArrowConfig config) {
        class ExportIterator implements Iterator<ByteArrayResult> {
            ByteArrayResult current;
            // Running row counter; persists across computeBatch() calls.
            int batchCount = 0;
            Iterator<Map<String, Object>> it;

            public ExportIterator(IN data) {
                it = toIterator(data);
                current = null;
                computeBatch();
            }

            @Override
            public boolean hasNext() {
                return current != null;
            }

            @Override
            public ByteArrayResult next() {
                ByteArrayResult result = current;
                current = null;
                computeBatch();
                return result;
            }

            private void computeBatch() {
                boolean keepIterating = true;
                List<Map<String, Object>> rows = new ArrayList<>(config.getBatchSize());

                while (!Util.transactionIsTerminated(getTerminationGuard()) && it.hasNext() && keepIterating) {
                    rows.add(it.next());
                    // NOTE(review): the modulo test runs before ++batchCount, so the
                    // first emitted batch contains batchSize + 1 rows and later ones
                    // batchSize — confirm whether this off-by-one is intentional.
                    if (batchCount > 0 && batchCount % config.getBatchSize() == 0) {
                        final byte[] bytes = writeBatch(getBufferAllocator(), rows);
                        current = new ByteArrayResult(bytes);
                        keepIterating = false;
                    }
                    ++batchCount;
                }

                // NOTE(review): when the loop exits via keepIterating=false, rows is
                // still non-empty, so the same rows are serialized a second time here
                // and overwrite `current` with identical bytes — redundant work (not a
                // correctness issue). The branch is required for the final partial batch.
                if (!rows.isEmpty()) {
                    final byte[] bytes = writeBatch(getBufferAllocator(), rows);
                    current = new ByteArrayResult(bytes);
                }
            }
        }

        var streamIterator = new ExportIterator(data);
        Iterable<ByteArrayResult> iterable = () -> streamIterator;
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    /** Null-safe conversion of a row value to its JSON-friendly representation. */
    default Object convertValue(Object data) {
        return data == null ? null : writeJsonResult(data);
    }

    /** Creates the IPC stream writer; overridable for alternative formats. */
    default ArrowWriter newArrowWriter(VectorSchemaRoot root, OutputStream out) {
        return new ArrowStreamWriter(root, new DictionaryProvider.MapDictionaryProvider(), Channels.newChannel(out));
    }

    /** Derives (and typically caches) the Arrow schema for the given rows. */
    Schema schemaFor(List<Map<String, Object>> rows);
}
/*
 * Copyright (c) "Neo4j"
 * Neo4j Sweden AB [http://neo4j.com]
 *
 * This file is part of Neo4j.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package apoc.export.arrow;

import apoc.Pools;
import apoc.export.util.ProgressReporter;
import apoc.result.ExportProgressInfo;
import apoc.util.collection.Iterables;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

/**
 * File-based Arrow export of a {@link SubGraph}: streams all nodes followed by
 * all relationships of the subgraph into an Arrow file, reporting progress as
 * it goes. Schema is derived once (lazily) from apoc.meta.* and cached.
 */
public class ExportGraphFileStrategy implements ExportArrowFileStrategy<SubGraph>, ExportGraphStrategy {

    private final String fileName;
    private final GraphDatabaseService db;
    private final Pools pools;
    private final TerminationGuard terminationGuard;
    private final Log logger;

    // NOTE(review): this allocator is created per strategy instance; no close()
    // is visible in this class — confirm its lifecycle is managed by the caller
    // or the file-strategy superinterface.
    private final RootAllocator bufferAllocator;

    // Lazily initialized, guarded by the synchronized schemaFor below.
    private Schema schema;

    public ExportGraphFileStrategy(
            String fileName, GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger) {
        this.fileName = fileName;
        this.db = db;
        this.pools = pools;
        this.terminationGuard = terminationGuard;
        this.logger = logger;
        this.bufferAllocator = new RootAllocator();
    }

    /**
     * Rows are emitted nodes-first, then relationships; each entity is counted
     * in the reporter (nodes vs rels) and flattened via entityToMap.
     */
    @Override
    public Iterator<Map<String, Object>> toIterator(ProgressReporter reporter, SubGraph subGraph) {
        return Stream.concat(Iterables.stream(subGraph.getNodes()), Iterables.stream(subGraph.getRelationships()))
                .map(entity -> {
                    reporter.update(entity instanceof Node ? 1 : 0, entity instanceof Relationship ? 1 : 0, 0);
                    return this.entityToMap(entity);
                })
                .iterator();
    }

    /** Human-readable source description used in progress info. */
    @Override
    public String getSource(SubGraph subGraph) {
        return String.format(
                "graph: nodes(%d), rels(%d)",
                Iterables.count(subGraph.getNodes()), Iterables.count(subGraph.getRelationships()));
    }

    /** Pre-computes the schema from the subgraph's labels/rel-types, then delegates. */
    @Override
    public Stream<ExportProgressInfo> export(SubGraph data, ArrowConfig config) {
        schemaFor(List.of(createConfigMap(data, config)));
        return ExportArrowFileStrategy.super.export(data, config);
    }

    @Override
    public String getFileName() {
        return fileName;
    }

    @Override
    public TerminationGuard getTerminationGuard() {
        return terminationGuard;
    }

    @Override
    public BufferAllocator getBufferAllocator() {
        return bufferAllocator;
    }

    @Override
    public GraphDatabaseService getGraphDatabaseApi() {
        return db;
    }

    @Override
    public ExecutorService getExecutorService() {
        return pools.getDefaultExecutorService();
    }

    @Override
    public Log getLogger() {
        return logger;
    }

    /** Computes the schema on first call only; synchronized for safe lazy init. */
    @Override
    public synchronized Schema schemaFor(List<Map<String, Object>> records) {
        if (schema == null) {
            schema = schemaFor(getGraphDatabaseApi(), records);
        }
        return schema;
    }
}
/*
 * Copyright (c) "Neo4j"
 * Neo4j Sweden AB [http://neo4j.com]
 *
 * This file is part of Neo4j.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package apoc.export.arrow;

import apoc.util.Util;
import apoc.util.collection.Iterables;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.Entity;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResultTransformer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static apoc.export.arrow.ArrowUtils.FIELD_ID;
import static apoc.export.arrow.ArrowUtils.FIELD_LABELS;
import static apoc.export.arrow.ArrowUtils.FIELD_SOURCE_ID;
import static apoc.export.arrow.ArrowUtils.FIELD_TARGET_ID;
import static apoc.export.arrow.ArrowUtils.FIELD_TYPE;
import static apoc.export.arrow.ExportArrowStrategy.toField;

/**
 * Shared logic for graph-shaped Arrow exports: derives the Arrow schema from
 * the database metadata (apoc.meta.nodeTypeProperties / relTypeProperties),
 * flattens entities into row maps, and builds the config map passed to those
 * meta procedures.
 */
public interface ExportGraphStrategy {

    /**
     * Builds an Arrow schema from apoc.meta.* property metadata. The first
     * record of {@code records} is treated as the meta-procedure config.
     * Relationship fields are only added when that config contains
     * "includeRels" (set by createConfigMap when the subgraph has rel types).
     */
    default Schema schemaFor(GraphDatabaseService db, List<Map<String, Object>> records) {
        // NOTE(review): the lambda parameter `propertyType` is unused — each of the
        // N property types maps to the same Field built from the full type set, and
        // the duplicates are collapsed by the Set collector. One map() call would do.
        final Function<Map<String, Object>, Stream<Field>> flatMapStream = m -> {
            String propertyName = (String) m.get("propertyName");
            List<String> propertyTypes = (List<String>) m.get("propertyTypes");
            return propertyTypes.stream().map(propertyType -> toField(propertyName, new HashSet<>(propertyTypes)));
        };
        final Predicate<Map<String, Object>> filterStream = m -> m.get("propertyName") != null;
        final ResultTransformer<Set<Field>> parsePropertiesResult = result ->
                result.stream().filter(filterStream).flatMap(flatMapStream).collect(Collectors.toSet());

        final Map<String, Object> cfg = records.get(0);
        final Map<String, Object> parameters = Map.of("config", cfg);
        final Set<Field> allFields = new HashSet<>();
        Set<Field> nodeFields = db.executeTransactionally(
                "CALL apoc.meta.nodeTypeProperties($config)", parameters, parsePropertiesResult);

        allFields.add(FIELD_ID);
        allFields.add(FIELD_LABELS);
        allFields.addAll(nodeFields);

        if (cfg.containsKey("includeRels")) {
            final Set<Field> relFields = db.executeTransactionally(
                    "CALL apoc.meta.relTypeProperties($config)", parameters, parsePropertiesResult);
            allFields.add(FIELD_SOURCE_ID);
            allFields.add(FIELD_TARGET_ID);
            allFields.add(FIELD_TYPE);
            allFields.addAll(relFields);
        }
        return new Schema(allFields);
    }

    /**
     * Flattens a node or relationship into a row map: id plus labels for nodes,
     * or type/source/target ids for relationships, then all properties.
     * Note: a property sharing a reserved field name would overwrite the
     * structural value, since properties are put last.
     */
    default Map<String, Object> entityToMap(Entity entity) {
        Map<String, Object> flattened = new HashMap<>();
        flattened.put(FIELD_ID.getName(), entity.getId());
        if (entity instanceof Node) {
            flattened.put(FIELD_LABELS.getName(), Util.labelStrings((Node) entity));
        } else {
            Relationship rel = (Relationship) entity;
            flattened.put(FIELD_TYPE.getName(), rel.getType().name());
            flattened.put(FIELD_SOURCE_ID.getName(), rel.getStartNodeId());
            flattened.put(FIELD_TARGET_ID.getName(), rel.getEndNodeId());
        }
        flattened.putAll(entity.getAllProperties());
        return flattened;
    }

    /**
     * Builds the config map for the apoc.meta.* calls: labels in use, rel types
     * in use (key "includeRels" only when non-empty — schemaFor keys off its
     * presence), overlaid by the user-supplied ArrowConfig entries.
     */
    default Map<String, Object> createConfigMap(SubGraph subGraph, ArrowConfig config) {
        final List<String> allLabelsInUse =
                Iterables.stream(subGraph.getAllLabelsInUse()).map(Label::name).collect(Collectors.toList());
        final List<String> allRelationshipTypesInUse = Iterables.stream(subGraph.getAllRelationshipTypesInUse())
                .map(RelationshipType::name)
                .collect(Collectors.toList());
        Map<String, Object> configMap = new HashMap<>();
        configMap.put("includeLabels", allLabelsInUse);
        if (!allRelationshipTypesInUse.isEmpty()) {
            configMap.put("includeRels", allRelationshipTypesInUse);
        }
        configMap.putAll(config.getConfig());
        return configMap;
    }
}
/*
 * Copyright (c) "Neo4j"
 * Neo4j Sweden AB [http://neo4j.com]
 *
 * This file is part of Neo4j.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package apoc.export.arrow;

import apoc.Pools;
import apoc.result.ByteArrayResult;
import apoc.util.collection.Iterables;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.ArrowWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;

import java.io.OutputStream;
import java.nio.channels.Channels;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

/**
 * Streaming Arrow export of a {@link SubGraph}: nodes first, then
 * relationships, each flattened via entityToMap and returned to the client as
 * Arrow IPC byte batches (no file written). Schema is derived once from
 * apoc.meta.* and cached.
 */
public class ExportGraphStreamStrategy implements ExportArrowStreamStrategy<SubGraph>, ExportGraphStrategy {

    private final GraphDatabaseService db;
    private final Pools pools;
    private final TerminationGuard terminationGuard;
    private final Log logger;

    // NOTE(review): allocator is never closed in this class — confirm lifecycle.
    private final RootAllocator bufferAllocator;

    // Lazily initialized, guarded by the synchronized schemaFor below.
    private Schema schema;

    public ExportGraphStreamStrategy(
            GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger) {
        this.db = db;
        this.pools = pools;
        this.terminationGuard = terminationGuard;
        this.logger = logger;
        this.bufferAllocator = new RootAllocator();
    }

    /** Emits all nodes, then all relationships, as flattened row maps. */
    @Override
    public Iterator<Map<String, Object>> toIterator(SubGraph subGraph) {
        return Stream.concat(Iterables.stream(subGraph.getNodes()), Iterables.stream(subGraph.getRelationships()))
                .map(this::entityToMap)
                .iterator();
    }

    /** Pre-computes the schema from the subgraph's labels/rel-types, then delegates. */
    @Override
    public Stream<ByteArrayResult> export(SubGraph subGraph, ArrowConfig config) {
        Map<String, Object> configMap = createConfigMap(subGraph, config);
        this.schemaFor(List.of(configMap));
        return ExportArrowStreamStrategy.super.export(subGraph, config);
    }

    @Override
    public TerminationGuard getTerminationGuard() {
        return terminationGuard;
    }

    @Override
    public BufferAllocator getBufferAllocator() {
        return bufferAllocator;
    }

    @Override
    public GraphDatabaseService getGraphDatabaseApi() {
        return db;
    }

    @Override
    public ExecutorService getExecutorService() {
        return pools.getDefaultExecutorService();
    }

    @Override
    public Log getLogger() {
        return logger;
    }

    /** IPC stream writer over the given output. */
    @Override
    public ArrowWriter newArrowWriter(VectorSchemaRoot root, OutputStream out) {
        return new ArrowStreamWriter(root, new DictionaryProvider.MapDictionaryProvider(), Channels.newChannel(out));
    }

    /** Computes the schema on first call only; synchronized for safe lazy init. */
    @Override
    public synchronized Schema schemaFor(List<Map<String, Object>> records) {
        if (schema == null) {
            schema = schemaFor(getGraphDatabaseApi(), records);
        }
        return schema;
    }
}
/*
 * Copyright (c) "Neo4j"
 * Neo4j Sweden AB [http://neo4j.com]
 *
 * This file is part of Neo4j.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package apoc.export.arrow;

import apoc.Pools;
import apoc.export.util.ProgressReporter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.types.pojo.Schema;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
 * File-based Arrow export of a Cypher {@link Result}: each result row becomes
 * one Arrow row, with per-column progress counted as node / relationship /
 * other. Schema is derived once (lazily) and cached.
 */
public class ExportResultFileStrategy implements ExportArrowFileStrategy<Result>, ExportResultStrategy {

    private final String fileName;
    private final GraphDatabaseService db;
    private final Pools pools;
    private final TerminationGuard terminationGuard;
    private final Log logger;

    // NOTE(review): allocator is never closed in this class — confirm lifecycle.
    private final RootAllocator bufferAllocator;

    // Lazily initialized, guarded by the synchronized schemaFor below.
    private Schema schema;

    public ExportResultFileStrategy(
            String fileName, GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger) {
        this.fileName = fileName;
        this.db = db;
        this.pools = pools;
        this.terminationGuard = terminationGuard;
        this.logger = logger;
        this.bufferAllocator = new RootAllocator();
    }

    /**
     * Streams the result rows unchanged, updating the reporter per column value:
     * nodes and relationships bump their own counters; any other value counts
     * as a property and also advances the row counter.
     */
    @Override
    public Iterator<Map<String, Object>> toIterator(ProgressReporter reporter, Result data) {
        return data.stream()
                .map(row -> {
                    row.forEach((key, val) -> {
                        final boolean notNodeNorRelationship = !(val instanceof Node) && !(val instanceof Relationship);
                        reporter.update(
                                val instanceof Node ? 1 : 0,
                                val instanceof Relationship ? 1 : 0,
                                notNodeNorRelationship ? 1 : 0);
                        if (notNodeNorRelationship) {
                            reporter.nextRow();
                        }
                    });
                    return row;
                })
                .iterator();
    }

    /** Human-readable source description used in progress info. */
    @Override
    public String getSource(Result result) {
        return String.format("statement: cols(%d)", result.columns().size());
    }

    @Override
    public String getFileName() {
        return fileName;
    }

    @Override
    public TerminationGuard getTerminationGuard() {
        return terminationGuard;
    }

    @Override
    public BufferAllocator getBufferAllocator() {
        return bufferAllocator;
    }

    @Override
    public GraphDatabaseService getGraphDatabaseApi() {
        return db;
    }

    @Override
    public ExecutorService getExecutorService() {
        return pools.getDefaultExecutorService();
    }

    @Override
    public Log getLogger() {
        return logger;
    }

    /** Computes the schema on first call only; synchronized for safe lazy init. */
    @Override
    public synchronized Schema schemaFor(List<Map<String, Object>> records) {
        if (schema == null) {
            schema = schemaFor(getGraphDatabaseApi(), records);
        }
        return schema;
    }
}
/*
 * Copyright (c) "Neo4j"
 * Neo4j Sweden AB [http://neo4j.com]
 *
 * This file is part of Neo4j.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ +package apoc.export.arrow; + +import apoc.meta.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.neo4j.graphdb.GraphDatabaseService; + +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static apoc.export.arrow.ExportArrowStrategy.fromMetaType; +import static apoc.export.arrow.ExportArrowStrategy.toField; + +public interface ExportResultStrategy { + + default Schema schemaFor(GraphDatabaseService db, List> records) { + final List fields = records.stream() + .flatMap(m -> m.entrySet().stream()) + .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), fromMetaType(Types.of(e.getValue())))) + .collect(Collectors.groupingBy( + e -> e.getKey(), Collectors.mapping(e -> e.getValue(), Collectors.toSet()))) + .entrySet() + .stream() + .map(e -> toField(e.getKey(), e.getValue())) + .collect(Collectors.toList()); + return new Schema(fields); + } +} diff --git a/extended/src/main/java/apoc/export/arrow/ExportResultStreamStrategy.java b/extended/src/main/java/apoc/export/arrow/ExportResultStreamStrategy.java new file mode 100644 index 0000000000..eb57260605 --- /dev/null +++ b/extended/src/main/java/apoc/export/arrow/ExportResultStreamStrategy.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package apoc.export.arrow; + +import apoc.Pools; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.types.pojo.Schema; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Result; +import org.neo4j.logging.Log; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +public class ExportResultStreamStrategy implements ExportArrowStreamStrategy, ExportResultStrategy { + + private final GraphDatabaseService db; + private final Pools pools; + private final TerminationGuard terminationGuard; + private final Log logger; + + private final RootAllocator bufferAllocator; + + private Schema schema; + + public ExportResultStreamStrategy( + GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger) { + this.db = db; + this.pools = pools; + this.terminationGuard = terminationGuard; + this.logger = logger; + this.bufferAllocator = new RootAllocator(); + } + + @Override + public Iterator> toIterator(Result data) { + return data; + } + + @Override + public TerminationGuard getTerminationGuard() { + return terminationGuard; + } + + @Override + public BufferAllocator getBufferAllocator() { + return bufferAllocator; + } + + @Override + public GraphDatabaseService getGraphDatabaseApi() { + return db; + } + + @Override + public ExecutorService getExecutorService() { + return pools.getDefaultExecutorService(); + } + + @Override + public Log getLogger() { + return logger; + } + + @Override + public synchronized Schema schemaFor(List> records) { + if (schema == null) { + schema = schemaFor(getGraphDatabaseApi(), records); + } + return schema; + } +} diff --git a/extended/src/main/java/apoc/export/arrow/ImportArrow.java b/extended/src/main/java/apoc/export/arrow/ImportArrow.java index 7735087079..7b017fec7c 100644 --- 
a/extended/src/main/java/apoc/export/arrow/ImportArrow.java +++ b/extended/src/main/java/apoc/export/arrow/ImportArrow.java @@ -4,7 +4,7 @@ import apoc.Pools; import apoc.export.util.BatchTransaction; import apoc.export.util.ProgressReporter; -import apoc.result.ProgressInfo; +import apoc.result.ImportProgressInfo; import apoc.util.FileUtils; import apoc.util.Util; import org.apache.arrow.memory.RootAllocator; @@ -68,9 +68,9 @@ public class ImportArrow { @Procedure(name = "apoc.import.arrow", mode = Mode.WRITE) @Description("Imports arrow from the provided arrow file or byte array") - public Stream importFile(@Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { + public Stream importFile(@Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { - ProgressInfo result = + ImportProgressInfo result = Util.inThread(pools, () -> { String file = null; String sourceInfo = "binary"; @@ -87,7 +87,7 @@ public Stream importFile(@Name("input") Object input, @Name(value try (ArrowReader reader = getReader(input); VectorSchemaRoot schemaRoot = reader.getVectorSchemaRoot()) { - final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "arrow")); + final ProgressReporter reporter = new ProgressReporter(null, null, new ImportProgressInfo(file, sourceInfo, "arrow")); BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter); try { while (hasElements(counter, reader, schemaRoot)) { @@ -146,7 +146,7 @@ public Stream importFile(@Name("input") Object input, @Name(value btx.close(); } - return reporter.getTotal(); + return (ImportProgressInfo) reporter.getTotal(); } }); diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquet.java b/extended/src/main/java/apoc/export/parquet/ExportParquet.java index 979508f55c..32c3ec5b4d 100644 --- a/extended/src/main/java/apoc/export/parquet/ExportParquet.java +++ 
b/extended/src/main/java/apoc/export/parquet/ExportParquet.java @@ -6,7 +6,7 @@ import apoc.Pools; import apoc.export.util.NodesAndRelsSubGraph; import apoc.result.ByteArrayResult; -import apoc.result.ProgressInfo; +import apoc.result.ExportProgressInfo; import apoc.util.FileUtils; import org.apache.commons.lang3.StringUtils; import org.neo4j.cypher.export.DatabaseSubGraph; @@ -98,20 +98,20 @@ public Stream query(@Name("query") String query, @Name(value = @Procedure("apoc.export.parquet.all") @Description("Exports the full database as a Parquet file.") - public Stream all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { + public Stream all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { return exportParquet(fileName, new DatabaseSubGraph(tx), new ParquetConfig(config)); } @Procedure("apoc.export.parquet.data") @Description("Exports the given nodes and relationships as a Parquet file.") - public Stream data(@Name("nodes") List nodes, @Name("rels") List rels, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { + public Stream data(@Name("nodes") List nodes, @Name("rels") List rels, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { ParquetConfig conf = new ParquetConfig(config); return exportParquet(fileName, new NodesAndRelsSubGraph(tx, nodes, rels), conf); } @Procedure("apoc.export.parquet.graph") @Description("Exports the given graph as a Parquet file.") - public Stream graph(@Name("graph") Map graph, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { + public Stream graph(@Name("graph") Map graph, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { Collection nodes = (Collection) graph.get("nodes"); Collection rels = (Collection) 
graph.get("relationships"); ParquetConfig conf = new ParquetConfig(config); @@ -121,7 +121,7 @@ public Stream graph(@Name("graph") Map graph, @Name @Procedure("apoc.export.parquet.query") @Description("Exports the given Cypher query as a Parquet file.") - public Stream query(@Name("query") String query, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { + public Stream query(@Name("query") String query, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map config) throws Exception { ParquetConfig exportConfig = new ParquetConfig(config); Map params = config == null ? Collections.emptyMap() : (Map)config.getOrDefault("params", Collections.emptyMap()); Result result = tx.execute(query,params); @@ -129,7 +129,7 @@ public Stream query(@Name("query") String query, @Name("file") Str return exportParquet(fileName, result, exportConfig); } - public Stream exportParquet(String fileName, Object data, ParquetConfig config) throws IOException, URLAccessValidationError { + public Stream exportParquet(String fileName, Object data, ParquetConfig config) throws IOException, URLAccessValidationError { if (StringUtils.isBlank(fileName)) { throw new RuntimeException("The fileName must exists. 
Otherwise, use the `apoc.export.parquet.*.stream.` procedures to stream the export back to your client."); } diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java index c97110903b..933dc50898 100644 --- a/extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetFileStrategy.java @@ -2,7 +2,7 @@ import apoc.Pools; import apoc.export.util.ProgressReporter; -import apoc.result.ProgressInfo; +import apoc.result.ExportProgressInfo; import apoc.util.FileUtils; import apoc.util.QueueBasedSpliterator; import apoc.util.QueueUtil; @@ -24,7 +24,7 @@ import java.util.stream.StreamSupport; -public abstract class ExportParquetFileStrategy implements ExportParquetStrategy> { +public abstract class ExportParquetFileStrategy implements ExportParquetStrategy> { private String fileName; private final GraphDatabaseService db; @@ -43,13 +43,13 @@ public ExportParquetFileStrategy(String fileName, GraphDatabaseService db, Pools this.exportType = exportType; } - public Stream export(IN data, ParquetConfig config) { + public Stream export(IN data, ParquetConfig config) { - ProgressInfo progressInfo = new ProgressInfo(fileName, getSource(data), "parquet"); - progressInfo.batchSize = config.getBatchSize(); + ExportProgressInfo progressInfo = new ExportProgressInfo(fileName, getSource(data), "parquet"); + progressInfo.setBatches(config.getBatchSize()); ProgressReporter reporter = new ProgressReporter(null, null, progressInfo); - final BlockingQueue queue = new ArrayBlockingQueue<>(10); + final BlockingQueue queue = new ArrayBlockingQueue<>(10); Util.inTxFuture(pools.getDefaultExecutorService(), db, tx -> { int batchCount = 0; List rows = new ArrayList<>(config.getBatchSize()); @@ -76,12 +76,12 @@ public Stream export(IN data, ParquetConfig config) { } finally { closeWriter(); reporter.done(); - 
QueueUtil.put(queue, ProgressInfo.EMPTY, 10); + QueueUtil.put(queue, ExportProgressInfo.EMPTY, 10); } return true; }); - QueueBasedSpliterator spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, Integer.MAX_VALUE); + QueueBasedSpliterator spliterator = new QueueBasedSpliterator<>(queue, ExportProgressInfo.EMPTY, terminationGuard, Integer.MAX_VALUE); return StreamSupport.stream(spliterator, false); } diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java index 57729218fb..cb11746761 100644 --- a/extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetGraphFileStrategy.java @@ -2,7 +2,7 @@ import apoc.Pools; import apoc.export.util.ProgressReporter; -import apoc.result.ProgressInfo; +import apoc.result.ExportProgressInfo; import apoc.util.collection.Iterables; import org.neo4j.cypher.export.SubGraph; import org.neo4j.graphdb.Entity; @@ -21,7 +21,7 @@ public ExportParquetGraphFileStrategy(String fileName, GraphDatabaseService db, } @Override - public Stream export(SubGraph data, ParquetConfig config) { + public Stream export(SubGraph data, ParquetConfig config) { return super.export(data, config); } diff --git a/extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java b/extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java index 04d26a6d6a..0f1e12400d 100644 --- a/extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java +++ b/extended/src/main/java/apoc/export/parquet/ExportParquetResultFileStrategy.java @@ -2,7 +2,7 @@ import apoc.Pools; import apoc.export.util.ProgressReporter; -import apoc.result.ProgressInfo; +import apoc.result.ExportProgressInfo; import org.neo4j.graphdb.GraphDatabaseService; import org.neo4j.graphdb.Node; import 
org.neo4j.graphdb.Relationship; @@ -44,7 +44,7 @@ public Iterator> toIterator(ProgressReporter reporter, Resul } @Override - public Stream export(Result data, ParquetConfig config) { + public Stream export(Result data, ParquetConfig config) { return super.export(data, config); } diff --git a/extended/src/main/java/apoc/export/parquet/ImportParquet.java b/extended/src/main/java/apoc/export/parquet/ImportParquet.java index ff22a37fe5..59ef289c7f 100644 --- a/extended/src/main/java/apoc/export/parquet/ImportParquet.java +++ b/extended/src/main/java/apoc/export/parquet/ImportParquet.java @@ -4,7 +4,7 @@ import apoc.Pools; import apoc.export.util.BatchTransaction; import apoc.export.util.ProgressReporter; -import apoc.result.ProgressInfo; +import apoc.result.ImportProgressInfo; import apoc.util.Util; import org.neo4j.graphdb.Entity; import org.neo4j.graphdb.GraphDatabaseService; @@ -52,10 +52,10 @@ public class ImportParquet { @Procedure(name = "apoc.import.parquet", mode = Mode.WRITE) @Description("Imports parquet from the provided file or binary") - public Stream importParquet( + public Stream importParquet( @Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map config) { - ProgressInfo result = + ImportProgressInfo result = Util.inThread(pools, () -> { String file = null; @@ -69,7 +69,7 @@ public Stream importParquet( final Map idMapping = new HashMap<>(); try (ApocParquetReader reader = getReader(input, conf, urlAccessChecker)) { - final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "parquet")); + final ProgressReporter reporter = new ProgressReporter(null, null, new ImportProgressInfo(file, sourceInfo, "parquet")); BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter); @@ -116,7 +116,7 @@ public Stream importParquet( btx.close(); } - return reporter.getTotal(); + return (ImportProgressInfo) reporter.getTotal(); } }); return Stream.of(result); diff --git 
a/extended/src/main/java/apoc/export/xls/ExportXls.java b/extended/src/main/java/apoc/export/xls/ExportXls.java index e8ec0e904b..3fb3c8a0ce 100644 --- a/extended/src/main/java/apoc/export/xls/ExportXls.java +++ b/extended/src/main/java/apoc/export/xls/ExportXls.java @@ -3,7 +3,7 @@ import apoc.ApocConfig; import apoc.Extended; import apoc.export.util.NodesAndRelsSubGraph; -import apoc.result.ProgressInfo; +import apoc.result.ExportProgressInfo; import apoc.util.MissingDependencyException; import apoc.util.Util; import org.neo4j.cypher.export.DatabaseSubGraph; @@ -38,7 +38,7 @@ public class ExportXls { @Procedure @Description("apoc.export.xls.all(file,config) - exports whole database as xls to the provided file") - public Stream all(@Name("file") String fileName, @Name("config") Map config) throws Exception { + public Stream all(@Name("file") String fileName, @Name("config") Map config) throws Exception { String source = String.format("database: nodes(%d), rels(%d)", Util.nodeCount(tx), Util.relCount(tx)); return exportXls(fileName, source, new DatabaseSubGraph(tx), config); @@ -46,7 +46,7 @@ public Stream all(@Name("file") String fileName, @Name("config") M @Procedure @Description("apoc.export.xls.data(nodes,rels,file,config) - exports given nodes and relationships as xls to the provided file") - public Stream data(@Name("nodes") List nodes, @Name("rels") List rels, @Name("file") String fileName, @Name("config") Map config) throws Exception { + public Stream data(@Name("nodes") List nodes, @Name("rels") List rels, @Name("file") String fileName, @Name("config") Map config) throws Exception { String source = String.format("data: nodes(%d), rels(%d)", nodes.size(), rels.size()); return exportXls(fileName, source, new NodesAndRelsSubGraph(tx, nodes, rels), config); @@ -54,7 +54,7 @@ public Stream data(@Name("nodes") List nodes, @Name("rels") @Procedure @Description("apoc.export.xls.graph(graph,file,config) - exports given graph object as xls to the provided file") - 
public Stream graph(@Name("graph") Map graph, @Name("file") String fileName, @Name("config") Map config) throws Exception { + public Stream graph(@Name("graph") Map graph, @Name("file") String fileName, @Name("config") Map config) throws Exception { Collection nodes = (Collection) graph.get("nodes"); Collection rels = (Collection) graph.get("relationships"); @@ -64,14 +64,14 @@ public Stream graph(@Name("graph") Map graph, @Name @Procedure @Description("apoc.export.xls.query(query,file,{config,...,params:{params}}) - exports results from the cypher statement as xls to the provided file") - public Stream query(@Name("query") String query, @Name("file") String fileName, @Name("config") Map config) throws Exception { + public Stream query(@Name("query") String query, @Name("file") String fileName, @Name("config") Map config) throws Exception { Map params = config == null ? Collections.emptyMap() : (Map)config.getOrDefault("params", Collections.emptyMap()); Result result = tx.execute(query,params); String source = String.format("statement: cols(%d)", result.columns().size()); return exportXls(fileName, source,result,config); } - private Stream exportXls(@Name("file") String fileName, String source, Object data, Map configMap) throws Exception { + private Stream exportXls(@Name("file") String fileName, String source, Object data, Map configMap) throws Exception { try { return ExportXlsHandler.getProgressInfoStream(fileName, source, data, configMap, apocConfig, db); } catch (NoClassDefFoundError e) { diff --git a/extended/src/main/java/apoc/export/xls/ExportXlsHandler.java b/extended/src/main/java/apoc/export/xls/ExportXlsHandler.java index bda05d807e..0aa01d04eb 100644 --- a/extended/src/main/java/apoc/export/xls/ExportXlsHandler.java +++ b/extended/src/main/java/apoc/export/xls/ExportXlsHandler.java @@ -3,6 +3,7 @@ import apoc.ApocConfig; import apoc.export.util.ExportConfig; import apoc.export.util.ProgressReporter; +import apoc.result.ExportProgressInfo; import 
apoc.result.ProgressInfo; import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.tuple.Triple; @@ -49,7 +50,7 @@ public class ExportXlsHandler { "Please see the documentation: https://neo4j.com/labs/apoc/5/overview/apoc.export/apoc.export.xls.all/#_install_dependencies"; - public static Stream getProgressInfoStream(String fileName, String source, Object data, Map configMap, ApocConfig apocConfig, GraphDatabaseService db) throws IOException { + public static Stream getProgressInfoStream(String fileName, String source, Object data, Map configMap, ApocConfig apocConfig, GraphDatabaseService db) throws IOException { ExportConfig c = new ExportConfig(configMap); apocConfig.checkWriteAllowed(c, fileName); try (Transaction tx = db.beginTx(); @@ -57,8 +58,8 @@ public static Stream getProgressInfoStream(String fileName, String SXSSFWorkbook wb = new SXSSFWorkbook(-1)) { XlsExportConfig config = new XlsExportConfig(configMap); - ProgressInfo progressInfo = new ProgressInfo(fileName, source, "xls"); - progressInfo.batchSize = config.getBatchSize(); + ProgressInfo progressInfo = new ExportProgressInfo(fileName, source, "xls"); + progressInfo.setBatches(config.getBatchSize()); ProgressReporter reporter = new ProgressReporter(null, null, progressInfo); Map styles = buildCellStyles(config, wb); @@ -77,7 +78,7 @@ public static Stream getProgressInfoStream(String fileName, String wb.dispose(); reporter.done(); tx.commit(); - return reporter.stream(); + return Stream.of((ExportProgressInfo) reporter.getTotal()); } } diff --git a/extended/src/main/java/apoc/gephi/Gephi.java b/extended/src/main/java/apoc/gephi/Gephi.java index 3551849ceb..42f6ca1802 100644 --- a/extended/src/main/java/apoc/gephi/Gephi.java +++ b/extended/src/main/java/apoc/gephi/Gephi.java @@ -2,7 +2,7 @@ import apoc.Extended; import apoc.graph.GraphsUtils; -import apoc.result.ProgressInfo; +import apoc.result.ExportProgressInfo; import apoc.util.ExtendedUtil; import apoc.util.JsonUtil; 
import apoc.util.UrlResolver; @@ -52,7 +52,7 @@ public static double doubleValue(Entity pc, String prop, Number defaultValue) { // TODO configure property-filters or transfer all properties @Procedure @Description("apoc.gephi.add(url-or-key, workspace, data, weightproperty, ['exportproperty']) | streams passed in data to Gephi") - public Stream add(@Name("urlOrKey") String keyOrUrl, @Name("workspace") String workspace, @Name("data") Object data,@Name(value = "weightproperty",defaultValue = "null") String weightproperty,@Name(value = "exportproperties",defaultValue = "[]") List exportproperties) { + public Stream add(@Name("urlOrKey") String keyOrUrl, @Name("workspace") String workspace, @Name("data") Object data, @Name(value = "weightproperty",defaultValue = "null") String weightproperty, @Name(value = "exportproperties",defaultValue = "[]") List exportproperties) { if (workspace == null) workspace = "workspace0"; String url = getGephiUrl(keyOrUrl)+"/"+Util.encodeUrlComponent(workspace)+"?operation=updateGraph"; long start = System.currentTimeMillis(); @@ -63,7 +63,7 @@ public Stream add(@Name("urlOrKey") String keyOrUrl, @Name("worksp if (GraphsUtils.extract(data, nodes, rels)) { String payload = toGephiStreaming(nodes, rels, weightproperty, propertyNames.toArray(new String[propertyNames.size()])); JsonUtil.loadJson(url,map("method","POST","Content-Type","application/json; charset=utf-8"), payload, "", true, null, null, urlAccessChecker).count(); - return Stream.of(new ProgressInfo(url,"graph","gephi").update(nodes.size(),rels.size(),nodes.size()).done(start)); + return Stream.of(new ExportProgressInfo(url,"graph","gephi").update(nodes.size(),rels.size(),nodes.size()).done(start)); } return Stream.empty(); } diff --git a/extended/src/main/java/apoc/load/Gexf.java b/extended/src/main/java/apoc/load/Gexf.java index b7e75571e2..cda164e95a 100644 --- a/extended/src/main/java/apoc/load/Gexf.java +++ b/extended/src/main/java/apoc/load/Gexf.java @@ -6,8 +6,8 @@ import 
apoc.export.util.ExportConfig; import apoc.export.util.ProgressReporter; import apoc.load.util.XmlReadUtil.Import; +import apoc.result.ExportProgressInfo; import apoc.result.MapResult; -import apoc.result.ProgressInfo; import apoc.util.FileUtils; import apoc.util.Util; import org.neo4j.graphdb.GraphDatabaseService; @@ -50,9 +50,9 @@ public Stream gexf( @Procedure(name = "apoc.import.gexf", mode = Mode.WRITE) @Description("Imports a graph from the provided GraphML file.") - public Stream importGexf( + public Stream importGexf( @Name("urlOrBinaryFile") Object urlOrBinaryFile, @Name("config") Map config) { - ProgressInfo result = Util.inThread(pools, () -> { + ExportProgressInfo result = Util.inThread(pools, () -> { ExportConfig exportConfig = new ExportConfig(config); String file = null; String source = "binary"; @@ -60,7 +60,7 @@ public Stream importGexf( file = (String) urlOrBinaryFile; source = "file"; } - ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, source, "gexf")); + ProgressReporter reporter = new ProgressReporter(null, null, new ExportProgressInfo(file, source, "gexf")); Import graphReader = new Import(db) .reporter(reporter) .batchSize(exportConfig.getBatchSize()) @@ -76,7 +76,7 @@ public Stream importGexf( graphReader.parseXML(reader, terminationGuard); } - return reporter.getTotal(); + return (ExportProgressInfo) reporter.getTotal(); }); return Stream.of(result); } diff --git a/extended/src/main/java/apoc/load/LoadArrow.java b/extended/src/main/java/apoc/load/LoadArrow.java new file mode 100644 index 0000000000..68bc238935 --- /dev/null +++ b/extended/src/main/java/apoc/load/LoadArrow.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.load; + +import apoc.result.LoadDataMapResult; +import apoc.util.FileUtils; +import apoc.util.JsonUtil; +import apoc.util.Util; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.arrow.vector.util.Text; +import org.neo4j.graphdb.security.URLAccessChecker; +import org.neo4j.graphdb.security.URLAccessValidationError; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.values.storable.Values; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.channels.SeekableByteChannel; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class LoadArrow { + + 
@Context + public URLAccessChecker urlAccessChecker; + + private static class ArrowSpliterator extends Spliterators.AbstractSpliterator { + + private final ArrowReader reader; + private final VectorSchemaRoot schemaRoot; + private final AtomicInteger counter; + + public ArrowSpliterator(ArrowReader reader, VectorSchemaRoot schemaRoot) throws IOException { + super(Long.MAX_VALUE, Spliterator.ORDERED); + this.reader = reader; + this.schemaRoot = schemaRoot; + this.counter = new AtomicInteger(); + this.reader.loadNextBatch(); + } + + @Override + public synchronized boolean tryAdvance(Consumer action) { + try { + if (counter.get() >= schemaRoot.getRowCount()) { + if (reader.loadNextBatch()) { + counter.set(0); + } else { + return false; + } + } + final Map row = schemaRoot.getFieldVectors().stream() + .collect( + HashMap::new, + (map, fieldVector) -> map.put(fieldVector.getName(), read(fieldVector, counter.get())), + HashMap::putAll); // please look at https://bugs.openjdk.java.net/browse/JDK-8148463 + counter.incrementAndGet(); + action.accept(new LoadDataMapResult(row)); + return true; + } catch (Exception e) { + return false; + } + } + } + + @Procedure(name = "apoc.load.arrow.stream", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Imports `NODE` and `RELATIONSHIP` values from the provided arrow byte array.") + public Stream stream( + @Name(value = "source", description = "The data to load.") byte[] source, + @Name(value = "config", defaultValue = "{}", description = "This value is never used.") + Map config) + throws IOException { + RootAllocator allocator = new RootAllocator(); + ByteArrayInputStream inputStream = new ByteArrayInputStream(source); + ArrowStreamReader streamReader = new ArrowStreamReader(inputStream, allocator); + VectorSchemaRoot schemaRoot = streamReader.getVectorSchemaRoot(); + return StreamSupport.stream(new ArrowSpliterator(streamReader, 
schemaRoot), false) + .onClose(() -> { + Util.close(allocator); + Util.close(streamReader); + Util.close(schemaRoot); + Util.close(inputStream); + }); + } + + @Procedure(name = "apoc.load.arrow", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Imports `NODE` and `RELATIONSHIP` values from the provided arrow file.") + public Stream file( + @Name(value = "file", description = "The name of the file to import data from.") String fileName, + @Name(value = "config", defaultValue = "{}", description = "This value is never used.") + Map config) + throws IOException, URISyntaxException, URLAccessValidationError { + final SeekableByteChannel channel = FileUtils.inputStreamFor(fileName, null, null, null, urlAccessChecker) + .asChannel(); + RootAllocator allocator = new RootAllocator(); + ArrowFileReader streamReader = new ArrowFileReader(channel, allocator); + VectorSchemaRoot schemaRoot = streamReader.getVectorSchemaRoot(); + return StreamSupport.stream(new ArrowSpliterator(streamReader, schemaRoot), false) + .onClose(() -> { + Util.close(allocator); + Util.close(streamReader); + Util.close(schemaRoot); + Util.close(channel); + }); + } + + private static Object read(FieldVector fieldVector, int index) { + if (fieldVector.isNull(index)) { + return null; + } else if (fieldVector instanceof DateMilliVector) { + DateMilliVector fe = (DateMilliVector) fieldVector; + return Instant.ofEpochMilli(fe.get(index)).atOffset(ZoneOffset.UTC); + } else if (fieldVector instanceof BitVector) { + BitVector fe = (BitVector) fieldVector; + return fe.get(index) == 1; + } else { + Object object = fieldVector.getObject(index); + return getObject(object); + } + } + + private static Object getObject(Object object) { + if (object instanceof Collection) { + return ((Collection) object).stream().map(LoadArrow::getObject).collect(Collectors.toList()); + } + if (object instanceof Map) { + return 
((Map) object) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> getObject(e.getValue()))); + } + if (object instanceof Text) { + return object.toString(); + } + try { + // we test if is a valid Neo4j type + return Values.of(object); + } catch (Exception e) { + // otherwise we try coerce it + return valueToString(object); + } + } + + private static String valueToString(Object value) { + return JsonUtil.writeValueAsString(value); + } +} diff --git a/extended/src/main/java/apoc/load/LoadJsonExtended.java b/extended/src/main/java/apoc/load/LoadJsonExtended.java new file mode 100644 index 0000000000..9f1e24a2a0 --- /dev/null +++ b/extended/src/main/java/apoc/load/LoadJsonExtended.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package apoc.load; + +import apoc.result.LoadDataMapResult; +import apoc.util.CompressionAlgo; +import org.neo4j.graphdb.security.URLAccessChecker; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.procedure.TerminationGuard; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import static apoc.load.LoadJsonUtils.loadJsonStream; +import static apoc.util.CompressionConfig.COMPRESSION; + +public class LoadJsonExtended { + + @Context + public TerminationGuard terminationGuard; + + @Context + public URLAccessChecker urlAccessChecker; + + @SuppressWarnings("unchecked") + @Procedure(name = "apoc.load.jsonParams", deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description( + "Loads parameters from a JSON URL (e.g. 
web-API) as a stream of values if the given JSON file is a `LIST`.\n" + + "If the given JSON file is a `MAP`, this procedure imports a single value instead.") + public Stream jsonParams( + @Name( + value = "urlOrKeyOrBinary", + description = "The name of the file or binary data to import the data from.") + Object urlOrKeyOrBinary, + @Name(value = "headers", description = "Headers to be used when connecting to the given URL.") + Map headers, + @Name(value = "payload", description = "The payload to send when connecting to the given URL.") + String payload, + @Name( + value = "path", + defaultValue = "", + description = "A JSON path expression used to extract a certain part from the list.") + String path, + @Name( + value = "config", + defaultValue = "{}", + description = + """ + { + failOnError = true :: BOOLEAN, + pathOptions :: LIST, + compression = ""NONE"" :: [""NONE"", ""BYTES"", ""GZIP"", ""BZIP2"", ""DEFLATE"", ""BLOCK_LZ4"", ""FRAMED_SNAPPYā€¯] + } + """) + Map config) { + if (config == null) config = Collections.emptyMap(); + boolean failOnError = (boolean) config.getOrDefault("failOnError", true); + String compressionAlgo = (String) config.getOrDefault(COMPRESSION, CompressionAlgo.NONE.name()); + List pathOptions = (List) config.get("pathOptions"); + return loadJsonStream( + urlOrKeyOrBinary, + headers, + payload, + path, + failOnError, + compressionAlgo, + pathOptions, + terminationGuard, + urlAccessChecker); + } +} diff --git a/extended/src/main/java/apoc/log/Neo4jLogStream.java b/extended/src/main/java/apoc/log/Neo4jLogStream.java new file mode 100644 index 0000000000..10ef47304b --- /dev/null +++ b/extended/src/main/java/apoc/log/Neo4jLogStream.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.log; + +import apoc.util.FileUtils; +import org.neo4j.kernel.api.QueryLanguage; +import org.neo4j.kernel.api.procedure.QueryLanguageScope; +import org.neo4j.procedure.Admin; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +/** + * @author moxious + * @since 27.02.19 + */ +public class Neo4jLogStream { + + public static class FileEntry implements Comparable { + @Description("The line number.") + public final long lineNo; + + @Description("The content of the line.") + public final String line; + + @Description("The path to the log file.") + public final String path; + + public FileEntry(long lineNumber, String data, String path) { + this.lineNo = lineNumber; + this.line = data; + this.path = path; + } + + public int compareTo(FileEntry o) { + return Long.compare(this.lineNo, o.lineNo); + } + } + + @Admin + @Procedure( + name = "apoc.log.stream", + mode = Mode.DBMS, + deprecatedBy = "This procedure is being moved to APOC Extended.") + @Deprecated + @QueryLanguageScope(scope = {QueryLanguage.CYPHER_25}) + @Description("Returns the file contents from the given log, optionally returning only the last n lines.\n" + + "This procedure requires users to 
have an admin role.") + public Stream stream( + @Name(value = "path", description = "The name of the log file to read.") String logName, + @Name(value = "config", defaultValue = "{}", description = "{ last :: INTEGER }") + Map config) { + + File logDir = FileUtils.getLogDirectory(); + + if (logDir == null) { + throw new RuntimeException("Neo4j configured server.directories.logs points to a directory that " + + "does not exist or is not readable. Please ensure this configuration is correct."); + } + + // Prepend neo4jHome if it's a relative path, and use the user's path otherwise. + File f = new File(logDir, logName); + + try { + if (!f.getCanonicalFile().toPath().startsWith(logDir.getAbsolutePath())) { + throw new RuntimeException("The path you are trying to access has a canonical path outside of the logs " + + "directory, and this procedure is only permitted to access files in the log directory. This may " + + "occur if the path in question is a symlink or other link."); + } + } catch (IOException ioe) { + throw new RuntimeException("Unable to resolve basic log file canonical path", ioe); + } + + try { + Stream stream = Files.lines(Paths.get(f.toURI())); + final AtomicLong lineNumber = new AtomicLong(0); + final String p = f.getCanonicalPath(); + + Stream entries = stream.map(line -> new FileEntry(lineNumber.getAndIncrement(), line, p)); + + // Useful for tailing logfiles. + if (config.containsKey("last")) { + return entries.sorted(Collections.reverseOrder()) + .limit(Double.valueOf(config.get("last").toString()).longValue()); + } + + return entries; + } catch (NoSuchFileException nsf) { + // This special case we want to throw a custom message and not let this error propagate, because the + // trace exposes the full path we were checking. 
+ throw new RuntimeException("No log file exists by that name"); + } catch (IOException exc) { + throw new RuntimeException(exc); + } + } +} diff --git a/extended/src/main/java/apoc/systemdb/SystemDb.java b/extended/src/main/java/apoc/systemdb/SystemDb.java index 53b849c765..c0e48d908c 100644 --- a/extended/src/main/java/apoc/systemdb/SystemDb.java +++ b/extended/src/main/java/apoc/systemdb/SystemDb.java @@ -7,6 +7,7 @@ import apoc.export.cypher.FileManagerFactory; import apoc.export.util.ExportConfig; import apoc.export.util.ProgressReporter; +import apoc.result.ExportProgressInfo; import apoc.result.ProgressInfo; import apoc.result.RowResult; import apoc.result.VirtualNode; @@ -62,12 +63,12 @@ public NodesAndRelationshipsResult(List nodes, List relation @Admin @Procedure(name = "apoc.systemdb.export.metadata") @Description("apoc.systemdb.export.metadata($conf) - export the apoc feature saved in system db (that is: customProcedures, triggers, uuids, and dvCatalogs) in multiple files called ...cypher") - public Stream metadata(@Name(value = "config",defaultValue = "{}") Map config) { + public Stream metadata(@Name(value = "config",defaultValue = "{}") Map config) { final SystemDbConfig conf = new SystemDbConfig(config); final String fileName = conf.getFileName(); apocConfig.checkWriteAllowed(null, fileName); - ProgressInfo progressInfo = new ProgressInfo(fileName, null, "cypher"); + ProgressInfo progressInfo = new ExportProgressInfo(fileName, null, "cypher"); ProgressReporter progressReporter = new ProgressReporter(null, null, progressInfo); ExportFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName + ".cypher", true, ExportConfig.EMPTY); withSystemDbTransaction(tx -> { @@ -92,7 +93,7 @@ public Stream metadata(@Name(value = "config",defaultValue = "{}") }); progressReporter.done(); - return progressReporter.stream(); + return Stream.of((ExportProgressInfo) progressReporter.getTotal()); } @Admin diff --git 
a/extended/src/test/java/apoc/export/arrow/ArrowTest.java b/extended/src/test/java/apoc/export/arrow/ArrowTest.java new file mode 100644 index 0000000000..5ff4355475 --- /dev/null +++ b/extended/src/test/java/apoc/export/arrow/ArrowTest.java @@ -0,0 +1,402 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.export.arrow; + +import apoc.graph.Graphs; +import apoc.load.LoadArrow; +import apoc.meta.Meta; +import apoc.util.JsonUtil; +import apoc.util.TestUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseInternalSettings; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.Result; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +import java.io.File; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static 
org.junit.Assert.assertEquals; + +public class ArrowTest { + + private static File directory = new File("target/arrow import"); + + static { //noinspection ResultOfMethodCallIgnored + directory.mkdirs(); + } + + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting( + GraphDatabaseSettings.load_csv_file_url_root, + directory.toPath().toAbsolutePath()) + .withSetting(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true); + + public static final List> EXPECTED = List.of( + new HashMap<>() { + { + put("name", "Adam"); + put("bffSince", null); + put("", null); + put("", 0L); + put("age", 42L); + put("labels", List.of("User")); + put("male", true); + put("", null); + put("kids", List.of("Sam", "Anna", "Grace")); + put( + "place", + Map.of("crs", "wgs-84-3d", "longitude", 33.46789D, "latitude", 13.1D, "height", 100.0D)); + put("", null); + put("since", null); + put( + "born", + LocalDateTime.parse("2015-05-18T19:32:24.000") + .atOffset(ZoneOffset.UTC) + .toZonedDateTime()); + } + }, + new HashMap<>() { + { + put("name", "Jim"); + put("bffSince", null); + put("", null); + put("", 1L); + put("age", 42L); + put("labels", List.of("User")); + put("male", null); + put("", null); + put("kids", null); + put("place", null); + put("", null); + put("since", null); + put("born", null); + } + }, + new HashMap<>() { + { + put("name", null); + put("bffSince", "P5M1DT12H"); + put("", 0L); + put("", 0L); + put("age", null); + put("labels", null); + put("male", null); + put("", "KNOWS"); + put("kids", null); + put("place", null); + put("", 1L); + put("since", 1993L); + put("born", null); + } + }); + + @BeforeClass + public static void beforeClass() { + db.executeTransactionally( + "CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User 
{name:'Jim',age:42})"); + TestUtil.registerProcedure(db, ExportArrow.class, LoadArrow.class, Graphs.class, Meta.class); + } + + @AfterClass + public static void teardown() { + db.shutdown(); + } + + @Before + public void before() { + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true); + } + + private byte[] extractByteArray(Result result) { + return result.columnAs("byteArray").next(); + } + + private String extractFileName(Result result) { + return result.columnAs("file").next(); + } + + private T readValue(String json, Class clazz) { + if (json == null) return null; + try { + return JsonUtil.OBJECT_MAPPER.readValue(json, clazz); + } catch (JsonProcessingException e) { + return null; + } + } + + @Test + public void testStreamRoundtripArrowQuery() { + // given - when + final String returnQuery = "RETURN 1 AS intData," + "'a' AS stringData," + + "true AS boolData," + + "[1, 2, 3] AS intArray," + + "[1.1, 2.2, 3.3] AS doubleArray," + + "[true, false, true] AS boolArray," + + "[1, '2', true, null] AS mixedArray," + + "{foo: 'bar'} AS mapData," + + "localdatetime('2015-05-18T19:32:24') as dateData," + + "[[0]] AS arrayArray," + + "1.1 AS doubleData"; + final byte[] byteArray = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.query($query) YIELD value AS byteArray", + Map.of("query", returnQuery), + this::extractByteArray); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> { + final Map row = (Map) result.next().get("value"); + assertEquals(1L, row.get("intData")); + assertEquals("a", row.get("stringData")); + assertEquals(Arrays.asList(1L, 2L, 3L), row.get("intArray")); + assertEquals(Arrays.asList(1.1D, 2.2D, 3.3), row.get("doubleArray")); + assertEquals(Arrays.asList(true, false, true), row.get("boolArray")); + 
assertEquals(Arrays.asList("1", "2", "true", null), row.get("mixedArray")); + assertEquals("{\"foo\":\"bar\"}", row.get("mapData")); + assertEquals( + LocalDateTime.parse("2015-05-18T19:32:24.000") + .atOffset(ZoneOffset.UTC) + .toZonedDateTime(), + row.get("dateData")); + assertEquals(Arrays.asList("[0]"), row.get("arrayArray")); + assertEquals(1.1D, row.get("doubleData")); + return true; + }); + } + + @Test + public void testFileRoundtripArrowQuery() { + // given - when + final String returnQuery = "RETURN 1 AS intData," + "'a' AS stringData," + + "true AS boolData," + + "[1, 2, 3] AS intArray," + + "[1.1, 2.2, 3.3] AS doubleArray," + + "[true, false, true] AS boolArray," + + "[1, '2', true, null] AS mixedArray," + + "{foo: 'bar'} AS mapData," + + "localdatetime('2015-05-18T19:32:24') as dateData," + + "[[0]] AS arrayArray," + + "1.1 AS doubleData"; + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.query('query_test.arrow', $query) YIELD file", + Map.of("query", returnQuery), + this::extractFileName); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final Map row = (Map) result.next().get("value"); + assertEquals(1L, row.get("intData")); + assertEquals("a", row.get("stringData")); + assertEquals(Arrays.asList(1L, 2L, 3L), row.get("intArray")); + assertEquals(Arrays.asList(1.1D, 2.2D, 3.3), row.get("doubleArray")); + assertEquals(Arrays.asList(true, false, true), row.get("boolArray")); + assertEquals(Arrays.asList("1", "2", "true", null), row.get("mixedArray")); + assertEquals("{\"foo\":\"bar\"}", row.get("mapData")); + assertEquals( + LocalDateTime.parse("2015-05-18T19:32:24.000") + .atOffset(ZoneOffset.UTC) + .toZonedDateTime(), + row.get("dateData")); + assertEquals(Arrays.asList("[0]"), row.get("arrayArray")); + assertEquals(1.1D, row.get("doubleData")); + return true; + }); + } + + @Test + public void 
testStreamRoundtripArrowGraph() { + // given - when + final byte[] byteArray = db.executeTransactionally( + "CYPHER 25 CALL apoc.graph.fromDB('neo4j',{}) yield graph " + + "CALL apoc.export.arrow.stream.graph(graph) YIELD value AS byteArray " + + "RETURN byteArray", + Map.of(), + this::extractByteArray); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + private List> getActual(Result result) { + return result.stream() + .map(m -> (Map) m.get("value")) + .map(m -> { + final Map newMap = new HashMap(m); + newMap.put("place", readValue((String) m.get("place"), Map.class)); + return newMap; + }) + .collect(Collectors.toList()); + } + + @Test + public void testFileRoundtripArrowGraph() { + // given - when + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.graph.fromDB('neo4j',{}) yield graph " + + "CALL apoc.export.arrow.graph('graph_test.arrow', graph) YIELD file " + + "RETURN file", + Map.of(), + this::extractFileName); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + @Test + public void testStreamRoundtripArrowAll() { + testStreamRoundtripAllCommon(); + } + + @Test + public void testStreamRoundtripArrowAllWithImportExportConfsDisabled() { + // disable both export and import configs + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, false); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, false); + + // should work regardless of the previous config + testStreamRoundtripAllCommon(); + } + + private void testStreamRoundtripAllCommon() { + // given - when + final byte[] 
byteArray = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.all() YIELD value AS byteArray ", Map.of(), this::extractByteArray); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow.stream($byteArray) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("byteArray", byteArray), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + @Test + public void testFileRoundtripArrowAll() { + // given - when + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.all('all_test.arrow') YIELD file", Map.of(), this::extractFileName); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final List> actual = getActual(result); + assertEquals(EXPECTED, actual); + return null; + }); + } + + @Test + public void testStreamVolumeArrowAll() { + // given - when + db.executeTransactionally("UNWIND range(0, 10000 - 1) AS id CREATE (n:ArrowNode{id:id})"); + + final List list = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.query('MATCH (n:ArrowNode) RETURN n.id AS id') YIELD value AS byteArray ", + Map.of(), + result -> result.columnAs("byteArray").stream().collect(Collectors.toList())); + + final List expected = LongStream.range(0, 10000).mapToObj(l -> l).collect(Collectors.toList()); + + // then + final String query = "CYPHER 25 UNWIND $list AS byteArray " + "CALL apoc.load.arrow.stream(byteArray) YIELD value " + + "RETURN value.id AS id"; + db.executeTransactionally(query, Map.of("list", list), result -> { + final List actual = + result.stream().map(m -> (Long) m.get("id")).sorted().collect(Collectors.toList()); + assertEquals(expected, actual); + return null; + }); + + db.executeTransactionally("MATCH (n:ArrowNode) DELETE n"); + } + + @Test + public void testFileVolumeArrowAll() { + // given - when + 
db.executeTransactionally("UNWIND range(0, 10000 - 1) AS id CREATE (:ArrowNode{id:id})"); + + String file = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.query('volume_test.arrow', 'MATCH (n:ArrowNode) RETURN n.id AS id') YIELD file ", + Map.of(), + this::extractFileName); + + final List expected = LongStream.range(0, 10000).mapToObj(l -> l).collect(Collectors.toList()); + + // then + final String query = "CYPHER 25 CALL apoc.load.arrow($file) YIELD value " + "RETURN value.id AS id"; + db.executeTransactionally(query, Map.of("file", file), result -> { + final List actual = + result.stream().map(m -> (Long) m.get("id")).sorted().collect(Collectors.toList()); + assertEquals(expected, actual); + return null; + }); + + db.executeTransactionally("MATCH (n:ArrowNode) DELETE n"); + } + + @Test + public void testValidNonStorableQuery() { + final List list = db.executeTransactionally( + "CYPHER 25 CALL apoc.export.arrow.stream.query($query) YIELD value AS byteArray ", + Map.of("query", "RETURN [1, true, 2.3, null, { name: 'Dave' }] AS array"), + result -> result.columnAs("byteArray").stream().collect(Collectors.toList())); + + final List expected = Arrays.asList("1", "true", "2.3", null, "{\"name\":\"Dave\"}"); + + // then + final String query = "CYPHER 25 UNWIND $list AS byteArray " + "CALL apoc.load.arrow.stream(byteArray) YIELD value " + + "RETURN value.array AS array"; + db.executeTransactionally(query, Map.of("list", list), result -> { + List actual = result.>columnAs("array").next(); + assertEquals(expected, actual); + return null; + }); + } +} diff --git a/extended/src/test/java/apoc/export/arrow/ImportArrowTest.java b/extended/src/test/java/apoc/export/arrow/ImportArrowTest.java index d5a44b24b2..aa5619d208 100644 --- a/extended/src/test/java/apoc/export/arrow/ImportArrowTest.java +++ b/extended/src/test/java/apoc/export/arrow/ImportArrowTest.java @@ -6,6 +6,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; 
+import org.neo4j.configuration.GraphDatabaseInternalSettings; import org.neo4j.configuration.GraphDatabaseSettings; import org.neo4j.graphdb.Node; import org.neo4j.graphdb.Relationship; @@ -45,6 +46,7 @@ public class ImportArrowTest { @ClassRule public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true) .withSetting(GraphDatabaseSettings.load_csv_file_url_root, directory.toPath().toAbsolutePath()); @@ -67,7 +69,7 @@ public void before() { @Test public void testStreamRoundtripImportArrowAll() { - final byte[] bytes = db.executeTransactionally("CALL apoc.export.arrow.stream.all", + final byte[] bytes = db.executeTransactionally("CYPHER 25 CALL apoc.export.arrow.stream.all", Map.of(), this::extractByteArray); @@ -76,7 +78,7 @@ public void testStreamRoundtripImportArrowAll() { @Test public void testFileRoundtripImportArrowAll() { - String file = db.executeTransactionally("CALL apoc.export.arrow.all('test_all.arrow') YIELD file", + String file = db.executeTransactionally("CYPHER 25 CALL apoc.export.arrow.all('test_all.arrow') YIELD file", Map.of(), this::extractFileName); @@ -85,7 +87,7 @@ public void testFileRoundtripImportArrowAll() { @Test public void testFileRoundtripImportArrowAllWithSmallBatchSize() { - String file = db.executeTransactionally("CALL apoc.export.arrow.all('test_all.arrow') YIELD file", + String file = db.executeTransactionally("CYPHER 25 CALL apoc.export.arrow.all('test_all.arrow') YIELD file", Map.of(), this::extractFileName); diff --git a/extended/src/test/java/apoc/load/LoadJsonTest.java b/extended/src/test/java/apoc/load/LoadJsonTest.java new file mode 100644 index 0000000000..fc245fd708 --- /dev/null +++ b/extended/src/test/java/apoc/load/LoadJsonTest.java @@ -0,0 +1,199 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.load; + +import apoc.util.CompressionAlgo; +import apoc.util.JsonUtil; +import apoc.util.TestUtil; +import apoc.util.Util; +import com.sun.net.httpserver.HttpContext; +import com.sun.net.httpserver.HttpServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.mockserver.client.MockServerClient; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.Header; +import org.neo4j.configuration.GraphDatabaseInternalSettings; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.Result; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_USE_NEO4J_CONFIG; +import static apoc.ApocConfig.apocConfig; +import static apoc.util.BinaryTestUtil.fileToBinary; +import static apoc.util.CompressionConfig.COMPRESSION; +import static apoc.util.MapUtil.map; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testResult; +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertFalse; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.matchers.Times.exactly; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +public class LoadJsonTest { + + private static ClientAndServer mockServer; + private HttpServer server; + + @BeforeClass + public static void startServer() { + mockServer = startClientAndServer(1080); + } + + @AfterClass + public static void stopServer() { + mockServer.stop(); + } + + @Rule + public DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseSettings.memory_tracking, true) + .withSetting(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true); + + @Before + public void setUp() throws IOException { + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_IMPORT_FILE_USE_NEO4J_CONFIG, false); + apocConfig().setProperty("apoc.json.zip.url", "http://localhost:5353/testload.zip?raw=true!person.json"); + apocConfig() + .setProperty( + "apoc.json.simpleJson.url", + ClassLoader.getSystemResource("map.json").toString()); + TestUtil.registerProcedure(db, LoadJsonExtended.class); + + server = HttpServer.create(new InetSocketAddress(5353), 0); + HttpContext staticContext = server.createContext("/"); + staticContext.setHandler(new SimpleHttpHandler()); + server.start(); + } + + @After + public void cleanup() { + server.stop(0); + db.shutdown(); + } + + @Test + public void testLoadMultiJsonWithBinary() { + testResult( + db, + "CYPHER 25 CALL apoc.load.jsonParams($url, null, null, null, $config)", + map( + "url", + fileToBinary( + new File(ClassLoader.getSystemResource("multi.json") + .getPath()), + CompressionAlgo.FRAMED_SNAPPY.name()), + "config", + map(COMPRESSION, CompressionAlgo.FRAMED_SNAPPY.name())), + this::commonAssertionsLoadJsonMulti); + } + + private void commonAssertionsLoadJsonMulti(Result result) { + Map row = 
result.next(); + assertEquals(map("foo", asList(1L, 2L, 3L)), row.get("value")); + row = result.next(); + assertEquals(map("bar", asList(4L, 5L, 6L)), row.get("value")); + assertFalse(result.hasNext()); + } + + @Test + public void testLoadJsonParamsWithAuth() throws Exception { + String userPass = "user:password"; + String token = Util.encodeUserColonPassToBase64(userPass); + Map responseBody = Map.of("result", "message"); + + new MockServerClient("localhost", 1080) + .when( + request() + .withMethod("POST") + .withPath("/docs/search") + .withHeader("Authorization", "Basic " + token) + .withHeader("Content-type", "application/json") + .withBody("{\"query\":\"pagecache\",\"version\":\"3.5\"}"), + exactly(1)) + .respond(response() + .withStatusCode(200) + .withHeaders( + new Header("Content-Type", "application/json"), + new Header("Cache-Control", "public, max-age=86400")) + .withBody(JsonUtil.OBJECT_MAPPER.writeValueAsString(responseBody)) + .withDelay(TimeUnit.SECONDS, 1)); + + testCall( + db, + "CYPHER 25 CALL apoc.load.jsonParams($url, $config, $payload)", + map( + "payload", + "{\"query\":\"pagecache\",\"version\":\"3.5\"}", + "url", + "http://" + userPass + "@localhost:1080/docs/search", + "config", + map("method", "POST", "Content-Type", "application/json")), + (row) -> assertEquals(responseBody, row.get("value"))); + } + + @Test + public void testLoadJsonParams() { + new MockServerClient("localhost", 1080) + .when( + request() + .withMethod("POST") + .withPath("/docs/search") + .withHeader("Content-type", "application/json"), + exactly(1)) + .respond(response() + .withStatusCode(200) + .withHeaders( + new Header("Content-Type", "application/json"), + new Header("Cache-Control", "public, max-age=86400")) + .withBody("{ result: 'message' }") + .withDelay(TimeUnit.SECONDS, 1)); + + testCall( + db, + "CYPHER 25 CALL apoc.load.jsonParams($url, $config, $json)", + map( + "json", + "{\"query\":\"pagecache\",\"version\":\"3.5\"}", + "url", + 
"http://localhost:1080/docs/search", + "config", + map("method", "POST", "Content-Type", "application/json")), + (row) -> { + Map value = (Map) row.get("value"); + assertFalse("value should be not empty", value.isEmpty()); + }); + } +} diff --git a/extended/src/test/java/apoc/load/SimpleHttpHandler.java b/extended/src/test/java/apoc/load/SimpleHttpHandler.java new file mode 100644 index 0000000000..62776b2eb7 --- /dev/null +++ b/extended/src/test/java/apoc/load/SimpleHttpHandler.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package apoc.load; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +/** + * A Simple HTTP Handler to serve resource files to be used in testing. + * This allows us to not rely on Github or other websites being up 100% of the time + * which would result in flaky tests. 
+ */ +public class SimpleHttpHandler implements HttpHandler { + + private static final Map MIME_MAP = new HashMap<>(); + + static { + MIME_MAP.put("html", "text/html"); + MIME_MAP.put("json", "application/json"); + MIME_MAP.put("xml", "application/xml"); + MIME_MAP.put("zip", "application/zip"); + MIME_MAP.put("tgz", "application/x-gzip"); + MIME_MAP.put("gz", "application/x-gzip"); + MIME_MAP.put("txt", "text/plain"); + } + + public void handle(HttpExchange t) throws IOException { + URI uri = t.getRequestURI(); + String path = uri.getPath().substring(1); + + File file; + try { + file = new File(Thread.currentThread() + .getContextClassLoader() + .getResource(path) + .toURI()); + } catch (Exception e) { + // Object does not exist or is not a file: reject with 404 error. + String response = "404 (Not Found)\n"; + t.sendResponseHeaders(404, response.length()); + OutputStream os = t.getResponseBody(); + os.write(response.getBytes()); + os.close(); + return; + } + + // Object exists and is a file: accept with response code 200. + String mime = lookupMime(path); + + Headers h = t.getResponseHeaders(); + h.set("Content-Type", mime); + t.sendResponseHeaders(200, 0); + + OutputStream os = t.getResponseBody(); + FileInputStream fs = new FileInputStream(file); + byte[] buffer = new byte[1024]; + int count; + while ((count = fs.read(buffer)) != -1) { + os.write(buffer, 0, count); + } + fs.close(); + os.close(); + } + + private static String getExt(String path) { + int slashIndex = path.lastIndexOf('/'); + String basename = (slashIndex < 0) ? 
path : path.substring(slashIndex + 1); + + int dotIndex = basename.lastIndexOf('.'); + if (dotIndex >= 0) { + return basename.substring(dotIndex + 1); + } else { + return ""; + } + } + + private static String lookupMime(String path) { + String ext = getExt(path).toLowerCase(); + return MIME_MAP.getOrDefault(ext, "application/octet-stream"); + } +} diff --git a/extended/src/test/java/apoc/log/Neo4jLogStreamTest.java b/extended/src/test/java/apoc/log/Neo4jLogStreamTest.java new file mode 100644 index 0000000000..f0fb5ac268 --- /dev/null +++ b/extended/src/test/java/apoc/log/Neo4jLogStreamTest.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package apoc.log; + +import apoc.util.TestUtil; +import apoc.util.collection.Iterators; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseInternalSettings; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.dbms.api.DatabaseManagementService; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.test.TestDatabaseManagementServiceBuilder; + +import java.nio.file.Paths; +import java.util.UUID; +import java.util.stream.Collectors; + +import static apoc.ApocConfig.apocConfig; +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertTrue; + +public class Neo4jLogStreamTest { + + private GraphDatabaseService db; + private DatabaseManagementService dbManagementService; + + @Before + public void setUp() { + dbManagementService = new TestDatabaseManagementServiceBuilder( + Paths.get("target", UUID.randomUUID().toString()).toAbsolutePath()) + .setConfig(GraphDatabaseInternalSettings.enable_experimental_cypher_versions, true) + .build(); + apocConfig().setProperty("server.directories.logs", ""); + db = dbManagementService.database(GraphDatabaseSettings.DEFAULT_DATABASE_NAME); + TestUtil.registerProcedure(db, Neo4jLogStream.class); + } + + @After + public void teardown() { + dbManagementService.shutdown(); + } + + @Test + public void testLogStream() { + testResult(db, "CYPHER 25 CALL apoc.log.stream('debug.log')", res -> { + final String wholeFile = + Iterators.stream(res.columnAs("line")).collect(Collectors.joining("")); + assertTrue(wholeFile.contains("apoc.import.file.enabled=false")); + }); + } +} diff --git a/extended/src/test/kotlin/apoc/nlp/NodeMatcher.kt b/extended/src/test/kotlin/apoc/nlp/NodeMatcher.kt index 653bdbd8dd..6656c0e5e6 100644 --- a/extended/src/test/kotlin/apoc/nlp/NodeMatcher.kt +++ b/extended/src/test/kotlin/apoc/nlp/NodeMatcher.kt @@ -11,7 +11,7 @@ data class NodeMatcher(private val labels: List