Skip to content

Commit

Permalink
Command to start DHT client
Browse files Browse the repository at this point in the history
  • Loading branch information
lavrov committed Sep 11, 2024
1 parent 83967e0 commit ffee393
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 33 deletions.
49 changes: 22 additions & 27 deletions cmd/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import cats.effect.IO
import cats.effect.Resource
import cats.effect.ResourceIO
import cats.syntax.all.*
import com.comcast.ip4s.Port
import com.comcast.ip4s.SocketAddress
import com.github.torrentdam.bencode
import com.github.torrentdam.bittorrent.dht.*
Expand Down Expand Up @@ -48,10 +49,15 @@ object Main
) {

def main: Opts[IO[ExitCode]] =
downloadCommand <+> torrentCommand <+> discoverCommand <+> verifyCommand
torrentCommand <+> dhtCommand

def torrentCommand =
Opts.subcommand("torrent", "download torrent file") {
def torrentCommand: Opts[IO[ExitCode]] =
Opts.subcommand("torrent", "torrent client")(
fetchFileCommand <+> downloadCommand <+> verifyCommand
)

def fetchFileCommand =
Opts.subcommand("fetch-file", "download torrent file") {
(
Opts.option[String]("info-hash", "Info-hash"),
Opts.option[String]("save", "Save as a torrent file")
Expand All @@ -71,7 +77,7 @@ object Main
)
.await
val table = Resource.eval(RoutingTable[IO](selfId)).await
val node = Node(selfId, QueryHandler(selfId, table)).await
val node = Node(selfId, none, QueryHandler(selfId, table)).await
Resource.eval(RoutingTableBootstrap[IO](table, node.client)).await
val discovery = PeerDiscovery.make(table, node.client).await

Expand Down Expand Up @@ -145,7 +151,7 @@ object Main
Stream.emit(PeerInfo(peerAddress)).covary[IO]
case None =>
val table = Resource.eval(RoutingTable[IO](selfId)).await
val node = Node(selfId, QueryHandler(selfId, table)).await
val node = Node(selfId, none, QueryHandler(selfId, table)).await
Resource.eval(RoutingTableBootstrap(table, node.client)).await
val discovery = PeerDiscovery.make(table, node.client).await
discovery.discover(infoHash)
Expand Down Expand Up @@ -266,34 +272,23 @@ object Main
}
}

def discoverCommand =
Opts.subcommand("discover", "discover peers via DHT") {
Opts.option[String]("info-hash", "Info-hash").map { infoHash0 =>
def dhtCommand: Opts[IO[ExitCode]] =
Opts.subcommand("dht", "DHT client")(
startCommand
)

def startCommand =
Opts.subcommand("start", "start DHT node") {
Opts.option[Int]("port", "UDP port").map { portParam =>
withLogger {
async[ResourceIO] {
val port = Port.fromInt(portParam).liftTo[ResourceIO](new Exception("Invalid port")).await
given Random[IO] = Resource.eval(Random.scalaUtilRandom[IO]).await

val selfId = Resource.eval(NodeId.generate[IO]).await
val infoHash = Resource
.eval(
InfoHash.fromString
.unapply(infoHash0)
.liftTo[IO](new Exception("Malformed info-hash"))
)
.await
val table = Resource.eval(RoutingTable[IO](selfId)).await
val node = Node(selfId, QueryHandler(selfId, table)).await
val node = Node(selfId, Some(port), QueryHandler(selfId, table)).await
Resource.eval(RoutingTableBootstrap(table, node.client)).await
val discovery = PeerDiscovery.make(table, node.client).await
discovery
.discover(infoHash)
.evalTap { peerInfo =>
Logger[IO].trace(s"Discovered peer ${peerInfo.address}")
}
.compile
.drain
.as(ExitCode.Success)
}.useEval
}.useForever
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.effect.Concurrent
import cats.effect.IO
import cats.effect.Resource
import cats.syntax.all.*
import com.comcast.ip4s.*
import com.comcast.ip4s.ip
import com.github.torrentdam.bencode.decode
import com.github.torrentdam.bencode.encode
Expand All @@ -15,7 +16,6 @@ import fs2.io.net.DatagramSocket
import fs2.io.net.DatagramSocketGroup
import fs2.io.net.Network
import fs2.Chunk
import com.comcast.ip4s.*
import java.net.InetSocketAddress
import org.legogroup.woof.given
import org.legogroup.woof.Logger
Expand Down Expand Up @@ -47,11 +47,11 @@ class MessageSocket(socket: DatagramSocket[IO], logger: Logger[IO]) {

object MessageSocket {

def apply()(using
logger: Logger[IO]
): Resource[IO, MessageSocket] =
def apply(
port: Option[Port]
)(using logger: Logger[IO]): Resource[IO, MessageSocket] =
Network[IO]
.openDatagramSocket(Some(ip"0.0.0.0"))
.openDatagramSocket(Some(ip"0.0.0.0"), port)
.map(socket => new MessageSocket(socket, logger))

object Error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object Node {

def apply(
selfId: NodeId,
port: Option[Port],
queryHandler: QueryHandler[IO]
)(using
random: Random[IO],
Expand All @@ -34,7 +35,7 @@ object Node {
(nextChar, nextChar).mapN((a, b) => ByteVector.encodeAscii(List(a, b).mkString).toOption.get)

for
messageSocket <- MessageSocket()
messageSocket <- MessageSocket(port)
responses <- Resource.eval {
Queue.unbounded[IO, (SocketAddress[IpAddress], Either[Message.ErrorMessage, Message.ResponseMessage])]
}
Expand Down

0 comments on commit ffee393

Please sign in to comment.