Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add search functions by key(--keysearch) #187

Open
anch665 opened this issue Jan 30, 2024 · 3 comments
Open

add search functions by key(--keysearch) #187

anch665 opened this issue Jan 30, 2024 · 3 comments
Labels
enhancement New feature or request help wanted Extra attention is needed

Comments

@anch665
Copy link

anch665 commented Jan 30, 2024

Please add search functions by key

to determine the partition where the key is located, use the Utils.murmur2 function
for this we wrote a simple library, partitionIdentifier.jar

With your help, I will determine the maximum number of partitions and transfer the key and the maximum number of partitions to the library. This way I will determine in which partition the events for this key are located. Next, I search for events in the desired partition.
This gives a very high speed of searching for the necessary messages, especially in topics with more than 64 partitions.

partitionIdentifier.jar

package ru.msk;
import org.apache.kafka.common.utils.Utils;
public class Main {
    public static void main(String[] args) {
        try{
            getPartitionByKey(args[0], args[1]);
        } catch (Exception e) {
            System.out.println("Please, enter the correct program arguments. It must be not-blank string 'Key' and number of partitions > 0.");
        }
    }

    public static void getPartitionByKey(String key, String numPartitions){
        try{
            int partitions = Integer.parseInt(numPartitions);
            partitions++;
            if ((key != null && !key.isEmpty()) && partitions > 0){
                System.out.println ((Utils.murmur2(Utils.utf8(key)) & 0x7fffffff) % partitions);
            } else {
                System.out.println("Please, enter the correct program arguments. It must be not-blank string 'Key' and number of partitions > 0.");
            }
        }catch (Exception e){
            System.out.println("Please, enter the correct program arguments. It must be not-blank string 'Key' and number of partitions > 0.");
        }
    }
}

kafkactlkey.sh

JAR="/rtm/kafka/libs/partitionIdentifier.jar"
KEY=$(echo ""$@"" | egrep -o "\-\-keysearch.[[:digit:]]+\S+[[:digit:]]"  | awk -F " " '{print $2}')
TOPIC=$(echo ""$@"" | grep -E 'consume\s[^ ]+\s' |awk -F " " '{print $2}' )
STANDARTVAR=$(sed -r 's/--keysearch(.|\s+)\w+\S\w+//' <<< $@)
TIMESTAMP=$(echo ""$@"" | grep -o '\-\-from-timestamp')


if [ ! -f $JAR ]; then
  echo "File $JAR не найден."
  exit
fi

searchtokey () {
  if [ -z $KEY ]  ;then
    help
  fi
  KEYEXAMPL=$(kafkactl consume $TOPIC -k -b --max-messages 1 --output yaml | grep 'key:'| awk -F " " '{print $2}')
  if [ -z $KEYEXAMPL ] ; then
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo -en "\033[0;31m  Топик $TOPIC не содержит ключа, используйте команду:  \033[0m \n"
    echo "  kafkactl $STANDARTVAR | grep ${KEY}"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    exit
  fi
  MAXPARTNUM=$(kafkactl get topics | grep -P "^$TOPIC "| awk -F " " '{print $2}')
  if [ -z $MAXPARTNUM ]  ;then
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo "не удалось опередить количество партиций в топике $TOPIC, выход"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    exit
  fi
  MAXPARTNUM=$(($MAXPARTNUM-1))
  CURRENTPARTNUM=$(java -cp "/rtm/kafka/libs/partitionIdentifier.jar:/rtm/kafka/libs/*" ru.msk.Main $KEY $MAXPARTNUM)
  if [ -z $CURRENTPARTNUM ]  ;then
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    echo
    echo "не удалось определить партицию топике $TOPIC по ключу $KEY, выход"
    echo
    echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
    exit
  fi

  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
  echo
  echo "   --keysearch $KEY найден в партиции $CURRENTPARTNUM топика $TOPIC. Первый ключ в топике $KEYEXAMPL"
  if [ "x$TIMESTAMP" != "x" ] ; then
    ARGS="$STANDARTVAR -k --partitions $CURRENTPARTNUM"
#    echo "   from timestamp"
  else
    ARGS="$STANDARTVAR -k --from-beginning --partitions $CURRENTPARTNUM"
    echo "   from beggin"
  fi
  ARGS=$(sed -r 's/[ ]{2,}/ /' <<< $ARGS)
#  echo "   kafkactl $ARGS | grep $KEY"
  echo
  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"

  exec kafkactl ${ARGS} | grep ${KEY}



}


help () {
  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"
  echo
  echo "   --keysearch или параметр не указан, пример использования:"
  echo "   kafkactlkey.sh consume RTDM-INPUT --keysearch 7924777777"
  echo
  echo "-------------------------------------------------------------------------------------------------------------------------------------------------"

}

if [[ $@ =~ "--keysearch" ]] ;then
  # New query
  searchtokey
else
  # Standart query
  help
fi

Usage example

 ./kafkactlkey consume SDP-QUOTA-EVENTS --from-timestamp 1705800000  --keysearch 15654565
-------------------------------------------------------------------------------------------------------------------------------------------------

  Топик SDP-QUOTA-EVENTS не содержит ключа, используйте команду:
  kafkactl consume SDP-QUOTA-EVENTS --from-timestamp 1705800000  | grep 15654565

-------------------------------------------------------------------------------------------------------------------------------------------------

./kafkactlkey consume RTDM-INPUT --from-timestamp 1705800000  --keysearch 79280483033
-------------------------------------------------------------------------------------------------------------------------------------------------

   --keysearch 79280483033 найден в партиции 95 топика RTDM-INPUT. Первый ключ в топике "79289026353"

-------------------------------------------------------------------------------------------------------------------------------------------------
79280483033#{"SAPPN":"234242.170","MSISDN":"79280483033","EVAM_SCENARIO":"CAMP_0568_Smart","CAMPAIGN_CD":"CAMP2018850","M_REG_NAME":"Москва","UUID":"c396ec4d-4296-442d-bbeee-16da54a6e803","EVAM_EVENT":"RTDM_Response","RTDM_EVENT":"CAMP_0568_Smart - Приглашение в кампанию"}

@d-rk
Copy link
Collaborator

d-rk commented Feb 1, 2024

Hey @anch665

I guess this could work if we introduce a parameter on the consume command:

kafkactl consume my-topic --filter-key=my-key

The problem however is that the hashing algorithm used to map a key to a partition does not have to be murmur2
See: https://github.com/deviceinsight/kafkactl/blob/main/cmd/produce/produce.go#L32

So we would have to introduce an additional parameter --partitioner for the consume command, which controls the hashing algorithm. (for convenience, the default for the parameter could just be murmur2)

Then we could use the sarama partitioner to determine the partition to read from:
https://github.com/deviceinsight/kafkactl/blob/main/internal/producer/producer-operation.go#L292

A PR for this is welcomed, for us this currently does not have priority.

@d-rk d-rk added enhancement New feature or request help wanted Extra attention is needed labels Feb 1, 2024
@anch665
Copy link
Author

anch665 commented Feb 1, 2024

Hey @d-rk

This is a great idea, I would be glad to see such a feature

@vgdum
Copy link

vgdum commented Feb 3, 2024

Cool feature, when this is going to be implemented?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

3 participants