Subscribing to a Channel

From Digi Developer

Jump to: navigation, search

Contents

Subscribing to a Channel

One of the powerful advanced features within the DIA platform is the ability to have data automatically pushed from one device driver to another. This is called publish/subscribe.

This article is intended for programmers writing their own device drivers - the devices described are NOT part of DIA.

Counting Hours - Simple Example

Hypothetical YML

In this example we'll create two devices.

  • The second device is a passive object without any thread. Instead it subscribes to the AlarmClockDevice's once per hour pulse, and catches the act of publishing to do something once per hour - in this case, it is a counter which counts the input pulse, so indirectly counts the hours.

Image:Count_hours.png

Here is a fragment of the YML file used for this.

devices:

  - name: tick_tock
    driver: devices.alarm_clock_device:AlarmClockDevice

  - name: count_hours
    driver: devices.blocks.counter_device:CounterBlock
    settings:
        input_source: tick_tock.hour

The most important line is the input_source: tick_tock.hour, which describes a setting named input_source within the counter object which takes a string value. tick_tock is the name of another device, which offers a channel named hour which can be gotten.

Publisher/Source Device Code

The AlarmClockDevice defines these GETTABLE values in the def __init__() function.

    ## Channel Properties Definition:
    property_list = [
            # gettable properties
 
            ChannelSourceDeviceProperty(name='minute', type=tuple,
                initial=Sample(timestamp=0, value=(0,None)),
                perms_mask=DPROP_PERM_GET),
 
            ChannelSourceDeviceProperty(name='hour', type=tuple,
                initial=Sample(timestamp=0, value=(0,None)),
                perms_mask=DPROP_PERM_GET),
 
            ChannelSourceDeviceProperty(name='day', type=tuple,
                initial=Sample(timestamp=0, value=(0,None)),
                perms_mask=DPROP_PERM_GET),
    ]

Then later in the thread.run() section of the code there is a fragment which sets the hour property when appropriate. In this design it is always set to True - it is never set False because it is the act of setting which triggers the desired reaction based on the publish/subscribe.

    if( self.time_tup[TM_MIN] == 0):
       # publish hourly pulse only first time we see min=zero
       if( not self.push_hr):
          self.push_hr = True
          # now_tuple hold the time and time_tuple info
          self.property_set(PROP_HOUR, Sample(self.time_now, now_tuple ))
       else:
          self.push_hr = False

That is it for the source of the data - it merely sets a new sample named hour once per hour, and has no understanding of the external events this drives.

Subscriber/Consumer Device Code

The CounterBlock code is more involved. First we need some properties/settings in the def __init__() function.

  • The first is a string setting called input_source, which defaults to None meaning there is no publish/subscribe link desired.
  • The second is a gettable/settable property called input which includes a callback function to handle any sets - which will also handle the sample push as the source device publishes the new sample.
        ## Settings Table Definition:
        settings_list = [
            Setting(name='input_source', type=str, required=False, default_value=None),
        ]
 
        ## Channel Properties Definition:
        property_list = [
 
            ChannelSourceDeviceProperty(name="input", type=tuple,
                initial=Sample(timestamp=0, value=(0,None)),
                perms_mask=DPROP_PERM_SET | DPROP_PERM_GET,
                set_cb=self.prop_set_input),
            ]

Now we need to actually create the link when the CounterBlock starts (in the def start(self) routine). It fetches the current setting named input_source, which should be either None or a string like tick_tock.hour (or device.property). Warning: make sure you put this near the END of the self.start() routine as it could cause a premature callback if the channel is already active. Or said another way, make sure everything in your callback is initialized and ready to go before subscribing to the channel.

        # wire up any inputs
        cm = self.__core.get_service("channel_manager")
        cp = cm.channel_publisher_get()
        cdb = cm.channel_database_get()
 
        try:
            source_name = SettingsBase.get_setting(self, 'input_source')
            if( source_name != None):
                source_chan = cdb.channel_get( source_name)
                # pre-load the starting value, which won't be published to us
                self.my_input = source_chan.get().value
                cp.subscribe( source_name, self.prop_set_input )
 
        except:
            traceback.print_exc()
            self.my_input = True

Finally we need to create the actual callback to handle either a direct set, or the publish set. The real meat of the passive action is in the self.process( sam.value) call. In this example it merely counts the hours.

    def prop_set_input( self, sam):
        # someone pushing in new input - is either sample or channel
 
        if( not isinstance( sam, Sample)):
            # the publish/sub pushes in the channel, so convert to sample
            sam = sam.get()
 
        self.process( sam.value)
        self.property_set("input", Sample(sam.timestamp, self.my_input))
        return True

Counting Motor Run Hours

The first example was simple, but such a DIA-based uptime counter is not very practical.

So we can expand the example with real-world I/O to create a motor-run timer.

  • motor_01 is a Digi XBee DIO Adapter which reads the motor's run contact on its first input (motor_01.channel1_input).
  • motor_01_hours is an HourMeterBlock, which is a special sub-class of the CounterBlock object which counts minutes that the input is true or false - not the number of times it is set. The motor_01_hours.hours output would show the resettable run total in hours (a float), while the motor_01_hours.total_hours output would show the non-resettable run total in hours (a float).
  • Since a reset total of 250 is included, the motor_01_hours.overflow output (a bool) could be used to trigger some action like sending an email. Since motor_01_hours auto resets, motor_01_hours.hours would automatically drop back to zero after 250 hours where seen.

Image:Motor_run.png

devices:

  - name: xbee_device_manager
    driver: devices.xbee.xbee_device_manager.xbee_device_manager:XBeeDeviceManager

  - name: motor_01
    driver: devices.xbee.xbee_devices.xbee_dio:XBeeDIO
    settings:
        xbee_device_manager: xbee_device_manager
        extended_address: "00:13:a2:00:40:0a:49:7a!"
        sample_rate_ms: 1000
        channel1_dir: "in"

  - name: tick_tock
    driver: devices.alarm_clock_device:AlarmClockDevice

  - name: motor_01_hours
    driver: devices.blocks.counter_device:HourMeterBlock
    settings:
        auto_reset: 250
        input_source: motor_01.channel1_input
        tick_source: tick_tock.minute
 

Counting Motor Run Hours with Lamp

This example can be expanded even furrther:

  • motor_01 is a Digi XBee DIO Adapter with the follwoing config:
    • motor_01.channel1_input reads the motor run contact. It is used as the motor_01_hours.input, enabling totalizing running hours in minute increments.
    • motor_01.channel2_source sends motor_01_hours.overflow to a red lamp the maintenance people can see
    • motor_01.channel3_input reads a push button the maintenance people can push, which resets the motor_01_hours block and turns the red lamp off.
  • motor_01_hours is an HourMeterBlock, which as before counts the hours the motor runs using minute ticks provided by the AlarmClockDevice.
  • Since a reset total of 250 is included, the motor_01_hours.overflow output (a bool) can also be used to trigger some action like sending an email. In this design motor_01_hours doesn't auto resets, so motor_01_hours.hours keeps counting above 250 hours, and only resets when the user pushes the button on motor_01.channel3_input.

Image:Motor_lamp.png

devices:

  - name: xbee_device_manager
    driver: devices.xbee.xbee_device_manager.xbee_device_manager:XBeeDeviceManager

  - name: motor_01
    driver: devices.xbee.xbee_devices.xbee_dio:XBeeDIO
    settings:
        xbee_device_manager: xbee_device_manager
        extended_address: "00:13:a2:00:40:0a:49:7a!"
        sample_rate_ms: 1000
        channel1_dir: "in"
        channel2_dir: "out"
        channel3_dir: "in"
        channel2_source: motor_01_hours:overflow

  - name: tick_tock
    driver: devices.alarm_clock_device:AlarmClockDevice

  - name: motor_01_hours
    driver: devices.blocks.counter_device:HourMeterBlock
    settings:
        manual_reset: 250
        input_source: motor_01.channel1_input
        reset_source: motor_01.channel3_input
        tick_source: tick_tock.minute
 
Personal tools
Wiki Editing