Smart Plug Interactive Demo
From Digi Developer
Below is a simple interactive demo that exercises the XBee Smart Plug: it reads each plug's light and current sensors and lets you switch the plug on or off from a command prompt. It is designed to run on a Digi ConnectPort gateway such as an X2 or X4.
# Interactive demo for XBee Smart Plugs.  It needs the gateway-provided
# `zigbee` module, so it only runs on the Digi gateway itself.
#
# The syntax is deliberately conservative (single-argument print(...),
# no "except ... as", raw_input) so the script keeps working on the
# Python 2 interpreter it was written for.
import struct
import sys

from zigbee import getnodelist, ddo_get_param, ddo_set_param

# Values written to the XBee Dx pin-configuration parameters.
PIN_ADC = 2          # pin samples an analog input
PIN_OUTPUT_LOW = 4   # pin is a digital output driven low
PIN_OUTPUT_HIGH = 5  # pin is a digital output driven high

# A node counts as a Smart Plug when the low four bits of its device_type
# are all set.
# NOTE(review): this would also match other device types ending in 0xF --
# confirm whether a wider mask is appropriate.
SMART_PLUG_TYPE_BITS = 0x000F

# Raw analog readings are scaled as reading / 1023 * 1200.
# NOTE(review): presumably a 10-bit ADC with a 1200 mV reference -- confirm
# against the XBee manual.
ADC_FULL_SCALE = 1023.0
ADC_REFERENCE_MV = 1200.0


def parseIS(data):
    """Decode the raw payload returned by the XBee ``IS`` command.

    Two header layouts are supported and told apart by the parity of the
    payload length:

    * even length -- 1-byte sample count, 2-byte digital channel mask,
      1-byte analog channel mask;
    * odd length  -- 1-byte sample count, then one 2-byte mask whose low
      9 bits select digital channels and whose high 7 bits select analog
      channels.

    NOTE(review): these look like the ZigBee and 802.15.4 firmware sample
    formats respectively -- confirm against the XBee manuals.

    After the header comes one 16-bit word holding all digital states (only
    present when the digital mask is non-zero), followed by one 16-bit
    reading per enabled analog channel, lowest channel first.

    Returns a dict mapping ``"DIO<n>"`` to 0/1 and ``"AI<n>"`` to the raw
    analog reading, for every channel enabled in the masks.

    Raises ``struct.error`` when ``data`` is shorter than its masks imply.
    """
    if len(data) % 2 == 0:
        _sets, datamask, analogmask = struct.unpack("!BHB", data[:4])
        data = data[4:]
    else:
        _sets, mask = struct.unpack("!BH", data[:3])
        data = data[3:]
        datamask = mask % 512    # low 9 bits: digital channels
        analogmask = mask >> 9   # high 7 bits: analog channels

    samples = {}

    if datamask:
        datavals = struct.unpack("!H", data[:2])[0]
        data = data[2:]
        channel = 0
        # Walk mask and values in lockstep, one bit per digital channel.
        while datamask:
            if datamask & 1:
                samples["DIO%d" % channel] = datavals & 1
            datamask >>= 1
            datavals >>= 1
            channel += 1

    channel = 0
    # Each enabled analog channel consumes the next 16-bit word.
    while analogmask:
        if analogmask & 1:
            samples["AI%d" % channel] = struct.unpack("!H", data[:2])[0]
            data = data[2:]
        analogmask >>= 1
        channel += 1

    return samples


class sp(object):
    """One Smart Plug on the XBee network, addressed by ``addr``.

    ``status`` starts as ``'Unknown'`` and becomes ``'on'`` or ``'off'``
    only after a successful ``turn_on()`` / ``turn_off()``; it is never
    read back from the device.
    """

    def __init__(self, addr):
        self.addr = addr
        self.status = 'Unknown'

    def get_status(self):
        """Return the last relay state set through this object."""
        return self.status

    def _commit(self):
        """Issue ``WR`` then ``AC`` so pending parameter changes are saved
        and applied."""
        ddo_set_param(self.addr, 'WR')
        ddo_set_param(self.addr, 'AC')

    def _switch(self, pin_mode, new_status):
        """Write ``pin_mode`` to D4 and record ``new_status`` on success.

        Failures are reported on stdout and leave ``status`` untouched.
        """
        try:
            ddo_set_param(self.addr, 'D4', pin_mode)
            self._commit()
            self.status = new_status
        except Exception:
            print("Error with turning %s device %s" % (new_status, self.addr))

    def enable_sensors(self):
        """Configure D1 and D3 as analog inputs (read by ``get_sensor``).

        Raises ``Exception`` naming the device and the underlying error when
        any of the parameter writes fails.
        """
        try:
            ddo_set_param(self.addr, 'D1', PIN_ADC)
            ddo_set_param(self.addr, 'D3', PIN_ADC)
            self._commit()
        except Exception:
            # sys.exc_info() instead of "except ... as" keeps this valid on
            # old Python 2 interpreters.
            cause = sys.exc_info()[1]
            raise Exception("Failed to enable sensors on device: %s (%s)"
                            % (self.addr, cause))

    def get_sensor(self):
        """Sample the plug and return ``(lux, current)``.

        ``lux`` is the AI1 reading and ``current`` is derived from the AI3
        reading.  When the sample cannot be fetched, or lacks AI1/AI3, a
        message is printed and ``(0, 0)`` is returned so the interactive
        loop keeps running.
        """
        try:
            raw_sample = ddo_get_param(self.addr, 'IS')
        except Exception:
            print("Failed to get sample from device: %s" % self.addr)
            return (0, 0)

        try:
            parsed_sample = parseIS(raw_sample)
            light_raw = parsed_sample["AI1"]
            current_raw = parsed_sample["AI3"]
        except (KeyError, struct.error):
            # Truncated payload, or the analog channels are not enabled.
            print("Incomplete sample from device: %s" % self.addr)
            return (0, 0)

        lux = (float(light_raw) / ADC_FULL_SCALE) * ADC_REFERENCE_MV
        mV = (float(current_raw) / ADC_FULL_SCALE) * ADC_REFERENCE_MV
        # NOTE(review): 156/47, 520 and 180 are sensor-circuit constants
        # carried over unchanged; .7071 is presumably the peak-to-RMS
        # factor -- confirm against the Smart Plug documentation.
        current = (mV * (156.0 / 47.0) - 520.0) / 180.0 * .7071
        return (lux, current)

    def turn_on(self):
        """Drive D4 high; on success ``status`` becomes ``'on'``."""
        self._switch(PIN_OUTPUT_HIGH, 'on')

    def turn_off(self):
        """Drive D4 low; on success ``status`` becomes ``'off'``."""
        self._switch(PIN_OUTPUT_LOW, 'off')


def main():
    """Discover Smart Plugs, enable their sensors and run the command prompt.

    Each pass prints every plug's status and sensor readings, then reads one
    command: ``quit``, ``help`` / ``?``, ``on <index>`` or ``off <index>``.
    Any other input (including an empty line) just refreshes the readings.
    Exits with an error message when no Smart Plug is found.
    """
    print("Searching for Smart Plugs on XBee network...")
    smart_plugs = [
        sp(node.addr_extended)
        for node in getnodelist()
        if node.device_type & SMART_PLUG_TYPE_BITS == SMART_PLUG_TYPE_BITS
    ]
    # Explicit check rather than assert: asserts vanish under "python -O".
    if not smart_plugs:
        sys.exit('No Smart plugs detected')

    print("Enabling sensors on device")
    for plug in smart_plugs:
        plug.enable_sensors()

    status_str = "\nIndex: %d Addr: %s \n\tstatus = %s Light = %f Current = %f"
    help_str = ("Type 'quit' to exit\n\t'on 1' to turn on node 1\n\t"
                "'off 1' to turn off node 1\n")
    print(help_str)

    while 1:
        for index, plug in enumerate(smart_plugs):
            status = plug.get_status()
            light, current = plug.get_sensor()
            print(status_str % (index, plug.addr, status, light, current))

        try:
            command = raw_input("\n=>").lower().strip()
        except EOFError:
            # Input stream closed: leave cleanly instead of crashing.
            command = 'quit'

        if command == 'quit':
            print("Quitting")
            sys.exit(0)
        if command in ('help', '?'):
            print(help_str)
            continue

        parsed = command.split()
        if len(parsed) != 2:
            continue
        action, index_text = parsed

        try:
            index = int(index_text)
        except ValueError:
            index = -1  # forces the range check below to reject it
        if not 0 <= index < len(smart_plugs):
            print("Index error: %s" % index_text)
            continue

        if action == 'on':
            print("Turning smart plug %s on" % smart_plugs[index].addr)
            smart_plugs[index].turn_on()
        elif action == 'off':
            print("Turning smart plug %s off" % smart_plugs[index].addr)
            smart_plugs[index].turn_off()
        else:
            print("Unknown action: %s" % action)


if __name__ == '__main__':
    main()
