mirror of
https://github.com/sascha-hemi/pycom-documentation.git
synced 2026-03-21 03:04:13 +01:00
PyNode Air and AirQ examples (#450)
* PyNode Air and AirQ examples * Update config.toml Add Pynode+ to menu * Update _index.md * Update _index.md
This commit is contained in:
22
config.toml
22
config.toml
@@ -163,6 +163,24 @@ theme = "doc-theme"
|
||||
identifier = "tutorials@basic@repl"
|
||||
parent = "tutorials@basic"
|
||||
weight = 10
|
||||
[[menu.main]]
|
||||
name = "PyNode+"
|
||||
url = "/tutorials/pynode/"
|
||||
identifier = "tutorials@pynode"
|
||||
parent = "tutorials"
|
||||
weight = 12
|
||||
[[menu.main]]
|
||||
name = "PyNode+ Air"
|
||||
url = "/tutorials/pynode/pynodeair/"
|
||||
identifier = "tutorials@pynode@pynodeair"
|
||||
parent = "tutorials@pynode"
|
||||
weight = 12
|
||||
[[menu.main]]
|
||||
name = "PyNode+ AirQ"
|
||||
url = "/tutorials/pynode/pynodeairq/"
|
||||
identifier = "tutorials@pynode@pynodeairq"
|
||||
parent = "tutorials@pynode"
|
||||
weight = 12
|
||||
[[menu.main]]
|
||||
name = "Sleep"
|
||||
url = "/tutorials/basic/sleep/"
|
||||
@@ -438,7 +456,7 @@ theme = "doc-theme"
|
||||
url = "/tutorials/expansionboards/pygate/"
|
||||
identifier = "tutorials@expansionboards@pygate"
|
||||
parent = "tutorials@expansionboards"
|
||||
weight = 5
|
||||
weight = 5
|
||||
[[menu.main]]
|
||||
name = "Tracking"
|
||||
url = "/tutorials/expansionboards/tracking/"
|
||||
@@ -690,7 +708,7 @@ theme = "doc-theme"
|
||||
identifier = "firmwareapi@pycom@network@bluetooth@BLE_Mesh"
|
||||
parent = "firmwareapi@pycom@network@bluetooth"
|
||||
weight = 70
|
||||
|
||||
|
||||
|
||||
# [Errno 2] No such file or directory: './content/firmwareapi/pycom/network/lora/README.md'
|
||||
[[menu.main]]
|
||||
|
||||
11
content/tutorials/pynode/_index.md
Normal file
11
content/tutorials/pynode/_index.md
Normal file
@@ -0,0 +1,11 @@
|
||||
---
|
||||
title: 'PyNode+'
|
||||
|
||||
---
|
||||
|
||||
The following PyNode+ BLE enabled sensors are now available:
|
||||
|
||||
* [PyNode Air](../pynode/pynodeair/)
|
||||
* [PyNode AirQ](../pynode/pynodeairq/)
|
||||
|
||||
> Please see the tutorials above to query the PuNode+ sensor from MicroPython or adjust the sleep time.
|
||||
150
content/tutorials/pynode/pynodeair.md
Normal file
150
content/tutorials/pynode/pynodeair.md
Normal file
@@ -0,0 +1,150 @@
|
||||
---
|
||||
title: "PyNode+ Air Examples"
|
||||
aliases:
|
||||
- tutorials/pynode/PyNodeAir.html
|
||||
- tutorials/pynode/pynodeair.md
|
||||
- chapter/tutorials/pynode/PyNodeAir
|
||||
---
|
||||
|
||||
You can use the following code to query the PyNode+ Air sensor from MicroPython
|
||||
|
||||
```python
|
||||
from network import Bluetooth
|
||||
import time
|
||||
import ubinascii
|
||||
import struct
|
||||
bt = Bluetooth()
|
||||
bt.start_scan(-1)
|
||||
|
||||
def twoscmp(value):
|
||||
if value > 128:
|
||||
value = value - 256
|
||||
return value
|
||||
|
||||
while True:
|
||||
adv = bt.get_adv()
|
||||
if adv:
|
||||
read_adv = bt.resolve_adv_data(adv.data, Bluetooth.ADV_MANUFACTURER_DATA)
|
||||
if read_adv==None:
|
||||
pass
|
||||
else:
|
||||
manuf = ubinascii.hexlify(read_adv)
|
||||
manuf_data = ubinascii.hexlify(read_adv[0:4])
|
||||
if manuf_data == b'4c000215': # ibeacon
|
||||
print(ubinascii.hexlify(adv.mac))
|
||||
uuid = ubinascii.hexlify(read_adv[4:20])
|
||||
major = ubinascii.hexlify(read_adv[20:22])
|
||||
minor = ubinascii.hexlify(read_adv[22:24])
|
||||
tx_power = ubinascii.hexlify(read_adv[24:25])
|
||||
tx_power_real = twoscmp(int(tx_power, 16))
|
||||
major_int = int(major, 16)
|
||||
major_f = float(-45 + 175*major_int/65536)
|
||||
minor_int = int(minor,16)
|
||||
minor_f = float(100*minor_int/65536)
|
||||
print("uuid:",ubinascii.unhexlify(uuid), " rest:", major, minor)
|
||||
print("temperature:",major_f, "humidity:", minor_f)
|
||||
print("")
|
||||
else:
|
||||
time.sleep(0.050)
|
||||
```
|
||||
|
||||
|
||||
The below code can be used to modify the sleep duration.
|
||||
|
||||
```python
|
||||
import time
|
||||
import ubinascii
|
||||
import struct
|
||||
|
||||
def get_adv_int(arg):
|
||||
time_msec = {
|
||||
100: b'\x01', # 100ms
|
||||
200: b'\x02', # 200ms
|
||||
500: b'\x05', # 500ms
|
||||
1000: b'\x10' # 1000ms
|
||||
}
|
||||
return time_msec.get(arg, b'\x00') # default is 1000ms
|
||||
|
||||
def get_adv_dur(arg):
|
||||
time_sec = {
|
||||
1: b'\x01', # 1 hour
|
||||
2: b'\x02', # 3 hours
|
||||
3: b'\x03', # 6 hours
|
||||
4: b'\x04', # 11 hours (max)
|
||||
5: b'\x05' # 10 secs
|
||||
}
|
||||
return time_sec.get(arg, b'\x00') # default is 10 secs
|
||||
|
||||
def get_sleep_dur(arg):
|
||||
time_sec = {
|
||||
10: b'\xFF', # 10 secs
|
||||
3: b'\x03', # 3 secs
|
||||
20: b'\x20', # 20 secs
|
||||
30: b'\x30', # 30 secs
|
||||
11: b'\x11', # 1 hour
|
||||
33: b'\x33', # 3 hours
|
||||
66: b'\x66', # 6 hours
|
||||
1111: b'\xBB' # 11 hours (max)
|
||||
}
|
||||
return time_sec.get(arg, b'\x00') # default is 5 secs
|
||||
|
||||
def change_adv(adv_int, adv_dur, sleep_dur):
|
||||
from network import Bluetooth
|
||||
bt = Bluetooth()
|
||||
bt.start_scan(10)
|
||||
last_time = time.time()
|
||||
print("scanning |", end="")
|
||||
while (time.time()-last_time)<10:
|
||||
print("\b\\", end="")
|
||||
time.sleep(0.050)
|
||||
print("\b-", end="")
|
||||
time.sleep(0.050)
|
||||
print("\b|", end="")
|
||||
time.sleep(0.050)
|
||||
print("\b/", end="")
|
||||
time.sleep(0.050)
|
||||
adv = bt.get_adv()
|
||||
if adv and bt.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_CMPL) == 'PyNode Air':
|
||||
try:
|
||||
conn = bt.connect(adv.mac)
|
||||
services = conn.services()
|
||||
for service in services:
|
||||
u = service.uuid()
|
||||
time.sleep(0.50)
|
||||
if isinstance(u, int):
|
||||
pass
|
||||
else:
|
||||
if ubinascii.hexlify(u) == b'595a08e4862a9e8fe911bc7c7c464218':
|
||||
chars = service.characteristics()
|
||||
for char in chars:
|
||||
c_uuid = char.uuid()
|
||||
descriptor = char.read_descriptor(0x2901)
|
||||
print("")
|
||||
print(descriptor.decode('utf-8'),end=" ")
|
||||
c_hex = ubinascii.hexlify(c_uuid)
|
||||
if c_hex == b'23ee8d0ce1f04a0cb325dc536a68862d': # sleep duration
|
||||
print("is changed to:", sleep_dur,"sec")
|
||||
value = get_sleep_dur(sleep_dur)
|
||||
char.write(value)
|
||||
elif c_hex == b'25ee8d0ce1f04a0cb325dc536a68862d': # advertising duration
|
||||
print("is changed to:", adv_dur,"sec")
|
||||
value = get_adv_dur(adv_dur)
|
||||
char.write(value)
|
||||
elif c_hex == b'22ee8d0ce1f04a0cb325dc536a68862d': # advertising interval
|
||||
print("is changed to:", adv_int,"ms")
|
||||
value = get_adv_int(adv_int)
|
||||
char.write(value)
|
||||
else:
|
||||
print(" ")
|
||||
conn.disconnect()
|
||||
break
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if conn:
|
||||
conn.disconnect()
|
||||
bt.deinit()
|
||||
print("Error while connecting or reading from the BLE device")
|
||||
break
|
||||
else:
|
||||
time.sleep(0.050)
|
||||
```
|
||||
241
content/tutorials/pynode/pynodeairq.md
Normal file
241
content/tutorials/pynode/pynodeairq.md
Normal file
@@ -0,0 +1,241 @@
|
||||
---
|
||||
title: "PyNode+ AirQ Examples"
|
||||
aliases:
|
||||
- tutorials/pynode/PyNodeAirQ.html
|
||||
- tutorials/pynode/pynodeairq.md
|
||||
- chapter/tutorials/pynode/PyNodeAirQ
|
||||
---
|
||||
|
||||
You can use the following code to query the PyNode+ AirQ sensor from MicroPython
|
||||
|
||||
```python
|
||||
from network import Bluetooth
|
||||
import time
|
||||
import ubinascii
|
||||
import struct
|
||||
import math
|
||||
|
||||
bt = Bluetooth()
|
||||
bt.start_scan(-1)
|
||||
|
||||
def twoscmp(value):
|
||||
if value > 128:
|
||||
value = value - 256
|
||||
return value
|
||||
|
||||
def byte_to_info(uuid):
|
||||
gas_res_d = 0
|
||||
name = uuid[0:3]
|
||||
name_text = ''.join(chr(t) for t in name)
|
||||
if name_text == "PyN":
|
||||
sensor_id = uuid[7]
|
||||
mac = ubinascii.hexlify(uuid[10:16])
|
||||
press = ubinascii.hexlify(uuid[8:10])
|
||||
press_d = int(press, 16)
|
||||
gas_res = ubinascii.hexlify(uuid[3:7])
|
||||
gas_res_d = int(gas_res, 16)
|
||||
print("{} {} BLE_MAC: {}, Pressure: {} hPa, Gas resistance: {} ohm".format(name_text, sensor_id, mac, press_d, gas_res_d), end=", ")
|
||||
return (name_text,gas_res_d)
|
||||
|
||||
def air_quality_score(hum, gas_res):
|
||||
gas_reference = 250000
|
||||
hum_reference = 40
|
||||
gas_lower_limit = 5000
|
||||
gas_upper_limit = 50000
|
||||
if (hum >= 38 and hum <= 42):
|
||||
hum_score = 0.25*100
|
||||
else:
|
||||
if (hum < 38):
|
||||
hum_score = 0.25/hum_reference*hum*100
|
||||
else:
|
||||
hum_score = ((-0.25/(100-hum_reference)*hum)+0.416666)*100
|
||||
if (gas_reference > gas_upper_limit):
|
||||
gas_reference = gas_upper_limit
|
||||
if (gas_reference < gas_lower_limit):
|
||||
gas_reference = gas_lower_limit
|
||||
gas_score = (0.75/(gas_upper_limit-gas_lower_limit)*gas_reference -(gas_lower_limit*(0.75/(gas_upper_limit-gas_lower_limit))))*100
|
||||
air_quality_score = hum_score + gas_score
|
||||
|
||||
print("IAQ score:", air_quality_score)
|
||||
|
||||
print("Air quality is", end=" ")
|
||||
air_quality_score = (100-air_quality_score)*5
|
||||
if (air_quality_score >= 301):
|
||||
print("Hazardous")
|
||||
elif (air_quality_score >= 201 and air_quality_score <= 300 ):
|
||||
print("Very Unhealthy")
|
||||
elif (air_quality_score >= 176 and air_quality_score <= 200 ):
|
||||
print("Unhealthy")
|
||||
elif (air_quality_score >= 151 and air_quality_score <= 175 ):
|
||||
print("Unhealthy for Sensitive Groups")
|
||||
elif (air_quality_score >= 51 and air_quality_score <= 150 ):
|
||||
print("Moderate")
|
||||
elif (air_quality_score >= 00 and air_quality_score <= 50 ):
|
||||
print("Good")
|
||||
|
||||
while True:
|
||||
adv = bt.get_adv()
|
||||
if adv: # and adv.rssi>-80:# and ubinascii.hexlify(adv.mac)==b'cd9e13c0f24a':
|
||||
read_adv = bt.resolve_adv_data(adv.data, Bluetooth.ADV_MANUFACTURER_DATA)
|
||||
if read_adv==None:
|
||||
pass
|
||||
else:
|
||||
manuf = ubinascii.hexlify(read_adv)
|
||||
manuf_data = ubinascii.hexlify(read_adv[0:4])
|
||||
# print(manuf_data)
|
||||
if (manuf_data == b'4c000215') :#or (manuf_data == b'd2000215')):# company id=d2 is Dialog, b'4c000215' is Apple's id and it implies ibeacon
|
||||
# print("mac:", ubinascii.hexlify(adv.mac))
|
||||
uuid_raw = read_adv[4:20]
|
||||
uuid = ubinascii.hexlify(uuid_raw)
|
||||
name, air=byte_to_info(uuid_raw)
|
||||
if name == "PyN":
|
||||
print("rssi:",adv.rssi)
|
||||
major = ubinascii.hexlify(read_adv[20:22])
|
||||
minor = ubinascii.hexlify(read_adv[22:24])
|
||||
tx_power = ubinascii.hexlify(read_adv[24:25])
|
||||
tx_power_real = twoscmp(int(tx_power, 16))
|
||||
major_int = int(major, 16)
|
||||
major_f = major_int/100 # bme688
|
||||
minor_int = int(minor,16)
|
||||
minor_f = minor_int/100 # bme688, it is divided by 10 initially in the dialog's firmware.
|
||||
print("Temperature: {} C, Humidity: {} %r.H.".format(major_f, minor_f), time.time())
|
||||
air_quality_score(minor_f, air)
|
||||
print("")
|
||||
else:
|
||||
time.sleep(0.050)
|
||||
```
|
||||
|
||||
|
||||
The below code can be used to modify the sleep duration.
|
||||
|
||||
```python
|
||||
import time
|
||||
import ubinascii
|
||||
import struct
|
||||
|
||||
def get_adv_int(arg):
|
||||
time_msec = {
|
||||
'100ms': b'\x01', # 100ms
|
||||
'200ms': b'\x02', # 200ms
|
||||
'500ms': b'\x05', # 500ms
|
||||
'1000ms': b'\x10' # 1000ms
|
||||
}
|
||||
return time_msec.get(arg, b'\x00') # default is 1000ms
|
||||
|
||||
def get_adv_dur(time_value, scale):
|
||||
calc_time = bytes([0])
|
||||
if scale == 'ms':
|
||||
if time_value >= 100 and time_value < 1000:
|
||||
calc_time = bytes([(int(time_value/100))])
|
||||
elif scale == 'sec':
|
||||
if time_value >= 1 and time_value < 10:
|
||||
calc_time = bytes([(int(time_value) | 0x10)])
|
||||
elif scale == 'sec':
|
||||
if time_value >= 10 and time_value < 60:
|
||||
calc_time = bytes([(int(time_value/10) | 0x20)])
|
||||
return calc_time
|
||||
|
||||
def get_sleep_dur(time_value, scale):
|
||||
calc_time = bytes([0])
|
||||
if scale == 'ms':
|
||||
if time_value >= 100 and time_value < 1000:
|
||||
calc_time = bytes([(int(time_value/100))])
|
||||
elif scale == 'sec':
|
||||
if time_value >= 1 and time_value < 10:
|
||||
calc_time = bytes([(int(time_value) | 0x10)])
|
||||
elif scale == 'sec':
|
||||
if time_value >= 10 and time_value < 60:
|
||||
calc_time = bytes([(int(time_value/10) | 0x20)])
|
||||
elif scale == 'min':
|
||||
if time_value >= 1 and time_value < 6:
|
||||
calc_time = bytes([(int(time_value) | 0xA0)])
|
||||
elif scale == 'min':
|
||||
if time_value >= 10 and time_value < 60:
|
||||
calc_time = bytes([(int(time_value/10) | 0xB0)])
|
||||
elif scale == 'hour':
|
||||
if time_value >= 1 and time_value < 12:
|
||||
calc_time = bytes([(time_value | 0xC0)])
|
||||
return calc_time
|
||||
|
||||
def measurement_count(arg):
|
||||
count = bytes([arg])
|
||||
return count
|
||||
|
||||
def twoscmp(value):
|
||||
if value > 128:
|
||||
value = value - 256
|
||||
return value
|
||||
|
||||
def change_adv(adv_int, adv_dur, sleep_dur, meas_count):
|
||||
from network import Bluetooth
|
||||
bt = Bluetooth()
|
||||
bt.start_scan(-1)
|
||||
last_time = time.time()
|
||||
print("scanning |", end="")
|
||||
while (time.time()-last_time)<20:
|
||||
print("\b\\", end="")
|
||||
time.sleep(0.010)
|
||||
print("\b-", end="")
|
||||
time.sleep(0.010)
|
||||
print("\b|", end="")
|
||||
time.sleep(0.010)
|
||||
print("\b/", end="")
|
||||
time.sleep(0.010)
|
||||
adv = bt.get_adv()
|
||||
if adv and bt.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_CMPL) == 'PyNode+ Air Quality':
|
||||
print("Found!")
|
||||
try:
|
||||
print(adv.mac)
|
||||
conn = bt.connect(adv.mac)
|
||||
print(adv.mac)
|
||||
services = conn.services()
|
||||
for service in services:
|
||||
u = service.uuid()
|
||||
time.sleep(1)
|
||||
if isinstance(u, int):
|
||||
pass
|
||||
else:
|
||||
if ubinascii.hexlify(u) == b'595a08e4862a9e8fe911bc7c7c464218':
|
||||
chars = service.characteristics()
|
||||
for char in chars:
|
||||
c_uuid = char.uuid()
|
||||
descriptor = char.read_descriptor(0x2901)
|
||||
print("")
|
||||
print(descriptor.decode('utf-8'),end=" ")
|
||||
c_hex = ubinascii.hexlify(c_uuid)
|
||||
if c_hex == b'23ee8d0ce1f04a0cb325dc536a68862d': # sleep duration
|
||||
print("is changed to:", sleep_dur[0],sleep_dur[1])
|
||||
value = get_sleep_dur(sleep_dur[0], sleep_dur[1])
|
||||
char.write(value)
|
||||
elif c_hex == b'25ee8d0ce1f04a0cb325dc536a68862d': # advertising duration
|
||||
print("is changed to:", adv_dur[0], adv_dur[1])
|
||||
value = get_adv_dur(adv_dur[0], adv_dur[1])
|
||||
char.write(value)
|
||||
elif c_hex == b'22ee8d0ce1f04a0cb325dc536a68862d': # advertising interval
|
||||
print("is changed to:", adv_int)
|
||||
value = get_adv_int(adv_int)
|
||||
char.write(value)
|
||||
elif c_hex == b'27ee8d0ce1f04a0cb325dc536a68862d': # measurement count
|
||||
print("is changed to:", (meas_count*5))
|
||||
value = measurement_count(meas_count)
|
||||
char.write(value)
|
||||
else:
|
||||
print(" ")
|
||||
conn.disconnect()
|
||||
break
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if conn:
|
||||
conn.disconnect()
|
||||
bt.deinit()
|
||||
print("Error while connecting or reading from the BLE device")
|
||||
break
|
||||
else:
|
||||
time.sleep(0.050)
|
||||
|
||||
# try 5 secs sleep
|
||||
# change_adv("100ms",[1,"sec"],[1,"min"], 1)
|
||||
|
||||
|
||||
change_adv("100ms",[200,"ms"],[100,"ms"], 255) # 01, 02, 01, 255
|
||||
```
|
||||
Reference in New Issue
Block a user