[AIRFLOW-3888] HA for metastore connection #4708
Creating a connection to a metastore with two hosts for high availability (e.g. connection 1, connection 2) is not possible because the entire value entered is taken as a single value. For our needs, it is necessary to go through the hosts in order and connect to the first one that works. This change lets the hook check each host and then connect to a working metastore.
https://airflow.readthedocs.io/en/latest/concepts.html Why did you create a new way to define multiple connections instead of expanding the old one? Why is your solution better? Could you complete the documentation?
My solution is somewhat better because it checks that the metastore connection actually works while the task is running, so the task does not have to be retried after hitting an offline connection. Second, I do not pick connections at random; I check them one after another, in the order they were entered, so the task is certain to succeed as long as at least one connection works.
I prepared a PR with my other idea: https://github.com/GuzikJakub/airflow/pull/1 It uses AA mechanisms to store multiple connections rather than something specific to this particular hook. In addition, I changed the logging to the correct approach (LoggingMixin instead of logging) and extracted a method to improve the readability of the code.
Thank you. I made a few changes because I found several errors during testing. One question: if I merge this into my pull request, will it still be possible to land it in the Airflow main branch?
HA for Metastore
flake8 code repair
Flake8 repair
@gglanzani PTAL
airflow/hooks/hive_hooks.py
Outdated
    self.log.info("Trying to connect to %s:%s", conn.host, conn.port)
    try:
        result = host_socket.connect_ex((conn.host, conn.port))
        host_socket.close()
We can rewrite the try like this:
    if host_socket.connect_ex((conn.host, conn.port)) == 0:
        self.log.info("Connected to %s:%s", conn.host, conn.port)
        host_socket.close()
        return conn
and avoid the break, the results and valid_conn variables, etc.
Code corrected and pushed. A fuller answer follows below.
I also removed the break statement and the results and valid_conn variables.
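The early-return shape suggested in this thread can be sketched outside Airflow as follows. This is only an illustration under stated assumptions, not the code merged in the PR: the helper name `find_first_reachable`, the list of `(host, port)` tuples, and the `timeout` parameter are introduced here for the example. Only the use of `connect_ex` returning 0 on success is taken from the review.

```python
import logging
import socket

log = logging.getLogger(__name__)


def find_first_reachable(endpoints, timeout=3.0):
    """Return the first (host, port) that accepts a TCP connection, else None.

    Hypothetical helper: endpoints are tried in the order they are given.
    """
    for host, port in endpoints:
        log.info("Trying to connect to %s:%s", host, port)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        try:
            # connect_ex returns 0 on success and an errno value otherwise
            if sock.connect_ex((host, port)) == 0:
                log.info("Connected to %s:%s", host, port)
                return host, port
            log.info("Could not connect to %s:%s", host, port)
        except OSError as exc:
            # Some failures (e.g. host name resolution) are raised, not returned
            log.info("Could not connect to %s:%s (%s)", host, port, exc)
        finally:
            sock.close()
    return None
```

Returning from inside the loop removes the need for a flag variable or a `break`, and every failed attempt is logged, so a run where all hosts are down still leaves a trace. The caller decides what to do with `None`, for example raising an exception.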
airflow/hooks/hive_hooks.py
Outdated
        result = host_socket.connect_ex((conn.host, conn.port))
        host_socket.close()
    except Exception:
        pass
Why are we swallowing this one without at least logging?
I understand that one of the two hosts might fail, but if both fail I won't see anything in the logs.
Test (but I made some corrections to the code):
[2019-03-04 14:08:51,954] {hive_hooks.py:528} INFO - Trying to connect to Metastore1
[2019-03-04 14:08:54,960] {hive_hooks.py:534} INFO - Could not connect to Metastore1
[2019-03-04 14:08:54,960] {hive_hooks.py:528} INFO - Trying to connect to Metastore2
[2019-03-04 14:08:54,962] {hive_hooks.py:534} INFO - Could not connect to Metastore2
[2019-03-04 14:08:54,970] {models.py:1760} ERROR - Failed to locate the valid server.
Traceback (most recent call last):
  File "/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/airflow/sensors/base_sensor_operator.py", line 68, in execute
    while not self.poke(context):
  File "/airflow/sensors/hive_partition_sensor.py", line 73, in poke
    metastore_conn_id=self.metastore_conn_id)
  File "/airflow/hooks/hive_hooks.py", line 468, in __init__
    self.metastore = self.get_metastore_client()
  File "/airflow/hooks/hive_hooks.py", line 491, in get_metastore_client
    raise AirflowException("Failed to locate the valid server.")
airflow.exceptions.AirflowException: Failed to locate the valid server.
[2019-03-04 14:08:54,971] {models.py:1789} INFO - All retries failed; marking task as FAILED
@GuzikJakub In principle LGTM, I've requested some changes to make the code easier (to understand) and for logging purposes. It would be nice to add a test (possibly connecting to a port where nothing is listening?). @Fokko what's your opinion on the test?
Code behavior improvements
@gglanzani I made the changes mentioned above. I have also added a test response that raises an error if there is no valid connection.
@GuzikJakub I think you still need to
One small question: while writing a unit test for the wrong port, how can I add a new connection to the test database? I need this because I cannot pass the values in myself; they are read straight from the database.
@GuzikJakub You must mock the method to return the expected value.
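A minimal sketch of that advice, using only the standard library so it runs without Airflow. `DemoHook`, its methods, and the `Connection` namedtuple are hypothetical stand-ins invented for this example; the real hook and connection classes have different interfaces.

```python
import unittest
from collections import namedtuple
from unittest import mock

# Hypothetical stand-in for a connection record (not Airflow's Connection)
Connection = namedtuple("Connection", ["host", "port"])


class DemoHook:
    """Hypothetical hook whose lookup would normally hit a database."""

    def get_connections(self, conn_id):
        raise RuntimeError("would query the connections database")

    def first_host(self, conn_id):
        return self.get_connections(conn_id)[0].host


class DemoHookTest(unittest.TestCase):
    # Replace the database-backed lookup with a canned return value
    @mock.patch.object(DemoHook, "get_connections",
                       return_value=[Connection("localhost", 9802)])
    def test_first_host_uses_mocked_connections(self, mock_get):
        self.assertEqual(DemoHook().first_host("metastore_default"), "localhost")
        mock_get.assert_called_once_with("metastore_default")
```

Patching the lookup method means the test controls exactly which host and port the code under test sees, without adding rows to a test database.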
test improvement
@gglanzani @Fokko Is it correct now?
@GuzikJakub The tests are still failing 🤔
Add test [AIRFLOW-3888] HA for metastore connection test improvement
Add test [AIRFLOW-3888] HA for metastore connection test improvement [AIRFLOW-3888] HA for metastore connection test improvement
Improving the typo in the variable name
tests/hooks/test_hive_hook.py
Outdated
@@ -282,6 +283,13 @@ def test_get_max_partition_from_valid_part_specs(self):
     def test_get_metastore_client(self):
         self.assertIsInstance(self.hook.get_metastore_client(), HMSClient)

     @mock.patch("airflow.hooks.hive_hooks.HiveMetastoreHook.get_connection",
                 **{'return_value': [Connection(host="localhost", port=9802)]})
Why not return_value=[Connection(…)] instead of using the ** syntax?
The suggested syntax does not work properly in all cases supported by mock. I usually stick to a single syntax so that I do not have to think about which one is right at the moment.
Reference:
https://github.com/apache/airflow/blob/49b98ab2b5dc37c69598d485d62dd39372ab6aa9/tests/contrib/hooks/test_gcp_natural_language_hook.py#L57-L68
Your scientists were so preoccupied with whether or not they could, they didn’t stop to think if they should.
The suggested syntax does not work, indeed, when you have invalid identifiers as keyword arguments. return_value is not one of them.
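The distinction discussed in this thread can be checked with `unittest.mock` directly. The attribute path `client.get_table` is an arbitrary name chosen for the illustration.

```python
from unittest import mock

# Both spellings configure the same attribute on the mock
plain = mock.Mock(return_value=[1, 2])
unpacked = mock.Mock(**{"return_value": [1, 2]})
assert plain() == unpacked() == [1, 2]

# A dotted name is not a valid Python identifier, so it cannot be written
# as a normal keyword argument and has to be passed through ** unpacking
nested = mock.Mock(**{"client.get_table.return_value": "tbl"})
assert nested.client.get_table() == "tbl"
```

So the `**` form is only required for dotted names; for a plain `return_value` the keyword form is equivalent and easier to read.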
Hi @GuzikJakub, besides a very minor comment, LGTM.
Mock return_value edit
Flake8 repair
Test repair
The test failure seems unrelated, can you take a look @Fokko?
The added tests have low coverage. Is it possible to add more tests?
@mik-laj In my opinion, the test I added covers everything that belongs in this change. I check the case where the connection is valid (that test already existed) and the case where it is not (added by me).
@GuzikJakub Please rebase on master
Flake8 repair [AIRFLOW-3888] HA for metastore connection Test repair
@Fokko I did it
LGTM now
A reminder about this PR :)
@XD-DENG any further questions?
Hi @Fokko, no other comment from my side. Thanks!
* HA for Metastore
* [AIRFLOW-3888] HA for metastore connection Creating a connection to a metastore with two hosts for high availability (eg connection 1, connection 2) is not possible because the entire value entered is taken. For our needs, it is necessary to go through subsequent hosts and connect to the first working. This change allows you to check and then connect to a working metastore.
* add function to base_hook
* update webhdfs_hook
* back to original version
* back to original version
* Update hive_hooks.py Thank you. I made a few changes because during the tests I detected several errors. I have a question, when I do merge to my pull it will be still possible to land it in the airflow main branch?
* [AIRFLOW-3888] HA for metastore connection flake8 code repair
* [AIRFLOW-3888] HA for metastore connection Flake8 repair
* [AIRFLOW-3888] HA for metastore connection Code behavior improvements
* [AIRFLOW-3888] HA for metastore connection Add test
* [AIRFLOW-3888] HA for metastore connection test improvement
* [AIRFLOW-3888] HA for metastore connection Add test [AIRFLOW-3888] HA for metastore connection test improvement
* [AIRFLOW-3888] HA for metastore connection Add test [AIRFLOW-3888] HA for metastore connection test improvement [AIRFLOW-3888] HA for metastore connection test improvement
* [AIRFLOW-3888] HA for metastore connection Improving the typo in the variable name
* [AIRFLOW-3888] HA for metastore connection Mock return_value edit
* [AIRFLOW-3888] HA for metastore connection Flake8 repair
* [AIRFLOW-3888] HA for metastore connection Test repair
* [AIRFLOW-3888] HA for metastore connection Flake8 repair [AIRFLOW-3888] HA for metastore connection Test repair
Creating a connection to a metastore with two hosts for high availability (eg connection 1, connection 2) is not possible because the entire value entered is taken. For our needs, it is necessary to go through subsequent hosts and connect to the first working. This change allows you to check and then connect to a working metastore. (cherry picked from commit 355bd56)
Jira
https://issues.apache.org/jira/browse/AIRFLOW-3888
Description
Creating a connection to a metastore with two hosts for high availability (eg connection 1, connection 2) is not possible because the entire value entered is taken. For our needs, it is necessary to go through subsequent hosts and connect to the first working.
This change allows you to check and then connect to a working metastore.
Tests
Tested on two metastores, of which one was always inactive
Example:
metastore1 (offline); metastore2 (online).
In this case, we will connect to metastore2
Documentation
The connection is created in the webserver. Separate the metastore hosts with semicolons, e.g.: metastore1; metastore2; metastore3
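As a rough illustration of how a semicolon-separated value like the one above might be turned into a list of hosts, assuming the whole string arrives in a single field. The helper name `split_hosts` is hypothetical, and the parsing actually used in the PR may differ.

```python
def split_hosts(host_field):
    """Split a semicolon-separated host string, dropping blanks and spaces."""
    return [h.strip() for h in host_field.split(";") if h.strip()]


print(split_hosts("metastore1; metastore2; metastore3"))
# ['metastore1', 'metastore2', 'metastore3']
```

Stripping whitespace matters here because the documented format puts a space after each semicolon.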