diff --git a/aliasMapEnquiry.py b/aliasMapEnquiry.py
index b286236..a254f8a 100755
--- a/aliasMapEnquiry.py
+++ b/aliasMapEnquiry.py
@@ -67,45 +67,40 @@ def main():
if options.veryverbose :
connection.network.verbose = True
- '''
- @todo identifynode option not currently implemented
- '''
- #if options.identifynode :
- # import getUnderTestAlias
- # options.dest, otherNodeId = getUnderTestAlias.get(options.alias,
- # None,
- # options.verbose)
- # if options.nodeid == None :
- # options.nodeid = otherNodeId
+ if options.identifynode :
+ import getUnderTestAlias
+ options.dest, options.nodeid = getUnderTestAlias.get(options.alias, None, options.verbose or options.veryverbose)
retval = test(options.alias, options.dest, options.nodeid, connection,
options.verbose)
connection.network.close()
exit(retval)
-
+
def test(alias, dest, nodeID, connection, verbose) :
+ retval = 0
+
# check with node id in frame
connection.network.send(makeframe(alias, nodeID))
expect = canolcbutils.makeframestring(0x10701000 + dest, nodeID)
- if (connection.network.expect(exact=expect) == None) :
- print "Expected reply when node ID matches not received"
- return 2
+ if (connection.network.expect(startswith=expect) == None) :
+ print ("Expected reply "+expect+" when node ID matches not received")
+ retval = retval | 1
- # check without node id in frame
+ # check without node id in frame
connection.network.send(canolcbutils.makeframestring(0x10702000+alias,None))
expect = canolcbutils.makeframestring(0x10701000 + dest, nodeID)
- if (connection.network.expect(exact=expect) == None) :
- print "Expected reply when node ID matches not received"
- return 2
+ if (connection.network.expect(startswith=expect) == None) :
+ print ("Expected reply when node ID matches not received")
+ retval = retval | 2
# test non-matching NodeID using a reserved one
connection.network.send(makeframe(alias, [0,0,0,0,0,1]))
reply = connection.network.receive()
- if (connection.network.expect(exact=expect) != None) :
- print "Unexpected reply received when node ID didnt match ", reply
- return 2
-
- return 0
+ if (connection.network.expect(startswith=expect) != None) :
+ print ("Unexpected reply received when node ID didnt match ", reply)
+ retval = retval | 4
+
+ return retval
if __name__ == '__main__':
main()
diff --git a/allTest.py b/allTest.py
index fc834dd..92806ea 100755
--- a/allTest.py
+++ b/allTest.py
@@ -8,24 +8,24 @@
import connection as connection
import canolcbutils
import defaults
-
+
def usage() :
- print ""
- print "Called standalone, will sequence through a set of tests"
- print ""
- print ""
- print "Default connection detail taken from connection.py"
- print ""
- print "-a --alias source alias (default 123)"
- print "-d --dest dest alias (default 123)"
- print "-b number of datagram buffers to test (sends b+1 requests) default 1"
- print "-c continue after error; (attempts to) run to completion even if error encountered"
- print "-e --event eventID as 1.2.3.4.5.6.7.8 form"
- print "-n --node dest nodeID (-t option sets automatically, format is 01.02.03.04.05.06)"
- print "-r run until error; repeats until failure"
- print "-t find destination alias automatically"
- print "-v verbose"
- print "-V Very verbose"
+ print ("")
+ print ("Called standalone, will sequence through a set of tests")
+ print ("")
+ print ("")
+ print ("Default connection detail taken from connection.py")
+ print ("")
+ print ("-a --alias source alias (default 123)")
+ print ("-d --dest dest alias (default 123)")
+ print ("-b number of datagram buffers to test (sends b+1 requests) default 1")
+ print ("-c continue after error; (attempts to) run to completion even if error encountered")
+ print ("-e --event eventID as 1.2.3.4.5.6.7.8 form")
+ print ("-n --node dest nodeID (-t option sets automatically, format is 01.02.03.04.05.06)")
+ print ("-r run until error; repeats until failure")
+ print ("-t find destination alias automatically")
+ print ("-v verbose")
+ print ("-V Very verbose")
import getopt, sys
@@ -40,12 +40,12 @@ def main():
complete = False
repeat = False
bufnum = 1
-
+
try:
opts, remainder = getopt.getopt(sys.argv[1:], "e:n:b:a:d:vVtcr", ["event=", "alias=", "node=", "dest="])
- except getopt.GetoptError, err:
+ except getopt.GetoptError as err:
# print help information and exit:
- print str(err) # will print something like "option -a not recognized"
+ print (str(err)) # will print something like "option -a not recognized"
usage()
sys.exit(2)
for opt, arg in opts:
@@ -75,16 +75,25 @@ def main():
# now execute
retval = test(alias, dest, nodeID, event, connection, verbose, complete, repeat, identifynode, bufnum)
- done(retval)
+ done(retval)
def done(retval) :
connection.network.close()
exit(retval)
-
+
+def purge() :
+ # wait two seconds, dropping all traffic
+ # used after errors to clear up any remnant traffic
+ import time
+ time.sleep(2)
+ while connection.network.receive() != None :
+ continue
+ return
+
def test(alias, dest, nodeID, event, connection, verbose, complete, repeat, identifynode, bufnum):
result = 0;
-
+
while True :
if identifynode :
@@ -92,177 +101,196 @@ def test(alias, dest, nodeID, event, connection, verbose, complete, repeat, iden
dest, nodeID = getUnderTestAlias.get(alias, None, verbose)
import aliasMapEnquiry
- if verbose : print "aliasMapEnquiry"
+ if verbose : print ("aliasMapEnquiry")
retval = aliasMapEnquiry.test(alias, dest, nodeID, connection, verbose)
if retval != 0 :
- print "Error in aliasMapEnquiry"
+ print ("Error in aliasMapEnquiry")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import verifyNodeGlobal
- #if verbose : print "verifyNodeGlobal w no NodeID"
+ #if verbose : print ("verifyNodeGlobal w no NodeID")
#retval = verifyNodeGlobal.test(alias, None, connection)
#if retval != 0 :
- # print "Error in verifyNodeGlobal w no NodeID"
+ # print ("Error in verifyNodeGlobal w no NodeID")
# if not complete : done(retval)
# result |= retval
- if verbose : print "verifyNodeGlobal"
+ if verbose : print ("verifyNodeGlobal")
retval = verifyNodeGlobal.test(alias, nodeID, connection)
if retval != 0 :
- print "Error in verifyNodeGlobal"
+ print ("Error in verifyNodeGlobal")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import verifyNodeAddressed
- if verbose : print "verifyNodeAddressed"
+ if verbose : print ("verifyNodeAddressed")
retval = verifyNodeAddressed.test(alias, dest, nodeID, connection, verbose)
if retval != 0 :
- print "Error in verifyNodeAddressed"
+ print ("Error in verifyNodeAddressed")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import protocolIdentProtocol
- if verbose : print "protocolIdentProtocol"
+ if verbose : print ("protocolIdentProtocol")
retval = protocolIdentProtocol.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in protocolIdentProtocol"
+ print ("Error in protocolIdentProtocol")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import identifyEventsGlobal
- if verbose : print "identifyEventsGlobal"
+ if verbose : print ("identifyEventsGlobal")
retval = identifyEventsGlobal.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in identifyEventsGlobal"
+ print ("Error in identifyEventsGlobal")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import identifyEventsAddressed
- if verbose : print "identifyEventsAddressed"
+ if verbose : print ("identifyEventsAddressed")
retval = identifyEventsAddressed.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in identifyEventsAddressed"
+ print ("Error in identifyEventsAddressed")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import identifyConsumers
- if verbose : print "identifyConsumers"
+ if verbose : print ("identifyConsumers")
retval = identifyConsumers.test(alias, event, connection, verbose)
if retval != 0 :
- print "Error in identifyConsumers"
+ print ("Error in identifyConsumers")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import identifyProducers
- if verbose : print "identifyProducers"
+ if verbose : print ("identifyProducers")
retval = identifyProducers.test(alias, event, connection, verbose)
if retval != 0 :
- print "Error in identifyProducers"
+ print ("Error in identifyProducers")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import testProducerConsumerNotification
- if verbose : print "testProducerConsumerNotification"
+ if verbose : print ("testProducerConsumerNotification")
retval = testProducerConsumerNotification.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in testProducerConsumerNotification"
+ print ("Error in testProducerConsumerNotification")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import testConfigurationProtocol
- if verbose : print "testConfigurationProtocol"
+ if verbose : print ("testConfigurationProtocol")
retval = testConfigurationProtocol.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in testConfigurationProtocol", retval
+ print ("Error in testConfigurationProtocol")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import testDatagram
- if verbose : print "testDatagram"
+ if verbose : print ("testDatagram")
retval = testDatagram.test(alias, dest, connection, bufnum, verbose)
if retval != 0 :
- print "Error in testDatagram", retval
+ print ("Error in testDatagram")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import testOverlappingDatagrams
- if verbose : print "testOverlappingDatagrams"
+ if verbose : print ("testOverlappingDatagrams")
retval = testOverlappingDatagrams.test(alias, dest, bufnum, connection, verbose)
if retval != 0 :
- print "Error in testOverlappingDatagrams", retval
+ print ("Error in testOverlappingDatagrams")
if not complete : done(retval)
+ purge()
result |= retval
import simpleNodeIdentificationInformation
- if verbose : print "simpleNodeIdentificationInformation"
+ if verbose : print ("simpleNodeIdentificationInformation")
retval = simpleNodeIdentificationInformation.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in simpleNodeIdentificationInformation"
+ print ("Error in simpleNodeIdentificationInformation")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import testCDI
- if verbose : print "testCDI"
+ if verbose : print ("testCDI")
retval = testCDI.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in testCDI"
+ print ("Error in testCDI")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import testReservedBits
- if verbose : print "testReservedBits"
+ if verbose : print ("testReservedBits")
retval = testReservedBits.test(alias, nodeID, dest, connection, verbose)
if retval != 0 :
- print "Error in testReservedBits"
+ print ("Error in testReservedBits")
if not complete : done(retval)
- result |= retval
-
+ purge()
+ result |= retval
+
import unknownDatagramType
- if verbose : print "unknownDatagramType"
+ if verbose : print ("unknownDatagramType")
retval = unknownDatagramType.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in unknownDatagramType"
+ print ("Error in unknownDatagramType")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import unknownMtiAddressed
- if verbose : print "unknownMtiAddressed"
+ if verbose : print ("unknownMtiAddressed")
retval = unknownMtiAddressed.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in unknownMtiAddressed"
+ print ("Error in unknownMtiAddressed")
if not complete : done(retval)
+ purge()
result |= retval
-
+
import testStandardFrame
import time
- if verbose : print "testStandardFrame"
+ if verbose : print ("testStandardFrame")
retval = testStandardFrame.test(connection, verbose)
if retval != 0 :
- print "Error in testStandardFrame"
+ print ("Error in testStandardFrame")
if not complete : done(retval)
+ purge()
result |= retval
time.sleep(3)
-
+
# done last, as changes alias in use
import testAliasConflict
- if verbose : print "testAliasConflict"
+ if verbose : print ("testAliasConflict")
retval = testAliasConflict.test(alias, dest, connection, verbose)
if retval != 0 :
- print "Error in testAliasConflict"
+ print ("Error in testAliasConflict")
if not complete : done(retval)
+ purge()
result |= retval
if not repeat : break
- if verbose : print "End of pass, repeat"
- if verbose : print ""
-
- if verbose : print "Note: Did not perform testStartup, which is manual"
- if verbose : print "Note: Did not perform testForZeroAlias.py, which is slow"
+ if verbose : print ("End of pass, repeat")
+ if verbose : print ("")
+
+ if verbose : print ("Note: Did not perform testStartup, which is manual")
+ if verbose : print ("Note: Did not perform testForZeroOrRepeatedAlias.py, which is slow")
if verbose :
- if result != 0 : print "Encountered errors"
- else : print "Normal end"
+ if result != 0 : print ("Encountered errors")
+ else : print ("Normal end")
- return
+ return result
if __name__ == '__main__':
main()
diff --git a/canloader.py b/canloader.py
index 1ad330f..65ad8e3 100644
--- a/canloader.py
+++ b/canloader.py
@@ -21,29 +21,29 @@
BOOTLOADER_VERSION = 0xA5
def selectNode(CRIS, NNB, connection, verbose) :
- print "In selectNode"
+ print("In selectNode")
frame = makeframestring(CRIS, CAN_SELECT_NODE, [NNB])
response = False
resp_frame = ''
counter = 100
while (not response) and counter:
- print "sending CAN_SELECT_NODE"
+ print("sending CAN_SELECT_NODE")
connection.network.send(frame)
resp_frame = connection.network.receive()
if (resp_frame != None) and isFrameType(resp_frame, CAN_SELECT_NODE, CRIS):
- print "node selected!"
+ print("node selected!")
response = True
else:
counter -= 1
if(counter == 0): return False
#now examine the response more carefully
data = bodyArray(resp_frame)
- print "data contains ",
- print data
+ print("data contains ", end=' ')
+ print(data)
if(data[0] >= BOOTLOADER_VERSION) and (data[1] == 1):
- print "selectNode success! Done"
+ print("selectNode success! Done")
return True
- print "selectNode fail!"
+ print("selectNode fail!")
return False
def expect(good, bad, connection, verbose) :
@@ -83,10 +83,10 @@ def selectMemoryPage(page, CRIS, connection, verbose):
return expect(expected_response, error_response, connection, verbose)
def eraseMemory(CRIS, connection, verbose) :
- print "eraseMemory"
+ print("eraseMemory")
frame = makeframestring(CRIS, CAN_PROG_START, [0x80, 0xFF, 0xFF])
expected_response = makeframestring(CRIS, CAN_PROG_START, [0x00])
- print expected_response
+ print(expected_response)
error_response = makeframestring(CRIS, CAN_ERROR, [0x00])
connection.network.send(frame)
return expect(expected_response, error_response, connection, verbose)
@@ -99,7 +99,7 @@ def startWrite(start, end, CRIS, connection, verbose) :
return expect(expected_response, error_response, connection, verbose)
def writeMemory(ih, zero_index, start, end, CRIS, connection, verbose) :
- if verbose: print "WRITE"
+ if verbose: print("WRITE")
if not startWrite(start, end, CRIS, connection, verbose) :
return False
address = start
@@ -117,7 +117,7 @@ def writeMemory(ih, zero_index, start, end, CRIS, connection, verbose) :
if(response != None) and isFrameType(response, CAN_PROG_DATA, CRIS):
payload = bodyArray(response)
if payload[0] != 0x00 and payload[0] != 0x02:
- print "ERROR WRITING"
+ print("ERROR WRITING")
return False
else:
done = True
@@ -127,7 +127,7 @@ def writeMemory(ih, zero_index, start, end, CRIS, connection, verbose) :
return True
def verifyMemory(ih, CRIS, connection, verbose) :
- if verbose: print "VERIFY"
+ if verbose: print("VERIFY")
address = ih.minaddr()
while(address < ih.maxaddr()):
incr = min(8, (ih.maxaddr()+1) - address)
@@ -140,7 +140,7 @@ def verifyMemory(ih, CRIS, connection, verbose) :
payload = bodyArray(response)
for i in range(len(payload)):
if payload[i] != ih[address + i]:
- print "ERROR IN VERIFY!"
+ print("ERROR IN VERIFY!")
exit
address += len(payload)
done = True
@@ -151,20 +151,20 @@ def startApplication(CRIS, connection, verbose) :
connection.network.send(frame)
def usage() :
- print ""
- print "TODO Called standalone, will send one CAN datagram message"
- print " and display response."
- print ""
- print "Expect a single datagram reply in return"
- print "e.g. [1Esssddd] 4C"
- print "from destination alias to source alias"
- print ""
- print "Default connection detail taken from connection.py"
- print ""
- print "-d --dest dest alias (default 0x"+hex(connection.testNodeAlias).upper()+")"
- print "-t find destination alias semi-automatically"
- print "-v verbose"
- print "-V Very verbose"
+ print("")
+ print("TODO Called standalone, will send one CAN datagram message")
+ print(" and display response.")
+ print("")
+ print("Expect a single datagram reply in return")
+ print("e.g. [1Esssddd] 4C")
+ print("from destination alias to source alias")
+ print("")
+ print("Default connection detail taken from connection.py")
+ print("")
+ print("-d --dest dest alias (default 0x"+hex(connection.testNodeAlias).upper()+")")
+ print("-t find destination alias semi-automatically")
+ print("-v verbose")
+ print("-V Very verbose")
if __name__ == '__main__':
@@ -177,9 +177,9 @@ def usage() :
identifynode = False
try:
opts, remainder = getopt.getopt(sys.argv[1:], "f:d:vVt", ["dest=", "alias=", "content="])
- except getopt.GetoptError, err:
+ except getopt.GetoptError as err:
# print help information and exit:
- print str(err) # will print something like "option -a not recognized"
+ print(str(err)) # will print something like "option -a not recognized"
usage()
sys.exit(2)
for opt, arg in opts:
diff --git a/canolcbutils.py b/canolcbutils.py
index 31e7b53..8505f58 100755
--- a/canolcbutils.py
+++ b/canolcbutils.py
@@ -27,10 +27,10 @@ def makeframestring(header, body) :
def splitSequence(seq) :
strings = seq.split('.')
result = []
- for a in strings :
+ for a in strings :
result = result+[int(a, 16)]
return result
-
+
'''
Pull body bytes from frame as array
'''
@@ -41,7 +41,7 @@ def bodyArray(frame) :
result = result+[int(string[:2],16)]
string = string[2:]
return result
-
+
'''
Return (header, body) of a P/C Event Report frame
alias: the source alias of this node
@@ -53,11 +53,10 @@ def eventframe(alias, event) :
def main():
(header, body) = eventframe(0x123, [11,255,240,4,5,6,7,8]);
- print makeframestring(header, body)
- print splitSequence("1.2.3.a.0a.10.4")
- print bodyArray(":X1E000000F010203040506;")
-
+ print (makeframestring(header, body))
+ print (splitSequence("1.2.3.a.0a.10.4"))
+ print (bodyArray(":X1E000000F010203040506;"))
+
if __name__ == '__main__':
main()
-
\ No newline at end of file
diff --git a/connection.py b/connection.py
index 61b26bd..4bcbfed 100755
--- a/connection.py
+++ b/connection.py
@@ -12,35 +12,35 @@
network = defaults.network
-thisNodeID = defaults.thisNodeID
+thisNodeID = defaults.thisNodeID
thisNodeAlias = defaults.thisNodeAlias
-testNodeID = defaults.testNodeID
+testNodeID = defaults.testNodeID
testNodeAlias = defaults.testNodeAlias
def main():
usage()
-
+
return # done with example
-
+
def list() :
- print "network.host = "+network.host
- print "network.port = "+str(network.port)
- print "thisNodeID = ",thisNodeID
- print "thisNodeAlias = "+hex(thisNodeAlias)
- print "testNodeID = ",testNodeID
- print "testNodeAlias = "+hex(testNodeAlias)
+ print ("network.host = "+network.host)
+ print ("network.port = "+str(network.port))
+ print ("thisNodeID = ",thisNodeID)
+ print ("thisNodeAlias = "+hex(thisNodeAlias))
+ print ("testNodeID = ",testNodeID)
+ print ("testNodeAlias = "+hex(testNodeAlias))
return
def usage() :
- print ""
- print "Python module for defining the layout connection."
- print ""
- print "Invoked by other routines to know how to send, "
- print "not intended to be invoked standalone"
- print ""
+ print ("")
+ print ("Python module for defining the layout connection.")
+ print ("")
+ print ("Invoked by other routines to know how to send, ")
+ print ("not intended to be invoked standalone")
+ print ("")
list()
return
-
+
if __name__ == '__main__':
main()
diff --git a/datagram.py b/datagram.py
index 9b420bf..3fd28ba 100755
--- a/datagram.py
+++ b/datagram.py
@@ -50,16 +50,16 @@ def sendOneDatagram(alias, dest, content, connection, verbose) :
frame = connection.network.receive()
if frame == None :
- print "Did not receive reply"
+ print("Did not receive reply")
return 1
if not isOkReply(frame) :
- print "Unexpected message received instead of Datagram Received OK"
+ print("Unexpected message received instead of Datagram Received OK")
return 2
if not int(frame[12:15],16) == alias:
- print "Improper dest alias in reply", frame
+ print("Improper dest alias in reply", frame)
return 3
if not int(frame[7:10],16) == dest:
- print "Improper source alias in reply", frame
+ print("Improper source alias in reply", frame)
return 3
return 0
@@ -68,7 +68,7 @@ def receiveOneDatagram(alias, dest, conection, verbose) :
while True :
reply = connection.network.receive()
if (reply == None ) :
- print "No datagram segment received"
+ print("No datagram segment received")
return 4
elif reply.startswith(":X1B") or reply.startswith(":X1C") :
retval = retval + canolcbutils.bodyArray(reply)
@@ -78,7 +78,7 @@ def receiveOneDatagram(alias, dest, conection, verbose) :
connection.network.send(makereply(alias, dest))
return retval
else :
- print "Unexpected message instead of datagram segment", reply
+ print("Unexpected message instead of datagram segment", reply)
return 3
def sendOneDatagramNoWait(alias, dest, content, connection, verbose) :
@@ -108,7 +108,7 @@ def receiveDatagramReplyAndOneDatagram(alias, dest, conection, verbose) :
while True :
reply = connection.network.receive()
if (reply == None ) :
- print "Missing response"
+ print("Missing response")
return 4
elif isOkReply(reply) :
haveReply = True
@@ -124,29 +124,29 @@ def receiveDatagramReplyAndOneDatagram(alias, dest, conection, verbose) :
if haveReply :
return retval
else :
- print "Unexpected message", reply
+ print("Unexpected message", reply)
return 3
def usage() :
- print ""
- print "Called standalone, will send one CAN datagram message"
- print " and display response."
- print ""
- print "Expect a single datagram reply in return"
- print "e.g. [1Esssddd] 4C"
- print "from destination alias to source alias"
- print ""
- print "Default connection detail taken from connection.py"
- print ""
- print "See also testDatagram.py"
- print ""
- print "-a --alias source alias (default 0x"+hex(connection.thisNodeAlias).upper()+")"
- print "-d --dest dest alias (default 0x"+hex(connection.testNodeAlias).upper()+")"
- print "-c --content message content (default 1.2.3.4)"
- print "-t find destination alias automatically"
- print "-v verbose"
- print "-V Very verbose"
+ print("")
+ print("Called standalone, will send one CAN datagram message")
+ print(" and display response.")
+ print("")
+ print("Expect a single datagram reply in return")
+ print("e.g. [1Esssddd] 4C")
+ print("from destination alias to source alias")
+ print("")
+ print("Default connection detail taken from connection.py")
+ print("")
+ print("See also testDatagram.py")
+ print("")
+ print("-a --alias source alias (default 0x"+hex(connection.thisNodeAlias).upper()+")")
+ print("-d --dest dest alias (default 0x"+hex(connection.testNodeAlias).upper()+")")
+ print("-c --content message content (default 1.2.3.4)")
+ print("-t find destination alias automatically")
+ print("-v verbose")
+ print("-V Very verbose")
import getopt, sys
@@ -160,9 +160,9 @@ def main():
try:
opts, remainder = getopt.getopt(sys.argv[1:], "d:a:c:vVt", ["dest=", "alias=", "content="])
- except getopt.GetoptError, err:
+ except getopt.GetoptError as err:
# print help information and exit:
- print str(err) # will print something like "option -a not recognized"
+ print(str(err)) # will print something like "option -a not recognized"
usage()
sys.exit(2)
for opt, arg in opts:
diff --git a/defaults.py b/defaults.py
index 7055573..5f0aa9d 100755
--- a/defaults.py
+++ b/defaults.py
@@ -1,51 +1,57 @@
-serial = False
-tcp = True
-ethernet = False
+# select the connection type (only one)
+serial = True
windows = False
+serialPort = "/dev/cu.usbmodemCC570001B1"
+
+tcp = False
+ethernet = False
+networkHost = "192.168.16.212"
+
local = False
+# thisNode is the node doing the testing
+thisNodeID = [1,2,3,4,5,6]
+thisNodeAlias = 0xAAA
-if tcp and not local:
+# testNode is the node under test
+# usually the -t option is used to gather this info from the single attached node
+testNodeID = [2,3,4,5,6,1]
+testNodeAlias = 0xDDD
+# an event produced and consumed by the device under test or None
+testEventID = [0x05, 0x02, 0x01, 0x02, 0x02, 0x00, 0x00, 0x00]
+
+
+if tcp :
import tcpolcblink
network = tcpolcblink.TcpToOlcbLink()
- network.host = "174.18.137.234"
- network.host = "localhost"
- #network.host = "propername.local."
+ network.host = networkHost
network.port = 12021
-elif ethernet and not local:
+elif ethernet :
import ethernetolcblink
network = ethernetolcblink.EthernetToOlcbLink()
- network.host = "174.18.137.234"
- #network.host = "propername.local."
+ network.host = networkHost
network.port = 12021
-elif windows and not local :
+elif windows :
import serialolcblink
network = serialolcblink.SerialOlcbLink()
- network.port = "COM9"
+ network.port = serialPort
network.speed = 500000
network.startdelay = 0
-elif serial and not local :
+elif serial :
import serialolcblink
network = serialolcblink.SerialOlcbLink()
- #network.port = "/dev/cu.usbserial-A900fLVC"
- #network.port = "/dev/cu.usbmodem401331"
- #network.port = "/dev/cu.usbserial-AE015IZE" # Ioduino
- network.port = "/dev/cu.usbserial-A5VRG6OF" # TCH parallel
- network.speed = 230400
- network.parallel = True
- network.startdelay = 2
+ network.port = serialPort
+ network.speed = 57600 #115200 #230400
+ network.parallel = False
+ network.startdelay = 1 # time to wait at start for adapter to come up, in seconds
elif local :
import pipeolcblink
network = pipeolcblink.PipeOlcbLink()
network.name = "pyOlcbBasicNode"
else :
- print "Please set one of the options to True"
+ print ("Please set one of the options to True")
-thisNodeID = [1,2,3,4,5,6]
-thisNodeAlias = 0xAAA
testNodeID = [2,3,4,5,6,1]
testNodeAlias = 0xDDD
-
-
testEventID = [0x05, 0x02, 0x01, 0x02, 0x02, 0x00, 0x00, 0x00]
diff --git a/ethernetolcblink.py b/ethernetolcblink.py
index a14c482..cdd2759 100755
--- a/ethernetolcblink.py
+++ b/ethernetolcblink.py
@@ -15,7 +15,7 @@ def __init__(self) :
# prepare, but don't open
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#self.socket.timeout(10)
-
+
# defaults (generally overridden by system-wide defaults elsewhere)
self.host = "10.00.01.98" # Arduino adapter default
self.port = 23
@@ -25,51 +25,100 @@ def __init__(self) :
self.socket = None
self.rcvData = ""
return
-
+
def connect(self) :
# if verbose, print
- if (self.verbose) : print " connect to ",self.host,":",self.port
-
+ if (self.verbose) : print (" connect to ",self.host,":",self.port)
+
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.host, self.port))
-
+
# wait for link startup
# after (possible) reset due to serial startup
if self.startdelay > 0 :
- if self.verbose : print " waiting", self.startdelay, "seconds for adapter restart"
+ if self.verbose : print (" waiting", self.startdelay, "seconds for adapter restart")
time.sleep(self.startdelay)
return
-
+
def send(self, frame) :
if (self.socket == None) : self.connect()
-
+
# if verbose, print
- if (self.verbose) : print " send ",frame
-
+ if (self.verbose) : print (" send ",frame)
+
# send
- self.socket.send(frame+'\n')
-
+ self.socket.send((frame+'\n').encode())
+
return
-
+
def receive(self) : # returns frame
if (self.socket == None) : self.connect()
-
- # if verbose, print
- if (self.verbose) : print " receive ",
-
+
self.socket.settimeout(self.timeout)
while (self.rcvData.find('\n') < 0) :
try:
- self.rcvData = self.rcvData+self.socket.recv(1024)
- except socket.timeout, err:
- if (self.verbose) : print " This is the index page of the OpenLCBâ„¢ Python prototypes
directory. Most of the Python modules in this package are meant to be run
-from the command line, or invoked programmatically.
+from the command line, or invoked programmatically.
To run one from the Mac OS X or Linux command line, first edit the
defaults.py file with your connection
-information, then do e.g.:
+information, then do e.g.:
To run the full set of tests from the Mac OS X or Linux command line,
-do :
+do :
-followed by testing the start-up sequencing with :
+followed by testing the start-up sequencing with :
+
(Reset the node when prompted to force the start-up sequence to run) When running from the command line, internal help is available via
the -h option, as in: There are text files containing typical
-output from running
+output from running
-and
+and
test scripts that you can consult to see what normal operation looks
like. Note that they contain node IDs, aliases, and event IDs that
-are specific to the particular node being tested, so your mileage
- will vary.
+are specific to the particular node being tested, so your mileage
+will vary.
The type of access (Ethernet or USB)
and parameters such as IP address, default node IDs, etc are defined
in the default.py file. When using the command line, edit that file
for your specific needs. If you're using a serial connection, you
-need to have PySerial installed, which is
+need to have PySerial installed, which is
described here. The basic Ethernet connection is accessed via the
-"ethernetolcblink" module.
+"ethernetolcblink" module.
The basic USB-Serial connection is accessed via the "serialolcblink"
-module.
+module.
Other cross-connection utilities can found in the "canolcbutils"
-module.
+module.
OpenLCB
+
OpenLCB
Python Prototype./verifyNodeGlobal.py -v./allTest.py -t -v && echo OK./testStartup.py -t -v && echo OK./testStartup.py && echo OK
./allTest.py -t -V./testStartup.py -t -V
-
-Utilities and Modules
import ethernetolcblink import serialolcblinkFrame and Message Tools
Tests
+
+You can learn about use of these will the -h option, which is generally available.
+
Tools
@@ -156,4 +158,4 @@
page.
This is SVN $Revision$ of