Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
SgtSilvio committed Nov 15, 2023
1 parent c197fa7 commit 9627f41
Show file tree
Hide file tree
Showing 60 changed files with 375 additions and 320 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public interface Mqtt3ReactorClient extends Mqtt3Client {
* Fluent counterpart of {@link #subscribe(Mqtt3Subscribe)}.
* <p>
* Calling {@link Mqtt3SubscribeBuilder.Nested.Complete#applySubscribe()} on the returned builder has the same
* effect as calling {@link #subscribe(Mqtt3Subscribe)} with the result of {@link
* Mqtt3SubscribeBuilder.Complete#build()}.
* effect as calling {@link #subscribe(Mqtt3Subscribe)} with the result of
* {@link Mqtt3SubscribeBuilder.Complete#build()}.
*
* @return the fluent builder for the Subscribe message.
* @see #subscribe(Mqtt3Subscribe)
Expand Down Expand Up @@ -204,8 +204,8 @@ public interface Mqtt3ReactorClient extends Mqtt3Client {
* Fluent counterpart of {@link #subscribePublishes(Mqtt3Subscribe, boolean)}.
* <p>
* Calling {@link Mqtt3SubscribeBuilder.Nested.Complete#applySubscribe()} on the returned builder has the same
* effect as calling {@link #subscribePublishes(Mqtt3Subscribe)} with the result of {@link
* Mqtt3SubscribeBuilder.Complete#build()}.
* effect as calling {@link #subscribePublishes(Mqtt3Subscribe)} with the result of
* {@link Mqtt3SubscribeBuilder.Complete#build()}.
*
* @return the fluent builder for the Subscribe message.
* @see #subscribePublishes(Mqtt3Subscribe, boolean)
Expand Down Expand Up @@ -276,8 +276,8 @@ public interface Mqtt3ReactorClient extends Mqtt3Client {
* Fluent counterpart of {@link #unsubscribe(Mqtt3Unsubscribe)}.
* <p>
* Calling {@link Mqtt3UnsubscribeBuilder.Nested.Complete#applyUnsubscribe()} on the returned builder has the same
* effect as calling {@link #unsubscribe(Mqtt3Unsubscribe)} with the result of {@link
* Mqtt3UnsubscribeBuilder.Complete#build()}.
* effect as calling {@link #unsubscribe(Mqtt3Unsubscribe)} with the result of
* {@link Mqtt3UnsubscribeBuilder.Complete#build()}.
*
* @return the fluent builder for the Unsubscribe message.
* @see #unsubscribe(Mqtt3Unsubscribe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public interface Mqtt5ReactorClient extends Mqtt5Client {
* Fluent counterpart of {@link #subscribe(Mqtt5Subscribe)}.
* <p>
* Calling {@link Mqtt5SubscribeBuilder.Nested.Complete#applySubscribe()} on the returned builder has the same
* effect as calling {@link #subscribe(Mqtt5Subscribe)} with the result of {@link
* Mqtt5SubscribeBuilder.Complete#build()}.
* effect as calling {@link #subscribe(Mqtt5Subscribe)} with the result of
* {@link Mqtt5SubscribeBuilder.Complete#build()}.
*
* @return the fluent builder for the Subscribe message.
* @see #subscribe(Mqtt5Subscribe)
Expand Down Expand Up @@ -206,8 +206,8 @@ public interface Mqtt5ReactorClient extends Mqtt5Client {
* Fluent counterpart of {@link #subscribePublishes(Mqtt5Subscribe, boolean)}.
* <p>
* Calling {@link Mqtt5SubscribeBuilder.Nested.Complete#applySubscribe()} on the returned builder has the same
* effect as calling {@link #subscribePublishes(Mqtt5Subscribe)} with the result of {@link
* Mqtt5SubscribeBuilder.Complete#build()}.
* effect as calling {@link #subscribePublishes(Mqtt5Subscribe)} with the result of
* {@link Mqtt5SubscribeBuilder.Complete#build()}.
*
* @return the fluent builder for the Subscribe message.
* @see #subscribePublishes(Mqtt5Subscribe, boolean)
Expand Down Expand Up @@ -282,8 +282,8 @@ public interface Mqtt5ReactorClient extends Mqtt5Client {
* Fluent counterpart of {@link #unsubscribe(Mqtt5Unsubscribe)}.
* <p>
* Calling {@link Mqtt5UnsubscribeBuilder.Nested.Complete#applyUnsubscribe()} on the returned builder has the same
* effect as calling {@link #unsubscribe(Mqtt5Unsubscribe)} with the result of {@link
* Mqtt5UnsubscribeBuilder.Complete#build()}.
* effect as calling {@link #unsubscribe(Mqtt5Unsubscribe)} with the result of
* {@link Mqtt5UnsubscribeBuilder.Complete#build()}.
*
* @return the fluent builder for the Unsubscribe message.
* @see #unsubscribe(Mqtt5Unsubscribe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
public interface CorePublisherWithSingle<T, S> extends PublisherWithSingle<T, S>, CorePublisher<T> {

/**
* {@link PublisherWithSingle#subscribeBoth(WithSingleSubscriber) Subscribes} to this {@link
* CorePublisherWithSingle}.
* {@link PublisherWithSingle#subscribeBoth(WithSingleSubscriber) Subscribes} to this
* {@link CorePublisherWithSingle}.
* <p>
* In addition to behave as expected by {@link Publisher#subscribe(Subscriber)} in a controlled manner, it supports
* direct subscribe-time {@link reactor.util.context.Context Context} passing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ protected MqttRxClientBuilderBase(final @NotNull MqttRxClientBuilderBase<?> clie
if (autoReconnect == null) {
return disconnectedListenersBuilder.build();
}
return ImmutableList.<MqttDisconnectedListener>builder(disconnectedListenersBuilder.getSize() + 1).add(
autoReconnect).addAll(disconnectedListenersBuilder.build()).build();
return ImmutableList.<MqttDisconnectedListener>builder(disconnectedListenersBuilder.getSize() + 1)
.add(autoReconnect)
.addAll(disconnectedListenersBuilder.build())
.build();
}

protected @NotNull MqttClientConfig buildClientConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public MqttDecoderException(final @NotNull Mqtt5DisconnectReasonCode reasonCode,
}

/**
* Creates a new Decoder exception with the Disconnect reason code {@link Mqtt5DisconnectReasonCode#MALFORMED_PACKET}.
* Creates a new Decoder exception with the Disconnect reason code
* {@link Mqtt5DisconnectReasonCode#MALFORMED_PACKET}.
*
* @param message the description of the decoder exception.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ private int remainingLength(
* Calculates the property length of the given MQTT message with omitting the given count of properties.
*
* @param message the MQTT message to encode.
* @param propertyLength the already calculated property length with a count of omitted properties of {@code
* omittedProperties - 1}.
* @param propertyLength the already calculated property length with a count of omitted properties of
* {@code omittedProperties - 1}.
* @param omittedProperties the count of omitted properties.
* @return the property length of the MQTT message with omitting the given count of properties or -1 if no more
* properties can be omitted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ static boolean isShared(final @NotNull String string) {

/**
* Checks if the given byte array with UTF-8 encoded data represents a Shared Topic Filter. This method does not
* validate whether it represents a valid Shared Topic Filter but only whether it starts with {@value
* #SHARE_PREFIX}.
* validate whether it represents a valid Shared Topic Filter but only whether it starts with
* {@value #SHARE_PREFIX}.
*
* @param binary the byte array with UTF-8 encoded data.
* @return whether the byte array represents a Shared Topic Filter.
Expand All @@ -68,8 +68,9 @@ static boolean isShared(final byte @NotNull [] binary) {
/**
* Validates and creates a Shared Topic Filter of the given UTF-16 encoded Java string.
* <p>
* This method does not validate {@link MqttUtf8StringImpl#checkLength(String, String) length}, {@link
* MqttUtf8StringImpl#checkWellFormed(String, String) if well-fromed} and {@link #isShared(String) if shared}.
* This method does not validate {@link MqttUtf8StringImpl#checkLength(String, String) length},
* {@link MqttUtf8StringImpl#checkWellFormed(String, String) if well-fromed} and
* {@link #isShared(String) if shared}.
*
* @param string the UTF-16 encoded Java string staring with {@value #SHARE_PREFIX}.
* @return the created Shared Topic Filter.
Expand Down Expand Up @@ -101,8 +102,8 @@ static boolean isShared(final byte @NotNull [] binary) {
/**
* Validates and creates a Shared Topic Filter of the given byte array with UTF-8 encoded data.
* <p>
* This method does not validate length, {@link MqttUtf8StringImpl#isWellFormed(byte[]) if well-formed} and {@link
* #isShared(byte[]) if shared}.
* This method does not validate length, {@link MqttUtf8StringImpl#isWellFormed(byte[]) if well-formed} and
* {@link #isShared(byte[]) if shared}.
*
* @param binary the byte array with UTF-8 encoded data staring with {@value #SHARE_PREFIX}.
* @return the created Shared Topic Filter or <code>null</code> if the byte array is not a valid Shared Topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ public class MqttTopicFilterImpl extends MqttUtf8StringImpl implements MqttTopic
* @param binary the byte array with UTF-8 encoded data.
* @param start the index in the byte array to start validation at.
* @return a combination of {@link #WILDCARD_FLAG_MULTI_LEVEL} and {@link #WILDCARD_FLAG_SINGLE_LEVEL} indicating
* that a multi-level and/or single-level wildcards are present in the byte array or {@link
* #WILDCARD_CHECK_FAILURE} if the wildcard characters are misplaced.
* that a multi-level and/or single-level wildcards are present in the byte array or
* {@link #WILDCARD_CHECK_FAILURE} if the wildcard characters are misplaced.
*/
static int validateWildcards(final byte @NotNull [] binary, final int start) {
int wildcardFlags = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public final class MqttVariableByteInteger {
*
* @param byteBuf the buffer to decode from.
* @return the decoded integer value or {@link #NOT_ENOUGH_BYTES} if there are not enough bytes in the byte buffer
* or {@link #TOO_LARGE} if the encoded variable byte integer has more than 4 bytes or {@link
* #NOT_MINIMUM_BYTES} if the value is not encoded with a minimum number of bytes.
* or {@link #TOO_LARGE} if the encoded variable byte integer has more than 4 bytes or
* {@link #NOT_MINIMUM_BYTES} if the value is not encoded with a minimum number of bytes.
*/
public static int decode(final @NotNull ByteBuf byteBuf) {
byte encodedByte;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void onSessionStartOrResume(
packetIdentifiers.resize(newSendMaximum);
if (oldSendMaximum == 0) {
publishFlowables.flatMap(
f -> f, true, MAX_CONCURRENT_PUBLISH_FLOWABLES, Math.min(newSendMaximum, Flowable.bufferSize()))
f -> f, true, MAX_CONCURRENT_PUBLISH_FLOWABLES, Math.min(newSendMaximum, Flowable.bufferSize()))
.subscribe(this);
assert subscription != null;
subscription.request(newSendMaximum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import java.util.concurrent.TimeUnit;

/**
* ChannelInboundHandler with timeout handling. Subclasses must not be {@link io.netty.channel.ChannelHandler.Sharable}.
* ChannelInboundHandler with timeout handling. Subclasses must not be
* {@link io.netty.channel.ChannelHandler.Sharable}.
*
* @author Silvio Giebl
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public interface MqttExecutorConfigBuilderBase<B extends MqttExecutorConfigBuild
@NotNull B nettyThreads(@Range(from = 1, to = Integer.MAX_VALUE) int nettyThreads);

/**
* Sets the {@link MqttExecutorConfig#getApplicationScheduler() scheduler used for executing application specific
* code}.
* Sets the
* {@link MqttExecutorConfig#getApplicationScheduler() scheduler used for executing application specific code}.
*
* @param applicationScheduler the scheduler used for executing application specific code.
* @return the builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ public interface MqttTransportConfigBuilderBase<B extends MqttTransportConfigBui
MqttProxyConfigBuilder.@NotNull Nested<? extends B> proxyConfigWith();

/**
* Sets the {@link MqttTransportConfig#getSocketConnectTimeoutMs() timeout for connecting the socket to the
* server}.
* Sets the
* {@link MqttTransportConfig#getSocketConnectTimeoutMs() timeout for connecting the socket to the server}.
* <p>
* The timeout in milliseconds must be in the range: [0, {@link Integer#MAX_VALUE}].
*
Expand All @@ -216,8 +216,9 @@ public interface MqttTransportConfigBuilderBase<B extends MqttTransportConfigBui
@NotNull B socketConnectTimeout(long timeout, @NotNull TimeUnit timeUnit);

/**
* Sets the {@link MqttTransportConfig#getMqttConnectTimeoutMs() timeout between sending the Connect and receiving
* the ConnAck message}.
* Sets the
* {@link MqttTransportConfig#getMqttConnectTimeoutMs() timeout between sending the Connect and receiving the
* ConnAck message}.
* <p>
* The timeout in milliseconds must be in the range: [0, {@link Integer#MAX_VALUE}].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
/**
* A reconnector is supplied by a {@link MqttDisconnectedContext} and can be used for reconnecting.
* <p>
* The client will reconnect only if at least one of the methods {@link #reconnect(boolean)} or {@link
* #reconnectWhen(CompletableFuture, BiConsumer)} is called.
* The client will reconnect only if at least one of the methods {@link #reconnect(boolean)} or
* {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called.
* <p>
* All methods must only be called in {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}. Some
* methods can also be called in the callback supplied to {@link #reconnectWhen(CompletableFuture, BiConsumer)}.
Expand Down Expand Up @@ -91,7 +91,8 @@ public interface MqttReconnector {
* It can be used to set new connect properties (e.g. credentials).
* @param <T> the result type of the future.
* @return this reconnector.
* @throws UnsupportedOperationException if called outside of {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
* @throws UnsupportedOperationException if called outside of
* {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
*/
<T> @NotNull MqttReconnector reconnectWhen(
@NotNull CompletableFuture<T> future, @Nullable BiConsumer<? super T, ? super Throwable> callback);
Expand All @@ -108,15 +109,16 @@ public interface MqttReconnector {
* When the client reconnected successfully and its session is still present, the server still knows its
* subscriptions and they do not need to be restored.
* <p>
* This setting only has effect if the client will reconnect (at least one of the methods {@link
* #reconnect(boolean)} or {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called).
* This setting only has effect if the client will reconnect (at least one of the methods
* {@link #reconnect(boolean)} or {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called).
* <p>
* This method must only be called in {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)} and
* not in the callback supplied to {@link #reconnectWhen(CompletableFuture, BiConsumer)}.
*
* @param resubscribe whether to resubscribe when the session expired before the client reconnected successfully.
* @return this reconnector.
* @throws UnsupportedOperationException if called outside of {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
* @throws UnsupportedOperationException if called outside of
* {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
* @since 1.2
*/
@NotNull MqttReconnector resubscribeIfSessionExpired(boolean resubscribe);
Expand All @@ -134,15 +136,16 @@ public interface MqttReconnector {
* When the client reconnected successfully and its session is still present, the client will always queue pending
* Publish messages and automatically publish them to ensure the QoS guarantees.
* <p>
* This setting only has effect if the client will reconnect (at least one of the methods {@link
* #reconnect(boolean)} or {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called).
* This setting only has effect if the client will reconnect (at least one of the methods
* {@link #reconnect(boolean)} or {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called).
* <p>
* This method must only be called in {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)} and
* not in the callback supplied to {@link #reconnectWhen(CompletableFuture, BiConsumer)}.
*
* @param republish whether to republish when the session expired before the client reconnected successfully.
* @return this reconnector.
* @throws UnsupportedOperationException if called outside of {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
* @throws UnsupportedOperationException if called outside of
* {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
* @since 1.2
*/
@NotNull MqttReconnector republishIfSessionExpired(boolean republish);
Expand All @@ -156,8 +159,8 @@ public interface MqttReconnector {
/**
* Sets a delay the client will wait for before trying to reconnect.
* <p>
* This setting only has effect if the client will reconnect (at least one of the methods {@link
* #reconnect(boolean)} or {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called).
* This setting only has effect if the client will reconnect (at least one of the methods
* {@link #reconnect(boolean)} or {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called).
* <p>
* If additionally a {@link #reconnectWhen(CompletableFuture, BiConsumer) future} is supplied, the client will
* reconnect after both are complete.
Expand All @@ -168,15 +171,16 @@ public interface MqttReconnector {
* @param delay delay which the client will wait before trying to reconnect.
* @param timeUnit the time unit of the delay.
* @return this reconnector.
* @throws UnsupportedOperationException if called outside of {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
* @throws UnsupportedOperationException if called outside of
* {@link MqttDisconnectedListener#onDisconnected(MqttDisconnectedContext)}.
*/
@NotNull MqttReconnector delay(long delay, @NotNull TimeUnit timeUnit);

/**
* Returns the currently set delay the client will wait for before trying to reconnect.
* <p>
* If the {@link #delay(long, TimeUnit)} method has not been called before (including previous {@link
* MqttDisconnectedListener}s) it will be {@link #DEFAULT_DELAY_MS}.
* If the {@link #delay(long, TimeUnit)} method has not been called before (including previous
* {@link MqttDisconnectedListener}s) it will be {@link #DEFAULT_DELAY_MS}.
*
* @param timeUnit the time unit of the returned delay.
* @return the delay in the given time unit.
Expand Down Expand Up @@ -206,10 +210,10 @@ public interface MqttReconnector {
/**
* Returns the currently set transport configuration the client will try to reconnect with.
* <p>
* If the {@link #transportConfig(MqttTransportConfig)} method has not been called before (including previous {@link
* MqttDisconnectedListener}s) it will be the transport configuration the client was connected with or the {@link
* com.hivemq.client2.mqtt.MqttClientConfig#getTransportConfig() default transport configuration} if it has not been
* connected yet.
* If the {@link #transportConfig(MqttTransportConfig)} method has not been called before (including previous
* {@link MqttDisconnectedListener}s) it will be the transport configuration the client was connected with or the
* {@link com.hivemq.client2.mqtt.MqttClientConfig#getTransportConfig() default transport configuration} if it has
* not been connected yet.
*
* @return the transport configuration.
*/
Expand Down
Loading

0 comments on commit 9627f41

Please sign in to comment.