Skip to content

Commit

Permalink
Add pull API
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Jan 16, 2023
1 parent 1184903 commit 20f53e1
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.consumer;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.MessageQueue;
import org.apache.rocketmq.client.apis.message.MessageView;

public interface PullConsumer extends Closeable {
String getConsumerGroup();

void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener);

Collection<MessageQueue> fetchMessageQueues(String topic) throws ClientException;

PullConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;

PullConsumer unsubscribe(String topic);

void assign(Collection<MessageQueue> messageQueues);

List<MessageView> poll(Duration timeout);

void seek(MessageQueue messageQueue, long offset);

void pause(Collection<MessageQueue> messageQueues);

void resume(Collection<MessageQueue> messageQueues);

Optional<Long> offsetForTimestamp(MessageQueue messageQueue, Long timestamp);

Optional<Long> committed(MessageQueue messageQueue);

void commit();

void seekToBegin(MessageQueue messageQueue) throws ClientException;

void seekToEnd(MessageQueue messageQueue) throws ClientException;

@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.consumer;

import java.time.Duration;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;

public interface PullConsumerBuilder {
/**
* Set the client configuration for the consumer.
*
* @param clientConfiguration client's configuration.
* @return the consumer builder instance.
*/
PullConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);

/**
* Set the load balancing group for the consumer.
*
* @param consumerGroup consumer load balancing group.
* @return the consumer builder instance.
*/
PullConsumerBuilder setConsumerGroup(String consumerGroup);

/**
* Automate the consumer's offset commit.
*
* @return the consumer builder instance.
*/
PullConsumerBuilder enableAutoCommit(boolean enable);

/**
* Set the consumer's offset commit interval if auto commit is enabled.
*
* @param duration offset commit interval
* @return the consumer builder instance.
*/
PullConsumerBuilder setAutoCommitInterval(Duration duration);

/**
* Set the maximum number of messages cached locally.
*
* @param count message count.
* @return the consumer builder instance.
*/
PullConsumerBuilder setMaxCacheMessageCountEachQueue(int count);

/**
* Set the maximum bytes of messages cached locally.
*
* @param bytes message size.
* @return the consumer builder instance.
*/
PullConsumerBuilder setMaxCacheMessageSizeInBytesEachQueue(int bytes);

/**
* Finalize the build of {@link PullConsumer} and start.
*
* <p>This method will block until the pull consumer starts successfully.
*
* <p>Especially, if this method is invoked more than once, different pull consumer will be created and started.
*
* @return the pull consumer instance.
*/
PullConsumer build() throws ClientException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public interface PushConsumerBuilder {
*
* <p>This method will block until the push consumer starts successfully.
*
* <p>Especially, if this method is invoked more than once, different push consumers will be created and started.
* <p>Especially, if this method is invoked more than once, different push consumer will be created and started.
*
* @return the push consumer instance.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.consumer;

import java.util.Set;
import org.apache.rocketmq.client.apis.message.MessageQueue;

public interface TopicMessageQueueChangeListener {
/**
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
* expanded or shrunk.
*
* @param topic the topic to listen.
* @param messageQueues
*/
void onChanged(String topic, Set<MessageQueue> messageQueues);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.message;

public interface MessageQueue {
String getTopic();

String getId();
}

0 comments on commit 20f53e1

Please sign in to comment.