python can filter is not correct #136
I have to correct my previous post: only the missing DAQ list IDs are an issue.
bugfix
Hello, ...
DAQ_LISTS = [
    DaqList("pwm_stuff", 2, False, False, [
        ("period", 0x001C0028, 0, "F32"),
        ("channel2", 0x1BD008, 0, "F32"),
        ("PWMFiltered", 0x1BDDE2, 0, "U8"),
        ("PWM", 0x1BDDDF, 0, "U8"),
        ("Triangle", 0x1BDDDE, 0, "I8"),
    ]),
]
... This is not just a mock-up example; it is currently working and tested with Vector XCPsim. In the end, .CSV files are written (one per DAQ list), nice. Another big thing is the completely new ... Specifying per-DAQ-list CAN identifiers is then possible as follows:
OK, regarding CAN, there are still some open issues:
Hello, yes, I am using a predefined DAQ list. Just to make things clear, I am certainly not as much of an expert as you are. What I am currently developing is a Python Qt application that uses pyxcp as a low-level driver to handle received packets.

Because the DAQ lists were not received at all and I did not find much documentation on pyxcp, I had to dig deeper (which took me some days...) until I found the root cause. What you see here are the fixes I had to make to be able to receive DAQ list data from the XCP slave. I found out that the python-can driver throws out any messages other than those with the master and slave CAN IDs.

Some additional info: I have written my own XCP driver, which processes the DAQ lists. I can share this as well, but since you are working on similar stuff, I guess it is irrelevant. Anyway, pyxcp is great and I will keep using it for my application. Let me know if I can contribute somehow.
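The filtering behaviour described above can be illustrated with the matching rule python-can applies to acceptance filters: a frame passes when `(arbitration_id & can_mask) == (can_id & can_mask)`. The following is a minimal sketch in plain Python (no hardware and no python-can import needed). The identifiers `MASTER_ID`, `SLAVE_ID`, `DAQ_IDS` and the helpers `make_filter` and `passes` are made up for illustration and are not taken from this issue or from pyxcp.

```python
# Emulates python-can's filter rule:
#   (arbitration_id & can_mask) == (can_id & can_mask)
# All identifiers below are hypothetical examples.

MASTER_ID = 0x7E0          # assumed command identifier
SLAVE_ID = 0x7E1           # assumed response identifier
DAQ_IDS = [0x7E2, 0x7E3]   # assumed per-DAQ-list identifiers


def make_filter(can_id, extended=False):
    """Build an exact-match filter dict in the shape python-can expects."""
    mask = 0x1FFFFFFF if extended else 0x7FF
    return {"can_id": can_id, "can_mask": mask, "extended": extended}


def passes(arbitration_id, filters):
    """Return True if any filter accepts the given arbitration ID."""
    return any(
        (arbitration_id & f["can_mask"]) == (f["can_id"] & f["can_mask"])
        for f in filters
    )


# Filter set that only knows the master and slave IDs.
narrow = [make_filter(MASTER_ID), make_filter(SLAVE_ID)]
# Same set, extended by one exact-match filter per DAQ identifier.
wide = narrow + [make_filter(i) for i in DAQ_IDS]

dropped = [hex(i) for i in DAQ_IDS if not passes(i, narrow)]
accepted = [hex(i) for i in DAQ_IDS if passes(i, wide)]
print("dropped by narrow filter:", dropped)
print("accepted by wide filter:", accepted)
```

With python-can itself, a list shaped like `wide` can be passed as `can_filters=` when the bus is created, or applied later with `bus.set_filters(...)`.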
If I may contribute somehow to the major update, let me know, so we can avoid duplicate work. Here is part of the XCP driver I had to create:

import copy  # needed by _add_variable_to_daq_list (deepcopy)


class XCPDriver:
    # RES/ERR PIDs
    RES_PID_POSITIVE = 0xFF
    ERR_PID_NEGATIVE = 0xFE
    # Error codes
    ERR_CMD_SYNCH = 0x00
    ERR_CMD_BUSY = 0x10
    ERR_DAQ_ACTIVE = 0x11

    def __init__(self):
        # Initialize logger using GlobalLoggerControl
        self.logger = GlobalLoggerControl.get_logger("XCPDriver", "DEBUG")
        # Initialize available address granularity (AG) for all ODTs to 6 bytes
        self.available_ag_in_odt = []
        # Initialize an empty simulated DAQ list
        self.simulated_daqList = []
        # DAQ lists master
        self.daqLists_master = []
        # DAQ processor info
        self.dqp = []
        self.daq0 = DAQList("daq0", 10, 6, 0.01)
        self.daq1 = DAQList("daq1", 20, 6, 0.1)
        self.daq2 = DAQList("daq2", 20, 6, 1)
class DAQList:
    # Buffer length of a DAQ list in seconds
    DAQ_BUFFER_LENGTH_IN_SECONDS = 2
    DAQ_SAMPLE_POINTER_THD_IN_SECONDS = 1  # after 1 second the driver will exit the layer driver

    def __init__(self, daq_name: str, max_odts: int, max_odt_entries: int, sample_time: float):
        self.name = daq_name
        self.idx = int(daq_name[-1])  # Take the last character and convert it to an integer
        self.max_odts = max_odts
        self.max_odt_entries = max_odt_entries
        # DAQ list filled flags
        self.size4_full = False
        self.size2_full = False
        self.size1_full = False
        self.sizeAll_full = False
    def _add_variable_to_daq_list(self, variable_name, variable_size, variable_address_int, variable_data_type):
        """
        Helper function to add a single variable to the simulated DAQ list.
        """
        # Store the current state
        backup_daqList = copy.deepcopy(self.simulated_daqList)
        backup_ag_in_odt = copy.deepcopy(self.available_ag_in_odt)
        # Extract currently added variables
        current_variables = []
        for odt in self.simulated_daqList:
            for variable in odt:
                current_variables.append({
                    "name": variable["variable_name"],
                    "size": variable["size"],
                    "address": variable["address"],  # extract the address as well
                    "type": variable["type"]
                })
        # Log the current state of current_variables before adding the new variable
        self.logger.debug(f"Current variables before adding new one: {current_variables}")
        # Add the new variable
        current_variables.append({
            "name": variable_name,
            "size": variable_size,
            "address": variable_address_int,
            "type": variable_data_type
        })
        # Log the state of current_variables after adding the new variable
        self.logger.debug(f"Current variables after adding new one: {current_variables}")
        # Sort variables by size, descending
        sorted_variables = sorted(current_variables, key=lambda x: x["size"], reverse=True)
        # Clear the current DAQ list and the available AG
        self.clear_daq_list()
        # Now, add variables to the DAQ list in sorted order
        try:
            for var in sorted_variables:
                # pass the address to the helper function as well
                self._add_single_variable(var["name"], var["size"], var["address"], var["type"])
        except ValueError:  # if adding any variable fails
            # Restore old state
            self.simulated_daqList = backup_daqList
            self.available_ag_in_odt = backup_ag_in_odt
            self.logger.error(f"Could not fit variable {variable_name}. Restored previous DAQ list state.")
            return False  # Indicate a negative feedback
        # Update DAQ list state.
        # If the flags for fill state get true, do not attempt to add other variables before removal.
        self.update_filled_flags()
        return True
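The strategy in `_add_variable_to_daq_list` (sort all variables by size in descending order, refill the ODTs, and fall back to the previous state on failure) resembles first-fit decreasing bin packing. Below is a standalone sketch of that idea. The function name `pack_odts`, the 6-byte ODT capacity, and the limit of 10 ODTs are assumptions chosen for illustration; the code is not part of the driver above or of pyxcp.

```python
def pack_odts(variables, odt_capacity=6, max_odts=10):
    """Pack (name, size) pairs into ODTs using first-fit decreasing.

    Raises ValueError if a variable does not fit. The input is never
    modified, so the caller's previous layout stays valid on failure.
    """
    odts = []   # each ODT is a list of (name, size) tuples
    free = []   # remaining bytes per ODT, same order as odts
    for name, size in sorted(variables, key=lambda v: v[1], reverse=True):
        for i, room in enumerate(free):
            if size <= room:
                odts[i].append((name, size))
                free[i] -= size
                break
        else:
            # No existing ODT has room: open a new one if allowed.
            if size > odt_capacity or len(odts) >= max_odts:
                raise ValueError(f"could not fit variable {name!r}")
            odts.append([(name, size)])
            free.append(odt_capacity - size)
    return odts


layout = pack_odts([("PWM", 1), ("period", 4), ("Triangle", 1), ("channel2", 4)])
print(layout)
```

Because the function builds a fresh layout and only returns it on success, no explicit backup and restore step is needed; a caller would simply keep its old layout when `ValueError` is raised.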
Hello,
Great, then let me know when it is there. I will fork it, try to integrate it into my application, and can do some early hands-on testing.
The read method in python_can should be adapted:
first, it should listen for the slave ID, and second, it should also listen for DAQ IDs.
def read(self):
    if not self.connected:
        # self.parent.logger.debug("Not connected. Returning None.")
        return None
    try:
        # self.parent.logger.debug("Waiting to receive frame...")
        frame = self.bus.recv(5)
    except CanError:
        # self.parent.logger.debug("Error occurred while receiving frame. Returning None.")
        return None
    else:
        if frame is None or frame.arbitration_id != self.parent.can_id_master.id or not len(frame.data):
            # frame is None on timeout, so check it before reading its ID
            if frame is not None and frame.arbitration_id != self.parent.can_id_master.id:
                print(f"Received frame with unexpected arbitration ID: {frame.arbitration_id}")
            return None  # Timeout condition.
        print(frame)
        extended = frame.is_extended_id
        identifier = can.Identifier.make_identifier(frame.arbitration_id, extended)
        return can.Frame(
            id_=identifier,
            dlc=frame.dlc,
            data=frame.data,
            timestamp=frame.timestamp,
        )
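The adaptation proposed above (accept the slave ID first, then any configured DAQ ID) could be sketched roughly as follows. This is an illustration only: `classify_frame`, `slave_id` and `daq_ids` are hypothetical names, and the `Frame` tuple is a stand-in rather than the pyxcp or python-can message class.

```python
from collections import namedtuple

# Stand-in for a received CAN message; only the fields used here.
Frame = namedtuple("Frame", ["arbitration_id", "data"])


def classify_frame(frame, slave_id, daq_ids):
    """Return 'response', 'daq', or None for frames that should be ignored."""
    if frame is None or len(frame.data) == 0:
        return None  # timeout or empty frame
    if frame.arbitration_id == slave_id:
        return "response"  # command responses from the slave
    if frame.arbitration_id in daq_ids:
        return "daq"  # measurement data sent on a per-DAQ-list identifier
    return None  # unrelated traffic on the bus


# Hypothetical identifiers, chosen only for this example.
SLAVE_ID = 0x7E1
DAQ_IDS = {0x7E2, 0x7E3}

print(classify_frame(Frame(0x7E1, b"\xff"), SLAVE_ID, DAQ_IDS))
print(classify_frame(Frame(0x7E3, b"\x00\x01"), SLAVE_ID, DAQ_IDS))
print(classify_frame(Frame(0x123, b"\x00"), SLAVE_ID, DAQ_IDS))
```

Returning the kind of frame rather than a plain boolean lets the caller route DAQ data separately from command responses.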