diff --git a/stream_framework/storage/redis/structures/base.py b/stream_framework/storage/redis/structures/base.py index 7c942fdb..a2c01742 100644 --- a/stream_framework/storage/redis/structures/base.py +++ b/stream_framework/storage/redis/structures/base.py @@ -1,5 +1,5 @@ from stream_framework.storage.redis.connection import get_redis_connection -from redis.client import BasePipeline +from redis.client import Pipeline class RedisCache(object): @@ -49,7 +49,7 @@ def _pipeline_if_needed(self, operation, *args, **kwargs): If the redis connection is already in distributed state use it Otherwise spawn a new distributed connection using .map ''' - pipe_needed = not isinstance(self.redis, BasePipeline) + pipe_needed = not isinstance(self.redis, Pipeline) if pipe_needed: pipe = self.redis.pipeline(transaction=False) operation(pipe, *args, **kwargs)