4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).


## [25.11.11/2.8.11] 2025-12-17
### fixed
- Tagpack actor validation

## [25.11.10/2.8.10] 2025-12-15
### fixed
- better retry on ingest (also for prepared statements)
4 changes: 2 additions & 2 deletions Makefile
@@ -1,8 +1,8 @@
SHELL := /bin/bash
PROJECT := graphsense-lib
VENV := venv
RELEASE := 'v25.11.10'
RELEASESEM := 'v2.8.10'
RELEASE := 'v25.11.11'
Member comment: Is this really a change, or is it an artifact of not merging the last release to master?

RELEASESEM := 'v2.8.11'

-include .env

1 change: 1 addition & 0 deletions pyproject.toml
@@ -129,6 +129,7 @@ tagpacks = [
"sqlmodel>=0.0.22",
"tabulate>=0.9.0",
"rapidfuzz>=3.13.0",
"rapidyaml>=0.10.0",
]

all = [
50 changes: 49 additions & 1 deletion src/graphsenselib/tagpack/__init__.py
@@ -7,6 +7,17 @@
except ImportError:
from yaml import SafeLoader as SafeLoader

# Fast YAML loading using rapidyaml
import warnings

with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="builtin type Swig.*has no __module__ attribute",
category=DeprecationWarning,
)
import ryml as _ryml

Member comment: Is it a good idea to just suppress these warnings?

if sys.version_info[:2] >= (3, 8):
# TODO: Import directly (no need for conditional) when `python_requires = >= 3.8`
from importlib.metadata import PackageNotFoundError, version # pragma: no cover
@@ -62,9 +73,46 @@ def __str__(self):
class UniqueKeyLoader(SafeLoader):
def construct_mapping(self, node, deep=False):
mapping = set()
for key_node, value_node in node.value:
for key_node, _ in node.value:
key = self.construct_object(key_node, deep=deep)
if key in mapping:
raise ValidationError(f"Duplicate {key!r} key found in YAML.")
mapping.add(key)
return super().construct_mapping(node, deep)


def _check_duplicate_keys_ryml(tree, root_id):
"""Check for duplicate keys at the top level of ryml tree."""
if not tree.is_map(root_id):
return
keys = set()
for i in range(tree.num_children(root_id)):
child_id = tree.child(root_id, i)
key = tree.key(child_id)
if key in keys:
raise ValidationError(
f"Duplicate {key.tobytes().decode()!r} key found in YAML."
)
keys.add(key)


def load_yaml_fast(file_path):
"""Load YAML using rapidyaml (~10x faster) for large files, else PyYAML."""
import json
import os
import yaml

file_size = os.path.getsize(file_path)

# Use UniqueKeyLoader for small files (duplicate key detection)
if file_size < 100 * 1024:
Member comment: Is there a reason to fall back to PyYAML on small files?

with open(file_path, "r") as f:
return yaml.load(f, UniqueKeyLoader)

# Fast path: ryml -> check duplicates -> JSON -> json.loads
with open(file_path, "rb") as f:
content = f.read()
tree = _ryml.parse_in_arena(content)
_check_duplicate_keys_ryml(tree, tree.root_id())
json_bytes = _ryml.emit_json_malloc(tree, tree.root_id())
return json.loads(json_bytes)
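
A minimal usage sketch of the new loader (not part of the diff; the file paths are hypothetical, while the 100 KiB threshold and the duplicate-key behaviour are taken from the code above):

# Illustrative only: file names are made up.
from graphsenselib.tagpack import ValidationError, load_yaml_fast

# < 100 KiB: parsed with PyYAML's UniqueKeyLoader, which rejects
# duplicate keys at every nesting level.
small_pack = load_yaml_fast("packs/small_tagpack.yaml")

# >= 100 KiB: parsed with rapidyaml; duplicate keys are checked at the
# top level only, then the tree is emitted as JSON and json.loads'd.
large_pack = load_yaml_fast("packs/large_tagpack.yaml")

try:
    load_yaml_fast("packs/has_duplicate_keys.yaml")
except ValidationError as err:
    print(err)  # e.g. Duplicate 'title' key found in YAML.
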
13 changes: 11 additions & 2 deletions src/graphsenselib/tagpack/cli.py
@@ -450,8 +450,17 @@ def validate_tagpack(

# Check actors if enabled
if check_actor_references and actorpack:
actor = tagpack.all_header_fields.get("actor")
if actor:
# Collect actors from header level and tag level
actors_to_check = set()
header_actor = tagpack.all_header_fields.get("actor")
Member comment: Is there a reason we need more cases, or is this a new feature?

if header_actor:
actors_to_check.add(header_actor)
for tag in tagpack.tags:
tag_actor = tag.all_fields.get("actor")
if tag_actor:
actors_to_check.add(tag_actor)

for actor in actors_to_check:
resolved = actorpack.resolve_actor(actor)
if resolved is None:
# Unknown actor
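
A self-contained sketch of what the extended check now covers (the tagpack dict and actor names are hypothetical; the collection logic mirrors the diff above):

# Illustrative only: a tagpack with a header-level actor and a
# tag-level override.
tagpack = {
    "actor": "binance",  # header-level actor
    "tags": [
        {"address": "addr1"},                      # inherits header actor
        {"address": "addr2", "actor": "kraken"},   # tag-level actor
    ],
}

actors_to_check = set()
header_actor = tagpack.get("actor")
if header_actor:
    actors_to_check.add(header_actor)
for tag in tagpack["tags"]:
    tag_actor = tag.get("actor")
    if tag_actor:
        actors_to_check.add(tag_actor)

# Previously only the header-level actor was resolved against the
# actorpack; with this change tag-level actors are validated as well.
assert actors_to_check == {"binance", "kraken"}
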
57 changes: 43 additions & 14 deletions src/graphsenselib/tagpack/schema.py
@@ -4,40 +4,61 @@

from graphsenselib.tagpack import ValidationError

# Caches
_type_def_cache: dict[int, dict[str, dict]] = {} # udts_id -> {item_type -> definition}
_mandatory_fields_cache: dict[int, frozenset[str]] = {} # schema_id -> mandatory fields


def _get_type_def_cache(udts: dict) -> dict[str, dict]:
"""Get or create type definition cache for a udts instance."""
udts_id = id(udts)
if udts_id not in _type_def_cache:
_type_def_cache[udts_id] = {}
return _type_def_cache[udts_id]


def load_field_type_definition(udts, item_type):
cache = _get_type_def_cache(udts)
if item_type in cache:
return cache[item_type]

if item_type.startswith("@"):
fd = udts.get(item_type[1:])
if fd is None:
raise ValidationError(f"No type {item_type[1:]} found in the schema.")
return fd
result = fd
else:
return {"type": item_type}
result = {"type": item_type}

cache[item_type] = result
return result


def _get_mandatory_fields(schema_def: dict) -> frozenset[str]:
"""Get mandatory fields for a schema definition (cached)."""
schema_id = id(schema_def)
if schema_id not in _mandatory_fields_cache:
_mandatory_fields_cache[schema_id] = frozenset(
k for k, v in schema_def.items() if bool(v.get("mandatory", False))
)
return _mandatory_fields_cache[schema_id]


def check_type_list_items(udts, field_name, field_definition, lst):
if "item_type" in field_definition:
item_def = load_field_type_definition(udts, field_definition["item_type"])
for i, x in enumerate(lst):
check_type(
udts,
f"{field_name}[{i}]",
load_field_type_definition(udts, field_definition["item_type"]),
x,
)
check_type(udts, f"{field_name}[{i}]", item_def, x)


def check_type_dict(udts, field_name, field_definition, dct):
if "item_type" in field_definition:
fd_def = load_field_type_definition(udts, field_definition["item_type"])

if type(fd_def) is str:
raise ValidationError(f"Type of dict {field_name} is a basic type {fd_def}")

# check mandatory entries
mandatory_fields = [
k for k, v in fd_def.items() if bool(v.get("mandatory", False))
]

# Use cached mandatory fields
mandatory_fields = _get_mandatory_fields(fd_def)
for field in mandatory_fields:
if field not in dct:
raise ValidationError(f"Mandatory field {field} not in {dct}")
@@ -51,25 +72,30 @@ def check_type_dict(udts, field_name, field_definition, dct):
def check_type(udts, field_name, field_definition, value):
"""Checks whether a field's type matches the definition"""
schema_type = field_definition["type"]

if schema_type == "text":
if not isinstance(value, str):
raise ValidationError("Field {} must be of type text".format(field_name))
if len(value.strip()) == 0:
raise ValidationError("Empty value in text field {}".format(field_name))

elif schema_type == "datetime":
if not isinstance(value, datetime.date) and not isinstance(
value, datetime.datetime
):
raise ValidationError(
f"Field {field_name} must be of type datetime. Found {type(value)}"
)

elif schema_type == "boolean":
if not isinstance(value, bool):
raise ValidationError(f"Field {field_name} must be of type boolean")

elif schema_type == "list":
if not isinstance(value, list):
raise ValidationError(f"Field {field_name} must be of type list")
check_type_list_items(udts, field_name, field_definition, value)

elif schema_type == "json_text":
try:
json_data = json.loads(value)
@@ -78,8 +104,11 @@ def check_type(udts, field_name, field_definition, value):
f"Invalid JSON in field {field_name} with value {value}: {e}"
)
check_type_dict(udts, field_name, field_definition, json_data)

elif schema_type == "dict":
check_type_dict(udts, field_name, field_definition, value)

else:
raise ValidationError("Unsupported schema type {}".format(schema_type))

return True
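
A minimal sketch of the caching pattern added above (the udts content is hypothetical, and it assumes the private helpers stay importable from graphsenselib.tagpack.schema):

# Illustrative only: toy user-defined-type schema. Both caches are keyed
# by id() of the schema object, so validating many tags against the same
# schema reuses the resolved definitions instead of rebuilding them.
from graphsenselib.tagpack.schema import (
    _get_mandatory_fields,
    load_field_type_definition,
)

udts = {
    "tag_fields": {
        "label": {"type": "text", "mandatory": True},
        "source": {"type": "text"},
    }
}

# First call resolves "@tag_fields" from udts and stores it in the cache;
# the second call returns the same cached object.
fd = load_field_type_definition(udts, "@tag_fields")
assert load_field_type_definition(udts, "@tag_fields") is fd

# Mandatory fields are computed once per schema-definition object.
assert _get_mandatory_fields(fd) == frozenset({"label"})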