
Commit ceda70d
Merge pull request #413 from Hurence/feature/sessionization2
Feature/sessionization2
oalam authored Oct 3, 2018
2 parents ce885d2 + 244be83 commit ceda70d
Showing 12 changed files with 3,011 additions and 25 deletions.
@@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
@@ -87,15 +88,17 @@ public boolean equals(Object o) {

if (!type.equals(field.type)) return false;
if (!name.equals(field.name)) return false;
return rawValue.equals(field.rawValue);
return Objects.equals(rawValue, field.rawValue);

}

@Override
public int hashCode() {
int result = type.hashCode();
result = 31 * result + name.hashCode();
result = 31 * result + rawValue.hashCode();
if ( rawValue != null ) {
result = 31 * result + rawValue.hashCode();
}
return result;
}

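The change above makes Field.equals and Field.hashCode tolerate a null rawValue. A minimal standalone sketch of the behaviour this relies on (demo code, not from the repository):

import java.util.Objects;

public class NullSafeEqualityDemo {
    public static void main(String[] args) {
        String a = null;
        String b = null;
        // The old code called rawValue.equals(field.rawValue), which throws a
        // NullPointerException when rawValue is null. Objects.equals is
        // null-safe and returns true when both sides are null.
        System.out.println(Objects.equals(a, b)); // true
        // The guarded hash mirrors the new Field.hashCode: a null rawValue
        // simply contributes nothing to the result.
        int result = 17;
        if (a != null) {
            result = 31 * result + a.hashCode();
        }
        System.out.println(result); // 17
    }
}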
@@ -28,7 +28,7 @@


@Tags({"datastore", "record", "service"})
@CapabilityDescription("A controller service for accessing an abstract datatore.")
@CapabilityDescription("A controller service for accessing an abstract datastore.")
public interface DatastoreClientService extends ControllerService {

PropertyDescriptor FLUSH_INTERVAL = new PropertyDescriptor.Builder()
logisland-documentation/components.rst (80 changes: 76 additions & 4 deletions)
@@ -766,11 +766,11 @@ In the list below, the names of required properties appear in **bold**. Any othe

"**hbase.client.service**", "The instance of the Controller Service to use for accessing HBase.", "", "null", "", ""
"**table.name.field**", "The field containing the name of the HBase Table to fetch from.", "", "null", "", "**true**"
"**row.identifier.field**", "The field containing the identifier of the row to fetch.", "", "null", "", "**true**"
"**row.identifier.field**", "The field containing the identifier of the row to fetch.", "", "null", "", "**true**"
"columns.field", "The field containing an optional comma-separated list of "<colFamily>:<colQualifier>" pairs to fetch. To return all columns for a given family, leave off the qualifier such as "<colFamily1>,<colFamily2>".", "", "null", "", "**true**"
"record.serializer", "the serializer needed to i/o the record in the HBase row", "kryo serialization (serialize events as json blocs), json serialization (serialize events as json blocs), avro serialization (serialize events as avro blocs), no serialization (send events as bytes)", "com.hurence.logisland.serializer.KryoSerializer", "", ""
"record.schema", "the avro schema definition for the Avro serialization", "", "null", "", ""
"table.name.default", "The table table to use if table name field is not set", "", "null", "", ""
"table.name.default", "The table to use if table name field is not set", "", "null", "", ""

----------

@@ -864,6 +864,78 @@ In the list below, the names of required properties appear in **bold**. Any othe

----------

.. _com.hurence.logisland.processor.webAnalytics.IncrementalWebSession:

IncrementalWebSession
---------------------
This processor creates and updates web sessions based on incoming web events. Note that both web sessions and web events are stored in Elasticsearch.
First, web events are grouped by their session identifier and processed in chronological order.
Then the web session associated with each group is retrieved from Elasticsearch.
If none exists yet, a new web session is created from the first web event, which provides the session identifier, the first timestamp and the first visited page.
Once created, or retrieved, the web session is updated with the remaining web events.
Updates affect fields of the web session such as the event counter, the last visited page, the session duration, ...
Before updates are applied, checks are performed to detect rules that would trigger the creation of a new session:

- the duration between the web session and the web event must not exceed the specified time-out,
- the web session and the web event must have timestamps within the same day (at midnight a new web session is created),
- the source of traffic (campaign, ...) must be the same on the web session and the web event.

When a rule is broken, a new web session is created with a new session identifier: the original identifier suffixed with the character '#' followed by an incremented counter. The remaining web events, which until then still carry the original session identifier, are re-assigned this new identifier.
Finally, once all web events have been applied, the web events (potentially carrying a new session identifier) are saved in Elasticsearch and the web sessions are passed to the next processor.
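A minimal sketch of this splitting rule in Java (hypothetical helper names and signatures, not the processor's actual code)::

    import java.util.Objects;
    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneId;

    public class SessionSplitSketch {
        // True when one of the three conditions above forces a new session
        // (timestamps in epoch milliseconds).
        static boolean shouldSplit(long lastSessionTs, long eventTs,
                                   String sessionCampaign, String eventCampaign,
                                   long timeoutSeconds) {
            boolean timedOut = eventTs - lastSessionTs > timeoutSeconds * 1000L;
            LocalDate sessionDay = Instant.ofEpochMilli(lastSessionTs)
                    .atZone(ZoneId.systemDefault()).toLocalDate();
            LocalDate eventDay = Instant.ofEpochMilli(eventTs)
                    .atZone(ZoneId.systemDefault()).toLocalDate();
            boolean trafficChanged = !Objects.equals(sessionCampaign, eventCampaign);
            return timedOut || !sessionDay.equals(eventDay) || trafficChanged;
        }

        // A broken rule renames the session, e.g. "sess42" -> "sess42#2"
        // (the exact counter start is an assumption of this sketch).
        static String extendedSessionId(String originalId, int counter) {
            return originalId + "#" + counter;
        }
    }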

Web-session information includes:
- the first and last visited pages
- the first and last timestamps of the processed events
- the total number of processed events
- the userId
- a boolean denoting whether the web session is still active or not
- an integer denoting the duration of the web session
- optional fields that may be retrieved from the processed events



Class
_____
com.hurence.logisland.processor.webAnalytics.IncrementalWebSession

Tags
____
analytics, web, session

Properties
__________
In the list below, the names of required properties appear in **bold**. Any other properties (not in bold) are considered optional. The table also indicates any default values.

.. csv-table:: allowable-values
:header: "Name","Description","Allowable Values","Default Value","Sensitive","EL"
:widths: 20,60,30,20,10,10

"debug", "Enable debug. If enabled, debug information are logged.", "", "false", "", ""
"**es.session.index**", "Name of the ES index containing the web session documents.", "", "null", "", ""
"**es.session.type**", "Name of the ES type of web session documents.", "", "null", "", ""
"**es.event.index**", "Name of the ES index containing the web event documents.", "", "null", "", ""
"**es.event.type**", "Name of the ES type of web event documents.", "", "null", "", ""
"sessionid.field", "the name of the field containing the session id => will override default value if set", "", "sessionId", "", ""
"timestamp.field", "the name of the field containing the timestamp => will override default value if set", "", "h2kTimestamp", "", ""
"visitedpage.field", "the name of the field containing the visited page => will override default value if set", "", "location", "", ""
"userid.field", "the name of the field containing the userId => will override default value if set", "", "userId", "", ""
"fields.to.return", "the list of fields to return", "", "null", "", ""
"firstVisitedPage.out.field", "the name of the field containing the first visited page => will override default value if set", "", "firstVisitedPage", "", ""
"lastVisitedPage.out.field", "the name of the field containing the last visited page => will override default value if set", "", "lastVisitedPage", "", ""
"isSessionActive.out.field", "the name of the field stating whether the session is active or not => will override default value if set", "", "is_sessionActive", "", ""
"sessionDuration.out.field", "the name of the field containing the session duration => will override default value if set", "", "sessionDuration", "", ""
"sessionInactivityDuration.out.field", "the name of the field containing the session inactivity duration => will override default value if set", "", "sessionInactivityDuration", "", ""
"session.timeout", "session timeout in sec", "", "1800", "", ""
"eventsCounter.out.field", "the name of the field containing the session duration => will override default value if set", "", "eventsCounter", "", ""
"firstEventDateTime.out.field", "the name of the field containing the date of the first event => will override default value if set", "", "firstEventDateTime", "", ""
"lastEventDateTime.out.field", "the name of the field containing the date of the last event => will override default value if set", "", "lastEventDateTime", "", ""
"utm_source.field", "Name of the field containing the utm_source value in the session", "", "utm_source", "", ""
"utm_campaign.field", "Name of the field containing the utm_campaign value in the session", "", "utm_campaign", "", ""
"utm_medium.field", "Name of the field containing the utm_medium value in the session", "", "utm_medium", "", ""
"utm_content.field", "Name of the field containing the utm_content value in the session", "", "utm_content", "", ""
"utm_term.field", "Name of the field containing the utm_term value in the session", "", "utm_term", "", ""
"**elasticsearch.client.service**", "The instance of the Controller Service to use for accessing Elasticsearch.", "", "null", "", ""

----------

.. _com.hurence.logisland.processor.enrichment.IpToFqdn:

IpToFqdn
@@ -1881,11 +1953,11 @@ setSourceOfTraffic
------------------
Compute the source of traffic of a web session. Users arrive at a website or application through a variety of sources,
including advertising/paying campaigns, search engines, social networks, referring sites or direct access.
When analysing user experience on a webshop, it is crucial to collects, processes, and reports the campaign and traffic-source data.
When analysing user experience on a webshop, it is crucial to collect, process, and report the campaign and traffic-source data.
To compute the source of traffic of a web session, the user has to provide the utm_* related properties if available
(i.e. **utm_source.field**, **utm_medium.field**, **utm_campaign.field**, **utm_content.field**, **utm_term.field**),
the referer (**referer.field** property) and the first visited page of the session (**first.visited.page.field** property).
By default the source of traffic informations are placed in a flat structure (specified by the **source_of_traffic.suffix** property
By default the source of traffic information are placed in a flat structure (specified by the **source_of_traffic.suffix** property
with a default value of source_of_traffic_). To work properly the setSourceOfTraffic processor needs to have access to an
Elasticsearch index containing a list of the most popular search engines and social networks. The ES index (specified by the **es.index** property) should be structured such that the _id of an ES document MUST be the name of the domain. If the domain is a search engine, the related ES doc MUST have a boolean field (default being search_engine) specified by the property **es.search_engine.field** with a value set to true. If the domain is a social network , the related ES doc MUST have a boolean field (default being social_network) specified by the property **es.social_network.field** with a value set to true.
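As an illustration, here is a minimal sketch of the resulting classification (hypothetical helper and return labels; the flag names are the documented defaults of **es.search_engine.field** and **es.social_network.field**)::

    import java.util.Map;

    public class TrafficSourceSketch {
        // domainDoc is the ES document whose _id is the referer's domain name,
        // or null when the index has no entry for that domain.
        static String classifyReferer(Map<String, Object> domainDoc) {
            if (domainDoc == null) {
                return "referral"; // unknown domain: a plain referring site
            }
            if (Boolean.TRUE.equals(domainDoc.get("search_engine"))) {
                return "search engine";
            }
            if (Boolean.TRUE.equals(domainDoc.get("social_network"))) {
                return "social network";
            }
            return "referral";
        }
    }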

@@ -47,6 +47,7 @@ import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.spark.broadcast.Broadcast
@@ -291,20 +292,35 @@ abstract class AbstractKafkaRecordStream extends AbstractRecordStream with Kafka
* previous values :
* refresh.leader.backoff.ms -> 1000
*/
val kafkaStreamsParams = Map(
"metadata.broker.list" -> brokerList,
"bootstrap.servers" -> brokerList,
"group.id" -> appName,
"refresh.leader.backoff.ms" -> "5000",
"auto.offset.reset" -> "largest"
var kafkaStreamsParams = Map(
"metadata.broker.list" -> brokerList,
"bootstrap.servers" -> brokerList,
"group.id" -> appName,
"refresh.leader.backoff.ms" -> "5000"
)
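// If kafka.manual.offset.reset is set, validate it against the property's
// allowable values and propagate it to the Kafka consumer as
// auto.offset.reset; otherwise the consumer keeps its default behaviour.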
val autoOffsetResetProp = streamContext.getPropertyValue(AbstractKafkaRecordStream.KAFKA_MANUAL_OFFSET_RESET)
val autoOffsetReset = if (autoOffsetResetProp.isSet) {
val value = autoOffsetResetProp.asString
if ( ! AbstractKafkaRecordStream.KAFKA_MANUAL_OFFSET_RESET.validate(value).isValid ) {
logger.error(s"Invalid value '${value}' for property ${AbstractKafkaRecordStream.KAFKA_MANUAL_OFFSET_RESET.getName()}")
throw new IllegalStateException(s"Invalid value '${value}' for property ${AbstractKafkaRecordStream.KAFKA_MANUAL_OFFSET_RESET.getName()}");
}
value
}
else {
null
}

if ( autoOffsetReset != null ) {
kafkaStreamsParams = kafkaStreamsParams + (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autoOffsetReset)
}

val offsets = zkSink.value.loadOffsetRangesFromZookeeper(brokerList, appName, inputTopics)
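// Start a direct stream from the configured reset position when an offset
// reset was requested or when no offsets are stored in ZooKeeper; the
// resume-from-saved-offsets branch is not shown in this excerpt.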
@transient val kafkaStream = if (
streamContext.getPropertyValue(AbstractKafkaRecordStream.KAFKA_MANUAL_OFFSET_RESET).isSet
|| offsets.isEmpty) {

logger.info(s"starting Kafka direct stream on topics $inputTopics from largest offsets")
logger.info(s"starting Kafka direct stream on topics $inputTopics from offsets $autoOffsetReset")
KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
ssc,
kafkaStreamsParams,
