Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-40037: [C++][FS][Azure] Make attempted reads and writes against directories fail fast #40119

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 76 additions & 20 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <optional>

Expand Down Expand Up @@ -347,6 +348,22 @@ bool IsContainerNotFound(const Storage::StorageException& e) {
return false;
}

const auto kHierarchicalNamespaceIsDirectoryMetadataKey = "hdi_isFolder";
const auto kFlatNamespaceIsDirectoryMetadataKey = "is_directory";

bool MetadataIndicatesIsDirectory(const Storage::Metadata& metadata) {
  // Decide whether blob metadata marks the blob as a directory.
  //
  // The hierarchical-namespace key takes precedence: when it is present its value
  // alone determines the answer. The flat-namespace key is consulted only when the
  // hierarchical one is absent. In both cases the value must be exactly "true".
  //
  // Inspired by
  // https://github.com/Azure/azure-sdk-for-cpp/blob/12407e8bfcb9bc1aa43b253c1d0ec93bf795ae3b/sdk/storage/azure-storage-files-datalake/src/datalake_utilities.cpp#L86-L91
  const auto hns_entry = metadata.find(kHierarchicalNamespaceIsDirectoryMetadataKey);
  if (hns_entry != metadata.end()) {
    return hns_entry->second == "true";
  }

  const auto flat_entry = metadata.find(kFlatNamespaceIsDirectoryMetadataKey);
  if (flat_entry == metadata.end()) {
    return false;
  }
  return flat_entry->second == "true";
}

template <typename ArrowType>
std::string FormatValue(typename TypeTraits<ArrowType>::CType value) {
struct StringAppender {
Expand Down Expand Up @@ -512,11 +529,18 @@ class ObjectInputFile final : public io::RandomAccessFile {

Status Init() {
if (content_length_ != kNoSize) {
// When the user provides the file size we don't validate that it's a file. This is
// only a read so it's not a big deal if the user makes a mistake.
DCHECK_GE(content_length_, 0);
return Status::OK();
}
try {
// To open an ObjectInputFile the Blob must exist and it must not represent
// a directory. Additionally we need to know the file size.
auto properties = blob_client_->GetProperties();
if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
return NotAFile(location_);
}
content_length_ = properties.Value.BlobSize;
metadata_ = PropertiesToMetadata(properties.Value);
return Status::OK();
Expand Down Expand Up @@ -698,11 +722,10 @@ class ObjectAppendStream final : public io::OutputStream {
ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
const AzureOptions& options, int64_t size = kNoSize)
const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
location_(location),
content_length_(size) {
location_(location) {
if (metadata && metadata->size() != 0) {
metadata_ = ArrowMetadataToAzureMetadata(metadata);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
Expand All @@ -716,17 +739,31 @@ class ObjectAppendStream final : public io::OutputStream {
io::internal::CloseFromDestructor(this);
}

Status Init() {
if (content_length_ != kNoSize) {
DCHECK_GE(content_length_, 0);
pos_ = content_length_;
Status Init(const bool truncate,
std::function<Status()> ensure_not_flat_namespace_directory) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can inject AzureFileSystem *azure_file_system here and not have to allocate a closure for this. You would call AzureFileSystem::Impl::EnsureNotFlatNamespaceDirectory(location) via azure_file_system->impl_ (accessible because the handles produced by the azure file system can be friends with the filesystem class).

Copy link
Contributor Author

@Tom-Newton Tom-Newton Feb 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the extra info. I was planning to do this but I was struggling with the friends thing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trick is to forward-declare the class as well.

diff --git a/cpp/src/arrow/filesystem/azurefs.h b/cpp/src/arrow/filesystem/azurefs.h
index 2a131e40c..d48ef9dd7 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -44,6 +44,7 @@ class DataLakeServiceClient;

 namespace arrow::fs {

+class ObjectAppendStream;
 class TestAzureFileSystem;

 /// Options for the AzureFileSystem implementation.
@@ -180,6 +181,7 @@ class ARROW_EXPORT AzureFileSystem : public FileSystem {

   explicit AzureFileSystem(std::unique_ptr<Impl>&& impl);

+  friend class ObjectAppendStream;
   friend class TestAzureFileSystem;
   void ForceCachedHierarchicalNamespaceSupport(int hns_support);

Copy link
Contributor Author

@Tom-Newton Tom-Newton Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my main problem was that ObjectAppendStream is defined inside an anonymous namespace but I still haven't got it working as you describe.

Are you suggesting to use AzureFileSystem *azure_file_system or AzureFileSystem::Impl *azure_file_system as the argument to ObjectAppendStream::Impl? I don't know how I can get an AzureFileSystem pointer from inside AzureFileSystem::Impl, and using AzureFileSystem::Impl as the argument leads to incomplete type errors which I don't think I can avoid.

Also if you wouldn't mind I would be interested to know what the disadvantage of a lambda function is compared to what you proposed.

Sorry about my lacking C++ knowledge here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if you wouldn't mind I would be interested to know what the disadvantage of a lambda function is compared to what you proposed.

To create the std::function, you heap allocate an object with copies of the values in the capture list and generate a lot more extra code in the binary:

class function {
  T valuesfromthecapturelist;
  
  RetType operator()(ArgsType ...) {...};
}

When you think about an std::function this way (a pair of context data and a function), you realize the class you already have serves that purpose.

But hey, this is becoming challenging, so I won't hold the PR anymore because of this. Moving to Init() was a big step in the right direction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining

if (truncate) {
content_length_ = 0;
pos_ = 0;
// Create an empty file overwriting any existing file, but fail if there is an
// existing directory.
RETURN_NOT_OK(ensure_not_flat_namespace_directory());
// On hierarchical namespace CreateEmptyBlockBlob will fail if there is an existing
// directory so we don't need to check like we do on flat namespace.
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
try {
auto properties = block_blob_client_->GetProperties();
if (MetadataIndicatesIsDirectory(properties.Value.Metadata)) {
return NotAFile(location_);
}
content_length_ = properties.Value.BlobSize;
pos_ = content_length_;
} catch (const Storage::StorageException& exception) {
if (exception.StatusCode == Http::HttpStatusCode::NotFound) {
// No file exists but on flat namespace it's possible there is a directory
// marker or an implied directory. Ensure there is no directory before starting
// a new empty file.
RETURN_NOT_OK(ensure_not_flat_namespace_directory());
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client_));
} else {
return ExceptionToStatus(
Expand All @@ -743,6 +780,7 @@ class ObjectAppendStream final : public io::OutputStream {
block_ids_.push_back(block.Name);
}
}
initialised_ = true;
return Status::OK();
}

Expand Down Expand Up @@ -789,6 +827,11 @@ class ObjectAppendStream final : public io::OutputStream {

Status Flush() override {
  // Flushing a closed stream is an error reported by CheckClosed.
  RETURN_NOT_OK(CheckClosed("flush"));
  if (initialised_) {
    // Commit the accumulated block ids together with the stream's metadata.
    return CommitBlockList(block_blob_client_, block_ids_, metadata_);
  }
  // A stream that was never successfully initialised has nothing to flush.
  // Returning early also avoids unhandled errors when flushing from the destructor.
  return Status::OK();
}

Expand Down Expand Up @@ -840,10 +883,11 @@ class ObjectAppendStream final : public io::OutputStream {
std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
int64_t content_length_ = kNoSize;

bool closed_ = false;
bool initialised_ = false;
int64_t pos_ = 0;
int64_t content_length_ = kNoSize;
std::vector<std::string> block_ids_;
Storage::Metadata metadata_;
};
Expand Down Expand Up @@ -1662,20 +1706,32 @@ class AzureFileSystem::Impl {
AzureFileSystem* fs) {
RETURN_NOT_OK(ValidateFileLocation(location));

const auto blob_container_client = GetBlobContainerClient(location.container);
auto block_blob_client = std::make_shared<Blobs::BlockBlobClient>(
blob_service_client_->GetBlobContainerClient(location.container)
.GetBlockBlobClient(location.path));
blob_container_client.GetBlockBlobClient(location.path));

auto ensure_not_flat_namespace_directory = [this, location,
blob_container_client]() -> Status {
ARROW_ASSIGN_OR_RAISE(
auto hns_support,
HierarchicalNamespaceSupport(GetFileSystemClient(location.container)));
if (hns_support == HNSSupport::kDisabled) {
// Flat namespace so we need to GetFileInfo in case it's a directory.
ARROW_ASSIGN_OR_RAISE(auto status, GetFileInfo(blob_container_client, location))
if (status.type() == FileType::Directory) {
return NotAFile(location);
}
}
// kContainerNotFound - it doesn't exist, so no need to check if it's a directory.
// kEnabled - hierarchical namespace so Azure APIs will fail if it's a directory. We
// don't need to explicitly check.
return Status::OK();
};

std::shared_ptr<ObjectAppendStream> stream;
if (truncate) {
RETURN_NOT_OK(CreateEmptyBlockBlob(*block_blob_client));
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_, 0);
} else {
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
}
RETURN_NOT_OK(stream->Init());
stream = std::make_shared<ObjectAppendStream>(block_blob_client, fs->io_context(),
location, metadata, options_);
RETURN_NOT_OK(stream->Init(truncate, ensure_not_flat_namespace_directory));
return stream;
}

Expand All @@ -1690,7 +1746,7 @@ class AzureFileSystem::Impl {
// on directory marker blobs.
// https://github.com/fsspec/adlfs/blob/32132c4094350fca2680155a5c236f2e9f991ba5/adlfs/spec.py#L855-L870
Blobs::UploadBlockBlobFromOptions blob_options;
blob_options.Metadata.emplace("is_directory", "true");
blob_options.Metadata.emplace(kFlatNamespaceIsDirectoryMetadataKey, "true");
block_blob_client.UploadFrom(nullptr, 0, blob_options);
}

Expand Down
61 changes: 61 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,41 @@ class TestAzureFileSystem : public ::testing::Test {
AssertFileInfo(fs(), subdir3, FileType::Directory);
}

// Checks that a path created via CreateDir cannot be opened as a file: opening
// it for input, output or append must raise IOError, both for the plain path
// and for the same path with a trailing slash.
void TestDisallowReadingOrWritingDirectoryMarkers() {
auto data = SetUpPreexistingData();
auto directory_path = data.Path("directory");

// Create the directory first so every open attempt below targets a directory.
ASSERT_OK(fs()->CreateDir(directory_path));
ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path));

// Repeat the same three checks with a trailing slash appended to the path.
auto directory_path_with_slash = directory_path + "/";
ASSERT_RAISES(IOError, fs()->OpenInputFile(directory_path_with_slash));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(directory_path_with_slash));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(directory_path_with_slash));
}

// Checks how files and directories sharing a name interact:
//  - once a directory exists, opening an output or append stream at that path
//    raises IOError and the path is still reported as a directory;
//  - once a file exists, CreateDir at that path returns OK but the path is
//    still reported as a file.
void TestDisallowCreatingFileAndDirectoryWithTheSameName() {
auto data = SetUpPreexistingData();
// Case 1: the directory is created first, then file creation is attempted.
auto path1 = data.Path("directory1");
ASSERT_OK(fs()->CreateDir(path1));
ASSERT_RAISES(IOError, fs()->OpenOutputStream(path1));
ASSERT_RAISES(IOError, fs()->OpenAppendStream(path1));
AssertFileInfo(fs(), path1, FileType::Directory);

// Case 2: the file is created first, then directory creation is attempted.
auto path2 = data.Path("directory2");
ASSERT_OK(fs()->OpenOutputStream(path2));
// CreateDir returns OK even if there is already a file or directory at this
// location. Whether or not this is the desired behaviour is debatable.
ASSERT_OK(fs()->CreateDir(path2));
AssertFileInfo(fs(), path2, FileType::File);
}

// Expects OpenOutputStream to raise IOError for a path under "not-a-container",
// a container that this method does not create.
void TestOpenOutputStreamWithMissingContainer() {
ASSERT_RAISES(IOError, fs()->OpenOutputStream("not-a-container/file", {}));
}

void TestDeleteDirSuccessEmpty() {
if (HasSubmitBatchBug()) {
GTEST_SKIP() << kSubmitBatchBugMessage;
Expand Down Expand Up @@ -1559,6 +1594,20 @@ TYPED_TEST(TestAzureFileSystemOnAllScenarios, CreateDirOnMissingContainer) {
this->TestCreateDirOnMissingContainer();
}

// Registers TestDisallowReadingOrWritingDirectoryMarkers as a typed test of the
// TestAzureFileSystemOnAllScenarios suite.
TYPED_TEST(TestAzureFileSystemOnAllScenarios, DisallowReadingOrWritingDirectoryMarkers) {
this->TestDisallowReadingOrWritingDirectoryMarkers();
}

// Registers TestDisallowCreatingFileAndDirectoryWithTheSameName as a typed test
// of the TestAzureFileSystemOnAllScenarios suite.
TYPED_TEST(TestAzureFileSystemOnAllScenarios,
DisallowCreatingFileAndDirectoryWithTheSameName) {
this->TestDisallowCreatingFileAndDirectoryWithTheSameName();
}

TYPED_TEST(TestAzureFileSystemOnAllScenarios,
OpenOutputStreamWithMissingContainer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is causing the linter to fail @Tom-Newton. Please fix and I will merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

this->TestOpenOutputStreamWithMissingContainer();
}

// Registers TestDeleteDirSuccessEmpty as a typed test of the
// TestAzureFileSystemOnAllScenarios suite.
TYPED_TEST(TestAzureFileSystemOnAllScenarios, DeleteDirSuccessEmpty) {
this->TestDeleteDirSuccessEmpty();
}
Expand Down Expand Up @@ -2162,6 +2211,18 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
.Value.Metadata;
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "foo")}, blob_metadata);

// Metadata can be written without writing any data.
ASSERT_OK_AND_ASSIGN(
output, fs_with_defaults->OpenAppendStream(
full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "baz"}})));
ASSERT_OK(output->Close());
blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
.Value.Metadata;
// Defaults are overwritten and not merged.
EXPECT_EQ(Core::CaseInsensitiveMap{std::make_pair("bar", "baz")}, blob_metadata);
}

TEST_F(TestAzuriteFileSystem, OpenOutputStreamSmall) {
Expand Down
Loading