2525import org .apache .iggy .client .async .TopicsClient ;
2626import org .apache .iggy .client .async .UsersClient ;
2727
28+ import java .time .Duration ;
29+ import java .util .Optional ;
2830import java .util .concurrent .CompletableFuture ;
2931
3032/**
@@ -35,6 +37,12 @@ public class AsyncIggyTcpClient {
3537
3638 private final String host ;
3739 private final int port ;
40+ private final Optional <String > username ;
41+ private final Optional <String > password ;
42+ private final Optional <Duration > connectionTimeout ;
43+ private final Optional <Duration > requestTimeout ;
44+ private final Optional <Integer > connectionPoolSize ;
45+ private final Optional <RetryPolicy > retryPolicy ;
3846 private AsyncTcpConnection connection ;
3947 private MessagesClient messagesClient ;
4048 private ConsumerGroupsClient consumerGroupsClient ;
@@ -43,8 +51,29 @@ public class AsyncIggyTcpClient {
4351 private UsersClient usersClient ;
4452
4553 public AsyncIggyTcpClient (String host , int port ) {
54+ this (host , port , null , null , null , null , null , null );
55+ }
56+
57+ private AsyncIggyTcpClient (String host , int port , String username , String password ,
58+ Duration connectionTimeout , Duration requestTimeout ,
59+ Integer connectionPoolSize , RetryPolicy retryPolicy ) {
4660 this .host = host ;
4761 this .port = port ;
62+ this .username = Optional .ofNullable (username );
63+ this .password = Optional .ofNullable (password );
64+ this .connectionTimeout = Optional .ofNullable (connectionTimeout );
65+ this .requestTimeout = Optional .ofNullable (requestTimeout );
66+ this .connectionPoolSize = Optional .ofNullable (connectionPoolSize );
67+ this .retryPolicy = Optional .ofNullable (retryPolicy );
68+ }
69+
70+ /**
71+ * Creates a new builder for configuring AsyncIggyTcpClient.
72+ *
73+ * @return a new Builder instance
74+ */
75+ public static Builder builder () {
76+ return new Builder ();
4877 }
4978
5079 /**
@@ -59,6 +88,14 @@ public CompletableFuture<Void> connect() {
5988 streamsClient = new StreamsTcpClient (connection );
6089 topicsClient = new TopicsTcpClient (connection );
6190 usersClient = new UsersTcpClient (connection );
91+ })
92+ .thenCompose (v -> {
93+ // Auto-login if credentials are provided
94+ if (username .isPresent () && password .isPresent ()) {
95+ return usersClient .loginAsync (username .get (), password .get ())
96+ .thenApply (identity -> null );
97+ }
98+ return CompletableFuture .completedFuture (null );
6299 });
63100 }
64101
@@ -121,4 +158,192 @@ public CompletableFuture<Void> close() {
121158 }
122159 return CompletableFuture .completedFuture (null );
123160 }
161+
162+ /**
163+ * Builder for creating configured AsyncIggyTcpClient instances.
164+ */
165+ public static class Builder {
166+ private String host = "localhost" ;
167+ private Integer port = 8090 ;
168+ private String username ;
169+ private String password ;
170+ private Duration connectionTimeout ;
171+ private Duration requestTimeout ;
172+ private Integer connectionPoolSize ;
173+ private RetryPolicy retryPolicy ;
174+
175+ private Builder () {
176+ }
177+
178+ /**
179+ * Sets the host address for the Iggy server.
180+ *
181+ * @param host the host address
182+ * @return this builder
183+ */
184+ public Builder host (String host ) {
185+ this .host = host ;
186+ return this ;
187+ }
188+
189+ /**
190+ * Sets the port for the Iggy server.
191+ *
192+ * @param port the port number
193+ * @return this builder
194+ */
195+ public Builder port (Integer port ) {
196+ this .port = port ;
197+ return this ;
198+ }
199+
200+ /**
201+ * Sets the credentials for authentication.
202+ *
203+ * @param username the username
204+ * @param password the password
205+ * @return this builder
206+ */
207+ public Builder credentials (String username , String password ) {
208+ this .username = username ;
209+ this .password = password ;
210+ return this ;
211+ }
212+
213+ /**
214+ * Sets the connection timeout.
215+ *
216+ * @param connectionTimeout the connection timeout duration
217+ * @return this builder
218+ */
219+ public Builder connectionTimeout (Duration connectionTimeout ) {
220+ this .connectionTimeout = connectionTimeout ;
221+ return this ;
222+ }
223+
224+ /**
225+ * Sets the request timeout.
226+ *
227+ * @param requestTimeout the request timeout duration
228+ * @return this builder
229+ */
230+ public Builder requestTimeout (Duration requestTimeout ) {
231+ this .requestTimeout = requestTimeout ;
232+ return this ;
233+ }
234+
235+ /**
236+ * Sets the connection pool size.
237+ *
238+ * @param connectionPoolSize the size of the connection pool
239+ * @return this builder
240+ */
241+ public Builder connectionPoolSize (Integer connectionPoolSize ) {
242+ this .connectionPoolSize = connectionPoolSize ;
243+ return this ;
244+ }
245+
246+ /**
247+ * Sets the retry policy.
248+ *
249+ * @param retryPolicy the retry policy to use
250+ * @return this builder
251+ */
252+ public Builder retryPolicy (RetryPolicy retryPolicy ) {
253+ this .retryPolicy = retryPolicy ;
254+ return this ;
255+ }
256+
257+ /**
258+ * Builds and returns a configured AsyncIggyTcpClient instance.
259+ * Note: You still need to call connect() on the returned client.
260+ *
261+ * @return a new AsyncIggyTcpClient instance
262+ */
263+ public AsyncIggyTcpClient build () {
264+ if (host == null || host .isEmpty ()) {
265+ throw new IllegalArgumentException ("Host cannot be null or empty" );
266+ }
267+ if (port == null || port <= 0 ) {
268+ throw new IllegalArgumentException ("Port must be a positive integer" );
269+ }
270+ return new AsyncIggyTcpClient (host , port , username , password ,
271+ connectionTimeout , requestTimeout , connectionPoolSize , retryPolicy );
272+ }
273+ }
274+
275+ /**
276+ * Retry policy for client operations.
277+ */
278+ public static class RetryPolicy {
279+ private final int maxRetries ;
280+ private final Duration initialDelay ;
281+ private final Duration maxDelay ;
282+ private final double multiplier ;
283+
284+ private RetryPolicy (int maxRetries , Duration initialDelay , Duration maxDelay , double multiplier ) {
285+ this .maxRetries = maxRetries ;
286+ this .initialDelay = initialDelay ;
287+ this .maxDelay = maxDelay ;
288+ this .multiplier = multiplier ;
289+ }
290+
291+ /**
292+ * Creates a retry policy with exponential backoff.
293+ *
294+ * @return a RetryPolicy with exponential backoff configuration
295+ */
296+ public static RetryPolicy exponentialBackoff () {
297+ return new RetryPolicy (3 , Duration .ofMillis (100 ), Duration .ofSeconds (5 ), 2.0 );
298+ }
299+
300+ /**
301+ * Creates a retry policy with exponential backoff and custom parameters.
302+ *
303+ * @param maxRetries the maximum number of retries
304+ * @param initialDelay the initial delay before the first retry
305+ * @param maxDelay the maximum delay between retries
306+ * @param multiplier the multiplier for exponential backoff
307+ * @return a RetryPolicy with custom exponential backoff configuration
308+ */
309+ public static RetryPolicy exponentialBackoff (int maxRetries , Duration initialDelay , Duration maxDelay , double multiplier ) {
310+ return new RetryPolicy (maxRetries , initialDelay , maxDelay , multiplier );
311+ }
312+
313+ /**
314+ * Creates a retry policy with fixed delay.
315+ *
316+ * @param maxRetries the maximum number of retries
317+ * @param delay the fixed delay between retries
318+ * @return a RetryPolicy with fixed delay configuration
319+ */
320+ public static RetryPolicy fixedDelay (int maxRetries , Duration delay ) {
321+ return new RetryPolicy (maxRetries , delay , delay , 1.0 );
322+ }
323+
324+ /**
325+ * Creates a no-retry policy.
326+ *
327+ * @return a RetryPolicy that does not retry
328+ */
329+ public static RetryPolicy noRetry () {
330+ return new RetryPolicy (0 , Duration .ZERO , Duration .ZERO , 1.0 );
331+ }
332+
333+ public int getMaxRetries () {
334+ return maxRetries ;
335+ }
336+
337+ public Duration getInitialDelay () {
338+ return initialDelay ;
339+ }
340+
341+ public Duration getMaxDelay () {
342+ return maxDelay ;
343+ }
344+
345+ public double getMultiplier () {
346+ return multiplier ;
347+ }
348+ }
124349}
0 commit comments