Skip to content

Commit

Permalink
Added method StarWriter.file_contents
Browse files Browse the repository at this point in the history
Generates strings that are written out by `StarWriter.write`.
  • Loading branch information
jahooker committed Jul 16, 2024
1 parent 5147912 commit 7987ae0
Showing 1 changed file with 38 additions and 52 deletions.
90 changes: 38 additions & 52 deletions src/starfile/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Union, Dict, List
from typing import TYPE_CHECKING, Union, Dict, List, Generator
from importlib.metadata import version
import csv

Expand Down Expand Up @@ -60,32 +60,37 @@ def coerce_data_blocks(
got {type(data_blocks)}'
)

def file_contents(self) -> Generator[str, None, None]:
yield f'{package_info()}\n'
yield '\n' * 2
yield from self.data_block_generator()

def write(self):
write_package_info(self.filename)
write_blank_lines(self.filename, n=2)
self.write_data_blocks()
with open(self.filename, mode='w+') as f:
for item in self.file_contents():
f.write(item)

def write_data_blocks(self):
def data_block_generator(self) -> Generator[str, None, None]:
for block_name, block in self.data_blocks.items():
if isinstance(block, dict):
write_simple_block(
file=self.filename,
for line in simple_block(
block_name=block_name,
data=block,
quote_character=self.quote_character,
quote_all_strings=self.quote_all_strings
)
):
yield line
elif isinstance(block, pd.DataFrame):
write_loop_block(
file=self.filename,
for line in loop_block(
block_name=block_name,
df=block,
float_format=self.float_format,
separator=self.sep,
na_rep=self.na_rep,
quote_character=self.quote_character,
quote_all_strings=self.quote_all_strings
)
):
yield line

def backup_if_file_exists(self):
if self.filename.exists():
Expand Down Expand Up @@ -117,75 +122,56 @@ def coerce_list(data_blocks: List[DataBlock]) -> Dict[str, DataBlock]:
return {f'{idx}': df for idx, df in enumerate(data_blocks)}


def write_blank_lines(file: Path, n: int):
with open(file, mode='a') as f:
f.write('\n' * n)


def write_package_info(file: Path):
def package_info():
date = datetime.now().strftime('%d/%m/%Y')
time = datetime.now().strftime('%H:%M:%S')
line = f'# Created by the starfile Python package (version {__version__}) at {time} on {date}'
with open(file, mode='w+') as f:
f.write(f'{line}\n')
return f'# Created by the starfile Python package (version {__version__}) at {time} on {date}'


def write_simple_block(
file: Path,
def simple_block(
block_name: str,
data: Dict[str, Union[str, int, float]],
quote_character: str = '"',
quote_all_strings: bool = False
):
) -> Generator[str, None, None]:
yield f'data_{block_name}\n\n'
quoted_data = {
k: f"{quote_character}{v}{quote_character}"
if isinstance(v, str) and (quote_all_strings or " " in v or v == "")
k: f"{quote_character}{v}{quote_character}"
if isinstance(v, str) and (quote_all_strings or " " in v or v == "")
else v
for k, v
in data.items()
for k, v in data.items()
}
formatted_lines = '\n'.join(
[
f'_{k}\t\t\t{v}'
for k, v
in quoted_data.items()
]
)
with open(file, mode='a') as f:
f.write(f'data_{block_name}\n\n')
f.write(formatted_lines)
f.write('\n\n\n')
for k, v in quoted_data.items():
yield f'_{k}\t\t\t{v}\n'
yield '\n' * 2


def write_loop_block(
file: Path,
def loop_block(
block_name: str,
df: pd.DataFrame,
float_format: str = '%.6f',
separator: str = '\t',
na_rep: str = '<NA>',
quote_character: str = '"',
quote_all_strings: bool = False
):
) -> Generator[str, None, None]:
# write header
header_lines = [
f'_{column_name} #{idx}'
for idx, column_name
in enumerate(df.columns, 1)
]
with open(file, mode='a') as f:
f.write(f'data_{block_name}\n\n')
f.write('loop_\n')
f.write('\n'.join(header_lines))
f.write('\n')

df = df.map(lambda x: f'{quote_character}{x}{quote_character}'
if isinstance(x, str) and (quote_all_strings or " " in x or x == "")
yield f'data_{block_name}\n\n'
yield 'loop_\n'
for line in header_lines:
yield line + '\n'

df = df.map(lambda x: f'{quote_character}{x}{quote_character}'
if isinstance(x, str) and (quote_all_strings or " " in x or x == "")
else x)

# write data
df.to_csv(
path_or_buf=file,
yield df.to_csv(
mode='a',
sep=separator,
header=False,
Expand All @@ -194,4 +180,4 @@ def write_loop_block(
na_rep=na_rep,
quoting=csv.QUOTE_NONE
)
write_blank_lines(file, n=2)
yield '\n' * 2

0 comments on commit 7987ae0

Please sign in to comment.