Skip to content

Commit

Permalink
Merge pull request #1697 from treasure-data/backport-api-enhance
Browse files Browse the repository at this point in the history
Backport API enhancement
  • Loading branch information
yoyama authored Feb 17, 2022
2 parents c3d4f23 + 2dcd284 commit 83136d6
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import org.skife.jdbi.v2.sqlobject.stringtemplate.UseStringTemplate3StatementLocator;
import org.skife.jdbi.v2.tweak.ResultSetMapper;

import javax.activation.DataSource;

import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
Expand All @@ -55,8 +53,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Locale.ENGLISH;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;

public class DatabaseProjectStoreManager
extends BasicDatabaseStoreManager<DatabaseProjectStoreManager.Dao>
Expand Down Expand Up @@ -127,23 +125,18 @@ public DatabaseProjectStore(int siteId)
this.siteId = siteId;
}

//public List<StoredProject> getAllProjects()
//{
// return dao.getProjects(siteId, Integer.MAX_VALUE, 0);
//}

@DigdagTimed(value = "dpst_", category = "db", appendMethodName = true)
@Override
public List<StoredProjectWithRevision> getProjectsWithLatestRevision(int pageSize, Optional<Integer> lastId, AccessController.ListFilter acFilter)
public List<StoredProjectWithRevision> getProjectsWithLatestRevision(int pageSize, Optional<Integer> lastId, Optional<String> namePattern, AccessController.ListFilter acFilter)
{
return autoCommit((handle, dao) -> dao.getProjectsWithLatestRevision(siteId, pageSize, lastId.or(0), acFilter.getSql()));
return autoCommit((handle, dao) -> dao.getProjectsWithLatestRevision(siteId, pageSize, lastId.or(0), generatePartialMatchPattern(namePattern), acFilter.getSql()));
}

@DigdagTimed(value = "dpst_", category = "db", appendMethodName = true)
@Override
public List<StoredProject> getProjects(int pageSize, Optional<Integer> lastId, AccessController.ListFilter acFilter)
public List<StoredProject> getProjects(int pageSize, Optional<Integer> lastId, Optional<String> namePattern, AccessController.ListFilter acFilter)
{
return autoCommit((handle, dao) -> dao.getProjects(siteId, pageSize, lastId.or(0), acFilter.getSql()));
return autoCommit((handle, dao) -> dao.getProjects(siteId, pageSize, lastId.or(0), generatePartialMatchPattern(namePattern), acFilter.getSql()));
}

@DigdagTimed(value = "dpst_", category = "db", appendMethodName = true)
Expand Down Expand Up @@ -301,10 +294,17 @@ public StoredWorkflowDefinitionWithProject getLatestWorkflowDefinitionByName(int
public List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
int pageSize,
Optional<Long> lastId,
Optional<String> namePattern,
AccessController.ListFilter acFilter)
throws ResourceNotFoundException
{
return autoCommit((handle, dao) -> dao.getLatestActiveWorkflowDefinitions(siteId, pageSize, lastId.or(0L), acFilter.getSql()));
return autoCommit((handle, dao) -> dao.getLatestActiveWorkflowDefinitions(
siteId,
pageSize,
lastId.or(0L),
generatePartialMatchPattern(namePattern),
acFilter.getSql())
);
}

@DigdagTimed(value = "dpst_", category = "db", appendMethodName = true)
Expand Down Expand Up @@ -364,6 +364,19 @@ public TimeZoneMap getWorkflowTimeZonesByIdList(List<Long> defIdList)
Map<Long, ZoneId> map = IdTimeZone.listToMap(list);
return new TimeZoneMap(map);
}

private String generatePartialMatchPattern(Optional<String> pattern)
{
// If provided pattern is absent or empty string, just set '%'
// so that the pattern does not affect to a where clause.
return !pattern.or("").isEmpty() ? "%" + escapeLikePattern(pattern.get()) + "%" : "%";
}

private String escapeLikePattern(String pattern)
{
return pattern.replace("%", "\\%")
.replace("_", "\\_");
}
}

private static class IdTimeZone
Expand Down Expand Up @@ -567,11 +580,17 @@ public interface H2Dao
") a on a.id = rev.id" +
" where proj.site_id = :siteId" +
" and proj.name is not null" +
" and proj.name like :namePattern" +
" and <acFilter>" +
" and proj.id > :lastId" +
" order by proj.id asc" +
" limit :limit")
List<StoredProjectWithRevision> getProjectsWithLatestRevision(@Bind("siteId") int siteId, @Bind("limit") int limit, @Bind("lastId") int lastId, @Define("acFilter") String acFilter);
List<StoredProjectWithRevision> getProjectsWithLatestRevision(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") int lastId,
@Bind("namePattern") String namePattern,
@Define("acFilter") String acFilter);

// h2's MERGE doesn't return generated id when conflicting row already exists
@SqlUpdate("merge into projects" +
Expand Down Expand Up @@ -599,13 +618,20 @@ public interface H2Dao
" join projects proj on a.project_id = proj.id" +
" join workflow_configs wc on wc.id = wd.config_id" +
" where wd.id \\> :lastId" +
// `workflow_definitions` table has a composite index
// for `revision_id` and `name` (`workflow_definitions_on_revision_id_and_name`).
// And the index is used for filter by `revision_id` and `name`.
// Since this query always limits the records by `revision_id` (the latest revision's one),
// partial matching of `name` (e.g. '%test%') can be accepted.
" and wd.name like :namePattern" +
" and <acFilter>" +
" order by wd.id" +
" limit :limit")
List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") long lastId,
@Bind("namePattern") String namePattern,
@Define("acFilter") String acFilter);
}

Expand All @@ -621,13 +647,19 @@ public interface PgDao
" join revisions rev on proj.id = rev.project_id" +
" where proj.site_id = :siteId" +
" and proj.name is not null" +
" and proj.name like :namePattern" +
" and <acFilter>" +
" and proj.id > :lastId" +
") as projects_with_revision" +
" where projects_with_revision.revision_id = projects_with_revision.max_revision_id" +
" order by id asc" +
" limit :limit")
List<StoredProjectWithRevision> getProjectsWithLatestRevision(@Bind("siteId") int siteId, @Bind("limit") int limit, @Bind("lastId") int lastId, @Define("acFilter") String acFilter);
List<StoredProjectWithRevision> getProjectsWithLatestRevision(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") int lastId,
@Bind("namePattern") String namePattern,
@Define("acFilter") String acFilter);

@SqlQuery("insert into projects" +
" (site_id, name, created_at)" +
Expand Down Expand Up @@ -657,6 +689,12 @@ public interface PgDao
" group by r.project_id" +
" )) " +
" and wf.id \\> :lastId" +
// `workflow_definitions` table has a composite index
// for `revision_id` and `name` (`workflow_definitions_on_revision_id_and_name`).
// And the index is used for filter by `revision_id` and `name`.
// Since this query always limits the records by `revision_id` (the latest revision's one),
// partial matching of `name` (e.g. '%test%') can be accepted.
" and wf.name like :namePattern" +
" and <acFilter>" +
" order by wf.id" +
" limit :limit" +
Expand All @@ -669,6 +707,7 @@ List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") long lastId,
@Bind("namePattern") String namePattern,
@Define("acFilter") String acFilter);
}

Expand All @@ -678,16 +717,23 @@ public interface Dao
" where proj.site_id = :siteId" +
" and proj.name is not null" +
" and proj.id \\> :lastId" +
" and proj.name like :namePattern" +
" and <acFilter>" +
" order by proj.id asc" +
" limit :limit")
List<StoredProject> getProjects(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") int lastId,
@Bind("namePattern") String namePattern,
@Define("acFilter") String acFilter);

List<StoredProjectWithRevision> getProjectsWithLatestRevision(@Bind("siteId") int siteId, @Bind("limit") int limit, @Bind("lastId") int lastId, @Define("acFilter") String acFilter);
List<StoredProjectWithRevision> getProjectsWithLatestRevision(
@Bind("siteId") int siteId,
@Bind("limit") int limit,
@Bind("lastId") int lastId,
@Bind("namePattern") String namePattern,
@Define("acFilter") String acFilter);

@SqlUpdate("update projects" +
" set deleted_name = name, deleted_at = now(), name = NULL" +
Expand Down Expand Up @@ -774,7 +820,7 @@ List<StoredProject> getProjects(
" limit 1")
StoredWorkflowDefinitionWithProject getLatestWorkflowDefinitionByName(@Bind("siteId") int siteId, @Bind("projId") int projId, @Bind("name") String name);

List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int siteId, int limit, long lastId, String acFilter);
List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int siteId, int limit, long lastId, String namePattern, String acFilter);

// getWorkflowDetailsById is same with getWorkflowDetailsByIdInternal
// excepting site_id check
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package io.digdag.core.repository;

import java.util.List;
import java.util.Map;
import java.time.ZoneId;

import com.google.common.base.Optional;
import io.digdag.spi.ac.AccessController;

import java.util.List;

public interface ProjectStore
{
List<StoredProject> getProjects(int pageSize, Optional<Integer> lastId, AccessController.ListFilter acFilter);
List<StoredProject> getProjects(int pageSize, Optional<Integer> lastId, Optional<String> namePattern, AccessController.ListFilter acFilter);

List<StoredProjectWithRevision> getProjectsWithLatestRevision(int pageSize, Optional<Integer> lastId, AccessController.ListFilter acFilter);
List<StoredProjectWithRevision> getProjectsWithLatestRevision(int pageSize, Optional<Integer> lastId, Optional<String> namePattern, AccessController.ListFilter acFilter);

ProjectMap getProjectsByIdList(List<Integer> projIdList);

Expand Down Expand Up @@ -58,16 +56,16 @@ byte[] getRevisionArchiveData(int revId)
List<StoredWorkflowDefinition> getWorkflowDefinitions(int revId, int pageSize, Optional<Long> lastId, AccessController.ListFilter acFilter);

StoredWorkflowDefinition getWorkflowDefinitionByName(int revId, String name)
throws ResourceNotFoundException;
throws ResourceNotFoundException;

StoredWorkflowDefinitionWithProject getWorkflowDefinitionById(long wfId)
throws ResourceNotFoundException;
throws ResourceNotFoundException;

StoredWorkflowDefinitionWithProject getLatestWorkflowDefinitionByName(int projId, String name)
throws ResourceNotFoundException;
throws ResourceNotFoundException;

List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int pageSize, Optional<Long> lastId, AccessController.ListFilter acFilter)
throws ResourceNotFoundException;
List<StoredWorkflowDefinitionWithProject> getLatestActiveWorkflowDefinitions(int pageSize, Optional<Long> lastId, Optional<String> namePattern, AccessController.ListFilter acFilter)
throws ResourceNotFoundException;

TimeZoneMap getWorkflowTimeZonesByIdList(List<Long> defIdList);
}
Loading

0 comments on commit 83136d6

Please sign in to comment.