~nch/glue

ref: 50333171e5b5973cdef9c6e8d045750a3a0936e6 glue/table.py -rw-r--r-- 3.8 KiB
50333171 — nc WIP this seems(?) to be working 1 year, 6 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from dataclasses import dataclass
from typing import List, Dict, Tuple, Iterator, Optional, Union
import numpy as np # type: ignore
from collections.abc import Iterable

# TODO: implement pure relational algebra version
# TODO: implement tuple calculus version
# TODO: add referential integrity...
# TODO: add serialization/deserialization

def _ensure_columns_match(a: List, b: List) -> None:
    if set(a) != set(b):
        raise ValueError('columns do not match', a, b)

class Table:
    '''
    A small column-oriented table.

    `columns` holds the column names and `column_data` holds one numpy array
    per column, in the same order as `columns`.
    '''
    columns: List[str]

    def __init__(self, x: Union[List, Tuple, Dict], columns: Optional[List[str]] = None): # TODO: refactor this spaghetti
        '''
        Build a table from row data or from a mapping of columns.

        x is either a list/tuple of rows (each row a sequence of values) or a
        dict mapping column name -> sequence of values. When `columns` is
        omitted, row data gets the names '0', '1', ... and dict data uses the
        dict's own key order.

        Raises ValueError when `columns` disagrees with the data (wrong length
        for row data, different names for dict data).

        pre: self.columns is not None or len(x) > 0
        post: self.columns is not None and len(self.columns) > 0
        '''
        if isinstance(x, (list, tuple)):
            if columns is not None:
                if len(columns) != len(x[0]):
                    raise ValueError('Length of columns does not match length of data')
                self.columns = columns
            else:
                # we know x is nonempty, so mypy can ignore the type here
                self.columns = list(map(str, range(len(x[0]))))
            # Transpose the rows into one array per column.
            self.column_data = [np.array(xs) for xs in zip(*x)]
        elif isinstance(x, dict):
            if columns is not None:
                _ensure_columns_match(list(x.keys()), columns)
                self.columns = columns
            else:
                self.columns = list(x.keys())
            # Store the arrays in the order given by self.columns.
            self.column_data = [np.array(x[k]) for k in self.columns]
        else:
            assert False, type(x)

        assert self.columns
        assert self.column_data

    def __repr__(self) -> str:
        return f'<Table {self.columns}>'

    def coli(self, name: str) -> int:
        '''
        Return the positional index of the column called `name`.

        Raises AttributeError when there is no such column.
        '''
        try:
            return self.columns.index(name)
        except ValueError:
            raise AttributeError('No such column', name)

    def tuples(self) -> Iterator[Tuple]:
        '''Iterate over the rows, yielding one tuple of column values per row.'''
        yield from zip(*self.column_data)

    def tuple(self) -> Tuple:
        '''
        Return the table's single row as a tuple of plain Python scalars.

        Only valid when the columns are 0-d arrays (checked on the first
        column), which is what selecting one row by integer index produces.
        '''
        assert self.column_data[0].shape == ()
        return tuple(self[c].item() for c in self.columns)

    def __getitem__(self, key):
        '''
        Select columns and/or rows.

        Supported keys:
          int                  -> the column array at that position
          str                  -> the column array with that name
          list of columns      -> a new Table with just those columns
          slice                -> a new Table with that row slice of every column
          (column, rows)       -> rows indexed out of one column (int or str)
          (list, rows)         -> a new Table of those named columns restricted
                                  to rows; a plain tuple when rows is an int
          (slice, rows)        -> same as the list form, with the columns picked
                                  by slicing self.columns

        Raises TypeError for any other kind of key.
        '''
        if isinstance(key, int):
            return self.column_data[key]
        if isinstance(key, str):
            return self.column_data[self.coli(key)]

        if isinstance(key, list):
            return Table({col: self[col] for col in key})
        if isinstance(key, slice):
            return Table({col: self[col][key] for col in self.columns})
        if isinstance(key, tuple):
            colkey, rowkey = key
            if isinstance(colkey, (int, str)):
                return self[colkey][rowkey]
            if isinstance(colkey, list):
                picked = Table({col: self.column_data[self.coli(col)][rowkey] for col in colkey})
                # A single integer row collapses to a plain tuple of values.
                if isinstance(rowkey, int):
                    return picked.tuple()
                return picked
            if isinstance(colkey, slice):
                # Resolve the slice to column names and reuse the list branch.
                return self[self.columns[colkey], rowkey]
            raise TypeError(f'Unsupported column selector type: {type(colkey).__name__}')

        raise TypeError(f'Unsupported key type: {type(key).__name__}')

    def vstack(self, other: 'Table') -> 'Table':
        '''
        Return a new Table with the rows of `other` placed after the rows of
        self. Both tables must have the same column names (ValueError
        otherwise); the result keeps self's column order.

        NOTE(review): the postcondition below uses len(), but no __len__ is
        defined in this class as shown - confirm before relying on it.

        post: len(__return__) == len(self) + len(other)
        '''
        _ensure_columns_match(self.columns, other.columns)
        return Table({c: np.hstack((self[c], other[c])) for c in self.columns}, columns=self.columns)

    def append(self, vals: Union[Dict, Tuple]) -> 'Table':
        '''
        Return a new Table with one extra row; self is not modified.

        vals is either a dict mapping every column name to its value, or a
        tuple with one value per column in self.columns order. Raises
        ValueError when the dict's keys or the tuple's length do not match
        the table's columns.

        NOTE(review): the postcondition below uses len(), but no __len__ is
        defined in this class as shown - confirm before relying on it.

        post: len(__return__) == len(self) + 1
        '''
        if isinstance(vals, dict):
            _ensure_columns_match(list(vals.keys()), self.columns)
            # Wrap each value in a list so the new table has exactly one row.
            return self.vstack(Table({k: [v] for k, v in vals.items()}))
        elif isinstance(vals, tuple):
            if len(vals) != len(self.columns): # TODO: maybe typecheck here?
                raise ValueError("Not enough tuple values to map into columns")
            return self.vstack(Table([vals], columns=self.columns))
        else:
            assert False, type(vals)