Application Programming Interface (API)

The Qb2 uses gRPC to communicate with the outside world. The corresponding .proto files are publicly available on GitHub and can be compiled for various programming languages. The gRPC API documentation can be found in the protocol section of this document.

Additionally, a pre-built Python package is available; it is used in the examples that follow. This guide first installs the Blickfeld Qb2 Python library and establishes a secure connection to a Qb2 device. It then fetches the version of the currently installed firmware from the device. Finally, it looks at asynchronous usage of the Blickfeld library.

Installing Blickfeld Qb2 Library

The Python package for controlling the Qb2 device is called blickfeld_qb2 and can be installed from PyPI with the command below:

pip3 install blickfeld_qb2
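
To confirm that the installation succeeded, the installed version can be read back with Python's standard importlib.metadata module. The following is a minimal sketch; the helper name installed_version is made up for illustration and is not part of the Blickfeld library.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package="blickfeld_qb2"):
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        # The package is not installed in the current Python environment.
        return None


if __name__ == "__main__":
    found = installed_version()
    print(found if found is not None else "blickfeld_qb2 is not installed")
```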

Connecting to Qb2

To interact with a Qb2 device, a gRPC connection has to be established. For that we use the gRPC Channel from the base.grpc.channel namespace of the blickfeld_qb2 library:

from blickfeld_qb2.base.grpc.channel import Channel

Either an IP address or a hostname of a Qb2 device can be used to establish a connection. By default, the connection is encrypted and authenticated. A serial_number can additionally be passed to authenticate a particular device. If it is not supplied, authentication only checks that the peer is a Blickfeld device.
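
The two authentication variants described above could be wrapped as follows. This is only a sketch: it assumes that serial_number is accepted by Channel as a keyword argument next to fqdn_or_ip, as the paragraph above suggests. The helpers connection_arguments and open_channel are hypothetical names introduced here, not part of the library.

```python
def connection_arguments(fqdn_or_ip, serial_number=None):
    """Collect keyword arguments for Channel; serial_number is added only when given."""
    arguments = {"fqdn_or_ip": fqdn_or_ip}
    if serial_number is not None:
        # Assumption: Channel accepts the serial number under this keyword.
        arguments["serial_number"] = serial_number
    return arguments


def open_channel(fqdn_or_ip, serial_number=None):
    """Create a Channel to a Qb2 (requires the blickfeld_qb2 package)."""
    # Imported lazily so this sketch can be loaded without the package installed.
    from blickfeld_qb2.base.grpc.channel import Channel

    return Channel(**connection_arguments(fqdn_or_ip, serial_number))
```

With a device on the network, open_channel("qb2-xxxxxxxxx") would only verify that the peer is a Blickfeld device, while passing the serial number of a specific unit as the second argument would additionally bind the connection to that unit.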

Get Qb2 Firmware Version

The code snippet below shows an example of connecting to a Qb2 with the hostname qb2-xxxxxxxxx and fetching information about its firmware version. For that we use the Firmware service from the blickfeld_qb2.system.services namespace over the gRPC channel and call get_status() to obtain a response with information about the currently installed firmware.

import blickfeld_qb2
from blickfeld_qb2.base.grpc.channel import Channel

# open secure connection to Qb2
with Channel(fqdn_or_ip="qb2-xxxxxxxxx") as channel:
    # request firmware status
    response = blickfeld_qb2.system.services.Firmware(channel).get_status()
    # extract firmware version
    firmware_version = response.status.installed_firmware_info
    # print firmware version
    print(firmware_version)

    # NOTE: A channel can be reused for other requests. It is not required to create a channel per request.

Every response to a service call is an object that usually consists of several nested objects, following the protocol schema defined in the corresponding sections of the documentation. In this scenario, information about the installed firmware version is available via the field response.status.installed_firmware_info.

The output of the print should look similar to the one shown below:

FirmwareInfo(label='JAKOB v1.5.0', version=Version(major=1, minor=5, revision='e87dfda0', is_release=True), allowed_downgrade_version=Version(minor=26))
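
Individual values can be read from the nested objects by attribute access. The sketch below illustrates this with stand-in dataclasses that mirror only the fields visible in the printed output above; they are not the library's real message classes, and describe_firmware is a hypothetical helper. With a real response, response.status.installed_firmware_info would be passed instead of the sample object.

```python
from dataclasses import dataclass, field


@dataclass
class Version:
    # Stand-in for the nested version object, limited to the fields shown above.
    major: int = 0
    minor: int = 0
    revision: str = ""
    is_release: bool = False


@dataclass
class FirmwareInfo:
    # Stand-in for the firmware info object, limited to the fields used here.
    label: str = ""
    version: Version = field(default_factory=Version)


def describe_firmware(info):
    """Build a one-line summary from the nested fields of a firmware info object."""
    kind = "release" if info.version.is_release else "non-release"
    number = f"{info.version.major}.{info.version.minor}"
    return f"{info.label}: {number} ({kind}, revision {info.version.revision})"


sample = FirmwareInfo(
    label="JAKOB v1.5.0",
    version=Version(major=1, minor=5, revision="e87dfda0", is_release=True),
)
print(describe_firmware(sample))  # JAKOB v1.5.0: 1.5 (release, revision e87dfda0)
```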

Asynchronous Requests

If needed, the Blickfeld library can also be used in a fully asynchronous mode. In the example shown below we introduce a coroutine main(), which opens an asynchronous gRPC channel and uses the asynchronous request async_get_status() to get the same information about the Qb2 firmware status.

This subsection is optional.

import asyncio
import blickfeld_qb2
from blickfeld_qb2.base.grpc.channel import Channel

async def main():
    # open secure asynchronous connection to Qb2
    async with Channel(fqdn_or_ip="qb2-xxxxxxxxx") as channel:
        # request firmware status asynchronously and await the response
        response = await blickfeld_qb2.system.services.Firmware(channel).async_get_status()
        # get firmware version from the response
        firmware_version = response.status.installed_firmware_info
        # print firmware version
        print(firmware_version)

        # NOTE: A channel can be reused for other requests. It is not required to create a channel per request.

# run coroutine on the event loop
asyncio.run(main())

To make an asynchronous request over the asynchronous channel, prefix the desired method name with async_ (for example, get_status() becomes async_get_status()).

The coroutine is then executed on the event loop with asyncio.run(). The output of the print should look similar to the one shown below:

FirmwareInfo(label='JAKOB v1.5.0', version=Version(major=1, minor=5, revision='e87dfda0', is_release=True), allowed_downgrade_version=Version(minor=26))
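
One possible benefit of the asynchronous mode is querying several devices concurrently. The sketch below combines the calls from the example above with asyncio.gather from the standard library. The helper names fetch_firmware_info and fetch_all are hypothetical, and it assumes one independent channel per device.

```python
import asyncio


async def fetch_firmware_info(host):
    """Fetch the installed firmware info from one Qb2 (requires blickfeld_qb2)."""
    # Imported lazily so this sketch can be loaded without the package installed.
    import blickfeld_qb2
    from blickfeld_qb2.base.grpc.channel import Channel

    async with Channel(fqdn_or_ip=host) as channel:
        response = await blickfeld_qb2.system.services.Firmware(channel).async_get_status()
        return response.status.installed_firmware_info


async def fetch_all(hosts, fetch=fetch_firmware_info):
    """Run `fetch` for every host concurrently and map each host to its result."""
    # gather() returns the results in the same order as the awaitables passed in.
    results = await asyncio.gather(*(fetch(host) for host in hosts))
    return dict(zip(hosts, results))
```

With devices on the network, asyncio.run(fetch_all([...])) with a list of hostnames or IP addresses would return a dictionary keyed by host. The fetch parameter exists only so that the concurrency logic can be exercised with a substitute coroutine when no device is available.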