Skip to content

Commit

Permalink
Core: Add pagination when listing namespaces/tables/views (apache#9782)
Browse files Browse the repository at this point in the history
  • Loading branch information
rahil-c authored and Sasank Pagolu committed Oct 27, 2024
1 parent c2e1673 commit db3377c
Show file tree
Hide file tree
Showing 9 changed files with 476 additions and 43 deletions.
58 changes: 58 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

public class CatalogHandlers {
private static final Schema EMPTY_SCHEMA = new Schema();
private static final String INTIAL_PAGE_TOKEN = "";

private CatalogHandlers() {}

Expand Down Expand Up @@ -117,6 +118,29 @@ public static ListNamespacesResponse listNamespaces(
return ListNamespacesResponse.builder().addAll(results).build();
}

public static ListNamespacesResponse listNamespaces(
SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) {
List<Namespace> results;
List<Namespace> subResults;

if (parent.isEmpty()) {
results = catalog.listNamespaces();
} else {
results = catalog.listNamespaces(parent);
}

int start = INTIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
int end = start + Integer.parseInt(pageSize);
subResults = results.subList(start, end);
String nextToken = String.valueOf(end);

if (end >= results.size()) {
nextToken = null;
}

return ListNamespacesResponse.builder().addAll(subResults).nextPageToken(nextToken).build();
}

public static CreateNamespaceResponse createNamespace(
SupportsNamespaces catalog, CreateNamespaceRequest request) {
Namespace namespace = request.namespace();
Expand Down Expand Up @@ -174,6 +198,23 @@ public static ListTablesResponse listTables(Catalog catalog, Namespace namespace
return ListTablesResponse.builder().addAll(idents).build();
}

public static ListTablesResponse listTables(
Catalog catalog, Namespace namespace, String pageToken, String pageSize) {
List<TableIdentifier> results = catalog.listTables(namespace);
List<TableIdentifier> subResults;

int start = INTIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
int end = start + Integer.parseInt(pageSize);
subResults = results.subList(start, end);
String nextToken = String.valueOf(end);

if (end >= results.size()) {
nextToken = null;
}

return ListTablesResponse.builder().addAll(subResults).nextPageToken(nextToken).build();
}

public static LoadTableResponse stageTableCreate(
Catalog catalog, Namespace namespace, CreateTableRequest request) {
request.validate();
Expand Down Expand Up @@ -397,6 +438,23 @@ public static ListTablesResponse listViews(ViewCatalog catalog, Namespace namesp
return ListTablesResponse.builder().addAll(catalog.listViews(namespace)).build();
}

public static ListTablesResponse listViews(
ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) {
List<TableIdentifier> results = catalog.listViews(namespace);
List<TableIdentifier> subResults;

int start = INTIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
int end = start + Integer.parseInt(pageSize);
subResults = results.subList(start, end);
String nextToken = String.valueOf(end);

if (end >= results.size()) {
nextToken = null;
}

return ListTablesResponse.builder().addAll(subResults).nextPageToken(nextToken).build();
}

public static LoadViewResponse createView(
ViewCatalog catalog, Namespace namespace, CreateViewRequest request) {
request.validate();
Expand Down
99 changes: 71 additions & 28 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
private static final String REST_METRICS_REPORTING_ENABLED = "rest-metrics-reporting-enabled";
private static final String REST_SNAPSHOT_LOADING_MODE = "snapshot-loading-mode";
public static final String REST_PAGE_SIZE = "rest-page-size";
private static final List<String> TOKEN_PREFERENCE_ORDER =
ImmutableList.of(
OAuth2Properties.ID_TOKEN_TYPE,
Expand All @@ -136,6 +137,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private FileIO io = null;
private MetricsReporter reporter = null;
private boolean reportingViaRestEnabled;
private Integer pageSize = null;
private CloseableGroup closeables = null;

// a lazy thread pool for token refresh
Expand Down Expand Up @@ -228,6 +230,12 @@ public void initialize(String name, Map<String, String> unresolved) {
client, tokenRefreshExecutor(name), token, expiresAtMillis(mergedProps), catalogAuth);
}

this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
if (pageSize != null) {
Preconditions.checkArgument(
pageSize > 0, "Invalid value for %s, must be a positive integer", REST_PAGE_SIZE);
}

this.io = newFileIO(SessionContext.createEmpty(), mergedProps);

this.fileIOCloser = newFileIOCloser();
Expand Down Expand Up @@ -278,14 +286,27 @@ public void setConf(Object newConf) {
@Override
public List<TableIdentifier> listTables(SessionContext context, Namespace ns) {
checkNamespaceIsValid(ns);
Map<String, String> queryParams = Maps.newHashMap();
ImmutableList.Builder<TableIdentifier> tables = ImmutableList.builder();
String pageToken = "";
if (pageSize != null) {
queryParams.put("pageSize", String.valueOf(pageSize));
}

ListTablesResponse response =
client.get(
paths.tables(ns),
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.identifiers();
do {
queryParams.put("pageToken", pageToken);
ListTablesResponse response =
client.get(
paths.tables(ns),
queryParams,
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
pageToken = response.nextPageToken();
tables.addAll(response.identifiers());
} while (pageToken != null);

return tables.build();
}

@Override
Expand Down Expand Up @@ -494,22 +515,31 @@ public void createNamespace(

@Override
public List<Namespace> listNamespaces(SessionContext context, Namespace namespace) {
Map<String, String> queryParams;
if (namespace.isEmpty()) {
queryParams = ImmutableMap.of();
} else {
// query params should be unescaped
queryParams = ImmutableMap.of("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
Map<String, String> queryParams = Maps.newHashMap();
if (!namespace.isEmpty()) {
queryParams.put("parent", RESTUtil.NAMESPACE_JOINER.join(namespace.levels()));
}

ListNamespacesResponse response =
client.get(
paths.namespaces(),
queryParams,
ListNamespacesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.namespaces();
ImmutableList.Builder<Namespace> namespaces = ImmutableList.builder();
String pageToken = "";
if (pageSize != null) {
queryParams.put("pageSize", String.valueOf(pageSize));
}

do {
queryParams.put("pageToken", pageToken);
ListNamespacesResponse response =
client.get(
paths.namespaces(),
queryParams,
ListNamespacesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
pageToken = response.nextPageToken();
namespaces.addAll(response.namespaces());
} while (pageToken != null);

return namespaces.build();
}

@Override
Expand Down Expand Up @@ -1048,14 +1078,27 @@ public void commitTransaction(SessionContext context, List<TableCommit> commits)
@Override
public List<TableIdentifier> listViews(SessionContext context, Namespace namespace) {
checkNamespaceIsValid(namespace);
Map<String, String> queryParams = Maps.newHashMap();
ImmutableList.Builder<TableIdentifier> views = ImmutableList.builder();
String pageToken = "";
if (pageSize != null) {
queryParams.put("pageSize", String.valueOf(pageSize));
}

ListTablesResponse response =
client.get(
paths.views(namespace),
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
return response.identifiers();
do {
queryParams.put("pageToken", pageToken);
ListTablesResponse response =
client.get(
paths.views(namespace),
queryParams,
ListTablesResponse.class,
headers(context),
ErrorHandlers.namespaceErrorHandler());
pageToken = response.nextPageToken();
views.addAll(response.identifiers());
} while (pageToken != null);

return views.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
public class ListNamespacesResponse implements RESTResponse {

private List<Namespace> namespaces;
private String nextPageToken;

public ListNamespacesResponse() {
// Required for Jackson deserialization
}

private ListNamespacesResponse(List<Namespace> namespaces) {
private ListNamespacesResponse(List<Namespace> namespaces, String nextPageToken) {
this.namespaces = namespaces;
this.nextPageToken = nextPageToken;
validate();
}

Expand All @@ -48,9 +50,16 @@ public List<Namespace> namespaces() {
return namespaces != null ? namespaces : ImmutableList.of();
}

public String nextPageToken() {
return nextPageToken;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("namespaces", namespaces()).toString();
return MoreObjects.toStringHelper(this)
.add("namespaces", namespaces())
.add("next-page-token", nextPageToken())
.toString();
}

public static Builder builder() {
Expand All @@ -59,6 +68,7 @@ public static Builder builder() {

public static class Builder {
private final ImmutableList.Builder<Namespace> namespaces = ImmutableList.builder();
private String nextPageToken;

private Builder() {}

Expand All @@ -75,8 +85,13 @@ public Builder addAll(Collection<Namespace> toAdd) {
return this;
}

public Builder nextPageToken(String pageToken) {
nextPageToken = pageToken;
return this;
}

public ListNamespacesResponse build() {
return new ListNamespacesResponse(namespaces.build());
return new ListNamespacesResponse(namespaces.build(), nextPageToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
public class ListTablesResponse implements RESTResponse {

private List<TableIdentifier> identifiers;
private String nextPageToken;

public ListTablesResponse() {
// Required for Jackson deserialization
}

private ListTablesResponse(List<TableIdentifier> identifiers) {
private ListTablesResponse(List<TableIdentifier> identifiers, String nextPageToken) {
this.identifiers = identifiers;
this.nextPageToken = nextPageToken;
validate();
}

Expand All @@ -49,9 +51,16 @@ public List<TableIdentifier> identifiers() {
return identifiers != null ? identifiers : ImmutableList.of();
}

public String nextPageToken() {
return nextPageToken;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("identifiers", identifiers).toString();
return MoreObjects.toStringHelper(this)
.add("identifiers", identifiers)
.add("next-page-token", nextPageToken())
.toString();
}

public static Builder builder() {
Expand All @@ -60,6 +69,7 @@ public static Builder builder() {

public static class Builder {
private final ImmutableList.Builder<TableIdentifier> identifiers = ImmutableList.builder();
private String nextPageToken;

private Builder() {}

Expand All @@ -76,8 +86,13 @@ public Builder addAll(Collection<TableIdentifier> toAdd) {
return this;
}

public Builder nextPageToken(String pageToken) {
nextPageToken = pageToken;
return this;
}

public ListTablesResponse build() {
return new ListTablesResponse(identifiers.build());
return new ListTablesResponse(identifiers.build(), nextPageToken);
}
}
}
32 changes: 29 additions & 3 deletions core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,17 @@ public <T extends RESTResponse> T handleRequest(
ns = Namespace.empty();
}

return castResponse(responseType, CatalogHandlers.listNamespaces(asNamespaceCatalog, ns));
String pageToken = PropertyUtil.propertyAsString(vars, "pageToken", null);
String pageSize = PropertyUtil.propertyAsString(vars, "pageSize", null);

if (pageSize != null) {
return castResponse(
responseType,
CatalogHandlers.listNamespaces(asNamespaceCatalog, ns, pageToken, pageSize));
} else {
return castResponse(
responseType, CatalogHandlers.listNamespaces(asNamespaceCatalog, ns));
}
}
break;

Expand Down Expand Up @@ -339,7 +349,14 @@ public <T extends RESTResponse> T handleRequest(
case LIST_TABLES:
{
Namespace namespace = namespaceFromPathVars(vars);
return castResponse(responseType, CatalogHandlers.listTables(catalog, namespace));
String pageToken = PropertyUtil.propertyAsString(vars, "pageToken", null);
String pageSize = PropertyUtil.propertyAsString(vars, "pageSize", null);
if (pageSize != null) {
return castResponse(
responseType, CatalogHandlers.listTables(catalog, namespace, pageToken, pageSize));
} else {
return castResponse(responseType, CatalogHandlers.listTables(catalog, namespace));
}
}

case CREATE_TABLE:
Expand Down Expand Up @@ -412,7 +429,16 @@ public <T extends RESTResponse> T handleRequest(
{
if (null != asViewCatalog) {
Namespace namespace = namespaceFromPathVars(vars);
return castResponse(responseType, CatalogHandlers.listViews(asViewCatalog, namespace));
String pageToken = PropertyUtil.propertyAsString(vars, "pageToken", null);
String pageSize = PropertyUtil.propertyAsString(vars, "pageSize", null);
if (pageSize != null) {
return castResponse(
responseType,
CatalogHandlers.listViews(asViewCatalog, namespace, pageToken, pageSize));
} else {
return castResponse(
responseType, CatalogHandlers.listViews(asViewCatalog, namespace));
}
}
break;
}
Expand Down
Loading

0 comments on commit db3377c

Please sign in to comment.