Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scripts for XML -> Python object -> YAML conversion (re #129) #133

Merged
merged 2 commits into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions bin/anthology.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Marcel Bollmann <[email protected]>, 2019

from collections import defaultdict
from glob import glob
from lxml import etree
import itertools as it
import logging as log
import os


class Anthology:
    """In-memory collection of volumes and papers read from XML files.

    Attributes:
        schema: ``etree.RelaxNG`` validator set by :meth:`load_schema`, or
            ``None`` while no schema has been loaded (validation is skipped).
        volumes: maps volume IDs to lists of full paper IDs.
        papers: maps full paper IDs (``"<volume>-<paper>"``) to Paper objects.
    """

    # Class-level default so that import_file() can run without a schema.
    schema = None

    def __init__(self, importdir=None):
        """Create an empty anthology, optionally importing ``importdir``."""
        self.volumes = defaultdict(list)  # maps volume IDs to lists of paper IDs
        self.papers = {}  # maps paper IDs to Paper objects
        if importdir is not None:
            self.import_directory(importdir)

    def load_schema(self, schemafile):
        """Load the RelaxNG schema at ``schemafile``.

        A missing file is only logged as an error; ``self.schema`` is left
        unchanged in that case.
        """
        if os.path.exists(schemafile):
            self.schema = etree.RelaxNG(file=schemafile)
        else:
            log.error("RelaxNG schema not found: {}".format(schemafile))

    def import_directory(self, importdir):
        """Load ``schema.rng`` from ``importdir`` and import all its ``*.xml`` files.

        Raises:
            AssertionError: if ``importdir`` is not an existing directory.
        """
        assert os.path.isdir(importdir), "Directory not found: {}".format(importdir)
        self.load_schema(os.path.join(importdir, "schema.rng"))
        # glob() returns matches in arbitrary order; sort them so that the
        # import (and everything derived from it) is reproducible.
        for xmlfile in sorted(glob(os.path.join(importdir, "*.xml"))):
            self.import_file(xmlfile)

    def import_file(self, filename):
        """Parse one volume XML file and register all of its papers.

        A validation failure against the loaded schema is logged as an error,
        but the file is still imported.
        """
        tree = etree.parse(filename)
        if self.schema is not None and not self.schema(tree):
            log.error("RelaxNG validation failed for {}".format(filename))
        volume = tree.getroot()
        volume_id = volume.get("id")
        for paper in volume:
            # Comments and processing instructions are also yielded as child
            # nodes; their tag is not a string.  Skip them so that they do not
            # become bogus papers.
            if not isinstance(paper.tag, str):
                continue
            paper_id = paper.get("id")
            full_id = "{}-{}".format(volume_id, paper_id)
            self.papers[full_id] = Paper(paper, volume_id)
            self.volumes[volume_id].append(full_id)


def _stringify_children(node):
"""Returns the full content of a node, including tags.

Used for nodes that can have mixed text and HTML elements (like <b> and <i>)."""
return "".join(
chunk
for chunk in it.chain(
(node.text,),
it.chain(
*(
(etree.tostring(child, with_tail=False, encoding=str), child.tail)
for child in node.getchildren()
)
),
(node.tail,),
)
if chunk
).strip()


# Tags that may occur several times within one paper; their values are
# collected in lists instead of overwriting each other.
_LIST_ELEMENTS = ("attachment", "author", "editor", "video")


class Paper:
    """A single paper entry parsed from a child element of a volume.

    Attributes:
        paper_id: value of the element's ``id`` attribute.
        parent_volume: ID of the volume the paper belongs to.
        attrib: maps lower-cased child tag names to their parsed values.
    """

    def __init__(self, paper_element, volume_id):
        """Parse ``paper_element``; infer the year if the entry has none."""
        # initialize
        self.paper_id = paper_element.get("id")
        self.parent_volume = volume_id
        self.attrib = {}
        self._parse_element(paper_element)
        if "year" not in self.attrib:
            self._infer_year()

    def _parse_element(self, paper_element):
        """Read all child elements of ``paper_element`` into ``self.attrib``."""
        # read & store values
        if "href" in paper_element.attrib:
            self.attrib["attrib_href"] = paper_element.get("href")
        for element in paper_element:
            # Comments and processing instructions show up as children whose
            # tag is not a string; they carry no paper data.
            if not isinstance(element.tag, str):
                continue
            # parse value
            tag = element.tag.lower()
            if tag in ("abstract", "title"):
                value = _stringify_children(element)
            elif tag == "attachment":
                value = {"filename": element.text, "type": element.get("type", None)}
            elif tag in ("author", "editor"):
                value = PersonName.from_element(element)
            elif tag in ("erratum", "revision"):
                value = {"value": element.text, "id": element.get("id")}
            elif tag == "mrf":
                value = {"filename": element.text, "src": element.get("src")}
            elif tag == "video":
                value = {"href": element.get("href"), "tag": element.get("tag")}
            else:
                value = element.text
            # store value
            if tag in _LIST_ELEMENTS:
                self.attrib.setdefault(tag, []).append(value)
            else:
                if tag in self.attrib:
                    # The later occurrence wins; only warn about it.
                    log.warning(
                        "{}: Unexpected multiple occurrence of '{}' element".format(
                            self.full_id, tag
                        )
                    )
                self.attrib[tag] = value

    def _infer_year(self):
        """Infer the year from the volume ID.

        Many paper entries do not explicitly contain their year.  This function
        assumes that the paper's volume identifier follows the format 'xyy',
        where x is some letter and yy are the last two digits of the year of
        publication.  Two-digit years of 60 and above are mapped to 19yy, all
        others to 20yy.
        """
        assert (
            len(self.parent_volume) == 3
        ), "Couldn't infer year: unknown volume ID format"
        digits = self.parent_volume[1:]
        century = "19" if int(digits) >= 60 else "20"
        # Keep the digits as a string so that a leading zero survives
        # ("P05" -> "2005", not "205").
        self.attrib["year"] = century + digits

    @property
    def full_id(self):
        """The paper's ID prefixed with its volume ID."""
        return "{}-{}".format(self.parent_volume, self.paper_id)

    def get(self, name, default=None):
        """Return the stored value for ``name``, or ``default`` if absent."""
        return self.attrib.get(name, default)

    def items(self):
        """Return a view of all stored (name, value) pairs."""
        return self.attrib.items()


class PersonName:
    """A person's name split into first name, last name and 'jr' part.

    All three parts are stored as whitespace-stripped strings; a missing part
    is the empty string.
    """

    first, last, jr = "", "", ""

    def __init__(self, first, last, jr):
        self.first = first.strip()
        self.last = last.strip()
        self.jr = jr.strip()

    @classmethod
    def from_element(cls, person_element):
        """Build a name from an element with <first>/<last>/<jr> children.

        Children with other tags are ignored; empty or missing children
        result in empty strings.
        """
        first, last, jr = "", "", ""
        for element in person_element:
            tag = element.tag
            # These are guaranteed to occur at most once by the schema
            if tag == "first":
                first = element.text or ""
            elif tag == "last":
                last = element.text or ""
            elif tag == "jr":
                jr = element.text or ""
        return cls(first, last, jr)

    @property
    def full(self):
        """The name as a single display string."""
        # NOTE(review): 'jr' is appended directly after 'last' without any
        # separator -- confirm that this is the intended format.
        return "{} {}{}".format(self.first, self.last, self.jr).strip()

    def as_dict(self):
        """Return the three name parts as a plain dict."""
        return {"first": self.first, "last": self.last, "jr": self.jr}

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing attribute.
        if not isinstance(other, PersonName):
            return NotImplemented
        return (self.first, self.last, self.jr) == (
            other.first,
            other.last,
            other.jr,
        )

    def __str__(self):
        return self.full

    def __repr__(self):
        """Return the ' || '-separated form that is also used for hashing."""
        if self.jr:
            return "{} || {} || {}".format(self.first, self.last, self.jr)
        elif self.first:
            return "{} || {}".format(self.first, self.last)
        else:
            return self.last

    def __hash__(self):
        return hash(repr(self))


class PersonIndex:
    """Index of person names and the papers associated with them, per role."""

    def __init__(self):
        # name representation -> PersonName object
        self.names = {}
        # name -> role -> list of paper IDs
        self.papers = defaultdict(lambda: defaultdict(list))

    def register(self, name: PersonName, paper_id, role):
        """Record that ``name`` occurs on ``paper_id`` in the given ``role``.

        The name is added to the index on first sight.  Returns the name's
        unique representation, i.e. its ``repr()``.
        """
        key = repr(name)
        self.names.setdefault(key, name)
        self.papers[name][role].append(paper_id)
        return key

    def items(self):
        """Yield (representation, name, role-to-paper-IDs mapping) triples."""
        for key in self.names:
            person = self.names[key]
            yield key, person, self.papers[person]


if __name__ == "__main__":
    # This module only provides classes for other scripts to import.
    print("This is not a stand-alone script.")
3 changes: 3 additions & 0 deletions bin/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
docopt>=0.6.0
lxml>=4.2.0
PyYAML>=3.0
78 changes: 78 additions & 0 deletions bin/xml_to_yaml.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env python3
# Marcel Bollmann <[email protected]>, 2019

"""Usage: xml_to_yaml.py [--importdir=DIR] [--exportdir=DIR]

Work in progress.

Options:
--importdir=DIR Directory to import XML files from. [default: {scriptdir}/../import/]
--exportdir=DIR Directory to write YAML files to. [default: {scriptdir}/../hugo/data/]
-h, --help Display this helpful text.
"""

from docopt import docopt
import logging as log
import os
import yaml

try:
from yaml import CDumper as Dumper
except ImportError:
from yaml import Dumper

from anthology import Anthology, PersonIndex


def export_anthology(anthology, outdir):
    """Write the anthology to YAML files below ``outdir``.

    Creates one ``volumes/<volume>.yaml`` file per volume, holding all papers
    of that volume keyed by paper ID, and a ``people.yaml`` file with the
    index of all authors and editors.  In the volume files, authors and
    editors are replaced by the unique name representations that serve as
    keys in ``people.yaml``.

    The anthology itself is left unmodified.
    """
    # Create directories (including outdir itself and any missing parents)
    os.makedirs(os.path.join(outdir, "volumes"), exist_ok=True)

    pidx = PersonIndex()
    for volume, ids in anthology.volumes.items():
        papers = {}
        for id_ in ids:
            paper = anthology.papers[id_]
            # Work on a shallow copy: replacing the name lists below must not
            # overwrite the name objects stored on the paper itself.
            data = dict(paper.attrib)
            # Index personal names while we're going through the papers
            for role in ("author", "editor"):
                if role in data:
                    data[role] = [
                        pidx.register(person, id_, role) for person in data[role]
                    ]
            papers[paper.paper_id] = data

        # Dump all papers of a volume into a single file (as with the XML)
        volume_file = os.path.join(outdir, "volumes", "{}.yaml".format(volume))
        with open(volume_file, "w") as f:
            print(yaml.dump(papers, Dumper=Dumper), file=f)

    # Dump author index
    people = {}
    for name_repr, name, roles in pidx.items():
        entry = name.as_dict()
        entry.update(roles)  # adds one list of paper IDs per role
        people[name_repr] = entry
    with open(os.path.join(outdir, "people.yaml"), "w") as f:
        print(yaml.dump(people, Dumper=Dumper), file=f)


if __name__ == "__main__":
    args = docopt(__doc__)
    scriptdir = os.path.dirname(os.path.abspath(__file__))
    # Expand the {scriptdir} placeholder used by the default option values.
    for option in ("--importdir", "--exportdir"):
        if "{scriptdir}" in args[option]:
            expanded = args[option].format(scriptdir=scriptdir)
            args[option] = os.path.abspath(expanded)

    anthology = Anthology(importdir=args["--importdir"])
    export_anthology(anthology, args["--exportdir"])