From 383c2d994dc9256724fa77a66e75da90901dee01 Mon Sep 17 00:00:00 2001 From: Rahil Chertara Date: Thu, 22 Feb 2024 10:00:55 -0800 Subject: [PATCH] Add Pagination To List Apis --- .../apache/iceberg/rest/PaginatedList.java | 269 ++++++++++++++++++ .../iceberg/rest/RESTSessionCatalog.java | 44 +-- .../responses/ListNamespacesResponse.java | 21 +- .../rest/responses/ListTablesResponse.java | 21 +- .../apache/iceberg/rest/responses/Route.java | 25 ++ .../responses/TestListNamespacesResponse.java | 8 + .../responses/TestListTablesResponse.java | 8 + 7 files changed, 356 insertions(+), 40 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/rest/PaginatedList.java create mode 100644 core/src/main/java/org/apache/iceberg/rest/responses/Route.java diff --git a/core/src/main/java/org/apache/iceberg/rest/PaginatedList.java b/core/src/main/java/org/apache/iceberg/rest/PaginatedList.java new file mode 100644 index 000000000000..dcc3d8123fe3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/PaginatedList.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Supplier; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.iceberg.rest.responses.Route; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PaginatedList implements List { + private static final Logger LOG = LoggerFactory.getLogger(PaginatedList.class); + + private static final String UNMODIFIABLE_MESSAGE = "This is an unmodifiable list"; + private RESTClient client; + private ResourcePaths paths; + private Supplier> headers; + private String pageToken; + private String pageSize; + private List pageItems; + private Map queryParams; + private Namespace namespace; + private Route route; + + public PaginatedList( + RESTClient client, + ResourcePaths paths, + Supplier> headers, + String pageSize, + Namespace namespace, + Route route) { + this.client = client; + this.paths = paths; + this.headers = headers; + this.pageSize = pageSize; + this.pageToken = ""; // start protocol with empty token + this.pageItems = Lists.newArrayList(); + this.queryParams = Maps.newHashMap(); + this.namespace = namespace; + this.route = route; + } + + @Override + public int size() { + return pageItems.size(); + } + + @Override + public boolean isEmpty() { + return !iterator().hasNext(); + } + + @Override + public boolean contains(Object o) { + return pageItems.contains(o); + } + + @Override + public Iterator iterator() { + return new Iterator() { + @Override + public boolean hasNext() { + if (!pageItems.isEmpty()) { + return true; + } + getPaginatedItems(); + return !pageItems.isEmpty(); + } + + @Override + public T next() { + return pageItems.remove(0); + } + }; + } + + public void getPaginatedItems() { + if (pageToken == null) { + return; + } + if (pageSize == null) { + throw new ValidationException("if pageToken is present, pageSize must be set"); + } + queryParams.put("pageToken", pageToken); + queryParams.put("pageSize", pageSize); + LOG.info("Sending request with pageToken: {}, pageSize: {}", pageToken, pageSize); + + switch (route) { + case LIST_NAMESPACES: + if (!namespace.isEmpty()) { + queryParams.put("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels())); + } + ListNamespacesResponse listNamespacesResponse = + client.get( + paths.namespaces(), + queryParams, + ListNamespacesResponse.class, + headers, + ErrorHandlers.namespaceErrorHandler()); + LOG.info("Received paginated response {}", listNamespacesResponse); + pageToken = listNamespacesResponse.nextPageToken(); + pageItems.addAll((Collection) listNamespacesResponse.namespaces()); + return; + + case LIST_TABLES: + ListTablesResponse listTablesResponse = + client.get( + paths.tables(namespace), + queryParams, + ListTablesResponse.class, + headers, + ErrorHandlers.namespaceErrorHandler()); + LOG.info("Received paginated response {}", listTablesResponse); + pageToken = listTablesResponse.nextPageToken(); + pageItems.addAll((Collection) listTablesResponse.identifiers()); + return; + + case LIST_VIEWS: + ListTablesResponse listViewsResponse = + client.get( + paths.views(namespace), + queryParams, + ListTablesResponse.class, + headers, + ErrorHandlers.namespaceErrorHandler()); + LOG.info("Received paginated response {}", listViewsResponse); + pageToken = listViewsResponse.nextPageToken(); + pageItems.addAll((Collection) listViewsResponse.identifiers()); + return; + + default: + throw new UnsupportedOperationException("Invalid route: " + route); + } + } + + @Override + public Object[] toArray() { + return pageItems.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return pageItems.toArray(a); + } + + @Override + public boolean add(T item) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public boolean containsAll(Collection c) { + return pageItems.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public boolean addAll(int index, Collection c) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public T get(int index) { + return pageItems.get(index); + } + + @Override + public T set(int index, T element) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public void add(int index, T element) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public T remove(int index) { + throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE); + } + + @Override + public int indexOf(Object o) { + int indexOf = pageItems.indexOf(o); + if (indexOf >= 0) { + return indexOf; + } + return -1; + } + + @Override + public int lastIndexOf(Object o) { + return pageItems.lastIndexOf(o); + } + + @Override + public ListIterator listIterator() { + throw new UnsupportedOperationException("ListIterators are not supported for this list"); + } + + @Override + public ListIterator listIterator(int index) { + throw new UnsupportedOperationException("ListIterators are not supported for this list"); + } + + @Override + public List subList(int fromIndex, int toIndex) { + return Collections.unmodifiableList(pageItems.subList(fromIndex, toIndex)); + } + + @Override + public Spliterator spliterator() { + // Required to override, else encounter: Accept exceeded fixed size of 0 + return Spliterators.spliteratorUnknownSize(this.iterator(), Spliterator.ORDERED); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 742caa9494f4..5f394e54acc9 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -86,11 +86,10 @@ import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; import org.apache.iceberg.rest.responses.GetNamespaceResponse; -import org.apache.iceberg.rest.responses.ListNamespacesResponse; -import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.rest.responses.Route; import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.EnvironmentUtil; import org.apache.iceberg.util.Pair; @@ -114,6 +113,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO"; private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled"; private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"; + public static final String REST_PAGE_SIZE = "rest-page-size"; private static final List TOKEN_PREFERENCE_ORDER = ImmutableList.of( OAuth2Properties.ID_TOKEN_TYPE, @@ -136,6 +136,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private FileIO io = null; private MetricsReporter reporter = null; private boolean reportingViaRestEnabled; + private String restPageSize = null; private CloseableGroup closeables = null; // a lazy thread pool for token refresh @@ -224,6 +225,7 @@ public void initialize(String name, Map unresolved) { client, tokenRefreshExecutor(), token, expiresAtMillis(mergedProps), catalogAuth); } + this.restPageSize = mergedProps.get(REST_PAGE_SIZE); this.io = newFileIO(SessionContext.createEmpty(), mergedProps); this.fileIOCloser = newFileIOCloser(); @@ -274,14 +276,8 @@ public void setConf(Object newConf) { @Override public List listTables(SessionContext context, Namespace ns) { checkNamespaceIsValid(ns); - - ListTablesResponse response = - client.get( - paths.tables(ns), - ListTablesResponse.class, - headers(context), - ErrorHandlers.namespaceErrorHandler()); - return response.identifiers(); + return new PaginatedList<>( + client, paths, headers(context), restPageSize, ns, Route.LIST_TABLES); } @Override @@ -490,22 +486,8 @@ public void createNamespace( @Override public List listNamespaces(SessionContext context, Namespace namespace) { - Map queryParams; - if (namespace.isEmpty()) { - queryParams = ImmutableMap.of(); - } else { - // query params should be unescaped - queryParams = ImmutableMap.of("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels())); - } - - ListNamespacesResponse response = - client.get( - paths.namespaces(), - queryParams, - ListNamespacesResponse.class, - headers(context), - ErrorHandlers.namespaceErrorHandler()); - return response.namespaces(); + return new PaginatedList<>( + client, paths, headers(context), restPageSize, namespace, Route.LIST_NAMESPACES); } @Override @@ -1044,14 +1026,8 @@ public void commitTransaction(SessionContext context, List commits) @Override public List listViews(SessionContext context, Namespace namespace) { checkNamespaceIsValid(namespace); - - ListTablesResponse response = - client.get( - paths.views(namespace), - ListTablesResponse.class, - headers(context), - ErrorHandlers.namespaceErrorHandler()); - return response.identifiers(); + return new PaginatedList<>( + client, paths, headers(context), restPageSize, namespace, Route.LIST_VIEWS); } @Override diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/ListNamespacesResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/ListNamespacesResponse.java index 13a599e1a76c..e36bbdbe26e9 100644 --- a/core/src/main/java/org/apache/iceberg/rest/responses/ListNamespacesResponse.java +++ b/core/src/main/java/org/apache/iceberg/rest/responses/ListNamespacesResponse.java @@ -29,13 +29,15 @@ public class ListNamespacesResponse implements RESTResponse { private List namespaces; + private String nextPageToken; public ListNamespacesResponse() { // Required for Jackson deserialization } - private ListNamespacesResponse(List namespaces) { + private ListNamespacesResponse(List namespaces, String nextPageToken) { this.namespaces = namespaces; + this.nextPageToken = nextPageToken; validate(); } @@ -48,9 +50,16 @@ public List namespaces() { return namespaces != null ? namespaces : ImmutableList.of(); } + public String nextPageToken() { + return nextPageToken; + } + @Override public String toString() { - return MoreObjects.toStringHelper(this).add("namespaces", namespaces()).toString(); + return MoreObjects.toStringHelper(this) + .add("namespaces", namespaces()) + .add("next-page-token", nextPageToken()) + .toString(); } public static Builder builder() { @@ -59,6 +68,7 @@ public static Builder builder() { public static class Builder { private final ImmutableList.Builder namespaces = ImmutableList.builder(); + private String nextPageToken = null; private Builder() {} @@ -75,8 +85,13 @@ public Builder addAll(Collection toAdd) { return this; } + public Builder nextPageToken(String pageToken) { + nextPageToken = pageToken; + return this; + } + public ListNamespacesResponse build() { - return new ListNamespacesResponse(namespaces.build()); + return new ListNamespacesResponse(namespaces.build(), nextPageToken); } } } diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/ListTablesResponse.java b/core/src/main/java/org/apache/iceberg/rest/responses/ListTablesResponse.java index 3c99c12c9023..f67aa0b06644 100644 --- a/core/src/main/java/org/apache/iceberg/rest/responses/ListTablesResponse.java +++ b/core/src/main/java/org/apache/iceberg/rest/responses/ListTablesResponse.java @@ -30,13 +30,15 @@ public class ListTablesResponse implements RESTResponse { private List identifiers; + private String nextPageToken; public ListTablesResponse() { // Required for Jackson deserialization } - private ListTablesResponse(List identifiers) { + private ListTablesResponse(List identifiers, String nextPageToken) { this.identifiers = identifiers; + this.nextPageToken = nextPageToken; validate(); } @@ -49,9 +51,16 @@ public List identifiers() { return identifiers != null ? identifiers : ImmutableList.of(); } + public String nextPageToken() { + return nextPageToken; + } + @Override public String toString() { - return MoreObjects.toStringHelper(this).add("identifiers", identifiers).toString(); + return MoreObjects.toStringHelper(this) + .add("identifiers", identifiers) + .add("next-page-token", nextPageToken()) + .toString(); } public static Builder builder() { @@ -60,6 +69,7 @@ public static Builder builder() { public static class Builder { private final ImmutableList.Builder identifiers = ImmutableList.builder(); + private String nextPageToken = null; private Builder() {} @@ -76,8 +86,13 @@ public Builder addAll(Collection toAdd) { return this; } + public Builder nextPageToken(String pageToken) { + nextPageToken = pageToken; + return this; + } + public ListTablesResponse build() { - return new ListTablesResponse(identifiers.build()); + return new ListTablesResponse(identifiers.build(), nextPageToken); } } } diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/Route.java b/core/src/main/java/org/apache/iceberg/rest/responses/Route.java new file mode 100644 index 000000000000..799379b75a23 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/responses/Route.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.responses; + +public enum Route { + LIST_NAMESPACES, + LIST_TABLES, + LIST_VIEWS +} diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestListNamespacesResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestListNamespacesResponse.java index bfe5a662b219..ccffff241464 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestListNamespacesResponse.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestListNamespacesResponse.java @@ -83,6 +83,14 @@ public void testBuilderDoesNotCreateInvalidObjects() { .hasMessage("Invalid namespace: null"); } + @Test + public void testWithNullPaginationToken() { + ListNamespacesResponse response = + ListNamespacesResponse.builder().addAll(NAMESPACES).nextPageToken(null).build(); + Assertions.assertThat(response.nextPageToken()).isNull(); + Assertions.assertThat(response.namespaces()).isEqualTo(NAMESPACES); + } + @Override public String[] allFieldsFromSpec() { return new String[] {"namespaces"}; diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java index 116d43a6d147..308b199bd514 100644 --- a/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java +++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestListTablesResponse.java @@ -105,6 +105,14 @@ public void testBuilderDoesNotCreateInvalidObjects() { .hasMessage("Invalid table identifier: null"); } + @Test + public void testWithNullPaginationToken() { + ListTablesResponse response = + ListTablesResponse.builder().addAll(IDENTIFIERS).nextPageToken(null).build(); + Assertions.assertThat(response.nextPageToken()).isNull(); + Assertions.assertThat(response.identifiers()).isEqualTo(IDENTIFIERS); + } + @Override public String[] allFieldsFromSpec() { return new String[] {"identifiers"};