带有温度传感器和LCD的PicoW NTP 客户端

树莓派 PICO 和 RP2040 相关应用讨论
回复
头像
shaoziyang
帖子: 3917
注册时间: 2019年 10月 21日 13:48

带有温度传感器和LCD的PicoW NTP 客户端

#1

帖子 shaoziyang »

图片

代码: 全选

#
# Idea: downloaded on 2022-07-17 23h45 utc+1
# Idea: downloaded from: https://gist.github.com/aallan/581ecf4dc92cd53e3a415b7c33a1147c
# Idea: aallan/picow_ntp_client.py
# 2022-07-19. In this version added readings from an Adafruit AHT20 temperature/humidity sensor
# Added DST timezone data for ten years 2022-2031, timezone: Europe/Lisbon
# 2022-07-20: Added functionality to display data on a SparkFun 4x20 serLCD
# 2022-07-21: Added functionality to import the contents of file: dst_data.json as a dictionary.
# This file now contains the timezone in the first line, followed by ten lines with dst data.
# In this way - when necessary - the timezone and/or the dst data can be updated without altering the script.
#
import network
import socket
import time
import utime
import struct
# Credentials are kept in a separate secrets module so that they do not appear in this code
from secrets import secrets
#from machine import Pin
import machine
import busio
import board
import json

# +--------------------------+
# | Imports for LCD control |
# +--------------------------+
from sparkfun_serlcd import Sparkfun_SerLCD_I2C

# Run-time switches used throughout the script
my_debug = False # True: print extra diagnostic output on the REPL
use_aht20_sensor = True # True: create the AHT20 sensor object and show temperature/humidity
time_is_set = False # set to True by set_time() once the built-in RTC has been set
show_NTP_upd_msg_on_lcd = True # True: set_time() shows a short confirmation message on the LCD

"""
Note about time.ticks_diff(ticks1, ticks2)
Note: Do not pass time() values to ticks_diff(), you should use normal mathematical operations on them.
But note that time() may (and will) also overflow.
This is known as https://en.wikipedia.org/wiki/Year_2038_problem .

Note about time.time()
Returns the number of seconds, as an integer, since the Epoch, assuming that underlying RTC is set
and maintained as described above. If an RTC is not set, this function returns number of seconds since
a port-specific reference point in time (for embedded boards without a battery-backed RTC,
usually since power up or reset). If you want to develop portable MicroPython application,
you should not rely on this function to provide higher than second precision.
See: https://docs.micropython.org/en/latest/library/time.html

"""

# Initialize the I2C bus. The same bus object is handed to the LCD driver and,
# when enabled, to the AHT20 sensor driver.
# Argument order: busio.I2C(SCL, SDA)
# Mod by @PaulskPt: SCL, SDA pins to GP3 and GP2 so that GP0 is free to toggle the built-in LED
i2c = busio.I2C(board.GP3, board.GP2)

# +-------------------------------------------------+
# | Definitions for 20x4 character LCD (I2C serial) |
# +-------------------------------------------------+

# --- STATIC (not to be changed!) -------
lcd_columns = 20
lcd_rows = 4
# Highest valid row index (rows 0 - 3 = 4 lines). Derived from lcd_rows so the
# two values cannot drift apart; it used to be assigned twice with a literal.
lcd_max_row_cnt = lcd_rows - 1
my_row1 = 1
my_row2 = 2
my_col = 3
# --- End-of STATIC ----------------------
lcd_row = 0   # row of the first splash line written in setup()
lcd_col = 0   # column at which the splash lines start in setup()
lcd_data = None # placeholder; not referenced elsewhere in the visible code

# The SerLCD can get locked-up, e.g. when the RX pin (4th pin from the left) is
# touched with a finger; this pin is very sensitive. A locked-up display often
# shows all the 8x5 segments of the LCD filled with inverse pixels.
# See https://github.com/KR0SIV/SerLCD_Reset for a reset tool. The aim here is to
# recover from within this script instead: keep trying until the driver object
# can be created, asking the user to reset the display between attempts.
lcd = None
while lcd is None:
    try:
        lcd = Sparkfun_SerLCD_I2C(i2c)
    except ValueError:
        print("The LCD is locked-up. Please connect RS with GND for a second or so.")
        time.sleep(10)  # give the user time to act before the next attempt

"""
Talk to the LCD at I2C address 0x27.
The number of rows and columns defaults to 4x20, so those
arguments could be omitted in this case.
Note @paulsk: I experienced that one needs to use the 'num_rows =' and 'num_cols ='
when these parameters are used.
"""

# The AHT20 driver is imported, and the sensor object created on the shared
# I2C bus, only when the sensor is enabled through use_aht20_sensor.
if use_aht20_sensor:
    import adafruit_ahtx0
    sensor = adafruit_ahtx0.AHTx0(i2c)

# Index positions of the fields in a time tuple, used as t[tm_year], t[tm_mon], ...
# See: https://docs.python.org/3/library/time.html#time.struct_time
tm_year = 0
tm_mon = 1 # range [1, 12]
tm_mday = 2 # range [1, 31]
tm_hour = 3 # range [0, 23]
tm_min = 4 # range [0, 59]
tm_sec = 5 # range [0, 61] in strftime() description
tm_wday = 6 # range [0, 6] Monday = 0
tm_yday = 7 # range [1, 366]
tm_isdst = 8 # 0, 1 or -1
tm_tmzone = 'Europe/Lisbon' # default timezone name; setup() replaces it with the value read from dst_data.json
timezone_set = False # set to True by setup() once the timezone has been read from file
#tm_tmzone_dst = "WET0WEST,M3.5.0/1,M10.5.0"

# Timezone and dst data are in external file "dst_data.json"
# See: https://www.epochconverter.com/
# The "start" and "end" values are for timezone "Europe/Lisbon"
dst = None # filled by setup() with the dictionary loaded from dst_data.json

tm_gmtoff = 3600 # offset east of GMT in seconds
# set_time() subtracts NTP_DELTA from the value received from the NTP host.
# Reducing NTP_DELTA by tm_gmtoff therefore leaves the result 3600 seconds
# (1 hour) ahead, to get GMT + 1 for Portugal.
NTP_DELTA = 2208988800 - tm_gmtoff # mod by @PaulskPt.
# modified to use a Portuguese ntp host
host = "pt.pool.ntp.org" # mod by @PaulskPt
# presumably the number of days per month (non-leap year); not referenced in the visible code
dim = {1:31, 2:28, 3:31, 4:30, 5:31, 6:30, 7:31, 8:31, 9:30, 10:31, 11:30, 12:31}

# Weekday names, looked up with t[tm_wday] (Monday = 0)
weekdays = {0:"Monday", 1:"Tuesday", 2:"Wednesday", 3:"Thursday", 4:"Friday", 5:"Saturday", 6:"Sunday"}

# LED pin; main() switches it on while the display is being updated
led = machine.Pin("LED", machine.Pin.OUT)

"""
function searches the dst dictionary for presence of the year
Parameter: integer: yr, the year to search
Return: the index of the found key to the dst dictionary. If not found: -1


"""
def yr_in_dst_dict(yr):
    """Look up a year in the list of records stored under dst["dst"].

    Record 0 holds the timezone, so the search starts at record 1.

    Parameter: integer: yr, the year to search for.
    Return: the index of the first record whose "year" equals yr.
            If no record matches, a message is printed and -1 is returned.
    """
    TAG = "yr_in_dst_dict(): "

    if my_debug:
        print(TAG+"type(dst)=", type(dst))
    records = dst["dst"]
    if my_debug:
        print(TAG+"type(dst[\"dst\"][0])={}, contents={}".format(type(records[0]), records))
    count = len(records)
    if my_debug:
        print(TAG+"length dst =", count)
        first_year = int(records[1]["year"]) # the first record holds the timezone
        print(TAG+"1st item in dst[\"dst\"]= {}, type={}".format(first_year, type(first_year)))

    for idx in range(1, count):
        entry_year = records[idx]["year"]
        if int(entry_year) == yr:
            if my_debug:
                print(TAG+"year {} found in dst dict, index: {}".format(entry_year, idx))
            return idx

    # Only reached when no record matched
    print(TAG+"year {} not found in dst dict".format(yr))
    return -1

"""
Determine if the current date and time are within the Daylight Saving Time period
for the TimeZone: Europe/Lisbon.
Parameters: None
Return: Boolean
If the Year of current date is outside of dst.keys()
then print a message on the REPL and raise a SystemExit
"""
def is_dst():
    """Tell whether the current time lies within the Daylight Saving Time period.

    The current time is read with utime.time() and compared with the "start"
    and "end" values of the record for the current year in the dst dictionary.

    Parameters: None
    Return: Boolean. True when start <= now <= end, otherwise False.
            False is also returned, after printing a message, when the
            built-in RTC has not yet been set by set_time().
    Raises: SystemExit, after printing a message, when the current year has
            no record in the dst dictionary.
    """
    TAG = "is_dst(): "
    # utime.time() can only be relied upon after set_time() has set the built-in RTC
    if not time_is_set:
        print("is_dst(): cannot continue: built-in RTC has not been set with time of NTP host")
        return False

    now = utime.time()
    year = utime.localtime(now)[0]

    idx = yr_in_dst_dict(year)
    if idx < 0:
        print("year: {} not in dst dictionary ({}).\nUpdate the dictionary! Exiting...".format(year, dst.keys()))
        raise SystemExit

    record = dst["dst"][idx]
    if my_debug:
        print(TAG+"t_dst= ",end='')
        print(*sorted(record.items()))
    start = int(record["start"]) # integer
    end = int(record["end"]) # integer
    start_info = record["start_info"] # string
    end_info = record["end_info"] # string
    if my_debug:
        print(TAG+"start={}, end={}".format(start,end))
        print(TAG+"start_info=\"{}\", end_info=\"{}\"".format(start_info,end_info))
    return start <= now <= end

"""
Get the date and time from a NTP host
Set the built-in RTC
Parameters: None
Return: None
"""
def set_time():
    """Get the date and time from the NTP host and set the built-in RTC.

    A 48-byte NTP client request is sent over UDP to port 123 of `host`.
    Each attempt uses its own socket with a 1 second timeout; after a timeout
    the request is repeated (with a 2 second pause) up to NTP_MAX_TRIES times.
    The 32-bit big-endian seconds value at bytes 40..43 of the reply is
    converted with NTP_DELTA and NTP_ERROR_CORR and written to the RTC.

    Parameters: None
    Return: None. When no usable reply was received a message is printed and
            the RTC and the time_is_set flag are left untouched.
    Raises: OSError for socket errors other than a timeout;
            SystemExit when the received year has no record in the dst dictionary.
    """
    global time_is_set
    TAG = "set_time(): "
    NTP_QUERY = bytearray(48)
    NTP_QUERY[0] = 0x1B  # LI = 0, version = 3, mode = 3 (client)
    NTP_ERROR_CORR = 12 # deduct 12 seconds from the time received (empirically determined while comparing with another NTP synchronized clock I use)
    NTP_MAX_TRIES = 5   # upper bound on the number of requests per call
    ETIMEDOUT = 110
    addr = socket.getaddrinfo(host, 123)[0][-1]
    print(TAG+"Time zone: \"{}\"".format(tm_tmzone))
    print(TAG+"NTP host address: \"{}\"".format(addr[0]))

    msg = None
    for attempt in range(1, NTP_MAX_TRIES + 1):
        # A fresh socket per attempt: a socket that has been closed cannot be reused
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.settimeout(1)
            s.sendto(NTP_QUERY, addr)
            msg = s.recv(48)
            if my_debug:
                print(TAG+"msg received:", msg)
        except OSError as exc:
            if not (exc.args and exc.args[0] == ETIMEDOUT):
                raise  # not a timeout: let the caller see the real error
            print(TAG+"ETIMEDOUT (attempt {} of {})".format(attempt, NTP_MAX_TRIES))
            msg = None
        finally:
            s.close()

        if msg is not None and len(msg) < 44:
            # Too short to contain the seconds field at bytes 40..43
            print(TAG+"reply too short ({} bytes)".format(len(msg)))
            msg = None
        if msg is not None:
            break
        if attempt < NTP_MAX_TRIES:
            time.sleep(2)

    if msg is None:
        print(TAG+"no usable reply from NTP host \"{}\". RTC not updated".format(host))
        return

    val = struct.unpack("!I", msg[40:44])[0]
    t = val - NTP_DELTA - NTP_ERROR_CORR
    tm = time.gmtime(t)

    dst_idx = yr_in_dst_dict(tm[tm_year])
    if dst_idx < 0:
        print(TAG+"year: {} not in dst dictionary ({}).\nUpdate the dictionary! Exiting...".format(tm[tm_year], dst.keys()))
        raise SystemExit
    # RTC tuple order: year, month, day, weekday, hours, minutes, seconds, subseconds.
    # The weekday is passed as tm_wday + 1, unchanged from the original code.
    machine.RTC().datetime((tm[tm_year], tm[tm_mon], tm[tm_mday], tm[tm_wday] + 1,
                            tm[tm_hour], tm[tm_min], tm[tm_sec], 0))
    time_is_set = True # Flag: the RTC now holds NTP time
    if not my_debug:
        print(TAG+"built-in RTC has been set with time from NTP host")
    if show_NTP_upd_msg_on_lcd:
        lcd.clear()
        lcd.set_cursor(0,1)
        lcd.write(TAG)
        lcd.set_cursor(0,2)
        lcd.write("RTC set fm NTP host")
        time.sleep(2)

    print(TAG+"date/time updated from: \"{}\"".format(host))

"""
setup() function. Called by main()
Parameters: None
Return: None
"""
def setup():
    """Prepare the LCD, load the dst data file and connect to the WiFi network.

    Steps, in this order:
    - configure the LCD and show a four line splash text for 3 seconds;
    - read file dst_data.json and store its contents in global dst;
    - take the timezone name from the first record of dst["dst"] (only when
      it has not been set before and the value is a non-empty string);
    - connect to the WiFi network named in secrets, checking the connection
      status up to 10 times with 1 second in between.

    Parameters: None
    Return: None
    Raises: SystemExit when the data file is not found; any other OSError
            from reading the file is re-raised;
            RuntimeError when the network connection failed.
    """
    global dst, dst_info, tm_tmzone, timezone_set
    TAG = "setup(): "
    fn = 'dst_data.json'
    splash = ("RPi PicoW", "NTP-Client", "Temp/Humidity Sensor", "(c) 2022 @PaulskPt")

    # --- LCD ---
    lcd.system_messages(False)
    lcd.set_contrast(120) # Set lcd contrast to default value
    lcd.set_backlight(0xFF8C00) # orange backlight color
    lcd.cursor(1) # hide cursor was (2) show cursor
    lcd.blink(0) # don't blink. Was (1) blink cursor
    lcd.clear()
    for offset, line in enumerate(splash):
        lcd.set_cursor(lcd_col, lcd_row+offset) # ATTENTION: the SerLCD module set_cursor(col,row)
        lcd.write(line)
    lcd.set_cursor(19,3)
    time.sleep(3) # <--------------- DELAY ---------------

    # --- dst data file ---
    try:
        with open(fn) as f:
            raw = f.read()
        if my_debug:
            print(TAG+"Data type data1 b4 reconstruction: ", type(raw))
            print(TAG+"Contents dst_data b4 \"{}\"".format(raw))
        # reconstructing the data as a dictionary
        dst = json.loads(raw)
        raw = None # Cleanup
        if my_debug:
            print(TAG+"Data type after reconstruction: ", type(dst))
            print(TAG+"contents dst:", dst)
    except OSError as exc:
        if exc.args[0] != 2:
            print(TAG+"Error \"{}\" occurred. Exiting...".format(exc))
            raise
        # errno 2: File not found (ENOENT)
        print(TAG+"ERROR: File: \"{}\" not found. Exiting...".format(fn))
        raise SystemExit

    # --- timezone, held in the first record of the file ---
    if not timezone_set:
        tz = dst["dst"][0]["timezone"]
        if my_debug:
            print(TAG+"Timezone imported from file: \"{}\" = \"{}\"".format(fn, tz))
        if isinstance(tz, str) and tz:
            tm_tmzone = tz
            timezone_set = True

    # --- WiFi; login data comes from a different file for safety reasons ---
    wifi_name = secrets['ssid']
    wifi_key = secrets['pw']
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(wifi_name, wifi_key)

    for _ in range(10):
        # a negative status or a status of 3 and above ends the waiting
        if wlan.status() < 0 or wlan.status() >= 3:
            break
        print('waiting for connection...')
        time.sleep(1)

    if wlan.status() != 3:
        raise RuntimeError('network connection failed')
    print('connected')
    status = wlan.ifconfig()
    print( 'ip = ' + status[0] )

"""
Show on LCD and print on REPL:
- Date,
- HH:MM:SS
- Day-of-the-Year
- DST Yes/No
- Temperature and Humidity (depending flag: use_aht20_sensor)
Parameters: t: time.localtime() struct;
start: if True display all on LCD. if False display only HH:MM:SS and Temperature and Humidity (depending flag: use_aht20_sensor)
Return: None
"""
def show_dt_t_h(t, start):
    """Show date, time and sensor readings on the LCD and print them on the REPL.

    Parameters: t: time tuple as returned by time.localtime();
                start: if True the LCD is cleared and all four lines are
                written; if False only the time line and (depending on flag
                use_aht20_sensor) the temperature/humidity line are rewritten.
    Return: None
    """
    dst_txt = "Yes" if is_dst() else "No"
    day_name = weekdays[t[tm_wday]]
    year, month, mday = t[tm_year], t[tm_mon], t[tm_mday]
    hour, minute, second = t[tm_hour], t[tm_min], t[tm_sec]
    yday = t[tm_yday]

    repl_line = "\nLocal date/time: {} {}-{:02d}-{:02d}, {:02d}:{:02d}:{:02d}, day of the year: {}. DST: {}".format(
        day_name, year, month, mday, hour, minute, second, yday, dst_txt)
    clock_line = "Hr {:02d}:{:02d}:{:02d}".format(hour, minute, second)

    # The sensor is read before anything is printed or written to the LCD
    if use_aht20_sensor:
        tmp = "{:4.1f}C".format(sensor.temperature)
        hum = "{:4.1f}%".format(sensor.relative_humidity)

    print(repl_line)

    if start:
        # Full rebuild: date line and day-of-year/DST line
        lcd.clear()
        lcd.set_cursor(0, 0)
        lcd.write("{} {}-{:02d}-{:02d}".format(day_name[:3], year, month, mday))
        lcd.set_cursor(0, 1)
        lcd.write("Day {} DST {}".format(yday, dst_txt))

    lcd.set_cursor(0, 2)
    lcd.write(clock_line)

    if use_aht20_sensor:
        lcd.set_cursor(0, 3)
        lcd.write("T {} H {}".format(tmp, hum))
        print("Temperature: {}, Humidity: {}".format(tmp, hum))
    lcd.set_cursor(19,3) # Park cursor

"""
main() function
Parameters: None
Return: None
"""
def main():
    """Run setup, set the RTC from NTP and keep the display up to date forever.

    Inside the loop:
    - when the day of the month changes, a full LCD rebuild is requested;
    - when the hour changes, set_time() is called again, the time is re-read
      and a full LCD rebuild is requested;
    - when the second changes, show_dt_t_h() is called with the LED switched
      on for the duration of the update.

    Parameters: None
    Return: None (the loop only ends through an exception)
    Raises: SystemExit on a keyboard interrupt. OSErrors raised inside the
            loop do not end it: the loop pauses 2 seconds and continues;
            errors other than a timeout are reported on the REPL first.
    """
    setup()
    set_time() # call at start
    t = time.localtime()
    o_mday = t[tm_mday]
    o_hour = t[tm_hour] # Set hour for set_time() call interval
    o_sec = t[tm_sec]
    start = True
    while True:
        try:
            t = time.localtime()
            # yr, mo, dd, hh, mm, ss, wd, yd, dst
            if o_mday != t[tm_mday]:
                o_mday = t[tm_mday] # remember current day of the month
                start = True # Force to rebuild the whole LCD
            if o_hour != t[tm_hour]:
                o_hour = t[tm_hour] # remember current hour
                set_time()
                t = time.localtime() # reload RTC time after being synchronized
                start = True
            if o_sec != t[tm_sec]:
                o_sec = t[tm_sec] # remember current second
                led.on()
                try:
                    show_dt_t_h(t, start)
                    start = False
                finally:
                    led.off() # never leave the LED on when the update failed

        except KeyboardInterrupt:
            print("Keyboard interrupt...exiting...")
            led.off()
            #lcd.clear()
            raise SystemExit
        except OSError as exc:
            if not (exc.args and exc.args[0] == 110): # 110 = ETIMEDOUT
                # Keep running, but do not hide the error
                print("main(): OSError ignored: {}".format(exc))
            time.sleep(2) # pause so that a persistent error does not flood the REPL

# Script entry point: only start the clock when this file is run directly
if __name__ == '__main__':
    main()

https://forum.micropython.org/viewtopic ... 318#p69318
 

回复

  • 随机主题
    回复总数
    阅读次数
    最新文章