diff --git a/.gitignore b/.gitignore index 0fba81c56..a9cfd6ea5 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,4 @@ server.pid .DS_Store .idea/ +.vscode/ diff --git a/examples/twittermap/build.sbt b/examples/twittermap/build.sbt index cec718db9..e61b9d5fa 100644 --- a/examples/twittermap/build.sbt +++ b/examples/twittermap/build.sbt @@ -40,3 +40,9 @@ lazy val guardian = (project in file("guardian")). settings( libraryDependencies ++= guardianDependencies ) + +lazy val datatools = (project in file("datatools")). + settings(Commons.settings: _*). + settings( + libraryDependencies ++= datatoolsDependencies + ).dependsOn(gnosis, util) diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapter.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapter.java new file mode 100644 index 000000000..755099d33 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapter.java @@ -0,0 +1,34 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.Map; + +/** + * AsterixDBAdapter + * + * - provides APIs to transform a JSON record to AsterixDB format (currently ADM) + * + * @author Qiushi Bai + */ +public interface AsterixDBAdapter { + + SimpleDateFormat tweetDateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss z yyyy", Locale.US); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss.SSSZZZZ"); + + String DATE = "date"; + String DATETIME = "datetime"; + String INT64 = "int64"; + String STRING = "string"; // quoted value (suitable for string) + String VALUE = "value"; // no quoted value (suitable for int, boolean types) + String STRING_SET = "string_set"; // list of quoted value + String VALUE_SET = "value_set"; // list of no quoted value + String BOUNDING_BOX = "bounding_box"; // special treatment to bounding_box column + String OBJECT = "object"; // just use ObjectMapper to write it to string + + String transform(String tweet) throws Exception; + + String transform(Map tuple) throws Exception; +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForGeneralTwitter.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForGeneralTwitter.java new file mode 100644 index 000000000..094bb64df --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForGeneralTwitter.java @@ -0,0 +1,76 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.*; + +/** + * AsterixDBAdapterForTwitter + * + * - Implementation of AsterixDBAdapter for general Twitter data + * + * TODO - This does not work for now, + * because the mapper.writeValueAsString() will quote `datetime` function call in the output String + * + * @author Qiushi Bai + */ +public class AsterixDBAdapterForGeneralTwitter implements AsterixDBAdapter { + + public AsterixDBAdapterForGeneralTwitter() { + + // Twitter uses UTC timezone + tweetDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + public String transform(String tweet) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + Map tuple = mapper.readValue(tweet, Map.class); + return transform(tuple); + } + + public String transform(Map tuple) throws Exception { + + /** + * (1) Make sure "text" is always Non-truncated. + * - if twitter is 'truncated', + * use the 'extended_tweet'->'full_text' to replace 'text' + * */ + if (tuple.containsKey("truncated") && (Boolean)tuple.get("truncated")) { + if (tuple.containsKey("extended_tweet")) { + Map extendedTweet = (Map) tuple.get("extended_tweet"); + if (extendedTweet.containsKey("full_text")) { + tuple.put("text", extendedTweet.get("full_text")); + } + } + } + + /** + * (2) Transform all 'created_at' attributes to be 'datetime' recursively + * */ + transformCreatedAt(tuple); + + // write back to String + ObjectMapper mapper = new ObjectMapper(); + return mapper.writeValueAsString(tuple); + } + + public void transformCreatedAt(Map object) throws Exception { + if (object == null) return; + + // traverse all attributes of object + for (Map.Entry entry: object.entrySet()) { + // if attribute is an object + if (entry.getValue() instanceof Map) { + // recursive call + transformCreatedAt((Map)entry.getValue()); + } + // else attribute is flat + else { + // if this attribute is called 'created_at' + if (entry.getKey().equalsIgnoreCase("created_at")) { + Date date = AsterixDBAdapter.getDate((String) entry.getValue()); + entry.setValue("datetime(\"" + dateFormat.format(date) + "T" + timeFormat.format(date) + "\")"); + } + } + } + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForTwitter.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForTwitter.java new file mode 100644 index 000000000..dce60f32b --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForTwitter.java @@ -0,0 +1,306 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringEscapeUtils; +import java.util.*; + +/** + * AsterixDBAdapterForTwitter + * + * - Implementation of AsterixDBAdapter for general Twitter data + * + * @author Qiushi Bai + */ +public class AsterixDBAdapterForTwitter implements AsterixDBAdapter { + + // Map - + public Map schema; + + public ObjectMapper mapper; // used for transform OBJECT columns + + public AsterixDBAdapterForTwitter() { + + // Twitter uses UTC timezone + tweetDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + mapper = new ObjectMapper(); + + /** + * Twitter JSON schema + * https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object + * */ + // Tweet Object + schema = new HashMap<>(); + schema.put("created_at", DATETIME); + schema.put("id", INT64); + schema.put("id_str", STRING); + schema.put("text", STRING); + schema.put("source", STRING); + schema.put("truncated", VALUE); + schema.put("in_reply_to_status_id", INT64); + schema.put("in_reply_to_status_id_str", STRING); + schema.put("in_reply_to_user_id", INT64); + schema.put("in_reply_to_user_id_str", STRING); + schema.put("in_reply_to_screen_name", STRING); + + // User Object + // https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/user-object + Map user = new HashMap<>(); + user.put("id", INT64); + user.put("id_str", STRING); + user.put("name", STRING); + user.put("screen_name", STRING); + user.put("location", STRING); + // skip "derived" - enterprise API only + user.put("url", STRING); + user.put("description", STRING); + user.put("protected", VALUE); + user.put("verified", VALUE); + user.put("followers_count", VALUE); + user.put("friends_count", VALUE); + user.put("listed_count", VALUE); + user.put("favourites_count", VALUE); + user.put("statues_count", VALUE); + user.put("created_at", DATETIME); + user.put("profile_banner_url", STRING); + user.put("profile_image_url_https", STRING); + user.put("default_profile", VALUE); + user.put("default_profile_image", VALUE); + user.put("withheld_in_countries", STRING_SET); + user.put("withheld_scope", STRING); + schema.put("user", user); + + // Coordinates Object + // https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/geo-objects#coordinates-dictionary + Map coordinates = new HashMap<>(); + coordinates.put("coordinates", VALUE_SET); + coordinates.put("type", STRING); + schema.put("coordinates", coordinates); + + // Place Object + // https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/geo-objects#place-dictionary + Map place = new HashMap<>(); + place.put("id", STRING); + place.put("url", STRING); + place.put("place_type", STRING); + place.put("name", STRING); + place.put("full_name", STRING); + place.put("country_code", STRING); + place.put("country", STRING); + place.put("bounding_box", BOUNDING_BOX); + schema.put("place", place); + + schema.put("quoted_status_id", INT64); + schema.put("quoted_status_id_str", STRING); + schema.put("is_quote_status", VALUE); + + // quote_status Tweet Object + // TODO - skip for now + + // retweeted_status Tweet Object + // TODO - skip for now + + schema.put("quote_count", VALUE); + schema.put("reply_count", VALUE); + schema.put("retweet_count", VALUE); + schema.put("favorite_count", VALUE); + + // entities + // https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/entities-object + schema.put("entities", OBJECT); + + // extended_entities + schema.put("extended_entities", OBJECT); + + schema.put("favorited", VALUE); + schema.put("retweeted", VALUE); + schema.put("possibly_sensitive", VALUE); + schema.put("filter_level", STRING); + schema.put("lang", STRING); + // skip matching_rules + + // additional columns added by our own TwitterGeoTagger + Map geoTag = new HashMap<>(); + geoTag.put("stateID", VALUE); + geoTag.put("stateName", STRING); + geoTag.put("countyID", VALUE); + geoTag.put("countyName", STRING); + geoTag.put("cityID", VALUE); + geoTag.put("cityName", STRING); + schema.put("geo_tag", geoTag); + } + + public String transform(String tweet) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + Map tuple = mapper.readValue(tweet, Map.class); + return transform(tuple); + } + + public String transform(Map tuple) throws Exception { + + /** Hard coded special treatment for Twitter data + * - if twitter is 'truncated', + * use the 'extended_tweet'->'full_text' to replace 'text' + * */ + if (tuple.containsKey("truncated") && (Boolean)tuple.get("truncated")) { + if (tuple.containsKey("extended_tweet")) { + Map extendedTweet = (Map) tuple.get("extended_tweet"); + if (extendedTweet.containsKey("full_text")) { + tuple.put("text", extendedTweet.get("full_text")); + } + } + } + + return transformObject(tuple, schema); + } + + public String transformObject(Map object, Map schema) throws Exception { + if (object == null) return "null"; + StringBuilder tuple = new StringBuilder(); + tuple.append("{"); + int i = 0; + // recursively transform object to target format String + for (Map.Entry entry: object.entrySet()) { + if (schema.containsKey(entry.getKey())) { + // recursive schema + if (schema.get(entry.getKey()) instanceof Map) { + if (entry.getValue() instanceof Map) { + Map subSchema = (Map) schema.get(entry.getKey()); + if (i >= 1) { + tuple.append(","); + } + transformKey(tuple, entry.getKey()); + tuple.append(transformObject((Map) entry.getValue(), subSchema)); + i ++; + } + // object for this key is null + else if (entry.getValue() == null) { + if (i >= 1) { + tuple.append(","); + } + transformKey(tuple, entry.getKey()); + tuple.append("null"); + i ++; + } + else { + System.err.println("[AsterixDBAdapter] tuple does not match schema!"); + //-DEBUG-// + System.err.println("key = " + entry.getKey()); + System.err.println("value = " + entry.getValue()); + return ""; + } + } + // flat schema + else { + if (i >= 1) { + tuple.append(","); + } + transformKey(tuple, entry.getKey()); + transformColumn(tuple, entry.getKey(), (String) schema.get(entry.getKey()), entry.getValue()); + i ++; + } + } + } + tuple.append("}"); + return tuple.toString().replaceAll("\\s+", " "); + } + + public void transformKey(StringBuilder tuple, String key) { + tuple.append('"').append(StringEscapeUtils.escapeJava(key)).append('"'); + tuple.append(":"); + } + + public void transformColumn(StringBuilder tuple, String key, String type, Object value) throws Exception { + if (value == null) { + tuple.append("null"); + return; + } + switch (type) { + case "datetime": + Date date = getDate((String) value); + transformDateTimeColumn(tuple, date); + break; + case "int64": + tuple.append( "int64(\"" + value + "\")"); + break; + case "string": + tuple.append('"').append(StringEscapeUtils.escapeJava((String)value).replaceAll("\\s+", " ")).append('"'); + break; + case "value": + tuple.append(value); + break; + case "string_set": + List stringSets = (List) value; + if (stringSets == null) { + tuple.append("null"); + return; + } + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < stringSets.size(); i++) { + if (i > 0) { + sb.append(','); + } + sb.append('"').append(StringEscapeUtils.escapeJava(stringSets.get(i)).replaceAll("\\s+", " ")).append('"'); + } + sb.append("]"); + tuple.append(sb.toString()); + break; + case "value_set": + List valueSets = (List) value; + if (valueSets == null) { + tuple.append("null"); + return; + } + sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < valueSets.size(); i++) { + if (i > 0) { + sb.append(','); + } + sb.append(StringEscapeUtils.escapeJava(String.valueOf(valueSets.get(i)))); + } + sb.append("]"); + tuple.append(sb.toString()); + break; + case "bounding_box": + Map boundingBox = (Map) value; + if (boundingBox == null) { + tuple.append("null"); + return; + } + List>> bbCoordinates = (List>>) boundingBox.get("coordinates"); + sb = new StringBuilder(); + sb.append("{"); + sb.append('"').append("coordinates").append('"').append(":"); + sb.append("[["); + for (int i = 0; i < bbCoordinates.get(0).size(); i ++) { + if (i > 0) { + sb.append(","); + } + List coordinate = bbCoordinates.get(0).get(i); + sb.append("[").append(coordinate.get(0)).append(",").append(coordinate.get(1)).append("]"); + } + sb.append("]]"); + sb.append(","); + sb.append('"').append("type").append('"').append(":"); + sb.append('"').append(boundingBox.get("type")).append('"'); + sb.append("}"); + tuple.append(sb.toString()); + break; + case "object": + tuple.append(mapper.writeValueAsString(value)); + break; + default: + System.err.println("key = " + key + ", type = " + type + ", value = " + value); + throw new Exception("unknown data type"); + } + } + + public synchronized static Date getDate(String dateString) throws Exception { + return tweetDateFormat.parse(dateString); + } + + public synchronized static void transformDateTimeColumn(StringBuilder tuple, Date date) { + tuple.append("datetime(\"" + dateFormat.format(date) + "T" + timeFormat.format(date) + "\")"); + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForTwitterMap.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForTwitterMap.java new file mode 100644 index 000000000..4b578784b --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterForTwitterMap.java @@ -0,0 +1,285 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.uci.ics.cloudberry.util.Rectangle; +import org.apache.commons.lang3.StringEscapeUtils; +import java.util.*; + +import static edu.uci.ics.cloudberry.datatools.twitter.geotagger.TwitterGeoTagger.boundingBox2Rectangle; + +/** + * AsterixDBAdapterForTwitterMap + * + * - Implementation of AsterixDBAdapter for TwitterMap application data + * + * TODO - re-implement this by using the old way of Tweet.toADM (object hard-coded transform) + * + * @author Qiushi Bai + */ +public class AsterixDBAdapterForTwitterMap implements AsterixDBAdapter { + + public String POINT = "point"; + public String RECTANGLE = "rectangle"; + + // Map - + public Map schema; + public Map stringSetKeys; + public Map rename; + + public AsterixDBAdapterForTwitterMap() { + + // Twitter uses UTC timezone + tweetDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + // currently hard coded with TwitterMap schema + // TODO - read the schema from a config file + schema = new HashMap<>(); + stringSetKeys = new HashMap<>(); + rename = new HashMap<>(); + schema.put("created_at", DATETIME); + rename.put("created_at", "create_at"); + schema.put("id", INT64); + schema.put("text", STRING); + schema.put("in_reply_to_status_id", INT64); + rename.put("in_reply_to_status_id", "in_reply_to_status"); + schema.put("in_reply_to_user_id", INT64); + rename.put("in_reply_to_user_id", "in_reply_to_user"); + schema.put("favorite_count", VALUE); + schema.put("retweet_count", VALUE); + schema.put("lang", STRING); + schema.put("is_retweet", VALUE); + schema.put("coordinates", POINT); + rename.put("coordinates", "coordinate"); + schema.put("hashtags", STRING_SET); + stringSetKeys.put("hashtags", "text"); + schema.put("user_mentions", STRING_SET); + stringSetKeys.put("user_mentions", "id"); + + Map place = new HashMap<>(); + place.put("country", STRING); + place.put("country_code", STRING); + place.put("full_name", STRING); + place.put("id", STRING); + place.put("name", STRING); + place.put("place_type", STRING); + place.put("bounding_box", RECTANGLE); + schema.put("place", place); + + Map user = new HashMap<>(); + user.put("id", INT64); + user.put("name", STRING); + user.put("screen_name", STRING); + user.put("profile_image_url", STRING); + user.put("lang", STRING); + user.put("location", STRING); + user.put("created_at", DATE); + user.put("description", STRING); + user.put("followers_count", VALUE); + user.put("friends_count", VALUE); + user.put("statues_count", VALUE); + schema.put("user", user); + + Map geoTag = new HashMap<>(); + geoTag.put("stateID", VALUE); + geoTag.put("stateName", STRING); + geoTag.put("countyID", VALUE); + geoTag.put("countyName", STRING); + geoTag.put("cityID", VALUE); + geoTag.put("cityName", STRING); + schema.put("geo_tag", geoTag); + } + + public String transform(String tweet) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + Map tuple = mapper.readValue(tweet, Map.class); + return transform(tuple); + } + + public String transform(Map tuple) throws Exception { + + /** Hard coded special treatment for Twitter data + * - if twitter is 'truncated', + * use the 'extended_tweet'->'full_text' to replace 'text' + * */ + if (tuple.containsKey("truncated") && (Boolean)tuple.get("truncated")) { + if (tuple.containsKey("extended_tweet")) { + Map extendedTweet = (Map) tuple.get("extended_tweet"); + if (extendedTweet.containsKey("full_text")) { + tuple.put("text", extendedTweet.get("full_text")); + } + } + } + + return transformObject(tuple, schema); + } + + public String transformObject(Map object, Map schema) throws Exception { + if (object == null) return "null"; + StringBuilder tuple = new StringBuilder(); + tuple.append("{"); + int i = 0; + // recursively transform object to target format String + for (Map.Entry entry: object.entrySet()) { + if (schema.containsKey(entry.getKey())) { + // recursive schema + if (schema.get(entry.getKey()) instanceof Map) { + if (entry.getValue() instanceof Map) { + Map subSchema = (Map) schema.get(entry.getKey()); + if (i >= 1) { + tuple.append(","); + } + transformKey(tuple, entry.getKey()); + tuple.append(transformObject((Map) entry.getValue(), subSchema)); + i ++; + } + // object for this key is null + else if (entry.getValue() == null) { + if (i >= 1) { + tuple.append(","); + } + transformKey(tuple, entry.getKey()); + tuple.append("null"); + i ++; + } + else { + System.err.println("[AsterixDBAdapter] tuple does not match schema!"); + //-DEBUG-// + System.err.println("key = " + entry.getKey()); + System.err.println("value = " + entry.getValue()); + return ""; + } + } + // flat schema + else { + if (i >= 1) { + tuple.append(","); + } + transformKey(tuple, entry.getKey()); + transformColumn(tuple, entry.getKey(), (String) schema.get(entry.getKey()), entry.getValue()); + i ++; + } + } + } + tuple.append("}"); + return tuple.toString().replaceAll("\\s+", " "); + } + + public void transformKey(StringBuilder tuple, String key) { + if (rename.containsKey(key)) { + tuple.append('"').append(StringEscapeUtils.escapeJava(rename.get(key))).append('"'); + } + else { + tuple.append('"').append(StringEscapeUtils.escapeJava(key)).append('"'); + } + tuple.append(":"); + } + + public void transformColumn(StringBuilder tuple, String key, String type, Object value) throws Exception { + if (value == null) { + tuple.append("null"); + return; + } + switch (type) { + case "date": + Date date = getDate((String) value); + transformDateColumn(tuple, date); + break; + case "datetime": + date = getDate((String) value); + transformDateTimeColumn(tuple, date); + break; + case "int64": + tuple.append( "int64(\"" + value + "\")"); + break; + case "string": + tuple.append('"').append(StringEscapeUtils.escapeJava((String)value).replaceAll("\\s+", " ")).append('"'); + break; + case "value": + tuple.append(value); + break; + case "string_set": + List stringSets = (List) value; + if (stringSets == null) { + tuple.append("null"); + return; + } + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < stringSets.size(); i++) { + if (i > 0) { + sb.append(','); + } + sb.append('"').append(StringEscapeUtils.escapeJava(stringSets.get(i)).replaceAll("\\s+", " ")).append('"'); + } + sb.append("]"); + tuple.append(sb.toString()); + break; + case "value_set": + List valueSets = (List) value; + if (valueSets == null) { + tuple.append("null"); + return; + } + sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < valueSets.size(); i++) { + if (i > 0) { + sb.append(','); + } + sb.append(StringEscapeUtils.escapeJava(String.valueOf(valueSets.get(i)))); + } + sb.append("]"); + tuple.append(sb.toString()); + break; + case "point": + Map coordinates = (Map ) value; + if (coordinates == null) { + tuple.append("null"); + return; + } + List coordinate = (List) coordinates.get("coordinates"); + if (coordinate == null || coordinate.size() != 2) { + tuple.append("null"); + return; + } + tuple.append("point(\"" + coordinate.get(0) + "," + coordinate.get(1) + "\")"); + break; + case "rectangle": + Map boundingBox = (Map) value; + if (boundingBox == null) { + tuple.append("null"); + return; + } + // TODO - this logic was migrated from legacy code, should be revisited for newer version Twitter APIs. + List>> bbCoordinates = (List>>) boundingBox.get("coordinates"); + if (bbCoordinates == null || bbCoordinates.size() != 1 || (bbCoordinates.get(0).size() != 4 && bbCoordinates.get(0).size() != 2)) { + tuple.append("null"); + return; + } + sb = new StringBuilder("rectangle"); + Rectangle rectangle = boundingBox2Rectangle(bbCoordinates.get(0)); + sb.append("(\"").append(rectangle.swLog()).append(',') + .append(rectangle.swLat()) + .append(' ') + .append(rectangle.neLog()).append(',') + .append(rectangle.neLat()) + .append("\")"); + tuple.append(sb.toString()); + break; + default: + throw new Exception("unknown data type"); + } + } + + public synchronized static Date getDate(String dateString) throws Exception { + return tweetDateFormat.parse(dateString); + } + + public synchronized static void transformDateColumn(StringBuilder tuple, Date date) { + tuple.append("date(\"" + dateFormat.format(date) + "\")"); + } + + public synchronized static void transformDateTimeColumn(StringBuilder tuple, Date date) { + tuple.append("datetime(\"" + dateFormat.format(date) + "T" + timeFormat.format(date) + "\")"); + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBFeedClient.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBFeedClient.java new file mode 100644 index 000000000..b7ad786e5 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBFeedClient.java @@ -0,0 +1,53 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; + +public class AsterixDBFeedClient { + private OutputStream out = null; + private int counter = 0; + private long startTime = 0; + + protected String host; + protected int port; + + protected Socket socket; + + public AsterixDBFeedClient(String host, int port) { + this.host = host; + this.port = port; + } + + public void initialize() throws IOException { + socket = new Socket(host, port); + out = socket.getOutputStream(); + } + + public void finalize() { + try { + out.close(); + socket.close(); + System.err.println("Socket feed to AsterixDB at " + host + ":" + port + " - Total # of ingested records: " + counter); + } catch (IOException e) { + System.err.println("Problem in closing socket against host " + host + " on the port " + port); + e.printStackTrace(); + } + } + + public void ingest(String record) throws IOException { + // initialize timer when ingest the first record + if (counter == 0) startTime = System.currentTimeMillis(); + + counter ++; + // output statistics for every 10,000 records ingested + if (counter % 10000 == 0) { + System.err.println("Socket feed to AsterixDB at " + host + ":" + port + " - # of ingested records: " + counter); + long endTime = System.currentTimeMillis(); + double rate = counter * 1000.0 / (endTime - startTime); + System.err.println("Average ingestion rate: " + rate + " records/second"); + } + byte[] b = record.replaceAll("\\s+", " ").getBytes(); + out.write(b); + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBIngestionConfig.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBIngestionConfig.java new file mode 100644 index 000000000..2cb07cab2 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBIngestionConfig.java @@ -0,0 +1,162 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import java.util.HashSet; +import java.util.Set; +import org.apache.commons.cli.*; + +public class AsterixDBIngestionConfig { + + // source + private String fromProxy = null; + + // geo-json files, only required if fromProxy is not null + String stateJsonFile = null; + String countyJsonFile = null; + String cityJsonFile = null; + + // target AsterixDB + private String host = null; + private int port; + + // AsterixDB adapter + private String adapterName = null; + + public String getFromProxy() { + return fromProxy; + } + + public String getStateJsonFile() { + return stateJsonFile; + } + + public String getCountyJsonFile() { + return countyJsonFile; + } + + public String getCityJsonFile() { + return cityJsonFile; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getAdapterName() { + return adapterName; + } + + public static AsterixDBIngestionConfig createFromCLIArgs(String[] args) { + // define cli arguments options, consistent with members of this class + final Options options = new Options(); + final Option fromProxyOpt = Option.builder("fp") + .longOpt("from-proxy") + .desc("URL of the source twitter ingestion proxy, if not given, it will ingest from stdin.") + .type(String.class) + .required(false) + .hasArg() + .build(); + final Option stateOpt = Option.builder("state") + .longOpt("state-json-file") + .desc("State Json file for geographical information of states in the U.S.") + .type(String.class) + .required(false) + .hasArg() + .build(); + final Option countyOpt = Option.builder("county") + .longOpt("county-json-file") + .desc("County Json file for geographical information of counties in the U.S.") + .type(String.class) + .required(false) + .hasArg() + .build(); + final Option cityOpt = Option.builder("city") + .longOpt("city-json-file") + .desc("City Json file for geographical information of cities in the U.S.") + .type(String.class) + .required(false) + .hasArg() + .build(); + final Option hostOpt = Option.builder("h") + .longOpt("host") + .desc("Domain name or IP address of the target AsterixDB host.") + .type(String.class) + .required() + .hasArg() + .build(); + final Option portOpt = Option.builder("p") + .longOpt("port") + .desc("Port of target AsterixDB socket feed.") + .type(Integer.class) + .required() + .hasArg() + .build(); + final Option adapterNameOpt = Option.builder("an") + .longOpt("adapter-name") + .desc("AsterixDB adapter name using which to transform from JSON format to ADM format, two options: (1) twitter - for general Twitter data with maximized output columns; (2) twittermap - for TwitterMap application data with reduced output columns. (Default: twittermap)") + .type(String.class) + .required(false) + .hasArg() + .build(); + options.addOption(fromProxyOpt); + options.addOption(stateOpt); + options.addOption(countyOpt); + options.addOption(cityOpt); + options.addOption(hostOpt); + options.addOption(portOpt); + options.addOption(adapterNameOpt); + + // parse args to generate a TwitterIngestionConfig object + CommandLineParser parser = new DefaultParser(); + try { + CommandLine cmd = parser.parse(options, args); + + AsterixDBIngestionConfig config = new AsterixDBIngestionConfig(); + + // source + if (cmd.hasOption("from-proxy")) { + config.fromProxy = cmd.getOptionValue("from-proxy"); + if (!cmd.hasOption("state-json-file") || !cmd.hasOption("county-json-file") || !cmd.hasOption("city-json-file")) { + throw new ParseException("If from-proxy is given, state, county, city json files arguments are required."); + } + config.stateJsonFile = cmd.getOptionValue("state-json-file"); + config.countyJsonFile = cmd.getOptionValue("county-json-file"); + config.cityJsonFile = cmd.getOptionValue("city-json-file"); + } + + // target AsterixDB + config.host = cmd.getOptionValue("host"); + config.port = Integer.parseInt(cmd.getOptionValue("port")); + + // AsterixDB adapter + config.adapterName = cmd.getOptionValue("adapter-name", "twittermap"); + Set candidateAdapterNames = new HashSet(); + candidateAdapterNames.add("twitter"); + candidateAdapterNames.add("twittermap"); + if (!candidateAdapterNames.contains(config.adapterName)) { + throw new ParseException("The given adapter-name [" + config.adapterName + "] is not supported! Please give one of the two adapter-names: twitter, twittermap."); + } + + return config; + + } catch (ParseException e) { + e.printStackTrace(); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(100, "AsterixDBIngestionDriver", "", + options ,"Example: \n" + + "java -cp datatools-assembly-1.0-SNAPSHOT.jar \\ \n" + + "edu.uci.ics.cloudberry.datatools.asterixdb.AsterixDBIngestionDriver \\ \n" + + "-fp ws://localhost:9088/proxy \\ \n" + + "-state web/public/data/state.json \\ \n" + + "-county web/public/data/county.json \\ \n" + + "-city web/public/data/city.json \\ \n" + + "-h localhost \\ \n" + + "-p 10001 \\ \n"); + } + + return null; + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBIngestionDriver.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBIngestionDriver.java new file mode 100644 index 000000000..c49f77085 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBIngestionDriver.java @@ -0,0 +1,178 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import edu.uci.ics.cloudberry.datatools.twitter.geotagger.TwitterGeoTagger; +import edu.uci.ics.cloudberry.datatools.twitter.geotagger.USGeoGnosisLoader; +import edu.uci.ics.cloudberry.gnosis.USGeoGnosis; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Map; + +/** + * AsterixDBIngestionDriver + * + * operates in the following two modes: + * + * - (1) If given `from-proxy` url argument, + * it listens to the TwitterIngestionProxy websocket, + * and for each tweet received, it geotags, transforms and + * then ingests the tweet to the target AsterixDB. + * + * - (2) If not given `from-proxy` url argument, + * it reads tweets from stdin line by line, + * and for each tweet received, it transforms and + * then ingests the tweet to the target AsterixDB. + */ +public class AsterixDBIngestionDriver { + + public static ProxyIngestionWorker proxyIngestionWorker; + public static AsterixDBFeedClient asterixDBFeedClient; + public static AsterixDBAdapter asterixDBAdapter; + public static USGeoGnosis usGeoGnosis; + public static long startTime; + public static long counter; + + /** + * tag and ingest one tweet + * + * - (1) use TwitterGeoTagger to geotag the tweet + * - (2) use AsterixDBAdapter to transform a tweet from JSON to AsterixDB object (reducing columns at the same time) + * - (3) use AsterixDBFeedClient to ingest the tweet to the target AsterixDB + * + * @param tweet + */ + public static void tagAndIngestOneTweet(String tweet) { + Map tweetObject = TwitterGeoTagger.tagOneTweet(usGeoGnosis, tweet); + try { + String tweetADBObject = asterixDBAdapter.transform(tweetObject); + asterixDBFeedClient.ingest(tweetADBObject); + } + catch (Exception e) { + System.err.println("processOneTweet failed!"); + e.printStackTrace(); + } + } + + /** + * ingest one tweet + * + * - (1) use AsterixDBAdapter to transform a tweet from JSON to AsterixDB object (reducing columns at the same time) + * - (2) use AsterixDBFeedClient to ingest the tweet to the target AsterixDB + * + * @param tweet + */ + public static void ingestOneTweet(String tweet) { + try { + String tweetADBObject = asterixDBAdapter.transform(tweet); + asterixDBFeedClient.ingest(tweetADBObject); + } + catch (Exception e) { + System.err.println("ingestOneTweet failed!"); + e.printStackTrace(); + } + } + + public static void main(String[] args) { + // parse command line arguments + AsterixDBIngestionConfig config = AsterixDBIngestionConfig.createFromCLIArgs(args); + // parsing exception or config invalid + if (config == null) { + return; + } + + // start an AsterixDB feed client to the target AsterixDB + asterixDBFeedClient = new AsterixDBFeedClient(config.getHost(), config.getPort()); + try { + asterixDBFeedClient.initialize(); + } catch (IOException e) { + System.err.println("Establish connection to the target AsterixDB Feed [" + config.getHost() + ": " + config.getPort() + "] failed!"); + e.printStackTrace(); + return; + } + + // initialize AsterixDB adapter + if (config.getAdapterName().equalsIgnoreCase("twittermap")) { + asterixDBAdapter = new AsterixDBAdapterForTwitterMap(); + } + else { + asterixDBAdapter = new AsterixDBAdapterForTwitter(); + } + + // mode (1) - from Twitter ingestion proxy + if (config.getFromProxy() != null) { + + // create a USGeoGnosis object for the use of TwitterGeoTagger + try { + usGeoGnosis = USGeoGnosisLoader.loadUSGeoGnosis(config.getStateJsonFile(), + config.getCountyJsonFile(), config.getCityJsonFile()); + } catch (Exception e) { + System.err.println("Load USGeoGnosis failed!"); + System.err.println("Please check the 3 GeoJson files for state, county and city are available."); + e.printStackTrace(); + } + + // start a Proxy Ingestion Worker to use a TwitterIngestProxySocketClient to get tweets from the Proxy Server + proxyIngestionWorker = new ProxyIngestionWorker(config); + Thread proxyIngestionWorkerThread = new Thread(proxyIngestionWorker); + // clean up when shutdown + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (proxyIngestionWorker != null) { + proxyIngestionWorker.cleanUp(); + } + })); + proxyIngestionWorkerThread.start(); + try { + proxyIngestionWorkerThread.join(); + } + catch(Exception e) { + e.printStackTrace(System.err); + } + + // WebSocketClient client = new WebSocketClient(); + // TwitterIngestionProxySocketClient socket = new TwitterIngestionProxySocketClient(); + // try { + // client.start(); + // URI fromProxyUri = new URI(config.getFromProxy()); + // ClientUpgradeRequest request = new ClientUpgradeRequest(); + // client.connect(socket, fromProxyUri, request); + // } + // catch (Exception e) { + // System.err.println("Establish connection to the source proxy socket [" + config.getFromProxy() + "] failed!"); + // e.printStackTrace(); + // } + } + // mode (2) - from stdin + // TODO - for this mode, also support use USGeoGnosis to geotag each tweet and then ingest into AsterixDB, + // similar to mode (1), but also use multi-threading similar to TwitterGeoTagger. + // Then it can be used to for ingesting raw tweets from files into AsterixDB efficiently. + else { + counter = 0; + startTime = System.currentTimeMillis(); + try { + InputStreamReader reader = new InputStreamReader(System.in); + BufferedReader bufferedReader = new BufferedReader(reader); + String tweet; + while ((tweet = bufferedReader.readLine()) != null) { + ingestOneTweet(tweet); + counter++; + if (counter % 100000 == 0) { + printStats(); + } + } + System.err.println(">>> Ingestion done! <<<"); + printStats(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public static void printStats() { + long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;; + long elapsedMinutes = elapsedSeconds / 60; + double rate = (double) counter / elapsedSeconds; + System.err.println(">>> # of ingested records: " + counter + " Elapsed (s) : " + + elapsedSeconds + " (m) : " + elapsedMinutes + " record/sec : " + rate); + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/ProxyIngestionWorker.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/ProxyIngestionWorker.java new file mode 100644 index 000000000..c6ae78c84 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/ProxyIngestionWorker.java @@ -0,0 +1,60 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; + +import java.net.URI; + +/** + * ProxyIngestionWorker + * + * ProxyIngestionWorker is a runnable class. + * Once starts, it connects to given `from-proxy` url, and listens to the TwitterIngestionProxy websocket. + * + * @author Qiushi Bai, baiqiushi@gmail.com + */ +public class ProxyIngestionWorker implements Runnable{ + + AsterixDBIngestionConfig config; + WebSocketClient client; + + public ProxyIngestionWorker(AsterixDBIngestionConfig _config) { + config = _config; + } + + @Override + public void run() { + System.err.println("Proxy Ingestion Worker starts!"); + + // start a Twitter ingestion proxy socket client + client = new WebSocketClient(); + TwitterIngestionProxySocketClient socket = new TwitterIngestionProxySocketClient(); + try { + client.start(); + URI fromProxyUri = new URI(config.getFromProxy()); + ClientUpgradeRequest request = new ClientUpgradeRequest(); + client.connect(socket, fromProxyUri, request); + + System.err.println("Establish connection to the source proxy socket [" + config.getFromProxy() + "] succeeded!"); + } + catch (Exception e) { + System.err.println("Establish connection to the source proxy socket [" + config.getFromProxy() + "] failed!"); + e.printStackTrace(); + } + + while(true) { + try { + Thread.sleep(1000); + } + catch(Exception e) { + e.printStackTrace(System.err); + } + } + + //System.err.println("Proxy Ingestion Worker ends!"); + } + + public void cleanUp() { + // TODO - add any necessary cleanup code here. + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/README.md b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/README.md new file mode 100644 index 000000000..56428adca --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/README.md @@ -0,0 +1,52 @@ +# AsterixDB Ingestion Driver +A Java program to geotag and ingest Twitter JSON data into AsterixDB Socket Feed. + - mode (1) - geotag and ingest tweets from Twitter Ingestion Server Proxy. + - mode (2) - ingest geotagged tweets from stdin. + +## Prerequisite +- Java 8 + +## Build +```bash +$ cd twittermap +$ sbt 'project datatools' assembly +``` + +## Deploy +Copy the runnable file `datatools/target/scala-2.11/datatools-assembly-1.0-SNAPSHOT.jar` to your server. + +## Make a shell script for mode (1) +Create a `ingestProxy.sh` file with content: +```bash +#!/usr/bin/env bash +java -cp /path/to/datatools-assembly-1.0-SNAPSHOT.jar \ +edu.uci.ics.cloudberry.datatools.asterixdb.AsterixDBIngestionDriver \ + -fp ws://localhost:9088/proxy \ + -state /path/to/state.json \ + -county /path/to/county.json \ + -city /path/to/city.json \ + -h localhost \ + -p 10001 +``` +##### Note: The json files of state, county and city can be found in `twittermap/web/public/data/`. + +## Make a shell script for mode (2) +Create a `ingestFile.sh` file with content: +```bash +#!/usr/bin/env bash +java -cp /path/to/datatools-assembly-1.0-SNAPSHOT.jar \ +edu.uci.ics.cloudberry.datatools.asterixdb.AsterixDBIngestionDriver \ + -h localhost \ + -p 10001 +``` + +## Run AsterixDBIngestionDriver to geotag and ingest Twitter data from proxy server. +```bash +./ingestProxy.sh +``` + +## Run AsterixDBIngestionDriver to ingest geotagged tweets from stdin. +Suppose your Twitter JSON data is in file `Tweet_2020-05-03.gz`. +```bash +gunzip -c Tweet_2020-05-03.gz | ./geotag.sh 2> /dev/null | grep '^{' | ./ingestFile.sh +``` \ No newline at end of file diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/TwitterIngestionProxySocketClient.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/TwitterIngestionProxySocketClient.java new file mode 100644 index 000000000..159f16f66 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/asterixdb/TwitterIngestionProxySocketClient.java @@ -0,0 +1,36 @@ +package edu.uci.ics.cloudberry.datatools.asterixdb; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.*; + +@WebSocket(maxTextMessageSize = 64 * 1024) // For one tweet, we believe it's safe to set max text message size as 64KB +public class TwitterIngestionProxySocketClient { + + private Session session; + + @OnWebSocketClose + public void onClose(int statusCode, String reason) + { + System.err.println("Socket Closed: [" + statusCode + "] " + reason); + } + + @OnWebSocketConnect + public void onConnect(Session session) + { + System.err.println("Socket Connected: " + session); + this.session = session; + } + + @OnWebSocketMessage + public void onMessage(String message) + { + AsterixDBIngestionDriver.tagAndIngestOneTweet(message); + } + + @OnWebSocketError + public void onError(Throwable cause) + { + System.err.println("Socket error:"); + cause.printStackTrace(System.err); + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/README.md b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/README.md new file mode 100644 index 000000000..f8b3da72a --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/README.md @@ -0,0 +1,20 @@ +# Twitter Ingestion Server +A daemon service that can ingest real-time tweets from [Twitter Filter Stream API](https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter) into local gzip files in a daily rotation manner. + +## Prerequisite +- Java 8 + +## Build +```bash +$ cd twittermap +$ sbt 'project datatools' assembly +``` + +## Deploy +Copy the runnable file `datatools/target/scala-2.11/datatools-assembly-1.0-SNAPSHOT.jar` to your server. + +## Start service +Run command to start the Twitter Ingestion service: +```bash +java -cp datatools-assembly-1.0-SNAPSHOT.jar edu.uci.ics.cloudberry.datatools.twitter.TwitterIngestionServer +``` diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionConfig.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionConfig.java new file mode 100644 index 000000000..d969a1796 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionConfig.java @@ -0,0 +1,245 @@ +package edu.uci.ics.cloudberry.datatools.twitter; + +import java.util.HashSet; +import java.util.Set; +import com.twitter.hbc.core.endpoint.Location; +import org.apache.commons.cli.*; + +public class TwitterIngestionConfig { + + // authentication + private String consumerKey = null; + private String consumerSecret = null; + private String token = null; + private String tokenSecret = null; + + // filters + private String[] trackKeywords = null; + private Location[] trackLocations = null; + + // output + private String filePrefix = null; + private String outputPath = null; + private String rotateMode = null; + + // proxy + private int proxyPort; + + public String getConsumerKey() { + return consumerKey; + } + + public String getConsumerSecret() { + return consumerSecret; + } + + public String getToken() { + return token; + } + + public String getTokenSecret() { + return tokenSecret; + } + + public String[] getTrackKeywords() { + return trackKeywords; + } + + public Location[] getTrackLocations() { + return trackLocations; + } + + public String getFilePrefix() { + return filePrefix; + } + + public String getOutputPath() { + return outputPath; + } + + public String getRotateMode() { + return rotateMode; + } + + public int getProxyPort() { + return proxyPort; + } + + /** + * create a TwitterIngestionConfig object from parsing CLI arguments + * + * @param args + * @return TwitterIngestionConfig object, or null if any exception. + */ + public static TwitterIngestionConfig createFromCLIArgs(String[] args) { + // define cli arguments options, consistent with members of this class + final Options options = new Options(); + final Option consumerKeyOpt = Option.builder("ck") + .longOpt("consumer-key") + .desc("Consumer Key for Twitter OAuth") + .type(String.class) + .required() + .hasArg() + .build(); + final Option consumerSecretOpt = Option.builder("cs") + .longOpt("consumer-secret") + .desc("Consumer Secret for Twitter OAuth") + .type(String.class) + .required() + .hasArg() + .build(); + final Option tokenOpt = Option.builder("tk") + .longOpt("token") + .desc("Token for Twitter OAuth") + .type(String.class) + .required() + .hasArg() + .build(); + final Option tokenSecretOpt = Option.builder("ts") + .longOpt("token-secret") + .desc("Token secret for Twitter OAuth") + .type(String.class) + .required() + .hasArg() + .build(); + final Option trackOpt = Option.builder("tr") + .longOpt("track") + .desc("Keywords to track. Phrases of keywords are specified by a comma-separated list. \n" + + "See https://developer.twitter.com/en/docs/tweets/filter-realtime/guides/basic-stream-parameters for more information.") + .type(String.class) + .required(false) + .numberOfArgs(400) // maximum 400 keywords + .valueSeparator(',') + .build(); + final Option locationOpt = Option.builder() + .longOpt("locations") + .desc("Specifies a set of bounding boxes to track. Each bounding box must be in format: southwest.longitude, southwest.latitude, northeast.longitude, northeast.latitude. \n" + + "Note: keywords filter (--track) and locations filter (--locations) are in OR logical relationship.") + .type(Double.class) + .required(false) + .numberOfArgs(100) // 4 values for the bounding box + .valueSeparator(',') + .build(); + final Option filePrefixOpt = Option.builder("fp") + .longOpt("file-prefix") + .desc("Prefix for output gzip file names. (Default: Tweet)") + .type(String.class) + .required(false) + .hasArg() + .build(); + final Option outputPathOpt = Option.builder("op") + .longOpt("output-path") + .desc("Output path for output gzip files. (Default: ./)") + .required(false) + .hasArg() + .build(); + final Option rotateModeOpt = Option.builder("rm") + .longOpt("rotate-mode") + .desc("Output file rotate mode, supported values: (1) daily/day/d - daily rotating; (2) weekly/week/w - weekly rotating; (3) monthly/month/m - monthly rotating. (Default: weekly)") + .type(String.class) + .required(false) + .hasArg() + .build(); + final Option proxyPortOpt = Option.builder("pp") + .longOpt("proxy-port") + .desc("Port to which the proxy server will listen to, " + + "the proxy server outputs real time ingested tweets to any connected websocket client. " + + "(Default: 9088) Set -1 to disable this proxy server.") + .type(Integer.class) + .required(false) + .hasArg() + .build(); + options.addOption(consumerKeyOpt); + options.addOption(consumerSecretOpt); + options.addOption(tokenOpt); + options.addOption(tokenSecretOpt); + options.addOption(trackOpt); + options.addOption(locationOpt); + options.addOption(filePrefixOpt); + options.addOption(outputPathOpt); + options.addOption(rotateModeOpt); + options.addOption(proxyPortOpt); + + // parse args to generate a TwitterIngestionConfig object + CommandLineParser parser = new DefaultParser(); + try { + CommandLine cmd = parser.parse(options, args); + + TwitterIngestionConfig config = new TwitterIngestionConfig(); + // authentication + config.consumerKey = cmd.getOptionValue("consumer-key"); + config.consumerSecret = cmd.getOptionValue("consumer-secret"); + config.token = cmd.getOptionValue("token"); + config.tokenSecret = cmd.getOptionValue("token-secret"); + // filters + if (cmd.hasOption("track")) { + config.trackKeywords = cmd.getOptionValues("track"); + } + if (cmd.hasOption("locations")) { + String[] lnglats = cmd.getOptionValues("locations"); + if (lnglats.length % 4 != 0) { + throw new ParseException("The number of values for each bounding box should be four!"); + } + config.trackLocations = new Location[lnglats.length / 4]; + for (int i = 0; i < lnglats.length; i += 4) { + config.trackLocations[i / 4] = new Location( + new Location.Coordinate(Double.parseDouble(lnglats[i].trim()), + Double.parseDouble(lnglats[i + 1].trim())), + new Location.Coordinate(Double.parseDouble(lnglats[i + 2].trim()), + Double.parseDouble(lnglats[i + 3].trim()))); + } + } + if (config.getTrackKeywords() == null && config.getTrackLocations() == null) { + throw new ParseException("Please provide at least one tracking keyword, or one location bounding box!"); + } + // output + config.filePrefix = cmd.getOptionValue("file-prefix", "Tweet"); + config.outputPath = cmd.getOptionValue("output-path", "./"); + // rotate mode + if (cmd.hasOption("rotate-mode")) { + config.rotateMode = cmd.getOptionValue("rotate-mode"); + Set candidateRotateModes = new HashSet(); + candidateRotateModes.add("daily"); + candidateRotateModes.add("day"); + candidateRotateModes.add("d"); + candidateRotateModes.add("weekly"); + candidateRotateModes.add("week"); + candidateRotateModes.add("w"); + candidateRotateModes.add("monthly"); + candidateRotateModes.add("month"); + candidateRotateModes.add("m"); + if (!candidateRotateModes.contains(config.rotateMode)) { + throw new ParseException("The given rotate-mode [" + config.rotateMode + "] is not supported! Please give one of the three rotate-modes: daily, weekly, monthly."); + } + } + else{ + config.rotateMode = "weekly"; + } + // proxy + config.proxyPort = Integer.parseInt(cmd.getOptionValue("proxy-port", "9088")); + + return config; + + } catch (ParseException e) { + e.printStackTrace(); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(100, "TwitterIngestion", "", + options ,"Example: \n" + + "java -cp datatools-assembly-1.0-SNAPSHOT.jar \\ \n" + + "edu.uci.ics.cloudberry.datatools.twitter.TwitterIngestionServer \\ \n" + + "-ck your_own_consumer_key \\ \n" + + "-cs your_own_consumer_secret \\ \n" + + "-tk your_own_token \\ \n" + + "-ts your_own_token_secret \\ \n" + + "-tr=hurricane,tornado,storm \\ \n" + + "--locations=-170,30,-160,40,100,20,120,40 \\ \n" + + "-fp Twitter_hurricane \\ \n" + + "-op ./ \n" + + "-rm weekly \n" + + "To get your own authentication keys, " + + "visit: https://developer.twitter.com/en/docs/basics/authentication/oauth-1-0a"); + } + + return null; + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionProxySocketServer.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionProxySocketServer.java new file mode 100644 index 000000000..cb159dff8 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionProxySocketServer.java @@ -0,0 +1,38 @@ +package edu.uci.ics.cloudberry.datatools.twitter; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.*; + +@WebSocket(maxTextMessageSize = 64 * 1024) // For one tweet, we believe it's safe to set max text message size as 64KB +public class TwitterIngestionProxySocketServer { + + int sessionId; + + @OnWebSocketConnect + public void onConnect(Session session) + { + System.err.println("Socket Connected: " + session); + sessionId = TwitterIngestionServer.subscribe(session.getRemote()); + } + + @OnWebSocketMessage + public void onMessage(String message) + { + System.err.println("Received TEXT message: " + message); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) + { + System.err.println("Socket Closed: [" + statusCode + "] " + reason); + TwitterIngestionServer.unsubscribe(sessionId); + } + + @OnWebSocketError + public void onError(Throwable cause) + { + System.err.println("Socket error:"); + cause.printStackTrace(System.err); + TwitterIngestionServer.unsubscribe(sessionId); + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionServer.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionServer.java new file mode 100644 index 000000000..074689bda --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionServer.java @@ -0,0 +1,146 @@ +package edu.uci.ics.cloudberry.datatools.twitter; + +import org.eclipse.jetty.http.pathmap.ServletPathSpec; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.resource.Resource; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; + +import java.net.InetAddress; +import java.net.URI; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class TwitterIngestionServer { + + // for websocket clients to subscribe to realtime ingestion proxy + public static ConcurrentHashMap subscribers = new ConcurrentHashMap<>(); + + public static int subscribe(RemoteEndpoint subscriber) { + int sessionId = subscribers.size() + 1; + subscribers.put(sessionId, subscriber); + return sessionId; + } + + public static void unsubscribe(int sessionId) { + subscribers.remove(sessionId); + } + + public static void publish(String tweet) { + for (Map.Entry subscriber: subscribers.entrySet() ) { + subscriber.getValue().sendStringByFuture(tweet); + } + } + + // for http clients to get statistics of TwitterIngestionWorker + public static TwitterIngestionWorker twitterIngestionWorker; + + public static String getStats() { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append("\"startTime\": \"" + twitterIngestionWorker.getStartTime() + "\""); + sb.append(","); + sb.append("\"counter\": \"" + twitterIngestionWorker.getCounter() + "\""); + sb.append(","); + sb.append("\"averageRate\": \"" + twitterIngestionWorker.getAverageRate() + "\""); + sb.append(","); + sb.append("\"instantRate\": \"" + twitterIngestionWorker.getInstantRate() + "\""); + sb.append("}"); + return sb.toString(); + } + + /** + * TwitterIngestionProxySocketServerCreator + * + * only used for jetty server websocket filter setup + */ + public static class TwitterIngestionProxySocketServerCreator implements WebSocketCreator + { + @Override + public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) + { + return new TwitterIngestionProxySocketServer(); + } + } + + public static void main(String[] args) { + + // parse command line arguments + TwitterIngestionConfig config = TwitterIngestionConfig.createFromCLIArgs(args); + // parsing exception or config invalid + if (config == null) { + return; + } + + // start Twitter Ingestion Worker + twitterIngestionWorker = new TwitterIngestionWorker(config); + Thread twitterIngestionWorkerThread = new Thread(twitterIngestionWorker); + // clean up when shutdown + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + if (twitterIngestionWorker != null) { + twitterIngestionWorker.cleanUp(); + } + })); + twitterIngestionWorkerThread.start(); + + // start Twitter Ingestion Proxy + if (config.getProxyPort() > 0) { + try { + Server server = new Server(); + ServerConnector connector = new ServerConnector(server); + connector.setPort(config.getProxyPort()); + server.addConnector(connector); + + URL webRootLocation = TwitterIngestionServer.class.getResource("/webroot/index.html"); + if (webRootLocation == null) + { + throw new IllegalStateException("Unable to determine webroot URL location"); + } + + URI webRootUri = URI.create(webRootLocation.toURI().toASCIIString().replaceFirst("/index.html$","/")); + + // Setup the basic application "context" for this application at "/" + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + context.setBaseResource(Resource.newResource(webRootUri)); + context.setWelcomeFiles(new String[]{"index.html"}); + context.getMimeTypes().addMimeMapping("txt","text/plain;charset=utf-8"); + server.setHandler(context); + + // Setup websocket filter to map TwitterIngestionProxySocket listen to "/proxy" + WebSocketUpgradeFilter wsfilter = WebSocketUpgradeFilter.configure(context); + wsfilter.getFactory().getPolicy().setIdleTimeout(5000); + wsfilter.addMapping(new ServletPathSpec("/proxy"), new TwitterIngestionProxySocketServerCreator()); + + // Setup stats http servlet + context.addServlet(TwitterIngestionStats.class, "/stats"); + + // Setup default servlet to serve the static files + ServletHolder staticFileHolder = new ServletHolder("default", DefaultServlet.class); + staticFileHolder.setInitParameter("dirAllowed","true"); + context.addServlet(staticFileHolder,"/"); + + System.err.println("Twitter Ingestion Proxy Server started!"); + String ip = InetAddress.getLocalHost().getHostAddress(); + System.err.println(" - Visit http://" + ip + ":" + config.getProxyPort() + " to access the admin console."); + System.err.println(" - Build Websocket client connecting to ws://" + ip + ":" + config.getProxyPort() + "/proxy to receive realtime stream of this ingestion."); + + // start jetty http server + server.start(); + server.join(); + } catch (Throwable t) { + t.printStackTrace(System.err); + } + } + + System.err.println("main function exit..."); + } +} \ No newline at end of file diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionStats.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionStats.java new file mode 100644 index 000000000..f9b9f6ac5 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionStats.java @@ -0,0 +1,19 @@ +package edu.uci.ics.cloudberry.datatools.twitter; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; + +public class TwitterIngestionStats extends HttpServlet { + protected void doGet( + HttpServletRequest request, + HttpServletResponse response) + throws ServletException, IOException { + + response.setContentType("application/json"); + response.setStatus(HttpServletResponse.SC_OK); + response.getWriter().println(TwitterIngestionServer.getStats()); + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionWorker.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionWorker.java new file mode 100644 index 000000000..434b0c454 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/TwitterIngestionWorker.java @@ -0,0 +1,271 @@ +package edu.uci.ics.cloudberry.datatools.twitter; + +import com.google.common.collect.Lists; +import com.twitter.hbc.ClientBuilder; +import com.twitter.hbc.core.Client; +import com.twitter.hbc.core.Constants; +import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; +import com.twitter.hbc.core.endpoint.Location; +import com.twitter.hbc.core.processor.StringDelimitedProcessor; +import com.twitter.hbc.httpclient.auth.Authentication; +import com.twitter.hbc.httpclient.auth.OAuth1; + +import java.io.*; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.zip.GZIPOutputStream; + +/** + * TwitterIngestionWorker + * + * TwitterIngestionWorker is a runnable class. + * Once starts, it connects to Twitter streaming API "https://stream.twitter.com/1.1/statuses/filter.json" + * using filtering conditions defined in config (e.g. tracking keywords or tracking location), + * and dump the json into a gzip file in a daily manner. + * + * Reference: https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter + * + * @author Qiushi Bai, baiqiushi@gmail.com + */ +public class TwitterIngestionWorker implements Runnable{ + + /** stats */ + Date startTime; + long counter; + int averageRate; + int instantCounter; + long instantStart; + long instantStop; + int instantRate; + + TwitterIngestionConfig config; + Client twitterClient; + BufferedWriter fileWriter; + Date currentFileDate; // the creation date of the current output file + SimpleDateFormat dateFormatter; + SimpleDateFormat timeFormatter; + + public TwitterIngestionWorker(TwitterIngestionConfig _config) { + dateFormatter = new SimpleDateFormat("yyyy-MM-dd"); + timeFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + config = _config; + averageRate = 0; + instantRate = 0; + } + + public String getStartTime() { + return timeFormatter.format(startTime); + } + + public long getCounter() { + return counter; + } + + public int getAverageRate() { + return averageRate; + } + + public int getInstantRate() { + return instantRate; + } + + /** + * Check if we need to rotate the output file + * + * Note: currently support daily, weekly, and monthly rotation. + * + * @return + */ + private boolean rotateFile(String rotateMode) { + Calendar currentFileCalendar = Calendar.getInstance(); + currentFileCalendar.setTime(currentFileDate); + Calendar rightNowCalendar = Calendar.getInstance(); + rightNowCalendar.setTime(new Date()); + boolean rotateFile = true; + switch (rotateMode.toLowerCase()) { + case "daily": + case "day": + case "d": + if (currentFileCalendar.get(Calendar.YEAR) == rightNowCalendar.get(Calendar.YEAR) && + currentFileCalendar.get(Calendar.MONTH) == rightNowCalendar.get(Calendar.MONTH) && + currentFileCalendar.get(Calendar.DAY_OF_MONTH) == rightNowCalendar.get(Calendar.DAY_OF_MONTH)) { + rotateFile = false; + } + break; + case "weekly": + case "week": + case "w": + if (currentFileCalendar.get(Calendar.YEAR) == rightNowCalendar.get(Calendar.YEAR) && + currentFileCalendar.get(Calendar.WEEK_OF_YEAR) == rightNowCalendar.get(Calendar.WEEK_OF_YEAR)) { + rotateFile = false; + } + break; + case "monthly": + case "month": + case "m": + if (currentFileCalendar.get(Calendar.YEAR) == rightNowCalendar.get(Calendar.YEAR) && + currentFileCalendar.get(Calendar.MONTH) == rightNowCalendar.get(Calendar.MONTH)) { + rotateFile = false; + } + break; + } + + return rotateFile; + } + + /** + * Get the file writer handle for ingestion worker. + * - if there's exiting gzip file for today (e.g. [prefix]_2020-05-01.gz) + * - open this file in append mode. + * - else + * - create new file with today's date. (e.g. [prefix]_2020-05-01.gz) + * + * @return + */ + private BufferedWriter getFileWriter(String filePath, String prefix) throws IOException { + currentFileDate = new Date(); + String strDate = dateFormatter.format(currentFileDate); + String fileName = prefix + "_" + strDate + ".gz"; + if (filePath.endsWith("/")) { + fileName = filePath + fileName; + } + else { + fileName = filePath + "/" + fileName; + } + GZIPOutputStream zip = new GZIPOutputStream( + new FileOutputStream(new File(fileName), true)); + BufferedWriter bw = new BufferedWriter( + new OutputStreamWriter(zip, "UTF-8")); + return bw; + } + + @Override + public void run() { + System.err.println("Twitter Ingestion Worker starts!"); + + try { + fileWriter = getFileWriter(config.getOutputPath(), config.getFilePrefix()); + } catch (IOException e) { + System.err.println("Opening output file failed!"); + e.printStackTrace(); + return; + } + + BlockingQueue queue = new LinkedBlockingQueue(10000); + StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint(); + + // add tracking keywords + if (config.getTrackKeywords() != null) { + System.err.print("Tracking keywords: "); + for (String keyword : config.getTrackKeywords()) { + System.err.print(keyword); + System.err.print(" "); + } + System.err.println(); + + endpoint.trackTerms(Lists.newArrayList(config.getTrackKeywords())); + } + + // add tracking locations + if (config.getTrackLocations() != null) { + System.err.print("Tracking locations:"); + for (Location location : config.getTrackLocations()) { + System.err.print(location); + System.err.print(" "); + } + System.err.println(); + + endpoint.locations(Lists.newArrayList(config.getTrackLocations())); + } + + // add OAuth keys + Authentication auth = new OAuth1(config.getConsumerKey(), config.getConsumerSecret(), config.getToken(), + config.getTokenSecret()); + + // build twitter client + twitterClient = new ClientBuilder() + .hosts(Constants.STREAM_HOST) + .endpoint(endpoint) + .authentication(auth) + .processor(new StringDelimitedProcessor(queue)) + .build(); + + try { + twitterClient.connect(); + startTime = new Date(); + counter = 0; + instantCounter = 0; + instantStart = System.currentTimeMillis(); + + while (!twitterClient.isDone()) { + + // get one tweet + String tweet = queue.take(); + + // count stats + counter ++; + instantCounter ++; + if (instantCounter == 100) { + instantStop = System.currentTimeMillis(); + instantRate = (int) (instantCounter * 1000 / (instantStop - instantStart)); + instantStart = instantStop; + instantCounter = 0; + } + if (counter % 10000 == 0) { + Date rightNow = new Date(); + long totalSeconds = (rightNow.getTime() - startTime.getTime()) / 1000; + averageRate = (int) (counter / totalSeconds); + } + + // if needs to rotate file, get new file writer + if (rotateFile(config.getRotateMode())) { + // close current file writer + if (fileWriter != null) { + try { + fileWriter.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + // get new file writer + fileWriter = getFileWriter(config.getOutputPath(), config.getFilePrefix()); + } + + // write to file + fileWriter.write(tweet); + + // publish to TwitterIngestionServer + TwitterIngestionServer.publish(tweet); + } + } catch (InterruptedException | IOException e) { + e.printStackTrace(); + } finally { + if (fileWriter != null) { + try { + fileWriter.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (twitterClient != null) { + twitterClient.stop(); + } + } + } + + public void cleanUp() { + if (fileWriter != null) { + try { + fileWriter.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (twitterClient != null) { + twitterClient.stop(); + } + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/README.md b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/README.md new file mode 100644 index 000000000..51c3f31df --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/README.md @@ -0,0 +1,33 @@ +# Twitter GeoTagger +A Java program to geoTag Twitter JSON with {stateName, stateID, countyName, countyID, cityName, cityID}. + +## Prerequisite +- Java 8 + +## Build +```bash +$ cd twittermap +$ sbt 'project datatools' assembly +``` + +## Deploy +Copy the runnable file `datatools/target/scala-2.11/datatools-assembly-1.0-SNAPSHOT.jar` to your server. + +## Make a shell script +Create a `geotag.sh` file with content: +```bash +#!/usr/bin/env bash +java -cp /path/to/datatools-assembly-1.0-SNAPSHOT.jar \ +edu.uci.ics.cloudberry.datatools.twitter.geotagger.TwitterGeoTagger \ + -state /path/to/state.json \ + -county /path/to/county.json \ + -city /path/to/city.json \ + -thread 4 +``` +##### Note: The json files of state, county and city can be found in `twittermap/web/public/data/`. + +## Run TwitterGeoTagger against Twitter JSON data +Suppose your Twitter JSON data is in file `Tweet_2020-05-03.gz`. +```bash +gunzip -c Tweet_2020-05-03.gz | ./geotag.sh > Tweet_2020-05-03_geotagged.json +``` \ No newline at end of file diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTagger.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTagger.java new file mode 100644 index 000000000..fb7b6a618 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTagger.java @@ -0,0 +1,318 @@ +package edu.uci.ics.cloudberry.datatools.twitter.geotagger; + +import com.fasterxml.jackson.core.JsonParser; +import edu.uci.ics.cloudberry.gnosis.USGeoGnosis; +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.uci.ics.cloudberry.util.Rectangle; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +/** + * TwitterGeoTagger + * + * - (1) If used as an embedded API in other services, + * it provides a function tagOneTweet, + * which takes a Tweet String as input, along with a handle to a USGeoGnosis object, + * and outputs a geo_tagged Tweet Json Map (from jackson.databind.ObjectMapper). + * + * - (2) If used as standalone process, + * it takes 4 arguments [state, county and city geo-json files, # of concurrent threads], + * and it reads pipelined Tweet String from stdin, + * and it outputs pipelined geo_tagged Tweet String into stdout. + * + * @author Qiushi Bai + */ +public class TwitterGeoTagger { + + public static boolean DEBUG = false; + public static boolean SKIP_UNTAGGED = false; + + public static String GEO_TAG = "geo_tag"; + public static String STATE_ID = "stateID"; + public static String STATE_NAME = "stateName"; + public static String COUNTY_ID = "countyID"; + public static String COUNTY_NAME = "countyName"; + public static String CITY_ID = "cityID"; + public static String CITY_NAME = "cityName"; + + public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { + // parse command line arguments + TwitterGeoTaggerConfig config = TwitterGeoTaggerConfig.createFromCLIArgs(args); + // parsing exception or config invalid + if (config == null) { + return; + } + DEBUG = config.getDebug(); + SKIP_UNTAGGED = config.getSkipUntagged(); + + // new a USGeoGnosis object shared by all threads of TwitterGeoTagger to use. + USGeoGnosis usGeoGnosis = USGeoGnosisLoader.loadUSGeoGnosis(config.getStateJsonFile(), + config.getCountyJsonFile(), config.getCityJsonFile()); + + + // create a thread pool with given thread number + ExecutorService executorService = Executors.newFixedThreadPool(config.getThreadNumber()); + LinkedList> futures = new LinkedList<>(); + + // use a buffer to store a batch of records for multiple threads to process + Queue buffer = new LinkedList<>(); + + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); + String line; + // keep reading lines into buffer + while((line = bufferedReader.readLine()) != null) { + buffer.add(line); + // once enough lines are in buffer + if (buffer.size() == config.getBufferSize()) { + // submit a task to the executor service for each line in buffer + while (!buffer.isEmpty()) { + String tweet = buffer.poll(); + futures.add(executorService.submit(() -> { + printTagOneTweet(usGeoGnosis, tweet); + })); + } + // wait for all tasks are done. + while(!futures.isEmpty()) { + Future future = futures.poll(); + future.get(); + } + } + } + // process the last lines in buffer + while (!buffer.isEmpty()) { + String tweet = buffer.poll(); + futures.add(executorService.submit(() -> { + printTagOneTweet(usGeoGnosis, tweet); + })); + } + // wait for all tasks are done. + while(!futures.isEmpty()) { + Future future = futures.poll(); + future.get(); + } + executorService.shutdownNow(); + } + + public static Map tagOneTweet(USGeoGnosis usGeoGnosis, String tweetString) { + Map tweetObject; + + // (1) parse tweet string to tweet object (Map) + try { + tweetObject = parseOneTweet(tweetString); + } catch (IOException e) { + System.err.println("Parse tweet string failed!"); + e.printStackTrace(); + return null; + } + + // (2) add "geo_tag" object into tweet object + // - try text match first + // - then try exact point lookup + // - otherwise, no geo_tag information will be added, the tweet being as before + if (!textMatchPlace(usGeoGnosis, tweetObject)) { + exactPointLookup(usGeoGnosis, tweetObject); + } + + return tweetObject; + } + + public static boolean printTagOneTweet(USGeoGnosis usGeoGnosis, String tweetString) { + try { + ObjectMapper mapper = new ObjectMapper(); + Map tweetObject; + + // (1) parse tweet string to tweet object (Map) + tweetObject = mapper.readValue(tweetString, Map.class); + + boolean tagged = false; + // (2) add "geo_tag" object into tweet object + // (2.1) try text match first + if (!tagged) { + tagged = textMatchPlace(usGeoGnosis, tweetObject); + } + // (2.2) then try exact point lookup + if (!tagged) { + tagged = exactPointLookup(usGeoGnosis, tweetObject); + } + + // print out the tweet if successfully tagged or SKIP_UNTAGGED flag is false + if (tagged || !SKIP_UNTAGGED) { + // (3) write tweet object back to string + tweetString = mapper.writeValueAsString(tweetObject); + + // (4) print geo_tagged tweet to stdout + System.out.println(tweetString); + } + + } catch (Exception e) { + if (DEBUG) { + System.err.println("Geo-tag tweet failed!"); + System.err.println(tweetString); + e.printStackTrace(); + } + return false; + } + + return true; + } + + public static Map parseOneTweet(String tweetString) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(tweetString, Map.class); + } + + public static Rectangle boundingBox2Rectangle(List> boundingBox) { + + // TODO - this logic was migrated from legacy code, should be revisited for newer version Twitter APIs. + // compatible with historical tweets, it might have 4 long-lat coordinates, or 2 long-lat coordinates + if (boundingBox.size() != 4 && boundingBox.get(0).size() != 2) { + throw new IllegalArgumentException("unknown bounding_box"); + } + + // get the min lat, min long, max lat and max long. + double minLong = Double.MAX_VALUE; + double maxLong = -Double.MAX_VALUE; + double minLat = Double.MAX_VALUE; + double maxLat = -Double.MAX_VALUE; + + // boundingBox is in format: [[long0, lat0], [long1, lat1], ...] + for (List longLat: boundingBox) { + if (longLat.get(0) < minLong) { + minLong = longLat.get(0); + } + if (longLat.get(0) > maxLong) { + maxLong = longLat.get(0); + } + if (longLat.get(1) < minLat) { + minLat = longLat.get(1); + } + if (longLat.get(1) > maxLat) { + maxLat = longLat.get(1); + } + } + + // AsterixDB is unhappy with this kind of point "rectangular" + if (minLong == maxLong && minLat == maxLat){ + minLong = maxLong - 0.0000001; + minLat = maxLat - 0.0000001; + } + + if (minLong > maxLong || minLat > maxLat) { + throw new IllegalArgumentException( + "Not a good Rectangle: " + + "minLong:" + minLong + + ", minLat:" + minLat + + ", maxLong:" + maxLong + + ", maxLat:" + maxLat); + } + + return new Rectangle(minLong, minLat, maxLong, maxLat); + } + + public static void writeGeoTagToTweetObject(Map tweetObject, scala.Option geoTag) { + Map geoTagObject = new HashMap<>(); + geoTagObject.put(STATE_ID, geoTag.get().stateID()); + geoTagObject.put(STATE_NAME, geoTag.get().stateName()); + if (!geoTag.get().countyID().isEmpty()) + geoTagObject.put(COUNTY_ID, geoTag.get().countyID().get()); + if (!geoTag.get().countyName().isEmpty()) + geoTagObject.put(COUNTY_NAME, geoTag.get().countyName().get()); + if (!geoTag.get().cityID().isEmpty()) + geoTagObject.put(CITY_ID, geoTag.get().cityID().get()); + if (!geoTag.get().cityName().isEmpty()) + geoTagObject.put(CITY_NAME, geoTag.get().cityName().get()); + + tweetObject.put(GEO_TAG, geoTagObject); + } + + public static boolean textMatchPlace(USGeoGnosis usGeoGnosis, Map tweetObject) { + Map place = (Map) tweetObject.get("place"); + if (place == null) { + return false; + } + String country = (String) place.get("country"); + if (!("United States").equals(country)) { + return false; + } + scala.Option geoTag; + String type = (String) place.get("place_type"); + switch (type) { + case "country": + return false; + case "admin": // state level + return false; + case "city": + String fullName = (String) place.get("full_name"); + int index = fullName.indexOf(','); + if (index < 0) { + System.err.println("unknown neighborhood: " + fullName); + return false; + } + String stateAbbr = fullName.substring(index + 1).trim(); + String cityName = (String) place.get("name"); + geoTag = usGeoGnosis.tagCity(cityName, stateAbbr); + break; + case "neighborhood": // e.g. "The Las Vegas Strip, Paradise" + fullName = (String) place.get("full_name"); + index = fullName.indexOf(','); + if (index < 0) { + System.err.println("unknown neighborhood: " + fullName); + return false; + } + cityName = fullName.substring(index + 1).trim(); + Map boundingBox = (Map) place.get("bounding_box"); + List>> coordinates = (List>>) boundingBox.get("coordinates"); + geoTag = usGeoGnosis.tagNeighborhood(cityName, + boundingBox2Rectangle(coordinates.get(0))); + break; + case "poi": // a point + // use the first point in bounding_box as the point to look up + boundingBox = (Map) place.get("bounding_box"); + coordinates = (List>>) boundingBox.get("coordinates"); + double longitude = coordinates.get(0).get(0).get(0); + double latitude = coordinates.get(0).get(0).get(1); + geoTag = usGeoGnosis.tagPoint(longitude, latitude); + break; + default: + System.err.println("unknown place type: " + type); + return false; + } + + if (geoTag == null || geoTag.isEmpty()) { + return false; + } + + // write geoTag to tweetObject + writeGeoTagToTweetObject(tweetObject, geoTag); + + return true; + } + + public static boolean exactPointLookup(USGeoGnosis usGeoGnosis, Map tweetObject) { + Map coordinates = (Map ) tweetObject.get("coordinates"); + if (coordinates == null || coordinates.isEmpty()) { + return false; + } + List coordinate = (List) coordinates.get("coordinates"); + if (coordinate.size() != 2) { + System.err.println("unknown coordinate: " + coordinate); + return false; + } + + scala.Option geoTag = usGeoGnosis.tagPoint(coordinate.get(0), coordinate.get(1)); + if (geoTag.isEmpty()) { + return false; + } + + writeGeoTagToTweetObject(tweetObject, geoTag); + + return true; + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTaggerConfig.java b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTaggerConfig.java new file mode 100644 index 000000000..ae1f2f9ee --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTaggerConfig.java @@ -0,0 +1,139 @@ +package edu.uci.ics.cloudberry.datatools.twitter.geotagger; + +import org.apache.commons.cli.*; + +public class TwitterGeoTaggerConfig { + String stateJsonFile = null; + String countyJsonFile = null; + String cityJsonFile = null; + int threadNumber = 0; + int bufferSize = 0; + boolean debug = false; + boolean skipUntagged = false; + + public String getStateJsonFile() { + return stateJsonFile; + } + + public String getCountyJsonFile() { + return countyJsonFile; + } + + public String getCityJsonFile() { + return cityJsonFile; + } + + public int getThreadNumber() { + return threadNumber; + } + + public int getBufferSize() { + return bufferSize; + } + + public boolean getDebug() { + return debug; + } + + public boolean getSkipUntagged() { + return skipUntagged; + } + + /** + * create a TwitterGeoTaggerConfig object from parsing CLI arguments + * + * @param args + * @return TwitterGeoTaggerConfig object, or null if any exception. + */ + public static TwitterGeoTaggerConfig createFromCLIArgs(String[] args) { + // define cli arguments options, consistent with members of this class + final Options options = new Options(); + final Option stateOpt = Option.builder("state") + .longOpt("state-json-file") + .desc("State Json file for geographical information of states in the U.S.") + .type(String.class) + .required() + .hasArg() + .build(); + final Option countyOpt = Option.builder("county") + .longOpt("county-json-file") + .desc("County Json file for geographical information of counties in the U.S.") + .type(String.class) + .required() + .hasArg() + .build(); + final Option cityOpt = Option.builder("city") + .longOpt("city-json-file") + .desc("City Json file for geographical information of cities in the U.S.") + .type(String.class) + .required() + .hasArg() + .build(); + final Option threadOpt = Option.builder("thread") + .longOpt("thread-number") + .desc("Number of threads to parallelize this TwitterGeoTagger process. (Default: 1)") + .type(Integer.class) + .required(false) + .hasArg() + .build(); + final Option bufferOpt = Option.builder("buffer") + .longOpt("buffer-size") + .desc("Size of buffer to synchronize all threads - finish geo-tagging all tweets in the buffer and then continue. (Default: 100)") + .type(Integer.class) + .required(false) + .hasArg() + .build(); + final Option debugOpt = Option.builder("debug") + .longOpt("enable-debug-mode") + .desc("Enable debug mode. (Default: false)") + .type(Boolean.class) + .required(false) + .hasArg(false) + .build(); + final Option skipOpt = Option.builder("skip") + .longOpt("skip-untagged") + .desc("Skip tweets that can not be tagged, i.e. not out putting them to the stdout at all. (Default: false)") + .type(Boolean.class) + .required(false) + .hasArg(false) + .build(); + options.addOption(stateOpt); + options.addOption(countyOpt); + options.addOption(cityOpt); + options.addOption(threadOpt); + options.addOption(bufferOpt); + options.addOption(debugOpt); + options.addOption(skipOpt); + + // parse args to generate a TwitterIngestionConfig object + CommandLineParser parser = new DefaultParser(); + try { + CommandLine cmd = parser.parse(options, args); + + TwitterGeoTaggerConfig config = new TwitterGeoTaggerConfig(); + config.stateJsonFile = cmd.getOptionValue("state-json-file"); + config.countyJsonFile = cmd.getOptionValue("county-json-file"); + config.cityJsonFile = cmd.getOptionValue("city-json-file"); + config.threadNumber = Integer.parseInt(cmd.getOptionValue("thread-number", "1")); + config.bufferSize = Integer.parseInt(cmd.getOptionValue("buffer-size", "100")); + config.debug = cmd.hasOption("debug"); + config.skipUntagged = cmd.hasOption("skip"); + return config; + + } catch (ParseException e) { + e.printStackTrace(); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(100, "TwitterIngestion", "", + options ,"Example: \n" + + "java -cp datatools-assembly-1.0-SNAPSHOT.jar \\ \n" + + "edu.uci.ics.cloudberry.datatools.twitter.TwitterGeoTagger \\ \n" + + "-state web/public/data/state.json \\ \n" + + "-county web/public/data/county.json \\ \n" + + "-city web/public/data/city.json \\ \n" + + "-thread 2 \\ \n" + + "-buffer 100 \\ \n"); + } + + return null; + } +} diff --git a/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/USGeoGnosisLoader.scala b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/USGeoGnosisLoader.scala new file mode 100644 index 000000000..933687820 --- /dev/null +++ b/examples/twittermap/datatools/src/main/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/USGeoGnosisLoader.scala @@ -0,0 +1,21 @@ +package edu.uci.ics.cloudberry.datatools.twitter.geotagger + +import edu.uci.ics.cloudberry.gnosis._ +import java.io.File + +/** + * USGeoGnosisLoader + * - Help load a Scala USGeoGnosis Object with given 3 geo-json file paths, + * and the returned USGeoGnosis object can be used in Java + * + * @author: Qiushi Bai + */ +object USGeoGnosisLoader { + def loadUSGeoGnosis(stateJsonFile: String, countyJsonFile: String, cityJsonFile: String): USGeoGnosis = { + val shapeMap = Seq( StateLevel -> stateJsonFile, + CountyLevel -> countyJsonFile, + CityLevel -> cityJsonFile).toMap + val usGeoGnosis = new USGeoGnosis(shapeMap.mapValues(new File(_))) + usGeoGnosis + } +} diff --git a/examples/twittermap/datatools/src/main/resources/logback.xml b/examples/twittermap/datatools/src/main/resources/logback.xml new file mode 100644 index 000000000..ebb2ae705 --- /dev/null +++ b/examples/twittermap/datatools/src/main/resources/logback.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/examples/twittermap/datatools/src/main/resources/webroot/index.html b/examples/twittermap/datatools/src/main/resources/webroot/index.html new file mode 100644 index 000000000..6ead342bb --- /dev/null +++ b/examples/twittermap/datatools/src/main/resources/webroot/index.html @@ -0,0 +1,106 @@ + + + + + Twitter Ingestion Server Console + + + + + + + + +
+
+

Statistics

+

Realtime Samples

+
+
+
+
+
+
+

first started.

+
+

tweets ingested totally.

+
+

tweets/second on average.

+
+

tweets/second now.

+
+
+
+

+        
+
+
+
+ + + \ No newline at end of file diff --git a/examples/twittermap/datatools/src/test/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterTest.java b/examples/twittermap/datatools/src/test/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterTest.java new file mode 100644 index 000000000..7bd15438b --- /dev/null +++ b/examples/twittermap/datatools/src/test/java/edu/uci/ics/cloudberry/datatools/asterixdb/AsterixDBAdapterTest.java @@ -0,0 +1,329 @@ + + +import edu.uci.ics.cloudberry.datatools.asterixdb.AsterixDBAdapterForGeneralTwitter; +import edu.uci.ics.cloudberry.datatools.asterixdb.AsterixDBAdapterForTwitterMap; +import edu.uci.ics.cloudberry.datatools.twitter.geotagger.TwitterGeoTagger; +import java.util.Map; + +public class AsterixDBAdapterTest { + + String sampleTweet = "{\n" + + " \"created_at\":\"Mon Apr 01 15:53:45 +0000 2019\",\n" + + " \"id\":1112744871062839297,\n" + + " \"id_str\":\"1112744871062839297\",\n" + + " \"text\":\"Just think I got pranked by the post office? Received my April Visa statement from 2018 \\ud83e\\udd26\\u200d\\u2640\\ufe0f\\ud83e\\udd74 #seriously\\u2026 https:\\/\\/t.co\\/HDT3kzgmr1\",\n" + + " \"source\":\"\\u003ca href=\\\"http:\\/\\/instagram.com\\\" rel=\\\"nofollow\\\"\\u003eInstagram\\u003c\\/a\\u003e\",\n" + + " \"truncated\":true,\n" + + " \"in_reply_to_status_id\":null,\n" + + " \"in_reply_to_status_id_str\":null,\n" + + " \"in_reply_to_user_id\":null,\n" + + " \"in_reply_to_user_id_str\":null,\n" + + " \"in_reply_to_screen_name\":null,\n" + + " \"user\":{\n" + + " \"id\":26009081,\n" + + " \"id_str\":\"26009081\",\n" + + " \"name\":\"Sarah Picot-Kirkby\",\n" + + " \"screen_name\":\"noshoesgal\",\n" + + " \"location\":\"Bahamas\",\n" + + " \"url\":\"http:\\/\\/barefootmarketing.net\\/\",\n" + + " \"description\":\"Owner of Barefoot Marketing , Barefoot Locations and 242newsbahamas in The Bahamas.\",\n" + + " \"translator_type\":\"none\",\n" + + " \"protected\":false,\n" + + " \"verified\":false,\n" + + " \"followers_count\":742,\n" + + " \"friends_count\":982,\n" + + " \"listed_count\":10,\n" + + " \"favourites_count\":127,\n" + + " \"statuses_count\":1171,\n" + + " \"created_at\":\"Mon Mar 23 14:22:06 +0000 2009\",\n" + + " \"utc_offset\":null,\n" + + " \"time_zone\":null,\n" + + " \"geo_enabled\":true,\n" + + " \"lang\":\"en\",\n" + + " \"contributors_enabled\":false,\n" + + " \"is_translator\":false,\n" + + " \"profile_background_color\":\"EDECE9\",\n" + + " \"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme3\\/bg.gif\",\n" + + " \"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme3\\/bg.gif\",\n" + + " \"profile_background_tile\":false,\n" + + " \"profile_link_color\":\"94D487\",\n" + + " \"profile_sidebar_border_color\":\"FFFFFF\",\n" + + " \"profile_sidebar_fill_color\":\"E3E2DE\",\n" + + " \"profile_text_color\":\"634047\",\n" + + " \"profile_use_background_image\":true,\n" + + " \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/1065629366791487488\\/I5pRkqrJ_normal.jpg\",\n" + + " \"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/1065629366791487488\\/I5pRkqrJ_normal.jpg\",\n" + + " \"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/26009081\\/1538977688\",\n" + + " \"default_profile\":false,\n" + + " \"default_profile_image\":false,\n" + + " \"following\":null,\n" + + " \"follow_request_sent\":null,\n" + + " \"notifications\":null\n" + + " },\n" + + " \"geo\":{\n" + + " \"type\":\"Point\",\n" + + " \"coordinates\":[\n" + + " 26.5356,\n" + + " -78.69696\n" + + " ]\n" + + " },\n" + + " \"coordinates\":{\n" + + " \"type\":\"Point\",\n" + + " \"coordinates\":[\n" + + " -78.69696,\n" + + " 26.5356\n" + + " ]\n" + + " },\n" + + " \"place\":{\n" + + " \"id\":\"b631437cf2f16804\",\n" + + " \"url\":\"https:\\/\\/api.twitter.com\\/1.1\\/geo\\/id\\/b631437cf2f16804.json\",\n" + + " \"place_type\":\"country\",\n" + + " \"name\":\"Bahamas\",\n" + + " \"full_name\":\"Bahamas\",\n" + + " \"country_code\":\"BS\",\n" + + " \"country\":\"Bahamas\",\n" + + " \"bounding_box\":{\n" + + " \"type\":\"Polygon\",\n" + + " \"coordinates\":[\n" + + " [\n" + + " [\n" + + " -80.475610,\n" + + " 20.912263\n" + + " ],\n" + + " [\n" + + " -80.475610,\n" + + " 27.237671\n" + + " ],\n" + + " [\n" + + " -72.712276,\n" + + " 27.237671\n" + + " ],\n" + + " [\n" + + " -72.712276,\n" + + " 20.912263\n" + + " ]\n" + + " ]\n" + + " ]\n" + + " },\n" + + " \"attributes\":{\n" + + "\n" + + " }\n" + + " },\n" + + " \"contributors\":null,\n" + + " \"is_quote_status\":false,\n" + + " \"extended_tweet\":{\n" + + " \"full_text\":\"Just think I got pranked by the post office? Received my April Visa statement from 2018 \\ud83e\\udd26\\u200d\\u2640\\ufe0f\\ud83e\\udd74 #seriously #mailaintbetterinthebahamas #muddasick\\u2026 https:\\/\\/t.co\\/g9F9XytTuB\",\n" + + " \"display_text_range\":[\n" + + " 0,\n" + + " 168\n" + + " ],\n" + + " \"entities\":{\n" + + " \"hashtags\":[\n" + + " {\n" + + " \"text\":\"seriously\",\n" + + " \"indices\":[\n" + + " 94,\n" + + " 104\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"text\":\"mailaintbetterinthebahamas\",\n" + + " \"indices\":[\n" + + " 105,\n" + + " 132\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"text\":\"muddasick\",\n" + + " \"indices\":[\n" + + " 133,\n" + + " 143\n" + + " ]\n" + + " }\n" + + " ],\n" + + " \"urls\":[\n" + + " {\n" + + " \"url\":\"https:\\/\\/t.co\\/g9F9XytTuB\",\n" + + " \"expanded_url\":\"https:\\/\\/www.instagram.com\\/p\\/Bvt_pGgHT_sRFxoRsW0RexBMkLSip4yBQvS6gI0\\/?utm_source=ig_twitter_share&igshid=1emy4yd6uomwx\",\n" + + " \"display_url\":\"instagram.com\\/p\\/Bvt_pGgHT_sR\\u2026\",\n" + + " \"indices\":[\n" + + " 145,\n" + + " 168\n" + + " ]\n" + + " }\n" + + " ],\n" + + " \"user_mentions\":[\n" + + "\n" + + " ],\n" + + " \"symbols\":[\n" + + "\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"quote_count\":0,\n" + + " \"reply_count\":0,\n" + + " \"retweet_count\":0,\n" + + " \"favorite_count\":0,\n" + + " \"entities\":{\n" + + " \"hashtags\":[\n" + + " {\n" + + " \"text\":\"seriously\",\n" + + " \"indices\":[\n" + + " 94,\n" + + " 104\n" + + " ]\n" + + " }\n" + + " ],\n" + + " \"urls\":[\n" + + " {\n" + + " \"url\":\"https:\\/\\/t.co\\/HDT3kzgmr1\",\n" + + " \"expanded_url\":\"https:\\/\\/twitter.com\\/i\\/web\\/status\\/1112744871062839297\",\n" + + " \"display_url\":\"twitter.com\\/i\\/web\\/status\\/1\\u2026\",\n" + + " \"indices\":[\n" + + " 106,\n" + + " 129\n" + + " ]\n" + + " }\n" + + " ],\n" + + " \"user_mentions\":[\n" + + "\n" + + " ],\n" + + " \"symbols\":[\n" + + "\n" + + " ]\n" + + " },\n" + + " \"favorited\":false,\n" + + " \"retweeted\":false,\n" + + " \"possibly_sensitive\":false,\n" + + " \"filter_level\":\"low\",\n" + + " \"lang\":\"en\",\n" + + " \"timestamp_ms\":\"1554134025718\"\n" + + "}"; + + String sampleTweet2 = "{\n" + + " \"created_at\":\"Fri Dec 31 07:23:26 +0000 2021\",\n" + + " \"id\":1476816248449163280,\n" + + " \"id_str\":\"1476816248449163280\",\n" + + " \"text\":\"Just posted a video @ Waldoboro, Maine https:\\/\\/t.co\\/JCpqmYEziw\",\n" + + " \"source\":\"\u003ca href=\\\"http:\\/\\/instagram.com\\\" rel=\\\"nofollow\\\"\u003eInstagram\u003c\\/a\u003e\",\n" + + " \"truncated\":false,\n" + + " \"in_reply_to_status_id\":null,\n" + + " \"in_reply_to_status_id_str\":null,\n" + + " \"in_reply_to_user_id\":null,\n" + + " \"in_reply_to_user_id_str\":null,\n" + + " \"in_reply_to_screen_name\":null,\n" + + " \"user\":{\n" + + " \"id\":77455872,\n" + + " \"id_str\":\"77455872\",\n" + + " \"name\":\"Rachel Genthner\",\n" + + " \"screen_name\":\"Racheltgal\", \n" + + " \"location\":\"Waldoboro, Maine\",\n" + + " \"url\":\"http:\\/\\/www.facebook.com\\/racheltgal\",\n" + + " \"description\":\"love my sons take photogarphs of gods beauty wildlife and landscapes flying my drones Ham Radio\",\n" + + " \"translator_type\":\"none\",\n" + + " \"protected\":false,\n" + + " \"verified\":false,\n" + + " \"followers_count\":82,\n" + + " \"friends_count\":221,\n" + + " \"listed_count\":2,\n" + + " \"favourites_count\":2546,\n" + + " \"statuses_count\":1145,\n" + + " \"created_at\":\"Sat Sep 26 11:42:42 +0000 2009\",\n" + + " \"utc_offset\":null,\n" + + " \"time_zone\":null,\n" + + " \"geo_enabled\":true,\n" + + " \"lang\":null,\n" + + " \"contributors_enabled\":false,\n" + + " \"is_translator\":false,\n" + + " \"profile_background_color\":\"642D8B\",\n" + + " \"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme10\\/bg.gif\",\n" + + " \"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme10\\/bg.gif\",\n" + + " \"profile_background_tile\":true,\n" + + " \"profile_link_color\":\"FF0000\",\n" + + " \"profile_sidebar_border_color\":\"65B0DA\",\n" + + " \"profile_sidebar_fill_color\":\"7AC3EE\",\n" + + " \"profile_text_color\":\"3D1957\",\n" + + " \"profile_use_background_image\":true,\n" + + " \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/1155122265870217216\\/cgKcA1cg_normal.jpg\",\n" + + " \"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/1155122265870217216\\/cgKcA1cg_normal.jpg\",\n" + + " \"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/77455872\\/1586957059\",\n" + + " \"default_profile\":false,\n" + + " \"default_profile_image\":false,\n" + + " \"following\":null,\n" + + " \"follow_request_sent\":null,\n" + + " \"notifications\":null,\n" + + " \"withheld_in_countries\":[]\n" + + " },\n" + + " \"geo\":{\n" + + " \"type\":\"Point\",\n" + + " \"coordinates\":[44.09684,-69.37314]\n" + + " },\n" + + " \"coordinates\":{\n" + + " \"type\":\"Point\",\n" + + " \"coordinates\":[-69.37314,44.09684]\n" + + " },\n" + + " \"place\":{\n" + + " \"id\":\"463f5d9615d7d1be\",\n" + + " \"url\":\"https:\\/\\/api.twitter.com\\/1.1\\/geo\\/id\\/463f5d9615d7d1be.json\",\n" + + " \"place_type\":\"admin\",\n" + + " \"name\":\"Maine\",\n" + + " \"full_name\":\"Maine, USA\",\n" + + " \"country_code\":\"US\",\n" + + " \"country\":\"United States\",\n" + + " \"bounding_box\":{\n" + + " \"type\":\"Polygon\",\n" + + " \"coordinates\":[[[-71.084335,42.917127],[-71.084335,47.459687],[-66.885075,47.459687],[-66.885075,42.917127]]]\n" + + " },\n" + + " \"attributes\":{}\n" + + " },\n" + + " \"contributors\":null,\n" + + " \"is_quote_status\":false,\n" + + " \"quote_count\":0,\n" + + " \"reply_count\":0,\n" + + " \"retweet_count\":0,\n" + + " \"favorite_count\":0,\n" + + " \"entities\":{\n" + + " \"hashtags\":[],\n" + + " \"urls\":[{\n" + + " \"url\":\"https:\\/\\/t.co\\/JCpqmYEziw\",\n" + + " \"expanded_url\":\"https:\\/\\/www.instagram.com\\/p\\/CYI4FNMqSOCcq-j9hz5VaR6JESU87Hfy5ENDzE0\\/?utm_medium=twitter\",\n" + + " \"display_url\":\"instagram.com\\/p\\/CYI4FNMqSOCc\u2026\",\n" + + " \"indices\":[39,62]\n" + + " }],\n" + + " \"user_mentions\":[],\n" + + " \"symbols\":[]\n" + + " },\n" + + " \"favorited\":false,\n" + + " \"retweeted\":false,\n" + + " \"possibly_sensitive\":false,\n" + + " \"filter_level\":\"low\",\n" + + " \"lang\":\"en\",\n" + + " \"timestamp_ms\":\"1640935406206\"\n" + + "}"; + + public boolean testGeneralTwitter() throws Exception { + Map tweetObject = TwitterGeoTagger.parseOneTweet(sampleTweet); + AsterixDBAdapterForGeneralTwitter asterixDBAdapterForGeneralTwitter = new AsterixDBAdapterForGeneralTwitter(); + String asterixDBTuple = asterixDBAdapterForGeneralTwitter.transform(tweetObject); + System.out.println(asterixDBTuple); + return true; + } + + public boolean testTwitterMap() throws Exception { + Map tweetObject = TwitterGeoTagger.parseOneTweet(sampleTweet2); + AsterixDBAdapterForTwitterMap asterixDBAdapterForTwitterMap = new AsterixDBAdapterForTwitterMap(); + String asterixDBTuple = asterixDBAdapterForTwitterMap.transform(tweetObject); + System.out.println(asterixDBTuple); + return true; + } + + public static void main(String[] args) { + AsterixDBAdapterTest test = new AsterixDBAdapterTest(); + try { + //test.testGeneralTwitter(); + test.testTwitterMap(); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/examples/twittermap/datatools/src/test/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTaggerTest.java b/examples/twittermap/datatools/src/test/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTaggerTest.java new file mode 100644 index 000000000..eabaa1bc3 --- /dev/null +++ b/examples/twittermap/datatools/src/test/java/edu/uci/ics/cloudberry/datatools/twitter/geotagger/TwitterGeoTaggerTest.java @@ -0,0 +1,135 @@ +package edu.uci.ics.cloudberry.datatools.twitter.geotagger; + +import edu.uci.ics.cloudberry.gnosis.USGeoGnosis; + +public class TwitterGeoTaggerTest { + static String sampleTweet = "{\n" + + " \"created_at\":\"Mon Apr 01 15:53:45 +0000 2019\",\n" + + " \"id\":1112744870182031361,\n" + + " \"id_str\":\"1112744870182031361\",\n" + + " \"text\":\"Everyone from my old job keeps texting me they heard i left \\ud83d\\ude02\",\n" + + " \"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/download\\/iphone\\\" rel=\\\"nofollow\\\"\\u003eTwitter for iPhone\\u003c\\/a\\u003e\",\n" + + " \"truncated\":false,\n" + + " \"in_reply_to_status_id\":null,\n" + + " \"in_reply_to_status_id_str\":null,\n" + + " \"in_reply_to_user_id\":null,\n" + + " \"in_reply_to_user_id_str\":null,\n" + + " \"in_reply_to_screen_name\":null,\n" + + " \"user\":{\n" + + " \"id\":186944989,\n" + + " \"id_str\":\"186944989\",\n" + + " \"name\":\"Sa$ha\",\n" + + " \"screen_name\":\"taylormone__\",\n" + + " \"location\":\"DC | BK \",\n" + + " \"url\":null,\n" + + " \"description\":\"If you take tweets personal, that\\u2019s on you \\u2728\",\n" + + " \"translator_type\":\"none\",\n" + + " \"protected\":false,\n" + + " \"verified\":false,\n" + + " \"followers_count\":1233,\n" + + " \"friends_count\":934,\n" + + " \"listed_count\":14,\n" + + " \"favourites_count\":18548,\n" + + " \"statuses_count\":70524,\n" + + " \"created_at\":\"Sat Sep 04 20:53:59 +0000 2010\",\n" + + " \"utc_offset\":null,\n" + + " \"time_zone\":null,\n" + + " \"geo_enabled\":true,\n" + + " \"lang\":\"en\",\n" + + " \"contributors_enabled\":false,\n" + + " \"is_translator\":false,\n" + + " \"profile_background_color\":\"669EFF\",\n" + + " \"profile_background_image_url\":\"http:\\/\\/abs.twimg.com\\/images\\/themes\\/theme11\\/bg.gif\",\n" + + " \"profile_background_image_url_https\":\"https:\\/\\/abs.twimg.com\\/images\\/themes\\/theme11\\/bg.gif\",\n" + + " \"profile_background_tile\":true,\n" + + " \"profile_link_color\":\"F50C69\",\n" + + " \"profile_sidebar_border_color\":\"000000\",\n" + + " \"profile_sidebar_fill_color\":\"DAE0E3\",\n" + + " \"profile_text_color\":\"040D05\",\n" + + " \"profile_use_background_image\":true,\n" + + " \"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/1101586425064112129\\/QrswgZu-_normal.jpg\",\n" + + " \"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/1101586425064112129\\/QrswgZu-_normal.jpg\",\n" + + " \"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/186944989\\/1516814036\",\n" + + " \"default_profile\":false,\n" + + " \"default_profile_image\":false,\n" + + " \"following\":null,\n" + + " \"follow_request_sent\":null,\n" + + " \"notifications\":null\n" + + " },\n" + + " \"geo\":null,\n" + + " \"coordinates\":null,\n" + + " \"place\":{\n" + + " \"id\":\"011add077f4d2da3\",\n" + + " \"url\":\"https:\\/\\/api.twitter.com\\/1.1\\/geo\\/id\\/011add077f4d2da3.json\",\n" + + " \"place_type\":\"city\",\n" + + " \"name\":\"Brooklyn\",\n" + + " \"full_name\":\"Brooklyn, NY\",\n" + + " \"country_code\":\"US\",\n" + + " \"country\":\"United States\",\n" + + " \"bounding_box\":{\n" + + " \"type\":\"Polygon\",\n" + + " \"coordinates\":[\n" + + " [\n" + + " [\n" + + " -74.041878,\n" + + " 40.570842\n" + + " ],\n" + + " [\n" + + " -74.041878,\n" + + " 40.739434\n" + + " ],\n" + + " [\n" + + " -73.855673,\n" + + " 40.739434\n" + + " ],\n" + + " [\n" + + " -73.855673,\n" + + " 40.570842\n" + + " ]\n" + + " ]\n" + + " ]\n" + + " },\n" + + " \"attributes\":{\n" + + "\n" + + " }\n" + + " },\n" + + " \"contributors\":null,\n" + + " \"is_quote_status\":false,\n" + + " \"quote_count\":0,\n" + + " \"reply_count\":0,\n" + + " \"retweet_count\":0,\n" + + " \"favorite_count\":0,\n" + + " \"entities\":{\n" + + " \"hashtags\":[\n" + + "\n" + + " ],\n" + + " \"urls\":[\n" + + "\n" + + " ],\n" + + " \"user_mentions\":[\n" + + "\n" + + " ],\n" + + " \"symbols\":[\n" + + "\n" + + " ]\n" + + " },\n" + + " \"favorited\":false,\n" + + " \"retweeted\":false,\n" + + " \"filter_level\":\"low\",\n" + + " \"lang\":\"en\",\n" + + " \"timestamp_ms\":\"1554134025508\"\n" + + "}"; + + public static void main(String[] args) { + long start = System.currentTimeMillis(); + USGeoGnosis usGeoGnosis = USGeoGnosisLoader.loadUSGeoGnosis("web/public/data/state.json", + "web/public/data/county.json", + "web/public/data/city.json"); + long end = System.currentTimeMillis(); + System.out.println("Initializing USGeoGnosis takes time: " + (end - start) / 1000.0 + " seconds."); + + TwitterGeoTagger.DEBUG = true; + + TwitterGeoTagger.printTagOneTweet(usGeoGnosis, sampleTweet); + } +} diff --git a/examples/twittermap/project/commons.scala b/examples/twittermap/project/commons.scala index c1584e93b..233b80820 100644 --- a/examples/twittermap/project/commons.scala +++ b/examples/twittermap/project/commons.scala @@ -24,9 +24,11 @@ object Commons { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case PathList("com", "sun", "activation", xs @_*) => MergeStrategy.first case PathList("javax", "activation", xs @_*) => MergeStrategy.first - case x => - val oldStrategy = (assemblyMergeStrategy in assembly).value - oldStrategy(x) + case PathList("org", "seleniumhq", "selenium", xs @ _*) => MergeStrategy.first + case x => MergeStrategy.first + //case x => + // val oldStrategy = (assemblyMergeStrategy in assembly).value + // oldStrategy(x) } ) diff --git a/examples/twittermap/project/dependencies.scala b/examples/twittermap/project/dependencies.scala index 437c6b79f..543120685 100644 --- a/examples/twittermap/project/dependencies.scala +++ b/examples/twittermap/project/dependencies.scala @@ -90,4 +90,19 @@ object Dependencies { "org.webjars.npm" % "jsbi" % "3.1.1" ) ++ testDeps + + val datatoolsDependencies: Seq[ModuleID] = Seq( + ws, + "org.twitter4j" % "twitter4j-stream" % twitter4jVersion, + "org.twitter4j" % "twitter4j-core" % twitter4jVersion, + "com.twitter" % "hbc-core" % "2.2.0", + "commons-cli" % "commons-cli" % "1.4", + "org.eclipse.jetty" % "jetty-server" % "9.4.28.v20200408", + "org.eclipse.jetty" % "jetty-servlet" % "9.4.28.v20200408", + "org.eclipse.jetty.websocket" % "websocket-servlet" % "9.4.28.v20200408", + "org.eclipse.jetty.websocket" % "websocket-api" % "9.4.28.v20200408", + "org.eclipse.jetty.websocket" % "websocket-server" % "9.4.28.v20200408", + "com.fasterxml.jackson.core" % "jackson-databind" % "2.1.1", + ("org.apache.commons" % "commons-lang3" % "3.4").exclude("commons-logging", "commons-logging") + ) ++ testDeps }