streamlit_azure.py
import streamlit as st
import time
import json
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import CloudToDeviceMethod, CloudToDeviceMethodResult
def get_config():
with open('config.json') as config_file:
data = json.load(config_file)
return data
def iothub_method_sample_run(connection_string, device_id):
try:
registry_manager = IoTHubRegistryManager(connection_string)
direct_method = CloudToDeviceMethod(method_name="Your method name", payload=None)
result = registry_manager.invoke_device_method(device_id, direct_method)
return result.payload
except Exception as ex:
print("Unexpected error {0}".format(ex))
return
except KeyboardInterrupt:
print("iothub_registry_manager_sample stopped")
def main():
st.title('Azure IoT Hub Data')
config = get_config()
connection_string = config['IOT_HUB_CONNECTION_STRING']
device_id = config['DEVICE_ID']
interval = config['INTERVAL']
while True:
data = iothub_method_sample_run(connection_string, device_id)
st.write(data)
time.sleep(interval)
if __name__ == "__main__":
main()