
Commit 62bd2dd

Authored Feb 8, 2025
Add files via upload
1 parent 11f181c commit 62bd2dd

8 files changed: +1170, -0 lines
 

Cargo.toml (+34 lines)

@@ -0,0 +1,34 @@
[package]
name = "phpd"
version = "0.1.0"
edition = "2021"
license = "MIT"
homepage = "https://github.com/TechfaneTechnologies/phpd"
documentation = "https://github.com/TechfaneTechnologies/phpd"
repository = "https://github.com/TechfaneTechnologies/phpd"
authors = ["DrJuneMoone <96371033+MooneDrJune@users.noreply.github.com>"]

[[bin]]
name = "process_dummy_data"
path = "src/process_dummy_data.rs"

[[bin]]
name = "generate_dummy_data"
path = "src/generate_dummy_data.rs"

[features]
default = []
parallel = []

[dependencies]
walkdir = "2"
dirs = "6"
rand = "0.9"
csv = "1"
itertools = "0.14"
rayon = "1"
crossbeam-channel = "0.5"
bytes = "1.10"
memmap2 = "0.9"
tempfile = "3"
time = { version = "0.3", features = ["serde", "formatting", "parsing", "macros", "local-offset", "quickcheck"] }

Readme.md (+110 lines)

@@ -0,0 +1,110 @@
# PHPD (Process Hive Partitioned Data)

## Usage Guidelines

To run this program, ensure that Rust is installed on your system. You can download and install Rust from the official website: [Rust Installation Guide](https://www.rust-lang.org/tools/install).

### Installation

Clone the repository using the following command:

```bash
git clone https://github.com/TechfaneTechnologies/phpd.git
```

Then, navigate to the project directory and build the project:

```bash
cd phpd
cargo build --release
```

### Generating Dummy Data

To generate dummy data, execute the following command:

```bash
cargo run --release --bin generate_dummy_data
```

Alternatively, you can run the compiled binary directly:

```bash
./target/release/generate_dummy_data
```

#### Example Terminal Output:

```bash
$ ./target/release/generate_dummy_data
Proceeding with the generation of dummy data for the following instruments: ["BANKNIFTY", "BANKEX", "FINNIFTY", "MIDCPNIFTY", "NIFTY", "NIFTYNXT50", "SENSEX"]
For the year 2024
At directory: /Users/DrJuneMoone/Document/hive_partitioned_data
Successfully generated dummy data at: /Users/DrJuneMoone/Document/hive_partitioned_data
Generated 5502 CSV files across 1841 subfolders, totaling 8.69 GiB
Processing speed: 923.87 MiB per second in 9.63 seconds
```
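
The binary is a thin wrapper over the library's `CsvGenerationConfig::generate_dummy_data` helper, so the same step can also be driven from your own Rust code. The sketch below mirrors what `generate_dummy_data` does; the output directory and the two-instrument list are placeholders you would replace:

```rust
use {phpd::CsvGenerationConfig, std::path::PathBuf};

fn main() -> std::io::Result<()> {
    // Example only: point this at any writable directory.
    let base_path = PathBuf::from("/data/market_data");
    // Passing None instead of Some(...) falls back to the full built-in instrument list.
    let instruments = ["NIFTY", "BANKNIFTY"];
    CsvGenerationConfig::generate_dummy_data(
        2024,                         // year to generate business days for
        Some(instruments.as_slice()), // instruments to generate
        3,                            // CSV files per instrument per trading day
        base_path,                    // root of the hive-partitioned tree
        None,                         // None = default number of rows per CSV
    )?;
    Ok(())
}
```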

### Processing Hive Partitioned Dummy Data

To process the generated hive partitioned dummy data, run the following command:

```bash
cargo run --release --bin process_dummy_data
```

Or execute the binary directly:

```bash
./target/release/process_dummy_data
```

#### Example Terminal Output:

```bash
$ ./target/release/process_dummy_data
Found 7 instruments

Instrument: SENSEX Grouped CSV Files {2: [CsvFile { path: "/Users/DrJuneMoone/Document/hive_partitioned_data/SENSEX/20240101/SENSEX-2.csv", date: "20240101", seq_id: 2 }, CsvFile { path: "/Users/DrJuneMoone/Document/hive_partitioned_data/SENSEX/20240102/SENSEX-2.csv", date: "20240102", seq_id: 2 }, .... ]}

Processing instrument: BANKNIFTY
Found 3 sequence groups
Processing sequence group: 2

Processing file: /Users/DrJuneMoone/Document/hive_partitioned_data/NIFTYNXT50/20240102/NIFTYNXT50-1.csv
Processing file: /Users/DrJuneMoone/Document/hive_partitioned_data/NIFTY/20240102/NIFTY-2.csv
Processing file: /Users/DrJuneMoone/Document/hive_partitioned_data/SENSEX/20240103/SENSEX-2.csv
............
............
Successfully merged dummy data at: /Users/DrJuneMoone/Document/hive_partitioned_data
Generated 21 sequentially merged CSV files, totaling 8.69 GiB
Processed 5502 CSV files across 1841 subfolders, totaling 8.69 GiB
Processing speed: 3.30 GiB per second in 5.27 seconds
```

### Changing the Data Directory

To change the location of the hive partitioned data, modify the `base_path` variable in the source code:

1. **For `generate_dummy_data`**: Edit `src/generate_dummy_data.rs`, lines 18-20.
2. **For `process_dummy_data`**: Edit `src/process_dummy_data.rs`, lines 8-10.

After making the necessary changes, rebuild and run the program to regenerate and process the data with the updated location.

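For reference, both binaries build `base_path` from the home directory. A minimal before/after sketch is shown below; the `default_base_path`/`custom_base_path` helpers and the `/data/market_data` path are hypothetical and only keep the snippet self-contained:

```rust
use {dirs::home_dir, std::path::PathBuf};

// Default used by both binaries (src/generate_dummy_data.rs lines 18-20,
// src/process_dummy_data.rs lines 8-10).
fn default_base_path() -> PathBuf {
    let mut base_path = home_dir().expect("Failed To Get Home Directory Path");
    base_path.push("Document");
    base_path.push("hive_partitioned_data");
    base_path
}

// Example replacement: any absolute path works; "/data/market_data" is a placeholder.
fn custom_base_path() -> PathBuf {
    PathBuf::from("/data/market_data")
}
```
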
### Processing Actual Hive Partitioned Data

To process your actual hive partitioned data, update the `base_path` variable in `src/process_dummy_data.rs` (lines 8-10) and run the following command:

```bash
cargo run --release --bin process_dummy_data
```

Or execute the compiled binary:

```bash
./target/release/process_dummy_data
```
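
The binary is likewise a thin wrapper over the library's `MergeConfig`; if you prefer to drive the merge from your own code, a minimal sketch (with a placeholder data directory) looks like this:

```rust
use phpd::{MergeConfig, ThreadSafeError};

fn main() -> Result<(), ThreadSafeError> {
    // Point the merger at the root of the hive-partitioned tree (placeholder path).
    let mut config = MergeConfig::new("/data/market_data");
    // true (the default) writes one combined CSV per instrument;
    // false writes one merged CSV per sequence id instead.
    config.create_combined = true;
    config.merge_all_instruments()?;
    Ok(())
}
```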

## Example Video

A video demonstrating the processing performance of the program will be available soon.

src/constants.rs (+249 lines)

@@ -0,0 +1,249 @@
pub(crate) const ALL_INSTRUMENTS: [&str; 232] = [
    "AARTIIND",
    "ABB",
    "ABBOTINDIA",
    "ABCAPITAL",
    "ABFRL",
    "ACC",
    "ADANIENSOL",
    "ADANIENT",
    "ADANIGREEN",
    "ADANIPORTS",
    "ALKEM",
    "AMBUJACEM",
    "ANGELONE",
    "APLAPOLLO",
    "APOLLOHOSP",
    "APOLLOTYRE",
    "ASHOKLEY",
    "ASIANPAINT",
    "ASTRAL",
    "ATGL",
    "ATUL",
    "AUBANK",
    "AUROPHARMA",
    "AXISBANK",
    "BAJAJ-AUTO",
    "BAJAJFINSV",
    "BAJFINANCE",
    "BALKRISIND",
    "BANDHANBNK",
    "BANKBARODA",
    "BANKINDIA",
    "BANKNIFTY",
    "BATAINDIA",
    "BEL",
    "BERGEPAINT",
    "BHARATFORG",
    "BHARTIARTL",
    "BHEL",
    "BIOCON",
    "BOSCHLTD",
    "BPCL",
    "BRITANNIA",
    "BSE",
    "BSOFT",
    "CAMS",
    "CANBK",
    "CANFINHOME",
    "CDSL",
    "CESC",
    "CGPOWER",
    "CHAMBLFERT",
    "CHOLAFIN",
    "CIPLA",
    "COALINDIA",
    "COFORGE",
    "COLPAL",
    "CONCOR",
    "COROMANDEL",
    "CROMPTON",
    "CUB",
    "CUMMINSIND",
    "CYIENT",
    "DABUR",
    "DALBHARAT",
    "DEEPAKNTR",
    "DELHIVERY",
    "DIVISLAB",
    "DIXON",
    "DLF",
    "DMART",
    "DRREDDY",
    "EICHERMOT",
    "ESCORTS",
    "EXIDEIND",
    "FEDERALBNK",
    "FINNIFTY",
    "GAIL",
    "GLENMARK",
    "GMRAIRPORT",
    "GNFC",
    "GODREJCP",
    "GODREJPROP",
    "GRANULES",
    "GRASIM",
    "GUJGASLTD",
    "HAL",
    "HAVELLS",
    "HCLTECH",
    "HDFCAMC",
    "HDFCBANK",
    "HDFCLIFE",
    "HEROMOTOCO",
    "HFCL",
    "HINDALCO",
    "HINDCOPPER",
    "HINDPETRO",
    "HINDUNILVR",
    "HUDCO",
    "ICICIBANK",
    "ICICIGI",
    "ICICIPRULI",
    "IDEA",
    "IDFCFIRSTB",
    "IEX",
    "IGL",
    "INDHOTEL",
    "INDIAMART",
    "INDIANB",
    "INDIGO",
    "INDUSINDBK",
    "INDUSTOWER",
    "INFY",
    "IOC",
    "IPCALAB",
    "IRB",
    "IRCTC",
    "IRFC",
    "ITC",
    "JINDALSTEL",
    "JIOFIN",
    "JKCEMENT",
    "JSL",
    "JSWENERGY",
    "JSWSTEEL",
    "JUBLFOOD",
    "KALYANKJIL",
    "KEI",
    "KOTAKBANK",
    "KPITTECH",
    "LALPATHLAB",
    "LAURUSLABS",
    "LICHSGFIN",
    "LICI",
    "LODHA",
    "LT",
    "LTF",
    "LTIM",
    "LTTS",
    "LUPIN",
    "M&M",
    "M&MFIN",
    "MANAPPURAM",
    "MARICO",
    "MARUTI",
    "MAXHEALTH",
    "MCX",
    "METROPOLIS",
    "MFSL",
    "MGL",
    "MIDCPNIFTY",
    "MOTHERSON",
    "MPHASIS",
    "MRF",
    "MUTHOOTFIN",
    "NATIONALUM",
    "NAUKRI",
    "NAVINFLUOR",
    "NBCC",
    "NCC",
    "NESTLEIND",
    "NHPC",
    "NIFTY",
    "NIFTYNXT50",
    "NMDC",
    "NTPC",
    "NYKAA",
    "OBEROIRLTY",
    "OFSS",
    "OIL",
    "ONGC",
    "PAGEIND",
    "PAYTM",
    "PEL",
    "PERSISTENT",
    "PETRONET",
    "PFC",
    "PHOENIXLTD",
    "PIDILITIND",
    "PIIND",
    "PNB",
    "POLICYBZR",
    "POLYCAB",
    "POONAWALLA",
    "POWERGRID",
    "PRESTIGE",
    "PVRINOX",
    "RAMCOCEM",
    "RBLBANK",
    "RECLTD",
    "RELIANCE",
    "SAIL",
    "SBICARD",
    "SBILIFE",
    "SBIN",
    "SHREECEM",
    "SHRIRAMFIN",
    "SIEMENS",
    "SJVN",
    "SOLARINDS",
    "SONACOMS",
    "SRF",
    "SUNPHARMA",
    "SUNTV",
    "SUPREMEIND",
    "SYNGENE",
    "TATACHEM",
    "TATACOMM",
    "TATACONSUM",
    "TATAELXSI",
    "TATAMOTORS",
    "TATAPOWER",
    "TATASTEEL",
    "TCS",
    "TECHM",
    "TIINDIA",
    "TITAN",
    "TORNTPHARM",
    "TORNTPOWER",
    "TRENT",
    "TVSMOTOR",
    "UBL",
    "ULTRACEMCO",
    "UNIONBANK",
    "UNITDSPR",
    "UPL",
    "VBL",
    "VEDL",
    "VOLTAS",
    "WIPRO",
    "YESBANK",
    "ZOMATO",
    "ZYDUSLIFE",
];

pub(crate) const OHLCVOI_HEADER: &[u8; 50] = b"Timestamp,Open,High,Low,Close,Volume,OpenInterest\n";
#[allow(dead_code)]
pub(crate) const OHLCVOI_HEADER_ARRAY: [&str; 7] = [
    "Timestamp",
    "Open",
    "High",
    "Low",
    "Close",
    "Volume",
    "OpenInterest",
];
pub(crate) const BUFFER_SIZE: usize = 64 * 1024; // 64KB
pub(crate) const CHUNK_SIZE: usize = 5_000;
pub(crate) const NSE_OPERATING_TIME_IN_SECONDS: usize = 22_500;

src/generate_dummy_data.rs (+51 lines)

@@ -0,0 +1,51 @@
use {
    dirs::home_dir,
    phpd::{format_size, get_folder_stats, CsvGenerationConfig},
    std::time::Instant,
};

const INSTRUMENTS: [&str; 7] = [
    "BANKNIFTY",
    "BANKEX",
    "FINNIFTY",
    "MIDCPNIFTY",
    "NIFTY",
    "NIFTYNXT50",
    "SENSEX",
];

fn main() -> std::io::Result<()> {
    let mut base_path = home_dir().expect("Failed To Get Home Directory Path");
    base_path.push("Document");
    base_path.push("hive_partitioned_data");
    let base_path_str = base_path.display().to_string();
    let year = 2024;
    let instruments = Some(INSTRUMENTS.as_slice());
    let num_csvs_per_instrument_per_day = 3;
    eprintln!(
        "Proceeding With The Generation of Dummy Data of The Following Instruments: {:?}\nFor The Year 2024\nAt Directory: {}",
        INSTRUMENTS,
        &base_path_str
    );
    let start = Instant::now();
    CsvGenerationConfig::generate_dummy_data(
        year,
        instruments,
        num_csvs_per_instrument_per_day,
        base_path,
        None,
    )?;
    let (csv_count, subfolder_count, total_size) = get_folder_stats(&base_path_str);
    let elapsed_time = start.elapsed().as_secs_f64();
    let processing_speed = (total_size as f64) / elapsed_time;
    eprintln!(
        "Successfully Generated The Dummy Data At Directory: {}\nGenerated {} CSV Files Across {} Subfolders of Total Size {}\nAt The Processing Speed of {} Per Second In {} Seconds",
        &base_path_str,
        csv_count,
        subfolder_count,
        format_size(total_size as f64),
        format_size(processing_speed),
        elapsed_time
    );
    Ok(())
}

src/generate_hive_partitioned_data.rs (+300 lines)

@@ -0,0 +1,300 @@
use {
    crate::constants::{
        ALL_INSTRUMENTS, BUFFER_SIZE, CHUNK_SIZE, NSE_OPERATING_TIME_IN_SECONDS, OHLCVOI_HEADER,
        OHLCVOI_HEADER_ARRAY,
    },
    csv::{ByteRecord, WriterBuilder},
    itertools::izip,
    rand::{rng, Rng},
    rayon::iter::{IntoParallelRefIterator, ParallelIterator},
    std::{
        fs::File,
        io::{BufWriter, Write as _},
        path::{Path, PathBuf},
    },
    time::{
        macros::format_description, Date, Duration, Month, PrimitiveDateTime, Time, UtcOffset,
        Weekday,
    },
};

#[derive(Debug)]
pub struct Ohlcvoi {
    pub timestamp: Vec<String>,
    pub open: Vec<f32>,
    pub high: Vec<f32>,
    pub low: Vec<f32>,
    pub close: Vec<f32>,
    pub volume: Vec<u32>,
    pub oi: Vec<u32>,
}

impl Ohlcvoi {
    pub fn random(date: Date, no_of_seconds: Option<usize>) -> Self {
        let total_points = no_of_seconds.unwrap_or(NSE_OPERATING_TIME_IN_SECONDS);
        let start_time = Time::from_hms(9, 15, 0).unwrap();
        // let end_time = Time::from_hms(15, 30, 0).unwrap();
        let offset = UtcOffset::from_whole_seconds(5 * 3600 + 30 * 60).unwrap();
        let timestamp: Vec<String> = (0..total_points)
            .filter_map(|seconds| {
                let duration = time::Duration::seconds(seconds as i64);
                let datetime = PrimitiveDateTime::new(date, start_time) + duration;
                datetime.assume_offset(offset)
                    .format(format_description!(
                        "[year]-[month]-[day]T[hour]:[minute]:[second][offset_hour sign:mandatory]:[offset_minute]"
                    ))
                    .ok()
            })
            .collect();
        let mut rng = rng();
        let open: Vec<f32> = (0..total_points)
            .map(|_| rng.random_range(100.0..100_000.0))
            .collect();
        let high: Vec<f32> = (0..total_points)
            .map(|_| rng.random_range(100.0..100_000.0))
            .collect();
        let low: Vec<f32> = (0..total_points)
            .map(|_| rng.random_range(100.0..100_000.0))
            .collect();
        let close: Vec<f32> = (0..total_points)
            .map(|_| rng.random_range(100.0..100_000.0))
            .collect();
        let volume: Vec<u32> = (0..total_points)
            .map(|_| rng.random_range(100..1_000_000))
            .collect();
        let oi: Vec<u32> = (0..total_points)
            .map(|_| rng.random_range(100..1_000_000))
            .collect();
        Self {
            timestamp,
            open,
            high,
            low,
            close,
            volume,
            oi,
        }
    }

    #[allow(dead_code)]
    fn write_csv<P: AsRef<Path>>(self, output_path: P) -> std::io::Result<()> {
        let len = self.timestamp.len();
        if ![
            &self.open.len(),
            &self.high.len(),
            &self.low.len(),
            &self.close.len(),
            &self.volume.len(),
            &self.oi.len(),
        ]
        .iter()
        .all(|&x| *x == len)
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "All vectors must have the same length",
            ));
        }
        let output_file = File::create(output_path)?;
        let mut writer = csv::WriterBuilder::new()
            .buffer_capacity(BUFFER_SIZE)
            .from_writer(output_file);
        writer.write_record(OHLCVOI_HEADER_ARRAY)?;
        for chunk_start in (0..self.timestamp.len()).step_by(CHUNK_SIZE) {
            eprintln!(
                "Chunk Start {} Chunk End {}, {}",
                chunk_start,
                chunk_start + CHUNK_SIZE,
                self.timestamp.len()
            );
            let chunk_end = (chunk_start + CHUNK_SIZE).min(self.timestamp.len());
            eprintln!("Chunk Start {} Chunk End {}", chunk_start, chunk_end);
            for i in chunk_start..chunk_end {
                writer.write_record([
                    &self.timestamp[i],
                    &self.open[i].to_string(),
                    &self.high[i].to_string(),
                    &self.low[i].to_string(),
                    &self.close[i].to_string(),
                    &self.volume[i].to_string(),
                    &self.oi[i].to_string(),
                ])?;
            }
            writer.flush()?;
        }
        Ok(())
    }

    #[allow(dead_code)]
    fn to_csv<P: AsRef<Path>>(&self, output_path: P) -> std::io::Result<()> {
        let output_file = File::create(output_path)?;
        let mut writer = WriterBuilder::new()
            .buffer_capacity(BUFFER_SIZE)
            .from_writer(output_file);
        writer.write_record(OHLCVOI_HEADER_ARRAY)?;
        let mut record = ByteRecord::with_capacity(128, 7);
        let mut buffer = Vec::with_capacity(128);
        for chunk_start in (0..self.timestamp.len()).step_by(CHUNK_SIZE) {
            let chunk_end = (chunk_start + CHUNK_SIZE).min(self.timestamp.len());
            eprintln!("Chunk Start {} Chunk End {}", chunk_start, chunk_end);
            for i in chunk_start..chunk_end {
                record.clear();
                buffer.clear();
                record.push_field(self.timestamp[i].as_bytes());
                write!(buffer, "{:.2}", self.open[i])?;
                record.push_field(&buffer);
                buffer.clear();
                write!(buffer, "{:.2}", self.high[i])?;
                record.push_field(&buffer);
                buffer.clear();
                write!(buffer, "{:.2}", self.low[i])?;
                record.push_field(&buffer);
                buffer.clear();
                write!(buffer, "{:.2}", self.close[i])?;
                record.push_field(&buffer);
                buffer.clear();
                write!(buffer, "{}", self.volume[i])?;
                record.push_field(&buffer);
                buffer.clear();
                write!(buffer, "{}", self.oi[i])?;
                record.push_field(&buffer);
                writer.write_byte_record(&record)?;
            }
            writer.flush()?;
        }
        Ok(())
    }

    pub fn write_to_csv<P: AsRef<Path>>(&self, output_path: P) -> std::io::Result<()> {
        let output_file = File::create(output_path)?;
        let mut writer = BufWriter::with_capacity(BUFFER_SIZE, output_file);
        writer.write_all(OHLCVOI_HEADER)?;
        for chunk_start in (0..self.timestamp.len()).step_by(CHUNK_SIZE) {
            let chunk_end = (chunk_start + CHUNK_SIZE).min(self.timestamp.len());
            for (timestamp, open, high, low, close, volume, oi) in izip!(
                &self.timestamp[chunk_start..chunk_end],
                &self.open[chunk_start..chunk_end],
                &self.high[chunk_start..chunk_end],
                &self.low[chunk_start..chunk_end],
                &self.close[chunk_start..chunk_end],
                &self.volume[chunk_start..chunk_end],
                &self.oi[chunk_start..chunk_end]
            ) {
                writeln!(
                    writer,
                    "{},{:.2},{:.2},{:.2},{:.2},{},{}",
                    timestamp, open, high, low, close, volume, oi
                )?;
            }
            writer.flush()?;
        }
        writer.flush()?;
        Ok(())
    }

    #[cfg(feature = "parallel")]
    pub fn generate_multiple(dates: &[Date], no_of_seconds: Option<usize>) -> Vec<Self> {
        dates
            .par_iter()
            .map(|&date| Self::random(date, no_of_seconds))
            .collect()
    }
}

pub struct CsvGenerationConfig<'a> {
    pub base_path: PathBuf,
    pub instruments: Option<&'a [&'a str]>,
    pub num_csvs_per_instrument_per_day: usize,
    pub total_points_per_csv: Option<usize>,
}

impl<'a> CsvGenerationConfig<'a> {
    pub fn new(
        base_path: PathBuf,
        instruments: Option<&'a [&'a str]>,
        num_csvs_per_instrument_per_day: usize,
        total_points_per_csv: Option<usize>,
    ) -> Self {
        CsvGenerationConfig {
            base_path,
            instruments,
            num_csvs_per_instrument_per_day,
            total_points_per_csv,
        }
    }

    fn generate_instrument_day_csvs(&self, instrument: &str, date: Date) -> std::io::Result<()> {
        let instrument_dir = self.base_path.join(instrument);
        std::fs::create_dir_all(&instrument_dir)?;
        let date_dir = instrument_dir.join(
            date.format(format_description!("[year][month][day]"))
                .unwrap_or_default(),
        );
        std::fs::create_dir_all(&date_dir)?;
        for seq_no in 0..self.num_csvs_per_instrument_per_day {
            let ohlcvoi = Ohlcvoi::random(date, self.total_points_per_csv);
            let filename = format!("{}-{}.csv", instrument, seq_no);
            let csv_path = date_dir.join(filename);
            ohlcvoi.write_to_csv(&csv_path)?;
        }
        Ok(())
    }

    fn generate_all_instrument_csvs(&self, year: i32) -> std::io::Result<()> {
        let business_days = generate_business_days(year);
        let instruments = self.instruments.unwrap_or(ALL_INSTRUMENTS.as_slice());
        instruments.par_iter().try_for_each(|instrument| {
            business_days
                .par_iter()
                .try_for_each(|&date| self.generate_instrument_day_csvs(instrument, date))
        })?;
        Ok(())
    }

    pub fn generate_dummy_data(
        year: i32,
        instruments: Option<&'a [&'a str]>,
        num_csvs_per_instrument_per_day: usize,
        base_path: PathBuf,
        total_points_per_csv: Option<usize>,
    ) -> std::io::Result<()> {
        let config = CsvGenerationConfig {
            base_path,
            instruments,
            num_csvs_per_instrument_per_day,
            total_points_per_csv,
        };
        config.generate_all_instrument_csvs(year)?;
        Ok(())
    }
}

#[inline]
fn generate_business_days(year: i32) -> Vec<Date> {
    let start_date = Date::from_calendar_date(year, Month::January, 1).unwrap();
    let end_date = Date::from_calendar_date(year, Month::December, 31).unwrap();
    let total_days = (end_date - start_date).whole_days();
    let is_leap_year = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
    let estimated_business_days = if is_leap_year { 262 } else { 261 };
    let mut business_days = Vec::with_capacity(estimated_business_days);
    (0..total_days + 1)
        .map(|days| start_date + Duration::days(days))
        .filter(|date| !matches!(date.weekday(), Weekday::Saturday | Weekday::Sunday))
        .for_each(|date| business_days.push(date));
    business_days.shrink_to_fit();
    business_days
}

#[cfg(test)]
mod tests {
    use time::macros::date;

    use super::*;

    #[test]
    fn test_generate_business_days() {
        let twenty_twenty_four_dates = generate_business_days(2024);
        eprintln!("{:?}", twenty_twenty_four_dates);
        assert_eq!(twenty_twenty_four_dates.len(), 262);
    }
}

src/lib.rs (+38 lines)

@@ -0,0 +1,38 @@
mod constants;
pub mod generate_hive_partitioned_data;
pub mod process_hive_partitioned_data;

pub use generate_hive_partitioned_data::*;
pub use process_hive_partitioned_data::*;

use {std::path::Path, walkdir::WalkDir};

pub fn get_folder_stats<P: AsRef<Path>>(path: P) -> (usize, usize, u64) {
    let mut csv_count = 0;
    let mut subfolder_count = 0;
    let mut total_size = 0;

    for entry in WalkDir::new(path.as_ref())
        .into_iter()
        .filter_map(|e| e.ok())
    {
        if entry.file_type().is_dir() && entry.path() != path.as_ref() {
            subfolder_count += 1;
        } else if entry.path().extension().and_then(|s| s.to_str()) == Some("csv") {
            csv_count += 1;
            if let Ok(metadata) = entry.metadata() {
                total_size += metadata.len();
            }
        }
    }

    (csv_count, subfolder_count, total_size)
}

pub fn format_size(size_bytes: f64) -> String {
    if size_bytes >= 1024.0 * 1024.0 * 1024.0 {
        format!("{:.2} GiB", size_bytes / (1024.0 * 1024.0 * 1024.0))
    } else {
        format!("{:.2} MiB", size_bytes / (1024.0 * 1024.0))
    }
}

src/process_dummy_data.rs (+42 lines)

@@ -0,0 +1,42 @@
use {
    dirs::home_dir,
    phpd::{format_size, get_folder_stats, MergeConfig, ThreadSafeError},
    std::time::Instant,
};

fn main() -> Result<(), ThreadSafeError> {
    let mut base_path = home_dir().expect("Failed To Get Home Directory Path");
    base_path.push("Document");
    base_path.push("hive_partitioned_data");
    let base_path_str = base_path.display().to_string();
    let (csv_count, subfolder_count, total_size) = get_folder_stats(&base_path_str);

    let mut config = MergeConfig::new(base_path);

    // Disable combined CSV creation so that sequence-wise
    // merged CSVs are produced instead.
    config.create_combined = false;

    let start = Instant::now();
    config.merge_all_instruments()?;
    let elapsed_time = start.elapsed().as_secs_f64();
    let (new_csv_count, _, new_total_size) = get_folder_stats(&base_path_str);
    let merged_csv_count = new_csv_count - csv_count;
    let merged_csv_size = new_total_size - total_size;
    let processing_speed = (new_total_size as f64) / elapsed_time;
    eprintln!(
        "Successfully Merged The Dummy Data At Directory: {}\nGenerated Sequentially Merged {} CSV Files of Total Size {}",
        &base_path_str,
        merged_csv_count,
        format_size(merged_csv_size as f64)
    );
    eprintln!(
        "From {} CSV Files Across {} Subfolders of Total Size {}\nAt The Processing Speed of {} Per Second In {} Seconds",
        csv_count,
        subfolder_count,
        format_size(total_size as f64),
        format_size(processing_speed),
        elapsed_time
    );
    Ok(())
}

src/process_hive_partitioned_data.rs (+346 lines)

@@ -0,0 +1,346 @@
use {
    bytes::{Bytes, BytesMut},
    crossbeam_channel::{bounded, Receiver, Sender},
    memmap2::MmapOptions,
    rayon::prelude::*,
    std::{
        collections::HashMap,
        error::Error,
        fs::{self, File},
        io::{BufReader, BufWriter, Read, Write},
        path::{Path, PathBuf},
        sync::Arc,
    },
    tempfile::{NamedTempFile, TempDir},
    walkdir::WalkDir,
};

pub type ThreadSafeError = Box<dyn Error + Send + Sync>;

#[derive(Clone)]
pub struct MergeConfig {
    pub base_path: PathBuf,
    pub output_base_path: PathBuf,
    pub chunk_size: usize,
    pub buffer_capacity: usize,
    pub create_combined: bool,
    pub wal_enabled: bool,
    wal: Option<Arc<WalWriter>>,
}

#[derive(Debug, Clone)]
struct CsvFile {
    path: PathBuf,
    date: String,
    seq_id: usize,
}

#[allow(dead_code)]
struct WalWriter {
    wal_dir: TempDir,
    current_file: NamedTempFile,
    wal_path: PathBuf,
}

impl WalWriter {
    pub fn new() -> Result<Self, ThreadSafeError> {
        let wal_dir = tempfile::tempdir()?;
        let current_file = NamedTempFile::new_in(&wal_dir)?;
        let wal_path = current_file.path().to_owned();

        Ok(Self {
            wal_dir,
            current_file,
            wal_path,
        })
    }

    pub fn write(&self, data: &[u8]) -> Result<(), ThreadSafeError> {
        self.current_file.as_file().write_all(data)?;
        Ok(())
    }

    pub fn commit(&self) -> Result<(), ThreadSafeError> {
        self.current_file.as_file().sync_all()?;
        Ok(())
    }
}

impl MergeConfig {
    pub fn new<P: AsRef<Path>>(base_path: P) -> Self {
        MergeConfig {
            base_path: base_path.as_ref().to_path_buf(),
            output_base_path: base_path.as_ref().to_path_buf(),
            chunk_size: 1024 * 1024,          // 1MB chunks
            buffer_capacity: 8 * 1024 * 1024, // 8MB buffer
            create_combined: true,            // Default to creating combined CSV
            wal_enabled: false,
            wal: None,
        }
    }

    pub fn new_with_wal<P: AsRef<Path>>(base_path: P) -> Result<Self, ThreadSafeError> {
        Ok(MergeConfig {
            base_path: base_path.as_ref().to_path_buf(),
            output_base_path: base_path.as_ref().to_path_buf(),
            chunk_size: 1024 * 1024,          // 1MB chunks
            buffer_capacity: 8 * 1024 * 1024, // 8MB buffer
            create_combined: true,            // Default to creating combined CSV
            wal_enabled: true,
            wal: Some(Arc::new(WalWriter::new()?)),
        })
    }

    fn find_instruments(&self) -> Result<Vec<String>, ThreadSafeError> {
        let mut instruments = Vec::new();
        for entry in fs::read_dir(&self.base_path)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_dir() {
                if let Some(instrument_name) = path.file_name() {
                    instruments.push(instrument_name.to_string_lossy().to_string());
                }
            }
        }
        Ok(instruments)
    }

    fn find_grouped_csv_files(
        &self,
        instrument: &str,
    ) -> Result<HashMap<usize, Vec<CsvFile>>, ThreadSafeError> {
        let mut grouped_csv_files: HashMap<usize, Vec<CsvFile>> = HashMap::new();
        for date_dir in WalkDir::new(self.base_path.join(instrument))
            .min_depth(1)
            .max_depth(1)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter(|e| e.file_type().is_dir())
        {
            let date_str = date_dir.file_name().to_string_lossy().to_string();
            for csv_entry in WalkDir::new(date_dir.path())
                .min_depth(1)
                .max_depth(1)
                .into_iter()
                .filter_map(|e| e.ok())
                .filter(|e| {
                    e.file_type().is_file()
                        && e.path()
                            .extension()
                            .map_or_else(|| false, |ext| ext == "csv")
                })
            {
                let filename = csv_entry.file_name().to_string_lossy();
                let seq_id = filename
                    .split('-')
                    .nth(1)
                    .and_then(|s| s.split('.').next())
                    .and_then(|s| s.parse().ok())
                    .unwrap_or(usize::MAX);

                let csv_file = CsvFile {
                    path: csv_entry.path().to_path_buf(),
                    date: date_str.clone(),
                    seq_id,
                };
                grouped_csv_files.entry(seq_id).or_default().push(csv_file);
            }
        }
        for files in grouped_csv_files.values_mut() {
            files.sort_by(|a, b| a.date.cmp(&b.date));
        }
        eprintln!(
            "Instrument: {} Grouped CSV Files {:?}",
            &instrument, grouped_csv_files
        );
        Ok(grouped_csv_files)
    }

    fn process_sequence_group(
        &self,
        files: &[CsvFile],
        tx: Sender<Bytes>,
    ) -> Result<(), ThreadSafeError> {
        for (file_index, csv_file) in files.iter().enumerate() {
            eprintln!("Processing file: {}", csv_file.path.display());
            let file = File::open(&csv_file.path)?;
            if file.metadata()?.len() as usize > self.buffer_capacity * 4 {
                self.process_large_file(&csv_file.path, tx.clone(), file_index)?;
            } else {
                self.process_small_file(&csv_file.path, tx.clone(), file_index)?;
            };
        }
        Ok(())
    }

    fn process_small_file(
        &self,
        path: &PathBuf,
        tx: Sender<Bytes>,
        file_index: usize,
    ) -> Result<(), ThreadSafeError> {
        let file = File::open(path)?;
        let mut reader = BufReader::with_capacity(self.buffer_capacity, file);
        let mut mmap = Vec::new();
        reader.read_to_end(&mut mmap)?;
        if file_index > 0 {
            let mut start_pos = 0;
            while start_pos < mmap.len() && mmap[start_pos] != b'\n' {
                start_pos += 1;
            }
            mmap.drain(..start_pos + 1);
        }
        let mut pos = 0;
        let mut buffer = BytesMut::with_capacity(self.chunk_size);

        while pos < mmap.len() {
            let chunk_end = (pos + self.chunk_size).min(mmap.len());
            let mut actual_end = chunk_end;
            while actual_end > pos && mmap[actual_end - 1] != b'\n' {
                actual_end -= 1;
            }
            if actual_end > pos {
                buffer.extend_from_slice(&mmap[pos..actual_end]);
                tx.send(buffer.split().freeze())?;
                pos = actual_end;
            } else {
                buffer.extend_from_slice(&mmap[pos..chunk_end]);
                pos = chunk_end;
            }
        }
        if !buffer.is_empty() {
            tx.send(buffer.freeze())?;
        }
        Ok(())
    }

    fn process_large_file(
        &self,
        path: &PathBuf,
        tx: Sender<Bytes>,
        file_index: usize,
    ) -> Result<(), ThreadSafeError> {
        let file = File::open(path)?;
        let mmap = unsafe { MmapOptions::new().map(&file)? };
        let mut start_pos = 0;
        if file_index > 0 {
            while start_pos < mmap.len() && mmap[start_pos] != b'\n' {
                start_pos += 1;
            }
            start_pos += 1;
        }
        let mut pos = start_pos;
        let mut buffer = BytesMut::with_capacity(self.chunk_size);
        while pos < mmap.len() {
            let chunk_end = (pos + self.chunk_size).min(mmap.len());
            buffer.extend_from_slice(&mmap[pos..chunk_end]);
            let mut last_newline = buffer.len();
            while last_newline > 0 && buffer[last_newline - 1] != b'\n' {
                last_newline -= 1;
            }
            if last_newline > 0 {
                let chunk = buffer.split_to(last_newline).freeze();
                tx.send(chunk)?;
            }
            pos = chunk_end;
        }
        if !buffer.is_empty() {
            tx.send(buffer.freeze())?;
        }
        Ok(())
    }

    fn merge_instrument_csvs(&self, instrument: &str) -> Result<(), ThreadSafeError> {
        let grouped_csv_files = self.find_grouped_csv_files(instrument)?;
        eprintln!("Processing instrument: {}", instrument);
        eprintln!("Found {} sequence groups", grouped_csv_files.len());
        if self.create_combined {
            self.create_combined_instrument_csv(instrument, &grouped_csv_files)?;
        } else {
            for (seq_id, files) in grouped_csv_files {
                eprintln!("Processing sequence group: {}", seq_id);
                let seq_output_path = self
                    .output_base_path
                    .join(instrument)
                    .join(format!("{}-{}.csv", instrument, seq_id));
                fs::create_dir_all(seq_output_path.parent().unwrap())?;
                let seq_output_file = File::create(&seq_output_path)?;
                let seq_output_writer =
                    BufWriter::with_capacity(self.buffer_capacity, seq_output_file);
                let (tx, rx) = bounded(10); // Bounded channel with some buffer
                let files_arc = Arc::new(files.clone());
                let files_clone = Arc::clone(&files_arc);
                let config_clone = self.clone();
                let handle = std::thread::spawn(move || {
                    if let Err(e) = config_clone.process_sequence_group(&files_clone, tx) {
                        eprintln!("Error processing sequence group {}: {}", seq_id, e);
                    }
                });
                self.write_records(seq_output_writer, rx)?;
                handle.join().map_err(|_| "Thread panicked")?;
            }
        }
        Ok(())
    }

    fn write_records(
        &self,
        mut writer: BufWriter<File>,
        rx: Receiver<Bytes>,
    ) -> Result<(), ThreadSafeError> {
        while let Ok(chunk) = rx.recv() {
            if let Some(wal) = &self.wal {
                wal.write(&chunk)?;
            }
            writer.write_all(&chunk)?;
        }
        writer.flush()?;
        if let Some(wal) = &self.wal {
            wal.commit()?;
        }
        Ok(())
    }

    fn create_combined_instrument_csv(
        &self,
        instrument: &str,
        grouped_csv_files: &HashMap<usize, Vec<CsvFile>>,
    ) -> Result<(), ThreadSafeError> {
        eprintln!("Creating combined CSV for instrument: {}", instrument);
        let combined_output_path = self
            .output_base_path
            .join(instrument)
            .join(format!("{}.csv", instrument));
        fs::create_dir_all(combined_output_path.parent().unwrap())?;
        let combined_output_file = File::create(&combined_output_path)?;
        let combined_output_writer =
            BufWriter::with_capacity(self.buffer_capacity, combined_output_file);
        let mut all_files: Vec<CsvFile> = grouped_csv_files.values().flatten().cloned().collect();
        all_files.sort_by(|a, b| a.date.cmp(&b.date).then(a.seq_id.cmp(&b.seq_id)));
        eprintln!(
            "For Instrument {} Sorted all files: {:?}",
            instrument, all_files
        );
        let (tx, rx) = bounded(10); // Bounded channel with some buffer
        let files_arc = Arc::new(all_files);
        let files_clone = Arc::clone(&files_arc);
        let config_clone = self.clone();
        let handle = std::thread::spawn(move || {
            if let Err(e) = config_clone.process_sequence_group(&files_clone, tx) {
                eprintln!("Error processing combined CSV: {}", e);
            }
        });
        self.write_records(combined_output_writer, rx)?;
        handle.join().map_err(|_| "Thread panicked")?;
        Ok(())
    }

    pub fn merge_all_instruments(&self) -> Result<(), ThreadSafeError> {
        let instruments = self.find_instruments()?;
        eprintln!("Found {} instruments", instruments.len());
        instruments
            .par_iter()
            .try_for_each(|instrument| self.merge_instrument_csvs(instrument))?;
        Ok(())
    }
}
