3636import sif3 .common .exception .PersistenceException ;
3737import sif3 .common .exception .ServiceInvokationException ;
3838import sif3 .common .header .HeaderProperties ;
39+ import sif3 .common .header .HeaderValues .EventAction ;
3940import sif3 .common .header .HeaderValues .QueryIntention ;
4041import sif3 .common .header .HeaderValues .RequestType ;
4142import sif3 .common .header .HeaderValues .ResponseAction ;
4243import sif3 .common .header .HeaderValues .ServiceType ;
44+ import sif3 .common .header .HeaderValues .UpdateType ;
4345import sif3 .common .interfaces .DelayedConsumer ;
46+ import sif3 .common .interfaces .EventConsumer ;
4447import sif3 .common .interfaces .FunctionalServiceConsumer ;
4548import sif3 .common .model .ACL .AccessRight ;
4649import sif3 .common .model .ACL .AccessType ;
4750import sif3 .common .model .CustomParameters ;
51+ import sif3 .common .model .EventMetadata ;
4852import sif3 .common .model .PagingInfo ;
4953import sif3 .common .model .QueryCriteria ;
5054import sif3 .common .model .SIFContext ;
55+ import sif3 .common .model .SIFEvent ;
5156import sif3 .common .model .SIFZone ;
5257import sif3 .common .model .ServiceInfo ;
5358import sif3 .common .model .URLQueryParameter ;
8893 *
8994 * @author Joerg Huber
9095 */
91- public abstract class AbstractFunctionalServiceConsumer extends BaseConsumer implements FunctionalServiceConsumer , DelayedConsumer
96+ public abstract class AbstractFunctionalServiceConsumer extends BaseConsumer implements FunctionalServiceConsumer , DelayedConsumer , EventConsumer < JobCollectionType >, Runnable
9297{
9398 protected final Logger logger = LoggerFactory .getLogger (getClass ());
9499
@@ -99,7 +104,7 @@ public abstract class AbstractFunctionalServiceConsumer extends BaseConsumer imp
99104 private UnmarshalFactory unmarshaller = new InfraUnmarshalFactory ();
100105
101106 /*--------------------------------------------------------------------*/
102- /* Abstract method relating to FUnctional Service Type functionality. */
107+ /* Abstract method relating to Functional Service Type functionality. */
103108 /*--------------------------------------------------------------------*/
104109 /**
105110 * Must return the name of the Functional Service. This must match the value of an entry in the
@@ -228,6 +233,33 @@ public abstract class AbstractFunctionalServiceConsumer extends BaseConsumer imp
228233 */
229234 public abstract void processDelayedPhaseDelete (PhaseInfo phaseInfo , PhaseDataResponse phaseDataResponse , DelayedResponseReceipt receipt );
230235
236+ /*------------------------------------------------------*/
237+ /* Abstract method relating to Job Event functionality. */
238+ /*------------------------------------------------------*/
239+ /**
240+ * This method is called when a consumer service has received a Job event. This class does implement the actual onEvent()
241+ * method from the event interface. It may do some additional work for house keeping purpose so the original onEvent() id processed
242+ * as part of this class but then this method is called so that the actual consumer can do its work as required.
243+ *
244+ * @param sifEvent The event data that has been received and shall be processed by the consumer. This parameter also holds
245+ * the zone and context in the limitToZoneCtxList property. It will always only hold one entry in that
246+ * list. So the zone can be retrieved with the following call: sifEvent.getLimitToZoneCtxList().get(0).getZone().
247+ * However for simplicity reasons the zone and context is already extracted and passed to this method in the
248+ * zone anc context parameter.
249+ * @param zone The zone from which the event was received from. The framework ensures that this is never null.
250+ * @param context The context from which the event was received from. The framework ensures that this is never null.
251+ * @param metadata Additional metadata that is known for the event. Typical values include custom HTTP headers, sourceName etc.
252+ * @param msgReadID The ID of the SIF queue reader. It is informative only and is only of use where there are multiple concurrent
253+ * subscribers on a message queue.
254+ * @param consumerID The consumer ID that has been used to receive the event from the event queue. It is informative
255+ * only and is only of use where there are multiple event subscribers enabled.
256+ */
257+ public abstract void processJobEvent (SIFEvent <JobCollectionType > sifEvent , SIFZone zone , SIFContext context , EventMetadata metadata , String msgReadID , String consumerID );
258+
259+ /*-------------------------*/
260+ /* Implementation of Class */
261+ /*-------------------------*/
262+
231263 /**
232264 * Constructor.
233265 */
@@ -566,6 +598,42 @@ public void onUpdateMany(MultiOperationStatusList<OperationStatus> statusList, D
566598 //Job Objects cannot be updated. No implementation needed here.
567599 }
568600
601+ /*------------------------------------*/
602+ /* EventConsumer Interface Methods. --*/
603+ /*------------------------------------*/
604+
605+ @ Override
606+ public void onEvent (SIFEvent <JobCollectionType > sifEvent , EventMetadata metadata , String msgReadID , String consumerID )
607+ {
608+ if (sifEvent != null )
609+ {
610+ // We know from the framework that zone and context is never null. For the time being we just log the event.
611+ processJobEvent (sifEvent , sifEvent .getLimitToZoneCtxList ().get (0 ).getZone (), sifEvent .getLimitToZoneCtxList ().get (0 ).getContext (), metadata , msgReadID , consumerID );
612+ }
613+ }
614+
615+ @ Override
616+ public SIFEvent <JobCollectionType > createEventObject (Object sifObjectList , EventAction eventAction , UpdateType updateType )
617+ {
618+ if (sifObjectList != null )
619+ {
620+ if (sifObjectList instanceof JobCollectionType )
621+ {
622+ int size = ((JobCollectionType )sifObjectList ).getJob ().size ();
623+ return new SIFEvent <JobCollectionType >((JobCollectionType )sifObjectList , eventAction , updateType , size );
624+ }
625+ else
626+ {
627+ logger .error ("The given event data is not of type JobCollectionType as expected. Cannot create event object. Return null" );
628+ }
629+ }
630+ else
631+ {
632+ logger .error ("The given job event data is null. Cannot create job event object. Return null" );
633+ }
634+ return null ; // if something is wrong then we get here.
635+ }
636+
569637 /*------------------------------------------------*/
570638 /* FunctionalServiceConsumer Interface Methods. --*/
571639 /*------------------------------------------------*/
@@ -1080,6 +1148,18 @@ public Response getPhaseStates(PhaseInfo phaseInfo,
10801148 return response ;
10811149 }
10821150
1151+ /*----------------------------------------*/
1152+ /* Implemented Method for Multi-threading */
1153+ /*----------------------------------------*/
1154+
1155+ /**
1156+ * This method is all that is needed to run the subscriber in its own thread.
1157+ */
1158+ @ Override
1159+ public final void run ()
1160+ {
1161+ // logger.debug("============================\n Start "+getConsumerName()+" worker thread.\n======================================");
1162+ }
10831163
10841164 /*----------------------------*/
10851165 /*-- Other required methods --*/
@@ -1126,7 +1206,7 @@ protected final List<ServiceInfo> getAllApprovedCRUDServices()
11261206 * implementations would not require any overriding of this method. If a specific implementation wishes to filter out
11271207 * some of the environment provided subscriptions then the sub-class of this class should override this method.
11281208 *
1129- * @param allServices A list of services for this that is allowed to subscribe to events across the environment of this consumer.
1209+ * @param envEventServices A list of services for this that is allowed to subscribe to events across the environment of this consumer.
11301210 *
11311211 * @return The final list of services for which this consumer class shall subscribe to.
11321212 */
@@ -1141,7 +1221,7 @@ public List<ServiceInfo> filterEventServices(List<ServiceInfo> envEventServices)
11411221 protected final List <ServiceInfo > getEventServices ()
11421222 {
11431223 //Note the service name is the name of the functional service
1144- return filterEventServices (getAllApprovedServicesForRights (getServiceURLNamePlural (), ServiceType .OBJECT , AccessRight .SUBSCRIBE ));
1224+ return filterEventServices (getAllApprovedServicesForRights (getServiceURLNamePlural (), ServiceType .FUNCTIONAL , AccessRight .SUBSCRIBE ));
11451225 }
11461226
11471227 /*---------------------*/
0 commit comments