Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Http::Stream::reply to Pointer #1855

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Downloader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Downloader::doneAll() const

static void
downloaderRecipient(clientStreamNode * node, ClientHttpRequest * http,
HttpReply * rep, StoreIOBuffer receivedData)
const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
debugs(33, 6, MYNAME);
/* Test preconditions */
Expand All @@ -120,7 +120,7 @@ downloaderRecipient(clientStreamNode * node, ClientHttpRequest * http,
assert(context);

if (context->downloader.valid())
context->downloader->handleReply(node, http, rep, receivedData);
context->downloader->handleReply(node, http, reply, receivedData);
}

static void
Expand Down Expand Up @@ -188,7 +188,7 @@ Downloader::start()
}

void
Downloader::handleReply(clientStreamNode * node, ClientHttpRequest *http, HttpReply *reply, StoreIOBuffer receivedData)
Downloader::handleReply(clientStreamNode * node, ClientHttpRequest *http, const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
DownloaderContext::Pointer callerContext = dynamic_cast<DownloaderContext *>(node->data.getRaw());
// TODO: remove the following check:
Expand Down
5 changes: 4 additions & 1 deletion src/Downloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ class Downloader: virtual public AsyncJob
/// The nested level of Downloader object (downloads inside downloads).
unsigned int nestedLevel() const {return level_;}

void handleReply(clientStreamNode *, ClientHttpRequest *, HttpReply *, StoreIOBuffer);
/* clientStreams API */

/// \copydoc CSCB
void handleReply(clientStreamNode *, ClientHttpRequest *, const HttpReplyPointer &, StoreIOBuffer);

protected:

Expand Down
7 changes: 2 additions & 5 deletions src/clientStreamForward.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,18 @@
#define SQUID_SRC_CLIENTSTREAMFORWARD_H

#include "enums.h" /* for clientStream_status_t */

class Lock;
template <class C> class RefCount;
#include "http/forward.h"

typedef RefCount<Lock> ClientStreamData;

/* Callbacks for ClientStreams API */

class clientStreamNode;
class ClientHttpRequest;
class HttpReply;
class StoreIOBuffer;

/// client stream read callback
typedef void CSCB(clientStreamNode *, ClientHttpRequest *, HttpReply *, StoreIOBuffer);
typedef void CSCB(clientStreamNode *, ClientHttpRequest *, const HttpReplyPointer &, StoreIOBuffer);

/// client stream read
typedef void CSR(clientStreamNode *, ClientHttpRequest *);
Expand Down
12 changes: 6 additions & 6 deletions src/client_side.cc
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ ClientHttpRequest::rangeBoundaryStr() const
*/
void
clientSocketRecipient(clientStreamNode * node, ClientHttpRequest * http,
HttpReply * rep, StoreIOBuffer receivedData)
const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
// do not try to deliver if client already ABORTED
if (!http->getConn() || !cbdataReferenceValid(http->getConn()) || !Comm::IsConnOpen(http->getConn()->clientConnection))
Expand All @@ -819,11 +819,11 @@ clientSocketRecipient(clientStreamNode * node, ClientHttpRequest * http,

// TODO: enforces HTTP/1 MUST on pipeline order, but is irrelevant to HTTP/2
if (context != http->getConn()->pipeline.front())
context->deferRecipientForLater(node, rep, receivedData);
context->deferRecipientForLater(node, reply, receivedData);
else if (http->getConn()->cbControlMsgSent) // 1xx to the user is pending
context->deferRecipientForLater(node, rep, receivedData);
context->deferRecipientForLater(node, reply, receivedData);
else
http->getConn()->handleReply(rep, receivedData);
http->getConn()->handleReply(reply, receivedData);
}

/**
Expand Down Expand Up @@ -880,7 +880,7 @@ ClientSocketContextPushDeferredIfNeeded(Http::StreamPointer deferredRequest, Con
/** defer now. */
clientSocketRecipient(deferredRequest->deferredparams.node,
deferredRequest->http,
deferredRequest->deferredparams.rep,
deferredRequest->deferredparams.reply.getRaw(),
deferredRequest->deferredparams.queuedBuffer);
}

Expand Down Expand Up @@ -3628,7 +3628,7 @@ ConnStateData::sendControlMsg(HttpControlMsg msg)
typedef CommCbMemFunT<HttpControlMsgSink, CommIoCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, HttpControlMsgSink::wroteControlMsg);

if (!writeControlMsgAndCall(rep.getRaw(), call)) {
if (!writeControlMsgAndCall(rep, call)) {
// but still inform the caller (so it may resume its operation)
doneWithControlMsg();
}
Expand Down
4 changes: 2 additions & 2 deletions src/client_side.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,11 @@ class ConnStateData:
void add(const Http::StreamPointer &context);

/// handle a control message received by context from a peer and call back
virtual bool writeControlMsgAndCall(HttpReply *rep, AsyncCall::Pointer &call) = 0;
virtual bool writeControlMsgAndCall(const HttpReplyPointer &, AsyncCall::Pointer &) = 0;

/// ClientStream calls this to supply response header (once) and data
/// for the current Http::Stream.
virtual void handleReply(HttpReply *header, StoreIOBuffer receivedData) = 0;
virtual void handleReply(const HttpReplyPointer &, StoreIOBuffer receivedData) = 0;

/// remove no longer needed leading bytes from the input buffer
void consumeInput(const size_t byteCount);
Expand Down
8 changes: 4 additions & 4 deletions src/esi/Esi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ esiStreamDetach (clientStreamNode *thisNode, ClientHttpRequest *http)
* There is context data or a reply structure
*/
void
esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply *rep, StoreIOBuffer receivedData)
esiProcessStream(clientStreamNode *thisNode, ClientHttpRequest *http, const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
/* test preconditions */
assert (thisNode != nullptr);
Expand All @@ -684,7 +684,7 @@ esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply

if (!thisNode->data.getRaw())
/* setup ESI context from reply headers */
thisNode->data = ESIContextNew(rep, thisNode, http);
thisNode->data = ESIContextNew(reply, thisNode, http);

ESIContext::Pointer context = dynamic_cast<ESIContext *>(thisNode->data.getRaw());

Expand All @@ -697,7 +697,7 @@ esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply
* has been detected to prevent ESI processing the error body
*/
if (context->flags.passthrough) {
clientStreamCallback (thisNode, http, rep, receivedData);
clientStreamCallback(thisNode, http, reply, receivedData);
return;
}

Expand Down Expand Up @@ -767,7 +767,7 @@ esiProcessStream (clientStreamNode *thisNode, ClientHttpRequest *http, HttpReply
}

/* EOF / Read error / aborted entry */
if (rep == nullptr && receivedData.data == nullptr && receivedData.length == 0 && !context->flags.finishedtemplate) {
if (!reply && receivedData.data == nullptr && receivedData.length == 0 && !context->flags.finishedtemplate) {
/* TODO: get stream status to test the entry for aborts */
/* else flush the esi processor */
debugs(86, 5, "esiProcess: " << context.getRaw() << " Finished reading upstream data");
Expand Down
16 changes: 8 additions & 8 deletions src/esi/Include.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ esiBufferDetach (clientStreamNode *node, ClientHttpRequest *http)
* not be reinstated or it will trigger bug #975 again - RBC 20060903
*/
void
esiBufferRecipient (clientStreamNode *node, ClientHttpRequest *http, HttpReply *rep, StoreIOBuffer receivedData)
esiBufferRecipient(clientStreamNode *node, ClientHttpRequest *http, const HttpReplyPointer &reply, StoreIOBuffer receivedData)
{
/* Test preconditions */
assert (node != nullptr);
Expand All @@ -80,24 +80,24 @@ esiBufferRecipient (clientStreamNode *node, ClientHttpRequest *http, HttpReply *
assert (receivedData.length <= sizeof(esiStream->localbuffer->buf));
assert (!esiStream->finished);

debugs (86,5, "rep " << rep << " body " << receivedData.data << " len " << receivedData.length);
debugs(86, 5, "reply " << reply << " body " << receivedData.data << " len " << receivedData.length);
assert (node->readBuffer.offset == receivedData.offset || receivedData.length == 0);

/* trivial case */

if (http->out.offset != 0) {
assert(rep == nullptr);
assert(!reply);
} else {
if (rep) {
if (rep->sline.status() != Http::scOkay) {
rep = nullptr;
if (reply) {
if (reply->sline.status() != Http::scOkay) {
reply = nullptr;
esiStream->include->includeFail (esiStream);
esiStream->finished = 1;
httpRequestFree (http);
return;
}

rep = nullptr;
reply = nullptr;
}
}

Expand All @@ -121,7 +121,7 @@ esiBufferRecipient (clientStreamNode *node, ClientHttpRequest *http, HttpReply *
}

/* EOF / Read error / aborted entry */
if (rep == nullptr && receivedData.data == nullptr && receivedData.length == 0) {
if (!reply && receivedData.data == nullptr && receivedData.length == 0) {
/* TODO: get stream status to test the entry for aborts */
debugs(86, 5, "Finished reading upstream data in subrequest");
esiStream->include->subRequestDone (esiStream, true);
Expand Down
62 changes: 27 additions & 35 deletions src/http/Stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
clientConnection(aConn),
http(aReq),
reply(nullptr),
writtenToSocket(0),
mayUseConnection_(false),
connRegistered_(false)
Expand All @@ -33,7 +32,6 @@ Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aR
flags.deferred = 0;
flags.parsed_ok = 0;
deferredparams.node = nullptr;
deferredparams.rep = nullptr;
}

Http::Stream::~Stream()
Expand Down Expand Up @@ -135,7 +133,7 @@ Http::Stream::getNextRangeOffset() const
"; reply " << reply);

// XXX: This method is called from many places, including pullData() which
// may be called before prepareReply() [on some Squid-generated errors].
// may be called before sendStartOfMessage() [on some Squid-generated errors].
// Hence, we may not even know yet whether we should honor/do ranges.

if (http->request->range) {
Expand Down Expand Up @@ -263,11 +261,15 @@ Http::Stream::socketState()
}

void
Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
Http::Stream::sendStartOfMessage(const HttpReplyPointer &rep, StoreIOBuffer bodyData)
{
prepareReply(rep);
assert(rep);
MemBuf *mb = rep->pack();
reply = rep;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This HttpReply object may be passed from its raw/unprotected/lockless DeferredParams::reply storage to now-refconted Http::Stream::reply storage. I hope those deferred parameters always belong to the same Http::Stream object, but if they are not, then Http::Stream::reply object might be destroyed (via now-added Http::Stream::reply reference counting) before the corresponding DeferredParams object is destroyed, creating a dangling pointer inside DeferredParams storage. I recommend upgrading Http::Stream::DeferredParams::reply together with Http::Stream::reply (i.e. in this PR) to mitigate that risk.

Copy link
Contributor

@rousskov rousskov Jul 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at branch commit 7516249 (and, to a degree, its parent), it feels like something went wrong while upgrading Http::Stream::DeferredParams::reply storage. If possible, please avoid changing all (or the vast majority of) those functions! During these transitions, it is OK to pass a raw pointer (to a reference-counted HttpReply object) to legacy code that does not store that pointer or stores it properly. AFAICT, most of those modified functions fall into the former category. They should not be modified.

If there are some problems with that approach, let's discuss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a chain reaction going on here. This is as small a chain of side effects as I can make it.

A) To fix the dangling pointer in Http::Stream::DeferredParams::reply update Http::Stream::DeferredParams::reply to RefCount.

That pushes the dangling pointer problem into ESIIncludeContext and Downloader where class methods using CSCB type for AsyncCall parameters as a raw-pointer (now the dangling one).

B) To fix (A) update CSCB definition to pass RefCount instead. Which pulls in all the CSCB functions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a chain reaction going on here. This is as small a chain of side effects as I can make it.

Thank you for detailing this! I hope we can find a way to stop that chain reaction much sooner than branch commit 7516249 does:

A) To fix the dangling pointer in Http::Stream::DeferredParams::reply update Http::Stream::DeferredParams::reply to RefCount.

That pushes the dangling pointer problem into ESIIncludeContext and Downloader where class methods using CSCB type for AsyncCall parameters as a raw-pointer (now the dangling one).

CSCB callbacks are synchronous, right? When I look at Downloader changes in branch commit 7516249, I do not see any long-term HttpReply storage. I only see caller's reply pointer being forwarded to another synchronous call. That kind of forwarding, by itself, does not require smart pointers and does not create dangling raw pointers. Let's temporary assume, to simplify arguments, that downloaderRecipient() is the only CSCB callback in existence. Why do we have to change downloaderRecipient() and CSCB type after changing Http::Stream::DeferredParams::reply?


if (http->request->range)
buildRangeHeader();

auto mb = reply->pack();

// dump now, so we do not output any body.
debugs(11, 2, "HTTP Client " << clientConnection);
Expand All @@ -292,7 +294,7 @@ Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
if (pool->access) {
ACLFilledChecklist chl(pool->access, nullptr);
clientAclChecklistFill(chl, http);
chl.updateReply(rep);
chl.updateReply(reply);
const auto answer = chl.fastCheck();
if (answer.allowed()) {
writeQuotaHandler = pool->createBucket();
Expand Down Expand Up @@ -376,7 +378,7 @@ Http::Stream::noteSentBodyBytes(size_t bytes)

/// \return true when If-Range specs match reply, false otherwise
static bool
clientIfRangeMatch(ClientHttpRequest * http, HttpReply * rep)
clientIfRangeMatch(ClientHttpRequest * http, const HttpReplyPointer &reply)
{
const TimeOrTag spec = http->request->header.getTimeOrTag(Http::HdrType::IF_RANGE);

Expand All @@ -386,7 +388,7 @@ clientIfRangeMatch(ClientHttpRequest * http, HttpReply * rep)

/* got an ETag? */
if (spec.tag.str) {
ETag rep_tag = rep->header.getETag(Http::HdrType::ETAG);
ETag rep_tag = reply->header.getETag(Http::HdrType::ETAG);
debugs(33, 3, "ETags: " << spec.tag.str << " and " <<
(rep_tag.str ? rep_tag.str : "<none>"));

Expand All @@ -413,42 +415,40 @@ clientIfRangeMatch(ClientHttpRequest * http, HttpReply * rep)
// seems to be something better suited to Server logic
/** adds appropriate Range headers if needed */
void
Http::Stream::buildRangeHeader(HttpReply *rep)
Http::Stream::buildRangeHeader()
{
rousskov marked this conversation as resolved.
Show resolved Hide resolved
HttpHeader *hdr = rep ? &rep->header : nullptr;
Assure(reply);
const auto hdr = &reply->header;
const char *range_err = nullptr;
HttpRequest *request = http->request;
assert(request->range);
/* check if we still want to do ranges */
int64_t roffLimit = request->getRangeOffsetLimit();
auto contentRange = rep ? rep->contentRange() : nullptr;

if (!rep)
range_err = "no [parse-able] reply";
else if ((rep->sline.status() != Http::scOkay) && (rep->sline.status() != Http::scPartialContent))
if ((reply->sline.status() != Http::scOkay) && (reply->sline.status() != Http::scPartialContent))
range_err = "wrong status code";
else if (rep->sline.status() == Http::scPartialContent)
else if (reply->sline.status() == Http::scPartialContent)
range_err = "too complex response"; // probably contains what the client needs
else if (rep->sline.status() != Http::scOkay)
else if (reply->sline.status() != Http::scOkay)
range_err = "wrong status code";
else if (hdr->has(Http::HdrType::CONTENT_RANGE)) {
Must(!contentRange); // this is a 200, not 206 response
Must(!reply->contentRange()); // this is a 200, not 206 response
range_err = "meaningless response"; // the status code or the header is wrong
}
else if (rep->content_length < 0)
else if (reply->content_length < 0)
range_err = "unknown length";
else if (rep->content_length != http->storeEntry()->mem().baseReply().content_length)
else if (reply->content_length != http->storeEntry()->mem().baseReply().content_length)
range_err = "INCONSISTENT length"; /* a bug? */

/* hits only - upstream CachePeer determines correct behaviour on misses,
* and client_side_reply determines hits candidates
*/
else if (http->loggingTags().isTcpHit() &&
http->request->header.has(Http::HdrType::IF_RANGE) &&
!clientIfRangeMatch(http, rep))
!clientIfRangeMatch(http, reply))
range_err = "If-Range match failed";

else if (!http->request->range->canonize(rep))
else if (!http->request->range->canonize(reply.getRaw()))
range_err = "canonization failed";
else if (http->request->range->isComplex())
range_err = "too complex range header";
Expand All @@ -465,21 +465,21 @@ Http::Stream::buildRangeHeader(HttpReply *rep)
http->request->ignoreRange(range_err);
} else {
/* XXX: TODO: Review, this unconditional set may be wrong. */
rep->sline.set(rep->sline.version, Http::scPartialContent);
reply->sline.set(reply->sline.version, Http::scPartialContent);

// before range_iter accesses
const auto actual_clen = http->prepPartialResponseGeneration();

const int spec_count = http->request->range->specs.size();

debugs(33, 3, "range spec count: " << spec_count <<
" virgin clen: " << rep->content_length);
" virgin clen: " << reply->content_length);
assert(spec_count > 0);
/* append appropriate header(s) */
if (spec_count == 1) {
const auto singleSpec = *http->request->range->begin();
assert(singleSpec);
httpHeaderAddContRange(hdr, *singleSpec, rep->content_length);
httpHeaderAddContRange(hdr, *singleSpec, reply->content_length);
} else {
/* multipart! */
/* delete old Content-Type, add ours */
Expand Down Expand Up @@ -552,24 +552,16 @@ Http::Stream::initiateClose(const char *reason)
}

void
Http::Stream::deferRecipientForLater(clientStreamNode *node, HttpReply *rep, StoreIOBuffer receivedData)
Http::Stream::deferRecipientForLater(clientStreamNode *node, const HttpReplyPointer &rep, StoreIOBuffer receivedData)
{
debugs(33, 2, "Deferring request " << http->uri);
assert(flags.deferred == 0);
flags.deferred = 1;
deferredparams.node = node;
deferredparams.rep = rep;
deferredparams.reply = rep;
deferredparams.queuedBuffer = receivedData;
}

void
Http::Stream::prepareReply(HttpReply *rep)
{
reply = rep;
if (http->request->range)
buildRangeHeader(rep);
}

/**
* Packs bodyData into mb using chunked encoding.
* Packs the last-chunk if bodyData is empty.
Expand Down
Loading
Loading