Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate "Serialization" code #40974

Open
Tracked by #39593
kaxil opened this issue Jul 23, 2024 · 21 comments
Open
Tracked by #39593

Consolidate "Serialization" code #40974

kaxil opened this issue Jul 23, 2024 · 21 comments
Labels
airflow3.0:candidate Potential candidates for Airflow 3.0 area:serialization

Comments

@kaxil
Copy link
Member

kaxil commented Jul 23, 2024

We have different ways to do Serialisation in Airflow --- and also multiple way of pickling (dill, stdlib pickle, cloudpickle).

Would you like to add additional color here @bolkedebruin @amoghrajesh

@kaxil kaxil mentioned this issue Jul 23, 2024
10 tasks
@kaxil
Copy link
Member Author

kaxil commented Jul 23, 2024

cc @gyli since you waned to take this issue. Could you add a comment to this GitHub issues, please? Your name only shows up for me to add in Assignees once you have a comment on it.

@kaxil kaxil added the airflow3.0:candidate Potential candidates for Airflow 3.0 label Jul 23, 2024
@pedro-cf
Copy link

pedro-cf commented Jul 24, 2024

Not sure if related, but there is issues with serializing context for some Python Operators (ex: PythonVirtualEnvOperator)
Perhaps this work could potentially solve this issue? @kaxil


Edit: New PR 41049 fixes this.

@gyli
Copy link
Contributor

gyli commented Jul 25, 2024

Hi @kaxil , I have been reviewing serialization codes recently, while I haven't noticed any significant parts that need to be consolidated, especially after the improvement of lazy load serialization module.
The other issue mentioned above #7870, is already completed.
So we need more clarification from @bolkedebruin what needs to be consolidated in his proposal.

@kaxil
Copy link
Member Author

kaxil commented Jul 25, 2024

cc @uranusjr who might also know about it since he worked with Bolke on it

@kaxil
Copy link
Member Author

kaxil commented Jul 25, 2024

I remember different ways of Serialization used for Xcom, Airflow Webserver/ORM but Bolke or TP can add more specific details:

pickle = Column(PickleType(pickler=dill))

@provide_session
def get_dag_by_pickle(pickle_id: int, session: Session = NEW_SESSION) -> DAG:
"""Fetch DAG from the database using pickling."""
from airflow.models import DagPickle
dag_pickle = session.scalar(select(DagPickle).where(DagPickle.id == pickle_id).limit(1))
if not dag_pickle:
raise AirflowException(f"pickle_id could not be found in DagPickle.id list: {pickle_id}")
pickle_dag = dag_pickle.pickle
return pickle_dag

"response": base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),

pickling_library = dill if self.use_dill else pickle

def serialize_value(
value: Any,
*,
key: str | None = None,
task_id: str | None = None,
dag_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
) -> Any:
"""Serialize XCom value to str or pickled object."""
if conf.getboolean("core", "enable_xcom_pickling"):
return pickle.dumps(value)
try:
return json.dumps(value, cls=XComEncoder).encode("UTF-8")
except (ValueError, TypeError) as ex:
log.error(
"%s."
" If you are using pickle instead of JSON for XCom,"
" then you need to enable pickle support for XCom"
" in your airflow config or make sure to decorate your"
" object with attr.",
ex,
)
raise

class WebEncoder(json.JSONEncoder):

@potiuk
Copy link
Member

potiuk commented Jul 25, 2024

The problem as I understand it is tha we currently have two serializations:

  1. https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py -> this one uses "old" serialization where you have giant if/else clause where we have to manually add new type of objects to serialize

  2. https://github.com/apache/airflow/blob/main/airflow/serialization/serde.py -> which is a pluggable and presumably way faster way of serializing. It uses "pluggable" https://github.com/apache/airflow/tree/main/airflow/serialization/serializers modules and I believe you can also add your own serializers and implement "serialize/deserialize" methods in your objects. It has no giant if, providers could potentnially register their own serializers etc. etc.

Some of our code uses 1) - some uses 2). There are a few problems there - for example 2) does not yet support DAG serialization, and it also is not used for example in internal-api (but internal-api will be gone in Airlfow 3).

There are also other places where we use other serialization mechanisms (dill, cloudpickle) and there is a potential tha we could consolidate that as well.

Ideally (this was the long term vision of @bolkedebruin) we should have "one to rule them all" - 2) should become universal serializer used by everything.

@bolkedebruin
Copy link
Contributor

bolkedebruin commented Jul 26, 2024

Ha! There are some things to consider when wanting to do "consolidation". The main leading principle in the past was that we do not want to have executable code when deserializing. This is what all third-party (de)serializers seem to do, pickle, dill, cloudpickle etc. The second principle was to have a human readable format and the third principle was to have it versioned.

I've added "serde" (no 2. that @potiuk is speaking of in the past) to have a generic way of serializing any object with the principles in mind. This is particularly useful for XCom as that shares arbitrary data across workers. The 'other' serializer which I would call the DAG serializer has three main short-comings: 1) It is slow - serde takes about 10% of the time the DAG serializer takes, 2) it is hard to extend, you would need to change the core code to add an extension and 3) it will add O(n) in time to do so. The upside is that it is tried and tested, serializes DAGs and does JSON schema validation.

It might then seem the obvious route to add DAG serialization to "serde". Which it did try, but also felt a bit like squeezing something into something else where it doesnt entirely fit (keeping backwards compatibility in mind). It is possible, but a lot of past cruft would need to be re-implemented to make it work.

Now I see other projects like Spark Connect settle on Cloudpickle and they forego the issue of arbitrary code execution. The question then becomes how relevant is that attack vector? Is the tradeoff in maintaining our own serializer worth it? Also it will not generate a human readable format. Do we need to review our principles (which I think have never been officially settled, but correct me if I am wrong).

Concluding: if you add DAG seralization to "serde" it is probably the most Airflow way to go. It gives you extensibility for the future as it has the better format (over the DAG serializer) and can with a little bit of help serialize any kind of object. It seems to serve us well nowadays. If you take a step back and want to re-evaluate it might be worth re-visiting our principles and checkng what we can do to reduce the attack vector and maybe go for cloudpickle. This would externalize the support and reduce the maintenance burden. However, we might run into issues when we cannot serialize through cloudpickle and we do not control how it works.

(I'll add some additional notes later that are historically important, but slightly less relevant).

My 2 cents.

@potiuk
Copy link
Member

potiuk commented Jul 26, 2024

Also (to add to what @bolkedebruin wrote ) in internal-API we used Pydantic for some of that - mostly because it alows to serialize ORM SQLalchemy objects together with their relations. But this is possible to plug-in in serde as well as another serializers and treat it similarly to "to_dict" or "to_json"..

Also I think there was a lot of misunderstandings and clashes (and I am one of the guilty ones there) about which and why and how we are using serializers - precisely because we have not agreed on a common strategy we want to take, direction and vision we have. It has never been agreed and writen down, and I think we should start with that.

For example right now when I look at serde - it's a really good idea and if we extend the concept of it to be able to provide seriaizers from respective providers and make it support all that we need in "core" airflow, that could be a good idea for example (but we had disagreements on that in the past - again mostly because we had no consolidated vision and because in many cases it was just easier and faster (and without the whole picture and vision set) to use another approach than common. And possibly that was even good because that would in some cases break compatibility too much for some Airlfow 2 cases.

But with Airflow 3, I'd be personally for making serde default and incorporate the 'DAG' serializer (after playing a bit with both) and making it the default engine and plugging in whatever serializer implementations are good. That should of course include all the compatibility and migration pieces as needed. We could also possibly distribute the current serde modules (deltalake, iceberg etc - as "pluggable" / external serializers in providers).

That would be my 1c .

@gyli
Copy link
Contributor

gyli commented Jul 27, 2024

Thanks for the explanation!
There are 3 points that I really agree, and can explore in this issue:

  1. Consolidating DAG serializer and serde by adding DAG serializer into serde.
  2. We will need a full evaluation on Cloudpickle. Other than maintainability, full control of the output, human readable format, what are the other pros & cons.
  3. It would be great to move serde modules to providers, and we can explore that in this issue.

@bolkedebruin
Copy link
Contributor

bolkedebruin commented Jul 27, 2024

Sounds good. Although I don't understand your point 1 fully. Yes to consolidation, but "making airflow modules select serialization engines" I do not understand. Care to the clarify?

@gyli
Copy link
Contributor

gyli commented Jul 27, 2024

I've edited my language in my comment above. I was thinking about supporting the old DAG serializer and the consolidated version at the same time for smoother migration, while I also feel it could be unnecessary if the serialization format is not changed.
I'm currently thinking adding DAG and operator serializer under serde, that still calls methods from serialized_objects, and making serde as the only entrypoint for all serialization gradually. Does it sound good to you?

@potiuk
Copy link
Member

potiuk commented Jul 27, 2024

Also as mentioned in #31213 -> it might be a good exercise to migrate AIP-44 for Airflow 2.10 to use serde eventually. While AIP-44 will be experimental - only in 2.10 (and possibly 2.11), I think it might play a major role in preparing our users to migrate (and test isolation before) to Airflow 3 with AIP-72 rewrting the isolation code from scratch, so this effort is not lost.

@bolkedebruin
Copy link
Contributor

@gyli It might be possible, but note there are a lot of assumptions within the DAG serializers about the output format. I'm not sure it is worth the effort to re-use the old serializer code. Serde has a strict schema / format (versioning specification), while the serialized objects of the DAG serializer do not.

In your place I would focus on 'on-the-wire' compatibility so that the serialized format ends up compatible with the JSON schema used. This will require a second pass serializer.

As @potiuk mentioned ensuring AIP-44 doesn't use the legacy path any more might be an easier way to get used to the code and has more benefits. The DAG serializer is quite contained now except by its use of AIP-44.

@gyli
Copy link
Contributor

gyli commented Jul 29, 2024

Yeah I can start from #31213 first.

Note that by doing so, internal_api_call has to temporarily call either old serializer or serde depending on the argument type, because the the API argument could be DAG or operator, which is currently not supported by serde. It can only be completely switched to serde once the serializer consolidation is done.

@potiuk
Copy link
Member

potiuk commented Jul 30, 2024

I am not sure we allow DAG or Operator to be passed over the internal-api. I think it should not be the case and we should likely change it. I will review it shortly as well.

@gyli
Copy link
Contributor

gyli commented Jul 30, 2024

Here are some internal API endpoints I found with DAG/operator arguments:


@potiuk
Copy link
Member

potiuk commented Jul 30, 2024

I will take a look today :)

@gyli
Copy link
Contributor

gyli commented Aug 1, 2024

@bolkedebruin Still thinking about the best approach here. The tricky part is how to handle different encoding if old code is used indeed. While even if we decide not to use the old codes, we will still need to implement classes similar to SerializedBaseOperator, SerializedDAG, DependencyDetector, etc. in serde, with mainly the serialize and deserialize methods replaced with serde methods.

Approach 1, calling serialized_objects from serde, and keeping the old output format for DAG and operator. This approach requires minimum changes, and existing jsonschema can be reused directly. While the cost is, both encoding format will be used.

Approach 2, porting SerializedBaseOperator, SerializedDAG, DependencyDetector classes into serde, and replacing serialize and deserialize methods with serde's. A new jsonschema file is needed as the encoding will be changed. Another reason to port those classes instead of creating a new serde serializer module is, dag serialization is recursive. This approach will make serde.py codes much longer, probably similar to the length of current serialized_objects.py eventually.

I gave a second thought and second approach makes more sense to me. Although it requires more work, we might want a complete migration from old to new serializer considering this is an Airflow 2 to 3 change. Please let me know what you think.

@bolkedebruin
Copy link
Contributor

I'm mostly AFK - Olympics etc - so a bit brief and can't really deep dive into the code.

I think targeting a new schema, your approach 2 is best, while for deserialization having detection of the old format and using the old deserializers for some time is smart. We do want people to be able to move from Airflow 2 -> 3 but I don't think we require them to entirely start from scratch.

Having the updated schema be as close to serde's native output format, basically dicts with classnames and versioning information, requires the least passes so will be the fastest. It's also easier then so switch to another output format (say protobuf) if we would like that in the future.

It might be worthwhile to have a mix of a serde module and porting of the classes. Classes are slow, the overhead on the stack is significant. This is one of the reasons why serde does what it does. Another reason is that the deserialized versions of DAG and Operator are not the same as DAG and Operator. So it night be cleaner to keep them separated. Another one is that the "serialize" and "deserialize" methods in serde's modules have priority over the ones in the classes, so that allows you to keep the existing infrastructure in place while replacing it with serde's.

Summarizing my thoughts:

  • Detecting for the old format (it night be there, or easy to do. afaik I created some foundatiions that allows serde to handle the old format to an extend by something like deserialize(data, DAG) where DAG is the class fo which it will try to find a deserializer for if it cannot detect the correct implementation)
  • Port parts of SerializedBaseOperator to serde as a module and move things to their respective classes. Timetables for example can be managed within Timetable itself. Which allows re-use in other places too.
  • Update the JSOn schema to follow more closely what serde does and have it properly versioned so forward compatibility works.

For now my thoughts. Thanks for taking this on. I'll help where I can.

@kaxil
Copy link
Member Author

kaxil commented Aug 16, 2024

@gyli : Could I assign this task to you? Do you have enough clarity?

@gyli
Copy link
Contributor

gyli commented Aug 16, 2024

Hi @kaxil, yes, please assign it to me. It's progressing slowly though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
airflow3.0:candidate Potential candidates for Airflow 3.0 area:serialization
Projects
None yet
Development

No branches or pull requests

5 participants