Skip to content

Commit

Permalink
refactor(plugin): enhance error handling when file do not exist anymore
Browse files Browse the repository at this point in the history
Resolves: #150
Resolves: #151
  • Loading branch information
fhussonnois committed Jul 31, 2021
1 parent 370a1d6 commit d8aac2b
Show file tree
Hide file tree
Showing 20 changed files with 505 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ public interface FileInputIterator<T> extends Iterator<RecordsIterable<T>>, Auto
* Checks whether this iterator is already close.
* @return {@code true} if this iterator is close.
*/
boolean isClose();
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DelegateFileInputIterator implements FileInputIterator<FileRecord<T
private static final Logger LOG = LoggerFactory.getLogger(DelegateFileInputIterator.class);

private final URI objectURI;
private final FileObjectKey key;
private final FileInputReader reader;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

Expand All @@ -45,18 +46,22 @@ public class DelegateFileInputIterator implements FileInputIterator<FileRecord<T
/**
* Creates a new {@link DelegateFileInputIterator} instance.
*
* @param objectURI the input source file.
* @param key the object file key.
* @param objectURI the object file URI.
* @param reader the input source reader used to create a new {@link FileInputIterator}.
*/
DelegateFileInputIterator(final URI objectURI, final FileInputReader reader) {
this.objectURI = Objects.requireNonNull(objectURI, "source can't be null");
this.reader = Objects.requireNonNull(reader, "reader can't be null");
DelegateFileInputIterator(final FileObjectKey key,
final URI objectURI,
final FileInputReader reader) {
this.key = Objects.requireNonNull(key, "'key' should not be null");
this.objectURI = Objects.requireNonNull(objectURI, "'objectURI' can't be null");
this.reader = Objects.requireNonNull(reader, "'reader' can't be null");
}

/**
* Gets the metadata of the backed object file.
*
* @return the {@link FileObjectMeta}
* @return the {@link FileObjectMeta}
*/
public FileObjectMeta getMetadata() {
return reader.getObjectMetadata(objectURI);
Expand All @@ -65,7 +70,7 @@ public FileObjectMeta getMetadata() {
/**
* Gets the URI of the backed object file.
*
* @return the {@link URI}
* @return the {@link URI}
*/
public URI getObjectURI() {
return objectURI;
Expand All @@ -84,17 +89,20 @@ public void open() {
* @return {@code true} if an iterator is already opened.
*/
boolean isOpen() {
return iterator != null && !iterator.isClose();
return iterator != null && !iterator.isClosed();
}

/**
*
* @return {@code true} if the backed object file can be read and is accessible.
*/
boolean isValid() {
return reader.canBeRead(objectURI);
}

public FileObjectKey key() {
return key;
}

/**
* {@inheritDoc}
*/
Expand All @@ -103,7 +111,12 @@ public FileContext context() {
if (iterator == null) {
throw new IllegalStateException("Iterator is not initialized for URI: " + objectURI);
}
return iterator.context();
final FileContext context = iterator.context();
return new FileContext(
key,
context.metadata(),
context.offset()
);
}

/**
Expand Down Expand Up @@ -149,7 +162,7 @@ public void close() {
* {@inheritDoc}
*/
@Override
public boolean isClose() {
public boolean isClosed() {
return isClosed.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
/**
* Immutable class which is use to wrap contextual information about an input file.
*/
public class FileContext {
public final class FileContext {

private final FileObjectKey key;

private final FileObjectMeta metadata;

Expand All @@ -35,21 +37,32 @@ public class FileContext {
* @param metadata the source metadata.
*/
public FileContext(final FileObjectMeta metadata) {
this(metadata, FileObjectOffset.empty());
this(null, metadata);
}

/**
* Creates a new {@link FileContext} instance.
*
* @param metadata the source metadata.
* @param offset teh source startPosition.
*/
public FileContext(final FileObjectMeta metadata,
public FileContext(final FileObjectKey key, final FileObjectMeta metadata) {
this(key, metadata, FileObjectOffset.empty());
}


/**
* Creates a new {@link FileContext} instance.
*
* @param key the object file's key.
* @param metadata the object file's metadata.
* @param offset the object file's startPosition.
*/
public FileContext(final FileObjectKey key,
final FileObjectMeta metadata,
final FileObjectOffset offset) {
Objects.requireNonNull(metadata, "metadata can't be null");
Objects.requireNonNull(offset, "startPosition can't be null");
this.metadata = metadata;
this.offset = offset;
this.metadata = Objects.requireNonNull(metadata, "metadata can't be null");
this.offset = Objects.requireNonNull(offset, "startPosition can't be null");
this.key = key;
}

/**
Expand All @@ -69,9 +82,18 @@ public FileObjectMeta metadata() {
public FileObjectOffset offset() {
return offset;
}


/**
* Returns the partition string identifier for this file.
*
* @return the partition.
*/
public FileObjectKey key() {
return key;
}

public FileContext withOffset(final FileObjectOffset offset) {
return new FileContext(metadata, offset);
return new FileContext(key, metadata, offset);
}

/**
Expand All @@ -82,7 +104,8 @@ public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FileContext)) return false;
FileContext that = (FileContext) o;
return Objects.equals(metadata, that.metadata) &&
return Objects.equals(key, that.key) &&
Objects.equals(metadata, that.metadata) &&
Objects.equals(offset, that.offset);
}

Expand All @@ -91,14 +114,15 @@ public boolean equals(Object o) {
*/
@Override
public int hashCode() {
return Objects.hash(metadata, offset);
return Objects.hash(key, metadata, offset);
}

@Override
public String toString() {
return "[" +
"metadata=" + metadata +
", offset=" + offset +
"partition=" + key +
", metadata=" + metadata +
", offset=" + offset +
']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,46 @@
package io.streamthoughts.kafka.connect.filepulse.source;

import com.jsoniter.annotation.JsonCreator;
import com.jsoniter.annotation.JsonIgnore;
import com.jsoniter.annotation.JsonProperty;

import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;

/**
* A {@code FileObject} describes an input file being processing by the connector and its current state.
*/
public class FileObject implements Serializable {
public final class FileObject implements Serializable {

@JsonIgnore
private final FileObjectKey key;
private final FileObjectMeta metadata;
private final FileObjectOffset offset;
private final FileObjectStatus status;

/**
* Creates a new {@link FileObject} instance.
*
* @param source the source file metadata.
* @param offset the source file offset.
* @param status the source status.
* @param metadata the object file's metadata.
* @param offset the object file's offset.
* @param status the object file's status.
*/
@JsonCreator
public FileObject(@JsonProperty("metadata") final FileObjectMeta source,
public FileObject(@JsonProperty("metadata") final FileObjectMeta metadata,
@JsonProperty("offset") final FileObjectOffset offset,
@JsonProperty("status") final FileObjectStatus status) {
Objects.requireNonNull(source, "source can't be null");
Objects.requireNonNull(offset, "offset can't be null");
Objects.requireNonNull(status, "status can't be null");
this.metadata = source;
this.offset = offset;
this.status = status;
this(metadata, offset, status, null);
}

public FileObject(final FileObjectMeta metadata,
final FileObjectOffset offset,
final FileObjectStatus status,
final FileObjectKey key) {
this.metadata = Objects.requireNonNull(metadata, "metadata can't be null");
this.offset = Objects.requireNonNull(offset, "offset can't be null");
this.status = Objects.requireNonNull(status, "status can't be null");
this.key = key;
}

public FileObjectMeta metadata() {
Expand All @@ -64,10 +73,18 @@ public FileObjectStatus status() {
return status;
}

public Optional<FileObjectKey> key() {
return Optional.ofNullable(key);
}

public FileObject withStatus(final FileObjectStatus status) {
return new FileObject(metadata, offset, status);
}

public FileObject withKey(final FileObjectKey key) {
return new FileObject(metadata, offset, status, key);
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2021 StreamThoughts.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamthoughts.kafka.connect.filepulse.source;

import java.util.Objects;

/**
* A {@code FileObjectKey} represents a unique ID used for uniquely identifying an object file.
*
* @see FileObject
* @see FileObjectMeta
* @see SourceOffsetPolicy
*/
public final class FileObjectKey {

private final String key;

private final int hashCode;

public static FileObjectKey of(final String s) {
return new FileObjectKey(s);
}

/**
* Creates a new {@link FileObjectKey} instance.
*
* @param id the id.
*/
public FileObjectKey(final String id) {
this.key = Objects.requireNonNull(id, "'id' should not be null");
this.hashCode = id.hashCode();
}

public String original() {
return key;
}

/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FileObjectKey)) return false;
FileObjectKey that = (FileObjectKey) o;
return Objects.equals(key, that.key);
}

/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
return hashCode;
}

/**
* {@inheritDoc}
*/
@Override
public String toString() {
return key;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void close() {
* {@inheritDoc}
*/
@Override
public boolean isClose() {
public boolean isClosed() {
return isClosed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void addOpenIterator(final FileInputIterator<?> iterator) {
* @param iterator an iterator to remove.
*/
void removeIterator(final FileInputIterator<?> iterator) {
if (iterator.isClose()) {
if (iterator.isClosed()) {
openIterators.remove(iterator);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public FileContext context() {
*/
@Override
public void close() {
if (!isClose()) {
if (!isClosed()) {
iteratorManager.removeIterator(this);
closed.set(true);
}
Expand All @@ -73,7 +73,7 @@ public void close() {
* {@inheritDoc}
*/
@Override
public boolean isClose() {
public boolean isClosed() {
return closed.get();
}
}
Loading

0 comments on commit d8aac2b

Please sign in to comment.