Merged
179 commits
e295162
add test for streaming from fhir_receiver
imranq2 Jul 31, 2024
c7d5976
add test for streaming from fhir_receiver
imranq2 Jul 31, 2024
6ce4baf
switch to generator
imranq2 Jul 31, 2024
a2af132
update helix.fhir.client.sdk
imranq2 Aug 2, 2024
19fcf1d
update helix.fhir.client.sdk
imranq2 Aug 2, 2024
fe07518
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
9bce79e
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
78753f7
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
96e201f
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
1e76f58
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
b2a3a51
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
644a271
use streaming APIs from helix.fhir.client.sdk
imranq2 Aug 2, 2024
b336496
got test working
imranq2 Aug 2, 2024
a76a24d
got test working
imranq2 Aug 2, 2024
ff6eb43
add comments
imranq2 Aug 2, 2024
a526734
add comments
imranq2 Aug 2, 2024
82d25d8
fix tests
imranq2 Aug 2, 2024
7960fba
added streaming tests
imranq2 Aug 2, 2024
bf85614
added streaming tests
imranq2 Aug 2, 2024
4f9a62a
switch to ndjson test file
imranq2 Aug 2, 2024
ee043a7
switch to ndjson test file
imranq2 Aug 2, 2024
b80961c
switch to ndjson test file
imranq2 Aug 5, 2024
5190b77
update helix.fhir.client.sdk
imranq2 Aug 5, 2024
19366c9
add try catch around merge
imranq2 Aug 5, 2024
a0217f0
add try catch around merge
imranq2 Aug 5, 2024
0e6a534
add try catch around merge
imranq2 Aug 5, 2024
6f2fbee
generate dict properly
imranq2 Aug 5, 2024
9d14ba7
convert issue to string
imranq2 Aug 5, 2024
d3d2432
convert issue to string
imranq2 Aug 6, 2024
4a76bd4
support passing in source_view and schema
imranq2 Aug 6, 2024
839e438
support passing in source_view and schema
imranq2 Aug 6, 2024
2842ba2
change to reading as text
imranq2 Aug 6, 2024
0a0cbce
change to reading as text
imranq2 Aug 6, 2024
ce583df
change to reading as text
imranq2 Aug 6, 2024
41fe1d3
change to reading as text
imranq2 Aug 6, 2024
174dfd9
add check for "desc" in exception
imranq2 Aug 6, 2024
1e67098
return error in ElasticSearchResult
imranq2 Aug 6, 2024
09466fa
return error in ElasticSearchResult
imranq2 Aug 6, 2024
b35b45a
add address_standardizer
imranq2 Aug 6, 2024
00bf547
update package
imranq2 Aug 6, 2024
acf6202
fix tests
imranq2 Aug 6, 2024
876cf7f
fix tests
imranq2 Aug 6, 2024
b697556
refactor address_standardization.py
imranq2 Aug 6, 2024
c0ac3a9
refactor address_standardization.py
imranq2 Aug 6, 2024
28f0828
refactor address_standardization.py
imranq2 Aug 6, 2024
a8339b4
refactor address_standardization.py
imranq2 Aug 6, 2024
e6f558c
add dynamic_class_loader.py
imranq2 Aug 6, 2024
f4a757e
add dynamic_class_loader.py
imranq2 Aug 6, 2024
2381d7a
add dynamic_class_loader.py
imranq2 Aug 6, 2024
c314440
add dynamic_class_loader.py
imranq2 Aug 6, 2024
a728436
change to StdAddress
imranq2 Aug 6, 2024
034ccac
change to StdAddress
imranq2 Aug 6, 2024
438c092
change to StdAddress
imranq2 Aug 6, 2024
c9c7e00
refactor address_standardization.py
imranq2 Aug 6, 2024
2941240
revert to previous address standardized
imranq2 Aug 6, 2024
b200b8a
add test for census standardize
imranq2 Aug 6, 2024
2311749
use bulk api
imranq2 Aug 6, 2024
4229ba1
fixes from testing in databricks
imranq2 Aug 7, 2024
5151fd2
fixes from testing in databricks
imranq2 Aug 7, 2024
cc51038
fixes from testing in databricks
imranq2 Aug 7, 2024
0e0c78b
fixes from testing in databricks
imranq2 Aug 7, 2024
0fd778b
add useaddress package
imranq2 Aug 7, 2024
0a90781
parse address using usaddress
imranq2 Aug 7, 2024
79ca1e6
use AsyncPandasColumnUDF
imranq2 Aug 7, 2024
8c11538
create v2 of helix.geolocation
imranq2 Aug 7, 2024
4b53c70
convert census_standardizing_vendor.py to use aiohttp and async
imranq2 Aug 7, 2024
287d72d
convert census_standardizing_vendor.py to use aiohttp and async
imranq2 Aug 7, 2024
2faebbc
convert melissa_standardizing_vendor.py to use aiohttp and async
imranq2 Aug 7, 2024
9eaef0f
add async_pandas_dataframe_udf.py
imranq2 Aug 7, 2024
a162fb3
convert fhir_receiver to use async_pandas_dataframe_udf.py
imranq2 Aug 7, 2024
016568a
convert fhir_sender to use async_pandas_dataframe_udf.py
imranq2 Aug 7, 2024
1eace1f
add back error checking
imranq2 Aug 7, 2024
aad6586
add back error checking
imranq2 Aug 7, 2024
9fcb08b
convert fhir_receiver_processor.py to async
imranq2 Aug 7, 2024
fd29060
convert fhir_receiver_processor.py to async
imranq2 Aug 7, 2024
a8c8232
use AsyncHelper.run_in_event_loop
imranq2 Aug 7, 2024
430c40f
fix elasticsearch_processor test
imranq2 Aug 7, 2024
c1fda65
fix fhir_receiver_processor.py
imranq2 Aug 7, 2024
86499c6
fix fhir_sender_processor.py
imranq2 Aug 7, 2024
3b8c8bf
fix fhir_sender_processor.py
imranq2 Aug 7, 2024
c99b6b8
fix infinite loop
imranq2 Aug 7, 2024
b0ea1f5
fix infinite loop
imranq2 Aug 7, 2024
f08fd0f
remove census_standardizing_vendor.py from v1
imranq2 Aug 7, 2024
9372892
add fhir_helpers_async
imranq2 Aug 7, 2024
4b933c7
refactor FhirMergeResponseItem
imranq2 Aug 7, 2024
49c38a4
refactor FhirMergeResponseItem
imranq2 Aug 7, 2024
29c189e
refactor FhirMergeResponseItem
imranq2 Aug 7, 2024
5901925
fix test
imranq2 Aug 7, 2024
cea07f8
add aiohttp package
imranq2 Aug 7, 2024
7fb7bdd
pass well_known_url
imranq2 Aug 7, 2024
1ccc86c
get access token async
imranq2 Aug 7, 2024
c4e3d2a
remove print statements
imranq2 Aug 7, 2024
320322b
add tests using real fhir server
imranq2 Aug 8, 2024
9cb7dc9
add tests using real fhir server
imranq2 Aug 8, 2024
b5a6ee8
add tests using real fhir server
imranq2 Aug 8, 2024
1fe199d
use transform_async
imranq2 Aug 8, 2024
ec8f875
add transform_async to framework_transformer.py
imranq2 Aug 8, 2024
a083093
rename AsyncHelper.run()
imranq2 Aug 8, 2024
e75bfac
change elasticsearch_sender.py to use async
imranq2 Aug 8, 2024
d8f7a18
change elasticsearch_sender.py to use async
imranq2 Aug 8, 2024
c32fc92
try another technique for waiting on async function
imranq2 Aug 8, 2024
58a579e
get a working solution for nested async
imranq2 Aug 8, 2024
4cfceff
get a working solution for nested async
imranq2 Aug 8, 2024
7ccc3bd
add better error message in AsyncHelper
imranq2 Aug 8, 2024
d064e25
use async in fhir_sender
imranq2 Aug 8, 2024
5213b54
update github runners
imranq2 Aug 8, 2024
8f32175
add print statements in test
imranq2 Aug 8, 2024
2a613ca
update docker-compose.yml
imranq2 Aug 8, 2024
a2889ba
update packages
imranq2 Aug 8, 2024
431c322
use ES async api
imranq2 Aug 8, 2024
96a37fd
minor
imranq2 Aug 8, 2024
5995b20
make vendor_specific_to_std be an abstract method
imranq2 Aug 8, 2024
36263e0
make other methods abstract
imranq2 Aug 8, 2024
b41a46e
rework address_standardization.py to use generic types
imranq2 Aug 8, 2024
ea56fc2
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
9d12147
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
c9c7014
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
e584895
rework address_standardization.py to use generic types
imranq2 Aug 9, 2024
cb0d892
fix data_class_loader.py
imranq2 Aug 9, 2024
0e1fc6e
switch to Pydantic
imranq2 Aug 9, 2024
72c2f78
switch to Pydantic
imranq2 Aug 9, 2024
3f329f6
fixes after switch to Pydantic
imranq2 Aug 9, 2024
e66f283
fixes after switch to Pydantic
imranq2 Aug 9, 2024
6c29d44
fix test
imranq2 Aug 9, 2024
72e52aa
fix test
imranq2 Aug 9, 2024
848b130
fix test
imranq2 Aug 9, 2024
8b4ceac
fix test
imranq2 Aug 9, 2024
bc0e89f
use https
imranq2 Aug 9, 2024
4062a6e
fix graph test
imranq2 Aug 9, 2024
60b6899
fix graph test
imranq2 Aug 9, 2024
bcf95d0
create GetBatchError for errors in batch
imranq2 Aug 9, 2024
f5000bd
segt use_data_streaming to be the default in fhir receiver v2
imranq2 Aug 9, 2024
35ff3da
separate error and non error resources
imranq2 Aug 9, 2024
276ae0f
separate error and non error resources
imranq2 Aug 9, 2024
ff0bee2
separate error and non error resources
imranq2 Aug 9, 2024
0546224
separate error and non error resources
imranq2 Aug 9, 2024
5f86a80
separate error and non error resources
imranq2 Aug 9, 2024
62a1d39
fix test
imranq2 Aug 9, 2024
4b635bb
fix test
imranq2 Aug 10, 2024
20226dd
update fhir client sdk
imranq2 Aug 10, 2024
dec23d0
add test for pandas dataframe udf
imranq2 Aug 10, 2024
1863395
add test for pandas dataframe udf
imranq2 Aug 10, 2024
3ace211
add test for pandas dataframe udf
imranq2 Aug 10, 2024
2886947
add list of ids
imranq2 Aug 10, 2024
425c27e
add list of ids
imranq2 Aug 10, 2024
5af1f36
add list of ids
imranq2 Aug 10, 2024
f83624a
add list of ids
imranq2 Aug 10, 2024
24809a5
add list of ids
imranq2 Aug 11, 2024
cfcfa66
pass partition and chunk index to panda functions
imranq2 Aug 11, 2024
b1e5bff
pass partition and chunk index to panda functions
imranq2 Aug 11, 2024
3687ff3
rename HandlePandasBatchFunction
imranq2 Aug 11, 2024
68d77c2
fix chunk_index
imranq2 Aug 11, 2024
fa8d432
print range
imranq2 Aug 11, 2024
ba6ee78
fix batches of size
imranq2 Aug 11, 2024
a79b874
create a base class for async_pandas_dataframe_udf.py
imranq2 Aug 11, 2024
b164d82
create a base class for async_pandas_dataframe_udf.py
imranq2 Aug 11, 2024
1884ba9
create a base class for async_pandas_dataframe_udf.py
imranq2 Aug 11, 2024
8b0e24e
fix column test
imranq2 Aug 11, 2024
cc84e46
rename column class
imranq2 Aug 11, 2024
bd1aa79
make return type of abstract base class generic
imranq2 Aug 11, 2024
737b837
add type aliases
imranq2 Aug 11, 2024
ef5ffae
add type aliases
imranq2 Aug 11, 2024
f1bcda8
add type aliases
imranq2 Aug 11, 2024
9bcc7cd
add type aliases
imranq2 Aug 11, 2024
467d7dc
fix test
imranq2 Aug 11, 2024
b09fd34
if LOGLEVEL is set to DEBUG then write out partitions as we process them
imranq2 Aug 11, 2024
f5f78ff
show hostname when available
imranq2 Aug 12, 2024
d119e91
pass timeout for elastic search calls
imranq2 Aug 12, 2024
a80cd49
pass timeout for elastic search calls
imranq2 Aug 12, 2024
fa38794
add documentation
imranq2 Aug 12, 2024
4796568
share code for repartition into SparkPartitionHelper
imranq2 Aug 12, 2024
0dde707
add repartition code to address_standardization.py
imranq2 Aug 12, 2024
2af2a2d
add v2 of framework_partitioner.py
imranq2 Aug 12, 2024
86d3264
remove partitioning from fhir_sender, fhir_receiver and elasticsearch…
imranq2 Aug 12, 2024
f418cb0
remove partitioning from fhir_sender, fhir_receiver and elasticsearch…
imranq2 Aug 12, 2024
58a2d95
fix warning about ClosedConnection
imranq2 Aug 12, 2024
ccebb9a
fix warning about ClosedConnection
imranq2 Aug 12, 2024
94a3894
add calculate_automatically mode to framework_partitioner.py
imranq2 Aug 12, 2024
a942070
fix test
imranq2 Aug 12, 2024
25 changes: 25 additions & 0 deletions .dockerignore
@@ -0,0 +1,25 @@
.DS_Store
.github
.idea
.mypy_cache
.pytest_cache
ADR
notebooks
tmp
docsrc
docs
spf_tests
tests
*/test/*
*.yml
*.md
metastore_db

.git
.svn
.cvs
.hg
!README*.md

.env
spark-events
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -15,8 +15,8 @@ jobs:

steps:
# Checks-out your repository
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.10'

4 changes: 2 additions & 2 deletions .github/workflows/python-publish.yml
@@ -13,9 +13,9 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Install dependencies
3 changes: 2 additions & 1 deletion Makefile
@@ -56,7 +56,8 @@ run-pre-commit: setup-pre-commit
update: down Pipfile.lock setup-pre-commit ## Updates all the packages using Pipfile
docker compose run --rm --name spf_pipenv dev pipenv sync --dev && \
make devdocker && \
make pipenv-setup
make pipenv-setup && \
make up

.PHONY:tests
tests:
24 changes: 17 additions & 7 deletions Pipfile
@@ -43,9 +43,9 @@ bounded-pool-executor = ">=0.0.3"
# fastjsonschema is needed for validating JSON
fastjsonschema= ">=2.18.0"
# helix.fhir.client.sdk is needed for interacting with FHIR servers
"helix.fhir.client.sdk" = ">=2.0.8"
"helix.fhir.client.sdk" = ">=2.0.16"
# opensearch-py is needed for interacting with OpenSearch
opensearch-py= ">=1.1.0"
opensearch-py= { extras = ['async'], version = ">=2.6.0" }
# pyathena is needed for interacting with Athena in AWS
pyathena = ">2.14.0"
# spark-nlp is needed for natural language processing
@@ -60,6 +60,14 @@ pandas = ">=2.2.2"
numexpr = ">=2.8.4"
# bottleneck is needed for working with numerical data. pandas requires this minimum version.
bottleneck = ">=1.3.6"
# structlog is needed for structured logging
structlog = ">=23.1.0"
# usaddress is needed for parsing street addresses
"usaddress"=">=0.5.10" # for parsing street addresses
# usaddress-scourgify is needed for normalizing addresses
"usaddress-scourgify"=">=0.6.0" # for normalizing addresses
# aiohttp is needed for making HTTP requests asynchronously
aiohttp = ">=3"

[dev-packages]
# setuptools is needed for building the package
@@ -73,11 +81,11 @@ pre-commit=">=3.7.1"
# autoflake is needed for removing unused imports
autoflake=">=2.3.1"
# mypy is needed for type checking
mypy = ">=1.10.1"
mypy = ">=1.11.1"
# pytest is needed for running tests
pytest = ">=8.2.2"
pytest = ">=8.3.2"
# black is needed for formatting code
black = ">=24.4.2"
black = ">=24.8.0"
# pygments is needed for syntax highlighting
pygments=">=2.8.1" # not directly required, pinned by Snyk to avoid a vulnerability
# Sphinx is needed for generating documentation
@@ -101,13 +109,13 @@ sparkdataframecomparer = ">=2.0.2"
# pytest-ayncio is needed for running async tests
pytest-asyncio = ">=0.23.8"
# helix-mockserver-client is needed for mocking servers
helix-mockserver-client=">=1.2.1"
helix-mockserver-client=">=1.3.0"
# sparkfhirschemas is needed for FHIR schemas
sparkfhirschemas = ">=1.0.17"
# types-boto3 is needed for type hints for boto3
types-boto3 = ">=1.0.2"
# moto is needed for mocking AWS services
moto = { extras = ['all'], version = ">=5.0.11" }
moto = { extras = ['all'], version = ">=5.0.12" }
# types-requests is needed for type hints for requests
types-requests=">=2.31.0"
# types-PyMySQL is needed for type hints for PyMySQL
@@ -120,6 +128,8 @@ types-python-dateutil=">=2.8.19.14"
pandas-stubs=">=2.2.2"
# types-pytz is needed for type hints for pytz
types-pytz=">=2024.1.0"
# pydantic is needed for data class loading
pydantic=">=2.8.2"

# These dependencies are required for pipenv-setup. They conflict with ones above, so we install these
# only when running pipenv-setup
1,194 changes: 670 additions & 524 deletions Pipfile.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions README.md
@@ -252,5 +252,9 @@ This will install Java, Scala, Spark and other packages

# Publishing a new package
1. Create a new release
3. The GitHub Action should automatically kick in and publish the package
4. You can see the status in the Actions tab
2. The GitHub Action should automatically kick in and publish the package
3. You can see the status in the Actions tab


# Asynchronous Processing
[Asynchronous Processing](asynchronous.md)
231 changes: 231 additions & 0 deletions asynchronous.md
@@ -0,0 +1,231 @@
# Asynchronous Processing

## Introduction
Asynchronous processing is a form of parallel processing that allows a system to handle multiple requests at the same time. This is particularly useful when a system needs to wait on other systems to respond, such as reading from a file, accessing a web server (e.g. FHIR or Elasticsearch) or making a network request.

Instead of blocking the current process, we can "await" the response from the other system and continue processing other requests.

This is the `async` and `await` pattern in Python (and other programming languages): functions are defined with the `async` keyword, and we `await` calls to external systems.
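
As a minimal, framework-agnostic sketch, an async function declares itself with `async def` and `await`s a slow operation (simulated here with `asyncio.sleep`) instead of blocking:

```python
import asyncio


async def fetch_resource(resource_id: str) -> str:
    # simulate waiting on an external system (e.g. an HTTP call to a FHIR server)
    await asyncio.sleep(1)
    return f"resource-{resource_id}"


async def main() -> None:
    # while this call is waiting, the event loop is free to run other work
    result = await fetch_resource("patient-1")
    print(result)


asyncio.run(main())
```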

## How to use Asynchronous Processing with Spark
Apache Spark is a distributed computing system that is designed to process large datasets quickly.
Each partition of a data frame can be processed in parallel on different nodes in a cluster.

However, if we are not using async calls, each worker node can get blocked waiting for the external system to respond to the call we make for that partition.

Instead, we want each worker to make multiple async calls at the same time; once all of those calls have returned, the partition is finished and the worker node can move on to a different partition.
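
The sketch below (illustrative only, not the framework's internal code) shows this idea for one chunk of rows: the calls are started together and awaited as a group, so the worker waits once for the slowest response rather than once per row.

```python
import asyncio
from typing import Any, Dict, List


async def call_external_system(row: Dict[str, Any]) -> Dict[str, Any]:
    # placeholder for an async call to an external service
    await asyncio.sleep(0.1)
    return {"id": row["id"], "status": "ok"}


async def process_chunk(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # start one call per row and wait for all of them together
    return list(await asyncio.gather(*(call_external_system(row) for row in rows)))


print(asyncio.run(process_chunk([{"id": 1}, {"id": 2}, {"id": 3}])))
```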


## Using async and await with Spark
In this framework, we provide a few helper classes that make it easy to use async functions with Spark data frames.
These classes can be used with Spark's `mapInPandas()` function or as Pandas UDFs (User Defined Functions) in the `withColumn()` or `select()` functions.

### Case 1: When you want to read a whole data frame and return a different data frame
Examples of this include reading FHIR resources from a source data frame, sending them to the FHIR server, and returning a data frame that contains the responses from the FHIR server.
In this case your goal is to create a new data frame that has the same number of rows as the input data frame.

For this case, you can use the `AsyncPandasDataFrameUDF` class like so:
```python
from spark_pipeline_framework.utilities.async_pandas_udf.v1.async_pandas_dataframe_udf import (
AsyncPandasDataFrameUDF,
)
from pyspark.sql import SparkSession, DataFrame
from spark_pipeline_framework.utilities.async_pandas_udf.v1.function_types import (
HandlePandasDataFrameBatchFunction,
)
from typing import (
List,
Dict,
Any,
Optional,
AsyncGenerator,
cast,
Iterable,
Tuple,
Generator,
)
import dataclasses
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

@dataclasses.dataclass
class MyParameters:
log_level: str



def my_code():
async def test_async(
*,
partition_index: int,
chunk_index: int,
chunk_input_range: range,
input_values: List[Dict[str, Any]],
parameters: Optional[MyParameters],
) -> AsyncGenerator[Dict[str, Any], None]:
# your async code here
# yield a dict for each row in the input_values list
yield {
"name": "test"
}

output_schema = StructType(
[
StructField("name", StringType(), True),
]
)
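    # source_df is assumed to be an existing input DataFrame (its creation is not shown here)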
result_df: DataFrame = source_df.mapInPandas(
AsyncPandasDataFrameUDF(
parameters=MyParameters(log_level="DEBUG"),
async_func=cast(
HandlePandasDataFrameBatchFunction[MyParameters], test_async
),
batch_size=2,
).get_pandas_udf(),
schema=output_schema,
)
```

### Case 2: When you want to read a single column (or set of columns) and want to append (or replace) a column in the same dataframe
Examples of this include reading the raw_address column of every row, calling a service to standardize the address, and then adding the standardized address as a new column called standardized_address.

In this case, your goal is to add columns to an existing data frame and not replace the whole dataframe.

There are four kinds of column transformations:
1. Struct column to struct column. Use `AsyncPandasStructColumnToStructColumnUDF` class.
2. Struct column to scalar column. Use `AsyncPandasStructColumnToScalarColumnUDF` class.
3. Scalar column to struct column. Use `AsyncPandasScalarColumnToStructColumnUDF` class.
4. Scalar column to scalar column. Use `AsyncPandasScalarColumnToScalarColumnUDF` class.

For example, for a scalar column to scalar column transformation, you can use the `AsyncPandasScalarColumnToScalarColumnUDF` class like so:
```python
from spark_pipeline_framework.utilities.async_pandas_udf.v1.async_pandas_scalar_column_to_scalar_udf import (
AsyncPandasScalarColumnToScalarColumnUDF,
)
from pyspark.sql import SparkSession, DataFrame
from spark_pipeline_framework.utilities.async_pandas_udf.v1.function_types import (
HandlePandasScalarToScalarBatchFunction,
)
from typing import (
List,
Dict,
Any,
Optional,
AsyncGenerator,
cast,
)
import dataclasses
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

@dataclasses.dataclass
class MyParameters:
log_level: str

def my_code():
async def test_async(
*,
partition_index: int,
chunk_index: int,
chunk_input_range: range,
input_values: List[Dict[str, Any]],
parameters: Optional[MyParameters],
) -> AsyncGenerator[Dict[str, Any], None]:
# your async code here
# yield a dict for each row in the input_values list
yield {}

output_schema = StructType(
[
# your output schema here
]
)
source_df: DataFrame = spark.createDataFrame(
[
# your data here
],
schema=StructType(
[
StructField("raw_address", StringType(), True),
]
),
)
    result_df: DataFrame = source_df.withColumn(
        colName="standardized_address",
        col=AsyncPandasScalarColumnToScalarColumnUDF(
            async_func=cast(
                HandlePandasScalarToScalarBatchFunction[MyParameters],
                test_async,
            ),
            parameters=MyParameters(log_level="DEBUG"),
            batch_size=2,
        ).get_pandas_udf(return_type=StringType())(source_df["raw_address"]),
    )
```

# Logging
The `partition_index` and `chunk_index` are passed to the async function so that you can include them in your log messages; this is useful for debugging. The log level is passed in via the parameters object so that you can control how verbose the logging is.

In addition, you can use the `SparkPartitionInformation` class to get additional information about the partition, such as the number of rows in the partition and the partition index.

```python
from spark_pipeline_framework.utilities.spark_partition_information.v1.spark_partition_information import (
SparkPartitionInformation,
)
from typing import (
List,
Dict,
Any,
Optional,
AsyncGenerator,
cast,
Iterable,
Tuple,
Generator,
)

async def test_async(
*,
partition_index: int,
chunk_index: int,
chunk_input_range: range,
input_values: List[Dict[str, Any]],
parameters: Optional[MyParameters],
) -> AsyncGenerator[Dict[str, Any], None]:
spark_partition_information: SparkPartitionInformation = (
SparkPartitionInformation.from_current_task_context(
chunk_index=chunk_index,
)
)
print(str(spark_partition_information))
# your async code here
# yield a dict for each row in the input_values list
yield {}


```

# Memory usage
Typically, when you transform a data frame in Python code, Spark has to serialize the data frame and send it to the Python worker, and then serialize the result and send it back to the JVM.

This can be memory intensive.

However, this framework uses Apache Arrow (a columnar in-memory format), so data is shared between the JVM and Python without the need to serialize and deserialize it on each side.
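
If individual rows are large, you can also cap how many rows go into each Arrow batch exchanged with the Python worker. A small sketch using Spark's standard setting (the value shown is arbitrary):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# limit the number of rows per Arrow record batch sent to the Python worker
# (Spark's default is 10000; lower it if rows are large)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
```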

# Use asyncio drivers for external systems
To make this work, you need to use an async driver for the external system you are calling. Synchronous drivers will block the worker node and prevent it from making other async calls.

For example, for HTTP calls you can use the `aiohttp` library, and for Elasticsearch you can use the `opensearch-py` library (installed with its async extras).

You can find a list of async drivers here: https://github.com/timofurrer/awesome-asyncio
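
As a small sketch (the URL is a placeholder), an async HTTP call with `aiohttp` looks like this:

```python
import asyncio

import aiohttp


async def fetch(url: str) -> str:
    # both the session and the request are awaited, so the worker is never blocked
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


print(asyncio.run(fetch("https://example.com/fhir/metadata")))
```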

# Controlling how large a batch to send to the async function
Set the `batch_size` parameter in the `AsyncPandasDataFrameUDF` or `AsyncPandasScalarColumnToScalarColumnUDF` class to control how many rows to send to the async function at a time.

The partition will be divided into chunks of `batch_size` rows, and the async function will be invoked concurrently for each chunk.

Hence you should make each partition as large as can fit in memory and then let the framework divide it into batches of `batch_size` rows that are sent to the async function at one time.

This will be most efficient since you'll have multiple async calls (one for each batch) waiting in each partition.
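
As a rough illustration (the numbers are made up), the number of concurrent async calls per partition is simply the number of `batch_size`-sized chunks:

```python
import math

partition_rows = 10_000  # rows in one Spark partition (example figure)
batch_size = 100         # rows handed to the async function per call

# each chunk becomes one concurrent async call on the worker processing this partition
num_concurrent_calls = math.ceil(partition_rows / batch_size)
print(num_concurrent_calls)  # 100
```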


# Summary
By using these classes you can easily use async functions with Spark data frames. This can be useful when you need to perform tasks that are time-consuming, such as reading from a file, accessing a web server (e.g. FHIR or Elasticsearch) or making a network request. By using asynchronous processing, a system can handle multiple requests at the same time, which can improve performance and reduce latency.