From 70f1c0fa0b6979b7b6e9da385460c5f18783512b Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 20 Feb 2025 14:12:32 -0800 Subject: [PATCH] Avoid missing hashlib.file_digest for Python < 3.11. --- sdks/python/apache_beam/yaml/yaml_provider.py | 14 ++++++- .../yaml/yaml_provider_unit_test.py | 42 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index aa3c5d905150..23a6cc14b200 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -1057,7 +1057,7 @@ def normalize_package(package): # Ignore the exact path by which this package was referenced, # but do create a new environment if it changed. with open(package, 'rb') as fin: - return os.path.basename(package) + '-' + hashlib.file_digest( + return os.path.basename(package) + '-' + _file_digest( fin, 'sha256').hexdigest() else: # Assume urls and pypi identifiers are immutable. @@ -1363,3 +1363,15 @@ def standard_providers(): io_providers(), load_providers( os.path.join(os.path.dirname(__file__), 'standard_providers.yaml'))) + + +def _file_digest(fileobj, digest): + if hasattr(hashlib, 'file_digest'): # Python 3.11+ + return hashlib.file_digest(fileobj, digest) + else: + hasher = hashlib.new(digest) + data = fileobj.read(1 << 20) + while data: + hasher.update(data) + data = fileobj.read(1 << 20) + return hasher diff --git a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py index 975ecddbae9d..c7ac25dfe4ed 100644 --- a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py @@ -261,6 +261,48 @@ def test_recursive(self): assert_that(result | beam.Map(lambda x: x.value), equal_to([120])) +class PythonProviderDepsTest(unittest.TestCase): + def test_env_package_sensitive(self): + self.assertNotEqual( + yaml_provider.PypiExpansionService._key('base', ['pkg1']), + yaml_provider.PypiExpansionService._key('base', ['pkg2'])) + + def test_env_base_sensitive(self): + self.assertNotEqual( + yaml_provider.PypiExpansionService._key('base1', ['pkg']), + yaml_provider.PypiExpansionService._key('base2', ['pkg'])) + + def test_env_order_invariant(self): + self.assertEqual( + yaml_provider.PypiExpansionService._key('base', ['pkg1', 'pkg2']), + yaml_provider.PypiExpansionService._key('base', ['pkg2', 'pkg1'])) + + def test_env_path_invariant(self): + with tempfile.TemporaryDirectory() as tmpdir: + os.mkdir(os.path.join(tmpdir, 'a')) + pkgA = os.path.join(tmpdir, 'a', 'pkg.tgz') + os.mkdir(os.path.join(tmpdir, 'b')) + pkgB = os.path.join(tmpdir, 'b', 'pkg.tgz') + with open(pkgA, 'w') as fout: + fout.write('content') + with open(pkgB, 'w') as fout: + fout.write('content') + self.assertEqual( + yaml_provider.PypiExpansionService._key('base', [pkgA]), + yaml_provider.PypiExpansionService._key('base', [pkgB])) + + def test_env_content_sensitive(self): + with tempfile.TemporaryDirectory() as tmpdir: + pkg = os.path.join(tmpdir, 'pkg.tgz') + with open(pkg, 'w') as fout: + fout.write('content') + before = yaml_provider.PypiExpansionService._key('base', [pkg]) + with open(pkg, 'w') as fout: + fout.write('new content') + after = yaml_provider.PypiExpansionService._key('base', [pkg]) + self.assertNotEqual(before, after) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()