mirror of
https://github.com/sascha-hemi/pycom-documentation.git
synced 2026-03-21 11:06:37 +01:00
GitBook: [master] 324 pages modified
This commit is contained in:
committed by
gitbook-bot
parent
13e99f7fde
commit
a986dbba9b
4
tutorials/all/README.md
Normal file
4
tutorials/all/README.md
Normal file
@@ -0,0 +1,4 @@
|
||||
# All Pycom Device Examples
|
||||
|
||||
This section contains generic examples that will work across all Pycom devices and Expansion Boards.
|
||||
|
||||
35
tutorials/all/adc.md
Normal file
35
tutorials/all/adc.md
Normal file
@@ -0,0 +1,35 @@
|
||||
# ADC
|
||||
|
||||
This example is a simple ADC sample. For more information please see [`ADC`](../../firmwareapi/pycom/machine/adc.md).
|
||||
|
||||
```python
|
||||
from machine import ADC
|
||||
adc = ADC(0)
|
||||
adc_c = adc.channel(pin='P13')
|
||||
adc_c()
|
||||
adc_c.value()
|
||||
```
|
||||
|
||||
## Calibration
|
||||
|
||||
Currently the ESP32's ADC is not calibrated from the factory. This means it must be calibrated each time you wish to use it. To do this you must firstly measure the internal voltage reference. The following code will connect the 1.1v reference to `P22`
|
||||
|
||||
```python
|
||||
from machine import ADC
|
||||
adc = ADC()
|
||||
|
||||
# Output Vref of P22
|
||||
adc.vref_to_pin('P22')
|
||||
```
|
||||
|
||||
Now that the voltage reference is externally accessible you should measure it with the most accurate voltmeter you have access to. Note down the reading in millivolts, e.g. `1120`. To disconnect the 1.1v reference from `P22` please reset your module. You can now calibrate the ADC by telling it the true value of the internal reference. You should then check your calibration by connecting the ADC to a known voltage source.
|
||||
|
||||
```python
|
||||
# Set calibration - see note above
|
||||
adc.vref(1100)
|
||||
|
||||
# Check calibration by reading a known voltage
|
||||
adc_c = adc.channel(pin='P16', attn=ADC.ATTN_11DB)
|
||||
print(adc_c.voltage())
|
||||
```
|
||||
|
||||
198
tutorials/all/aws.md
Normal file
198
tutorials/all/aws.md
Normal file
@@ -0,0 +1,198 @@
|
||||
# AWS
|
||||
|
||||
The AWS IoT platform enables devices to connect to the Amazon cloud and lets applications in the cloud interact with Internet-connected things. Common IoT applications either collect and process telemetry from devices or enable users to control a device remotely. Things report their state by publishing messages, in JSON format, on MQTT topics.
|
||||
|
||||
For more information see this [PDF File](http://docs.aws.amazon.com/iot/latest/developerguide/iot-dg.pdf).
|
||||
|
||||
## Getting Started with AWS IoT
|
||||
|
||||
### Creating the message broker \(Amazon website\):
|
||||
|
||||
* Sign in to the [AWS Management Console](https://aws.amazon.com/console/)
|
||||
* Navigate to the IoT Console by clicking on the [AWS IoT link](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-1.png)
|
||||
* In the left navigation pane, choose [Register/Manage](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-2.png)
|
||||
* Click on the create button, give your [device a name and press create](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-3.png)
|
||||
* Click on the device that has been created
|
||||
* On the Details page, in the left navigation pane, choose [Security](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-4.png)
|
||||
* On the Certificates page, choose Create certificate
|
||||
* Download all the certificates, then press the Activate and the Attach a Policy buttons. [See image](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-5.png)
|
||||
* Click on the Create New Policy button
|
||||
* On the [Create Policy](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-6.png) page, choose a policy name and the actions to authorise.
|
||||
* Go to the certificates page, click on the three dots of your certificate and attach the policy to the certificate as shown in the [diagram](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-7.png)
|
||||
|
||||
### Setting up the device \(Pycom device\):
|
||||
|
||||
* Download the latest sample code from the Pycom [GitHub Repository](https://github.com/pycom/aws-pycom).
|
||||
* Connect to the device via FTP and put the root CA certificate, the client certificate \(`*.pem.crt`\) and the private key \(`*.private.pem.key`\) in the `/flash/cert` folder.
|
||||
* Update the config file with your WiFi settings, the [AWS Host](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-8.png) and the certificate paths.
|
||||
* Put the `config.py` and the `main.py` in the device flash
|
||||
|
||||
### Configuration \(`config.py`\):
|
||||
|
||||
This file contains the WiFi, certificate paths and application specific settings that need to be updated by the user.
|
||||
|
||||
```python
|
||||
# WiFi configuration
|
||||
WIFI_SSID = 'my_wifi_ssid'
|
||||
WIFI_PASS = 'my_wifi_password'
|
||||
|
||||
# AWS general configuration
|
||||
AWS_PORT = 8883
|
||||
AWS_HOST = 'aws_host_url'
|
||||
AWS_ROOT_CA = '/flash/cert/aws_root.ca'
|
||||
AWS_CLIENT_CERT = '/flash/cert/aws_client.cert'
|
||||
AWS_PRIVATE_KEY = '/flash/cert/aws_private.key'
|
||||
|
||||
################## Subscribe / Publish client #################
|
||||
CLIENT_ID = 'PycomPublishClient'
|
||||
TOPIC = 'PublishTopic'
|
||||
OFFLINE_QUEUE_SIZE = -1
|
||||
DRAINING_FREQ = 2
|
||||
CONN_DISCONN_TIMEOUT = 10
|
||||
MQTT_OPER_TIMEOUT = 5
|
||||
LAST_WILL_TOPIC = 'PublishTopic'
|
||||
LAST_WILL_MSG = 'To All: Last will message'
|
||||
|
||||
####################### Shadow updater ########################
|
||||
#THING_NAME = "my thing name"
|
||||
#CLIENT_ID = "ShadowUpdater"
|
||||
#CONN_DISCONN_TIMEOUT = 10
|
||||
#MQTT_OPER_TIMEOUT = 5
|
||||
|
||||
####################### Delta Listener ########################
|
||||
#THING_NAME = "my thing name"
|
||||
#CLIENT_ID = "DeltaListener"
|
||||
#CONN_DISCONN_TIMEOUT = 10
|
||||
#MQTT_OPER_TIMEOUT = 5
|
||||
|
||||
####################### Shadow Echo ########################
|
||||
#THING_NAME = "my thing name"
|
||||
#CLIENT_ID = "ShadowEcho"
|
||||
#CONN_DISCONN_TIMEOUT = 10
|
||||
#MQTT_OPER_TIMEOUT = 5
|
||||
```
|
||||
|
||||
### Subscibe / Publish \(`main.py`\)
|
||||
|
||||
To subscribe to a topic:
|
||||
|
||||
* Go to the AWS Iot page, click on manage and choose your device
|
||||
* From the left hand side, choose Activity and then click MQTT client.
|
||||
* Choose the [topic name](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-9.png) you entered in the configuration file.
|
||||
* Messages should be published as shown in the [diagram](https://github.com/pycom/pycom-docs/tree/37661883902849b1a931ee273a23ae8e0f3d773e/img/aws-10.png)
|
||||
|
||||
```python
|
||||
# user specified callback function
|
||||
def customCallback(client, userdata, message):
|
||||
print("Received a new message: ")
|
||||
print(message.payload)
|
||||
print("from topic: ")
|
||||
print(message.topic)
|
||||
print("--------------\n\n")
|
||||
|
||||
# configure the MQTT client
|
||||
pycomAwsMQTTClient = AWSIoTMQTTClient(config.CLIENT_ID)
|
||||
pycomAwsMQTTClient.configureEndpoint(config.AWS_HOST, config.AWS_PORT)
|
||||
pycomAwsMQTTClient.configureCredentials(config.AWS_ROOT_CA, config.AWS_PRIVATE_KEY, config.AWS_CLIENT_CERT)
|
||||
|
||||
pycomAwsMQTTClient.configureOfflinePublishQueueing(config.OFFLINE_QUEUE_SIZE)
|
||||
pycomAwsMQTTClient.configureDrainingFrequency(config.DRAINING_FREQ)
|
||||
pycomAwsMQTTClient.configureConnectDisconnectTimeout(config.CONN_DISCONN_TIMEOUT)
|
||||
pycomAwsMQTTClient.configureMQTTOperationTimeout(config.MQTT_OPER_TIMEOUT)
|
||||
pycomAwsMQTTClient.configureLastWill(config.LAST_WILL_TOPIC, config.LAST_WILL_MSG, 1)
|
||||
|
||||
#Connect to MQTT Host
|
||||
if pycomAwsMQTTClient.connect():
|
||||
print('AWS connection succeeded')
|
||||
|
||||
# Subscribe to topic
|
||||
pycomAwsMQTTClient.subscribe(config.TOPIC, 1, customCallback)
|
||||
time.sleep(2)
|
||||
|
||||
# Send message to host
|
||||
loopCount = 0
|
||||
while loopCount < 8:
|
||||
pycomAwsMQTTClient.publish(config.TOPIC, "New Message " + str(loopCount), 1)
|
||||
loopCount += 1
|
||||
time.sleep(5.0)
|
||||
```
|
||||
|
||||
### Shadow updater \(`main.py`\)
|
||||
|
||||
```python
|
||||
# user specified callback functions
|
||||
def customShadowCallback_Update(payload, responseStatus, token):
|
||||
if responseStatus == "timeout":
|
||||
print("Update request " + token + " time out!")
|
||||
if responseStatus == "accepted":
|
||||
payloadDict = json.loads(payload)
|
||||
print("Update request with token: " + token + " accepted!")
|
||||
print("property: " + str(payloadDict["state"]["desired"]["property"]))
|
||||
if responseStatus == "rejected":
|
||||
print("Update request " + token + " rejected!")
|
||||
|
||||
def customShadowCallback_Delete(payload, responseStatus, token):
|
||||
if responseStatus == "timeout":
|
||||
print("Delete request " + token + " time out!")
|
||||
if responseStatus == "accepted":
|
||||
print("Delete request with token: " + token + " accepted!")
|
||||
if responseStatus == "rejected":
|
||||
print("Delete request " + token + " rejected!")
|
||||
|
||||
# configure the MQTT client
|
||||
pycomAwsMQTTShadowClient = AWSIoTMQTTShadowClient(config.CLIENT_ID)
|
||||
pycomAwsMQTTShadowClient.configureEndpoint(config.AWS_HOST, config.AWS_PORT)
|
||||
pycomAwsMQTTShadowClient.configureCredentials(config.AWS_ROOT_CA, config.AWS_PRIVATE_KEY, config.AWS_CLIENT_CERT)
|
||||
|
||||
pycomAwsMQTTShadowClient.configureConnectDisconnectTimeout(config.CONN_DISCONN_TIMEOUT)
|
||||
pycomAwsMQTTShadowClient.configureMQTTOperationTimeout(config.MQTT_OPER_TIMEOUT)
|
||||
|
||||
# Connect to MQTT Host
|
||||
if pycomAwsMQTTShadowClient.connect():
|
||||
print('AWS connection succeeded')
|
||||
|
||||
deviceShadowHandler = pycomAwsMQTTShadowClient.createShadowHandlerWithName(config.THING_NAME, True)
|
||||
|
||||
# Delete shadow JSON doc
|
||||
deviceShadowHandler.shadowDelete(customShadowCallback_Delete, 5)
|
||||
|
||||
# Update shadow in a loop
|
||||
loopCount = 0
|
||||
while True:
|
||||
JSONPayload = '{"state":{"desired":{"property":' + str(loopCount) + '}}}'
|
||||
deviceShadowHandler.shadowUpdate(JSONPayload, customShadowCallback_Update, 5)
|
||||
loopCount += 1
|
||||
time.sleep(5)
|
||||
```
|
||||
|
||||
### Delta Listener \(`main.py`\)
|
||||
|
||||
```python
|
||||
# Custom Shadow callback
|
||||
def customShadowCallback_Delta(payload, responseStatus, token):
|
||||
payloadDict = json.loads(payload)
|
||||
print("property: " + str(payloadDict["state"]["property"]))
|
||||
print("version: " + str(payloadDict["version"]))
|
||||
|
||||
# configure the MQTT client
|
||||
pycomAwsMQTTShadowClient = AWSIoTMQTTShadowClient(config.CLIENT_ID)
|
||||
pycomAwsMQTTShadowClient.configureEndpoint(config.AWS_HOST, config.AWS_PORT)
|
||||
pycomAwsMQTTShadowClient.configureCredentials(config.AWS_ROOT_CA, config.AWS_PRIVATE_KEY, config.AWS_CLIENT_CERT)
|
||||
|
||||
pycomAwsMQTTShadowClient.configureConnectDisconnectTimeout(config.CONN_DISCONN_TIMEOUT)
|
||||
pycomAwsMQTTShadowClient.configureMQTTOperationTimeout(config.MQTT_OPER_TIMEOUT)
|
||||
|
||||
# Connect to MQTT Host
|
||||
if pycomAwsMQTTShadowClient.connect():
|
||||
print('AWS connection succeeded')
|
||||
|
||||
deviceShadowHandler = pycomAwsMQTTShadowClient.createShadowHandlerWithName(config.THING_NAME, True)
|
||||
|
||||
# Listen on deltas
|
||||
deviceShadowHandler.shadowRegisterDeltaCallback(customShadowCallback_Delta)
|
||||
|
||||
# Loop forever
|
||||
while True:
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
111
tutorials/all/ble.md
Normal file
111
tutorials/all/ble.md
Normal file
@@ -0,0 +1,111 @@
|
||||
# Bluetooth
|
||||
|
||||
At present, basic BLE functionality is available. More features will be implemented in the near future, such as pairing. This page will be updated in line with these features.
|
||||
|
||||
Full info on `bluetooth` can be found within [Bluetooth page](../../firmwareapi/pycom/network/bluetooth/) of the Firmware API Reference.
|
||||
|
||||
## Scan for BLE Devices
|
||||
|
||||
Scan for all of the advertising devices within range of the scanning device.
|
||||
|
||||
```python
|
||||
bluetooth.start_scan(10) # starts scanning and stop after 10 seconds
|
||||
bluetooth.start_scan(-1) # starts scanning indefinitely until bluetooth.stop_scan() is called
|
||||
```
|
||||
|
||||
## Raw Data from a BLE Device
|
||||
|
||||
A quick usage example that scans and prints the raw data from advertisements.
|
||||
|
||||
```python
|
||||
from network import Bluetooth
|
||||
|
||||
bluetooth = Bluetooth()
|
||||
bluetooth.start_scan(-1) # start scanning with no timeout
|
||||
|
||||
while True:
|
||||
print(bluetooth.get_adv())
|
||||
```
|
||||
|
||||
## Connect to a BLE Device
|
||||
|
||||
Connecting to a device that is sending advertisements.
|
||||
|
||||
```python
|
||||
from network import Bluetooth
|
||||
import ubinascii
|
||||
bluetooth = Bluetooth()
|
||||
|
||||
# scan until we can connect to any BLE device around
|
||||
bluetooth.start_scan(-1)
|
||||
adv = None
|
||||
while True:
|
||||
adv = bluetooth.get_adv()
|
||||
if adv:
|
||||
try:
|
||||
bluetooth.connect(adv.mac)
|
||||
except:
|
||||
# start scanning again
|
||||
bluetooth.start_scan(-1)
|
||||
continue
|
||||
break
|
||||
print("Connected to device with addr = {}".format(ubinascii.hexlify(adv.mac)))
|
||||
```
|
||||
|
||||
## Connect to a BLE Device and Retrieve Data
|
||||
|
||||
Connecting to a device named 'Heart Rate' and receiving data from it’s services.
|
||||
|
||||
```python
|
||||
from network import Bluetooth
|
||||
import time
|
||||
bt = Bluetooth()
|
||||
bt.start_scan(-1)
|
||||
|
||||
while True:
|
||||
adv = bt.get_adv()
|
||||
if adv and bt.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_CMPL) == 'Heart Rate':
|
||||
try:
|
||||
conn = bt.connect(adv.mac)
|
||||
services = conn.services()
|
||||
for service in services:
|
||||
time.sleep(0.050)
|
||||
if type(service.uuid()) == bytes:
|
||||
print('Reading chars from service = {}'.format(service.uuid()))
|
||||
else:
|
||||
print('Reading chars from service = %x' % service.uuid())
|
||||
chars = service.characteristics()
|
||||
for char in chars:
|
||||
if (char.properties() & Bluetooth.PROP_READ):
|
||||
print('char {} value = {}'.format(char.uuid(), char.read()))
|
||||
conn.disconnect()
|
||||
break
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
time.sleep(0.050)
|
||||
```
|
||||
|
||||
## Retrieve the Name & Manufacturer from a BLE Device
|
||||
|
||||
Using `resolve_adv_data()` to attempt to retrieve the name and manufacturer data from the advertiser.
|
||||
|
||||
```python
|
||||
import ubinascii
|
||||
from network import Bluetooth
|
||||
bluetooth = Bluetooth()
|
||||
|
||||
bluetooth.start_scan(20)
|
||||
while bluetooth.isscanning():
|
||||
adv = bluetooth.get_adv()
|
||||
if adv:
|
||||
# try to get the complete name
|
||||
print(bluetooth.resolve_adv_data(adv.data, Bluetooth.ADV_NAME_CMPL))
|
||||
|
||||
mfg_data = bluetooth.resolve_adv_data(adv.data, Bluetooth.ADV_MANUFACTURER_DATA)
|
||||
|
||||
if mfg_data:
|
||||
# try to get the manufacturer data (Apple's iBeacon data is sent here)
|
||||
print(ubinascii.hexlify(mfg_data))
|
||||
```
|
||||
|
||||
28
tutorials/all/https.md
Normal file
28
tutorials/all/https.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# HTTPS
|
||||
|
||||
Basic connection using `ssl.wrap_socket()`.
|
||||
|
||||
```python
|
||||
import socket
|
||||
import ssl
|
||||
|
||||
s = socket.socket()
|
||||
ss = ssl.wrap_socket(s)
|
||||
ss.connect(socket.getaddrinfo('www.google.com', 443)[0][-1])
|
||||
```
|
||||
|
||||
Below is an example using certificates with the blynk cloud.
|
||||
|
||||
Certificate was downloaded from the blynk examples [folder](https://github.com/wipy/wipy/tree/master/examples/blynk) and placed in `/flash/cert/` on the device.
|
||||
|
||||
```python
|
||||
import socket
|
||||
import ssl
|
||||
|
||||
s = socket.socket()
|
||||
ss = ssl.wrap_socket(s, cert_reqs=ssl.CERT_REQUIRED, ca_certs='/flash/cert/ca.pem')
|
||||
ss.connect(socket.getaddrinfo('cloud.blynk.cc', 8441)[0][-1])
|
||||
```
|
||||
|
||||
For more info, check the [`ssl`](../../firmwareapi/micropython/ussl.md) module in the API reference.
|
||||
|
||||
94
tutorials/all/i2c.md
Normal file
94
tutorials/all/i2c.md
Normal file
@@ -0,0 +1,94 @@
|
||||
# I2C
|
||||
|
||||
The following example receives data from a light sensor using I2C. Sensor used is the BH1750FVI Digital Light Sensor.
|
||||
|
||||
```python
|
||||
import time
|
||||
from machine import I2C
|
||||
import bh1750fvi
|
||||
|
||||
i2c = I2C(0, I2C.MASTER, baudrate=100000)
|
||||
light_sensor = bh1750fvi.BH1750FVI(i2c, addr=i2c.scan()[0])
|
||||
|
||||
while(True):
|
||||
data = light_sensor.read()
|
||||
print(data)
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
## Drivers for the BH1750FVI
|
||||
|
||||
Place this sample code into a file named `bh1750fvi.py`. This can then be imported as a library.
|
||||
|
||||
```python
|
||||
# Simple driver for the BH1750FVI digital light sensor
|
||||
|
||||
class BH1750FVI:
|
||||
MEASUREMENT_TIME = const(120)
|
||||
|
||||
def __init__(self, i2c, addr=0x23, period=150):
|
||||
self.i2c = i2c
|
||||
self.period = period
|
||||
self.addr = addr
|
||||
self.time = 0
|
||||
self.value = 0
|
||||
self.i2c.writeto(addr, bytes([0x10])) # start continuos 1 Lux readings every 120ms
|
||||
|
||||
def read(self):
|
||||
self.time += self.period
|
||||
if self.time >= MEASUREMENT_TIME:
|
||||
self.time = 0
|
||||
data = self.i2c.readfrom(self.addr, 2)
|
||||
self.value = (((data[0] << 8) + data[1]) * 1200) // 1000
|
||||
return self.value
|
||||
```
|
||||
|
||||
## Light sensor and LoRa
|
||||
|
||||
This is the same code, with added LoRa connectivity, sending the lux value from the light sensor to another LoRa enabled device.
|
||||
|
||||
```python
|
||||
import socket
|
||||
import time
|
||||
import pycom
|
||||
import struct
|
||||
from network import LoRa
|
||||
from machine import I2C
|
||||
import bh1750fvi
|
||||
|
||||
LORA_PKG_FORMAT = "!BH"
|
||||
LORA_CONFIRM_FORMAT = "!BB"
|
||||
|
||||
DEVICE_ID = 1
|
||||
|
||||
pycom.heartbeat(False)
|
||||
|
||||
lora = LoRa(mode=LoRa.LORA, tx_iq=True, region=LoRa.EU868)
|
||||
lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
|
||||
lora_sock.setblocking(False)
|
||||
|
||||
i2c = I2C(0, I2C.MASTER, baudrate=100000)
|
||||
light_sensor = bh1750fvi.BH1750FVI(i2c, addr=i2c.scan()[0])
|
||||
|
||||
while(True):
|
||||
msg = struct.pack(LORA_PKG_FORMAT, DEVICE_ID, light_sensor.read())
|
||||
lora_sock.send(msg)
|
||||
|
||||
pycom.rgbled(0x150000)
|
||||
|
||||
wait = 5
|
||||
while (wait > 0):
|
||||
wait = wait - 0.1
|
||||
time.sleep(0.1)
|
||||
recv_data = lora_sock.recv(64)
|
||||
|
||||
if (len (recv_data) >= 2):
|
||||
status, device_id = struct.unpack(LORA_CONFIRM_FORMAT, recv_data)
|
||||
|
||||
if (device_id == DEVICE_ID and status == 200):
|
||||
pycom.rgbled(0x001500)
|
||||
wait = 0
|
||||
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
133
tutorials/all/modbus.md
Normal file
133
tutorials/all/modbus.md
Normal file
@@ -0,0 +1,133 @@
|
||||
# Modbus
|
||||
|
||||
Modbus is a messaging protocol that defines the packet structure for transferring data between devices in a master/slave architecture. The protocol is independent of the transmission medium and is usually transmitted over TCP \(MODBUS TCP\) or serial communication \(MODBUS RTU\). Modbus is intended as a request/reply protocol and delivers services specified by function codes. The function code in the request tells the addressed slave what kind of action to perform. The function codes most commonly supported by devices are listed below.
|
||||
|
||||
| Function Name | Function Code |
|
||||
| :--- | :--- |
|
||||
| Read Coils | 0x01 |
|
||||
| Read Discrete Inputs | 0x02 |
|
||||
| Read Holding Registers | 0x03 |
|
||||
| Read Input Registers | 0x04 |
|
||||
| Write Single Coil | 0x05 |
|
||||
| Write Single Register | 0x06 |
|
||||
| Write Multiple Coils | 0x0F |
|
||||
| Write Multiple Registers | 0x10 |
|
||||
|
||||
For more information on the MODBUS RTU see the following [PDF File](http://www.modbus.org/docs/Modbus_over_serial_line_V1_02.pdf). Information on the MODBUS TCP can be found [here](http://www.modbus.org/docs/Modbus_Messaging_Implementation_Guide_V1_0b.pdf).
|
||||
|
||||
## Pycom Modbus Library
|
||||
|
||||
Python libraries and sample code that support Modbus TCP and Modbus RTU are available at the following [GitHub Repository](https://github.com/pycom/pycom-modbus). To use this library, connect to the target Pycom device via ftp and upload the uModbus folder to `/flash`. A description of the supported function codes is found below.
|
||||
|
||||
### Read Coils
|
||||
|
||||
This function code requests the status \(ON/OFF\) of discrete coils on a remote device. The slave device address, the address of the first coil and the number of coils must be specified in the request. The address of the first coil is 0 and a maximum of 2000 contiguous coils can be read. Python sample code is shown below.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
coil_quantity=100
|
||||
|
||||
coil_status = modbus_obj.read_coils(slave_addr, starting_address, coil_quantity)
|
||||
print('Coil status: ' + ' '.join('{:d}'.format(x) for x in coil_status))
|
||||
```
|
||||
|
||||
### Read Discrete Inputs
|
||||
|
||||
This command is used to read the status \(ON/OFF\) of discrete inputs on a remote device. The slave address, the address of the first input, and the quantity of inputs to be read must be specified. The address of the first input is 0 and a maximum of 2000 continuous inputs can be read. The Python sample code is shown below.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
starting_address=0x0
|
||||
input_quantity=100
|
||||
|
||||
input_status = modbus_obj.read_discrete_inputs(slave_addr, starting_address, input_quantity)
|
||||
print('Input status: ' + ' '.join('{:d}'.format(x) for x in input_status))
|
||||
```
|
||||
|
||||
### Read Holding Registers
|
||||
|
||||
This function code is used to read the contents of analogue output holding registers. The slave address, the starting register address, the number of registers to read and the sign of the data must be specified. Register addresses start at 0 and a maximum of 125 continuous registers can be read.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
register_quantity=100
|
||||
signed=True
|
||||
|
||||
register_value = modbus_obj.read_holding_registers(slave_addr, starting_address, register_quantity, signed)
|
||||
print('Holding register value: ' + ' '.join('{:d}'.format(x) for x in register_value))
|
||||
```
|
||||
|
||||
### Read Input Registers
|
||||
|
||||
This command is used to read up to 125 continuous input registers on a remote device. The slave address, the starting register address, the number of input registers and the sign of the data must be specified. The address of the first input registers is 0.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
register_quantity=100
|
||||
signed=True
|
||||
|
||||
register_value = modbus_obj.read_input_registers(slave_addr, starting_address, register_quantity, signed)
|
||||
print('Input register value: ' + ' '.join('{:d}'.format(x) for x in register_value))
|
||||
```
|
||||
|
||||
### Write Single Coil
|
||||
|
||||
This function code is used to write the state of a discrete coil on a remote device. A value of `0xFF00` means the coil should be set to ON, while a value of `0x0000` means the coil should be set to OFF. The Python sample code to set the coil at address `0x00`, to an ON state is shown below.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
output_address=0x00
|
||||
output_value=0xFF00
|
||||
|
||||
return_flag = modbus_obj.write_single_coil(slave_addr, output_address, output_value)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing single coil status: ' + output_flag)
|
||||
```
|
||||
|
||||
### Write Single Register
|
||||
|
||||
This command is used to write the contents of an analog output holding register on a remote device. The slave address, the register address, the register value, and the signature of the data must be specified. As for all the other commands, the register addresses start from 0.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
register_address=0x01
|
||||
register_value=-32768
|
||||
signed=True
|
||||
|
||||
return_flag = modbus_obj.write_single_register(slave_addr, register_address, register_value, signed)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing single coil status: ' + output_flag)
|
||||
```
|
||||
|
||||
### Write Multiple Coils
|
||||
|
||||
This function code is used to set a continuous sequence of coils, in a remote device, to either ON or OFF. The slave address, the starting address of the coils and an array with the coil states must be specified.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
output_values=[1,1,1,0,0,1,1,1,0,0,1,1,1]
|
||||
|
||||
return_flag = modbus_obj.write_multiple_coils(slave_addr, starting_address, output_values)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing multiple coil status: ' + output_flag)
|
||||
```
|
||||
|
||||
### Write Multiple Registers
|
||||
|
||||
This command is used to write the contents of a continuous sequence of analogue registers on a remote device. The slave address, the starting register address, the register values, and the signature of the data must be specified. The address of the first register is 0 and a maximum of 125 register values can be written. The Python sample code is shown below.
|
||||
|
||||
```python
|
||||
slave_addr=0x0A
|
||||
register_address=0x01
|
||||
register_values=[2, -4, 6, -256, 1024]
|
||||
signed=True
|
||||
|
||||
return_flag = modbus_obj.write_multiple_registers(slave_addr, register_address, register_values, signed)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing multiple register status: ' + output_flag)
|
||||
```
|
||||
|
||||
43
tutorials/all/mqtt.md
Normal file
43
tutorials/all/mqtt.md
Normal file
@@ -0,0 +1,43 @@
|
||||
# MQTT
|
||||
|
||||
MQTT is a lightweight messaging protocol that is ideal for sending small packets of data to and from IoT devices via WiFi.
|
||||
|
||||
The broker used in this example is the [IO Adafruit](https://io.adafruit.com) platform, which is free and allows for tinkering with MQTT.
|
||||
|
||||
Visit [IO Adafruit](https://io.adafruit.com) and create an account. You'll need to get hold of an API Key as well as your credentials. Visit this [guide](https://learn.adafruit.com/adafruit-io/mqtt-api) for more information about MQTT and how to use it with Adafruit's Broker.
|
||||
|
||||
This example will send a message to a topic on the Adafruit MQTT broker and then also subscribe to the same topic, in order to show how to use the subscribe functionality.
|
||||
|
||||
```python
|
||||
from mqtt import MQTTClient
|
||||
from network import WLAN
|
||||
import machine
|
||||
import time
|
||||
|
||||
def sub_cb(topic, msg):
|
||||
print(msg)
|
||||
|
||||
wlan = WLAN(mode=WLAN.STA)
|
||||
wlan.connect("yourwifinetwork", auth=(WLAN.WPA2, "wifipassword"), timeout=5000)
|
||||
|
||||
while not wlan.isconnected():
|
||||
machine.idle()
|
||||
print("Connected to WiFi\n")
|
||||
|
||||
client = MQTTClient("device_id", "io.adafruit.com",user="your_username", password="your_api_key", port=1883)
|
||||
|
||||
client.set_callback(sub_cb)
|
||||
client.connect()
|
||||
client.subscribe(topic="youraccount/feeds/lights")
|
||||
|
||||
while True:
|
||||
print("Sending ON")
|
||||
client.publish(topic="youraccount/feeds/lights", msg="ON")
|
||||
time.sleep(1)
|
||||
print("Sending OFF")
|
||||
client.publish(topic="youraccount/feeds/lights", msg="OFF")
|
||||
client.check_msg()
|
||||
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
207
tutorials/all/ota.md
Normal file
207
tutorials/all/ota.md
Normal file
@@ -0,0 +1,207 @@
|
||||
# OTA update
|
||||
|
||||
## Overview
|
||||
|
||||
Pycom modules come with the ability to update the devices firmware, while it is still running, we call this an "over the air" \(OTA\) update. The [`pycom`](../../firmwareapi/pycom/pycom.md) library provides several functions to achieve this. This example will demonstrate how you could potentially use this functionality to update deployed devices. The full source code of this example can be found [here](https://github.com/pycom/pycom-libraries/tree/master/examples/OTA).
|
||||
|
||||
## Method
|
||||
|
||||
Here we will describe one possible update methodology you could use that is implemented by this example.
|
||||
|
||||
Imagine you a smart metering company and you wish to roll out an update for your Pycom based smart meter. These meters usually send data back via LoRa. Unfortunately LoRa downlink messages have a very limited size and several hundred if not thousand would be required to upload a complete firmware image. To get around this you can have your devices sending their regular data via LoRa and when they receive a special command via a downlink message, the devices will connect to a WiFi network. It is unfeasible to ask customers to allow your device to connect to their home network so instead this network could be provided by a vehicle. This vehicle will travel around a certain geographic area in which the devices have been sent the special downlink message to initiate the update. The devices will look for the WiFi network being broadcast by the vehicle and connect. The devices will then connect to a server running on this WiFi network. This server \(also shown in this example\) will generate manifest files that instruct the device on what it should update, and where to get the update data from.
|
||||
|
||||
## Server
|
||||
|
||||
Code available [here](https://github.com/pycom/pycom-libraries/blob/master/examples/OTA/OTA_server.py).
|
||||
|
||||
This script runs a HTTP server on port `8000` that provisions over the air \(OTA\) update manifests in JSON format as well as serving the update content. This script should be run in a directory that contains every version of the end devices code, in the following structure:
|
||||
|
||||
```text
|
||||
- server directory
|
||||
|- this_script.py
|
||||
|- 1.0.0
|
||||
| |- flash
|
||||
| | |- lib
|
||||
| | | |- lib_a.py
|
||||
| | |- main.py
|
||||
| | |- boot.py
|
||||
| |- sd
|
||||
| |- some_asset.txt
|
||||
| |- asset_that_will_be_removed.wav
|
||||
|- 1.0.1
|
||||
| |- flash
|
||||
| | |- lib
|
||||
| | | |- lib_a.py
|
||||
| | | |- new_lib.py
|
||||
| | |- main.py
|
||||
| | |- boot.py
|
||||
| |- sd
|
||||
| |- some_asset.txt
|
||||
|- firmware_1.0.0.bin
|
||||
|- firmware_1.0.1.bin
|
||||
```
|
||||
|
||||
The top level directory that contains this script can contain one of two things:
|
||||
|
||||
* Update directory: These should be named with a version number compatible
|
||||
|
||||
with the python LooseVersion versioning scheme
|
||||
|
||||
\([http://epydoc.sourceforge.net/stdlib/distutils.version.LooseVersion-class.html](http://epydoc.sourceforge.net/stdlib/distutils.version.LooseVersion-class.html)\).
|
||||
|
||||
They should contain the entire file system of the end device for the
|
||||
|
||||
corresponding version number.
|
||||
|
||||
* Firmware: These files should be named in the format `firmare_VERSION.bin`, where VERSION is a a version number compatible with the python LooseVersion versioning scheme \([http://epydoc.sourceforge.net/stdlib/distutils.version.LooseVersion-class.html](http://epydoc.sourceforge.net/stdlib/distutils.version.LooseVersion-class.html)\).
|
||||
|
||||
This file should be in the format of the `appimg.bin` created by the Pycom
|
||||
|
||||
firmware build scripts.
|
||||
|
||||
### How to use
|
||||
|
||||
Once the directory has been setup as described above you simply need to start this script using python3. Once started this script will run a HTTP server on port `8000` \(this can be changed by changing the PORT variable\). This server will serve all the files in directory as expected along with one additional special file, `manifest.json`. This file does not exist on the file system but is instead generated when requested and contains the required changes to bring the end device from its current version to the latest available version. You can see an example of this by pointing your web browser at:
|
||||
|
||||
`http://127.0.0.1:8000/manifest.json?current_ver=1.0.0`
|
||||
|
||||
The `current_ver` field at the end of the URL should be set to the current firmware version of the end device. The generated manifest will contain lists of which files are new, have changed or need to be deleted along with SHA1 hashes of the files. Below is an example of what such a manifest might look like:
|
||||
|
||||
```text
|
||||
{
|
||||
"delete": [
|
||||
"flash/old_file.py",
|
||||
"flash/other_old_file.py"
|
||||
],
|
||||
"firmware": {
|
||||
"URL": "http://192.168.1.144:8000/firmware_1.0.1b.bin",
|
||||
"hash": "ccc6914a457eb4af8855ec02f6909316526bdd08"
|
||||
},
|
||||
"new": [
|
||||
{
|
||||
"URL": "http://192.168.1.144:8000/1.0.1b/flash/lib/new_lib.py",
|
||||
"dst_path": "flash/lib/new_lib.py",
|
||||
"hash": "1095df8213aac2983efd68dba9420c8efc9c7c4a"
|
||||
}
|
||||
],
|
||||
"update": [
|
||||
{
|
||||
"URL": "http://192.168.1.144:8000/1.0.1b/flash/changed_file.py",
|
||||
"dst_path": "flash/changed_file.py",
|
||||
"hash": "1095df8213aac2983efd68dba9420c8efc9c7c4a"
|
||||
}
|
||||
],
|
||||
"version": "1.0.1b"
|
||||
}
|
||||
```
|
||||
|
||||
The manifest contains the following fields:
|
||||
|
||||
* `delete`: A list of paths to files which are no longer needed
|
||||
* `firmware`: The URL and SHA1 hash of the firmware image
|
||||
* `new`: the URL, path on end device and SHA1 hash of all new files
|
||||
* `update`: the URL, path on end device and SHA1 hash of all files which
|
||||
|
||||
existed before but have changed.
|
||||
|
||||
* `version`: The version number that this manifest will update the client to
|
||||
* `previous_version`: The version the client is currently on before applying
|
||||
|
||||
this update
|
||||
|
||||
_Note_: The version number of the files might not be the same as the firmware. The highest available version number, higher than the current client version is used for both firmware and files. This may differ between the two.
|
||||
|
||||
In order for the URL's to be properly formatted you are required to send a "host" header along with your HTTP get request e.g:
|
||||
|
||||
```text
|
||||
GET /manifest.json?current_ver=1.0.0 HTTP/1.0\r\nHost: 192.168.1.144:8000\r\n\r\n
|
||||
```
|
||||
|
||||
## Client Library
|
||||
|
||||
A MicroPyton library for interfacing with the server described above is available [here](https://github.com/pycom/pycom-libraries/blob/master/examples/OTA/1.0.0/flash/lib/OTA.py).
|
||||
|
||||
This library is split into two layers. The top level `OTA` class implements all the high level functionality such as parsing the JSON file, making back copies of files being updated incase the update fails, etc. The layer of the library is agnostic to your chosen transport method. Below this is the `WiFiOTA` class. This class implements the actual transport mechanism of how the device fetches the files and update manifest \(via WiFi as the class name suggests\). The reason for this split is so that the high level functionality can be reused regardless of what transport mechanism you end up using. This could be implemented on top of Bluetooth for example, or the sever changed from HTTP to FTP.
|
||||
|
||||
{% hint style="danger" %}
|
||||
Although the above code is functional, it is provided only as an example of how an end user might implement a OTA update mechanism. It is not 100% feature complete e.g. even though it does backup previous versions of files, the roll back procedure is not implemented. This is left of the end user to do.
|
||||
{% endhint %}
|
||||
|
||||
## Example
|
||||
|
||||
Below is am example implementing the methodology previously explained in this tutorial to initiate an OTA update.
|
||||
|
||||
{% hint style="info" %}
|
||||
The example below will only work on a Pycom device with LoRa capabilities. If want to test it out on a device without LoRa functionality then simply comment out any code relating to LoRa. Leaving just the `WiFiOTA` initialisation and they `ota.connect()` and `ota.update()`
|
||||
{% endhint %}
|
||||
|
||||
```python
|
||||
from network import LoRa, WLAN
|
||||
import socket
|
||||
import time
|
||||
from OTA import WiFiOTA
|
||||
from time import sleep
|
||||
import pycom
|
||||
import ubinascii
|
||||
|
||||
from config import WIFI_SSID, WIFI_PW, SERVER_IP
|
||||
|
||||
# Turn on GREEN LED
|
||||
pycom.heartbeat(False)
|
||||
pycom.rgbled(0xff00)
|
||||
|
||||
# Setup OTA
|
||||
ota = WiFiOTA(WIFI_SSID,
|
||||
WIFI_PW,
|
||||
SERVER_IP, # Update server address
|
||||
8000) # Update server port
|
||||
|
||||
# Turn off WiFi to save power
|
||||
w = WLAN()
|
||||
w.deinit()
|
||||
|
||||
# Initialise LoRa in LORAWAN mode.
|
||||
lora = LoRa(mode=LoRa.LORAWAN, region=LoRa.EU868)
|
||||
|
||||
app_eui = ubinascii.unhexlify('70B3D57ED0008CD6')
|
||||
app_key = ubinascii.unhexlify('B57F36D88691CEC5EE8659320169A61C')
|
||||
|
||||
# join a network using OTAA (Over the Air Activation)
|
||||
lora.join(activation=LoRa.OTAA, auth=(app_eui, app_key), timeout=0)
|
||||
|
||||
# wait until the module has joined the network
|
||||
while not lora.has_joined():
|
||||
time.sleep(2.5)
|
||||
print('Not yet joined...')
|
||||
|
||||
# create a LoRa socket
|
||||
s = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
|
||||
|
||||
# set the LoRaWAN data rate
|
||||
s.setsockopt(socket.SOL_LORA, socket.SO_DR, 5)
|
||||
|
||||
# make the socket blocking
|
||||
# (waits for the data to be sent and for the 2 receive windows to expire)
|
||||
s.setblocking(True)
|
||||
|
||||
while True:
|
||||
# send some data
|
||||
s.send(bytes([0x04, 0x05, 0x06]))
|
||||
|
||||
# make the socket non-blocking
|
||||
# (because if there's no data received it will block forever...)
|
||||
s.setblocking(False)
|
||||
|
||||
# get any data received (if any...)
|
||||
data = s.recv(64)
|
||||
|
||||
# Some sort of OTA trigger
|
||||
if data == bytes([0x01, 0x02, 0x03]):
|
||||
print("Performing OTA")
|
||||
# Perform OTA
|
||||
ota.connect()
|
||||
ota.update()
|
||||
|
||||
sleep(5)
|
||||
```
|
||||
|
||||
246
tutorials/all/owd.md
Normal file
246
tutorials/all/owd.md
Normal file
@@ -0,0 +1,246 @@
|
||||
# Onewire Driver
|
||||
|
||||
This tutorial explains how to connect and read data from a DS18x20 temperature sensor. The onewire library is also available at the [pycom-libraries](https://github.com/pycom/pycom-libraries/tree/master/lib/onewire) GitHub Repository.
|
||||
|
||||
## Basic usage
|
||||
|
||||
```python
|
||||
import time
|
||||
from machine import Pin
|
||||
from onewire import DS18X20
|
||||
from onewire import OneWire
|
||||
|
||||
# DS18B20 data line connected to pin P10
|
||||
ow = OneWire(Pin('P10'))
|
||||
temp = DS18X20(ow)
|
||||
|
||||
while True:
|
||||
print(temp.read_temp_async())
|
||||
time.sleep(1)
|
||||
temp.start_conversion()
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
## Library
|
||||
|
||||
```python
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
OneWire library for MicroPython
|
||||
"""
|
||||
|
||||
import time
|
||||
import machine
|
||||
|
||||
class OneWire:
|
||||
CMD_SEARCHROM = const(0xf0)
|
||||
CMD_READROM = const(0x33)
|
||||
CMD_MATCHROM = const(0x55)
|
||||
CMD_SKIPROM = const(0xcc)
|
||||
|
||||
def __init__(self, pin):
|
||||
self.pin = pin
|
||||
self.pin.init(pin.OPEN_DRAIN, pin.PULL_UP)
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
Perform the onewire reset function.
|
||||
Returns True if a device asserted a presence pulse, False otherwise.
|
||||
"""
|
||||
sleep_us = time.sleep_us
|
||||
disable_irq = machine.disable_irq
|
||||
enable_irq = machine.enable_irq
|
||||
pin = self.pin
|
||||
|
||||
pin(0)
|
||||
sleep_us(480)
|
||||
i = disable_irq()
|
||||
pin(1)
|
||||
sleep_us(60)
|
||||
status = not pin()
|
||||
enable_irq(i)
|
||||
sleep_us(420)
|
||||
return status
|
||||
|
||||
def read_bit(self):
|
||||
sleep_us = time.sleep_us
|
||||
enable_irq = machine.enable_irq
|
||||
pin = self.pin
|
||||
|
||||
pin(1) # half of the devices don't match CRC without this line
|
||||
i = machine.disable_irq()
|
||||
pin(0)
|
||||
sleep_us(1)
|
||||
pin(1)
|
||||
sleep_us(1)
|
||||
value = pin()
|
||||
enable_irq(i)
|
||||
sleep_us(40)
|
||||
return value
|
||||
|
||||
def read_byte(self):
|
||||
value = 0
|
||||
for i in range(8):
|
||||
value |= self.read_bit() << i
|
||||
return value
|
||||
|
||||
def read_bytes(self, count):
|
||||
buf = bytearray(count)
|
||||
for i in range(count):
|
||||
buf[i] = self.read_byte()
|
||||
return buf
|
||||
|
||||
def write_bit(self, value):
|
||||
sleep_us = time.sleep_us
|
||||
pin = self.pin
|
||||
|
||||
i = machine.disable_irq()
|
||||
pin(0)
|
||||
sleep_us(1)
|
||||
pin(value)
|
||||
sleep_us(60)
|
||||
pin(1)
|
||||
sleep_us(1)
|
||||
machine.enable_irq(i)
|
||||
|
||||
def write_byte(self, value):
|
||||
for i in range(8):
|
||||
self.write_bit(value & 1)
|
||||
value >>= 1
|
||||
|
||||
def write_bytes(self, buf):
|
||||
for b in buf:
|
||||
self.write_byte(b)
|
||||
|
||||
def select_rom(self, rom):
|
||||
"""
|
||||
Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).
|
||||
"""
|
||||
self.reset()
|
||||
self.write_byte(CMD_MATCHROM)
|
||||
self.write_bytes(rom)
|
||||
|
||||
def crc8(self, data):
|
||||
"""
|
||||
Compute CRC
|
||||
"""
|
||||
crc = 0
|
||||
for i in range(len(data)):
|
||||
byte = data[i]
|
||||
for b in range(8):
|
||||
fb_bit = (crc ^ byte) & 0x01
|
||||
if fb_bit == 0x01:
|
||||
crc = crc ^ 0x18
|
||||
crc = (crc >> 1) & 0x7f
|
||||
if fb_bit == 0x01:
|
||||
crc = crc | 0x80
|
||||
byte = byte >> 1
|
||||
return crc
|
||||
|
||||
def scan(self):
|
||||
"""
|
||||
Return a list of ROMs for all attached devices.
|
||||
Each ROM is returned as a bytes object of 8 bytes.
|
||||
"""
|
||||
devices = []
|
||||
diff = 65
|
||||
rom = False
|
||||
for i in range(0xff):
|
||||
rom, diff = self._search_rom(rom, diff)
|
||||
if rom:
|
||||
devices += [rom]
|
||||
if diff == 0:
|
||||
break
|
||||
return devices
|
||||
|
||||
def _search_rom(self, l_rom, diff):
|
||||
if not self.reset():
|
||||
return None, 0
|
||||
self.write_byte(CMD_SEARCHROM)
|
||||
if not l_rom:
|
||||
l_rom = bytearray(8)
|
||||
rom = bytearray(8)
|
||||
next_diff = 0
|
||||
i = 64
|
||||
for byte in range(8):
|
||||
r_b = 0
|
||||
for bit in range(8):
|
||||
b = self.read_bit()
|
||||
if self.read_bit():
|
||||
if b: # there are no devices or there is an error on the bus
|
||||
return None, 0
|
||||
else:
|
||||
if not b: # collision, two devices with different bit meaning
|
||||
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
|
||||
b = 1
|
||||
next_diff = i
|
||||
self.write_bit(b)
|
||||
if b:
|
||||
r_b |= 1 << bit
|
||||
i -= 1
|
||||
rom[byte] = r_b
|
||||
return rom, next_diff
|
||||
|
||||
class DS18X20(object):
|
||||
def __init__(self, onewire):
|
||||
self.ow = onewire
|
||||
self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
|
||||
|
||||
def isbusy(self):
|
||||
"""
|
||||
Checks wether one of the DS18x20 devices on the bus is busy
|
||||
performing a temperature conversion
|
||||
"""
|
||||
return not self.ow.read_bit()
|
||||
|
||||
def start_conversion(self, rom=None):
|
||||
"""
|
||||
Start the temp conversion on one DS18x20 device.
|
||||
Pass the 8-byte bytes object with the ROM of the specific device you want to read.
|
||||
If only one DS18x20 device is attached to the bus you may omit the rom parameter.
|
||||
"""
|
||||
rom = rom or self.roms[0]
|
||||
ow = self.ow
|
||||
ow.reset()
|
||||
ow.select_rom(rom)
|
||||
ow.write_byte(0x44) # Convert Temp
|
||||
|
||||
def read_temp_async(self, rom=None):
|
||||
"""
|
||||
Read the temperature of one DS18x20 device if the conversion is complete,
|
||||
otherwise return None.
|
||||
"""
|
||||
if self.isbusy():
|
||||
return None
|
||||
rom = rom or self.roms[0]
|
||||
ow = self.ow
|
||||
ow.reset()
|
||||
ow.select_rom(rom)
|
||||
ow.write_byte(0xbe) # Read scratch
|
||||
data = ow.read_bytes(9)
|
||||
return self.convert_temp(rom[0], data)
|
||||
|
||||
def convert_temp(self, rom0, data):
|
||||
"""
|
||||
Convert the raw temperature data into degrees celsius and return as a fixed point with 2 decimal places.
|
||||
"""
|
||||
temp_lsb = data[0]
|
||||
temp_msb = data[1]
|
||||
if rom0 == 0x10:
|
||||
if temp_msb != 0:
|
||||
# convert negative number
|
||||
temp_read = temp_lsb >> 1 | 0x80 # truncate bit 0 by shifting, fill high bit with 1.
|
||||
temp_read = -((~temp_read + 1) & 0xff) # now convert from two's complement
|
||||
else:
|
||||
temp_read = temp_lsb >> 1 # truncate bit 0 by shifting
|
||||
count_remain = data[6]
|
||||
count_per_c = data[7]
|
||||
temp = 100 * temp_read - 25 + (count_per_c - count_remain) // count_per_c
|
||||
return temp
|
||||
elif rom0 == 0x28:
|
||||
return (temp_msb << 8 | temp_lsb) * 100 // 16
|
||||
else:
|
||||
assert False
|
||||
```
|
||||
|
||||
125
tutorials/all/pir.md
Normal file
125
tutorials/all/pir.md
Normal file
@@ -0,0 +1,125 @@
|
||||
# PIR Sensor
|
||||
|
||||
This code reads PIR sensor triggers from this simple [PIR sensor](https://www.kiwi-electronics.nl/PIR-Motion-Sensor) and sends an HTTP request for every trigger, in this case to a [Domoticz](https://domoticz.com/) installation. When motion is constantly detected, this PIR sensor keeps the pin high, in which case this code will keep sending HTTP requests every 10 seconds \(configurable with the hold\_time variable\).
|
||||
|
||||
## Main \(`main.py`\)
|
||||
|
||||
```python
|
||||
import time
|
||||
from network import WLAN
|
||||
from machine import Pin
|
||||
from domoticz import Domoticz
|
||||
|
||||
wl = WLAN(WLAN.STA)
|
||||
d = Domoticz("<ip>", 8080 ,"<hash>")
|
||||
|
||||
#config
|
||||
hold_time_sec = 10
|
||||
|
||||
#flags
|
||||
last_trigger = -10
|
||||
|
||||
pir = Pin('G4',mode=Pin.IN, pull=Pin.PULL_UP)
|
||||
|
||||
# main loop
|
||||
print("Starting main loop")
|
||||
while True:
|
||||
if pir() == 1:
|
||||
if time.time() - last_trigger > hold_time_sec:
|
||||
last_trigger = time.time()
|
||||
print("Presence detected, sending HTTP request")
|
||||
try:
|
||||
return_code = d.setVariable('Presence:LivingRoom','1')
|
||||
print("Request result: "+str(return_code))
|
||||
except Exception as e:
|
||||
print("Request failed")
|
||||
print(e)
|
||||
else:
|
||||
last_trigger = 0
|
||||
print("No presence")
|
||||
|
||||
time.sleep_ms(500)
|
||||
|
||||
print("Exited main loop")
|
||||
```
|
||||
|
||||
## Boot \(`boot.py`\)
|
||||
|
||||
For more WiFi scripts, see the wlan step by step tutorial.
|
||||
|
||||
```python
|
||||
import os
|
||||
import machine
|
||||
|
||||
uart = machine.UART(0, 115200)
|
||||
os.dupterm(uart)
|
||||
|
||||
known_nets = {
|
||||
'NetworkID': {'pwd': '<password>', 'wlan_config': ('10.0.0.8', '255.255.0.0', '10.0.0.1', '10.0.0.1')},
|
||||
}
|
||||
|
||||
from network import WLAN
|
||||
wl = WLAN()
|
||||
|
||||
|
||||
if machine.reset_cause() != machine.SOFT_RESET:
|
||||
|
||||
wl.mode(WLAN.STA)
|
||||
original_ssid = wl.ssid()
|
||||
original_auth = wl.auth()
|
||||
|
||||
print("Scanning for known wifi nets")
|
||||
available_nets = wl.scan()
|
||||
nets = frozenset([e.ssid for e in available_nets])
|
||||
|
||||
known_nets_names = frozenset([key for key in known_nets])
|
||||
net_to_use = list(nets & known_nets_names)
|
||||
try:
|
||||
net_to_use = net_to_use[0]
|
||||
net_properties = known_nets[net_to_use]
|
||||
pwd = net_properties['pwd']
|
||||
sec = [e.sec for e in available_nets if e.ssid == net_to_use][0]
|
||||
if 'wlan_config' in net_properties:
|
||||
wl.ifconfig(config=net_properties['wlan_config'])
|
||||
wl.connect(net_to_use, (sec, pwd), timeout=10000)
|
||||
while not wl.isconnected():
|
||||
machine.idle() # save power while waiting
|
||||
print("Connected to "+net_to_use+" with IP address:" + wl.ifconfig()[0])
|
||||
|
||||
except Exception as e:
|
||||
print("Failed to connect to any known network, going into AP mode")
|
||||
wl.init(mode=WLAN.AP, ssid=original_ssid, auth=original_auth, channel=6, antenna=WLAN.INT_ANT)
|
||||
```
|
||||
|
||||
## Domoticz Wrapper \(`domoticz.py`\)
|
||||
|
||||
```python
|
||||
import socket
|
||||
class Domoticz:
|
||||
|
||||
def __init__(self, ip, port, basic):
|
||||
self.basic = basic
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
|
||||
def setLight(self, idx, command):
|
||||
return self.sendRequest("type=command¶m=switchlight&idx="+idx+"&switchcmd="+command)
|
||||
|
||||
def setVariable(self, name, value):
|
||||
return self.sendRequest("type=command¶m=updateuservariable&vtype=0&vname="+name+"&vvalue="+value)
|
||||
|
||||
def sendRequest(self, path):
|
||||
try:
|
||||
s = socket.socket()
|
||||
s.connect((self.ip,self.port))
|
||||
s.send(b"GET /json.htm?"+path+" HTTP/1.1\r\nHost: pycom.io\r\nAuthorization: Basic "+self.basic+"\r\n\r\n")
|
||||
status = str(s.readline(), 'utf8')
|
||||
code = status.split(" ")[1]
|
||||
s.close()
|
||||
return code
|
||||
|
||||
except Exception:
|
||||
print("HTTP request failed")
|
||||
return 0
|
||||
```
|
||||
|
||||
45
tutorials/all/repl.md
Normal file
45
tutorials/all/repl.md
Normal file
@@ -0,0 +1,45 @@
|
||||
# REPL
|
||||
|
||||
Using the Pymakr Plugin, open and connect a device or use serial terminal \(PuTTY, screen, picocom, etc\). Upon connecting, there should be a blank screen with a flashing cursor. Press Enter and a MicroPython prompt should appear, i.e. `>>>`. Let’s make sure it is working with the obligatory test:
|
||||
|
||||
```python
|
||||
>>> print("Hello LoPy!")
|
||||
Hello LoPy!
|
||||
```
|
||||
|
||||
In the example above, the `>>>` characters should not be typed. They are there to indicate that the text should be placed after the prompt. Once the text has been entered `print("Hello LoPy!")` and pressed `Enter`, the output should appear on screen, identical to the example above.
|
||||
|
||||
Basic Python commands can be tested out in a similar fashion.
|
||||
|
||||
If this is not working, try either a hard reset or a soft reset; see below.
|
||||
|
||||
Here are some other example, utilising the device's hardware features:
|
||||
|
||||
```python
|
||||
>>> from machine import Pin
|
||||
>>> led = Pin('G16', mode=Pin.OUT, value=1)
|
||||
>>> led(0)
|
||||
>>> led(1)
|
||||
>>> led.toggle()
|
||||
>>> 1 + 2
|
||||
3
|
||||
>>> 5 / 2
|
||||
2.5
|
||||
>>> 20 * 'py'
|
||||
'pypypypypypypypypypypypypypypypypypypypy'
|
||||
```
|
||||
|
||||
## Resetting the Device
|
||||
|
||||
If something goes wrong, the device can be reset with two methods. The first is to press `CTRL-D` at the MicroPython prompt, which will perform a soft reset. A message, as following, will appear:
|
||||
|
||||
```python
|
||||
>>>
|
||||
PYB: soft reboot
|
||||
MicroPython v1.4.6-146-g1d8b5e5 on 2016-10-21; LoPy with ESP32
|
||||
Type "help()" for more information.
|
||||
>>>
|
||||
```
|
||||
|
||||
If that still isn’t working a hard reset can be performed \(power-off/on\) by pressing the `RST` switch \(the small black button next to the RGB LED\). Using telnet, this will end the session, disconnecting the program that was used to connect to the Pycom Device.
|
||||
|
||||
33
tutorials/all/rgbled.md
Normal file
33
tutorials/all/rgbled.md
Normal file
@@ -0,0 +1,33 @@
|
||||
# RGB LED
|
||||
|
||||
By default the heartbeat LED flashes in blue colour once every 4s to signal that the system is alive. This can be overridden through the `pycom` module.
|
||||
|
||||
```python
|
||||
import pycom
|
||||
|
||||
pycom.heartbeat(False)
|
||||
pycom.rgbled(0xff00) # turn on the RGB LED in green colour
|
||||
```
|
||||
|
||||
The heartbeat LED is also used to indicate that an error was detected.
|
||||
|
||||
The following piece of code uses the RGB LED to make a traffic light that runs for 10 cycles.
|
||||
|
||||
```python
|
||||
import pycom
|
||||
import time
|
||||
|
||||
pycom.heartbeat(False)
|
||||
for cycles in range(10): # stop after 10 cycles
|
||||
pycom.rgbled(0x007f00) # green
|
||||
time.sleep(5)
|
||||
pycom.rgbled(0x7f7f00) # yellow
|
||||
time.sleep(1.5)
|
||||
pycom.rgbled(0x7f0000) # red
|
||||
time.sleep(4)
|
||||
```
|
||||
|
||||
Here is the expected result:
|
||||
|
||||

|
||||
|
||||
141
tutorials/all/rmt.md
Normal file
141
tutorials/all/rmt.md
Normal file
@@ -0,0 +1,141 @@
|
||||
# RMT
|
||||
|
||||
Detailed information about this class can be found in [`RMT`](../../firmwareapi/pycom/machine/rmt.md).
|
||||
|
||||
The RMT \(Remote Control\) peripheral of the ESP32 is primarily designed to send and receive infrared remote control signals that use on-off-keying of a carrier frequency, but due to its design it can be used to generate various types of signals, this class will allow you to do this.
|
||||
|
||||
The RMT has 7 channels, of which 5 are available and can be mapped to any GPIO pin \(_Note:_ Pins `P13` -`P18` can only be used as inputs\).
|
||||
|
||||
| Channel | Resolution | Maximum Pulse Width |
|
||||
| :--- | :--- | :--- |
|
||||
| 0 | Used by on-board LED | |
|
||||
| 1 | Used by `pycom.pulses_get()` | |
|
||||
| 2 | 100nS | 3.2768 ms |
|
||||
| 3 | 100nS | 3.2768 ms |
|
||||
| 4 | 1000nS | 32.768 ms |
|
||||
| 5 | 1000nS | 32.768 ms |
|
||||
| 6 | 3125nS | 102.4 ms |
|
||||
| 7 | 3125nS | 102.4 ms |
|
||||
|
||||
## Transmitting
|
||||
|
||||
The following examples create an RMT object on channel 4, configure it for transmission and send some data in various forms. The resolution of channel 4 is 1000 nano seconds, the given values are interpreted accordingly.
|
||||
|
||||
In this first example, we define the signal as a tuple of binary values that define the shape of the desired signal along with the duration of a bit.
|
||||
|
||||
```python
|
||||
from machine import RMT
|
||||
# Map RMT channel 4 to P21, when the RMT is idle, it will output LOW
|
||||
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
|
||||
|
||||
# Produces the pattern shown in data, where each bit lasts
|
||||
# duration * channel resolution = 10000 * 1000ns = 10ms
|
||||
data = (1,0,1,1,1,0,1,0,1)
|
||||
duration = 10000
|
||||
rmt.pulses_send(duration, data)
|
||||
```
|
||||
|
||||

|
||||
|
||||
In this example we define the signal by a tuple of durations and what state the signal starts in.
|
||||
|
||||
```python
|
||||
from machine import RMT
|
||||
# Map RMT channel 4 to P21, when the RMT is idle, it will output LOW
|
||||
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
|
||||
|
||||
# The list of durations for each pulse to be, these are in units of the channels
|
||||
# resolution:
|
||||
# duration = Desired pulse length / Channel Resolution
|
||||
duration = (8000,11000,8000,11000,6000,13000,6000,3000,8000)
|
||||
|
||||
# `start_level` defines if the signal starts off as LOW or HIGH, it will then
|
||||
# toggle state between each duration
|
||||
rmt.pulses_send(duration, start_level=RMT.HIGH)
|
||||
```
|
||||
|
||||

|
||||
|
||||
This third example, is a combination of the above two styles of defining a signal. Each pulse has a defined duration as well as a state. This is useful if you don't always want the signal to toggle state.
|
||||
|
||||
```python
|
||||
from machine import RMT
|
||||
# Map RMT channel 4 to P21, when the RMT is idle, it will output LOW
|
||||
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
|
||||
|
||||
# Produces the pattern shown in data, where each bit lasts
|
||||
# duration[i] * channel resolution = duration[i] * 1000ns
|
||||
data = (1,0,1,1,0,1)
|
||||
duration = (400,200,100,300,200,400)
|
||||
rmt.pulses_send(duration, data)
|
||||
```
|
||||
|
||||

|
||||
|
||||
The following example creates an RMT object on channel 4 and configures it for transmission with carrier modulation.
|
||||
|
||||
```python
|
||||
from machine import RMT
|
||||
rmt = RMT(channel=4,
|
||||
gpio="P21",
|
||||
tx_idle_level=RMT.LOW,
|
||||
# Carrier = 100Hz, 80% duty, modules HIGH signals
|
||||
tx_carrier = (100, 70, RMT.HIGH))
|
||||
data = (1,0,1)
|
||||
duration = 10000
|
||||
rmt.pulses_send(duration, data)
|
||||
```
|
||||
|
||||

|
||||
|
||||
The following example creates an RMT object on channel 2, configures it for receiving, then waits for the first, undefined number of pulses without timeout
|
||||
|
||||
```python
|
||||
from machine import RMT
|
||||
rmt = machine.RMT(channel=2)
|
||||
rmt.init(gpio="P21", rx_idle_threshold=1000)
|
||||
|
||||
data = rmt.pulses_get()
|
||||
```
|
||||
|
||||
{% hint style="danger" %}
|
||||
If `tx_idle_level` is not set to the opposite of the third value in the `tx_carrier` tuple, the carrier wave will continue to be generated when the RMT channel is idle.
|
||||
{% endhint %}
|
||||
|
||||
## Receiving
|
||||
|
||||
The following example creates an RMT object on channel 2, configures it for receiving a undefined number of pulses, then waits maximum of 1000us for the first pulse.
|
||||
|
||||
```python
|
||||
from machine import RMT
|
||||
# Sets RMT channel 2 to P21 and sets the maximum length of a valid pulse to
|
||||
# 1000*channel resolution = 1000 * 100ns = 100us
|
||||
rmt = machine.RMT(channel=2, gpio="P21", rx_idle_threshold=1000)
|
||||
rmt.init()
|
||||
|
||||
# Get a undefined number of pulses, waiting a maximum of 500us for the first
|
||||
# pulse (unlike other places where the absolute duration was based on the RMT
|
||||
# channels resolution, this value is in us) until a pulse longer than
|
||||
# rx_idle_threshold occurs.
|
||||
data = rmt.pulses_get(timeout=500)
|
||||
```
|
||||
|
||||
The following example creates an RMT object on channel 2, configures it for receiving, filters out pulses with width < 20\*100 nano seconds, then waits for 100 pulses
|
||||
|
||||
```python
|
||||
from machine import RMT
|
||||
|
||||
rmt = machine.RMT(channel=2, # Resolution = 100ns
|
||||
gpio="P21",
|
||||
# Longest valid pulse = 1000*100ns = 100us
|
||||
rx_idle_threshold=1000,
|
||||
# Filter out pulses shorter than 20*100ns = 2us
|
||||
rx_filter_threshold=20)
|
||||
|
||||
# Receive 100 pulses, pulses shorter than 2us or longer than 100us will be
|
||||
# ignored. That means if it receives 80 valid pulses but then the signal
|
||||
# doesn't change for 10 hours and then 20 more pulses occur, this function
|
||||
# will wait for 10h
|
||||
data = rmt.pulses_get(pulses=100)
|
||||
```
|
||||
|
||||
28
tutorials/all/threading.md
Normal file
28
tutorials/all/threading.md
Normal file
@@ -0,0 +1,28 @@
|
||||
# Threading
|
||||
|
||||
MicroPython supports spawning threads by the `_thread` module. The following example demonstrates the use of this module. A thread is simply defined as a function that can receive any number of parameters. Below 3 threads are started, each one perform a print at a different interval.
|
||||
|
||||
```python
|
||||
import _thread
|
||||
import time
|
||||
|
||||
def th_func(delay, id):
|
||||
while True:
|
||||
time.sleep(delay)
|
||||
print('Running thread %d' % id)
|
||||
|
||||
for i in range(3):
|
||||
_thread.start_new_thread(th_func, (i + 1, i))
|
||||
```
|
||||
|
||||
## Using Locks:
|
||||
|
||||
```python
|
||||
import _thread
|
||||
|
||||
a_lock = _thread.allocate_lock()
|
||||
|
||||
with a_lock:
|
||||
print("a_lock is locked while this executes")
|
||||
```
|
||||
|
||||
53
tutorials/all/timers.md
Normal file
53
tutorials/all/timers.md
Normal file
@@ -0,0 +1,53 @@
|
||||
# Timers
|
||||
|
||||
Detailed information about this class can be found in [`Timer`](../../firmwareapi/pycom/machine/timer.md).
|
||||
|
||||
## Chronometer
|
||||
|
||||
The Chronometer can be used to measure how much time has elapsed in a block of code. The following example uses a simple stopwatch.
|
||||
|
||||
```python
|
||||
from machine import Timer
|
||||
import time
|
||||
|
||||
chrono = Timer.Chrono()
|
||||
|
||||
chrono.start()
|
||||
time.sleep(1.25) # simulate the first lap took 1.25 seconds
|
||||
lap = chrono.read() # read elapsed time without stopping
|
||||
time.sleep(1.5)
|
||||
chrono.stop()
|
||||
total = chrono.read()
|
||||
|
||||
print()
|
||||
print("\nthe racer took %f seconds to finish the race" % total)
|
||||
print(" %f seconds in the first lap" % lap)
|
||||
print(" %f seconds in the last lap" % (total - lap))
|
||||
```
|
||||
|
||||
## Alarm
|
||||
|
||||
The Alarm can be used to get interrupts at a specific interval. The following code executes a callback every second for 10 seconds.
|
||||
|
||||
```python
|
||||
from machine import Timer
|
||||
|
||||
class Clock:
|
||||
|
||||
def __init__(self):
|
||||
self.seconds = 0
|
||||
self.__alarm = Timer.Alarm(self._seconds_handler, 1, periodic=True)
|
||||
|
||||
def _seconds_handler(self, alarm):
|
||||
self.seconds += 1
|
||||
print("%02d seconds have passed" % self.seconds)
|
||||
if self.seconds == 10:
|
||||
alarm.callback(None) # stop counting after 10 seconds
|
||||
|
||||
clock = Clock()
|
||||
```
|
||||
|
||||
{% hint style="info" %}
|
||||
There are no restrictions to what can be done in an interrupt. For example, it is possible to even do network requests with an interrupt. However, it is important to keep in mind that interrupts are handled sequentially, so it’s good practice to keep them short. More information can be found in [`Interrupt Handling`](../../firmwareapi/notes.md#interrupt-handling).
|
||||
{% endhint %}
|
||||
|
||||
144
tutorials/all/wlan.md
Normal file
144
tutorials/all/wlan.md
Normal file
@@ -0,0 +1,144 @@
|
||||
# WLAN
|
||||
|
||||
The WLAN is a system feature of all Pycom devices, therefore it is enabled by default.
|
||||
|
||||
In order to retrieve the current WLAN instance, run:
|
||||
|
||||
```python
|
||||
>>> from network import WLAN
|
||||
>>> wlan = WLAN() # we call the constructor without params
|
||||
```
|
||||
|
||||
The current mode \(`WLAN.AP` after power up\) may be checked by running:
|
||||
|
||||
```python
|
||||
>>> wlan.mode()
|
||||
```
|
||||
|
||||
{% hint style="danger" %}
|
||||
|
||||
When changing the WLAN mode, if following the instructions below, the WLAN connection to the Pycom device will be broken. This means commands will not run interactively over WiFi.
|
||||
|
||||
**There are two ways around this:**
|
||||
|
||||
1. Put this setup code into the `boot.py` file of the Pycom device so that it gets executed automatically after reset.
|
||||
2. Duplicate the REPL on UART. This way commands can be run via Serial USB.
|
||||
|
||||
## Connecting to a Router
|
||||
|
||||
The WLAN network class always boots in `WLAN.AP` mode; to connect it to an existing network, the WiFi class must be configured as a station:
|
||||
|
||||
```python
|
||||
from network import WLAN
|
||||
wlan = WLAN(mode=WLAN.STA)
|
||||
```
|
||||
|
||||
Now the device may proceed to scan for networks:
|
||||
|
||||
```python
|
||||
nets = wlan.scan()
|
||||
for net in nets:
|
||||
if net.ssid == 'mywifi':
|
||||
print('Network found!')
|
||||
wlan.connect(net.ssid, auth=(net.sec, 'mywifikey'), timeout=5000)
|
||||
while not wlan.isconnected():
|
||||
machine.idle() # save power while waiting
|
||||
print('WLAN connection succeeded!')
|
||||
break
|
||||
```
|
||||
|
||||
## Assigning a Static IP Address at Boot Up
|
||||
|
||||
If the users wants their device to connect to a home router upon boot up, using with a fixed IP address, use the following script as `/flash/boot.py`:
|
||||
|
||||
```python
|
||||
import machine
|
||||
from network import WLAN
|
||||
wlan = WLAN() # get current object, without changing the mode
|
||||
|
||||
if machine.reset_cause() != machine.SOFT_RESET:
|
||||
wlan.init(mode=WLAN.STA)
|
||||
# configuration below MUST match your home router settings!!
|
||||
wlan.ifconfig(config=('192.168.178.107', '255.255.255.0', '192.168.178.1', '8.8.8.8'))
|
||||
|
||||
if not wlan.isconnected():
|
||||
# change the line below to match your network ssid, security and password
|
||||
wlan.connect('mywifi', auth=(WLAN.WPA2, 'mywifikey'), timeout=5000)
|
||||
while not wlan.isconnected():
|
||||
machine.idle() # save power while waiting
|
||||
```
|
||||
|
||||
{% hint style="info" %}
|
||||
Notice how we check for the reset cause and the connection status, this is crucial in order to be able to soft reset the LoPy during a telnet session without breaking the connection.
|
||||
{% endhint %}
|
||||
|
||||
## Multiple Networks using a Static IP Address
|
||||
|
||||
The following script holds a list with nets and an optional list of `wlan_config` to set a fixed IP
|
||||
|
||||
```python
|
||||
import os
|
||||
import machine
|
||||
|
||||
uart = machine.UART(0, 115200)
|
||||
os.dupterm(uart)
|
||||
|
||||
known_nets = {
|
||||
'<net>': {'pwd': '<password>'},
|
||||
'<net>': {'pwd': '<password>', 'wlan_config': ('10.0.0.114', '255.255.0.0', '10.0.0.1', '10.0.0.1')}, # (ip, subnet_mask, gateway, DNS_server)
|
||||
}
|
||||
|
||||
if machine.reset_cause() != machine.SOFT_RESET:
|
||||
from network import WLAN
|
||||
wl = WLAN()
|
||||
wl.mode(WLAN.STA)
|
||||
original_ssid = wl.ssid()
|
||||
original_auth = wl.auth()
|
||||
|
||||
print("Scanning for known wifi nets")
|
||||
available_nets = wl.scan()
|
||||
nets = frozenset([e.ssid for e in available_nets])
|
||||
|
||||
known_nets_names = frozenset([key for key in known_nets])
|
||||
net_to_use = list(nets & known_nets_names)
|
||||
try:
|
||||
net_to_use = net_to_use[0]
|
||||
net_properties = known_nets[net_to_use]
|
||||
pwd = net_properties['pwd']
|
||||
sec = [e.sec for e in available_nets if e.ssid == net_to_use][0]
|
||||
if 'wlan_config' in net_properties:
|
||||
wl.ifconfig(config=net_properties['wlan_config'])
|
||||
wl.connect(net_to_use, (sec, pwd), timeout=10000)
|
||||
while not wl.isconnected():
|
||||
machine.idle() # save power while waiting
|
||||
print("Connected to "+net_to_use+" with IP address:" + wl.ifconfig()[0])
|
||||
|
||||
except Exception as e:
|
||||
print("Failed to connect to any known network, going into AP mode")
|
||||
wl.init(mode=WLAN.AP, ssid=original_ssid, auth=original_auth, channel=6, antenna=WLAN.INT_ANT)
|
||||
```
|
||||
|
||||
## Connecting to a WPA2-Enterprise network
|
||||
|
||||
### Connecting with EAP-TLS:
|
||||
|
||||
Before connecting, obtain and copy the public and private keys to the device, e.g. under location `/flash/cert`. If it is required to validate the server’s public key, an appropriate CA certificate \(chain\) must also be provided.
|
||||
|
||||
```python
|
||||
from network import WLAN
|
||||
|
||||
wlan = WLAN(mode=WLAN.STA)
|
||||
wlan.connect(ssid='mywifi', auth=(WLAN.WPA2_ENT,), identity='myidentity', ca_certs='/flash/cert/ca.pem', keyfile='/flash/cert/client.key', certfile='/flash/cert/client.crt')
|
||||
```
|
||||
|
||||
### Connecting with EAP-PEAP or EAP-TTLS:
|
||||
|
||||
In case of EAP-PEAP \(or EAP-TTLS\), the client key and certificate are not necessary, only a username and password pair. If it is required to validate the server’s public key, an appropriate CA certificate \(chain\) must also be provided.
|
||||
|
||||
```python
|
||||
from network import WLAN
|
||||
|
||||
wlan = WLAN(mode=WLAN.STA)
|
||||
wlan.connect(ssid='mywifi', auth=(WLAN.WPA2_ENT, 'username', 'password'), identity='myidentity', ca_certs='/flash/cert/ca.pem')
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user