From 8fb7a8a8809ca77863c5c5bf1d62fd9a65434499 Mon Sep 17 00:00:00 2001 From: mrbwburns <> Date: Mon, 22 Jan 2024 20:52:19 +0100 Subject: [PATCH] first shot at the openweathermap_wrapper extension (PyOwm replacement) --- inkycal/custom/__init__.py | 3 +- inkycal/custom/openweathermap_wrapper.py | 287 +++++++++++++++++++++-- inkycal/modules/inkycal_fullweather.py | 63 ++--- requirements.txt | 3 - 4 files changed, 302 insertions(+), 54 deletions(-) diff --git a/inkycal/custom/__init__.py b/inkycal/custom/__init__.py index f9d1efb..ddd9b61 100644 --- a/inkycal/custom/__init__.py +++ b/inkycal/custom/__init__.py @@ -1,3 +1,2 @@ from .functions import * -from .inkycal_exceptions import * -from .openweathermap_wrapper import OpenWeatherMap \ No newline at end of file +from .inkycal_exceptions import * \ No newline at end of file diff --git a/inkycal/custom/openweathermap_wrapper.py b/inkycal/custom/openweathermap_wrapper.py index 20c050f..da220d6 100644 --- a/inkycal/custom/openweathermap_wrapper.py +++ b/inkycal/custom/openweathermap_wrapper.py @@ -1,43 +1,286 @@ +import json import logging -from enum import Enum +from datetime import datetime +from datetime import timedelta +from typing import Dict +from typing import List +from typing import Literal import requests -import json +from dateutil import tz + +from inkycal.custom.functions import get_system_tz + +TEMP_UNITS = Literal["celsius", "fahrenheit"] +WIND_UNITS = Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"] logger = logging.getLogger(__name__) +logger.setLevel("DEBUG") -class WEATHER_OPTIONS(Enum): - CURRENT_WEATHER = "weather" - -class FORECAST_INTERVAL(Enum): - THREE_HOURS = "3h" - FIVE_DAYS = "5d" +def is_timestamp_within_range(timestamp: datetime, start_time: datetime, end_time: datetime) -> bool: + # Check if the timestamp is within the range + return start_time <= timestamp <= end_time class OpenWeatherMap: - def __init__(self, api_key:str, city_id:int, units:str) -> None: + def __init__( + self, + api_key: str, + city_id: int, + temp_unit: TEMP_UNITS = "celsius", + wind_unit: WIND_UNITS = "meters_sec", + language: str = "en", + ) -> None: self.api_key = api_key self.city_id = city_id - assert (units in ["metric", "imperial"] ) - self.units = units + self.temp_unit = temp_unit + self.wind_unit = wind_unit + self.language = language self._api_version = "2.5" self._base_url = f"https://api.openweathermap.org/data/{self._api_version}" + self.tz_zone = tz.gettz(get_system_tz()) - - def get_current_weather(self) -> dict: - current_weather_url = f"{self._base_url}/weather?id={self.city_id}&appid={self.api_key}&units={self.units}" + def get_current_weather(self) -> Dict: + """ + Gets current weather status from this API: https://openweathermap.org/current + :return: + Current weather as dictionary + """ + # Gets weather forecast from this API: + current_weather_url = ( + f"{self._base_url}/weather?id={self.city_id}&appid={self.api_key}&units=Metric&lang={self.language}" + ) response = requests.get(current_weather_url) if not response.ok: - raise AssertionError(f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}") - data = json.loads(response.text) - return data + raise AssertionError( + f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}" + ) + current_data = json.loads(response.text) - def get_weather_forecast(self) -> dict: - forecast_url = f"{self._base_url}/forecast?id={self.city_id}&appid={self.api_key}&units={self.units}" + current_weather = {} + current_weather["detailed_status"] = current_data["weather"][0]["description"] + current_weather["weather_icon_name"] = current_data["weather"][0]["icon"] + current_weather["temp"] = self.get_converted_temperature( + current_data["main"]["temp"] + ) # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit + current_weather["temp_feels_like"] = self.get_converted_temperature(current_data["main"]["feels_like"]) + current_weather["min_temp"] = self.get_converted_temperature(current_data["main"]["temp_min"]) + current_weather["max_temp"] = self.get_converted_temperature(current_data["main"]["temp_max"]) + current_weather["humidity"] = current_data["main"]["humidity"] # OWM Unit: % rH + current_weather["wind"] = self.get_converted_windspeed( + current_data["wind"]["speed"] + ) # OWM Unit Default: meter/sec, Metric: meter/sec + current_weather["wind_gust"] = self.get_converted_windspeed(current_data["wind"]["gust"]) + current_weather["uvi"] = None # TODO: this is no longer supported with 2.5 API, find alternative + + self.current_weather = current_weather + + return current_weather + + def get_weather_forecast(self) -> List[Dict]: + """ + Gets weather forecasts from this API: https://openweathermap.org/forecast5 + What you get is a list of 40 forecasts for 3-hour time slices, totaling to 5 days. + :return: + Forecasts data dictionary + """ + # + forecast_url = ( + f"{self._base_url}/forecast?id={self.city_id}&appid={self.api_key}&units=Metric&lang={self.language}" + ) response = requests.get(forecast_url) if not response.ok: - raise AssertionError(f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}") - data = json.loads(response.text)["list"] - return data + raise AssertionError( + f"Failure getting the current weather: code {response.status_code}. Reason: {response.text}" + ) + forecast_data = json.loads(response.text)["list"] + # Add forecast data to hourly_data_dict list of dictionaries + hourly_forecasts = [] + for forecast in forecast_data: + # calculate combined precipitation (snow + rain) + precip_mm = 0.0 + if "rain" in forecast.keys(): + precip_mm = +forecast["rain"]["3h"] # OWM Unit: mm + if "snow" in forecast.keys(): + precip_mm = +forecast["snow"]["3h"] # OWM Unit: mm + hourly_forecasts.append( + { + "temp": self.get_converted_temperature( + forecast["main"]["temp"] + ), # OWM Unit Default: Kelvin, Metric: Celsius, Imperial: Fahrenheit + "min_temp": self.get_converted_temperature(forecast["main"]["temp_min"]), + "max_temp": self.get_converted_temperature(forecast["main"]["temp_max"]), + "precip_3h_mm": precip_mm, + "wind": self.get_converted_windspeed( + forecast["wind"]["speed"] + ), # OWM Unit Default: meter/sec, Metric: meter/sec, Imperial: miles/hour + "wind_gust": self.get_converted_windspeed(forecast["wind"]["gust"]), + "pressure": forecast["main"]["pressure"], # OWM Unit: hPa + "humidity": forecast["main"]["humidity"], # OWM Unit: % rH + "precip_probability": forecast["pop"] + * 100.0, # OWM value is unitless, directly converting to % scale + "icon": forecast["weather"][0]["icon"], + "datetime": datetime.fromtimestamp(forecast["dt"], tz=self.tz_zone) + } + ) + logger.debug(f"Added rain forecast at {datetime.fromtimestamp(forecast['dt'], tz=self.tz_zone)}: {precip_mm}") + + self.hourly_forecasts = hourly_forecasts + + return self.hourly_forecasts + + def get_forecast_for_day(self, days_from_today: int) -> Dict: + """ + Get temperature range, rain and most frequent icon code + for the day that is days_from_today away + :param days_from_today: + should be int from 0-4: e.g. 2 -> 2 days from today + :return: + Forecast dictionary + """ + # Make sure hourly forecasts are up to date + _ = self.get_weather_forecast() + + # Calculate the start and end times for the specified number of days from now + current_time = datetime.now() + start_time = ( + (current_time + timedelta(days=days_from_today)) + .replace(hour=0, minute=0, second=0, microsecond=0) + .astimezone(tz=self.tz_zone) + ) + end_time = (start_time + timedelta(days=1)).astimezone(tz=self.tz_zone) + + # Get all the forecasts for that day's time range + forecasts = [ + f + for f in self.hourly_forecasts + if is_timestamp_within_range(timestamp=f["datetime"], start_time=start_time, end_time=end_time) + ] + + # In case the next available forecast is already for the next day, use that one for the less than 3 remaining hours of today + if forecasts == []: + forecasts.append(self.hourly_forecasts[0]) + + # Get rain and temperatures for that day + temps = [f["temp"] for f in forecasts] + rain = sum([f["precip_3h_mm"] for f in forecasts]) + + # Get all weather icon codes for this day + icons = [f["icon"] for f in forecasts] + day_icons = [icon for icon in icons if "d" in icon] + + # Use the day icons if possible + icon = max(set(day_icons), key=icons.count) if len(day_icons) > 0 else max(set(icons), key=icons.count) + + # Return a dict with that day's data + day_data = { + "datetime": start_time.timestamp(), + "icon": icon, + "temp_min": min(temps), + "temp_max": max(temps), + "precip_mm": rain, + } + + return day_data + + def get_converted_temperature(self, value: float) -> float: + if self.temp_unit == "fahrenheit": + value = self.celsius_to_fahrenheit(value) + return value + + def get_converted_windspeed(self, value: float) -> float: + Literal["meters_sec", "km_hour", "miles_hour", "knots", "beaufort"] + if self.wind_unit == "km_hour": + value = self.celsius_to_fahrenheit(value) + elif self.wind_unit == "km_hour": + value = self.mps_to_kph(value) + elif self.wind_unit == "miles_hour": + value = self.mps_to_mph(value) + elif self.wind_unit == "knots": + value = self.mps_to_knots(value) + elif self.wind_unit == "beaufort": + value = self.mps_to_beaufort(value) + return value + + @staticmethod + def mps_to_beaufort(meters_per_second: float) -> int: + """Map meters per second to the beaufort scale. + + Args: + meters_per_second: + float representing meters per seconds + + Returns: + an integer of the beaufort scale mapping the input + """ + thresholds = [0.3, 1.6, 3.4, 5.5, 8.0, 10.8, 13.9, 17.2, 20.8, 24.5, 28.5, 32.7] + return next((i for i, threshold in enumerate(thresholds) if meters_per_second < threshold), 12) + + @staticmethod + def mps_to_mph(meters_per_second: float) -> float: + """Map meters per second to miles per hour + + Args: + meters_per_second: + float representing meters per seconds. + + Returns: + float representing the input value in miles per hour. + """ + # 1 m/s is approximately equal to 2.23694 mph + miles_per_hour = meters_per_second * 2.23694 + return miles_per_hour + + @staticmethod + def mps_to_kph(meters_per_second: float) -> float: + """Map meters per second to kilometers per hour + + Args: + meters_per_second: + float representing meters per seconds. + + Returns: + float representing the input value in kilometers per hour. + """ + # 1 m/s is equal to 3.6 km/h + kph = meters_per_second * 3.6 + return kph + + @staticmethod + def mps_to_knots(meters_per_second: float) -> float: + """Map meters per second to knots (nautical miles per hour) + + Args: + meters_per_second: + float representing meters per seconds. + + Returns: + float representing the input value in knots. + """ + # 1 m/s is equal to 1.94384 knots + knots = meters_per_second * 1.94384 + return knots + + @staticmethod + def celsius_to_fahrenheit(celsius: int or float) -> float: + """Converts the given temperate from degrees Celsius to Fahrenheit.""" + fahrenheit = (float(celsius) * 9.0 / 5.0) + 32.0 + return fahrenheit + + +def main(): + """Main function, only used for testing purposes""" + key = "" + city = 2643743 + lang = "de" + owm = OpenWeatherMap(api_key=key, city_id=city, language=lang) + + current_weather = owm.get_current_weather() + print(current_weather) + hourly_forecasts = owm.get_weather_forecast() + print(owm.get_forecast_for_day(days_from_today=2)) + +if __name__ == "__main__": + main() diff --git a/inkycal/modules/inkycal_fullweather.py b/inkycal/modules/inkycal_fullweather.py index 699a2e7..12eb10b 100644 --- a/inkycal/modules/inkycal_fullweather.py +++ b/inkycal/modules/inkycal_fullweather.py @@ -19,7 +19,7 @@ from PIL import ImageFont from PIL import ImageOps from icons.weather_icons.weather_icons import get_weather_icon -from inkycal.custom import owm_forecasts +from inkycal.custom import openweathermap_wrapper from inkycal.custom.functions import fonts from inkycal.custom.functions import internet_available from inkycal.custom.functions import top_level @@ -82,11 +82,11 @@ class Fullweather(inkycal_module): optional = { "orientation": {"label": "Please select the desired orientation", "options": ["vertical", "horizontal"]}, - "temp_units": { + "temp_unit": { "label": "Which temperature unit should be used?", "options": ["celsius", "fahrenheit"], }, - "wind_units": { + "wind_unit": { "label": "Which wind speed unit should be used?", "options": ["beaufort", "knots", "miles_hour", "km_hour", "meters_sec"], }, @@ -112,7 +112,7 @@ class Fullweather(inkycal_module): }, "font": { "label": "Font family to use for the entire screen", - "options": ["Roboto", "NotoSans", "Poppins"], + "options": ["NotoSans", "Roboto", "Poppins"], }, "chart_title": { "label": "Title of the temperature and precipitation plot", @@ -151,17 +151,17 @@ class Fullweather(inkycal_module): assert self.orientation in ["horizontal", "vertical"] else: self.orientation = "horizontal" - if "wind_units" in config: - self.wind_units = config["wind_units"] + if "wind_unit" in config: + self.wind_unit = config["wind_unit"] else: - self.wind_units = "meters_sec" - if self.wind_units == "beaufort": + self.wind_unit = "meters_sec" + if self.wind_unit == "beaufort": self.windDispUnit = "bft" - elif self.wind_units == "knots": + elif self.wind_unit == "knots": self.windDispUnit = "kn" - elif self.wind_units == "km_hour": + elif self.wind_unit == "km_hour": self.windDispUnit = "km/h" - elif self.wind_units == "miles_hour": + elif self.wind_unit == "miles_hour": self.windDispUnit = "mph" else: self.windDispUnit = "m/s" @@ -171,13 +171,13 @@ class Fullweather(inkycal_module): else: self.wind_gusts = True - if "temp_units" in config: - self.temp_units = config["temp_units"] + if "temp_unit" in config: + self.temp_unit = config["temp_unit"] else: - self.temp_units = "celsius" - if self.temp_units == "fahrenheit": + self.temp_unit = "celsius" + if self.temp_unit == "fahrenheit": self.tempDispUnit = "F" - elif self.temp_units == "celsius": + elif self.temp_unit == "celsius": self.tempDispUnit = "°" if "weekly_title" in config: @@ -304,7 +304,7 @@ class Fullweather(inkycal_module): self.image.paste(humidityIcon, (15, humidity_y)) # Humidity - humidityString = f"{self.current_weather.humidity} %" + humidityString = f"{self.current_weather['humidity']} %" humidityFont = self.get_font("Bold", self.font_size + 8) image_draw.text((65, humidity_y), humidityString, font=humidityFont, fill=(255, 255, 255)) @@ -315,7 +315,7 @@ class Fullweather(inkycal_module): self.image.paste(uvIcon, (15, ux_y)) # uvindex - uvString = f"{self.current_weather.uvi if self.current_weather.uvi else '0'}" + uvString = f"{self.current_weather['uvi'] if self.current_weather['uvi'] else '0'}" uvFont = self.get_font("Bold", self.font_size + 8) image_draw.text((65, ux_y), uvString, font=uvFont, fill=(255, 255, 255)) @@ -327,7 +327,7 @@ class Fullweather(inkycal_module): image_draw = ImageDraw.Draw(self.image) ## Add detailed weather status text to the image - sumString = self.current_weather.detailed_status.replace(" ", "\n ") + sumString = self.current_weather["detailed_status"].replace(" ", "\n ") sumFont = self.get_font("Regular", self.font_size + 8) maxW = 0 totalH = 0 @@ -343,7 +343,7 @@ class Fullweather(inkycal_module): logger.debug(f"Added current weather detailed status text: {sumString} at x:{sumtext_x}/y:{sumtext_y}.") ## Add current weather icon to the image - icon = get_weather_icon(icon_name=self.current_weather.weather_icon_name, size=150) + icon = get_weather_icon(icon_name=self.current_weather["weather_icon_name"], size=150) # Create a mask from the alpha channel of the weather icon if len(icon.split()) == 4: mask = icon.split()[-1] @@ -355,7 +355,7 @@ class Fullweather(inkycal_module): self.image.paste(icon, (icon_x, icon_y), mask) ## Add current temperature to the image - tempString = f"{self.current_weather.temperature(self.temp_units)['feels_like']:.0f}{self.tempDispUnit}" + tempString = f"{self.current_weather['temp_feels_like']:.0f}{self.tempDispUnit}" tempFont = self.get_font("Bold", 68) # Get the width of the text tempStringbbox = tempFont.getbbox(tempString) @@ -425,7 +425,7 @@ class Fullweather(inkycal_module): # Plot Temperature as line plot in red ax1.plot(timestamps, temperatures, marker=".", linestyle="-", color="r") - temp_base = 3 if self.temp_units == "celsius" else 5 + temp_base = 3 if self.temp_unit == "celsius" else 5 fig.gca().yaxis.set_major_locator(ticker.MultipleLocator(base=temp_base)) ax1.tick_params(axis="y", colors="red") ax1.set_yticks(ax1.get_yticks()) @@ -508,7 +508,7 @@ class Fullweather(inkycal_module): x_rect = self.left_section_width + 20 + i * rectangle_width # Start from the title width y_rect = int(self.height / 2 + 30) - day_data = owm_forecasts.get_forecast_for_day(days_from_today=i, hourly_forecasts=self.hourly_forecasts) + day_data = self.my_owm.get_forecast_for_day(days_from_today=i) rect = Image.new("RGBA", (int(rectangle_width), int(rectangle_height)), (255, 255, 255)) rect_draw = ImageDraw.Draw(rect) @@ -605,13 +605,22 @@ class Fullweather(inkycal_module): raise NetworkNotReachableError # Get the weather - (self.current_weather, self.hourly_forecasts) = owm_forecasts.get_owm_data( - token=self.api_key, + self.my_owm = openweathermap_wrapper.OpenWeatherMap( + api_key=self.api_key, city_id=self.location, - temp_units=self.temp_units, - wind_units=self.wind_units, + temp_unit=self.temp_unit, + wind_unit=self.wind_unit, language=self.language, ) + self.current_weather = self.my_owm.get_current_weather() + self.hourly_forecasts = self.my_owm.get_weather_forecast() + # (self.current_weather, self.hourly_forecasts) = owm_forecasts.get_owm_data( + # token=self.api_key, + # city_id=self.location, + # temp_unit=self.temp_unit, + # wind_unit=self.wind_unit, + # language=self.language, + # ) ## Create Base Image self.createBaseImage() diff --git a/requirements.txt b/requirements.txt index bd22fa7..c75546b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,6 @@ cycler==0.12.1 feedparser==6.0.10 fonttools==4.45.1 frozendict==2.4.0 -geojson==2.5.0 gpiozero==2.0 html2text==2020.1.16 html5lib==1.1 @@ -27,9 +26,7 @@ packaging==23.2 pandas==2.1.4 peewee==3.17.0 Pillow==10.1.0 -pyowm==3.3.0 pyparsing==3.1.1 -PySocks==1.7.1 python-dateutil==2.8.2 python-dotenv==1.0.0 pytz==2023.3.post1