Skip to content

Commit e4ad77a

Browse files
init commit
0 parents  commit e4ad77a

14 files changed

+1180
-0
lines changed

Diff for: .gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/target

Diff for: Cargo.toml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "p2p"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
anyhow = "1"
8+
clap = { version = "3.0", default-features = false, features = ["std"] }
9+
futures = { version = "0.3", default-features = false, features = ["std"] }
10+
log = { version = "0.4", default-features = false, features = ["std"] }
11+
serde = { version = "1", default-features = false, features = ["derive"] }
12+
serde_json = { version = "1", default-features = false, features = ["std"] }
13+
simple_logger = { version = "1.16", default-features = false, features = ["colors"] }
14+
tokio = { version = "1.15", default-features = false, features = ["io-util", "fs", "signal", "sync", "rt", "rt-multi-thread", "net", "time"] }
15+
tokio-serde = { version = "0.8", default-features = false, features = ["json"] }
16+
tokio-util = { version = "0.6", default-features = false, features = ["codec"] }
17+
uuid = { version = "0.8", default-features = false, features = ["v4", "serde"] }
18+
rustydht-lib = { path = "../../RustPart2/rustydht-lib" }
19+
tower = { version = "0.4.12" }
20+
sha-1 = "0.10.0"
21+
portpicker = "0.1.1"

Diff for: TODO.md

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
🔥 High priority:
2+
3+
[] Create something like manifest file that contains shards ids, their hashes, pieces size
4+
5+
[ 🚧 ] Research DHT(distributed hash table)
6+
7+
1. [] Implement/add DHT.
8+
2. [ 🚧 ] Use DHT logic inside sessions.
9+
- Download logic.
10+
- Upload logic.
11+
3. [ ] Own BHT bootstrapping node - postponed. use https://github.com/bittorrent/bootstrap-dht for now.
12+
13+
🦀 Medium priority:
14+
15+
1. Improve manifest file to better support DHT(see https://en.wikipedia.org/wiki/Torrent_file)
16+
1. [] Add nodes address in manifest files.
17+
2. [] Add the target file size in manifest files.
18+
3. [] Add pieces size in manifest files.
19+
4. [] Add infohash of a manifest file to uniquely identify a manifest file amount other nodes.
20+
21+
2. Download logic(blocked by DHT research/implementing)
22+
23+
🐮 Low priority:
24+
25+
1. [ ] Better naming: A "peer" is a client/server listening on a TCP port that implements the BitTorrent protocol. A "node" is a client/server listening on a UDP port implementing the distributed hash table protocol.

Diff for: rustfmt.toml

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
reorder_imports = true
2+
imports_granularity = "Crate"
3+
max_width = 120

Diff for: src/config.rs

+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use clap::{App, Arg, ArgMatches, Values};
2+
use std::{
3+
net::SocketAddr,
4+
path::{Path, PathBuf},
5+
};
6+
7+
const ARG_NODE_ADDRESS: &str = "address";
8+
const ARG_UPLOAD: &str = "upload";
9+
const ARG_DOWNLOAD: &str = "download";
10+
pub const MANIFEST_FILE_EXT: &str = "manifest";
11+
12+
#[derive(Clone)]
13+
pub struct Config {
14+
pub node_address: SocketAddr,
15+
pub upload_files: Option<Vec<PathBuf>>,
16+
pub manifest_files: Option<Vec<PathBuf>>,
17+
}
18+
19+
impl Config {
20+
pub fn new() -> Self {
21+
Self::default()
22+
}
23+
24+
fn config() -> ArgMatches {
25+
let file_path_validation = |path: &str| -> Result<(), String> {
26+
if !Path::new(path).exists() {
27+
Err(format!("{path} file doesn't exists"))
28+
} else {
29+
Ok(())
30+
}
31+
};
32+
33+
let manifest_path_validator = |path: &str| -> Result<(), String> {
34+
match file_path_validation(path).map(|_| {
35+
if Path::new(path).extension() != Some(MANIFEST_FILE_EXT.as_ref()) {
36+
Err(format!("incorrect file extension, expect: .{MANIFEST_FILE_EXT}"))
37+
} else {
38+
Ok(())
39+
}
40+
}) {
41+
Ok(Ok(())) => Ok(()),
42+
Ok(Err(err)) => Err(err),
43+
Err(err) => Err(err),
44+
}
45+
};
46+
let app = App::new("p2p application")
47+
.arg(
48+
Arg::new(ARG_NODE_ADDRESS)
49+
.short('a')
50+
.long(ARG_NODE_ADDRESS)
51+
.help("address node to be bound")
52+
.takes_value(true)
53+
.validator(|value| {
54+
value
55+
.parse::<SocketAddr>()
56+
.map(|_| ())
57+
.map_err(|err| format!("incorrect socket address: {}", err))
58+
})
59+
.required(true),
60+
)
61+
.arg(
62+
Arg::new(ARG_UPLOAD)
63+
.short('u')
64+
.long(ARG_UPLOAD)
65+
.help("Specify file path to file to share")
66+
.takes_value(true)
67+
.multiple_values(true)
68+
.validator(file_path_validation),
69+
)
70+
.arg(
71+
Arg::new(ARG_DOWNLOAD)
72+
.short('d')
73+
.long(ARG_DOWNLOAD)
74+
.help("Specify manifest path to correspond file to be download")
75+
.takes_value(true)
76+
.multiple_values(true)
77+
.validator(manifest_path_validator),
78+
);
79+
80+
app.get_matches()
81+
}
82+
}
83+
84+
impl Default for Config {
85+
fn default() -> Self {
86+
let config = Self::config();
87+
88+
let node_address = config
89+
.value_of(ARG_NODE_ADDRESS)
90+
.map(|value| value.parse::<SocketAddr>().expect("node address is expected to valid"))
91+
.expect("node address is required");
92+
93+
let values_to_vec_path_buf = |values: Option<Values>| -> Option<Vec<PathBuf>> {
94+
Some(
95+
values
96+
.into_iter()
97+
.flat_map(|value| value.into_iter().map(PathBuf::from))
98+
.collect(),
99+
)
100+
};
101+
102+
Self {
103+
node_address,
104+
upload_files: values_to_vec_path_buf(config.values_of(ARG_UPLOAD)),
105+
manifest_files: values_to_vec_path_buf(config.values_of(ARG_DOWNLOAD)),
106+
}
107+
}
108+
}

Diff for: src/dht_node.rs

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use anyhow::Context;
2+
use log::{debug, trace};
3+
use rustydht_lib::{
4+
common::{ipv4_addr_src::IPV4Consensus, Id, Node},
5+
dht::{
6+
dht_event::DHTEvent,
7+
operations::{announce_peer, find_node, get_peers, GetPeersResult},
8+
DHTBuilder, DHTSettingsBuilder, DHT,
9+
},
10+
shutdown::{create_shutdown, ShutdownSender},
11+
};
12+
use std::{
13+
net::{Ipv4Addr, SocketAddrV4},
14+
time::Duration,
15+
};
16+
use tokio::sync::mpsc::Receiver;
17+
18+
pub struct DhtNode {
19+
dht: DHT,
20+
timeout_duration: Duration,
21+
shutdown_notifier: ShutdownSender,
22+
}
23+
24+
impl DhtNode {
25+
pub fn new(timeout_duration: Duration) -> anyhow::Result<Self> {
26+
let (sender, receiver) = create_shutdown();
27+
28+
let port = portpicker::pick_unused_port().expect("no free port in the system :(");
29+
let dht = DHTBuilder::new()
30+
.listen_addr(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port))
31+
.settings(
32+
DHTSettingsBuilder::new()
33+
.read_only(false)
34+
.routers(vec!["localhost:6882".to_string()])
35+
.build(),
36+
)
37+
.ip_source(Box::new(IPV4Consensus::new(0, 10)))
38+
.build(receiver)?;
39+
40+
debug!("running DHT peer on 0.0.0.0:{port}");
41+
42+
Ok(Self {
43+
dht,
44+
timeout_duration,
45+
shutdown_notifier: sender,
46+
})
47+
}
48+
49+
pub async fn run(&self) -> anyhow::Result<()> {
50+
self.dht.run_event_loop().await.map_err(|err| err.into())
51+
}
52+
53+
pub async fn announce_peer<H: AsRef<[u8]>>(&self, info_hash: H, port: u16) -> anyhow::Result<Vec<Node>> {
54+
debug!("announce infohash: {:02x?}", info_hash.as_ref());
55+
let id = Id::from_bytes(info_hash.as_ref())?;
56+
announce_peer(&self.dht, id, Some(port), self.timeout_duration)
57+
.await
58+
.with_context(|| format!("announce peer failed for infohash {:02x?}", info_hash.as_ref()))
59+
}
60+
61+
pub async fn find_nodes<H: AsRef<[u8]>>(&self, info_hash: H) -> anyhow::Result<Vec<Node>> {
62+
debug!("find nodes for infohash {:02x?}", info_hash.as_ref());
63+
let id = Id::from_bytes(info_hash.as_ref())?;
64+
find_node(&self.dht, id, self.timeout_duration)
65+
.await
66+
.with_context(|| format!("find nodes failed for infohash {:02x?}", info_hash.as_ref()))
67+
}
68+
69+
pub async fn get_peers<H: AsRef<[u8]>>(&self, info_hash: H) -> anyhow::Result<GetPeersResult> {
70+
debug!("get peers for infohash {:02x?}", info_hash.as_ref());
71+
let id = Id::from_bytes(info_hash.as_ref())?;
72+
73+
get_peers(&self.dht, id, self.timeout_duration)
74+
.await
75+
.with_context(|| format!("get peers failed for infohash {:02x?}", info_hash.as_ref()))
76+
}
77+
78+
pub async fn stop(&mut self) {
79+
trace!("stopping DHT node");
80+
self.shutdown_notifier.shutdown().await
81+
}
82+
83+
pub fn subscribe(&self) -> Receiver<DHTEvent> {
84+
self.dht.subscribe()
85+
}
86+
}

0 commit comments

Comments
 (0)