Skip to content

Commit 07c1767

Browse files
Merge pull request #39 from InfluxCommunity/38-write-error-when-org=-isnt-set-for-the-client
Pylint updated. fixed upload file flags
2 parents 6dad255 + 9e3b9ac commit 07c1767

File tree

3 files changed

+166
-42
lines changed

3 files changed

+166
-42
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import influxdb_client_3 as InfluxDBClient3
2+
import pandas as pd
3+
import numpy as np
4+
from influxdb_client_3 import write_client_options, WriteOptions, InfluxDBError, file_parser_options
5+
6+
7+
class BatchingCallback(object):
8+
9+
def success(self, conf, data: str):
10+
print(f"Written batch: {conf}, data: {data}")
11+
12+
def error(self, conf, data: str, exception: InfluxDBError):
13+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
14+
15+
def retry(self, conf, data: str, exception: InfluxDBError):
16+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
17+
18+
callback = BatchingCallback()
19+
20+
write_options = WriteOptions(batch_size=500,
21+
flush_interval=10_000,
22+
jitter_interval=2_000,
23+
retry_interval=5_000,
24+
max_retries=5,
25+
max_retry_delay=30_000,
26+
exponential_base=2)
27+
28+
wco = write_client_options(success_callback=callback.success,
29+
error_callback=callback.error,
30+
retry_callback=callback.retry,
31+
WriteOptions=write_options
32+
)
33+
34+
with InfluxDBClient3.InfluxDBClient3(
35+
token="",
36+
host="eu-central-1-1.aws.cloud2.influxdata.com",
37+
org="6a841c0c08328fb1",
38+
database="python", write_client_options=wco) as client:
39+
40+
41+
fpo = file_parser_options(columns=["time", "machineID", "vibration"])
42+
43+
client.write_file(
44+
file='./out.parquet',
45+
timestamp_column='time', tag_columns=["provider", "machineID"], measurement_name='machine_data', file_parser_options=fpo)
46+

influxdb_client_3/__init__.py

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,42 @@
1-
import json
2-
import urllib.parse
1+
import urllib.parse, json
32
import pyarrow as pa
43
from influxdb_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
54
from influxdb_client.client.write_api import WriteApi as _WriteApi, SYNCHRONOUS, ASYNCHRONOUS, PointSettings
65
from influxdb_client.domain.write_precision import WritePrecision
76
from influxdb_client.client.exceptions import InfluxDBError
87
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
9-
from influxdb_client_3.read_file import upload_file
8+
from influxdb_client_3.read_file import UploadFile
109

1110

1211
def write_client_options(**kwargs):
12+
"""
13+
Function for providing additional arguments for the WriteApi client.
14+
15+
:param kwargs: Additional arguments for the WriteApi client.
16+
:return: dict with the arguments.
17+
"""
1318
return kwargs
1419

1520
def default_client_options(**kwargs):
1621
return kwargs
1722

1823
def flight_client_options(**kwargs):
19-
return kwargs # You can replace this with a specific data structure if needed
24+
"""
25+
Function for providing additional arguments for the FlightClient.
26+
27+
:param kwargs: Additional arguments for the FlightClient.
28+
:return: dict with the arguments.
29+
"""
30+
return kwargs
31+
32+
def file_parser_options(**kwargs):
33+
"""
34+
Function for providing additional arguments for the file parser.
35+
36+
:param kwargs: Additional arguments for the file parser.
37+
:return: dict with the arguments.
38+
"""
39+
return kwargs
2040

2141

2242
class InfluxDBClient3:
@@ -40,13 +60,13 @@ def __init__(
4060
:type database: str
4161
:param token: The authentication token for accessing the InfluxDB server.
4262
:type token: str
43-
:param write_client_options: Options for the WriteAPI.
44-
:type write_client_options: dict
45-
:param flight_client_options: Options for the FlightClient.
46-
:type flight_client_options: dict
63+
:param write_client_options: Function for providing additional arguments for the WriteApi client.
64+
:type write_client_options: callable
65+
:param flight_client_options: Function for providing additional arguments for the FlightClient.
66+
:type flight_client_options: callable
4767
:param kwargs: Additional arguments for the InfluxDB Client.
4868
"""
49-
self._org = org
69+
self._org = org if org is not None else "default"
5070
self._database = database
5171
self._token = token
5272
self._write_client_options = write_client_options if write_client_options is not None else default_client_options(write_options=SYNCHRONOUS)
@@ -72,7 +92,9 @@ def write(self, record=None, database=None ,**kwargs):
7292
Write data to InfluxDB.
7393
7494
:param record: The data point(s) to write.
75-
:type record: Point or list of Point objects
95+
:type record: object or list of objects
96+
:param database: The database to write to. If not provided, uses the database provided during initialization.
97+
:type database: str
7698
:param kwargs: Additional arguments to pass to the write API.
7799
"""
78100
if database is None:
@@ -84,7 +106,7 @@ def write(self, record=None, database=None ,**kwargs):
84106
raise e
85107

86108

87-
def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, **kwargs):
109+
def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_column='time', database=None, file_parser_options=None ,**kwargs):
88110
"""
89111
Write data from a file to InfluxDB.
90112
@@ -96,13 +118,17 @@ def write_file(self, file, measurement_name=None, tag_columns=None, timestamp_co
96118
:type tag_columns: list
97119
:param timestamp_column: Timestamp column name. Defaults to 'time'.
98120
:type timestamp_column: str
121+
:param database: The database to write to. If not provided, uses the database provided during initialization.
122+
:type database: str
123+
:param file_parser_options: Function for providing additional arguments for the file parser.
124+
:type file_parser_options: callable
99125
:param kwargs: Additional arguments to pass to the write API.
100126
"""
101127
if database is None:
102128
database = self._database
103129

104130
try:
105-
table = upload_file(file).load_file()
131+
table = UploadFile(file, file_parser_options).load_file()
106132
df = table.to_pandas() if isinstance(table, pa.Table) else table
107133
self._process_dataframe(df, measurement_name, tag_columns or [], timestamp_column, database=database, **kwargs)
108134
except Exception as e:
@@ -138,10 +164,13 @@ def query(self, query, language="sql", mode="all", database=None, **kwargs ):
138164
139165
:param query: The query string.
140166
:type query: str
141-
:param language: The query language (default is "sql").
167+
:param language: The query language; "sql" or "influxql" (default is "sql").
142168
:type language: str
143169
:param mode: The mode of fetching data (all, pandas, chunk, reader, schema).
144170
:type mode: str
171+
:param database: The database to query from. If not provided, uses the database provided during initialization.
172+
:type database: str
173+
:param kwargs: Additional arguments for the query.
145174
:return: The queried data.
146175
"""
147176
if database is None:
@@ -188,5 +217,6 @@ def __exit__(self, exc_type, exc_val, exc_tb):
188217
"WritePrecision",
189218
"WriteOptions",
190219
"write_client_options",
191-
"flight_client_options"
220+
"flight_client_options",
221+
"file_parser_options"
192222
]

influxdb_client_3/read_file.py

Lines changed: 76 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,99 @@
1-
from pyarrow import json as pa_json
1+
import os
22
import pyarrow.csv as csv
33
import pyarrow.feather as feather
44
import pyarrow.parquet as parquet
5-
import os
5+
import pandas as pd
66

77
# Check if the OS is not Windows
88
if os.name != 'nt':
99
import pyarrow.orc as orc
1010

11-
import pandas as pd
1211

13-
class upload_file:
14-
def __init__(self, file, **kwargs):
12+
class UploadFile:
13+
"""
14+
Class for uploading and reading different types of files.
15+
"""
16+
def __init__(self, file, file_parser_options=None):
17+
"""
18+
Initialize an UploadFile instance.
19+
20+
:param file: The file to upload.
21+
:type file: str
22+
:param kwargs: Additional arguments for file loading functions.
23+
"""
1524
self._file = file
16-
self._kwargs = kwargs
25+
self._kwargs = file_parser_options if file_parser_options is not None else {}
1726

1827
def load_file(self):
19-
if self._file.endswith(".feather"):
20-
return self.load_feather(self._file, **self._kwargs)
21-
elif self._file.endswith(".parquet"):
22-
return self.load_parquet(self._file)
23-
elif self._file.endswith(".csv"):
24-
return self.load_csv(self._file)
25-
elif self._file.endswith(".json"):
26-
return self.load_json(self._file)
27-
elif self._file.endswith(".orc"):
28-
29-
return self.load_orc(self._file)
30-
else:
31-
raise ValueError("Unsupported file type")
32-
33-
def load_feather(self, file):
28+
"""
29+
Load a file based on its extension.
30+
31+
:return: The loaded file.
32+
:raises ValueError: If the file type is not supported.
33+
"""
34+
if self._file.endswith(".feather"):
35+
return self.load_feather(self._file)
36+
elif self._file.endswith(".parquet"):
37+
return self.load_parquet(self._file)
38+
elif self._file.endswith(".csv"):
39+
return self.load_csv(self._file)
40+
elif self._file.endswith(".json"):
41+
return self.load_json(self._file)
42+
elif self._file.endswith(".orc"):
43+
return self.load_orc(self._file)
44+
else:
45+
raise ValueError("Unsupported file type")
46+
47+
def load_feather(self, file ):
48+
"""
49+
Load a Feather file.
50+
51+
:param file: The Feather file to load.
52+
:type file: str
53+
:return: The loaded Feather file.
54+
"""
3455
return feather.read_table(file, **self._kwargs)
35-
56+
3657
def load_parquet(self, file):
58+
"""
59+
Load a Parquet file.
60+
61+
:param file: The Parquet file to load.
62+
:type file: str
63+
:return: The loaded Parquet file.
64+
"""
3765
return parquet.read_table(file, **self._kwargs)
38-
66+
3967
def load_csv(self, file):
68+
"""
69+
Load a CSV file.
70+
71+
:param file: The CSV file to load.
72+
:type file: str
73+
:return: The loaded CSV file.
74+
"""
4075
return csv.read_csv(file, **self._kwargs)
41-
76+
4277
def load_orc(self, file):
78+
"""
79+
Load an ORC file.
80+
81+
:param file: The ORC file to load.
82+
:type file: str
83+
:return: The loaded ORC file.
84+
:raises ValueError: If the OS is Windows.
85+
"""
4386
if os.name == 'nt':
4487
raise ValueError("Unsupported file type for this OS")
4588
else:
4689
return orc.read_table(file, **self._kwargs)
47-
48-
#TODO: Use pyarrow.json.read_json() instead of pandas.read_json()
90+
4991
def load_json(self, file):
50-
return pd.read_json(file, **self._kwargs)
51-
92+
"""
93+
Load a JSON file.
94+
95+
:param file: The JSON file to load.
96+
:type file: str
97+
:return: The loaded JSON file.
98+
"""
99+
return pd.read_json(file, **self._kwargs)

0 commit comments

Comments
 (0)