IoTHubと連動する前に、時系列データの取り扱いが得意なStream AnalyticsとFunctionを連動する方法を調べてみます。
実際のところ
Functionは直接Azure Stream Analytics(ASA)にデータをおくる事ができないので
Azure Event Hubsを咬ませてから実行します。
Azure Event Hubsのキー生成
スクリプト本体
次のような動作をするスクリプトは以下の通り。
import logging import azure.functions as func import json import datetime from azure.eventhub import EventHubProducerClient, EventData import os import json def main(req: func.HttpRequest) -> func.HttpResponse: logging.info('Python HTTPトリガー関数がリクエストを処理しました。') # HTTPリクエストからJSONを解析します try: req_body = req.get_json() except ValueError: pass else: # JSONからValueを取得します value = req_body.get('Value') # タイムスタンプを作成します timestamp = datetime.datetime.now().isoformat() # Valueとタイムスタンプを統合した単一のJSONを作成します data = { 'Value': value, 'TimeStamp': timestamp, } data = json.dumps(data) # JSONからEvent Hubsの接続文字列とハブ名を取得します with open('config.json') as config_file: config = json.load(config_file) event_hub_connection_str = config["EventHubsConnectionString"] event_hub_name = config["EventHubName"] # データをAzure Event Hubsに送信します producer = EventHubProducerClient.from_connection_string(event_hub_connection_str, event_hub_name) event_data_batch = producer.create_batch() event_data_batch.add(EventData(data)) producer.send_batch(event_data_batch) return func.HttpResponse("Event Hubsにデータを送信しました。")