11"""
22 emitters.py
33
4- Copyright (c) 2013-2014 Snowplow Analytics Ltd. All rights reserved.
4+ Copyright (c) 2013-2019 Snowplow Analytics Ltd. All rights reserved.
55
66 This program is licensed to you under the Apache License Version 2.0,
77 and you may not use this file except in compliance with the Apache License
1515 language governing permissions and limitations there under.
1616
1717 Authors: Anuj More, Alex Dean, Fred Blundun
18- Copyright: Copyright (c) 2013-2014 Snowplow Analytics Ltd
18+ Copyright: Copyright (c) 2013-2019 Snowplow Analytics Ltd
1919 License: Apache License Version 2.0
2020"""
2121
5151
5252new_contract ("redis" , lambda x : isinstance (x , (redis .Redis , redis .StrictRedis )))
5353
54- try :
55- # Check whether a custom Celery configuration module named "snowplow_celery_config" exists
56- import snowplow_celery_config
57- app = Celery ()
58- app .config_from_object (snowplow_celery_config )
59- except ImportError :
60- # Otherwise configure Celery with default settings
61- snowplow_celery_config = None
62- app = Celery ("Snowplow" , broker = "redis://guest@localhost//" )
63-
6454
6555class Emitter (object ):
6656 """
@@ -84,7 +74,7 @@ def __init__(self, endpoint, protocol="http", port=None, method="get", buffer_si
8474 :param on_success: Callback executed after every HTTP request in a flush has status code 200
8575 Gets passed the number of events flushed.
8676 :type on_success: function | None
87- :param on_failure: Callback executed if at least one HTTP request in a flush has status code 200
77+ :param on_failure: Callback executed if at least one HTTP request in a flush has status code other than 200
8878 Gets passed two arguments:
8979 1) The number of events which were successfully sent
9080 2) If method is "post": The unsent data in string form;
@@ -342,7 +332,7 @@ def __init__(
342332 :param on_success: Callback executed after every HTTP request in a flush has status code 200
343333 Gets passed the number of events flushed.
344334 :type on_success: function | None
345- :param on_failure: Callback executed if at least one HTTP request in a flush has status code 200
335+ :param on_failure: Callback executed if at least one HTTP request in a flush has status code other than 200
346336 Gets passed two arguments:
347337 1) The number of events which were successfully sent
348338 2) If method is "post": The unsent data in string form;
@@ -385,30 +375,38 @@ def consume(self):
385375 self .queue .task_done ()
386376
387377
388- @app .task (bind = True , name = 'tasks.flush' ) # the self passed with bind can be used for on_fail/retrying
389- def flush_emitter (self , emitter ):
390- try :
391- emitter .flush ()
392- finally :
393- logger .info ("Flush called on emitter" )
394-
395-
396378class CeleryEmitter (Emitter ):
397379 """
398380 Uses a Celery worker to send HTTP requests asynchronously.
399381 Works like the base Emitter class,
400382 but on_success and on_failure callbacks cannot be set.
401383 """
384+ celery_app = None
385+
402386 def __init__ (self , endpoint , protocol = "http" , port = None , method = "get" , buffer_size = None , byte_limit = None ):
403387 super (CeleryEmitter , self ).__init__ (endpoint , protocol , port , method , buffer_size , None , None , byte_limit )
404388
389+ try :
390+ # Check whether a custom Celery configuration module named "snowplow_celery_config" exists
391+ import snowplow_celery_config
392+ self .celery_app = Celery ()
393+ self .celery_app .config_from_object (snowplow_celery_config )
394+ except ImportError :
395+ # Otherwise configure Celery with default settings
396+ self .celery_app = Celery ("Snowplow" , broker = "redis://guest@localhost//" )
397+
398+ self .async_flush = self .celery_app .task (self .async_flush )
399+
405400 def flush (self ):
406401 """
407402 Schedules a flush task
408403 """
409- flush_emitter . delay (self ) # passes emitter (self - CeleryEmitter) to task
404+ self . async_flush . delay ()
410405 logger .info ("Scheduled a Celery task to flush the event queue" )
411406
407+ def async_flush (self ):
408+ super (CeleryEmitter , self ).flush ()
409+
412410
413411class RedisEmitter (object ):
414412 """
0 commit comments