Test data upload over MQTT using Python to ThingsBoard (community)

This sample Python script uploads test data over MQTT to my self-hosted ThingsBoard server. Before running it, I need to install the script's requirements and set up a device access token on my ThingsBoard server.

Main configuration

Part 1: MQTT client setup

sudo pip install paho-mqtt
sudo apt-get install python-dev
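
Before running the script, it can help to confirm which paho-mqtt version pip actually installed. The 2.x releases changed the mqtt.Client constructor (it gained a callback API version argument), and the script below uses the older 1.x style. A minimal sketch using only the standard library; the helper name installed_version is my own and not part of paho-mqtt:

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version("paho-mqtt")
    if version is None:
        print("paho-mqtt is not installed")
    else:
        print("paho-mqtt version:", version)
```

If this reports a 2.x version and the script fails at mqtt.Client(), installing a 1.x release with pip install "paho-mqtt<2" is one way around it.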

Copy this code into a test.py file.

import json
import random
import time

import paho.mqtt.client as mqtt


# ThingsBoard server hostname
THINGSBOARD_HOST = 'iot.gabrielkheisa.xyz'

# Access token of the device, as set in the device configuration on the ThingsBoard server
ACCESS_TOKEN = '072b6a04ea02876164cace948248b5d'

# Interval between uploads, in seconds
INTERVAL = 2

sensor_data = {'temperature': 0, 'humidity': 0}

next_reading = time.time() 

client = mqtt.Client()

# ThingsBoard authenticates the device by its access token, passed as the MQTT username
client.username_pw_set(ACCESS_TOKEN)

# Connect to ThingsBoard using default MQTT port and 60 seconds keepalive interval
# 1883 is the default port for MQTT
client.connect(THINGSBOARD_HOST, 1883, 60)

client.loop_start()

try:
  while True:
    # Test data: random integers for humidity and temperature,
    # plus a small random offset around a fixed latitude and longitude
    humidity = random.randint(50, 60)
    temperature = random.randint(20, 25)
    latitude = -7.771221 + random.uniform(-0.001, 0.001)
    longitude = 110.377512 + random.uniform(-0.001, 0.001)
    print("Temperature: ", temperature, "\u00b0C", "Humidity: ", humidity, "Latitude: ", latitude, "Longitude: ", longitude)
    sensor_data['temperature'] = temperature
    sensor_data['humidity'] = humidity
    sensor_data['latitude'] = latitude
    sensor_data['longitude'] = longitude

    # Send temperature, humidity, latitude and longitude to ThingsBoard as telemetry (QoS 1)
    # dashboard here https://iot.gabrielkheisa.xyz/dashboard/a1af1620-e275-11ec-9400-036f00b69a80?publicId=ce59c250-e276-11ec-9400-036f00b69a80
    client.publish('v1/devices/me/telemetry', json.dumps(sensor_data), 1)

    next_reading += INTERVAL
    sleep_time = next_reading-time.time()
    if sleep_time > 0:
      time.sleep(sleep_time)

except KeyboardInterrupt:
  # Ctrl+C ends the loop; any other exception propagates with its traceback
  print("Loop ended")

client.loop_stop()
client.disconnect()
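
The script leaves it to ThingsBoard to stamp each message with the server's receive time. The ThingsBoard MQTT telemetry API also accepts a payload of the form {"ts": <milliseconds>, "values": {...}} when the device should supply its own timestamp. A minimal sketch of building such a payload; build_payload is a hypothetical helper, and its result would be passed to client.publish in place of json.dumps(sensor_data):

```python
import json
import time


def build_payload(values, ts_ms=None):
    """Wrap telemetry values with a client-side timestamp in milliseconds."""
    if ts_ms is None:
        ts_ms = int(time.time() * 1000)
    return json.dumps({"ts": ts_ms, "values": values})


if __name__ == "__main__":
    # Fixed timestamp here only to make the printed result repeatable
    print(build_payload({"temperature": 23, "humidity": 55}, ts_ms=1654000000000))
```

This is mostly useful when readings are buffered and uploaded later, since the receive time would otherwise be wrong for them.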

Part 2: MQTT server setup (ThingsBoard)

  1. Log in as tenant
  2. Go to devices
  3. Add a new device
Device section

  4. Set a device name and an access token. The token must match ACCESS_TOKEN in the Python script; mine is 072b6a04ea02876164cace948248b5d.

Access token from the Python script
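
Since the access token is the only credential the device uses, I may not want it hard-coded in a script that gets shared. A minimal sketch that reads it from an environment variable instead; the variable name THINGSBOARD_TOKEN and the helper load_access_token are assumptions for illustration, not ThingsBoard conventions:

```python
import os


def load_access_token(env=None, var_name="THINGSBOARD_TOKEN"):
    """Return the device access token from the environment, or raise if it is unset."""
    env = os.environ if env is None else env
    token = env.get(var_name, "").strip()
    if not token:
        raise RuntimeError(f"Set {var_name} to the device access token")
    return token
```

In the script, ACCESS_TOKEN = load_access_token() would then replace the hard-coded string, with the variable exported in the shell beforehand.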

Part 3: Testing

Run the Python script, then open the device's latest telemetry in ThingsBoard to check that the values arrive.
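
If nothing shows up under latest telemetry, it helps to know whether the message left the client at all. In paho-mqtt, client.publish returns a message info object with an rc code (0 means the message was accepted for sending) and a wait_for_publish method. A minimal sketch, assuming paho-mqtt 1.6 or newer for the timeout argument; publish_and_confirm is a hypothetical helper that takes the already connected client from the script:

```python
import json


def publish_and_confirm(client, values, timeout=5.0):
    """Publish telemetry with QoS 1 and report whether it was confirmed as published."""
    info = client.publish('v1/devices/me/telemetry', json.dumps(values), 1)
    if info.rc != 0:  # 0 is MQTT_ERR_SUCCESS in paho-mqtt
        return False
    # Blocks until the message is acknowledged or the timeout expires;
    # the network loop (client.loop_start()) must already be running
    info.wait_for_publish(timeout=timeout)
    return info.is_published()
```

Inside the loop, a call such as publish_and_confirm(client, sensor_data) returning False would point to a connection or token problem, not a dashboard problem.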

The Python script, run from Colab
Latest telemetry from ThingsBoard
Dashboard preview

These are some of my resources that you may use for testing purposes:

  1. Python script in Google Colab
  2. Dashboard preview in ThingsBoard
