fb1d7b44a1a8eea28236f0d9c824afb034e847fd — Amirouche 1 year, 6 months ago b2d5c29
pstore: import babelia's pstore...

- Drop the use of the POSIX process pool;

- Pick a sample of document candidates to avoid very long response
  times => the same keywords can yield different results, but every
  result will still contain all the required keywords (sketched
  below).
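
  For instance, the sampling cap amounts to the following sketch
  (illustrative only, not part of the diff; MAX_CANDIDATES is a
  hypothetical name for the hard-coded 500):

      import random

      MAX_CANDIDATES = 500

      def cap(candidates):
          # Scoring is linear in the number of candidates, so bound it.
          # Sampling keeps every returned hit valid (it still contains
          # all the keywords) at the cost of deterministic results.
          if len(candidates) > MAX_CANDIDATES:
              return random.sample(candidates, MAX_CANDIDATES)
          return candidates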
1 file changed, 40 insertions(+), 13 deletions(-)

M found/pstore.py
M found/pstore.py => found/pstore.py +40 -13
@@ -17,6 +17,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import random
 import asyncio
 from operator import itemgetter
 from uuid import uuid4

@@ -25,7 +26,6 @@ from collections import Counter
 
 import found
 from found import nstore
-from found.pool import pool_for_each_par_map
 
 import zstandard as zstd

@@ -41,7 +41,7 @@ PSTORE_SUFFIX_COUNTERS = [b'\x03']
 PStore = namedtuple('PStore', ('name', 'tokens', 'prefix_index', 'prefix_counters', 'pool'))
 
-def make(name, prefix, pool):
+def make(name, prefix):
     prefix = list(prefix)
     prefix_tokens = tuple(prefix + PSTORE_SUFFIX_TOKENS)
     tokens = nstore.make('{}/token'.format(name), prefix_tokens, 2)
@@ -56,7 +56,7 @@ def make(name, prefix, pool):
         # That will map bag uid to a counter serialized to json and
         # compressed with zstd. It is a good old key-value store.
         tuple(prefix + PSTORE_SUFFIX_COUNTERS),
-        pool,
+        None,
     )
     return out

@@ -133,30 +133,57 @@ def _filter(hits):
     return wrapped
 
-async def search(tx, store, keywords, limit):
-    coroutines = [_keywords_to_token(tx, store.tokens, keyword) for keyword in keywords]
-    tokens = await asyncio.gather(*coroutines)
+async def massage(tx, store, candidate, keywords, hits):
+    score = 0
+    counter = await found.get(tx, found.pack((store.prefix_counters, candidate)))
+    # TODO: replace the dictionary and the following for loop with
+    # a single iteration over the counter, using zigzag algorithm.
+    counter = dict(found.unpack(zstd.decompress(counter)))
+    for keyword in keywords:
+        try:
+            count = counter[keyword]
+        except KeyError:
+            return None
+        else:
+            score += count
+    hits[candidate] = score
+
+async def search(tx, store, keywords, limit=13):
+    coroutines = (_keywords_to_token(tx, store.tokens, keyword) for keyword in keywords)
+    keywords = await asyncio.gather(*coroutines)
     # If a keyword is not present in store.tokens, then there is no
     # document associated with it, hence there is no document that
     # matches that keyword, hence no document that has all the requested
     # keywords. Return an empty counter.
-    if any(token is None for token in tokens):
+    if any(keyword is None for keyword in keywords):
         return list()
 
     # Select seed token
-    coroutines = [_token_to_size(tx, store.prefix_index, token) for token in tokens]
+    coroutines = (_token_to_size(tx, store.prefix_index, token) for token in keywords)
     sizes = await asyncio.gather(*coroutines)
-    seed = min(zip(sizes, tokens), key=itemgetter(0))[1]
+    _, seed = min(zip(sizes, keywords), key=itemgetter(0))
 
     # Select candidates
     candidates = []
     key = found.pack((store.prefix_index, seed))
     query = found.query(tx, key, found.next_prefix(key))
 
     async for key, _ in query:
         _, _, uid = found.unpack(key)
         candidates.append(uid)
 
-    # score, filter and construct hits
+    # XXX: 500 was empirically discovered, to make it so that the
+    #      search takes less than 1 second or so.
+    if len(candidates) > 500:
+        candidates = random.sample(candidates, 500)
+
+    # score, filter and construct hits aka. massage
     hits = Counter()
-    loop = asyncio.get_running_loop()
-    async_generator = _prepare(tx, store.prefix_counters, candidates, tokens)
-    await pool_for_each_par_map(loop, store.pool, _filter(hits), _score, async_generator)
+    coroutines = (massage(tx, store, c, keywords, hits) for c in candidates)
+    await asyncio.gather(*coroutines)
 
     out = hits.most_common(limit)
 
     return out
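
For context, the concurrency model after this change is plain coroutine
fan-out on the event loop (a minimal sketch under assumed names, not the
code above; score_one stands in for massage and fakes the counter
lookup):

    import asyncio
    from collections import Counter

    async def score_one(candidate, hits):
        # Stand-in for massage(): fetch the candidate's counter from
        # the store and add up the per-keyword counts; faked with len().
        hits[candidate] = len(candidate)

    async def score_all(candidates, limit=13):
        hits = Counter()
        # One coroutine per candidate, scheduled concurrently on the
        # event loop instead of being dispatched to a process pool.
        await asyncio.gather(*(score_one(c, hits) for c in candidates))
        return hits.most_common(limit)

    print(asyncio.run(score_all(['foo', 'quux', 'ba'])))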