
Failed to assign splits due to the serialized split size #9410

Closed · javrasya opened this issue Jan 4, 2024 · 29 comments

@javrasya (Contributor) commented Jan 4, 2024

Apache Iceberg version

1.4.2 (latest release)

Query engine

Flink

Please describe the bug 🐞

Hi there, I am trying to consume records from an Iceberg table in my Flink application and I am running into the following issue:

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to serialize splits.
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.lambda$assignSplits$4(SourceCoordinatorContext.java:223)
	at java.base/java.util.HashMap.forEach(HashMap.java:1337)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.lambda$assignSplits$5(SourceCoordinatorContext.java:213)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.callInCoordinatorThread(SourceCoordinatorContext.java:428)
	... 14 more
Caused by: java.io.UTFDataFormatException: Encoded string is too long: 123214
	at org.apache.flink.core.memory.DataOutputSerializer.writeUTF(DataOutputSerializer.java:257)
	at org.apache.iceberg.flink.source.split.IcebergSourceSplit.serializeV2(IcebergSourceSplit.java:150)
	at org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer.serialize(IcebergSourceSplitSerializer.java:42)
	at org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer.serialize(IcebergSourceSplitSerializer.java:25)
	at org.apache.flink.runtime.source.event.AddSplitEvent.<init>(AddSplitEvent.java:44)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.lambda$assignSplits$4(SourceCoordinatorContext.java:220)
	... 17 more

I am not really sure why it gets so big, but when I looked at the source code here, it might be because there are too many file scan tasks in one split.

@nastra (Contributor) commented Jan 4, 2024

@stevenzwu @pvary could you guys take a look please?

@pvary (Contributor) commented Jan 4, 2024

@stevenzwu: After a quick check, I have found this:
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java#L256-L260

        if (utflen > 65535) {
            throw new UTFDataFormatException("Encoded string is too long: " + utflen);
        } else if (this.position > this.buffer.length - utflen - 2) {
            resize(utflen + 2);
        }

This means that anything above 64 KB cannot be serialized by DataOutputSerializer.writeUTF, which seems like a rather arbitrary limit to me.

We could use DataOutputSerializer.writeChars, which uses DataOutputSerializer.writeChar. The downside is that it is less efficient for simple chars (between 0x0001 and 0x007F). See: https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java#L245C1-L254C10

        for (int i = 0; i < strlen; i++) {
            c = str.charAt(i);
            if ((c >= 0x0001) && (c <= 0x007F)) {
                utflen++;
            } else if (c > 0x07FF) {
                utflen += 3;
            } else {
                utflen += 2;
            }
        }

The upside is that it should work regardless of the size of the string.
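
For illustration, a minimal sketch of a length-prefixed alternative (not what writeUTF or writeChars do today; the helper names are hypothetical, and the format is intentionally not wire-compatible with writeUTF):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.core.memory.DataInputDeserializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    class LongStringSerialization {
      // Hypothetical helpers: write the UTF-8 bytes with a 4-byte length prefix so that
      // strings longer than 64 KB are not rejected the way DataOutputSerializer.writeUTF does.
      static void writeLongUTF(DataOutputSerializer out, String value) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length); // int length prefix instead of writeUTF's unsigned short
        out.write(bytes);
      }

      static String readLongUTF(DataInputDeserializer in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
      }
    }

Compared to writeChars, this keeps UTF-8's single byte per ASCII character while removing the 64 KB cap; the trade-off is that splits serialized with the old format could no longer be read back.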

@javrasya (Contributor, Author) commented Jan 5, 2024

It seems this bug was introduced in version 1.4.0, which is fairly new. I tried fixing it by tweaking the SplitAssignerFactory I pass down to the IcebergSource, but even after reducing the number of FileScanTasks per split to one, it still exceeds that 64 KB limit. So I unfortunately ended up downgrading to 1.3 until it is fixed in 1.4; my app works with Iceberg 1.3.

@pvary (Contributor) commented Jan 5, 2024

@javrasya: Any idea what causes the big size? Wide table? Column stats? Long filename?

@javrasya (Contributor, Author) commented Jan 5, 2024

@pvary No idea to be honest, since I ran into this in production, where I don't have the ability to dig deep and debug.

I wouldn't say it is a wide table in terms of the number of columns, and I don't know about the column stats. Is there a way to fetch that from somewhere?

But here is the metadata I do know about the file:

One of the files:

File name: s3://data-furnishing-pipeline-eu-production/data-lake/dafu_pipeline.db/transactional_customer_ids_created/data/__key_bucket=0/createdAt_day=2018-10-04/country=NO/00021-0-ca186d77-6349-4e79-bb4f-874fac0ee123-00186.parquet

Schema:

[
  {
    "Name": "__key",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "1",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "eventid",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "2",
      "iceberg.field.optional": "false"
    }
  },
  {
    "Name": "someIdColumn1",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "3",
      "iceberg.field.optional": "false"
    }
  },
  {
    "Name": "someIdColumn2",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "4",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "someIdColumn3",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "5",
      "iceberg.field.optional": "false"
    }
  },
  {
    "Name": "someIdColumn4",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "6",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "someIdColumn5",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "7",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "someIdColumn6",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "8",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "someIdColumn7",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "9",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "someIdColumn8",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "10",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "someIdColumn9",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "11",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "someIdColumn10",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "12",
      "iceberg.field.optional": "true"
    }
  },
  {
    "Name": "country",
    "Type": "string",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "13",
      "iceberg.field.optional": "false"
    }
  },
  {
    "Name": "createdat",
    "Type": "timestamp",
    "Parameters": {
      "iceberg.field.current": "true",
      "iceberg.field.id": "14",
      "iceberg.field.optional": "false"
    }
  }
]

@pvary (Contributor) commented Jan 5, 2024

@javrasya: This table should not be too wide, and the statistics should be limited as well (unless you did some specific tweaking there). My best guess is your first suggestion:

reduce the size of FileScanTasks per split to be one

How did you try to achieve this?
The best way to achieve this is to set splitOpenFileCost to the same value as the split size.
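
For illustration, a sketch of setting this through the table's read properties (these are the standard read.split.target-size / read.split.open-file-cost properties; the 128 MB value is just an example):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    class SingleFileSplits {
      // Sketch: make every data file "cost" a whole split so that planning stops packing
      // many FileScanTasks into one combined split. The 128 MB value is illustrative.
      static void limitTasksPerSplit(Table table) {
        long splitSize = 128L * 1024 * 1024;
        table
            .updateProperties()
            .set(TableProperties.SPLIT_SIZE, String.valueOf(splitSize))           // read.split.target-size
            .set(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(splitSize)) // read.split.open-file-cost
            .commit();
      }
    }

Depending on the version, the same knobs may also be exposed as per-job read options on the Flink source rather than as table properties.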

@javrasya (Contributor, Author) commented Jan 5, 2024

@pvary I wasn't aware of splitOpenFileCost, thank you for sharing that.

The way I did it is that I introduced my own SplitAssignerFactory and SplitAssigner and passed them down to the source. Here is the code for that custom SplitAssigner:

https://gist.github.com/javrasya/98cfe90bd1a2585c56c4c3346a518477

But the thing is that even though I managed to reduce the number of tasks per split, it was still big enough to raise the same error. So it did not solve my problem.

How do you limit the statistics? I didn't do anything apart from creating my table and ingesting data into it. Also, do you think a table with 14 columns is too wide?

@pvary (Contributor) commented Jan 6, 2024

@javrasya: A table with 14 columns should not cause any issues, and neither should the default stats.

I made a mistake reading the code: combined splits cannot be the cause either, as we serialize them one by one in a loop, and the failure is with a single one of them.

My current theory is that we need to check FileScanTaskParser.toJson to understand what is happening:

    private static void toJson(FileScanTask fileScanTask, JsonGenerator generator)

Could it be that you have multiple deletes for that specific split, which makes the serialized split too big?

    if (fileScanTask.deletes() != null) {
      generator.writeArrayFieldStart(DELETE_FILES);
      for (DeleteFile deleteFile : fileScanTask.deletes()) {
        ContentFileParser.toJson(deleteFile, spec, generator);
      }
      generator.writeEndArray();
    }
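
If it helps narrow things down, a rough sketch of how the per-task JSON size and delete-file count could be checked (assuming FileScanTaskParser.toJson is reachable from your code; the class and method names of this helper are hypothetical):

    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.FileScanTaskParser;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    class SplitSizeInspector {
      // Rough sketch: print the serialized JSON size and the delete-file count per task,
      // to see whether delete files are what pushes a split past the 64 KB limit.
      static void inspect(Table table) throws Exception {
        try (CloseableIterable<CombinedScanTask> tasks = table.newScan().planTasks()) {
          for (CombinedScanTask combined : tasks) {
            for (FileScanTask task : combined.files()) {
              String json = FileScanTaskParser.toJson(task);
              System.out.printf(
                  "file=%s deletes=%d jsonChars=%d%n",
                  task.file().path(), task.deletes().size(), json.length());
            }
          }
        }
      }
    }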

@javrasya (Contributor, Author) commented Jan 6, 2024

We don't do any deletes, actually. I will try to debug it locally on that single file it was failing on to see why it is so big. But regardless, what do you think the remedy would be in such a case? Even if these were deletes, should we avoid doing deletes just because they break the downstream?

@pvary (Contributor) commented Jan 8, 2024

@javrasya: I think we should fix the serialization issue, but I would like to understand the root cause before jumping to solutions. The deletes seem to be an issue anyway and should be fixed, but if you do not have any deletes in the table, then there must be another source of the issue too. We have to identify and fix that as well.

@javrasya (Contributor, Author) commented Jan 8, 2024

Hi @pvary, I failed to debug this locally. I couldn't reproduce it, since it is one file of many, it takes time to hit it, and I can't really do that in a debug session. We don't do deletes, but recently I have done the following:

  • Expired snapshots older than 7 days
  • Ran the rewrite_data_files procedure via Spark on the source tables
  • Ran the rewrite_manifests procedure via Spark on the source tables

Do you think any of these operations would leave delete files behind?

@pvary (Contributor) commented Jan 8, 2024

@javrasya: I do not yet see how any of the above operations could create delete files.

For debugging: could you use conditional breakpoints, or put a breakpoint where the "Encoded string is too long" exception is thrown?

@javrasya (Contributor, Author) commented Jan 10, 2024

I couldn't do this, @pvary; the split is far ahead and it takes some time for the application to get there. My local environment is not able to run the app on real data and hit this problematic split in debug mode, it crashes because debug mode consumes quite a lot of resources. I am still looking for a way to reproduce and debug it, but I just wanted to let you know the status. (Side note: I ended up re-creating my entire source data, and that way it did not have this problem. But I still want to find out why it exceeds that limit.)

@javrasya (Contributor, Author) commented

Hi again @pvary. I managed to run it in debug mode, and the JSON being serialized is crammed with delete files (EQUALITY_DELETES and POSITION_DELETES); it is a gigantic JSON payload because of that. We don't do deletes, but those must come from upserting, because it is a table compacted on a key and we keep overwriting the rows with the same key. Is there really any reason for those to be serialized?

Another question: is it possible to skip this with some configuration that is available right now, so that it does not keep the delete files, without waiting for you guys to fix it properly?

@pvary (Contributor) commented Jan 12, 2024

@javrasya: A few things:

  • You should find a way to get rid of those deletes, as they will kill your read performance
  • If you have deletes, then they need to be read when the table is read, so they need to be serialized with the splits as well
  • I still think we should fix this issue and use writeChar instead of writeUTF - serialization should not be this brittle

One way to get rid of the deletes is compaction. Currently I think only Spark does a full compaction where the deletes are removed as well.
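
For reference, a sketch of what such a Spark-side compaction might look like (the delete-file-threshold option is assumed to be available in your Spark/Iceberg version):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.actions.SparkActions;
    import org.apache.spark.sql.SparkSession;

    class CompactAwayDeletes {
      // Sketch: rewrite every data file that has at least one associated delete file so that
      // the new snapshot can be read without applying the old equality/position deletes.
      static void run(SparkSession spark, Table table) {
        SparkActions.get(spark)
            .rewriteDataFiles(table)
            .option("delete-file-threshold", "1") // rewrite files referenced by >= 1 delete file
            .execute();
      }
    }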

@javrasya (Contributor, Author) commented

Thanks for your input. I tried rewrite_position_delete_files but no luck.

@stevenzwu (Contributor) commented

First of all, the current Flink Iceberg source (FLIP-27 or the old one) doesn't support streaming reads with row-level deletes. It only reads append-only snapshots/commits.

is it possible to skip it with some sort of configuration that is available right now so that it does not keep deleted files without waiting for you guys to fix it in a way?

No, that is not possible today and it won't be correct either.

Regarding the usage of writeUTF, it was just simple and intuitive at the time; I didn't anticipate/realize the 64 KB limit. @pvary's suggestion of writeBytes sounds good to me.

@javrasya (Contributor, Author) commented Jan 12, 2024

Thanks for the answer @stevenzwu, you are right. I know we shouldn't do streaming reads on an Iceberg table unless it is append-only. But we don't stream in this case: every day we need to scan the table entirely up to a certain snapshot id (we are using asOfSnapshotId, not endSnapshotId) to produce what we need. No checkpointing and no savepoints either.

@javrasya (Contributor, Author) commented Jan 12, 2024

It still feels weird to allow a split that big to be created. Wouldn't it be possible to make the delete files lazy and have them loaded on the respective task node instead of the coordinator node? It is network cost in the cluster, and it will keep growing for upsert-style tables, which end up with so many EQUALITY_DELETES, or am I interpreting it wrongly 🤔? The delete files seem to be the only thing that can bloat the size of a split.

@stevenzwu (Contributor) commented Jan 12, 2024

Ah, I didn't know it is a batch read mode using asOfSnapshotId. Note that they are delete (not deleted) files, used to capture the row-level deletes. The actual files are not loaded during scan planning on the jobmanager/coordinator node; splits only contain the locations of those delete files.

The problem is that an equality delete file can be associated with many data files. That is probably why you are seeing many of them in one split; that is an unfortunate implication of equality deletes.

Skipping those delete files won't be correct.

The delete compaction that was suggested earlier should help. Did you use Spark for that? Spark batch should generate position deletes, which are easier for the read path.

Regardless, I would agree with @pvary's suggestion of writeBytes to fix the 64 KB size limit. Curious how many delete files you saw in one split/data file?

@javrasya (Contributor, Author) commented Jan 12, 2024

I agree, not having that limit would be better in any case.

I see. Yes, I did use Spark (rewrite_position_delete_files) to clean up positional deletes. Maybe it helped, because I have very few positional deletes afterwards. But I still have too many equality deletes, which still pushes past the limit. Did I use the wrong Spark procedure? I also expired many snapshots, which I hoped would help with compaction, but still no luck.

@javrasya (Contributor, Author) commented Jan 13, 2024

I have the changes and a test that uses some mocking/spying working locally. It is actually not that trivial to just use writeBytes, because DataOutputSerializer.writeBytes is buggy and increases the buffer position twice. Do you mind if I prepare a PR, @pvary @stevenzwu, so that we can discuss it there, if you are not planning to pick this up soon?
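
For context, a rough sketch of the chunking idea (purely illustrative, not the exact change in the PR; the chunk size is conservative because writeUTF's limit applies to the encoded byte length, not the character count):

    import java.io.IOException;
    import org.apache.flink.core.memory.DataInputDeserializer;
    import org.apache.flink.core.memory.DataOutputSerializer;

    class ChunkedUtf {
      // Worst case a char encodes to 3 bytes in modified UTF-8, so 21_000 chars per chunk
      // always stays below writeUTF's 65535-byte limit.
      private static final int CHUNK_CHARS = 21_000;

      static void writeChunkedUTF(DataOutputSerializer out, String value) throws IOException {
        int chunks = (value.length() + CHUNK_CHARS - 1) / CHUNK_CHARS;
        out.writeInt(chunks);
        for (int i = 0; i < chunks; i++) {
          int start = i * CHUNK_CHARS;
          out.writeUTF(value.substring(start, Math.min(start + CHUNK_CHARS, value.length())));
        }
      }

      static String readChunkedUTF(DataInputDeserializer in) throws IOException {
        int chunks = in.readInt();
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < chunks; i++) {
          builder.append(in.readUTF());
        }
        return builder.toString();
      }
    }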

javrasya added a commit to javrasya/iceberg that referenced this issue Jan 13, 2024
…too many delete files which are created due to an upsert operation on the table, and then it is not possible to consume from such a table
@javrasya (Contributor, Author) commented

I took the liberty of creating the PR since I had the changes locally. Hope you guys don't mind 🙏

@stevenzwu (Contributor) commented

Yes, I did use Spark (rewrite_position_delete_files) to clean up positional deletes. Maybe it helped, because I have very few positional deletes afterwards. But I still have too many equality deletes, which still pushes past the limit.

I see. Spark delete compaction only handles position deletes (not equality deletes). That behavior makes sense, because Spark only writes position deletes.

I see there is a ConvertEqualityDeleteFiles interface in the api module; it looks like neither Flink nor Spark has implemented that action.

@pvary (Contributor) commented Jan 13, 2024

But I still have too many equality deletes, which still pushes past the limit. Did I use the wrong Spark procedure?

I think RewriteDataFilesAction could help you there. If I remember correctly, it applies all the delete files while reading the data for the rewrite, so in theory, after a compaction, you should not need the delete files anymore. That said, I haven't tried it myself, so this should be checked.
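
A minimal sketch of invoking it from Flink (based on the Flink maintenance action API; whether it actually removes the need for the equality deletes should be verified, as noted above):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.actions.Actions;

    class RewriteWithFlink {
      // Sketch: trigger the Flink rewrite action; the read should then target the snapshot
      // produced by the rewrite (e.g. via asOfSnapshotId) so the old deletes are no longer needed.
      static void run(Table table) {
        Actions.forTable(table).rewriteDataFiles().execute();
      }
    }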

@javrasya (Contributor, Author) commented

Thank you both. You are right, @stevenzwu, it is sad that there is no implementation for ConvertEqualityDeleteFiles yet. I will give RewriteDataFilesAction a try and let you know.

javrasya added a commit to javrasya/iceberg that referenced this issue Jan 13, 2024
…f serializing/deserializing and it is the most backward compatible, but supports texts longer than 65K. This introduces a breaking change: splits serialized before this change can no longer be deserialized, because it uses an int (4 bytes) instead of an unsigned short (2 bytes) as the first bytes indicating the length of the serialized text

@javrasya (Contributor, Author) commented Jan 13, 2024

I tried rewrite_data_files via Spark (not really sure if it does the same thing as RewriteDataFilesAction on Flink), but still the same. I guess I will end up dumping the Iceberg table as CSV or something via Athena every day and scanning that with Flink instead, until the issue is resolved :-( It will double the time my daily job takes and it is more expensive, but I will take it; otherwise the service can't work at the moment.

Any other suggestions are appreciated.

javrasya added a commit to javrasya/iceberg that referenced this issue Jan 13, 2024
…n can be due to the size of an unsigned short, which is 65kb. The text is broken down into chunks that always fit in 65kb and written to the buffer iteratively
@pvary (Contributor) commented Jan 16, 2024

@javrasya: The Spark RewriteDataFiles should create a new snapshot in the table. If the query reads this new snapshot, then it should not read the old delete files anymore. If ExpireSnapshots is used, then sooner or later these data files will be removed, as nobody will reference them anymore.

@javrasya (Contributor, Author) commented Jan 22, 2024

@pvary, you are right. It is all immutable, so it makes sense that the rewrite operation creates another snapshot and I should be using that one, not a prior one. After I pointed to the replace snapshot created by RewriteDataFiles, the number of positional and equality delete files went down significantly. I am not really sure whether it would still hit that 64 KB limit, since I had already downgraded to Iceberg 1.3.1, but it definitely helped read performance drastically. I will give it a try with 1.4.3 when I get the chance.

Thank you both, it is so appreciated. @pvary @stevenzwu

javrasya added commits to javrasya/iceberg that referenced this issue on Apr 12, Apr 17, and Apr 18, 2024
javrasya closed this as completed May 5, 2024