Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions scripts/monitor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ log_message() {
# Function to handle errors
handle_error() {
log_message "ERROR: $1"
restart_services
exit 1
}

Expand All @@ -31,6 +32,12 @@ monitor_logs() {
tail -f log/error.log | grep --line-buffered "ERROR" || handle_error "Failed to monitor logs for errors"
}

# Function to restart all services
restart_services() {
log_message "Restarting all services..."
# Add logic to restart all services
}

# Main monitoring function
main() {
log_message "Starting monitoring..."
Expand Down
10 changes: 7 additions & 3 deletions scripts/run_local.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import subprocess
import argparse

def run_app(port1, port2, camera1, camera2):
cmd = f"rebar3 shell & rebar3 shell --setcookie port2 --name port2@127.0.0.1"
def run_app(port1, port2, camera1, camera2, restart):
if restart:
cmd = f"sbt run & sbt run --setcookie port2 --name port2@127.0.0.1"
else:
cmd = f"rebar3 shell & rebar3 shell --setcookie port2 --name port2@127.0.0.1"
subprocess.run(cmd, shell=True)

if __name__ == "__main__":
Expand All @@ -11,6 +14,7 @@ def run_app(port1, port2, camera1, camera2):
parser.add_argument("--port2", type=int, default=8081, help="Port for the second instance")
parser.add_argument("--camera1", type=str, help="Camera input for the first instance")
parser.add_argument("--camera2", type=str, help="Camera input for the second instance")
parser.add_argument("--restart", action="store_true", help="Restart the app")
args = parser.parse_args()

run_app(args.port1, args.port2, args.camera1, args.camera2)
run_app(args.port1, args.port2, args.camera1, args.camera2, args.restart)
25 changes: 18 additions & 7 deletions src/scala/main/scala/centralized_server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ object CentralizedServer {
private var isRunning = false

def startServer(port: Int): Future[Unit] = Future {
serverSocket = Some(new ServerSocket(port))
isRunning = true
println(s"Centralized server started on port $port")

while (isRunning) {
val clientSocket = serverSocket.get.accept()
handleClientRequest(clientSocket)
if (isRunning) {
restartServer(port)
} else {
serverSocket = Some(new ServerSocket(port))
isRunning = true
println(s"Centralized server started on port $port")

while (isRunning) {
val clientSocket = serverSocket.get.accept()
handleClientRequest(clientSocket)
}
}
}

Expand All @@ -39,4 +43,11 @@ object CentralizedServer {

clientSocket.close()
}

def restartServer(port: Int): Future[Unit] = {
for {
_ <- stopServer()
_ <- startServer(port)
} yield ()
}
}
15 changes: 13 additions & 2 deletions src/scala/main/scala/fault_tolerance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ object FaultTolerance {
private var errorCount = 0

def start(): Future[Unit] = Future {
isRunning = true
println("Fault tolerance service started.")
if (isRunning) {
restart()
} else {
isRunning = true
println("Fault tolerance service started.")
}
}

def stop(): Future[Unit] = Future {
Expand All @@ -26,4 +30,11 @@ object FaultTolerance {
println(s"Recovering from error: $error")
errorCount -= 1
}

def restart(): Future[Unit] = {
for {
_ <- stop()
_ <- start()
} yield ()
}
}
23 changes: 17 additions & 6 deletions src/scala/main/scala/peer_discovery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ object PeerDiscovery {
private var peers: Set[String] = Set.empty

def start(): Future[Unit] = Future {
isRunning = true
println("Peer discovery started.")
// Simulate peer discovery process
while (isRunning) {
Thread.sleep(5000)
broadcastMessage("Hello, peers!")
if (isRunning) {
restart()
} else {
isRunning = true
println("Peer discovery started.")
// Simulate peer discovery process
while (isRunning) {
Thread.sleep(5000)
broadcastMessage("Hello, peers!")
}
}
}

Expand All @@ -34,4 +38,11 @@ object PeerDiscovery {
// Broadcast peer discovery message to all peers
peers.foreach(peer => println(s"Message sent to $peer: $message"))
}

def restart(): Future[Unit] = {
for {
_ <- stop()
_ <- start()
} yield ()
}
}
17 changes: 16 additions & 1 deletion src/scala/main/scala/reliable_transmission.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object ReliableTransmission {
def sendData(peer: String, data: String): Future[Unit] = {
def attemptSend(retryCount: Int): Future[Unit] = {
if (retryCount >= retryLimit) {
Future.failed(new Exception("Retry limit reached"))
restart()
} else {
// Simulate sending data to peer
println(s"Sending data to $peer: $data")
Expand Down Expand Up @@ -47,4 +47,19 @@ object ReliableTransmission {
def handleTransmissionError(peer: String, reason: String): Unit = {
println(s"Transmission error with peer $peer: $reason")
}

def restart(): Future[Unit] = {
for {
_ <- stop()
_ <- start()
} yield ()
}

def stop(): Future[Unit] = Future {
println("Reliable transmission service stopped.")
}

def start(): Future[Unit] = Future {
println("Reliable transmission service started.")
}
}
18 changes: 17 additions & 1 deletion src/scala/main/scala/scala_connector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,29 @@ object ScalaConnector {
val erlangActor = system.actorOf(Props[ErlangActor], "erlangActor")

def sendToErlang(message: String): Unit = {
erlangActor ! message
if (system.whenTerminated.isCompleted) {
restart()
} else {
erlangActor ! message
}
}

def receiveFromErlang(): Future[String] = {
(erlangActor ? "receive").mapTo[String]
}

def restart(): Future[Unit] = {
for {
_ <- system.terminate()
_ <- system.whenTerminated
newSystem = ActorSystem("ScalaErlangSystem")
newErlangActor = newSystem.actorOf(Props[ErlangActor], "erlangActor")
} yield {
system = newSystem
erlangActor = newErlangActor
}
}

class ErlangActor extends Actor {
def receive: Receive = {
case message: String =>
Expand Down
12 changes: 12 additions & 0 deletions src/scala/main/scala/video_chunking.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ object VideoChunking {
val videoFile = new File(videoPath)
val videoData = Files.readAllBytes(videoFile.toPath)
chunkVideoData(videoData, chunkSize, List.empty)
} match {
case Success(chunks) => Success(chunks)
case Failure(exception) =>
restart()
Failure(exception)
}
}

Expand All @@ -30,4 +35,11 @@ object VideoChunking {
chunkVideoData(rest, chunkSize, chunk :: acc)
}
}

def restart(): Try[Unit] = {
Try {
println("Restarting video chunking service...")
// Add logic to restart the video chunking service
}
}
}
5 changes: 5 additions & 0 deletions tests/scala/test_centralized_server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ class TestCentralizedServer extends AnyFlatSpec with Matchers {
val result = Await.result(CentralizedServer.handleClientRequest(clientSocket), 5.seconds)
result shouldBe ()
}

it should "restart the centralized server" in {
val result = Await.result(CentralizedServer.restartServer(8080), 5.seconds)
result shouldBe ()
}
}
5 changes: 5 additions & 0 deletions tests/scala/test_fault_tolerance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ class TestFaultTolerance extends AnyFlatSpec with Matchers {
val result = Await.result(FaultTolerance.recover("Test error"), 5.seconds)
result shouldBe ()
}

it should "restart the fault tolerance service" in {
val result = Await.result(FaultTolerance.restart(), 5.seconds)
result shouldBe ()
}
}
5 changes: 5 additions & 0 deletions tests/scala/test_peer_discovery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ class TestPeerDiscovery extends AnyFlatSpec with Matchers {
val result = Await.result(PeerDiscovery.broadcastMessage("Test message"), 5.seconds)
result shouldBe ()
}

it should "restart the peer discovery service" in {
val result = Await.result(PeerDiscovery.restart(), 5.seconds)
result shouldBe ()
}
}
5 changes: 5 additions & 0 deletions tests/scala/test_reliable_transmission.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ class TestReliableTransmission extends AnyFlatSpec with Matchers {
// This is a placeholder for actual verification logic
true should be (true)
}

it should "restart the reliable transmission service" in {
val result = Await.result(ReliableTransmission.restart(), 5.seconds)
result shouldBe ()
}
}
5 changes: 5 additions & 0 deletions tests/scala/test_scala_connector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ class TestScalaConnector extends AnyFlatSpec with Matchers {
val message = Await.result(ScalaConnector.receiveFromErlang(), 5.seconds)
message should be ("Message from Erlang")
}

"restart" should "restart the Scala connector" in {
val result = Await.result(ScalaConnector.restart(), 5.seconds)
result shouldBe ()
}
}
8 changes: 8 additions & 0 deletions tests/scala/test_video_chunking.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,12 @@ class TestVideoChunking extends AnyFlatSpec with Matchers {
case Failure(exception) => fail(s"Failed to retrieve chunk: ${exception.getMessage}")
}
}

it should "restart the video chunking service" in {
val result = VideoChunking.restart()
result match {
case Success(_) => succeed
case Failure(exception) => fail(s"Failed to restart video chunking service: ${exception.getMessage}")
}
}
}
Loading