diff --git a/.bazeliskrc b/.bazeliskrc new file mode 100644 index 000000000..dbb7b4bcc --- /dev/null +++ b/.bazeliskrc @@ -0,0 +1 @@ +USE_BAZEL_VERSION=6.4.0 \ No newline at end of file diff --git a/.bazelrc b/.bazelrc new file mode 100644 index 000000000..1a6e271a2 --- /dev/null +++ b/.bazelrc @@ -0,0 +1,21 @@ +## Disable remote cache completely when --config=local is passed +build:local --remote_cache= + +# Scala version config flags: +# To build with Scala 2.12, pass "--config scala_2.12" to "bazel build" +# To set a different default Scala version, add the following to +# user.bazelrc: +# common --config scala_2.12 +common:scala_2.12 --repo_env=SCALA_VERSION=2.12.18 +common:scala_2.13 --repo_env=SCALA_VERSION=2.13.12 +# Default scala version to 2.12 +common --repo_env=SCALA_VERSION=2.12.18 +# Default to Spark 3.2 if no value is specified +build --define=spark_version=3.2 +build --javacopt=-Xep:DoubleBraceInitialization:OFF + +# Don't implicitly create __init__.py files +build --incompatible_default_to_explicit_init_py + +# https://github.com/bazelbuild/bazel/issues/2377 +test --spawn_strategy=standalone diff --git a/.circleci/Dockerfile b/.circleci/Dockerfile index 095c62253..8de8be95c 100644 --- a/.circleci/Dockerfile +++ b/.circleci/Dockerfile @@ -34,6 +34,7 @@ RUN apt-get update && apt-get -y -q install \ openjdk-8-jdk \ pkg-config \ sbt \ + bazelisk \ && apt-get clean # Install thrift diff --git a/.circleci/config.yml b/.circleci/config.yml index 87c1c6aed..d67e1ed7e 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -142,14 +142,14 @@ jobs: name: Run Chronon Python tests shell: /bin/bash -leuxo pipefail command: | - conda activate chronon_py - pushd /chronon/api/ - thrift --gen py -out /chronon/api/py/ai/chronon\ - /chronon/api/thrift/api.thrift # Generate thrift files - cd /chronon/api/py # Go to Python module - pip install -r requirements/dev.txt # Install latest requirements - tox # Run tests - popd + conda activate chronon_py + pushd /chronon/api/ + thrift --gen py -out /chronon/api/py/ai/chronon\ + /chronon/api/thrift/api.thrift # Generate thrift files + cd /chronon/ # Go to Python module + pip install -r api/py/requirements/dev.txt # Install latest requirements + tox # Run tests + popd - store_artifacts: path: /chronon/api/py/htmlcov diff --git a/.gitignore b/.gitignore index 9f5f29e46..34aff3b94 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,6 @@ mvn_settings.xml # Release folder releases +# bazel +/bazel-* +/user.bazelrc diff --git a/.ijwb/.bazelproject b/.ijwb/.bazelproject new file mode 100644 index 000000000..d87cdc236 --- /dev/null +++ b/.ijwb/.bazelproject @@ -0,0 +1,16 @@ +directories: + # Add the directories you want added as source here + # By default, we've added your entire workspace ('.') + . 
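+  # Directories can also be excluded with a leading '-' (standard project-view syntax), e.g. '-releases'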
+
+# Automatically includes all relevant targets under the 'directories' above
+derive_targets_from_directories: true
+
+targets:
+  # If source code isn't resolving, add additional targets that compile it here
+
+additional_languages:
+  # Uncomment any additional languages you want supported
+  python
+  scala
+  java
diff --git a/AUTHORS b/AUTHORS
index 31dbe1ba2..9c6c687b3 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -17,3 +17,4 @@ Daniel Kristjansson (Stripe)
 Piyush Narang (Stripe)
 Caio Camatta (Stripe)
 Divya Manohar (Stripe)
+Krish Narukulla (Roku)
\ No newline at end of file
diff --git a/BUILD.bazel b/BUILD.bazel
new file mode 100644
index 000000000..34e1d2ddc
--- /dev/null
+++ b/BUILD.bazel
@@ -0,0 +1,17 @@
+package(default_visibility = ["//visibility:public"])
+
+load("@rules_python//python:pip.bzl", "compile_pip_requirements")
+
+# To update requirements_lock.txt, run: bazel run //:pip.update
+compile_pip_requirements(
+    name = "pip",
+    extra_args = [
+        "--allow-unsafe",
+        "--resolver=backtracking",
+    ],
+    requirements_in = "//:requirements.txt",
+    requirements_txt = "//:requirements_lock.txt",
+    # Expected to run manually on dev (mac) machines, hence the manual tag;
+    # requirements_[platform] arguments (rules_python >= 0.10.0) could replace this.
+    tags = ["manual"],
+)
diff --git a/WORKSPACE b/WORKSPACE
new file mode 100644
index 000000000..7405af785
--- /dev/null
+++ b/WORKSPACE
@@ -0,0 +1,134 @@
+workspace(name = "chronon")
+
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+http_archive(
+    name = "bazel_skylib",
+    sha256 = "b8a1527901774180afc798aeb28c4634bdccf19c4d98e7bdd1ce79d1fe9aaad7",
+    urls = [
+        "https://mirror.bazel.build/github.com/bazelbuild/bazel-skylib/releases/download/1.4.1/bazel-skylib-1.4.1.tar.gz",
+        "https://github.com/bazelbuild/bazel-skylib/releases/download/1.4.1/bazel-skylib-1.4.1.tar.gz",
+    ],
+)
+
+http_archive(
+    name = "rules_python",
+    sha256 = "ca77768989a7f311186a29747e3e95c936a41dffac779aff6b443db22290d913",
+    strip_prefix = "rules_python-0.36.0",
+    url = "https://github.com/bazelbuild/rules_python/releases/download/0.36.0/rules_python-0.36.0.tar.gz",
+)
+
+load("@rules_python//python:repositories.bzl", "py_repositories")
+
+py_repositories()
+
+load("@rules_python//python:repositories.bzl", "python_register_toolchains")
+
+python_register_toolchains(
+    name = "python_3_8",
+    # Available versions are listed in @rules_python//python:versions.bzl.
+    # We recommend using the same version your team is already standardized on.
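+    # The interpreter registered here is consumed by pip_parse below via @python_3_8//:defs.bzl.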
+    python_version = "3.8",
+)
+
+load("@rules_python//python:pip.bzl", "pip_parse")
+load("@python_3_8//:defs.bzl", "interpreter")
+
+pip_parse(
+    name = "pypi",
+    python_interpreter_target = interpreter,
+    requirements_lock = "//:requirements_lock.txt",
+)
+
+load("@pypi//:requirements.bzl", "install_deps")
+
+install_deps()
+
+http_archive(
+    name = "rules_java",
+    sha256 = "e81e9deaae0d9d99ef3dd5f6c1b32338447fe16d5564155531ea4eb7ef38854b",
+    urls = [
+        "https://github.com/bazelbuild/rules_java/releases/download/7.0.6/rules_java-7.0.6.tar.gz",
+    ],
+)
+
+load("@rules_java//java:repositories.bzl", "rules_java_dependencies", "rules_java_toolchains")
+
+rules_java_dependencies()
+
+rules_java_toolchains()
+
+load("@rules_java//java:repositories.bzl", "remote_jdk8_repos")
+
+remote_jdk8_repos()
+
+#############################
+# Protobuf                  #
+#############################
+http_archive(
+    name = "rules_proto",
+    sha256 = "dc3fb206a2cb3441b485eb1e423165b231235a1ea9b031b4433cf7bc1fa460dd",
+    strip_prefix = "rules_proto-5.3.0-21.7",
+    urls = [
+        "https://github.com/bazelbuild/rules_proto/archive/refs/tags/5.3.0-21.7.tar.gz",
+    ],
+)
+
+load("@rules_proto//proto:repositories.bzl", "rules_proto_dependencies", "rules_proto_toolchains")
+
+rules_proto_dependencies()
+
+rules_proto_toolchains()
+
+# Maven rules
+RULES_JVM_EXTERNAL_TAG = "4.5"
+
+RULES_JVM_EXTERNAL_SHA = "b17d7388feb9bfa7f2fa09031b32707df529f26c91ab9e5d909eb1676badd9a6"
+
+http_archive(
+    name = "rules_jvm_external",
+    sha256 = RULES_JVM_EXTERNAL_SHA,
+    strip_prefix = "rules_jvm_external-%s" % RULES_JVM_EXTERNAL_TAG,
+    url = "https://github.com/bazelbuild/rules_jvm_external/archive/%s.zip" % RULES_JVM_EXTERNAL_TAG,
+)
+
+load("@rules_jvm_external//:repositories.bzl", "rules_jvm_external_deps")
+
+rules_jvm_external_deps()
+
+load("@rules_jvm_external//:setup.bzl", "rules_jvm_external_setup")
+
+rules_jvm_external_setup()
+
+## Scala support
+
+# See https://github.com/bazelbuild/rules_scala/releases for up to date version information.
+http_archive( + name = "io_bazel_rules_scala", + sha256 = "e734eef95cf26c0171566bdc24d83bd82bdaf8ca7873bec6ce9b0d524bdaf05d", + strip_prefix = "rules_scala-6.6.0", + url = "https://github.com/bazelbuild/rules_scala/releases/download/v6.6.0/rules_scala-v6.6.0.tar.gz", +) + +load("@io_bazel_rules_scala//:scala_config.bzl", "scala_config") + +scala_config(scala_version = "2.12.12") + +load("@io_bazel_rules_scala//scala:scala.bzl", "scala_repositories") + +scala_repositories() + +load("@io_bazel_rules_scala//scala:toolchains.bzl", "scala_register_toolchains") + +scala_register_toolchains() + +load("@io_bazel_rules_scala//testing:scalatest.bzl", "scalatest_repositories", "scalatest_toolchain") + +scalatest_repositories() + +scalatest_toolchain() + +load("//jvm:mirrors.bzl", "load_deps") + +load_deps() diff --git a/aggregator/BUILD.bazel b/aggregator/BUILD.bazel new file mode 100644 index 000000000..facc9ccbc --- /dev/null +++ b/aggregator/BUILD.bazel @@ -0,0 +1,87 @@ +load("@io_bazel_rules_scala//scala:scala_cross_version_select.bzl", "select_for_scala_version") + +scala_library( + name = "aggregator", + srcs = glob(["src/main/scala/ai/chronon/aggregator/**/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + "//api:api-lib", + "//api:api-models", + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("com.google.code.gson:gson"), + maven_artifact("com.yahoo.datasketches:sketches-core"), + maven_artifact("com.yahoo.datasketches:memory"), + maven_artifact("org.apache.commons:commons-lang3"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + ] + select_for_scala_version( + before_2_13 = [ + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + scala_artifact("com.fasterxml.jackson.module:jackson-module-scala"), + ], + since_2_13 = [ + scala_artifact("org.scala-lang.modules:scala-parallel-collections"), + ], + ), +) + +scala_library( + name = "test-lib", + srcs = glob(["src/test/scala/ai/chronon/aggregator/test/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + ":aggregator", + "//api:api-lib", + "//api:api-models", + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("com.google.code.gson:gson"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.apache.commons:commons-lang3"), + maven_artifact("org.apache.commons:commons-math3"), + maven_artifact("com.yahoo.datasketches:sketches-core"), + maven_artifact("com.yahoo.datasketches:memory"), + ] + select_for_scala_version( + before_2_12 = [], + ) + select_for_scala_version(before_2_13 = [ + maven_artifact("org.scala-lang.modules:scala-collection-compat_2.12"), + maven_artifact("com.fasterxml.jackson.module:jackson-module-scala_2.12"), + ]) + + select_for_scala_version(after_2_12 = [ + maven_artifact("org.scala-lang.modules:scala-parallel-collections_2.13"), + ]), +) + +scala_test_suite( + name = "test", + srcs = glob(["src/test/scala/ai/chronon/aggregator/test/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + ":aggregator", + ":test-lib", + "//api:api-lib", + "//api:api-models", + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + 
maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("com.yahoo.datasketches:sketches-core"), + maven_artifact("com.google.code.gson:gson"), + maven_artifact("com.yahoo.datasketches:memory"), + maven_artifact("org.apache.commons:commons-lang3"), + maven_artifact("org.apache.commons:commons-math3"), + ] + select_for_scala_version( + before_2_12 = [], + ) + select_for_scala_version(before_2_13 = [ + maven_artifact("org.scala-lang.modules:scala-collection-compat_2.12"), + maven_artifact("com.fasterxml.jackson.module:jackson-module-scala_2.12"), + ]) + + select_for_scala_version(after_2_12 = [ + maven_artifact("org.scala-lang.modules:scala-parallel-collections_2.13"), + ]), +) diff --git a/api/BUILD.bazel b/api/BUILD.bazel new file mode 100644 index 000000000..c1cce16dc --- /dev/null +++ b/api/BUILD.bazel @@ -0,0 +1,68 @@ +load("@io_bazel_rules_scala//scala:scala_cross_version_select.bzl", "select_for_scala_version") + +scala_library( + name = "api-lib", + srcs = glob(["src/main/scala/ai/chronon/api/*.scala"]) + + select_for_scala_version( + between_2_12_and_2_13 = [ + "//api/src/main/scala-2.12/scala/util:ScalaVersionSpecificCollectionsConverter.scala", + ], + since_2_13 = [ + "//api/src/main/scala-2.13/scala/util:ScalaVersionSpecificCollectionsConverter.scala", + ], + ), + visibility = ["//visibility:public"], + deps = [ + ":api-models", + "//third_party/java/spark:spark-exec", + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + ] + select_for_scala_version( + before_2_13 = [ + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + ], + since_2_13 = [ + scala_artifact("org.scala-lang.modules:scala-parallel-collections"), + ], + ), +) + +scala_test_suite( + name = "test", + srcs = glob(["src/test/scala/ai/chronon/api/test/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + ":api-models", + ":api-lib", + "//third_party/java/spark:spark-exec", + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + scala_artifact("org.scalatestplus:mockito-3-4"), + maven_artifact("org.mockito:mockito-core"), + scala_artifact( + "org.scala-lang.modules:scala-collection-compat", + ), + ] + select_for_scala_version( + since_2_13 = [ + maven_artifact("org.scala-lang.modules:scala-parallel-collections_2.13"), + ], + ), +) + +java_library( + name = "api-models", + srcs = ["//api/thrift:api-models-java"], + visibility = ["//visibility:public"], + deps = [ + maven_artifact("javax.annotation:javax.annotation.api"), + maven_artifact("org.apache.thrift:libthrift"), + ], +) diff --git a/api/py/BUILD.bazel b/api/py/BUILD.bazel new file mode 100644 index 000000000..ebd0dc35b --- /dev/null +++ b/api/py/BUILD.bazel @@ -0,0 +1,48 @@ +load("@rules_python//python:py_test.bzl", "py_test") +load("@pypi//:requirements.bzl", "requirement") +load("//tools/build_rules/python:pytest_suite.bzl", "pytest_suite") +load("@rules_python//python:packaging.bzl", "py_wheel") + +py_library( + name = "api_py", + srcs = glob(["ai/chronon/**/*.py"]), # All Python files in your_module + imports = ["."], + visibility = 
["//visibility:public"], + deps = [ + "//api/thrift:api-models-py", + requirement("thrift"), + ], +) + +## Create a Python wheel from your package +py_wheel( + name = "chronon_ai", + distribution = "chronon_ai", + version = "0.0.1", + visibility = ["//visibility:public"], + deps = [ + ":api_py", + ], +) + +pytest_suite( + name = "api_test", + srcs = glob( + ["test/**/*.py"], + exclude = ["test/sample/**/*"], + ), + data = glob(["test/sample/**/*"]), + env = { + "CHRONON_ROOT": "api/py/", + }, + imports = [ + ".", + "test/sample", + ], + deps = [ + "//api/py:api_py", + "//api/thrift:api-models-py", + requirement("thrift"), + requirement("click"), + ], +) diff --git a/api/py/ai/chronon/__init__.py b/api/py/ai/chronon/__init__.py deleted file mode 100644 index 32386973e..000000000 --- a/api/py/ai/chronon/__init__.py +++ /dev/null @@ -1,47 +0,0 @@ - -# Copyright (C) 2023 The Chronon Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import ai.chronon.api.ttypes as ttypes -import inspect -import json - - -# Takes in an conf object class like GroupBy, Join and StagingQuery -# And returns a function that dispatches the arguments correctly to the object class and inner metadata -# Remaining args will end up in object.metaData.customJson -def _metadata_shim(conf_class): - constructor_params = list(inspect.signature(conf_class.__init__).parameters.keys()) - assert constructor_params[0] == "self", "First param should be 'self', found {}".format( - constructor_params[0]) - assert constructor_params[1] == "metaData", "Second param should be 'metaData', found {}".format( - constructor_params[1]) - outer_params = constructor_params[2:] - metadata_params = list(inspect.signature(ttypes.MetaData.__init__).parameters.keys())[1:] - intersected_params = set(outer_params) & set(metadata_params) - unioned_params = set(outer_params) | set(metadata_params) - err_msg = "Cannot shim {}, because params: {} are intersecting with MetaData's params".format( - conf_class, intersected_params) - assert len(intersected_params) == 0, err_msg - - def shimmed_func(**kwargs): - meta_kwargs = {key: value for key, value in kwargs.items() if key in metadata_params} - outer_kwargs = {key: value for key, value in kwargs.items() if key in outer_params} - custom_json_args = {key: value for key, value in kwargs.items() if key not in unioned_params} - meta = ttypes.MetaData(customJson=json.dumps(custom_json_args), **meta_kwargs) - return conf_class(metaData=meta, **outer_kwargs) - return shimmed_func - - -StagingQuery = _metadata_shim(ttypes.StagingQuery) diff --git a/api/py/ai/chronon/utils.py b/api/py/ai/chronon/utils.py index bd12a3726..38d722787 100644 --- a/api/py/ai/chronon/utils.py +++ b/api/py/ai/chronon/utils.py @@ -31,7 +31,7 @@ ChrononJobTypes = Union[api.GroupBy, api.Join, api.StagingQuery] -chronon_root_path = "" # passed from compile.py +chronon_root_path = os.getenv("CHRONON_ROOT", "") # passed from compile.py @dataclass @@ -83,15 +83,21 @@ def __init__(self): self.old_name = "old.json" def diff(self, new_json_str: 
object, old_json_str: object, skipped_keys=[]) -> str: - new_json = {k: v for k, v in json.loads(new_json_str).items() if k not in skipped_keys} - old_json = {k: v for k, v in json.loads(old_json_str).items() if k not in skipped_keys} + new_json = { + k: v for k, v in json.loads(new_json_str).items() if k not in skipped_keys + } + old_json = { + k: v for k, v in json.loads(old_json_str).items() if k not in skipped_keys + } with open(os.path.join(self.temp_dir, self.old_name), mode="w") as old, open( os.path.join(self.temp_dir, self.new_name), mode="w" ) as new: old.write(json.dumps(old_json, sort_keys=True, indent=2)) new.write(json.dumps(new_json, sort_keys=True, indent=2)) - diff_str = subprocess.run(["diff", old.name, new.name], stdout=subprocess.PIPE).stdout.decode("utf-8") + diff_str = subprocess.run( + ["diff", old.name, new.name], stdout=subprocess.PIPE + ).stdout.decode("utf-8") return diff_str def clean(self): @@ -186,7 +192,9 @@ def get_mod_name_from_gc(obj, mod_prefix): def __set_name(obj, cls, mod_prefix): - module = importlib.import_module(get_mod_name_from_gc(obj, mod_prefix)) + module_name = get_mod_name_from_gc(obj, mod_prefix) + assert module_name, f"Couldn't find module name for object:\n{obj}\n" + module = importlib.import_module(module_name) eo.import_module_set_name(module, cls) @@ -208,7 +216,11 @@ def dict_to_bash_commands(d): return "" bash_commands = [] for key, value in d.items(): - cmd = f"--{key.replace('_', '-')}={value}" if value else f"--{key.replace('_', '-')}" + cmd = ( + f"--{key.replace('_', '-')}={value}" + if value + else f"--{key.replace('_', '-')}" + ) bash_commands.append(cmd) return " ".join(bash_commands) @@ -234,11 +246,17 @@ def output_table_name(obj, full_name: bool): def join_part_name(jp): if jp.groupBy is None: - raise NotImplementedError("Join Part names for non group bys is not implemented.") + raise NotImplementedError( + "Join Part names for non group bys is not implemented." 
+ ) if not jp.groupBy.metaData.name and isinstance(jp.groupBy, api.GroupBy): __set_name(jp.groupBy, api.GroupBy, "group_bys") return "_".join( - [component for component in [jp.prefix, sanitize(jp.groupBy.metaData.name)] if component is not None] + [ + component + for component in [jp.prefix, sanitize(jp.groupBy.metaData.name)] + if component is not None + ] ) @@ -277,7 +295,9 @@ def log_table_name(obj, full_name: bool = False): return output_table_name(obj, full_name=full_name) + "_logged" -def get_staging_query_output_table_name(staging_query: api.StagingQuery, full_name: bool = False): +def get_staging_query_output_table_name( + staging_query: api.StagingQuery, full_name: bool = False +): """generate output table name for staging query job""" __set_name(staging_query, api.StagingQuery, "staging_queries") return output_table_name(staging_query, full_name=full_name) @@ -290,7 +310,9 @@ def get_join_output_table_name(join: api.Join, full_name: bool = False): # set output namespace if not join.metaData.outputNamespace: team_name = join.metaData.name.split(".")[0] - namespace = teams.get_team_conf(os.path.join(chronon_root_path, TEAMS_FILE_PATH), team_name, "namespace") + namespace = teams.get_team_conf( + os.path.join(chronon_root_path, TEAMS_FILE_PATH), team_name, "namespace" + ) join.metaData.outputNamespace = namespace return output_table_name(join, full_name=full_name) @@ -307,7 +329,10 @@ def get_dependencies( if meta_data is not None: result = [json.loads(dep) for dep in meta_data.dependencies] elif dependencies: - result = [{"name": wait_for_name(dep), "spec": dep, "start": start, "end": end} for dep in dependencies] + result = [ + {"name": wait_for_name(dep), "spec": dep, "start": start, "end": end} + for dep in dependencies + ] else: if src.entities and src.entities.mutationTable: # Opting to use no lag for all use cases because that the "safe catch-all" case when @@ -317,15 +342,23 @@ def get_dependencies( filter( None, [ - wait_for_simple_schema(src.entities.snapshotTable, lag, start, end), - wait_for_simple_schema(src.entities.mutationTable, lag, start, end), + wait_for_simple_schema( + src.entities.snapshotTable, lag, start, end + ), + wait_for_simple_schema( + src.entities.mutationTable, lag, start, end + ), ], ) ) elif src.entities: - result = [wait_for_simple_schema(src.entities.snapshotTable, lag, start, end)] + result = [ + wait_for_simple_schema(src.entities.snapshotTable, lag, start, end) + ] elif src.joinSource: - parentJoinOutputTable = get_join_output_table_name(src.joinSource.join, True) + parentJoinOutputTable = get_join_output_table_name( + src.joinSource.join, True + ) result = [wait_for_simple_schema(parentJoinOutputTable, lag, start, end)] else: result = [wait_for_simple_schema(src.events.table, lag, start, end)] @@ -339,17 +372,31 @@ def get_bootstrap_dependencies(bootstrap_parts) -> List[str]: dependencies = [] for bootstrap_part in bootstrap_parts: table = bootstrap_part.table - start = bootstrap_part.query.startPartition if bootstrap_part.query is not None else None - end = bootstrap_part.query.endPartition if bootstrap_part.query is not None else None + start = ( + bootstrap_part.query.startPartition + if bootstrap_part.query is not None + else None + ) + end = ( + bootstrap_part.query.endPartition + if bootstrap_part.query is not None + else None + ) dependencies.append(wait_for_simple_schema(table, 0, start, end)) return [json.dumps(dep) for dep in dependencies] def get_label_table_dependencies(label_part) -> List[str]: - label_info = 
[(label.groupBy.sources, label.groupBy.metaData) for label in label_part.labels] - label_info = [(source, meta_data) for (sources, meta_data) in label_info for source in sources] + label_info = [ + (label.groupBy.sources, label.groupBy.metaData) for label in label_part.labels + ] + label_info = [ + (source, meta_data) for (sources, meta_data) in label_info for source in sources + ] label_dependencies = [ - dep for (source, meta_data) in label_info for dep in get_dependencies(src=source, meta_data=meta_data) + dep + for (source, meta_data) in label_info + for dep in get_dependencies(src=source, meta_data=meta_data) ] label_dependencies.append( json.dumps( @@ -369,7 +416,9 @@ def wait_for_simple_schema(table, lag, start, end): clean_name = table_tokens[0] subpartition_spec = "/".join(table_tokens[1:]) if len(table_tokens) > 1 else "" return { - "name": "wait_for_{}_ds{}".format(clean_name, "" if lag == 0 else f"_minus_{lag}"), + "name": "wait_for_{}_ds{}".format( + clean_name, "" if lag == 0 else f"_minus_{lag}" + ), "spec": "{}/ds={}{}".format( clean_name, "{{ ds }}" if lag == 0 else "{{{{ macros.ds_add(ds, -{}) }}}}".format(lag), @@ -395,7 +444,8 @@ def dedupe_in_order(seq): def has_topic(group_by: api.GroupBy) -> bool: """Find if there's topic or mutationTopic for a source helps define streaming tasks""" return any( - (source.entities and source.entities.mutationTopic) or (source.events and source.events.topic) + (source.entities and source.entities.mutationTopic) + or (source.events and source.events.topic) for source in group_by.sources ) @@ -450,13 +500,19 @@ def get_modes_tables(conf: ChrononJobTypes) -> Dict[str, List[str]]: if requires_log_flattening_task(join): tables[Modes.log_flattener] = [f"{table_name}_logged"] if join.labelPart is not None: - tables[Modes.label_join] = [f"{table_name}_labels", f"{table_name}_labeled", f"{table_name}_labeled_latest"] + tables[Modes.label_join] = [ + f"{table_name}_labels", + f"{table_name}_labeled", + f"{table_name}_labeled_latest", + ] if conf.bootstrapParts or get_offline_schedule(conf) is not None: tables[SubStage.bootstrap] = [f"{table_name}_bootstrap"] if conf.joinParts: - tables[SubStage.join_parts] = [join_part_output_table_name(conf, jp) for jp in conf.joinParts] + tables[SubStage.join_parts] = [ + join_part_output_table_name(conf, jp) for jp in conf.joinParts + ] # from StagingQuery elif isinstance(conf, api.StagingQuery): tables[Modes.backfill] = [f"{table_name}"] @@ -469,7 +525,11 @@ def get_modes_tables(conf: ChrononJobTypes) -> Dict[str, List[str]]: def get_applicable_modes(conf: ChrononJobTypes) -> List[str]: """Based on a conf and mode determine if a conf should define a task.""" table_map = get_modes_tables(conf) - modes = [x for x in table_map.keys() if x not in [getattr(SubStage, field.name) for field in fields(SubStage)]] + modes = [ + x + for x in table_map.keys() + if x not in [getattr(SubStage, field.name) for field in fields(SubStage)] + ] return modes diff --git a/api/py/test/sample/group_bys/sample_team/sample_chaining_group_by.py b/api/py/test/sample/group_bys/sample_team/sample_chaining_group_by.py index 5bffce487..30eb707bd 100644 --- a/api/py/test/sample/group_bys/sample_team/sample_chaining_group_by.py +++ b/api/py/test/sample/group_bys/sample_team/sample_chaining_group_by.py @@ -16,14 +16,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
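+# Note: the parent join used below was previously defined inline in this file; it now
+# comes from joins/sample_team/sample_chaining_parent (see the import change below).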
-from sources import test_sources -from group_bys.sample_team import ( - event_sample_group_by, - entity_sample_group_by_from_module, - group_by_with_kwargs, -) +from joins.sample_team.sample_chaining_parent import parent_join -from ai.chronon.join import Join, JoinPart from ai.chronon.group_by import ( GroupBy, Aggregation, @@ -36,33 +30,20 @@ select, ) -parent_join = Join( - left=test_sources.event_source, - right_parts=[ - JoinPart( - group_by=event_sample_group_by.v1, - key_mapping={'subject': 'group_by_subject'}, - ), - JoinPart( - group_by=entity_sample_group_by_from_module.v1, - key_mapping={'subject': 'group_by_subject'}, - ), - ], - online=True, - check_consistency=True -) - chaining_group_by_v1 = GroupBy( - sources=ttypes.Source(joinSource=ttypes.JoinSource( - join=parent_join, - query=Query( - selects=select( - event="event_expr", - group_by_subject="group_by_expr", + sources=ttypes.Source( + joinSource=ttypes.JoinSource( + join=parent_join, + query=Query( + selects=select( + event="event_expr", + group_by_subject="group_by_expr", + ), + start_partition="2023-04-15", + time_column="ts", ), - start_partition="2023-04-15", - time_column="ts", - ))), + ) + ), keys=["user_id"], aggregations=[ Aggregation(input_column="event", operation=Operation.LAST), @@ -72,7 +53,7 @@ production=True, table_properties={ "sample_config_json": """{"sample_key": "sample_value"}""", - "description": "sample description" + "description": "sample description", }, output_namespace="test_namespace", ) diff --git a/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 b/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 index 942d3f0b1..4ee217ad9 100644 --- a/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 +++ b/api/py/test/sample/production/group_bys/sample_team/sample_group_by_group_by.v1 @@ -4,7 +4,7 @@ "production": 0, "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", "dependencies": [ - "{\"name\": \"wait_for_sample_namespace.sample_team_sample_group_by_group_by_require_backfill_ds\", \"spec\": \"sample_namespace.sample_team_sample_group_by_group_by_require_backfill/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}" + "{\"name\": \"wait_for_sample_namespace.sample_team_sample_group_by_require_backfill_ds\", \"spec\": \"sample_namespace.sample_team_sample_group_by_require_backfill/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}" ], "tableProperties": { "sample_config_json": "{\"sample_key\": \"sample_value\"}", @@ -17,7 +17,7 @@ "sources": [ { "events": { - "table": "sample_namespace.sample_team_sample_group_by_group_by_require_backfill", + "table": "sample_namespace.sample_team_sample_group_by_require_backfill", "query": { "selects": { "event": "event_expr", diff --git a/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by b/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by index 51df118a8..0f0fe60b6 100644 --- a/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by +++ b/api/py/test/sample/production/joins/sample_team/sample_join.group_by_of_group_by @@ -6,7 +6,7 @@ "customJson": "{\"check_consistency\": false, \"lag\": 0, \"join_tags\": null, \"join_part_tags\": {}}", "dependencies": [ "{\"name\": \"wait_for_sample_namespace.sample_team_sample_staging_query_v1_ds\", \"spec\": \"sample_namespace.sample_team_sample_staging_query_v1/ds={{ ds }}\", \"start\": \"2021-03-01\", \"end\": null}", - 
"{\"name\": \"wait_for_sample_namespace.sample_team_sample_group_by_group_by_require_backfill_ds\", \"spec\": \"sample_namespace.sample_team_sample_group_by_group_by_require_backfill/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}" + "{\"name\": \"wait_for_sample_namespace.sample_team_sample_group_by_require_backfill_ds\", \"spec\": \"sample_namespace.sample_team_sample_group_by_require_backfill/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}" ], "tableProperties": { "source": "chronon" @@ -39,7 +39,7 @@ "production": 0, "customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}", "dependencies": [ - "{\"name\": \"wait_for_sample_namespace.sample_team_sample_group_by_group_by_require_backfill_ds\", \"spec\": \"sample_namespace.sample_team_sample_group_by_group_by_require_backfill/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}" + "{\"name\": \"wait_for_sample_namespace.sample_team_sample_group_by_require_backfill_ds\", \"spec\": \"sample_namespace.sample_team_sample_group_by_require_backfill/ds={{ ds }}\", \"start\": \"2021-04-09\", \"end\": null}" ], "tableProperties": { "sample_config_json": "{\"sample_key\": \"sample_value\"}", @@ -52,7 +52,7 @@ "sources": [ { "events": { - "table": "sample_namespace.sample_team_sample_group_by_group_by_require_backfill", + "table": "sample_namespace.sample_team_sample_group_by_require_backfill", "query": { "selects": { "event": "event_expr", diff --git a/api/py/test/test_compile.py b/api/py/test/test_compile.py index c3a6da393..d563e7a1d 100644 --- a/api/py/test/test_compile.py +++ b/api/py/test/test_compile.py @@ -37,7 +37,7 @@ def _get_full_file_path(relative_path): def _invoke_cli_with_params(runner, input_path, flags=None): """Invoke the CLI command with consistent options and specified input_path.""" command = [ - "--chronon_root=test/sample", + "--chronon_root=api/py/test/sample", f"--input_path={input_path}", "--debug", ] @@ -88,12 +88,12 @@ def specific_setup(): def test_basic_compile(): runner = CliRunner() - result = runner.invoke(extract_and_convert, ["--chronon_root=test/sample", "--input_path=joins/sample_team/"]) + result = runner.invoke(extract_and_convert, ["--chronon_root=api/py/test/sample", "--input_path=joins/sample_team/"]) assert result.exit_code == 0 - result = runner.invoke(extract_and_convert, ["--chronon_root=test/sample", "--input_path=joins/sample_team"]) + result = runner.invoke(extract_and_convert, ["--chronon_root=api/py/test/sample", "--input_path=joins/sample_team"]) assert result.exit_code == 0 result = runner.invoke( - extract_and_convert, ["--chronon_root=test/sample", "--input_path=joins/sample_team/sample_join.py"] + extract_and_convert, ["--chronon_root=api/py/test/sample", "--input_path=joins/sample_team/sample_join.py"] ) assert result.exit_code == 0 @@ -104,7 +104,7 @@ def test_compile_group_by_deprecation(): result = runner.invoke( extract_and_convert, [ - "--chronon_root=test/sample", + "--chronon_root=api/py/test/sample", "--input_path=group_bys/sample_team/sample_deprecation_group_by.py", "--force-overwrite", ], @@ -121,7 +121,7 @@ def test_compile_join_deprecation(): result = runner.invoke( extract_and_convert, [ - "--chronon_root=test/sample", + "--chronon_root=api/py/test/sample", "--input_path=joins/sample_team/sample_deprecation_join.py", "--force-overwrite", ], @@ -135,7 +135,7 @@ def test_compile_join_deprecation(): def test_debug_compile(): runner = CliRunner() result = runner.invoke( - extract_and_convert, ["--chronon_root=test/sample", 
"--input_path=joins/sample_team/", "--debug"] + extract_and_convert, ["--chronon_root=api/py/test/sample", "--input_path=joins/sample_team/", "--debug"] ) assert result.exit_code == 0 @@ -234,7 +234,7 @@ def test_detected_dependent_joins_materialized(): runner = CliRunner() result = _invoke_cli_with_params(runner, "group_bys/sample_team/event_sample_group_by.py", ["--force-overwrite"]) assert result.exit_code == 0 - expected_message = "Successfully wrote 8 Join objects to test/sample/production".strip().lower() + expected_message = "Successfully wrote 8 Join objects to api/py/test/sample/production".strip().lower() actual_message = str(result.output).strip().lower() assert expected_message in actual_message, f"Got a different message than expected {actual_message}" @@ -266,7 +266,7 @@ def test_detected_dependent_group_bys_materialized(): runner = CliRunner() result = _invoke_cli_with_params(runner, "joins/unit_test/sample_parent_join.py", ["--force-overwrite"]) assert result.exit_code == 0 - expected_message = "Successfully wrote 2 GroupBy objects to test/sample/production".strip().lower() + expected_message = "Successfully wrote 2 GroupBy objects to api/py/test/sample/production".strip().lower() actual_message = str(result.output).strip().lower() assert expected_message in actual_message, f"Got a different message than expected {actual_message}" @@ -280,7 +280,7 @@ def test_detected_dependent_nested_joins(): runner, "group_bys/unit_test/user/sample_nested_group_by.py", ["--force-overwrite"] ) assert result.exit_code == 0 - expected_message = "Successfully wrote 1 Join objects to test/sample/production".strip().lower() + expected_message = "Successfully wrote 1 Join objects to api/py/test/sample/production".strip().lower() actual_message = str(result.output).strip().lower() assert expected_message in actual_message, f"Got a different message than expected {actual_message}" @@ -341,7 +341,7 @@ def test_table_display_staging_query(): result = runner.invoke( extract_and_convert, [ - "--chronon_root=test/sample", + "--chronon_root=api/py/test/sample", f"--input_path={input_path}", "--table-display", ], @@ -362,7 +362,7 @@ def test_compile_dependency_staging_query(): input_path = f"staging_queries/sample_team/sample_staging_query.py" result = runner.invoke( extract_and_convert, - ["--chronon_root=test/sample", f"--input_path={input_path}"], + ["--chronon_root=api/py/test/sample", f"--input_path={input_path}"], ) assert result.exit_code == 0 diff --git a/api/py/test/test_dependency_tracker.py b/api/py/test/test_dependency_tracker.py index a5a87528b..16eff800a 100644 --- a/api/py/test/test_dependency_tracker.py +++ b/api/py/test/test_dependency_tracker.py @@ -22,7 +22,7 @@ @pytest.fixture def test_dependency_tracker(): - return dependency_tracker.ChrononEntityDependencyTracker(chronon_root_path="test/sample/production") + return dependency_tracker.ChrononEntityDependencyTracker(chronon_root_path="api/py/test/sample/production") def test_get_group_by_dependency_names(test_dependency_tracker): diff --git a/api/py/test/test_utils.py b/api/py/test/test_utils.py index cb6975a3b..6b6e36c27 100644 --- a/api/py/test/test_utils.py +++ b/api/py/test/test_utils.py @@ -164,7 +164,7 @@ def test_get_applicable_mode_for_joins( def test_get_related_table_names_for_group_bys(group_by_requiring_backfill, online_group_by_requiring_streaming): - with open("test/sample/production/group_bys/sample_team/entity_sample_group_by_from_module.v1") as conf_file: + with 
open("api/py/test/sample/production/group_bys/sample_team/entity_sample_group_by_from_module.v1") as conf_file: json = conf_file.read() group_by = json2thrift(json, api.GroupBy) tables = utils.get_related_table_names(group_by) @@ -172,7 +172,7 @@ def test_get_related_table_names_for_group_bys(group_by_requiring_backfill, onli def test_get_related_table_names_for_group_bys(): - with open("test/sample/production/group_bys/sample_team/entity_sample_group_by_from_module.v1") as conf_file: + with open("api/py/test/sample/production/group_bys/sample_team/entity_sample_group_by_from_module.v1") as conf_file: json = conf_file.read() group_by = json2thrift(json, api.GroupBy) tables = utils.get_related_table_names(group_by) @@ -188,7 +188,7 @@ def test_get_related_table_names_for_group_bys(): def test_get_related_table_names_for_simple_joins(): - with open("test/sample/production/joins/sample_team/sample_join.v1") as conf_file: + with open("api/py/test/sample/production/joins/sample_team/sample_join.v1") as conf_file: json = conf_file.read() join = json2thrift(json, api.Join) tables = utils.get_related_table_names(join) @@ -206,7 +206,7 @@ def test_get_related_table_names_for_simple_joins(): def test_get_related_table_names_for_label_joins(): - with open("test/sample/production/joins/sample_team/sample_label_join.v1") as conf_file: + with open("api/py/test/sample/production/joins/sample_team/sample_label_join.v1") as conf_file: json = conf_file.read() join = json2thrift(json, api.Join) tables = utils.get_related_table_names(join) @@ -226,7 +226,7 @@ def test_get_related_table_names_for_label_joins(): def test_get_related_table_names_for_consistency_joins(): - with open("test/sample/production/joins/sample_team/sample_join.consistency_check") as conf_file: + with open("api/py/test/sample/production/joins/sample_team/sample_join.consistency_check") as conf_file: json = conf_file.read() join = json2thrift(json, api.Join) tables = utils.get_related_table_names(join) @@ -245,7 +245,7 @@ def test_get_related_table_names_for_consistency_joins(): def test_get_related_table_names_for_bootstrap_joins(): - with open("test/sample/production/joins/sample_team/sample_join_bootstrap.v1") as conf_file: + with open("api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1") as conf_file: json = conf_file.read() join = json2thrift(json, api.Join) tables = utils.get_related_table_names(join) @@ -264,7 +264,7 @@ def test_get_related_table_names_for_bootstrap_joins(): def test_tables_with_without_skip_join_parts(): - with open("test/sample/production/joins/sample_team/sample_join_bootstrap.v1") as conf_file: + with open("api/py/test/sample/production/joins/sample_team/sample_join_bootstrap.v1") as conf_file: json = conf_file.read() join = json2thrift(json, api.Join) tables_without = utils.get_related_table_names(join) diff --git a/api/py/test/test_validator.py b/api/py/test/test_validator.py index 51040c91a..62cec8848 100644 --- a/api/py/test/test_validator.py +++ b/api/py/test/test_validator.py @@ -22,7 +22,7 @@ @pytest.fixture def zvalidator(): - return validator.ChrononRepoValidator(chronon_root_path="test/sample", output_root="production") + return validator.ChrononRepoValidator(chronon_root_path="api/py/test/sample", output_root="production") @pytest.fixture diff --git a/api/src/main/scala-2.12/scala/util/BUILD.bazel b/api/src/main/scala-2.12/scala/util/BUILD.bazel new file mode 100644 index 000000000..f61b113e1 --- /dev/null +++ b/api/src/main/scala-2.12/scala/util/BUILD.bazel @@ -0,0 +1,5 @@ 
+exports_files( + [ + "ScalaVersionSpecificCollectionsConverter.scala", + ], +) diff --git a/api/src/main/scala-2.13/scala/util/BUILD.bazel b/api/src/main/scala-2.13/scala/util/BUILD.bazel new file mode 100644 index 000000000..f61b113e1 --- /dev/null +++ b/api/src/main/scala-2.13/scala/util/BUILD.bazel @@ -0,0 +1,5 @@ +exports_files( + [ + "ScalaVersionSpecificCollectionsConverter.scala", + ], +) diff --git a/api/thrift/BUILD.bazel b/api/thrift/BUILD.bazel new file mode 100644 index 000000000..0486beb3e --- /dev/null +++ b/api/thrift/BUILD.bazel @@ -0,0 +1,14 @@ +load("//tools/build_rules/thrift:thrift.bzl", "thrift_java_library", "thrift_python_library") + +thrift_python_library( + name = "api-models-py", + srcs = ["api.thrift"], + namespace = "ai.chronon", + visibility = ["//visibility:public"], +) + +thrift_java_library( + name = "api-models-java", + srcs = ["api.thrift"], + visibility = ["//visibility:public"], +) diff --git a/build.sbt b/build.sbt index 2c192b15d..ab10f1c39 100644 --- a/build.sbt +++ b/build.sbt @@ -11,6 +11,7 @@ lazy val scala213 = "2.13.6" lazy val spark2_4_0 = "2.4.0" lazy val spark3_1_1 = "3.1.1" lazy val spark3_2_1 = "3.2.1" +lazy val spark3_5_3 = "3.5.3" lazy val tmp_warehouse = "/tmp/chronon/" ThisBuild / organization := "ai.chronon" @@ -37,6 +38,9 @@ ThisBuild / developers := List( ) ThisBuild / assembly / test := {} +val use_spark_3_5 = settingKey[Boolean]("Flag to build for 3.5") +ThisBuild / use_spark_3_5 := false + def buildTimestampSuffix = ";build.timestamp=" + new java.util.Date().getTime lazy val publishSettings = Seq( publishTo := { @@ -132,6 +136,15 @@ val VersionMatrix: Map[String, VersionDependency] = Map( Some(spark3_1_1), Some(spark3_2_1) ), + "spark-sql-3-5" -> VersionDependency( + Seq( + "org.apache.spark" %% "spark-sql", + "org.apache.spark" %% "spark-core" + ), + Some(spark2_4_0), + Some(spark3_5_3), + Some(spark3_5_3) + ), "spark-all" -> VersionDependency( Seq( "org.apache.spark" %% "spark-sql", @@ -144,6 +157,18 @@ val VersionMatrix: Map[String, VersionDependency] = Map( Some(spark3_1_1), Some(spark3_2_1) ), + "spark-all-3-5" -> VersionDependency( + Seq( + "org.apache.spark" %% "spark-sql", + "org.apache.spark" %% "spark-hive", + "org.apache.spark" %% "spark-core", + "org.apache.spark" %% "spark-streaming", + "org.apache.spark" %% "spark-sql-kafka-0-10" + ), + Some(spark2_4_0), + Some(spark3_5_3), + Some(spark3_5_3) + ), "scala-reflect" -> VersionDependency( Seq("org.scala-lang" % "scala-reflect"), Some(scala211), @@ -360,6 +385,20 @@ val sparkBaseSettings: Seq[Setting[_]] = Seq( val art = (assembly / artifact).value art.withClassifier(Some("assembly")) }, + Compile / unmanagedSources := { + val sources = (Compile / unmanagedSources).value + val srcDir = (Compile / sourceDirectory).value + + val spark_3_5_encoder = srcDir / "spark-3_5_plus" / "ai" / "chronon" / "spark" / "EncoderUtil.scala" + val spark_default_encoder = srcDir / "spark-default" / "ai" / "chronon" / "spark" / "EncoderUtil.scala" + + val filteredSources = sources.filterNot(f => + f.getAbsolutePath == spark_3_5_encoder.getAbsolutePath || + f.getAbsolutePath == spark_default_encoder.getAbsolutePath + ) + + filteredSources :+ (if (use_spark_3_5.value) spark_3_5_encoder else spark_default_encoder) + }, mainClass in (Compile, run) := Some("ai.chronon.spark.Driver"), cleanFiles ++= Seq(file(tmp_warehouse)), Test / testOptions += Tests.Setup(() => cleanSparkMeta()), @@ -373,7 +412,10 @@ lazy val spark_uber = (project in file("spark")) sparkBaseSettings, version := 
git.versionProperty.value,
     crossScalaVersions := supportedVersions,
-    libraryDependencies ++= fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided")
+    libraryDependencies ++= (if (use_spark_3_5.value)
+                               fromMatrix(scalaVersion.value, "jackson", "spark-all-3-5/provided", "delta-core/provided")
+                             else
+                               fromMatrix(scalaVersion.value, "jackson", "spark-all/provided", "delta-core/provided"))
   )

lazy val spark_embedded = (project in file("spark"))
@@ -382,7 +424,10 @@ lazy val spark_embedded = (project in file("spark"))
     sparkBaseSettings,
     version := git.versionProperty.value,
     crossScalaVersions := supportedVersions,
-    libraryDependencies ++= fromMatrix(scalaVersion.value, "spark-all", "delta-core"),
+    libraryDependencies ++= (if (use_spark_3_5.value)
+                               fromMatrix(scalaVersion.value, "spark-all-3-5", "delta-core")
+                             else
+                               fromMatrix(scalaVersion.value, "spark-all", "delta-core")),
     target := target.value.toPath.resolveSibling("target-embedded").toFile,
     Test / test := {}
   )
diff --git a/devnotes.md b/devnotes.md
index d5e072fdc..3ebe72bad 100644
--- a/devnotes.md
+++ b/devnotes.md
@@ -1,287 +1,348 @@
-# Intro
+# Development

-## Commands
+## Intro

-***All commands assume you are in the root directory of this project***.
-For me, that looks like `~/repos/chronon`.
+### Commands
+
+***All commands assume you are in the root directory of this project***. For me, that looks like `~/repos/chronon`.
+
+---

 ## Prerequisites

-Add the following to your shell run command files e.g. `~/.bashrc`.
+### Environment Setup

-```
+Add the following to your shell run command files, e.g., `~/.bashrc` (primarily for SBT users):
+
+```shell
 export CHRONON_OS=
 export CHRONON_API=$CHRONON_OS/api/py
 alias materialize="PYTHONPATH=$CHRONON_API:$PYTHONPATH $CHRONON_API/ai/chronon/repo/compile.py"
 ```

-### Install specific version of thrift
+### Install Thrift (Version 0.13)
+
+This step is relevant to both Bazel and SBT users. Thrift is a dependency for compile. The latest version, 0.14, is incompatible with Hive Metastore, so install version 0.13:

-Thrift is a dependency for compile. The latest version 0.14 is very new - feb 2021, and incompatible with hive metastore. So we force 0.13.

 ```shell
 brew tap cartman-kai/thrift
 brew install thrift@0.13
 ```

-### Install Python dependency packages for API
+### Install Python Dependencies for the API
+
 ```shell
 python3 -m pip install -U tox build
 ```

-### Configuring IntelliJ
+---

-Be sure to open the project from the `build.sbt` file (at the root level of the git directory).
+## Build Systems

-Mark the following directories as `Sources Root` by right clicking on the directory in the tree view, and selecting `Mark As` -> `Sources Root`:
-- aggregator/src/main/scala
-- api/src/main/scala
-- spark/src/main/scala
+This project supports both Bazel and SBT. Bazel's hermetic nature simplifies setup compared to SBT, as it doesn't require extensive environment configuration. Choose the system that works best for your workflow.

+### Bazel Setup

-Mark the following directories as `Test Root` in a similar way:
-- aggregator/src/test/scala
-- api/src/test/scala
-- spark/src/test/scala
+1. Bazel is hermetic and does not require additional environment setup, except for the installation of Thrift@0.13.

-The project should then automatically start indexing, and when it finishes you should be good to go.
+2. 
Ensure you have a `WORKSPACE` file and `BUILD` files in the appropriate directories.
+
+3. Common Bazel Commands:
+
+   - Build all targets:
+     ```shell
+     bazel build //...
+     ```
+   - Run tests:
+     ```shell
+     bazel test //...
+     ```
+   - Run specific tests:
+     ```shell
+     bazel test //module:TestName
+     ```
+   - Build a fat jar:
+     ```shell
+     bazel build //module:deploy.jar
+     bazel build //spark:spark-assembly_deploy.jar
+     ```

-**Troubleshooting**

-Try the following if you are seeing flaky issues in IntelliJ
-```
-sbt +clean
-sbt +assembly
-```

-### Generate python thrift definitions
+### SBT Setup
+
+1. Open the project in IntelliJ from the `build.sbt` file (at the root level).
+
+2. Configure IntelliJ:
+
+   - Mark these directories as `Sources Root`:
+     - `aggregator/src/main/scala`
+     - `api/src/main/scala`
+     - `spark/src/main/scala`
+   - Mark these directories as `Test Root`:
+     - `aggregator/src/test/scala`
+     - `api/src/test/scala`
+     - `spark/src/test/scala`
+
+3. Common SBT Commands:
+
+   - Build all artifacts:
+     ```shell
+     sbt package
+     ```
+   - Build a fat jar:
+     ```shell
+     sbt assembly
+     ```
+   - Run all tests:
+     ```shell
+     sbt test
+     ```
+   - Run specific tests:
+     ```shell
+     sbt "testOnly *"
+     ```
+
+### Generate Python Thrift Definitions
+
+#### With SBT:

 ```shell
 sbt py_thrift
 ```

-### Materializing confs
-```
-materialize --input_path=
-```
+#### With Bazel:

-### Testing
-All tests
 ```shell
-sbt test
+bazel build //api/thrift:api-models-py
 ```
+
+## Dependency Management
+
+### Bazel:
+
+#### Adding Java/Scala Dependencies
+
+1. Update Maven Dependencies:
+   - Locate the relevant `jvm/*_repo.bzl` file (e.g., `jvm/maven_repo.bzl`)
+   - Add/update the dependency declaration:
+     ```python
+     artifacts = [
+         "org.apache.thrift:libthrift:0.13.0",
+     ]
+     ```
+
+2. Reference in BUILD files:
+   - Add the dependency to your target's deps attribute:
+     ```python
+     scala_library(
+         name = "my_library",
+         srcs = glob(["src/main/scala/**/*.scala"]),
+         deps = [
+             "@maven//:org_example_library",  # Note: dots and colons become underscores
+             # Other dependencies...
+         ],
+     )
+     ```
+
+3. Refresh Bazel's dependency cache:
+   ```shell
+   bazel clean --expunge
+   bazel build //...
+   ```
+
+#### Adding Python Dependencies
+
+1. Update `requirements.txt`:
+   - Add your dependency with version:
+     ```text
+     requests==2.28.1
+     numpy>=1.21.0
+     ```
+
+2. Update pip dependencies:
+   ```shell
+   bazel run //:pip.update
+   ```
+
+3. Reference in BUILD files:
+   ```python
+   py_library(
+       name = "my_python_lib",
+       srcs = glob(["*.py"]),
+       deps = [
+           requirement("requests"),
+           requirement("numpy"),
+       ],
+   )
+   ```
+
+---
+
+## Materializing Configurations

-Specific submodule tests
 ```shell
-sbt "testOnly *"
-# example to test FetcherTest with 9G memory
-sbt -mem 9000 "test:testOnly *FetcherTest"
-# example to test specific test method from GroupByTest
-sbt "test:testOnly *GroupByTest -- -t *testSnapshotEntities"
+materialize --input_path=
 ```

-### Check module dependencies
+---
+
+## Testing
+
+### Bazel:
+
+Run all tests:
+
 ```shell
-# Graph based view of all the dependencies
-sbt dependencyBrowseGraph
+bazel test //...
+```

-# Tree based view of all the dependencies
-sbt dependencyBrowseTree
+Run a specific test module:
+
+```shell
+bazel test //module:SpecificTest
 ```

-# Chronon Build Process
-* Inside the `$CHRONON_OS` directory.
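+For example, one of the suites added in this change can be run with full test logs streamed to the console (`--test_output=all` is a standard Bazel flag; the target is defined in `aggregator/BUILD.bazel`):
+
+```shell
+bazel test --test_output=all //aggregator:test
+```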
+### SBT: + +Run all tests: -### To build all of the Chronon artifacts locally (builds all the JARs, and Python API) ```shell -sbt package +sbt test ``` -### Build Python API +Run specific tests: + ```shell -sbt python_api +sbt "testOnly *" ``` -Note: This will create the artifacts with the version specific naming specified under `version.sbt` -```text -Builds on main branch will result in: --.jar -[JARs] chronon_2.11-0.7.0-SNAPSHOT.jar -[Python] chronon-ai-0.7.0-SNAPSHOT.tar.gz +--- +## Dependency Management -Builds on user branches will result in: ---.jar -[JARs] chronon_2.11-jdoe--branch-0.7.0-SNAPSHOT.jar -[Python] chronon-ai-jdoe--branch-ai-0.7.0-SNAPSHOT.tar.gz -``` +### Bazel: + +Graph view of dependencies: + +Bazel's dependency graph is hermetic and reproducible. It ensures that all dependencies are defined explicitly, avoiding issues caused by system-level or implicit dependencies. This contrasts with SBT, which relies on configuration files and environment settings to resolve dependencies. -### Build a fat jar ```shell -sbt assembly +bazel query 'deps(//module:target)' --output graph ``` -### Building a fat jar for just one submodule +### SBT: + +Graph-based view of dependencies: + ```shell -sbt 'spark_uber/assembly' +sbt dependencyBrowseGraph ``` -# Chronon Artifacts Publish Process -* Inside the `$CHRONON_OS` directory. +Tree-based view of dependencies: -To publish all the Chronon artifacts of the current git HEAD (builds and publishes all the JARs) ```shell -sbt publish +sbt dependencyBrowseTree ``` -* All the SNAPSHOT ones are published to the maven repository as specified by the env variable `$CHRONON_SNAPSHOT_REPO`. -* All the final artifacts are published to the MavenCentral (via Sonatype) +--- -NOTE: Python API package will also be generated, but it will not be pushed to any PyPi repository. Only `release` will -push the Python artifacts to the public repository. +## Artifact Building -## Setup for publishing artifacts to the JFrog artifactory -1. Login into JFrog artifactory webapp console and create an API Key under user profile section. -2. In `~/.sbt/1.0/jfrog.sbt` add -```scala -credentials += Credentials(Path.userHome / ".sbt" / "jfrog_credentials") -``` -4. In `~/.sbt/jfrog_credentials` add -``` -realm=Artifactory Realm -host= -user= -password= -``` +### Bazel: -## Setup for publishing artifacts to MavenCentral (via sonatype) -1. Get maintainer access to Maven Central on Sonatype - 1. Create a sonatype account if you don't have one. - 1. Sign up here https://issues.sonatype.org/ - 2. Ask a current Chronon maintainer to add you to Sonatype project. - 1. To add a new member, an existing Chronon maintainer will need to [email Sonatype central support](https://central.sonatype.org/faq/what-happened-to-issues-sonatype-org/#where-did-issuessonatypeorg-go) and request a new member to be added as a maintainer. Include the username for the newly created Sonatype account in the email. -2. `brew install gpg` on your mac -3. In `~/.sbt/1.0/sonatype.sbt` add -```scala -credentials += Credentials(Path.userHome / ".sbt" / "sonatype_credentials") -``` -4. In `~/.sbt/sonatype_credentials` add -``` -realm=Sonatype Nexus Repository Manager -host=s01.oss.sonatype.org -user= -password= -``` -5. setup gpg - just first step in this [link](https://www.scala-sbt.org/1.x/docs/Using-Sonatype.html#step+1%3A+PGP+Signatures) +Default settings in .bazelrc -## Setup for pushing python API package to PyPi repository +Build all artifacts: -1. 
Setup your pypi public account and contact @Nikhil to get added to the PyPi package as a [collaborator](https://pypi.org/manage/project/chronon-ai/collaboration/) -2. Install `tox, build, twine`. There are three python requirements for the python build process. -* tox: Module for testing. To run the tests run tox in the main project directory. -* build: Module for building. To build run `python -m build` in the main project directory -* twine: Module for publishing. To upload a distribution run `twine upload dist/.whl` -``` -python3 -m pip install -U tox build twine +```shell +bazel build //... ``` -3. Fetch the user token from the PyPi website. -4. Make sure you have the credentials configuration for the python repositories you manage. Normally in `~/.pypirc` +Build a specific artifact: + +```shell +bazel build //module:artifact_name ``` -[distutils] - index-servers = - local - pypi - chronon-pypi - -[local] - repository = # local artifactory - username = # local username - password = # token or password - -[pypi] - username = # username or __token__ - password = # password or token - -# Or if using a project specific token -[chronon-pypi] - repository = https://upload.pypi.org/legacy/ - username = __token__ - password = # Project specific pypi token. + +Build a scala version specific artifact: + +```shell +bazel build --config scala_2.12 //module:artifact_name ``` -# Chronon Release Process +Build a spark version specific artifact: -## Publishing all the artifacts of Chronon -1. Run release command in the right HEAD of chronon repository. Before running this, you may want to activate your Python venv or install the required Python packages on the laptop. Otherwise, the Python release will fail due to missing deps. +```shell +bazel build --define spark_version=3.5 //module:artifact_name ``` -GPG_TTY=$(tty) sbt -mem 8192 release + +Build deploy jar aka Uber jar or fat jar: + +```shell +bazel build --config scala_2.13 --define spark_version=3.5 //spark:spark-assembly_deploy.jar ``` -This command will take into the account of `version.sbt` and handles a series of events: -* Marks the current SNAPSHOT codebase as final (git commits). -* Creates a new git tag (e.g v0.7.0) pointing to the release commit. -* Builds the artifacts with released versioning suffix and pushes them to Sonatype, and PyPi central. -* Updates the `version.sbt` to point to the next in line developmental version (git commits). - -2. login into the [staging repo](https://s01.oss.sonatype.org/#stagingRepositories) in nexus (same password as sonatype jira) -3. In the staging repos list - select your publish - 1. select "close" wait for the steps to finish - 2. Select "refresh" and "release" - 3. Wait for 30 mins to sync to [maven](https://repo1.maven.org/maven2/) or [sonatype UI](https://search.maven.org/search?q=g:ai.chronon) -4. Push the local release commits (DO NOT SQUASH), and the new tag created from step 1 to Github. - 1. chronon repo disallow push to main branch directly, so instead push commits to a branch `git push origin main:your-name--release-xxx` - 2. your PR should contain exactly two commits, 1 setting the release version, 1 setting the new snapshot version. - 3. make sure to use **Rebase pull request** instead of the regular Merge or Squash options when merging the PR. -5. Push release tag to main branch - 1. tag new version to release commit `Setting version to 0.0.xx`. If not already tagged, can be added by - ``` - git tag -fa v0.0.xx - ``` - 2. push tag - ``` - git push origin - ``` - 3. 
New tag should be available here - https://github.com/airbnb/chronon/tags -6. Verify the Python API from the [PyPi website](https://pypi.org/project/chronon-ai/) that we are pointing to the latest. -### Troubleshooting -* Most common reason for Python failure is re-uploading a version that's already uploaded. -## [TODO] Publishing a driver to github releases -We use gh releases to release the driver that can backfill, upload, stream etc. -Currently the repo is not public and the run.py script can't reach it. +### SBT: -# Chronon Documentation via Sphinx -Run the sbt sphinx command to generate the sphinx docs locally and open it. -``` -sbt sphinx +Build all artifacts: + +```shell +sbt package ``` -# build artifacts and release to gcloud +Build the Python API: + ```shell -bash build.sh -bash gcloud_release.sh +sbt python_api ``` -# Testing on REPL -{One-time} First install the ammonite REPL with [support](https://ammonite.io/#OlderScalaVersions) for scala 2.12 +Build a fat jar: + ```shell -sudo sh -c '(echo "#!/usr/bin/env sh" && curl -L https://github.com/com-lihaoyi/Ammonite/releases/download/3.0.0-M0/2.12-3.0.0-M0) > /usr/local/bin/amm && chmod +x /usr/local/bin/amm' && amm ``` -Build the chronon jar for scala 2.12 +--- + +## Publishing Artifacts + +### Using SBT + +Publish all artifacts: + ```shell -sbt ++2.12.12 spark_uber/assembly +sbt publish ``` -Start the REPL +### Using Bazel + +Publish to a custom repository: + ```shell -/usr/local/bin/amm +bazel run //module:publish ``` -In the repl prompt load the jar -```scala -import $cp.spark.target.`scala-2.12`.`spark_uber-assembly-0.0.63-SNAPSHOT.jar` +--- + +## Documentation + +Generate documentation via Sphinx: + +```shell +sbt sphinx ``` -Now you can import the chronon classes and use them directly from repl for testing. 
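+One note on the Bazel commands above: the Scala and Spark flags compose, so a single invocation can pin both at once, for building or testing. A sketch (using the `//spark:test` suite added in this change; substitute your own target as needed): + +```shell +bazel test --config scala_2.13 --define spark_version=3.5 //spark:test ```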
+--- + +## Additional Notes + +For Bazel-specific troubleshooting, refer to the Bazel documentation: [https://bazel.build](https://bazel.build) + +For SBT-specific troubleshooting, refer to the SBT documentation: [https://www.scala-sbt.org](https://www.scala-sbt.org) diff --git a/flink/BUILD.bazel b/flink/BUILD.bazel new file mode 100644 index 000000000..ec749c347 --- /dev/null +++ b/flink/BUILD.bazel @@ -0,0 +1,47 @@ +scala_library( + name = "flink", + srcs = glob(["src/main/scala/ai/chronon/flink/**/*.scala"]), + deps = [ + "//aggregator", + "//api:api-lib", + "//api:api-models", + "//online", + "//third_party/java/spark:spark-exec", + maven_artifact("org.apache.flink:flink-core"), + maven_artifact("org.apache.flink:flink-metrics-core"), + maven_artifact("org.apache.flink:flink-streaming-java"), + scala_artifact("org.apache.flink:flink-scala"), + scala_artifact("org.apache.flink:flink-streaming-scala"), + maven_artifact("org.apache.flink:flink-metrics-dropwizard"), + maven_artifact("io.dropwizard.metrics:metrics-core"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + ], +) + +scala_test_suite( + name = "test", + srcs = glob(["src/test/scala/ai/chronon/flink/**/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + ":flink", + "//aggregator", + "//api:api-lib", + "//api:api-models", + "//online", + "//third_party/java/spark:spark-exec", + maven_artifact("org.apache.flink:flink-core"), + maven_artifact("org.apache.flink:flink-metrics-core"), + maven_artifact("org.apache.flink:flink-streaming-java"), + scala_artifact("org.apache.flink:flink-scala"), + scala_artifact("org.apache.flink:flink-streaming-scala"), + maven_artifact("org.apache.flink:flink-metrics-dropwizard"), + maven_artifact("io.dropwizard.metrics:metrics-core"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + ], +) diff --git a/jvm/BUILD b/jvm/BUILD new file mode 100644 index 000000000..e69de29bb diff --git a/jvm/defs.bzl b/jvm/defs.bzl new file mode 100644 index 000000000..e2acd7248 --- /dev/null +++ b/jvm/defs.bzl @@ -0,0 +1,36 @@ +load("@rules_jvm_external//:specs.bzl", "parse") +load("//tools/build_rules:utils.bzl", "flatten", "map") +load("@rules_jvm_external//:defs.bzl", "artifact") + +def _parse_versioned_artifact(artifact, version, exclusions): + result = parse.parse_maven_coordinate("{}:{}".format(artifact, version)) + if (exclusions != None): + result["exclusions"] = exclusions + return result + +def versioned_artifacts(version, artifacts, exclusions = None): + return map(lambda artifact: _parse_versioned_artifact(artifact, version, exclusions), artifacts) + +SUPPORTED_SCALA_VERSIONS = ["2.12", "2.13"] + +def repo(name, pinned = True, artifacts = [], overrides = {}, provided = False, vars = {}, excluded_artifacts = []): + final_artifacts = [] + flat_artifacts = flatten(artifacts) + for artifact in parse.parse_artifact_spec_list(flat_artifacts): + # Empty string in packaging seems to mess up Coursier, maybe a bug in RJE + if artifact.get("packaging") == "": + artifact.pop("packaging") + artifact["version"] = artifact["version"].format(**vars) + final_artifacts.append(artifact) + return struct( + name = name, + pinned = pinned, + artifacts = final_artifacts, + overrides = overrides, + provided = 
provided, + vars = vars, + excluded_artifacts = excluded_artifacts, + ) + +def get_jars_for_repo(repo_name, jars): + return [artifact(jar, repository_name = repo_name) for jar in jars] diff --git a/jvm/maven_repo.bzl b/jvm/maven_repo.bzl new file mode 100644 index 000000000..e1437b766 --- /dev/null +++ b/jvm/maven_repo.bzl @@ -0,0 +1,133 @@ +load("@rules_jvm_external//:specs.bzl", "maven") +load(":defs.bzl", "repo", "versioned_artifacts") + +maven_repo = repo( + name = "maven", + pinned = False, + artifacts = [ + "org.scala-lang.modules:scala-collection-compat_2.12:2.6.0", + "org.scala-lang.modules:scala-collection-compat_2.13:2.6.0", + "org.scala-lang.modules:scala-parser-combinators_2.13:2.4.0", + "org.scala-lang.modules:scala-java8-compat_2.12:1.0.0", + "org.scala-lang.modules:scala-java8-compat_2.13:1.0.0", + "org.apache.thrift:libthrift:0.13.0", + "org.scala-lang.modules:scala-parallel-collections_2.13:1.0.4", + "org.apache.commons:commons-lang3:3.12.0", + "org.apache.commons:commons-math3:3.6.1", + versioned_artifacts("1.2.3", [ + "ch.qos.logback:logback-classic", + "ch.qos.logback:logback-core", + ]), + "ch.qos.reload4j:reload4j:1.2.19", + + # JUnit + "junit:junit:4.13.2", + "com.novocode:junit-interface:0.11", + "org.scalatestplus:mockito-3-4_2.12:3.2.10.0", + "org.scalatestplus:mockito-3-4_2.13:3.2.10.0", + "org.mockito:mockito-core:4.6.1", + "org.mockito:mockito-scala_2.12:1.17.0", + "org.mockito:mockito-scala_2.13:1.17.0", + "org.scalatest:scalatest_2.12:3.2.15", + "org.scalatest:scalatest_2.13:3.2.15", + "org.scalatest:scalatest-shouldmatchers_2.12:3.2.15", + "org.scalatest:scalatest-shouldmatchers_2.13:3.2.15", + "org.scalatest:scalatest-matchers-core_2.12:3.2.15", + "org.scalatest:scalatest-matchers-core_2.13:3.2.15", + "org.scalactic:scalactic_2.12:3.2.15", + "org.scalactic:scalactic_2.13:3.2.15", + + # Add other dependencies + "org.apache.hadoop:hadoop-common:3.3.1", + "com.typesafe.akka:akka-actor_2.12:2.6.18", + "com.typesafe.akka:akka-stream_2.12:2.6.18", + "org.slf4j:slf4j-api:1.7.30", + "org.slf4j:slf4j-log4j12:1.7.30", + "org.apache.logging.log4j:log4j-core:2.14.1", + "org.apache.logging.log4j:log4j-api:2.14.1", + "com.fasterxml.jackson.core:jackson-core:2.12.5", + "com.fasterxml.jackson.core:jackson-annotations:2.12.5", + "com.fasterxml.jackson.core:jackson-databind:2.12.5", + "com.fasterxml.jackson.module:jackson-module-scala_2.12:2.12.5", + "com.fasterxml.jackson.module:jackson-module-scala_2.13:2.12.5", + "org.apache.httpcomponents:httpclient:4.5.13", + "org.apache.httpcomponents:httpcore:4.4.14", + "com.google.guava:guava:30.1-jre", + "org.xerial:sqlite-jdbc:3.34.0", + "com.yahoo.datasketches:sketches-core:0.13.4", + "com.google.code.gson:gson:2.8.6", + "commons-lang:commons-lang:2.6", + "commons-io:commons-io:2.11.0", + "com.github.ben-manes.caffeine:caffeine:2.8.5", + "net.jodah:typetools:0.4.1", + "org.rogach:scallop_2.12:4.0.1", + "org.rogach:scallop_2.13:4.0.1", + "org.json4s:json4s-jackson_2.12:3.6.12", + "org.json4s:json4s-core_2.12:3.6.12", + "org.json4s:json4s-native_2.12:3.6.12", + "org.json4s:json4s-ast_2.12:3.6.12", + "org.json4s:json4s-jackson_2.13:3.6.12", + "org.json4s:json4s-core_2.13:3.6.12", + "org.json4s:json4s-native_2.13:3.6.12", + "org.json4s:json4s-ast_2.13:3.6.12", + "com.datadoghq:java-dogstatsd-client:2.7", + "com.esotericsoftware:kryo:5.6.2", + "io.delta:delta-core_2.12:2.0.2", + "io.delta:delta-core_2.13:2.0.2", + "com.github.jnr:jnr-ffi:2.2.10", + "org.apache.kafka:kafka-clients:3.7.1", + "org.codehaus.janino:janino:3.0.9", + 
"org.codehaus.janino:commons-compiler:3.0.9", + "org.codehaus.jackson:jackson-core-asl:1.9.13", + + # Vertx dependencies + "io.vertx:vertx-core:4.5.10", + "io.vertx:vertx-web:4.5.10", + "io.vertx:vertx-config:4.5.10", + "io.vertx:vertx-codegen:4.5.10", + "io.vertx:vertx-unit:4.5.10", + "io.vertx:vertx-micrometer-metrics:4.5.10", + "io.micrometer:micrometer-core:1.12.4", + "io.micrometer:micrometer-registry-statsd:1.12.4", + + # Proto + "com.google.protobuf:protobuf-java:3.21.7", + + # Flink + versioned_artifacts("1.19.0", [ + "org.apache.flink:flink-clients", + "org.apache.flink:flink-connector-files", + "org.apache.flink:flink-connector-hive_2.12", + "org.apache.flink:flink-csv", + "org.apache.flink:flink-json", + "org.apache.flink:flink-metrics-core", + "org.apache.flink:flink-metrics-prometheus:jar", + "org.apache.flink:flink-orc", + "org.apache.flink:flink-parquet", + "org.apache.flink:flink-protobuf", + "org.apache.flink:flink-scala_2.12", + "org.apache.flink:flink-sql-gateway-api", + "org.apache.flink:flink-streaming-java", + "org.apache.flink:flink-streaming-scala_2.12", + "org.apache.flink:flink-table-api-java", + "org.apache.flink:flink-table-planner_2.12", + "org.apache.flink:flink-test-utils", + "org.apache.flink:flink-streaming-java:jar:tests", + "org.apache.flink:flink-metrics-dropwizard", + ]), + + # MongoDB Scala Driver and dependencies + "org.mongodb.scala:mongo-scala-driver_2.12:4.11.1", + "org.mongodb.scala:mongo-scala-bson_2.12:4.11.1", + "org.mongodb:mongodb-driver-core:4.11.1", + "org.mongodb:bson:4.11.1", + "org.mongodb:mongodb-driver-sync:4.11.1", + "org.mongodb:mongodb-driver-reactivestreams:4.11.1", + "org.reactivestreams:reactive-streams:1.0.4", + "org.mongodb:mongodb-driver-reactivestreams:4.11.1", + "org.reactivestreams:reactive-streams:1.0.4", + "io.dropwizard.metrics:metrics-core:4.2.20", + ], + overrides = { + }, +) diff --git a/jvm/mirrors.bzl b/jvm/mirrors.bzl new file mode 100644 index 000000000..788d9b0b2 --- /dev/null +++ b/jvm/mirrors.bzl @@ -0,0 +1,35 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +load("@bazel_skylib//lib:dicts.bzl", "dicts") +load("@rules_jvm_external//:defs.bzl", "maven_install") +load(":repos.bzl", "get_override_targets", "repos") + +# These mirrors apply to all repos +_mirrors = [ + # Private repositories are supported through HTTP Basic auth + # "http://username:password@localhost:8081/artifactory/my-repository", + "https://repo1.maven.org/maven2/", + "https://mvnrepository.com/artifact", + "https://packages.confluent.io/maven/", +] + +# Any mirrors needed by a particular repo should go here to avoid ballooning the size of the repos that don't use them. 
+_extra_mirrors = { + "maven": [ + ], +} + +def load_deps(): + for repo in repos: + maven_install( + name = repo.name, + artifacts = repo.artifacts, + repositories = _mirrors + _extra_mirrors.get(repo.name, []), + version_conflict_policy = "pinned", + fetch_sources = True, + override_targets = dicts.add(get_override_targets(repo.name), repo.overrides), + duplicate_version_warning = "error", + fail_if_repin_required = True, + resolve_timeout = 5000, + maven_install_json = None, + excluded_artifacts = repo.excluded_artifacts, + ) diff --git a/jvm/repos.bzl b/jvm/repos.bzl new file mode 100644 index 000000000..0ad0c0319 --- /dev/null +++ b/jvm/repos.bzl @@ -0,0 +1,50 @@ +load("@rules_jvm_external//:defs.bzl", "DEFAULT_REPOSITORY_NAME") +load("@rules_jvm_external//:specs.bzl", "json", "maven", "parse") +load("//tools/build_rules:common.bzl", "jar") +load(":defs.bzl", "repo", "versioned_artifacts") + +# repos with artifacts defined in external files +load(":maven_repo.bzl", "maven_repo") +load(":spark_3_1_repo.bzl", "spark_3_1_repo") +load(":spark_3_2_repo.bzl", "spark_3_2_repo") +load(":spark_3_5_repo.bzl", "spark_3_5_repo") + +repos = [ + # The main repos are defined in individual files, which are loaded above and referenced here + maven_repo, # defined in maven_repo.bzl + spark_3_1_repo, + spark_3_2_repo, + spark_3_5_repo, + repo(name = "scala_2.12", artifacts = [ + versioned_artifacts("2.12.18", [ + "org.scala-lang:scala-library", + "org.scala-lang:scala-compiler", + "org.scala-lang:scala-reflect", + ]), + ]), + repo(name = "scala_2.13", artifacts = [ + versioned_artifacts("2.13.12", [ + "org.scala-lang:scala-library", + "org.scala-lang:scala-compiler", + "org.scala-lang:scala-reflect", + ]), + ]), +] + +def get_repo(repo_name): + for repo in repos: + if repo.name == repo_name: + return repo + return None + +def get_override_targets(repo_name = DEFAULT_REPOSITORY_NAME): + if repo_name.startswith("scala_"): + return {} + overrides = { + "log4j:log4j": "@maven//:ch_qos_reload4j_reload4j", + "org.scala-lang:scala-library": "@//third_party/scala:scala-library", + "org.scala-lang:scala-reflect": "@//third_party/scala:reflect", + "org.scala-lang:scala-compiler": "@//third_party/scala:scala-compiler", + } + repo = get_repo(repo_name) or fail("Repo {} does not exist".format(repo_name)) + return overrides diff --git a/jvm/spark_3_1_repo.bzl b/jvm/spark_3_1_repo.bzl new file mode 100644 index 000000000..c56f271c0 --- /dev/null +++ b/jvm/spark_3_1_repo.bzl @@ -0,0 +1,25 @@ +load("@rules_jvm_external//:specs.bzl", "maven") +load(":defs.bzl", "repo", "versioned_artifacts") + +spark_3_1_repo = repo(name = "spark_3_1", provided = True, artifacts = [ + "org.apache.avro:avro:1.10.2", + "org.apache.curator:apache-curator:2.12.0", + "org.apache.datasketches:datasketches-java:2.0.0", + "org.apache.datasketches:datasketches-memory:1.3.0", + "org.apache.hive:hive-exec:3.1.2", + "org.apache.kafka:kafka_2.12:2.6.3", + versioned_artifacts("3.1.1", [ + "org.apache.spark:spark-streaming_2.12", + "org.apache.spark:spark-core_2.12:jar:tests", + "org.apache.spark:spark-hive_2.12", + "org.apache.spark:spark-sql-kafka-0-10_2.12", + "org.apache.spark:spark-sql_2.12", + "org.apache.spark:spark-streaming-kafka-0-10_2.12", + ]), + versioned_artifacts("3.7.0-M5", [ + "org.json4s:json4s-ast_2.12", + "org.json4s:json4s-core_2.12", + "org.json4s:json4s-jackson_2.12", + ]), + "org.apache.hive:hive-metastore:2.3.9", +], excluded_artifacts = ["org.slf4j:slf4j-log4j12"]) diff --git a/jvm/spark_3_2_repo.bzl b/jvm/spark_3_2_repo.bzl 
new file mode 100644 index 000000000..b30b349c4 --- /dev/null +++ b/jvm/spark_3_2_repo.bzl @@ -0,0 +1,46 @@ +load("@rules_jvm_external//:specs.bzl", "maven") +load(":defs.bzl", "repo") + +spark_3_2_repo = repo( + name = "spark_3_2", + provided = True, + vars = { + "spark_version": "3.2.1", + "hadoop_version": "3.3.6", + }, + artifacts = [ + # Spark artifacts - only Scala 2.12 since that's our target + "org.apache.spark:spark-sql_2.12:{spark_version}", + "org.apache.spark:spark-hive_2.12:{spark_version}", + "org.apache.spark:spark-streaming_2.12:{spark_version}", + + # Spark artifacts for Scala 2.13 + "org.apache.spark:spark-sql_2.13:{spark_version}", + "org.apache.spark:spark-hive_2.13:{spark_version}", + "org.apache.spark:spark-streaming_2.13:{spark_version}", + + # Other dependencies + "org.apache.curator:apache-curator:2.12.0", + "com.esotericsoftware:kryo:5.1.1", + "com.yahoo.datasketches:sketches-core:0.13.4", + "com.yahoo.datasketches:memory:0.12.2", + "com.yahoo.datasketches:sketches-hive:0.13.0", + "org.apache.datasketches:datasketches-java:2.0.0", + "org.apache.datasketches:datasketches-memory:1.3.0", + + # Kafka dependencies - only Scala 2.12 + "org.apache.kafka:kafka_2.12:2.6.3", + + # Avro dependencies + "org.apache.avro:avro:1.8.2", + "org.apache.avro:avro-mapred:1.8.2", + "org.apache.hive:hive-metastore:2.3.9", + "org.apache.hive:hive-exec:3.1.2", + + # Monitoring + "io.prometheus.jmx:jmx_prometheus_javaagent:0.20.0", + ], + excluded_artifacts = [ + "org.pentaho:pentaho-aggdesigner-algorithm", + ], +) diff --git a/jvm/spark_3_5_repo.bzl b/jvm/spark_3_5_repo.bzl new file mode 100644 index 000000000..2ff9515b9 --- /dev/null +++ b/jvm/spark_3_5_repo.bzl @@ -0,0 +1,46 @@ +load("@rules_jvm_external//:specs.bzl", "maven") +load(":defs.bzl", "repo") + +spark_3_5_repo = repo( + name = "spark_3_5", + provided = True, + vars = { + "spark_version": "3.5.4", + "hadoop_version": "3.3.6", + }, + artifacts = [ + # Spark artifacts - for scala 2.12 + "org.apache.spark:spark-sql_2.12:{spark_version}", + "org.apache.spark:spark-hive_2.12:{spark_version}", + "org.apache.spark:spark-streaming_2.12:{spark_version}", + + # Spark artifacts for Scala 2.13 + "org.apache.spark:spark-sql_2.13:{spark_version}", + "org.apache.spark:spark-hive_2.13:{spark_version}", + "org.apache.spark:spark-streaming_2.13:{spark_version}", + + # Other dependencies + "org.apache.curator:apache-curator:2.12.0", + "com.esotericsoftware:kryo:5.1.1", + "com.yahoo.datasketches:sketches-core:0.13.4", + "com.yahoo.datasketches:memory:0.12.2", + "com.yahoo.datasketches:sketches-hive:0.13.0", + "org.apache.datasketches:datasketches-java:2.0.0", + "org.apache.datasketches:datasketches-memory:1.3.0", + + # Kafka dependencies - only Scala 2.12 + "org.apache.kafka:kafka_2.12:2.6.3", + + # Avro dependencies + "org.apache.avro:avro:1.8.2", + "org.apache.avro:avro-mapred:1.8.2", + "org.apache.hive:hive-metastore:2.3.9", + "org.apache.hive:hive-exec:3.1.2", + + # Monitoring + "io.prometheus.jmx:jmx_prometheus_javaagent:0.20.0", + ], + excluded_artifacts = [ + "org.pentaho:pentaho-aggdesigner-algorithm", + ], +) diff --git a/online/BUILD.bazel b/online/BUILD.bazel new file mode 100644 index 000000000..3c96c693a --- /dev/null +++ b/online/BUILD.bazel @@ -0,0 +1,122 @@ +load("@io_bazel_rules_scala//scala:scala_cross_version_select.bzl", "select_for_scala_version") + +scala_library( + name = "online", + srcs = glob(["src/main/scala/ai/chronon/online/*.scala"]) + + select_for_scala_version( + between_2_12_and_2_13 = [ + # For Scala 2.12.x + 
"//online/src/main/scala-2.12/ai/chronon/online:ScalaVersionSpecificCatalystHelper.scala", + "//online/src/main/scala-2.12/ai/chronon/online:FutureConverters.scala", + ], + since_2_13 = [ + # For Scala 2.13.x + "//online/src/main/scala-2.13/ai/chronon/online:ScalaVersionSpecificCatalystHelper.scala", + "//online/src/main/scala-2.13/ai/chronon/online:FutureConverters.scala", + ], + ) + glob([ + "src/main/java/ai/chronon/online/*.java", + ]), + visibility = ["//visibility:public"], + deps = [ + "//api:api-models", + "//api:api-lib", + "//third_party/java/spark:spark-exec", + "//aggregator", + maven_artifact("com.esotericsoftware:kryo"), + scala_artifact("org.json4s:json4s-core"), + scala_artifact("org.json4s:json4s-jackson"), + scala_artifact("org.json4s:json4s-ast"), + maven_artifact("com.datadoghq:java-dogstatsd-client"), + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("com.google.code.gson:gson"), + maven_artifact("com.github.ben-manes.caffeine:caffeine"), + maven_artifact("org.codehaus.janino:janino"), + maven_artifact("org.codehaus.janino:commons-compiler"), + maven_artifact("com.google.guava:guava"), + maven_artifact("org.apache.commons:commons-lang3"), + maven_artifact("org.codehaus.jackson:jackson-core-asl"), + ] + select_for_scala_version( + before_2_13 = [ + scala_artifact("org.scala-lang.modules:scala-java8-compat"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + ], + since_2_13 = [ + scala_artifact("org.scala-lang.modules:scala-parallel-collections"), + ], + ), +) + +scala_library( + name = "test-lib", + srcs = glob(["src/test/scala/ai/chronon/online/**/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + "//api:api-models", + "//api:api-lib", + ":online", + "//aggregator", + "//third_party/java/spark:spark-exec", + maven_artifact("com.esotericsoftware:kryo"), + scala_artifact("org.json4s:json4s-core"), + scala_artifact("org.json4s:json4s-jackson"), + scala_artifact("org.json4s:json4s-ast"), + maven_artifact("com.datadoghq:java-dogstatsd-client"), + scala_artifact("org.scala-lang.modules:scala-java8-compat"), + scala_artifact("org.scalatest:scalatest-matchers-core"), + scala_artifact("org.scalatest:scalatest-core"), + maven_artifact("org.scalatest:scalatest-compatible"), + scala_artifact("org.scalatest:scalatest-shouldmatchers"), + scala_artifact("org.scalactic:scalactic"), + scala_artifact("org.scalatestplus:mockito-3-4"), + maven_artifact("org.mockito:mockito-core"), + scala_artifact("org.mockito:mockito-scala"), + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("org.apache.thrift:libthrift"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("com.google.code.gson:gson"), + maven_artifact("com.github.ben-manes.caffeine:caffeine"), + maven_artifact("org.codehaus.janino:janino"), + maven_artifact("org.codehaus.janino:commons-compiler"), + maven_artifact("com.google.guava:guava"), + maven_artifact("org.apache.commons:commons-lang3"), + maven_artifact("org.codehaus.jackson:jackson-core-asl"), + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + ] + select_for_scala_version( + 
since_2_13 = [ + maven_artifact("org.scala-lang.modules:scala-parallel-collections_2.13"), + ], + ), +) + +scala_test_suite( + name = "test", + srcs = glob(["src/test/scala/ai/chronon/online/**/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + ":online", + ":test-lib", + "//aggregator", + "//api:api-lib", + "//api:api-models", + "//third_party/java/spark:spark-exec", + scala_artifact("org.scalatestplus:mockito-3-4"), + maven_artifact("org.mockito:mockito-core"), + maven_artifact("org.apache.thrift:libthrift"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("com.google.code.gson:gson"), + maven_artifact("com.github.ben-manes.caffeine:caffeine"), + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + ], +) diff --git a/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java b/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java index 5610727ba..021c4f82a 100644 --- a/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java +++ b/online/src/main/java/ai/chronon/online/JavaExternalSourceHandler.java @@ -17,7 +17,7 @@ package ai.chronon.online; import scala.collection.Seq; -import scala.compat.java8.FutureConverters; +import ai.chronon.online.FutureConverters; import scala.concurrent.Future; import scala.util.ScalaVersionSpecificCollectionsConverter; diff --git a/online/src/main/java/ai/chronon/online/JavaFetcher.java b/online/src/main/java/ai/chronon/online/JavaFetcher.java index 60d996809..eca72ce72 100644 --- a/online/src/main/java/ai/chronon/online/JavaFetcher.java +++ b/online/src/main/java/ai/chronon/online/JavaFetcher.java @@ -18,11 +18,12 @@ import ai.chronon.online.Fetcher.Request; import ai.chronon.online.Fetcher.Response; +import ai.chronon.online.FutureConverters; + import scala.collection.Iterator; import scala.collection.Seq; import scala.Option; import scala.collection.mutable.ArrayBuffer; -import scala.compat.java8.FutureConverters; import scala.concurrent.Future; import java.util.ArrayList; diff --git a/online/src/main/scala-2.11/ai/chronon/online/FutureConverters.scala b/online/src/main/scala-2.11/ai/chronon/online/FutureConverters.scala new file mode 100644 index 000000000..0547807ff --- /dev/null +++ b/online/src/main/scala-2.11/ai/chronon/online/FutureConverters.scala @@ -0,0 +1,15 @@ +package ai.chronon.online + +import java.util.concurrent.CompletionStage +import scala.concurrent.Future +import scala.compat.java8.{FutureConverters => JFutConv} + +object FutureConverters { + def toJava[T](f: Future[T]): CompletionStage[T] = { + JFutConv.toJava(f) + } + + def toScala[T](cs: CompletionStage[T]): Future[T] = { + JFutConv.toScala(cs) + } +} diff --git a/online/src/main/scala-2.12/ai/chronon/online/BUILD b/online/src/main/scala-2.12/ai/chronon/online/BUILD new file mode 100644 index 000000000..01d13d62b --- /dev/null +++ b/online/src/main/scala-2.12/ai/chronon/online/BUILD @@ -0,0 +1,6 @@ +exports_files( + [ + "ScalaVersionSpecificCatalystHelper.scala", + "FutureConverters.scala" + ], +) diff --git a/online/src/main/scala-2.12/ai/chronon/online/FutureConverters.scala b/online/src/main/scala-2.12/ai/chronon/online/FutureConverters.scala new file mode 100644 index 000000000..0547807ff --- /dev/null +++ b/online/src/main/scala-2.12/ai/chronon/online/FutureConverters.scala @@ -0,0 +1,15 @@ +package ai.chronon.online + +import java.util.concurrent.CompletionStage +import 
scala.concurrent.Future +import scala.compat.java8.{FutureConverters => JFutConv} + +object FutureConverters { + def toJava[T](f: Future[T]): CompletionStage[T] = { + JFutConv.toJava(f) + } + + def toScala[T](cs: CompletionStage[T]): Future[T] = { + JFutConv.toScala(cs) + } +} diff --git a/online/src/main/scala-2.13/ai/chronon/online/BUILD b/online/src/main/scala-2.13/ai/chronon/online/BUILD new file mode 100644 index 000000000..01d13d62b --- /dev/null +++ b/online/src/main/scala-2.13/ai/chronon/online/BUILD @@ -0,0 +1,6 @@ +exports_files( + [ + "ScalaVersionSpecificCatalystHelper.scala", + "FutureConverters.scala" + ], +) diff --git a/online/src/main/scala-2.13/ai/chronon/online/FutureConverters.scala b/online/src/main/scala-2.13/ai/chronon/online/FutureConverters.scala new file mode 100644 index 000000000..a8dca9366 --- /dev/null +++ b/online/src/main/scala-2.13/ai/chronon/online/FutureConverters.scala @@ -0,0 +1,15 @@ +package ai.chronon.online + +import java.util.concurrent.CompletionStage +import scala.concurrent.Future +import scala.jdk.{FutureConverters => JFutConv} + +object FutureConverters { + def toJava[T](f: Future[T]): CompletionStage[T] = { + (new JFutConv.FutureOps(f)).asJava + } + + def toScala[T](cs: CompletionStage[T]): Future[T] = { + (new JFutConv.CompletionStageOps(cs)).asScala + } +} diff --git a/quickstart/mongo-online-impl/BUILD.bazel b/quickstart/mongo-online-impl/BUILD.bazel new file mode 100644 index 000000000..6cc652ba1 --- /dev/null +++ b/quickstart/mongo-online-impl/BUILD.bazel @@ -0,0 +1,25 @@ +scala_library( + name = "mongo-online-impl", + srcs = glob(["src/main/scala/ai/chronon/quickstart/online/*.scala"]), + resources = [ + "src/main/scala/resources/logback.xml", + ], + deps = [ + "//aggregator", + "//api:api-lib", + "//api:api-models", + "//online", + "//third_party/java/spark:spark-exec", + maven_artifact("org.mongodb.scala:mongo-scala-driver_2.12"), + maven_artifact("org.mongodb.scala:mongo-scala-bson_2.12"), + maven_artifact("org.mongodb:mongodb-driver-core"), + maven_artifact("org.mongodb:bson"), + maven_artifact("io.dropwizard.metrics:metrics-core"), + maven_artifact("org.reactivestreams:reactive-streams"), + maven_artifact("org.mongodb:mongodb-driver-reactivestreams"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + ], +) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..2460242d6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +click +thrift==0.13 +pytest \ No newline at end of file diff --git a/requirements_lock.txt b/requirements_lock.txt new file mode 100644 index 000000000..ee0be9f52 --- /dev/null +++ b/requirements_lock.txt @@ -0,0 +1,71 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# bazel run //:pip.update +# +click==8.1.8 \ + --hash=sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2 \ + --hash=sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a + # via -r requirements.txt +exceptiongroup==1.2.2 \ + --hash=sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b \ + --hash=sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc + # via pytest +iniconfig==2.0.0 \ + --hash=sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3 \ + --hash=sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374 
+ # via pytest +packaging==24.2 \ + --hash=sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759 \ + --hash=sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f + # via pytest +pluggy==1.5.0 \ + --hash=sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1 \ + --hash=sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669 + # via pytest +pytest==8.3.4 \ + --hash=sha256:50e16d954148559c9a74109af1eaf0c945ba2d8f30f0a3d3335edde19788b6f6 \ + --hash=sha256:965370d062bce11e73868e0335abac31b4d3de0e82f4007408d242b4f8610761 + # via -r requirements.txt +six==1.17.0 \ + --hash=sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274 \ + --hash=sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81 + # via thrift +thrift==0.13.0 \ + --hash=sha256:9af1c86bf73433afc6010ed376a6c6aca2b54099cc0d61895f640870a9ae7d89 + # via -r requirements.txt +tomli==2.2.1 \ + --hash=sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6 \ + --hash=sha256:02abe224de6ae62c19f090f68da4e27b10af2b93213d36cf44e6e1c5abd19fdd \ + --hash=sha256:286f0ca2ffeeb5b9bd4fcc8d6c330534323ec51b2f52da063b11c502da16f30c \ + --hash=sha256:2d0f2fdd22b02c6d81637a3c95f8cd77f995846af7414c5c4b8d0545afa1bc4b \ + --hash=sha256:33580bccab0338d00994d7f16f4c4ec25b776af3ffaac1ed74e0b3fc95e885a8 \ + --hash=sha256:400e720fe168c0f8521520190686ef8ef033fb19fc493da09779e592861b78c6 \ + --hash=sha256:40741994320b232529c802f8bc86da4e1aa9f413db394617b9a256ae0f9a7f77 \ + --hash=sha256:465af0e0875402f1d226519c9904f37254b3045fc5084697cefb9bdde1ff99ff \ + --hash=sha256:4a8f6e44de52d5e6c657c9fe83b562f5f4256d8ebbfe4ff922c495620a7f6cea \ + --hash=sha256:4e340144ad7ae1533cb897d406382b4b6fede8890a03738ff1683af800d54192 \ + --hash=sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249 \ + --hash=sha256:6972ca9c9cc9f0acaa56a8ca1ff51e7af152a9f87fb64623e31d5c83700080ee \ + --hash=sha256:7fc04e92e1d624a4a63c76474610238576942d6b8950a2d7f908a340494e67e4 \ + --hash=sha256:889f80ef92701b9dbb224e49ec87c645ce5df3fa2cc548664eb8a25e03127a98 \ + --hash=sha256:8d57ca8095a641b8237d5b079147646153d22552f1c637fd3ba7f4b0b29167a8 \ + --hash=sha256:8dd28b3e155b80f4d54beb40a441d366adcfe740969820caf156c019fb5c7ec4 \ + --hash=sha256:9316dc65bed1684c9a98ee68759ceaed29d229e985297003e494aa825ebb0281 \ + --hash=sha256:a198f10c4d1b1375d7687bc25294306e551bf1abfa4eace6650070a5c1ae2744 \ + --hash=sha256:a38aa0308e754b0e3c67e344754dff64999ff9b513e691d0e786265c93583c69 \ + --hash=sha256:a92ef1a44547e894e2a17d24e7557a5e85a9e1d0048b0b5e7541f76c5032cb13 \ + --hash=sha256:ac065718db92ca818f8d6141b5f66369833d4a80a9d74435a268c52bdfa73140 \ + --hash=sha256:b82ebccc8c8a36f2094e969560a1b836758481f3dc360ce9a3277c65f374285e \ + --hash=sha256:c954d2250168d28797dd4e3ac5cf812a406cd5a92674ee4c8f123c889786aa8e \ + --hash=sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc \ + --hash=sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff \ + --hash=sha256:d3f5614314d758649ab2ab3a62d4f2004c825922f9e370b29416484086b264ec \ + --hash=sha256:d920f33822747519673ee656a4b6ac33e382eca9d331c87770faa3eef562aeb2 \ + --hash=sha256:db2b95f9de79181805df90bedc5a5ab4c165e6ec3fe99f970d0e302f384ad222 \ + --hash=sha256:e59e304978767a54663af13c07b3d1af22ddee3bb2fb0618ca1593e4f593a106 \ + --hash=sha256:e85e99945e688e32d5a35c1ff38ed0b3f41f43fad8df0bdf79f72b2ba7bc5272 \ + --hash=sha256:ece47d672db52ac607a3d9599a9d48dcb2f2f735c6c2d1f34130085bb12b112a \ + 
--hash=sha256:f4039b9cbc3048b2416cc57ab3bda989a6fcf9b36cf8937f01a6e731b64f80d7 + # via pytest diff --git a/service/BUILD.bazel b/service/BUILD.bazel new file mode 100644 index 000000000..a41b4ba3b --- /dev/null +++ b/service/BUILD.bazel @@ -0,0 +1,42 @@ +java_library( + name = "service", + srcs = glob(["src/main/java/ai/chronon/service/**/*.java"]), + resources = [ + "src/main/resources/example_config.json", + "src/main/resources/logback.xml", + ], + visibility = ["//visibility:public"], + deps = [ + "//api:api-lib", + "//online", + maven_artifact("com.fasterxml.jackson.core:jackson-annotations"), + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + maven_artifact("org.scala-lang:scala-library"), + maven_artifact("io.micrometer:micrometer-core"), + maven_artifact("io.micrometer:micrometer-registry-statsd"), + maven_artifact("io.vertx:vertx-core"), + maven_artifact("io.vertx:vertx-config"), + maven_artifact("io.vertx:vertx-web"), + maven_artifact("io.vertx:vertx-micrometer-metrics"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + ], +) + +scala_test_suite( + name = "test", + srcs = glob(["src/test/java/ai/chronon/service/handlers/*.java"]), + visibility = ["//visibility:public"], + deps = [ + ":service", + "//online", + scala_artifact("org.scalatestplus:mockito-3-4"), + maven_artifact("org.mockito:mockito-core"), + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + maven_artifact("io.vertx:vertx-unit"), + maven_artifact("io.vertx:vertx-core"), + maven_artifact("io.vertx:vertx-web"), + ], +) diff --git a/spark/BUILD.bazel b/spark/BUILD.bazel new file mode 100644 index 000000000..67159bed4 --- /dev/null +++ b/spark/BUILD.bazel @@ -0,0 +1,114 @@ +load("@io_bazel_rules_scala//scala:scala_cross_version_select.bzl", "select_for_scala_version") + +scala_library( + name = "spark", + srcs = glob(["src/main/scala/ai/chronon/spark/**/*.scala"]) + + select({ + "//tools/flags/spark:spark_3_5": [ + "//spark/src/main/spark-3_5_plus/ai/chronon/spark:EncoderUtil.scala", + ], + "//conditions:default": [ + "//spark/src/main/spark-default/ai/chronon/spark:EncoderUtil.scala", + ], + }), + resources = [ + "src/main/resources/log4j.properties", + ], + deps = [ + "//aggregator", + "//api:api-lib", + "//api:api-models", + "//online", + "//third_party/java/spark:spark-exec", + maven_artifact("com.esotericsoftware:kryo"), + maven_artifact("com.fasterxml.jackson.core:jackson-core"), + maven_artifact("com.fasterxml.jackson.core:jackson-databind"), + scala_artifact("com.fasterxml.jackson.module:jackson-module-scala"), + maven_artifact("com.github.jnr:jnr-ffi"), + maven_artifact("com.google.guava:guava"), + maven_artifact("com.yahoo.datasketches:sketches-core"), + maven_artifact("com.yahoo.datasketches:memory"), + maven_artifact("commons-io:commons-io"), + maven_artifact("commons-lang:commons-lang"), + scala_artifact("io.delta:delta-core"), + maven_artifact("org.apache.kafka:kafka-clients"), + maven_artifact("org.apache.thrift:libthrift"), + scala_artifact("org.json4s:json4s-core"), + scala_artifact("org.json4s:json4s-jackson"), + scala_artifact("org.json4s:json4s-ast"), + scala_artifact("org.rogach:scallop"), + scala_artifact("org.scala-lang.modules:scala-collection-compat"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + maven_artifact("com.google.code.gson:gson"), + ], +) + +scala_library( + name = "test-lib", + srcs = 
glob(["src/test/scala/ai/chronon/spark/test/**/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + "spark", + "//aggregator", + "//aggregator:test-lib", + "//api:api-lib", + "//api:api-models", + "//online", + "//third_party/java/spark:spark-exec", + maven_artifact("com.google.code.gson:gson"), + maven_artifact("org.apache.thrift:libthrift"), + maven_artifact("com.google.guava:guava"), + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + scala_artifact("org.rogach:scallop"), + maven_artifact("commons.io:commons-io"), + maven_artifact("commons.lang:commons-lang"), + scala_artifact("org.scalatest:scalatest-matchers-core"), + scala_artifact("org.scalatest:scalatest-core"), + maven_artifact("org.scalatest:scalatest-compatible"), + scala_artifact("org.scalatest:scalatest-shouldmatchers"), + scala_artifact("org.scalactic:scalactic"), + scala_artifact("org.scala-lang.modules:scala-java8-compat"), + scala_artifact("com.fasterxml.jackson.module:jackson-module-scala"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + scala_artifact("org.scalatestplus:mockito-3-4"), + maven_artifact("org.mockito:mockito-core"), + ], +) + +scala_test_suite( + name = "test", + srcs = glob(["src/test/scala/ai/chronon/spark/test/**/*.scala"]), + visibility = ["//visibility:public"], + deps = [ + ":spark", + ":test-lib", + "//aggregator", + "//aggregator:test-lib", + "//api:api-lib", + "//api:api-models", + "//online", + "//third_party/java/spark:spark-exec", + scala_artifact("org.scala-lang.modules:scala-java8-compat"), + maven_artifact("junit:junit"), + maven_artifact("com.novocode:junit-interface"), + maven_artifact("commons.io:commons-io"), + maven_artifact("com.google.guava:guava"), + scala_artifact("com.fasterxml.jackson.module:jackson-module-scala"), + maven_artifact("com.google.code.gson:gson"), + scala_artifact("org.rogach:scallop"), + scala_artifact("org.scalatestplus:mockito-3-4"), + maven_artifact("org.mockito:mockito-core"), + maven_artifact("org.slf4j:slf4j-api"), + maven_artifact("org.slf4j:slf4j-log4j12"), + ], +) + +jvm_binary( + name = "spark-assembly", + deploy_env = ["//third_party/java/spark:spark"], + main_class = "ai.chronon.spark.Driver", + runtime_deps = [":spark"], +) diff --git a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala index add4dc300..2660cc21f 100644 --- a/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala +++ b/spark/src/main/scala/ai/chronon/spark/streaming/GroupBy.scala @@ -23,17 +23,16 @@ import ai.chronon.api.{Row => _, _} import ai.chronon.online._ import ai.chronon.api.Extensions._ import ai.chronon.online.Extensions.ChrononStructTypeOps -import ai.chronon.spark.GenericRowHandler +import ai.chronon.spark.{EncoderUtil, GenericRowHandler} import com.google.gson.Gson import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery, Trigger} import java.time.format.DateTimeFormatter import java.time.{Instant, ZoneId, ZoneOffset} import java.util.Base64 import scala.collection.JavaConverters._ -import scala.concurrent.duration.{DurationInt} +import scala.concurrent.duration.DurationInt class GroupBy(inputStream: DataFrame, session: SparkSession, @@ -129,7 +128,7 @@ class GroupBy(inputStream: DataFrame, Seq(mutation.after, mutation.before) .filter(_ != null) .map(SparkConversions.toSparkRow(_, streamDecoder.schema, 
GenericRowHandler.func).asInstanceOf[Row]) - }(RowEncoder(streamSchema)) + }(EncoderUtil(streamSchema)) des.createOrReplaceTempView(streamingTable) diff --git a/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala b/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala index c0e01b03c..8631f4dcc 100644 --- a/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala +++ b/spark/src/main/scala/ai/chronon/spark/streaming/JoinSourceRunner.scala @@ -22,10 +22,9 @@ import ai.chronon.api._ import ai.chronon.online.Fetcher.Request import ai.chronon.online.KVStore.PutRequest import ai.chronon.online._ -import ai.chronon.spark.{GenericRowHandler, TableUtils} +import ai.chronon.spark.{GenericRowHandler, TableUtils, EncoderUtil} import com.google.gson.Gson import org.apache.spark.api.java.function.{MapPartitionsFunction, VoidFunction2} -import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger} import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession} @@ -259,7 +258,7 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map Seq(mutation.after, mutation.before) .filter(_ != null) .map(SparkConversions.toSparkRow(_, streamDecoder.schema, GenericRowHandler.func).asInstanceOf[Row]) - }(RowEncoder(streamSchema)) + }(EncoderUtil(streamSchema)) dataStream.copy(df = des) } @@ -342,7 +341,7 @@ class JoinSourceRunner(groupByConf: api.GroupBy, conf: Map[String, String] = Map val schemas = buildSchemas(leftSchema) val joinChrononSchema = SparkConversions.toChrononSchema(schemas.joinSchema) - val joinEncoder: Encoder[Row] = RowEncoder(schemas.joinSchema) + val joinEncoder: Encoder[Row] = EncoderUtil(schemas.joinSchema) val joinFields = schemas.joinSchema.fieldNames val leftColumns = schemas.leftSourceSchema.fieldNames logger.info(s""" diff --git a/spark/src/main/spark-3_5_plus/ai/chronon/spark/BUILD b/spark/src/main/spark-3_5_plus/ai/chronon/spark/BUILD new file mode 100644 index 000000000..c2ef379a2 --- /dev/null +++ b/spark/src/main/spark-3_5_plus/ai/chronon/spark/BUILD @@ -0,0 +1,3 @@ +exports_files( + ["EncoderUtil.scala"], +) diff --git a/spark/src/main/spark-3_5_plus/ai/chronon/spark/EncoderUtil.scala b/spark/src/main/spark-3_5_plus/ai/chronon/spark/EncoderUtil.scala new file mode 100644 index 000000000..e86fc06cf --- /dev/null +++ b/spark/src/main/spark-3_5_plus/ai/chronon/spark/EncoderUtil.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2023 The Chronon Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package ai.chronon.spark + +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Encoder, Encoders, Row} + +import scala.collection.Seq + +object EncoderUtil { + + def apply(structType: StructType): Encoder[Row] = Encoders.row(structType) + +} diff --git a/spark/src/main/spark-default/ai/chronon/spark/BUILD b/spark/src/main/spark-default/ai/chronon/spark/BUILD new file mode 100644 index 000000000..c2ef379a2 --- /dev/null +++ b/spark/src/main/spark-default/ai/chronon/spark/BUILD @@ -0,0 +1,3 @@ +exports_files( + ["EncoderUtil.scala"], +) diff --git a/spark/src/main/spark-default/ai/chronon/spark/EncoderUtil.scala b/spark/src/main/spark-default/ai/chronon/spark/EncoderUtil.scala new file mode 100644 index 000000000..3bff26a49 --- /dev/null +++ b/spark/src/main/spark-default/ai/chronon/spark/EncoderUtil.scala @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2023 The Chronon Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ai.chronon.spark + +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Encoder, Row} + +object EncoderUtil { + + def apply(structType: StructType): Encoder[Row] = RowEncoder(structType) + +} diff --git a/third_party/java/spark/BUILD b/third_party/java/spark/BUILD new file mode 100644 index 000000000..72fa9d661 --- /dev/null +++ b/third_party/java/spark/BUILD @@ -0,0 +1,112 @@ +package(default_visibility = ["//visibility:public"]) + +load("//jvm:defs.bzl", "get_jars_for_repo") + +SPARK_JARS = [ + scala_jar( + name = "spark-core", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-catalyst", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-sql", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-hive", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-sketch", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-streaming", + org = "org.apache.spark", + ), + scala_jar( + name = "spark-tags", + org = "org.apache.spark", + ), + jar( + name = "scala-library", + org = "org.scala-lang", + ), + scala_jar( + name = "spark-unsafe", + org = "org.apache.spark", + ), + jar( + name = "avro", + org = "org.apache.avro", + ), + jar( + name = "hive-metastore", + org = "org.apache.hive", + ), + jar( + name = "hive-exec", + org = "org.apache.hive", + ), + jar( + name = "hadoop-common", + org = "org.apache.hadoop", + ), + jar( + name = "jackson-core", + org = "com.fasterxml.jackson.core", + ), + jar( + name = "jackson-annotations", + org = "com.fasterxml.jackson.core", + ), + jar( + name = "jackson-databind", + org = "com.fasterxml.jackson.core", + ), + jar( + name = "kryo_shaded", + org = "com.esotericsoftware", + ), + scala_jar( + name = "json4s-jackson", + org = "org.json4s", + ), + jar( + name = "commons-lang3", + org = "org.apache.commons", + ), +] + +SPARK_3_5_JARS = SPARK_JARS + [ + scala_jar( + name = "spark-common-utils", + org = 
"org.apache.spark", + ), + scala_jar( + name = "spark-sql-api", + org = "org.apache.spark", + ), +] + +java_library( + name = "spark-exec", + visibility = ["//visibility:public"], + exports = select({ + "//conditions:default": get_jars_for_repo("spark_3_2", SPARK_JARS), + "//tools/flags/spark:spark_3_1": get_jars_for_repo("spark_3_1", SPARK_JARS), + "//tools/flags/spark:spark_3_2": get_jars_for_repo("spark_3_2", SPARK_JARS), + "//tools/flags/spark:spark_3_5": get_jars_for_repo("spark_3_5", SPARK_3_5_JARS), + }), +) + +java_binary( + name = "spark", + main_class = "None", #hack + runtime_deps = [ + "//third_party/java/spark:spark-exec", + ], +) diff --git a/third_party/scala/BUILD b/third_party/scala/BUILD new file mode 100644 index 000000000..3e410b432 --- /dev/null +++ b/third_party/scala/BUILD @@ -0,0 +1,42 @@ +load("@io_bazel_rules_scala_config//:config.bzl", "SCALA_MAJOR_VERSION", "SCALA_VERSION") + +package(default_visibility = ["//visibility:public"]) + +alias( + name = "combinators", + actual = scala_artifact("org.scala-lang.modules:scala-parser-combinators"), +) + +alias( + name = "reflect", + actual = maven_artifact( + "org.scala-lang:scala-reflect", + repository_name = "scala_" + SCALA_MAJOR_VERSION, + ), +) + +alias( + name = "scala-compiler", + actual = maven_artifact( + "org.scala-lang:scala-compiler", + repository_name = "scala_" + SCALA_MAJOR_VERSION, + ), +) + +alias( + name = "scalactic", + actual = scala_artifact("org.scalactic:scalactic"), +) + +alias( + name = "scalatest", + actual = scala_artifact("org.scalatest:scalatest"), +) + +alias( + name = "scala-library", + actual = maven_artifact( + "org.scala-lang:scala-library", + repository_name = "scala_" + SCALA_MAJOR_VERSION, + ), +) diff --git a/tools/BUILD b/tools/BUILD new file mode 100644 index 000000000..e69de29bb diff --git a/tools/build_rules/BUILD b/tools/build_rules/BUILD new file mode 100644 index 000000000..e69de29bb diff --git a/tools/build_rules/common.bzl b/tools/build_rules/common.bzl new file mode 100644 index 000000000..23dcdfe0d --- /dev/null +++ b/tools/build_rules/common.bzl @@ -0,0 +1,18 @@ +load("@io_bazel_rules_scala_config//:config.bzl", "SCALA_MAJOR_VERSION") +load("@rules_jvm_external//:defs.bzl", "artifact") + +def jar(org, name, rev = None, classifier = None): + if rev: + fail("Passing rev is no longer supported in jar() and scala_jar()") + rev = "" + if classifier: + return "{}:{}:jar:{}:{}".format(org, name, classifier, rev) + else: + return "{}:{}:{}".format(org, name, rev) + +def scala_jar(org, name, rev = None, classifier = None): + name = "{}_{}".format(name, SCALA_MAJOR_VERSION) + return jar(org, name, rev, classifier) + +def exclude(org, name): + return "@maven//:" + org + ":" + name diff --git a/tools/build_rules/jvm_binary.bzl b/tools/build_rules/jvm_binary.bzl new file mode 100644 index 000000000..2c4f96991 --- /dev/null +++ b/tools/build_rules/jvm_binary.bzl @@ -0,0 +1,54 @@ +load("@rules_java//java:defs.bzl", "java_binary", "java_library") +load("@io_bazel_rules_scala//scala:scala.bzl", "scala_binary", "scala_library") + +def jvm_binary( + name, + srcs = [], + deps = [], + runtime_deps = [], + services = {}, + tags = None, + main_class = None, + visibility = None, + create_executable = True, + testonly = None, + # All other flags are passed to java_binary + **kwargs): + has_scala_srcs = False + has_java_srcs = False + for src in srcs: + if src.endswith(".scala"): + has_scala_srcs = True + if src.endswith(".java"): + has_java_srcs = True + if has_scala_srcs and has_java_srcs: + 
fail("Cannot have scala and java sources in same jvm_binary") + + lib_name = name + "_lib" + if has_scala_srcs: + scala_library( + name = lib_name, + srcs = srcs, + deps = deps, + runtime_deps = runtime_deps, + tags = tags, + ) + else: + java_library( + name = lib_name, + srcs = srcs, + deps = deps, + runtime_deps = runtime_deps, + tags = tags, + testonly = testonly, + ) + + java_binary( + name = name, + runtime_deps = [lib_name], + tags = tags, + main_class = main_class, + create_executable = create_executable, + testonly = testonly, + **kwargs + ) diff --git a/tools/build_rules/maven_artifact.bzl b/tools/build_rules/maven_artifact.bzl new file mode 100644 index 000000000..b8be9d4be --- /dev/null +++ b/tools/build_rules/maven_artifact.bzl @@ -0,0 +1,42 @@ +load("@rules_jvm_external//:defs.bzl", _rje_artifact = "artifact") +load("@io_bazel_rules_scala_config//:config.bzl", "SCALA_MAJOR_VERSION") + +def _safe_name(coord): + return coord.replace(":", "_").replace(".", "_").replace("-", "_") + +def maven_artifact(coord, repository_name = "maven"): + """ + Helper macro to translate Maven coordinates into Bazel deps. Example: + + java_library( + name = "foo", + srcs = ["Foo.java"], + deps = [maven_artifact("com.google.guava:guava")], + ) + + Arguments: + repository_name: If provided, always fetch from this Maven repo instead of determining + the repo automatically. Be careful when using this as Bazel will not prevent multiple + jars from providing the same class on the classpath, in which case the order of "deps" + will determine which one "wins". + """ + if repository_name: + return _rje_artifact(coord, repository_name = repository_name) + + safe_name = _safe_name(coord) + + if not native.existing_rule(safe_name): + native.java_library( + name = safe_name, + exports = [coord], + visibility = ["//visibility:private"], + tags = ["manual"], + ) + return safe_name + +def scala_artifact(coord, repository_name = "maven"): + """ + Same as "maven_artifact" but appends the current Scala version to the Maven coordinate. + """ + full_coord = coord + "_" + SCALA_MAJOR_VERSION + return maven_artifact(full_coord, repository_name) diff --git a/tools/build_rules/prelude_bazel b/tools/build_rules/prelude_bazel new file mode 100644 index 000000000..eb693d0d1 --- /dev/null +++ b/tools/build_rules/prelude_bazel @@ -0,0 +1,13 @@ +""" + Import the translation layer here. 
+""" + +load( + "//tools/build_rules:common.bzl", + "jar", + "scala_jar") + +load("@rules_java//java:defs.bzl", "java_library","java_binary") +load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library", "scala_binary","scala_test_suite") +load("//tools/build_rules:maven_artifact.bzl", "maven_artifact", "scala_artifact") +load("//tools/build_rules:jvm_binary.bzl", "jvm_binary") diff --git a/tools/build_rules/python/BUILD.bazel b/tools/build_rules/python/BUILD.bazel new file mode 100644 index 000000000..5f5847033 --- /dev/null +++ b/tools/build_rules/python/BUILD.bazel @@ -0,0 +1,4 @@ +exports_files([ + "pytest_wrapper.py", + ".pylintrc", +]) diff --git a/tools/build_rules/python/pytest_suite.bzl b/tools/build_rules/python/pytest_suite.bzl new file mode 100644 index 000000000..ee02d3db8 --- /dev/null +++ b/tools/build_rules/python/pytest_suite.bzl @@ -0,0 +1,24 @@ +load("@rules_python//python:defs.bzl", "py_test") +load("@pypi//:requirements.bzl", "requirement") + +def pytest_suite(name, srcs, deps = [], args = [], data = [], **kwargs): + """ + Call pytest_suite + """ + py_test( + name = name, + srcs = [ + "//tools/build_rules/python:pytest_wrapper.py", + ] + srcs, + main = "//tools/build_rules/python:pytest_wrapper.py", + args = ["--capture=no"] + ["$(location :%s)" % x for x in srcs] + args, + python_version = "PY3", + srcs_version = "PY3", + deps = deps + [ + requirement("pytest"), + # requirement("pytest-black"), + # requirement("pytest-pylint"), + ], + data = data, + **kwargs + ) diff --git a/tools/build_rules/python/pytest_wrapper.py b/tools/build_rules/python/pytest_wrapper.py new file mode 100644 index 000000000..705ba143a --- /dev/null +++ b/tools/build_rules/python/pytest_wrapper.py @@ -0,0 +1,6 @@ +import sys +import pytest + +# if using 'bazel test ...' +if __name__ == "__main__": + sys.exit(pytest.main(sys.argv[1:])) \ No newline at end of file diff --git a/tools/build_rules/thrift/BUILD b/tools/build_rules/thrift/BUILD new file mode 100644 index 000000000..e69de29bb diff --git a/tools/build_rules/thrift/thrift.bzl b/tools/build_rules/thrift/thrift.bzl new file mode 100644 index 000000000..e8ab15b8d --- /dev/null +++ b/tools/build_rules/thrift/thrift.bzl @@ -0,0 +1,151 @@ +load("@rules_python//python:defs.bzl", "py_library") + +def _thrift_java_library_impl(ctx): + thrift_path = ctx.attr.thrift_binary + outputs = [] + commands = [] + + for src in ctx.files.srcs: + # Generate unique output filenames + basename = src.basename.replace(".thrift", "") + output = ctx.actions.declare_file("{}_{}-java.srcjar".format(ctx.attr.name, basename)) + outputs.append(output) + + # Temporary directory for generated Java code + gen_java_dir = "{}-tmp".format(output.path) + + # Command for generating Java from Thrift + command = """ + mkdir -p {gen_java_dir} && \ + {thrift_path} -strict -gen java -out {gen_java_dir} {src} && \ + find {gen_java_dir} -exec touch -t 198001010000 {{}} + && \ + jar cf {gen_java} -C {gen_java_dir} . 
&& \ + rm -rf {gen_java_dir} + """.format( + gen_java_dir = gen_java_dir, + thrift_path = thrift_path, + src = src.path, + gen_java = output.path, + ) + + # Adjust command for Windows if necessary + if ctx.configuration.host_path_separator == ";": + command = command.replace("&&", "&").replace("/", "\\") + + commands.append(command) + + # Combine all commands into a single shell command + combined_command = " && ".join(commands) + + ctx.actions.run_shell( + outputs = outputs, + inputs = ctx.files.srcs, + command = combined_command, + progress_message = "Generating Java code from {} Thrift files".format(len(ctx.files.srcs)), + ) + + return [DefaultInfo(files = depset(outputs))] + +_thrift_java_library = rule( + implementation = _thrift_java_library_impl, + attrs = { + "srcs": attr.label_list( + allow_files = [".thrift"], + mandatory = True, + doc = "List of .thrift source files", + ), + "thrift_binary": attr.string(), + }, +) + +def thrift_java_library(name, srcs, **kwargs): + _thrift_java_library( + name = name, + srcs = srcs, + thrift_binary = select({ + "@platforms//os:macos": "/opt/homebrew/bin/thrift", + "//conditions:default": "/usr/local/bin/thrift", + }), + **kwargs + ) + +def _thrift_python_library_impl(ctx): + thrift_binary = ctx.attr.thrift_binary + all_outputs = [] + commands = [] + + for src in ctx.files.srcs: + # Get base name without .thrift extension + base_name = src.basename.replace(".thrift", "") + + # Convert namespace to directory structure + namespace_dir = ctx.attr.namespace.replace(".", "/") + + # Declare output directory matching the namespace structure + output_dir = "{}/{}".format(namespace_dir, base_name) + main_py = ctx.actions.declare_file("{}/{}.py".format(output_dir, base_name)) + constants_py = ctx.actions.declare_file("{}/constants.py".format(output_dir)) + ttypes_py = ctx.actions.declare_file("{}/ttypes.py".format(output_dir)) + module_init = ctx.actions.declare_file("{}/__init__.py".format(output_dir)) + + file_outputs = [main_py, constants_py, ttypes_py, module_init] + all_outputs.extend(file_outputs) + + # Command to generate files in the correct namespace + command = """ + mkdir -p {output_dir} && \ + {thrift_binary} --gen py:package_prefix={namespace}. 
diff --git a/tools/build_rules/utils.bzl b/tools/build_rules/utils.bzl
new file mode 100644
index 000000000..f6550084b
--- /dev/null
+++ b/tools/build_rules/utils.bzl
@@ -0,0 +1,33 @@
+def map(f, items):
+    return [f(x) for x in items]
+
+def _is_list(x):
+    return type(x) == "list"
+
+def flat_map(f, items):
+    result = []
+    for x in items:
+        fx = f(x)
+        if _is_list(fx):
+            result.extend(fx)
+        else:
+            result.append(fx)
+    return result
+
+def identity(x):
+    return x
+
+def flatten(items, max_depth = 1):
+    """Flattens a list of items by up to max_depth levels of nesting.
+
+    See utils_tests.bzl for examples.
+    Args:
+        items: the list to flatten
+        max_depth: the maximum depth to flatten to
+    Returns:
+        a flattened list of items
+    """
+    result = items
+    for i in range(max_depth):
+        result = flat_map(identity, result)
+    return result
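The helpers above behave as sketched below (values illustrative; Starlark has no lambdas, so flat_map takes a named function):

    load("//tools/build_rules:utils.bzl", "flat_map", "flatten")

    def _with_test(x):
        return [x, x + "_test"]

    # flat_map splices list results into the output, one level deep:
    #   flat_map(_with_test, ["a", "b"]) == ["a", "a_test", "b", "b_test"]

    # flatten peels off up to max_depth levels of nesting:
    #   flatten([["a"], ["b", ["c"]]]) == ["a", "b", ["c"]]
    #   flatten([["a"], ["b", ["c"]]], max_depth = 2) == ["a", "b", "c"]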
diff --git a/tools/flags/BUILD b/tools/flags/BUILD
new file mode 100644
index 000000000..e69de29bb
diff --git a/tools/flags/spark/BUILD b/tools/flags/spark/BUILD
new file mode 100644
index 000000000..f6fd81fe1
--- /dev/null
+++ b/tools/flags/spark/BUILD
@@ -0,0 +1,14 @@
+config_setting(
+    name = "spark_3_1",
+    define_values = {"spark_version": "3.1"},
+)
+
+config_setting(
+    name = "spark_3_2",
+    define_values = {"spark_version": "3.2"},
+)
+
+config_setting(
+    name = "spark_3_5",
+    define_values = {"spark_version": "3.5"},
+)
diff --git a/api/py/tox.ini b/tox.ini
similarity index 68%
rename from api/py/tox.ini
rename to tox.ini
index 54f628e1b..a0437494a 100644
--- a/api/py/tox.ini
+++ b/tox.ini
@@ -4,20 +4,20 @@ envlist = py3
 skipsdist = True
 
 [testenv]
-deps = -rrequirements/dev.txt
+deps = -rapi/py/requirements/dev.txt
 allowlist_externals = rm
-setenv = PYTHONPATH = {toxinidir}:{toxinidir}/test/sample
+setenv = PYTHONPATH = {toxinidir}/api/py:{toxinidir}/api/py/test/sample
 # Run a compile test run.
 commands_pre =
     rm -rf test/sample/production/joins/sample_team
     rm -rf test/sample/production/group_bys/sample_team
     # Materialized files are verified in release script - python-api-build.sh
-    python ai/chronon/repo/compile.py \
-        --chronon_root=test/sample \
+    python api/py/ai/chronon/repo/compile.py \
+        --chronon_root=api/py/test/sample \
         --input_path=joins/sample_team/ \
         --force-overwrite
 commands =
-    pytest {posargs:test/} \
+    pytest {posargs:api/py/test/} \
         --cov=ai/ \
         --cov-report term \
         --cov-report html
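Finally, a sketch of how the spark_version config_settings in tools/flags/spark/BUILD are meant to be consumed; the target and external-repository labels here are hypothetical:

    load("@io_bazel_rules_scala//scala:scala.bzl", "scala_library")

    scala_library(
        name = "spark_job",
        srcs = glob(["src/main/scala/**/*.scala"]),
        deps = select({
            "//tools/flags/spark:spark_3_1": ["@spark_3_1//:spark_libs"],
            "//tools/flags/spark:spark_3_2": ["@spark_3_2//:spark_libs"],
            "//tools/flags/spark:spark_3_5": ["@spark_3_5//:spark_libs"],
            "//conditions:default": ["@spark_3_2//:spark_libs"],  # fallback when no --define is set
        }),
    )

Passing --define=spark_version=3.5 to 'bazel build' then resolves the select() to the 3.5 branch.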