From b69ec3772edaa90553794cdcc01db516b7b129fd Mon Sep 17 00:00:00 2001 From: Tim Allison Date: Wed, 5 Jul 2023 16:25:34 -0400 Subject: [PATCH] TIKA-4096 -- improve configurability of the JDBCPipesReporter (#1225) --- .../reporters/jdbc/JDBCPipesReporter.java | 57 +++++++++++++++++-- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java index febb9cc267..0082eb9def 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java @@ -53,13 +53,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializable { private static final Logger LOG = LoggerFactory.getLogger(JDBCPipesReporter.class); - private static final int CACHE_SIZE = 100; + private static final int DEFAULT_CACHE_SIZE = 100; + private static final long DEFAULT_REPORT_WITHIN_MS = 10000; private static final int ARRAY_BLOCKING_QUEUE_SIZE = 1000; public static final String TABLE_NAME = "tika_status"; private static final long MAX_WAIT_MILLIS = 120000; + private long reportWithinMs = DEFAULT_REPORT_WITHIN_MS; + + private int cacheSize = DEFAULT_CACHE_SIZE; + private String connectionString; private Optional postConnectionString = Optional.empty(); @@ -73,7 +78,8 @@ public void initialize(Map params) throws TikaConfigException { if (StringUtils.isBlank(connectionString)) { throw new TikaConfigException("Must specify a connectionString"); } - ReportWorker reportWorker = new ReportWorker(connectionString, postConnectionString, queue); + ReportWorker reportWorker = new ReportWorker(connectionString, 
postConnectionString, + queue, cacheSize, reportWithinMs); reportWorker.init(); reportWorkerFuture = CompletableFuture.runAsync(reportWorker); } @@ -90,6 +96,37 @@ public void setConnection(String connection) { this.connectionString = connection; } + /** + * Commit the reports if the cache is greater than or equal to this size. + *

<p> + * Default is {@link JDBCPipesReporter#DEFAULT_CACHE_SIZE}. + * <p>
+ * The reports will be committed if the cache size + * triggers reporting or if the amount of time since + * last reported ({@link JDBCPipesReporter#reportWithinMs}) triggers reporting. + * @param cacheSize + */ + @Field + public void setCacheSize(int cacheSize) { + this.cacheSize = cacheSize; + } + + + /** + * Commit the reports if the amount of time elapsed since the last report commit + * exceeds this value. + *

<p> + * Default is {@link JDBCPipesReporter#DEFAULT_REPORT_WITHIN_MS}. + * <p>
+ * The reports will be committed if the cache size triggers reporting or if the amount of + * time since last reported triggers reporting. + * @param reportWithinMs + */ + @Field + public void setReportWithinMs(long reportWithinMs) { + this.reportWithinMs = reportWithinMs; + } + /** * This sql will be called immediately after the connection is made. This was * initially added for setting pragmas on sqlite3, but may be used for other @@ -172,16 +209,23 @@ private static class ReportWorker implements Runnable { private final String connectionString; private final Optional postConnectionString; private final ArrayBlockingQueue queue; + private final int cacheSize; + private final long reportWithinMs; + List cache = new ArrayList<>(); private Connection connection; private PreparedStatement insert; + public ReportWorker(String connectionString, Optional postConnectionString, - ArrayBlockingQueue queue) { + ArrayBlockingQueue queue, int cacheSize, + long reportWithinMs) { this.connectionString = connectionString; this.postConnectionString = postConnectionString; this.queue = queue; + this.cacheSize = cacheSize; + this.reportWithinMs = reportWithinMs; } public void init() throws TikaConfigException { @@ -196,6 +240,7 @@ public void init() throws TikaConfigException { @Override public void run() { + long lastReported = System.currentTimeMillis(); while (true) { //blocking KeyStatusPair p = null; @@ -209,9 +254,12 @@ public void run() { return; } cache.add(p); - if (cache.size() >= CACHE_SIZE) { + long elapsed = System.currentTimeMillis() - lastReported; + + if (cache.size() >= cacheSize || elapsed > reportWithinMs) { try { reportNow(); + lastReported = System.currentTimeMillis(); } catch (SQLException e) { throw new RuntimeException(e); } catch (InterruptedException e) { @@ -219,7 +267,6 @@ public void run() { } } } - } private void shutdownNow() {