From 7987ae03257663ea4651bc93c9eebacc338db8df Mon Sep 17 00:00:00 2001 From: James Hooker Date: Tue, 16 Jul 2024 15:39:16 +0100 Subject: [PATCH] Added method `StarWriter.file_contents` Generates strings that are written out by `StarWriter.write`. --- src/starfile/writer.py | 90 ++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 52 deletions(-) diff --git a/src/starfile/writer.py b/src/starfile/writer.py index df78e38..3213209 100644 --- a/src/starfile/writer.py +++ b/src/starfile/writer.py @@ -2,7 +2,7 @@ from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Union, Dict, List +from typing import TYPE_CHECKING, Union, Dict, List, Generator from importlib.metadata import version import csv @@ -60,24 +60,28 @@ def coerce_data_blocks( got {type(data_blocks)}' ) + def file_contents(self) -> Generator[str, None, None]: + yield f'{package_info()}\n' + yield '\n' * 2 + yield from self.data_block_generator() + def write(self): - write_package_info(self.filename) - write_blank_lines(self.filename, n=2) - self.write_data_blocks() + with open(self.filename, mode='w+') as f: + for item in self.file_contents(): + f.write(item) - def write_data_blocks(self): + def data_block_generator(self) -> Generator[str, None, None]: for block_name, block in self.data_blocks.items(): if isinstance(block, dict): - write_simple_block( - file=self.filename, + for line in simple_block( block_name=block_name, data=block, quote_character=self.quote_character, quote_all_strings=self.quote_all_strings - ) + ): + yield line elif isinstance(block, pd.DataFrame): - write_loop_block( - file=self.filename, + for line in loop_block( block_name=block_name, df=block, float_format=self.float_format, @@ -85,7 +89,8 @@ def write_data_blocks(self): na_rep=self.na_rep, quote_character=self.quote_character, quote_all_strings=self.quote_all_strings - ) + ): + yield line def backup_if_file_exists(self): if self.filename.exists(): @@ -117,48 +122,31 @@ def coerce_list(data_blocks: List[DataBlock]) -> Dict[str, DataBlock]: return {f'{idx}': df for idx, df in enumerate(data_blocks)} -def write_blank_lines(file: Path, n: int): - with open(file, mode='a') as f: - f.write('\n' * n) - - -def write_package_info(file: Path): +def package_info(): date = datetime.now().strftime('%d/%m/%Y') time = datetime.now().strftime('%H:%M:%S') - line = f'# Created by the starfile Python package (version {__version__}) at {time} on {date}' - with open(file, mode='w+') as f: - f.write(f'{line}\n') + return f'# Created by the starfile Python package (version {__version__}) at {time} on {date}' -def write_simple_block( - file: Path, +def simple_block( block_name: str, data: Dict[str, Union[str, int, float]], quote_character: str = '"', quote_all_strings: bool = False -): +) -> Generator[str, None, None]: + yield f'data_{block_name}\n\n' quoted_data = { - k: f"{quote_character}{v}{quote_character}" - if isinstance(v, str) and (quote_all_strings or " " in v or v == "") + k: f"{quote_character}{v}{quote_character}" + if isinstance(v, str) and (quote_all_strings or " " in v or v == "") else v - for k, v - in data.items() + for k, v in data.items() } - formatted_lines = '\n'.join( - [ - f'_{k}\t\t\t{v}' - for k, v - in quoted_data.items() - ] - ) - with open(file, mode='a') as f: - f.write(f'data_{block_name}\n\n') - f.write(formatted_lines) - f.write('\n\n\n') + for k, v in quoted_data.items(): + yield f'_{k}\t\t\t{v}\n' + yield '\n' * 2 -def write_loop_block( - file: Path, +def loop_block( block_name: str, df: pd.DataFrame, float_format: str = '%.6f', @@ -166,26 +154,24 @@ def write_loop_block( na_rep: str = '', quote_character: str = '"', quote_all_strings: bool = False -): +) -> Generator[str, None, None]: # write header header_lines = [ f'_{column_name} #{idx}' for idx, column_name in enumerate(df.columns, 1) ] - with open(file, mode='a') as f: - f.write(f'data_{block_name}\n\n') - f.write('loop_\n') - f.write('\n'.join(header_lines)) - f.write('\n') - - df = df.map(lambda x: f'{quote_character}{x}{quote_character}' - if isinstance(x, str) and (quote_all_strings or " " in x or x == "") + yield f'data_{block_name}\n\n' + yield 'loop_\n' + for line in header_lines: + yield line + '\n' + + df = df.map(lambda x: f'{quote_character}{x}{quote_character}' + if isinstance(x, str) and (quote_all_strings or " " in x or x == "") else x) # write data - df.to_csv( - path_or_buf=file, + yield df.to_csv( mode='a', sep=separator, header=False, @@ -194,4 +180,4 @@ def write_loop_block( na_rep=na_rep, quoting=csv.QUOTE_NONE ) - write_blank_lines(file, n=2) + yield '\n' * 2