@@ -277,7 +277,7 @@ def _get_columns_schema_column(self, column):
             for field in column["fields"]:
                 columns.append("{}.{}".format(column["name"], field["name"]))
         else:
-            columns.append(column["name"])
+            columns.append({"name": column["name"], "type": column["type"]})
 
         return columns
 
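A quick illustration of the shape change in this hunk: scalar columns now come back as `{"name", "type"}` dicts, while the untouched branch above still flattens nested fields to dotted-path strings. A minimal standalone sketch; the `RECORD` guard and the fixture columns are assumptions based on the hunk's context, not taken from the file:

```python
# Hypothetical standalone version of the patched helper, for illustration only.
def get_columns_schema_column(column):
    columns = []
    if column["type"] == "RECORD":  # assumed guard, implied by the hunk's context
        # Unchanged branch: nested fields are still flattened to "parent.child" strings.
        for field in column["fields"]:
            columns.append("{}.{}".format(column["name"], field["name"]))
    else:
        # Patched branch: scalar columns now carry their type alongside the name.
        columns.append({"name": column["name"], "type": column["type"]})
    return columns

print(get_columns_schema_column({"name": "user_id", "type": "INTEGER"}))
# -> [{'name': 'user_id', 'type': 'INTEGER'}]
print(get_columns_schema_column(
    {"name": "payload", "type": "RECORD", "fields": [{"name": "ip", "type": "STRING"}]}
))
# -> ['payload.ip']
```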
@@ -299,35 +299,44 @@ def _get_project_datasets(self, project_id):
     def get_schema(self, get_stats=False):
         if not self.configuration.get("loadSchema", False):
             return []
-
+        service = self._get_bigquery_service()
         project_id = self._get_project_id()
         datasets = self._get_project_datasets(project_id)
-
-        query_base = """
-        SELECT table_schema, table_name, field_path, data_type
-        FROM `{dataset_id}`.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
-        WHERE table_schema NOT IN ('information_schema')
-        """
-
-        schema = {}
-        queries = []
+        schema = []
         for dataset in datasets:
             dataset_id = dataset["datasetReference"]["datasetId"]
-            query = query_base.format(dataset_id=dataset_id)
-            queries.append(query)
+            tables = (
+                service.tables()
+                .list(projectId=project_id, datasetId=dataset_id)
+                .execute()
+            )
+            while True:
+                for table in tables.get("tables", []):
+                    table_data = (
+                        service.tables()
+                        .get(
+                            projectId=project_id,
+                            datasetId=dataset_id,
+                            tableId=table["tableReference"]["tableId"],
+                        )
+                        .execute()
+                    )
+                    table_schema = self._get_columns_schema(table_data)
+                    schema.append(table_schema)
 
-        query = "\nUNION ALL\n".join(queries)
-        results, error = self.run_query(query, None)
-        if error is not None:
-            self._handle_run_query_error(error)
+                next_token = tables.get("nextPageToken", None)
+                if next_token is None:
+                    break
 
-        for row in results["rows"]:
-            table_name = "{0}.{1}".format(row["table_schema"], row["table_name"])
-            if table_name not in schema:
-                schema[table_name] = {"name": table_name, "columns": []}
-            schema[table_name]["columns"].append({"name": row["field_path"], "type": row["data_type"]})
+                tables = (
+                    service.tables()
+                    .list(
+                        projectId=project_id, datasetId=dataset_id, pageToken=next_token
+                    )
+                    .execute()
+                )
 
-        return list(schema.values())
+        return schema
 
     def run_query(self, query, user):
         logger.debug("BigQuery got query: %s", query)
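The rewritten `get_schema` pages through the BigQuery v2 `tables.list` endpoint by hand via `nextPageToken`. A minimal sketch of the same pagination pattern in isolation, assuming default application credentials; the project and dataset IDs are placeholders (`google-api-python-client` also offers `tables().list_next()` as a built-in equivalent):

```python
from googleapiclient.discovery import build

# Placeholders -- substitute real identifiers and credentials.
PROJECT_ID = "my-project"
DATASET_ID = "my_dataset"

service = build("bigquery", "v2")  # assumes application default credentials

table_ids = []
tables = service.tables().list(projectId=PROJECT_ID, datasetId=DATASET_ID).execute()
while True:
    for table in tables.get("tables", []):
        table_ids.append(table["tableReference"]["tableId"])

    # Responses for the last page carry no nextPageToken, which ends the loop.
    next_token = tables.get("nextPageToken", None)
    if next_token is None:
        break

    tables = (
        service.tables()
        .list(projectId=PROJECT_ID, datasetId=DATASET_ID, pageToken=next_token)
        .execute()
    )

print(table_ids)
```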
@@ -337,12 +346,12 @@ def run_query(self, query, user):
 
         try:
             if "totalMBytesProcessedLimit" in self.configuration:
-                limitMB = self.configuration["totalMBytesProcessedLimit"]
-                processedMB = self._get_total_bytes_processed(jobs, query) / 1000.0 / 1000.0
-                if limitMB < processedMB:
+                limit_mb = self.configuration["totalMBytesProcessedLimit"]
+                processed_mb = self._get_total_bytes_processed(jobs, query) / 1000.0 / 1000.0
+                if limit_mb < processed_mb:
                     return (
                         None,
-                        "Larger than %d MBytes will be processed (%f MBytes)" % (limitMB, processedMB),
+                        "Larger than %d MBytes will be processed (%f MBytes)" % (limit_mb, processed_mb),
                     )
 
             data = self._get_query_result(jobs, query)
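The limit check compares decimal megabytes, hence the two divisions by 1000.0. For reference, a figure like the one `_get_total_bytes_processed` returns can be obtained without executing the query via the BigQuery v2 `jobs.query` dry-run mode; the body below is an assumed standalone variant for illustration, not the file's actual implementation:

```python
def get_total_bytes_processed(jobs, project_id, query):
    """Assumed standalone variant of the helper named in the diff."""
    # A dry run validates the query and reports totalBytesProcessed
    # without executing it (and without incurring query cost).
    response = jobs.query(
        projectId=project_id, body={"query": query, "dryRun": True}
    ).execute()
    return int(response["totalBytesProcessed"])

# The patched check then compares decimal megabytes:
# processed_mb = get_total_bytes_processed(jobs, project_id, query) / 1000.0 / 1000.0
```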