Update workflow #24
Conversation
pipeline/data.py
Outdated
```python
# tables each time the flow is run to produce unique transactions
# xref https://discourse.prefect.io/t/how-to-get-flow-count/3996
# if table in ["lineitem", "orders"]:
#     df[f"{table[0]}_orderkey"] += counter
```
We might want to do more than just add `now - 1995` or whatever. We might want to do an affine transform so that the entire previous range (like 1985-1995) gets squeezed into the last hour.
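The affine transform suggested above is straightforward to sketch. This is a hypothetical helper, not the PR's code; the actual column names and date ranges in the pipeline may differ:

```python
from datetime import datetime, timedelta

def squeeze_range(ts, src_start, src_end, dst_start, dst_end):
    # Affine transform: map ts linearly from [src_start, src_end]
    # onto [dst_start, dst_end]; endpoints map to endpoints.
    frac = (ts - src_start) / (src_end - src_start)
    return dst_start + frac * (dst_end - dst_start)

# Example: squeeze the 1985-1995 range into the hour ending at `now`.
now = datetime(2024, 1, 1, 12, 0)
mapped = squeeze_range(
    datetime(1990, 1, 1),
    datetime(1985, 1, 1),
    datetime(1995, 1, 1),
    now - timedelta(hours=1),
    now,
)
```

Unlike a plain `+ offset` shift, this preserves the relative ordering and spacing of the old timestamps while compressing them into the target window.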
```python
def check_model_endpoint():
    r = requests.get("http://0.0.0.0:8080/health")
    if not r.json() == ["ok"]:
        raise ValueError("Model endpoint isn't healthy")
```
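The contract being checked here (a healthy endpoint returns the JSON body `["ok"]`) can be exercised end to end with the stdlib alone. This sketch uses `urllib` instead of `requests` and a throwaway local server standing in for the real model endpoint; none of these names come from the PR:

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import urlopen

class HealthHandler(BaseHTTPRequestHandler):
    """Throwaway stand-in for the model endpoint's /health route."""

    def do_GET(self):
        body = json.dumps(["ok"]).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # silence per-request logging
        pass

def endpoint_is_healthy(url):
    # Same contract as check_model_endpoint above: healthy means
    # the endpoint returns the JSON body ["ok"].
    with urlopen(url) as r:
        return json.load(r) == ["ok"]

server = HTTPServer(("127.0.0.1", 0), HealthHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
healthy = endpoint_is_healthy(f"http://127.0.0.1:{server.server_port}/health")
server.shutdown()
```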
Still doing the serving with `--subdomain` or no?
My hope is that serving is cheap and the ROI is high
Yes. Not added yet -- that's for me to do today
pipeline/reduce.py
Outdated
```diff
@@ -18,7 +18,7 @@
 @task
-def save_query(region, part_type):
+def save_query(segment):
```
Maybe a better name? Something that's connected to the query itself like "revenue_by_supplier" or something?
I went with `unshipped_orders_by_revenue` -- further suggestions are welcome
```diff
     ).compute()
-    outfile = RESULTS_DIR / region / part_type / "result.snappy.parquet"
+    outfile = RESULTS_DIR / f"{segment}.snappy.parquet"
     fs.makedirs(outfile.parent, exist_ok=True)
     result.to_parquet(outfile, compression="snappy")
```
```python
@flow
def query_reduce():
    with lock_compact:
```
Cluster creation should maybe go here instead of in `save_query`?
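The scoping trade-off in that suggestion can be sketched in plain Python, with a context manager standing in for the real cluster object. All names here are hypothetical, not the PR's actual API:

```python
from contextlib import contextmanager

@contextmanager
def cluster():
    # Hypothetical stand-in for creating a compute cluster; the real
    # object and its lifecycle come from whatever the PR uses.
    resource = {"open": True}
    try:
        yield resource
    finally:
        resource["open"] = False

def save_query(segment, resource):
    # The task receives the shared cluster instead of creating its own.
    assert resource["open"], "cluster must be alive while the task runs"
    return f"{segment}.snappy.parquet"

def query_reduce():
    # Flow-level scoping: create the cluster once, share it across
    # every save_query call, and tear it down when the flow exits.
    with cluster() as c:
        return [save_query(s, c) for s in ("BUILDING", "AUTOMOBILE")]
```

Creating the cluster at the flow level means one startup/teardown per flow run rather than one per task, at the cost of the tasks no longer being self-contained.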