Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Keshav Krishna committed Jun 23, 2024
0 parents commit a9f1203
Show file tree
Hide file tree
Showing 7 changed files with 490 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
133 changes: 133 additions & 0 deletions cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Optional, Dict, Any
from threading import Lock
from time import time
from eviction_policies import EvictionPolicy
from cache_metrics import CacheMetrics

K = TypeVar('K')
V = TypeVar('V')

class CacheItem:
    """Pair a cached value with an optional absolute expiry timestamp.

    ``expiry`` is an epoch time in seconds (as produced by ``time.time()``);
    ``None`` means the entry never expires. Expiry checks are performed by
    the cache that stores the item, not by the item itself.
    """

    def __init__(self, value: Any, expiry: Optional[float] = None):
        self.value = value    # the cached payload, opaque to the cache
        self.expiry = expiry  # absolute deadline, or None for "no TTL"

class Cache(ABC, Generic[K, V]):
    """Abstract cache contract: store, fetch, delete, and report metrics."""

    @abstractmethod
    def put(self, key: K, value: V, ttl: Optional[int] = None) -> None:
        """Insert *key* -> *value*, optionally expiring after *ttl* seconds."""
        ...

    @abstractmethod
    def get(self, key: K) -> V:
        """Return the value stored under *key*; raise KeyError if absent."""
        ...

    @abstractmethod
    def remove(self, key: K) -> None:
        """Delete *key* from the cache if present."""
        ...

    @abstractmethod
    def get_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of the cache's usage statistics."""
        ...

class SegmentedCache(Cache[K, V]):
    """Thread-safe cache partitioned into independently locked segments.

    A key is routed to a segment via ``hash(key) % num_segments``. Each
    segment owns its dict, its eviction-policy instance and its lock, so
    operations on different segments do not contend with each other.
    """

    def __init__(self, capacity_per_segment: int, eviction_policy_class: EvictionPolicy[K], num_segments: int = 16):
        self.capacity_per_segment = capacity_per_segment
        self.eviction_policy_class = eviction_policy_class
        self.segments = [{} for _ in range(num_segments)]  # Each segment is a dictionary
        self.eviction_policies = [eviction_policy_class() for _ in range(num_segments)]  # Instantiate policy for each segment
        self.locks = [Lock() for _ in range(num_segments)]  # One lock per segment
        self.metrics = CacheMetrics()
        self.num_segments = num_segments
        self.global_lock = Lock()  # serialises structural changes (resize_segments)

    def _get_segment(self, key: K) -> int:
        """Map *key* to the index of the segment that owns it."""
        return hash(key) % self.num_segments

    def put(self, key: K, value: V, ttl: Optional[int] = None) -> None:
        """Store *key* -> *value*; with *ttl* (seconds) the entry auto-expires.

        Overwriting an existing key refreshes its position in the eviction
        policy; inserting into a full segment evicts one entry first.
        """
        segment_index = self._get_segment(key)
        with self.locks[segment_index]:
            self._remove_expired_items(segment_index)
            if key in self.segments[segment_index]:
                # Re-insert below so the policy treats this as a fresh access.
                self.eviction_policies[segment_index].remove(key)
            elif len(self.segments[segment_index]) >= self.capacity_per_segment:
                self._evict_item(segment_index)

            expiry = time() + ttl if ttl is not None else None
            self.segments[segment_index][key] = CacheItem(value, expiry)
            self.eviction_policies[segment_index].add(key)

    def get(self, key: K) -> V:
        """Return the live value for *key*.

        Raises:
            KeyError: if the key is absent or its TTL has elapsed.
        """
        segment_index = self._get_segment(key)
        with self.locks[segment_index]:
            self._remove_expired_items(segment_index)
            if key not in self.segments[segment_index]:
                self.metrics.record_miss()
                raise KeyError(f"Key '{key}' not found in cache")

            item = self.segments[segment_index][key]
            # Re-check: time() advances between the sweep above and here, so
            # an entry can expire within this same call.
            if item.expiry is not None and item.expiry <= time():
                del self.segments[segment_index][key]
                self.eviction_policies[segment_index].remove(key)
                self.metrics.record_expiration()
                self.metrics.record_miss()
                raise KeyError(f"Key '{key}' has expired")

            # Touch the key so recency/frequency-based policies see the access.
            self.eviction_policies[segment_index].remove(key)
            self.eviction_policies[segment_index].add(key)
            self.metrics.record_hit()
            return item.value

    def remove(self, key: K) -> None:
        """Delete *key* if present; silently does nothing otherwise."""
        segment_index = self._get_segment(key)
        with self.locks[segment_index]:
            if key in self.segments[segment_index]:
                del self.segments[segment_index][key]
                self.eviction_policies[segment_index].remove(key)

    def _remove_expired_items(self, segment_index: int) -> None:
        """Sweep one segment, dropping every entry whose TTL has elapsed.

        Caller must hold the segment's lock.
        """
        current_time = time()
        expired_keys = [k for k, v in self.segments[segment_index].items() if v.expiry is not None and v.expiry <= current_time]
        for key in expired_keys:
            del self.segments[segment_index][key]
            self.eviction_policies[segment_index].remove(key)
            self.metrics.record_expiration()

    def _evict_item(self, segment_index: int) -> None:
        """Evict one entry chosen by the segment's policy. Caller holds the lock."""
        evicted_key = self.eviction_policies[segment_index].evict()
        del self.segments[segment_index][evicted_key]
        self.metrics.record_eviction()

    def get_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of hit/miss/eviction/expiration statistics."""
        return self.metrics.get_metrics()

    def resize_segments(self, new_num_segments: int) -> None:
        """Change the segment count, rehashing every live entry.

        BUGFIX: the previous implementation only grew/truncated the segment
        lists. Because the routing formula ``hash(key) % num_segments``
        changes with the segment count, every existing key then mapped to a
        different segment and became unreachable — and shrinking silently
        discarded whole segments of data. All entries are now redistributed
        into the new layout. Eviction-policy state is rebuilt from scratch,
        so the relative eviction order of surviving entries is reset.

        Raises:
            ValueError: if *new_num_segments* is not positive.
        """
        if new_num_segments <= 0:
            raise ValueError("Number of segments must be positive")

        acquired_locks = []
        with self.global_lock:
            # Quiesce the cache: hold every segment lock for the duration.
            for lock in self.locks:
                lock.acquire()
                acquired_locks.append(lock)

            try:
                # Gather every live entry before the layout changes.
                live_items = {}
                for segment in self.segments:
                    live_items.update(segment)

                self.segments = [{} for _ in range(new_num_segments)]
                self.eviction_policies = [self.eviction_policy_class() for _ in range(new_num_segments)]
                self.locks = [Lock() for _ in range(new_num_segments)]
                self.num_segments = new_num_segments

                # Redistribute under the new routing formula. Segments may
                # temporarily exceed capacity_per_segment; later puts evict
                # as usual.
                for key, item in live_items.items():
                    index = hash(key) % new_num_segments
                    self.segments[index][key] = item
                    self.eviction_policies[index].add(key)

                # NOTE(review): a thread already blocked on one of the OLD
                # locks will wake up against the new layout; resize is only
                # fully safe when callers quiesce concurrent access — confirm
                # usage. (Same limitation existed before this change.)
            finally:
                for lock in acquired_locks:
                    lock.release()

    def __str__(self) -> str:
        return f"SegmentedCache with {self.num_segments} segments, capacity per segment: {self.capacity_per_segment}"
21 changes: 21 additions & 0 deletions cache_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Type
from cache import SegmentedCache
from eviction_policies import EvictionPolicy, FIFOEvictionPolicy, LRUEvictionPolicy, LIFOEvictionPolicy

class CacheFactory:
    """Factory that builds SegmentedCache instances from a named policy."""

    # Registry of eviction-policy classes, keyed by their public name.
    _cache_types = {
        "FIFO": FIFOEvictionPolicy,
        "LRU": LRUEvictionPolicy,
        "LIFO": LIFOEvictionPolicy,
    }

    @classmethod
    def register_cache_type(cls, cache_type: str, policy_class: Type[EvictionPolicy]):
        """Register (or replace) the policy class published under *cache_type*."""
        cls._cache_types[cache_type] = policy_class

    @classmethod
    def create_cache(cls, cache_type: str, capacity: int, num_segments: int = 16):
        """Build a SegmentedCache using the policy registered for *cache_type*.

        Raises:
            ValueError: if *cache_type* has not been registered.
        """
        policy_class = cls._cache_types.get(cache_type)
        if not policy_class:
            raise ValueError(f"Unsupported cache type: {cache_type}")
        return SegmentedCache(capacity, policy_class, num_segments)
40 changes: 40 additions & 0 deletions cache_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from threading import Lock

class CacheMetrics:
    """Thread-safe counters for cache hits, misses, evictions and expirations."""

    def __init__(self):
        # Raw counters; every mutation and read happens under self.lock.
        self.hits = 0
        self.misses = 0
        self.total_requests = 0
        self.evictions = 0
        self.expirations = 0
        self.lock = Lock()

    def record_hit(self):
        """Count one successful lookup (also bumps the request total)."""
        with self.lock:
            self.hits += 1
            self.total_requests += 1

    def record_miss(self):
        """Count one failed lookup (also bumps the request total)."""
        with self.lock:
            self.misses += 1
            self.total_requests += 1

    def record_eviction(self):
        """Count one capacity-driven removal."""
        with self.lock:
            self.evictions += 1

    def record_expiration(self):
        """Count one TTL-driven removal."""
        with self.lock:
            self.expirations += 1

    def get_metrics(self):
        """Return a consistent snapshot of all counters plus hit/miss ratios."""
        with self.lock:
            requests = self.total_requests
            snapshot = {
                "hits": self.hits,
                "misses": self.misses,
                "total_requests": requests,
                "evictions": self.evictions,
                "expirations": self.expirations,
            }
            snapshot["hit_ratio"] = self.hits / requests if requests > 0 else 0
            snapshot["miss_ratio"] = self.misses / requests if requests > 0 else 0
            return snapshot
147 changes: 147 additions & 0 deletions demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import threading
import time
from cache_factory import CacheFactory
from eviction_policies import EvictionPolicy
from typing import Generic, TypeVar, Dict
from collections import defaultdict

K = TypeVar('K')

# declared this class to implement LFU (least frequently used ) eviction policy
# declared this class to implement LFU (least frequently used) eviction policy
class LFUEvictionPolicy(EvictionPolicy, Generic[K]):
    """Least-Frequently-Used eviction policy.

    Tracks an access count per key plus the reverse mapping from each count
    to the set of keys currently at that count, so adds and evictions are
    O(1) amortised (``remove`` may scan the occupied counts once to find
    the new minimum).
    """

    def __init__(self):
        self.key_frequency: Dict[K, int] = defaultdict(int)
        self.frequency_keys: Dict[int, set[K]] = defaultdict(set)
        self.min_frequency = 0  # lowest count with at least one key; 0 when empty

    def add(self, key: K) -> None:
        """Record an access: new keys start at count 1, known keys are bumped."""
        if key not in self.key_frequency:
            self.key_frequency[key] = 1
            self.frequency_keys[1].add(key)
            self.min_frequency = 1
        else:
            self._increment_frequency(key)

    def remove(self, key: K) -> None:
        """Forget *key* entirely, keeping ``min_frequency`` consistent."""
        if key not in self.key_frequency:
            return
        freq = self.key_frequency.pop(key)
        self.frequency_keys[freq].remove(key)
        if not self.frequency_keys[freq]:
            del self.frequency_keys[freq]
            # BUGFIX: the old code did ``min_frequency += 1`` here, but the
            # next occupied bucket can sit at ANY higher count (or none at
            # all), which later made evict() read an empty defaultdict
            # bucket and raise StopIteration. Recompute the real minimum.
            if freq == self.min_frequency:
                self.min_frequency = min(self.frequency_keys) if self.frequency_keys else 0

    def evict(self) -> K:
        """Remove and return one key from the lowest-frequency bucket.

        Raises:
            ValueError: if no keys are tracked.
        """
        if not self.key_frequency:
            raise ValueError("No keys to evict")

        key_to_evict = next(iter(self.frequency_keys[self.min_frequency]))
        self.remove(key_to_evict)
        return key_to_evict

    def _increment_frequency(self, key: K) -> None:
        # Move *key* from bucket ``freq`` to ``freq + 1``. When the old
        # bucket was the minimum and is now empty, every other key has a
        # count > freq and the moved key has freq + 1, so the new minimum
        # is exactly freq + 1 — the increment is correct in this method.
        freq = self.key_frequency[key]
        self.key_frequency[key] = freq + 1
        self.frequency_keys[freq].remove(key)
        if not self.frequency_keys[freq]:
            del self.frequency_keys[freq]
            if freq == self.min_frequency:
                self.min_frequency = freq + 1
        self.frequency_keys[freq + 1].add(key)




def test_cache(cache, thread):
    """Drive one cache through puts, gets, eviction, resize, and TTL expiry.

    *thread* is a label prefixed to every line of output so interleaved
    runs from multiple worker threads can be told apart. Fixes several
    typos in the printed messages ("NOt", "Afteer", "ading one mre
    dsegment", "expiristion").
    """
    # Put some items in the cache
    for i in range(4):
        cache.put(f"key{i}", f"value{i}")
        print(f"{thread} Added key{i}")

    # Access some items multiple times to increase their frequency
    for _ in range(3):
        cache.get("key0")
    for _ in range(2):
        cache.get("key1")

    print(f"\n {thread} Current cache state:")
    for i in range(4):
        try:
            print(f" {thread} key{i}: {cache.get(f'key{i}')}")
        except KeyError:
            print(f" {thread} key{i}: Not found")

    # Overfill the cache so the eviction policy has to kick in.
    for i in range(4, 16):
        cache.put(f"key{i}", f"value{i}")
        print(f"{thread} Added key{i}")

    print(f"\n {thread} After adding more items:")
    for i in range(16):
        try:
            print(f" {thread} key{i}: {cache.get(f'key{i}')}")
        except KeyError:
            print(f" {thread} key{i}: Not found")

    # Resize the segment, add one more segment
    cache.resize_segments(cache.num_segments + 1)

    print(f"\n {thread} After adding one more segment:")
    for i in range(6):
        try:
            print(f" {thread} key{i}: {cache.get(f'key{i}')}")
        except KeyError:
            print(f" {thread} key{i}: Not found")

    # Demonstrate TTL functionality
    cache.put("ttl_key", "ttl_value", ttl=2)
    print(f"\n {thread} After adding ttl_key:")
    print(f"{thread} ttl_key: {cache.get('ttl_key')}")

    # Sleep past the 2-second TTL so the next get observes expiry.
    time.sleep(3)

    print(f"\n {thread} After waiting for TTL expiration:")
    try:
        print(f"{thread} ttl_key: {cache.get('ttl_key')}")
    except KeyError:
        print(f"{thread} ttl_key: expired")

    # Print cache metrics
    print(f"\n {thread} Cache Metrics:")
    print(cache.get_metrics())



def create_test_cache(thread, cache_type, capacity, num_segments):
    """Build a *cache_type* cache via the factory and run the demo on it."""
    built = CacheFactory.create_cache(cache_type, capacity=capacity, num_segments=num_segments)
    test_cache(built, thread)

def main():
    """Exercise every eviction policy concurrently, one worker thread apiece."""
    # have implemented the custom class LFUEvictionPolicy to add custom Eviction policy,
    # it will basically replicate Least frequently Used policy to evict
    CacheFactory.register_cache_type("LFU", LFUEvictionPolicy)

    # (label, policy name, capacity per segment, number of segments)
    scenarios = [
        ("Thread1->", "FIFO", 3, 4),
        ("Thread2->", "LRU", 3, 2),
        ("Thread3->", "LIFO", 3, 3),
        ("Thread4->", "LFU", 3, 4),
    ]
    workers = [
        threading.Thread(target=create_test_cache, args=scenario)
        for scenario in scenarios
    ]

    # Start all threads
    for worker in workers:
        worker.start()

    # Wait till all the threads are completed
    for worker in workers:
        worker.join()


if __name__ == "__main__":
    main()
Loading

0 comments on commit a9f1203

Please sign in to comment.