Skip to content

Commit

Permalink
Merge pull request #2321 from tseaver/2083-bigquery-async_query_job-q…
Browse files Browse the repository at this point in the history
…uery_results

Add 'QueryJob.results' method
  • Loading branch information
tseaver authored Sep 15, 2016
2 parents 9239c3c + ef7b499 commit a92d69f
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 131 deletions.
13 changes: 13 additions & 0 deletions docs/bigquery-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ Poll until the job is complete:
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)

Retrieve the results:

.. doctest::

>>> results = job.results()
>>> rows, total_count, token = query.fetch_data() # API requet
>>> while True:
... do_something_with(rows)
... if token is None:
... break
... rows, total_count, token = query.fetch_data(
... page_token=token) # API request


Inserting data (asynchronous)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
52 changes: 52 additions & 0 deletions google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,55 @@ def _validate(self, value):
"""
if value not in self.ALLOWED:
raise ValueError('Pass one of: %s' ', '.join(self.ALLOWED))


class UDFResource(object):
"""Describe a single user-defined function (UDF) resource.
:type udf_type: str
:param udf_type: the type of the resource ('inlineCode' or 'resourceUri')
:type value: str
:param value: the inline code or resource URI.
See
https://cloud.google.com/bigquery/user-defined-functions#api
"""
def __init__(self, udf_type, value):
self.udf_type = udf_type
self.value = value

def __eq__(self, other):
return(
self.udf_type == other.udf_type and
self.value == other.value)


class UDFResourcesProperty(object):
"""Custom property type, holding :class:`UDFResource` instances."""

def __get__(self, instance, owner):
"""Descriptor protocol: accessor"""
if instance is None:
return self
return list(instance._udf_resources)

def __set__(self, instance, value):
"""Descriptor protocol: mutator"""
if not all(isinstance(u, UDFResource) for u in value):
raise ValueError("udf items must be UDFResource")
instance._udf_resources = tuple(value)


def _build_udf_resources(resources):
"""
:type resources: sequence of :class:`UDFResource`
:param resources: fields to be appended.
:rtype: mapping
:returns: a mapping describing userDefinedFunctionResources for the query.
"""
udfs = []
for resource in resources:
udf = {resource.udf_type: resource.value}
udfs.append(udf)
return udfs
67 changes: 12 additions & 55 deletions google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,62 +23,10 @@
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import _build_schema_resource
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _TypedProperty


class UDFResource(object):
"""Describe a single user-defined function (UDF) resource.
:type udf_type: str
:param udf_type: the type of the resource ('inlineCode' or 'resourceUri')
:type value: str
:param value: the inline code or resource URI.
See
https://cloud.google.com/bigquery/user-defined-functions#api
"""
def __init__(self, udf_type, value):
self.udf_type = udf_type
self.value = value

def __eq__(self, other):
return(
self.udf_type == other.udf_type and
self.value == other.value)


def _build_udf_resources(resources):
"""
:type resources: sequence of :class:`UDFResource`
:param resources: fields to be appended.
:rtype: mapping
:returns: a mapping describing userDefinedFunctionResources for the query.
"""
udfs = []
for resource in resources:
udf = {resource.udf_type: resource.value}
udfs.append(udf)
return udfs


class UDFResourcesProperty(object):
"""Custom property type for :class:`QueryJob`.
Also used by :class:`~google.cloud.bigquery.query.Query`.
"""
def __get__(self, instance, owner):
"""Descriptor protocol: accessor"""
if instance is None:
return self
return list(instance._udf_resources)

def __set__(self, instance, value):
"""Descriptor protocol: mutator"""
if not all(isinstance(u, UDFResource) for u in value):
raise ValueError("udf items must be UDFResource")
instance._udf_resources = tuple(value)
from google.cloud.bigquery._helpers import _build_udf_resources


class Compression(_EnumProperty):
Expand Down Expand Up @@ -957,7 +905,7 @@ class QueryJob(_AsyncJob):
:type udf_resources: tuple
:param udf_resources: An iterable of
:class:`google.cloud.bigquery.job.UDFResource`
:class:`google.cloud.bigquery._helpers.UDFResource`
(empty by default)
"""
_JOB_TYPE = 'query'
Expand Down Expand Up @@ -1130,3 +1078,12 @@ def from_api_repr(cls, resource, client):
job = cls(name, query, client=client)
job._set_properties(resource)
return job

def results(self):
"""Construct a QueryResults instance, bound to this job.
:rtype: :class:`~google.cloud.bigquery.query.QueryResults`
:returns: results instance
"""
from google.cloud.bigquery.query import QueryResults
return QueryResults.from_query_job(self)
27 changes: 25 additions & 2 deletions google/cloud/bigquery/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.job import UDFResourcesProperty
from google.cloud.bigquery.job import _build_udf_resources
from google.cloud.bigquery.table import _parse_schema_resource
from google.cloud.bigquery._helpers import _build_udf_resources
from google.cloud.bigquery._helpers import UDFResourcesProperty


class _SyncQueryConfiguration(object):
Expand Down Expand Up @@ -65,6 +65,26 @@ def __init__(self, query, client, udf_resources=()):
self.udf_resources = udf_resources
self._job = None

@classmethod
def from_query_job(cls, job):
"""Factory: construct from an existing job.
:type job: :class:`~google.cloud.bigquery.job.QueryJob`
:param job: existing job
:rtype: :class:`QueryResults`
:returns: the instance, bound to the job
"""
instance = cls(job.query, job._client, job.udf_resources)
instance._job = job
if job.default_dataset is not None:
instance.default_dataset = job.default_dataset
if job.use_query_cache is not None:
instance.use_query_cache = job.use_query_cache
if job.use_legacy_sql is not None:
instance.use_legacy_sql = job.use_legacy_sql
return instance

@property
def project(self):
"""Project bound to the job.
Expand Down Expand Up @@ -307,6 +327,9 @@ def run(self, client=None):
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
"""
if self._job is not None:
raise ValueError("Query job is already running.")

client = self._require_client(client)
path = '/projects/%s/queries' % (self.project,)
api_response = client.connection.api_request(
Expand Down
70 changes: 70 additions & 0 deletions unit_tests/bigquery/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,76 @@ def __init__(self):
self.assertEqual(wrapper._configuration._attr, None)


class Test_UDFResourcesProperty(unittest.TestCase):

def _getTargetClass(self):
from google.cloud.bigquery._helpers import UDFResourcesProperty
return UDFResourcesProperty

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def _descriptor_and_klass(self):
descriptor = self._makeOne()

class _Test(object):
_udf_resources = ()
udf_resources = descriptor

return descriptor, _Test

def test_class_getter(self):
descriptor, klass = self._descriptor_and_klass()
self.assertTrue(klass.udf_resources is descriptor)

def test_instance_getter_empty(self):
_, klass = self._descriptor_and_klass()
instance = klass()
self.assertEqual(instance.udf_resources, [])

def test_instance_getter_w_non_empty_list(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = tuple(udf_resources)

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_empty_list(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = udf_resources

instance.udf_resources = []

self.assertEqual(instance.udf_resources, [])

def test_instance_setter_w_valid_udf(self):
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()

instance.udf_resources = udf_resources

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_bad_udfs(self):
_, klass = self._descriptor_and_klass()
instance = klass()

with self.assertRaises(ValueError):
instance.udf_resources = ["foo"]

self.assertEqual(instance.udf_resources, [])


class _Field(object):

def __init__(self, mode, name='unknown', field_type='UNKNOWN', fields=()):
Expand Down
80 changes: 9 additions & 71 deletions unit_tests/bigquery/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,76 +15,6 @@
import unittest


class Test_UDFResourcesProperty(unittest.TestCase):

def _getTargetClass(self):
from google.cloud.bigquery.job import UDFResourcesProperty
return UDFResourcesProperty

def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)

def _descriptor_and_klass(self):
descriptor = self._makeOne()

class _Test(object):
_udf_resources = ()
udf_resources = descriptor

return descriptor, _Test

def test_class_getter(self):
descriptor, klass = self._descriptor_and_klass()
self.assertTrue(klass.udf_resources is descriptor)

def test_instance_getter_empty(self):
_, klass = self._descriptor_and_klass()
instance = klass()
self.assertEqual(instance.udf_resources, [])

def test_instance_getter_w_non_empty_list(self):
from google.cloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = tuple(udf_resources)

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_empty_list(self):
from google.cloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()
instance._udf_resources = udf_resources

instance.udf_resources = []

self.assertEqual(instance.udf_resources, [])

def test_instance_setter_w_valid_udf(self):
from google.cloud.bigquery.job import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
_, klass = self._descriptor_and_klass()
instance = klass()

instance.udf_resources = udf_resources

self.assertEqual(instance.udf_resources, udf_resources)

def test_instance_setter_w_bad_udfs(self):
_, klass = self._descriptor_and_klass()
instance = klass()

with self.assertRaises(ValueError):
instance.udf_resources = ["foo"]

self.assertEqual(instance.udf_resources, [])


class _Base(object):
PROJECT = 'project'
SOURCE1 = 'http://example.com/source1.csv'
Expand Down Expand Up @@ -1466,6 +1396,14 @@ def test_from_api_repr_w_properties(self):
self.assertTrue(dataset._client is client)
self._verifyResourceProperties(dataset, RESOURCE)

def test_results(self):
from google.cloud.bigquery.query import QueryResults
client = _Client(self.PROJECT)
job = self._makeOne(self.JOB_NAME, self.QUERY, client)
results = job.results()
self.assertIsInstance(results, QueryResults)
self.assertTrue(results._job is job)

def test_begin_w_bound_client(self):
PATH = 'projects/%s/jobs' % self.PROJECT
RESOURCE = self._makeResource()
Expand Down Expand Up @@ -1568,7 +1506,7 @@ def test_begin_w_alternate_client(self):
self._verifyResourceProperties(job, RESOURCE)

def test_begin_w_bound_client_and_udf(self):
from google.cloud.bigquery.job import UDFResource
from google.cloud.bigquery._helpers import UDFResource
RESOURCE_URI = 'gs://some-bucket/js/lib.js'
PATH = 'projects/%s/jobs' % self.PROJECT
RESOURCE = self._makeResource()
Expand Down
Loading

0 comments on commit a92d69f

Please sign in to comment.