Resume read_package after drop. #915
I have now tested it quite thoroughly, together with #918. My test runs a future with
let mut rows = sqlx::query!("SELECT `id` FROM `long_table`").fetch(context.con());
let mut sum: usize = 0;
while let Some(row) = rows.try_next().await? {
    sum += row.id as usize;
}
I have run many of these futures in parallel while canceling some of them. This was able to reproduce the issue in #563. Applying this patch and #918 fixes the situation.
@antialize can you add your repro to |
@abonander I have added a test. It is a bit hard to test reliably, so it does not always fail. You can see #938 for a run of the test on top of master.
Currently there is a bug in this part of the code, but it is hard to trigger reliably. The main problem with triggering it from the test seems to be that sqlx_rt::timeout is not very precise under async-std, and straight up broken in tokio. On my box, the test fails around half the time when using tokio, and almost always when using async-std.
This fixes #563. Data read from a MySQL stream is ultimately read by read_package. If the read_package future is dropped before it is done, it will leave the stream in an invalid state. To fix this, we store the state of the read_package future in the stream itself, such that read_package just resumes reading the same package when it is called again after being dropped.
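To make the "invalid state" concrete, here is a minimal sketch of what goes wrong. It relies only on the MySQL wire framing (a 3-byte little-endian payload length followed by a 1-byte sequence id); the function name `parse_header` and the sample bytes are made up for illustration and are not sqlx code.

```rust
/// Parse a MySQL packet header located at `pos` in `buf`.
/// Returns (payload length, sequence id).
/// Hypothetical helper for illustration; not part of sqlx.
fn parse_header(buf: &[u8], pos: usize) -> (usize, u8) {
    // 3-byte little-endian length, then a 1-byte sequence id.
    let len = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], 0]) as usize;
    (len, buf[pos + 3])
}

/// Two packets back to back on the wire: "hello" (seq 0) and "!" (seq 1).
fn sample_wire() -> Vec<u8> {
    let mut wire = vec![5, 0, 0, 0];
    wire.extend_from_slice(b"hello");
    wire.extend_from_slice(&[1, 0, 0, 1]);
    wire.extend_from_slice(b"!");
    wire
}
```

Parsing at offset 0 yields the expected `(5, 0)`. If a dropped future had already consumed the 4-byte header and 2 bytes of the payload, the next reader would start at offset 6 and interpret the bytes `l`, `l`, `o` as a length of 0x6f6c6c, which is the kind of garbage described in #563.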
As far as I can tell, this solution works given a couple of assumptions we might consider.
For what it's worth, I'm exploring both of these issues. |
I love the work done here. Thank you – both of you – for the work and the research. I'd love to chat more on approaches on Discord. I just pushed up the beginnings of the core rewrite to the |
Hey, even though I've raised these additional contexts to consider, I do appreciate what this code does to make the state changes explicit and restartable. |
@rich-murphey. You are right about the assumptions on the stream and bufstream. Those assumptions can be checked as long as the mysql-stream does not make its aggregated stream public and does not have any way of releasing it again. @mehcode I would be happy to discuss things on Discord. I could not find any sqlx project on there, perhaps you could add me: antialize#8715. I am in the Central European time zone, so it would probably have to be in the morning for you.
Assumption 2 holds in at least one test that reproduces protocol errors. Here is a test that reproduces the protocol error for postgres in an actix-web app: https://github.com/rich-murphey/sqlx-actix-test

I added a Mutex to both PgStream::recv_unchecked() and BufStream::read_raw() so that they log an error upon any attempt to lock an already locked stream. The test shows the protocol errors, and no attempts to lock an already locked stream. So for this specific test there are never multiple threads trying to concurrently read from a given PgStream or BufStream.

I also modified BufStream::read_raw() so that it did not remove the header separately from the body of the message, but only removed them together. That did not eliminate the protocol errors. I'm not sure what is leaving message fragments in the connection.
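The kind of instrumentation described above could look roughly like the following sketch. It is not the code used in that test: `GuardedStream` and `with_exclusive` are hypothetical names, and the only real API relied on is `std::sync::Mutex::try_lock`, which returns an error instead of blocking when the lock is already held.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical wrapper that detects overlapping use of a stream.
struct GuardedStream {
    busy: Mutex<()>,
    /// Number of times a caller found the stream already in use.
    contention: AtomicUsize,
}

impl GuardedStream {
    fn new() -> Self {
        GuardedStream { busy: Mutex::new(()), contention: AtomicUsize::new(0) }
    }

    /// Run `f` while holding the lock if it is free; if it is not,
    /// log the overlap, count it, and run `f` anyway.
    fn with_exclusive<T>(&self, f: impl FnOnce() -> T) -> T {
        let _guard = match self.busy.try_lock() {
            Ok(guard) => Some(guard),
            // Err covers both "already locked" and a poisoned mutex.
            Err(_) => {
                self.contention.fetch_add(1, Ordering::Relaxed);
                eprintln!("stream was already locked");
                None
            }
        };
        f()
    }
}
```

With a wrapper like this, a run that logs nothing supports the claim that no two readers touch the same stream at once, which is what the test above observed.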
Thank you again for the impetus here. With #1025 merged, the problem should be solved – albeit inefficiently – with 0.5.x and this is fully fixed in the |
Fixes #563
As detailed in #563, dropping a MySQL future, such as fetch_many, while it is within read_package puts the stream in an invalid state, which causes the next user of the stream in the pool to get garbage. This is because a package has some length, and stopping in the middle of read_package might have read only part of the package. When read_package is then called again, it will attempt to read the header of the next package from data in the middle of the partially read package.

This pull request tries to fix this issue by essentially storing the state of the read_package future within the stream itself, such that calling read_package again will resume from the same position and return the full package that read_package had partially read before.

To do this, a read_some call is added to the BufStream. The assumption for this call is that if it succeeds it will tell how much data has been read, and if it is dropped it will have read NO data.
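The idea can be sketched with a small synchronous model. This is not the code in this pull request: `FakeSocket`, `PacketStream`, and `step` are hypothetical names, the socket is an in-memory queue, and "dropping the future" is modeled by a caller simply not calling `step` again. The only protocol fact assumed is the MySQL header layout (3-byte little-endian length, 1-byte sequence id).

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory socket that yields at most `chunk` bytes per read.
struct FakeSocket {
    data: VecDeque<u8>,
    chunk: usize,
}

impl FakeSocket {
    /// Appends up to `want` bytes to `out` and reports how many were read,
    /// mirroring the read_some contract described above.
    fn read_some(&mut self, out: &mut Vec<u8>, want: usize) -> usize {
        let n = want.min(self.chunk).min(self.data.len());
        out.extend(self.data.drain(..n));
        n
    }
}

/// The read progress lives in the stream, not in the caller.
struct PacketStream {
    socket: FakeSocket,
    /// Header and payload bytes of the current package read so far.
    partial: Vec<u8>,
}

impl PacketStream {
    fn new(bytes: Vec<u8>, chunk: usize) -> Self {
        let socket = FakeSocket { data: bytes.into(), chunk };
        PacketStream { socket, partial: Vec::new() }
    }

    /// Total bytes needed: the 4-byte header, plus the payload once the header is known.
    fn target_len(&self) -> usize {
        let p = &self.partial;
        if p.len() < 4 {
            4
        } else {
            4 + u32::from_le_bytes([p[0], p[1], p[2], 0]) as usize
        }
    }

    /// Performs at most one read. Returns the payload once the package is complete.
    /// Abandoning the caller between steps loses nothing: `partial` keeps the progress.
    fn step(&mut self) -> Option<Vec<u8>> {
        let missing = self.target_len() - self.partial.len();
        if missing > 0 {
            self.socket.read_some(&mut self.partial, missing);
        }
        // Re-evaluate the target, since the header may have just been completed.
        if self.partial.len() >= 4 && self.partial.len() == self.target_len() {
            let packet = std::mem::take(&mut self.partial);
            return Some(packet[4..].to_vec());
        }
        None
    }
}
```

In this model a first caller can call `step` a couple of times and walk away mid-package; a second caller that keeps calling `step` still receives the complete first package and then the next one, because framing never restarts from the middle of a payload.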