diff --git a/.DS_Store b/.DS_Store index d5ac120..78aa27a 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/.github/workflows/cmake-multi-platform.yml b/.github/workflows/cmake-multi-platform.yml new file mode 100644 index 0000000..74202d7 --- /dev/null +++ b/.github/workflows/cmake-multi-platform.yml @@ -0,0 +1,32 @@ +name: CMake Build (Ubuntu + macOS) + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + build: + name: Build on ${{ matrix.os }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest, macos-latest] + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Configure CMake + run: cmake -B build + + - name: Build + run: cmake --build build + + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: my-app-${{ matrix.os }} + path: | + build/my-executable # <-- cambia questo con il tuo eseguibile diff --git a/.gitignore b/.gitignore index 868bd23..734d2cc 100644 --- a/.gitignore +++ b/.gitignore @@ -7,5 +7,5 @@ CMakeFiles/* !build/build.sh !build/buildGnome.sh !build/buildTestMultiClient.sh -!build/buildTestMultiDataNodes.sh +!build/buildTestMultiDataNode.sh !build/buildTestMultiple.sh \ No newline at end of file diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index 338ef43..62c0a20 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -6,7 +6,8 @@ "${workspaceFolder}/**", "/usr/local/Cellar/sdl2/2.32.2/include/SDL2", "${workspaceFolder}/common/src/filesystem", - "${workspaceFolder}/common/src/squidprotocol" + "${workspaceFolder}/common/src/squidprotocol", + "/usr/include/SDL2" ], "defines": [], "compilerPath": "/usr/bin/clang", diff --git a/build/build.sh b/build/build.sh index 1ce878d..fd33ff0 100755 --- a/build/build.sh +++ b/build/build.sh @@ -4,9 +4,9 @@ cmake .. make -j 8 # Move executables to their respective directories -mv ./SquidStorage ../test_txt/test_client/SquidStorage -mv ./SquidStorageServer ../test_txt/test_server/SquidStorageServer -mv ./DataNode ../test_txt/test_datanode/DataNode +cp ./SquidStorage ../test_txt/test_client/SquidStorage +cp ./SquidStorageServer ../test_txt/test_server/SquidStorageServer +cp ./DataNode ../test_txt/test_datanode/DataNode # Start a new tmux session SESSION_NAME="SquidStorage" diff --git a/build/buildTestMultiDataNode.sh b/build/buildTestMultiDataNode.sh new file mode 100755 index 0000000..a38a431 --- /dev/null +++ b/build/buildTestMultiDataNode.sh @@ -0,0 +1,46 @@ +#!/bin/bash +# Build the project +cmake .. +make -j 8 + +# Move executables to their respective directories +cp ./SquidStorage ../test_txt/test_client1/SquidStorage +cp ./SquidStorageServer ../test_txt/test_server/SquidStorageServer +cp ./DataNode ../test_txt/test_datanode1/DataNode +cp ./DataNode ../test_txt/test_datanode2/DataNode +cp ./DataNode ../test_txt/test_datanode3/DataNode + +# Start a new tmux session +SESSION_NAME="SquidStorage" +tmux new-session -d -s $SESSION_NAME +tmux set -g mouse on + +# Run SquidStorageServer in the first pane +tmux rename-window -t $SESSION_NAME "Server" +tmux send-keys -t $SESSION_NAME "cd ../test_txt/test_server && ./SquidStorageServer" C-m +sleep 2 +# Split the window and run DataNode in the second pane +tmux split-window -h +tmux send-keys "cd ../test_txt/test_datanode1 && ./DataNode" C-m + +tmux split-window -h +tmux send-keys "cd ../test_txt/test_datanode2 && ./DataNode" C-m + +tmux split-window -h +tmux send-keys "cd ../test_txt/test_datanode3 && ./DataNode" C-m + +# Split the window vertically and run SquidStorage in the third pane +tmux split-window -v +tmux send-keys "cd ../test_txt/test_client1 && ./SquidStorage" C-m + +# Attach to the tmux session +tmux select-pane -t 0 +tmux attach-session -t $SESSION_NAME + +# Clean up executables after execution +rm -f ../test_txt/test_client1/SquidStorage +rm -f ../test_txt/test_server/SquidStorageServer +rm -f ../test_txt/test_datanode1/DataNode +rm -f ../test_txt/test_datanode2/DataNode +rm -f ../test_txt/test_datanode3/DataNode +# make clean diff --git a/client/src/client.cpp b/client/src/client.cpp index 68429f7..edaa147 100644 --- a/client/src/client.cpp +++ b/client/src/client.cpp @@ -37,10 +37,10 @@ void Client::run() void Client::checkSecondarySocket() { - while (!secondarySquidProtocol.isAlive()) + if (!secondarySquidProtocol.isAlive()) { cout << "[CLIENT]: Lost connection, reconnecting..." << endl; - this_thread::sleep_for(chrono::seconds(5)); + return; // this->initiateConnection(); // if(secondarySquidProtocol.isAlive()) this->syncStatus(); } @@ -62,15 +62,23 @@ void Client::checkSecondarySocket() return; } - // Se ci sono dati sulla socket secondaria + // if data on second socket if (FD_ISSET(secondarySquidProtocol.getSocket(), &readfds)) { try { - // Ricevi e gestisci il messaggio Message mex = secondarySquidProtocol.receiveAndParseMessage(); cout << "[CLIENT]: Received message on secondary socket: " + mex.keyword << endl; - secondarySquidProtocol.requestDispatcher(mex); + if (mex.keyword == RELEASE_LOCK) + { + cout << "[CLIENT]: Received request for FileLock release" << endl; + FileManager::getInstance().getFileLock().setIsLocked(true); + } + else + { + secondarySquidProtocol.requestDispatcher(mex); + } + } catch (exception &e) { @@ -79,10 +87,11 @@ void Client::checkSecondarySocket() } else { - // Nessun messaggio disponibile - cout << "[CLIENT]: No messages on secondary socket" << endl; + return; + // cout << "[CLIENT]: No message available on secondary socket" << endl; } } + void Client::initiateConnection() { this->connectToServer(); @@ -147,6 +156,10 @@ void Client::createFile(string filePath) { handleRequest(squidProtocol.createFile(filePath)); } +void Client::createFile(string filePath, int version) +{ + handleRequest(squidProtocol.createFile(filePath, version)); +} void Client::deleteFile(string filePath) { @@ -158,6 +171,11 @@ void Client::updateFile(string filePath) handleRequest(squidProtocol.updateFile(filePath)); } +void Client::updateFile(std::string filePath, int version) +{ + handleRequest(squidProtocol.updateFile(filePath, version)); +} + void Client::syncStatus() { handleRequest(squidProtocol.syncStatus()); @@ -173,6 +191,11 @@ void Client::releaseLock(string filePath) handleRequest(squidProtocol.releaseLock(filePath)); } +bool Client::isSecondarySocketAlive() +{ + return secondarySquidProtocol.isAlive(); +} + void Client::testing() { this->initiateConnection(); diff --git a/client/src/client.hpp b/client/src/client.hpp index 7d72b65..8affc72 100644 --- a/client/src/client.hpp +++ b/client/src/client.hpp @@ -11,7 +11,7 @@ #include "squidprotocol.hpp" #define SERVER_IP "127.0.0.1" -#define SERVER_PORT 8080 +#define SERVER_PORT 12345 #define BUFFER_SIZE 1024 class Client : public Peer @@ -27,11 +27,14 @@ class Client : public Peer virtual void initiateConnection(); virtual void checkSecondarySocket(); virtual void createFile(std::string filePath); + virtual void createFile(std::string filePath, int version); virtual void deleteFile(std::string filePath); virtual void updateFile(std::string filePath); + virtual void updateFile(std::string filePath, int version); virtual void syncStatus(); virtual bool acquireLock(std::string filePath); virtual void releaseLock(std::string filePath); + virtual bool isSecondarySocketAlive(); private: SquidProtocol secondarySquidProtocol; diff --git a/common/src/filesystem/filemanager.cpp b/common/src/filesystem/filemanager.cpp index 7fc5b92..3c3bc50 100644 --- a/common/src/filesystem/filemanager.cpp +++ b/common/src/filesystem/filemanager.cpp @@ -8,6 +8,9 @@ FileManager::FileManager() fileMap[entry] = FileLock(entry); } std::cout << "[FILEMANAGER]: File map initialized" << std::endl; + + // creating file version file if it doesn't exist + createFileVersionFile(); } void FileManager::setFileMap(std::map fileMap) @@ -45,13 +48,49 @@ std::map FileManager::getFilesLastWrite(std::st std::map filesLastWrite; for (auto file : files) { - if (file == ".DS_Store" || file == "SquidStorage" || file == "SquidStorageServer" || file == "imgui.ini" || file == "DataNode") + if (file == ".DS_Store" || file == "SquidStorage" || file == "SquidStorageServer" || file == "imgui.ini" || file == "DataNode" || file == ".fileVersion.txt") continue; filesLastWrite[file] = fs::last_write_time(file); } return filesLastWrite; } +std::map FileManager::getFileVersionMap(std::string path) +{ + std::map savedFilesVersion; + std::ifstream versionFile(FILE_VERSION_PATH); + std::string line; + while (std::getline(versionFile, line)) + { + std::istringstream iss(line); + std::string filePath; + int version; + if (iss >> filePath >> version) + { + savedFilesVersion[filePath] = version; + } + } + versionFile.close(); + + std::map filesVersion; + auto files = this->getFiles(path); + for (auto file : files) + { + if (file == ".DS_Store" || file == "SquidStorage" || file == "SquidStorageServer" || file == "imgui.ini" || file == "DataNode" || file == ".fileVersion.txt") + continue; + if (savedFilesVersion.find(file) != savedFilesVersion.end()) + { + filesVersion[file] = savedFilesVersion[file]; + } + else + { + filesVersion[file] = 0; + } + } + + return filesVersion; +} + char *FileManager::stringToChar(std::string str) { char *res = new char[str.length() + 1]; @@ -71,11 +110,55 @@ bool FileManager::createFile(std::string path) return true; } +bool FileManager::createFile(std::string path, int version) +{ + if (createFile(path)) + { + if (setFileVersion(path, version)) + { + return true; + } + } + return false; +} + bool FileManager::deleteFile(std::string path) { return fs::remove(path); } +bool FileManager::deleteFileAndVersion(std::string path) +{ + if (deleteFile(path)) + { + std::ifstream versionFile(FILE_VERSION_PATH); + std::string line; + std::vector lines; + while (std::getline(versionFile, line)) + { + std::istringstream iss(line); + std::string filePath; + int version; + if (iss >> filePath >> version) + { + if (filePath != path) + { + lines.push_back(line); + } + } + } + versionFile.close(); + std::ofstream newVersionFile(FILE_VERSION_PATH); + for (auto line : lines) + { + newVersionFile << line << std::endl; + } + newVersionFile.close(); + return true; + } + return false; +} + bool FileManager::updateFile(std::string path, std::string content) { std::ofstream file(path, std::ios::trunc); @@ -88,6 +171,36 @@ bool FileManager::updateFile(std::string path, std::string content) return true; } +bool FileManager::updateFile(std::string path, std::string content, int version) +{ + if (updateFile(path, content)) + { + if (setFileVersion(path, version)) + { + return true; + } + } + return false; +} + +bool FileManager::updateFileAndVersion(std::string path, std::string content) +{ + int version = getFileVersion(path); + if (version == -1) + { + return false; // file version not found + } + version++; + if (updateFile(path, content)) + { + if (setFileVersion(path, version)) + { + return true; + } + } + return false; +} + std::string FileManager::readFile(std::string path) { std::ifstream file(path); @@ -112,6 +225,77 @@ std::string FileManager::formatFileList(std::vector files) return fileList; } +int FileManager::getFileVersion(std::string path) +{ + std::ifstream versionFile(FILE_VERSION_PATH); + std::string line; + while (std::getline(versionFile, line)) + { + std::istringstream iss(line); + std::string filePath; + int version; + if (iss >> filePath >> version) + { + if (filePath == path) + { + versionFile.close(); + return version; + } + } + } + return -1; // file not found +} + +bool FileManager::setFileVersion(std::string path, int version) +{ + std::ifstream versionFile(FILE_VERSION_PATH); + std::string line; + std::vector lines; + bool found = false; + while (std::getline(versionFile, line)) + { + std::istringstream iss(line); + std::string filePath; + int ver; + if (iss >> filePath >> ver) + { + if (filePath == path) + { + lines.push_back(filePath + " " + std::to_string(version)); + found = true; + } + else + { + lines.push_back(line); + } + } + } + versionFile.close(); + if (!found) + { + lines.push_back(path + " " + std::to_string(version)); + } + std::ofstream newVersionFile(FILE_VERSION_PATH); + for (auto line : lines) + { + newVersionFile << line << std::endl; + } + newVersionFile.close(); + + return true; +} + +// used by client +void FileManager::setFileLock(FileLock fileLock) +{ + this->fileLock = fileLock; +} + +FileLock& FileManager::getFileLock() +{ + return this->fileLock; +} + void FileManager::updateFileMap() { std::vector entries = getFiles(DEFAULT_PATH); @@ -120,4 +304,15 @@ void FileManager::updateFileMap() if (fileMap.find(entry) == fileMap.end()) fileMap[entry] = FileLock(entry); } -} \ No newline at end of file +} + +void FileManager::createFileVersionFile() +{ + std::ifstream versionFile(FILE_VERSION_PATH); + if (!versionFile) + { + std::ofstream newFile(FILE_VERSION_PATH); + newFile.close(); + } + versionFile.close(); +} diff --git a/common/src/filesystem/filemanager.hpp b/common/src/filesystem/filemanager.hpp index a78365d..c4f00e6 100644 --- a/common/src/filesystem/filemanager.hpp +++ b/common/src/filesystem/filemanager.hpp @@ -8,6 +8,8 @@ #include "filelock.hpp" #define DEFAULT_PATH fs::current_path().string() // current directory +#define FILE_VERSION_PATH DEFAULT_PATH + "/.fileVersion.txt" + namespace fs = std::filesystem; // Singleton class: private constructor and prevent copying @@ -27,13 +29,23 @@ class FileManager std::vector getFiles(std::string path); std::vector getFileEntries(std::string path); std::map getFilesLastWrite(std::string path); + std::map getFileVersionMap(std::string path); char *stringToChar(std::string str); bool createFile(std::string path); + bool createFile(std::string path, int version); bool deleteFile(std::string path); + bool deleteFileAndVersion(std::string path); bool updateFile(std::string path, std::string content); + bool updateFile(std::string path, std::string content, int version); + bool updateFileAndVersion(std::string path, std::string content); std::string readFile(std::string path); std::string formatFileList(std::vector files); + int getFileVersion(std::string path); + bool setFileVersion(std::string path, int version); + + void setFileLock(FileLock fileLock); + FileLock& getFileLock(); bool acquireLock(std::string path); bool releaseLock(std::string path); @@ -47,4 +59,6 @@ class FileManager FileManager(const FileManager &) = delete; FileManager &operator=(const FileManager &) = delete; std::map fileMap; + void createFileVersionFile(); + FileLock fileLock; }; \ No newline at end of file diff --git a/common/src/squidprotocol/squidProtocolFormatter.cpp b/common/src/squidprotocol/squidProtocolFormatter.cpp index c0255f7..3f89826 100644 --- a/common/src/squidprotocol/squidProtocolFormatter.cpp +++ b/common/src/squidprotocol/squidProtocolFormatter.cpp @@ -117,6 +117,11 @@ std::string SquidProtocolFormatter::createFileFormat(std::string filePath) return this->createMessage(CREATE_FILE, {"filePath:" + filePath}); } +std::string SquidProtocolFormatter::createFileFormat(std::string filePath, int version) +{ + return this->createMessage(CREATE_FILE, {"filePath:" + filePath, "fileVersion:" + std::to_string(version)}); +} + std::string SquidProtocolFormatter::transferFileFormat(std::string filePath) { return this->createMessage(TRANSFER_FILE, {"filePath:" + filePath}); @@ -132,6 +137,11 @@ std::string SquidProtocolFormatter::updateFileFormat(std::string filePath) return this->createMessage(UPDATE_FILE, {"filePath:" + filePath}); } +std::string SquidProtocolFormatter::updateFileFormat(std::string filePath, int version) +{ + return this->createMessage(UPDATE_FILE, {"filePath:" + filePath, "fileVersion:" + std::to_string(version)}); +} + std::string SquidProtocolFormatter::deleteFileFormat(std::string filePath) { return this->createMessage(DELETE_FILE, {"filePath:" + filePath}); @@ -182,6 +192,7 @@ std::string SquidProtocolFormatter::responseFormat(bool lock) return this->createMessage(RESPONSE, {"isLocked:" + std::to_string(lock)}); } +// deprecated std::string SquidProtocolFormatter::responseFormat(std::map filesLastWrite) { std::vector arguments; @@ -192,6 +203,16 @@ std::string SquidProtocolFormatter::responseFormat(std::mapcreateMessage(RESPONSE, arguments); } +std::string SquidProtocolFormatter::responseFormat(std::map fileVersionMap) +{ + std::vector arguments; + for (auto arg : fileVersionMap) + { + arguments.push_back(arg.first + ":" + std::to_string(arg.second)); + } + return this->createMessage(RESPONSE, arguments); +} + std::string SquidProtocolFormatter::responseFormat(std::map fileTimeMap) { std::vector arguments; diff --git a/common/src/squidprotocol/squidProtocolFormatter.hpp b/common/src/squidprotocol/squidProtocolFormatter.hpp index d6c6af5..d5f7b2b 100644 --- a/common/src/squidprotocol/squidProtocolFormatter.hpp +++ b/common/src/squidprotocol/squidProtocolFormatter.hpp @@ -32,7 +32,8 @@ class Message Message() {}; Message(ProtocolKeyWord keyword, std::map args) : keyword(keyword), args(args) {} - std::string toString(){ + std::string toString() + { std::string result = "Message: \n"; for (const auto &arg : args) { @@ -54,9 +55,12 @@ class SquidProtocolFormatter std::string connectServerFormat(); std::string createFileFormat(std::string filePath); + std::string createFileFormat(std::string filePath, int version); + std::string transferFileFormat(std::string fileContent); std::string readFileFormat(std::string filePath); std::string updateFileFormat(std::string filePath); + std::string updateFileFormat(std::string filePath, int version); std::string deleteFileFormat(std::string filePath); std::string acquireLockFormat(std::string filePath); @@ -70,9 +74,9 @@ class SquidProtocolFormatter std::string responseFormat(int port); std::string responseFormat(std::string nodeType, std::string processName); std::string responseFormat(std::map fileTimeMap); + std::string responseFormat(std::map fileVersionMap); std::string responseFormat(std::map filesLastWrite); - private: std::string nodeType; std::string createMessage(ProtocolKeyWord keyword, std::vector args); diff --git a/common/src/squidprotocol/squidprotocol.cpp b/common/src/squidprotocol/squidprotocol.cpp index 2de6bc0..43f2440 100644 --- a/common/src/squidprotocol/squidprotocol.cpp +++ b/common/src/squidprotocol/squidprotocol.cpp @@ -67,6 +67,18 @@ Message SquidProtocol::createFile(string filePath) transferFile(filePath, response); return receiveAndParseMessage(); } +Message SquidProtocol::createFile(string filePath, int version) +{ + cout << "file name: " + filePath << endl; + this->sendMessage(this->formatter.createFileFormat(filePath, version)); + cout << nodeType + ": sent create file request" << endl; + Message response = receiveAndParseMessage(); + cout << nodeType + ": received create file response" << endl; + transferFile(filePath, response); + return receiveAndParseMessage(); +} + + Message SquidProtocol::updateFile(string filePath) { @@ -76,6 +88,14 @@ Message SquidProtocol::updateFile(string filePath) return receiveAndParseMessage(); } +Message SquidProtocol::updateFile(string filePath, int version) +{ + this->sendMessage(this->formatter.updateFileFormat(filePath, version)); + Message response = receiveAndParseMessage(); + transferFile(filePath, response); + return receiveAndParseMessage(); +} + void SquidProtocol::transferFile(string filePath, Message response) { cout << nodeType + ": trying transfering file" << endl; @@ -142,6 +162,7 @@ Message SquidProtocol::listFiles() return response; } + // executed by client Message SquidProtocol::syncStatus() { @@ -150,28 +171,35 @@ Message SquidProtocol::syncStatus() Message response = receiveAndParseMessage(); if (response.keyword == RESPONSE) { + cout << nodeType + "response.args.size()" << response.args.size() << endl; + if (response.args.find("ACK") != response.args.end()) + { + cout << nodeType + ": received NACK from server" << endl; + return response; + } cout << nodeType + ": received sync status response" << endl; - map filesLastWrite; - filesLastWrite = FileManager::getInstance().getFilesLastWrite(DEFAULT_FOLDER_PATH); + map fileVersionMap; + fileVersionMap = FileManager::getInstance().getFileVersionMap(DEFAULT_FOLDER_PATH); cout << nodeType + ": checking files: " << endl; - for (auto localFile : filesLastWrite) + for (auto localFile : fileVersionMap) { if (response.args.find(localFile.first) != response.args.end()) { cout << nodeType + ": found file that already exists on server " << localFile.first << endl; - if (localFile.second.time_since_epoch().count() > stoll(response.args[localFile.first])) + if (localFile.second > stoi(response.args[localFile.first])) { // in this case server needs to update the file cout << nodeType + ": server needs to update the file: " + localFile.first << endl; - this->updateFile(localFile.first); + this->updateFile(localFile.first, localFile.second); // this->fileTransfer.sendFile(this->socket_fd, this->processName.c_str(), localFile.first.c_str()); } - else if (localFile.second.time_since_epoch().count() < stoll(response.args[localFile.first])) + else if (localFile.second < stoi(response.args[localFile.first])) { // in this case client needs to update the file // this->fileTransfer.receiveFile(this->socket_fd, this->processName.c_str(), localFile.first.c_str()); cout << nodeType + ": client needs to update the file: " + localFile.first << endl; this->readFile(localFile.first); + FileManager::getInstance().setFileVersion(localFile.first, stoi(response.args[localFile.first])); } response.args.erase(localFile.first); } @@ -179,9 +207,10 @@ Message SquidProtocol::syncStatus() { // in this case server needs to create the file cout << nodeType + ": server needs to create the file: " + localFile.first << endl; - this->createFile(localFile.first); + this->createFile(localFile.first, localFile.second); } } + cout << nodeType + "response.args.size()" << response.args.size() << endl; if (response.args.size() > 0) { for (auto remoteFile : response.args) @@ -190,12 +219,15 @@ Message SquidProtocol::syncStatus() // this->fileManager.deleteFile(remoteFile.first); cout << nodeType + ": client needs to create the file: " + remoteFile.first << endl; this->readFile(remoteFile.first); + FileManager::getInstance().setFileVersion(remoteFile.first, stoi(remoteFile.second)); } } } return formatter.parseMessage(formatter.responseFormat(string("ACK"))); } + + // --------------------------- // --------- PARSING --------- // --------------------------- @@ -243,7 +275,7 @@ bool SquidProtocol::handleErrors(ssize_t bytes) alive = false; return false; } - else if (bytes< 0) + else if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -277,11 +309,17 @@ void SquidProtocol::response(string nodeType, string processName) this->sendMessage(this->formatter.responseFormat(nodeType, processName)); } +// deprecated void SquidProtocol::response(map filesLastWrite) { this->sendMessage(this->formatter.responseFormat(filesLastWrite)); } +void SquidProtocol::response(map fileVersionMap) +{ + this->sendMessage(this->formatter.responseFormat(fileVersionMap)); +} + void SquidProtocol::response(map fileTimeMap) { this->sendMessage(this->formatter.responseFormat(fileTimeMap)); @@ -329,6 +367,7 @@ void SquidProtocol::requestDispatcher(Message message) this->response(string("ACK")); cout << nodeType + ": Receiving file" << endl; this->fileTransfer.receiveFile(this->socket_fd, this->processName.c_str(), message.args["filePath"].c_str()); + FileManager::getInstance().setFileVersion(message.args["filePath"], stoi(message.args["fileVersion"])); this->response(string("ACK")); break; case TRANSFER_FILE: @@ -346,11 +385,12 @@ void SquidProtocol::requestDispatcher(Message message) cout << nodeType + ": received update file request\n"; this->response(string("ACK")); this->fileTransfer.receiveFile(this->socket_fd, this->processName.c_str(), message.args["filePath"].c_str()); + FileManager::getInstance().setFileVersion(message.args["filePath"], stoi(message.args["fileVersion"])); this->response(string("ACK")); break; case DELETE_FILE: cout << nodeType + ": received delete file request\n"; - FileManager::getInstance().deleteFile(message.args["filePath"]); + FileManager::getInstance().deleteFileAndVersion(message.args["filePath"]); this->response(string("ACK")); break; // case ACQUIRE_LOCK: @@ -358,7 +398,7 @@ void SquidProtocol::requestDispatcher(Message message) // this->response(FileManager::getInstance().acquireLock(message.args["filePath"])); // break; case RELEASE_LOCK: - //FileManager::getInstance().releaseLock(message.args["filePath"]); + // FileManager::getInstance().releaseLock(message.args["filePath"]); this->response(string("ACK")); break; case HEARTBEAT: @@ -366,7 +406,8 @@ void SquidProtocol::requestDispatcher(Message message) break; case SYNC_STATUS: cout << nodeType + ": received sync status request\n"; - this->response(FileManager::getInstance().getFilesLastWrite(DEFAULT_FOLDER_PATH)); + cout << "getting file version map" << endl; + this->response(FileManager::getInstance().getFileVersionMap(DEFAULT_FOLDER_PATH)); break; case IDENTIFY: this->response(this->nodeType, this->processName); diff --git a/common/src/squidprotocol/squidprotocol.hpp b/common/src/squidprotocol/squidprotocol.hpp index db1d680..17a9b43 100644 --- a/common/src/squidprotocol/squidprotocol.hpp +++ b/common/src/squidprotocol/squidprotocol.hpp @@ -40,8 +40,10 @@ class SquidProtocol virtual Message listFiles(); virtual Message createFile(string filePath); + virtual Message createFile(string filePath, int version); virtual Message readFile(string filePath); virtual Message updateFile(string filePath); + virtual Message updateFile(string filePath, int version); virtual Message deleteFile(string filePath); virtual Message acquireLock(string filePath); @@ -58,6 +60,8 @@ class SquidProtocol virtual void response(int port); virtual void response(string ack); virtual void response(string nodeType, string processName); + virtual void response(map fileVersionMap); + // deprecated virtual void response(map filesLastWrite); virtual void response(map fileTimeMap); diff --git a/data-node/src/datanode.hpp b/data-node/src/datanode.hpp index fa80e48..b5e30be 100644 --- a/data-node/src/datanode.hpp +++ b/data-node/src/datanode.hpp @@ -11,7 +11,7 @@ #include "squidprotocol.hpp" #define SERVER_IP "127.0.0.1" -#define SERVER_PORT 8080 +#define SERVER_PORT 12345 #define BUFFER_SIZE 1024 class DataNode: public Peer diff --git a/data-node/src/main.cpp b/data-node/src/main.cpp index 23780ef..5e3335d 100644 --- a/data-node/src/main.cpp +++ b/data-node/src/main.cpp @@ -4,13 +4,18 @@ int main(int argc, char **argv) { - int port = 12345; + const char* server_ip = SERVER_IP; + int server_port = SERVER_PORT; + if (argc > 1) - { - port = atoi(argv[1]); - } - std::cout << "Starting datanode" << std::endl; + server_ip = argv[1]; + if (argc > 2) + server_port = atoi(argv[2]); + + std::cout << "Starting DataNode. Server IP: " << server_ip << ", Server Port: " << server_port << std::endl; std::string currentPath = fs::current_path().string(); // current directory - DataNode datanode = DataNode(port, std::string("DATANODE"), currentPath); + DataNode datanode(server_ip, server_port, std::string("DATANODE"), currentPath); datanode.run(); + + return 0; } \ No newline at end of file diff --git a/imgui/src/Application.cpp b/imgui/src/Application.cpp index 1c5d10c..923ccd4 100644 --- a/imgui/src/Application.cpp +++ b/imgui/src/Application.cpp @@ -5,44 +5,117 @@ namespace SquidStorage { - std::string currentPath = fs::current_path().string(); // current directory std::string selectedFile = ""; // selected file std::string fileContent = ""; // selected file content bool showFileSavedMessage = false; bool showFileSaveButton = false; bool showFileDeleteButton = false; + std::atomic showLoadingPopup = false; + std::atomic isConnected = false; bool showFileEditor = false; + bool showErrorMessage = false; + bool showRefreshMessage = false; bool newFileButtonPressed = false; bool deleteButtonPressed = false; char newFileName[128]; + int openedFileVersion = -1; + bool executed = false; + bool connectedOnce = false; + Client client("127.0.0.1", 12345, "CLIENT", currentPath); FileLock fileLock; int currentFrame = 0; + + void attachClient(Client client) + { + SquidStorage::client = client; + } + void runClient() { - client.initiateConnection(); std::thread secondarySocketThread([]() { try { - client.run(); + client.initiateConnection(); + showLoadingPopup = true; + client.syncStatus(); + showLoadingPopup = false; + std::cout << "[GUI]: Closing loading popup" << std::endl; } catch (const std::exception &e) { std::cerr << "[CLIENT]: Error in secondary socket thread: " << e.what() << std::endl; } }); secondarySocketThread.detach(); - client.syncStatus(); + executed = true; + connectedOnce = true; + std::cout << "[GUI]: Socket thread started" << std::endl; + /* + std::thread syncThread([]() + { + try + { + showLoadingPopup = true; + client.syncStatus(); + showLoadingPopup = false; + std::cout << "[GUI]: Closing loading popup" << std::endl; + } + catch (const std::exception &e) + { + std::cerr << "[CLIENT]: Error in sync thread: " << e.what() << std::endl; + } }); + syncThread.detach(); + */ } void RenderUI() { + // cout << "[DEBUG]: isSecondarySocketConnected -> "<< client.isSecondarySocketAlive() << std::endl; + if (connectedOnce) + { + if (client.isSecondarySocketAlive()) + { + client.checkSecondarySocket(); + executed = false; + } + else + { + if (!executed) + { + cout << "[GUI]: Connection lost, restarting client..." << std::endl; + cout << "[DEBUG]: Executing SquidStorage::runClient()" << std::endl; + SquidStorage::runClient(); + executed = true; + } + } + } + + if (currentFrame == UPDATE_EVERY) { currentFrame = 0; - //client.checkSecondarySocket(); // this blocks the gui if connection is lost + // client.checkSecondarySocket(); // this blocks the gui if connection is lost + } + + // Popup for loading rendering + if (showLoadingPopup) + { + ImGui::OpenPopup("Loading..."); + } + if (ImGui::BeginPopupModal("Loading...", NULL, ImGuiWindowFlags_NoTitleBar | ImGuiWindowFlags_NoResize | ImGuiWindowFlags_NoMove)) + { + ImGui::Text("Please wait..."); + ImGui::Text("Synchronizing with the server..."); + ImGui::Separator(); + ImGui::Text("This may take a few seconds."); + if (!showLoadingPopup) + { + ImGui::CloseCurrentPopup(); + } + ImGui::EndPopup(); } currentFrame++; @@ -178,10 +251,19 @@ namespace SquidStorage { if (ImGui::Selectable(("- " + name).c_str(), selectedFile == name)) { - selectedFile = entry.path().filename().string(); - fileContent = FileManager::getInstance().readFile(selectedFile); - showFileDeleteButton = true; - showFileEditor = true; + std::thread readThread([entry]() + { + showLoadingPopup = true; + selectedFile = entry.path().filename().string(); + fileContent = FileManager::getInstance().readFile(selectedFile); + openedFileVersion = FileManager::getInstance().getFileVersion(selectedFile); + showFileDeleteButton = true; + showFileEditor = true; + showFileSavedMessage = false; + showErrorMessage = false; + showRefreshMessage = false; + showLoadingPopup = false; }); + readThread.detach(); } } } @@ -201,9 +283,10 @@ namespace SquidStorage ImGui::InputText("File name", newFileName, 128); if (ImGui::Button("Create")) { - if (FileManager::getInstance().createFile(newFileName)) + if (FileManager::getInstance().createFile(newFileName, 0)) { - client.createFile(newFileName); + client.createFile(newFileName, 0); + openedFileVersion = 0; fileContent = ""; newFileButtonPressed = false; } @@ -216,7 +299,7 @@ namespace SquidStorage if (deleteButtonPressed) { - if (FileManager::getInstance().deleteFile(selectedFile)) + if (FileManager::getInstance().deleteFileAndVersion(selectedFile)) { client.deleteFile(selectedFile); selectedFile = ""; @@ -244,16 +327,24 @@ namespace SquidStorage fileContent = std::string(buffer); showFileSavedMessage = false; showFileSaveButton = fileCanBeSaved(); + showErrorMessage = !showFileSaveButton; } ImGui::EndChild(); if (showFileSaveButton) { if (ImGui::Button("Save")) { - if (FileManager::getInstance().updateFile(selectedFile, fileContent)) + if (FileManager::getInstance().updateFileAndVersion(selectedFile, fileContent)) { - client.updateFile(selectedFile); - showFileSavedMessage = true; + std::thread updateThread([]() + { + showLoadingPopup = true; + client.updateFile(selectedFile, FileManager::getInstance().getFileVersion(selectedFile)); + //std::this_thread::sleep_for(std::chrono::seconds(1)); + openedFileVersion = FileManager::getInstance().getFileVersion(selectedFile); + showFileSavedMessage = true; + showLoadingPopup = false; }); + updateThread.detach(); } else { @@ -273,8 +364,37 @@ namespace SquidStorage showFileEditor = false; showFileSavedMessage = false; showFileSaveButton = false; + showErrorMessage = false; + showRefreshMessage = false; + openedFileVersion = 0; + + } + if (showErrorMessage) + { + ImGui::Separator(); + ImGui::TextColored(ImVec4(1,0,0,1), "You cannot save the file: either the connection is missing or the file is locked by another user."); + } + if (showRefreshMessage) + { + ImGui::TextColored(ImVec4(1,0,0,1), "A newer version of the file is available, please refresh or close the file."); + + ImGui::SameLine(); + if (ImGui::Button("Refresh")) + { + std::thread readThread([]() + { + showLoadingPopup = true; + fileContent = FileManager::getInstance().readFile(selectedFile); + openedFileVersion = FileManager::getInstance().getFileVersion(selectedFile); + showFileDeleteButton = true; + showFileEditor = true; + showFileSavedMessage = false; + showErrorMessage = false; + showRefreshMessage = false; + showLoadingPopup = false; }); + readThread.detach(); + } } - if (showFileSavedMessage) ImGui::Text("File saved!"); } @@ -285,20 +405,29 @@ namespace SquidStorage { if (selectedFile.empty()) return false; - if (fileLock.getFilePath() == selectedFile) + if (!client.isSecondarySocketAlive()) + return false; + if (openedFileVersion < FileManager::getInstance().getFileVersion(selectedFile)) + { + showRefreshMessage = true; + return false; + } + + if (FileManager::getInstance().getFileLock().getFilePath() == selectedFile) { - if (fileLock.isLocked()) + if (FileManager::getInstance().getFileLock().isLocked()) { - fileLock.setIsLocked(!client.acquireLock(selectedFile)); + FileManager::getInstance().getFileLock().setIsLocked(!client.acquireLock(selectedFile)); } - return !fileLock.isLocked(); + return !FileManager::getInstance().getFileLock().isLocked(); } else { - fileLock = FileLock(selectedFile); - fileLock.setIsLocked(!client.acquireLock(selectedFile)); - return !fileLock.isLocked(); + auto newFileLock = FileLock(selectedFile); + newFileLock.setIsLocked(!client.acquireLock(selectedFile)); + FileManager::getInstance().setFileLock(newFileLock); + return !FileManager::getInstance().getFileLock().isLocked(); } return false; } -} +} \ No newline at end of file diff --git a/imgui/src/Application.hpp b/imgui/src/Application.hpp index 2f59529..49d805e 100644 --- a/imgui/src/Application.hpp +++ b/imgui/src/Application.hpp @@ -2,8 +2,10 @@ #include #include "../../common/src/filesystem/filemanager.hpp" #include "client.hpp" +#include namespace SquidStorage { + void attachClient(Client client); void runClient(); void RenderUI(); void renderFileExplorer(); diff --git a/imgui/src/main.cpp b/imgui/src/main.cpp index 65347f7..f66773c 100644 --- a/imgui/src/main.cpp +++ b/imgui/src/main.cpp @@ -19,9 +19,20 @@ #include #include "Application.hpp" +std::string currentPath = fs::current_path().string(); + // Main code -int main(int, char **) +int main(int argc, char **argv) { + const char* server_ip = SERVER_IP; + int server_port = SERVER_PORT; + if (argc > 1) + server_ip = argv[1]; + if (argc > 2) + server_port = atoi(argv[2]); + + std::cout << "Starting Client. Server IP: " << server_ip << ", Server Port: " << server_port << std::endl; + SquidStorage::attachClient(Client(server_ip, server_port, "CLIENT", currentPath)); // Setup SDL if (SDL_Init(SDL_INIT_VIDEO | SDL_INIT_TIMER | SDL_INIT_GAMECONTROLLER) != 0) { diff --git a/main.cpp b/main.cpp deleted file mode 100644 index 979673e..0000000 --- a/main.cpp +++ /dev/null @@ -1,33 +0,0 @@ -#include -#include -#include "server.hpp" -#include "datanode.hpp" -#include "client.hpp" - -using namespace std; -int main() -{ - std::thread server_thread([](){ - Server server(12345); - server.run(); }); - - std::this_thread::sleep_for(std::chrono::seconds(0)); - - // std::thread datanode_thread1([](){ - // DataNode datanode("127.0.0.1", 12345); - // std::this_thread::sleep_for(std::chrono::seconds(1)); - // datanode.run(); }); - - std::thread client_thread1([](){ - Client client("127.0.0.1", 12345); - std::this_thread::sleep_for(std::chrono::seconds(3)); - client.testing(); }); - - client_thread1.join(); - std::cout << "Client thread finished" << std::endl; - - // datanode_thread1.join(); - // std::cout << "Datanode thread finished" << std::endl; - - return 0; -} \ No newline at end of file diff --git a/server/src/main.cpp b/server/src/main.cpp index 0c1de36..af1eb0c 100644 --- a/server/src/main.cpp +++ b/server/src/main.cpp @@ -4,12 +4,21 @@ int main(int argc, char **argv) { - int port = 12345; + int port = DEFAULT_PORT; + int replicationFactor = DEFAULT_REPLICATION_FACTOR; + int timeoutSeconds = DEFAULT_TIMEOUT; + if (argc > 1) - { port = atoi(argv[1]); - } - std::cout << "Starting server on port: " << port << std::endl; - Server server = Server(port); + if (argc > 2) + replicationFactor = atoi(argv[2]); + if (argc > 3) + timeoutSeconds = atoi(argv[3]); + + std::cout << "Starting server on port: " << port + << ", replication factor: " << replicationFactor + << ", timeout: " << timeoutSeconds << "s" << std::endl; + + Server server(port, replicationFactor, timeoutSeconds); server.run(); } \ No newline at end of file diff --git a/server/src/server.cpp b/server/src/server.cpp index 0052897..7d1be1c 100644 --- a/server/src/server.cpp +++ b/server/src/server.cpp @@ -55,8 +55,8 @@ Server::Server(int port, int replicationFactor, int timeoutSeconds) fileTransfer = FileTransfer(); fileLockMap = map(); - fileTimeMap = map(); - loadMapFromFile(); + // fileTimeMap = map(); + // loadMapFromFile(); primarySocketMap = map(); clientEndpointMap = map>(); @@ -102,7 +102,11 @@ void Server::run() } readfds = master_set; - if (select(max_sd + 1, &readfds, NULL, NULL, NULL) < 0) + + struct timeval timeout; + timeout.tv_sec = 1; // 1 second timeout + timeout.tv_usec = 0; + if (select(max_sd + 1, &readfds, NULL, NULL, &timeout) < 0) { cerr << "[SERVER]: Select failed" << endl; checkCloseConnetions(master_set, max_sd); @@ -129,14 +133,17 @@ void Server::run() } } } - - sendHearbeats(); // datanodes only - saveMapToFile(); // save file time map + sendHeartbeats(); // datanodes only + // saveMapToFile(); // save file time map checkFileLockExpiration(); + + // printMap(dataNodeEndpointMap, "DataNode Endpoint Map"); + // printMap(dataNodeReplicationMap, "DataNode Replication Map"); } } -void Server::checkFileLockExpiration(){ +void Server::checkFileLockExpiration() +{ auto now = chrono::system_clock::now(); for (auto it = fileLockMap.begin(); it != fileLockMap.end();) { @@ -178,7 +185,7 @@ void Server::checkCloseConnetions(fd_set &master_set, int max_sd) } } -void Server::sendHearbeats() +void Server::sendHeartbeats() { vector erasable = vector(); for (auto &datanode : dataNodeEndpointMap) @@ -191,38 +198,57 @@ void Server::sendHearbeats() datanode.second.setIsAlive(false); datanode.second.closeConn(); erasable.push_back(datanode.first); - eraseFromReplicationMap(datanode.second.getProcessName()); - cout << "[SERVER]: Datanode removed from map: " + datanode.first << endl; + // dataNodeEndpointMap.erase(datanode.first); + // eraseFromReplicationMap(datanode.first); + cout << "[SERVER]: Datanode removed from replication map: " + datanode.first << endl; } } for (auto &datanode : erasable) { dataNodeEndpointMap.erase(datanode); - cout << "[SERVER]: Datanode removed from map: " + datanode << endl; + cout << "[SERVER]: Datanode removed from endpoint map: " + datanode << endl; } - cout << "[SERVER]: Heartbeat sent to all datanodes" << endl; + // erase all datanodes that are not alive + eraseFromReplicationMap(erasable); + + // cout << "[SERVER]: Heartbeat sent to all datanodes" << endl; +} + +void Server::eraseFromReplicationMap(vector datanodeNames) +{ + for (auto &datanodeName : datanodeNames) + { + cout << "[SERVER]: Erasing datanode: " + datanodeName + " from replication map" << endl; + eraseFromReplicationMap(datanodeName); + } } void Server::eraseFromReplicationMap(string datanodeName) { for (auto it = dataNodeReplicationMap.begin(); it != dataNodeReplicationMap.end();) { - // for each file check if the datanode endpoint holds the file + cout << "[SERVER]: Checking file: " + it->first << endl; + // printMap(it->second, "Datanode holding the file"); + // for each file check if the datanode endpoint holds the file auto datanodeEndpoint = it->second.find(datanodeName); if (datanodeEndpoint != it->second.end()) { it->second.erase(datanodeEndpoint->first); // erase from internal map - cout << "[SERVER]: Datanode removed from replication map of file: " + it->first << endl; + cout << "[SERVER]: Datanode " + datanodeName + " removed from replication map of file: " + it->first << endl; - // check that internal map is not empty + // check that internal map is not below threshold if (it->second.size() < (replicationFactor / 2) + 1) { - cout << "[SERVER]: Datanodes hodling the file: " + it->first + "is below threshold" << endl; + cout << "[SERVER]: Datanodes hodling the file: " + it->first + " are below threshold" << endl; rebalanceFileReplication(it->first, it->second); } } + else + { + cout << "[SERVER]: Datanode: " + datanodeName + " not found for file: " + it->first << endl; + } ++it; } } @@ -231,6 +257,20 @@ void Server::rebalanceFileReplication(string filePath, mapfirst << endl; + getFileFromDataNode(filePath, fileHoldersMap.begin()->second); + } + + auto endpointIterator = dataNodeEndpointMap.begin(); + // Assign new datanodes until the replication factor is met for (int i = 0; i < dataNodeEndpointMap.size(); i++) { @@ -246,13 +286,14 @@ void Server::rebalanceFileReplication(string filePath, mapsecond; - endpointIterator++; // Send the file to the newly assigned datanode cout << "[SERVER]: Sending file " << filePath << " to datanode: " << datanodeName << endl; - Message response = endpointIterator->second.createFile(filePath); + Message response = endpointIterator->second.createFile(filePath, FileManager::getInstance().getFileVersion(filePath)); + FileManager::getInstance().deleteFile(filePath); + endpointIterator++; // Check if the file transfer was successful if (response.args["ACK"] != "ACK") @@ -263,6 +304,11 @@ void Server::rebalanceFileReplication(string filePath, map= replicationFactor) + { + cout << "[SERVER]: Replication factor met for file: " << filePath << endl; + break; // Stop if the replication factor is met + } } } @@ -362,28 +408,28 @@ void Server::handleConnection(SquidProtocol clientProtocol) { case CREATE_FILE: clientProtocol.requestDispatcher(mex); - propagateCreateFile(mex.args["filePath"], clientProtocol); - FileManager::getInstance().deleteFile(mex.args["filePath"]); + propagateCreateFile(mex.args["filePath"], stoi(mex.args["fileVersion"]), clientProtocol); + FileManager::getInstance().deleteFileAndVersion(mex.args["filePath"]); break; case READ_FILE: getFileFromDataNode(mex.args["filePath"], clientProtocol); clientProtocol.requestDispatcher(mex); - FileManager::getInstance().deleteFile(mex.args["filePath"]); + FileManager::getInstance().deleteFileAndVersion(mex.args["filePath"]); break; case UPDATE_FILE: clientProtocol.requestDispatcher(mex); - propagateUpdateFile(mex.args["filePath"], clientProtocol); - FileManager::getInstance().deleteFile(mex.args["filePath"]); + propagateUpdateFile(mex.args["filePath"], stoi(mex.args["fileVersion"]), clientProtocol); + FileManager::getInstance().deleteFileAndVersion(mex.args["filePath"]); break; case DELETE_FILE: clientProtocol.requestDispatcher(mex); propagateDeleteFile(mex.args["filePath"], clientProtocol); dataNodeReplicationMap.erase(mex.args["filePath"]); - FileManager::getInstance().deleteFile(mex.args["filePath"]); + FileManager::getInstance().deleteFileAndVersion(mex.args["filePath"]); break; case SYNC_STATUS: cout << "SERVER: received sync status request\n"; - clientProtocol.response(fileTimeMap); + clientProtocol.response(getFileVersionMap()); break; case ACQUIRE_LOCK: cout << "[SERVER]: received acquire lock request for " << mex.args["filePath"] << endl; @@ -398,10 +444,11 @@ void Server::handleConnection(SquidProtocol clientProtocol) } cout << "[SERVER]: Request dispatched" << endl; - printMap(fileLockMap, "File Lock Map"); - printMap(fileTimeMap, "File Time Map"); - printMap(dataNodeReplicationMap, "DataNode Replication Map"); - printMap(clientEndpointMap, "Client Endpoint Map"); + // printMap(fileLockMap, "File Lock Map"); + // printMap(fileTimeMap, "File Time Map"); + // printMap(FileManager::getInstance().getFileVersionMap(), "File Version Map"); + // printMap(dataNodeReplicationMap, "DataNode Replication Map"); + // printMap(clientEndpointMap, "Client Endpoint Map"); }; // ----------------------- @@ -464,28 +511,44 @@ void Server::buildFileLockMap() for (auto &datanodeEndpoint : dataNodeEndpointMap) { cout << "[SERVER]: Building file map from datanode: " + datanodeEndpoint.first << endl; - Message files = datanodeEndpoint.second.listFiles(); // + Message files = datanodeEndpoint.second.listFiles(); // for (auto &file : files.args) { + if (file.second == "NACK") + { + cout << "[SERVER]: NACK for: " + datanodeEndpoint.first << endl; + // cout << "[SERVER]: File map not built" << endl; + // return; + continue; + } if (fileLockMap.find(file.first) == fileLockMap.end()) { // new file + cout << "raw version -> " << file.second << "\n"; fileLockMap[file.first] = FileLock(file.first); - fileTimeMap[file.first] = stoll(file.second); - dataNodeReplicationMap[file.first].insert(datanodeEndpoint); + FileManager::getInstance().setFileVersion(file.first, stoi(file.second)); } - else if (fileTimeMap.find(file.first)->second > stoll(file.second)) + else if (FileManager::getInstance().getFileVersion(file.first) > stoi(file.second)) { // if file on server is neewer, update datanode - datanodeEndpoint.second.updateFile(file.first); + cout << "updating datanode file version\n"; + auto fileHoldersMap = dataNodeReplicationMap[file.first]; + for (auto it = fileHoldersMap.begin(); it != fileHoldersMap.end(); ++it) + { + if (it->first != datanodeEndpoint.first) + { + cout << "retriving file from datanode: " + it->first << endl; + getFileFromDataNode(file.first, it->second); + break; + } + } + datanodeEndpoint.second.updateFile(file.first, FileManager::getInstance().getFileVersion(file.first)); } - else if (fileTimeMap.find(file.first)->second < stoll(file.second)) + else if (FileManager::getInstance().getFileVersion(file.first) < stoi(file.second)) { // if file on server is older, update file time map - fileTimeMap[file.first] = stoll(file.second); + cout << "updating server file version\n"; + FileManager::getInstance().setFileVersion(file.first, stoi(file.second)); } - // if (dataNodeReplicationMap.find(file.first) == dataNodeReplicationMap.end()) - // { - // dataNodeReplicationMap[file.first].insert(datanodeEndpoint); - // } + dataNodeReplicationMap[file.first].insert(datanodeEndpoint); cout << "[SERVER]: File: " + file.first + " added to datanode: " + datanodeEndpoint.first << endl; } } @@ -530,6 +593,30 @@ void Server::getFileFromDataNode(string filePath, SquidProtocol clientProtocol) cout << "Retrived file from datanode holder" << endl; } +map Server::getFileVersionMap() +{ + map fileVersionMap; + for (auto &datanode : dataNodeEndpointMap) + { + + Message mex = datanode.second.listFiles(); + + for (auto &file : mex.args) + { + if (fileVersionMap.find(file.first) == fileVersionMap.end()) + { + fileVersionMap[file.first] = stoi(file.second); + } + else + { + fileVersionMap[file.first] = max(fileVersionMap[file.first], stoi(file.second)); + } + } + } + return fileVersionMap; +} + +// deprecated void Server::propagateUpdateFile(string filePath, SquidProtocol clientProtocol) { @@ -542,7 +629,20 @@ void Server::propagateUpdateFile(string filePath, SquidProtocol clientProtocol) for (auto &datanode : dataNodeReplicationMap[filePath]) datanode.second.updateFile(filePath); - fileTimeMap[filePath] = chrono::system_clock::now().time_since_epoch().count(); + // fileTimeMap[filePath] = chrono::system_clock::now().time_since_epoch().count(); +} + +void Server::propagateUpdateFile(string filePath, int version, SquidProtocol clientProtocol) +{ + + for (auto &client : clientEndpointMap) + { + if (client.second.first.getSocket() != clientProtocol.getSocket()) + client.second.second.updateFile(filePath, version); // second channel + } + + for (auto &datanode : dataNodeReplicationMap[filePath]) + datanode.second.updateFile(filePath, version); } void Server::propagateDeleteFile(string filePath, SquidProtocol clientProtocol) @@ -557,10 +657,11 @@ void Server::propagateDeleteFile(string filePath, SquidProtocol clientProtocol) datanode.second.deleteFile(filePath); fileLockMap.erase(filePath); - fileTimeMap.erase(filePath); + // fileTimeMap.erase(filePath); dataNodeReplicationMap.erase(filePath); } +// deprecated void Server::propagateCreateFile(string filePath, SquidProtocol clientProtocol) { // round robin replication lock_guard lock(mapMutex); @@ -585,7 +686,8 @@ void Server::propagateCreateFile(string filePath, SquidProtocol clientProtocol) datanode.second.createFile(filePath); fileLockMap.insert({filePath, FileLock(filePath)}); - fileTimeMap.insert({filePath, chrono::system_clock::now().time_since_epoch().count()}); + // fileTimeMap.insert({filePath, chrono::system_clock::now().time_since_epoch().count()}); + printMap(fileLockMap, "File Lock Map"); for (auto &client : clientEndpointMap) @@ -595,10 +697,43 @@ void Server::propagateCreateFile(string filePath, SquidProtocol clientProtocol) } } +void Server::propagateCreateFile(string filePath, int version, SquidProtocol clientProtocol) +{ // round robin replication + lock_guard lock(mapMutex); + auto fileHoldersMap = map(); + + if (dataNodeEndpointMap.empty()) + return; + + for (int i = 0; i < replicationFactor; i++) + { + if (endpointIterator == dataNodeEndpointMap.end()) + endpointIterator = dataNodeEndpointMap.begin(); + + fileHoldersMap.insert({endpointIterator->first, endpointIterator->second}); + endpointIterator++; + } + + cout << "iterated" << endl; + dataNodeReplicationMap.insert({filePath, fileHoldersMap}); + + for (auto &datanode : dataNodeReplicationMap[filePath]) + datanode.second.createFile(filePath, version); + + fileLockMap.insert({filePath, FileLock(filePath)}); + fileTimeMap.insert({filePath, chrono::system_clock::now().time_since_epoch().count()}); + printMap(fileLockMap, "File Lock Map"); + + for (auto &client : clientEndpointMap) + { + if (client.second.first.getSocket() != clientProtocol.getSocket()) + client.second.second.createFile(filePath, version); // second channel + } +} // ----------------------- // ------ PERSISTANCE ---- // ----------------------- - +/* void Server::saveMapToFile() { ofstream outFile(filename); @@ -627,7 +762,7 @@ void Server::loadMapFromFile() inFile.close(); cout << "[SERVER]: File time map loaded from file" << endl; } - +*/ // ----------------------- // ------ PRINT MAPS ----- // ----------------------- diff --git a/server/src/server.hpp b/server/src/server.hpp index 721c087..6db097d 100644 --- a/server/src/server.hpp +++ b/server/src/server.hpp @@ -16,7 +16,7 @@ #include "filetransfer.hpp" #include "squidProtocolServer.cpp" -#define DEFAULT_PORT 8080 +#define DEFAULT_PORT 12345 #define BUFFER_SIZE 1024 #define DEFAULT_PATH "./test_txt/test_server" @@ -42,15 +42,18 @@ class Server void handleConnection(SquidProtocol clientProtocol); void handleAccept(int new_socket, sockaddr_in peer_addr); - void sendHearbeats(); + void sendHeartbeats(); void checkFileLockExpiration(); + void eraseFromReplicationMap(vector datanodeNames); void eraseFromReplicationMap(string datanodeName); void checkCloseConnetions(fd_set &master_set, int max_sd); void rebalanceFileReplication(string filePath, map fileHoldersMap); void getFileFromDataNode(string filePath, SquidProtocol clientProtocol); void propagateCreateFile(string filePath, SquidProtocol clientProtocol); + void propagateCreateFile(string filePath, int version, SquidProtocol clientProtocol); void propagateUpdateFile(string filePath, SquidProtocol clientProtocol); + void propagateUpdateFile(string filePath, int version, SquidProtocol clientProtocol); void propagateDeleteFile(string filePath, SquidProtocol clientProtocol); private: @@ -82,9 +85,8 @@ class Server // iterators for round robin redundancy map::iterator endpointIterator; - void saveMapToFile(); - void loadMapFromFile(); + map getFileVersionMap(); void printMap(map &map, string name); void printMap(map &map, string name); void printMap(map &map, string name); diff --git a/test/testMultiClient.sh b/test/testMultiClient.sh new file mode 100755 index 0000000..7900433 --- /dev/null +++ b/test/testMultiClient.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Move executables to their respective directories +cp ./SquidStorage ../test_txt/test_client1/SquidStorage +cp ./SquidStorage ../test_txt/test_client2/SquidStorage +cp ./SquidStorageServer ../test_txt/test_server/SquidStorageServer +cp ./DataNode ../test_txt/test_datanode1/DataNode + +# Start a new tmux session +SESSION_NAME="SquidStorage" +tmux new-session -d -s $SESSION_NAME +tmux set -g mouse on + +# Run SquidStorageServer in the first pane +tmux rename-window -t $SESSION_NAME "Server" +tmux send-keys -t $SESSION_NAME "cd ../test_txt/test_server && ./SquidStorageServer" C-m + +# Split the window and run DataNode in the second pane +tmux split-window -h +tmux send-keys "cd ../test_txt/test_datanode1 && ./DataNode" C-m + +# Split the window vertically and run SquidStorage in the third pane +tmux split-window -v +tmux send-keys "cd ../test_txt/test_client1 && ./SquidStorage" C-m + +tmux split-window -v +tmux send-keys "cd ../test_txt/test_client2 && ./SquidStorage" C-m + +# Attach to the tmux session +tmux select-pane -t 0 +tmux attach-session -t $SESSION_NAME + +# Clean up executables after execution +rm -f ../test_txt/test_client1/SquidStorage +rm -f ../test_txt/test_client2/SquidStorage +rm -f ../test_txt/test_server/SquidStorageServer +rm -f ../test_txt/test_datanode1/DataNode \ No newline at end of file diff --git a/build/buildTestMultiDataNodes.sh b/test/testMultiDataNode.sh similarity index 95% rename from build/buildTestMultiDataNodes.sh rename to test/testMultiDataNode.sh index 2f4143e..13617b0 100755 --- a/build/buildTestMultiDataNodes.sh +++ b/test/testMultiDataNode.sh @@ -1,8 +1,4 @@ #!/bin/bash -# Build the project -cmake .. -make -j 8 - # Move executables to their respective directories cp ./SquidStorage ../test_txt/test_client1/SquidStorage cp ./SquidStorageServer ../test_txt/test_server/SquidStorageServer @@ -38,4 +34,3 @@ rm -f ../test_txt/test_client1/SquidStorage rm -f ../test_txt/test_server/SquidStorageServer rm -f ../test_txt/test_datanode1/DataNode rm -f ../test_txt/test_datanode2/DataNode -# make clean diff --git a/test/testMultiple.sh b/test/testMultiple.sh new file mode 100755 index 0000000..241931b --- /dev/null +++ b/test/testMultiple.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# Move executables to their respective directories +cp ./SquidStorage ../test_txt/test_client1/SquidStorage +cp ./SquidStorage ../test_txt/test_client2/SquidStorage +cp ./SquidStorageServer ../test_txt/test_server/SquidStorageServer +cp ./DataNode ../test_txt/test_datanode1/DataNode +cp ./DataNode ../test_txt/test_datanode2/DataNode +cp ./DataNode ../test_txt/test_datanode3/DataNode + +# Start a new tmux session +SESSION_NAME="SquidStorage" +tmux new-session -d -s $SESSION_NAME +tmux set -g mouse on + +# Run SquidStorageServer in the first pane +tmux rename-window -t $SESSION_NAME "Server" +tmux send-keys -t $SESSION_NAME "cd ../test_txt/test_server && ./SquidStorageServer" C-m + +# Split the window and run DataNode in the second pane +tmux split-window -h +tmux send-keys "cd ../test_txt/test_datanode1 && ./DataNode" C-m + +# Split the window and run DataNode in the second pane +tmux split-window -h +tmux send-keys "cd ../test_txt/test_datanode2 && ./DataNode" C-m + +# Split the window and run DataNode in the second pane +tmux split-window -h +tmux send-keys "cd ../test_txt/test_datanode3 && ./DataNode" C-m + +# Split the window vertically and run SquidStorage in the third pane +tmux split-window -v +tmux send-keys "cd ../test_txt/test_client1 && ./SquidStorage" C-m + +tmux split-window -v +tmux send-keys "cd ../test_txt/test_client2 && ./SquidStorage" C-m + +# Attach to the tmux session +tmux select-pane -t 0 +tmux attach-session -t $SESSION_NAME + +# Clean up executables after execution +rm -f ../test_txt/test_client1/SquidStorage +rm -f ../test_txt/test_client2/SquidStorage +rm -f ../test_txt/test_server/SquidStorageServer +rm -f ../test_txt/test_datanode1/DataNode +rm -f ../test_txt/test_datanode2/DataNode +rm -f ../test_txt/test_datanode3/DataNode diff --git a/test/test_client1/datanode1.txt b/test/test_client1/datanode1.txt new file mode 100644 index 0000000..ccb91ac --- /dev/null +++ b/test/test_client1/datanode1.txt @@ -0,0 +1,2 @@ +I am datanode1 +ciao bello! \ No newline at end of file diff --git a/test/test_client2/clientfile2.txt b/test/test_client2/clientfile2.txt new file mode 100644 index 0000000..fb140c8 --- /dev/null +++ b/test/test_client2/clientfile2.txt @@ -0,0 +1 @@ +I am client 2 \ No newline at end of file diff --git a/test/test_datanode1/datanode1.txt b/test/test_datanode1/datanode1.txt new file mode 100644 index 0000000..ccb91ac --- /dev/null +++ b/test/test_datanode1/datanode1.txt @@ -0,0 +1,2 @@ +I am datanode1 +ciao bello! \ No newline at end of file diff --git a/test/test_datanode2/datanode2.txt b/test/test_datanode2/datanode2.txt new file mode 100644 index 0000000..8b0996a --- /dev/null +++ b/test/test_datanode2/datanode2.txt @@ -0,0 +1,2 @@ +I am datanode2 +ciao \ No newline at end of file diff --git a/test/test_datanode3/datanode3.txt b/test/test_datanode3/datanode3.txt new file mode 100644 index 0000000..86c0b6c --- /dev/null +++ b/test/test_datanode3/datanode3.txt @@ -0,0 +1 @@ +I am datanode3 \ No newline at end of file