Skip to content
This repository has been archived by the owner on Dec 22, 2021. It is now read-only.

CDAP-15237 Enhance the streaming plugin to support CDC option #14

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from

Conversation

mlozbin-cybervisiontech
Copy link
Contributor

JIRA: https://issues.cask.co/browse/CDAP-15237
WIKI: https://wiki.cask.co/display/CE/CDC+Salesforce+Streaming+Source

In scope of this PR:

  • source plugin implementation
  • widget
  • plugin docs

@mlozbin-cybervisiontech
Copy link
Contributor Author

@albertshau could you, please, take a look

Copy link
Contributor

@albertshau albertshau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only partially reviewed. Please add much more documentation around what is happening. I'm having a hard time following the logic.


Properties
----------
**clientId**: Client ID from the connected app
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should match the user friendly names set in the widget spec that the user will see in the UI. Client ID, Client Secret, etc.


**clientSecret**: Client Secret from the connected app

**username**: Username
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Username to use when connecting to Salesforce


**username**: Username

**password**: Password
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Password to use when connecting to Salesforce


**password**: Password

**loginUrl**: (default is https://login.salesforce.com/services/oauth2/token) For Salesforce sandbox runs login url is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Salesforce login URL to authenticate against. The default value is https://login.salesforce.com/services/oauth2/token. This should be changed when running against the Salesforce sandbox.

**loginUrl**: (default is https://login.salesforce.com/services/oauth2/token) For Salesforce sandbox runs login url is
different. That's why user needs this option.

**objects**: list of object's API names (For example: Task for base object and Employee__c for custom) separated by ",".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Objects to read change events from.


public SalesforceEventTopicListener(AuthenticatorCredentials credentials, List<String> objectsForTracking) {
this.credentials = credentials;
this.objectsForTracking = objectsForTracking;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to create a copy of the input list, in case it gets modified by the caller.

* Start the Bayeux Client which listens to the Salesforce EventTopic and saves received messages
* to the queue.
*/
public void start() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like there should be a corresponding stop() method.

case STOP:
throw new RuntimeException("Failed to process message", e);
default:
throw new UnexpectedFormatException(String.format("Unknown error handling strategy '%s'", errorHandling));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we usually use IllegalStateException for these cases in enums that shouldn't even happen


/**
* Implementation of Spark receiver to receive Salesforce change events
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add more javadocs about what this class is doing. More specifically, how it consumes the change events and processes them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There needs to be far more documentation here. What are the different types of events that can be read from Salesforce? What are the types of events that this emits? Examples?

/**
* Contains information about change event
*/
public class ChangeEventRecord {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no salesforce class for this objects returned by their API?

If there isn't, we should use Gson in combination with a Java class that directly represents the JSON response for the API, instead of doing custom parsing.

Copy link
Contributor

@nitinmotgi nitinmotgi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments

"type": "record",
"fields": [
{
"name": "cdcMessage",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can name the field ‘cdc_msg’

},
{
"widget-type": "dsv",
"label": "Objects for tracking",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“Tracking Objects”

"name": "clientSecret"
},
{
"widget-type": "textbox",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn’t this be first ?

@mlozbin-cybervisiontech
Copy link
Contributor Author

@albertshau @nitinmotgi addressed all your comments

Copy link
Contributor

@albertshau albertshau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's pause work on this. I think there are large gaps in the design and understanding of the API, and I am thinking CDC in general needs a complete re-design.


Properties
----------
**Client Id**: Client ID from the connected app.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was another PR in the salesforce project that renamed these (https://github.com/data-integrations/salesforce/blob/master/docs/Salesforce-batchsource.md). Let's use the same wording and descriptions.


Salesforce Change Data Capture
--------------------------
When something changes in object for which is enable 'Change notifications'. A Change Data Capture event, or change
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this information seems like it should be moved from here to the Description section.


Salesforce Change Data Capture
--------------------------
When something changes in object for which is enable 'Change notifications'. A Change Data Capture event, or change
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove "When something changes in object for which is enable 'Change notifications'."

**Tracking Objects**: Objects to read change events from (For example: Task for base object and Employee__c for custom) separated by ",".
If list is empty then subscription for all events will be used.

**Error Handling**: Possible values are: "Skip on error" or "Fail on error". These are strategies on handling records
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would a user ever want to skip a change event? Skipping an event can cause the application of future events to fail and kind of defeats the purpose of CDC.

events.put(event.getTransactionKey(), eventsList);
}
}
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the different types of exceptions that can happen? Seems like it will need to behave differently depending on the type of error scenario.

In general, this should retry whenever it encounters a failure. For example, suppose salesforce is temporarily down. The source should retry until the API comes back up. It should also not flood the logs in that type of situation, but log every X seconds.


/**
* Contains information about change event. Should be used instead of {@link com.sforce.soap.partner.ChangeEventHeader}
* because GSON does not support setters.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this means. Why would we want setters? Why does this class have setters?

private void processEvents(List<ChangeEventHeader> events, PartnerConnection connection) throws ConnectionException {
for (ChangeEventHeader event : events) {
SObjectDescriptor descriptor = SObjectDescriptor.fromName(event.getEntityName(), connection);
SObjectsDescribeResult describeResult = new SObjectsDescribeResult(connection, descriptor.getAllParentObjects());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this looking up the SObject descriptor as it currently is? Does the change event not have all the information required to apply the change? If not, I don't understand how you can apply the right change (or how you can do anything efficiently), as this object may be different now than it was when the change event occurred. Unless objects cannot change?

I thought I asked these questions in the design doc. Were they ever addressed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For CREATE, DELETE, UNDELETE, UPDATE types change event message contains information only about fields which were changed. If type of event is GAP_CREATE, GAP_DELETE, GAP_UNDELETE, GAP_UPDATE, GAP_OVERFLOW than it will contain only ids of records. So we have to make requests to salesforce to get full information.

Schema schema = SalesforceRecord.getSchema(descriptor, describeResult);
updateSchemaIfNecessary(event.getEntityName(), schema);

if (getOperationType(event) != OperationType.DELETE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's more natural to read if you do:

if (getOperationType(event) == OperationType.DELETE) {
  sendDeleteRecords()
} else {
  sendUpdateRecords()
}


/**
* Implementation of Spark receiver to receive Salesforce change events
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There needs to be far more documentation here. What are the different types of events that can be read from Salesforce? What are the types of events that this emits? Examples?

private void sendUpdateRecords(ChangeEventHeader event, SObjectDescriptor descriptor, Schema schema,
PartnerConnection connection) throws ConnectionException {
String query = getQuery(event, descriptor.getFieldsNames());
QueryResult queryResult = connection.query(query);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is querying the SObject to get the record value?? Maybe I am missing some key information about the salesforce API.

What happens when 2 records with the same key get updated in an SObject? Do we get 2 change events? This type of information needs to be documented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If update is similar for several records of the same type, we can receive the list of ids that were changed and common changes of their fields.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants