Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 62 additions & 11 deletions src/hwpx/opc/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import tempfile
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import BinaryIO, Iterable, Iterator, Mapping, MutableMapping
from typing import BinaryIO, Iterable, Iterator, Mapping, MutableMapping, Sequence
from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile, ZipInfo

from lxml import etree # type: ignore[reportAttributeAccessIssue]
Expand Down Expand Up @@ -133,11 +133,16 @@ def __init__(
rootfiles: Iterable[RootFile],
version_info: VersionInfo,
mimetype: str,
*,
zip_infos: Mapping[str, ZipInfo] | None = None,
zip_order: Sequence[str] | None = None,
) -> None:
self._files = files
self._rootfiles = list(rootfiles)
self._version = version_info
self._mimetype = mimetype
self._zip_infos = dict(zip_infos or {})
self._zip_order = list(zip_order or files.keys())
self._manifest_tree: etree._Element | None = None
self._spine_cache: list[str] | None = None
self._section_paths_cache: list[str] | None = None
Expand All @@ -156,14 +161,24 @@ def open(cls, pkg_file: str | Path | bytes | bytearray | BinaryIO) -> HwpxPackag
stream = pkg_file

with ZipFile(stream, "r") as zf:
files = {info.filename: zf.read(info.filename) for info in zf.infolist()}
infos = [info for info in zf.infolist() if not info.is_dir()]
files = {info.filename: zf.read(info.filename) for info in infos}
zip_infos = {info.filename: info for info in infos}
zip_order = [info.filename for info in infos]
logger.debug("HWPX 패키지 파일 목록 %d개를 로드했습니다.", len(files))
if cls.MIMETYPE_PATH not in files:
raise HwpxStructureError("HWPX package is missing the mandatory 'mimetype' file.")
mimetype = files[cls.MIMETYPE_PATH].decode("utf-8")
rootfiles = cls._parse_container(files.get(cls.CONTAINER_PATH))
version_info = cls._parse_version(files.get(cls.VERSION_PATH))
package = cls(files, rootfiles, version_info, mimetype)
package = cls(
files,
rootfiles,
version_info,
mimetype,
zip_infos=zip_infos,
zip_order=zip_order,
)
return package

@staticmethod
Expand Down Expand Up @@ -243,6 +258,8 @@ def write(self, path: str, data: bytes | str) -> None:
elif norm_path == self.VERSION_PATH:
pending_version = self._parse_version(data)
self._files[norm_path] = data
if norm_path not in self._zip_order:
self._zip_order.append(norm_path)
if norm_path == self.MIMETYPE_PATH:
self._mimetype = mimetype
elif norm_path == self.CONTAINER_PATH:
Expand All @@ -263,6 +280,8 @@ def delete(self, path: str) -> None:
"Cannot remove mandatory files ('mimetype', 'container.xml', 'version.xml')."
)
del self._files[norm_path]
self._zip_infos.pop(norm_path, None)
self._zip_order = [name for name in self._zip_order if name != norm_path]
self._invalidate_caches(norm_path)
self._validate_structure()

Expand Down Expand Up @@ -553,15 +572,47 @@ def _save_to_zip(self, pkg_file: str | Path | BinaryIO) -> None:

def _write_archive(self, zf: ZipFile) -> None:
self._write_mimetype(zf)
for name in sorted(self._files):
if name == self.MIMETYPE_PATH:
continue
written = {self.MIMETYPE_PATH}
ordered_names = [
name
for name in self._zip_order
if name != self.MIMETYPE_PATH and name in self._files
]
new_names = sorted(
name for name in self._files if name not in written and name not in ordered_names
)
for name in [*ordered_names, *new_names]:
self._write_zip_entry(zf, name, self._files[name], ZIP_DEFLATED)

@staticmethod
def _write_zip_entry(zf: ZipFile, path: str, payload: bytes, compress_type: int) -> None:
info = ZipInfo(path)
info.compress_type = compress_type
written.add(name)

def _zip_info_for_write(self, path: str, compress_type: int) -> ZipInfo:
original = self._zip_infos.get(path)
if original is None:
info = ZipInfo(path)
info.compress_type = compress_type
return info

info = ZipInfo(path, original.date_time)
info.compress_type = original.compress_type
info.comment = original.comment
info.extra = original.extra
info.create_system = original.create_system
info.create_version = original.create_version
info.extract_version = original.extract_version
info.flag_bits = original.flag_bits
info.volume = original.volume
info.internal_attr = original.internal_attr
info.external_attr = original.external_attr
return info

def _write_zip_entry(
self,
zf: ZipFile,
path: str,
payload: bytes,
compress_type: int,
) -> None:
info = self._zip_info_for_write(path, compress_type)
zf.writestr(info, payload)

def _write_mimetype(self, zf: ZipFile) -> None:
Expand Down
16 changes: 16 additions & 0 deletions src/hwpx/oxml/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
parse_track_change_authors,
parse_track_changes,
)
from .namespaces import HWPML_COMPAT_ROOT_NAMESPACES
from .utils import parse_int

ET.register_namespace("hp", "http://www.hancom.co.kr/hwpml/2011/paragraph")
Expand Down Expand Up @@ -104,6 +105,21 @@ def _sanitize_text(value: str) -> str:

def _serialize_xml(element: ET.Element) -> bytes:
    """Return a UTF-8 encoded XML document for *element*.

    Elements whose tag is the section ``sec`` or header ``head`` root are
    round-tripped through lxml so the root can be rebuilt with
    ``HWPML_COMPAT_ROOT_NAMESPACES`` as its namespace map and serialized with
    a ``standalone`` XML declaration. Every other element is serialized
    directly by ElementTree with a plain XML declaration.
    """
    if element.tag not in {_HS + "sec", _HH + "head"}:
        # Serialize exactly once on the common path; the lxml round-trip
        # below is only needed for section/header roots.
        return ET.tostring(element, encoding="utf-8", xml_declaration=True)

    root = LET.fromstring(
        ET.tostring(element, encoding="utf-8", xml_declaration=False)
    )
    # lxml fixes an element's namespace map at creation time, so build a new
    # root carrying the compat map and move the content over to it.
    wrapped = LET.Element(root.tag, nsmap=HWPML_COMPAT_ROOT_NAMESPACES)
    wrapped.attrib.update(root.attrib)
    wrapped.text = root.text
    wrapped.tail = root.tail
    # append() re-parents each child, so walk a snapshot rather than the
    # live parent that is being emptied.
    for child in list(root):
        wrapped.append(child)
    return LET.tostring(
        wrapped,
        encoding="UTF-8",
        xml_declaration=True,
        standalone=True,
    )


Expand Down
25 changes: 25 additions & 0 deletions src/hwpx/oxml/namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,28 @@
HS10_NS = "http://www.hancom.co.kr/hwpml/2016/section"
HC10_NS = "http://www.hancom.co.kr/hwpml/2016/core"
HH10_NS = "http://www.hancom.co.kr/hwpml/2016/head"

# Namespace URI bound to the "opf" prefix in the compat map below.
# NOTE(review): the trailing slash is part of the value; presumably it mirrors
# what Hancom writes, so do not normalize it without confirming.
OPF_NS = "http://www.idpf.org/2007/opf/"
# Namespace URI of the reserved "xml" prefix (xml:space, xml:lang, ...).
XML_NS = "http://www.w3.org/XML/1998/namespace"

# Hancom Office emits this broad namespace surface on HWPML document roots and
# may treat declarations that generic XML serializers consider optional as part
# of the package compatibility contract. Keep section/header roots close to the
# shape Hancom writes so read-modify-save roundtrips do not look tampered with.
#
# Maps namespace prefix -> namespace URI. The mapping is used both as the
# namespace map for rebuilt section/header roots and as the checklist the
# package validator compares root declarations against, so prefix and URI
# must both match exactly.
HWPML_COMPAT_ROOT_NAMESPACES = {
"ha": "http://www.hancom.co.kr/hwpml/2011/app",
"hp": HP_NS,
"hp10": HP10_NS,
"hs": HS_NS,
"hc": HC_NS,
"hh": HH_NS,
"hhs": "http://www.hancom.co.kr/hwpml/2011/history",
"hm": "http://www.hancom.co.kr/hwpml/2011/master-page",
"hpf": "http://www.hancom.co.kr/schema/2011/hpf",
"dc": "http://purl.org/dc/elements/1.1/",
"opf": OPF_NS,
"ooxmlchart": "http://www.hancom.co.kr/hwpml/2016/ooxmlchart",
"hwpunitchar": "http://www.hancom.co.kr/hwpml/2016/HwpUnitChar",
"epub": "http://www.idpf.org/2007/ops",
"config": "urn:oasis:names:tc:opendocument:xmlns:config:1.0",
}
32 changes: 31 additions & 1 deletion src/hwpx/tools/archive_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from lxml import etree # type: ignore[reportAttributeAccessIssue]

from ..opc.relationships import is_header_part_name, is_section_part_name
from ..oxml.namespaces import HWPML_COMPAT_ROOT_NAMESPACES
from .package_validator import validate_package

_XML_SUFFIXES = (".xml", ".hpf")
Expand Down Expand Up @@ -92,6 +94,30 @@ def _format_xml_bytes(payload: bytes) -> bytes:
)


def _normalize_hwpml_compat_root(rel_path: str, payload: bytes) -> bytes:
    """Rebuild a section/header part so its root carries the compat namespaces.

    *payload* is returned unchanged when *rel_path* is neither a section nor
    a header part, when it does not parse as XML, or when the parsed root is
    not a ``sec``/``head`` element. Otherwise the root is recreated with
    ``HWPML_COMPAT_ROOT_NAMESPACES`` as its namespace map, its attributes,
    text and children are carried over, and the result is serialized as
    UTF-8 with a ``standalone`` XML declaration.
    """
    is_hwpml_part = is_section_part_name(rel_path) or is_header_part_name(rel_path)
    if not is_hwpml_part:
        return payload

    try:
        parsed = etree.fromstring(payload)
    except etree.XMLSyntaxError:
        # Leave unparseable payloads untouched instead of failing the pack.
        return payload

    if not parsed.tag.endswith(("}sec", "}head")):
        return payload

    # The namespace map can only be supplied when an lxml element is created,
    # so a new root is built and the old root's content is moved onto it.
    rebuilt = etree.Element(parsed.tag, nsmap=HWPML_COMPAT_ROOT_NAMESPACES)
    rebuilt.attrib.update(parsed.attrib)
    rebuilt.text = parsed.text
    rebuilt.tail = parsed.tail
    for node in parsed:
        rebuilt.append(node)

    return etree.tostring(
        rebuilt,
        encoding="UTF-8",
        xml_declaration=True,
        standalone=True,
    )


def _iter_file_entries(archive: ZipFile) -> tuple[ArchiveEntryInfo, ...]:
entries: list[ArchiveEntryInfo] = []
for info in archive.infolist():
Expand Down Expand Up @@ -261,7 +287,11 @@ def pack_hwpx(
compress_type = compress_types.get(rel_path, ZIP_DEFLATED)
if compress_type != ZIP_STORED:
compress_type = ZIP_DEFLATED
archive.write(root / rel_path, rel_path, compress_type=compress_type)
payload = _normalize_hwpml_compat_root(
rel_path,
(root / rel_path).read_bytes(),
)
archive.writestr(rel_path, payload, compress_type=compress_type)

_summarize_pack_validation(tmp_path)
os.replace(tmp_path, destination)
Expand Down
63 changes: 62 additions & 1 deletion src/hwpx/tools/package_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@

import argparse
import io
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from typing import BinaryIO, Literal, Sequence
from zipfile import ZIP_STORED, BadZipFile, ZipFile

from lxml import etree as LET # type: ignore[reportMissingImports]

from ..oxml.namespaces import HWPML_COMPAT_ROOT_NAMESPACES
from ..opc.relationships import (
MAIN_ROOTFILE_MEDIA_TYPE,
is_header_part_name,
is_section_part_name,
parse_container_rootfiles,
parse_manifest_relationships,
Expand All @@ -23,6 +28,9 @@
HEADER_PATH = "Contents/header.xml"
VERSION_PATH = "version.xml"

# Matches an XML declaration anchored at the start of the buffer and captures
# the text between "<?xml" and "?>" (version/encoding/standalone) as group 1.
# Matching is case-insensitive.
_XML_DECLARATION_RE = re.compile(br"^<\?xml\s+([^?]*?)\?>", re.IGNORECASE)
# Finds standalone="yes" (or 'yes') inside that captured text; the
# backreference requires the opening and closing quote to be the same
# character. Matching is case-insensitive.
_STANDALONE_YES_RE = re.compile(br"\bstandalone\s*=\s*(['\"])yes\1", re.IGNORECASE)

IssueLevel = Literal["error", "warning"]

__all__ = [
Expand Down Expand Up @@ -83,6 +91,57 @@ def _parse_xml(payload: bytes) -> ET.Element:
raise ValueError(f"malformed XML: {exc}") from exc


def _root_declared_namespaces(payload: bytes) -> dict[str, str]:
    """Return the prefix -> URI namespace map of the root element of *payload*.

    The default namespace (prefix ``None`` in lxml) is reported under the
    empty-string key, and entries with an empty URI are dropped. An empty
    dict is returned when *payload* is not well-formed XML.
    """
    try:
        parsed = LET.fromstring(payload)
    except LET.XMLSyntaxError:
        return {}

    declared: dict[str, str] = {}
    for prefix, uri in parsed.nsmap.items():
        if not uri:
            continue
        key = prefix if prefix is not None else ""
        declared[key] = uri
    return declared


def _has_standalone_yes_declaration(payload: bytes) -> bool:
    """Tell whether *payload* opens with an XML declaration saying standalone="yes".

    Leading whitespace is removed first, then a UTF-8 byte-order mark if one
    follows, before the declaration is looked for at the start of the data.
    """
    head = payload.lstrip()
    if head.startswith(b"\xef\xbb\xbf"):
        # Drop the three-byte UTF-8 BOM so the declaration regex can anchor.
        head = head[3:]

    declaration = _XML_DECLARATION_RE.match(head)
    if not declaration:
        return False
    return _STANDALONE_YES_RE.search(declaration.group(1)) is not None


def _check_hwpml_compat_root(
    issues: list[PackageValidationIssue],
    part_name: str,
    payload: bytes,
    root: ET.Element,
) -> None:
    """Record errors when a section/header part lacks the compat root shape.

    Only parts that are section or header parts (or whose name equals
    ``HEADER_PATH``) and whose parsed *root* is a ``sec``/``head`` element are
    checked. Two independent errors may be appended to *issues*:

    * the raw *payload* has no XML declaration with ``standalone="yes"``;
    * one or more prefixes from ``HWPML_COMPAT_ROOT_NAMESPACES`` are not
      declared on the root with the expected URI.
    """
    in_scope = (
        is_section_part_name(part_name)
        or is_header_part_name(part_name)
        or part_name == HEADER_PATH
    )
    if not in_scope:
        return
    if not root.tag.endswith(("}sec", "}head")):
        return

    if not _has_standalone_yes_declaration(payload):
        _error(issues, part_name, 'missing XML declaration with standalone="yes"')

    # Namespace declarations are read from the raw bytes, not from *root*.
    on_root = _root_declared_namespaces(payload)
    absent: list[str] = []
    for prefix, uri in HWPML_COMPAT_ROOT_NAMESPACES.items():
        if on_root.get(prefix) != uri:
            absent.append(prefix)
    if not absent:
        return

    _error(
        issues,
        part_name,
        "missing Hancom-compatible HWPML root namespace declarations: "
        + ", ".join(absent),
    )


def _error(issues: list[PackageValidationIssue], part_name: str, message: str) -> None:
    """Append an ``"error"``-level issue for *part_name* with *message* to *issues*."""
    issue = PackageValidationIssue(part_name, message, "error")
    issues.append(issue)

Expand Down Expand Up @@ -172,7 +231,9 @@ def validate_package(source: str | Path | bytes | BinaryIO) -> PackageValidation
_error(issues, name, "unable to read entry for XML parsing")
continue
try:
xml_roots[name] = _parse_xml(payload)
root = _parse_xml(payload)
xml_roots[name] = root
_check_hwpml_compat_root(issues, name, payload, root)
except ValueError as exc:
_error(issues, name, str(exc))

Expand Down
Loading
Loading