Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 68 additions & 76 deletions uds/uds_communications/TransportProtocols/Can/CanTp.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
__email__ = "richard.clubb@embeduk.com"
__status__ = "Development"

import can
from can.interfaces import pcan, vector
from time import sleep

from uds import iTp
Expand All @@ -21,7 +23,7 @@
FIRST_FRAME_DATA_START_INDEX, SINGLE_FRAME_DATA_START_INDEX, CONSECUTIVE_FRAME_SEQUENCE_NUMBER_INDEX, \
CONSECUTIVE_FRAME_SEQUENCE_DATA_START_INDEX, FLOW_CONTROL_BS_INDEX, FLOW_CONTROL_STMIN_INDEX
from uds import CanConnectionFactory
# from uds import CanConnection
#from uds import CanConnection
from uds import Config

from os import path
Expand All @@ -34,6 +36,7 @@
# Will spawn a CanTpListener class for incoming messages
# depends on a bus object for communication on CAN
class CanTp(iTp):

configParams = ['reqId', 'resId', 'addressingType']

##
Expand All @@ -45,15 +48,16 @@ def __init__(self, configPath=None, **kwargs):

self.__loadConfiguration(configPath)
self.__checkKwargs(**kwargs)

# load variables from the config
self.__N_AE = int(self.__config['canTp']['N_AE'], 16)
self.__N_TA = int(self.__config['canTp']['N_TA'], 16)
self.__N_SA = int(self.__config['canTp']['N_SA'], 16)

Mtype = self.__config['canTp']['Mtype']
if Mtype == "DIAGNOSTICS":
if (Mtype == "DIAGNOSTICS"):
self.__Mtype = CanTpMTypes.DIAGNOSTICS
elif Mtype == "REMOTE_DIAGNOSTICS":
elif (Mtype == "REMOTE_DIAGNOSTICS"):
self.__Mtype = CanTpMTypes.REMOTE_DIAGNOSTICS
else:
raise Exception("Do not understand the Mtype config")
Expand All @@ -74,14 +78,14 @@ def __init__(self, configPath=None, **kwargs):
self.__resId = int(self.__config['canTp']['resId'], 16)

# sets up the relevant parameters in the instance
if (
if(
(self.__addressingType == CanTpAddressingTypes.NORMAL) |
(self.__addressingType == CanTpAddressingTypes.NORMAL_FIXED)
):
self.__minPduLength = 7
self.__maxPduLength = 63
self.__pduStartIndex = 0
elif (
elif(
(self.__addressingType == CanTpAddressingTypes.EXTENDED) |
(self.__addressingType == CanTpAddressingTypes.MIXED)
):
Expand All @@ -92,7 +96,7 @@ def __init__(self, configPath=None, **kwargs):
# set up the CAN connection
canConnectionFactory = CanConnectionFactory()
self.__connection = canConnectionFactory(self.callback_onReceive,
self.__resId, # <-filter
self.__resId, # <-filter
configPath, **kwargs)

self.__recvBuffer = []
Expand All @@ -103,7 +107,7 @@ def __init__(self, configPath=None, **kwargs):
# @brief used to load the local configuration options and override them with any passed in from a config file
def __loadConfiguration(self, configPath, **kwargs):

# load the base config
#load the base config
baseConfig = path.dirname(__file__) + "/config.ini"
self.__config = Config()
if path.exists(baseConfig):
Expand Down Expand Up @@ -175,44 +179,40 @@ def __checkKwargs(self, **kwargs):
# @brief send method
# @param [in] payload the payload to be sent
def send(self, payload, functionalReq=False):
self.clearBufferedMessages()
self.encode_isotp(payload, functionalReq)
sleep(0.01)

##
# @brief encoding method
# @param payload the payload to be sent
# @param use_external_snd_rcv_functions boolean to state if external sending and receiving functions shall be used
def encode_isotp(self, payload, functionalReq: bool = False, use_external_snd_rcv_functions: bool = False):
data = None
endOfMessage_flag = False
while endOfMessage_flag is False:
rxPdu = self.getNextBufferedMessage()
payloadLength = len(payload)
payloadPtr = 0
payloadLength = len(payload)
payloadPtr = 0

state = CanTpState.IDLE
state = CanTpState.IDLE

if payloadLength > CANTP_MAX_PAYLOAD_LENGTH:
raise Exception("Payload too large for CAN Transport Protocol")
if payloadLength > CANTP_MAX_PAYLOAD_LENGTH:
raise Exception("Payload too large for CAN Transport Protocol")

if payloadLength < self.__maxPduLength:
state = CanTpState.SEND_SINGLE_FRAME
else:
# we might need a check for functional request as we may not be able to service functional requests for
# multi frame requests
state = CanTpState.SEND_FIRST_FRAME
if payloadLength < self.__maxPduLength:
state = CanTpState.SEND_SINGLE_FRAME
else:
# we might need a check for functional request as we may not be able to service functional requests for
# multi frame requests
state = CanTpState.SEND_FIRST_FRAME

txPdu = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
txPdu = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]

sequenceNumber = 1
endOfMessage_flag = False

sequenceNumber = 1
blockList = []
currBlock = []

blockList = []
currBlock = []
## this needs fixing to get the timing from the config
timeoutTimer = ResettableTimer(1)
stMinTimer = ResettableTimer()

self.clearBufferedMessages()

while endOfMessage_flag is False:

rxPdu = self.getNextBufferedMessage()

# this needs fixing to get the timing from the config
timeoutTimer = ResettableTimer(1)
stMinTimer = ResettableTimer()
if rxPdu is not None:
N_PCI = (rxPdu[0] & 0xF0) >> 4
if N_PCI == CanTpMessageType.FLOW_CONTROL:
Expand All @@ -225,7 +225,7 @@ def encode_isotp(self, payload, functionalReq: bool = False, use_external_snd_rc
if state == CanTpState.WAIT_FLOW_CONTROL:
if fs == CanTpFsTypes.CONTINUE_TO_SEND:
bs = rxPdu[FC_BS_INDEX]
if bs == 0:
if(bs == 0):
bs = 585
blockList = self.create_blockList(payload[payloadPtr:],
bs)
Expand All @@ -251,56 +251,49 @@ def encode_isotp(self, payload, functionalReq: bool = False, use_external_snd_rc
txPdu[N_PCI_INDEX] = 0
txPdu[FIRST_FRAME_DL_INDEX_LOW] = payloadLength
txPdu[FIRST_FRAME_DATA_START_INDEX:] = payload
data = self.transmit(txPdu, functionalReq, use_external_snd_rcv_functions)
self.transmit(txPdu, functionalReq)
endOfMessage_flag = True
elif state == CanTpState.SEND_FIRST_FRAME:
payloadLength_highNibble = (payloadLength & 0xF00) >> 8
payloadLength_lowNibble = (payloadLength & 0x0FF)
payloadLength_lowNibble = (payloadLength & 0x0FF)
txPdu[N_PCI_INDEX] += (CanTpMessageType.FIRST_FRAME << 4)
txPdu[FIRST_FRAME_DL_INDEX_HIGH] += payloadLength_highNibble
txPdu[FIRST_FRAME_DL_INDEX_LOW] += payloadLength_lowNibble
txPdu[FIRST_FRAME_DATA_START_INDEX:] = payload[0:self.__maxPduLength - 1]
payloadPtr += self.__maxPduLength - 1
data = self.transmit(txPdu, functionalReq, use_external_snd_rcv_functions)
txPdu[FIRST_FRAME_DATA_START_INDEX:] = payload[0:self.__maxPduLength-1]
payloadPtr += self.__maxPduLength-1
self.transmit(txPdu, functionalReq)
timeoutTimer.start()
state = CanTpState.WAIT_FLOW_CONTROL
elif state == CanTpState.SEND_CONSECUTIVE_FRAME:
if (stMinTimer.isExpired()):
if(stMinTimer.isExpired()):
txPdu[N_PCI_INDEX] += (CanTpMessageType.CONSECUTIVE_FRAME << 4)
txPdu[CONSECUTIVE_FRAME_SEQUENCE_NUMBER_INDEX] += sequenceNumber
txPdu[CONSECUTIVE_FRAME_SEQUENCE_DATA_START_INDEX:] = currBlock.pop(0)
payloadPtr += self.__maxPduLength
data = self.transmit(txPdu, functionalReq, use_external_snd_rcv_functions)
self.transmit(txPdu, functionalReq)
sequenceNumber = (sequenceNumber + 1) % 16
stMinTimer.restart()
if (len(currBlock) == 0):
if (len(blockList) == 0):
if(len(currBlock) == 0):
if(len(blockList) == 0):
endOfMessage_flag = True
else:
timeoutTimer.start()
state = CanTpState.WAIT_FLOW_CONTROL
# print("waiting for flow control")
#print("waiting for flow control")

txPdu = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
# timer / exit condition checks
if timeoutTimer.isExpired():
if(timeoutTimer.isExpired()):
raise Exception("Timeout waiting for message")
if use_external_snd_rcv_functions:
return data

sleep(0.01)

##
# @brief recv method
# @param [in] timeout_ms The timeout to wait before exiting
# @return a list
def recv(self, timeout_s=1):
self.decode_isotp(timeout_s)
def recv(self, timeout_s):

##
# @breif decoding method
# @param timeout_ms the timeout to wait before exiting
# @param received_data the data that should be decoded in case of ITF Automation
# @param use_external_snd_rcv_functions boolean to state if external sending and receiving functions shall be used
# return a list
def decode_isotp(self, timeout_s=1, received_data=None, use_external_snd_rcv_functions: bool = False):
timeoutTimer = ResettableTimer(timeout_s)

payload = []
Expand All @@ -316,12 +309,10 @@ def decode_isotp(self, timeout_s=1, received_data=None, use_external_snd_rcv_fun
state = CanTpState.IDLE

timeoutTimer.start()

while endOfMessage_flag is False:

rxPdu = self.getNextBufferedMessage()
if use_external_snd_rcv_functions:
rxPdu = received_data

if rxPdu is not None:
if rxPdu[N_PCI_INDEX] == 0x00:
rxPdu = rxPdu[1:]
Expand All @@ -335,8 +326,7 @@ def decode_isotp(self, timeout_s=1, received_data=None, use_external_snd_rcv_fun
endOfMessage_flag = True
elif N_PCI == CanTpMessageType.FIRST_FRAME:
payload = rxPdu[FIRST_FRAME_DATA_START_INDEX:]
payloadLength = ((rxPdu[FIRST_FRAME_DL_INDEX_HIGH] & 0x0F) << 8) + rxPdu[
FIRST_FRAME_DL_INDEX_LOW]
payloadLength = ((rxPdu[FIRST_FRAME_DL_INDEX_HIGH] & 0x0F) << 8) + rxPdu[FIRST_FRAME_DL_INDEX_LOW]
payloadPtr = self.__maxPduLength - 1
state = CanTpState.SEND_FLOW_CONTROL
elif state == CanTpState.RECEIVING_CONSECUTIVE_FRAME:
Expand All @@ -347,7 +337,7 @@ def decode_isotp(self, timeout_s=1, received_data=None, use_external_snd_rcv_fun
else:
sequenceNumberExpected = (sequenceNumberExpected + 1) % 16
payload += rxPdu[CONSECUTIVE_FRAME_SEQUENCE_DATA_START_INDEX:]
payloadPtr += self.__maxPduLength
payloadPtr += (self.__maxPduLength)
timeoutTimer.restart()
else:
raise Exception("Unexpected PDU received")
Expand All @@ -373,6 +363,7 @@ def decode_isotp(self, timeout_s=1, received_data=None, use_external_snd_rcv_fun
def closeConnection(self):
# deregister filters, listeners and notifiers etc
# close can connection
CanConnectionFactory.clearConnections()
self.__connection.shutdown()
self.__connection = None

Expand All @@ -386,7 +377,7 @@ def clearBufferedMessages(self):
# @return list, or None if nothing is on the receive list
def getNextBufferedMessage(self):
length = len(self.__recvBuffer)
if length != 0:
if(length != 0):
return self.__recvBuffer.pop(0)
else:
return None
Expand All @@ -408,7 +399,7 @@ def callback_onReceive(self, msg):
# @brief function to decode the StMin parameter
@staticmethod
def decode_stMin(val):
if val <= 0x7F:
if (val <= 0x7F):
time = val / 1000
return time
elif (
Expand Down Expand Up @@ -436,20 +427,20 @@ def create_blockList(self, payload, blockSize):
blockLength = blockSize * pduLength

working = True
while working:
while(working):
if (payloadPtr + pduLength) >= payloadLength:
working = False
currPdu = fillArray(payload[payloadPtr:], pduLength)
currBlock.append(currPdu)
blockList.append(currBlock)

if working:
currPdu = payload[payloadPtr:payloadPtr + pduLength]
currPdu = payload[payloadPtr:payloadPtr+pduLength]
currBlock.append(currPdu)
payloadPtr += pduLength
blockPtr += pduLength

if blockPtr == blockLength:
if(blockPtr == blockLength):
blockList.append(currBlock)
currBlock = []
blockPtr = 0
Expand All @@ -466,25 +457,24 @@ def create_blockList(self, payload, blockSize):
# else:
# self.__connection.transmit(data, self.__reqId, self.__addressingType)

def transmit(self, data, functionalReq=False, use_external_snd_rcv_functions: bool = False):
def transmit(self, data, functionalReq=False):
# check functional request
if functionalReq:
raise Exception("Functional requests are currently not supported")

transmitData = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]

if (
(self.__addressingType == CanTpAddressingTypes.NORMAL) |
(self.__addressingType == CanTpAddressingTypes.NORMAL_FIXED)
(self.__addressingType == CanTpAddressingTypes.NORMAL) |
(self.__addressingType == CanTpAddressingTypes.NORMAL_FIXED)
):
transmitData = data
elif self.__addressingType == CanTpAddressingTypes.MIXED:
transmitData[0] = self.__N_AE
transmitData[1:] = data
else:
raise Exception("I do not know how to send this addressing type")
if use_external_snd_rcv_functions:
return transmitData

self.__connection.transmit(transmitData, self.__reqId, )

@property
Expand All @@ -502,3 +492,5 @@ def resIdAddress(self):
@resIdAddress.setter
def resIdAddress(self, value):
self.__resId = value


Loading