diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index dc1173ebe18db..bb5932a312567 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -151,18 +151,57 @@ Status MakeFlightError(FlightStatusCode code, std::string message, std::make_shared(code, std::move(extra_info))); } -bool FlightDescriptor::Equals(const FlightDescriptor& other) const { - if (type != other.type) { - return false; +static std::ostream& operator<<(std::ostream& os, std::vector values) { + os << '['; + std::string sep = ""; + for (const auto& v : values) { + os << sep << std::quoted(v); + sep = ", "; } - switch (type) { - case PATH: - return path == other.path; - case CMD: - return cmd == other.cmd; - default: - return false; + os << ']'; + + return os; +} + +template +static std::ostream& operator<<(std::ostream& os, std::map m) { + os << '{'; + std::string sep = ""; + if constexpr (std::is_convertible_v) { + // std::string, char*, std::string_view + for (const auto& [k, v] : m) { + os << sep << '[' << k << "]: " << std::quoted(v) << '"'; + sep = ", "; + } + } else { + for (const auto& [k, v] : m) { + os << sep << '[' << k << "]: " << v; + sep = ", "; + } } + os << '}'; + + return os; +} + +//------------------------------------------------------------ +// Wrapper types for Flight RPC protobuf messages + +std::string BasicAuth::ToString() const { + return arrow::util::StringBuilder(""); +} + +bool BasicAuth::Equals(const BasicAuth& other) const { + return (username == other.username) && (password == other.password); +} + +arrow::Status BasicAuth::Deserialize(std::string_view serialized, BasicAuth* out) { + return DeserializeProtoString("BasicAuth", serialized, out); +} + +arrow::Status BasicAuth::SerializeToString(std::string* out) const { + return SerializeToProtoString("BasicAuth", *this, out); } std::string FlightDescriptor::ToString() const { @@ -192,50 +231,18 @@ std::string FlightDescriptor::ToString() const { return ss.str(); } -Status FlightPayload::Validate() const { - static constexpr int64_t kInt32Max = std::numeric_limits::max(); - if (descriptor && descriptor->size() > kInt32Max) { - return Status::CapacityError("Descriptor size overflow (>= 2**31)"); - } - if (app_metadata && app_metadata->size() > kInt32Max) { - return Status::CapacityError("app_metadata size overflow (>= 2**31)"); +bool FlightDescriptor::Equals(const FlightDescriptor& other) const { + if (type != other.type) { + return false; } - if (ipc_message.body_length > kInt32Max) { - return Status::Invalid("Cannot send record batches exceeding 2GiB yet"); + switch (type) { + case PATH: + return path == other.path; + case CMD: + return cmd == other.cmd; + default: + return false; } - return Status::OK(); -} - -arrow::Result> SchemaResult::GetSchema( - ipc::DictionaryMemo* dictionary_memo) const { - // Create a non-owned Buffer to avoid copying - io::BufferReader schema_reader(std::make_shared(raw_schema_)); - return ipc::ReadSchema(&schema_reader, dictionary_memo); -} - -arrow::Result> SchemaResult::Make(const Schema& schema) { - std::string schema_in; - RETURN_NOT_OK(internal::SchemaToString(schema, &schema_in)); - return std::make_unique(std::move(schema_in)); -} - -std::string SchemaResult::ToString() const { - return ""; -} - -bool SchemaResult::Equals(const SchemaResult& other) const { - return raw_schema_ == other.raw_schema_; -} - -arrow::Status SchemaResult::SerializeToString(std::string* out) const { - return SerializeToProtoString("SchemaResult", *this, out); -} - -arrow::Status SchemaResult::Deserialize(std::string_view serialized, SchemaResult* out) { - pb::SchemaResult pb_schema_result; - RETURN_NOT_OK(ParseFromString("SchemaResult", serialized, &pb_schema_result)); - *out = SchemaResult{pb_schema_result.schema()}; - return Status::OK(); } arrow::Status FlightDescriptor::SerializeToString(std::string* out) const { @@ -248,22 +255,6 @@ arrow::Status FlightDescriptor::Deserialize(std::string_view serialized, "FlightDescriptor", serialized, out); } -std::string Ticket::ToString() const { - std::stringstream ss; - ss << ""; - return ss.str(); -} - -bool Ticket::Equals(const Ticket& other) const { return ticket == other.ticket; } - -arrow::Status Ticket::SerializeToString(std::string* out) const { - return SerializeToProtoString("Ticket", *this, out); -} - -arrow::Status Ticket::Deserialize(std::string_view serialized, Ticket* out) { - return DeserializeProtoString("Ticket", serialized, out); -} - arrow::Result FlightInfo::Make(const Schema& schema, const FlightDescriptor& descriptor, const std::vector& endpoints, @@ -431,43 +422,49 @@ arrow::Status CancelFlightInfoRequest::Deserialize(std::string_view serialized, "CancelFlightInfoRequest", serialized, out); } -static const char* const SetSessionOptionStatusNames[] = {"Unspecified", "InvalidName", - "InvalidValue", "Error"}; -static const char* const CloseSessionStatusNames[] = {"Unspecified", "Closed", "Closing", - "NotClosable"}; - -// Helpers for stringifying maps containing various types -std::string ToString(const SetSessionOptionErrorValue& error_value) { - return SetSessionOptionStatusNames[static_cast(error_value)]; +std::string CancelFlightInfoResult::ToString() const { + std::stringstream ss; + ss << ""; + return ss.str(); } -std::ostream& operator<<(std::ostream& os, - const SetSessionOptionErrorValue& error_value) { - os << ToString(error_value); - return os; +bool CancelFlightInfoResult::Equals(const CancelFlightInfoResult& other) const { + return status == other.status; } -std::string ToString(const CloseSessionStatus& status) { - return CloseSessionStatusNames[static_cast(status)]; +arrow::Status CancelFlightInfoResult::SerializeToString(std::string* out) const { + return SerializeToProtoString("CancelFlightInfoResult", + *this, out); } -std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& status) { - os << ToString(status); - return os; +arrow::Status CancelFlightInfoResult::Deserialize(std::string_view serialized, + CancelFlightInfoResult* out) { + return DeserializeProtoString( + "CancelFlightInfoResult", serialized, out); } -std::ostream& operator<<(std::ostream& os, std::vector values) { - os << '['; - std::string sep = ""; - for (const auto& v : values) { - os << sep << std::quoted(v); - sep = ", "; +std::ostream& operator<<(std::ostream& os, CancelStatus status) { + switch (status) { + case CancelStatus::kUnspecified: + os << "Unspecified"; + break; + case CancelStatus::kCancelled: + os << "Cancelled"; + break; + case CancelStatus::kCancelling: + os << "Cancelling"; + break; + case CancelStatus::kNotCancellable: + os << "NotCancellable"; + break; } - os << ']'; - return os; } +// Session management messages + +// SessionOptionValue + std::ostream& operator<<(std::ostream& os, const SessionOptionValue& v) { if (std::holds_alternative(v)) { os << ""; @@ -486,33 +483,6 @@ std::ostream& operator<<(std::ostream& os, const SessionOptionValue& v) { return os; } -std::ostream& operator<<(std::ostream& os, const SetSessionOptionsResult::Error& e) { - os << '{' << e.value << '}'; - return os; -} - -template -std::ostream& operator<<(std::ostream& os, std::map m) { - os << '{'; - std::string sep = ""; - if constexpr (std::is_convertible_v) { - // std::string, char*, std::string_view - for (const auto& [k, v] : m) { - os << sep << '[' << k << "]: " << std::quoted(v) << '"'; - sep = ", "; - } - } else { - for (const auto& [k, v] : m) { - os << sep << '[' << k << "]: " << v; - sep = ", "; - } - } - os << '}'; - - return os; -} - -namespace { static bool CompareSessionOptionMaps(const std::map& a, const std::map& b) { if (a.size() != b.size()) { @@ -533,15 +503,30 @@ static bool CompareSessionOptionMaps(const std::map(error_value)]; +} + +std::ostream& operator<<(std::ostream& os, + const SetSessionOptionErrorValue& error_value) { + os << ToString(error_value); + return os; +} // SetSessionOptionsRequest std::string SetSessionOptionsRequest::ToString() const { std::stringstream ss; - ss << "(status)]; +} + +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& status) { + os << ToString(status); + return os; +} + // CloseSessionResult std::string CloseSessionResult::ToString() const { std::stringstream ss; - ss << ""; + return ss.str(); +} + +bool Ticket::Equals(const Ticket& other) const { return ticket == other.ticket; } + +arrow::Status Ticket::SerializeToString(std::string* out) const { + return SerializeToProtoString("Ticket", *this, out); +} + +arrow::Status Ticket::Deserialize(std::string_view serialized, Ticket* out) { + return DeserializeProtoString("Ticket", serialized, out); +} + Location::Location() { uri_ = std::make_shared(); } arrow::Result Location::Parse(const std::string& uri_string) { @@ -712,7 +733,6 @@ arrow::Result Location::ForScheme(const std::string& scheme, return Location::Parse(uri_string.str()); } -std::string Location::ToString() const { return uri_->ToString(); } std::string Location::scheme() const { std::string scheme = uri_->scheme(); if (scheme.empty()) { @@ -722,6 +742,8 @@ std::string Location::scheme() const { return scheme; } +std::string Location::ToString() const { return uri_->ToString(); } + bool Location::Equals(const Location& other) const { return ToString() == other.ToString(); } @@ -815,6 +837,20 @@ arrow::Status RenewFlightEndpointRequest::Deserialize(std::string_view serialize serialized, out); } +Status FlightPayload::Validate() const { + static constexpr int64_t kInt32Max = std::numeric_limits::max(); + if (descriptor && descriptor->size() > kInt32Max) { + return Status::CapacityError("Descriptor size overflow (>= 2**31)"); + } + if (app_metadata && app_metadata->size() > kInt32Max) { + return Status::CapacityError("app_metadata size overflow (>= 2**31)"); + } + if (ipc_message.body_length > kInt32Max) { + return Status::Invalid("Cannot send record batches exceeding 2GiB yet"); + } + return Status::OK(); +} + std::string ActionType::ToString() const { return arrow::util::StringBuilder(""); @@ -924,45 +960,40 @@ arrow::Status Result::Deserialize(std::string_view serialized, Result* out) { return DeserializeProtoString("Result", serialized, out); } -std::string CancelFlightInfoResult::ToString() const { - std::stringstream ss; - ss << ""; - return ss.str(); +arrow::Result> SchemaResult::GetSchema( + ipc::DictionaryMemo* dictionary_memo) const { + // Create a non-owned Buffer to avoid copying + io::BufferReader schema_reader(std::make_shared(raw_schema_)); + return ipc::ReadSchema(&schema_reader, dictionary_memo); } -bool CancelFlightInfoResult::Equals(const CancelFlightInfoResult& other) const { - return status == other.status; +arrow::Result> SchemaResult::Make(const Schema& schema) { + std::string schema_in; + RETURN_NOT_OK(internal::SchemaToString(schema, &schema_in)); + return std::make_unique(std::move(schema_in)); } -arrow::Status CancelFlightInfoResult::SerializeToString(std::string* out) const { - return SerializeToProtoString("CancelFlightInfoResult", - *this, out); +std::string SchemaResult::ToString() const { + return ""; } -arrow::Status CancelFlightInfoResult::Deserialize(std::string_view serialized, - CancelFlightInfoResult* out) { - return DeserializeProtoString( - "CancelFlightInfoResult", serialized, out); +bool SchemaResult::Equals(const SchemaResult& other) const { + return raw_schema_ == other.raw_schema_; } -std::ostream& operator<<(std::ostream& os, CancelStatus status) { - switch (status) { - case CancelStatus::kUnspecified: - os << "Unspecified"; - break; - case CancelStatus::kCancelled: - os << "Cancelled"; - break; - case CancelStatus::kCancelling: - os << "Cancelling"; - break; - case CancelStatus::kNotCancellable: - os << "NotCancellable"; - break; - } - return os; +arrow::Status SchemaResult::SerializeToString(std::string* out) const { + return SerializeToProtoString("SchemaResult", *this, out); } +arrow::Status SchemaResult::Deserialize(std::string_view serialized, SchemaResult* out) { + pb::SchemaResult pb_schema_result; + RETURN_NOT_OK(ParseFromString("SchemaResult", serialized, &pb_schema_result)); + *out = SchemaResult{pb_schema_result.schema()}; + return Status::OK(); +} + +//------------------------------------------------------------ + Status ResultStream::Drain() { while (true) { ARROW_ASSIGN_OR_RAISE(auto result, Next()); @@ -1050,23 +1081,6 @@ arrow::Result> SimpleResultStream::Next() { return std::make_unique(std::move(results_[position_++])); } -std::string BasicAuth::ToString() const { - return arrow::util::StringBuilder(""); -} - -bool BasicAuth::Equals(const BasicAuth& other) const { - return (username == other.username) && (password == other.password); -} - -arrow::Status BasicAuth::Deserialize(std::string_view serialized, BasicAuth* out) { - return DeserializeProtoString("BasicAuth", serialized, out); -} - -arrow::Status BasicAuth::SerializeToString(std::string* out) const { - return SerializeToProtoString("BasicAuth", *this, out); -} - //------------------------------------------------------------ // Error propagation helpers diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index bc8e234d977b1..de93750f75b25 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -61,6 +61,18 @@ class Uri; namespace flight { +ARROW_FLIGHT_EXPORT +extern const char* kSchemeGrpc; +ARROW_FLIGHT_EXPORT +extern const char* kSchemeGrpcTcp; +ARROW_FLIGHT_EXPORT +extern const char* kSchemeGrpcUnix; +ARROW_FLIGHT_EXPORT +extern const char* kSchemeGrpcTls; + +class FlightClient; +class FlightServerBase; + /// \brief A timestamp compatible with Protocol Buffer's /// google.protobuf.Timestamp: /// @@ -214,6 +226,40 @@ struct BaseType { } // namespace internal +//------------------------------------------------------------ +// Wrapper types for Flight RPC protobuf messages + +// A wrapper around arrow.flight.protocol.HandshakeRequest is not defined +// A wrapper around arrow.flight.protocol.HandshakeResponse is not defined + +/// \brief message for simple auth +struct ARROW_FLIGHT_EXPORT BasicAuth : public internal::BaseType { + std::string username; + std::string password; + + BasicAuth() = default; + BasicAuth(std::string username, std::string password) + : username(std::move(username)), password(std::move(password)) {} + + std::string ToString() const; + bool Equals(const BasicAuth& other) const; + + using SuperT::Deserialize; + using SuperT::SerializeToString; + + /// \brief Serialize this message to its wire-format representation. + /// + /// Use `SerializeToString()` if you want a Result-returning version. + arrow::Status SerializeToString(std::string* out) const; + + /// \brief Deserialize this message from its wire-format representation. + /// + /// Use `Deserialize(serialized)` if you want a Result-returning version. + static arrow::Status Deserialize(std::string_view serialized, BasicAuth* out); +}; + +// A wrapper around arrow.flight.protocol.Empty is not defined + /// \brief A type of action that can be performed with the DoAction RPC. struct ARROW_FLIGHT_EXPORT ActionType : public internal::BaseType { /// \brief The name of the action. @@ -330,63 +376,26 @@ struct ARROW_FLIGHT_EXPORT Result : public internal::BaseType { static arrow::Status Deserialize(std::string_view serialized, Result* out); }; -enum class CancelStatus { - /// The cancellation status is unknown. Servers should avoid using - /// this value (send a kNotCancellable if the requested FlightInfo - /// is not known). Clients can retry the request. - kUnspecified = 0, - /// The cancellation request is complete. Subsequent requests with - /// the same payload may return kCancelled or a kNotCancellable error. - kCancelled = 1, - /// The cancellation request is in progress. The client may retry - /// the cancellation request. - kCancelling = 2, - // The FlightInfo is not cancellable. The client should not retry the - // cancellation request. - kNotCancellable = 3, -}; - -/// \brief The result of the CancelFlightInfo action. -struct ARROW_FLIGHT_EXPORT CancelFlightInfoResult - : public internal::BaseType { - CancelStatus status = CancelStatus::kUnspecified; - - CancelFlightInfoResult() = default; - CancelFlightInfoResult(CancelStatus status) // NOLINT runtime/explicit - : status(status) {} - - std::string ToString() const; - bool Equals(const CancelFlightInfoResult& other) const; - - using SuperT::Deserialize; - using SuperT::SerializeToString; - - /// \brief Serialize this message to its wire-format representation. - /// - /// Use `SerializeToString()` if you want a Result-returning version. - arrow::Status SerializeToString(std::string* out) const; - - /// \brief Deserialize this message from its wire-format representation. - /// - /// Use `Deserialize(serialized)` if you want a Result-returning version. - static arrow::Status Deserialize(std::string_view serialized, - CancelFlightInfoResult* out); -}; +/// \brief Schema result returned after a schema request RPC +struct ARROW_FLIGHT_EXPORT SchemaResult : public internal::BaseType { + public: + SchemaResult() = default; + explicit SchemaResult(std::string schema) : raw_schema_(std::move(schema)) {} -ARROW_FLIGHT_EXPORT -std::ostream& operator<<(std::ostream& os, CancelStatus status); + /// \brief Factory method to construct a SchemaResult. + static arrow::Result> Make(const Schema& schema); -/// \brief message for simple auth -struct ARROW_FLIGHT_EXPORT BasicAuth : public internal::BaseType { - std::string username; - std::string password; + /// \brief return schema + /// \param[in,out] dictionary_memo for dictionary bookkeeping, will + /// be modified + /// \return Arrow result with the reconstructed Schema + arrow::Result> GetSchema( + ipc::DictionaryMemo* dictionary_memo) const; - BasicAuth() = default; - BasicAuth(std::string username, std::string password) - : username(std::move(username)), password(std::move(password)) {} + const std::string& serialized_schema() const { return raw_schema_; } std::string ToString() const; - bool Equals(const BasicAuth& other) const; + bool Equals(const SchemaResult& other) const; using SuperT::Deserialize; using SuperT::SerializeToString; @@ -399,7 +408,10 @@ struct ARROW_FLIGHT_EXPORT BasicAuth : public internal::BaseType { /// \brief Deserialize this message from its wire-format representation. /// /// Use `Deserialize(serialized)` if you want a Result-returning version. - static arrow::Status Deserialize(std::string_view serialized, BasicAuth* out); + static arrow::Status Deserialize(std::string_view serialized, SchemaResult* out); + + private: + std::string raw_schema_; }; /// \brief A request to retrieve or generate a dataset @@ -427,10 +439,9 @@ struct ARROW_FLIGHT_EXPORT FlightDescriptor FlightDescriptor(DescriptorType type, std::string cmd, std::vector path) : type(type), cmd(std::move(cmd)), path(std::move(path)) {} - bool Equals(const FlightDescriptor& other) const; - /// \brief Get a human-readable form of this descriptor. std::string ToString() const; + bool Equals(const FlightDescriptor& other) const; using SuperT::Deserialize; using SuperT::SerializeToString; @@ -462,17 +473,60 @@ struct ARROW_FLIGHT_EXPORT FlightDescriptor } }; -/// \brief Data structure providing an opaque identifier or credential to use -/// when requesting a data stream with the DoGet RPC -struct ARROW_FLIGHT_EXPORT Ticket : public internal::BaseType { - std::string ticket; +/// \brief The access coordinates for retrieval of a dataset, returned by +/// GetFlightInfo +class ARROW_FLIGHT_EXPORT FlightInfo + : public internal::BaseType> { + public: + struct Data { + std::string schema; + FlightDescriptor descriptor; + std::vector endpoints; + int64_t total_records = -1; + int64_t total_bytes = -1; + bool ordered = false; + std::string app_metadata; + }; - Ticket() = default; - Ticket(std::string ticket) // NOLINT runtime/explicit - : ticket(std::move(ticket)) {} + explicit FlightInfo(Data data) : data_(std::move(data)), reconstructed_schema_(false) {} - std::string ToString() const; - bool Equals(const Ticket& other) const; + /// \brief Factory method to construct a FlightInfo. + static arrow::Result Make(const Schema& schema, + const FlightDescriptor& descriptor, + const std::vector& endpoints, + int64_t total_records, int64_t total_bytes, + bool ordered = false, + std::string app_metadata = ""); + + /// \brief Deserialize the Arrow schema of the dataset. Populate any + /// dictionary encoded fields into a DictionaryMemo for + /// bookkeeping + /// \param[in,out] dictionary_memo for dictionary bookkeeping, will + /// be modified + /// \return Arrow result with the reconstructed Schema + arrow::Result> GetSchema( + ipc::DictionaryMemo* dictionary_memo) const; + + const std::string& serialized_schema() const { return data_.schema; } + + /// The descriptor associated with this flight, may not be set + const FlightDescriptor& descriptor() const { return data_.descriptor; } + + /// A list of endpoints associated with the flight (dataset). To consume the + /// whole flight, all endpoints must be consumed + const std::vector& endpoints() const { return data_.endpoints; } + + /// The total number of records (rows) in the dataset. If unknown, set to -1 + int64_t total_records() const { return data_.total_records; } + + /// The total number of bytes in the dataset. If unknown, set to -1 + int64_t total_bytes() const { return data_.total_bytes; } + + /// Whether endpoints are in the same order as the data. + bool ordered() const { return data_.ordered; } + + /// Application-defined opaque metadata + const std::string& app_metadata() const { return data_.app_metadata; } using SuperT::Deserialize; using SuperT::SerializeToString; @@ -491,53 +545,233 @@ struct ARROW_FLIGHT_EXPORT Ticket : public internal::BaseType { /// services) that may want to return Flight types. /// /// Use `Deserialize(serialized)` if you want a Result-returning version. - static arrow::Status Deserialize(std::string_view serialized, Ticket* out); -}; - -class FlightClient; -class FlightServerBase; + static arrow::Status Deserialize(std::string_view serialized, + std::unique_ptr* out); -ARROW_FLIGHT_EXPORT -extern const char* kSchemeGrpc; -ARROW_FLIGHT_EXPORT -extern const char* kSchemeGrpcTcp; -ARROW_FLIGHT_EXPORT -extern const char* kSchemeGrpcUnix; -ARROW_FLIGHT_EXPORT -extern const char* kSchemeGrpcTls; + std::string ToString() const; -/// \brief A host location (a URI) -struct ARROW_FLIGHT_EXPORT Location : public internal::BaseType { - public: - /// \brief Initialize a blank location. - Location(); + /// Compare two FlightInfo for equality. This will compare the + /// serialized schema representations, NOT the logical equality of + /// the schemas. + bool Equals(const FlightInfo& other) const; - /// \brief Initialize a location by parsing a URI string - static arrow::Result Parse(const std::string& uri_string); + private: + Data data_; + mutable std::shared_ptr schema_; + mutable bool reconstructed_schema_; +}; - /// \brief Get the fallback URI. - /// - /// arrow-flight-reuse-connection://? means that a client may attempt to - /// reuse an existing connection to a Flight service to fetch data instead - /// of creating a new connection to one of the other locations listed in a - /// FlightEndpoint response. - static const Location& ReuseConnection(); +/// \brief The information to process a long-running query. +class ARROW_FLIGHT_EXPORT PollInfo + : public internal::BaseType> { + public: + /// The currently available results so far. + std::unique_ptr info = NULLPTR; + /// The descriptor the client should use on the next try. If unset, + /// the query is complete. + std::optional descriptor = std::nullopt; + /// Query progress. Must be in [0.0, 1.0] but need not be + /// monotonic or nondecreasing. If unknown, do not set. + std::optional progress = std::nullopt; + /// Expiration time for this request. After this passes, the server + /// might not accept the poll descriptor anymore (and the query may + /// be cancelled). This may be updated on a call to PollFlightInfo. + std::optional expiration_time = std::nullopt; - /// \brief Initialize a location for a non-TLS, gRPC-based Flight - /// service from a host and port - /// \param[in] host The hostname to connect to - /// \param[in] port The port - /// \return Arrow result with the resulting location - static arrow::Result ForGrpcTcp(const std::string& host, const int port); + PollInfo() + : info(NULLPTR), + descriptor(std::nullopt), + progress(std::nullopt), + expiration_time(std::nullopt) {} - /// \brief Initialize a location for a TLS-enabled, gRPC-based Flight - /// service from a host and port - /// \param[in] host The hostname to connect to - /// \param[in] port The port - /// \return Arrow result with the resulting location - static arrow::Result ForGrpcTls(const std::string& host, const int port); + PollInfo(std::unique_ptr info, std::optional descriptor, + std::optional progress, std::optional expiration_time) + : info(std::move(info)), + descriptor(std::move(descriptor)), + progress(progress), + expiration_time(expiration_time) {} - /// \brief Initialize a location for a domain socket-based Flight + PollInfo(const PollInfo& other) + : info(other.info ? std::make_unique(*other.info) : NULLPTR), + descriptor(other.descriptor), + progress(other.progress), + expiration_time(other.expiration_time) {} + PollInfo(PollInfo&& other) noexcept = default; + ~PollInfo() = default; + PollInfo& operator=(const PollInfo& other) { + info = other.info ? std::make_unique(*other.info) : NULLPTR; + descriptor = other.descriptor; + progress = other.progress; + expiration_time = other.expiration_time; + return *this; + } + PollInfo& operator=(PollInfo&& other) = default; + + using SuperT::Deserialize; + using SuperT::SerializeToString; + + /// \brief Get the wire-format representation of this type. + /// + /// Useful when interoperating with non-Flight systems (e.g. REST + /// services) that may want to return Flight types. + /// + /// Use `SerializeToString()` if you want a Result-returning version. + arrow::Status SerializeToString(std::string* out) const; + + /// \brief Parse the wire-format representation of this type. + /// + /// Useful when interoperating with non-Flight systems (e.g. REST + /// services) that may want to return Flight types. + /// + /// Use `Deserialize(serialized)` if you want a Result-returning version. + static arrow::Status Deserialize(std::string_view serialized, + std::unique_ptr* out); + + std::string ToString() const; + + /// Compare two PollInfo for equality. This will compare the + /// serialized schema representations, NOT the logical equality of + /// the schemas. + bool Equals(const PollInfo& other) const; +}; + +/// \brief The request of the CancelFlightInfoRequest action. +struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest + : public internal::BaseType { + std::unique_ptr info; + + CancelFlightInfoRequest() = default; + CancelFlightInfoRequest(std::unique_ptr info) // NOLINT runtime/explicit + : info(std::move(info)) {} + + std::string ToString() const; + bool Equals(const CancelFlightInfoRequest& other) const; + + using SuperT::Deserialize; + using SuperT::SerializeToString; + + /// \brief Serialize this message to its wire-format representation. + /// + /// Use `SerializeToString()` if you want a Result-returning version. + arrow::Status SerializeToString(std::string* out) const; + + /// \brief Deserialize this message from its wire-format representation. + /// + /// Use `Deserialize(serialized)` if you want a Result-returning version. + static arrow::Status Deserialize(std::string_view serialized, + CancelFlightInfoRequest* out); +}; + +enum class CancelStatus { + /// The cancellation status is unknown. Servers should avoid using + /// this value (send a kNotCancellable if the requested FlightInfo + /// is not known). Clients can retry the request. + kUnspecified = 0, + /// The cancellation request is complete. Subsequent requests with + /// the same payload may return kCancelled or a kNotCancellable error. + kCancelled = 1, + /// The cancellation request is in progress. The client may retry + /// the cancellation request. + kCancelling = 2, + // The FlightInfo is not cancellable. The client should not retry the + // cancellation request. + kNotCancellable = 3, +}; + +/// \brief The result of the CancelFlightInfo action. +struct ARROW_FLIGHT_EXPORT CancelFlightInfoResult + : public internal::BaseType { + CancelStatus status = CancelStatus::kUnspecified; + + CancelFlightInfoResult() = default; + CancelFlightInfoResult(CancelStatus status) // NOLINT runtime/explicit + : status(status) {} + + std::string ToString() const; + bool Equals(const CancelFlightInfoResult& other) const; + + using SuperT::Deserialize; + using SuperT::SerializeToString; + + /// \brief Serialize this message to its wire-format representation. + /// + /// Use `SerializeToString()` if you want a Result-returning version. + arrow::Status SerializeToString(std::string* out) const; + + /// \brief Deserialize this message from its wire-format representation. + /// + /// Use `Deserialize(serialized)` if you want a Result-returning version. + static arrow::Status Deserialize(std::string_view serialized, + CancelFlightInfoResult* out); +}; + +ARROW_FLIGHT_EXPORT +std::ostream& operator<<(std::ostream& os, CancelStatus status); + +/// \brief Data structure providing an opaque identifier or credential to use +/// when requesting a data stream with the DoGet RPC +struct ARROW_FLIGHT_EXPORT Ticket : public internal::BaseType { + std::string ticket; + + Ticket() = default; + Ticket(std::string ticket) // NOLINT runtime/explicit + : ticket(std::move(ticket)) {} + + std::string ToString() const; + bool Equals(const Ticket& other) const; + + using SuperT::Deserialize; + using SuperT::SerializeToString; + + /// \brief Get the wire-format representation of this type. + /// + /// Useful when interoperating with non-Flight systems (e.g. REST + /// services) that may want to return Flight types. + /// + /// Use `SerializeToString()` if you want a Result-returning version. + arrow::Status SerializeToString(std::string* out) const; + + /// \brief Parse the wire-format representation of this type. + /// + /// Useful when interoperating with non-Flight systems (e.g. REST + /// services) that may want to return Flight types. + /// + /// Use `Deserialize(serialized)` if you want a Result-returning version. + static arrow::Status Deserialize(std::string_view serialized, Ticket* out); +}; + +/// \brief A host location (a URI) +struct ARROW_FLIGHT_EXPORT Location : public internal::BaseType { + public: + /// \brief Initialize a blank location. + Location(); + + /// \brief Initialize a location by parsing a URI string + static arrow::Result Parse(const std::string& uri_string); + + /// \brief Get the fallback URI. + /// + /// arrow-flight-reuse-connection://? means that a client may attempt to + /// reuse an existing connection to a Flight service to fetch data instead + /// of creating a new connection to one of the other locations listed in a + /// FlightEndpoint response. + static const Location& ReuseConnection(); + + /// \brief Initialize a location for a non-TLS, gRPC-based Flight + /// service from a host and port + /// \param[in] host The hostname to connect to + /// \param[in] port The port + /// \return Arrow result with the resulting location + static arrow::Result ForGrpcTcp(const std::string& host, const int port); + + /// \brief Initialize a location for a TLS-enabled, gRPC-based Flight + /// service from a host and port + /// \param[in] host The hostname to connect to + /// \param[in] port The port + /// \return Arrow result with the resulting location + static arrow::Result ForGrpcTls(const std::string& host, const int port); + + /// \brief Initialize a location for a domain socket-based Flight /// service /// \param[in] path The path to the domain socket /// \return Arrow result with the resulting location @@ -547,12 +781,11 @@ struct ARROW_FLIGHT_EXPORT Location : public internal::BaseType { static arrow::Result ForScheme(const std::string& scheme, const std::string& host, const int port); - /// \brief Get a representation of this URI as a string. - std::string ToString() const; - /// \brief Get the scheme of this URI. std::string scheme() const; + /// \brief Get a representation of this URI as a string. + std::string ToString() const; bool Equals(const Location& other) const; using SuperT::Deserialize; @@ -645,6 +878,8 @@ struct ARROW_FLIGHT_EXPORT RenewFlightEndpointRequest RenewFlightEndpointRequest* out); }; +// FlightData in Flight.proto maps to FlightPayload here. + /// \brief Staging data structure for messages about to be put on the wire /// /// This structure corresponds to FlightData in the protocol. @@ -664,217 +899,30 @@ struct ARROW_FLIGHT_EXPORT FlightPayload { Status Validate() const; }; -/// \brief Schema result returned after a schema request RPC -struct ARROW_FLIGHT_EXPORT SchemaResult : public internal::BaseType { - public: - SchemaResult() = default; - explicit SchemaResult(std::string schema) : raw_schema_(std::move(schema)) {} - - /// \brief Factory method to construct a SchemaResult. - static arrow::Result> Make(const Schema& schema); +// A wrapper around arrow.flight.protocol.PutResult is not defined - /// \brief return schema - /// \param[in,out] dictionary_memo for dictionary bookkeeping, will - /// be modified - /// \return Arrow result with the reconstructed Schema - arrow::Result> GetSchema( - ipc::DictionaryMemo* dictionary_memo) const; +// Session management messages - const std::string& serialized_schema() const { return raw_schema_; } - - std::string ToString() const; - bool Equals(const SchemaResult& other) const; - - using SuperT::Deserialize; - using SuperT::SerializeToString; - - /// \brief Serialize this message to its wire-format representation. - /// - /// Use `SerializeToString()` if you want a Result-returning version. - arrow::Status SerializeToString(std::string* out) const; - - /// \brief Deserialize this message from its wire-format representation. - /// - /// Use `Deserialize(serialized)` if you want a Result-returning version. - static arrow::Status Deserialize(std::string_view serialized, SchemaResult* out); - - private: - std::string raw_schema_; -}; - -/// \brief The access coordinates for retrieval of a dataset, returned by -/// GetFlightInfo -class ARROW_FLIGHT_EXPORT FlightInfo - : public internal::BaseType> { - public: - struct Data { - std::string schema; - FlightDescriptor descriptor; - std::vector endpoints; - int64_t total_records = -1; - int64_t total_bytes = -1; - bool ordered = false; - std::string app_metadata; - }; - - explicit FlightInfo(Data data) : data_(std::move(data)), reconstructed_schema_(false) {} - - /// \brief Factory method to construct a FlightInfo. - static arrow::Result Make(const Schema& schema, - const FlightDescriptor& descriptor, - const std::vector& endpoints, - int64_t total_records, int64_t total_bytes, - bool ordered = false, - std::string app_metadata = ""); - - /// \brief Deserialize the Arrow schema of the dataset. Populate any - /// dictionary encoded fields into a DictionaryMemo for - /// bookkeeping - /// \param[in,out] dictionary_memo for dictionary bookkeeping, will - /// be modified - /// \return Arrow result with the reconstructed Schema - arrow::Result> GetSchema( - ipc::DictionaryMemo* dictionary_memo) const; - - const std::string& serialized_schema() const { return data_.schema; } - - /// The descriptor associated with this flight, may not be set - const FlightDescriptor& descriptor() const { return data_.descriptor; } - - /// A list of endpoints associated with the flight (dataset). To consume the - /// whole flight, all endpoints must be consumed - const std::vector& endpoints() const { return data_.endpoints; } - - /// The total number of records (rows) in the dataset. If unknown, set to -1 - int64_t total_records() const { return data_.total_records; } - - /// The total number of bytes in the dataset. If unknown, set to -1 - int64_t total_bytes() const { return data_.total_bytes; } - - /// Whether endpoints are in the same order as the data. - bool ordered() const { return data_.ordered; } - - /// Application-defined opaque metadata - const std::string& app_metadata() const { return data_.app_metadata; } - - using SuperT::Deserialize; - using SuperT::SerializeToString; - - /// \brief Get the wire-format representation of this type. - /// - /// Useful when interoperating with non-Flight systems (e.g. REST - /// services) that may want to return Flight types. - /// - /// Use `SerializeToString()` if you want a Result-returning version. - arrow::Status SerializeToString(std::string* out) const; - - /// \brief Parse the wire-format representation of this type. - /// - /// Useful when interoperating with non-Flight systems (e.g. REST - /// services) that may want to return Flight types. - /// - /// Use `Deserialize(serialized)` if you want a Result-returning version. - static arrow::Status Deserialize(std::string_view serialized, - std::unique_ptr* out); - - std::string ToString() const; - - /// Compare two FlightInfo for equality. This will compare the - /// serialized schema representations, NOT the logical equality of - /// the schemas. - bool Equals(const FlightInfo& other) const; - - private: - Data data_; - mutable std::shared_ptr schema_; - mutable bool reconstructed_schema_; -}; - -/// \brief The information to process a long-running query. -class ARROW_FLIGHT_EXPORT PollInfo - : public internal::BaseType> { - public: - /// The currently available results so far. - std::unique_ptr info = NULLPTR; - /// The descriptor the client should use on the next try. If unset, - /// the query is complete. - std::optional descriptor = std::nullopt; - /// Query progress. Must be in [0.0, 1.0] but need not be - /// monotonic or nondecreasing. If unknown, do not set. - std::optional progress = std::nullopt; - /// Expiration time for this request. After this passes, the server - /// might not accept the poll descriptor anymore (and the query may - /// be cancelled). This may be updated on a call to PollFlightInfo. - std::optional expiration_time = std::nullopt; - - PollInfo() - : info(NULLPTR), - descriptor(std::nullopt), - progress(std::nullopt), - expiration_time(std::nullopt) {} - - PollInfo(std::unique_ptr info, std::optional descriptor, - std::optional progress, std::optional expiration_time) - : info(std::move(info)), - descriptor(std::move(descriptor)), - progress(progress), - expiration_time(expiration_time) {} - - PollInfo(const PollInfo& other) - : info(other.info ? std::make_unique(*other.info) : NULLPTR), - descriptor(other.descriptor), - progress(other.progress), - expiration_time(other.expiration_time) {} - PollInfo(PollInfo&& other) noexcept = default; - ~PollInfo() = default; - PollInfo& operator=(const PollInfo& other) { - info = other.info ? std::make_unique(*other.info) : NULLPTR; - descriptor = other.descriptor; - progress = other.progress; - expiration_time = other.expiration_time; - return *this; - } - PollInfo& operator=(PollInfo&& other) = default; - - using SuperT::Deserialize; - using SuperT::SerializeToString; - - /// \brief Get the wire-format representation of this type. - /// - /// Useful when interoperating with non-Flight systems (e.g. REST - /// services) that may want to return Flight types. - /// - /// Use `SerializeToString()` if you want a Result-returning version. - arrow::Status SerializeToString(std::string* out) const; - - /// \brief Parse the wire-format representation of this type. - /// - /// Useful when interoperating with non-Flight systems (e.g. REST - /// services) that may want to return Flight types. - /// - /// Use `Deserialize(serialized)` if you want a Result-returning version. - static arrow::Status Deserialize(std::string_view serialized, - std::unique_ptr* out); - - std::string ToString() const; - - /// Compare two PollInfo for equality. This will compare the - /// serialized schema representations, NOT the logical equality of - /// the schemas. - bool Equals(const PollInfo& other) const; -}; +/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions +/// +/// By convention, an attempt to set a valueless (std::monostate) SessionOptionValue +/// should attempt to unset or clear the named option value on the server. +using SessionOptionValue = std::variant>; +std::ostream& operator<<(std::ostream& os, const SessionOptionValue& v); -/// \brief The request of the CancelFlightInfoRequest action. -struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest - : public internal::BaseType { - std::unique_ptr info; +/// \brief A request to set a set of session options by name/value. +struct ARROW_FLIGHT_EXPORT SetSessionOptionsRequest + : public internal::BaseType { + std::map session_options; - CancelFlightInfoRequest() = default; - CancelFlightInfoRequest(std::unique_ptr info) // NOLINT runtime/explicit - : info(std::move(info)) {} + SetSessionOptionsRequest() = default; + explicit SetSessionOptionsRequest( + std::map session_options) + : session_options(std::move(session_options)) {} std::string ToString() const; - bool Equals(const CancelFlightInfoRequest& other) const; + bool Equals(const SetSessionOptionsRequest& other) const; using SuperT::Deserialize; using SuperT::SerializeToString; @@ -888,16 +936,9 @@ struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest /// /// Use `Deserialize(serialized)` if you want a Result-returning version. static arrow::Status Deserialize(std::string_view serialized, - CancelFlightInfoRequest* out); + SetSessionOptionsRequest* out); }; -/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions -/// -/// By convention, an attempt to set a valueless (std::monostate) SessionOptionValue -/// should attempt to unset or clear the named option value on the server. -using SessionOptionValue = std::variant>; - /// \brief The result of setting a session option. enum class SetSessionOptionErrorValue : int8_t { /// \brief The status of setting the option is unknown. @@ -915,57 +956,6 @@ enum class SetSessionOptionErrorValue : int8_t { std::string ToString(const SetSessionOptionErrorValue& error_value); std::ostream& operator<<(std::ostream& os, const SetSessionOptionErrorValue& error_value); -/// \brief The result of closing a session. -enum class CloseSessionStatus : int8_t { - // \brief The session close status is unknown. - // - // Servers should avoid using this value (send a NOT_FOUND error if the requested - // session is not known). Clients can retry the request. - kUnspecified, - // \brief The session close request is complete. - // - // Subsequent requests with the same session produce a NOT_FOUND error. - kClosed, - // \brief The session close request is in progress. - // - // The client may retry the request. - kClosing, - // \brief The session is not closeable. - // - // The client should not retry the request. - kNotClosable -}; -std::string ToString(const CloseSessionStatus& status); -std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& status); - -/// \brief A request to set a set of session options by name/value. -struct ARROW_FLIGHT_EXPORT SetSessionOptionsRequest - : public internal::BaseType { - std::map session_options; - - SetSessionOptionsRequest() = default; - explicit SetSessionOptionsRequest( - std::map session_options) - : session_options(std::move(session_options)) {} - - std::string ToString() const; - bool Equals(const SetSessionOptionsRequest& other) const; - - using SuperT::Deserialize; - using SuperT::SerializeToString; - - /// \brief Serialize this message to its wire-format representation. - /// - /// Use `SerializeToString()` if you want a Result-returning version. - arrow::Status SerializeToString(std::string* out) const; - - /// \brief Deserialize this message from its wire-format representation. - /// - /// Use `Deserialize(serialized)` if you want a Result-returning version. - static arrow::Status Deserialize(std::string_view serialized, - SetSessionOptionsRequest* out); -}; - /// \brief The result(s) of setting session option(s). struct ARROW_FLIGHT_EXPORT SetSessionOptionsResult : public internal::BaseType { @@ -1078,6 +1068,29 @@ struct ARROW_FLIGHT_EXPORT CloseSessionRequest static arrow::Status Deserialize(std::string_view serialized, CloseSessionRequest* out); }; +/// \brief The result of closing a session. +enum class CloseSessionStatus : int8_t { + // \brief The session close status is unknown. + // + // Servers should avoid using this value (send a NOT_FOUND error if the requested + // session is not known). Clients can retry the request. + kUnspecified, + // \brief The session close request is complete. + // + // Subsequent requests with the same session produce a NOT_FOUND error. + kClosed, + // \brief The session close request is in progress. + // + // The client may retry the request. + kClosing, + // \brief The session is not closeable. + // + // The client should not retry the request. + kNotClosable +}; +std::string ToString(const CloseSessionStatus& status); +std::ostream& operator<<(std::ostream& os, const CloseSessionStatus& status); + /// \brief The result of attempting to close the client session. struct ARROW_FLIGHT_EXPORT CloseSessionResult : public internal::BaseType { @@ -1104,6 +1117,8 @@ struct ARROW_FLIGHT_EXPORT CloseSessionResult static arrow::Status Deserialize(std::string_view serialized, CloseSessionResult* out); }; +//------------------------------------------------------------ + /// \brief An iterator to FlightInfo instances returned by ListFlights. class ARROW_FLIGHT_EXPORT FlightListing { public: diff --git a/format/Flight.proto b/format/Flight.proto index 4963e8c09ae47..2187a51ed48f4 100644 --- a/format/Flight.proto +++ b/format/Flight.proto @@ -208,24 +208,6 @@ message Action { bytes body = 2; } -/* - * The request of the CancelFlightInfo action. - * - * The request should be stored in Action.body. - */ -message CancelFlightInfoRequest { - FlightInfo info = 1; -} - -/* - * The request of the RenewFlightEndpoint action. - * - * The request should be stored in Action.body. - */ -message RenewFlightEndpointRequest { - FlightEndpoint endpoint = 1; -} - /* * An opaque result returned after executing an action. */ @@ -233,36 +215,6 @@ message Result { bytes body = 1; } -/* - * The result of a cancel operation. - * - * This is used by CancelFlightInfoResult.status. - */ -enum CancelStatus { - // The cancellation status is unknown. Servers should avoid using - // this value (send a NOT_FOUND error if the requested query is - // not known). Clients can retry the request. - CANCEL_STATUS_UNSPECIFIED = 0; - // The cancellation request is complete. Subsequent requests with - // the same payload may return CANCELLED or a NOT_FOUND error. - CANCEL_STATUS_CANCELLED = 1; - // The cancellation request is in progress. The client may retry - // the cancellation request. - CANCEL_STATUS_CANCELLING = 2; - // The query is not cancellable. The client should not retry the - // cancellation request. - CANCEL_STATUS_NOT_CANCELLABLE = 3; -} - -/* - * The result of the CancelFlightInfo action. - * - * The result should be stored in Result.body. - */ -message CancelFlightInfoResult { - CancelStatus status = 1; -} - /* * Wrap the result of a getSchema call */ @@ -423,6 +375,64 @@ message PollInfo { google.protobuf.Timestamp expiration_time = 4; } +/* + * The request of the CancelFlightInfo action. + * + * The request should be stored in Action.body. + */ +message CancelFlightInfoRequest { + FlightInfo info = 1; +} + +/* + * The result of a cancel operation. + * + * This is used by CancelFlightInfoResult.status. + */ +enum CancelStatus { + // The cancellation status is unknown. Servers should avoid using + // this value (send a NOT_FOUND error if the requested query is + // not known). Clients can retry the request. + CANCEL_STATUS_UNSPECIFIED = 0; + // The cancellation request is complete. Subsequent requests with + // the same payload may return CANCELLED or a NOT_FOUND error. + CANCEL_STATUS_CANCELLED = 1; + // The cancellation request is in progress. The client may retry + // the cancellation request. + CANCEL_STATUS_CANCELLING = 2; + // The query is not cancellable. The client should not retry the + // cancellation request. + CANCEL_STATUS_NOT_CANCELLABLE = 3; +} + +/* + * The result of the CancelFlightInfo action. + * + * The result should be stored in Result.body. + */ +message CancelFlightInfoResult { + CancelStatus status = 1; +} + +/* + * An opaque identifier that the service can use to retrieve a particular + * portion of a stream. + * + * Tickets are meant to be single use. It is an error/application-defined + * behavior to reuse a ticket. + */ +message Ticket { + bytes ticket = 1; +} + +/* + * A location where a Flight service will accept retrieval of a particular + * stream given a ticket. + */ +message Location { + string uri = 1; +} + /* * A particular stream or split associated with a flight. */ @@ -475,22 +485,12 @@ message FlightEndpoint { } /* - * A location where a Flight service will accept retrieval of a particular - * stream given a ticket. - */ -message Location { - string uri = 1; -} - -/* - * An opaque identifier that the service can use to retrieve a particular - * portion of a stream. + * The request of the RenewFlightEndpoint action. * - * Tickets are meant to be single use. It is an error/application-defined - * behavior to reuse a ticket. + * The request should be stored in Action.body. */ -message Ticket { - bytes ticket = 1; +message RenewFlightEndpointRequest { + FlightEndpoint endpoint = 1; } /*