|
|
Le but de cette page est d'expliquer comment dialoguer depuis python avec une station météo WS3500 ou WS3650. |
|
|
|
préambule :
Tout d'abord je ne souhaite pas remercier La Crosse Technologie qui n'ont pas souhaité communiquer le protocole de communication avec leur station météo.
Ensuite, je ne suis évidemment pas responsable des effets non désirables de l'utilisation de mes programmes... :p
Enfin, heureusement qu'il y a déjà des gens qui se sont penchés sur le problème pour me faciliter grandement la tâche, tout particulièrement les membres du projet open3600.Ce projet a le mérite d'être libre, open source et multi plateforme (linux et windows) mais il a le gros inconvénient d'avoir des exécutables différents et difficilement intégrables dans d'autres applications, sans compter la lisibilité du langage C qui n'a rien à voir avec celle de mon langage fétiche, python !
|
Description du protocole |
La station météo WS3500/WS3650 communique via le port série (RS232 pour les pros) :
- Vitesse : 300 (c'est un peu lent mais bon...)
- Bits de données : 8
- Bits de stop : 1
- Parité : aucune
- Protocole : aucun (propriétaire pfff...)
- Timeout : aucun
|
Résumé du protocole |
Le protocole de communication se résume ainsi :
- Initialisation de la communication
- Envoi de l'adresse
- Envoi de la commande
- Lecture de la réponse
|
Initalisation |
L'initialisation se passe ainsi :
- On écrit 2000 "U" (va savoir pourquoi mais c'est comme ça...)
- DTR = 0
- RTS = 0
- On attend un laps de temps que le DSR passe à 1
- On attend un laps de temps que le DSR passe à 0
- RTS = 1
- DTR = 1
- On écrit 2000 "U" (rebelotte...)
|
Commandes |
De ce qu'on sait, il y a 2 commandes :
a0 - donner l'adresse et écrire les éventuelles données à écrire dans la mémoire, la commande a0 est sur 3 octets + données à écrire + fin
1er octet - le code commande : a0
2e octet - octet fort de l'adresse
3e octet - octet faible de l'adresse
n octets - données à écrire dans la mémoire, après chaque octet écrit, la séquence suivante du signal doit être effectuée
- RTS=0,
- lire l'état de CTS - doit être à 1,
- DTR=0,
- DTR=1
m octets - fin de la séquence : DTR=0, RTS=0, RTS=1, DTR=1, RTS=0
a1 - lire la mémoire interne à partir de l'adresse mise par la commande a0 (même en cas d'écriture par la commande a0), la commande a1 est sur un octet (sans paramètre):
1 octet - le code commande : a1
après chaque octet lu (excepté le dernier) la séquence suivante doit être effectuée :
- envoyer le bit 0 à la station
- RTS=0
après le dernier octet lu, la séquence suivante doit être effectuée (marque de fin de lecture):
- RTS=1,
- DTR=0,
- RTS=0,
- RTS=1,
- DTR=1,
- RTS=0
|
Structure d'un enregistrement d'historique |
Un enregistrement d'historique tient sur 18 octets
de la manière suivante :
- minute (BCD)
- heure (BCD)
- jour (BCD)
- mois (BCD)
- année (BCD)
- température intérieure 1s et 0.1s (BCD)
- quartet fort de l'octet : température extérieure 0.1s (BCD), quartet faible de l'octet : température intérieure 10s (BCD)
- température extérieure 10s et 1s (BCD)
- pression absolue 1s et 0.1s (BCD)
- pression absolue 100s et 10s (BCD)
- quartet fort de l'octet : taux d'humidité intérieure 1s (BCD), quartet faible de l'octet : pression absolue 1000s (BCD)
- quartet fort de l'octet : taux d'humidité extérieure 1s (BCD), quartet faible de l'octet : taux d'humidité intérieure 10s (BCD)
- quartet fort de l'octet : quartet faible du compteur de précipitation (BIN), quartet faible de l'octet : taux d'humidité extérieure 10s (BCD)
- octet fort du compteur de précipitation (BIN)
- octet faible de la vitesse du vent (BIN)
- quartet fort de l'octet : direction du vent (BIN), quartet faible de l'octet : octet fort de la vitesse du vent (BIN)
- octet faible de la rafale (BIN)
- quartet fort de l'octet : ? (toujours à 0), quartet faible de l'octet : octet fort de la rafale (BIN)
Les températures sont en Celsius mais il faut ôter 40 (décimal) pour obtenir les bonnes valeurs.
Le nombre de précipitation peut être calculé en soustrayant les valeurs des deux enregistrements et en les multipliant par 0.518.
|
Description du signal dans la communication |
Signal |
Direction |
Description |
RxD |
|
Non utilisé |
TxD |
--> |
Signal pas clair, on l'utilise seulement à l'initialisation pour envoyer des caractères U (0x55) |
DTR |
--> |
Données de sortie |
RTS |
--> |
Horloge |
CTS |
<-- |
Données en entrée |
DSR |
<-- |
Utilisé seulement à l'initialisation |
|
|
|
Module nécessaire : |
- pyserial mais dommage qu'il faille le modifiier sous windows pour que la communication réussisse. Le protocole propriétaire de La Crosse Technologie n'étant pas standard, le module n'est pas utilisable en l'état.
|
Modification de la procédure "write" de la classe Win32Serial : |
def write(self, data): """Output the given string over the serial port.""" if not self.hComPort: raise portNotOpenError #~ if not isinstance(data, (bytes, bytearray)): #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview data = bytes(data) if data: n = win32.DWORD() err = win32.WriteFile(self.hComPort, data, len(data), ctypes.byref(n), self._overlappedWrite) if not err and win32.GetLastError() != win32.ERROR_IO_PENDING: raise SerialException("WriteFile failed (%s)" % ctypes.WinError()) return n.value else: return 0
|
La classe d'utilisation de la station : |
# -*- coding: iso-8859-1 -*-
import time import serial
DELAY_CONST = 0.001 MAXRETRIES = 10 HISTORY_BUFFER_ADR = 0x1A0 HISTORY_REC_NO = 1796 SETBIT = 0x12 UNSETBIT = 0x32
DEBUG = 0
def debug(s, level = 2, tab = 0): if DEBUG >= level: print " "*tab, s
def string2hex(s): return " ".join(map(lambda(x):hex(ord(x)), s))
class WS3500(): def __init__(self, ser): self.ser = ser self._init = False def _initialize(self): debug("initialize()", level=1) buff = "U"*2000
self.ser.write(buff)
debug("-DTR", tab=1) self.ser.setDTR(0) debug("-RTS", tab=1) self.ser.setRTS(0)
INIT_WAIT = 500 i = 0 while i < INIT_WAIT and not self.ser.getDSR(): i += 1 time.sleep(DELAY_CONST) if i == INIT_WAIT: debug("timeout 1", level=1) return 0 i = 0
time.sleep(DELAY_CONST) while i < INIT_WAIT and self.ser.getDSR(): i += 1 time.sleep(DELAY_CONST) if i != INIT_WAIT: debug("+RTS", tab=1) self.ser.setRTS(1) debug("+DTR", tab=1) self.ser.setDTR(1) else: debug("timeout 2", level=1) return 0 self.ser.write(buff)
self._init = True
return 1
def _read_safe(self, address, number): debug("read_safe(%i, %i)" % (address, number), level=1) readdata = "" readdata2 = "" nb_tests = 1 for j in range(MAXRETRIES): self._write_data(address, 0, None, tab=1) readdata = self._read_data(number, tab=1)
if nb_tests == 2: self._write_data(address, 0, None, tab=1) readdata2 = self._read_data(number, tab=1) if readdata == readdata2: if readdata == chr(0) * number or readdata == "": debug("--> only zeros", level=1, tab=1) self._init = False time.sleep(0.1) self._initialize() else: break else: debug("--> not identical", level=1, tab=1) else: if readdata == chr(0) * number or readdata == "": debug("--> only zeros", level=1, tab=1) else: break
if j == MAXRETRIES: return -1
return readdata
def _read_data(self, number, tab=0): debug("read_data(%i)" % number, level=1, tab=tab) command = 0xa1 readdata = ""
if self._write_byte(command, tab=tab+1): for i in range(number): b = self._read_byte(tab=tab+1) readdata = readdata + chr(b) if i + 1 < number: self._read_next_byte_seq(tab=tab+1) self._read_last_byte_seq(tab=tab+1)
debug("--> %s" % string2hex(readdata), level=1, tab=tab) return readdata else: debug("--> erreur read_data", level=1, tab=tab) return ""
def _write_data(self, address, number, writedata, tab=0): debug("write_data(%s, %i)" % (address, number), level=1, tab=tab) command = 0xa0 i = -1
self._write_byte(command, tab=tab+1) self._write_byte(address/256, tab=tab+1) self._write_byte(address%256, tab=tab+1)
if writedata != None: for i in range(number): self._write_byte(ord(writedata[i]), tab=tab+1)
debug("-DTR", tab=tab) self.ser.setDTR(0) debug("-RTS", tab=tab) self.ser.setRTS(0) debug("+RTS", tab=tab) self.ser.setRTS(1) debug("+DTR", tab=tab) self.ser.setDTR(1) debug("-RTS", tab=tab) self.ser.setRTS(0) return i
def _read_next_byte_seq(self, tab=0): debug("read_next_byte_seq()", level=1, tab=tab) self._write_bit(0) debug("-RTS", tab=tab) self.ser.setRTS(0)
def _read_last_byte_seq(self, tab=0): debug("read_last_byte_seq()", level=1, tab=tab) debug("+RTS", tab=tab) self.ser.setRTS(1) debug("-DTR", tab=tab) self.ser.setDTR(0) debug("-RTS", tab=tab) self.ser.setRTS(0) debug("+RTS", tab=tab) self.ser.setRTS(1) debug("+DTR", tab=tab) self.ser.setDTR(1) debug("-RTS", tab=tab) self.ser.setRTS(0)
def _read_bit(self, tab=0): debug("read_bit()", level=1, tab=tab) debug("-DTR", tab=tab) self.ser.setDTR(0) status = self.ser.getCTS() debug("CTS ->" + str(status), tab=tab) debug("+DTR", tab=tab) self.ser.setDTR(1)
debug("--> %i" % int(not status), level=1, tab=tab)
return int(not status)
def _write_bit(self, bit, tab=0): debug("write_bit(%i)" % bit, level=1, tab=tab) if bit: debug("->-RTS", tab=tab) self.ser.setRTS(0) else: debug("->+RTS", tab=tab) self.ser.setRTS(1) debug("-DTR", tab=tab) self.ser.setDTR(0) debug("+DTR", tab=tab) self.ser.setDTR(1)
def _read_byte(self, tab=0): debug("read_byte()", level=1, tab=tab) byte = 0
for i in range(8): byte *= 2 byte += self._read_bit(tab=tab+1)
debug("--> %i - %s" % (byte, hex(byte)), level=1, tab=tab)
return byte
def _write_byte(self, byte, tab=0): debug("write_byte(%i)" % byte, level=1, tab=tab) for i in range(8): self._write_bit(byte & 0x80, tab=tab+1) byte <<= 1 byte &= 0xff
debug("-RTS", tab=tab) self.ser.setRTS(0) status = self.ser.getCTS() debug("CTS->" + str(status), tab=tab) debug("-DTR", tab=tab) self.ser.setDTR(0) debug("+DTR", tab=tab) self.ser.setDTR(1) debug("---> %i" % int(status), level=1, tab=tab) if status: return 1 else: return 0
def conv_temp(self, data): return ((ord(data[1]) & 0xF) * 10 + (ord(data[0]) >> 4) + (ord(data[0]) & 0xF) / 10.0) - 40.0
def conv_datetime(self, data): minute = (ord(data[0]) >> 4) + (ord(data[1]) & 0xF) * 10 heure = (ord(data[1]) >> 4) + (ord(data[2]) & 0xF) * 10 jour = (ord(data[2]) >> 4) + (ord(data[3]) & 0xF) * 10 mois = (ord(data[3]) >> 4) + (ord(data[4]) & 0xF) * 10 annee = 2000 + (ord(data[4]) >> 4) + (ord(data[5]) & 0xF) * 10 return "%04i-%02i-%02i %02i:%02i" % (annee, mois, jour, heure, minute)
def conv_datetime2(self, data): minute = (ord(data[0]) >> 4) * 10 + (ord(data[0]) & 0xF) heure = (ord(data[1]) >> 4) * 10 + (ord(data[1]) & 0xF) jour = (ord(data[2]) >> 4) * 10 + (ord(data[2]) & 0xF) mois = (ord(data[3]) >> 4) * 10 + (ord(data[3]) & 0xF) annee = 2000 + (ord(data[4]) >> 4) * 10 + (ord(data[4]) & 0xF) return "%04i-%02i-%02i %02i:%02i" % (annee, mois, jour, heure, minute)
def getTemp(self, address, meteo_data = None): l = 2 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
return self.conv_temp(data_read)
def getTempBCD(self, address, meteo_data = None): l = 2 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
return (ord(data_read[1]) >> 4) * 10 + (ord(data_read[1] )& 0xF) + (ord(data_read[0]) >> 4) / 10.0 - 40.0
def getDateTime(self, address, meteo_data = None): l = 6 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
return self.conv_datetime(data_read)
def getDateTime2(self, address, meteo_data = None): l = 5 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
return self.conv_datetime2(data_read)
def getHumidity(self, address, meteo_data = None): l = 1 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
return (ord(data_read[0]) >> 4) * 10 + (ord(data_read[0]) & 0xf)
def getPressure(self, address, meteo_data = None): l = 3 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
return (ord(data_read[2]) & 0xF) * 1000 + (ord(data_read[1]) >> 4) * 100 + (ord(data_read[1]) & 0xF) * 10 + (ord(data_read[0]) >> 4) + (ord(data_read[0]) & 0xF) / 10.0 def temp_int(self, meteo_data = None): address = 0x26
return self.getTemp(address, meteo_data)
def temp_ext(self, meteo_data = None): address = 0x3D
return self.getTemp(address, meteo_data)
def temperature_indoor_minmax(self, meteo_data = None): address_min = 0x28 address_max = 0x2B address_mintime = 0x2C address_maxtime = 0x31
data = [] data.append(self.getTempBCD(address_min, meteo_data)) data.append(self.getTemp(address_max, meteo_data)) data.append(self.getDateTime(address_mintime, meteo_data)) data.append(self.getDateTime(address_maxtime, meteo_data))
return data
def temperature_outdoor_minmax(self, meteo_data = None): address_min = 0x3F address_max = 0x42 address_mintime = 0x43 address_maxtime = 0x48
data = [] data.append(self.getTempBCD(address_min, meteo_data)) data.append(self.getTemp(address_max, meteo_data)) data.append(self.getDateTime(address_mintime, meteo_data)) data.append(self.getDateTime(address_maxtime, meteo_data))
return data
def dewpoint(self, meteo_data = None): address = 0x6B
return self.getTemp(address, meteo_data)
def dewpoint_minmax(self, meteo_data = None): address_min = 0x6D address_max = 0x70 address_mintime = 0x71 address_maxtime = 0x76
data = [] data.append(self.getTempBCD(address_min, meteo_data)) data.append(self.getTemp(address_max, meteo_data)) data.append(self.getDateTime(address_mintime, meteo_data)) data.append(self.getDateTime(address_maxtime, meteo_data))
return data
def humidity_indoor(self, meteo_data = None): address = 0x81
return self.getHumidity(address, meteo_data)
def humidity_outdoor(self, meteo_data = None): address = 0x90
return self.getHumidity(address, meteo_data)
def humidity_indoor_minmax(self, meteo_data = None): address_min = 0x82 address_max = 0x83 address_mintime = 0x84 address_maxtime = 0x89
data = [] data.append(self.getHumidity(address_min, meteo_data)) data.append(self.getHumidity(address_max, meteo_data)) data.append(self.getDateTime2(address_mintime, meteo_data)) data.append(self.getDateTime2(address_maxtime, meteo_data))
return data
def humidity_outdoor_minmax(self, meteo_data = None): address_min = 0x91 address_max = 0x92 address_mintime = 0x93 address_maxtime = 0x98
data = [] data.append(self.getHumidity(address_min, meteo_data)) data.append(self.getHumidity(address_max, meteo_data)) data.append(self.getDateTime2(address_mintime, meteo_data)) data.append(self.getDateTime2(address_maxtime, meteo_data))
return data
def rel_pressure(self, meteo_data = None): address = 0x13D
return self.getPressure(address, meteo_data)
def rel_pressure_minmax(self, meteo_data = None): address_min = 0x156 address_max = 0x14C address_mintime = 0x160 address_maxtime = 0x15B
data = [] data.append(self.getPressure(address_min, meteo_data)) data.append(self.getPressure(address_max, meteo_data)) data.append(self.getDateTime2(address_mintime, meteo_data)) data.append(self.getDateTime2(address_maxtime, meteo_data))
return data
def abs_pressure(self, meteo_data = None): address = 0x138
return self.getPressure(address, meteo_data)
def abs_pressure_minmax(self, meteo_data = None): address_min = 0x5F6 address_max = 0x5F6 + 12 address_mintime = 0x61E address_maxtime = 0x61E + 5
data = [] data.append(self.getPressure(address_min, meteo_data)) data.append(self.getPressure(address_max, meteo_data)) data.append(self.getDateTime2(address_mintime, meteo_data)) data.append(self.getDateTime2(address_maxtime, meteo_data))
return data
def tendency_forecast(self, meteo_data = None): address = 0x24 l = 1 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
tendency_values = ["Constant", "Augmentant", "Diminuant"] forecast_values = ["Pluvieux", "Nuageux", "Ensoleillé"]
return tendency_values[ord(data_read[0]) >> 4], forecast_values[ord(data_read[0]) & 0xF]
def read_history_info(self): if not self._init: if not self._initialize(): return "" address = 0x6B2 - 2 l = 12 data_read = self._read_safe(address, l)
debug(string2hex(data_read), 1)
data = []
interval = (ord(data_read[1]) & 0xF) * 256 + ord(data_read[0]) + 1 data.append(interval) countdown = ord(data_read[2])*16 + (ord(data_read[1]) >> 4) + 1 data.append(countdown)
jour = (ord(data_read[0]) >> 4) * 10 + (ord(data_read[0]) & 0xF) mois = (ord(data_read[1]) >> 4) * 10 + (ord(data_read[1]) & 0xF) annee = 2000 + (ord(data_read[2]) >> 4) * 10 + (ord(data_read[2]) & 0xF) data.append((jour, mois, annee)) no_records = ord(data_read[9]) data.append(no_records)
return data
def read_history_record(self, data, record_no): record = data minute = ((ord(record[0]) & 0xf0) >> 4)* 10 + (ord(record[0]) & 0xf) if minute >= 60: return None heure = ((ord(record[1]) & 0xf0) >> 4) * 10 + (ord(record[1]) & 0xf) jour = ((ord(record[2]) & 0xf0) >> 4) * 10 + (ord(record[2]) & 0xf) mois = ((ord(record[3]) & 0xf0) >> 4) * 10 + (ord(record[3]) & 0xf) annee = 2000 + ((ord(record[4]) & 0xf0) >> 4) * 10 + (ord(record[4]) & 0xf)
temperature_indoor = ((ord(record[5]) & 0xf0) >> 4) * 10 + (ord(record[5]) & 0xf) temperature_indoor += (ord(record[6]) & 0xf) * 100 - 400 ##temperature_indoor /= 10.0 temperature_outdoor = (ord(record[6]) & 0xf0) >> 4 temperature_outdoor += ((ord(record[7]) & 0xf0) >> 4) * 100 + (ord(record[7]) & 0xf) * 10 - 400 ##temperature_outdoor /= 10.0 pressure = ((ord(record[8]) & 0xf0) >> 4) * 10 + (ord(record[8]) & 0xf) pressure += ((ord(record[9]) & 0xf0) >> 4) * 1000 + (ord(record[9]) & 0xf) * 100 pressure += (ord(record[10]) & 0xf) * 10000 ##pressure /= 10.0 humidity_indoor = (ord(record[10]) & 0xf0) >> 4 humidity_indoor += (ord(record[11]) & 0xf) * 10 humidity_outdoor = (ord(record[11]) & 0xf0) >> 4 humidity_outdoor += (ord(record[12]) & 0xf) * 10 raincount = ord(record[12]) >> 4 raincount += ord(record[13]) * 16 windspeed = (ord(record[15]) & 0xf) * 256 + ord(record[14]) ##windspeed /= 10.0 if (ord(record[17]) & 0xf) != 1 or ord(record[16]) != 0xFE: windgust = (ord(record[17]) & 0xf) * 256 + ord(record[16]) ##windgust /= 10.0 else: windgust = -1 winddir_degrees = ((ord(record[15]) & 0xF0)>> 4) * 22.5
return ["%04i-%02i-%02i %02i:%02i" % (annee, mois, jour, heure, minute), temperature_indoor, temperature_outdoor, pressure, humidity_indoor, humidity_outdoor, raincount, windspeed, winddir_degrees]
def read_history(self, delai_dernier_releve = 0): if not self._init: if not self._initialize(): return "" l = 0x7fff - HISTORY_BUFFER_ADR + 1 nb = 12 * 5 * 20 l = 18 * nb nb_enreg_max = 1750 address_max = HISTORY_BUFFER_ADR + nb_enreg_max * 18 - 1 address = HISTORY_BUFFER_ADR end_of_data = False data = [] j = 0 last_record = None while not end_of_data and j < MAXRETRIES: nb_ok = 0 j += 1 debug("read history from address : %s" % hex(address), 1) if l + address > address_max: l = address_max - address + 1 if l > 0: data_read = self._read_safe(address, l)
if data_read: for i in range(nb):##range(HISTORY_REC_NO): try: record = None record = self.read_history_record(data_read[i*18:(i+1)*18], i) except Exception, e: print "exception :", e, i, string2hex(data_read[i*18:(i+1)*18]) record = None if record is None: end_of_data = False break else: last_record = record nb_ok += 1 data.append(record)
address += 18 * nb_ok if last_record and delai_dernier_releve > 0 : try: if DateTime.DateTimeFrom(last_record[0]) + DateTime.RelativeDateTime(minutes = delai_dernier_releve) > DateTime.now(): end_of_data = True except: pass else: debug("no data read", 1) end_of_data = False
return data
def recording_interval(self, meteo_data = None): ## returned value by the station : ## 0 - 1 min ## 1 - 5 min ## 2 - 10 min ## 3 - 15 min ## 4 - 20 min ## 5 - 30 min ## 6 - 60 min ## 7 - 2 hours ## 8 - 3 hours ## 9 - 4 hours ## A - 6 hours ## B - 8 hours ## C - 12 hours ## D - 24 hours duree = {0 : 1, 1 : 5, 2 : 10, 3 : 15, 4 : 20, 5 : 30, 6 : 60, 7 : 120, 8 : 180, 9 : 240, 0xa : 360, 0xb : 480, 0xc : 720, 0xd : 1440} l = 1 address = 0xE3 if meteo_data is None: if not self._init: if not self._initialize(): return "" data_read = self._read_safe(address, l) else: data_read = meteo_data[address:address+l]
return duree[ord(data_read[0]) >> 4]
def load_main_data(self): if not self._init: if not self._initialize(): return "" address = 0x00 l = 0x16F data_read = self._read_safe(address, l)
return data_read
def light(self, active): ## Doesn't work if not self._init: if not self._initialize(): return "" address = 0x16 l = 1 if active: writedata = chr(SETBIT) else: writedata = chr(UNSETBIT) self._write_data(address, l, writedata, tab=1)
|
Utilisation de la classe : |
Comme test, affichons les données actuelles.
if __name__ == "__main__": ser = serial.Serial(baudrate=300, port=0, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=None, writeTimeout=None, interCharTimeout=None, rtscts=0, dsrdtr=None, xonxoff=0) try: ws = WS3500(ser) meteo_data = ws.load_main_data() if meteo_data: t_int = ws.temp_int(meteo_data) print "T° int :", t_int if t_int > 100: print string2hex(meteo_data) else: print "T° ext :", ws.temp_ext(meteo_data) l = ws.temperature_indoor_minmax(meteo_data) print "intérieur :" print "T° min :", l[0], " à ", l[2], "T° max :", l[1], " à ", l[3] l = ws.temperature_outdoor_minmax(meteo_data) print "extérieur :" print "T° min :", l[0], " à ", l[2], "T° max :", l[1], " à ", l[3] print "Point de rosée :", ws.dewpoint(meteo_data) l = ws.dewpoint_minmax(meteo_data) print "T° min :", l[0], " à ", l[2], "T° max :", l[1], " à ", l[3] print "Humidité int :", ws.humidity_indoor(meteo_data) print "Humidité ext :", ws.humidity_outdoor(meteo_data) l = ws.humidity_indoor_minmax(meteo_data) print "intérieur :" print "Humidité min :", l[0], " à ", l[2], "Humidité max :", l[1], " à ", l[3] l = ws.humidity_outdoor_minmax(meteo_data) print "extérieur :" print "Humidité min :", l[0], " à ", l[2], "Humidité max :", l[1], " à ", l[3] tendency, forecast = ws.tendency_forecast(meteo_data) tendency_values = ["Constant", "Augmentant", "Diminuant"] forecast_values = ["Pluvieux", "Nuageux", "Ensoleillé"] print "Prévisions :", tendency_values[tendency], "et", forecast_values[forecast] print "Prel int :", ws.rel_pressure(meteo_data) l = ws.rel_pressure_minmax(meteo_data) print "Prel min :", l[0], " à ", l[2], "Prel max :", l[1], " à ", l[3] print "Pabs int :", ws.abs_pressure(meteo_data) l = ws.abs_pressure_minmax() print "Pabs min :", l[0], " à ", l[2], "Pabs max :", l[1], " à ", l[3] interval = ws.recording_interval(meteo_data) print "Recording interval :", interval, "min" except serial.SerialException, e: print "SerialException :", e except Exception, e: print "Exception :", e finally: ser.close()
|
Utilisation sous linux : |
Avec le pinguoin, c'est encore plus simple, puisque le module pyserial est utilisable tel quel. |
|
|
Evolutions possibles : |
- Enregistrement des données à intervalle régulier dans une base de donnnées mysql par exemple
- Générations de graphiques à envoyer sur un site web
- Coupler à de la domotique, type x10, pour automatiser l'arrosage du jardin ou la fermeture des volets lorqu'il fait trop chaud, ou encore la fermeture de la piscine (pour les chanceux) quand la pression est trop basse
|
|
|
Pour enregistrer toutes les mesures prises par la station, rien de plus efficace qu'une vraie base de données qu'il faudra synchroniser régulièrement.
|
Création de la base de données "METEO" sous mysql avec une seule table "MESURE" qui contiendra tous les relevés météos. |
CREATE DATABASE IF NOT EXISTS METEO;
USE METEO;
DROP TABLE IF EXISTS `MESURE`;
CREATE TABLE `MESURE` (
`IdMesure` int(11) NOT NULL AUTO_INCREMENT,
`DateMesure` datetime DEFAULT NULL,
`PressionAbs` smallint(6) DEFAULT NULL,
`PressionRel` smallint(6) DEFAULT NULL,
`TemperatureInt` smallint(4) DEFAULT NULL,
`TemperatureExt` smallint(4) DEFAULT NULL,
`HumiditeInt` smallint(4) DEFAULT NULL,
`HumiditeExt` smallint(4) DEFAULT NULL,
`PointRosee` smallint(4) DEFAULT NULL,
PRIMARY KEY IdMesure,
UNIQUE KEY DateMesure,
INDEX DateMesure
);
|
Modules nécessaires : |
- MySQL for Python pour interfacer python avec MySQL
- mx.DateTime est une boîte à outils assez complète pour manipuler des dates et des heures
|
Module Utils.py d'interfaçage avec la base de données : |
# -*- coding: ISO-8859-1 -*-
import os from MySQLdb.connections import * from mx import DateTime import sys
## ## Définitions des classes de gestion des erreurs ##
class MyDBException(Exception): def __init__(self, message = '', e = None, req = ""): Exception.__init__(self) self.Message = message if e: self.Message += " Error : " + str(e) if req: self.Message += " Requête : " + str(req) ##print self.Message
class MyDBIntegrityException(MyDBException): def __init__(self, e = None, req = ""): MyDBException.__init__(self, "L'intégrité des données n'est pas respectée.", e, req)
class MyDBDataException(MyDBException): def __init__(self, e = None, req = ""): MyDBException.__init__(self, "Les données obligatoires ne sont pas renseignées.", e, req)
class MyDBInsertException(MyDBException): def __init__(self, e = None, req = ""): MyDBException.__init__(self, "Les données n'ont pas pu être ajoutées dans la base de données.", e, req)
class MyDBDeleteException(MyDBException): def __init__(self, e = None, req = ""): MyDBException.__init__(self, "Les données n'ont pas pu être supprimées dans la base de données.", e, req)
class MyDBUpdateException(MyDBException): def __init__(self, e = None, req = ""): MyDBException.__init__(self, "Les données n'ont pas pu être modifiées dans la base de données.", e, req)
def Date2Str(date): if date == None: return "" else: if type(date) != type(""): date = date.strftime("%Y-%m-%d") return date[-2:] + "/" + date[5:7] + "/" + date[:4]
def Bigint2Str(montant, nb_decimales = 2): if montant == None: return "0." + "0"*nb_decimales else: return str(montant).zfill(nb_decimales + 1)[:-nb_decimales] + "." + str(montant).zfill(nb_decimales + 1)[-nb_decimales:]
def Str2Bigint(value): if value.find(".") == -1: return int(value) * 100 elif value.find(".") == len(value)-2: return int(value.replace(".", "")) * 10 else: return int(value.replace(".", ""))
def SQLValue(pvalue, type_of_value): if type(pvalue) == type(unicode()): value = pvalue.encode("iso8859-1") else: value = pvalue if value == None: return "NULL" elif type_of_value == 'entier': return str(value) elif type_of_value == 'date_heure' and type(value) != type(''): return my_dbc.string_literal(value.strftime("%Y-%m-%d %H:%M:%S")) elif type_of_value == 'date' and type(value) != type(''): return my_dbc.string_literal(value.strftime("%Y-%m-%d")) elif type_of_value == 'heure' and type(value) != type(''): return my_dbc.string_literal(value.strftime("%H:%M:%S")) else: return my_dbc.string_literal(value)
## ## Recherche de l'adresse du serveur MySQL ##
def GetMySQLParameters(fichier_ini): mysql_host = "localhost" mysql_base = fichier_ini mysql_user = "root" mysql_password = "" try: import ConfigParser cp = ConfigParser.ConfigParser() f = open(os.getcwd() + os.sep + "%s.ini" % fichier_ini, "r") cp.readfp(f) if cp.has_section("server"): if cp.has_option("server", "host"): mysql_host = cp.get("server", "host") else: print "option 'host' is not present" if cp.has_option("server", "base"): mysql_base = cp.get("server", "base") if cp.has_option("server", "user"): mysql_user = cp.get("server", "user") if cp.has_option("server", "password"): mysql_password = cp.get("server", "password") else: print "section 'server' is not present" f.close() except: print "exception in reading '%s.ini'" % fichier_ini host = "localhost" return mysql_host, mysql_base, mysql_user, mysql_password
def GetAllFromMesure(where = "", order_by = ""): list_mesure = [] if my_dbc: c = my_dbc.cursor() req = "SELECT PressionAbs, TemperatureExt, HumiditeInt, IdMesure, TemperatureInt, HumiditeExt, PressionRel, DateMesure, PointRosee FROM mesure" if where: req += " WHERE " + where if order_by: req += " ORDER BY " + order_by try: r = c.execute(req) if r: results = c.fetchall() for result in results: list_mesure.append(Mesure(pressionabs = result[0], temperatureext = result[1], humiditeint = result[2], id = result[3], temperatureint = result[4], humiditeext = result[5], pressionrel = result[6], date = result[7], pointrosee = result[8])) except Exception, e: print "exception in executing :", req print e c.close() return list_mesure
def GetMesure(p_id = None): mesure = None if my_dbc and p_id: c = my_dbc.cursor() req = "SELECT PressionAbs, TemperatureExt, HumiditeInt, IdMesure, TemperatureInt, HumiditeExt, PressionRel, DateMesure, PointRosee FROM mesure WHERE IdMesure = " + str(p_id) try: r = c.execute(req) if r: result = c.fetchone() mesure = Mesure(pressionabs = result[0], temperatureext = result[1], humiditeint = result[2], id = result[3], temperatureint = result[4], humiditeext = result[5], pressionrel = result[6], date = result[7], pointrosee = result[8]) except Exception, e: print "exception in executing :", req print e c.close() return mesure
class DBMesure: def __init__(self, pressionabs = None, temperatureext = None, humiditeint = None, id = None, temperatureint = None, humiditeext = None, pressionrel = None, date = None, pointrosee = None): self.PressionAbs = pressionabs self.TemperatureExt = temperatureext self.HumiditeInt = humiditeint self.Id = id self.TemperatureInt = temperatureint self.HumiditeExt = humiditeext self.PressionRel = pressionrel self.Date = date self.PointRosee = pointrosee
def SetPressionAbs(self, pressionabs): self.PressionAbs = pressionabs
def SetTemperatureExt(self, temperatureext): self.TemperatureExt = temperatureext
def SetHumiditeInt(self, humiditeint): self.HumiditeInt = humiditeint
def SetId(self, id): self.Id = id
def SetTemperatureInt(self, temperatureint): self.TemperatureInt = temperatureint
def SetHumiditeExt(self, humiditeext): self.HumiditeExt = humiditeext
def SetPressionRel(self, pressionrel): self.PressionRel = pressionrel
def SetDate(self, date): self.Date = date
def SetPointRosee(self, pointrosee): self.PointRosee = pointrosee
def GetPressionAbs(self): return self.PressionAbs
def GetTemperatureExt(self): return self.TemperatureExt
def GetHumiditeInt(self): return self.HumiditeInt
def GetId(self): return self.Id
def GetTemperatureInt(self): return self.TemperatureInt
def GetHumiditeExt(self): return self.HumiditeExt
def GetPressionRel(self): return self.PressionRel
def GetDate(self): return self.Date
def GetPointRosee(self): return self.PointRosee
def AddIntoDB(self): c = my_dbc.cursor() req = "INSERT INTO MESURE (PressionAbs, TemperatureExt, HumiditeInt, TemperatureInt, HumiditeExt, PressionRel, DateMesure, PointRosee) " req += "VALUES(" + SQLValue(self.PressionAbs, 'entier') + ", " + SQLValue(self.TemperatureExt, 'entier') + ", " + SQLValue(self.HumiditeInt, 'entier') + ", " + SQLValue(self.TemperatureInt, 'entier') + ", " + SQLValue(self.HumiditeExt, 'entier') + ", " + SQLValue(self.PressionRel, 'entier') + ", " + SQLValue(self.Date, 'date') + ", " + SQLValue(self.PointRosee, 'entier') + ")" try: r = c.execute(req) if r: self.Id = c.connection.insert_id() my_dbc.commit() except Exception, e: raise MyDBInsertException(e, req) c.close()
def DelFromDB(self): c = my_dbc.cursor() req = "DELETE FROM mesure " req += "WHERE IdMesure = " + str(self.Id) try: r = c.execute(req) my_dbc.commit() except Exception, e: raise MyDBDeleteException(e, req) c.close()
def UpDateInDB(self): c = my_dbc.cursor() req = "UPDATE mesure " req += "SET PressionAbs = " + SQLValue(self.PressionAbs, 'entier') + ", TemperatureExt = " + SQLValue(self.TemperatureExt, 'entier') + ", HumiditeInt = " + SQLValue(self.HumiditeInt, 'entier') + ", TemperatureInt = " + SQLValue(self.TemperatureInt, 'entier') + ", HumiditeExt = " + SQLValue(self.HumiditeExt, 'entier') + ", PressionRel = " + SQLValue(self.PressionRel, 'entier') + ", DateMesure = " + SQLValue(self.Date, 'date') + ", PointRosee = " + SQLValue(self.PointRosee, 'entier') + " " req += "WHERE IdMesure = " + str(self.Id) try: r = c.execute(req) my_dbc.commit() except Exception, e: raise MyDBUpdateException(e, req) c.close()
def ToXML(self, h): d = {} d['DateMesure'] = str(self.Date) d['HumiditeExt'] = str(self.HumiditeExt) d['HumiditeInt'] = str(self.HumiditeInt) d['IdMesure'] = str(self.Id) d['PointRosee'] = str(self.PointRosee) d['PressionAbs'] = str(self.PressionAbs) d['PressionRel'] = str(self.PressionRel) d['TemperatureExt'] = str(self.TemperatureExt) d['TemperatureInt'] = str(self.TemperatureInt) h.startElement('Mesure', d) h.endElement('Mesure')
def FromXML(self, attrs): if attrs.has_key("DateMesure") and attrs.getValue("DateMesure").encode("iso8859_1") != 'None': self.Date = attrs.getValue("DateMesure").encode("iso8859_1") else: self.Date = None if attrs.has_key("HumiditeExt") and attrs.getValue("HumiditeExt").encode("iso8859_1") != 'None': self.HumiditeExt = int(attrs.getValue("HumiditeExt")) else: self.HumiditeExt = None if attrs.has_key("HumiditeInt") and attrs.getValue("HumiditeInt").encode("iso8859_1") != 'None': self.HumiditeInt = int(attrs.getValue("HumiditeInt")) else: self.HumiditeInt = None if attrs.has_key("IdMesure") and attrs.getValue("IdMesure").encode("iso8859_1") != 'None': self.Id = int(attrs.getValue("IdMesure")) else: self.Id = None if attrs.has_key("PointRosee") and attrs.getValue("PointRosee").encode("iso8859_1") != 'None': self.PointRosee = int(attrs.getValue("PointRosee")) else: self.PointRosee = None if attrs.has_key("PressionAbs") and attrs.getValue("PressionAbs").encode("iso8859_1") != 'None': self.PressionAbs = int(attrs.getValue("PressionAbs")) else: self.PressionAbs = None if attrs.has_key("PressionRel") and attrs.getValue("PressionRel").encode("iso8859_1") != 'None': self.PressionRel = int(attrs.getValue("PressionRel")) else: self.PressionRel = None if attrs.has_key("TemperatureExt") and attrs.getValue("TemperatureExt").encode("iso8859_1") != 'None': self.TemperatureExt = int(attrs.getValue("TemperatureExt")) else: self.TemperatureExt = None if attrs.has_key("TemperatureInt") and attrs.getValue("TemperatureInt").encode("iso8859_1") != 'None': self.TemperatureInt = int(attrs.getValue("TemperatureInt")) else: self.TemperatureInt = None
def __eq__(self, mesure): return self.Id == mesure.Id
def __repr__(self): return str(self.PressionAbs) + ' ' + str(self.TemperatureExt) + ' ' + str(self.HumiditeInt) + ' ' + str(self.Id) + ' ' + str(self.TemperatureInt) + ' ' + str(self.HumiditeExt) + ' ' + str(self.PressionRel) + ' ' + str(self.Date) + ' ' + str(self.PointRosee)
def GetLastDateTimeMesure(): d = None req = "SELECT MAX(DateMesure) FROM MESURE" c = my_dbc.cursor() try: r = c.execute(req) if r: result = c.fetchone() if result[0]: d = DateTime.DateTimeFrom(result[0].strftime("%Y-%m-%d %H:%M:00")) except Exception, e: print "exception :", e return d
class Mesure(DBMesure): def __init__(self, *args, **kwargs): DBMesure.__init__(self, *args, **kwargs)
## ## Connection à la base de données MySQL ## mysql_host = '' mysql_base = '' mysql_user = '' mysql_password = ''
try: mysql_host, mysql_base, mysql_user, mysql_password = GetMySQLParameters('meteo') my_dbc = Connection(mysql_host, mysql_user, mysql_password) my_dbc.select_db(mysql_base) except Exception, e: print e my_dbc = None print "Impossible de se connecter à la base de données meteo sur " + mysql_host + "/" + mysql_base + ". Par conséquent, l'application ne peut pas se poursuivre." sys.exit(0)
|
Fichier de configuration meteo.ini : |
[server]
# serveur mysql
host = localhost
# nom de la base
base = meteo
# login
user = mon_utilisateur
# mot de passe
password = vouscroyezqdmemepasquejvaisvousldonner
|
Fonction point_de_rosee pour calculer le point de rosée en fonction de la température et de l'humidité extérieures : |
def point_de_rosee(t, rh): if t is None or rh is None: return None alpha = 17.27 * t / (237.7 + t) + math.log(rh) td = 237.7 * alpha / (17.27 - alpha) return td
|
Fonction synchro_base à ajouter à la classe WS3500 : |
def synchro_base(self, trace = False, delai_dernier_releve = 0): date_now = DateTime.now().Format("%Y-%m-%d %H:%M") if trace: f_erreur = open("ws_synchro_base.txt", "a") if not f_erreur: print "Enable to open file 'ws_synchro_base.txt'." else: f_erreur.write("*"*10 + " Begin : " + date_now + " " + "*"*10 + " ") last_date = GetLastDateTimeMesure() if last_date is None: last_date = "" else: last_date = last_date.Format("%Y-%m-%d %H:%M") if trace: print "Derniere mesure enregistree :", str(last_date) f_erreur.write("Last mesure recorded : %s " % str(last_date)) historique = self.read_history(delai_dernier_releve) mesures = [] new_last_date = "" d1 = "" d2 = "" nb = 0 for date_heure, temperature_indoor, temperature_outdoor, pressure, humidity_indoor, humidity_outdoor, raincount, windspeed, winddir_degrees in historique: if date_heure > date_now or temperature_indoor >= 1000 or temperature_outdoor >= 1000: pass else: if date_heure > last_date: if date_heure > new_last_date: if new_last_date == "": d1 = date_heure d2 = date_heure new_last_date = date_heure if pressure == 111110: pressure = None if temperature_indoor == 811: temperature_indoor = None if temperature_outdoor == 811: temperature_outdoor = None if humidity_indoor == 1100: humidity_indoor = None if humidity_outdoor == 1100: humidity_outdoor = None print "ajout de :", date_heure, temperature_indoor, temperature_outdoor, pressure, humidity_indoor, humidity_outdoor, raincount, windspeed, winddir_degrees try: ##mes[0].strftime("%Y-%m-%d %H:%M:00") if temperature_outdoor is None or humidity_outdoor is None: point_rosee = None else: point_rosee = 10 * round(point_de_rosee(temperature_outdoor / 10.0, humidity_outdoor / 100.0),1) if pressure is None: pressure_rel = None else: pressure_rel = pressure + 370 mesure = Mesure(date = date_heure, pressionabs = pressure, pressionrel = pressure_rel, temperatureint = temperature_indoor, temperatureext = temperature_outdoor, humiditeint = humidity_indoor, humiditeext = humidity_outdoor, pointrosee = point_rosee) mesure.AddIntoDB() nb += 1 except MyDBException, e: print "Erreur MySQL :", str(e) print e.Message if trace: f_erreur.write("Exception : " + str(e) + " " + str(e.Message) + " ") if trace: print print "synchro terminee." f_erreur.write("%i mesures added from %s to %s " % (nb, d1, d2)) f_erreur.write("*"*10 + " End : " + DateTime.now().Format("%Y-%m-%d %H:%M") + " " + "*"*10 + " ") f_erreur.close() return nb
|
|
|
Pour générer des statistiques plus visuelles, il existe plusieurs méthodes
|
1ère solution brute : générer du code html avec les données courantes qu'il suffira d'intégrer dans une page html. |
Pour cela, il suffit d'ajouter une méthode "main2html" à notre classe WS3500 :
def main2html(self, data = None): if data is None: data = self.load_main_data(data)
tendency, forecast = ws.tendency_forecast(meteo_data) t_min_max = ws.temperature_outdoor_minmax(meteo_data) h_min_max = ws.humidity_outdoor_minmax(meteo_data)
html = "<table>" html += "<tr>" html += "<td>Température</td>" html += "<td>Humidité</td>" html += "<td><img src="images/set04_%s.png" width="80" height="80"></td>" % ["fluctuating", "sunny", "rainy"][forecast] html += "</tr>" html += "<tr>" html += "<td>%s \xb0C</td>" % str(ws.temp_ext(meteo_data)) html += "<td>%s %%</td>" % str(ws.humidity_outdoor(meteo_data)) html += "<td><img src="images/set04_tendency_%s.png" width="80" height="80"></td>" % ["no", "up", "down"][tendency] html += "</tr>" html += "<tr>" html += "<td>min %s \xb0C le %s</td>" % (str(t_min_max[0]), DateTime.DateFrom(t_min_max[2]).Format("%d/%m/%Y \xe0 %H:%M")) html += "<td>min %s %% le %s</td>" % (str(h_min_max[0]), DateTime.DateFrom(h_min_max[2]).Format("%d/%m/%Y \xe0 %H:%M")) html += "<td> </td>" html += "</tr>" html += "<tr>" html += "<td>max %s \xb0C le %s</td>" % (str(t_min_max[1]), DateTime.DateFrom(t_min_max[3]).Format("%d/%m/%Y \xe0 %H:%M")) html += "<td>max %s %% le %s</td>" % (str(h_min_max[1]), DateTime.DateFrom(h_min_max[3]).Format("%d/%m/%Y \xe0 %H:%M")) html += "<td> </td>" html += "</tr>" html += "</table>"
return html
|
2ème solution plus souple : générer du code xml avec les données courantes qu'il faudra lire et mettre en forme dans une application tierce |
Pour cela, il suffit d'ajouter une méthode "main2xml" à notre classe WS3500 :
def main2xml(self, xml_path, xml_filename, data = None): if data is None: data = self.load_main_data(data)
tendency, forecast = ws.tendency_forecast(meteo_data) t_min_max = ws.temperature_outdoor_minmax(meteo_data) h_min_max = ws.humidity_outdoor_minmax(meteo_data)
if not os.path.exists(xml_path): os.makedirs(xml_path) f = open(xml_path + os.sep + xml_filename, "w") h = xml.sax.saxutils.XMLGenerator(f) h.startDocument() d = DateTime.now() h.startElement("meteo", {"date":d.Format("%d/%m/%Y"), "heure":d.Format("%H:%M")})
d = {} d["t_ext"] = str(ws.temp_ext(meteo_data)) d["h_ext"] = str(ws.humidity_outdoor(meteo_data)) d["tendance"] = str(tendency) d["prevision"] = str(forecast) d["t_ext_min"] = str(t_min_max[0]) d["t_ext_date_min"] = DateTime.DateFrom(t_min_max[2]).Format("%d/%m/%Y \xe0 %H:%M") d["t_ext_max"] = str(t_min_max[1]) d["t_ext_date_max"] = DateTime.DateFrom(t_min_max[3]).Format("%d/%m/%Y \xe0 %H:%M") d["h_ext_min"] = str(h_min_max[0]) d["h_ext_date_min"] = DateTime.DateFrom(h_min_max[2]).Format("%d/%m/%Y \xe0 %H:%M") d["h_ext_max"] = str(h_min_max[1]) d["h_ext_date_max"] = DateTime.DateFrom(h_min_max[3]).Format("%d/%m/%Y \xe0 %H:%M") h.startElement('mesure', d) h.endElement('mesure')
h.endElement("meteo") h.endDocument() f.close()
|
3ème solution plus sexy : générer des graphiques sous forme d'images |
Là, tout dépend de ce qu'on veut comme statistiques ! |
|
|
Pour l'automatisation, il y a plusieurs méthodes :
|
1ère solution : |
Le plus simple c'est de lancer par un planificateur de tâche l'exécution du programme python via la commande : python ws3500.py |
2ème solution : |
On peut créer un exéutable sous windows, pas vraiment d'intérêt pour moi mais plus pour les autres qui n'ont pas besoin d'installer python. Pour linux, pas d'exécutable mais de toute façon python est installé en standard.
Pour créer l'exécutable, on utilise le module py2exe et on crée le programme "setup.py" ci-dessous :
# -*- coding: ISO-8859-1 -*- import sys
from distutils.core import setup import py2exe import os
fichiers_necessaires = ["meteo.ini"]
setup(name = "ws3500", windows = [{'script': "ws3500.py"}], data_files = [("", fichiers_necessaires)], )
Puis on lance la commande :python setup.py py2exe -c --packages encodings -d install
|
Exemple d'automatisation : |
Voici mon automatisation sur mon PC à la campagne, complètement autonome, équipé d'une station météo WS3650 et en Win XP :
- A 12:00 tous les jours : L'ordinateur démarre automatiquement (configuration du bios).
- A 12:05 : la tâche planifiée exécute le script station_meteo_auxon.py qui fait les traitements suivants :
- ouverture du logiciel HeavyWeather fourni avec la station pour enregistrer l'historique de la station météo (on sait jamais, des fois que mon script ne fonctionne pas... on n'est jamais assez prudent...)
- 2 minutes après, fermeture forcée de cette application(le port com ne peut pas être utilisé par deux applications en même temps)
- 10s après (le temps que le port soit bien fermé), enregistrement de l'historique de la station dans ma base mysql via mon script. 3 lectures à 7 minutes d'intervalles.
- arrêt forcé de l'ordinateur
# -*- coding: iso-8859-1 -*-
import time import subprocess import win32api import os
from Utils import * from ws3500 import *
DELAY_HEAVYWEATHER = 60 * 2
process = subprocess.Popen(["C:\\Program Files\\HeavyWeather3500\\HeavyWeather.exe", '', '']) time.sleep(DELAY_HEAVYWEATHER) ## wait for 2 minutes before killing HeavyWeather win32api.TerminateProcess(int(process._handle), -1)
time.sleep(10)
DELAY_RECORD = 60 * 7
mysql_host = '' mysql_base = '' mysql_user = '' mysql_password = ''
try: mysql_host, mysql_base, mysql_user, mysql_password = GetMySQLParameters('meteo') my_dbc = Connection(mysql_host, mysql_user, mysql_password) my_dbc.select_db(mysql_base) except Exception, e: print e my_dbc = None print "Impossible de se connecter à la base de données meteo sur " + mysql_host + "/" + mysql_base + ". Par conséquent, l'application ne peut pas se poursuivre."
for i in range(3): if i > 0: time.sleep(DELAY_RECORD)
ser = serial.Serial(baudrate=300, port=0, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=None, writeTimeout=None, interCharTimeout=None, rtscts=0, dsrdtr=None, xonxoff=0) try: ws = WS3500(ser, 3600) interval = 0
print "Lecture des données n°", i meteo_data = ws.load_main_data() if meteo_data: t_int = ws.temp_int(meteo_data) print "T° int :", t_int if t_int > 100: print string2hex(meteo_data) else: print "T° ext :", ws.temp_ext(meteo_data) l = ws.temperature_indoor_minmax(meteo_data) print "intérieur :" print "T° min :", l[0], " à ", l[2], "T° max :", l[1], " à ", l[3] l = ws.temperature_outdoor_minmax(meteo_data) print "extérieur :" print "T° min :", l[0], " à ", l[2], "T° max :", l[1], " à ", l[3] print "Point de rosée :", ws.dewpoint(meteo_data) l = ws.dewpoint_minmax(meteo_data) print "T° min :", l[0], " à ", l[2], "T° max :", l[1], " à ", l[3] print "Humidité int :", ws.humidity_indoor(meteo_data) print "Humidité ext :", ws.humidity_outdoor(meteo_data) l = ws.humidity_indoor_minmax(meteo_data) print "intérieur :" print "Humidité min :", l[0], " à ", l[2], "Humidité max :", l[1], " à ", l[3] l = ws.humidity_outdoor_minmax(meteo_data) print "extérieur :" print "Humidité min :", l[0], " à ", l[2], "Humidité max :", l[1], " à ", l[3] tendency, forecast = ws.tendency_forecast(meteo_data) tendency_values = ["Constant", "Augmentant", "Diminuant"] forecast_values = ["Pluvieux", "Nuageux", "Ensoleillé"] print "Prévisions :", tendency_values[tendency], "et", forecast_values[forecast] print "Prel int :", ws.rel_pressure(meteo_data) l = ws.rel_pressure_minmax(meteo_data) print "Prel min :", l[0], " à ", l[2], "Prel max :", l[1], " à ", l[3] print "Pabs int :", ws.abs_pressure(meteo_data) interval = ws.recording_interval(meteo_data) print "Recording interval :", interval, "min"
print ws.main2html(meteo_data) ws.main2xml(os.getcwd(), "meteo.xml", meteo_data) if ws.GetVersion() == 3600: try: print "Vent :", ws.wind_current() l = ws.wind_minmax() print "Vent min :", l[0], " à ", l[2], "Vent max :", l[1], " à ", l[3] print "T° ressentie :", ws.windchill() l = ws.windchill_minmax() print "T° ressentie min :", l[0], " à ", l[2], "T° ressentie max :", l[1], " à ", l[3] print "Pluie depuis 1h :", ws.rain_1h() l = ws.rain_1h_max() print "Pluie max depuis 1h :", l[0], " à ", l[1] print "Pluie depuis 1 jour :", ws.rain_24h() l = ws.rain_24h_max() print "Pluie max depuis 1 jour :", l[0], " à ", l[1] print "Pluie depuis 1 semaine :", ws.rain_1w() l = ws.rain_1w_max() print "Pluie max depuis 1 semaine :", l[0], " à ", l[1] print "Pluie depuis 1 mois :", ws.rain_1m() l = ws.rain_1m_max() print "Pluie max depuis 1 mois :", l[0], " à ", l[1] print "Pluie totale :", ws.rain_total() print "Pluie depuis :", ws.rain_total_time() except serial.SerialException, e: raise e except Exception, e: print "exception :", e
nb_synchro = ws.synchro_base(True, interval) print "Nb synchro :", nb_synchro nb_synchro = 0
if nb_synchro: GraphMeteo() GraphTemp() GraphMoyenne() if False: ## tant que pas de connexion internet... SendImagesMeteo(filenames = ["meteo.xml"] + filter(lambda(x):os.path.splitext(x)[1] == os.extsep + "png", os.listdir(os.getcwd()))) else: pass ## tant que pas de connexion internet... ##SendImagesMeteo(filenames = ["meteo.xml"], ssdir_ftp = "auxon") except serial.SerialException, e: print "SerialException :", e except Exception, e: print "Exception :", e finally: ser.close() my_dbc.close()
## Attention demande d'arrêt de l'ordinateur os.popen("shutdown -s -f")
|
Automatisation de la connexion internet : |
Si la station météo est à son domicile, c'est bien pratique
pour l'automatisation parce qu'en général on dispose d'une connexion internet haut débit. En revanche, si on est comme dans mon cas dans une maison de campagne, il n'y a pas de connexion internet permanente. L'astuce est donc d'utiliser une clé 3G et comme les forfaits sont hors de prix, on va évidemment essayer d'optimiser les temps d'accès, en les démarrant sur demande. Le hic, c'est que la connexion doit se faire à travers un logiciel propriétaire, et le deuxième hic, c'est que la clé est protégée par un code PIN. On pourrait désactiver l'authentification par code, mais il n'y aurait plus du tout de sécrité en cas de vol.
L'objectif va donc être d'exécuter le logiciel du FAI, de simuler une saisie du code PIN et la demande de connexion. Et heureusement, il existe un module PyWin32 qui donne accès à l'API de Windows.
On définit les modules et les constantes
import subprocess import time import win32gui
WM_SETTEXT = 0xC BM_CLICK = 0xF5
- On définit les constantes de la connexion: le temps d'attente entre chaque essai, le code PIN et le répertoire de l'exécutable.
DELAI_WAIT = 10 PIN_CODE = "1234" REP_EXE = "C:\Program Files (x86)\Internet 3G+ Bouygues Telecom\"
- On exécute le programme de connexion et on récupère le handle de la fenêtre nommée "Internet 3G+ Bouygues Telecom", le handle de son bouton nommée "Connecter" ainsi que celui de la fenêtre "vérifier PIN" qui demande le code PIN.
def enum_window_callback(hwnd, l): fen_name = win32gui.GetWindowText(hwnd) if fen_name != "": if fen_name == "Internet 3G+ Bouygues Telecom": l[0].append(hwnd) elif fen_name == "vérifier PIN": l[1].append(hwnd) return True
def enum_child_window_callback(hwnd, l): fen_name = win32gui.GetWindowText(hwnd) if fen_name != "": if win32gui.GetClassName(hwnd).lower() == "button" and fen_name == "&Connecter": l.append(hwnd) return True
subprocess.Popen([REP_EXE + "Internet 3G+ Bouygues Telecom.exe", "", ""]) trouve = False for i in range(5): print "tentative ", i+1 time.sleep(DELAI_WAIT) l = [[], []] win32gui.EnumWindows(enum_window_callback, l)
for hwnd in l[0]: hwnd_btn_connect = [] win32gui.EnumChildWindows(hwnd, enum_child_window_callback, hwnd_btn_connect) if hwnd_btn_connect: l[0] = (hwnd, hwnd_btn_connect[0]) trouve = True break
if trouve and l[1]: break
if not trouve: print "traitement abandonné"
- On inscrit le code PIN et on clique sur le bouton "OK" puis on attend que la fenêtre de saisie du code PIN disparaisse, avant de cliquer sur le bouton "Connecter"
if l[0] and l[1]: hwnd = l[1][0] if hwnd != 0: hwnd_pass = win32gui.FindWindowEx(hwnd, 0, "edit", None) if hwnd_pass != 0: if win32gui.SendMessage(hwnd_pass, WM_SETTEXT, 0, PIN_CODE): hwnd_btn_ok = win32gui.FindWindowEx(hwnd, 0, "button", "OK") if hwnd_btn_ok != 0: win32gui.SendMessage(hwnd_btn_ok, BM_CLICK, 0, 0)
for i in range(5): print "waiting for authentification", i+1 time.sleep(DELAI_WAIT)
if win32gui.IsWindow(hwnd) == 0: break if win32gui.IsWindow(hwnd) == 0: if l[0][1] != 0: win32gui.SendMessage(l[0][1], BM_CLICK, 0, 0)
|
|
|