Skip to content

Commit

Permalink
feat(#129): add back automatic pipeline updates (#130)
Browse files Browse the repository at this point in the history
* feat(#129): add back automatic pipeline updates

* fix(#129): fixing tests, increasing timeout and reducing delay
  • Loading branch information
witash authored Aug 2, 2024
1 parent 63cc236 commit fc73fd7
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 89 deletions.
202 changes: 116 additions & 86 deletions dbt/dbt-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,91 +22,121 @@ def connection():
print(f"Unable to connect! (Attempt {attempt})", e)
raise psycopg2.OperationalError("Could not connect to postgres")


# Create schema
with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
CREATE SCHEMA IF NOT EXISTS {os.getenv('POSTGRES_SCHEMA')};
""")
conn.commit()

with connection() as conn:
with conn.cursor() as cur:
cur.execute(f"""
CREATE TABLE IF NOT EXISTS
{os.getenv('POSTGRES_SCHEMA')}._dataemon (
inserted_on TIMESTAMP DEFAULT NOW(),
packages jsonb, manifest jsonb
)
""")
conn.commit()

package_json = '{}'

if os.getenv("DBT_PACKAGE_TARBALL_URL"):
print(os.getenv("DBT_PACKAGE_TARBALL_URL"))
init_package = urlparse(os.getenv("DBT_PACKAGE_TARBALL_URL"))
package_json = json.dumps({"packages": [{
"tarball": init_package.geturl(),
"name": "packages"
}]})

if os.getenv("CHT_PIPELINE_BRANCH_URL"):
init_package = urlparse(os.getenv("CHT_PIPELINE_BRANCH_URL"))
if init_package.scheme in ["http", "https"]:
package_json = json.dumps({"packages": [{
"git": init_package._replace(fragment='').geturl(),
"revision": init_package.fragment
}]})

with open("/dbt/packages.yml", "w") as f:
f.write(package_json)

subprocess.run(["dbt", "deps", "--profiles-dir", ".dbt"])

# load old manifest from db
with connection() as conn:
def setup():
    """Create the dbt schema and the _dataemon bookkeeping table if absent."""

    def run_ddl(statement):
        # One connection per DDL statement, committed immediately.
        with connection() as db:
            with db.cursor() as cursor:
                cursor.execute(statement)
                db.commit()

    # Ensure the target schema exists before anything else runs.
    run_ddl(f"""
        CREATE SCHEMA IF NOT EXISTS {os.getenv('POSTGRES_SCHEMA')};
    """)

    # _dataemon stores the packages and manifest snapshot between runs.
    run_ddl(f"""
        CREATE TABLE IF NOT EXISTS
        {os.getenv('POSTGRES_SCHEMA')}._dataemon (
            inserted_on TIMESTAMP DEFAULT NOW(),
            packages jsonb, manifest jsonb
        )
    """)

def get_package():
    """Build the dbt packages.yml content from environment variables.

    Writes the resulting JSON text to /dbt/packages.yml and returns it.
    CHT_PIPELINE_BRANCH_URL (git + #revision fragment) takes precedence
    over DBT_PACKAGE_TARBALL_URL when both are set.
    """
    packages = '{}'

    tarball_url = os.getenv("DBT_PACKAGE_TARBALL_URL")
    if tarball_url:
        print(tarball_url)
        parsed = urlparse(tarball_url)
        packages = json.dumps({"packages": [{
            "tarball": parsed.geturl(),
            "name": "packages"
        }]})

    branch_url = os.getenv("CHT_PIPELINE_BRANCH_URL")
    if branch_url:
        parsed = urlparse(branch_url)
        # Only git-over-http(s) URLs are accepted; the #fragment names a revision.
        if parsed.scheme in ("http", "https"):
            packages = json.dumps({"packages": [{
                "git": parsed._replace(fragment='').geturl(),
                "revision": parsed.fragment
            }]})

    with open("/dbt/packages.yml", "w") as f:
        f.write(packages)

    return packages


def get_manifest():
    """Regenerate the current dbt manifest and return its JSON text.

    Side effects: writes the most recently stored manifest (if any) to
    /dbt/old_manifest/manifest.json so dbt can later diff state against
    it, and runs `dbt ls` to force /dbt/target/manifest.json to be
    (re)generated.

    Raises FileNotFoundError if dbt failed to produce target/manifest.json.
    """
    # Load the manifest saved to the database by the previous run, if any.
    with connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"""
                SELECT manifest
                FROM {os.getenv('POSTGRES_SCHEMA')}._dataemon
                ORDER BY inserted_on DESC
            """)
            manifest = cur.fetchone()

    # Save to file if found, for dbt's --state comparison.
    if manifest and len(manifest) > 0:
        with open("/dbt/old_manifest/manifest.json", "w") as f:
            f.write(json.dumps(manifest[0]))

    # Run dbt ls to make sure the current manifest is generated.
    subprocess.run(["dbt", "ls", "--profiles-dir", ".dbt"])

    with open("/dbt/target/manifest.json", "r") as f:
        new_manifest = f.read()

    return new_manifest

def save_package_manifest(package_json, manifest_json):
    """Persist the current packages and manifest for the next run.

    Parameters:
        package_json: JSON text of the installed package spec.
        manifest_json: JSON text of the freshly generated dbt manifest.

    Older rows are deleted first — the manifest is large, so only the
    current/latest snapshot is kept in _dataemon.
    """
    with connection() as conn:
        with conn.cursor() as cur:
            # Because the manifest is large, delete old entries;
            # we only want the current/latest data.
            cur.execute(
                f"DELETE FROM {os.getenv('POSTGRES_SCHEMA')}._dataemon "
            )
            cur.execute(
                f"INSERT INTO {os.getenv('POSTGRES_SCHEMA')}._dataemon "
                "(packages, manifest) VALUES (%s, %s);",
                [package_json, manifest_json]
            )
            conn.commit()


def update_models():
    """Refresh the pipeline package, detect model changes, and rebuild them."""
    # Install the cht pipeline package.
    package_json = get_package()
    subprocess.run(["dbt", "deps", "--profiles-dir", ".dbt", "--upgrade"])

    # Check for new changes using the manifest.
    manifest_json = get_manifest()

    # Save the new manifest and package for the next run.
    save_package_manifest(package_json, manifest_json)

    # Anything that changed gets a full refresh against the stored state.
    refresh_command = [
        "dbt", "run",
        "--profiles-dir", ".dbt",
        "--select", "state:modified",
        "--full-refresh",
        "--state", "./old_manifest",
    ]
    subprocess.run(refresh_command)

def run_incremental_models():
    """Update incremental models (and tables, if any); views are excluded."""
    command = ["dbt", "run", "--profiles-dir", ".dbt",
               "--exclude", "config.materialized:view"]
    subprocess.run(command)


if __name__ == "__main__":
    setup()
    # Poll forever: update the models, run them, then wait
    # DATAEMON_INTERVAL seconds (default 5) before the next pass.
    while True:
        update_models()
        run_incremental_models()
        delay_seconds = int(os.getenv("DATAEMON_INTERVAL") or 5)
        time.sleep(delay_seconds)
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "",
"scripts": {
"postinstall": "cd couch2pg && npm ci",
"test:e2e": "npm run test:e2e-data && npm run test:e2e-stop-containers && npm run test:e2e-containers && mocha tests/**/*.spec.js --timeout 50000",
"test:e2e": "npm run test:e2e-data && npm run test:e2e-containers && mocha tests/**/*.spec.js --timeout 50000; npm run test:e2e-stop-containers",
"lint": "eslint --color --cache .",
"test:e2e-stop-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml down -v",
"test:e2e-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml up -d --build --force-recreate",
Expand Down
1 change: 1 addition & 0 deletions tests/.e2e-env
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ DBT_POSTGRES_PASSWORD="postgres"
DBT_POSTGRES_SCHEMA="dbt"
DBT_POSTGRES_HOST="postgres"
DBT_PACKAGE_TARBALL_URL="http://dbt-package/dbt/package.tar.gz"
DATAEMON_INTERVAL=0
COUCHDB_USER="medic"
COUCHDB_PASSWORD="password"
COUCHDB_DBS="medic,medic-sentinel"
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e-test.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe('Main workflow Test Suite', () => {
await delay(6); // wait for CHT-Sync
const pgTableContact = await client.query(`SELECT * from ${PGTABLE} where _id = $1`, [contact._id]);
expect(pgTableContact.rows[0]._deleted).to.equal(true);
await delay(6); // wait for DBT
await delay(12); // wait for DBT
const modelContactResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where uuid = $1`, [contact._id]);
expect(modelContactResult.rows.length).to.equal(0);
});
Expand Down Expand Up @@ -131,7 +131,7 @@ describe('Main workflow Test Suite', () => {
await delay(6); // wait for CHT-Sync
const pgTableReport = await client.query(`SELECT * from ${PGTABLE} where _id = $1`, [report._id]);
expect(pgTableReport.rows[0]._deleted).to.equal(true);
await delay(6); // wait for DBT
await delay(12); // wait for DBT
const modelReportResult = await client.query(`SELECT * FROM ${pgSchema}.reports where uuid = $1`, [report._id]);
expect(modelReportResult.rows.length).to.equal(0);
});
Expand Down

0 comments on commit fc73fd7

Please sign in to comment.