Skip to content

Commit

Permalink
TIKA-4096 -- improve configurability of the JDBCPipesReporter (#1225)
Browse files Browse the repository at this point in the history
  • Loading branch information
tballison committed Jul 5, 2023
1 parent 3ef545d commit b69ec37
Showing 1 changed file with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@
public class JDBCPipesReporter extends PipesReporterBase implements Initializable {

private static final Logger LOG = LoggerFactory.getLogger(JDBCPipesReporter.class);
private static final int CACHE_SIZE = 100;
private static final int DEFAULT_CACHE_SIZE = 100;
private static final long DEFAULT_REPORT_WITHIN_MS = 10000;
private static final int ARRAY_BLOCKING_QUEUE_SIZE = 1000;

public static final String TABLE_NAME = "tika_status";

private static final long MAX_WAIT_MILLIS = 120000;

private long reportWithinMs = DEFAULT_REPORT_WITHIN_MS;

private int cacheSize = DEFAULT_CACHE_SIZE;

private String connectionString;

private Optional<String> postConnectionString = Optional.empty();
Expand All @@ -73,7 +78,8 @@ public void initialize(Map<String, Param> params) throws TikaConfigException {
if (StringUtils.isBlank(connectionString)) {
throw new TikaConfigException("Must specify a connectionString");
}
ReportWorker reportWorker = new ReportWorker(connectionString, postConnectionString, queue);
ReportWorker reportWorker = new ReportWorker(connectionString, postConnectionString,
queue, cacheSize, reportWithinMs);
reportWorker.init();
reportWorkerFuture = CompletableFuture.runAsync(reportWorker);
}
Expand All @@ -90,6 +96,37 @@ public void setConnection(String connection) {
this.connectionString = connection;
}

/**
* Commit the reports if the cache is greater than or equal to this size.
* <p/>
* Default is {@link JDBCPipesReporter#DEFAULT_CACHE_SIZE}.
* <p/>
* The reports will be committed if the cache size
* triggers reporting or if the amount of time since
* last reported ({@link JDBCPipesReporter#reportWithinMs}) triggers reporting.
* @param cacheSize
*/
@Field
public void setCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
}


/**
* Commit the reports if the amount of time elapsed since the last report commit
* exceeds this value.
* <p/>
* Default is {@link JDBCPipesReporter#DEFAULT_REPORT_WITHIN_MS}.
* <p/>
* The reports will be committed if the cache size triggers reporting or if the amount of
* time since last reported triggers reporting.
* @param reportWithinMs
*/
@Field
public void setReportWithinMs(long reportWithinMs) {
this.reportWithinMs = reportWithinMs;
}

/**
* This sql will be called immediately after the connection is made. This was
* initially added for setting pragmas on sqlite3, but may be used for other
Expand Down Expand Up @@ -172,16 +209,23 @@ private static class ReportWorker implements Runnable {
private final String connectionString;
private final Optional<String> postConnectionString;
private final ArrayBlockingQueue<KeyStatusPair> queue;
private final int cacheSize;
private final long reportWithinMs;

List<KeyStatusPair> cache = new ArrayList<>();
private Connection connection;
private PreparedStatement insert;


public ReportWorker(String connectionString,
Optional<String> postConnectionString,
ArrayBlockingQueue<KeyStatusPair> queue) {
ArrayBlockingQueue<KeyStatusPair> queue, int cacheSize,
long reportWithinMs) {
this.connectionString = connectionString;
this.postConnectionString = postConnectionString;
this.queue = queue;
this.cacheSize = cacheSize;
this.reportWithinMs = reportWithinMs;
}

public void init() throws TikaConfigException {
Expand All @@ -196,6 +240,7 @@ public void init() throws TikaConfigException {

@Override
public void run() {
long lastReported = System.currentTimeMillis();
while (true) {
//blocking
KeyStatusPair p = null;
Expand All @@ -209,17 +254,19 @@ public void run() {
return;
}
cache.add(p);
if (cache.size() >= CACHE_SIZE) {
long elapsed = System.currentTimeMillis() - lastReported;

if (cache.size() >= cacheSize || elapsed > reportWithinMs) {
try {
reportNow();
lastReported = System.currentTimeMillis();
} catch (SQLException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
return;
}
}
}

}

private void shutdownNow() {
Expand Down

0 comments on commit b69ec37

Please sign in to comment.