This commit is contained in:
Daniel Mills
2021-07-31 19:41:12 -04:00
parent 16de60f82d
commit ee18217520
9 changed files with 257 additions and 58 deletions

View File

@@ -21,34 +21,71 @@ package com.volmit.iris.engine.parallel;
import com.volmit.iris.Iris;
import com.volmit.iris.core.IrisSettings;
import com.volmit.iris.util.collection.KList;
import com.volmit.iris.util.math.M;
import com.volmit.iris.util.scheduling.J;
import com.volmit.iris.util.scheduling.Looper;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
public class MultiBurst {
public static final MultiBurst burst = new MultiBurst("Iris", IrisSettings.get().getConcurrency().getMiscThreadPriority(), IrisSettings.getThreadCount(IrisSettings.get().getConcurrency().getMiscThreadCount()));
private final ExecutorService service;
private ExecutorService syncService;
private ExecutorService service;
private final Looper heartbeat;
private final AtomicLong last;
private int tid;
private final String name;
private final int tc;
private final int priority;
public MultiBurst(int tc) {
this("Iris", 6, tc);
}
public MultiBurst(String name, int priority, int tc) {
service = Executors.newFixedThreadPool(Math.max(tc, 1), r -> {
tid++;
Thread t = new Thread(r);
t.setName(name + " " + tid);
t.setPriority(priority);
t.setUncaughtExceptionHandler((et, e) ->
{
Iris.info("Exception encountered in " + et.getName());
e.printStackTrace();
});
this.name = name;
this.priority = priority;
this.tc = tc;
last = new AtomicLong(M.ms());
heartbeat = new Looper() {
@Override
protected long loop() {
if(M.ms() - last.get() > TimeUnit.MINUTES.toMillis(1) && service != null)
{
service.shutdown();
service = null;
Iris.debug("Shutting down MultiBurst Pool " + getName() + " to conserve resource.");
}
return t;
});
return 60000;
}
};
heartbeat.setName(name);
heartbeat.start();
}
private synchronized ExecutorService getService()
{
last.set(M.ms());
if(service == null || service.isShutdown())
{
service = Executors.newFixedThreadPool(Math.max(tc, 1), r -> {
tid++;
Thread t = new Thread(r);
t.setName(name + " " + tid);
t.setPriority(priority);
t.setUncaughtExceptionHandler((et, e) ->
{
Iris.info("Exception encountered in " + et.getName());
e.printStackTrace();
});
return t;
});
Iris.debug("Started MultiBurst Pool " + name + " with " + tc + " threads at " + priority + " priority.");
}
return service;
}
public void burst(Runnable... r) {
@@ -66,7 +103,7 @@ public class MultiBurst {
}
public BurstExecutor burst(int estimate) {
return new BurstExecutor(service, estimate);
return new BurstExecutor(getService(), estimate);
}
public BurstExecutor burst() {
@@ -74,43 +111,72 @@ public class MultiBurst {
}
public <T> Future<T> lazySubmit(Callable<T> o) {
return service.submit(o);
return getService().submit(o);
}
public void lazy(Runnable o) {
service.execute(o);
getService().execute(o);
}
public Future<?> future(Runnable o) {
return service.submit(o);
return getService().submit(o);
}
public CompletableFuture<?> complete(Runnable o) {
return CompletableFuture.runAsync(o, service);
return CompletableFuture.runAsync(o, getService());
}
public void shutdownNow() {
service.shutdownNow().forEach(Runnable::run);
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
heartbeat.interrupt();
if(service != null)
{
service.shutdownNow().forEach(Runnable::run);
}
}
public void shutdown() {
service.shutdown();
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
heartbeat.interrupt();
if(service != null)
{
service.shutdown();
}
}
public void shutdownLater() {
J.a(service::shutdown, 100);
if(service != null)
{
service.submit(() -> {
J.sleep(3000);
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
if(service != null)
{
service.shutdown();
}
});
heartbeat.interrupt();
}
}
public void shutdownAndAwait() {
service.shutdown();
try {
while (!service.awaitTermination(10, TimeUnit.SECONDS)) {
Iris.info("Still waiting to shutdown burster...");
Iris.debug("Shutting down MultiBurst Pool " + heartbeat.getName() + ".");
heartbeat.interrupt();
if(service != null)
{
service.shutdown();
try {
while (!service.awaitTermination(10, TimeUnit.SECONDS)) {
Iris.info("Still waiting to shutdown burster...");
}
} catch (Throwable e) {
e.printStackTrace();
Iris.reportError(e);
}
} catch (Throwable e) {
e.printStackTrace();
Iris.reportError(e);
}
}
}