diff --git a/scripts/monitor.sh b/scripts/monitor.sh index 99a0c1d..9b7ce67 100644 --- a/scripts/monitor.sh +++ b/scripts/monitor.sh @@ -10,6 +10,7 @@ log_message() { # Function to handle errors handle_error() { log_message "ERROR: $1" + restart_services exit 1 } @@ -31,6 +32,12 @@ monitor_logs() { tail -f log/error.log | grep --line-buffered "ERROR" || handle_error "Failed to monitor logs for errors" } +# Function to restart all services +restart_services() { + log_message "Restarting all services..." + # Add logic to restart all services +} + # Main monitoring function main() { log_message "Starting monitoring..." diff --git a/scripts/run_local.py b/scripts/run_local.py index d21eee8..a7c7f01 100644 --- a/scripts/run_local.py +++ b/scripts/run_local.py @@ -1,8 +1,11 @@ import subprocess import argparse -def run_app(port1, port2, camera1, camera2): - cmd = f"rebar3 shell & rebar3 shell --setcookie port2 --name port2@127.0.0.1" +def run_app(port1, port2, camera1, camera2, restart): + if restart: + cmd = f"sbt run & sbt run --setcookie port2 --name port2@127.0.0.1" + else: + cmd = f"rebar3 shell & rebar3 shell --setcookie port2 --name port2@127.0.0.1" subprocess.run(cmd, shell=True) if __name__ == "__main__": @@ -11,6 +14,7 @@ def run_app(port1, port2, camera1, camera2): parser.add_argument("--port2", type=int, default=8081, help="Port for the second instance") parser.add_argument("--camera1", type=str, help="Camera input for the first instance") parser.add_argument("--camera2", type=str, help="Camera input for the second instance") + parser.add_argument("--restart", action="store_true", help="Restart the app") args = parser.parse_args() - run_app(args.port1, args.port2, args.camera1, args.camera2) + run_app(args.port1, args.port2, args.camera1, args.camera2, args.restart) diff --git a/src/scala/main/scala/centralized_server.scala b/src/scala/main/scala/centralized_server.scala index 6388ad7..18b18d8 100644 --- a/src/scala/main/scala/centralized_server.scala +++ b/src/scala/main/scala/centralized_server.scala @@ -10,13 +10,17 @@ object CentralizedServer { private var isRunning = false def startServer(port: Int): Future[Unit] = Future { - serverSocket = Some(new ServerSocket(port)) - isRunning = true - println(s"Centralized server started on port $port") - - while (isRunning) { - val clientSocket = serverSocket.get.accept() - handleClientRequest(clientSocket) + if (isRunning) { + restartServer(port) + } else { + serverSocket = Some(new ServerSocket(port)) + isRunning = true + println(s"Centralized server started on port $port") + + while (isRunning) { + val clientSocket = serverSocket.get.accept() + handleClientRequest(clientSocket) + } } } @@ -39,4 +43,11 @@ object CentralizedServer { clientSocket.close() } + + def restartServer(port: Int): Future[Unit] = { + for { + _ <- stopServer() + _ <- startServer(port) + } yield () + } } diff --git a/src/scala/main/scala/fault_tolerance.scala b/src/scala/main/scala/fault_tolerance.scala index 72351c1..8d1c2f2 100644 --- a/src/scala/main/scala/fault_tolerance.scala +++ b/src/scala/main/scala/fault_tolerance.scala @@ -8,8 +8,12 @@ object FaultTolerance { private var errorCount = 0 def start(): Future[Unit] = Future { - isRunning = true - println("Fault tolerance service started.") + if (isRunning) { + restart() + } else { + isRunning = true + println("Fault tolerance service started.") + } } def stop(): Future[Unit] = Future { @@ -26,4 +30,11 @@ object FaultTolerance { println(s"Recovering from error: $error") errorCount -= 1 } + + def restart(): Future[Unit] = { + for { + _ <- stop() + _ <- start() + } yield () + } } diff --git a/src/scala/main/scala/peer_discovery.scala b/src/scala/main/scala/peer_discovery.scala index 2ff8e83..916c7d1 100644 --- a/src/scala/main/scala/peer_discovery.scala +++ b/src/scala/main/scala/peer_discovery.scala @@ -9,12 +9,16 @@ object PeerDiscovery { private var peers: Set[String] = Set.empty def start(): Future[Unit] = Future { - isRunning = true - println("Peer discovery started.") - // Simulate peer discovery process - while (isRunning) { - Thread.sleep(5000) - broadcastMessage("Hello, peers!") + if (isRunning) { + restart() + } else { + isRunning = true + println("Peer discovery started.") + // Simulate peer discovery process + while (isRunning) { + Thread.sleep(5000) + broadcastMessage("Hello, peers!") + } } } @@ -34,4 +38,11 @@ object PeerDiscovery { // Broadcast peer discovery message to all peers peers.foreach(peer => println(s"Message sent to $peer: $message")) } + + def restart(): Future[Unit] = { + for { + _ <- stop() + _ <- start() + } yield () + } } diff --git a/src/scala/main/scala/reliable_transmission.scala b/src/scala/main/scala/reliable_transmission.scala index 9c12e34..48a5a44 100644 --- a/src/scala/main/scala/reliable_transmission.scala +++ b/src/scala/main/scala/reliable_transmission.scala @@ -11,7 +11,7 @@ object ReliableTransmission { def sendData(peer: String, data: String): Future[Unit] = { def attemptSend(retryCount: Int): Future[Unit] = { if (retryCount >= retryLimit) { - Future.failed(new Exception("Retry limit reached")) + restart() } else { // Simulate sending data to peer println(s"Sending data to $peer: $data") @@ -47,4 +47,19 @@ object ReliableTransmission { def handleTransmissionError(peer: String, reason: String): Unit = { println(s"Transmission error with peer $peer: $reason") } + + def restart(): Future[Unit] = { + for { + _ <- stop() + _ <- start() + } yield () + } + + def stop(): Future[Unit] = Future { + println("Reliable transmission service stopped.") + } + + def start(): Future[Unit] = Future { + println("Reliable transmission service started.") + } } diff --git a/src/scala/main/scala/scala_connector.scala b/src/scala/main/scala/scala_connector.scala index 54e8ded..551cbe1 100644 --- a/src/scala/main/scala/scala_connector.scala +++ b/src/scala/main/scala/scala_connector.scala @@ -11,13 +11,29 @@ object ScalaConnector { val erlangActor = system.actorOf(Props[ErlangActor], "erlangActor") def sendToErlang(message: String): Unit = { - erlangActor ! message + if (system.whenTerminated.isCompleted) { + restart() + } else { + erlangActor ! message + } } def receiveFromErlang(): Future[String] = { (erlangActor ? "receive").mapTo[String] } + def restart(): Future[Unit] = { + for { + _ <- system.terminate() + _ <- system.whenTerminated + newSystem = ActorSystem("ScalaErlangSystem") + newErlangActor = newSystem.actorOf(Props[ErlangActor], "erlangActor") + } yield { + system = newSystem + erlangActor = newErlangActor + } + } + class ErlangActor extends Actor { def receive: Receive = { case message: String => diff --git a/src/scala/main/scala/video_chunking.scala b/src/scala/main/scala/video_chunking.scala index beaa400..e01f4aa 100644 --- a/src/scala/main/scala/video_chunking.scala +++ b/src/scala/main/scala/video_chunking.scala @@ -9,6 +9,11 @@ object VideoChunking { val videoFile = new File(videoPath) val videoData = Files.readAllBytes(videoFile.toPath) chunkVideoData(videoData, chunkSize, List.empty) + } match { + case Success(chunks) => Success(chunks) + case Failure(exception) => + restart() + Failure(exception) } } @@ -30,4 +35,11 @@ object VideoChunking { chunkVideoData(rest, chunkSize, chunk :: acc) } } + + def restart(): Try[Unit] = { + Try { + println("Restarting video chunking service...") + // Add logic to restart the video chunking service + } + } } diff --git a/tests/scala/test_centralized_server.scala b/tests/scala/test_centralized_server.scala index 277e98b..4050051 100644 --- a/tests/scala/test_centralized_server.scala +++ b/tests/scala/test_centralized_server.scala @@ -20,4 +20,9 @@ class TestCentralizedServer extends AnyFlatSpec with Matchers { val result = Await.result(CentralizedServer.handleClientRequest(clientSocket), 5.seconds) result shouldBe () } + + it should "restart the centralized server" in { + val result = Await.result(CentralizedServer.restartServer(8080), 5.seconds) + result shouldBe () + } } diff --git a/tests/scala/test_fault_tolerance.scala b/tests/scala/test_fault_tolerance.scala index ee3b17d..f3238ec 100644 --- a/tests/scala/test_fault_tolerance.scala +++ b/tests/scala/test_fault_tolerance.scala @@ -24,4 +24,9 @@ class TestFaultTolerance extends AnyFlatSpec with Matchers { val result = Await.result(FaultTolerance.recover("Test error"), 5.seconds) result shouldBe () } + + it should "restart the fault tolerance service" in { + val result = Await.result(FaultTolerance.restart(), 5.seconds) + result shouldBe () + } } diff --git a/tests/scala/test_peer_discovery.scala b/tests/scala/test_peer_discovery.scala index 2836b09..0ec1bd9 100644 --- a/tests/scala/test_peer_discovery.scala +++ b/tests/scala/test_peer_discovery.scala @@ -24,4 +24,9 @@ class TestPeerDiscovery extends AnyFlatSpec with Matchers { val result = Await.result(PeerDiscovery.broadcastMessage("Test message"), 5.seconds) result shouldBe () } + + it should "restart the peer discovery service" in { + val result = Await.result(PeerDiscovery.restart(), 5.seconds) + result shouldBe () + } } diff --git a/tests/scala/test_reliable_transmission.scala b/tests/scala/test_reliable_transmission.scala index c9597cb..dc108fa 100644 --- a/tests/scala/test_reliable_transmission.scala +++ b/tests/scala/test_reliable_transmission.scala @@ -22,4 +22,9 @@ class TestReliableTransmission extends AnyFlatSpec with Matchers { // This is a placeholder for actual verification logic true should be (true) } + + it should "restart the reliable transmission service" in { + val result = Await.result(ReliableTransmission.restart(), 5.seconds) + result shouldBe () + } } diff --git a/tests/scala/test_scala_connector.scala b/tests/scala/test_scala_connector.scala index c1b394b..f3aa35e 100644 --- a/tests/scala/test_scala_connector.scala +++ b/tests/scala/test_scala_connector.scala @@ -16,4 +16,9 @@ class TestScalaConnector extends AnyFlatSpec with Matchers { val message = Await.result(ScalaConnector.receiveFromErlang(), 5.seconds) message should be ("Message from Erlang") } + + "restart" should "restart the Scala connector" in { + val result = Await.result(ScalaConnector.restart(), 5.seconds) + result shouldBe () + } } diff --git a/tests/scala/test_video_chunking.scala b/tests/scala/test_video_chunking.scala index 3f46d99..5e2b475 100644 --- a/tests/scala/test_video_chunking.scala +++ b/tests/scala/test_video_chunking.scala @@ -21,4 +21,12 @@ class TestVideoChunking extends AnyFlatSpec with Matchers { case Failure(exception) => fail(s"Failed to retrieve chunk: ${exception.getMessage}") } } + + it should "restart the video chunking service" in { + val result = VideoChunking.restart() + result match { + case Success(_) => succeed + case Failure(exception) => fail(s"Failed to restart video chunking service: ${exception.getMessage}") + } + } }