
TwitterMap documentation

vidhyasagar edited this page May 5, 2017 · 10 revisions

TwitterMap

Following Play Framework conventions, the HTML source is located at twittermap/app/views/index.scala.html. It is a Scala HTML template (.scala.html) that the framework renders. Because Angular controls the main logic, the template contains few scripts; the main logic is implemented in JavaScript.

The JavaScript code is located in the twittermap/public/javascripts/ folder. app.js is the entry point. Each front-end component is implemented as an Angular directive. The purpose of each folder is described below.

common

The common module defines an Angular service that communicates with the back-end server by sending JSON requests over a WebSocket connection.

It defines:

  • a query function that can be called to send JSON requests to the Neo server;
  • a ws.onmessage function that receives JSON messages from the Neo server and updates the corresponding global values.
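The two functions described above can be sketched in plain JavaScript, without Angular. This is only an illustration under stated assumptions: createQueryService is a hypothetical name, the socket is passed in so that a stand-in can replace a real WebSocket, and the response shape { key, value } is assumed from the transform.wrap.key field used in the requests on this page.

```javascript
// Hypothetical sketch, not the project's actual service.
function createQueryService(socket) {
  // Shared values that UI components can observe; the names are the wrap keys.
  const results = { totalCount: null, sample: null, batch: null };

  // Route each incoming message to the value named by its wrap key
  // (assumed response shape: { key: "...", value: ... }).
  socket.onmessage = function (event) {
    const message = JSON.parse(event.data);
    if (Object.prototype.hasOwnProperty.call(results, message.key)) {
      results[message.key] = message.value;
    }
  };

  return {
    results: results,
    // Serialize a request object and send it over the socket.
    query: function (request) {
      socket.send(JSON.stringify(request));
    }
  };
}

// Usage with a stand-in socket; a browser would pass a real WebSocket.
const fakeSocket = {
  sent: [],
  send: function (text) { this.sent.push(text); }
};
const service = createQueryService(fakeSocket);
service.query({ dataset: "twitter.ds_tweet", transform: { wrap: { key: "totalCount" } } });
// Simulate a server reply arriving on the socket.
fakeSocket.onmessage({ data: JSON.stringify({ key: "totalCount", value: [{ count: 42 }] }) });
```

Keying the stored values by the wrap key keeps the message handler independent of which component issued the request.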

The examples below show real JSON requests to the Neo server.

The count JSON request asks for the total tweet count every second.

{
  dataset: "twitter.ds_tweet",
  global: {
    globalAggregate: {
      field: "*",
      apply: {
        name: "count"
      },
      as: "count"
    }
  },
  estimable: true,
  transform: {
    wrap: {
      key: "totalCount"
    }
  }
}

The sample JSON request asks for the 10 most recent tweets that satisfy the filter conditions.

{
  dataset: "twitter.ds_tweet",
  filter: [
    {
      field: "geo_tag.stateID",
      relation: "in",
      values: [37,51,24,11,10,34,42,9,44,48,35,4,40,6,20,32,8,49,12,22,28,1,13,45,5,47,21,29,54,17,18,39,19,55,26,27,31, 
               56,41,46,16,30,53,38,25,36,50,33,23,2]
    }, {
      field: "create_at",
      relation: "inRange",
      values: ["2016-01-01T00:00:00.000Z", "2016-12-31T00:00:00.000Z"]
    }, {
      field: "text",
      relation: "contains",
      values: ["zika", "virus"]
    }
  ],
  select: {
    order: ["-create_at"],
    limit: 10,
    offset: 0,
    field: ["create_at", "id", "user.id"]
  },
  transform: {
    wrap: {
      key: "sample"
    }
  }
}

The batch JSON request asks for the data grouped by time, by geo, and by hashtag.

{
  batch: [
    {
      dataset: "twitter.ds_tweet",
      filter: [
        {
          field: "geo_tag.stateID",
          relation: "in",
          values: [37,51,24,11,10,34,42,9,44,48,35,4,40,6,20,32,8,49,12,22,28,1,13,45,5,47,21,29,54,17,18,39,19,55,26,27,31, 
               56,41,46,16,30,53,38,25,36,50,33,23,2]
        }, {
          field: "create_at",
          relation: "inRange",
          values: ["2016-01-01T00:00:00.000Z", "2016-12-31T00:00:00.000Z"]
        }, {
          field: "text",
          relation: "contains",
          values: ["zika", "virus"]
        }
      ],
      group: {
        by: [{
          field: "create_at",
          apply: {
            name: "interval",
            args: {
              unit: "day"
            }
          },
          as: "day"
        }],
        aggregate: [{
          field: "*",
          apply: {
            name: "count"
          },
          as: "count"
        }]
      }
    },
    {
      dataset: "twitter.ds_tweet",
      filter: [
        {
          field: "geo_tag.stateID",
          relation: "in",
          values: [37,51,24,11,10,34,42,9,44,48,35,4,40,6,20,32,8,49,12,22,28,1,13,45,5,47,21,29,54,17,18,39,19,55,26,27,31, 
               56,41,46,16,30,53,38,25,36,50,33,23,2]
        }, {
          field: "create_at",
          relation: "inRange",
          values: ["2016-01-01T00:00:00.000Z", "2016-12-31T00:00:00.000Z"]
        }, {
          field: "text",
          relation: "contains",
          values: ["zika", "virus"]
        }
      ],
      group: {
        by: [{
          field: "geo",
          apply: {
            name: "level",
            args: {
              level: "stateID"
            }
          },
          as: "stateID"
        }],
        aggregate: [{
          field: "*",
          apply: {
            name: "count"
          },
          as: "count"
        }]
      }
    }, 
    {
      dataset: "twitter.ds_tweet",
      filter: [
        {
          field: "geo_tag.stateID",
          relation: "in",
          values: [37,51,24,11,10,34,42,9,44,48,35,4,40,6,20,32,8,49,12,22,28,1,13,45,5,47,21,29,54,17,18,39,19,55,26,27,31, 
               56,41,46,16,30,53,38,25,36,50,33,23,2]
        }, {
          field: "create_at",
          relation: "inRange",
          values: ["2016-01-01T00:00:00.000Z", "2016-12-31T00:00:00.000Z"]
        }, {
          field: "text",
          relation: "contains",
          values: ["zika", "virus"]
        }
      ],
      unnest: [{
        hashtags: "tag"
      }],
      group: {
        by: [{
          field: "tag"
        }],
        aggregate: [{
          field: "*",
          apply: {
            name: "count"
          },
          as: "count"
        }]
      },
      select: {
        order: ["-count"],
        limit: 50,
        offset: 0
      }
    }
  ],
  option: {
    sliceMillis: 2000
  },
  transform: {
    wrap: {
      key: "batch"
    }
  }
}
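The three sub-queries in the batch request share the same dataset and filter and differ only in how they group the results. A minimal sketch of building such a request from one shared filter is shown below. The helper names buildFilter, countBy, and buildBatchRequest are hypothetical; the field names and values are taken from the example above.

```javascript
// Hypothetical helpers; field names and values mirror the batch example above.
function buildFilter(stateIds, timeRange, keywords) {
  return [
    { field: "geo_tag.stateID", relation: "in", values: stateIds },
    { field: "create_at", relation: "inRange", values: timeRange },
    { field: "text", relation: "contains", values: keywords }
  ];
}

// Group by one field and count the records in each group.
function countBy(groupBy) {
  return {
    by: [groupBy],
    aggregate: [{ field: "*", apply: { name: "count" }, as: "count" }]
  };
}

function buildBatchRequest(filter) {
  const dataset = "twitter.ds_tweet";
  return {
    batch: [
      // Tweets per day.
      { dataset, filter, group: countBy({ field: "create_at", apply: { name: "interval", args: { unit: "day" } }, as: "day" }) },
      // Tweets per state.
      { dataset, filter, group: countBy({ field: "geo", apply: { name: "level", args: { level: "stateID" } }, as: "stateID" }) },
      // The 50 most frequent hashtags.
      {
        dataset,
        filter,
        unnest: [{ hashtags: "tag" }],
        group: countBy({ field: "tag" }),
        select: { order: ["-count"], limit: 50, offset: 0 }
      }
    ],
    option: { sliceMillis: 2000 },
    transform: { wrap: { key: "batch" } }
  };
}

const filter = buildFilter(
  [6, 48],
  ["2016-01-01T00:00:00.000Z", "2016-12-31T00:00:00.000Z"],
  ["zika", "virus"]
);
const request = buildBatchRequest(filter);
```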

It also defines several global values (e.g., mapResults, timeResults) to store the results. The UI of dependent modules can bind to specific values by using the Angular watch function.

map

The map directive is implemented by extending the existing Angular leaflet-directive. Initially, it loads the state and county shapes by requesting the resource files from the Neo server. When the user zooms in, zooms out, or drags the map, it calls the query function in the common module. It also watches the mapResults value so that the draw function is called whenever the results change.

searchbar

The directive that controls the search box.

timeserials

The directive that shows the time series chart, implemented using dc.js.

sidebar

It controls the hashtag and sample tweet sections.

cache

cache is an Angular service that provides cityPolygon data to the map directive. It caches the city polygons that users request. When a user later requests data that is already in the cache, the cache serves the response instead of sending an HTTP request to the middleware. If the requested data is not in the cache, the cache requests the data for the requested area plus some surrounding region (pre-fetching) from the middleware and stores it. A later request for a nearby region can then be served from the cache.

This reduces the number of requests to the middleware and speeds up rendering when a user's requests are concentrated on a particular area.

The GeoJSON data is stored in an rTree. When the cache becomes full, we completely empty the cache and start over. For cache replacement, we consider both temporal and spatial data before removing the region.
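The hit, miss, pre-fetch, and empty-when-full behavior described above can be sketched as follows. This is a simplified illustration, not the project's cache: it uses a linear scan over an array instead of an rTree, a synchronous fetch function instead of an HTTP request, and the names createPolygonCache, maxEntries, and prefetchRatio are all hypothetical.

```javascript
// True when the rectangle `outer` fully covers the rectangle `inner`.
function contains(outer, inner) {
  return outer.west <= inner.west && outer.south <= inner.south &&
         outer.east >= inner.east && outer.north >= inner.north;
}

// Grow a rectangle on every side by `ratio` times its width and height.
function expand(bounds, ratio) {
  const dx = (bounds.east - bounds.west) * ratio;
  const dy = (bounds.north - bounds.south) * ratio;
  return {
    west: bounds.west - dx, south: bounds.south - dy,
    east: bounds.east + dx, north: bounds.north + dy
  };
}

// Hypothetical cache; fetchPolygons(region) stands in for the middleware request.
function createPolygonCache(fetchPolygons, maxEntries, prefetchRatio) {
  let entries = [];
  return {
    get: function (bounds) {
      // Hit: some cached region covers the requested area. This sketch returns
      // all polygons of that region and leaves clipping to the caller.
      const hit = entries.find(function (e) { return contains(e.bounds, bounds); });
      if (hit) return hit.polygons;
      // Full: empty the cache completely and start over.
      if (entries.length >= maxEntries) entries = [];
      // Miss: fetch the requested area plus a surrounding margin (pre-fetching).
      const region = expand(bounds, prefetchRatio);
      const polygons = fetchPolygons(region);
      entries.push({ bounds: region, polygons: polygons });
      return polygons;
    },
    size: function () { return entries.length; }
  };
}

// Usage: the second request falls inside the pre-fetched margin of the first.
let requests = 0;
const cache = createPolygonCache(function (region) {
  requests += 1;
  return ["polygons for " + JSON.stringify(region)];
}, 2, 0.5);
cache.get({ west: 0, south: 0, east: 10, north: 10 });
cache.get({ west: 2, south: 2, east: 12, north: 12 });
```

A spatial index such as an rTree replaces the linear scan when the number of cached regions grows.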

More future front-ends

dashboard

An experimental demo that makes each state clickable.

AsterixDB Feed

DataFlow

To use AsterixDB’s data feed, we need to open a socket using AQL to listen for connections. For an example AQL script, see cloudberry/noah/src/main/resources/aql/feed.aql. Then create a socketAdapterClient to connect to AsterixDB’s socket and send records to AsterixDB through it.

FeedSocketAdapterClient

FeedSocketAdapterClient initializes a socket connection with AsterixDB and sends records to AsterixDB. It contains three important functions:

  • initialize(): should be called after creating a new FeedSocketAdapterClient object. It sets up the socket connection with AsterixDB.
  • ingest(String record): sends a record to AsterixDB through the socket.
  • finalized(): should be called after the feed ends. It closes the socket.

Both FileFeedDriver and TwitterFeedStreamDriver create a FeedSocketAdapterClient object and call the ingest function to send records to AsterixDB.

FileFeedDriver

It feeds data from an ADM file to AsterixDB. First, it initializes a FeedSocketAdapterClient. Then it reads the file line by line and calls FeedSocketAdapterClient.ingest to send each record to AsterixDB.

To use the FileFeedDriver, run fileFeed.sh.
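The driver loop described in this section (initialize, ingest one record per line, then close) can be sketched as below. This is an illustration in JavaScript, not the project's FileFeedDriver: feedRecords is a hypothetical name, the client is any object exposing the three functions listed for FeedSocketAdapterClient, and skipping blank lines is an assumption.

```javascript
// Hypothetical sketch; `client` stands in for a FeedSocketAdapterClient.
function feedRecords(text, client) {
  client.initialize();
  let sent = 0;
  try {
    for (const line of text.split(/\r?\n/)) {
      if (line.trim() === "") continue; // assumption: blank lines are not records
      client.ingest(line);
      sent += 1;
    }
  } finally {
    // Close the connection even if sending a record fails.
    client.finalized();
  }
  return sent;
}

// Usage with a stand-in client that records the calls it receives.
const calls = [];
const fakeClient = {
  initialize: function () { calls.push("initialize"); },
  ingest: function (record) { calls.push("ingest:" + record); },
  finalized: function () { calls.push("finalized"); }
};
const count = feedRecords('{"id": 1}\n{"id": 2}\n', fakeClient);
```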

TwitterFeedStreamDriver

This class is the current pipeline that fetches real-time Twitter data and feeds it to AsterixDB. The procedure is:

  • Use the Twitter Streaming API to fetch real-time Twitter data.
  • For every tweet, geotag it and convert it from JSON format to ADM format.
  • Call FeedSocketAdapterClient.ingest to send the record to AsterixDB.

To use TwitterFeedStreamDriver, modify and run streamFeed.sh.

Twitter driver documentation: https://docs.google.com/document/d/1j2vXRL8WeSoqzUKb2Kv4sebKHA0rQIvJZviUSH5cAo4/edit

Use Kafka to ingest to AsterixDB

See this page