Compare commits

13 Commits

Author SHA1 Message Date
0433028a9e .gitignore: Fix typo
Some checks are pending
ci/woodpecker/push/woodpecker Pipeline is pending
2023-06-18 05:43:28 +02:00
a996e1b4a6 Add demo web app and test with actual data 2023-05-28 21:10:06 +02:00
f076efffc6 Cache SPARQL queries to a local database 2023-05-28 20:03:55 +02:00
44eb8147c8 Parse values 2023-05-23 20:00:45 +02:00
a4087ac180 Make RemoteSparqlBackend actually work and use it in Field/Table tests 2023-05-21 21:29:05 +02:00
fb02fb3841 Use requests directly instead SPARQLWrapper
So tests can be closer to the real thing, by mocking the HTTP server
instead of having a completely different abstraction on the client side
(RdflibSparqlBackend)
2023-05-21 19:58:02 +02:00
370c75b16c Lint tests 2023-05-21 17:49:30 +02:00
3f4b969065 Ensure generated SPARQL queries are syntactically valid 2023-05-21 17:37:47 +02:00
673df5453f Fix lint 2023-05-21 16:23:20 +02:00
df975efcd3 Add tests for field id clashes 2023-05-21 14:33:53 +02:00
c414514de3 Fix typo 2023-05-21 14:06:14 +02:00
3a6d306fbf Define basic architecture and pytest fixtures for SPARQL querying 2023-05-21 14:04:46 +02:00
5ab41939f2 Start defining the data model 2023-05-21 14:04:27 +02:00
15 changed files with 1043 additions and 4 deletions

3
.gitignore vendored
View File

@ -160,3 +160,6 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Data
*.sqlite3

View File

@ -0,0 +1,13 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.

103
glowtables/cache.py Normal file
View File

@ -0,0 +1,103 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""SPARQL query cache"""
import datetime
import random
import sqlite3
from typing import Optional
EXPIRE_PROBA = 0.001
"""Probability an ``INSERT INTO`` is preceded by a ``DELETE`` of all old records."""
CACHE_LIFETIME = datetime.timedelta(days=7)
def _now() -> datetime.datetime:
return datetime.datetime.now(tz=datetime.timezone.utc)
class Cache:
"""A simple key-value cache for SPARQL queries"""
def __init__(self, db: str):
self._db = sqlite3.connect(db)
self._init_schema()
def _init_schema(self):
"""Initialize tables and indexes"""
with self._db:
self._db.execute(
"""
CREATE TABLE IF NOT EXISTS sparql_queries (
url TEXT,
query TEXT,
response TEXT,
date TEXT -- ISO8601 timestamp of the recorded query, must be UTC
);
"""
)
self._db.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS sparql_queries_pk
ON sparql_queries (url, query)
"""
)
def _expire(self) -> None:
"""Randomly delete outdated item from the database."""
if random.random() < EXPIRE_PROBA:
with self._db:
self._db.execute(
"""
DELETE FROM sparql_queries WHERE date < ?
""",
((_now() - CACHE_LIFETIME).isoformat()),
)
def get(self, url: str, query: str) -> Optional[str]:
"""Gets the response to a previous query from the cache, or None."""
with self._db:
cur = self._db.execute(
"""
SELECT response
FROM sparql_queries
WHERE url=? AND query=? AND date >= ?
""",
(url, query, (_now() - CACHE_LIFETIME).isoformat()),
)
rows = list(cur)
if rows:
# cache hit
((resp,),) = rows
return resp
else:
# cache miss
return None
def set(self, url: str, query: str, response: str) -> None:
"""Adds the response of a query to the cache."""
self._expire()
with self._db:
self._db.execute(
"""
INSERT INTO sparql_queries(url, query, response, date)
VALUES (?, ?, ?, ?)
ON CONFLICT(url, query) DO UPDATE SET
response=EXCLUDED.response,
date=EXCLUDED.date
""",
(url, query, response, _now().isoformat()),
)

View File

View File

@ -0,0 +1,58 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Example table of all Wikidata cats"""
import rdflib
from glowtables.table import LabeledField, Language, Table
cats = Table(
id="cats",
display_names={Language("en"): "Cats"},
fields=[
LabeledField(
"name",
{Language("en"): "Name property (not label)"},
str,
rdflib.URIRef("http://www.wikidata.org/prop/direct/P2561"),
default="",
),
LabeledField(
"breed",
{Language("en"): "Breed"},
str,
rdflib.URIRef("http://www.wikidata.org/prop/direct/P4743"),
),
LabeledField(
"instanceof",
{Language("en"): "Instance of"},
str,
rdflib.URIRef("http://www.wikidata.org/prop/direct/P31"),
),
LabeledField(
"haircolor",
{Language("en"): "Hair color"},
str,
rdflib.URIRef("http://www.wikidata.org/prop/direct/P1884"),
default="",
),
],
constraints="""
?subject
<http://www.wikidata.org/prop/direct/P31>
<http://www.wikidata.org/entity/Q146>
.
""", # instance of cat
)

128
glowtables/shortxml.py Normal file
View File

@ -0,0 +1,128 @@
# Copyright (c) 2023 Valentin Lorentz
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# pylint: disable=consider-using-f-string,missing-class-docstring
"""This module allows writing XML ASTs in a way that is more concise than the default
:mod:`xml.etree.ElementTree` interface.
For example:
.. code-block:: python
from .shortxml import Namespace
HTML = Namespace("http://www.w3.org/1999/xhtml")
page = HTML.html(
HTML.head(
HTML.title("irctest dashboard"),
HTML.link(rel="stylesheet", type="text/css", href="./style.css"),
),
HTML.body(
HTML.h1("irctest dashboard"),
HTML.h2("Tests by command/specification"),
HTML.dl(
[
( # elements can be arbitrarily nested in lists
HTML.dt(HTML.a(title, href=f"./{title}.xhtml")),
HTML.dd(defintion),
)
for title, definition in sorted(definitions)
],
class_="module-index",
),
HTML.h2("Tests by implementation"),
HTML.ul(
[
HTML.li(HTML.a(job, href=f"./{file_name}"))
for job, file_name in sorted(job_pages)
],
class_="job-index",
),
),
)
print(ET.tostring(page, default_namespace=HTML.uri))
Attributes can be passed either as dictionaries or as kwargs, and can be mixed
with child elements.
Trailing underscores are stripped from attributes, which allows passing reserved
Python keywords (eg. ``class_`` instead of ``class``)
Attributes are always qualified, and share the namespace of the element they are
attached to.
Mixed content (elements containing both text and child elements) is not supported.
"""
import xml.etree.ElementTree as ET
from typing import Dict, Iterable, Union
def _namespacify(ns: str, s: str) -> str:
return "{%s}%s" % (ns, s)
_Children = Union[None, Dict[str, str], ET.Element, Iterable["_Children"]]
class ElementFactory:
def __init__(self, namespace: str, tag: str):
self._tag = _namespacify(namespace, tag)
self._namespace = namespace
def __call__(self, *args: Union[str, _Children], **kwargs: str) -> ET.Element:
e = ET.Element(self._tag)
attributes = {k.rstrip("_"): v for (k, v) in kwargs.items()}
children = [*args, attributes]
if args and isinstance(children[0], str):
e.text = children[0]
children.pop(0)
for child in children:
self._append_child(e, child)
return e
def _append_child(self, e: ET.Element, child: _Children) -> None:
if isinstance(child, ET.Element):
e.append(child)
elif child is None:
pass
elif isinstance(child, dict):
for k, v in child.items():
e.set(_namespacify(self._namespace, k), str(v))
elif isinstance(child, str):
raise ValueError("Mixed content is not supported")
else:
for grandchild in child:
self._append_child(e, grandchild)
class Namespace:
def __init__(self, uri: str):
self.uri = uri
def __getattr__(self, tag: str) -> ElementFactory:
return ElementFactory(self.uri, tag)

61
glowtables/sparql.py Normal file
View File

@ -0,0 +1,61 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Abstraction over SPARQL backends, primarily meant to be mocked by tests."""
import abc
import json
from typing import Iterable
import requests
from .cache import Cache
class SparqlBackend(abc.ABC):
"""Abstract class for SPARQL clients"""
@abc.abstractmethod
def query(self, query: str) -> Iterable[tuple]:
"""Sends a SPARQL query, and returns an iterable of results."""
class RemoteSparqlBackend(SparqlBackend):
"""Queries a SPARQL API over HTTP."""
def __init__(self, url: str, agent: str, cache: Cache):
"""
:param url: Base URL of the endpoint
:param agent: User-Agent to use in HTTP requests
"""
self._url = url
self._session = requests.Session()
self._session.headers["User-Agent"] = agent
self._cache = cache
def query(self, query: str) -> Iterable[tuple]:
headers = {
"Content-Type": "application/sparql-query",
"Accept": "application/json",
}
resp_text = self._cache.get(self._url, query)
if not resp_text:
resp_text = self._session.post(self._url, headers=headers, data=query).text
self._cache.set(self._url, query, resp_text)
resp = json.loads(resp_text)
variables = resp["head"]["vars"]
for result in resp["results"]["bindings"]:
yield tuple(result.get(variable) for variable in variables)

9
glowtables/style.css Normal file
View File

@ -0,0 +1,9 @@
@media (prefers-color-scheme: dark) {
body {
background-color: #121212;
color: rgba(255, 255, 255, 0.87);
}
a {
filter: invert(0.85) hue-rotate(180deg);
}
}

235
glowtables/table.py Normal file
View File

@ -0,0 +1,235 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Data model"""
import abc
import collections
import dataclasses
import itertools
import textwrap
from typing import Any, Callable, Generic, Iterator, NewType, Optional, TypeVar
import rdflib
from glowtables.sparql import SparqlBackend
Language = NewType("Language", str)
"""ISO 639-1 code"""
SparqlVariable = NewType("SparqlVariable", str)
"""A variable within a SPARQL query, without the leading ``?``."""
_TFieldValue = TypeVar("_TFieldValue")
@dataclasses.dataclass
class Field(abc.ABC, Generic[_TFieldValue]):
"""Abstract class for a table field."""
id: str
"""Unique within a table"""
display_names: dict[Language, str]
"""Localized name for the field (eg. in a table header)"""
parse: Callable[[str], _TFieldValue]
"""Parses a string returned by a SPARQL query to a native Python value."""
@abc.abstractmethod
def sort_key(self, value: _TFieldValue):
"""Function suitable as ``key`` argument to :func:`sorted`.
Defaults to the identity function."""
@abc.abstractmethod
def sparql(
self,
subject_var: SparqlVariable,
object_var: SparqlVariable,
new_var: Callable[[], SparqlVariable],
) -> str:
"""
Given the SPARQL variable of a subject and object, returns SPARQL statements
which bind the ``object_var`` to the value of the field for the subject bound
to ``subject_var``.
For example, if this ``Field`` represents the `"CPU frequency"
<https://www.wikidata.org/wiki/Property:P2144>`__, ``subject_var`` is ``a``, and
``object_var`` is `b``, this will return::
?subject_var <http://www.wikidata.org/prop/direct/P2144> ?object_var.
Typically there is only one statement, but more statements are needed to fetch
nodes which aren't neighbors.
"""
@dataclasses.dataclass
class LiteralField(Field[_TFieldValue], Generic[_TFieldValue]):
"""Simplest field: its value is a literal directly on the subject"""
predicate: rdflib.URIRef
default: Optional[_TFieldValue] = None
"""If this is not :const:`None`, allows subjects without a statement for this field;
and use this value instead when sorting.
This is only used when sorting, and isn't displayed."""
def sort_key(self, value: Optional[_TFieldValue]) -> Any:
"""Function suitable as ``key`` argument to :func:`sorted`.
Defaults to the identity function."""
if value is None:
if self.default is None:
raise ValueError(f"{self.id} value is unexpectedly None")
return self.sort_key(self.default)
return value
def sparql(
self,
subject_var: SparqlVariable,
object_var: SparqlVariable,
new_var: Callable[[], SparqlVariable],
) -> str:
statement = f"?{subject_var} <{self.predicate}> ?{object_var}."
if self.default is None:
return statement
else:
return f"OPTIONAL {{ {statement} }}."
@dataclasses.dataclass
class LabeledField(Field[_TFieldValue], Generic[_TFieldValue]):
"""Simplest field: its value is a literal directly on the subject"""
predicate: rdflib.URIRef
default: Optional[_TFieldValue] = None
"""If this is not :const:`None`, allows subjects without a statement for this field;
and use this value instead when sorting.
This is only used when sorting, and isn't displayed."""
def sort_key(self, value: Optional[_TFieldValue]) -> Any:
"""Function suitable as ``key`` argument to :func:`sorted`.
Defaults to the identity function."""
if value is None:
if self.default is None:
raise ValueError(f"{self.id} value is unexpectedly None")
return self.sort_key(self.default)
return value
def sparql(
self,
subject_var: SparqlVariable,
object_var: SparqlVariable,
new_var: Callable[[], SparqlVariable],
) -> str:
node_var = new_var()
statement = f"""
?{subject_var} <{self.predicate}> ?{node_var}.
SERVICE <http://wikiba.se/ontology#label> {{
<http://www.bigdata.com/rdf#serviceParam> <http://wikiba.se/ontology#language> "en".
?{node_var} <http://www.w3.org/2000/01/rdf-schema#label> ?{object_var}.
}}
""" # noqa
if self.default is None:
return statement
else:
return f"OPTIONAL {{ {statement} }}."
@dataclasses.dataclass
class Table:
"""A table, along with its fields description."""
fields: list[Field]
"""Ordered list of all fields of the table. Includes hidden and filter-only fields.
"""
constraints: str
"""SPARQL statements which constrain the set of nodes used as main subject
for table entries.
The variable bound to the subject is what is defined in :attr:`subject`
(by default, ``?subject``).
"""
id: str
"""Unique within a Glowtable instance"""
display_names: dict[Language, str] = dataclasses.field(default_factory=dict)
"""Localized name for the table (eg. on a page title)"""
subject: SparqlVariable = SparqlVariable("subject")
sparql_template = textwrap.dedent(
"""
SELECT {columns}
WHERE {{
{constraints}
{statements}
}}
"""
)
def __post_init__(self) -> None:
field_ids = [field.id for field in self.fields]
if str(self.subject) in field_ids:
raise ValueError(f"{self.subject} is both subject and a field id.")
duplicate_field_ids = [
field_id
for (field_id, count) in collections.Counter(field_ids).items()
if count > 1
]
if duplicate_field_ids:
raise ValueError(
f"{self} has duplicate field ids: {', '.join(duplicate_field_ids)}"
)
def sparql(self) -> str:
"""Returns a SPARQL query suitable to get records for this table."""
def new_var(prefix: str) -> Iterator[SparqlVariable]:
for i in itertools.count():
yield SparqlVariable(f"{prefix}{i}")
subject = SparqlVariable("subject")
columns = " ".join(f"?{field.id}" for field in self.fields)
statements = "\n ".join(
field.sparql(subject, SparqlVariable(field.id), new_var(field.id).__next__)
for field in self.fields
)
constraints = textwrap.indent(self.constraints, " ").strip()
return self.sparql_template.format(
subject=subject,
columns=columns,
constraints=constraints,
statements=statements.strip(),
)
def query(self, backend: SparqlBackend) -> Iterator[tuple]:
"""Returns a list of all rows of the table. Each row has exactly one cell for
each column defined in :attr:`fields`.
"""
for row in backend.query(self.sparql()):
yield tuple(
None if cell is None else field.parse(cell["value"])
for (field, cell) in zip(self.fields, row)
)

View File

View File

@ -0,0 +1,64 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""pytest fixtures"""
# pylint: disable=redefined-outer-name
import pytest
import rdflib
from glowtables.cache import Cache
from glowtables.sparql import RemoteSparqlBackend
@pytest.fixture()
def rdflib_graph() -> rdflib.Graph:
"""Returns an empty rdflib graph."""
return rdflib.Graph()
@pytest.fixture()
def rdflib_sparql(requests_mock, rdflib_graph: rdflib.Graph) -> RemoteSparqlBackend:
"""Returns a SPARQL backend instance for ``rdflib_graph``."""
def rdflib_to_json(o) -> dict:
if isinstance(o, rdflib.Literal):
return {"type": "literal", "value": str(o)}
elif isinstance(o, rdflib.URIRef):
return {"type": "uri", "value": str(o)}
else:
raise NotImplementedError(o)
def json_callback(request, context):
results = rdflib_graph.query(request.text)
context.status_code = 200
return {
"head": {"vars": results.vars},
"results": {
"bindings": [
{
k: rdflib_to_json(v)
for (k, v) in zip(results.vars, result)
if v is not None
}
for result in results
]
},
}
requests_mock.register_uri("POST", "mock://sparql.example.org/", json=json_callback)
return RemoteSparqlBackend(
"mock://sparql.example.org/", agent="Mock Client", cache=Cache(":memory:")
)

View File

@ -0,0 +1,200 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests :mod:`glowtables.table`.
Test cases use a graph containing a few CPUs, with names and clock frequencies.
"""
import textwrap
from decimal import Decimal
import pytest
import rdflib
from glowtables.sparql import SparqlBackend
from glowtables.table import Language, LiteralField, Table
CPU1_URI = rdflib.URIRef("http://example.org/grown-1700")
CPU2_URI = rdflib.URIRef("http://example.org/grown-3600")
CPU3_URI = rdflib.URIRef("http://example.org/secret-project")
CG_URI = rdflib.URIRef("http://example.org/likestone-underdog")
CPU = rdflib.URIRef("http://example.org/CPU")
DISPLAY_NAME = rdflib.URIRef("http://example.org/display-name")
FREQUENCY = rdflib.URIRef("http://example.org/clock-frequency")
TYPE = rdflib.URIRef("http://example.org/type")
@pytest.fixture()
def rdflib_graph() -> rdflib.Graph:
graph = rdflib.Graph()
# A CPU with all the properties
graph.add((CPU1_URI, DISPLAY_NAME, rdflib.Literal("Grown 1700")))
graph.add((CPU1_URI, TYPE, CPU))
graph.add((CPU1_URI, FREQUENCY, rdflib.Literal(3000)))
# Another CPU, without a frequency
graph.add((CPU2_URI, DISPLAY_NAME, rdflib.Literal("Grown 3600")))
graph.add((CPU2_URI, TYPE, CPU))
# Another CPU, without a name
graph.add((CPU3_URI, TYPE, CPU))
graph.add((CPU3_URI, FREQUENCY, rdflib.Literal(9000)))
# Add a graphics card; which should be excluded from CPU searches
graph.add((CG_URI, DISPLAY_NAME, rdflib.Literal("LikeStone Underdog")))
graph.add((CG_URI, FREQUENCY, rdflib.Literal(1626)))
graph.add((CG_URI, TYPE, rdflib.URIRef("http://example.org/graphics-card")))
return graph
def test_single_literal(rdflib_sparql: SparqlBackend) -> None:
name_field = LiteralField(
"display_name",
{Language("en"): "Name"},
str,
rdflib.URIRef("http://example.org/display-name"),
)
table = Table(
id="test-table",
fields=[name_field],
constraints="?subject <http://example.org/type> <http://example.org/CPU>.",
)
assert table.sparql() == textwrap.dedent(
"""
SELECT ?display_name
WHERE {
?subject <http://example.org/type> <http://example.org/CPU>.
?subject <http://example.org/display-name> ?display_name.
}
"""
)
assert set(table.query(rdflib_sparql)) == {
("Grown 1700",),
("Grown 3600",),
}
def test_two_literals(rdflib_sparql: SparqlBackend) -> None:
name_field = LiteralField(
"display_name",
{Language("en"): "Name"},
str,
rdflib.URIRef("http://example.org/display-name"),
)
frequency_field = LiteralField(
"frequency",
{Language("en"): "Clock frequency"},
Decimal,
rdflib.URIRef("http://example.org/clock-frequency"),
)
table = Table(
id="test-table",
fields=[name_field, frequency_field],
constraints="?subject <http://example.org/type> <http://example.org/CPU>.",
)
assert table.sparql() == textwrap.dedent(
"""
SELECT ?display_name ?frequency
WHERE {
?subject <http://example.org/type> <http://example.org/CPU>.
?subject <http://example.org/display-name> ?display_name.
?subject <http://example.org/clock-frequency> ?frequency.
}
"""
)
assert set(table.query(rdflib_sparql)) == {
("Grown 1700", 3000),
}
def test_default_value(rdflib_sparql: SparqlBackend) -> None:
name_field = LiteralField(
"display_name",
{Language("en"): "Name"},
str,
rdflib.URIRef("http://example.org/display-name"),
default="Anonymous CPU",
)
frequency_field = LiteralField(
"frequency",
{Language("en"): "Clock frequency"},
Decimal,
rdflib.URIRef("http://example.org/clock-frequency"),
default=Decimal(0),
)
table = Table(
id="test-table",
fields=[name_field, frequency_field],
constraints="?subject <http://example.org/type> <http://example.org/CPU>.",
)
assert table.sparql() == textwrap.dedent(
"""
SELECT ?display_name ?frequency
WHERE {
?subject <http://example.org/type> <http://example.org/CPU>.
OPTIONAL { ?subject <http://example.org/display-name> ?display_name. }.
OPTIONAL { ?subject <http://example.org/clock-frequency> ?frequency. }.
}
"""
)
assert set(table.query(rdflib_sparql)) == {
("Grown 1700", 3000),
("Grown 3600", None),
(None, 9000),
}
def test_field_id_subject() -> None:
name_field = LiteralField(
"subject",
{Language("en"): "Name"},
str,
rdflib.URIRef("http://example.org/display-name"),
)
with pytest.raises(ValueError, match="both subject and a field id"):
Table(
id="test-table",
fields=[name_field],
constraints="",
)
def test_field_id_clash() -> None:
name_field = LiteralField(
"name",
{Language("en"): "Name"},
str,
rdflib.URIRef("http://example.org/name"),
)
display_name_field = LiteralField(
"name",
{Language("en"): "Display Name"},
str,
rdflib.URIRef("http://example.org/display-name"),
)
with pytest.raises(ValueError, match="has duplicate field ids: name"):
Table(
id="test-table",
fields=[name_field, display_name_field],
constraints="",
)

158
glowtables/views.py Normal file
View File

@ -0,0 +1,158 @@
# This file is part of the Glowtables software
# Copyright (C) 2023 Valentin Lorentz
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License version 3, as published by the
# Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""Minimal webapp to display Glowtables"""
import functools
import importlib.metadata
import importlib.resources
import logging
import operator
import xml.etree.ElementTree as ET
from typing import Callable, List, TypeVar
import flask
from .cache import Cache
from .shortxml import Namespace
from .sparql import RemoteSparqlBackend
from .table import Language, Table
HTML = Namespace("http://www.w3.org/1999/xhtml")
LANG = Language("en") # TODO: configurable
logger = logging.getLogger(__name__)
app = flask.Flask(__name__)
TView = TypeVar("TView", bound=Callable)
def _sparql_backend() -> RemoteSparqlBackend:
return RemoteSparqlBackend(
"https://query.wikidata.org/sparql",
agent="Unconfigured Glowtable instance",
cache=Cache("file:sparql_cache.sqlite3"),
)
def xhtml_view(f: TView) -> TView:
"""Decorator for Flask views which may return XHTML as :mod:`xml.etree.ElementTree`
objects."""
@functools.wraps(f)
def newf(*args, **kwargs):
res = f(*args, **kwargs)
if isinstance(res, (ET.Element, ET.ElementTree)):
xml = ET.tostring(res, default_namespace=HTML.uri)
return flask.Response(xml, mimetype="application/xhtml+xml")
else:
return res
return newf # type: ignore[return-value]
def list_tables() -> List[Table]:
"""Returns all :class:`Table` instances registered as ``glowtables.tables``
entrypoints."""
table_entrypoints: List[
importlib.metadata.EntryPoint
] = importlib.metadata.entry_points( # type: ignore[call-arg,assignment]
group="glowtables.tables"
)
tables = []
for table_entrypoint in sorted(table_entrypoints, key=operator.attrgetter("name")):
table = table_entrypoint.load()
if not isinstance(table, Table):
logger.error(
"%s is %r, which is not an instance of glowtables.table.Table",
table_entrypoint.name,
table,
)
continue
tables.append(table)
return tables
@app.route("/")
@xhtml_view
def index() -> ET.Element:
"""Displays the list of tables."""
tables = list_tables()
return HTML.html(
HTML.head(
HTML.title("Glowtables"),
HTML.link(rel="stylesheet", type="text/css", href="/style.css"),
),
HTML.body(
HTML.h1("Glowtables"),
HTML.ul(
[
HTML.li(
HTML.a(table.display_names[LANG], href=f"/tables/{table.id}/")
)
for table in tables
]
)
if tables
else HTML.p(
"""
There are no tables defined, check the Glowtables documentation
to find how to configure them.
"""
),
),
)
@app.route("/style.css")
def style() -> flask.Response:
"""Serves the CSS."""
css = importlib.resources.files(__package__).joinpath("style.css").read_bytes()
return flask.Response(css, mimetype="text/css")
@app.route("/tables/<table_id>/")
@xhtml_view
def table_(table_id: str) -> ET.Element:
"""Displays a table."""
tables = list_tables()
for table in tables:
if table.id == table_id:
break
else:
flask.abort(404)
return HTML.html(
HTML.head(
HTML.title("Glowtables"),
HTML.link(rel="stylesheet", type="text/css", href="/style.css"),
),
HTML.body(
HTML.h1(table.display_names[LANG]),
HTML.table(
HTML.thead(
HTML.tr(
HTML.th(field.display_names[LANG]) for field in table.fields
)
),
HTML.tbody(
HTML.tr(HTML.td(cell) for cell in row)
for row in table.query(_sparql_backend())
),
),
),
)

View File

@ -3,11 +3,13 @@ requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "globtables"
name = "glowtables"
version = "0.0.1"
requires-python = ">=3.9"
dependencies = [
"flask == 2.*",
"flask ~= 2.0",
"rdflib ~= 6.0",
"requests ~= 2.0",
]
[project.optional-dependencies]
@ -19,6 +21,9 @@ testing = [
"types-setuptools",
]
[project.entry-points."glowtables.tables"]
example_cats = "glowtables.examples.cats:cats"
[tool.isort]
profile = "black"
@ -49,9 +54,13 @@ disable = [
"no-member",
"unsupported-membership-test",
"import-error",
"undefined-loop-variable",
# flake8 does it already:
"line-too-long",
]
ignore-paths = [
"glowtables/tests/.*_test.py"
]
[tool.pytest.ini_options]
python_files = "*_test.py"

View File

@ -1,2 +0,0 @@
def test_nothing():
pass