Skip to content

Commit

Permalink
added pipelining.
Browse files (browse the repository at this point in the history)
  • Loading branch information
hariharan-devarajan committed Oct 6, 2024
1 parent 19f9ad6 commit cdfa9cd
Showing 1 changed file with 64 additions and 38 deletions.
102 changes: 64 additions & 38 deletions script/dftracer_split
Original file line number Diff line number Diff line change
@@ -183,7 +183,6 @@ else
date_echo "Previous counting present"
fi
wait
echo ""

counts=$(wc -l ${counting_file} | awk {'print $1'})

@@ -202,6 +201,7 @@ CHUNKS=()
chunk_index=0
accumulated_size=0
temp_chunk=""
scheduled_chunks=-1
while IFS='|' read -r index file_name size start end; do
size_per_line=$(bc -l <<< "scale=8; $size / ($end - $start + 1)")

@@ -218,28 +218,52 @@ while IFS='|' read -r index file_name size start end; do

if [ $(bc -l <<< "scale=0; $lines > 0") -eq 1 ]; then
if [ "${CHUNKS[$chunk_index]}" == "" ]; then
progress_date_echo "Adding $index of $total to chunk $chunk_index with size $accumulated_size"
CHUNKS[$chunk_index]="$file_name,$size_chunk,$start,$((start + lines))"
else
progress_date_echo "Adding $index of $total to chunk $chunk_index with size $accumulated_size"
CHUNKS[$chunk_index]="${CHUNKS[$chunk_index]};$file_name,$size_chunk,$start,$((start + lines))"
fi

start=$((start + lines + 1))
accumulated_size=$(bc -l <<< "scale=8; $accumulated_size + $size_chunk")
size=$(bc -l <<< "scale=8; $size - $size_chunk")
else
progress_date_echo "Scheduling $index of $total to chunk $chunk_index with size $accumulated_size"
i=$chunk_index
scheduled_chunks=$chunk_index
{
chunk=${CHUNKS[$i]}
if [ "$verbose" == "1" ]; then
date_echo "Processing chunk $i with size $size MB"
fi
chunk_file=$dest/${app_name}-$i.pfw
rm -f $chunk_file
touch $chunk_file
echo '[' > $chunk_file
IFS=';' read -ra files <<< "$chunk"
total_size=0
for file in "${files[@]}"; do
file_name=$(echo $file | cut -d, -f1)
size=$(echo $file | cut -d, -f2)
start=$(echo $file | cut -d, -f3)
end=$(echo $file | cut -d, -f4)
if [ "$verbose" == "1" ]; then
date_echo "[CHUNK $i] Extracting from $file_name from $start to $end with size $size"
fi
$zq_exec ${file_name}.pfw.gz --index-file ${file_name}.pfw.gz.zindex --raw "select a.line from LineOffsets a where a.line >= $start AND a.line <= $end AND a.length > 8;" | grep -v '^[[]\|^[]]' >> $chunk_file
total_size=$(bc -l <<< "scale=2; $total_size + $size")
done
echo ']' >> $chunk_file
progress_date_echo "Chunk $i out of ${#CHUNKS[@]} done with size $total_size MB, path = $chunk_file"
} &
chunk_index=$((chunk_index + 1))
accumulated_size=0
fi
else
accumulated_size=$(bc -l <<< "scale=8; $accumulated_size + $size")
if [ "${CHUNKS[$chunk_index]}" == "" ]; then
CHUNKS[$chunk_index]="$file_name,$size,$start,$end"
progress_date_echo "Adding $index of $total to chunk $chunk_index with size $accumulated_size"
else
CHUNKS[$chunk_index]="${CHUNKS[$chunk_index]};$file_name,$size,$start,$end"
progress_date_echo "Adding $index of $total to chunk $chunk_index with size $accumulated_size"
fi
size=0
fi
@@ -250,42 +274,44 @@ date_echo "Total chunks: ${#CHUNKS[@]}"
date_echo "Start processing chunks"

for i in ${!CHUNKS[@]}; do
running_jobs=$(jobs -rp | wc -l)
if [ $running_jobs -ge $JOBS_LIMIT ]; then
# date_echo "waiting for Running $running_jobs jobs to be less than $JOBS_LIMIT"
while [ $running_jobs -ge $JOBS_LIMIT ]
do
sleep 1
running_jobs=$(jobs -rp | wc -l)
done
date_echo "Running $running_jobs jobs are now less than $JOBS_LIMIT"
fi

{
chunk=${CHUNKS[$i]}
if [ "$verbose" == "1" ]; then
date_echo "Processing chunk $i with size $size MB"
if [ $i -gt $scheduled_chunks ]; then
running_jobs=$(jobs -rp | wc -l)
if [ $running_jobs -ge $JOBS_LIMIT ]; then
# date_echo "waiting for Running $running_jobs jobs to be less than $JOBS_LIMIT"
while [ $running_jobs -ge $JOBS_LIMIT ]
do
sleep 1
running_jobs=$(jobs -rp | wc -l)
done
date_echo "Running $running_jobs jobs are now less than $JOBS_LIMIT"
fi
chunk_file=$dest/${app_name}-$i.pfw
rm -f $chunk_file
touch $chunk_file
echo '[' > $chunk_file
IFS=';' read -ra files <<< "$chunk"
total_size=0
for file in "${files[@]}"; do
file_name=$(echo $file | cut -d, -f1)
size=$(echo $file | cut -d, -f2)
start=$(echo $file | cut -d, -f3)
end=$(echo $file | cut -d, -f4)

{
chunk=${CHUNKS[$i]}
if [ "$verbose" == "1" ]; then
date_echo "[CHUNK $i] Extracting from $file_name from $start to $end with size $size"
date_echo "Processing chunk $i with size $size MB"
fi
$zq_exec ${file_name}.pfw.gz --index-file ${file_name}.pfw.gz.zindex --raw "select a.line from LineOffsets a where a.line >= $start AND a.line <= $end;" | grep -v '^[[]\|^[]]' >> $chunk_file
total_size=$(bc -l <<< "scale=2; $total_size + $size")
done
echo ']' >> $chunk_file
progress_date_echo "Chunk $i out of ${#CHUNKS[@]} done with size $total_size MB, path = $chunk_file"
} &
chunk_file=$dest/${app_name}-$i.pfw
rm -f $chunk_file
touch $chunk_file
echo '[' > $chunk_file
IFS=';' read -ra files <<< "$chunk"
total_size=0
for file in "${files[@]}"; do
file_name=$(echo $file | cut -d, -f1)
size=$(echo $file | cut -d, -f2)
start=$(echo $file | cut -d, -f3)
end=$(echo $file | cut -d, -f4)
if [ "$verbose" == "1" ]; then
date_echo "[CHUNK $i] Extracting from $file_name from $start to $end with size $size"
fi
$zq_exec ${file_name}.pfw.gz --index-file ${file_name}.pfw.gz.zindex --raw "select a.line from LineOffsets a where a.line >= $start AND a.line <= $end AND a.length > 8;" | grep -v '^[[]\|^[]]' >> $chunk_file
total_size=$(bc -l <<< "scale=2; $total_size + $size")
done
echo ']' >> $chunk_file
progress_date_echo "Chunk $i out of ${#CHUNKS[@]} done with size $total_size MB, path = $chunk_file"
} &
fi
done
wait

Expand Down

0 comments on commit cdfa9cd

Please sign in to comment.