diff --git a/common/consistenthash/consistenthash.go b/common/consistenthash/consistenthash.go index dde99a9..971a8b6 100644 --- a/common/consistenthash/consistenthash.go +++ b/common/consistenthash/consistenthash.go @@ -1,108 +1,114 @@ package consistenthash import ( - "errors" - "hash/crc32" - "sort" - "sync" + "errors" + "hash/crc32" + "sort" + "sync" ) var ErrNodeNotFound = errors.New("node not found") // Ring is the data structure that holds a consistent hashed ring. type Ring struct { - Nodes Nodes - sync.Mutex + Nodes Nodes + sync.Mutex } // search will find the index of the node that is responsible for the range that // includes the hashed value of key. func (r *Ring) search(key string) int { - ///////////////////////// - // YOUR CODE GOES HERE // - ///////////////////////// - - return 0 + ///////////////////////// + // YOUR CODE GOES HERE // + ///////////////////////// + hashedKey := crc32.ChecksumIEEE([]byte(key)) % uint32(1000) + selected := 0 + for _, n := range r.Nodes { + if hashedKey > n.HashId { + selected++ + } + } + return selected % r.Nodes.Len() } // NewRing will create a new Ring object and return a pointer to it. func NewRing() *Ring { - return &Ring{Nodes: Nodes{}} + return &Ring{Nodes: Nodes{}} } // Verify if a node with a given id already exists in the ring and if so return // a pointer to it. func (r *Ring) Exists(id string) (bool, *Node) { - r.Lock() - defer r.Unlock() + r.Lock() + defer r.Unlock() - for _, node := range r.Nodes { - if node.Id == id { - return true, node - } - } + for _, node := range r.Nodes { + if node.Id == id { + return true, node + } + } - return false, nil + return false, nil } // Add a node to the ring and return a pointer to it. func (r *Ring) AddNode(id string) *Node { - r.Lock() - defer r.Unlock() + r.Lock() + defer r.Unlock() - node := NewNode(id) - r.Nodes = append(r.Nodes, node) + node := NewNode(id) + r.Nodes = append(r.Nodes, node) - sort.Sort(r.Nodes) + sort.Sort(r.Nodes) - return node + return node } // Remove a node from the ring. func (r *Ring) RemoveNode(id string) error { - r.Lock() - defer r.Unlock() + r.Lock() + defer r.Unlock() - i := r.search(id) - if i >= r.Nodes.Len() || r.Nodes[i].Id != id { - return ErrNodeNotFound - } + i := r.search(id) + if i >= r.Nodes.Len() || r.Nodes[i].Id != id { + return ErrNodeNotFound + } - r.Nodes = append(r.Nodes[:i], r.Nodes[i+1:]...) + r.Nodes = append(r.Nodes[:i], r.Nodes[i+1:]...) - return nil + return nil } // Get the id of the node responsible for the hash range of id. func (r *Ring) Get(id string) string { - i := r.search(id) - if i >= r.Nodes.Len() { - i = 0 - } + i := r.search(id) + if i >= r.Nodes.Len() { + i = 0 + } - return r.Nodes[i].Id + return r.Nodes[i].Id } // GetNext will return the next node after the one responsible for the hash // range of id. func (r *Ring) GetNext(id string) (string, error) { - r.Lock() - defer r.Unlock() - var i = 0 - for i < r.Nodes.Len() && r.Nodes[i].Id != id { - i++ - } + r.Lock() + defer r.Unlock() + var i = 0 + for i < r.Nodes.Len() && r.Nodes[i].Id != id { + i++ + } - if i >= r.Nodes.Len() { - return "", ErrNodeNotFound - } + if i >= r.Nodes.Len() { + return "", ErrNodeNotFound + } - nextIndex := (i + 1) % r.Nodes.Len() + nextIndex := (i + 1) % r.Nodes.Len() - return r.Nodes[nextIndex].Id, nil + return r.Nodes[nextIndex].Id, nil } // hashId returns the hashed form of a key. func hashId(key string) uint32 { - return crc32.ChecksumIEEE([]byte(key)) % uint32(1000) + return crc32.ChecksumIEEE([]byte(key)) % uint32(1000) } diff --git a/dynamo/cache.go b/dynamo/cache.go index 0483a39..6989d91 100644 --- a/dynamo/cache.go +++ b/dynamo/cache.go @@ -1,8 +1,8 @@ package dynamo import ( - "log" - "sync" + "log" + "sync" ) /////////////////////////////// @@ -11,47 +11,50 @@ import ( // Cache is the struct that handle all the data storage for the dynamo server. type Cache struct { - data map[string]string - sync.Mutex + data map[string]string + timestamps map[string]int64 + sync.Mutex } // Create a new cache object and return a pointer to it. func NewCache() *Cache { - var s Cache + var s Cache - s.data = make(map[string]string) + s.data = make(map[string]string) + s.timestamps = make(map[string]int64) - return &s + return &s } // Get the value of a key from the storage. This will handle concurrent get // requests by locking the structure. func (cache *Cache) Get(key string) (value string, timestamp int64) { - cache.Lock() - value = cache.data[key] - timestamp = 0 - cache.Unlock() + cache.Lock() + value = cache.data[key] + timestamp = cache.timestamps[key] + cache.Unlock() - log.Printf("[CACHE] Getting Key '%v' with Value '%v' @ timestamp '%v'\n", key, value, timestamp) - return + log.Printf("[CACHE] Getting Key '%v' with Value '%v' @ timestamp '%v'\n", key, value, timestamp) + return } // Put a value to a key in the storage. This will handle concurrent put // requests by locking the structure. func (cache *Cache) Put(key string, value string, timestamp int64) { - log.Printf("[CACHE] Putting Key '%v' with Value '%v' @ timestamp '%v'\n", key, value, timestamp) + log.Printf("[CACHE] Putting Key '%v' with Value '%v' @ timestamp '%v'\n", key, value, timestamp) - cache.Lock() - cache.data[key] = value - cache.Unlock() + cache.Lock() + cache.data[key] = value + cache.timestamps[key] = timestamp + cache.Unlock() - return + return } // Retrieve all information from the server. This shouldn't be used in any way // except for testing purposes. func (cache *Cache) getAll() (data map[string]string, timestamps map[string]int64) { - data = cache.data - timestamps = make(map[string]int64) - return data, timestamps + data = cache.data + timestamps = cache.timestamps + return data, timestamps } diff --git a/dynamo/coordinator.go b/dynamo/coordinator.go index 408cc9e..2a75341 100644 --- a/dynamo/coordinator.go +++ b/dynamo/coordinator.go @@ -194,5 +194,12 @@ func aggregateVotes(votes []*vote) (result string) { // YOUR CODE GOES HERE // ///////////////////////// result = votes[0].value + ts := votes[0].timestamp + for _, vote := range votes { + if vote.timestamp > ts { + result = vote.value + ts = vote.timestamp + } + } return }