diff --git a/.dockerignore b/.dockerignore index c5ef958..cee03ca 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,5 +1,9 @@ *.pyc __pycache__ .venv -.deta .env + +_test.py +celerybeat-schedule.db +celerybeat-schedule-* +celerybeat-schedule diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 5785e6e..e210819 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -8,7 +8,7 @@ on: jobs: check-requirements: name: Check Requirements - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - name: Set Version Tag run: echo "API_TAG=$(echo $GITHUB_REF | awk -F '/' '{print $NF}')" >> $GITHUB_ENV @@ -17,25 +17,31 @@ jobs: build-image: name: Build Image - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 needs: check-requirements steps: - uses: actions/checkout@v4 - name: Parse API Version run: echo "API_VERSION=$(echo $GITHUB_REF | awk -F '/' '{print $NF}' | cut -c 2-)" >> $GITHUB_ENV - name: Docker Login - run: echo ${{ secrets.DOCKER_ACCESS_TOKEN }} | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin + run: | + echo ${{ secrets.DOCKER_ACCESS_TOKEN }} | docker login -u ${{ secrets.DOCKER_USERNAME }} --password-stdin + echo ${{ secrets.GHCRIO_ACCESS_TOKEN }} | docker login ghcr.io -u ${{ secrets.GHCRIO_USERNAME }} --password-stdin - name: Build Image - run: docker build -t steamcmd/api:latest . + run: docker build -t steamcmd/api:latest -t ghcr.io/steamcmd/api:latest . # deploy - name: Tag Image - run: docker tag steamcmd/api:latest steamcmd/api:$API_VERSION + run: | + docker tag steamcmd/api:latest steamcmd/api:$API_VERSION + docker tag ghcr.io/steamcmd/api:latest ghcr.io/steamcmd/api:$API_VERSION - name: Push Image - run: docker push steamcmd/api --all-tags + run: | + docker push steamcmd/api --all-tags + docker push ghcr.io/steamcmd/api --all-tags update-readme: name: Update Readme - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - name: Update Docker Hub Description @@ -44,28 +50,3 @@ jobs: DOCKERHUB_USERNAME: ${{ secrets.DOCKER_USERNAME }} DOCKERHUB_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} DOCKERHUB_REPOSITORY: steamcmd/api - - deploy-fly: - name: Deploy Fly.io - runs-on: ubuntu-22.04 - needs: [check-requirements, build-image] - steps: - - uses: actions/checkout@v4 - - uses: superfly/flyctl-actions/setup-flyctl@master - - name: Parse API Version - run: echo "API_VERSION=$(echo $GITHUB_REF | awk -F '/' '{print $NF}' | cut -c 2-)" >> $GITHUB_ENV - - name: Deploy API on Fly.io - run: flyctl deploy --app steamcmd --image steamcmd/api:${{ env.API_VERSION }} -e VERSION=${{ env.API_VERSION }} - env: - FLY_API_TOKEN: ${{ secrets.FLY_ACCESS_TOKEN }} - - deploy-render: - name: Deploy Render.com - runs-on: ubuntu-22.04 - needs: [check-requirements, build-image] - steps: - - uses: actions/checkout@v4 - - name: Parse API Version - run: echo "API_VERSION=$(echo $GITHUB_REF | awk -F '/' '{print $NF}' | cut -c 2-)" >> $GITHUB_ENV - - name: Deploy API on Render.com - run: curl https://api.render.com/deploy/${{ secrets.RENDER_SERVICE_ID }}?key=${{ secrets.RENDER_API_KEY }}&imgURL=docker.io%2Fsteamcmd%2Fapi%40${{ env.API_VERSION }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f6bbbc5..19bce21 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,7 +11,7 @@ on: jobs: test-image: name: Test Image - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - name: Build Image @@ -19,7 +19,7 @@ jobs: python-lint: name: Python Lint - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 - uses: jpetrucciani/ruff-check@main diff --git a/.gitignore b/.gitignore index c5ef958..aa25c57 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,12 @@ +.DS_Store +.ruff_cache + *.pyc __pycache__ .venv -.deta .env + +_test.py +celerybeat-schedule.db +celerybeat-schedule-* +celerybeat-schedule diff --git a/Dockerfile b/Dockerfile index 3a46c80..2436dc0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,4 +29,4 @@ COPY --chown=$USER:$USER src/ $HOME/ ##################### INSTALLATION END ##################### # Set default container command -CMD exec gunicorn main:app --max-requests 3000 --max-requests-jitter 150 --workers $WORKERS --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:$PORT \ No newline at end of file +CMD exec gunicorn web:app --max-requests 3000 --max-requests-jitter 150 --workers $WORKERS --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:$PORT \ No newline at end of file diff --git a/README.md b/README.md index 0d1faa4..93182ac 100644 --- a/README.md +++ b/README.md @@ -4,64 +4,20 @@ [![Discord Online](https://img.shields.io/discord/928592378711912488.svg)](https://discord.steamcmd.net) [![Mastodon Follow](https://img.shields.io/mastodon/follow/109302774947550572?domain=https%3A%2F%2Ffosstodon.org&style=flat)](https://fosstodon.org/@steamcmd) [![Image Size](https://img.shields.io/docker/image-size/steamcmd/api/latest.svg)](https://hub.docker.com/r/steamcmd/api) -[![Better Uptime](https://betteruptime.com/status-badges/v1/monitor/ln3p.svg)](https://status.steamcmd.net) +[![Uptime Robot Uptime](https://img.shields.io/uptimerobot/ratio/m782827237-5067fd1d69e3b1b2e4e40fff)](https://status.steamcmd.net) [![GitHub Release](https://img.shields.io/github/v/release/steamcmd/api?label=version)](https://github.com/steamcmd/api/releases) [![GitHub Sponsors](https://img.shields.io/github/sponsors/steamcmd)](https://github.com/sponsors/steamcmd) [![MIT License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE) # SteamCMD API -Read-only API interface for steamcmd app_info. Updates of this code are -automatically deployed via [Github Actions](https://github.com/steamcmd/api/actions) -when a new version has been created on Github. +Read-only API interface for steamcmd app_info. The official API is reachable on +[api.steamcmd.net](https://api.steamcmd.net) and it's documentation can be found +on [www.steamcmd.net](https://www.steamcmd.net). ## Self-hosting -The easiest way to host the API yourself is using the free cloud platform -[Fly.io](https://fly.io). Install the CLI according to the documentation: -[https://fly.io/docs/hands-on/install-flyctl/](https://fly.io/docs/hands-on/install-flyctl/). - -After installing, authenticate locally with the `flyctl` cli: -```bash -fly auth login -``` -Create the app and redis instances (choose your own names): -```bash -fly apps create -fly redis create -``` -Retrieve the Redis connection URL (you will need this later): -```bash -fly redis status - -Redis - ID = xxxxxxxxxxxxxxxxxx - Name = api - Plan = Free - Primary Region = ams - Read Regions = None - Eviction = Enabled - Private URL = redis://default:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx@fly-api.upstash.io <== Write the password down -``` -Set the required configuration environment variables: -```bash -fly secrets set --app \ - CACHE=True \ - CACHE_TYPE=redis \ - CACHE_EXPIRATION=120 \ - REDIS_HOST="fly-api.upstash.io" \ - REDIS_PORT=6379 \ - REDIS_PASSWORD="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" -``` -Finally deploy the API Docker image with the latest code: -```bash -fly deploy --app --image steamcmd/api:latest -e VERSION=1.0.0 -``` -The version is optional and currently only required for the `/v1/version` endpoint. - -## Container - -The API can easily be run via a Docker image which contains the API code and the +The API can easily be run via a container image which contains the API code and the `uvicorn` tool to be able to respond to web requests. With every new version of the API the Docker images is automatically rebuild and pushed to the Docker Hub: ```bash @@ -73,8 +29,16 @@ docker pull steamcmd/api:1.10.0 ```bash docker run -p 8000:8000 -d steamcmd/api:latest ``` -However during development, using Docker Compose is preferred. See the -[Development](#development) section for information. +The API consists of 2 services; the **Web** and the **Job** service and the Redis +cache. The **Job** service and the Redis cache are both optional but are both required +if you want to run the **Job** service. + +Details on how the official API is hosted can be found in the +[platform](https://github.com/steamcmd/platform) repository. This repository contains +all the infrastructure as code that is used to deploy the API on a Kubernetes cluster. + +See the [Development](#development) section for more information on running +the API and Job services directly via Python. ## Configuration @@ -89,7 +53,7 @@ that you will need to set the corresponding cache settings for that type as well when using the **redis** type). All the available options in an `.env` file: -``` +```shell # general VERSION=1.0.0 @@ -109,34 +73,34 @@ REDIS_URL="redis://YourUsername:YourRedisP@ssword!@your.redis.host.example.com:6 # logging LOG_LEVEL=info - -# deta -DETA_BASE_NAME="steamcmd" -DETA_PROJECT_KEY="YourDet@ProjectKey!" ``` ## Development -Run the api locally by installing a web server like uvicorn and running it: +To develop locally start by creating a Python virtual environment and install the prerequisites: ```bash python3 -m venv .venv source .venv/bin/activate pip install -r requirements.txt -pip install uvicorn -cd src/ -uvicorn main:app --reload ``` -The easiest way to spin up a complete development environment is using Docker -compose. This will build the image locally, mount the correct directory (`src`) -and set the required environment variables. If you are on windows you should -store the repository in the WSL filesystem or it will fail. Execute compose up -in the root: +Run the Web Service (FastAPI) locally by running the FastAPI development server: ```bash -docker compose up +source .venv/bin/activate +cd src/ +fastapi dev web.py ``` Now you can reach the SteamCMD API locally on [http://localhost:8000](http://localhost:8000). +Run the Job Service (Celery) locally by running celery directly: +```bash +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +cd src/ +celery -A job worker --loglevel=info --concurrency=2 --beat +``` + ### Black To keep things simple, [Black](https://github.com/python/black) is used for code diff --git a/docker-compose.yml b/docker-compose.yml index 893a12e..7c723ae 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ services: web: build: . - command: "gunicorn main:app --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000 --reload" + command: "gunicorn web:app --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000 --reload" ports: - "8000:8000" volumes: @@ -10,7 +10,7 @@ services: PORT: 8000 WORKERS: 4 VERSION: 9.9.9 - CACHE: True + CACHE: "True" CACHE_TYPE: redis CACHE_EXPIRATION: 120 REDIS_HOST: redis diff --git a/requirements.txt b/requirements.txt index 10495dc..0c04916 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,17 @@ -fastapi -redis -deta - +## general semver -python-dotenv logfmter +## web +fastapi[standard] +redis +minio + +## steam steam[client] gevent + +## job +celery +celery-singleton +flower \ No newline at end of file diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..8123ff1 --- /dev/null +++ b/src/config.py @@ -0,0 +1,69 @@ +import utils.general +import utils.helper +import logging +from dotenv import load_dotenv +from logfmter import Logfmter + +# fmt: off + +# Load values from .env file +load_dotenv() + +# Set variables based on environment +cache = utils.helper.read_env("CACHE", "False", choices=[ "True", "False" ]) +cache_type = utils.helper.read_env("CACHE_TYPE", "redis", choices=[ "redis" ]) +cache_expiration = utils.helper.read_env("CACHE_EXPIRATION", "120") + +redis_url = utils.helper.read_env("REDIS_URL") +redis_host = utils.helper.read_env("REDIS_HOST", "localhost") +redis_port = utils.helper.read_env("REDIS_PORT", "6379") +redis_password = utils.helper.read_env("REDIS_PASSWORD") +redis_database = utils.helper.read_env("REDIS_DATABASE", "0") + +storage_type = utils.helper.read_env("STORAGE_TYPE", "local", choices=[ "local", "object" ]) +storage_directory = utils.helper.read_env("STORAGE_DIRECTORY", "data/", dependency={ "STORAGE_TYPE": "local" }) +storage_object_endpoint = utils.helper.read_env("STORAGE_OBJECT_ENDPOINT", dependency={ "STORAGE_TYPE": "object" }) +storage_object_access_key = utils.helper.read_env("STORAGE_OBJECT_ACCESS_KEY", dependency={ "STORAGE_TYPE": "object" }) +storage_object_secret_key = utils.helper.read_env("STORAGE_OBJECT_SECRET_KEY", dependency={ "STORAGE_TYPE": "object" }) +storage_object_bucket = utils.helper.read_env("STORAGE_OBJECT_BUCKET", dependency={ "STORAGE_TYPE": "object" }) +storage_object_secure = utils.helper.read_env("STORAGE_OBJECT_SECURE", True) +storage_object_region = utils.helper.read_env("STORAGE_OBJECT_REGION", False) + +log_level = utils.helper.read_env("LOG_LEVEL", "info", choices=[ "debug", "info", "warning", "error", "critical" ]) +version = utils.helper.read_env("VERSION", "9.9.9") + +# Set general settings +chunk_size = 10 + +# Logging configuration +formatter = Logfmter(keys=["level"], mapping={"level": "levelname"}) +handler = logging.StreamHandler() +handler.setFormatter(formatter) +logging.basicConfig(handlers=[handler], level=utils.general.log_level(log_level)) + +# Set Celery configuration +timezone = "UTC" +broker_url = redis_url +broker_connection_retry_on_startup = True +beat_schedule = { + "check-changelist-every-5-seconds": { + "task": "check_changelist", + "schedule": 5.0 + }, + #"check-missing-apps-every-30-minutes": { + # "task": "check_missing_apps", + # "schedule": 1800.0, + #}, + "check-incorrect-apps-every-30-minutes": { + "task": "check_incorrect_apps", + "schedule": 1800.0, + }, + "check-deadlocks-every-1-hour": { + "task": "check_deadlocks", + "schedule": 3600.0, + }, +} +worker_concurrency = 4 + +# Dynamically import all tasks files +imports = utils.helper.list_tasks() diff --git a/src/functions.py b/src/functions.py deleted file mode 100644 index ff87f51..0000000 --- a/src/functions.py +++ /dev/null @@ -1,274 +0,0 @@ -""" -General Functions -""" - -# import modules -import os -import json -import gevent -import redis -import logging -from steam.client import SteamClient -from deta import Deta - - -def app_info(app_id): - connect_retries = 2 - connect_timeout = 3 - - logging.info("Started requesting app info", extra={"app_id": app_id}) - - try: - # Sometimes it hangs for 30+ seconds. Normal connection takes about 500ms - for _ in range(connect_retries): - count = str(_) - - try: - with gevent.Timeout(connect_timeout): - logging.info( - "Retrieving app info from steamclient", - extra={"app_id": app_id, "retry_count": count}, - ) - - logging.debug("Connecting via steamclient to steam api") - client = SteamClient() - client.anonymous_login() - client.verbose_debug = False - - logging.debug("Requesting app info from steam api") - info = client.get_product_info(apps=[app_id], timeout=1) - - return info - - except gevent.timeout.Timeout: - logging.warning( - "Encountered timeout when trying to connect to steam api. Retrying.." - ) - client._connecting = False - - else: - logging.info("Succesfully retrieved app info", extra={"app_id": app_id}) - break - else: - logging.error( - "Max connect retries exceeded", - extra={"connect_retries": connect_retries}, - ) - raise Exception(f"Max connect retries ({connect_retries}) exceeded") - - except Exception as err: - logging.error("Failed in retrieving app info", extra={"app_id": app_id}) - logging.error(err, extra={"app_id": app_id}) - - -def cache_read(app_id): - """ - Read app info from chosen cache. - """ - - if os.environ["CACHE_TYPE"] == "redis": - return redis_read(app_id) - elif os.environ["CACHE_TYPE"] == "deta": - return deta_read(app_id) - else: - # print query parse error and return empty dict - logging.error( - "Set incorrect cache type", - extra={"app_id": app_id, "cache_type": os.environ["CACHE_TYPE"]}, - ) - - # return failed status - return False - - -def cache_write(app_id, data): - """ - write app info to chosen cache. - """ - - if os.environ["CACHE_TYPE"] == "redis": - return redis_write(app_id, data) - elif os.environ["CACHE_TYPE"] == "deta": - return deta_write(app_id, data) - else: - # print query parse error and return empty dict - logging.error( - "Set incorrect cache type", - extra={"app_id": app_id, "cache_type": os.environ["CACHE_TYPE"]}, - ) - - # return failed status - return False - - -def redis_connection(): - """ - Parse redis config and connect. - """ - - # try connection string, or default to separate REDIS_* env vars - if "REDIS_URL" in os.environ: - rds = redis.Redis.from_url(os.environ["REDIS_URL"]) - elif "REDIS_PASSWORD" in os.environ: - rds = redis.Redis( - host=os.environ["REDIS_HOST"], - port=os.environ["REDIS_PORT"], - password=os.environ["REDIS_PASSWORD"], - ) - else: - rds = redis.Redis(host=os.environ["REDIS_HOST"], port=os.environ["REDIS_PORT"]) - - # return connection - return rds - - -def redis_read(app_id): - """ - Read app info from Redis cache. - """ - - rds = redis_connection() - - try: - # get info from cache - data = rds.get(app_id) - - # return if not found - if not data: - # return failed status - return False - - # decode bytes to str - data = data.decode("UTF-8") - - # convert json to python dict - data = json.loads(data) - - # return cached data - return data - - except Exception as redis_error: - # print query parse error and return empty dict - logging.error( - "An error occured while trying to read and decode from Redis cache", - extra={"app_id": app_id, "error_msg": redis_error}, - ) - - # return failed status - return False - - -def redis_write(app_id, data): - """ - Write app info to Redis cache. - """ - - rds = redis_connection() - - # write cache data and set ttl - try: - # convert dict to json - data = json.dumps(data) - - # insert data into cache - expiration = int(os.environ["CACHE_EXPIRATION"]) - rds.set(app_id, data, ex=expiration) - - # return succes status - return True - - except Exception as redis_error: - # print query parse error and return empty dict - logging.error( - "An error occured while trying to write to Redis cache", - extra={"app_id": app_id, "error_msg": redis_error}, - ) - - # return fail status - return False - - -def log_level(level): - """ - Sets lowest level to log. - """ - - match level: - case "debug": - logging.getLogger().setLevel(logging.DEBUG) - case "info": - logging.getLogger().setLevel(logging.INFO) - case "warning": - logging.getLogger().setLevel(logging.WARNING) - case "error": - logging.getLogger().setLevel(logging.ERROR) - case "critical": - logging.getLogger().setLevel(logging.CRITICAL) - case _: - logging.getLogger().setLevel(logging.WARNING) - - -def deta_read(app_id): - """ - Read app info from Deta base cache. - """ - - # initialize with a project key - deta = Deta(os.environ["DETA_PROJECT_KEY"]) - - # connect (and create) database - dbs = deta.Base(os.environ["DETA_BASE_NAME"]) - - try: - # get info from cache - data = dbs.get(str(app_id)) - - # return if not found - if not data: - # return failed status - return False - - # return cached data - return data["data"] - - except Exception as read_error: - # print query parse error and return empty dict - print( - "The following error occured while trying to read and decode " - + "from Deta cache:" - ) - print("> " + str(read_error)) - - # return failed status - return False - - -def deta_write(app_id, data): - """ - Write app info to Deta base cache. - """ - - # initialize with a project key - deta = Deta(os.environ["DETA_PROJECT_KEY"]) - - # connect (and create) database - dbs = deta.Base(os.environ["DETA_BASE_NAME"]) - - # write cache data and set ttl - try: - # set expiration ttl - expiration = int(os.environ["CACHE_EXPIRATION"]) - - # insert data into cache - dbs.put({"data": data}, str(app_id), expire_in=expiration) - - # return succes status - return True - - except Exception as write_error: - # print query parse error and return empty dict - print("The following error occured while trying to write to Deta cache:") - print("> " + str(write_error)) - - # return fail status - return False diff --git a/src/job.py b/src/job.py new file mode 100644 index 0000000..decf66c --- /dev/null +++ b/src/job.py @@ -0,0 +1,16 @@ +""" +Job Service and background worker. +""" + +# import modules +from celery import Celery +import logging + +# set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# initialise Celery application +app = Celery() +app.config_from_object("config") +app.autodiscover_tasks(["job.tasks"]) diff --git a/src/tasks/check_changelist.py b/src/tasks/check_changelist.py new file mode 100644 index 0000000..56fab17 --- /dev/null +++ b/src/tasks/check_changelist.py @@ -0,0 +1,61 @@ +from job import app +from celery_singleton import Singleton +from .get_app_info import get_app_info_task +import utils.steam +import utils.redis +import logging +import config +import time + + +@app.task(name="check_changelist", base=Singleton, lock_expiry=10) +def check_changelist_task(): + """ + Check for app and package changes between changelists + and start tasks to retrieve these changes. + """ + + previous_change_number = utils.redis.read("_state.change_number") + latest_change_number = utils.steam.get_change_number() + + if not previous_change_number: + logging.warning("Previous changenumber could not be retrieved from Redis") + utils.redis.write("_state.change_number", latest_change_number) + + elif not latest_change_number: + logging.error( + "The current change number could not be retrieved. Instead got: " + + str(latest_change_number) + ) + pass + + elif int(previous_change_number) == int(latest_change_number): + logging.info( + "The previous and current change number " + + str(latest_change_number) + + " are the same" + ) + pass + + else: + logging.info( + "The changenumber has been updated from " + + str(previous_change_number) + + " to " + + str(latest_change_number) + ) + changes = utils.steam.get_changes_since_change_number(previous_change_number) + + while not changes: + changes = utils.steam.get_changes_since_change_number( + previous_change_number + ) + time.sleep(1) + + for i in range(0, len(changes["apps"]), config.chunk_size): + chunk = changes["apps"][i : i + config.chunk_size] + get_app_info_task.delay(chunk) + + utils.redis.write("_state.change_number", latest_change_number) + utils.redis.increment("_state.changed_apps", len(changes["apps"])) + utils.redis.increment("_state.changed_packages", len(changes["packages"])) diff --git a/src/tasks/check_deadlocks.py b/src/tasks/check_deadlocks.py new file mode 100644 index 0000000..d118f0b --- /dev/null +++ b/src/tasks/check_deadlocks.py @@ -0,0 +1,25 @@ +from job import app +import logging +from celery_singleton import clear_locks + + +@app.task(name="check_deadlocks", bind=True) +def check_deadlocks_task(self): + """ + If no running tasks, purge all singleton locks + in Redis to avoid for ever deadlocks. + """ + + workers = app.control.inspect() + workers = workers.active() + + total_tasks = [] + for worker in workers: + active_tasks = workers[worker] + for task in active_tasks: + if task["id"] != self.request.id: + total_tasks.append(task) + + clear_locks(app) + + logging.info("Cleared locks. No tasks were running.") diff --git a/src/tasks/check_incorrect_apps.py b/src/tasks/check_incorrect_apps.py new file mode 100644 index 0000000..e8a1500 --- /dev/null +++ b/src/tasks/check_incorrect_apps.py @@ -0,0 +1,51 @@ +from job import app, logger +from celery_singleton import Singleton +from .get_app_info import get_app_info_task +import utils.redis +import utils.steam +import config + + +@app.task(name="check_incorrect_apps", base=Singleton, lock_expiry=7200) +def check_incorrect_apps_task(): + """ + Check for stored apps that have the value of "false" and + remove them from the cache. Then start tasks to retrieve + the info for these apps. + """ + + # connecting to Redis + rds = utils.redis.connect() + + # initialize cursor + cursor = 0 + false_apps = [] + + # use SCAN to iterate through keys that start with "app." + while True: + cursor, apps = rds.scan(cursor, match="app.*") + for app in apps: + # Check if the value of the app is "false" + if rds.get(app) == b"false": + app = app.decode("UTF-8") + app = app.split(".")[1] + false_apps.append(int(app)) + + if cursor == 0: + break + + if len(false_apps) > 0: + logger.warning( + "Found " + + str(len(false_apps)) + + " apps that have a stored value of 'false'" + ) + + for i in range(0, len(false_apps), config.chunk_size): + chunk = false_apps[i : i + config.chunk_size] + logger.warning( + "Deleting " + + str(chunk) + + " apps from cache and starting app info retrieval again" + ) + get_app_info_task.delay(chunk) diff --git a/src/tasks/check_missing_apps.py b/src/tasks/check_missing_apps.py new file mode 100644 index 0000000..da7ca9f --- /dev/null +++ b/src/tasks/check_missing_apps.py @@ -0,0 +1,34 @@ +from job import app, logger +from celery_singleton import Singleton +from .get_app_info import get_app_info_task +import utils.steam +import config + + +@app.task(name="check_missing_apps", base=Singleton, lock_expiry=7200) +def check_missing_apps_task(): + """ + Check for missing stored apps by comparing them with + all available apps in Steam and start tasks to + retrieve the info for the missing apps. + """ + + steam_apps = utils.steam.get_app_list() + + stored_apps = utils.storage.list("app/") + stored_apps_list = [] + for app_obj in stored_apps: + app_id = app_obj.split(".")[0] + app_ext = app_obj.split(".")[1] + if app_ext == "json": + stored_apps_list.append(int(app_id)) + + diff = utils.helper.list_differences(steam_apps, stored_apps_list) + if len(diff) > 0: + logger.info( + "Compared stored apps and found " + str(len(diff)) + " missing apps" + ) + + for i in range(0, len(diff), config.chunk_size): + chunk = diff[i : i + config.chunk_size] + get_app_info_task.delay(chunk) diff --git a/src/tasks/get_app_info.py b/src/tasks/get_app_info.py new file mode 100644 index 0000000..42b05c5 --- /dev/null +++ b/src/tasks/get_app_info.py @@ -0,0 +1,25 @@ +from job import app, logger +import utils.storage +import utils.steam +import json + + +@app.task( + name="get_app_info", + time_limit=15, + autoretry_for=(Exception,), + retry_kwargs={"max_retries": 3, "countdown": 5}, +) +def get_app_info_task(apps=[]): + """ + Get app information of input list of apps, generate + separate json files and upload them to the store. + """ + + logger.info("Getting product info for following apps: " + str(apps)) + apps = utils.steam.get_apps_info(apps) + + for app_obj in apps: + content = {app_obj: apps[app_obj]} + content = json.dumps(content) + utils.redis.write("app." + str(app_obj), content) diff --git a/src/tasks/get_package_info.py b/src/tasks/get_package_info.py new file mode 100644 index 0000000..0f3a167 --- /dev/null +++ b/src/tasks/get_package_info.py @@ -0,0 +1,24 @@ +from job import app, logger +import utils.storage +import utils.steam +import json + + +@app.task( + name="get_package_info", + time_limit=3, + autoretry_for=(Exception,), + retry_kwargs={"max_retries": 3, "countdown": 5}, +) +def get_package_info_task(packages=[]): + """ + Get package information of input list of packages, generate + separate json files and upload them to the store. + """ + + logger.info("Getting product info for following packages: " + str(packages)) + packages = utils.steam.get_packages_info(packages) + + for package_obj in packages: + content = json.dumps(packages[package_obj]) + utils.redis.write("package." + str(package_obj), content) diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/general.py b/src/utils/general.py new file mode 100644 index 0000000..6fcdbe8 --- /dev/null +++ b/src/utils/general.py @@ -0,0 +1,21 @@ +import logging + + +def log_level(level): + """ + Sets lowest level to log. + """ + + match level: + case "debug": + return logging.DEBUG + case "info": + return logging.INFO + case "warning": + return logging.WARNING + case "error": + return logging.ERROR + case "critical": + return logging.CRITICAL + case _: + return logging.WARNING diff --git a/src/utils/helper.py b/src/utils/helper.py new file mode 100644 index 0000000..4c04529 --- /dev/null +++ b/src/utils/helper.py @@ -0,0 +1,110 @@ +import logging +import sys +import os + + +def list_tasks(): + """ + List Python files in local tasks directory + and return tuple of these files. + """ + + tasks_tuple = () + tasks_directory = normalize_directory("tasks") + for task in os.listdir(tasks_directory): + if os.path.splitext(task)[1] == ".py": + tasks_tuple = tasks_tuple + ("tasks." + os.path.splitext(task)[0],) + + return tasks_tuple + + +def read_env(name, default=False, dependency={}, choices=[]): + """ + Get value from environment variable and return + false if not exist. Optionally check if other + variable exists. + """ + + try: + value = os.environ[name] + if choices and value not in choices: + logging.critical( + "The value '" + + str(value) + + "' of variable '" + + str(name) + + "' is incorrect and can only be set to one of the these values: " + + str(choices) + ) + sys.exit(1) + + except KeyError: + if default: + value = default + else: + value = False + + for dep in dependency: + if read_env(dep) == dependency[dep]: + logging.critical( + "The variable '" + + str(name) + + "' must be set because it is required when '" + + str(dep) + + "' is set to '" + + str(dependency[dep]) + + "'" + ) + sys.exit(1) + + return value + + +def combine_paths(*args): + """ + Combine the input paths to 1 path and make sure + it is valid by checking the slashes. + """ + + combined_path = "" + for arg in args: + if arg[0] == "/": + arg = arg[1:] + + if arg[-1] != "/": + arg = arg + "/" + + combined_path = combined_path + arg + + combined_path = combined_path.replace("//", "/") + combined_path = combined_path.replace("///", "/") + + return combined_path + + +def normalize_directory(path): + """ + Makes sure that relative paths always start from + the root, have a full path and that all paths end + with a "/". + """ + + if not path[0] == "/": + root = os.getcwd() + path = root + "/" + path + + if path[-1] != "/": + path = path + "/" + + return path + + +def list_differences(list1, list2): + """ + Return the items that are in list1 but + not in list2. + """ + + difference = [item for item in list1 if item not in list2] + + return difference diff --git a/src/utils/redis.py b/src/utils/redis.py new file mode 100644 index 0000000..09f0341 --- /dev/null +++ b/src/utils/redis.py @@ -0,0 +1,124 @@ +import logging +import config +import redis + + +def connect(): + """ + Parse redis config and connect. + """ + + try: + # try connection string, or default to separate REDIS_* env vars + if config.redis_url: + rds = redis.Redis.from_url(config.redis_url, db=config.redis_database) + + elif config.redis_password: + rds = redis.Redis( + host=config.redis_host, + port=config.redis_port, + password=config.redis_password, + db=config.redis_database, + ) + else: + rds = redis.Redis( + host=config.redis_host, port=config.redis_port, db=config.redis_database + ) + + except Exception as error: + logging.error("Failed to connect to Redis with error: " + error) + return False + + return rds + + +def read(key): + """ + Read specified key from Redis. + """ + + rds = connect() + + try: + # get info from cache + data = rds.get(key) + + # return False if not found + if not data: + return False + + # decode bytes to str + data = data.decode("UTF-8") + + # return data from Redis + return data + + except Exception as redis_error: + # print query parse error and return empty dict + logging.error( + "An error occured while trying to read and decode from Redis", + extra={"key": key, "error_msg": redis_error}, + ) + logging.error(redis_error) + + # return failed status + return False + + +def write(key, data): + """ + Write specified key to Redis. + """ + + rds = connect() + + # write data and set ttl + try: + expiration = int(config.cache_expiration) + + # insert data into Redis + if expiration == 0: + rds.set(key, data) + else: + rds.set(key, data, ex=expiration) + + # return succes status + return True + + except Exception as redis_error: + # print query parse error and return empty dict + logging.error( + "An error occured while trying to write to Redis cache", + extra={"key": key, "error_msg": redis_error}, + ) + logging.error(redis_error) + + # return fail status + return False + + +def increment(key, amount=1): + """ + Increment value of amount to + specified key to Redis. + """ + + rds = connect() + + # increment data of key + try: + # increment and set new value + rds.incrby(key, amount) + + # return succes status + return True + + except Exception as redis_error: + # print query parse error and return empty dict + logging.error( + "An error occured while trying to increment value in Redis cache", + extra={"key": key, "error_msg": redis_error}, + ) + + # return fail status + return False diff --git a/src/utils/steam.py b/src/utils/steam.py new file mode 100644 index 0000000..4822f4f --- /dev/null +++ b/src/utils/steam.py @@ -0,0 +1,245 @@ +import gevent +import logging +import requests +from steam.client import SteamClient + + +def init_client(): + """ + Initialize Steam client, login and + return the client. + """ + + logging.debug("Connecting via steamclient to steam api") + client = SteamClient() + client.anonymous_login() + client.verbose_debug = False + + return client + + +def get_app_list(): + """ + Get list of id's of all current apps in + Steam and return them in a flat list. + """ + + response = requests.get("https://api.steampowered.com/ISteamApps/GetAppList/v2/") + response_json = response.json() + + apps = [] + if response.status_code != 200: + logging.error("The Steam GetAppList API endpoint returned a non-200 http code") + return False + + else: + for app in response_json["applist"]["apps"]: + apps.append(app["appid"]) + + return apps + + +def get_change_number(): + """ + Get and return the latest change number. + """ + + connect_retries = 2 + connect_timeout = 3 + client = None + + try: + # Sometimes it hangs for 30+ seconds. Normal connection takes about 500ms + for _ in range(connect_retries): + count = str(_) + + try: + with gevent.Timeout(connect_timeout): + + client = init_client() + info = client.get_changes_since( + 1, app_changes=False, package_changes=False + ) + change_number = info.current_change_number + + client.logout() + + return change_number + + except gevent.timeout.Timeout: + if client: + client._connecting = False + client.logout() + + else: + break + else: + if client: + client._connecting = False + client.logout() + raise Exception(f"Max connect retries ({connect_retries}) exceeded") + + except Exception as err: + if client: + client._connecting = False + client.logout() + + logging.error( + "Encountered the following error when trying to retrieve latest change number: " + + str(err) + ) + return False + + +def get_changes_since_change_number(change_number): + """ + Get and return lists of changed apps and + packages since the specified change number. + """ + + connect_retries = 2 + connect_timeout = 3 + client = None + + try: + # Sometimes it hangs for 30+ seconds. Normal connection takes about 500ms + for _ in range(connect_retries): + count = str(_) + + try: + with gevent.Timeout(connect_timeout): + + client = init_client() + info = client.get_changes_since( + int(change_number), app_changes=True, package_changes=True + ) + + app_list = [] + if info.app_changes: + for app in info.app_changes: + app_list.append(app.appid) + + package_list = [] + if info.package_changes: + for package in info.package_changes: + package_list.append(package.packageid) + + changes = {"apps": app_list, "packages": package_list} + + client.logout() + + return changes + + except gevent.timeout.Timeout: + if client: + client._connecting = False + client.logout() + + else: + break + else: + if client: + client._connecting = False + client.logout() + raise Exception(f"Max connect retries ({connect_retries}) exceeded") + + except Exception as err: + if client: + client._connecting = False + client.logout() + logging.error( + "Failed in get changes since last change number with error: " + str(err), + extra={"changenumber": str(change_number)}, + ) + return False + + +def get_apps_info(apps=[]): + """ + Get product info for list of apps and + return the output untouched. + """ + + connect_retries = 2 + connect_timeout = 3 + client = None + + logging.info("Started requesting app info", extra={"apps": str(apps)}) + + try: + # Sometimes it hangs for 30+ seconds. Normal connection takes about 500ms + for _ in range(connect_retries): + count = str(_) + + try: + with gevent.Timeout(connect_timeout): + logging.info( + "Retrieving app info from steamclient", + extra={"apps": str(apps), "retry_count": count}, + ) + + client = init_client() + + logging.debug( + "Requesting app info from steam api", extra={"apps": str(apps)} + ) + info = client.get_product_info(apps=apps, timeout=1) + info = info["apps"] + + client.logout() + + return info + + except gevent.timeout.Timeout: + logging.warning( + "Encountered timeout when trying to connect to steam api. Retrying.." + ) + if client: + client._connecting = False + client.logout() + + else: + logging.info( + "Succesfully retrieved app info", extra={"apps": str(apps)} + ) + break + else: + if client: + client._connecting = False + client.logout() + logging.error( + "Max connect retries exceeded", + extra={"apps": str(apps), "connect_retries": connect_retries}, + ) + raise Exception(f"Max connect retries ({connect_retries}) exceeded") + + except Exception as err: + if client: + client._connecting = False + client.logout() + logging.error( + "Failed in retrieving app info with error: " + str(err), + extra={"apps": str(apps)}, + ) + return False + + +def get_packages_info(packages=[]): + """ + Get product info for list of packages and + return the output untouched. + """ + + try: + client = init_client() + info = client.get_product_info(packages=packages, timeout=5) + info = info["packages"] + except Exception as err: + logging.error( + "Something went wrong while querying product info for packages: " + + str(packages) + ) + logging.error(err) + return False + + return info diff --git a/src/utils/storage.py b/src/utils/storage.py new file mode 100644 index 0000000..efc137c --- /dev/null +++ b/src/utils/storage.py @@ -0,0 +1,250 @@ +from job import logger +import utils.helper +import config +import logging +import os +from minio import Minio +from io import BytesIO + + +## meta functions + + +def read(path, filename): + """ + Read file from specified directory. + """ + + match config.storage_type: + case "local": + response = local_read(path, filename) + case "object": + response = object_read(path, filename) + case _: + response = False + + return response + + +def write(content, path, filename): + """ + Write content to file in the specified + directory/path. + """ + + match config.storage_type: + case "local": + response = local_write(content, path, filename) + case "object": + response = object_write(content, path, filename) + case _: + response = False + + return response + + +def list(path): + """ + List files in specified directory and return + it as a list. + """ + + match config.storage_type: + case "local": + response = local_list(path) + case "object": + response = object_list(path) + case _: + response = False + + return response + + +## local storage functions + + +def local_read(path, filename): + """ + Read file from local specified directory. + """ + + path = utils.helper.combine_paths(config.storage_directory, path) + path = utils.helper.normalize_directory(path) + file = path + filename + + try: + content = open(file, "r") + content = content.read() + + except Exception: + logging.error("The following file could not be read: " + file) + return False + + return content + + +def local_write(content, path, filename): + """ + Write content to file to local specified + directory. + """ + + path = utils.helper.combine_paths(config.storage_directory, path) + path = utils.helper.normalize_directory(path) + file = path + filename + + try: + f = open(file, "w") + f.write(content) + f.close() + logging.info("Written the following file: " + file) + + except Exception: + logger.error("The following file could not be written locally: " + file) + return False + + return True + + +def local_delete(path, filename): + """ + Delete file in local specified directory. + """ + + path = utils.helper.combine_paths(config.storage_directory, path) + path = utils.helper.normalize_directory(path) + file = path + filename + + try: + os.remove(file) + except OSError as err: + logging.error( + "The following file could not be deleted locally: " + + err.filename + + ". The following error occured: " + + err.strerror + ) + return False + + return True + + +def local_list(path): + """ + List files in specified local directory and + return it as a list. + """ + + path = utils.helper.combine_paths(config.storage_directory, path) + path = utils.helper.normalize_directory(path) + + try: + content = os.listdir(path) + + except Exception: + logging.error("The following directory could not be read: " + path) + return False + + return content + + +## object store functions + + +def object_connect(): + """ + Connect to object store with credentials set + in config environment variables. + """ + + arguments = { + "endpoint": config.storage_object_endpoint, + "access_key": config.storage_object_access_key, + "secret_key": config.storage_object_secret_key, + "secure": str(config.storage_object_secure).lower() == "true", + } + + if config.storage_object_region: + arguments["region"] = config.storage_object_region + + client = Minio(**arguments) + + return client + + +def object_read(path, filename): + """ + Read file from object storage from specified + directory. + """ + + file = path + filename + conn = object_connect() + + try: + content = conn.get_object(config.storage_object_bucket, file) + + except Exception: + logging.error("The following file could not be retrieved: " + file) + return False + + return content + + +def object_write(content, path, filename): + """ + Write content to file to object storage in + specified directory. + """ + + content = content.encode("utf-8") + content = BytesIO(content) + + file = path + filename + conn = object_connect() + resp = conn.put_object( + config.storage_object_bucket, + file, + content, + length=-1, + part_size=10485760, + content_type="application/json", + ) + + return resp + + +def object_delete(path, filename): + """ + Delete file in object storage specified directory. + """ + + file = path + filename + conn = object_connect() + resp = conn.remove_object(config.storage_object_bucket, file) + + return resp + + +def object_list(path, details=False): + """ + List files in specified directory in object + storage and return it as a list. + """ + + conn = object_connect() + + try: + files = conn.list_objects(config.storage_object_bucket, prefix=path) + file_list = [] + + for file in files: + file = file.object_name + file = file.split("/")[-1] + file_list.append(file) + + except Exception: + logging.error("The files in following directory could not be listed: " + path) + return False + + return file_list diff --git a/src/main.py b/src/web.py similarity index 64% rename from src/main.py rename to src/web.py index d86d0fd..825ee79 100644 --- a/src/main.py +++ b/src/web.py @@ -1,34 +1,20 @@ """ -Main application and entrypoint. +Web Service and main API. """ # import modules -import os +import utils.redis +import utils.steam +import config import json import semver import typing import logging from fastapi import FastAPI, Response -from functions import app_info, cache_read, cache_write, log_level -from logfmter import Logfmter - -# load configuration -from dotenv import load_dotenv - -load_dotenv() # initialise app app = FastAPI() -# set logformat -formatter = Logfmter(keys=["level"], mapping={"level": "levelname"}) -handler = logging.StreamHandler() -handler.setFormatter(formatter) -logging.basicConfig(handlers=[handler]) - -if "LOG_LEVEL" in os.environ: - log_level(os.environ["LOG_LEVEL"]) - # include "pretty" for backwards compatibility class PrettyJSONResponse(Response): @@ -47,44 +33,46 @@ def render(self, content: typing.Any) -> bytes: @app.get("/v1/info/{app_id}", response_class=PrettyJSONResponse) def read_app(app_id: int, pretty: bool = False): - logging.info("Requested app info", extra={"app_id": app_id}) + logging.info("Requested app info", extra={"apps": str([app_id])}) - if "CACHE" in os.environ and os.environ["CACHE"]: - info = cache_read(app_id) + if config.cache == "True": + info = utils.redis.read("app." + str(app_id)) if not info: logging.info( - "App info could not be found in cache", extra={"app_id": app_id} + "App info could not be found in cache", extra={"apps": str([app_id])} ) - info = app_info(app_id) - cache_write(app_id, info) + info = utils.steam.get_apps_info([app_id]) + data = json.dumps(info) + utils.redis.write("app." + str(app_id), data) else: + info = json.loads(info) logging.info( "App info succesfully retrieved from cache", - extra={"app_id": app_id}, + extra={"apps": str([app_id])}, ) else: - info = app_info(app_id) + info = utils.steam.get_apps_info([app_id]) if info is None: logging.info( "The SteamCMD backend returned no actual data and failed", - extra={"app_id": app_id}, + extra={"apps": str([app_id])}, ) # return empty result for not found app return {"data": {app_id: {}}, "status": "failed", "pretty": pretty} - if not info["apps"]: + if not info: logging.info( "No app has been found at Steam but the request was succesfull", - extra={"app_id": app_id}, + extra={"apps": str([app_id])}, ) # return empty result for not found app return {"data": {app_id: {}}, "status": "success", "pretty": pretty} - logging.info("Succesfully retrieved app info", extra={"app_id": app_id}) - return {"data": info["apps"], "status": "success", "pretty": pretty} + logging.info("Succesfully retrieved app info", extra={"apps": str([app_id])}) + return {"data": info, "status": "success", "pretty": pretty} @app.get("/v1/version", response_class=PrettyJSONResponse) @@ -92,10 +80,10 @@ def read_item(pretty: bool = False): logging.info("Requested api version") # check if version succesfully read and parsed - if "VERSION" in os.environ and os.environ["VERSION"]: + if config.version: return { "status": "success", - "data": semver.parse(os.environ["VERSION"]), + "data": semver.parse(config.version), "pretty": pretty, } else: