diff --git a/Dockerfile b/Dockerfile index c79d79b..6ad821f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,27 +1,3 @@ -# Build stage for Erlang -FROM erlang:24 AS erlang-build -WORKDIR /app - -# Copy Erlang-specific files -COPY rebar.config ./ -COPY config ./config/ -COPY src/erlang ./src/erlang/ -COPY microservices ./microservices/ - -# Install dependencies -RUN apt-get update && \ - apt-get install -y --no-install-recommends \ - ffmpeg \ - libavcodec-extra \ - build-essential \ - gcc \ - make \ - && apt-get clean && \ - rm -rf /var/lib/apt/lists/* - -# Compile Erlang -RUN rebar3 get-deps && rebar3 compile - # Build stage for Scala FROM ubuntu:24.04 AS scala-build WORKDIR /app @@ -57,19 +33,12 @@ ENV NODE_NAME_1=node1@127.0.0.1 \ COOKIE=erlangcast_cookie \ DEBIAN_FRONTEND=noninteractive -# Install Erlang and runtime dependencies +# Install runtime dependencies RUN apt-get update && \ apt-get install -y --no-install-recommends \ - erlang \ - erlang-base \ - erlang-dev \ - erlang-tools \ - ffmpeg \ - libavcodec-extra \ openjdk-11-jre-headless \ netcat-openbsd \ curl \ - rebar3 \ && apt-get clean && \ rm -rf /var/lib/apt/lists/* @@ -78,15 +47,14 @@ RUN mkdir -p /app/log /app/media/input /app/media/output && \ chmod -R 777 /app/log /app/media # Copy artifacts and configs -COPY --from=erlang-build /app/_build /app/_build COPY --from=scala-build /app/target /app/target COPY config ./config/ # Create startup script RUN echo '#!/bin/sh\n\ epmd -daemon\n\ -NODE_NAME=$NODE_NAME_1 COOKIE=$COOKIE rebar3 shell & \ -NODE_NAME=$NODE_NAME_2 COOKIE=$COOKIE rebar3 shell\n' > /usr/local/bin/start.sh && \ +NODE_NAME=$NODE_NAME_1 COOKIE=$COOKIE sbt run & \ +NODE_NAME=$NODE_NAME_2 COOKIE=$COOKIE sbt run\n' > /usr/local/bin/start.sh && \ chmod +x /usr/local/bin/start.sh EXPOSE 8080 8081 9100-9155 9200-9255 @@ -94,4 +62,4 @@ EXPOSE 8080 8081 9100-9155 9200-9255 HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD nc -z localhost 8080 || exit 1 -CMD ["/usr/local/bin/start.sh"] \ No newline at end of file +CMD ["/usr/local/bin/start.sh"] diff --git a/README.md b/README.md index 7b37483..0e867f8 100644 --- a/README.md +++ b/README.md @@ -1,115 +1,132 @@ -# ErlangCast -ErlangCast is a P2P video streaming server built with Erlang, leveraging its strengths in concurrency and distributed programming. Features include peer discovery, video chunking, reliable data transmission, and fault-tolerant architecture. Ideal for real-time streaming and decentralized applications. +# ScalaCast +ScalaCast is a centralized server streaming video server built with Scala, leveraging its strengths in concurrency and distributed programming. Features include centralized server, video chunking, reliable data transmission, and fault-tolerant architecture. Ideal for real-time streaming and centralized applications. -## Peer Discovery -The peer discovery feature allows peers to find each other in the network, enabling efficient communication and data sharing. +## Centralized Server +The centralized server feature allows clients to connect to a central server for efficient communication and data sharing. -### How to Use Peer Discovery -1. Start the peer discovery process by calling the `peer_discovery:start/0` function. -2. Handle incoming peer discovery messages using the `peer_discovery:handle_message/1` function. -3. Broadcast peer discovery messages using the `peer_discovery:broadcast_message/1` function. -4. Stop the peer discovery process by calling the `peer_discovery:stop/0` function. +### How to Use Centralized Server +1. Start the centralized server by calling the `CentralizedServer.startServer` function. +2. Handle client requests using the `CentralizedServer.handleClientRequest` function. +3. Stop the centralized server by calling the `CentralizedServer.stopServer` function. ### Example -```erlang -% Start the peer discovery process -peer_discovery:start(). +```scala +import scala.concurrent.Await +import scala.concurrent.duration._ + +implicit val ec: ExecutionContext = ExecutionContext.global -% Handle an incoming peer discovery message -peer_discovery:handle_message("Hello, peer!"). +// Start the centralized server +Await.result(CentralizedServer.startServer(8080), 5.seconds) -% Broadcast a peer discovery message -peer_discovery:broadcast_message("Hello, peers!"). +// Handle a client request +Await.result(CentralizedServer.handleClientRequest(clientSocket), 5.seconds) -% Stop the peer discovery process -peer_discovery:stop(). +// Stop the centralized server +Await.result(CentralizedServer.stopServer(), 5.seconds) ``` ## Video Chunking The video chunking feature allows the system to divide video files into smaller chunks for transmission, enabling efficient streaming and data handling. ### How to Use Video Chunking -1. Chunk a video file by calling the `video_chunking:chunk_video/2` function with the video file path and chunk size as arguments. -2. Retrieve a specific chunk of a video file using the `video_chunking:get_chunk/2` function with the video file path and chunk index as arguments. +1. Chunk a video file by calling the `VideoChunking.chunkVideo` function with the video file path and chunk size as arguments. +2. Retrieve a specific chunk of a video file using the `VideoChunking.getChunk` function with the video file path and chunk index as arguments. ### Example -```erlang -% Chunk a video file into 1MB chunks -video_chunking:chunk_video("path/to/video.mp4", 1048576). +```scala +import scala.util.{Success, Failure} -% Retrieve the first chunk of the video file -{ok, Chunk} = video_chunking:get_chunk("path/to/video.mp4", 1). +// Chunk a video file into 1MB chunks +VideoChunking.chunkVideo("path/to/video.mp4", 1048576) match { + case Success(chunks) => println(s"Video chunked into ${chunks.length} parts") + case Failure(exception) => println(s"Failed to chunk video: ${exception.getMessage}") +} + +// Retrieve the first chunk of the video file +VideoChunking.getChunk("path/to/video.mp4", 1) match { + case Success(chunk) => println(s"Retrieved chunk of size ${chunk.length}") + case Failure(exception) => println(s"Failed to retrieve chunk: ${exception.getMessage}") +} ``` ## Reliable Data Transmission -The reliable data transmission feature ensures that data is transmitted reliably between peers, handling packet loss and retransmissions. +The reliable data transmission feature ensures that data is transmitted reliably between clients and the server, handling packet loss and retransmissions. ### How to Use Reliable Data Transmission -1. Send data reliably by calling the `reliable_transmission:send_data/2` function with the peer and data as arguments. -2. Receive data reliably by calling the `reliable_transmission:receive_data/1` function with a data handler function as an argument. +1. Send data reliably by calling the `ReliableTransmission.sendData` function with the client and data as arguments. +2. Receive data reliably by calling the `ReliableTransmission.receiveData` function with a data handler function as an argument. ### Example -```erlang -% Send data reliably to a peer -reliable_transmission:send_data(Peer, "Hello, peer!"). +```scala +import scala.concurrent.Await +import scala.concurrent.duration._ + +implicit val ec: ExecutionContext = ExecutionContext.global + +// Send data reliably to a client +Await.result(ReliableTransmission.sendData("client1", "Hello, client!"), 5.seconds) -% Define a data handler function -DataHandler = fun(Data) -> - io:format("Received data: ~p~n", [Data]) -end. +// Define a data handler function +val dataHandler: String => Unit = data => println(s"Received data: $data") -% Receive data reliably -reliable_transmission:receive_data(DataHandler). +// Receive data reliably +Await.result(ReliableTransmission.receiveData(dataHandler), 5.seconds) ``` ## Fault-Tolerant Architecture The fault-tolerant architecture ensures that the system can handle failures gracefully and continue operating. This includes error handling and recovery mechanisms in various modules. ### Fault-Tolerant Features -1. **Peer Discovery**: The `peer_discovery` module includes error handling and recovery mechanisms for peer discovery failures. -2. **Reliable Transmission**: The `reliable_transmission` module includes error handling for retry limit reached and logs the error. -3. **Video Chunking**: The `video_chunking` module includes error handling for file operations and chunking process. +1. **Centralized Server**: The `CentralizedServer` module includes error handling and recovery mechanisms for server failures. +2. **Reliable Transmission**: The `ReliableTransmission` module includes error handling for retry limit reached and logs the error. +3. **Video Chunking**: The `VideoChunking` module includes error handling for file operations and chunking process. 4. **System Configuration**: The `sys.config` file includes fault-tolerance configurations for the system. ### Example -```erlang -% Example of error handling in peer discovery -case peer_discovery:start() of - ok -> io:format("Peer discovery started successfully~n"); - {error, Reason} -> io:format("Failed to start peer discovery: ~p~n", [Reason]) -end. - -% Example of error handling in reliable transmission -case reliable_transmission:send_data(Peer, "Hello, peer!") of - {ok, data_sent} -> io:format("Data sent successfully~n"); - {error, retry_limit_reached} -> io:format("Failed to send data: retry limit reached~n") -end. - -% Example of error handling in video chunking -case video_chunking:chunk_video("path/to/video.mp4", 1048576) of - {ok, Chunks} -> io:format("Video chunked successfully~n"); - {error, Reason} -> io:format("Failed to chunk video: ~p~n", [Reason]) -end. +```scala +import scala.concurrent.Await +import scala.concurrent.duration._ + +implicit val ec: ExecutionContext = ExecutionContext.global + +// Example of error handling in centralized server +Await.result(CentralizedServer.startServer(8080), 5.seconds) match { + case Success(_) => println("Centralized server started successfully") + case Failure(exception) => println(s"Failed to start centralized server: ${exception.getMessage}") +} + +// Example of error handling in reliable transmission +Await.result(ReliableTransmission.sendData("client1", "Hello, client!"), 5.seconds) match { + case Success(_) => println("Data sent successfully") + case Failure(exception) => println(s"Failed to send data: ${exception.getMessage}") +} + +// Example of error handling in video chunking +VideoChunking.chunkVideo("path/to/video.mp4", 1048576) match { + case Success(chunks) => println("Video chunked successfully") + case Failure(exception) => println(s"Failed to chunk video: ${exception.getMessage}") +} ``` ## Scala Connector -The Scala connector enables communication between the Erlang server and the Scala videoserver, leveraging Akka for actor-based communication. Scala is also used to raise the needed frontend to interact with the videoserver. +The Scala connector enables communication between the Scala server and the Scala videoserver, leveraging Akka for actor-based communication. Scala is also used to raise the needed frontend to interact with the videoserver. ### How to Use Scala Connector 1. Set up Scala and necessary dependencies. -2. Use the `ScalaConnector` object to send and receive messages between Scala and Erlang. +2. Use the `ScalaConnector` object to send and receive messages between Scala and the videoserver. ### Example ```scala import ScalaConnector._ object Main extends App { - // Send a message to Erlang - sendToErlang("Hello, Erlang!") + // Send a message to the videoserver + sendToErlang("Hello, videoserver!") - // Receive a message from Erlang + // Receive a message from the videoserver val message = Await.result(receiveFromErlang(), 5.seconds) - println(s"Received message from Erlang: $message") + println(s"Received message from videoserver: $message") } ``` @@ -133,19 +150,19 @@ The testing section provides instructions for running unit tests and integration ### Running Unit Tests To run the unit tests, use the following command: ```sh -rebar3 eunit +sbt test ``` ### Running Integration Tests To run the integration tests, use the following command: ```sh -rebar3 ct +sbt it:test ``` ### Running Test Coverage To run the test coverage, use the following command: ```sh -rebar3 as test cover +sbt coverage test ``` ## Automated Testing Workflow @@ -174,20 +191,28 @@ jobs: - name: Checkout code uses: actions/checkout@v2 - - name: Set up Erlang/OTP - uses: erlef/setup-beam@v1 + - name: Set up Java + uses: actions/setup-java@v2 with: - otp-version: 24.x - rebar3-version: 3.16.1 + distribution: 'adopt' + java-version: '11' + + - name: Set up Scala + run: | + echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list + echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list + curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add + sudo apt-get update + sudo apt-get install sbt - - name: Install dependencies - run: rebar3 get-deps + - name: Compile Scala code + run: sbt compile - name: Run unit tests - run: rebar3 eunit --verbose + run: sbt test - name: Run integration tests - run: rebar3 ct --verbose + run: sbt it:test ``` ## Running the Project Locally @@ -292,12 +317,13 @@ The HLS and DASH streaming options allow for adaptive bitrate streaming, providi 3. Retrieve the HLS playlist using the `hls_streaming:get_playlist/1` function with the output directory as an argument. ### Example -```erlang -% Start HLS streaming -hls_streaming:start("path/to/video.mp4", "path/to/output"). +```scala +// Start HLS streaming +HlsStreaming.start("path/to/video.mp4", "path/to/output") -% Retrieve the HLS playlist -{ok, Playlist} = hls_streaming:get_playlist("path/to/output"). +// Retrieve the HLS playlist +val playlist = HlsStreaming.getPlaylist("path/to/output") +println(s"HLS playlist: $playlist") ``` ### How to Use DASH Streaming @@ -306,12 +332,13 @@ hls_streaming:start("path/to/video.mp4", "path/to/output"). 3. Retrieve the DASH manifest using the `dash_streaming:get_manifest/1` function with the output directory as an argument. ### Example -```erlang -% Start DASH streaming -dash_streaming:start("path/to/video.mp4", "path/to/output"). +```scala +// Start DASH streaming +DashStreaming.start("path/to/video.mp4", "path/to/output") -% Retrieve the DASH manifest -{ok, Manifest} = dash_streaming:get_manifest("path/to/output"). +// Retrieve the DASH manifest +val manifest = DashStreaming.getManifest("path/to/output") +println(s"DASH manifest: $manifest") ``` ## Deploying the App to GitHub Pages @@ -342,18 +369,6 @@ jobs: - name: Checkout code uses: actions/checkout@v2 - - name: Set up Erlang/OTP - uses: erlef/setup-beam@v1 - with: - otp-version: 24.x - rebar3-version: 3.16.1 - - - name: Install dependencies - run: rebar3 get-deps - - - name: Build project - run: rebar3 compile - - name: Set up Java uses: actions/setup-java@v2 with: @@ -390,12 +405,13 @@ The RTMP streaming option allows for real-time messaging protocol streaming, pro 3. Retrieve the RTMP stream URL using the `rtmp_streaming:get_stream_url/1` function with the output directory as an argument. ### Example -```erlang -% Start RTMP streaming -rtmp_streaming:start("path/to/video.mp4", "path/to/output"). +```scala +// Start RTMP streaming +RtmpStreaming.start("path/to/video.mp4", "path/to/output") -% Retrieve the RTMP stream URL -{ok, StreamURL} = rtmp_streaming:get_stream_url("path/to/output"). +// Retrieve the RTMP stream URL +val streamUrl = RtmpStreaming.getStreamUrl("path/to/output") +println(s"RTMP stream URL: $streamUrl") ``` ## Troubleshooting Common Issues @@ -416,25 +432,25 @@ To build and run the project using Docker, follow these steps: 1. Build the Docker image: ```sh - docker build -t erlangcast . + docker build -t scalacast . ``` 2. Run the Docker container: ```sh - docker run -p 8080:8080 -p 9100-9155:9100-9155 -p 9200-9255:9200-9255 erlangcast + docker run -p 8080:8080 -p 9100-9155:9100-9155 -p 9200-9255:9200-9255 scalacast ``` -This will start the ErlangCast project in a Docker container, making it easy to run locally. +This will start the ScalaCast project in a Docker container, making it easy to run locally. ## Microservices Architecture -The ErlangCast project now includes a microservices architecture for easier integration and stability. The microservices are designed to handle specific functionalities and can be run independently. +The ScalaCast project now includes a microservices architecture for easier integration and stability. The microservices are designed to handle specific functionalities and can be run independently. ### Microservices Overview -1. **Peer Discovery Service**: Handles peer discovery in the network. +1. **Centralized Server Service**: Handles client connections and requests. 2. **Video Chunking Service**: Manages video chunking for efficient streaming. -3. **Reliable Transmission Service**: Ensures reliable data transmission between peers. +3. **Reliable Transmission Service**: Ensures reliable data transmission between clients and the server. 4. **Fault Tolerance Service**: Provides fault tolerance and error recovery mechanisms. ### Setting Up and Running Microservices @@ -445,14 +461,14 @@ To set up and run the microservices, follow these steps: 2. Navigate to the project directory. 3. Build the Docker images for each microservice: ```sh - docker build -t peer-discovery-service -f Dockerfile . + docker build -t centralized-server-service -f Dockerfile . docker build -t video-chunking-service -f Dockerfile . docker build -t reliable-transmission-service -f Dockerfile . docker build -t fault-tolerance-service -f Dockerfile . ``` 4. Run the Docker containers for each microservice: ```sh - docker run -d --name peer-discovery-service peer-discovery-service + docker run -d --name centralized-server-service centralized-server-service docker run -d --name video-chunking-service video-chunking-service docker run -d --name reliable-transmission-service reliable-transmission-service docker run -d --name fault-tolerance-service fault-tolerance-service @@ -460,41 +476,40 @@ To set up and run the microservices, follow these steps: ### Using Microservices -#### Peer Discovery Service +#### Centralized Server Service -The Peer Discovery Service handles peer discovery in the network. +The Centralized Server Service handles client connections and requests. **Functions**: -- `start/0`: Starts the peer discovery process. -- `stop/0`: Stops the peer discovery process. -- `handle_message/1`: Handles incoming peer discovery messages. -- `broadcast_message/1`: Broadcasts peer discovery messages. +- `startServer`: Starts the centralized server. +- `stopServer`: Stops the centralized server. +- `handleClientRequest`: Handles client requests. #### Video Chunking Service The Video Chunking Service manages video chunking for efficient streaming. **Functions**: -- `chunk_video/2`: Chunks a video file into smaller chunks. -- `get_chunk/2`: Retrieves a specific chunk of a video file. +- `chunkVideo`: Chunks a video file into smaller chunks. +- `getChunk`: Retrieves a specific chunk of a video file. #### Reliable Transmission Service -The Reliable Transmission Service ensures reliable data transmission between peers. +The Reliable Transmission Service ensures reliable data transmission between clients and the server. **Functions**: -- `send_data/2`: Sends data to a peer with retry mechanism. -- `receive_data/1`: Receives data and processes it using the provided data handler. +- `sendData`: Sends data to a client with retry mechanism. +- `receiveData`: Receives data and processes it using the provided data handler. #### Fault Tolerance Service The Fault Tolerance Service provides fault tolerance and error recovery mechanisms. **Functions**: -- `start/0`: Starts the fault tolerance service. -- `stop/0`: Stops the fault tolerance service. -- `handle_error/1`: Handles errors and logs them. -- `recover/1`: Recovers from errors based on the error type. +- `start`: Starts the fault tolerance service. +- `stop`: Stops the fault tolerance service. +- `handleError`: Handles errors and logs them. +- `recover`: Recovers from errors based on the error type. ## Running the App on Multiple Ports @@ -508,7 +523,7 @@ To run the app on multiple ports, follow these steps: ### Example Configuration #### `config/sys.config` -```erlang +```scala [ {kernel, [ {inet_dist_listen_min, 9100}, @@ -519,10 +534,10 @@ To run the app on multiple ports, follow these steps: {sasl_error_logger, {file, "log/sasl-error.log"}}, {errlog_type, error} ]}, - {peer_discovery, [ + {centralized_server, [ {port_range, {9200, 9255}}, {broadcast_interval, 5000}, - {max_peers, 50} + {max_clients, 50} ]}, {fault_tolerance, [ {retry_limit, 5}, @@ -533,63 +548,76 @@ To run the app on multiple ports, follow these steps: {port1, 8080}, {port2, 8081} ]} -]. +] ``` #### `Dockerfile` ```Dockerfile -# Use an official Erlang runtime as a parent image -FROM erlang:24 - -# Set the working directory +# Build stage for Scala +FROM ubuntu:24.04 AS scala-build WORKDIR /app -# Copy the current directory contents into the container at /app -COPY . /app - -# Install dependencies +# Install Java and SBT dependencies RUN apt-get update && \ - apt-get install -y ffmpeg libav-tools libavcodec-extra && \ - rebar3 get-deps - -# Compile the project -RUN rebar3 compile - -# Install Scala and sbt -RUN echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list && \ - echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | tee /etc/apt/sources.list.d/sbt_old.list && \ - curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | apt-key add && \ - apt-get update && \ - apt-get install -y sbt - -# Compile the Scala code + apt-get install -y --no-install-recommends \ + openjdk-11-jdk-headless \ + curl \ + gnupg \ + && echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | tee /etc/apt/sources.list.d/sbt.list \ + && curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | gpg --dearmor > /etc/apt/trusted.gpg.d/sbt.gpg \ + && apt-get update \ + && apt-get install -y sbt \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Setup SBT +RUN mkdir -p project && \ + echo 'sbt.version=1.8.2' > project/build.properties && \ + echo 'addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16")\naddSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")' > project/plugins.sbt && \ + echo 'name := "scalacast"\nversion := "0.1.0"\nscalaVersion := "2.13.8"' > build.sbt + +COPY src/scala ./src/scala/ RUN sbt compile -# Copy and run the peer discovery microservice -COPY microservices/peer_discovery_service.erl /app/microservices/ -RUN erlc /app/microservices/peer_discovery_service.erl -CMD ["erl", "-noshell", "-s", "peer_discovery_service", "start", "-s", "init", "stop"] - -# Copy and run the video chunking microservice -COPY microservices/video_chunking_service.erl /app/microservices/ -RUN erlc /app/microservices/video_chunking_service.erl -CMD ["erl", "-noshell", "-s", "video_chunking_service", "chunk_video", "-s", "init", "stop"] +# Final runtime stage +FROM ubuntu:24.04 +WORKDIR /app -# Copy and run the reliable transmission microservice -COPY microservices/reliable_transmission_service.erl /app/microservices/ -RUN erlc /app/microservices/reliable_transmission_service.erl -CMD ["erl", "-noshell", "-s", "reliable_transmission_service", "send_data", "-s", "init", "stop"] +ENV NODE_NAME_1=node1@127.0.0.1 \ + NODE_NAME_2=node2@127.0.0.1 \ + COOKIE=scalacast_cookie \ + DEBIAN_FRONTEND=noninteractive -# Copy and run the fault tolerance microservice -COPY microservices/fault_tolerance_service.erl /app/microservices/ -RUN erlc /app/microservices/fault_tolerance_service.erl -CMD ["erl", "-noshell", "-s", "fault_tolerance_service", "start", "-s", "init", "stop"] +# Install runtime dependencies +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + openjdk-11-jre-headless \ + netcat-openbsd \ + curl \ + && apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Setup directories +RUN mkdir -p /app/log /app/media/input /app/media/output && \ + chmod -R 777 /app/log /app/media + +# Copy artifacts and configs +COPY --from=scala-build /app/target /app/target +COPY config ./config/ + +# Create startup script +RUN echo '#!/bin/sh\n\ +epmd -daemon\n\ +NODE_NAME=$NODE_NAME_1 COOKIE=$COOKIE sbt run & \ +NODE_NAME=$NODE_NAME_2 COOKIE=$COOKIE sbt run\n' > /usr/local/bin/start.sh && \ + chmod +x /usr/local/bin/start.sh -# Expose the necessary ports EXPOSE 8080 8081 9100-9155 9200-9255 -# Define the command to run the project on both ports -CMD ["sh", "-c", "rebar3 shell & rebar3 shell --setcookie port2 --name port2@127.0.0.1"] +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD nc -z localhost 8080 || exit 1 + +CMD ["/usr/local/bin/start.sh"] ``` #### `docs/index.html` @@ -768,7 +796,7 @@ import subprocess import argparse def run_app(port1, port2, camera1, camera2): - cmd = f"rebar3 shell & rebar3 shell --setcookie port2 --name port2@127.0.0.1" + cmd = f"sbt run & sbt run --setcookie port2 --name port2@127.0.0.1" subprocess.run(cmd, shell=True) if __name__ == "__main__": diff --git a/microservices/fault_tolerance_service.erl b/microservices/fault_tolerance_service.erl deleted file mode 100644 index 8f4cff8..0000000 --- a/microservices/fault_tolerance_service.erl +++ /dev/null @@ -1,81 +0,0 @@ --module(fault_tolerance_service). --behavior(gen_server). - --export([start/0, stop/0, handle_error/1, recover/1]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - -% Define state record --record(state, { - peers = [], % List of connected peers - video_data = [], % Current video data - chunk_size = 1024, % Default chunk size - error_count = 0 % Count of errors encountered -}). - -% Start the fault tolerance service -start() -> - case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of - {ok, Pid} -> - {ok, Pid}; - {error, Reason} -> - error_logger:error_msg("Failed to start fault tolerance service: ~p~n", [Reason]), - {error, Reason} - end. - -% Stop the service -stop() -> - gen_server:cast(?MODULE, stop). - -% Handle various types of errors -handle_error({peer_failure, Peer, Data}) -> - gen_server:cast(?MODULE, {handle_peer_failure, Peer, Data}); -handle_error({chunk_error, VideoPath, ChunkSize}) -> - gen_server:cast(?MODULE, {handle_chunk_error, VideoPath, ChunkSize}); -handle_error({video_corrupted, VideoPath}) -> - gen_server:cast(?MODULE, {handle_video_corruption, VideoPath}); -handle_error({transmission_error, VideoData, ChunkSize}) -> - gen_server:cast(?MODULE, {handle_transmission_error, VideoData, ChunkSize}); -handle_error({chunk_missing, VideoPath, ChunkIndex}) -> - gen_server:cast(?MODULE, {handle_missing_chunk, VideoPath, ChunkIndex}). - -% Recovery functions -recover({peer_connection, Peer, Data}) -> - gen_server:call(?MODULE, {recover_peer_connection, Peer, Data}); -recover({data_handler, DataHandler}) -> - gen_server:call(?MODULE, {recover_data_handler, DataHandler}); -recover({error_handler, Msg}) -> - gen_server:call(?MODULE, {recover_error_handler, Msg}); -recover({recovery_handler, Msg}) -> - gen_server:call(?MODULE, {recover_recovery_handler, Msg}). - -% Callback functions -init([]) -> - {ok, #state{}}. - -handle_call({recover_peer_connection, Peer, Data}, _From, State) -> - {reply, {ok, {Peer, Data}}, State}; -handle_call({recover_data_handler, DataHandler}, _From, State) -> - {reply, {ok, DataHandler}, State}; -handle_call({recover_error_handler, Msg}, _From, State) -> - {reply, {ok, Msg}, State}; -handle_call({recover_recovery_handler, Msg}, _From, State) -> - {reply, {ok, Msg}, State}. - -handle_cast(stop, State) -> - {stop, normal, State}; -handle_cast({handle_peer_failure, Peer, Data}, State) -> - error_logger:warning_msg("Handling peer failure for ~p with data ~p~n", [Peer, Data]), - {noreply, State#state{error_count = State#state.error_count + 1}}; -handle_cast({handle_chunk_error, VideoPath, ChunkSize}, State) -> - error_logger:warning_msg("Handling chunk error for ~p with size ~p~n", [VideoPath, ChunkSize]), - {noreply, State}. - -handle_info(Info, State) -> - error_logger:info_msg("Received unexpected message: ~p~n", [Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/microservices/peer_discovery_service.erl b/microservices/peer_discovery_service.erl deleted file mode 100644 index 40e3598..0000000 --- a/microservices/peer_discovery_service.erl +++ /dev/null @@ -1,39 +0,0 @@ --module(peer_discovery_service). - --export([start/0, stop/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --record(state, {}). - -% Start the peer discovery process -start() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -% Stop the peer discovery process -stop() -> - gen_server:stop(?MODULE). - -% Initialize the gen_server -init([]) -> - {ok, #state{}}. - -% Handle synchronous calls -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -% Handle asynchronous messages -handle_cast(_Msg, State) -> - {noreply, State}. - -% Handle other messages -handle_info(_Info, State) -> - error_logger:error_msg("Unexpected info received: ~p~n", [_Info]), - {noreply, State}. - -% Terminate the gen_server -terminate(_Reason, _State) -> - ok. - -% Handle code changes -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/microservices/reliable_transmission_service.erl b/microservices/reliable_transmission_service.erl deleted file mode 100644 index f63d068..0000000 --- a/microservices/reliable_transmission_service.erl +++ /dev/null @@ -1,37 +0,0 @@ --module(reliable_transmission_service). - --export([send_data/2, receive_data/1]). - --define(RETRY_LIMIT, 5). --define(ACK_TIMEOUT, 1000). - -% Send data to a peer with retry mechanism -send_data(Peer, Data) -> - send_data(Peer, Data, 0). - -% Handle retry limit reached for sending data -send_data(_Peer, _Data, ?RETRY_LIMIT) -> - error_logger:error_msg("Retry limit reached for sending data to peer: ~p~n", [_Peer]), - {error, retry_limit_reached}; -send_data(Peer, Data, RetryCount) -> - % Send data to peer - Peer ! {data, Data}, - % Wait for acknowledgment - receive - {ack, Peer} -> - {ok, data_sent}; - after ?ACK_TIMEOUT -> - % Retry sending data - send_data(Peer, Data, RetryCount + 1) - end. - -% Receive data and process it using the provided data handler -receive_data(DataHandler) -> - receive - {data, Data} -> - % Process received data - DataHandler(Data), - % Send acknowledgment - self() ! {ack, self()}, - receive_data(DataHandler) - end. diff --git a/microservices/video_chunking_service.erl b/microservices/video_chunking_service.erl deleted file mode 100644 index 3de942c..0000000 --- a/microservices/video_chunking_service.erl +++ /dev/null @@ -1,45 +0,0 @@ --module(video_chunking_service). - --export([chunk_video/2, get_chunk/2]). - -% Chunks a video file into smaller chunks of the specified size -chunk_video(VideoPath, ChunkSize) -> - % Read the video file - case read_video_file(VideoPath) of - {ok, VideoData} -> - % Chunk the video data - chunk_video_data(VideoData, ChunkSize, []); - {error, Reason} -> - % Log an error message if reading the file fails - error_logger:error_msg("Failed to read video file ~p: ~p~n", [VideoPath, Reason]), - {error, Reason} - end. - -% Retrieves a specific chunk of a video file -get_chunk(VideoPath, ChunkIndex) -> - % Read the video file - case read_video_file(VideoPath) of - {ok, VideoData} -> - % Chunk the video data - Chunks = chunk_video_data(VideoData, 1048576, []), - % Retrieve the specified chunk - lists:nth(ChunkIndex, Chunks); - {error, Reason} -> - % Log an error message if reading the file fails - error_logger:error_msg("Failed to read video file ~p: ~p~n", [VideoPath, Reason]), - {error, Reason} - end. - -% Helper function to read the video file -read_video_file(VideoPath) -> - file:read_file(VideoPath). - -% Helper function to chunk the video data -chunk_video_data(VideoData, ChunkSize, Acc) when byte_size(VideoData) =< ChunkSize -> - % Return the accumulated chunks in reverse order - lists:reverse([VideoData | Acc]); -chunk_video_data(VideoData, ChunkSize, Acc) -> - % Split the video data into a chunk and the remaining data - <> = VideoData, - % Recursively chunk the remaining data - chunk_video_data(Rest, ChunkSize, [Chunk | Acc]). diff --git a/src/erlang/peer_discovery.erl b/src/erlang/peer_discovery.erl deleted file mode 100644 index 086d558..0000000 --- a/src/erlang/peer_discovery.erl +++ /dev/null @@ -1,63 +0,0 @@ --module(peer_discovery). - --export([start/0, stop/0, handle_message/1, broadcast_message/1]). - -% Start the peer discovery process -start() -> - % Start the peer discovery process - case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of - {ok, _Pid} -> - ok; - {error, Reason} -> - error_logger:error_msg("Failed to start peer discovery: ~p~n", [Reason]), - {error, Reason} - end. - -% Stop the peer discovery process -stop() -> - % Stop the peer discovery process - case gen_server:stop(?MODULE) of - ok -> - ok; - {error, Reason} -> - error_logger:error_msg("Failed to stop peer discovery: ~p~n", [Reason]), - {error, Reason} - end. - -% Handle incoming peer discovery messages -handle_message(Msg) -> - % Handle incoming peer discovery messages - io:format("Received peer discovery message: ~p~n", [Msg]), - ok. - -% Broadcast peer discovery messages -broadcast_message(Msg) -> - % Broadcast peer discovery messages - io:format("Broadcasting peer discovery message: ~p~n", [Msg]), - ok. - -% Initialize the gen_server -init([]) -> - {ok, #state{}}. - -% Handle synchronous calls -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -% Handle asynchronous messages -handle_cast(_Msg, State) -> - {noreply, State}. - -% Handle other messages -handle_info(_Info, State) -> - % Recovery mechanism for peer discovery failures - error_logger:error_msg("Unexpected info received: ~p~n", [_Info]), - {noreply, State}. - -% Terminate the gen_server -terminate(_Reason, _State) -> - ok. - -% Handle code changes -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/erlang/reliable_transmission.erl b/src/erlang/reliable_transmission.erl deleted file mode 100644 index b19b008..0000000 --- a/src/erlang/reliable_transmission.erl +++ /dev/null @@ -1,67 +0,0 @@ --module(reliable_transmission). - --export([send_data/2, receive_data/1, handle_transmission_error/2]). - --define(RETRY_LIMIT, 5). --define(ACK_TIMEOUT, 1000). - -send_data(Peer, Data) -> - send_data(Peer, Data, 0). - -send_data(_Peer, _Data, ?RETRY_LIMIT) -> - handle_transmission_error(_Peer, retry_limit_exceeded); - -send_data(Peer, Data, RetryCount) -> - try - Peer ! {data, self(), Data}, - receive - {ack, Peer} -> - {ok, sent} - after ?ACK_TIMEOUT -> - case handle_transmission_error(Peer, {timeout, RetryCount}) of - retry -> - send_data(Peer, Data, RetryCount + 1); - Error -> - Error - end - end - catch - error:Reason -> - handle_transmission_error(Peer, Reason) - end. - -receive_data(Timeout) -> - receive - {data, Sender, Data} -> - case send_ack(Sender) of - ok -> - {ok, Data}; - Error -> - handle_transmission_error(Sender, Error) - end; - {error, Sender, Reason} -> - handle_transmission_error(Sender, Reason) - after Timeout -> - handle_transmission_error(undefined, timeout) - end. - -send_ack(Sender) -> - try - Sender ! {ack, self()}, - ok - catch - _:Error -> Error - end. - -handle_transmission_error(Peer, Reason) -> - error_logger:error_msg("Transmission error with peer ~p: ~p~n", [Peer, Reason]), - case Reason of - {timeout, RetryCount} when RetryCount < ?RETRY_LIMIT -> - retry; - timeout -> - {error, {timeout, Peer}}; - retry_limit_exceeded -> - {error, {retry_limit_exceeded, Peer}}; - _ -> - {error, {transmission_failed, Peer, Reason}} - end. diff --git a/src/erlang/video_chunking.erl b/src/erlang/video_chunking.erl deleted file mode 100644 index 1be9d72..0000000 --- a/src/erlang/video_chunking.erl +++ /dev/null @@ -1,41 +0,0 @@ --module(video_chunking). - --export([chunk_video/2, get_chunk/2]). - -% Chunks a video file into smaller chunks of the specified size -chunk_video(VideoPath, ChunkSize) -> - % Read the video file - case file:read_file(VideoPath) of - {ok, VideoData} -> - % Chunk the video data - chunk_video_data(VideoData, ChunkSize, []); - {error, Reason} -> - % Log an error message if reading the file fails - error_logger:error_msg("Failed to read video file ~p: ~p~n", [VideoPath, Reason]), - {error, Reason} - end. - -% Retrieves a specific chunk of a video file -get_chunk(VideoPath, ChunkIndex) -> - % Read the video file - case file:read_file(VideoPath) of - {ok, VideoData} -> - % Chunk the video data - Chunks = chunk_video_data(VideoData, 1048576, []), - % Retrieve the specified chunk - lists:nth(VideoIndex, Chunks); - {error, Reason} -> - % Log an error message if reading the file fails - error_logger:error_msg("Failed to read video file ~p: ~p~n", [VideoPath, Reason]), - {error, Reason} - end. - -% Helper function to chunk the video data -chunk_video_data(VideoData, ChunkSize, Acc) when byte_size(VideoData) =< ChunkSize -> - % Return the accumulated chunks in reverse order - lists:reverse([VideoData | Acc]); -chunk_video_data(VideoData, ChunkSize, Acc) -> - % Split the video data into a chunk and the remaining data - <> = VideoData, - % Recursively chunk the remaining data - chunk_video_data(Rest, ChunkSize, [Chunk | Acc]). diff --git a/src/scala/main/scala/centralized_server.scala b/src/scala/main/scala/centralized_server.scala new file mode 100644 index 0000000..6388ad7 --- /dev/null +++ b/src/scala/main/scala/centralized_server.scala @@ -0,0 +1,42 @@ +import scala.concurrent.{Future, ExecutionContext} +import scala.util.{Success, Failure} +import java.net.{ServerSocket, Socket} +import java.io.{BufferedReader, InputStreamReader, PrintWriter} + +object CentralizedServer { + implicit val ec: ExecutionContext = ExecutionContext.global + + private var serverSocket: Option[ServerSocket] = None + private var isRunning = false + + def startServer(port: Int): Future[Unit] = Future { + serverSocket = Some(new ServerSocket(port)) + isRunning = true + println(s"Centralized server started on port $port") + + while (isRunning) { + val clientSocket = serverSocket.get.accept() + handleClientRequest(clientSocket) + } + } + + def stopServer(): Future[Unit] = Future { + isRunning = false + serverSocket.foreach(_.close()) + println("Centralized server stopped") + } + + def handleClientRequest(clientSocket: Socket): Future[Unit] = Future { + val in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream)) + val out = new PrintWriter(clientSocket.getOutputStream, true) + + val request = in.readLine() + println(s"Received request: $request") + + // Handle the request and send a response + val response = s"Response to: $request" + out.println(response) + + clientSocket.close() + } +} diff --git a/src/scala/main/scala/fault_tolerance.scala b/src/scala/main/scala/fault_tolerance.scala new file mode 100644 index 0000000..72351c1 --- /dev/null +++ b/src/scala/main/scala/fault_tolerance.scala @@ -0,0 +1,29 @@ +import scala.concurrent.{Future, ExecutionContext} +import scala.util.{Success, Failure} + +object FaultTolerance { + implicit val ec: ExecutionContext = ExecutionContext.global + + private var isRunning = false + private var errorCount = 0 + + def start(): Future[Unit] = Future { + isRunning = true + println("Fault tolerance service started.") + } + + def stop(): Future[Unit] = Future { + isRunning = false + println("Fault tolerance service stopped.") + } + + def handleError(error: String): Future[Unit] = Future { + println(s"Handling error: $error") + errorCount += 1 + } + + def recover(error: String): Future[Unit] = Future { + println(s"Recovering from error: $error") + errorCount -= 1 + } +} diff --git a/src/scala/main/scala/peer_discovery.scala b/src/scala/main/scala/peer_discovery.scala new file mode 100644 index 0000000..2ff8e83 --- /dev/null +++ b/src/scala/main/scala/peer_discovery.scala @@ -0,0 +1,37 @@ +import scala.concurrent.{Future, ExecutionContext} +import scala.concurrent.duration._ +import scala.util.{Success, Failure} + +object PeerDiscovery { + implicit val ec: ExecutionContext = ExecutionContext.global + + private var isRunning = false + private var peers: Set[String] = Set.empty + + def start(): Future[Unit] = Future { + isRunning = true + println("Peer discovery started.") + // Simulate peer discovery process + while (isRunning) { + Thread.sleep(5000) + broadcastMessage("Hello, peers!") + } + } + + def stop(): Future[Unit] = Future { + isRunning = false + println("Peer discovery stopped.") + } + + def handleMessage(message: String): Future[Unit] = Future { + println(s"Received peer discovery message: $message") + // Handle incoming peer discovery message + peers += message + } + + def broadcastMessage(message: String): Future[Unit] = Future { + println(s"Broadcasting peer discovery message: $message") + // Broadcast peer discovery message to all peers + peers.foreach(peer => println(s"Message sent to $peer: $message")) + } +} diff --git a/src/scala/main/scala/reliable_transmission.scala b/src/scala/main/scala/reliable_transmission.scala new file mode 100644 index 0000000..9c12e34 --- /dev/null +++ b/src/scala/main/scala/reliable_transmission.scala @@ -0,0 +1,50 @@ +import scala.concurrent.{Future, ExecutionContext} +import scala.concurrent.duration._ +import scala.util.{Success, Failure} + +object ReliableTransmission { + implicit val ec: ExecutionContext = ExecutionContext.global + + private val retryLimit = 5 + private val ackTimeout = 1000 + + def sendData(peer: String, data: String): Future[Unit] = { + def attemptSend(retryCount: Int): Future[Unit] = { + if (retryCount >= retryLimit) { + Future.failed(new Exception("Retry limit reached")) + } else { + // Simulate sending data to peer + println(s"Sending data to $peer: $data") + // Simulate waiting for acknowledgment + Future { + Thread.sleep(ackTimeout) + if (scala.util.Random.nextBoolean()) { + println(s"Acknowledgment received from $peer") + Success(()) + } else { + println(s"No acknowledgment from $peer, retrying...") + Failure(new Exception("No acknowledgment")) + } + }.flatMap { + case Success(_) => Future.successful(()) + case Failure(_) => attemptSend(retryCount + 1) + } + } + } + + attemptSend(0) + } + + def receiveData(dataHandler: String => Unit): Future[Unit] = Future { + // Simulate receiving data + val data = "Received data" + println(data) + dataHandler(data) + // Simulate sending acknowledgment + println("Sending acknowledgment") + } + + def handleTransmissionError(peer: String, reason: String): Unit = { + println(s"Transmission error with peer $peer: $reason") + } +} diff --git a/src/scala/main/scala/video_chunking.scala b/src/scala/main/scala/video_chunking.scala new file mode 100644 index 0000000..beaa400 --- /dev/null +++ b/src/scala/main/scala/video_chunking.scala @@ -0,0 +1,33 @@ +import java.io.{File, FileInputStream, IOException} +import java.nio.file.{Files, Paths} +import scala.util.{Try, Success, Failure} + +object VideoChunking { + + def chunkVideo(videoPath: String, chunkSize: Int): Try[List[Array[Byte]]] = { + Try { + val videoFile = new File(videoPath) + val videoData = Files.readAllBytes(videoFile.toPath) + chunkVideoData(videoData, chunkSize, List.empty) + } + } + + def getChunk(videoPath: String, chunkIndex: Int): Try[Array[Byte]] = { + chunkVideo(videoPath, 1048576).map { chunks => + if (chunkIndex >= 0 && chunkIndex < chunks.length) { + chunks(chunkIndex) + } else { + throw new IndexOutOfBoundsException(s"Chunk index $chunkIndex out of bounds") + } + } + } + + private def chunkVideoData(videoData: Array[Byte], chunkSize: Int, acc: List[Array[Byte]]): List[Array[Byte]] = { + if (videoData.length <= chunkSize) { + (videoData :: acc).reverse + } else { + val (chunk, rest) = videoData.splitAt(chunkSize) + chunkVideoData(rest, chunkSize, chunk :: acc) + } + } +} diff --git a/tests/scala/test_centralized_server.scala b/tests/scala/test_centralized_server.scala new file mode 100644 index 0000000..277e98b --- /dev/null +++ b/tests/scala/test_centralized_server.scala @@ -0,0 +1,23 @@ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.concurrent.Await +import scala.concurrent.duration._ + +class TestCentralizedServer extends AnyFlatSpec with Matchers { + + "CentralizedServer" should "start the centralized server" in { + val result = Await.result(CentralizedServer.startServer(8080), 5.seconds) + result shouldBe () + } + + it should "stop the centralized server" in { + val result = Await.result(CentralizedServer.stopServer(), 5.seconds) + result shouldBe () + } + + it should "handle client requests" in { + val clientSocket = new java.net.Socket("localhost", 8080) + val result = Await.result(CentralizedServer.handleClientRequest(clientSocket), 5.seconds) + result shouldBe () + } +} diff --git a/tests/scala/test_fault_tolerance.scala b/tests/scala/test_fault_tolerance.scala new file mode 100644 index 0000000..ee3b17d --- /dev/null +++ b/tests/scala/test_fault_tolerance.scala @@ -0,0 +1,27 @@ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.concurrent.Await +import scala.concurrent.duration._ + +class TestFaultTolerance extends AnyFlatSpec with Matchers { + + "FaultTolerance" should "start the fault tolerance service" in { + val result = Await.result(FaultTolerance.start(), 5.seconds) + result shouldBe () + } + + it should "stop the fault tolerance service" in { + val result = Await.result(FaultTolerance.stop(), 5.seconds) + result shouldBe () + } + + it should "handle errors" in { + val result = Await.result(FaultTolerance.handleError("Test error"), 5.seconds) + result shouldBe () + } + + it should "recover from errors" in { + val result = Await.result(FaultTolerance.recover("Test error"), 5.seconds) + result shouldBe () + } +} diff --git a/tests/scala/test_peer_discovery.scala b/tests/scala/test_peer_discovery.scala new file mode 100644 index 0000000..2836b09 --- /dev/null +++ b/tests/scala/test_peer_discovery.scala @@ -0,0 +1,27 @@ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.concurrent.Await +import scala.concurrent.duration._ + +class TestPeerDiscovery extends AnyFlatSpec with Matchers { + + "PeerDiscovery" should "start the peer discovery process" in { + val result = Await.result(PeerDiscovery.start(), 5.seconds) + result shouldBe () + } + + it should "stop the peer discovery process" in { + val result = Await.result(PeerDiscovery.stop(), 5.seconds) + result shouldBe () + } + + it should "handle incoming peer discovery messages" in { + val result = Await.result(PeerDiscovery.handleMessage("Test message"), 5.seconds) + result shouldBe () + } + + it should "broadcast peer discovery messages" in { + val result = Await.result(PeerDiscovery.broadcastMessage("Test message"), 5.seconds) + result shouldBe () + } +} diff --git a/tests/scala/test_reliable_transmission.scala b/tests/scala/test_reliable_transmission.scala new file mode 100644 index 0000000..c9597cb --- /dev/null +++ b/tests/scala/test_reliable_transmission.scala @@ -0,0 +1,25 @@ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.concurrent.Await +import scala.concurrent.duration._ + +class TestReliableTransmission extends AnyFlatSpec with Matchers { + + "ReliableTransmission" should "send data reliably to a peer" in { + val result = Await.result(ReliableTransmission.sendData("peer1", "Test data"), 5.seconds) + result shouldBe () + } + + it should "receive data reliably" in { + val dataHandler: String => Unit = data => data shouldBe "Test data" + val result = Await.result(ReliableTransmission.receiveData(dataHandler), 5.seconds) + result shouldBe () + } + + it should "handle transmission errors" in { + ReliableTransmission.handleTransmissionError("peer1", "Test error") + // Assuming some mechanism to verify the error was handled + // This is a placeholder for actual verification logic + true should be (true) + } +} diff --git a/tests/scala/test_video_chunking.scala b/tests/scala/test_video_chunking.scala new file mode 100644 index 0000000..3f46d99 --- /dev/null +++ b/tests/scala/test_video_chunking.scala @@ -0,0 +1,24 @@ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.util.{Success, Failure} + +class TestVideoChunking extends AnyFlatSpec with Matchers { + + "VideoChunking" should "chunk a video file into smaller chunks" in { + val videoPath = "path/to/test_video.mp4" + val chunkSize = 1048576 + VideoChunking.chunkVideo(videoPath, chunkSize) match { + case Success(chunks) => chunks.length should be > 0 + case Failure(exception) => fail(s"Failed to chunk video: ${exception.getMessage}") + } + } + + it should "retrieve a specific chunk of a video file" in { + val videoPath = "path/to/test_video.mp4" + val chunkIndex = 1 + VideoChunking.getChunk(videoPath, chunkIndex) match { + case Success(chunk) => chunk.length should be > 0 + case Failure(exception) => fail(s"Failed to retrieve chunk: ${exception.getMessage}") + } + } +}