diff --git a/.changeset/yellow-pillows-bathe.md b/.changeset/yellow-pillows-bathe.md
new file mode 100644
index 00000000..8947f602
--- /dev/null
+++ b/.changeset/yellow-pillows-bathe.md
@@ -0,0 +1,5 @@
+---
+"app-gelinkt-notuleren": minor
+---
+
+Directly move LDES members to the correct graph using a custom `processPage`
diff --git a/config/ldes-client/handleStreamEnd.ts b/config/ldes-client/handleStreamEnd.ts
deleted file mode 100644
index 91955e7e..00000000
--- a/config/ldes-client/handleStreamEnd.ts
+++ /dev/null
@@ -1,313 +0,0 @@
-import { updateSudo, querySudo } from '@lblod/mu-auth-sudo';
-import { DIRECT_DATABASE_CONNECTION, GRAPH_STORE_URL, LDES_BASE, WORKING_GRAPH, FIRST_PAGE, CRON_PATTERN, LOG_LEVEL, TIME_PREDICATE, EXTRA_HEADERS } from './environment';
-
-const PREFIXES = `
-  PREFIX person:
-  PREFIX persoon:
-  PREFIX dct:
-  PREFIX mu:
-  PREFIX foaf:
-  PREFIX adms:
-  PREFIX org:
-  PREFIX mandaat:
-  PREFIX besluit:
-`
-
-const PAGE_SIZE = 200
-
-async function moveToPublic(type) {
-  const subjectQuery = `
-    SELECT ?subject WHERE {
-      GRAPH {
-        ?subject a ${type}.
-      }
-    }
-  `
-  const response = await querySudo(subjectQuery, {}, {sparqlEndpoint: "http://virtuoso:8890/sparql"});
-
-  const subjects = response.results.bindings.map((triple) => `<${triple.subject.value}>`)
-
-
-
-  for(let i = 0; i < subjects.length; i+=PAGE_SIZE) {
-
-    const deleteOldData = `
-      DELETE {
-        GRAPH {
-          ?subject a ${type};
-            ?predicate ?object.
-        }
-      }WHERE {
-        GRAPH {
-          ?subject a ${type};
-            ?predicate ?object.
-        }
-        VALUES ?subject {
-          ${subjects.slice(i, i+PAGE_SIZE).join(' ')}
-        }
-      }
-    `
-    await updateSudo(deleteOldData);
-
-    const query= `
-      DELETE {
-        GRAPH {
-          ?subject a ${type};
-            ?predicate ?object.
-        }
-      }INSERT {
-        GRAPH {
-          ?subject a ${type};
-            ?predicate ?object.
-        }
-      }WHERE {
-        GRAPH {
-          ?subject a ${type};
-            ?predicate ?object.
-        }
-        VALUES ?subject {
-          ${subjects.slice(i, i+PAGE_SIZE).join(' ')}
-        }
-      }
-    `
-    await updateSudo(query);
-
-  }
-
-}
-
-export async function handleStreamEnd(){
-  console.log("Stream ended");
-  await moveToPublic('')
-  await moveToPublic('')
-  await moveToPublic('')
-
-  const personPublicSubjectQuery = `
-    ${PREFIXES}
-    SELECT ?subject WHERE {
-      GRAPH {
-        ?subject a person:Person.
-      }
-    }
-  `
-  const responsePublic = await querySudo(personPublicSubjectQuery, {}, {sparqlEndpoint: "http://virtuoso:8890/sparql"});
-
-  const subjectsPublic = responsePublic.results.bindings.map((triple) => `<${triple.subject.value}>`)
-
-  for(let i = 0; i < subjectsPublic.length; i+=PAGE_SIZE) {
-
-    const personPublicDeleteOldDataQuery = `
-      ${PREFIXES}
-      DELETE {
-        GRAPH {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-        }
-
-      } WHERE {
-        GRAPH {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-        }
-        VALUES ?predicatePerson{
-          dct:modified
-          mu:uuid
-          foaf:familyName
-          persoon:gebruikteVoornaam
-        }
-        VALUES ?person {
-          ${subjectsPublic.slice(i, i+PAGE_SIZE).join(' ')}
-        }
-      }
-    `
-    await updateSudo(personPublicDeleteOldDataQuery);
-    const personPublicQuery = `
-      ${PREFIXES}
-      INSERT {
-        GRAPH {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-        }
-      }WHERE {
-        GRAPH {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-        }
-        VALUES ?predicatePerson{
-          dct:modified
-          mu:uuid
-          foaf:familyName
-          persoon:gebruikteVoornaam
-        }
-        VALUES ?person {
-          ${subjectsPublic.slice(i, i+PAGE_SIZE).join(' ')}
-        }
-      }
-
-    `
-    await updateSudo(personPublicQuery);
-
-
-
-  }
-
-  // We move to the person staging because we want to keep all the data, but not influence future public person migration
-  // Person in mandaten staging graph = waiting to be moved to lmb-public
-  // Person in person staging graph = waiting to be moved to lmb-private
-
-  const moveBirthdaysToPersonStagingQuery = `
-    ${PREFIXES}
-    DELETE {
-      GRAPH {
-        ?birthdate ?birthdatePredicate ?birthdateObject.
-      }
-    } INSERT {
-      GRAPH {
-        ?birthdate ?birthdatePredicate ?birthdateObject.
-      }
-    }WHERE {
-      GRAPH {
-        ?person a person:Person.
-        ?person persoon:heeftGeboorte ?birthdate.
-        ?birthdate ?birthdatePredicate ?birthdateObject.
-      }
-    }
-  `
-
-  await updateSudo(moveBirthdaysToPersonStagingQuery, {}, {sparqlEndpoint: "http://virtuoso:8890/sparql"});
-  const moveToPersonStagingQuery = `
-    ${PREFIXES}
-    DELETE {
-      GRAPH {
-        ?person a person:Person;
-          ?predicatePerson ?objectPerson.
-      }
-    } INSERT {
-      GRAPH {
-        ?person a person:Person;
-          ?predicatePerson ?objectPerson.
-      }
-    }WHERE {
-      GRAPH {
-        ?person a person:Person;
-          ?predicatePerson ?objectPerson.
-      }
-    }
-
-  `
-  await updateSudo(moveToPersonStagingQuery, {}, {sparqlEndpoint: "http://virtuoso:8890/sparql"});
-
-  const personPrivateSubjectQuery = `
-    ${PREFIXES}
-    SELECT ?subject WHERE {
-      GRAPH {
-        ?subject a person:Person;
-          ?predicatePerson ?objectPerson.
-        ?subject persoon:heeftGeboorte ?birthdate.
-        ?birthdate ?birthdatePredicate ?birthdateObject.
-        OPTIONAL {
-          ?subject adms:identifier ?identifier.
-          ?identifier ?identifierPredicate ?identifierObject.
-        }
-      }
-      GRAPH {
-        ?mandataris mandaat:isBestuurlijkeAliasVan ?subject.
-
-        ?mandataris org:holds ?mandat.
-
-      }
-      GRAPH {
-        ?temporaryBestuursorgan org:hasPost ?mandat;
-          mandaat:isTijdspecialisatieVan ?bestuursorgan.
-        ?bestuursorgan besluit:bestuurt ?adminUnit.
-        ?adminUnit mu:uuid ?adminUnitUuid.
-      }
-      BIND(IRI(CONCAT("http://mu.semte.ch/graphs/lmb-data-private/", ?adminUnitUuid)) AS ?g)
-    }
-  `
-  const responsePrivate = await querySudo(personPrivateSubjectQuery, {}, {sparqlEndpoint: "http://virtuoso:8890/sparql"});
-
-  const subjectsPrivate = responsePrivate.results.bindings.map((triple) => `<${triple.subject.value}>`)
-
-  for(let i = 0; i < subjectsPrivate.length; i+=PAGE_SIZE) {
-
-    const deleteOldDataPersonPrivateQuery = `
-      ${PREFIXES}
-      DELETE {
-        GRAPH ?g {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-        }
-
-      } WHERE {
-        GRAPH {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-        }
-        GRAPH {
-          ?mandataris mandaat:isBestuurlijkeAliasVan ?person.
-
-          ?mandataris org:holds ?mandat.
-
-        }
-        GRAPH {
-          ?temporaryBestuursorgan org:hasPost ?mandat;
-            mandaat:isTijdspecialisatieVan ?bestuursorgan.
-          ?bestuursorgan besluit:bestuurt ?adminUnit.
-          ?adminUnit mu:uuid ?adminUnitUuid.
-        }
-        BIND(IRI(CONCAT("http://mu.semte.ch/graphs/lmb-data-private/", ?adminUnitUuid)) AS ?g)
-        VALUES ?person {
-          ${subjectsPrivate.slice(i, i+PAGE_SIZE).join(' ')}
-        }
-
-      }
-
-    `
-
-    await updateSudo(deleteOldDataPersonPrivateQuery);
-
-    const personPrivateQuery = `
-      ${PREFIXES}
-      DELETE {
-        GRAPH {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-        }
-      }INSERT {
-        GRAPH ?g {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-          ?birthdate ?birthdatePredicate ?birthdateObject.
-        }
-      }WHERE {
-        GRAPH {
-          ?person a person:Person;
-            ?predicatePerson ?objectPerson.
-          ?person persoon:heeftGeboorte ?birthdate.
-          ?birthdate ?birthdatePredicate ?birthdateObject.
-        }
-        GRAPH {
-          ?mandataris mandaat:isBestuurlijkeAliasVan ?person.
-
-          ?mandataris org:holds ?mandat.
-
-        }
-        GRAPH {
-          ?temporaryBestuursorgan org:hasPost ?mandat;
-            mandaat:isTijdspecialisatieVan ?bestuursorgan.
-          ?bestuursorgan besluit:bestuurt ?adminUnit.
-          ?adminUnit mu:uuid ?adminUnitUuid.
-        }
-        BIND(IRI(CONCAT("http://mu.semte.ch/graphs/lmb-data-private/", ?adminUnitUuid)) AS ?g)
-        VALUES ?person {
-          ${subjectsPrivate.slice(i, i+PAGE_SIZE).join(' ')}
-        }
-
-      }
-    `
-    await updateSudo(personPrivateQuery);
-  }
-
-  console.log('LDES postproccessed')
- }
diff --git a/config/ldes-client/processPage.ts b/config/ldes-client/processPage.ts
new file mode 100644
index 00000000..3df220c5
--- /dev/null
+++ b/config/ldes-client/processPage.ts
@@ -0,0 +1,107 @@
+// this is a winston logger
+import { logger } from "../logger";
+import { updateSudo } from '@lblod/mu-auth-sudo';
+import { sparqlEscapeUri } from 'mu';
+import { BATCH_GRAPH, BYPASS_MU_AUTH, DIRECT_DATABASE_CONNECTION, TARGET_GRAPH, TIME_PREDICATE, VERSION_PREDICATE } from "../environment";
+
+/**
+ * Replaces, in a single DELETE/INSERT update, the triples the target graphs
+ * hold for the members found in BATCH_GRAPH with the members' current triples.
+ *
+ * The UNION branches of the query decide what is copied where:
+ *  - mandaat:Mandataris, mandaat:Fractie and org:Membership members go to the
+ *    graph bound in the first branch;
+ *  - person:Person members go to the graph bound in the second branch, limited
+ *    to dct:modified, mu:uuid, foaf:familyName and persoon:gebruikteVoornaam;
+ *  - person:Person and persoon:Geboorte members go, without adms:identifier,
+ *    to a graph derived from their ext:relatedTo administrative unit.
+ * VERSION_PREDICATE, TIME_PREDICATE and ext:relatedTo are never copied.
+ *
+ * @returns {Promise<void>} Resolves once updateSudo has run the update.
+ */
+async function replaceExistingData() {
+  // With BYPASS_MU_AUTH set, send the update to DIRECT_DATABASE_CONNECTION;
+  // otherwise updateSudo is called with empty options.
+  // NOTE(review): a non-empty string such as "false" is truthy here - confirm
+  // that ../environment exports BYPASS_MU_AUTH as a boolean.
+  const options = BYPASS_MU_AUTH
+    ? { sparqlEndpoint: DIRECT_DATABASE_CONNECTION }
+    : {};
+  const query = /* sparql */ `
+  PREFIX rdf:
+  PREFIX rdfs:
+  PREFIX mandaat:
+  PREFIX org:
+  PREFIX person:
+  PREFIX dct:
+  PREFIX mu:
+  PREFIX foaf:
+  PREFIX persoon:
+  PREFIX ext:
+  PREFIX adms:
+  DELETE {
+    GRAPH ?target_graph {
+      ?s ?pOld ?oOld.
+    }
+  }
+  INSERT {
+    GRAPH ?target_graph {
+      ?s a ?type.
+      ?s ?pNew ?oNew.
+    }
+  }
+  WHERE {
+    GRAPH ${sparqlEscapeUri(BATCH_GRAPH)} {
+      ?stream ?versionedMember .
+      ?versionedMember ${sparqlEscapeUri(VERSION_PREDICATE)} ?s .
+
+      {
+        ?versionedMember a ?type.
+        VALUES ?type { mandaat:Mandataris mandaat:Fractie org:Membership }
+        ?versionedMember ?pNew ?oNew.
+        BIND( as ?target_graph)
+      }
+      UNION
+      {
+        ?versionedMember a ?type.
+        VALUES ?type { person:Person }
+        ?versionedMember ?pNew ?oNew.
+        VALUES ?pNew {
+          dct:modified
+          mu:uuid
+          foaf:familyName
+          persoon:gebruikteVoornaam
+        }
+        BIND( as ?target_graph)
+      }
+      UNION
+      {
+        ?versionedMember a ?type.
+        VALUES ?type { person:Person persoon:Geboorte }
+        ?versionedMember ?pNew ?oNew.
+        FILTER (?pNew NOT IN ( adms:identifier ))
+        ?versionedMember ext:relatedTo ?administrativeUnit
+        BIND(URI(REPLACE(STR(?administrativeUnit), "http://data.lblod.info/id/bestuurseenheden/", "http://mu.semte.ch/graphs/lmb-data-private/")) as ?target_graph)
+      }
+      FILTER (?pNew NOT IN ( ${sparqlEscapeUri(VERSION_PREDICATE)}, ${sparqlEscapeUri(TIME_PREDICATE)}, ext:relatedTo ))
+    }
+    OPTIONAL {
+      GRAPH ?target_graph {
+        ?s ?pOld ?oOld.
+      }
+    }
+  }
+  `
+  await updateSudo(query, {}, options);
+}
+
+/**
+ * Processes the current page by moving its members into their target graphs.
+ * NOTE(review): presumably called by the LDES client once per page - confirm.
+ *
+ * @returns {Promise<void>} Resolves once replaceExistingData has finished.
+ */
+export async function processPage(){
+  logger.debug('Running custom logic to process the current page');
+  await replaceExistingData();
+}
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 0b2addad..83e533ff 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -235,7 +235,7 @@ services:
   ldes-client:
     image: lblod/ldes-client:0.0.3
     volumes:
-      - ./config/ldes-client/handleStreamEnd.ts:/config/handleStreamEnd.ts
+      - ./config/ldes-client/processPage.ts:/config/processPage.ts
     links:
       - database:database
       - virtuoso:virtuoso
@@ -243,10 +243,10 @@ services:
       TARGET_GRAPH: "http://mu.semte.ch/graphs/mandaten-staging"
       DIRECT_DATABASE_CONNECTION: "http://virtuoso:8890/sparql"
       RANDOMIZE_GRAPHS: "true"
-      BATCH_SIZE: 200
-      BYPASS_MU_AUTH: "true"
+      BATCH_SIZE: 100
+      BYPASS_MU_AUTH: "false"
       EXTRA_HEADERS: "secret"
       FIRST_PAGE: "https:/dev.mandatenbeheer.lblod.info/streams/ldes/abb/1"
       LDES_BASE: "https:/dev.mandatenbeheer.lblod.info/streams/ldes/abb/"
     labels:
-      - "logging=true"
\ No newline at end of file
+      - "logging=true"