mirror of
https://github.com/sascha-hemi/pycom-documentation.git
synced 2026-03-21 23:06:15 +01:00
247 lines
6.7 KiB
Markdown
247 lines
6.7 KiB
Markdown
# Onewire Driver
|
|
|
|
This tutorial explains how to connect and read data from a DS18x20 temperature sensor. The onewire library is also available at the [pycom-libraries](https://github.com/pycom/pycom-libraries/tree/master/lib/onewire) GitHub Repository.
|
|
|
|
## Basic usage
|
|
|
|
```python
|
|
import time
|
|
from machine import Pin
|
|
from onewire import DS18X20
|
|
from onewire import OneWire
|
|
|
|
# DS18B20 data line connected to pin P10
|
|
ow = OneWire(Pin('P10'))
|
|
temp = DS18X20(ow)
|
|
|
|
while True:
|
|
print(temp.read_temp_async())
|
|
time.sleep(1)
|
|
temp.start_conversion()
|
|
time.sleep(1)
|
|
```
|
|
|
|
## Library
|
|
|
|
```python
|
|
#!/usr/bin/env python3
|
|
|
|
"""
|
|
OneWire library for MicroPython
|
|
"""
|
|
|
|
import time
|
|
import machine
|
|
|
|
class OneWire:
|
|
CMD_SEARCHROM = const(0xf0)
|
|
CMD_READROM = const(0x33)
|
|
CMD_MATCHROM = const(0x55)
|
|
CMD_SKIPROM = const(0xcc)
|
|
|
|
def __init__(self, pin):
|
|
self.pin = pin
|
|
self.pin.init(pin.OPEN_DRAIN, pin.PULL_UP)
|
|
|
|
def reset(self):
|
|
"""
|
|
Perform the onewire reset function.
|
|
Returns True if a device asserted a presence pulse, False otherwise.
|
|
"""
|
|
sleep_us = time.sleep_us
|
|
disable_irq = machine.disable_irq
|
|
enable_irq = machine.enable_irq
|
|
pin = self.pin
|
|
|
|
pin(0)
|
|
sleep_us(480)
|
|
i = disable_irq()
|
|
pin(1)
|
|
sleep_us(60)
|
|
status = not pin()
|
|
enable_irq(i)
|
|
sleep_us(420)
|
|
return status
|
|
|
|
def read_bit(self):
|
|
sleep_us = time.sleep_us
|
|
enable_irq = machine.enable_irq
|
|
pin = self.pin
|
|
|
|
pin(1) # half of the devices don't match CRC without this line
|
|
i = machine.disable_irq()
|
|
pin(0)
|
|
sleep_us(1)
|
|
pin(1)
|
|
sleep_us(1)
|
|
value = pin()
|
|
enable_irq(i)
|
|
sleep_us(40)
|
|
return value
|
|
|
|
def read_byte(self):
|
|
value = 0
|
|
for i in range(8):
|
|
value |= self.read_bit() << i
|
|
return value
|
|
|
|
def read_bytes(self, count):
|
|
buf = bytearray(count)
|
|
for i in range(count):
|
|
buf[i] = self.read_byte()
|
|
return buf
|
|
|
|
def write_bit(self, value):
|
|
sleep_us = time.sleep_us
|
|
pin = self.pin
|
|
|
|
i = machine.disable_irq()
|
|
pin(0)
|
|
sleep_us(1)
|
|
pin(value)
|
|
sleep_us(60)
|
|
pin(1)
|
|
sleep_us(1)
|
|
machine.enable_irq(i)
|
|
|
|
def write_byte(self, value):
|
|
for i in range(8):
|
|
self.write_bit(value & 1)
|
|
value >>= 1
|
|
|
|
def write_bytes(self, buf):
|
|
for b in buf:
|
|
self.write_byte(b)
|
|
|
|
def select_rom(self, rom):
|
|
"""
|
|
Select a specific device to talk to. Pass in rom as a bytearray (8 bytes).
|
|
"""
|
|
self.reset()
|
|
self.write_byte(CMD_MATCHROM)
|
|
self.write_bytes(rom)
|
|
|
|
def crc8(self, data):
|
|
"""
|
|
Compute CRC
|
|
"""
|
|
crc = 0
|
|
for i in range(len(data)):
|
|
byte = data[i]
|
|
for b in range(8):
|
|
fb_bit = (crc ^ byte) & 0x01
|
|
if fb_bit == 0x01:
|
|
crc = crc ^ 0x18
|
|
crc = (crc >> 1) & 0x7f
|
|
if fb_bit == 0x01:
|
|
crc = crc | 0x80
|
|
byte = byte >> 1
|
|
return crc
|
|
|
|
def scan(self):
|
|
"""
|
|
Return a list of ROMs for all attached devices.
|
|
Each ROM is returned as a bytes object of 8 bytes.
|
|
"""
|
|
devices = []
|
|
diff = 65
|
|
rom = False
|
|
for i in range(0xff):
|
|
rom, diff = self._search_rom(rom, diff)
|
|
if rom:
|
|
devices += [rom]
|
|
if diff == 0:
|
|
break
|
|
return devices
|
|
|
|
def _search_rom(self, l_rom, diff):
|
|
if not self.reset():
|
|
return None, 0
|
|
self.write_byte(CMD_SEARCHROM)
|
|
if not l_rom:
|
|
l_rom = bytearray(8)
|
|
rom = bytearray(8)
|
|
next_diff = 0
|
|
i = 64
|
|
for byte in range(8):
|
|
r_b = 0
|
|
for bit in range(8):
|
|
b = self.read_bit()
|
|
if self.read_bit():
|
|
if b: # there are no devices or there is an error on the bus
|
|
return None, 0
|
|
else:
|
|
if not b: # collision, two devices with different bit meaning
|
|
if diff > i or ((l_rom[byte] & (1 << bit)) and diff != i):
|
|
b = 1
|
|
next_diff = i
|
|
self.write_bit(b)
|
|
if b:
|
|
r_b |= 1 << bit
|
|
i -= 1
|
|
rom[byte] = r_b
|
|
return rom, next_diff
|
|
|
|
class DS18X20(object):
|
|
def __init__(self, onewire):
|
|
self.ow = onewire
|
|
self.roms = [rom for rom in self.ow.scan() if rom[0] == 0x10 or rom[0] == 0x28]
|
|
|
|
def isbusy(self):
|
|
"""
|
|
Checks wether one of the DS18x20 devices on the bus is busy
|
|
performing a temperature conversion
|
|
"""
|
|
return not self.ow.read_bit()
|
|
|
|
def start_conversion(self, rom=None):
|
|
"""
|
|
Start the temp conversion on one DS18x20 device.
|
|
Pass the 8-byte bytes object with the ROM of the specific device you want to read.
|
|
If only one DS18x20 device is attached to the bus you may omit the rom parameter.
|
|
"""
|
|
rom = rom or self.roms[0]
|
|
ow = self.ow
|
|
ow.reset()
|
|
ow.select_rom(rom)
|
|
ow.write_byte(0x44) # Convert Temp
|
|
|
|
def read_temp_async(self, rom=None):
|
|
"""
|
|
Read the temperature of one DS18x20 device if the conversion is complete,
|
|
otherwise return None.
|
|
"""
|
|
if self.isbusy():
|
|
return None
|
|
rom = rom or self.roms[0]
|
|
ow = self.ow
|
|
ow.reset()
|
|
ow.select_rom(rom)
|
|
ow.write_byte(0xbe) # Read scratch
|
|
data = ow.read_bytes(9)
|
|
return self.convert_temp(rom[0], data)
|
|
|
|
def convert_temp(self, rom0, data):
|
|
"""
|
|
Convert the raw temperature data into degrees celsius and return as a fixed point with 2 decimal places.
|
|
"""
|
|
temp_lsb = data[0]
|
|
temp_msb = data[1]
|
|
if rom0 == 0x10:
|
|
if temp_msb != 0:
|
|
# convert negative number
|
|
temp_read = temp_lsb >> 1 | 0x80 # truncate bit 0 by shifting, fill high bit with 1.
|
|
temp_read = -((~temp_read + 1) & 0xff) # now convert from two's complement
|
|
else:
|
|
temp_read = temp_lsb >> 1 # truncate bit 0 by shifting
|
|
count_remain = data[6]
|
|
count_per_c = data[7]
|
|
temp = 100 * temp_read - 25 + (count_per_c - count_remain) // count_per_c
|
|
return temp
|
|
elif rom0 == 0x28:
|
|
return (temp_msb << 8 | temp_lsb) * 100 // 16
|
|
else:
|
|
assert False
|
|
```
|
|
|