Skip to content

Commit

Permalink
Add Pagination To List Apis
Browse files Browse the repository at this point in the history
  • Loading branch information
Rahil Chertara committed Mar 26, 2024
1 parent 3058e30 commit 383c2d9
Show file tree
Hide file tree
Showing 7 changed files with 356 additions and 40 deletions.
269 changes: 269 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/PaginatedList.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Supplier;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.Route;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PaginatedList<T> implements List<T> {
private static final Logger LOG = LoggerFactory.getLogger(PaginatedList.class);

private static final String UNMODIFIABLE_MESSAGE = "This is an unmodifiable list";
private RESTClient client;
private ResourcePaths paths;
private Supplier<Map<String, String>> headers;
private String pageToken;
private String pageSize;
private List<T> pageItems;
private Map<String, String> queryParams;
private Namespace namespace;
private Route route;

public PaginatedList(
RESTClient client,
ResourcePaths paths,
Supplier<Map<String, String>> headers,
String pageSize,
Namespace namespace,
Route route) {
this.client = client;
this.paths = paths;
this.headers = headers;
this.pageSize = pageSize;
this.pageToken = ""; // start protocol with empty token
this.pageItems = Lists.newArrayList();
this.queryParams = Maps.newHashMap();
this.namespace = namespace;
this.route = route;
}

@Override
public int size() {
return pageItems.size();
}

@Override
public boolean isEmpty() {
return !iterator().hasNext();
}

@Override
public boolean contains(Object o) {
return pageItems.contains(o);
}

@Override
public Iterator<T> iterator() {
return new Iterator<T>() {
@Override
public boolean hasNext() {
if (!pageItems.isEmpty()) {
return true;
}
getPaginatedItems();
return !pageItems.isEmpty();
}

@Override
public T next() {
return pageItems.remove(0);
}
};
}

public void getPaginatedItems() {
if (pageToken == null) {
return;
}
if (pageSize == null) {
throw new ValidationException("if pageToken is present, pageSize must be set");
}
queryParams.put("pageToken", pageToken);
queryParams.put("pageSize", pageSize);
LOG.info("Sending request with pageToken: {}, pageSize: {}", pageToken, pageSize);

switch (route) {
case LIST_NAMESPACES:
if (!namespace.isEmpty()) {
queryParams.put("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
}
ListNamespacesResponse listNamespacesResponse =
client.get(
paths.namespaces(),
queryParams,
ListNamespacesResponse.class,
headers,
ErrorHandlers.namespaceErrorHandler());
LOG.info("Received paginated response {}", listNamespacesResponse);
pageToken = listNamespacesResponse.nextPageToken();
pageItems.addAll((Collection<? extends T>) listNamespacesResponse.namespaces());
return;

case LIST_TABLES:
ListTablesResponse listTablesResponse =
client.get(
paths.tables(namespace),
queryParams,
ListTablesResponse.class,
headers,
ErrorHandlers.namespaceErrorHandler());
LOG.info("Received paginated response {}", listTablesResponse);
pageToken = listTablesResponse.nextPageToken();
pageItems.addAll((Collection<? extends T>) listTablesResponse.identifiers());
return;

case LIST_VIEWS:
ListTablesResponse listViewsResponse =
client.get(
paths.views(namespace),
queryParams,
ListTablesResponse.class,
headers,
ErrorHandlers.namespaceErrorHandler());
LOG.info("Received paginated response {}", listViewsResponse);
pageToken = listViewsResponse.nextPageToken();
pageItems.addAll((Collection<? extends T>) listViewsResponse.identifiers());
return;

default:
throw new UnsupportedOperationException("Invalid route: " + route);
}
}

@Override
public Object[] toArray() {
return pageItems.toArray();
}

@Override
public <T> T[] toArray(T[] a) {
return pageItems.toArray(a);
}

@Override
public boolean add(T item) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public boolean containsAll(Collection<?> c) {
return pageItems.containsAll(c);
}

@Override
public boolean addAll(Collection<? extends T> c) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public boolean addAll(int index, Collection<? extends T> c) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public void clear() {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public T get(int index) {
return pageItems.get(index);
}

@Override
public T set(int index, T element) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public void add(int index, T element) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public T remove(int index) {
throw new UnsupportedOperationException(UNMODIFIABLE_MESSAGE);
}

@Override
public int indexOf(Object o) {
int indexOf = pageItems.indexOf(o);
if (indexOf >= 0) {
return indexOf;
}
return -1;
}

@Override
public int lastIndexOf(Object o) {
return pageItems.lastIndexOf(o);
}

@Override
public ListIterator<T> listIterator() {
throw new UnsupportedOperationException("ListIterators are not supported for this list");
}

@Override
public ListIterator<T> listIterator(int index) {
throw new UnsupportedOperationException("ListIterators are not supported for this list");
}

@Override
public List<T> subList(int fromIndex, int toIndex) {
return Collections.unmodifiableList(pageItems.subList(fromIndex, toIndex));
}

@Override
public Spliterator<T> spliterator() {
// Required to override, else encounter: Accept exceeded fixed size of 0
return Spliterators.spliteratorUnknownSize(this.iterator(), Spliterator.ORDERED);
}
}
44 changes: 10 additions & 34 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.CreateNamespaceResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.rest.responses.Route;
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.EnvironmentUtil;
import org.apache.iceberg.util.Pair;
Expand All @@ -114,6 +113,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
public static final String REST_PAGE_SIZE = "rest-page-size";
private static final List<String> TOKEN_PREFERENCE_ORDER =
ImmutableList.of(
OAuth2Properties.ID_TOKEN_TYPE,
Expand All @@ -136,6 +136,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private FileIO io = null;
private MetricsReporter reporter = null;
private boolean reportingViaRestEnabled;
private String restPageSize = null;
private CloseableGroup closeables = null;

// a lazy thread pool for token refresh
Expand Down Expand Up @@ -224,6 +225,7 @@ public void initialize(String name, Map<String, String> unresolved) {
client, tokenRefreshExecutor(), token, expiresAtMillis(mergedProps), catalogAuth);
}

this.restPageSize = mergedProps.get(REST_PAGE_SIZE);
this.io = newFileIO(SessionContext.createEmpty(), mergedProps);

this.fileIOCloser = newFileIOCloser();
Expand Down Expand Up @@ -274,14 +276,8 @@ public void setConf(Object newConf) {
@Override
public List<TableIdentifier> listTables(SessionContext context, Namespace ns) {
checkNamespaceIsValid(ns);

ListTablesResponse response =
client.get(
paths.tables(ns),
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.identifiers();
return new PaginatedList<>(
client, paths, headers(context), restPageSize, ns, Route.LIST_TABLES);
}

@Override
Expand Down Expand Up @@ -490,22 +486,8 @@ public void createNamespace(

@Override
public List<Namespace> listNamespaces(SessionContext context, Namespace namespace) {
Map<String, String> queryParams;
if (namespace.isEmpty()) {
queryParams = ImmutableMap.of();
} else {
// query params should be unescaped
queryParams = ImmutableMap.of("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
}

ListNamespacesResponse response =
client.get(
paths.namespaces(),
queryParams,
ListNamespacesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.namespaces();
return new PaginatedList<>(
client, paths, headers(context), restPageSize, namespace, Route.LIST_NAMESPACES);
}

@Override
Expand Down Expand Up @@ -1044,14 +1026,8 @@ public void commitTransaction(SessionContext context, List<TableCommit> commits)
@Override
public List<TableIdentifier> listViews(SessionContext context, Namespace namespace) {
checkNamespaceIsValid(namespace);

ListTablesResponse response =
client.get(
paths.views(namespace),
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.identifiers();
return new PaginatedList<>(
client, paths, headers(context), restPageSize, namespace, Route.LIST_VIEWS);
}

@Override
Expand Down
Loading

0 comments on commit 383c2d9

Please sign in to comment.