Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add namespace, before, and after to search endpoint #2556

Merged
merged 9 commits into from
Sep 14, 2023
40 changes: 20 additions & 20 deletions api/src/main/java/marquez/api/SearchResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@
package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static marquez.common.Utils.toLocateDateOrNull;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.List;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Pattern;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
Expand All @@ -33,6 +36,7 @@
@Slf4j
@Path("/api/v1/search")
public class SearchResource {
private static final String YYYY_MM_DD = "^\\d{4}-\\d{2}-\\d{2}$";
private static final String DEFAULT_SORT = "name";
private static final String DEFAULT_LIMIT = "10";
private static final int MIN_LIMIT = 0;
Expand All @@ -49,25 +53,23 @@ public SearchResource(@NonNull final SearchDao searchDao) {
@GET
@Produces(APPLICATION_JSON)
public Response search(
@QueryParam("q") @NotNull String query,
@QueryParam("q") @NotBlank String query,
@QueryParam("filter") @Nullable SearchFilter filter,
@QueryParam("sort") @DefaultValue(DEFAULT_SORT) SearchSort sort,
@QueryParam("limit") @DefaultValue(DEFAULT_LIMIT) @Min(MIN_LIMIT) int limit) {
return Response.ok(
isQueryBlank(query)
? SearchResults.EMPTY
: searchWithNonBlankQuery(query, filter, sort, limit))
.build();
}

/** Returns {@code true} when {@code query} has no characters left after trimming. */
private static boolean isQueryBlank(@NonNull String query) {
  final String trimmed = query.trim();
  return trimmed.isEmpty();
}

private SearchResults searchWithNonBlankQuery(
String query, SearchFilter filter, SearchSort sort, int limit) {
final List<SearchResult> results = searchDao.search(query, filter, sort, limit);
return new SearchResults(results);
@QueryParam("limit") @DefaultValue(DEFAULT_LIMIT) @Min(MIN_LIMIT) int limit,
@QueryParam("namespace") @Nullable String namespace,
@QueryParam("before") @Valid @Pattern(regexp = YYYY_MM_DD) @Nullable String before,
@QueryParam("after") @Valid @Pattern(regexp = YYYY_MM_DD) @Nullable String after) {
Comment on lines +61 to +62
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wslulciuc Since Dropwizard doesn't expose `Instant` out of the box, would it be possible to define a custom `Instant` parameter converter and register it in `MarquezApp.java`? Or would that be overkill for the problem we're solving here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we'll continue to use the approach taken in our runs API:

  Response markRunAs(@NonNull RunState runState, @QueryParam("at") String atAsIso) {
    runService.markRunAs(runId, runState, Utils.toInstant(atAsIso));
    return getRun();
  }

But I agree that a better long-term solution would be to define a custom query param.

final List<SearchResult> searchResults =
searchDao.search(
query,
filter,
sort,
limit,
namespace,
toLocateDateOrNull(before),
toLocateDateOrNull(after));
return Response.ok(new SearchResults(searchResults)).build();
}

/** Wrapper for {@link SearchResult}s which also contains a {@code total count}. */
Expand All @@ -81,7 +83,5 @@ public SearchResults(@NonNull final List<SearchResult> results) {
this.totalCount = results.size();
this.results = results;
}

static final SearchResults EMPTY = new SearchResults(List.of());
}
}
6 changes: 6 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
Expand Down Expand Up @@ -105,6 +107,10 @@ public static void addZonedDateTimeMixin(ObjectMapper mapper) {
// Empty mix-in type: it declares no members and exists only to carry the @JsonDeserialize
// annotation below, which names FlexibleDateTimeDeserializer as the deserializer to use.
@JsonDeserialize(using = FlexibleDateTimeDeserializer.class)
static final class ZonedDateTimeMixin {}

/**
 * Parses a date string into a {@link LocalDate} using {@link LocalDate#parse(CharSequence)},
 * passing {@code null} through unchanged.
 *
 * @param timeAsString the text to parse (for example {@code 2023-09-14}), or {@code null}
 * @return the parsed date, or {@code null} when {@code timeAsString} is {@code null}
 * @throws java.time.format.DateTimeParseException if the text cannot be parsed
 */
public static LocalDate toLocateDateOrNull(@Nullable final String timeAsString) {
  if (timeAsString == null) {
    return null;
  }
  return LocalDate.parse(timeAsString);
}

public static String toJson(@NonNull final Object value) {
try {
return MAPPER.writeValueAsString(value);
Expand Down
81 changes: 62 additions & 19 deletions api/src/main/java/marquez/db/SearchDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,45 @@

package marquez.db;

import java.time.LocalDate;
import java.util.List;
import javax.annotation.Nullable;
import marquez.api.models.SearchFilter;
import marquez.api.models.SearchResult;
import marquez.api.models.SearchSort;
import marquez.db.mappers.SearchResultMapper;
import org.jdbi.v3.sqlobject.SqlObject;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

/** The DAO for {@link SearchResult}. */
@RegisterRowMapper(SearchResultMapper.class)
public interface SearchDao {
public interface SearchDao extends SqlObject {

/**
 * Convenience overload that delegates to the seven-argument {@code search}, passing {@code null}
 * for {@code namespace}, {@code before} and {@code after}.
 */
default List<SearchResult> search(String query, SearchFilter filter, SearchSort sort, int limit) {
  final String anyNamespace = null;
  final LocalDate noBefore = null;
  final LocalDate noAfter = null;
  return search(query, filter, sort, limit, anyNamespace, noBefore, noAfter);
}

/**
 * Convenience overload that delegates to the seven-argument {@code search} with the given
 * {@code namespace}, passing {@code null} for {@code before} and {@code after}.
 */
default List<SearchResult> search(
    String query, SearchFilter filter, SearchSort sort, int limit, String namespace) {
  final LocalDate noBefore = null;
  final LocalDate noAfter = null;
  return search(query, filter, sort, limit, namespace, noBefore, noAfter);
}

/**
 * Convenience overload that delegates to the seven-argument {@code search} with the given
 * {@code before} date, passing {@code null} for {@code namespace} and {@code after}.
 */
default List<SearchResult> search(
    String query, SearchFilter filter, SearchSort sort, int limit, LocalDate before) {
  final String anyNamespace = null;
  final LocalDate noAfter = null;
  return search(query, filter, sort, limit, anyNamespace, before, noAfter);
}

/**
 * Convenience overload that delegates to the seven-argument {@code search} with the given
 * {@code before} and {@code after} dates, passing {@code null} for {@code namespace}.
 */
default List<SearchResult> search(
    String query,
    SearchFilter filter,
    SearchSort sort,
    int limit,
    LocalDate before,
    LocalDate after) {
  final String anyNamespace = null;
  return search(query, filter, sort, limit, anyNamespace, before, after);
}

/**
* Returns all datasets and jobs that match the provided query; matching of datasets and jobs are
* string based and case-insensitive.
Expand All @@ -24,26 +52,41 @@ public interface SearchDao {
* @param filter The filter to apply to the query result.
* @param sort The sort to apply to the query result.
* @param limit The limit to apply to the query result.
* @param namespace Match jobs or datasets within the given namespace.
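* @param before Match jobs or datasets whose {@code updated_at} is earlier than this date; {@code null} disables the bound.
* @param after Match jobs or datasets whose {@code updated_at} is later than this date; {@code null} disables the bound.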
* @return A {@link SearchResult} object.
*/
@SqlQuery(
"""
SELECT type, name, updated_at, namespace_name
FROM (
SELECT 'DATASET' AS type, d.name, d.updated_at, d.namespace_name
FROM datasets_view AS d
WHERE d.name ilike '%' || :query || '%'
UNION
SELECT DISTINCT ON (j.namespace_name, j.name)\s
'JOB' AS type, j.name, j.updated_at, j.namespace_name
FROM (SELECT namespace_name, name, unnest(COALESCE(aliases, Array[NULL]::varchar[])) AS alias, updated_at\s
FROM jobs_view WHERE symlink_target_uuid IS NULL
ORDER BY updated_at DESC) AS j
WHERE j.name ilike '%' || :query || '%'
OR j.alias ilike '%' || :query || '%'
) AS results
WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL
ORDER BY :sort
LIMIT :limit""")
List<SearchResult> search(String query, SearchFilter filter, SearchSort sort, int limit);
SELECT type, name, updated_at, namespace_name
FROM (
SELECT 'DATASET' AS type, d.name, d.updated_at, d.namespace_name
FROM datasets_view AS d
WHERE (d.namespace_name = :namespace OR CAST(:namespace AS TEXT) IS NULL)
AND (d.updated_at < :before OR CAST(:before AS TEXT) IS NULL)
AND (d.updated_at > :after OR CAST(:after AS TEXT) IS NULL)
AND (d.name ILIKE '%' || :query || '%')
UNION
SELECT DISTINCT ON (j.namespace_name, j.name)
'JOB' AS type, j.name, j.updated_at, j.namespace_name
FROM (SELECT namespace_name, name, UNNEST(COALESCE(aliases, Array[NULL]::varchar[])) AS alias, updated_at
FROM jobs_view WHERE symlink_target_uuid IS NULL
ORDER BY updated_at DESC) AS j
WHERE (j.namespace_name = :namespace OR CAST(:namespace AS TEXT) IS NULL)
AND (j.updated_at < :before OR CAST(:before AS TEXT) IS NULL)
AND (j.updated_at > :after OR CAST(:after AS TEXT) IS NULL)
AND (j.name ILIKE '%' || :query || '%' OR j.alias ILIKE '%' || :query || '%')
) AS results
WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL
ORDER BY :sort
LIMIT :limit""")
List<SearchResult> search(
String query,
SearchFilter filter,
SearchSort sort,
int limit,
@Nullable String namespace,
@Nullable LocalDate before,
@Nullable LocalDate after);
}
58 changes: 58 additions & 0 deletions api/src/test/java/marquez/db/SearchDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import java.net.URL;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -36,6 +39,10 @@
@Tag("DataAccessTests")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class SearchDaoTest {
// The date one day after "now" (taken when this class is initialized), resolved in the JVM's
// default time zone.
private static final LocalDate BEFORE =
LocalDate.ofInstant(Instant.now().plus(1, ChronoUnit.DAYS), ZoneOffset.systemDefault());
// The date one day before "now" (taken when this class is initialized), resolved in the JVM's
// default time zone.
private static final LocalDate AFTER =
LocalDate.ofInstant(Instant.now().minus(1, ChronoUnit.DAYS), ZoneOffset.systemDefault());

static final int LIMIT = 25;
static final int NUM_OF_JOBS = 2;
Expand All @@ -62,6 +69,11 @@ public static void setUpOnce(final Jdbi jdbi) throws SQLException {
DbTestUtils.newDataset(jdbi, "time_ordering_1");
DbTestUtils.newDataset(jdbi, "time_ordering_2");

DbTestUtils.newDataset(jdbi, "namespace1", "datasetA");
DbTestUtils.newDataset(jdbi, "namespace1", "datasetB");
DbTestUtils.newDataset(jdbi, "namespace111", "excludeMe");
DbTestUtils.newDataset(jdbi, "namespace2", "datasetC");

ImmutableSet<JobRow> jobRows = DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);

// add a symlinked job - validate that the number of results is the same in the below unit test
Expand Down Expand Up @@ -107,6 +119,52 @@ public static void setUpOnce(final Jdbi jdbi) throws SQLException {
});
}

@Test
public void testSearch_filterByNamespace() {
  // Searching 'dataset' restricted to 'namespace1' must return exactly datasetA and datasetB.
  final List<SearchResult> matches =
      searchDao.search(
          "dataset", SearchFilter.DATASET, SearchSort.UPDATE_AT, LIMIT, "namespace1");

  assertThat(matches).hasSize(2);
  assertThat(matches).extracting("name").contains("datasetA", "datasetB");
}

@Test
public void testSearch_filterByNamespaceAndAfter() {
  // Restrict to 'namespace2' with no 'before' bound and AFTER as the lower bound; only datasetC
  // is expected to match.
  final List<SearchResult> matches =
      searchDao.search(
          "dataset",
          SearchFilter.DATASET,
          SearchSort.UPDATE_AT,
          LIMIT,
          "namespace2",
          null,
          AFTER);

  assertThat(matches).hasSize(1);
  assertThat(matches).extracting("name").contains("datasetC");
}

@Test
public void testSearch_filterByNamespaceBeforeFuture() {
  // No namespace is given here; BEFORE (one day ahead of now) is used as the 'before' bound.
  final List<SearchResult> matches =
      searchDao.search("dataset", SearchFilter.DATASET, SearchSort.UPDATE_AT, LIMIT, BEFORE);

  // 15 dataset matches are expected, datasetA and datasetB among them.
  assertThat(matches).hasSize(15);
  assertThat(matches).extracting("name").contains("datasetA", "datasetB");
}

@Test
public void testSearch_filterByNamespaceBeforePast() {
  // AFTER (one day in the past) is deliberately passed as the 'before' bound, so no dataset
  // should match.
  final List<SearchResult> matches =
      searchDao.search("dataset", SearchFilter.DATASET, SearchSort.UPDATE_AT, LIMIT, AFTER);

  assertThat(matches).hasSize(0);
}

@Test
public void testSearch(Jdbi jdbi) {
jdbi.withHandle(
Expand Down
31 changes: 29 additions & 2 deletions spec/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,33 @@ paths:
- $ref: '#/components/parameters/filter'
- $ref: '#/components/parameters/sort'
- $ref: '#/components/parameters/limit'
- name: namespace
in: query
description: Match jobs or datasets within the given namespace.
required: false
schema:
type: string
maxLength: 1024
example: my-namespace
# Upper date bound for search results. The pattern mirrors the server-side YYYY-MM-DD check.
- name: before
  in: query
  description: Match jobs or datasets **before** `YYYY-MM-DD`.
  required: false
  schema:
    type: string
    pattern: '^\d{4}-\d{2}-\d{2}$'
    example: "2022-09-15"
# Lower date bound for search results. The pattern mirrors the server-side YYYY-MM-DD check.
- name: after
  in: query
  description: Match jobs or datasets **after** `YYYY-MM-DD`.
  required: false
  schema:
    type: string
    pattern: '^\d{4}-\d{2}-\d{2}$'
    example: "2022-09-15"

summary: Query all datasets and jobs
description: Returns one or more datasets and jobs of your query.
tags:
Expand Down Expand Up @@ -773,7 +800,7 @@ components:
limit:
name: limit
in: query
description: The number of results to return from offset
description: The number of results to return from offset.
required: false
schema:
type: integer
Expand All @@ -783,7 +810,7 @@ components:
offset:
name: offset
in: query
description: The initial position from which to return results
description: The initial position from which to return results.
required: false
schema:
type: integer
Expand Down
Loading