Sentiment Analysis application revised
sv-giampa committed Sep 27, 2023
1 parent 076d7a6 · commit ac8ff2c
Showing 4 changed files with 28 additions and 42 deletions.
parsoda/apps/sentiment_analysis.py (9 additions, 0 deletions)
```diff
@@ -3,6 +3,7 @@
 from parsoda import SocialDataApp
 from parsoda.function.analysis.gap_bide_analysis import GapBIDE
 from parsoda.function.filtering import HasEmoji
+from parsoda.function.filtering.contains_keywords import ContainsKeywords
 
 from parsoda.function.mapping.classify_by_emoji import ClassifyByEmoji
 from parsoda.function.reduction.reduce_by_emoji_polarity import ReduceByEmojiPolarity
@@ -20,10 +21,18 @@ def parsoda_sentiment_analysis(
     chunk_size=64,
     emoji_file="./resources/input/emoji.json",
     visualization_file="./resources/output/emoji_polarization.txt",
+    keywords: str = "",
+    keywords_separator: str = " ",
+    keywords_threshold: int = 1
 ):
     app = SocialDataApp("Sentiment Analysis", driver, num_partitions=num_partitions, chunk_size=chunk_size)
     app.set_crawlers(crawlers)
     app.set_filters([
+        ContainsKeywords(
+            keywords=keywords,
+            separator=keywords_separator,
+            threshold=keywords_threshold
+        ),
        HasEmoji()
     ])
     app.set_mapper(ClassifyByEmoji(emoji_file))
```
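For context, a sketch of how the revised entry point might be called with the new keyword parameters. The driver and crawler values are placeholders for concrete ParSoDA instances, and the assumption that the function returns the configured `SocialDataApp` follows from the visible `app = SocialDataApp(...)` construction; none of the names below beyond the function itself come from this commit.

```python
# Hypothetical call to the revised builder; my_driver and my_crawler are
# placeholders for concrete ParSoDA driver/crawler instances (not defined here).
from parsoda.apps.sentiment_analysis import parsoda_sentiment_analysis

app = parsoda_sentiment_analysis(
    driver=my_driver,          # a ParSoDA execution driver
    crawlers=[my_crawler],     # crawlers yielding SocialDataItem objects
    keywords="covid vaccine",  # all keywords packed into one string
    keywords_separator=" ",    # how that string is split into keywords
    keywords_threshold=2,      # an item must contain both keywords to pass
)
app.execute()                  # execute() is defined on SocialDataApp
```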
parsoda/function/filtering/contains_keywords.py (19 additions, 6 deletions)
```diff
@@ -4,14 +4,27 @@
 
 class ContainsKeywords(Filter):
     """
-    Checks if an item contains at least one of the given keywords
+    Checks if an item contains at least the specified number (threshold) of the given keywords
     """
 
-    def __init__(self, keywords, separator=':'):
-        self.keywords = keywords.split(separator)
+    def __init__(self, keywords, separator=' ', threshold: int = 1):
+        """Define a filter for items which include keywords
+
+        Args:
+            keywords (_type_): The keywords that must be included in items as a single string
+            separator (str, optional): the separator of the keywords in the specified string. Defaults to ' ' (space).
+            threshold (int, optional): The number of different keywords that must be included in the item text. Defaults to 1.
+        """
+        self.__keywords = keywords.split(separator)
+        self.__threshold = threshold
 
     def test(self, item: SocialDataItem):
-        for keyword in self.keywords:
+        if self.__keywords is None or self.__keywords == "":
+            return True
+        contains = 0
+        for keyword in self.__keywords:
             if keyword in item.text:
-                return True
+                contains += 1
+            if contains >= self.__threshold:
+                return True
         return False
```
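Two behavioral notes on the revised filter, illustrated by the standalone sketch below (the helper function is illustrative, not part of the library). First, an item now passes only once the match count reaches the threshold. Second, the empty-keywords guard compares the split result, a list, against the string `""`, so it never fires; an empty `keywords` argument splits to `[""]`, and since the empty string is a substring of any text, every item still passes, via the loop rather than the guard.

```python
# Standalone sketch of the revised ContainsKeywords logic (mirrors the diff
# above; contains_keywords is an illustrative helper, not a library function).
def contains_keywords(text: str, keywords: str,
                      separator: str = " ", threshold: int = 1) -> bool:
    words = keywords.split(separator)  # e.g. "covid vaccine" -> ["covid", "vaccine"]
    contains = 0
    for keyword in words:
        if keyword in text:
            contains += 1
        if contains >= threshold:      # pass once enough distinct keywords matched
            return True
    return False

assert contains_keywords("covid vaccine rollout", "covid vaccine", threshold=2)
assert not contains_keywords("weather report", "covid vaccine")
assert contains_keywords("anything", "")  # "".split(" ") == [""], and "" matches any text
```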
parsoda/model/driver/parsoda_multiprocessing_driver.py (0 additions, 27 deletions)
```diff
@@ -13,7 +13,6 @@ def _task_load(p: CrawlerPartition):
     return p.load_data().parse_data()
 
 def _task_filter(filter_func, partition: List):
-    #print(f"_task_filter: partition={partition}")
     filtered_partition = []
     for item in partition:
         if filter_func(item):
@@ -26,20 +25,6 @@ def _task_map(mapper, partition: List):
         mapped_partition.extend(mapper(item))
     return mapped_partition
 
-def _task_sort(partition: List, key=lambda kv: kv[0]):
-    partition.sort()
-    return partition
-
-def _task_reduce(reducer, partition: List[Tuple]):
-    reduce_result = Dict()
-    for kv in partition:
-        k, v = kv[0], kv[1]
-        if k in reduce_result:
-            reduce_result[k] = reducer(reduce_result[k], v)
-        else:
-            reduce_result[k] = v
-    return reduce_result
-
 def _task_group(partition: List[Tuple])->Dict:
     result = {}
     for k, v in partition:
@@ -119,8 +104,6 @@ def filter(self, filter_func):
     def flatmap(self, mapper):
         mapped_partitions = []
         futures = []
-
-        #self.__dataset = self.__pool.map(_task_map, self.__dataset, 1)
 
         for p in self.__dataset:
             future = self.__pool.apply_async(_task_map, (mapper, p))
@@ -146,17 +129,7 @@ def combine(accumulator: dict, partition: dict):
         for p in grouped_partitions:
             combine(accumulator, p)
 
-
-        # for p in self.__dataset:
-        #     future = self.__pool.apply_async(_task_group, (p,))
-        #     futures.append(future)
-
-        # accumulator = {}
-        # for future in futures:
-        #     grouped_partition: dict = future.get()
-        #     combine(accumulator, grouped_partition)
         result = [(k,v) for k, v in accumulator.items()]
-        print(f"grouped={result}")
         self.__partitioning(result)
 
     def get_result(self):
```
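After this cleanup the group step keeps a single pattern: fan the partitions out to the process pool, then fold the per-partition dictionaries into one accumulator. Below is a self-contained sketch of that pattern; it assumes grouping collects values per key, since the full `_task_group` body is not shown in this hunk, and `pool.map` stands in for the driver's `apply_async` fan-out.

```python
# Minimal sketch of the fan-out/combine pattern the driver retains:
# group each partition in a worker process, then merge dicts sequentially.
# This is an illustrative reimplementation, not the driver's actual code.
from multiprocessing import Pool
from typing import Dict, List, Tuple

def task_group(partition: List[Tuple]) -> Dict:
    result = {}
    for k, v in partition:
        result.setdefault(k, []).append(v)  # collect values per key
    return result

def combine(accumulator: dict, partition: dict) -> None:
    for k, values in partition.items():
        accumulator.setdefault(k, []).extend(values)

if __name__ == "__main__":
    partitions = [[("a", 1), ("b", 2)], [("a", 3)]]
    with Pool(2) as pool:
        grouped = pool.map(task_group, partitions)  # fan out to workers
    accumulator = {}
    for g in grouped:                               # fold results on the driver
        combine(accumulator, g)
    print(sorted(accumulator.items()))              # [('a', [1, 3]), ('b', [2])]
```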
parsoda/model/social_data_app.py (0 additions, 9 deletions)
```diff
@@ -359,15 +359,6 @@ def execute(self) -> ParsodaReport:
 
         reduction_result_length = len(reduction_result)
         print(f"[ParSoDA/{self.__app_name}] len(reduction_result)={reduction_result_length}")
-
-        # reduction_data_len = 0
-        # for k, v in reduction_result.items():
-        #     reduction_data_len += 1
-        #     if isinstance(v, Iterable):
-        #         reduction_data_len += len(v)
-        #     else:
-        #         reduction_data_len += 1
-        # print(f"[ParSoDA/{self.__app_name}] all reduction results (keys and values)={reduction_data_len}")
         stopwatch.reset()
 
         print(f"[ParSoDA/{self.__app_name}] disposing driver...")
```
