Skip to content

Commit

Permalink
Add TimeWheel to sync service policy.
Browse files Browse the repository at this point in the history
  • Loading branch information
hexiaofeng committed May 17, 2024
1 parent 028a790 commit 850080f
Show file tree
Hide file tree
Showing 9 changed files with 1,042 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.thread;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A factory class for creating named threads or thread pools. This class implements the {@link ThreadFactory} interface,
* allowing for customization of thread creation. Threads or thread pools created using this factory can be easily identified
* by their names, which is particularly useful for debugging or management purposes.
*/
public class NamedThreadFactory implements ThreadFactory {

/**
* A global counter for all thread pools created by instances of this class. This counter ensures that each thread pool
* has a unique identifier.
*/
protected static final AtomicInteger POOL_COUNTER = new AtomicInteger();

/**
* A counter for the threads created by this particular instance of {@code NamedThreadFactory}. This helps in assigning
* a unique identifier to each thread within the pool.
*/
protected final AtomicInteger threadCounter = new AtomicInteger(0);

/**
* The thread group to which threads created by this factory will belong. Using a thread group allows for easier management
* of threads, such as setting a maximum priority for all threads in the group.
*/
protected final ThreadGroup group;

/**
* The prefix for the names of threads created by this factory. This prefix is used to easily identify threads belonging
* to a particular pool or purpose.
*/
protected final String namePrefix;

/**
* Indicates whether threads created by this factory should be daemon threads. Daemon threads are terminated by the JVM
* when all non-daemon threads finish execution. Setting this to true is useful for background tasks that should not prevent
* the application from exiting.
*/
protected final boolean isDaemon;

/**
* Constructs a new {@code NamedThreadFactory} instance with the specified prefix for thread names.
*
* @param prefix The prefix to be used in the names of threads created by this factory.
*/
public NamedThreadFactory(String prefix) {
this(null, prefix, true);
}

/**
* Constructs a new {@code NamedThreadFactory} instance with the specified prefix for thread names and daemon status.
*
* @param prefix The prefix to be used in the names of threads created by this factory.
* @param daemon Indicates whether the threads created by this factory should be daemon threads.
*/
public NamedThreadFactory(String prefix, boolean daemon) {
this(null, prefix, daemon);
}

/**
* Constructs a new {@code NamedThreadFactory} instance with the specified thread group, prefix for thread names, and
* daemon status.
*
* @param group The thread group to which threads created by this factory will belong. If {@code null}, the factory
* uses the current thread's {@link ThreadGroup}.
* @param prefix The prefix to be used in the names of threads created by this factory.
* @param daemon Indicates whether the threads created by this factory should be daemon threads.
*/
public NamedThreadFactory(ThreadGroup group, String prefix, boolean daemon) {
this.group = group == null ? Thread.currentThread().getThreadGroup() : group;
namePrefix = prefix + "-" + POOL_COUNTER.getAndIncrement() + "-T-";
isDaemon = daemon;
}

/**
* Creates a new {@link Thread} with the specified {@link Runnable} task and settings defined by this factory. The thread
* will belong to the thread group, have the name prefix, and be a daemon thread as specified during the factory's
* construction.
*
* @param r The {@link Runnable} task to be executed by the new thread.
* @return A new {@link Thread} configured according to this factory's settings.
*/
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadCounter.incrementAndGet(), 0);
t.setDaemon(isDaemon);
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.time;

/**
* Represents a task that is scheduled to be executed after a certain delay. This interface extends {@link TimeTask},
* inheriting its methods for retrieving the task's name and its scheduled execution time. The execution time in the context
* of a {@code DelayTask} is understood to be the time at which the delay period ends and the task is eligible for execution.
*/
public interface DelayTask extends TimeTask {

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.time;

/**
* A {@code DelegateTask} is a concrete implementation of the {@code TimeTask} interface, encapsulating a task
* with a specific execution time and a name. It delegates the execution to the {@code Runnable} provided at
* construction time. This class can be used to wrap a {@code Runnable} with additional timing and identification
* properties, making it suitable for scheduled execution in a timing framework.
*/
public class DelegateTask implements TimeTask {
/**
* The name of the task, used for identification purposes.
*/
private final String name;

/**
* The scheduled execution time for the task, represented as a timestamp.
*/
private final long time;

/**
* The {@code Runnable} containing the code to be executed when the task runs.
*/
private final Runnable runnable;

/**
* Constructs a new {@code DelegateTask} with the specified name, time, and executable code.
*
* @param name The name of the task.
* @param time The execution time of the task as a timestamp.
* @param runnable The {@code Runnable} to be executed.
*/
public DelegateTask(final String name, final long time, final Runnable runnable) {
this.name = name;
this.time = time;
this.runnable = runnable;
}

@Override
public String getName() {
return name;
}

@Override
public long getTime() {
return time;
}

@Override
public void run() {
if (runnable != null) {
runnable.run();
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.time;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Represents a time slot within a timing wheel or similar scheduling system.
* A time slot is responsible for holding and managing {@link TimeWork} tasks that are scheduled to be executed
* at a specific expiration time. This class provides mechanisms to add, remove, and flush tasks when their
* execution time has arrived.
*/
public class TimeSlot implements Delayed {

/**
* Indicates that the task is being added at the head of the list within the slot.
*/
public static final int HEAD = 1;

/**
* Indicates that the task is being added at the tail of the list within the slot.
*/
public static final int TAIL = 2;

/**
* The expiration time of the time slot. Tasks within this slot are due to be executed when the current
* time surpasses this expiration time.
*/
protected long expiration = -1L;

/**
* The root node of a doubly linked list that holds the tasks. This is a sentinel node to simplify the
* add and remove operations by eliminating the need to check for null.
*/
private final TimeWork root = new TimeWork("root", -1L, null, null, null);

/**
* Constructs a new {@code TimeSlot} instance. Initializes the doubly linked list with the root node
* pointing to itself, indicating an empty list.
*/
public TimeSlot() {
root.pre = root;
root.next = root;
}

/**
* Adds a new {@link TimeWork} task to this time slot and sets a new expiration time for the slot if necessary.
*
* @param timeWork The task to be added.
* @param expire The new expiration time for the slot.
* @return An integer indicating whether the task was added at the head ({@link #HEAD}) or tail ({@link #TAIL}) of the list.
*/
protected int add(final TimeWork timeWork, final long expire) {
timeWork.timeSlot = this;
TimeWork tail = root.pre;
timeWork.next = root;
timeWork.pre = tail;
tail.next = timeWork;
root.pre = timeWork;
if (expiration == -1L) {
expiration = expire;
return HEAD;
}
return TAIL;
}

/**
* Removes a {@link TimeWork} task from this time slot. This operation effectively detaches the task from
* the doubly linked list within the slot.
*
* @param timeWork The task to be removed.
*/
protected void remove(final TimeWork timeWork) {
timeWork.next.pre = timeWork.pre;
timeWork.pre.next = timeWork.next;
timeWork.timeSlot = null;
timeWork.next = null;
timeWork.pre = null;
}

/**
* Flushes this time slot by executing all tasks within it. This method is called when the time slot has
* expired. Each task is removed from the slot and then passed to the provided consumer for execution.
*
* @param consumer A {@link Consumer} that takes a {@link TimeWork} task and executes it.
*/
protected void flush(final Consumer<TimeWork> consumer) {
List<TimeWork> ts = new LinkedList<>();
TimeWork timeWork = root.next;
while (timeWork != root) {
remove(timeWork);
ts.add(timeWork);
timeWork = root.next;
}
expiration = -1L;
ts.forEach(consumer);
}

@Override
public long getDelay(final TimeUnit unit) {
long delayMs = expiration - System.currentTimeMillis();
return Math.max(0, unit.convert(delayMs, TimeUnit.MILLISECONDS));
}

@Override
public int compareTo(final Delayed o) {
return o instanceof TimeSlot ? Long.compare(expiration, ((TimeSlot) o).expiration) : 0;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.core.util.time;

/**
* Represents a task that can be executed at a specific time.
*/
public interface TimeTask extends Runnable {

/**
* Retrieves the name of the task.
*
* @return A {@code String} representing the name of the task.
*/
String getName();

/**
* Retrieves the scheduled execution time for the task. The time is expected to be a specific
* point in time, represented as a long value, such as a timestamp.
*
* @return A {@code long} value representing the scheduled execution time of the task.
*/
long getTime();
}

Loading

0 comments on commit 850080f

Please sign in to comment.