From 6eeb129017217752c662befc7071f3a38ef09f29 Mon Sep 17 00:00:00 2001 From: Robert von Burg Date: Mon, 19 Feb 2024 08:50:20 +0100 Subject: [PATCH] [New] Added a ExecutorPool which is accessible in the runtime for async operations Now the input listeners don't create their own executors and/or threads anymore, but use Future tasks to manage the input listener threads. Further the pulseAsync and blinkAsync methods now simply call context.submitTask() with a lambda which calls the relevant non-async blink or pulse methods. --- .../main/java/com/pi4j/context/Context.java | 133 +++++++++--------- .../com/pi4j/context/impl/DefaultContext.java | 11 +- .../src/main/java/com/pi4j/io/IOBase.java | 13 +- .../io/gpio/digital/DigitalOutputBase.java | 89 ++---------- .../main/java/com/pi4j/runtime/Runtime.java | 11 +- .../com/pi4j/runtime/impl/DefaultRuntime.java | 27 +++- .../main/java/com/pi4j/util/ExecutorPool.java | 87 ++++++++++++ .../gpio/digital/GpioDDigitalInput.java | 109 +++++++------- .../gpio/digital/GpioDDigitalOutput.java | 9 +- .../gpio/digital/LinuxFsDigitalInput.java | 86 +++++------ 10 files changed, 316 insertions(+), 259 deletions(-) create mode 100644 pi4j-core/src/main/java/com/pi4j/util/ExecutorPool.java diff --git a/pi4j-core/src/main/java/com/pi4j/context/Context.java b/pi4j-core/src/main/java/com/pi4j/context/Context.java index 5a1c27a2..c5078779 100644 --- a/pi4j-core/src/main/java/com/pi4j/context/Context.java +++ b/pi4j-core/src/main/java/com/pi4j/context/Context.java @@ -51,6 +51,7 @@ import com.pi4j.registry.Registry; import com.pi4j.util.PropertiesUtil; import com.pi4j.util.StringUtil; + import java.util.Map; import java.util.concurrent.Future; @@ -60,11 +61,8 @@ * @author Robert Savage (http://www.savagehomeautomation.com) * @version $Id: $Id */ -public interface Context extends Describable, - IOCreator, - ProviderProvider, - InitializedEventProducer, - ShutdownEventProducer { +public interface Context extends Describable, IOCreator, ProviderProvider, InitializedEventProducer, + ShutdownEventProducer { /** *

config.

@@ -72,24 +70,28 @@ public interface Context extends Describable, * @return a {@link com.pi4j.context.ContextConfig} object. */ ContextConfig config(); + /** *

properties.

* * @return a {@link com.pi4j.context.ContextProperties} object. */ ContextProperties properties(); + /** *

providers.

* * @return a {@link com.pi4j.provider.Providers} object. */ Providers providers(); + /** *

registry.

* * @return a {@link com.pi4j.registry.Registry} object. */ Registry registry(); + /** *

platforms.

* @@ -97,13 +99,22 @@ public interface Context extends Describable, */ Platforms platforms(); + /** + * Submits the given task for async execution + * + * @param task the task to execute asynchronously + * + * @return the task to cancel later + */ + Future submitTask(Runnable task); + /** *

shutdown.

* * @return a {@link com.pi4j.context.Context} object. * @throws com.pi4j.exception.ShutdownException if an error occurs during shutdown. */ - Context shutdown() throws ShutdownException; + Context shutdown() throws ShutdownException; /** * @@ -127,7 +138,7 @@ public interface Context extends Describable, * @param

a P object. * @return a P object. */ - default

P platform(){ + default

P platform() { return platforms().getDefault(); } @@ -137,7 +148,7 @@ default

P platform(){ * @param

a P object. * @return a P object. */ - default

P getPlatform(){ + default

P getPlatform() { return this.platform(); } @@ -147,7 +158,7 @@ default

P getPlatform(){ * @param

a P object. * @return a P object. */ - default

P getDefaultPlatform(){ + default

P getDefaultPlatform() { return this.platform(); } @@ -157,26 +168,26 @@ default

P getDefaultPlatform(){ * @param

a P object. * @return a P object. */ - default

P defaultPlatform(){ + default

P defaultPlatform() { return this.platform(); } /** *

platform.

* - * @param id Id of the platform. + * @param id Id of the platform. * @param

the platform type * @return a P object. * @throws PlatformNotFoundException if platform specified by {@code id} is not found. */ default

P platform(String id) throws PlatformNotFoundException { - return (P)this.platforms().get(id); + return (P) this.platforms().get(id); } /** *

platform.

* - * @param id Id of the platform. + * @param id Id of the platform. * @param

the platform type * @return a P object. * @throws PlatformNotFoundException if platform specified by {@code id} is not found. @@ -237,20 +248,20 @@ default boolean hasPlatform(Class platformClass) throws Plat /** {@inheritDoc} */ default T provider(String providerId) throws ProviderNotFoundException { - return (T)providers().get(providerId); + return (T) providers().get(providerId); } /** {@inheritDoc} */ - default T provider(String providerId, Class providerClass) throws ProviderNotFoundException { - return (T)providers().get(providerId); + default T provider(String providerId, Class providerClass) + throws ProviderNotFoundException { + return (T) providers().get(providerId); } /** {@inheritDoc} */ default boolean hasProvider(String providerId) { try { return providers().exists(providerId); - } - catch (Exception e){ + } catch (Exception e) { return false; } } @@ -266,13 +277,14 @@ default boolean hasProvider(Class providerClass) { } /** {@inheritDoc} */ - default T provider(Class providerClass) throws ProviderNotFoundException, ProviderInterfaceException { + default T provider(Class providerClass) + throws ProviderNotFoundException, ProviderInterfaceException { // return the default provider for this type from the default platform - if(platform() != null && platform().hasProvider(providerClass)) + if (platform() != null && platform().hasProvider(providerClass)) return platform().provider(providerClass); // return the default provider for this type (outside of default platform) - if(providers().exists(providerClass)) + if (providers().exists(providerClass)) return providers().get(providerClass); // provider not found @@ -282,28 +294,27 @@ default T provider(Class providerClass) throws ProviderN /** {@inheritDoc} */ default T provider(IOType ioType) throws ProviderNotFoundException { // return the default provider for this type from the default platform - if(platform() != null && platform().hasProvider(ioType)) + if (platform() != null && platform().hasProvider(ioType)) return platform().provider(ioType); // return the default provider for this type (outside of default platform) - if(providers().exists(ioType)) + if (providers().exists(ioType)) return providers().get(ioType); // provider not found throw new ProviderNotFoundException(ioType); } - // ------------------------------------------------------------------------ // I/O INSTANCE ACCESSOR/CREATOR METHODS // ------------------------------------------------------------------------ @Override - default I create(IOConfig config, IOType ioType) { + default I create(IOConfig config, IOType ioType) { // create by explicitly configured IO from IO config String platformId = config.platform(); - if(StringUtil.isNotNullOrEmpty(platformId)) { + if (StringUtil.isNotNullOrEmpty(platformId)) { // resolve the platform and use it to create the IO instance Platform platform = this.platforms().get(platformId); return platform.create(config, ioType); @@ -311,34 +322,33 @@ default I create(IOConfig config, IOType ioType) { // create by explicitly configured IO from IO config String providerId = config.provider(); - if(StringUtil.isNotNullOrEmpty(providerId)) { + if (StringUtil.isNotNullOrEmpty(providerId)) { // resolve the provider and use it to create the IO instance Provider provider = this.providers().get(providerId, ioType); - return (I)provider.create(config); + return (I) provider.create(config); } // get implicitly defined provider (defined by IO type) // (this is the default or platform defined provider for this particular IO type) - if(ioType != null) { + if (ioType != null) { // resolve the provider and use it to create the IO instance Provider provider = this.provider(ioType); - return (I)provider.create(config); + return (I) provider.create(config); } // unable to resolve the IO type and thus unable to create I/O instance - throw new IOException("This IO instance [" + config.id() + - "] could not be created because it does not define one of the following: 'PLATFORM', 'PROVIDER', or 'I/O TYPE'."); + throw new IOException("This IO instance [" + config.id() + "] could not be created because it does not define one of the following: 'PLATFORM', 'PROVIDER', or 'I/O TYPE'."); } @Override - default T create(String id) { + default T create(String id) { Provider provider = null; // resolve inheritable properties from the context based on the provided 'id' for this IO instance - Map inheritedProperties = PropertiesUtil.subProperties(this.properties().all(), id); + Map inheritedProperties = PropertiesUtil.subProperties(this.properties().all(), id); // create by explicitly configured IO from IO inheritable properties - if(inheritedProperties.containsKey("platform")){ + if (inheritedProperties.containsKey("platform")) { // resolve the platform and use it to create the IO instance String platformId = inheritedProperties.get("platform"); Platform platform = this.platforms().get(platformId); @@ -346,7 +356,7 @@ default T create(String id) { } // create by explicitly configured IO from IO config - if(inheritedProperties.containsKey("provider")){ + if (inheritedProperties.containsKey("provider")) { String providerId = inheritedProperties.get("provider"); // resolve the provider and use it to create the IO instance provider = this.providers().get(providerId); @@ -354,13 +364,13 @@ default T create(String id) { // create by IO TYPE // (use platform provider if one if available for this IO type) - if(provider == null && inheritedProperties.containsKey("type")){ + if (provider == null && inheritedProperties.containsKey("type")) { IOType ioType = IOType.parse(inheritedProperties.get("type")); provider = provider(ioType); } // validate resolved provider - if(provider == null) { + if (provider == null) { // unable to resolve the IO type and thus unable to create I/O instance throw new IOException("This IO instance [" + id + "] could not be created because it does not define one of the following: 'PLATFORM', 'PROVIDER', or 'I/O TYPE'."); @@ -370,18 +380,18 @@ default T create(String id) { ConfigBuilder builder = provider.type().newConfigBuilder(this); builder.id(id); builder.load(inheritedProperties); - return (T)provider.create((Config) builder.build()); + return (T) provider.create((Config) builder.build()); } @Override - default T create(String id, IOType ioType) { + default T create(String id, IOType ioType) { Provider provider = null; // resolve inheritable properties from the context based on the provided 'id' for this IO instance - Map inheritedProperties = PropertiesUtil.subProperties(this.properties().all(), id); + Map inheritedProperties = PropertiesUtil.subProperties(this.properties().all(), id); // create by explicitly configured IO from IO inheritable properties - if(inheritedProperties.containsKey("platform")){ + if (inheritedProperties.containsKey("platform")) { // resolve the platform and use it to create the IO instance String platformId = inheritedProperties.get("platform"); Platform platform = this.platforms().get(platformId); @@ -389,13 +399,13 @@ default T create(String id, IOType ioType) { } // create by explicitly configured IO from IO config - if(inheritedProperties.containsKey("provider")){ + if (inheritedProperties.containsKey("provider")) { String providerId = inheritedProperties.get("provider"); // resolve the provider and use it to create the IO instance provider = this.providers().get(providerId, ioType); // validate IO type from resolved provider - if(!ioType.isType(provider.type())){ + if (!ioType.isType(provider.type())) { throw new IOException("This IO instance [" + id + "] could not be created because the resolved provider [" + providerId + "] does not match the required I/O TYPE [" + ioType.name() + "]"); @@ -407,7 +417,7 @@ default T create(String id, IOType ioType) { provider = provider(ioType); // validate resolved provider - if(provider == null) { + if (provider == null) { throw new ProviderNotFoundException(ioType); } @@ -415,20 +425,20 @@ default T create(String id, IOType ioType) { ConfigBuilder builder = provider.type().newConfigBuilder(this); builder.id(id); builder.load(inheritedProperties); - return (T)provider.create((Config) builder.build()); - } - - /** - * shutdown and unregister a created IO. - * - * @param the IO Type - * @param id the IO id - * @return the IO which was shutdown or null if it wasn't created/registered - * @throws IONotFoundException if the IO was not registered - * @throws IOInvalidIDException if the ID is invalid - * @throws IOShutdownException if an error occured while shuting down the IO - */ - T shutdown(String id) throws IOInvalidIDException, IONotFoundException, IOShutdownException; + return (T) provider.create((Config) builder.build()); + } + + /** + * shutdown and unregister a created IO. + * + * @param the IO Type + * @param id the IO id + * @return the IO which was shutdown or null if it wasn't created/registered + * @throws IONotFoundException if the IO was not registered + * @throws IOInvalidIDException if the ID is invalid + * @throws IOShutdownException if an error occured while shuting down the IO + */ + T shutdown(String id) throws IOInvalidIDException, IONotFoundException, IOShutdownException; // ------------------------------------------------------------------------ // I/O INSTANCE ACCESSORS @@ -464,10 +474,7 @@ default T getIO(String id, Class ioClass) throws IOInvalidIDEx * @return a {@link com.pi4j.common.Descriptor} object. */ default Descriptor describe() { - Descriptor descriptor = Descriptor.create() - .category("CONTEXT") - .name("Runtime Context") - .type(this.getClass()); + Descriptor descriptor = Descriptor.create().category("CONTEXT").name("Runtime Context").type(this.getClass()); descriptor.add(registry().describe()); descriptor.add(platforms().describe()); diff --git a/pi4j-core/src/main/java/com/pi4j/context/impl/DefaultContext.java b/pi4j-core/src/main/java/com/pi4j/context/impl/DefaultContext.java index 2c5423b5..af74d71f 100644 --- a/pi4j-core/src/main/java/com/pi4j/context/impl/DefaultContext.java +++ b/pi4j-core/src/main/java/com/pi4j/context/impl/DefaultContext.java @@ -28,8 +28,6 @@ import com.pi4j.context.Context; import com.pi4j.context.ContextConfig; import com.pi4j.context.ContextProperties; -import com.pi4j.context.impl.DefaultContext; -import com.pi4j.context.impl.DefaultContextProperties; import com.pi4j.event.InitializedListener; import com.pi4j.event.ShutdownListener; import com.pi4j.exception.LifecycleException; @@ -43,10 +41,11 @@ import com.pi4j.registry.impl.DefaultRegistry; import com.pi4j.runtime.Runtime; import com.pi4j.runtime.impl.DefaultRuntime; -import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Future; + /** *

DefaultContext class.

* @@ -129,6 +128,12 @@ public ContextProperties properties() { @Override public Platforms platforms() { return this.platforms; } + /** {@inheritDoc} */ + @Override + public Future submitTask(Runnable task) { + return this.runtime.submitTask(task); + } + /** {@inheritDoc} */ @Override public Context shutdown() throws ShutdownException { diff --git a/pi4j-core/src/main/java/com/pi4j/io/IOBase.java b/pi4j-core/src/main/java/com/pi4j/io/IOBase.java index ff833aa7..b8e1181e 100644 --- a/pi4j-core/src/main/java/com/pi4j/io/IOBase.java +++ b/pi4j-core/src/main/java/com/pi4j/io/IOBase.java @@ -42,8 +42,9 @@ public abstract class IOBase extends IdentityBase implements IO { - protected CONFIG_TYPE config = null; - protected PROVIDER_TYPE provider = null; + protected CONFIG_TYPE config; + protected PROVIDER_TYPE provider; + private Context context; /** {@inheritDoc} */ @Override @@ -83,17 +84,21 @@ public CONFIG_TYPE config(){ return this.config; } + protected Context context() { + return this.context; + } /** {@inheritDoc} */ @Override public IO_TYPE initialize(Context context) throws InitializeException { - return (IO_TYPE)this; + this.context = context; + return (IO_TYPE) this; } /** {@inheritDoc} */ @Override public IO_TYPE shutdown(Context context) throws ShutdownException { - return (IO_TYPE)this; + return (IO_TYPE) this; } /** {@inheritDoc} */ diff --git a/pi4j-core/src/main/java/com/pi4j/io/gpio/digital/DigitalOutputBase.java b/pi4j-core/src/main/java/com/pi4j/io/gpio/digital/DigitalOutputBase.java index 20e418e1..1fce9aeb 100644 --- a/pi4j-core/src/main/java/com/pi4j/io/gpio/digital/DigitalOutputBase.java +++ b/pi4j-core/src/main/java/com/pi4j/io/gpio/digital/DigitalOutputBase.java @@ -31,10 +31,7 @@ import com.pi4j.io.exception.IOException; import java.util.concurrent.Callable; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -80,7 +77,7 @@ public DigitalOutput state(DigitalState state) throws IOException { if(!this.state.equals(state)){ this.state = state; - this.dispatch(new DigitalStateChangeEvent(this, this.state)); + this.dispatch(new DigitalStateChangeEvent<>(this, this.state)); } return this; } @@ -122,40 +119,10 @@ public DigitalOutput pulse(int interval, TimeUnit unit, DigitalState state, Call /** {@inheritDoc} */ @Override public Future pulseAsync(int interval, TimeUnit unit, DigitalState state, Callable callback) { - - long millis = validateArguments(interval, unit); - - this.state(state); - - ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(); - Future scheduledFuture = ses.schedule(new Runnable() { - public void run() { - ses.shutdown(); - - // toggle the pulse state - toggle(); - - // invoke callback if one was defined - if (callback != null) { - try { - logger.info("Calling callback from non-blocking pulse() method"); - callback.call(); - } - catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - } - } - , millis - , TimeUnit.MILLISECONDS - ); - - - return scheduledFuture; + validateArguments(interval, unit); + return context().submitTask(() -> pulse(interval, unit, state, callback)); } - /** {@inheritDoc} */ /** * This method will blink an output pin of the RPi according the given specifications. * The pin itself is created while creating a DigitalOutput configuration where one of @@ -237,11 +204,10 @@ public DigitalOutput blink(int delay, int duration, TimeUnit unit, DigitalState return this; } - /** {@inheritDoc} */ /** - * This method is exactly the same as the blink() method, except that this method is non-blocking. + * This method is exactly the same as the blink() method, except that this method is non-blocking and returns a {@link Future} with which the action can be cancelled, or it can be detected if the task is complete *

- * Pls. see the {@link #blink(int, int, java.util.concurrent.TimeUnit, com.pi4j.io.gpio.digital.DigitalState, java.util.concurrent.Callable) blink()} + * See the {@link #blink(int, int, java.util.concurrent.TimeUnit, com.pi4j.io.gpio.digital.DigitalState, java.util.concurrent.Callable) blink()} * method for a more detailed explanation on how the method works. * * @param delay The toggle time. @@ -253,41 +219,8 @@ public DigitalOutput blink(int delay, int duration, TimeUnit unit, DigitalState */ @Override public Future blinkAsync(int delay, int duration, TimeUnit unit, DigitalState state, Callable callback) { - - long millis = validateArguments(delay, duration, unit); - - this.state(state); - - ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(); - Future scheduledFuture = ses.scheduleAtFixedRate(new Runnable() { - int count = 0; - - public void run() { - count++; - if (count >= (duration * 2)) { - ses.shutdown(); - if (callback != null) { - try { - logger.info("Calling callback from non-blocking blink() method"); - callback.call(); - } - catch (Exception e) { - logger.error(e.getMessage(), e); - } - } - } - else { - // toggle the pulse state - toggle(); - } - } - } - , millis - , millis - , TimeUnit.MILLISECONDS - ); - - return scheduledFuture; + validateArguments(delay, duration, unit); + return context().submitTask(() -> blink(delay, duration, unit, state, callback)); } /** {@inheritDoc} */ @@ -299,7 +232,7 @@ public DigitalState state() { /** {@inheritDoc} */ @Override public DigitalOutput shutdown(Context context) throws ShutdownException { - // set pin state to shutdown state if a shutdown state is configured + // set pin state to the shutdown state if a shutdown state is configured if(config().shutdownState() != null && config().shutdownState() != DigitalState.UNKNOWN){ try { state(config().shutdownState()); @@ -400,18 +333,14 @@ private long validateArguments(int interval, int duration, TimeUnit unit) { * @return Number of milliseconds. */ private long validateTimeUnit(int interval, TimeUnit unit) { - long millis = 0; - + long millis; switch (unit) { case NANOSECONDS: throw new IllegalArgumentException("TimeUnit.NANOSECONDS is not supported."); - case MICROSECONDS: throw new IllegalArgumentException("TimeUnit.MICROSECONDS is not supported."); - case DAYS: throw new IllegalArgumentException("TimeUnit.DAYS is not supported."); - default: millis = unit.toMillis(interval); break; diff --git a/pi4j-core/src/main/java/com/pi4j/runtime/Runtime.java b/pi4j-core/src/main/java/com/pi4j/runtime/Runtime.java index 639e7ff3..acda7219 100644 --- a/pi4j-core/src/main/java/com/pi4j/runtime/Runtime.java +++ b/pi4j-core/src/main/java/com/pi4j/runtime/Runtime.java @@ -10,7 +10,7 @@ * This file is part of the Pi4J project. More information about * this project can be found here: https://pi4j.com/ * ********************************************************************** - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -49,24 +49,28 @@ public interface Runtime extends InitializedEventProducer, ShutdownEven * @return a {@link com.pi4j.registry.impl.RuntimeRegistry} object. */ RuntimeRegistry registry(); + /** *

providers.

* * @return a {@link com.pi4j.provider.impl.RuntimeProviders} object. */ RuntimeProviders providers(); + /** *

platforms.

* * @return a {@link com.pi4j.platform.impl.RuntimePlatforms} object. */ RuntimePlatforms platforms(); + /** *

properties.

* * @return a {@link com.pi4j.runtime.RuntimeProperties} object. */ RuntimeProperties properties(); + /** *

context.

* @@ -74,10 +78,13 @@ public interface Runtime extends InitializedEventProducer, ShutdownEven */ Context context(); + Future submitTask(Runnable task); + /** *

shutdown.

* * @return a {@link com.pi4j.runtime.Runtime} object. + * * @throws com.pi4j.exception.ShutdownException if any. */ Runtime shutdown() throws ShutdownException; @@ -85,7 +92,6 @@ public interface Runtime extends InitializedEventProducer, ShutdownEven Future asyncShutdown(); /** - * * @return Flag indicating if the runtime has been shutdown */ boolean isShutdown(); @@ -94,6 +100,7 @@ public interface Runtime extends InitializedEventProducer, ShutdownEven *

initialize.

* * @return a {@link com.pi4j.runtime.Runtime} object. + * * @throws com.pi4j.exception.InitializeException if any. */ Runtime initialize() throws InitializeException; diff --git a/pi4j-core/src/main/java/com/pi4j/runtime/impl/DefaultRuntime.java b/pi4j-core/src/main/java/com/pi4j/runtime/impl/DefaultRuntime.java index daee6059..bb4fa07a 100644 --- a/pi4j-core/src/main/java/com/pi4j/runtime/impl/DefaultRuntime.java +++ b/pi4j-core/src/main/java/com/pi4j/runtime/impl/DefaultRuntime.java @@ -44,13 +44,14 @@ import com.pi4j.registry.impl.RuntimeRegistry; import com.pi4j.runtime.Runtime; import com.pi4j.runtime.RuntimeProperties; +import com.pi4j.util.ExecutorPool; import com.pi4j.util.PropertiesUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -67,11 +68,12 @@ public class DefaultRuntime implements Runtime { private final RuntimeProviders providers; private final RuntimePlatforms platforms; private final RuntimeProperties properties; - private final List plugins = new ArrayList<>(); - private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "Pi4J.RUNTIME")); + private final List plugins; private boolean isShutdown = false; private final EventManager shutdownEventManager; private final EventManager initializedEventManager; + private final ExecutorPool executorPool; + private final ExecutorService runtimeExecutor; /** *

newInstance.

@@ -89,15 +91,21 @@ private DefaultRuntime(Context context) { // set local references this.context = context; + plugins = new ArrayList<>(); this.properties = DefaultRuntimeProperties.newInstance(context); this.registry = DefaultRuntimeRegistry.newInstance(this); this.providers = DefaultRuntimeProviders.newInstance(this); this.platforms = DefaultRuntimePlatforms.newInstance(this); + this.shutdownEventManager = new EventManager(this, (EventDelegate) (listener, event) -> listener.onShutdown(event)); this.initializedEventManager = new EventManager(this, (EventDelegate) (listener, event) -> listener.onInitialized(event)); + // initialize executor pool and runtime executor + this.executorPool = new ExecutorPool(); + this.runtimeExecutor = this.executorPool.getExecutor("Pi4J.RUNTIME"); + logger.debug("Pi4J runtime context successfully created & initialized.'"); // listen for shutdown to properly clean up @@ -108,7 +116,7 @@ private DefaultRuntime(Context context) { if (!isShutdown) shutdown(); } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("Failed to shutdown Pi4J runtime", e); } }, "pi4j-shutdown")); } @@ -153,6 +161,11 @@ public RuntimeProperties properties() { return this.properties; } + @Override + public Future submitTask(Runnable task) { + return this.runtimeExecutor.submit(task); + } + /** * {@inheritDoc} */ @@ -170,6 +183,10 @@ public Runtime shutdown() throws ShutdownException { shutdownEventManager.dispatch(new ShutdownEvent(this.context), ShutdownListener::beforeShutdown); try { + + // shutdown executor pool + this.executorPool.destroy(); + // remove shutdown monitoring thread //java.lang.Runtime.getRuntime().removeShutdownHook(this.shutdownThread); @@ -208,7 +225,7 @@ public Runtime shutdown() throws ShutdownException { @Override public Future asyncShutdown() { - return executor.submit(() -> { + return CompletableFuture.supplyAsync(() -> { try { shutdown(); } catch (Exception e) { diff --git a/pi4j-core/src/main/java/com/pi4j/util/ExecutorPool.java b/pi4j-core/src/main/java/com/pi4j/util/ExecutorPool.java new file mode 100644 index 00000000..e1ea803e --- /dev/null +++ b/pi4j-core/src/main/java/com/pi4j/util/ExecutorPool.java @@ -0,0 +1,87 @@ +package com.pi4j.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.Executors.*; + +public class ExecutorPool { + + private static final Logger logger = LoggerFactory.getLogger(ExecutorPool.class); + + private final Map executors; + private final Map scheduledExecutors; + + public ExecutorPool() { + this.executors = new ConcurrentHashMap<>(); + this.scheduledExecutors = new ConcurrentHashMap<>(); + } + + public ExecutorService getExecutor(String poolName) { + if (poolName == null || poolName.isEmpty()) + throw new IllegalStateException("poolName must be set!"); + return this.executors.computeIfAbsent(poolName, p -> newCachedThreadPool(new NamedThreadPoolFactory(p))); + } + + public ExecutorService getSingleThreadExecutor(String poolName) { + if (poolName == null || poolName.isEmpty()) + throw new IllegalStateException("poolName must be set!"); + return this.executors.computeIfAbsent(poolName, p -> newSingleThreadExecutor(new NamedThreadPoolFactory(p))); + } + + public ScheduledExecutorService getScheduledExecutor(String poolName) { + if (poolName == null || poolName.isEmpty()) + throw new IllegalStateException("poolName must be set!"); + return this.scheduledExecutors.computeIfAbsent(poolName, + p -> newScheduledThreadPool(4, new NamedThreadPoolFactory(p))); + } + + public void destroy() { + this.executors.forEach(this::shutdownExecutor); + this.scheduledExecutors.forEach(this::shutdownExecutor); + } + + private void shutdownExecutor(String name, ExecutorService executor) { + logger.info("Shutting down executor pool " + name); + try { + List tasks = executor.shutdownNow(); + if (!tasks.isEmpty()) { + logger.warn("The following " + tasks.size() + " tasks were never started for executor " + name + " :"); + for (Runnable runnable : tasks) { + logger.warn(" " + runnable); + } + } + + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) + logger.error("Executor " + name + " did not stop after " + 5 + "s!"); + } catch (InterruptedException e) { + logger.error("Was interrupted while shutting down tasks"); + } + } + + private static class NamedThreadPoolFactory implements ThreadFactory { + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String poolName; + + public NamedThreadPoolFactory(String poolName) { + this.group = Thread.currentThread().getThreadGroup(); + this.poolName = poolName + "-"; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(this.group, r, this.poolName + this.threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + } +} diff --git a/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalInput.java b/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalInput.java index baf13737..c07028be 100644 --- a/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalInput.java +++ b/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalInput.java @@ -11,9 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; /** *

PiGpioDigitalOutput class.

@@ -22,23 +20,23 @@ * @version $Id: $Id */ public class GpioDDigitalInput extends DigitalInputBase implements DigitalInput { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private static final Logger logger = LoggerFactory.getLogger(GpioDDigitalInput.class); private final long inputMaxWaitNs = 500 * 1000 * 1000; // 0,5 seconds - protected ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "Pi4J.GPIO.Monitor")); private final GpioLine line; private final long debounceNs; + private Future inputListener; /** *

Constructor for GpioDDigitalInput.

* - * @param line a {@link com.pi4j.library.gpiod.internal.GpioLine} object. + * @param line a {@link com.pi4j.library.gpiod.internal.GpioLine} object. * @param provider a {@link DigitalInputProvider} object. * @param config a {@link DigitalInputConfig} object. */ public GpioDDigitalInput(GpioLine line, DigitalInputProvider provider, DigitalInputConfig config) { super(provider, config); this.line = line; - if(config.getDebounce() == 0) { + if (config.getDebounce() == 0) { debounceNs = 0; } else { // Convert microseconds to nanoseconds @@ -54,73 +52,81 @@ public DigitalInput initialize(Context context) throws InitializeException { this.line.requestBothEdgeEventsFlags(config.getId(), GpioD.LINE_REQUEST_FLAG.BIAS_PULL_UP.getVal()); break; case PULL_DOWN: - this.line.requestBothEdgeEventsFlags(config.getId(), GpioD.LINE_REQUEST_FLAG.BIAS_PULL_DOWN.getVal()); + this.line.requestBothEdgeEventsFlags(config.getId(), + GpioD.LINE_REQUEST_FLAG.BIAS_PULL_DOWN.getVal()); break; case OFF: this.line.requestBothEdgeEventsFlags(config.getId(), GpioD.LINE_REQUEST_FLAG.BIAS_DISABLE.getVal()); break; } } catch (GpioDException e) { - logger.error(e.getMessage(), e); - throw new InitializeException(e); + throw new InitializeException("Failed to initialize input " + this.id, e); } super.initialize(context); - Runnable monitorThread = new Runnable() { - @Override - public void run() { - DigitalState lastState = null; - while (!Thread.interrupted()) { - long debounceNs = GpioDDigitalInput.this.debounceNs; - // We have to use this function before calling eventRead() directly, since native methods can't be interrupted. - // eventRead() is blocking and prevents thread interrupt while running - while (!GpioDDigitalInput.this.line.eventWait(inputMaxWaitNs)) { - if (Thread.interrupted()) { - return; - } + Runnable monitorThread = () -> { + DigitalState lastState = null; + while (!Thread.interrupted()) { + long debounceNs = GpioDDigitalInput.this.debounceNs; + // We have to use this function before calling eventRead() directly, since native methods can't be interrupted. + // eventRead() is blocking and prevents thread interrupt while running + while (!GpioDDigitalInput.this.line.eventWait(inputMaxWaitNs)) { + if (Thread.interrupted()) { + return; } - GpioLineEvent lastEvent = GpioDDigitalInput.this.line.eventRead(); - long currentTime = System.nanoTime(); - - // Perform debouncing - // If the event is too new to be sure that it is debounced then ... - while (lastEvent.getTimeNs() + debounceNs >= currentTime) { - if (Thread.interrupted()) { - return; - } - // ... wait for remaining debounce time and watch out for new event(s) - if(GpioDDigitalInput.this.line.eventWait(Math.min(inputMaxWaitNs, lastEvent.getTimeNs() + debounceNs - currentTime))) { - // Repeat if a second event occurred withing debounce interval - lastEvent = GpioDDigitalInput.this.line.eventRead(); - } + } + GpioLineEvent lastEvent = GpioDDigitalInput.this.line.eventRead(); + long currentTime = System.nanoTime(); - currentTime = System.nanoTime(); + // Perform debouncing + // If the event is too new to be sure that it is debounced then ... + while (lastEvent.getTimeNs() + debounceNs >= currentTime) { + if (Thread.interrupted()) { + return; } - - // Apply event only if the new state is not the same as the last state. - DigitalState newState = DigitalState.getState(lastEvent.getType() == GpioD.LINE_EVENT.RISING_EDGE); - if(lastState != newState) { - lastState = newState; - GpioDDigitalInput.this.dispatch(new DigitalStateChangeEvent(GpioDDigitalInput.this, newState)); + // ... wait for remaining debounce time and watch out for new event(s) + if (GpioDDigitalInput.this.line.eventWait( + Math.min(inputMaxWaitNs, lastEvent.getTimeNs() + debounceNs - currentTime))) { + // Repeat if a second event occurred withing debounce interval + lastEvent = GpioDDigitalInput.this.line.eventRead(); } + + currentTime = System.nanoTime(); + } + + // Apply event only if the new state is not the same as the last state. + DigitalState newState = DigitalState.getState(lastEvent.getType() == GpioD.LINE_EVENT.RISING_EDGE); + if (lastState != newState) { + lastState = newState; + GpioDDigitalInput.this.dispatch(new DigitalStateChangeEvent<>(GpioDDigitalInput.this, newState)); } } }; - executor.submit(monitorThread); + this.inputListener = context.submitTask(monitorThread); return this; } @Override public DigitalInput shutdown(Context context) throws ShutdownException { - try { - super.shutdown(context); - executor.shutdownNow(); - executor.awaitTermination(5, TimeUnit.SECONDS); - this.line.release(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + super.shutdown(context); + if (this.inputListener != null) { + if (!this.inputListener.cancel(true)) + logger.error("Failed to cancel input listener!"); + long start = System.currentTimeMillis(); + while (!inputListener.isDone()) { + if (System.currentTimeMillis() - start > 5000L) + throw new IllegalArgumentException("Input listener didn't cancel in 5s"); + try { + Thread.sleep(1L); + } catch (InterruptedException e) { + logger.warn("Interrupted, while waiting for input listener to stop"); + } + } + logger.info("Shutdown input listener for " + this.id); } + + this.line.release(); return this; } @@ -128,5 +134,4 @@ public DigitalInput shutdown(Context context) throws ShutdownException { public DigitalState state() { return DigitalState.getState(this.line.getValue()); } - } diff --git a/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalOutput.java b/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalOutput.java index f8953fc9..06441bb3 100644 --- a/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalOutput.java +++ b/plugins/pi4j-plugin-gpiod/src/main/java/com/pi4j/plugin/gpiod/provider/gpio/digital/GpioDDigitalOutput.java @@ -34,8 +34,6 @@ import com.pi4j.io.gpio.digital.*; import com.pi4j.library.gpiod.internal.GpioDException; import com.pi4j.library.gpiod.internal.GpioLine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** *

PiGpioDigitalOutput class.

@@ -44,7 +42,6 @@ * @version $Id: $Id */ public class GpioDDigitalOutput extends DigitalOutputBase implements DigitalOutput { - private Logger logger = LoggerFactory.getLogger(this.getClass()); private final GpioLine line; /** @@ -72,8 +69,7 @@ public DigitalOutput initialize(Context context) throws InitializeException { initialState = config.initialState().value().intValue(); this.line.requestOutput(config.getId(), initialState); } catch (GpioDException e) { - logger.error(e.getMessage(), e); - throw new InitializeException(e); + throw new InitializeException("Failed to initialize output " + this.id, e); } super.initialize(context); return this; @@ -94,8 +90,7 @@ public DigitalOutput state(DigitalState state) throws IOException { try { this.line.setValue(state.value().intValue()); } catch (GpioDException e) { - logger.error(e.getMessage(), e); - throw new IOException(e.getMessage(), e); + throw new IOException("Failed to set state for output " + this.id + " to " + state, e); } return super.state(state); } diff --git a/plugins/pi4j-plugin-linuxfs/src/main/java/com/pi4j/plugin/linuxfs/provider/gpio/digital/LinuxFsDigitalInput.java b/plugins/pi4j-plugin-linuxfs/src/main/java/com/pi4j/plugin/linuxfs/provider/gpio/digital/LinuxFsDigitalInput.java index 8f6e4125..0444c296 100644 --- a/plugins/pi4j-plugin-linuxfs/src/main/java/com/pi4j/plugin/linuxfs/provider/gpio/digital/LinuxFsDigitalInput.java +++ b/plugins/pi4j-plugin-linuxfs/src/main/java/com/pi4j/plugin/linuxfs/provider/gpio/digital/LinuxFsDigitalInput.java @@ -37,8 +37,7 @@ import org.slf4j.LoggerFactory; import java.nio.file.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** *

LinuxFsDigitalInput class.

@@ -50,8 +49,8 @@ public class LinuxFsDigitalInput extends DigitalInputBase implements DigitalInpu protected final LinuxGpio gpio; private Logger logger = LoggerFactory.getLogger(this.getClass()); - protected ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "Pi4J.GPIO.Monitor")); protected DigitalState state = DigitalState.UNKNOWN; + private Future inputListener; /** *

Constructor for LinuxFsDigitalInput.

@@ -123,50 +122,48 @@ else if(this.config.pull() == PullResistance.OFF){ // [MONITOR] start background monitoring thread for GPIO state changes logger.trace("start monitoring thread for GPIO [" + this.config.address() + "]; " + gpio.getPinPath()); - executor.submit(monitorThread); + Runnable monitorTask = () -> { + try { + // create file system watcher + logger.trace("monitoring thread watching GPIO [" + LinuxFsDigitalInput.this.config.address() + "]; " + gpio.getPinPath()); + WatchService watchService = FileSystems.getDefault().newWatchService(); + WatchKey key; + + // create GPIO path to monitor + Path path = Paths.get(gpio.getPinPath()); + + // only watch for modified files in this path + path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); + + // dispatch value change event + while ((key = watchService.take()) != null) { + for (WatchEvent event : key.pollEvents()) { + if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { + if (event.context().toString().equalsIgnoreCase("value")) { + // filter out any redundant event notifications for same state + DigitalState newState = LinuxFsDigitalInput.this.gpio.state(); + if (newState != LinuxFsDigitalInput.this.state) { + LinuxFsDigitalInput.this.state = newState; + LinuxFsDigitalInput.this.dispatch( + new DigitalStateChangeEvent(LinuxFsDigitalInput.this, newState)); + } + } + } + } + key.reset(); + } + } catch (java.io.IOException e) { + logger.error(e.getMessage(), e); + } catch (InterruptedException e) { + // thread interrupted; likely exiting on shutdown + } + }; + this.inputListener = context.submitTask(monitorTask); // return this I/O instance return this; } - Runnable monitorThread = new Runnable() { - public void run() { - try { - // create file system watcher - logger.trace("monitoring thread watching GPIO [" + LinuxFsDigitalInput.this.config.address() + "]; " + gpio.getPinPath()); - WatchService watchService = FileSystems.getDefault().newWatchService(); - WatchKey key; - - // create GPIO path to monitor - Path path = Paths.get(gpio.getPinPath()); - - // only watch for modified files in this path - path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY); - - // dispatch value change event - while ((key = watchService.take()) != null) { - for (WatchEvent event : key.pollEvents()) { - if(event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { - if (event.context().toString().equalsIgnoreCase("value")) { - // filter out any redundant event notifications for same state - DigitalState newState = LinuxFsDigitalInput.this.gpio.state(); - if(newState != LinuxFsDigitalInput.this.state) { - LinuxFsDigitalInput.this.state = newState; - LinuxFsDigitalInput.this.dispatch(new DigitalStateChangeEvent(LinuxFsDigitalInput.this, newState)); - } - } - } - } - key.reset(); - } - } catch (java.io.IOException e) { - logger.error(e.getMessage(), e); - } catch (InterruptedException e) { - // thread interrupted; likely exiting on shutdown - } - } - }; - /** {@inheritDoc} */ @Override public DigitalInput shutdown(Context context) throws ShutdownException { @@ -174,7 +171,10 @@ public DigitalInput shutdown(Context context) throws ShutdownException { // this line will execute immediately, not waiting for your task to complete logger.trace("shutdown monitoring thread for GPIO [" + this.config.address() + "]; " + gpio.getPinPath()); - executor.shutdown(); // tell executor no more work is coming + if (this.inputListener != null) { + if (!this.inputListener.cancel(true)) + logger.error("Failed to cancel input listener!"); + } // perform any shutdown cleanup via superclass super.shutdown(context);