Changed RedisSubscriber wait with TaskCondition vs Timers #815
Conversation
Ok, this is good to go.
This is not an error.
Ok, I'd have to fix the driver about this assumption. Maybe a specific unit test would help, but would it be acceptable as a feature addition to remove connections, though?
That may make sense in some situations (although it's probably more important to add support for limiting the number of connections), but it should be dealt with separately (there should be a clear vision of the use cases and implications).
When you say connections can be re-established, do you mean that a `TCPConnection` that was closed can be used to send data again without warning? Wouldn't that be prone to errors regarding different TCP options, and more complex to dispose and resume at a lower level? I would have assumed using a new object would be safer.
Oh, I see what you mean; now I realize I had an error in the
However, the pull is still necessary because the handling for unsubscribe/subscribe responses takes priority over
```d
}
if (m_listening)
	synchronized (m_capMutex)
		enforce(m_captured.wait(5.seconds), "Failed to wait for Redis listener to stop");
```
To make this more robust, the original condition should be checked here instead of relying on the return value of `wait` (a good guideline to follow when using condition variables in general):

```d
synchronized (m_capMutex) {
	if (m_listening) m_captured.wait(5.seconds);
	enforce(!m_listening, "Failed to wait for Redis listener to stop.");
}
```
In this case, since there is nothing that `m_capMutex` actually protects, the `synchronized` block can also just be removed. But it seems like, logically, there should instead be an `m_statusMutex` that protects `m_listening` and `m_capture`. This mutex would then be passed to both `m_captured` and `m_started`. (Always remember that the mutex is associated with the data, not with the condition variable.)

A second mutex, `m_connMutex`, may be necessary to protect the individual calls to `_request_void`, but that is a separate concern from the wait functionality.
So, I shouldn't protect the `TaskCondition` with a `TaskMutex`? I was certain it had to be used like the pthread conditions.
Also, would you suggest using a mutex in the `RedisClient` to protect `_request_void`?
The mutex doesn't protect the condition variable; it's more that the condition variable has a feature that it can perform the unlock-wait and trigger-lock steps atomically, from the view of code that runs within a region locked by the mutex. This makes a loop like the following thread-safe:

```d
bool condition;
Mutex mutex;
Condition conditionVar;

synchronized (mutex) {
	while (!condition)
		conditionVar.wait();
	assert(condition); // only guaranteed for the mutex passed to the condition variable!
}
```
Using the condition variable itself would be perfectly fine without the mutex.
This is ready for a review.
```diff
 shared static this()
 {
 	auto publisher = new RedisClient();
-	auto subscriber = new RedisSubscriber(new RedisClient());
+	subscriber = RedisSubscriber(new RedisClient());
```
OT: Maybe we should add a method `RedisClient.createSubscriber` to create the instance. That way we get at least some freedom in the future to change what kind of storage/RC/... is used.
Just a quick observation: some changes in json.d ended up in this PR; they should be removed. Similarly, there are a lot of whitespace changes in redis.d, which should also be removed (especially since the first thing my editor would do is remove the trailing whitespace again). I'll try to review the actual changes later today.
```d
	return;
}
void onSubscribe(string channel) {
	logInfo("Callback subscribe(%s)", channel);
```
I think all the `logInfo` calls should be `logTrace` or something, so the default logging is not spammed.
Looks good to me, what do you think @s-ludwig?
```d
// This is a simple parser/handler for subscribe/unsubscribe/publish
// commands sent by redis. The PubSub client protocol is simple enough

void pubsub_handler() {
```
Looks like there should be a `scope (failure) m_lockedConn.conn.disconnect();` here, so that any parsing exception doesn't leave the connection in a bad state.
This should be covered by the `scope(exit)` on line 927, since `pubsub_handler()` is only called through the while loop under there. The only points of failure, now that I think of it, would be `runTask` and `vibe.concurrency.send` at line 919... I'm not sure whether these functions can throw, though?
Okay, I see. Maybe it makes sense to convert the nested functions into private member functions instead, so that the function gets a bit more manageable. But that doesn't need to be done for this PR.
I have to admit that my review hasn't been as thorough as I had hoped, but apart from the
Well, I have to admit I am not familiar enough with the whole subscriber design in general, but the current code in trunk is simply broken. Something has to be done.
No doubt about that!
Changed RedisSubscriber wait with TaskCondition vs Timers
This update provides stability and performance improvements to the `RedisSubscriber`:

- Waiting is now driven by `TaskCondition` notifications rather than rearming timers throughout the class.
- Fixes an issue in the `ConnectionPool`, where connections could be re-used after they were closed. An assertion on the new event driver required this fix.