mirror of
https://github.com/sascha-hemi/pycom-documentation.git
synced 2026-03-21 06:06:03 +01:00
Merge branch 'docs-restructure' of https://github.com/pycom/pycom-documentation into docs-restructure
This commit is contained in:
0
content/tutorials/hardware/_index.md
Normal file
0
content/tutorials/hardware/_index.md
Normal file
44
content/tutorials/hardware/adc.md
Normal file
44
content/tutorials/hardware/adc.md
Normal file
@@ -0,0 +1,44 @@
|
||||
---
|
||||
title: "ADC"
|
||||
aliases:
|
||||
- tutorials/all/adc.html
|
||||
- tutorials/all/adc.md
|
||||
- chapter/tutorials/all/adc
|
||||
---
|
||||
|
||||
This example is a simple ADC sample. For more information please see [`ADC`](/firmwareapi/pycom/machine/adc).
|
||||
|
||||
```python
|
||||
|
||||
from machine import ADC
|
||||
adc = ADC(0)
|
||||
adc_c = adc.channel(pin='P13')
|
||||
adc_c()
|
||||
adc_c.value()
|
||||
```
|
||||
|
||||
## Calibration
|
||||
|
||||
Currently the ESP32's ADC is not calibrated from the factory. This means it must be calibrated each time you wish to use it. To do this you must firstly measure the internal voltage reference. The following code will connect the 1.1v reference to `P22`
|
||||
|
||||
```python
|
||||
|
||||
from machine import ADC
|
||||
adc = ADC()
|
||||
|
||||
# Output Vref of P22
|
||||
adc.vref_to_pin('P22')
|
||||
```
|
||||
|
||||
Now that the voltage reference is externally accessible you should measure it with the most accurate voltmeter you have access to. Note down the reading in millivolts, e.g. `1120`. To disconnect the 1.1v reference from `P22` please reset your module. You can now calibrate the ADC by telling it the true value of the internal reference. You should then check your calibration by connecting the ADC to a known voltage source.
|
||||
|
||||
```python
|
||||
|
||||
# Set calibration - see note above
|
||||
adc.vref(1100)
|
||||
|
||||
# Check calibration by reading a known voltage
|
||||
adc_c = adc.channel(pin='P16', attn=ADC.ATTN_11DB)
|
||||
print(adc_c.voltage())
|
||||
```
|
||||
|
||||
103
content/tutorials/hardware/i2c.md
Normal file
103
content/tutorials/hardware/i2c.md
Normal file
@@ -0,0 +1,103 @@
|
||||
---
|
||||
title: "I2C"
|
||||
aliases:
|
||||
- tutorials/all/i2c.html
|
||||
- tutorials/all/i2c.md
|
||||
- chapter/tutorials/all/i2c
|
||||
---
|
||||
|
||||
The following example receives data from a light sensor using I2C. Sensor used is the BH1750FVI Digital Light Sensor.
|
||||
|
||||
```python
|
||||
|
||||
import time
|
||||
from machine import I2C
|
||||
import bh1750fvi
|
||||
|
||||
i2c = I2C(0, I2C.MASTER, baudrate=100000)
|
||||
light_sensor = bh1750fvi.BH1750FVI(i2c, addr=i2c.scan()[0])
|
||||
|
||||
while(True):
|
||||
data = light_sensor.read()
|
||||
print(data)
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
## Drivers for the BH1750FVI
|
||||
|
||||
Place this sample code into a file named `bh1750fvi.py`. This can then be imported as a library.
|
||||
|
||||
```python
|
||||
|
||||
# Simple driver for the BH1750FVI digital light sensor
|
||||
|
||||
class BH1750FVI:
|
||||
MEASUREMENT_TIME = const(120)
|
||||
|
||||
def __init__(self, i2c, addr=0x23, period=150):
|
||||
self.i2c = i2c
|
||||
self.period = period
|
||||
self.addr = addr
|
||||
self.time = 0
|
||||
self.value = 0
|
||||
self.i2c.writeto(addr, bytes([0x10])) # start continuos 1 Lux readings every 120ms
|
||||
|
||||
def read(self):
|
||||
self.time += self.period
|
||||
if self.time >= MEASUREMENT_TIME:
|
||||
self.time = 0
|
||||
data = self.i2c.readfrom(self.addr, 2)
|
||||
self.value = (((data[0] << 8) + data[1]) * 1200) // 1000
|
||||
return self.value
|
||||
```
|
||||
|
||||
## Light sensor and LoRa
|
||||
|
||||
This is the same code, with added LoRa connectivity, sending the lux value from the light sensor to another LoRa enabled device.
|
||||
|
||||
```python
|
||||
|
||||
import socket
|
||||
import time
|
||||
import pycom
|
||||
import struct
|
||||
from network import LoRa
|
||||
from machine import I2C
|
||||
import bh1750fvi
|
||||
|
||||
LORA_PKG_FORMAT = "!BH"
|
||||
LORA_CONFIRM_FORMAT = "!BB"
|
||||
|
||||
DEVICE_ID = 1
|
||||
|
||||
pycom.heartbeat(False)
|
||||
|
||||
lora = LoRa(mode=LoRa.LORA, tx_iq=True, region=LoRa.EU868)
|
||||
lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
|
||||
lora_sock.setblocking(False)
|
||||
|
||||
i2c = I2C(0, I2C.MASTER, baudrate=100000)
|
||||
light_sensor = bh1750fvi.BH1750FVI(i2c, addr=i2c.scan()[0])
|
||||
|
||||
while(True):
|
||||
msg = struct.pack(LORA_PKG_FORMAT, DEVICE_ID, light_sensor.read())
|
||||
lora_sock.send(msg)
|
||||
|
||||
pycom.rgbled(0x150000)
|
||||
|
||||
wait = 5
|
||||
while (wait > 0):
|
||||
wait = wait - 0.1
|
||||
time.sleep(0.1)
|
||||
recv_data = lora_sock.recv(64)
|
||||
|
||||
if (len (recv_data) >= 2):
|
||||
status, device_id = struct.unpack(LORA_CONFIRM_FORMAT, recv_data)
|
||||
|
||||
if (device_id == DEVICE_ID and status == 200):
|
||||
pycom.rgbled(0x001500)
|
||||
wait = 0
|
||||
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
147
content/tutorials/hardware/modbus.md
Normal file
147
content/tutorials/hardware/modbus.md
Normal file
@@ -0,0 +1,147 @@
|
||||
---
|
||||
title: "Modbus"
|
||||
aliases:
|
||||
- tutorials/all/modbus.html
|
||||
- tutorials/all/modbus.md
|
||||
- chapter/tutorials/all/modbus
|
||||
---
|
||||
|
||||
Modbus is a messaging protocol that defines the packet structure for transferring data between devices in a master/slave architecture. The protocol is independent of the transmission medium and is usually transmitted over TCP (MODBUS TCP) or serial communication (MODBUS RTU). Modbus is intended as a request/reply protocol and delivers services specified by function codes. The function code in the request tells the addressed slave what kind of action to perform. The function codes most commonly supported by devices are listed below.
|
||||
|
||||
| Function Name | Function Code |
|
||||
| :--- | :--- |
|
||||
| Read Coils | 0x01 |
|
||||
| Read Discrete Inputs | 0x02 |
|
||||
| Read Holding Registers | 0x03 |
|
||||
| Read Input Registers | 0x04 |
|
||||
| Write Single Coil | 0x05 |
|
||||
| Write Single Register | 0x06 |
|
||||
| Write Multiple Coils | 0x0F |
|
||||
| Write Multiple Registers | 0x10 |
|
||||
|
||||
For more information on the MODBUS RTU see the following <a href="http://www.modbus.org/docs/Modbus_over_serial_line_V1_02.pdf" target="_blank"> PDF File </a>. Information on the MODBUS TCP can be found <a href="http://www.modbus.org/docs/Modbus_Messaging_Implementation_Guide_V1_0b.pdf" target="_blank"> Here </a>.
|
||||
|
||||
## Pycom Modbus Library
|
||||
|
||||
Python libraries and sample code that support Modbus TCP and Modbus RTU are available at the following [GitHub Repository](https://github.com/pycom/pycom-modbus). To use this library, connect to the target Pycom device via ftp and upload the uModbus folder to `/flash`. A description of the supported function codes is found below.
|
||||
|
||||
### Read Coils
|
||||
|
||||
This function code requests the status (ON/OFF) of discrete coils on a remote device. The slave device address, the address of the first coil and the number of coils must be specified in the request. The address of the first coil is 0 and a maximum of 2000 contiguous coils can be read. Python sample code is shown below.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
coil_quantity=100
|
||||
|
||||
coil_status = modbus_obj.read_coils(slave_addr, starting_address, coil_quantity)
|
||||
print('Coil status: ' + ' '.join('{:d}'.format(x) for x in coil_status))
|
||||
```
|
||||
|
||||
### Read Discrete Inputs
|
||||
|
||||
This command is used to read the status (ON/OFF) of discrete inputs on a remote device. The slave address, the address of the first input, and the quantity of inputs to be read must be specified. The address of the first input is 0 and a maximum of 2000 continuous inputs can be read. The Python sample code is shown below.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
starting_address=0x0
|
||||
input_quantity=100
|
||||
|
||||
input_status = modbus_obj.read_discrete_inputs(slave_addr, starting_address, input_quantity)
|
||||
print('Input status: ' + ' '.join('{:d}'.format(x) for x in input_status))
|
||||
```
|
||||
|
||||
### Read Holding Registers
|
||||
|
||||
This function code is used to read the contents of analogue output holding registers. The slave address, the starting register address, the number of registers to read and the sign of the data must be specified. Register addresses start at 0 and a maximum of 125 continuous registers can be read.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
register_quantity=100
|
||||
signed=True
|
||||
|
||||
register_value = modbus_obj.read_holding_registers(slave_addr, starting_address, register_quantity, signed)
|
||||
print('Holding register value: ' + ' '.join('{:d}'.format(x) for x in register_value))
|
||||
```
|
||||
|
||||
### Read Input Registers
|
||||
|
||||
This command is used to read up to 125 continuous input registers on a remote device. The slave address, the starting register address, the number of input registers and the sign of the data must be specified. The address of the first input registers is 0.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
register_quantity=100
|
||||
signed=True
|
||||
|
||||
register_value = modbus_obj.read_input_registers(slave_addr, starting_address, register_quantity, signed)
|
||||
print('Input register value: ' + ' '.join('{:d}'.format(x) for x in register_value))
|
||||
```
|
||||
|
||||
### Write Single Coil
|
||||
|
||||
This function code is used to write the state of a discrete coil on a remote device. A value of `0xFF00` means the coil should be set to ON, while a value of `0x0000` means the coil should be set to OFF. The Python sample code to set the coil at address `0x00`, to an ON state is shown below.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
output_address=0x00
|
||||
output_value=0xFF00
|
||||
|
||||
return_flag = modbus_obj.write_single_coil(slave_addr, output_address, output_value)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing single coil status: ' + output_flag)
|
||||
```
|
||||
|
||||
### Write Single Register
|
||||
|
||||
This command is used to write the contents of an analog output holding register on a remote device. The slave address, the register address, the register value, and the signature of the data must be specified. As for all the other commands, the register addresses start from 0.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
register_address=0x01
|
||||
register_value=-32768
|
||||
signed=True
|
||||
|
||||
return_flag = modbus_obj.write_single_register(slave_addr, register_address, register_value, signed)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing single coil status: ' + output_flag)
|
||||
```
|
||||
|
||||
### Write Multiple Coils
|
||||
|
||||
This function code is used to set a continuous sequence of coils, in a remote device, to either ON or OFF. The slave address, the starting address of the coils and an array with the coil states must be specified.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
starting_address=0x00
|
||||
output_values=[1,1,1,0,0,1,1,1,0,0,1,1,1]
|
||||
|
||||
return_flag = modbus_obj.write_multiple_coils(slave_addr, starting_address, output_values)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing multiple coil status: ' + output_flag)
|
||||
```
|
||||
|
||||
### Write Multiple Registers
|
||||
|
||||
This command is used to write the contents of a continuous sequence of analogue registers on a remote device. The slave address, the starting register address, the register values, and the signature of the data must be specified. The address of the first register is 0 and a maximum of 125 register values can be written. The Python sample code is shown below.
|
||||
|
||||
```python
|
||||
|
||||
slave_addr=0x0A
|
||||
register_address=0x01
|
||||
register_values=[2, -4, 6, -256, 1024]
|
||||
signed=True
|
||||
|
||||
return_flag = modbus_obj.write_multiple_registers(slave_addr, register_address, register_values, signed)
|
||||
output_flag = 'Success' if return_flag else 'Failure'
|
||||
print('Writing multiple register status: ' + output_flag)
|
||||
```
|
||||
|
||||
254
content/tutorials/hardware/owd.md
Normal file
254
content/tutorials/hardware/owd.md
Normal file
@@ -0,0 +1,254 @@
|
||||
---
|
||||
title: "Onewire Driver"
|
||||
aliases:
|
||||
- tutorials/all/owd.html
|
||||
- tutorials/all/owd.md
|
||||
- chapter/tutorials/all/owd
|
||||
---
|
||||
|
||||
This tutorial explains how to connect and read data from a DS18x20 temperature sensor. The onewire library is also available at the [pycom-libraries](https://github.com/pycom/pycom-libraries/tree/master/lib/onewire) GitHub Repository.
|
||||
|
||||
## Basic usage
|
||||
|
||||
```python
|
||||
|
||||
import time
|
||||
from machine import Pin
|
||||
from onewire import DS18X20
|
||||
from onewire import OneWire
|
||||
|
||||
# DS18B20 data line connected to pin P10
|
||||
ow = OneWire(Pin('P10'))
|
||||
temp = DS18X20(ow)
|
||||
|
||||
while True:
|
||||
print(temp.read_temp_async())
|
||||
time.sleep(1)
|
||||
temp.start_conversion()
|
||||
time.sleep(1)
|
||||
```
|
||||
|
||||
## Library
|
||||
|
||||
```python
|
||||
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
OneWire library for MicroPython
|
||||
"""
|
||||
|
||||
import time
|
||||
import machine
|
||||
|
||||
class OneWire:
|
||||
CMD_SEARCHROM = const(0xf0)
|
||||
CMD_READROM = const(0x33)
|
||||
CMD_MATCHROM = const(0x55)
|
||||
CMD_SKIPROM = const(0xcc)
|
||||
|
||||
def __init__(self, pin):
|
||||
self.pin = pin
|
||||
self.pin.init(pin.OPEN_DRAIN, pin.PULL_UP)
|
||||
|
||||
def reset(self):
|
||||
"""
|
||||
Perform the onewire reset function.
|
||||
Returns True if a device asserted a presence pulse, False otherwise.
|
||||
"""
|
||||
sleep_us = time.sleep_us
|
||||
disable_irq = machine.disable_irq
|
||||
enable_irq = machine.enable_irq
|
||||
pin = self.pin
|
||||
|
||||
pin(0)
|
||||
sleep_us(480)
|
||||
i = disable_irq()
|
||||
pin(1)
|
||||
sleep_us(60)
|
||||
status = not pin()
|
||||
enable_irq(i)
|
||||
sleep_us(420)
|
||||
return status
|
||||
|
||||
def read_bit(self):
|
||||
sleep_us = time.sleep_us
|
||||
enable_irq = machine.enable_irq
|
||||
pin = self.pin
|
||||
|
||||
pin(1) # half of the devices don't match CRC without this line
|
||||
i = machine.disable_irq()
|
||||
pin(0)
|
||||
sleep_us(1)
|
||||
pin(1)
|
||||
sleep_us(1)
|
||||
value = pin()
|
||||
enable_irq(i)
|
||||
sleep_us(40)
|
||||
return value
|
||||
|
||||
def read_byte(self):
|
||||
value = 0
|
||||
for i in range(8):
|
||||
value |= self.read_bit() << i
|
||||
return value
|
||||
|
||||
def read_bytes(self, count):
|
||||
buf = bytearray(count)
|
||||
for i in range(count):
|
||||
buf[i] = self.read_byte()
|
||||
return buf
|
||||
|
||||
def write_bit(self, value):
|
||||
sleep_us = time.sleep_us
|
||||
pin = self.pin
|
||||
|
||||
i = machine.disable_irq()
|
||||
pin(0)
|
||||
sleep_us(1)
|
||||
pin(value)
|
||||
sleep_us(60)
|
||||
pin(1)
|
||||
sleep_us(1)
|
||||
machine.enable_irq(i)
|
||||
|
||||
def write_byte(self, value):
|
||||
for i in range(8):
|
||||
self.write_bit(value & 1)
|
||||
value >>= 1
|
||||
|
||||
def write_bytes(self, buf):
|
||||
for b in buf:
|
||||
self.write_byte(b)
|
||||
|
||||
def select_rom(self, rom):
|
||||
"""
|
||||
Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).
|
||||
"""
|
||||
self.reset()
|
||||
self.write_byte(CMD_MATCHROM)
|
||||
self.write_bytes(rom)
|
||||
|
||||
def crc8(self, data):
|
||||
"""
|
||||
Compute CRC
|
||||
"""
|
||||
crc = 0
|
||||
for i in range(len(data)):
|
||||
byte = data[i]
|
||||
for b in range(8):
|
||||
fb_bit = (crc ^ byte) & 0x01
|
||||
if fb_bit == 0x01:
|
||||
crc = crc ^ 0x18
|
||||
crc = (crc >> 1) & 0x7f
|
||||
if fb_bit == 0x01:
|
||||
crc = crc | 0x80
|
||||
byte = byte >> 1
|
||||
return crc
|
||||
|
||||
def scan(self):
|
||||
"""
|
||||
Return a list of ROMs for all attached devices.
|
||||
Each ROM is returned as a bytes object of 8 bytes.
|
||||
"""
|
||||
devices = []
|
||||
diff = 65
|
||||
rom = False
|
||||
for i in range(0xff):
|
||||
rom, diff = self._search_rom(rom, diff)
|
||||
if rom:
|
||||
devices += [rom]
|
||||
if diff == 0:
|
||||
break
|
||||
return devices
|
||||
|
||||
def _search_rom(self, l_rom, diff):
|
||||
if not self.reset():
|
||||
return None, 0
|
||||
self.write_byte(CMD_SEARCHROM)
|
||||
if not l_rom:
|
||||
l_rom = bytearray(8)
|
||||
rom = bytearray(8)
|
||||
next_diff = 0
|
||||
i = 64
|
||||
for byte in range(8):
|
||||
r_b = 0
|
||||
for bit in range(8):
|
||||
b = self.read_bit()
|
||||
if self.read_bit():
|
||||
if b: # there are no devices or there is an error on the bus
|
||||
return None, 0
|
||||
else:
|
||||
if not b: # collision, two devices with different bit meaning
|
||||
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
|
||||
b = 1
|
||||
next_diff = i
|
||||
self.write_bit(b)
|
||||
if b:
|
||||
r_b |= 1 << bit
|
||||
i -= 1
|
||||
rom[byte] = r_b
|
||||
return rom, next_diff
|
||||
|
||||
class DS18X20(object):
|
||||
def __init__(self, onewire):
|
||||
self.ow = onewire
|
||||
self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
|
||||
|
||||
def isbusy(self):
|
||||
"""
|
||||
Checks wether one of the DS18x20 devices on the bus is busy
|
||||
performing a temperature conversion
|
||||
"""
|
||||
return not self.ow.read_bit()
|
||||
|
||||
def start_conversion(self, rom=None):
|
||||
"""
|
||||
Start the temp conversion on one DS18x20 device.
|
||||
Pass the 8-byte bytes object with the ROM of the specific device you want to read.
|
||||
If only one DS18x20 device is attached to the bus you may omit the rom parameter.
|
||||
"""
|
||||
rom = rom or self.roms[0]
|
||||
ow = self.ow
|
||||
ow.reset()
|
||||
ow.select_rom(rom)
|
||||
ow.write_byte(0x44) # Convert Temp
|
||||
|
||||
def read_temp_async(self, rom=None):
|
||||
"""
|
||||
Read the temperature of one DS18x20 device if the conversion is complete,
|
||||
otherwise return None.
|
||||
"""
|
||||
if self.isbusy():
|
||||
return None
|
||||
rom = rom or self.roms[0]
|
||||
ow = self.ow
|
||||
ow.reset()
|
||||
ow.select_rom(rom)
|
||||
ow.write_byte(0xbe) # Read scratch
|
||||
data = ow.read_bytes(9)
|
||||
return self.convert_temp(rom[0], data)
|
||||
|
||||
def convert_temp(self, rom0, data):
|
||||
"""
|
||||
Convert the raw temperature data into degrees celsius and return as a fixed point with 2 decimal places.
|
||||
"""
|
||||
temp_lsb = data[0]
|
||||
temp_msb = data[1]
|
||||
if rom0 == 0x10:
|
||||
if temp_msb != 0:
|
||||
# convert negative number
|
||||
temp_read = temp_lsb >> 1 | 0x80 # truncate bit 0 by shifting, fill high bit with 1.
|
||||
temp_read = -((~temp_read + 1) & 0xff) # now convert from two's complement
|
||||
else:
|
||||
temp_read = temp_lsb >> 1 # truncate bit 0 by shifting
|
||||
count_remain = data[6]
|
||||
count_per_c = data[7]
|
||||
temp = 100 * temp_read - 25 + (count_per_c - count_remain) // count_per_c
|
||||
return temp
|
||||
elif rom0 == 0x28:
|
||||
return (temp_msb << 8 | temp_lsb) * 100 // 16
|
||||
else:
|
||||
assert False
|
||||
```
|
||||
|
||||
134
content/tutorials/hardware/pir.md
Normal file
134
content/tutorials/hardware/pir.md
Normal file
@@ -0,0 +1,134 @@
|
||||
---
|
||||
title: "PIR Sensor"
|
||||
aliases:
|
||||
- tutorials/all/pir.html
|
||||
- tutorials/all/pir.md
|
||||
- chapter/tutorials/all/pir
|
||||
---
|
||||
|
||||
This code reads PIR sensor triggers from this simple [PIR sensor](https://www.kiwi-electronics.nl/PIR-Motion-Sensor) and sends an HTTP request for every trigger, in this case to a [Domoticz](https://domoticz.com/) installation. When motion is constantly detected, this PIR sensor keeps the pin high, in which case this code will keep sending HTTP requests every 10 seconds (configurable with the hold\_time variable).
|
||||
|
||||
## Main (`main.py`)
|
||||
|
||||
```python
|
||||
|
||||
import time
|
||||
from network import WLAN
|
||||
from machine import Pin
|
||||
from domoticz import Domoticz
|
||||
|
||||
wl = WLAN(WLAN.STA)
|
||||
d = Domoticz("<ip>", 8080 ,"<hash>")
|
||||
|
||||
#config
|
||||
hold_time_sec = 10
|
||||
|
||||
#flags
|
||||
last_trigger = -10
|
||||
|
||||
pir = Pin('G4',mode=Pin.IN, pull=Pin.PULL_UP)
|
||||
|
||||
# main loop
|
||||
print("Starting main loop")
|
||||
while True:
|
||||
if pir() == 1:
|
||||
if time.time() - last_trigger > hold_time_sec:
|
||||
last_trigger = time.time()
|
||||
print("Presence detected, sending HTTP request")
|
||||
try:
|
||||
return_code = d.setVariable('Presence:LivingRoom','1')
|
||||
print("Request result: "+str(return_code))
|
||||
except Exception as e:
|
||||
print("Request failed")
|
||||
print(e)
|
||||
else:
|
||||
last_trigger = 0
|
||||
print("No presence")
|
||||
|
||||
time.sleep_ms(500)
|
||||
|
||||
print("Exited main loop")
|
||||
```
|
||||
|
||||
## Boot (`boot.py`)
|
||||
|
||||
For more WiFi scripts, see the wlan step by step tutorial.
|
||||
|
||||
```python
|
||||
|
||||
import os
|
||||
import machine
|
||||
|
||||
uart = machine.UART(0, 115200)
|
||||
os.dupterm(uart)
|
||||
|
||||
known_nets = {
|
||||
'NetworkID': {'pwd': '<password>', 'wlan_config': ('10.0.0.8', '255.255.0.0', '10.0.0.1', '10.0.0.1')},
|
||||
}
|
||||
|
||||
from network import WLAN
|
||||
wl = WLAN()
|
||||
|
||||
|
||||
if machine.reset_cause() != machine.SOFT_RESET:
|
||||
|
||||
wl.mode(WLAN.STA)
|
||||
original_ssid = wl.ssid()
|
||||
original_auth = wl.auth()
|
||||
|
||||
print("Scanning for known wifi nets")
|
||||
available_nets = wl.scan()
|
||||
nets = frozenset([e.ssid for e in available_nets])
|
||||
|
||||
known_nets_names = frozenset([key for key in known_nets])
|
||||
net_to_use = list(nets & known_nets_names)
|
||||
try:
|
||||
net_to_use = net_to_use[0]
|
||||
net_properties = known_nets[net_to_use]
|
||||
pwd = net_properties['pwd']
|
||||
sec = [e.sec for e in available_nets if e.ssid == net_to_use][0]
|
||||
if 'wlan_config' in net_properties:
|
||||
wl.ifconfig(config=net_properties['wlan_config'])
|
||||
wl.connect(net_to_use, (sec, pwd), timeout=10000)
|
||||
while not wl.isconnected():
|
||||
machine.idle() # save power while waiting
|
||||
print("Connected to "+net_to_use+" with IP address:" + wl.ifconfig()[0])
|
||||
|
||||
except Exception as e:
|
||||
print("Failed to connect to any known network, going into AP mode")
|
||||
wl.init(mode=WLAN.AP, ssid=original_ssid, auth=original_auth, channel=6, antenna=WLAN.INT_ANT)
|
||||
```
|
||||
|
||||
## Domoticz Wrapper (`domoticz.py`)
|
||||
|
||||
```python
|
||||
|
||||
import socket
|
||||
class Domoticz:
|
||||
|
||||
def __init__(self, ip, port, basic):
|
||||
self.basic = basic
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
|
||||
def setLight(self, idx, command):
|
||||
return self.sendRequest("type=command¶m=switchlight&idx="+idx+"&switchcmd="+command)
|
||||
|
||||
def setVariable(self, name, value):
|
||||
return self.sendRequest("type=command¶m=updateuservariable&vtype=0&vname="+name+"&vvalue="+value)
|
||||
|
||||
def sendRequest(self, path):
|
||||
try:
|
||||
s = socket.socket()
|
||||
s.connect((self.ip,self.port))
|
||||
s.send(b"GET /json.htm?"+path+" HTTP/1.1\r\nHost: pycom.io\r\nAuthorization: Basic "+self.basic+"\r\n\r\n")
|
||||
status = str(s.readline(), 'utf8')
|
||||
code = status.split(" ")[1]
|
||||
s.close()
|
||||
return code
|
||||
|
||||
except Exception:
|
||||
print("HTTP request failed")
|
||||
return 0
|
||||
```
|
||||
|
||||
156
content/tutorials/hardware/rmt.md
Normal file
156
content/tutorials/hardware/rmt.md
Normal file
@@ -0,0 +1,156 @@
|
||||
---
|
||||
title: "RMT"
|
||||
aliases:
|
||||
- tutorials/all/rmt.html
|
||||
- tutorials/all/rmt.md
|
||||
- chapter/tutorials/all/rmt
|
||||
---
|
||||
|
||||
Detailed information about this class can be found in [`RMT`](/firmwareapi/pycom/machine/rmt).
|
||||
|
||||
The RMT (Remote Control) peripheral of the ESP32 is primarily designed to send and receive infrared remote control signals that use on-off-keying of a carrier frequency, but due to its design it can be used to generate various types of signals, this class will allow you to do this.
|
||||
|
||||
The RMT has 7 channels, of which 5 are available and can be mapped to any GPIO pin (_Note:_ Pins `P13` -`P18` can only be used as inputs).
|
||||
|
||||
| Channel | Resolution | Maximum Pulse Width |
|
||||
| :--- | :--- | :--- |
|
||||
| 0 | Used by on-board LED | |
|
||||
| 1 | Used by `pycom.pulses_get()` | |
|
||||
| 2 | 100nS | 3.2768 ms |
|
||||
| 3 | 100nS | 3.2768 ms |
|
||||
| 4 | 1000nS | 32.768 ms |
|
||||
| 5 | 1000nS | 32.768 ms |
|
||||
| 6 | 3125nS | 102.4 ms |
|
||||
| 7 | 3125nS | 102.4 ms |
|
||||
|
||||
## Transmitting
|
||||
|
||||
The following examples create an RMT object on channel 4, configure it for transmission and send some data in various forms. The resolution of channel 4 is 1000 nano seconds, the given values are interpreted accordingly.
|
||||
|
||||
In this first example, we define the signal as a tuple of binary values that define the shape of the desired signal along with the duration of a bit.
|
||||
|
||||
```python
|
||||
|
||||
from machine import RMT
|
||||
# Map RMT channel 4 to P21, when the RMT is idle, it will output LOW
|
||||
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
|
||||
|
||||
# Produces the pattern shown in data, where each bit lasts
|
||||
# duration * channel resolution = 10000 * 1000ns = 10ms
|
||||
data = (1,0,1,1,1,0,1,0,1)
|
||||
duration = 10000
|
||||
rmt.pulses_send(duration, data)
|
||||
```
|
||||
|
||||

|
||||
|
||||
In this example we define the signal by a tuple of durations and what state the signal starts in.
|
||||
|
||||
```python
|
||||
|
||||
from machine import RMT
|
||||
# Map RMT channel 4 to P21, when the RMT is idle, it will output LOW
|
||||
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
|
||||
|
||||
# The list of durations for each pulse to be, these are in units of the channels
|
||||
# resolution:
|
||||
# duration = Desired pulse length / Channel Resolution
|
||||
duration = (8000,11000,8000,11000,6000,13000,6000,3000,8000)
|
||||
|
||||
# `start_level` defines if the signal starts off as LOW or HIGH, it will then
|
||||
# toggle state between each duration
|
||||
rmt.pulses_send(duration, start_level=RMT.HIGH)
|
||||
```
|
||||
|
||||

|
||||
|
||||
This third example, is a combination of the above two styles of defining a signal. Each pulse has a defined duration as well as a state. This is useful if you don't always want the signal to toggle state.
|
||||
|
||||
```python
|
||||
|
||||
from machine import RMT
|
||||
# Map RMT channel 4 to P21, when the RMT is idle, it will output LOW
|
||||
rmt = RMT(channel=4, gpio="P21", tx_idle_level=RMT.LOW)
|
||||
|
||||
# Produces the pattern shown in data, where each bit lasts
|
||||
# duration[i] * channel resolution = duration[i] * 1000ns
|
||||
data = (1,0,1,1,0,1)
|
||||
duration = (400,200,100,300,200,400)
|
||||
rmt.pulses_send(duration, data)
|
||||
```
|
||||
|
||||

|
||||
|
||||
The following example creates an RMT object on channel 4 and configures it for transmission with carrier modulation.
|
||||
|
||||
```python
|
||||
|
||||
from machine import RMT
|
||||
rmt = RMT(channel=4,
|
||||
gpio="P21",
|
||||
tx_idle_level=RMT.LOW,
|
||||
# Carrier = 100Hz, 80% duty, modules HIGH signals
|
||||
tx_carrier = (100, 70, RMT.HIGH))
|
||||
data = (1,0,1)
|
||||
duration = 10000
|
||||
rmt.pulses_send(duration, data)
|
||||
```
|
||||
|
||||

|
||||
|
||||
The following example creates an RMT object on channel 2, configures it for receiving, then waits for the first, undefined number of pulses without timeout
|
||||
|
||||
```python
|
||||
|
||||
from machine import RMT
|
||||
rmt = machine.RMT(channel=2)
|
||||
rmt.init(gpio="P21", rx_idle_threshold=1000)
|
||||
|
||||
data = rmt.pulses_get()
|
||||
```
|
||||
|
||||
|
||||
|
||||
{{% hint style="danger" %}}
|
||||
If tx_idle_level is not set to the opposite of the third value in the tx_carrier tuple, the carrier wave will continue to be generated when the RMT channel is idle.
|
||||
{{% /hint %}}
|
||||
|
||||
## Receiving
|
||||
|
||||
The following example creates an RMT object on channel 2, configures it for receiving a undefined number of pulses, then waits maximum of 1000us for the first pulse.
|
||||
|
||||
```python
|
||||
|
||||
from machine import RMT
|
||||
# Sets RMT channel 2 to P21 and sets the maximum length of a valid pulse to
|
||||
# 1000*channel resolution = 1000 * 100ns = 100us
|
||||
rmt = machine.RMT(channel=2, gpio="P21", rx_idle_threshold=1000)
|
||||
rmt.init()
|
||||
|
||||
# Get a undefined number of pulses, waiting a maximum of 500us for the first
|
||||
# pulse (unlike other places where the absolute duration was based on the RMT
|
||||
# channels resolution, this value is in us) until a pulse longer than
|
||||
# rx_idle_threshold occurs.
|
||||
data = rmt.pulses_get(timeout=500)
|
||||
```
|
||||
|
||||
The following example creates an RMT object on channel 2, configures it for receiving, filters out pulses with width < 20\*100 nano seconds, then waits for 100 pulses
|
||||
|
||||
```python
|
||||
|
||||
from machine import RMT
|
||||
|
||||
rmt = machine.RMT(channel=2, # Resolution = 100ns
|
||||
gpio="P21",
|
||||
# Longest valid pulse = 1000*100ns = 100us
|
||||
rx_idle_threshold=1000,
|
||||
# Filter out pulses shorter than 20*100ns = 2us
|
||||
rx_filter_threshold=20)
|
||||
|
||||
# Receive 100 pulses, pulses shorter than 2us or longer than 100us will be
|
||||
# ignored. That means if it receives 80 valid pulses but then the signal
|
||||
# doesn't change for 10 hours and then 20 more pulses occur, this function
|
||||
# will wait for 10h
|
||||
data = rmt.pulses_get(pulses=100)
|
||||
```
|
||||
|
||||
36
content/tutorials/hardware/threading.md
Normal file
36
content/tutorials/hardware/threading.md
Normal file
@@ -0,0 +1,36 @@
|
||||
---
|
||||
title: "Threading"
|
||||
aliases:
|
||||
- tutorials/all/threading.html
|
||||
- tutorials/all/threading.md
|
||||
- chapter/tutorials/all/threading
|
||||
---
|
||||
|
||||
MicroPython supports spawning threads by the `_thread` module. The following example demonstrates the use of this module. A thread is simply defined as a function that can receive any number of parameters. Below 3 threads are started, each one perform a print at a different interval.
|
||||
|
||||
```python
|
||||
|
||||
import _thread
|
||||
import time
|
||||
|
||||
def th_func(delay, id):
|
||||
while True:
|
||||
time.sleep(delay)
|
||||
print('Running thread %d' % id)
|
||||
|
||||
for i in range(3):
|
||||
_thread.start_new_thread(th_func, (i + 1, i))
|
||||
```
|
||||
|
||||
## Using Locks:
|
||||
|
||||
```python
|
||||
|
||||
import _thread
|
||||
|
||||
a_lock = _thread.allocate_lock()
|
||||
|
||||
with a_lock:
|
||||
print("a_lock is locked while this executes")
|
||||
```
|
||||
|
||||
60
content/tutorials/hardware/timers.md
Normal file
60
content/tutorials/hardware/timers.md
Normal file
@@ -0,0 +1,60 @@
|
||||
---
|
||||
title: "Timers"
|
||||
aliases:
|
||||
- tutorials/all/timers.html
|
||||
- tutorials/all/timers.md
|
||||
- chapter/tutorials/all/timers
|
||||
---
|
||||
|
||||
Detailed information about this class can be found in [`Timer`](/firmwareapi/pycom/machine/timer).
|
||||
|
||||
## Chronometer
|
||||
|
||||
The Chronometer can be used to measure how much time has elapsed in a block of code. The following example uses a simple stopwatch.
|
||||
|
||||
```python
|
||||
|
||||
from machine import Timer
|
||||
import time
|
||||
|
||||
chrono = Timer.Chrono()
|
||||
|
||||
chrono.start()
|
||||
time.sleep(1.25) # simulate the first lap took 1.25 seconds
|
||||
lap = chrono.read() # read elapsed time without stopping
|
||||
time.sleep(1.5)
|
||||
chrono.stop()
|
||||
total = chrono.read()
|
||||
|
||||
print()
|
||||
print("\nthe racer took %f seconds to finish the race" % total)
|
||||
print(" %f seconds in the first lap" % lap)
|
||||
print(" %f seconds in the last lap" % (total - lap))
|
||||
```
|
||||
|
||||
## Alarm
|
||||
|
||||
The Alarm can be used to get interrupts at a specific interval. The following code executes a callback every second for 10 seconds.
|
||||
|
||||
```python
|
||||
|
||||
from machine import Timer
|
||||
|
||||
class Clock:
|
||||
|
||||
def __init__(self):
|
||||
self.seconds = 0
|
||||
self.__alarm = Timer.Alarm(self._seconds_handler, 1, periodic=True)
|
||||
|
||||
def _seconds_handler(self, alarm):
|
||||
self.seconds += 1
|
||||
print("%02d seconds have passed" % self.seconds)
|
||||
if self.seconds == 10:
|
||||
alarm.callback(None) # stop counting after 10 seconds
|
||||
|
||||
clock = Clock()
|
||||
```
|
||||
|
||||
{{% hint style="info" %}}
|
||||
There are no restrictions to what can be done in an interrupt. For example, it is possible to even do network requests with an interrupt. However, it is important to keep in mind that interrupts are handled sequentially, so it's good practice to keep them short. More information can be found in [`Interrupt Handling`](/firmwareapi/notes#interrupt-handling).
|
||||
{{% /hint %}}
|
||||
86
content/tutorials/hardware/touch.md
Normal file
86
content/tutorials/hardware/touch.md
Normal file
@@ -0,0 +1,86 @@
|
||||
---
|
||||
title: "Touch"
|
||||
aliases:
|
||||
- tutorials/all/touch.html
|
||||
- tutorials/all/touch.md
|
||||
---
|
||||
|
||||
Example of how to use the Touch class of the Machine module:
|
||||
|
||||
```python
|
||||
|
||||
from machine import Touch
|
||||
from machine import Timer
|
||||
import time
|
||||
|
||||
|
||||
class TouchPad:
|
||||
|
||||
def __init__(self, pin, name):
|
||||
self.touch = Touch(pin)
|
||||
self.last_press = 0
|
||||
self.name = name
|
||||
self.pressed = False
|
||||
|
||||
def ispressed(self):
|
||||
if self.touch.read() < (self.touch.init_value() * 2 / 3):
|
||||
self.pressed = True
|
||||
else:
|
||||
self.pressed = False
|
||||
return self.pressed
|
||||
|
||||
def just_pressed(self):
|
||||
now = time.ticks_ms()
|
||||
if now - self.last_press < 500:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def set_press(self):
|
||||
self.last_press = time.ticks_ms()
|
||||
|
||||
|
||||
class TouchController:
|
||||
|
||||
def __init__(self, pads):
|
||||
self.pads = pads
|
||||
for pad in self.pads:
|
||||
pad.touch.init_value(1500)
|
||||
|
||||
def check(self, alarm):
|
||||
for pad in self.pads:
|
||||
if pad.ispressed() and not pad.just_pressed():
|
||||
pad.set_press()
|
||||
if pad.name == 'Right':
|
||||
if tleft.just_pressed():
|
||||
print('Swipe right')
|
||||
else:
|
||||
print('{} pressed'.format(pad.name))
|
||||
elif pad.name == 'Left':
|
||||
if tright.just_pressed():
|
||||
print('Swipe left')
|
||||
else:
|
||||
print('{} pressed'.format(pad.name))
|
||||
elif pad.name == 'Up':
|
||||
if tbott.just_pressed():
|
||||
print('Swipe up')
|
||||
else:
|
||||
print('{} pressed'.format(pad.name))
|
||||
elif pad.name == 'Bottom':
|
||||
if tupp.just_pressed():
|
||||
print('Swipe down')
|
||||
else:
|
||||
print('{} pressed'.format(pad.name))
|
||||
|
||||
|
||||
tleft = TouchPad('P4', 'Left')
|
||||
tright = TouchPad('P8', 'Right')
|
||||
tbott = TouchPad('P9', 'Bottom')
|
||||
tupp = TouchPad('P23', 'Up')
|
||||
|
||||
# initialize the touch controller
|
||||
touch_controller = TouchController(pads=[tleft, tright, tbott, tupp])
|
||||
|
||||
# enable the alarm to check the status
|
||||
Timer.Alarm(touch_controller.check, ms=10, periodic=True)
|
||||
```
|
||||
Reference in New Issue
Block a user