Smart Plug Interactive Demo

From Digi Developer

Jump to: navigation, search

Below is a simple interactive demo that makes use of the functionality for the XBee Smart Plug. It is designed to run on a Digi ConnectPort gateway such as an X2 or X4.

from zigbee import getnodelist, ddo_get_param, ddo_set_param
import struct, sys
 
def parseIS(data):
    if len(data) % 2 == 0:
        sets, datamask, analogmask = struct.unpack("!BHB", data[:4])
        data = data[4:]
 
    else:        
        sets, mask = struct.unpack("!BH", data[:3])
        data = data[3:]
        datamask = mask % 512 # Move the first 9 bits into a seperate mask
        analogmask  = mask >> 9 #Move the last 7 bits into a seperate mask
 
    retdir = {}
 
    if datamask:
        datavals = struct.unpack("!H", data[:2])[0]
        data = data[2:]
 
        currentDI = 0
        while datamask:
            if datamask & 1:
                retdir["DIO%d" % currentDI] = datavals & 1
            datamask >>= 1
            datavals >>= 1
            currentDI += 1
 
    currentAI = 0
    while analogmask:
        if analogmask & 1:
            aval = struct.unpack("!H", data[:2])[0]
            data = data[2:]
 
            retdir["AI%d" % currentAI] = aval
        analogmask >>= 1
        currentAI += 1
 
    return retdir
 
class sp:
  def __init__(self, addr):
    self.addr = addr
    self.status = 'Unknown'
 
  def get_status(self):
    return self.status
 
  def enable_sensors(self):
    try:
      ddo_set_param(self.addr, 'D1', 2)
      ddo_set_param(self.addr, 'D3', 2)
      ddo_set_param(self.addr, 'WR')
      ddo_set_param(self.addr, 'AC')
    except Exception, e:      
      raise Exception("Failed to enable sensors on device: %s" %self.addr)
 
  def get_sensor(self):
    try:
      raw_sample = ddo_get_param(self.addr, 'IS')
    except Exception, e:
      print "Failed to get sample from device: %s" %self.addr      
      return (0, 0)
 
    parsed_sample = parseIS(raw_sample)    
 
    lux = (float(parsed_sample["AI1"]) / 1023.0) * 1200.0
 
    mV = (float(parsed_sample["AI3"]) /1023.0) * 1200.0    
    current = (mV*(156.0/47.0) - 520.0) /180.0 * .7071 
 
    return (lux, current)
 
  def turn_on(self):
    try:
      ddo_set_param(self.addr, 'D4', 5)
      ddo_set_param(self.addr, 'WR')
      ddo_set_param(self.addr, 'AC')
      self.status = 'on'
    except Exception, e:      
      print "Error with turning on device %s" %self.addr
 
  def turn_off(self):
    try:
      ddo_set_param(self.addr, 'D4', 4)
      ddo_set_param(self.addr, 'WR')
      ddo_set_param(self.addr, 'AC')
      self.status = 'off'
    except Exception, e:      
      print "Error with turning off device %s" %self.addr
 
def main():
  print "Searching for Smart Plugs on XBee network..."
  node_list = getnodelist()
  smart_plugs = []
  for node in node_list:
    if node.device_type & 0x000F == 0x000F:
      t = sp(node.addr_extended)
      smart_plugs.append(t)
 
  assert len(smart_plugs) != 0, 'No Smart plugs detected'
 
  print "Enabling sensors on device"
  for node in smart_plugs:
    node.enable_sensors()
 
  status_str = "\nIndex: %d Addr: %s \n\tstatus = %s Light = %f Current = %f"
  help_str = "Type 'quit' to exit\n\t'on 1' to turn on node 1\n\t'off 1' to turn off node 1\n"   
 
  print help_str
  while 1:
    for node in smart_plugs:
      i = smart_plugs.index(node)
      a = node.addr
      s = node.get_status()
      l, c = node.get_sensor()
      print status_str %(i, a, s, l, c)
 
    input = raw_input("\n=>")
    input = input.lower().strip()
 
    if input == 'quit':
      print "Quitting"
      sys.exit(0)
    if input in ['help', '?']:
      print help_str
      continue
    else:
      parsed = input.split()
      if len(parsed) == 2:
        action = parsed[0]
        try:
          index = int(parsed[1])
          assert len(smart_plugs) > index >= 0, '%d > %d >= %d' %(len(smart_plugs), index, 0) 
        except Exception, e:          
          print "Index error: %s" %parsed[1]
          continue
 
        if action == 'on':
          print "Turning smart plug %s on" %smart_plugs[index].addr
          smart_plugs[index].turn_on()
 
        elif action == 'off':
          print "Turning smart plug %s off" %smart_plugs[index].addr
          smart_plugs[index].turn_off()
 
        else:
          print "Unknown action: %s" %action
          continue
 
if __name__ == '__main__':
  main()

Media:Smart_plug.zip

Personal tools
Wiki Editing