Here's an example of a dynamic scheduler for taskiq. It uses PostgreSQL to store all tasks.
We created a custom schedule source that is capable of storing and retrieving scheduled tasks.
Start workers by running:
taskiq worker tkq:broker --fs-discover
import os | |
from pydantic import AmqpDsn | |
from pydantic import BaseSettings | |
from pydantic import HttpUrl | |
from pydantic import PostgresDsn | |
from pydantic import RedisDsn | |
from . import VERSION |
Here's an example of a dynamic scheduler for taskiq. It uses PostgreSQL to store all tasks.
We created a custom schedule source that is capable of storing and retrieving scheduled tasks.
Start workers by running:
taskiq worker tkq:broker --fs-discover
# syntax=docker/dockerfile:1 | |
# Keep this syntax directive! It's used to enable Docker BuildKit | |
# Based on https://github.com/python-poetry/poetry/discussions/1879?sort=top#discussioncomment-216865 | |
# but I try to keep it updated (see history) | |
################################ | |
# PYTHON-BASE | |
# Sets up all our shared environment variables | |
################################ |
#!/usr/bin/env ipython -i | |
import datetime | |
import json | |
from typing import Optional | |
import sqlalchemy as sa | |
from sqlalchemy.orm import declarative_base, sessionmaker | |
from sqlalchemy.dialects.postgresql import JSONB | |
from pydantic import BaseModel, Field, parse_obj_as |
# User-supplied settings: the OpenWeatherMap API key and the coordinates
# (kept as strings) of the city whose weather is wanted.
OPENWEATHER_APIKEY = "" # TODO add your apikey here | |
lat = "49.868183" # latitude of the city you want to get the weather from | |
lon = "8.626288499" # longitude of the city you want to get the weather from | |
def getWeatherEmoji(weatherID) | |
# Openweathermap Weather codes and corresponding emojis | |
thunderstorm = "\u{1F4A8}" # Code: 200's, 900, 901, 902, 905 | |
drizzle = "\u{1F4A7}" # Code: 300's | |
rain = "\u{02614}" # Code: 500's |