Skip to content

Commit

Permalink
Merge 5076b50 into f940b8c
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Oct 23, 2024
2 parents f940b8c + 5076b50 commit 606dfdf
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 2 deletions.
16 changes: 14 additions & 2 deletions ydb/library/yql/public/purecalc/common/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,18 @@ void TPushStreamWorker::FeedToConsumer() {
}
}

// Returns the boxed value currently stored in the worker's self node.
//
// If the stored value is invalid, a fresh TPushStream is created through the
// graph's holder factory, written back into the self node, and that new value
// is returned instead.
//
// The returned raw pointer is non-owning. Callers in this file check
// SelfNode_ before calling; this function dereferences it unconditionally.
NYql::NUdf::IBoxedValue* TPushStreamWorker::GetPushStream() const {
    const auto& computationGraph = Graph_.ComputationGraph_;
    auto& context = computationGraph->GetContext();

    NUdf::TUnboxedValue stream = SelfNode_->GetValue(context);
    if (Y_LIKELY(!stream.IsInvalid())) {
        return stream.AsBoxed().Get();
    }

    // Rare path: the node holds no valid stream, so install a new one and
    // read it back from the node.
    SelfNode_->SetValue(context, computationGraph->GetHolderFactory().Create<TPushStream>());
    stream = SelfNode_->GetValue(context);
    return stream.AsBoxed().Get();
}

void TPushStreamWorker::SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>> consumer) {
auto guard = Guard(GetScopedAlloc());
const auto inputsCount = Graph_.SelfNodes_.size();
Expand Down Expand Up @@ -553,7 +565,7 @@ void TPushStreamWorker::Push(NKikimr::NUdf::TUnboxedValue&& value) {
YQL_ENSURE(!Finished_, "OnFinish has already been sent to the consumer; no new values can be pushed");

if (Y_LIKELY(SelfNode_)) {
static_cast<TPushStream*>(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetValue(std::move(value));
static_cast<TPushStream*>(GetPushStream())->SetValue(std::move(value));
}

FeedToConsumer();
Expand All @@ -564,7 +576,7 @@ void TPushStreamWorker::OnFinish() {
YQL_ENSURE(!Finished_, "already finished");

if (Y_LIKELY(SelfNode_)) {
static_cast<TPushStream*>(SelfNode_->GetValue(Graph_.ComputationGraph_->GetContext()).AsBoxed().Get())->SetFinished();
static_cast<TPushStream*>(GetPushStream())->SetFinished();
}

FeedToConsumer();
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/public/purecalc/common/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ namespace NYql {

private:
void FeedToConsumer();
NYql::NUdf::IBoxedValue* GetPushStream() const;

public:
void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) override;
Expand Down
138 changes: 138 additions & 0 deletions ydb/library/yql/public/purecalc/ut/test_push_stream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#include <library/cpp/testing/unittest/registar.h>

#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>

#include <ydb/library/yql/public/purecalc/io_specs/protobuf/spec.h>
#include <ydb/library/yql/public/purecalc/ut/protos/test_structs.pb.h>

using namespace NYql::NPureCalc;

namespace {
class TStatelessInputSpec : public TInputSpecBase {
public:
TStatelessInputSpec()
: Schemas_({NYT::TNode::CreateList()
.Add("StructType")
.Add(NYT::TNode::CreateList()
.Add(NYT::TNode::CreateList()
.Add("InputValue")
.Add(NYT::TNode::CreateList()
.Add("DataType")
.Add("Utf8")
)
)
)
})
{};

const TVector<NYT::TNode>& GetSchemas() const override {
return Schemas_;
}

private:
const TVector<NYT::TNode> Schemas_;
};

// Consumer that forwards each incoming unboxed value to a push-stream worker
// as a one-element row, then invalidates the worker's graph.
class TStatelessInputConsumer : public IConsumer<const NYql::NUdf::TUnboxedValue&> {
public:
    TStatelessInputConsumer(TWorkerHolder<IPushStreamWorker> worker)
        : Worker_(std::move(worker))
    {
    }

    // Wraps `value` into a single-item array holder and pushes it to the worker.
    // All of the work is done while the worker's scoped allocator is held.
    void OnObject(const NYql::NUdf::TUnboxedValue& value) override {
        auto guard = Guard(Worker_->GetScopedAlloc());

        NYql::NUdf::TUnboxedValue* fields = nullptr;
        NYql::NUdf::TUnboxedValue row = Worker_->GetGraph().GetHolderFactory().CreateDirectArrayHolder(1, fields);
        fields[0] = value;

        Worker_->Push(std::move(row));

        // Clear the graph after each object: the values are allocated on
        // another allocator and should be released.
        Worker_->GetGraph().Invalidate();
    }

    // Signals end of input to the worker while the worker's allocator is held.
    void OnFinish() override {
        auto guard = Guard(Worker_->GetScopedAlloc());
        Worker_->OnFinish();
    }

private:
    TWorkerHolder<IPushStreamWorker> Worker_;
};

// Output consumer used by the test: checks that every received message
// carries the expected string and that the total row count matches on finish.
class TStatelessConsumer : public IConsumer<NPureCalcProto::TStringMessage*> {
public:
    // expectedData - value every message's X field must equal.
    // expectedRows - number of messages that must have arrived by OnFinish().
    TStatelessConsumer(const TString& expectedData, ui64 expectedRows)
        : ExpectedData_(expectedData)
        , ExpectedRows_(expectedRows)
    {
    }

    // Verifies the payload of one message; the current row index is attached
    // to the assertion as context. Counts the row afterwards.
    void OnObject(NPureCalcProto::TStringMessage* message) override {
        UNIT_ASSERT_VALUES_EQUAL_C(ExpectedData_, message->GetX(), RowId_);
        ++RowId_;
    }

    // Verifies that exactly the expected number of rows was received.
    void OnFinish() override {
        UNIT_ASSERT_VALUES_EQUAL(ExpectedRows_, RowId_);
    }

private:
    const TString ExpectedData_;
    const ui64 ExpectedRows_;
    ui64 RowId_ = 0; // number of messages seen so far
};
}

// Traits specialization for TStatelessInputSpec: declares the spec as
// non-partial with push-stream mode supported, and provides the factory for
// its input consumer.
template <>
struct TInputSpecTraits<TStatelessInputSpec> {
    // Consumer handed back to the caller for feeding unboxed values.
    using TConsumerType = THolder<IConsumer<const NYql::NUdf::TUnboxedValue&>>;

    static constexpr bool IsPartial = false;
    static constexpr bool SupportPushStreamMode = true;

    // Creates a TStatelessInputConsumer that takes ownership of `worker`.
    // The spec argument itself is not used.
    static TConsumerType MakeConsumer(const TStatelessInputSpec&, TWorkerHolder<IPushStreamWorker> worker) {
        return MakeHolder<TStatelessInputConsumer>(std::move(worker));
    }
};

// Tests for push-stream programs fed through a custom input spec.
Y_UNIT_TEST_SUITE(TestPushStream) {
// Feeds a push-stream program with strings created on a separate allocator.
// TStatelessInputConsumer invalidates the worker graph after every pushed
// object, so this checks that the program keeps producing correct output
// across those invalidations.
Y_UNIT_TEST(TestGraphInvalidation) {
// NOTE(review): the ">= 14 bytes" wording presumably means the string is too
// long to be stored inline in TUnboxedValue and is heap-allocated - confirm.
const auto targetString = "large string >= 14 bytes";
const auto factory = MakeProgramFactory();
// The query keeps only the rows whose InputValue equals targetString.
const auto sql = TStringBuilder() << "SELECT InputValue AS X FROM Input WHERE InputValue = \"" << targetString << "\";";

const auto program = factory->MakePushStreamProgram(
TStatelessInputSpec(),
TProtobufOutputSpec<NPureCalcProto::TStringMessage>(),
sql
);

const ui64 numberRows = 5;
// The output consumer expects exactly numberRows messages, each equal to targetString.
const auto inputConsumer = program->Apply(MakeHolder<TStatelessConsumer>(targetString, numberRows));
// Allocator used only by this test to create the input strings; it is
// distinct from the worker's own scoped allocator.
const NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false);

// Creates the string under the test allocator, passes it to the program
// outside of that lock, then clears the value under the test allocator again.
const auto pushString = [&](TString inputValue) {
NYql::NUdf::TUnboxedValue stringValue;
with_lock(alloc) {
stringValue = NKikimr::NMiniKQL::MakeString(inputValue);
}

inputConsumer->OnObject(stringValue);

with_lock(alloc) {
stringValue.Clear();
}
};

// Each iteration pushes one matching and one non-matching string, so only
// numberRows rows are expected to pass the WHERE filter.
for (ui64 i = 0; i < numberRows; ++i) {
pushString(targetString);
pushString("another large string >= 14 bytes");
Cerr << "Computed string " << i << "\n";
}
// Triggers TStatelessConsumer::OnFinish, which asserts the final row count.
inputConsumer->OnFinish();
}
}
1 change: 1 addition & 0 deletions ydb/library/yql/public/purecalc/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ SRCS(
test_user_data.cpp
test_eval.cpp
test_pool.cpp
test_push_stream.cpp
)

PEERDIR(
Expand Down

0 comments on commit 606dfdf

Please sign in to comment.