123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- #!/usr/bin/env python3
- #
- # Read values from DHT11/22 to I2C converter
- #
- # Copyright (c) 2018 Michael Buesch <m@bues.ch>
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License along
- # with this program; if not, write to the Free Software Foundation, Inc.,
- # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- #
- import sys
- import time
- import getopt
- import smbus
- class DHT_via_I2C(object):
- """Read a DHT11/22 sensor via I2C.
- """
- # Device types.
- DHT11 = 0
- DHT22 = 1
- DEFAULT_BUS = 1
- DEFAULT_ADDR = 0x76
- def __init__(self,
- devType=DHT11,
- i2cBus=DEFAULT_BUS,
- i2cAddr=DEFAULT_ADDR):
- self.__devType = devType
- self.__seqCount = -1
- self.__temp = 0.0
- self.__hum = 0.0
- self.__i2cAddr = i2cAddr
- self.__i2c = smbus.SMBus(i2cBus)
- def __crc(self, data):
- crc = 0
- for d in data:
- crc ^= d
- for i in range(8):
- if crc & 0x80:
- crc = (crc << 1) & 0xFF
- crc ^= 0x07
- else:
- crc = (crc << 1) & 0xFF
- return crc & 0xFF
- def update(self):
- """Try to get new temperature and humidity values.
- Returns True, if new values have been fetched.
- """
- # Read the data from I2C.
- try:
- data = self.__i2c.read_i2c_block_data(
- self.__i2cAddr,
- self.__devType, 7)
- except OSError as e:
- return False
- # Extract and check the I2C data.
- seqCount = data[0]
- dhtData = data[1:-1]
- dataCRC = data[-1]
- calcCRC = self.__crc(data[0:-1])
- if dataCRC != calcCRC:
- return False
- if seqCount == self.__seqCount:
- return False
- # Extract and check the DHT11/22 data.
- hum = (dhtData[0] << 8) | (dhtData[1])
- temp = (dhtData[2] << 8) | (dhtData[3])
- calcChecksum = sum(dhtData[0:-1]) & 0xFF
- dataChecksum = dhtData[-1]
- if dataChecksum != calcChecksum:
- return False
- # Convert the temperature and humidity values.
- if self.__devType == self.DHT11:
- divisor = 256.0
- tempFactor = 1.0
- else:
- divisor = 10.0
- if temp & 0x8000:
- temp &= 0x7FFF
- tempFactor = -1.0
- else:
- tempFactor = 1.0
- self.__hum = float(hum) / divisor
- self.__temp = (float(temp) / divisor) * tempFactor
- self.__seqCount = seqCount
- return True
- @property
- def tempCelsius(self):
- """Get the measured temperature, in Celcius.
- """
- return self.__temp
- @property
- def tempFahrenheit(self):
- """Get the measured temperature, in Fahrenheit.
- """
- return self.tempCelsius * 1.8 + 32
- @property
- def humidityPercent(self):
- """Get the measured humidity, in percent.
- """
- return self.__hum
- def usage():
- print("Usage: dhtget [OPTIONS]")
- print()
- print(" -1|--dht11 The connected device is a DHT11 (default).")
- print(" -2|--dht22 The connected device is a DHT22.")
- print(" -l|--loop COUNT Run in a loop.")
- print(" If count is negative, run an infinite loop.")
- print(" Default: Run once.")
- print(" -c|--csv Print output as CSV.")
- print(" -H|--no-header Do not print CSV header.")
- print(" -f|--fahrenheit Use Fahrenheit instead of Celsius.")
- print(" -b|--i2c-bus BUS Use I2C bus number BUS. Default: 1.")
- print(" -a|--i2c-addr ADDR Use I2C address ADDR. Default: 0x76.")
- print(" -h|--help Print this help text.")
- def main(argv):
- opt_devType = DHT_via_I2C.DHT11
- opt_loop = 1
- opt_csv = False
- opt_fahrenheit = False
- opt_header = True
- opt_i2cBus = DHT_via_I2C.DEFAULT_BUS
- opt_i2cAddr = DHT_via_I2C.DEFAULT_ADDR
- try:
- opts, args = getopt.getopt(argv[1:],
- "h12l:cHfb:a:",
- [ "help",
- "dht11", "dht22",
- "loop=",
- "csv", "no-header",
- "fahrenheit",
- "i2c-bus=",
- "i2c-addr=", ])
- except getopt.GetoptError as e:
- print(str(e), file=sys.stderr)
- return 1
- for o, v in opts:
- if o in ("-h", "--help"):
- usage()
- return 0
- if o in ("-1", "--dht11"):
- opt_devType = DHT_via_I2C.DHT11
- if o in ("-2", "--dht22"):
- opt_devType = DHT_via_I2C.DHT22
- if o in ("-l", "--loop"):
- try:
- opt_loop = int(v, 0)
- except ValueError as e:
- print("Invalid -l|--loop value.", file=sys.stderr)
- return 1
- if o in ("-c", "--csv"):
- opt_csv = True
- if o in ("-H", "--no-header"):
- opt_header = False
- if o in ("-f", "--fahrenheit"):
- opt_fahrenheit = True
- if o in ("-b", "--i2c-bus"):
- try:
- opt_i2cBus = int(v, 0)
- except ValueError as e:
- print("Invalid -b|--i2c-bus value.", file=sys.stderr)
- return 1
- if o in ("-a", "--i2c-addr"):
- try:
- opt_i2cAddr = int(v, 0)
- except ValueError as e:
- print("Invalid -a|--i2c-addr value.", file=sys.stderr)
- return 1
- try:
- dht = DHT_via_I2C(devType=opt_devType,
- i2cBus=opt_i2cBus,
- i2cAddr=opt_i2cAddr)
- except OSError as e:
- print("Failed to establish DHT-I2C communication:\n%s" % str(e),
- file=sys.stderr)
- return 1
- tempUnit = "F" if opt_fahrenheit else "*C"
- if opt_csv and opt_header:
- print("Temperature %s;Humidity %%" % tempUnit)
- count = opt_loop
- stream = sys.stdout
- while count != 0:
- if dht.update():
- temp = dht.tempFahrenheit if opt_fahrenheit else dht.tempCelsius
- hum = dht.humidityPercent
- if opt_csv:
- stream.write("%.1f;%.1f\n" % (temp, hum))
- else:
- stream.write("Temperature: %.1f %s\n" % (
- temp, tempUnit))
- stream.write("Humidity: %.1f %%\n" % hum)
- stream.flush()
- if count > 0:
- count -= 1
- time.sleep(0.1)
- return 0
- try:
- sys.exit(main(sys.argv))
- except KeyboardInterrupt as e:
- sys.exit(0)
|