import serial, string, threading class ReceiveTimeout(Exception): pass class ReceiveChecksumError(Exception): def __init__(self, expected, received): self.expected = expected self.received = received self.message = "Checksum Error: expected %x, got %x" % (expected, received) class RXBufferOverrunError(EnvironmentError): pass class NotNowWarning(UserWarning): pass class UnknownCommand(Exception): pass class BadValueError(Exception): pass class ParameterLimitsError(ValueError): pass def checksum(data): chksm = 0 for d in data: chksm = chksm ^ ord(d) return chksm class Status: COLDBOOT = (1<<7) ANY_ERROR = (1<<6) RX_ERROR = (1<<5) SFI_ERROR = (1<<4) OUTPUTSTAGE_ERROR = (1<<3) INITIATOR_MINUS = (1<<2) INITIATOR_PLUS = (1<<1) RUNNING = (1<<0) def __init__(self, bitvector): self.coldboot = not not (bitvector & Status.COLDBOOT) self.any_error = not not (bitvector & Status.ANY_ERROR) self.rx_error = not not (bitvector & Status.RX_ERROR) self.SFI_error = not not (bitvector & Status.SFI_ERROR) self.outputstage_error = not not (bitvector & Status.OUTPUTSTAGE_ERROR) self.initiator_minus = not not (bitvector & Status.INITIATOR_MINUS) self.initiator_plus = not not (bitvector & Status.INITIATOR_PLUS) self.running = not not (bitvector & Status.RUNNING) def __str__(self): status = list() if self.coldboot: status += ['Cold Boot'] if self.any_error: status += ['Any Error'] if self.rx_error: status += ['RX Error'] if self.SFI_error: status += ['SFI Error'] if self.outputstage_error: status += ['Output Stage Error'] if self.initiator_minus: status += ['Initiator -'] if self.initiator_plus: status += ['Initiator +'] if self.running: status += ['Running'] return '{'+ ('|'.join(status)) + '}' class ExtendedStatus: CHECKSUM_ERROR = (1<<23) # (1<<22) RXBUFFER_OVERRUN = (1<<21) NOT_NOW = (1<<20) UNKNOWN_COMMAND = (1<<19) BAD_VALUE = (1<<18) PARAMETER_LIMITS = (1<<17) # (1<<16) NO_SYSTEM = (1<<15) NO_RAMPS = (1<<14) PARAMETER_CHANGED = (1<<13) BUSY = (1<<12) PROGRAMING_ERROR = (1<<11) HIGH_TEMPERATURE = (1<<10) INITIATOR_ERROR = (1<< 9) INTERNAL_ERROR = (1<< 8) DRIVER_ERROR = (1<< 7) # (1<< 6) WAIT_FOR_SYNC = (1<< 5) LINEAR_AXIS = (1<< 4) FREE_RUNNING = (1<< 3) INITIALIZED = (1<< 2) HW_DISABLE = (1<< 1) INITIALIZING = (1<< 0) def __init__(self, bitvector): self.checksum_error = not not (bitvector & ExtendedStatus.CHECKSUM_ERROR) self.rxbuffer_overrun = not not (bitvector & ExtendedStatus.RXBUFFER_OVERRUN) self.not_now = not not (bitvector & ExtendedStatus.NOT_NOW) self.unknown_command = not not (bitvector & ExtendedStatus.UNKNOWN_COMMAND) self.bad_value = not not (bitvector & ExtendedStatus.BAD_VALUE) self.parameter_limits = not not (bitvector & ExtendedStatus.PARAMETER_LIMITS) self.no_system = not not (bitvector & ExtendedStatus.NO_SYSTEM) self.no_ramps = not not (bitvector & ExtendedStatus.NO_RAMPS) self.parameter_changed = not not (bitvector & ExtendedStatus.PARAMETER_CHANGED) self.busy = not not (bitvector & ExtendedStatus.BUSY) self.programing_error = not not (bitvector & ExtendedStatus.PROGRAMING_ERROR) self.high_temperature = not not (bitvector & ExtendedStatus.HIGH_TEMPERATURE) self.initiator_error = not not (bitvector & ExtendedStatus.INITIATOR_ERROR) self.internal_error = not not (bitvector & ExtendedStatus.INTERNAL_ERROR) self.driver_error = not not (bitvector & ExtendedStatus.DRIVER_ERROR) self.wait_for_sync = not not (bitvector & ExtendedStatus.WAIT_FOR_SYNC) self.linear_axis = not not (bitvector & ExtendedStatus.LINEAR_AXIS) self.free_running = not not (bitvector & ExtendedStatus.FREE_RUNNING) self.initialized = not not (bitvector & ExtendedStatus.INITIALIZED) self.hw_disable = not not (bitvector & ExtendedStatus.HW_DISABLE) self.initializing = not not (bitvector & ExtendedStatus.INITIALIZING) def __str__(self): status = list() if self.checksum_error: status += ["Checksum Error"] if self.rxbuffer_overrun: status += ["RX Buffer Overrun"] if self.not_now: status += ["Not Now"] if self.unknown_command: status += ["Unknown Command"] if self.bad_value: status += ["Bad Value"] if self.parameter_limits: status += ["Parameter Limits"] if self.no_system: status += ["Mo System"] if self.no_ramps: status += ["No Ramps"] if self.parameter_changed: status += ["Parameter Changed"] if self.busy: status += ["Busy"] if self.programing_error: status += ["Programming Error"] if self.high_temperature: status += ["High Temperature"] if self.initiator_error: status += ["Initiator Error"] if self.driver_error: status += ["Driver Error"] if self.wait_for_sync: status += ["Wait For Sync"] if self.linear_axis: status += ["Linear Axis"] if self.free_running: status += ["Free Running"] if self.initialized: status += ["Initialized"] if self.hw_disable: status += ["HW Disable"] if self.initializing: status += ["Initialzing"] return '{'+ ('|'.join(status)) + '}' class ReceiveData: def __init__(self, ID, status, data): self.ID = ID self.status = status = data class Axis: """ Abstraction for a IPCOMM Axis Phytron IPCOMM devices are addressable by a 4 bit ID (i.e. range 0x0 = 0 ... 0xf = 15). This allows for up to 16 IPCOMM devices being connected to a single communication bus. Each device is a strict master/slave communication endpoint ultimately driving an actuator. This is referred to as an Axis. """ def __init__(self, ipcomm, ID, name = None): """ ipcomm: a instance of Phytron.IPCOMM class, encapsulating a communication bus name: The human readable name given to the axis (served not purpose in this class at the moment) """ self.ipcomm = ipcomm self.ID = ID = name def execute(self, cmd): """ Execute a command on the axis. """ result = self.ipcomm.execute(self.ID, cmd) assert result.ID == self.ID self.status = result.status if isinstance(, ExtendedStatus): self.extended_status = return result def gotoAbs(self, position): return self.execute("GA%d" % position).status def gotoRelative(self, offset): return self.execute("GR%d" % offset).status def runForward(self): return self.execute("GF+").status def runBackward(self): return self.execute("GF-").status def stepForward(self): return self.execute("GS+").status def stepBackward(self): return self.execute("GS-").status def initializePlus(self): return self.execute("GI+").status def initializeMinus(self): return self.execute("GI-").status def syncstartCommence(self): return self.execute("GW").status def syncstartAbort(self): return self.execute("GB").status def halt(self): return self.execute("H").status def stop(self): return self.execute("B").status def setRunCurrent(self, current): return self.execute("PR%1.1f" % current) def getRunCurrent(self): return float(self.execute("PR??").data) def setBoostCurrent(self, current): return self.execute("PA%1.1f" % current) def getBoostCurrent(self): return float(self.execute("PA??").data) def setBoostDuration(self, duration): return self.execute("PT%d" % int(duration * 1e3)).status def getBoostDuration(self): return float(self.execute("PT?").data) * 1e-3 def setHaltCurrent(self, current): return self.execute("PS%1.1f" % current) def getHaltCurrent(self): return float(self.execute("PS??").data) def setCurrentPosition(self, position): return self.execute("PC%d" % position).status def getCurrentPosition(self): return int(self.execute("PC?").data) def setRunFrequency(self, freq): return self.execute("PF%d" % freq).status def getRunFrequency(self): return int(self.execute("PF?").data) def getMaxFrequency(self): return int(self.execute("IF?").data) def setOffsetFrequency(self, freq): return self.execute("PO%d" % freq).status def getOffsetFrequency(self): return int(self.execute("PO?").data) def setRunLimit(self, limit): if not limit: limit = 0xffffffff return self.execute("PG%d" % limit).status def getRunLimit(self, position): return int(self.execute("PG?").data) def setOffsetMinus(self, offset): return self.execute("PM%d" % offset).status def getOffsetMinus(self): return int(self.execute("PM?").data) def setOffsetPlus(self, offset): return self.execute("PP%d" % offset).status def getOffsetPlus(self): return int(self.execute("PP?").data) def setAxisLimited(self, limited): if limited: limited = 1 else: limited = 0 return self.execute("PL%d" % limited).status def getAxisLimited(self): return bool(int(self.execute("PL?").data)) def setDeltaZero(self, deltazero): return self.execute("IZ%d" % deltazero).status def getDeltaZero(self): return int(self.execute("IZ?")) def setOutputs(self, outputstate): outputs = 0 if isinstance(list, outputstate): for i,s in enumerate(outputstate): if not not outputstate[i]: outputs |= 1<