From e09afc7eb5bd57d9cb526feade71b1404ef33bf6 Mon Sep 17 00:00:00 2001 From: Amy Unruh Date: Wed, 17 Dec 2014 11:12:17 -0800 Subject: [PATCH] initial checkin --- LICENSE | 201 ++++ README.md | 11 +- bigquery-controller.yaml | 37 + bigquery-setup/schema.json | 1674 +++++++++++++++++++++++++++ favicon.ico | Bin 0 -> 8348 bytes pipeline-image/Dockerfile | 12 + pipeline-image/controller.py | 11 + pipeline-image/redis-to-bigquery.py | 161 +++ pipeline-image/twitter-to-redis.py | 85 ++ redis-master-service.yaml | 15 + redis-master.yaml | 23 + twitter-stream.yaml | 41 + 12 files changed, 2268 insertions(+), 3 deletions(-) create mode 100644 LICENSE create mode 100644 bigquery-controller.yaml create mode 100644 bigquery-setup/schema.json create mode 100644 favicon.ico create mode 100644 pipeline-image/Dockerfile create mode 100644 pipeline-image/controller.py create mode 100644 pipeline-image/redis-to-bigquery.py create mode 100644 pipeline-image/twitter-to-redis.py create mode 100644 redis-master-service.yaml create mode 100644 redis-master.yaml create mode 100644 twitter-stream.yaml diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..04cb0d7 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2013 Google Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index 57ff941..4667ffa 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,9 @@ -kubernetes-bigquery-python -========================== -Example Kubernetes app that shows how to build a 'pipeline' to stream data into BigQuery. Uses Redis. +Copyright (C) 2014 Google Inc. + +# Example app: Real-time data analysis using Kubernetes, Redis, and BigQuery + +This is an example Kubernetes app that shows how to build a 'pipeline' to stream data into BigQuery. It uses Redis. + +Documentation for this example can be found on the Google Cloud Platform site: +https://cloud.google.com/solutions/real-time-analysis/kubernetes-redis-bigquery diff --git a/bigquery-controller.yaml b/bigquery-controller.yaml new file mode 100644 index 0000000..a02ae50 --- /dev/null +++ b/bigquery-controller.yaml @@ -0,0 +1,37 @@ +id: bigQueryController +apiVersion: v1beta1 +kind: ReplicationController +desiredState: + replicas: 2 + # replicaSelector identifies the set of Pods that this + # replicaController is responsible for managing + replicaSelector: + name: bigquery-controller + # podTemplate defines the 'cookie cutter' used for creating + # new pods when necessary + podTemplate: + desiredState: + manifest: + version: v1beta1 + id: bqController + containers: + - name: bigquery + # Change this to your docker hub username + image: your-docker-hub-username/pipeline + env: + - name: PROCESSINGSCRIPT + value: redis-to-bigquery + - name: REDISLIST + value: twitter-stream + # Change this to your project ID. + - name: PROJECT_ID + value: xxxx + # Change the following two settings to your dataset and table. + - name: BQ_DATASET + value: xxxx + - name: BQ_TABLE + value: xxxx + # Important: these labels need to match the selector above + # The api server enforces this constraint. + labels: + name: bigquery-controller diff --git a/bigquery-setup/schema.json b/bigquery-setup/schema.json new file mode 100644 index 0000000..79ba013 --- /dev/null +++ b/bigquery-setup/schema.json @@ -0,0 +1,1674 @@ +[ + { + "name": "created_at", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "extended_entities", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "media", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "source_status_id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "expanded_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "display_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "media_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "source_status_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "sizes", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "small", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + }, + { + "name": "large", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + }, + { + "name": "medium", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + }, + { + "name": "thumb", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + } + ] + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + }, + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "media_url", + "type": "STRING", + "mode": "NULLABLE" + } + ] + } + ] + }, + { + "name": "text", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "source", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "truncated", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_status_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_status_id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_user_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_user_id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_screen_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "user", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "screen_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "location", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "description", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "protected", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "verified", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "followers_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "friends_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "listed_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "favourites_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "statuses_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "utc_offset", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "time_zone", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "geo_enabled", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "lang", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "contributors_enabled", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "is_translator", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "profile_background_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_background_image_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_background_image_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_background_tile", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "profile_link_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_sidebar_border_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_sidebar_fill_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_text_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_use_background_image", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "profile_image_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_image_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_banner_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "default_profile", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "default_profile_image", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "following", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "follow_request_sent", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "notifications", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "geo", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "coordinates", + "type": "FLOAT", + "mode": "REPEATED" + } + ] + }, + { + "name": "coordinates", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "coordinates", + "type": "FLOAT", + "mode": "REPEATED" + } + ] + }, +{ + "name": "place", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "id", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "place_type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "full_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "country_code", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "country", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "bounding_box", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "coordinates", + "type": "FLOAT", + "mode": "REPEATED" + } + ] + }, + { + "name": "attributes", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "contributors", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "retweeted_status", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "created_at", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "scopes", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "followers", + "type": "BOOLEAN", + "mode": "NULLABLE" + } + ] + }, + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "extended_entities", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "media", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "expanded_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "source_status_id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "source_status_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "display_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "media_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "sizes", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "large", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + }, + { + "name": "small", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + }, + { + "name": "medium", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + }, + { + "name": "thumb", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + } + ] + } + ] + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + }, + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "media_url", + "type": "STRING", + "mode": "NULLABLE" + } + ] + } + ] + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "text", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "source", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "truncated", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_status_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_status_id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_user_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_user_id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "in_reply_to_screen_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "user", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "screen_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "location", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "description", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "protected", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "verified", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "followers_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "friends_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "listed_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "favourites_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "statuses_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "created_at", + "type": "TIMESTAMP", + "mode": "NULLABLE" + }, + { + "name": "utc_offset", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "time_zone", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "geo_enabled", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "lang", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "contributors_enabled", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "is_translator", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "profile_background_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_background_image_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_background_image_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_background_tile", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "profile_link_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_sidebar_border_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_sidebar_fill_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_text_color", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_use_background_image", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "profile_image_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_image_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "profile_banner_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "default_profile", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "default_profile_image", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "following", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "follow_request_sent", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "notifications", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "geo", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "pl", + "type": "FLOAT", + "mode": "REPEATED" + }, + { + "name": "coordinates", + "type": "FLOAT", + "mode": "REPEATED" + } + ] + }, + { + "name": "coordinates", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "coordinates", + "type": "FLOAT", + "mode": "REPEATED" + } + ] + }, +{ + "name": "place", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "id", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "place_type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "full_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "country_code", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "country", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "bounding_box", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "coordinates", + "type": "FLOAT", + "mode": "REPEATED" + } + ] + }, + { + "name": "attributes", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "contributors", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "retweet_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "favorite_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "entities", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "hashtags", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "text", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + } + ] + }, + { + "name": "trends", + "type": "STRING", + "mode": "REPEATED", + "fields": [ + + ] + }, + { + "name": "urls", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "expanded_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "display_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + } + ] + }, + { + "name": "user_mentions", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "screen_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + } + ] + }, + { + "name": "symbols", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + }, + { + "name": "text", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "media", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + }, + { + "name": "media_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "media_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "display_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "expanded_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "source_status_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "source_status_id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "sizes", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "large", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "medium", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "thumb", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "small", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + } + ] + } + ] + } + ] + }, + { + "name": "favorited", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "retweeted", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "timestamp_ms", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "possibly_sensitive", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "filter_level", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "lang", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "retweet_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "favorite_count", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "entities", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "hashtags", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "text", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + } + ] + }, + { + "name": "trends", + "type": "STRING", + "mode": "REPEATED", + "fields": [ + + ] + }, + { + "name": "urls", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "expanded_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "display_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + } + ] + }, + { + "name": "user_mentions", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "screen_name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "name", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + } + ] + }, + { + "name": "symbols", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + }, + { + "name": "text", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "media", + "type": "RECORD", + "mode": "REPEATED", + "fields": [ + { + "name": "id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "id_str", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "indices", + "type": "INTEGER", + "mode": "REPEATED" + }, + { + "name": "media_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "media_url_https", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "display_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "expanded_url", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "type", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "sizes", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "large", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "medium", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "thumb", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + }, + { + "name": "small", + "type": "RECORD", + "mode": "NULLABLE", + "fields": [ + { + "name": "w", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "h", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "resize", + "type": "STRING", + "mode": "NULLABLE" + } + ] + } + ] + }, + { + "name": "source_status_id", + "type": "INTEGER", + "mode": "NULLABLE" + }, + { + "name": "source_status_id_str", + "type": "STRING", + "mode": "NULLABLE" + } + ] + } + ] + }, + { + "name": "favorited", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "retweeted", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "timestamp_ms", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "possibly_sensitive", + "type": "BOOLEAN", + "mode": "NULLABLE" + }, + { + "name": "filter_level", + "type": "STRING", + "mode": "NULLABLE" + }, + { + "name": "lang", + "type": "STRING", + "mode": "NULLABLE" + } +] diff --git a/favicon.ico b/favicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..23c553a2966ca4cecf146093f33d8114b4f1e368 GIT binary patch literal 8348 zcmeI0du&tJ8Ng3Vwv|ewX4Hu@my&|vCD%DN@CLnxyptBT_%HjatoYr7%N3zGPed#@ckvA><;HWecDU4&2}*2(h%gm&WgCftWj zkQfW;&mXr@0S&(csd+cjaImbXSy=NC<10}z(efGwgi=;B$dt=HkKYCvp-#3KX+qIu zxx$>zcwk+N=c%1^J9)sO44lmS0##~wzU(43f>wW-p_YXwj>s>3{gHI*^oo1lmJ!X*bw{3UBAKRut zUh5Uy4_94IIkdxC^v{_g%FjuEI+f&;9KQ3i+nK7pU_RivFjP_DTl-&gP_qr$nD=M{ z@z;C3#y5Jsh77svGQ4u$ZX)t}=e52{#Xvk;4qIxNh86kR$OhCAie%&e`U{r{ACQZ@ z-)1#sItef{2L za&U;nm+_lo@lbR7vat&^EcECsH)uSnhlht@etsV4LE5B`j#GW*iur~}xpYlTtc;FC{nl|LK^miZ0y3WU>MIM zmc}94VzEFx9?#&?4l+h;ggzvOeCI$ob=`tBp*&EtirH2xU z4a#KB)IMa{_YdYrnxq>-DrrQ>rfpYe8@|Nc-oHnG*L(HR$}d4Eo2&X@o1~9l@%@W) zU{%rv$`tBAOHJH+?zcv7`!T~x;=<#7@4R5T^5$-%Pz-NAYt+5{d;>R+9W24y&`;ZkZqFGQ6_)<35Z_$5V#ga#=N9985-M0KR*fl@h4M0BxWvbYQr6t zULtYBAMJ%iU>x|?U8z_Zyv1jg_VcZ^kO)pd_)jk`_~2MHZmybbloW?l)lnMrb~TAX zV&%#e+VLvM4!%js+%B6}N!=udFlN5Jv;yQm0sdW({6ny+{{$_b`%pI&q3%#p3R?q( z%3@zpafPo4-GA}ErIfU@j?gpfdgeyI);;S-otz(GefQbXvG3J6hbwD_0*YOuqb1V8 zpQm{(oT|g$!tcw;Ck6l>=mo$z0J@0f0^U0{x?+mD8}QB{9yV7qQ;&$5^%*fb@qUB& zX(O<{D-aZ493K;1xH%!}9+}vt8Sshs4os9)d132glTa=lIJv}MJyU_Y!ZFl62j9@l zggg2y{y~c&Vm0cMa@}r@bbn?HR4Sa|66p;nlMl_6D%_%EjCNRq)NBvx!R!t`@zUoW zKV#O#EhZy4>~?VU+rfg@_W_4Kt~zS<|3JjVMcZ#exy;pDUypq|xjpD&0#H{BHgrvM zI=z9nnTN~NCX?mf@DOU;!82-#gXGsw5CSSf22lJ>u8duE&igGuZq4lVZf*LH2%QqOqkv@O{w`Y}q~yWm0EP zeSP~Hau49ZBU@&hWwH5YG0dnGVN1^iztQFh>rIvj5$iQ;q^nyuSuXo`U~{E8DpuIS zA{kR5oC9o=_>jhfRX@QTs1KRm{@F31 zFKP1!w?51@R^I~kA%H*B0W^sKnyVIsv`_2;=zbUGR9pOTqUhV{{^UH=RQ2@S?`uaQ zEy`)Gsd}1IEedW&4lAe0Sg5h`nQXqaZ}RyEzX{FT?hmF3>0_QQT1V~jI$wc&1@aZh ZS0G=3d706zJ{{SaIO|Jj| literal 0 HcmV?d00001 diff --git a/pipeline-image/Dockerfile b/pipeline-image/Dockerfile new file mode 100644 index 0000000..7c17abe --- /dev/null +++ b/pipeline-image/Dockerfile @@ -0,0 +1,12 @@ +FROM google/python + +RUN pip install tweepy +RUN pip install --upgrade google-api-python-client +RUN pip install redis +RUN pip install python-dateutil + +ADD twitter-to-redis.py /twitter-to-redis.py +ADD redis-to-bigquery.py /redis-to-bigquery.py +ADD controller.py /controller.py + +CMD python controller.py diff --git a/pipeline-image/controller.py b/pipeline-image/controller.py new file mode 100644 index 0000000..005395a --- /dev/null +++ b/pipeline-image/controller.py @@ -0,0 +1,11 @@ + +import os + +script = os.environ['PROCESSINGSCRIPT'] + +if script == 'redis-to-bigquery': + os.system("python redis-to-bigquery.py") +elif script == 'twitter-to-redis': + os.system("python twitter-to-redis.py") +else: + print "unknown script %s" % script diff --git a/pipeline-image/redis-to-bigquery.py b/pipeline-image/redis-to-bigquery.py new file mode 100644 index 0000000..d6ff45a --- /dev/null +++ b/pipeline-image/redis-to-bigquery.py @@ -0,0 +1,161 @@ +"""This script grabs tweets from a redis server, and stores them in BiqQuery +using the BigQuery Streaming API. +""" + +import collections +import json +import os +import time +import urllib + +import dateutil.parser +from googleapiclient.discovery import build +import httplib2 +from oauth2client.client import AccessTokenCredentials +import redis + +# Get info on the Redis host and port from the environment variables. +# The name of this variable comes from the redis service id, 'redismaster'. +REDIS_HOST = os.environ['REDISMASTER_SERVICE_HOST'] +REDIS_PORT = os.environ['REDISMASTER_SERVICE_PORT'] +REDIS_LIST = os.environ['REDISLIST'] + +r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0) + +# Get the numeric project ID from the environment variable set in +# the 'bigquery-controller.json' manifest. +PROJECT_ID = os.environ['PROJECT_ID'] +BQ_SCOPE = 'https://www.googleapis.com/auth/bigquery' + + +def flatten(l): + """Helper function used to massage the raw tweet data.""" + for el in l: + if isinstance(el, collections.Iterable) and not isinstance(el, basestring): + for sub in flatten(el): + yield sub + else: + yield el + + +def strip_none(data): + """Do some data massaging.""" + if isinstance(data, dict): + newdict = {} + for k,v in data.items(): + if k == 'coordinates' and isinstance(v, list): + # flatten list + newdict[k] = list(flatten(v)) + elif k == 'created_at' and v: + newdict[k] = str(dateutil.parser.parse(v)) + elif v == False: + newdict[k] = v + else: + if k and v: + newdict[k] = strip_none(v) + return newdict + else: + return data + + +def write_to_bq(bigquery): + """Write the data to BigQuery in small chunks.""" + tweets = [] + CHUNK = 50 # The size of the BigQuery insertion batch. + twstring = '' + tweet = None + mtweet = None + while True: + while len(tweets) < CHUNK: + # We'll use a blocking list pop -- it returns when there is new data. + res = r.brpop(REDIS_LIST) + twstring = res[1] + try: + tweet = json.loads(res[1]) + except Exception, bqe: + print bqe + # First do some massaging of the raw data + mtweet = strip_none(tweet) + # We only want to write tweets to BigQuery; we'll skip 'delete' and + # 'limit' information. + if 'delete' in mtweet: + continue + if 'limit' in mtweet: + print mtweet + continue + tweets.append(mtweet) + + try: + rowlist = [] + # Generate the data that will be sent to BigQuery + for item in tweets: + item_row = {"json": item} + rowlist.append(item_row) + body = {"rows": rowlist} + dataset = os.environ['BQ_DATASET'] + table = os.environ['BQ_TABLE'] + # Try the insertion. + response = bigquery.tabledata().insertAll( + projectId=PROJECT_ID, datasetId=dataset, tableId=table, body=body + ).execute() + print "streaming response: %s" % response + try: + # If there was an insertion error returned, print diagnostics. + if 'insertErrors' in response: + err_resp = response['insertErrors'] + if 'errors' in err_resp[0] and err_resp[0]['errors']: + reason = err_resp[0]['errors'][0]['reason'] + if reason == 'timeout' or reason == 'stopped': + print "error was 'timeout' or 'stopped'." + else: # got some other type of error; print for diagnostics + print twstring + print '-----------tweet to json string' + print json.dumps(mtweet) + else: + print twstring + print '-----------tweet to json string' + print json.dumps(mtweet) + except Exception, exp: + print exp + tweets = [] + except Exception: + # If an exception was thrown in making the insertion call, try again. + time.sleep(2) + print "trying again." + try: + response = bigquery.tabledata().insertAll( + projectId=PROJECT_ID, datasetId=dataset, tableId=table, body=body + ).execute() + print "streaming response: %s" % response + except Exception: + time.sleep(4) + print "One more retry." + try: + # first refresh on the auth, as if there has been a long gap since we + # last grabbed data from Redis, we may need to re-auth. + http = GenerateAuthenticatedHttp(BQ_SCOPE) + bigquery = build("bigquery", "v2", http=http) + response = bigquery.tabledata().insertAll( + projectId=PROJECT_ID, datasetId=dataset, tableId=table, body=body + ).execute() + print "streaming response: %s" % response + except Exception, e3: + print "Giving up: %s" % e3 + + +def GenerateAuthenticatedHttp(scopes): + """Authenticate to write to BigQuery.""" + res1 = httplib2.Http().request( + 'http://metadata/0.1/meta-data/service-accounts/default/acquire?%s' + % urllib.urlencode({'scopes': scopes}), method='GET', + headers={'Content-Length': '0'}) + return AccessTokenCredentials(json.loads(res1[1])['accessToken'], + '').authorize(httplib2.Http()) + + +if __name__ == '__main__': + print "starting write to BigQuery...." + http = GenerateAuthenticatedHttp(BQ_SCOPE) + bigquery = build("bigquery", "v2", http=http) + write_to_bq(bigquery) + diff --git a/pipeline-image/twitter-to-redis.py b/pipeline-image/twitter-to-redis.py new file mode 100644 index 0000000..cc08752 --- /dev/null +++ b/pipeline-image/twitter-to-redis.py @@ -0,0 +1,85 @@ +"""This script uses the Twitter Streaming API, via the tweepy library, +to pull in tweets and store them in a Redis server. +""" + +import os + +import redis +from tweepy import OAuthHandler +from tweepy import Stream +from tweepy.streaming import StreamListener + +# Get your twitter credentials from the environment variables. +# These are set in the 'twitter-stream.json' manifest file. +consumer_key = os.environ['CONSUMERKEY'] +consumer_secret = os.environ['CONSUMERSECRET'] +access_token = os.environ['ACCESSTOKEN'] +access_token_secret = os.environ['ACCESSTOKENSEC'] + +# Get info on the Redis host and port from the environment variables. +# The name of this variable comes from the redis service id, 'redismaster'. +REDIS_HOST = os.environ['REDISMASTER_SERVICE_HOST'] +REDIS_PORT = os.environ['REDISMASTER_SERVICE_PORT'] +REDIS_LIST = os.environ['REDISLIST'] + + +class StdOutListener(StreamListener): + """A listener handles tweets that are received from the stream. + This listener dumps the tweets into Redis. + """ + + count = 0 + twstring = '' + tweets = [] + r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0) + total_tweets = 10000000 + + def write_to_redis(self, tw): + try: + self.r.lpush(REDIS_LIST, tw) + except: + print 'Problem adding sensor data to Redis.' + + def on_data(self, data): + """What to do when tweet data is received.""" + self.write_to_redis(data) + self.count += 1 + # if we've grabbed more than total_tweets tweets, exit the script. + # If this script is being run in the context of a kubernetes + # replicationController, the pod will be restarted fresh when + # that happens. + if self.count > self.total_tweets: + return False + if (self.count % 1000) == 0: + print 'count is: %s' % self.count + return True + + def on_error(self, status): + print status + + +if __name__ == '__main__': + print '....' + l = StdOutListener() + auth = OAuthHandler(consumer_key, consumer_secret) + auth.set_access_token(access_token, access_token_secret) + print 'stream mode is: %s' % os.environ['TWSTREAMMODE'] + + stream = Stream(auth, l) + # set up the streaming depending upon whether our mode is 'sample', which will + # sample the twitter public stream. If not 'sample', instead track the given + # set of keywords. + # This environment var is set in the 'twstream.json' manifest. + if os.environ['TWSTREAMMODE'] == 'sample': + stream.sample() + else: + stream.filter( + track=['bigdata', 'kubernetes', 'bigquery', 'docker', 'google', + 'googlecloud', 'golang', 'dataflow', + 'containers', 'appengine', 'gcp', 'compute', 'scalability', + 'gigaom', 'news', 'tech', 'apple', + 'amazon', 'cluster', 'distributed', 'computing', 'cloud', + 'android', 'mobile', 'ios', 'iphone', + 'python', 'recode', 'techcrunch', 'timoreilly'] + ) + diff --git a/redis-master-service.yaml b/redis-master-service.yaml new file mode 100644 index 0000000..0b11391 --- /dev/null +++ b/redis-master-service.yaml @@ -0,0 +1,15 @@ +kind: Service +apiVersion: v1beta1 +id: redismaster +# the port that this service should serve on +port: 10000 +# just like the selector in the replication controller, +# but this time it identifies the set of pods to load balance +# traffic to. +selector: + name: redis-master +# the container on each pod to connect to, can be a name +# (e.g. 'www') or a number (e.g. 80) +containerPort: 6379 +labels: + name: redismaster diff --git a/redis-master.yaml b/redis-master.yaml new file mode 100644 index 0000000..44e5a9d --- /dev/null +++ b/redis-master.yaml @@ -0,0 +1,23 @@ +apiVersion: v1beta1 +id: redis-master-pod +desiredState: + manifest: + version: v1beta1 + id: redis-master-pod + containers: + - name: redis-master + image: dockerfile/redis + ports: + - containerPort: 6379 + volumeMounts: + # name must match the volume name below + - name: redis-persistent-storage + # mount path within the container + mountPath: /data/redis + volumes: + - name: redis-persistent-storage + source: + emptyDir: {} +labels: + name: redis-master + role: master diff --git a/twitter-stream.yaml b/twitter-stream.yaml new file mode 100644 index 0000000..77f453a --- /dev/null +++ b/twitter-stream.yaml @@ -0,0 +1,41 @@ +id: twitterStreamController +apiVersion: v1beta1 +kind: ReplicationController +desiredState: + replicas: 1 + # replicaSelector identifies the set of Pods that this + # replicaController is responsible for managing + replicaSelector: + name: twitter-stream + # podTemplate defines the 'cookie cutter' used for creating + # new pods when necessary + podTemplate: + desiredState: + manifest: + version: v1beta1 + id: twitterStream + containers: + - name: twitter-to-redis + # Change this to your docker hub username + image: your-docker-hub-username/pipeline + env: + - name: PROCESSINGSCRIPT + value: twitter-to-redis + - name: REDISLIST + value: twitter-stream + # Change the following four settings to your twitter credentials + # information. + - name: CONSUMERKEY + value: xxxx + - name: CONSUMERSECRET + value: xxxx + - name: ACCESSTOKEN + value: xxxx + - name: ACCESSTOKENSEC + value: xxxx + - name: TWSTREAMMODE + value: sample + # Important: these labels need to match the selector above + # The api server enforces this constraint. + labels: + name: twitter-stream