From c15a19b6614c358780a7641a06ac0316924df055 Mon Sep 17 00:00:00 2001 From: Wolfgang Draxinger Date: Wed, 26 Oct 2011 18:39:40 +0200 Subject: communication works, basic status decoding works, axis aliases implemented --- Phytron.py | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 68 insertions(+), 14 deletions(-) diff --git a/Phytron.py b/Phytron.py index 52cb096..c00d079 100644 --- a/Phytron.py +++ b/Phytron.py @@ -10,10 +10,16 @@ class ReceiveChecksumError(exceptions.Exception): self.message = "Checksum Error: expected %x, got %x" % (expected, received) class Axis: - def __init__(self, ipcomm, address): + def __init__(self, ipcomm, ID): self.ipcomm = ipcomm - self.address = address + self.ID = ID + def execute(self, cmd): + result = self.ipcomm.execute(self.ID, cmd) + assert result['ID'] == self.ID + del result['ID'] + return result + def goToAbs(self, position): pass @@ -39,8 +45,39 @@ def checksum(data): chksm = chksm ^ ord(d) return chksm +class Status: + def __init__(self, bitvector): + self.coldboot = not not (bitvector & (1<<7)) + self.any_error = not not (bitvector & (1<<6)) + self.rx_error = not not (bitvector & (1<<5)) + self.SFI_error = not not (bitvector & (1<<4)) + self.outputstage_error = not not (bitvector & (1<<3)) + self.initiator_m = not not (bitvector & (1<<2)) + self.initiator_p = not not (bitvector & (1<<1)) + self.running = not not (bitvector & (1<<0)) + + def __str__(self): + status = list() + if self.coldboot: + status += ['Cold Boot'] + if self.any_error: + status += ['Any Error'] + if self.rx_error: + status += ['RX Error'] + if self.SFI_error: + status += ['SFI Error'] + if self.outputstage_error: + status += ['Output Stage Error'] + if self.initiator_m: + status += ['Initiator -'] + if self.initiator_p: + status += ['Initiator +'] + if self.running: + status += ['Running'] + return '{'+ ('|'.join(status)) + '}' + class IPCOMM: - def __init__(self, url, baudrate = 38400): + def __init__(self, url, baudrate = 38400, axisnames = None): self.conn = serial.serial_for_url(url) self.conn.baudrate = baudrate self.conn.parity = serial.PARITY_NONE @@ -48,15 +85,30 @@ class IPCOMM: self.conn.dsrdtr = False self.conn.xonxoff = False self.conn.timeout = 0.1 - - def enumerate(self, names): - for n in range(0x10): + self.axisByID = dict() + self.axisByName = dict() + self.enumerate(axisnames) + + def axis(self, nameOrID): + if isinstance(nameOrID, str) and nameOrID.isalpha(): + return self.axisByName[nameOrID] + return self.axisByID[int(nameOrID)] + + def enumerate(self, names=None): + self.axisByID.clear() + self.axisByName.clear() + for ID in range(0x10): try: - address = self.execute(n, 'IS?')[0] - assert address == n + assert self.execute(ID, 'IS?')['ID'] == ID + axis = Axis(self, ID) + self.axisByID[ID] = axis + + if (isinstance(names, dict) and names.haskey(ID)) or (isinstance(names, list) and ID < len(names)): + assert names[ID].isalpha() + self.axisByName[str(names[ID])] = axis + except ReceiveTimeout: continue - print("Axis %d found" % n) def send(self, data): self.conn.write('\x02' + data + ':' + ('%02X' % checksum(data + ':')) + '\x03') @@ -85,28 +137,30 @@ class IPCOMM: if expected_chksm != chksm: raise ReceiveChecksumError(expected_chksm, chksm) - return string.atoi(status[0], 0x10), string.atoi(status[1:], 0x10), data + return {'ID': string.atoi(status[0], 0x10), + 'status': Status(string.atoi(status[1:], 0x10)), + 'data': data} def broadcast(self, cmd): self.conn.flushInput() self.send( '@' + cmd ) - def execute(self, address, cmd): + def execute(self, ID, cmd): self.conn.flushInput() - self.send( ('%X' % address) + cmd ) + self.send( ('%X' % ID) + cmd ) recv_data = None while not recv_data: try: recv_data = self.recv() except ReceiveChecksumError: - self.send( ('%X' % address) + 'R') + self.send( ('%X' % ID) + 'R') # self.parseStatus(recv_status) return recv_data def goToAbs(self, positions): pass - + def goToRelative(self, offset): pass -- cgit v1.2.3