Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make warnings available as soon as they are received. #857

Merged
merged 6 commits into from
Oct 20, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -1772,13 +1772,12 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
PSQLState.PROTOCOL_VIOLATION));
}

public void handleCompletion() throws SQLException {
SQLWarning warning = getWarning();
if (warning != null) {
PgResultSet.this.addWarning(warning);
}
super.handleCompletion();
@Override
public void handleWarning(SQLWarning warning) {
super.handleWarning(warning);
PgResultSet.this.warnings = this.getWarning();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, this might cause double-processing via the following scenario:

  1. Consumer consumes all the warnings, and clears the chain
  2. Producer handles yet another warning, and PgResultSet.this.warnings = this.getWarning(); write results in the same head being used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test for ResultSet.getWarnings() as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vlsi I decided to revert the change to ResultSet because I couldn't actually come up with test a scenario where it made sense.
I'm not sure it is even possible for postgres to raise warnings while a result set is reading from a cursor.

}

}


Expand Down
46 changes: 25 additions & 21 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.postgresql.core.ResultHandlerBase;
import org.postgresql.core.SqlCommand;
import org.postgresql.util.GT;
import org.postgresql.util.PGSQLWarningWrapper;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;

Expand Down Expand Up @@ -98,11 +99,7 @@ public class PgStatement implements Statement, BaseStatement {
/**
* The warnings chain.
*/
protected SQLWarning warnings = null;
/**
* The last warning of the warning chain.
*/
protected SQLWarning lastWarning = null;
protected volatile PGSQLWarningWrapper warnings = null;

/**
* Maximum number of rows to return, 0 = unlimited
Expand Down Expand Up @@ -218,13 +215,10 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
}

@Override
public void handleCompletion() throws SQLException {
SQLWarning warning = getWarning();
if (warning != null) {
PgStatement.this.addWarning(warning);
}
super.handleCompletion();
public void handleWarning(SQLWarning warning) {
PgStatement.this.addWarning(warning);
}

}

public java.sql.ResultSet executeQuery(String p_sql) throws SQLException {
Expand Down Expand Up @@ -536,25 +530,29 @@ public void setQueryTimeoutMs(int millis) throws SQLException {
}

/**
* This adds a warning to the warning chain. We track the tail of the warning chain as well to
* avoid O(N) behavior for adding a new warning to an existing chain. Some server functions which
* RAISE NOTICE (or equivalent) produce a ton of warnings.
* Either initializes new warning wrapper, or adds warning onto the chain.
*
* Although warnings are expected to be added sequentially, the warnings chain may be cleared
* concurrently at any time via {@link #clearWarnings()}, therefore it is possible that a warning
* added via this method is placed onto the end of the previous warning chain
*
* @param warn warning to add
*/
public void addWarning(SQLWarning warn) {
if (warnings == null) {
warnings = warn;
lastWarning = warn;
//copy reference to avoid NPE from concurrent modification of this.warnings
final PGSQLWarningWrapper warnWrap = this.warnings;
if (warnWrap == null) {
this.warnings = new PGSQLWarningWrapper(warn);
} else {
lastWarning.setNextWarning(warn);
lastWarning = warn;
warnWrap.addWarning(warn);
}
}

public SQLWarning getWarnings() throws SQLException {
checkClosed();
return warnings;
//copy reference to avoid NPE from concurrent modification of this.warnings
final PGSQLWarningWrapper warnWrap = this.warnings;
return warnWrap != null ? warnWrap.getFirstWarning() : null;
}

public int getMaxFieldSize() throws SQLException {
Expand All @@ -571,9 +569,15 @@ public void setMaxFieldSize(int max) throws SQLException {
maxfieldSize = max;
}

/**
* Clears the warning chain.<p>
* Note that while it is safe to clear warnings while the query is executing, warnings that are
* added between calls to {@link #getWarnings()} and #clearWarnings() may be missed.
* Therefore you should hold a reference to the tail of the previous warning chain
* and verify if its {@link SQLWarning#getNextWarning()} value is holds any new value.
*/
public void clearWarnings() throws SQLException {
warnings = null;
lastWarning = null;
}

public java.sql.ResultSet getResultSet() throws SQLException {
Expand Down
35 changes: 35 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/util/PGSQLWarningWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2017, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.util;

import java.sql.SQLWarning;

/**
* Wrapper class for SQLWarnings that provides an optimisation to add
* new warnings to the tail of the SQLWarning singly linked list, avoiding Θ(n) insertion time
* of calling #setNextWarning on the head. By encapsulating this into a single object it allows
* users(ie PgStatement) to atomically set and clear the warning chain.
*/
public class PGSQLWarningWrapper {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be PSQLWarningWrapper? We already have PSQLWarning, PSQLState, PSQLSavepoint, ...

On the other hand, I'm sure this (PGSQLWarningWrapper) class should not be a part of public API, so it might make sense to put it to some .internal. package (or specify in the javadoc that this class should not be used by end clients)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving it to org.postgresql.jdbc.PSQLWarningWrapper and making the class package-private?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If package-private is possible, please do that.


private final SQLWarning firstWarning;
private SQLWarning lastWarning;

public PGSQLWarningWrapper(SQLWarning warning) {
firstWarning = warning;
lastWarning = warning;
}

public void addWarning(SQLWarning sqlWarning) {
lastWarning.setNextWarning(sqlWarning);
lastWarning = sqlWarning;
}

public SQLWarning getFirstWarning() {
return firstWarning;
}

}
108 changes: 108 additions & 0 deletions pgjdbc/src/test/java/org/postgresql/test/jdbc2/StatementTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,18 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -411,6 +421,104 @@ public void testWarningsAreCleared() throws SQLException {
stmt.close();
}

@Test
public void testWarningsAreAvailableAsap()
throws SQLException, InterruptedException, ExecutionException {
con.createStatement()
.execute("CREATE OR REPLACE FUNCTION notify_then_sleep() RETURNS VOID AS "
+ "$BODY$ "
+ "BEGIN "
+ "RAISE NOTICE 'Test 1'; "
+ "RAISE NOTICE 'Test 2'; "
+ "EXECUTE pg_sleep(2); "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative approach would be to use two connections, and make sure the outer transaction holds a row lock preventing notify_then_sleep from making progress.
That would ensure Java gets warnings before statement finish.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea, I have now implemented this, it should speed up the test a bit and make it a little more robust.

+ "END "
+ "$BODY$ "
+ "LANGUAGE plpgsql;");
con.createStatement().execute("SET SESSION client_min_messages = 'NOTICE'");
final PreparedStatement preparedStatement = con.prepareStatement("SELECT notify_then_sleep()");
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
ScheduledFuture<SQLWarning> future = executorService.schedule(new Callable<SQLWarning>() {
@Override
public SQLWarning call() throws SQLException {
return preparedStatement.getWarnings();
}
}, 1000, TimeUnit.MILLISECONDS);
preparedStatement.execute();

SQLWarning warning = future.get();
executorService.shutdown();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have executorService as a field and shutdown in @After, to make sure it's always executed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to introduce a new field that is only used in a couple of tests.
This test class currently only has one field which is used in seemingly every test.
The executor will only not be shutdown if the test fails, so it shouldn't really be an issue.
Additionally the the executor will be automatically shut down when its finalizer runs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a try-finally condition to ensure the executor is shut down.


assertNotNull(warning);
assertEquals("First warning received not first notice raised",
warning.getMessage(), "Test 1");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the proper expected, actual argument order: http://junit.sourceforge.net/javadoc/org/junit/Assert.html#assertEquals(java.lang.String, java.lang.Object, java.lang.Object)

assertEquals("Second warning received not second notice raised",
warning.getNextWarning().getMessage(), "Test 2");
}

/**
* Demonstrates a safe approach to concurrently reading the latest
* warnings while periodically clearing them.
*
* One drawback of this approach is that it requires the reader to make it to the end of the
* warning chain before clearing it, so long as your warning processing step is not very slow,
* this should happen more or less instantaneously even if you receive a lot of warnings.
*/
@Test
public void testConcurrentWarningReadAndClear()
throws SQLException, InterruptedException, ExecutionException, TimeoutException {
final int iterations = 1000;
final ExecutorService executor = Executors.newSingleThreadExecutor();
con.createStatement()
.execute("CREATE OR REPLACE FUNCTION notify_loop() RETURNS VOID AS "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the procedure be removed from the database after the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added function removal to the @After test section.

+ "$BODY$ "
+ "BEGIN "
+ "FOR i IN 1.. " + iterations + " LOOP "
+ " RAISE NOTICE 'Warning %', i; "
+ "END LOOP; "
+ "END "
+ "$BODY$ "
+ "LANGUAGE plpgsql;");
con.createStatement().execute("SET SESSION client_min_messages = 'NOTICE'");
final PreparedStatement statement = con.prepareStatement("SELECT notify_loop()");

final Future warningReaderThread = executor.submit(new Callable<Object>() {
@Override
public Object call() throws SQLException, InterruptedException {
SQLWarning lastProcessed = null;
int warnings = 0;
//For production code replace this with some condition that
//ends after the statement finishes execution
while (warnings < iterations) {
SQLWarning warn = statement.getWarnings();
//if next linked warning has value use that, otherwise keep using latest head
if (lastProcessed != null && lastProcessed.getNextWarning() != null) {
warn = lastProcessed.getNextWarning();
}
if (warn != null) {
warnings++;
//System.out.println("Processing " + warn.getMessage());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add assert for each received warning

lastProcessed = warn;
if (warn == statement.getWarnings()) {
//System.out.println("Clearing warnings");
statement.clearWarnings();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can understand, a warning can be lost in the following scenario:
Precondition: satement has chain of 5 warnings.

  1. Thread 1 acquires st.getWarnings()
  2. Thread 2 acquires st.getWarnings() and starts addWarning. Unfortunately it gets suspended.
  3. Thread 1 performs clear warnings
  4. Thread 1 walks getNextWarning chain up to the very end, and "processes" all the warnings
  5. Thread 2 acquires CPU and adds yet another warning to the chain (the chain head was obtained at step 2)
    Thread 1 will never see the warning from step 5 since it did walk through the chain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you say "suspended" are you referring to Thread#suspend(), or something else?
Because I don't think there is really much that can be done about the former, it's a deprecated API for good reason.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was able to simulate your scenario by manually adding delay into the PgStatement#addWarning method after it acquires the reference to the warning wrapper and also adding some delay between the checking of lastProcesed.getNextWarning() and the statement.getWarnings() call.
It was indeed possible for a very unlikely race to cause a warning to be missed.
I have fixed that issue by doing the warn = statement.getWarnings() assignment up front, so there is no way for a racing thread to add any warnings between the check and the assignment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed that issue by doing the warn = statement.getWarnings() assignment up front, so there is no way for a racing thread to add any warnings between the check and the assignment.

I am afraid I do not see how the additional warn = statement.getWarnings() solves the issue.
The issue is addWarning thread might wait infinitely long before actually adding a warning. The only check "receiver" does it checks lastProcessed.getNextWarning(), however that gives absolutely no clue on whether new warnings might be added or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well there are only two places a new warning can be added, onto the previous chain or added as the new head of the chain, lastProcessed will be the previous chain.
This algorithm only needs to handle a single-producer scenario, if addWarning takes an indeterminate amount of time, then the consumer will just spin until the producer finally wakes up.
#getNextWarning() and statement.getWarnings() are both volatile reads, so even if two new warnings are added, one to the previous chain and one to the new head, we wont start using the new head until everything is read from the last chain.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I see now.
I wonder how many users would be able to replicate this warning consumer loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be unlikely someone would replicate it without looking at this unit test.
Concurrently reading warnings from a statement is probably a fairly niche feature, I think for the majority of use-cases it is unlikely that someone is going to generate enough notices that they would consider using Statement#clearWarnings(), but if they do I think the important thing is that the driver doesn’t crash, and we at least provide some way of the user reading the warnings safely, despite how abstruse the implementation might be.

I considered other approaches like employing locks/synchronization or maybe even exposing a queue data structure, but I felt that those options would introduce negative performance implications for users who don’t need the feature, or in the case of exposing a queue, wouldn’t work with the standard JDBC API.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not obvious to me what happens in the loop here and when warnings are cleared, etc. Would it be possible to improve readability somehow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warnings are cleared if the warning just processed is the head of the warning chain, does that explanation help?
I can add a comment to that affect.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I can see this condition, but trying to understand how often it happens. will try more making notes on paper :)

Copy link
Contributor Author

@magJ magJ Aug 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often it happens depends on a few factors, I tried to explain it a bit in the javadoc comment.
Some factors that affect it are:

  • How long the users "process" function takes.
  • If they opt to add some delay between each warn == null iteration.
  • OS thread sheduling.
  • Burstiness of the received warnings.

Overall it's fairly random, im not sure how to describe it in a useful way.

}
} else {
//Not required for this test, but a good idea adding some delay for production code
//to avoid high cpu usage while the query is running and no warnings are coming in.
//Alternatively use JDK9's Thread.onSpinWait()
Thread.sleep(10);
}
}
//Ensure that we didn't double process the same warning.
assertEquals(lastProcessed.getMessage(), "Warning " + iterations);
return null;
}
});
statement.execute();
//If the reader doesn't return after 2 seconds, it failed.
warningReaderThread.get(2, TimeUnit.SECONDS);
}

/**
* The parser tries to break multiple statements into individual queries as required by the V3
* extended query protocol. It can be a little overzealous sometimes and this test ensures we keep
Expand Down