Subscribing to a Channel
One of the powerful advanced features within the DIA platform is the ability to have data automatically pushed from one device driver to another. This is called publish/subscribe.
This article is intended for programmers writing their own device drivers - the devices described are NOT part of DIA.
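The publish/subscribe idea can be sketched in a few lines of plain Python. This is a toy model only: ToyPublisher and its methods are hypothetical names invented for illustration and are not the DIA channel manager API.

```python
# Toy publish/subscribe model; NOT the DIA channel manager API.
class ToyPublisher:
    def __init__(self):
        self._subscribers = {}   # channel name -> list of callbacks

    def subscribe(self, channel_name, callback):
        # remember who wants to hear about this channel
        self._subscribers.setdefault(channel_name, []).append(callback)

    def publish(self, channel_name, value):
        # every publish is pushed to all subscribers of that channel
        for callback in self._subscribers.get(channel_name, []):
            callback(value)


received = []
publisher = ToyPublisher()
publisher.subscribe("tick_tock.hour", received.append)
publisher.publish("tick_tock.hour", "pulse")
publisher.publish("tick_tock.minute", "ignored")   # nobody subscribed to this one
```

The subscriber never polls; it only reacts when the source publishes.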
Counting Hours - Simple Example
In this example we'll create two devices.
- The first is a thread-based AlarmClockDevice which wakes up periodically to do things. It has an output (meaning something which can be 'GET') named hour which pulses once per hour. The DIA Alarm Clock device sample code is here.
- The second device is a passive object without any thread. Instead it subscribes to the AlarmClockDevice's once per hour pulse, and catches the act of publishing to do something once per hour - in this case, it is a counter which counts the input pulse, so indirectly counts the hours.
Here is a fragment of the YML file used for this.
    devices:
      - name: tick_tock
        driver: devices.alarm_clock_device:AlarmClockDevice

      - name: count_hours
        driver: devices.blocks.counter_device:CounterBlock
        settings:
            input_source: tick_tock.hour
The most important line is the input_source: tick_tock.hour, which describes a setting named input_source within the counter object which takes a string value. tick_tock is the name of another device, which offers a channel named hour which can be gotten.
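To make the device.channel naming convention concrete, here is a small sketch which splits such a string into its two parts. The helper split_channel_name is hypothetical; DIA resolves the full name itself through its channel database, so driver code normally does not need to do this.

```python
def split_channel_name(full_name):
    """Split 'device.channel' into (device, channel).

    Hypothetical helper for illustration; not part of DIA.
    """
    device, sep, channel = full_name.partition(".")
    if not sep or not device or not channel:
        raise ValueError("expected 'device.channel', got %r" % (full_name,))
    return device, channel


device_name, channel_name = split_channel_name("tick_tock.hour")
```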
Publisher/Source Device Code
The AlarmClockDevice defines these GETTABLE values in the def __init__() function.
    ## Channel Properties Definition:
    property_list = [
        # gettable properties
        ChannelSourceDeviceProperty(name='minute', type=tuple,
            initial=Sample(timestamp=0, value=(0,None)),
            perms_mask=DPROP_PERM_GET),
        ChannelSourceDeviceProperty(name='hour', type=tuple,
            initial=Sample(timestamp=0, value=(0,None)),
            perms_mask=DPROP_PERM_GET),
        ChannelSourceDeviceProperty(name='day', type=tuple,
            initial=Sample(timestamp=0, value=(0,None)),
            perms_mask=DPROP_PERM_GET),
    ]
Later, in the thread's run() section of the code, there is a fragment which sets the hour property when appropriate. In this design the property is always set to a new sample holding the current time - it is never cleared or set False, because it is the act of setting which triggers the desired reaction through publish/subscribe.
    if( self.time_tup[TM_MIN] == 0):
        # publish hourly pulse only first time we see min=zero
        if( not self.push_hr):
            self.push_hr = True
            # now_tuple hold the time and time_tuple info
            self.property_set(PROP_HOUR, Sample(self.time_now, now_tuple ))
    else:
        self.push_hr = False
That is it for the source of the data - it merely sets a new sample named hour once per hour, and has no understanding of the external events this drives.
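The push_hr latch is what keeps the pulse to once per hour even if the thread wakes several times during minute zero. A minimal stand-alone sketch of that logic follows; the HourlyPulse class and its published list are made up for illustration and are not the AlarmClockDevice code.

```python
class HourlyPulse:
    """Toy stand-in for the push_hr latch; not the AlarmClockDevice code."""

    def __init__(self):
        self.push_hr = False
        self.published = []      # hours at which a pulse was published

    def check(self, hour, minute):
        if minute == 0:
            # publish only the first time we see minute zero
            if not self.push_hr:
                self.push_hr = True
                self.published.append(hour)
        else:
            # re-arm the latch for the next hour
            self.push_hr = False


clock = HourlyPulse()
# the thread may wake several times within the same minute
for hour, minute in [(9, 59), (10, 0), (10, 0), (10, 0), (10, 1), (11, 0)]:
    clock.check(hour, minute)
```

After this run, clock.published holds one entry for hour 10 and one for hour 11, even though minute zero of hour 10 was seen three times.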
Subscriber/Consumer Device Code
The CounterBlock code is more involved. First we need some properties/settings in the def __init__() function.
- The first is a string setting called input_source, which defaults to None meaning there is no publish/subscribe link desired.
- The second is a gettable/settable property called input which includes a callback function to handle any sets - which will also handle the sample push as the source device publishes the new sample.
    ## Settings Table Definition:
    settings_list = [
        Setting(name='input_source', type=str, required=False,
            default_value=None),
    ]

    ## Channel Properties Definition:
    property_list = [
        ChannelSourceDeviceProperty(name="input", type=tuple,
            initial=Sample(timestamp=0, value=(0,None)),
            perms_mask=DPROP_PERM_SET | DPROP_PERM_GET,
            set_cb=self.prop_set_input),
    ]
Now we need to actually create the link when the CounterBlock starts (in the def start(self) routine). It fetches the current setting named input_source, which should be either None or a string like tick_tock.hour (or device.property). Warning: make sure you put this near the END of the self.start() routine as it could cause a premature callback if the channel is already active. Or said another way, make sure everything in your callback is initialized and ready to go before subscribing to the channel.
    # wire up any inputs
    cm = self.__core.get_service("channel_manager")
    cp = cm.channel_publisher_get()
    cdb = cm.channel_database_get()

    try:
        source_name = SettingsBase.get_setting(self, 'input_source')
        if( source_name != None):
            source_chan = cdb.channel_get( source_name)
            # pre-load the starting value, which won't be published to us
            self.my_input = source_chan.get().value
            cp.subscribe( source_name, self.prop_set_input )
    except:
        traceback.print_exc()
        self.my_input = True
Finally we need to create the actual callback to handle either a direct set, or the publish set. The real meat of the passive action is in the self.process( sam.value) call. In this example it merely counts the hours.
    def prop_set_input( self, sam):
        # someone pushing in new input - is either sample or channel
        if( not isinstance( sam, Sample)):
            # the publish/sub pushes in the channel, so convert to sample
            sam = sam.get()
        self.process( sam.value)
        self.property_set("input", Sample(sam.timestamp, self.my_input))
        return True
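The reason the callback checks the type of its argument can be shown with stand-in objects. In this sketch ToySample, ToyChannel and ToyCounter are assumed shapes invented for illustration; they only mimic the two ways the callback is reached (a direct set delivers a sample, a publish delivers the channel) and are not DIA classes.

```python
from collections import namedtuple

# Stand-ins for DIA's Sample and channel objects (assumed shapes only).
ToySample = namedtuple("ToySample", ["timestamp", "value"])


class ToyChannel:
    def __init__(self, sample):
        self._sample = sample

    def get(self):
        return self._sample


class ToyCounter:
    def __init__(self):
        self.count = 0
        self.last_timestamp = None

    def prop_set_input(self, sam):
        # a publish delivers the channel, a direct set delivers a sample
        if not isinstance(sam, ToySample):
            sam = sam.get()
        self.count += 1
        self.last_timestamp = sam.timestamp
        return True


counter = ToyCounter()
counter.prop_set_input(ToySample(timestamp=100, value=True))              # direct set
counter.prop_set_input(ToyChannel(ToySample(timestamp=200, value=True)))  # published
```

Both paths end up in the same processing code, which is why one callback can serve as set_cb and as the subscription handler.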
Counting Motor Run Hours
The first example was simple, but such a DIA-based uptime counter is not very practical.
So we can expand the example with real-world I/O to create a motor-run timer.
- motor_01 is a Digi XBee DIO Adapter which reads the motor's run contact on its first input (motor_01.channel1_input).
- motor_01_hours is an HourMeterBlock, which is a special sub-class of the CounterBlock object which counts minutes that the input is true or false - not the number of times it is set. The motor_01_hours.hours output would show the resettable run total in hours (a float), while the motor_01_hours.total_hours output would show the non-resettable run total in hours (a float).
- Since a reset total of 250 is included, the motor_01_hours.overflow output (a bool) could be used to trigger some action like sending an email. Since motor_01_hours auto-resets, motor_01_hours.hours automatically drops back to zero once 250 hours have been counted.
    devices:
      - name: xbee_device_manager
        driver: devices.xbee.xbee_device_manager.xbee_device_manager:XBeeDeviceManager

      - name: motor_01
        driver: devices.xbee.xbee_devices.xbee_dio:XBeeDIO
        settings:
            xbee_device_manager: xbee_device_manager
            extended_address: "00:13:a2:00:40:0a:49:7a!"
            sample_rate_ms: 1000
            channel1_dir: "in"

      - name: tick_tock
        driver: devices.alarm_clock_device:AlarmClockDevice

      - name: motor_01_hours
        driver: devices.blocks.counter_device:HourMeterBlock
        settings:
            auto_reset: 250
            input_source: motor_01.channel1_input
            tick_source: tick_tock.minute
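How two subscriptions (the input and the minute tick) combine into an hour total can be sketched as follows. ToyHourMeter is a hypothetical model, not the HourMeterBlock source; in particular, treating overflow as a pulse that lasts one tick is an assumption made here for simplicity.

```python
class ToyHourMeter:
    """Illustrative hour meter; the real HourMeterBlock may differ."""

    def __init__(self, auto_reset=None):
        self.auto_reset = auto_reset  # limit for the resettable total, in hours
        self.input = False
        self.minutes = 0              # resettable total
        self.total_minutes = 0        # non-resettable total
        self.overflow = False

    def set_input(self, value):
        # called when the input_source channel publishes
        self.input = bool(value)

    def tick(self):
        # called once per minute when the tick_source channel publishes
        self.overflow = False         # assumption: overflow lasts one tick
        if not self.input:
            return
        self.minutes += 1
        self.total_minutes += 1
        if self.auto_reset is not None and self.minutes >= self.auto_reset * 60:
            self.overflow = True
            self.minutes = 0

    @property
    def hours(self):
        return self.minutes / 60.0

    @property
    def total_hours(self):
        return self.total_minutes / 60.0


meter = ToyHourMeter(auto_reset=2)    # 2 hours instead of 250 to keep the run short
meter.set_input(True)
for _ in range(90):                   # motor runs for 90 minutes
    meter.tick()
meter.set_input(False)
for _ in range(60):                   # motor stopped for an hour
    meter.tick()
meter.set_input(True)
for _ in range(30):                   # 30 more minutes reaches the 2 hour limit
    meter.tick()
```

At the end of this run the resettable total has dropped back to zero, the non-resettable total stands at 2.0 hours, and overflow is true for the tick on which the limit was reached.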
Counting Motor Run Hours with Lamp
This example can be expanded even further:
- motor_01 is a Digi XBee DIO Adapter with the following config:
  - motor_01.channel1_input reads the motor run contact. It is used as the motor_01_hours.input, enabling totalizing running hours in minute increments.
  - motor_01.channel2_source sends motor_01_hours.overflow to a red lamp the maintenance people can see.
  - motor_01.channel3_input reads a push button the maintenance people can push, which resets the motor_01_hours block and turns the red lamp off.
- motor_01_hours is an HourMeterBlock, which as before counts the hours the motor runs using minute ticks provided by the AlarmClockDevice.
- Since a reset total of 250 is included, the motor_01_hours.overflow output (a bool) can also be used to trigger some action like sending an email. In this design motor_01_hours does not auto-reset, so motor_01_hours.hours keeps counting above 250 hours, and only resets when the user pushes the button on motor_01.channel3_input.
    devices:
      - name: xbee_device_manager
        driver: devices.xbee.xbee_device_manager.xbee_device_manager:XBeeDeviceManager

      - name: motor_01
        driver: devices.xbee.xbee_devices.xbee_dio:XBeeDIO
        settings:
            xbee_device_manager: xbee_device_manager
            extended_address: "00:13:a2:00:40:0a:49:7a!"
            sample_rate_ms: 1000
            channel1_dir: "in"
            channel2_dir: "out"
            channel3_dir: "in"
            channel2_source: motor_01_hours.overflow

      - name: tick_tock
        driver: devices.alarm_clock_device:AlarmClockDevice

      - name: motor_01_hours
        driver: devices.blocks.counter_device:HourMeterBlock
        settings:
            manual_reset: 250
            input_source: motor_01.channel1_input
            reset_source: motor_01.channel3_input
            tick_source: tick_tock.minute
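The manual-reset behaviour described above (the count keeps running past the limit, the lamp stays on until the button is pushed) could look roughly like this. ToyManualResetMeter is a hypothetical sketch under those assumptions, not the HourMeterBlock implementation.

```python
class ToyManualResetMeter:
    """Illustrative manual-reset meter; the real HourMeterBlock may differ."""

    def __init__(self, manual_reset):
        self.limit_minutes = manual_reset * 60
        self.minutes = 0
        self.overflow = False   # would drive the red lamp

    def tick(self, running):
        # called once per minute; count only while the motor runs
        if running:
            self.minutes += 1
        # latch the overflow once the limit is reached; no automatic reset
        if self.minutes >= self.limit_minutes:
            self.overflow = True

    def reset(self):
        # called when the push button channel publishes
        self.minutes = 0
        self.overflow = False


meter = ToyManualResetMeter(manual_reset=1)   # 1 hour instead of 250
for _ in range(75):                           # motor runs for 75 minutes
    meter.tick(running=True)
lamp_before_reset = meter.overflow            # lamp is on, count is past the limit
minutes_before_reset = meter.minutes
meter.reset()                                 # maintenance pushes the button
```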