glowtables/glowtables/table.py

236 lines
7.7 KiB
Python

# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Data model"""
import abc
import collections
import dataclasses
import itertools
import textwrap
from typing import Any, Callable, Generic, Iterator, NewType, Optional, TypeVar
import rdflib
from glowtables.sparql import SparqlBackend
# Distinct static type for language codes; built on ``str`` via ``NewType``.
Language = NewType("Language", str)
"""ISO 639-1 code"""
# Distinct static type for SPARQL variable names; built on ``str`` via ``NewType``.
SparqlVariable = NewType("SparqlVariable", str)
"""A variable within a SPARQL query, without the leading ``?``."""
# Type parameter for the native Python type a field's ``parse`` callable produces.
_TFieldValue = TypeVar("_TFieldValue")
@dataclasses.dataclass
class Field(abc.ABC, Generic[_TFieldValue]):
    """Base class describing one field of a table.

    Concrete subclasses must implement both :meth:`sort_key` and :meth:`sparql`.
    """

    id: str
    """Identifier of the field; no two fields of a table may share it."""

    display_names: dict[Language, str]
    """Human-readable name of the field, per language (eg. for a column header)"""

    parse: Callable[[str], _TFieldValue]
    """Converts a string returned by a SPARQL query into a native Python value."""

    @abc.abstractmethod
    def sort_key(self, value: _TFieldValue):
        """Returns the key used to order ``value``, so that this method can be given
        as ``key`` argument to :func:`sorted`."""

    @abc.abstractmethod
    def sparql(
        self,
        subject_var: SparqlVariable,
        object_var: SparqlVariable,
        new_var: Callable[[], SparqlVariable],
    ) -> str:
        """
        Builds the SPARQL statements which bind ``object_var`` to the value this
        field has for the subject bound to ``subject_var``.

        ``new_var`` is a callable without arguments returning a SPARQL variable;
        implementations call it when they need an intermediate variable.

        For example, if this ``Field`` represents the `"CPU frequency"
        <https://www.wikidata.org/wiki/Property:P2144>`__, ``subject_var`` is ``a``,
        and ``object_var`` is ``b``, this will return::

            ?a <http://www.wikidata.org/prop/direct/P2144> ?b.

        A single statement is usually enough; several are required when the value
        lives on a node which is not a direct neighbor of the subject.
        """
@dataclasses.dataclass
class LiteralField(Field[_TFieldValue], Generic[_TFieldValue]):
    """The most basic kind of field: the value is a literal attached directly to
    the subject through :attr:`predicate`."""

    # Predicate linking the subject to the literal value.
    predicate: rdflib.URIRef

    default: Optional[_TFieldValue] = None
    """When set to anything but :const:`None`, subjects lacking a statement for this
    field are accepted, and this value stands in for the missing one while sorting.

    It only affects sorting; it is never displayed."""

    def sort_key(self, value: Optional[_TFieldValue]) -> Any:
        """Returns the key to order ``value`` by; usable as ``key`` argument to
        :func:`sorted`.

        A non-``None`` value is its own key. A ``None`` value is replaced with
        :attr:`default`; :exc:`ValueError` is raised if there is no default.
        """
        if value is not None:
            return value
        if self.default is None:
            raise ValueError(f"{self.id} value is unexpectedly None")
        # Go through sort_key again so the default is keyed like any other value.
        return self.sort_key(self.default)

    def sparql(
        self,
        subject_var: SparqlVariable,
        object_var: SparqlVariable,
        new_var: Callable[[], SparqlVariable],
    ) -> str:
        """Returns a single triple pattern binding ``object_var`` to the literal;
        the pattern is wrapped in ``OPTIONAL`` when a :attr:`default` is set.

        ``new_var`` is not used, as no intermediate variable is needed.
        """
        triple = f"?{subject_var} <{self.predicate}> ?{object_var}."
        # With a default, subjects without the statement must still match.
        return triple if self.default is None else f"OPTIONAL {{ {triple} }}."
@dataclasses.dataclass
class LabeledField(Field[_TFieldValue], Generic[_TFieldValue]):
    """Field whose value is the label of the node that :attr:`predicate` links the
    subject to, rather than a literal attached to the subject itself."""

    # Predicate linking the subject to the node whose label is fetched.
    predicate: rdflib.URIRef

    default: Optional[_TFieldValue] = None
    """When set to anything but :const:`None`, subjects lacking a statement for this
    field are accepted, and this value stands in for the missing one while sorting.

    It only affects sorting; it is never displayed."""

    def sort_key(self, value: Optional[_TFieldValue]) -> Any:
        """Returns the key to order ``value`` by; usable as ``key`` argument to
        :func:`sorted`.

        A non-``None`` value is its own key. A ``None`` value is replaced with
        :attr:`default`; :exc:`ValueError` is raised if there is no default.
        """
        if value is not None:
            return value
        if self.default is None:
            raise ValueError(f"{self.id} value is unexpectedly None")
        # Go through sort_key again so the default is keyed like any other value.
        return self.sort_key(self.default)

    def sparql(
        self,
        subject_var: SparqlVariable,
        object_var: SparqlVariable,
        new_var: Callable[[], SparqlVariable],
    ) -> str:
        """Returns statements binding ``object_var`` to the label of the node reached
        from ``subject_var`` through :attr:`predicate`.

        One variable is requested from ``new_var`` to hold that intermediate node.
        The label language is fixed to ``"en"``. The whole pattern is wrapped in
        ``OPTIONAL`` when a :attr:`default` is set.
        """
        linked_node = new_var()
        pattern = (
            "\n"
            f"?{subject_var} <{self.predicate}> ?{linked_node}.\n"
            "SERVICE <http://wikiba.se/ontology#label> {\n"
            '<http://www.bigdata.com/rdf#serviceParam> <http://wikiba.se/ontology#language> "en".\n'  # noqa
            f"?{linked_node} <http://www.w3.org/2000/01/rdf-schema#label> ?{object_var}.\n"  # noqa
            "}\n"
        )
        if self.default is not None:
            # With a default, subjects without the statement must still match.
            return f"OPTIONAL {{ {pattern} }}."
        return pattern
@dataclasses.dataclass
class Table:
    """A table, along with its fields description."""

    fields: list[Field]
    """Ordered list of all fields of the table. Includes hidden and filter-only fields.
    """

    constraints: str
    """SPARQL statements which constrain the set of nodes used as main subject
    for table entries.

    The variable bound to the subject is what is defined in :attr:`subject`
    (by default, ``?subject``).
    """

    id: str
    """Unique within a Glowtable instance"""

    display_names: dict[Language, str] = dataclasses.field(default_factory=dict)
    """Localized name for the table (eg. on a page title)"""

    subject: SparqlVariable = SparqlVariable("subject")
    """SPARQL variable (without the leading ``?``) bound to the main subject of
    each row. It must differ from every field id."""

    # Not annotated, so this is a plain class attribute rather than a dataclass
    # field. It is filled in by :meth:`sparql` with ``str.format``.
    sparql_template = textwrap.dedent(
        """
        SELECT {columns}
        WHERE {{
        {constraints}
        {statements}
        }}
        """
    )

    def __post_init__(self) -> None:
        """Validates the field ids.

        :raises ValueError: if :attr:`subject` is also used as a field id, or if
            two fields share the same id.
        """
        field_ids = [field.id for field in self.fields]

        # Field ids are used as SPARQL variable names, so none may shadow the subject.
        if str(self.subject) in field_ids:
            raise ValueError(f"{self.subject} is both subject and a field id.")

        duplicate_field_ids = [
            field_id
            for (field_id, count) in collections.Counter(field_ids).items()
            if count > 1
        ]
        if duplicate_field_ids:
            raise ValueError(
                f"{self} has duplicate field ids: {', '.join(duplicate_field_ids)}"
            )

    def sparql(self) -> str:
        """Returns a SPARQL query suitable to get records for this table.

        Every field contributes one selected column, named after its id, and the
        statements returned by its ``sparql()`` method. These statements are bound
        to :attr:`subject`, the same variable :attr:`constraints` is documented
        to use.
        """

        def var_factory(prefix: str) -> Callable[[], SparqlVariable]:
            """Returns a callable producing ``prefix0``, ``prefix1``, ... in turn."""
            counter = itertools.count()
            return lambda: SparqlVariable(f"{prefix}{next(counter)}")

        # Use the configured subject variable instead of a hard-coded "subject".
        # Otherwise a table with a custom ``subject`` would have its field
        # statements bound to a different variable than its constraints.
        subject = self.subject

        columns = " ".join(f"?{field.id}" for field in self.fields)

        # NOTE: generated variables are "<field id><counter>". Such a name could
        # equal the id of another field (eg. fields "a" and "a0"); this is not
        # checked here.
        statements = "\n ".join(
            field.sparql(subject, SparqlVariable(field.id), var_factory(field.id))
            for field in self.fields
        )

        constraints = textwrap.indent(self.constraints, " ").strip()

        # ``subject`` is passed too, so a template referencing ``{subject}`` can be
        # formatted; the default template does not use it.
        return self.sparql_template.format(
            subject=subject,
            columns=columns,
            constraints=constraints,
            statements=statements.strip(),
        )

    def query(self, backend: SparqlBackend) -> Iterator[tuple]:
        """Lazily yields the rows of the table, one tuple per row.

        Cells are paired with :attr:`fields` in order. A ``None`` cell is kept as
        ``None``; any other cell has its ``"value"`` item passed to the ``parse``
        callable of the corresponding field.

        :param backend: where the query built by :meth:`sparql` is sent
        """
        for row in backend.query(self.sparql()):
            yield tuple(
                None if cell is None else field.parse(cell["value"])
                for (field, cell) in zip(self.fields, row)
            )