Downloading files from S3 broken in 2.1.0 #16148
Comments
@ConstantinoSchillebeeckx, can you try with the latest amazon provider? It seems like you aren't using constraints in your environment, which we highly suggest:
These are not Airflow methods; you are using the AWS library directly. My impression is that you forgot to flush the data to disk.
An update:
Didn't work either. I was curious to see if I could take Airflow out of the equation, so I created a new virtual env and installed:
Then I executed the following script:

```python
# -*- coding: utf-8 -*-
import boto3


def download_file_from_s3():
    s3 = boto3.resource('s3')
    bucket = 'secret-bucket'
    key = 'tmp.txt'
    with open('/tmp/s3_hook.txt', 'w') as f:
        s3.Bucket(bucket).Object(key).download_file(f.name)
        print(f"File downloaded: {f.name}")
        with open(f.name, 'r') as f_in:
            print(f"FILE CONTENT {f_in.read()}")


download_file_from_s3()
```

So, the standalone script downloads the file correctly. Finally, as a sanity check, I updated the DAG to match the script above:

```python
# -*- coding: utf-8 -*-
import os
import boto3
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def download_file_from_s3():
    # authed with ENVIRONMENT variables
    s3 = boto3.resource('s3')
    bucket = 'secret-bucket'
    key = 'tmp.txt'
    with open('/tmp/s3_hook.txt', 'w') as f:
        s3.Bucket(bucket).Object(key).download_file(f.name)
        logging.info(f"File downloaded: {f.name}")
        with open(f.name, 'r') as f_in:
            logging.info(f"FILE CONTENT {f_in.read()}")


dag = DAG(
    "tmp",
    catchup=False,
    default_args={
        "start_date": days_ago(1),
    },
    schedule_interval=None,
)

download_file_from_s3ile = PythonOperator(
    task_id="download_file_from_s3ile", python_callable=download_file_from_s3, dag=dag
)
```

This resulted in the same, erroneous (empty downloaded file) behavior. 😢
You have bad indentation in your last example: you are trying to read the file before it has been closed and flushed (which happens when you exit the `with` clause). It should be:
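Presumably something along these lines, with the read dedented so it runs only after the `with` block has closed and flushed the file (reusing the names from the DAG above):

```python
with open('/tmp/s3_hook.txt', 'w') as f:
    s3.Bucket(bucket).Object(key).download_file(f.name)

# Only read the file back once the `with` block has closed it.
logging.info(f"File downloaded: {f.name}")
with open(f.name, 'r') as f_in:
    logging.info(f"FILE CONTENT {f_in.read()}")
```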
I've updated the indentation and I'm still seeing the same behavior.
I believe the reason is environmental (where/how your workers are run), not Airflow itself. Are you running the manual part in the same instances/containers as Airflow? If not, can you run them there? I can imagine it might come from a bad configuration of the service account/env variables used to authenticate to S3, or from some configuration of your bucket (for example, buckets can have a configuration allowing downloads only from specific IP addresses). There is also a similar problem in sous-chefs/aws#317, where the problem is (apparently) caused by connecting from an instance in a different region than the bucket. You need to try to run the code on the same machine, as the same user, and with the same environment variables that your airflow user runs with.
I am indeed; I spin up all my Docker services (locally), and then log in to the worker service to execute the small script shown above (and confirm that I can correctly download the S3 file).
I'm not sure how this could be; I'm using the same … Thanks for the link to a possible solution; this one confuses me too, as S3 buckets are addressed globally (i.e. without a region).
Maybe dump all the environment variables both in the worker-run Python operator and when you `exec` into the same container; this is the only way I can see those can differ. Also, just to check again, make sure you are using the same user/group as the workers.
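For example, a hypothetical helper along these lines could be called from the PythonOperator and from an `exec` shell alike, and the two outputs diffed:

```python
import os


def dump_env():
    # Print the environment sorted by key so two dumps can be diffed easily.
    for key in sorted(os.environ):
        print(f"{key}={os.environ[key]}")


if __name__ == "__main__":
    dump_env()
```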
Another update: I've dumped all the environment variables when running the DAG (both in 2.0.2 and 2.1.0) and the values are the same in both instances. Furthermore, I've logged into the worker service (as the same airflow user) and found the environment variables to also be the same there. So I've got three sets of environment variables, all with the same values; however, only one set (the one associated with 2.1.0) produces the erroneous behavior.
I've dug a bit more into this and found a few more clues. This line in the 2.0.2 logs … differs from what I see in 2.1.0 …
I've done each of the following (independently), and each yielded a fix:
Given that I'm using the same version of botocore (1.18.18), I'm wondering if there's something about the Airflow logger that triggers the streaming of the StreamingBody object during the logging statement (and thus results in an empty body when the content is requested again).
I was also thinking it might be related to the … Is there a quick and dirty way I can reset the logger to maybe …
Python does not require compilation, which means you can try to overwrite the file in the image.
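A quick-and-dirty sketch of that kind of override, assuming the logger in question is botocore's response parser (registered as `botocore.parsers`):

```python
import logging

# Raise the response parser's level so its DEBUG statement (the one that
# materializes the StreamingBody) never runs. A workaround, not a fix.
logging.getLogger("botocore.parsers").setLevel(logging.CRITICAL)
```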
Thanks for being so persistent, @ConstantinoSchillebeeckx. Great investigation! It's a very subtle problem, it seems. I believe @mik-laj is right. We do seem to have a problem here, introduced by #15599. In SecretsMasker, all "iterable" objects are converted into lists: https://github.com/apache/airflow/pull/15599/files#diff-63a47f524bd31519c96c4f7376b864a77eedf8d6cf59ef6e5c86d575d53a74c6R221. Unfortunately, StreamingBody is iterable (https://github.com/boto/botocore/blob/87facf71d42fa6a17d22ff946d76d3ea1d01f561/botocore/response.py#L89), and converting it reads all the content of the body. The original DEBUG statement works fine without the SecretsMasker because it uses %s to format the object, which does not call the iter() method (it just prints the object + address). This would only show itself if (like you) you want to debug botocore and you set the level of the botocore logger to DEBUG. That's why setting the "botocore.parser" level to CRITICAL fixed the issue. I think there is no easy way to avoid similar problems; we should remove the "iterable" handling and just limit it to cases where those are explicit lists of objects rather than Iterable objects. Simply adding list to (set, tuple) above should do the job. @ashb @ephraimbuddy, WDYT?
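A minimal sketch of the failure mode, using a hypothetical stand-in class rather than botocore's actual StreamingBody:

```python
import io


class OneShotStream:
    """Stand-in for botocore's StreamingBody: iterable, but only consumable once."""

    def __init__(self, data: bytes):
        self._raw = io.BytesIO(data)

    def __iter__(self):
        # Iterating reads (and therefore drains) the underlying stream.
        return iter(self._raw.readlines())

    def read(self) -> bytes:
        return self._raw.read()


body = OneShotStream(b"test\n")
list(body)          # what the masker's Iterable branch effectively does
print(body.read())  # b'' -- the stream is spent, hence the "empty file" symptom
```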
Yeah, looking at Iterables was a mistake, since many are non-rewindable or have side effects such as this. I was trying to protect against secrets in object representations, data frames, etc. here, but that might just be a caveat we have to document.
I think handling lists and primitive strings/sets/tuples/dicts recursively should be quite safe.
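A sketch of that idea (hypothetical code, not the actual SecretsMasker implementation): recurse only into concrete, side-effect-free containers and let everything else pass through untouched:

```python
SECRETS = {"s3cr3t"}  # hypothetical set of sensitive values


def mask_string(s: str) -> str:
    for secret in SECRETS:
        s = s.replace(secret, "***")
    return s


def redact(item):
    if isinstance(item, str):
        return mask_string(item)
    if isinstance(item, dict):
        return {key: redact(value) for key, value in item.items()}
    # Only concrete containers: an isinstance(item, Iterable) check here
    # would also match one-shot streams like StreamingBody and drain them.
    if isinstance(item, (list, tuple, set)):
        return type(item)(redact(element) for element in item)
    return item  # unknown objects (e.g. StreamingBody) are left alone


print(redact({"password": "s3cr3t", "hosts": ["a", "s3cr3t-b"]}))
# {'password': '***', 'hosts': ['a', '***-b']}
```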
Alternatively, we can only handle instances implementing the collection protocol. An object implementing …
Should we mask the elements? What do you think about using `repr()`?
Oh, that's a good idea. An iterator's repr is extremely unlikely to consume it (and I'd argue it's a bug if it does).
I am not sure `__repr__` is a good idea. If anything, I guess we should use `str()` output rather than `__str__` or `__repr__` directly. I think it is more important what `str()` does (which is `__str__` first and `__repr__` second). The `str()` output is the printable representation, which is what format/log is likely to use. But none of this is perfect without parsing the actual format string. `%s` is likely the most used, and it works with `str()` (falling back to `repr()`); `%r`/`!r` is `__repr__`. So in order to do it "correctly" we'd have to parse the formatting string, which I think we should not do. In this case, trying to only mask primitive objects and their lists/tuples/sets sounds like a much better idea.
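A small illustration of the `%s` vs `%r` distinction, with a throwaway class:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(message)s")


class Demo:
    def __str__(self):
        return "str-version"

    def __repr__(self):
        return "repr-version"


logging.info("with %%s: %s", Demo())  # logs: with %s: str-version
logging.info("with %%r: %r", Demo())  # logs: with %r: repr-version
```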
Using Iterable in SecretsMasker might cause an undesirable side effect when the object passed as a log parameter is iterable and iterating it is not idempotent. For example, botocore passes a StreamingBody object to the log, and this object is Iterable but can be iterated only once. Masking causes the object to be iterated during logging, resulting in an empty body when the actual results are retrieved later. This change only iterates list-type objects and recursively redacts only dicts/strs/tuples/sets/lists, which should never produce any side effects, as none of those objects have side effects when accessed. Fixes: #16148
Apache Airflow version: 2.0.2 and 2.1.0
Environment:
Kernel (e.g. `uname -a`): Darwin CSchillebeeckx-0589.local 19.6.0 Darwin Kernel Version 19.6.0: Tue Jan 12 22:13:05 PST 2021; root:xnu-6153.141.16~1/RELEASE_X86_64 x86_64
What happened:
I'm seeing issues with downloading files from S3 on 2.1.0; a file is created after the download, however the file content is empty!
What you expected to happen:
Non-empty files :)
How to reproduce it:
The DAG I'm running:
The logged output from 2.0.2; note that the file content (`test`) is properly shown in the log:
The logged output from 2.1.0:
Anything else we need to know:
pip freeze for 2.0.2:
pip freeze for 2.1.0: