
Add tests #3

Merged
merged 33 commits into main from addTests on Feb 3, 2022
Changes from 14 commits

Commits (33)
0332ef2
add tests
quaxsze Jan 14, 2022
36cfb93
filters
quaxsze Jan 18, 2022
dcb6dc5
filters and tests
quaxsze Jan 18, 2022
f796db4
Merge branch 'main' into addTests
maudetes Jan 19, 2022
c33f75d
remove dockefiles
quaxsze Jan 20, 2022
0230cb2
add missing requirements
quaxsze Jan 20, 2022
bf4503c
fix service
quaxsze Jan 20, 2022
a8b5e52
Merge branch 'main' into addTests
quaxsze Jan 20, 2022
cb29915
Merge branch 'main' of github.com:opendatateam/udata-search-service i…
quaxsze Jan 24, 2022
586462f
fix tests
quaxsze Jan 24, 2022
2c5c51f
Merge branch 'addTests' of github.com:opendatateam/udata-search-servi…
quaxsze Jan 24, 2022
06d605a
add github action
quaxsze Jan 24, 2022
970145d
add github action2
quaxsze Jan 24, 2022
7d85fd4
linting fixes
quaxsze Jan 24, 2022
7eb73bb
Improve temporal and spatial support (#8)
maudetes Jan 25, 2022
ddcaa8e
add more tests
quaxsze Jan 26, 2022
8911c98
fix tests
quaxsze Jan 26, 2022
b6da8b8
fix tests
quaxsze Jan 26, 2022
239d20b
Add tests for the kafka consumer
maudetes Jan 26, 2022
4c26673
Deal with organization deserialization better
maudetes Jan 26, 2022
03e1e76
Merge pull request #9 from opendatateam/add-kafka-consumer-tests
maudetes Jan 27, 2022
35e0da8
fix empty filters
quaxsze Jan 27, 2022
a032f7c
Merge branch 'addTests' of github.com:opendatateam/udata-search-servi…
quaxsze Jan 27, 2022
bd0c881
Add normalization before indexation in consumer
maudetes Jan 27, 2022
7e94483
Use `and` operator and a most fields query to dataset search query
maudetes Jan 27, 2022
5ce9022
Update tests following normalization
maudetes Jan 28, 2022
3e8d4c7
add api tests
quaxsze Jan 28, 2022
b5b9113
Merge branch 'addTests' of github.com:opendatateam/udata-search-servi…
quaxsze Jan 28, 2022
556e326
fix requirements
quaxsze Jan 28, 2022
25700f4
fix filters
quaxsze Feb 2, 2022
93848d6
Change normalized values to Float type
maudetes Feb 3, 2022
602dc8d
Update app/infrastructure/search_clients.py
quaxsze Feb 3, 2022
fc7582e
fix
quaxsze Feb 3, 2022
41 changes: 41 additions & 0 deletions .github/workflows/github-actions.yml
@@ -0,0 +1,41 @@
+name: Pytest package
+
+on: [push]
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+    services:
+      es-1:
+        image: udata/elasticsearch:7.16.2
+        env:
+          node.name: es01
+          cluster.name: es-docker-cluster
+          cluster.initial_master_nodes: es01
+        ports:
+          - 9200:9200
+    strategy:
+      matrix:
+        python-version: [3.9]
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v2
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install flake8 pytest
+        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+    - name: Lint with flake8
+      run: |
+        # stop the build if there are Python syntax errors or undefined names
+        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
+        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
+        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
+    - name: Test with pytest
+      run: |
+        pytest
1 change: 0 additions & 1 deletion app/__init__.py
@@ -1,7 +1,6 @@
 from flask import Flask
 from app.config import Config
 from app.container import Container
-from app.infrastructure import kafka_consumer
 from app.presentation import api, commands
6 changes: 3 additions & 3 deletions app/infrastructure/kafka_consumer.py
@@ -37,8 +37,8 @@ def create_kafka_consumer():
     consumer = KafkaConsumer(
         bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}',
         group_id='elastic',
-        reconnect_backoff_max_ms=100000, # TODO: what value to set here?
+        reconnect_backoff_max_ms=100000,  # TODO: what value to set here?
 
         # API Version is needed in order to prevent api version guessing leading to an error
         # on startup if Kafka Broker isn't ready yet
         api_version=tuple([int(value) for value in KAFKA_API_VERSION.split('.')])
@@ -79,7 +79,7 @@ def consume_messages(consumer, es):
     for message in consumer:
         value = message.value
         val_utf8 = value.decode('utf-8').replace('NaN', 'null')
 
         key = message.key
         index = message.topic
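Note: the decode step above is easiest to see in isolation. The following is an illustrative sketch (not code from this PR) of how a consumed message value can be parsed, assuming payloads are UTF-8 JSON in which serializers may have emitted bare NaN tokens that Elasticsearch would reject:

import json

def decode_value(value: bytes) -> dict:
    # Replace bare NaN tokens with null so the payload stays strict JSON
    # before being handed to Elasticsearch for indexing.
    val_utf8 = value.decode('utf-8').replace('NaN', 'null')
    return json.loads(val_utf8)

# Example: decode_value(b'{"views": NaN}') returns {'views': None}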
141 changes: 85 additions & 56 deletions app/infrastructure/search_clients.py
@@ -1,6 +1,6 @@
 from typing import Tuple, Optional, List
 from elasticsearch.exceptions import NotFoundError
-from elasticsearch_dsl import Index, Document, Integer, Text, tokenizer, token_filter, analyzer, query, Date
+from elasticsearch_dsl import Index, Document, Integer, Text, tokenizer, token_filter, analyzer, query
 from elasticsearch_dsl.connections import connections
 from app.domain.entities import Dataset, Organization, Reuse
 from app.config import Config
@@ -28,7 +28,7 @@ class SearchableOrganization(Document):
     description = Text(analyzer=dgv_analyzer)
     url = Text()
     orga_sp = Integer()
-    created_at = Date()
+    created_at = Text()
     followers = Integer()
     datasets = Integer()
@@ -39,7 +39,7 @@ class Index:
 class SearchableReuse(Document):
     title = Text(analyzer=dgv_analyzer)
     url = Text()
-    created_at = Date()
+    created_at = Text()
     orga_followers = Integer()
     views = Integer()
     followers = Integer()
@@ -57,7 +57,7 @@ class SearchableDataset(Document):
     title = Text(analyzer=dgv_analyzer)
     acronym = Text()
     url = Text()
-    created_at = Date()
+    created_at = Text()
     orga_sp = Integer()
     orga_followers = Integer()
     views = Integer()
@@ -67,8 +67,8 @@ class SearchableDataset(Document):
     resources_count = Integer()
     concat_title_org = Text(analyzer=dgv_analyzer)
     organization_id = Text()
-    temporal_coverage_start = Date()
-    temporal_coverage_end = Date()
+    temporal_coverage_start = Text()
+    temporal_coverage_end = Text()
     granularity = Text()
     geozones = Text()
     description = Text(analyzer=dgv_analyzer)
@@ -103,69 +103,98 @@ def index_dataset(self, to_index: Dataset) -> None:
     def index_reuse(self, to_index: Reuse) -> None:
         SearchableReuse(meta={'id': to_index.id}, **to_index.to_dict()).save(skip_empty=False)
 
-    def query_organizations(self, query_text: str, offset: int, page_size: int) -> Tuple[int, List[dict]]:
-        s = SearchableOrganization.search().query('bool', should=[
-            query.Q(
-                'function_score',
-                query=query.Bool(should=[query.MultiMatch(query=query_text, type='phrase', fields=['title^15','acronym^15','description^8'])]),
-                functions=[
-                    query.SF("field_value_factor", field="orga_sp", factor=8, modifier='sqrt', missing=1),
-                    query.SF("field_value_factor", field="followers", factor=4, modifier='sqrt', missing=1),
-                    query.SF("field_value_factor", field="datasets", factor=1, modifier='sqrt', missing=1),
-                ],
-            ),
-            query.Match(title={"query": query_text, 'fuzziness': 'AUTO'})
-        ])
+    def query_organizations(self, query_text: str, offset: int, page_size: int, filters: dict) -> Tuple[int, List[dict]]:
+        s = SearchableOrganization.search()
+
+        for key, value in filters.items():
+            s = s.filter('term', **{key: value})
+
+        if query_text:
+            s = s.query('bool', should=[
+                query.Q(
+                    'function_score',
+                    query=query.Bool(should=[query.MultiMatch(query=query_text, type='phrase', fields=['name^15', 'acronym^15', 'description^8'])]),
+                    functions=[
+                        query.SF("field_value_factor", field="orga_sp", factor=8, modifier='sqrt', missing=1),
+                        query.SF("field_value_factor", field="followers", factor=4, modifier='sqrt', missing=1),
+                        query.SF("field_value_factor", field="datasets", factor=1, modifier='sqrt', missing=1),
+                    ],
+                ),
+                query.Match(title={"query": query_text, 'fuzziness': 'AUTO'})
+            ])
 
         s = s[offset:(offset + page_size)]
 
         response = s.execute()
         results_number = response.hits.total.value
         res = [hit.to_dict(skip_empty=False) for hit in response.hits]
         return results_number, res
 
-    def query_datasets(self, query_text: str, offset: int, page_size: int) -> Tuple[int, List[dict]]:
-        datasets_score_functions = [
-            query.SF("field_value_factor", field="orga_sp", factor=8, modifier='sqrt', missing=1),
-            query.SF("field_value_factor", field="views", factor=4, modifier='sqrt', missing=1),
-            query.SF("field_value_factor", field="followers", factor=4, modifier='sqrt', missing=1),
-            query.SF("field_value_factor", field="orga_followers", factor=1, modifier='sqrt', missing=1),
-            query.SF("field_value_factor", field="featured", factor=1, modifier='sqrt', missing=1),
-        ]
-        s = SearchableDataset.search().query(
-            'bool',
-            should=[
-                query.Q(
-                    'function_score',
-                    query=query.Bool(should=[query.MultiMatch(query=query_text, type='phrase', fields=['title^15','acronym^15','description^8','organization^8'])]),
-                    functions=datasets_score_functions,
-                ),
-                query.Q(
-                    'function_score',
-                    query=query.Bool(must=[query.Match(concat_title_org={"query": query_text, "operator": "and", "boost": 8})]),
-                    functions=datasets_score_functions,
-                ),
-                query.MultiMatch(query=query_text, type='most_fields', fields=['title', 'organization'], fuzziness='AUTO')
-            ])
+    def query_datasets(self, query_text: str, offset: int, page_size: int, filters: dict) -> Tuple[int, List[dict]]:
+        s = SearchableDataset.search()
+
+        for key, value in filters.items():
+            if key == 'temporal_coverage_start':
+                s = s.filter('range', **{'temporal_coverage_start': {'lte': value}})
+            elif key == 'temporal_coverage_end':
+                s = s.filter('range', **{'temporal_coverage_end': {'gte': value}})
+            else:
+                s = s.filter('term', **{key: value})
+
+        if query_text:
+            datasets_score_functions = [
+                query.SF("field_value_factor", field="orga_sp", factor=8, modifier='sqrt', missing=1),
+                query.SF("field_value_factor", field="views", factor=4, modifier='sqrt', missing=1),
+                query.SF("field_value_factor", field="followers", factor=4, modifier='sqrt', missing=1),
+                query.SF("field_value_factor", field="orga_followers", factor=1, modifier='sqrt', missing=1),
+                query.SF("field_value_factor", field="featured", factor=1, modifier='sqrt', missing=1),
+            ]
+            s = s.query(
+                'bool',
+                should=[
+                    query.Q(
+                        'function_score',
+                        query=query.Bool(should=[query.MultiMatch(query=query_text, type='phrase', fields=['title^15', 'acronym^15', 'description^8', 'organization^8'])]),
+                        functions=datasets_score_functions,
+                    ),
+                    query.Q(
+                        'function_score',
+                        query=query.Bool(must=[query.Match(concat_title_org={"query": query_text, "operator": "and", "boost": 8})]),
+                        functions=datasets_score_functions,
+                    ),
+                    query.MultiMatch(query=query_text, type='most_fields', fields=['title', 'organization'], fuzziness='AUTO')
+                ])
 
         s = s[offset:(offset + page_size)]
 
         response = s.execute()
         results_number = response.hits.total.value
         res = [hit.to_dict(skip_empty=False) for hit in response.hits]
         return results_number, res
 
-    def query_reuses(self, query_text: str, offset: int, page_size: int) -> Tuple[int, List[dict]]:
-        s = SearchableReuse.search().query('bool', should=[
-            query.Q(
-                'function_score',
-                query=query.Bool(should=[query.MultiMatch(query=query_text, type='phrase', fields=['title^15','description^8','organization^8'])]),
-                functions=[
-                    query.SF("field_value_factor", field="views", factor=4, modifier='sqrt', missing=1),
-                    query.SF("field_value_factor", field="followers", factor=4, modifier='sqrt', missing=1),
-                    query.SF("field_value_factor", field="orga_followers", factor=1, modifier='sqrt', missing=1),
-                    query.SF("field_value_factor", field="featured", factor=1, modifier='sqrt', missing=1),
-                ],
-            ),
-            query.MultiMatch(query=query_text, type='most_fields', fields=['title', 'organization'], fuzziness='AUTO')
-        ])
+    def query_reuses(self, query_text: str, offset: int, page_size: int, filters: dict) -> Tuple[int, List[dict]]:
+        s = SearchableReuse.search()
+
+        for key, value in filters.items():
+            s = s.filter('term', **{key: value})
+
+        if query_text:
+            s = s.query('bool', should=[
+                query.Q(
+                    'function_score',
+                    query=query.Bool(should=[query.MultiMatch(query=query_text, type='phrase', fields=['title^15', 'description^8', 'organization^8'])]),
+                    functions=[
+                        query.SF("field_value_factor", field="views", factor=4, modifier='sqrt', missing=1),
+                        query.SF("field_value_factor", field="followers", factor=4, modifier='sqrt', missing=1),
+                        query.SF("field_value_factor", field="orga_followers", factor=1, modifier='sqrt', missing=1),
+                        query.SF("field_value_factor", field="featured", factor=1, modifier='sqrt', missing=1),
+                    ],
+                ),
+                query.MultiMatch(query=query_text, type='most_fields', fields=['title', 'organization'], fuzziness='AUTO')
+            ])
 
         s = s[offset:(offset + page_size)]
 
         response = s.execute()
         results_number = response.hits.total.value
         res = [hit.to_dict(skip_empty=False) for hit in response.hits]
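For context, a hypothetical call site (the client instance and filter values are invented for illustration) showing how the new filters argument combines exact term filters and temporal range filters with a text query:

# Illustrative only: querying datasets through the filter-aware signature.
total, hits = search_client.query_datasets(
    query_text='velo',
    offset=0,
    page_size=20,
    filters={
        'organization_id': 'some-organization-id',   # exact term match
        'temporal_coverage_start': '2020-01-01',     # coverage must start on or before this date
        'temporal_coverage_end': '2020-12-31',       # coverage must end on or after this date
    },
)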
33 changes: 27 additions & 6 deletions app/infrastructure/services.py
@@ -11,13 +11,17 @@ def __init__(self, search_client: ElasticClient):
     def feed(self, organization: Organization) -> None:
         self.search_client.index_organization(organization)
 
-    def search(self, search_text: str, page: int, page_size: int) -> Tuple[List[Organization], int, int]:
+    def search(self, filters: dict) -> Tuple[List[Organization], int, int]:
+        page = filters.pop('page')
+        page_size = filters.pop('page_size')
+        search_text = filters.pop('q')
+
         if page > 1:
             offset = page_size * (page - 1)
         else:
             offset = 0
 
-        results_number, search_results = self.search_client.query_organizations(search_text, offset, page_size)
+        results_number, search_results = self.search_client.query_organizations(search_text, offset, page_size, filters)
         results = [Organization.load_from_dict(hit) for hit in search_results]
         total_pages = round(results_number / page_size) or 1
         return results, results_number, total_pages
@@ -37,13 +41,20 @@ def feed(self, dataset: Dataset) -> None:
     def feed(self, dataset: Dataset) -> None:
         self.search_client.index_dataset(dataset)
 
-    def search(self, search_text: str, page: int, page_size: int) -> Tuple[List[Dataset], int, int]:
+    def search(self, filters: dict) -> Tuple[List[Dataset], int, int]:
+        page = filters.pop('page')
+        page_size = filters.pop('page_size')
+        search_text = filters.pop('q')
+
         if page > 1:
             offset = page_size * (page - 1)
         else:
             offset = 0
 
-        results_number, search_results = self.search_client.query_datasets(search_text, offset, page_size)
+        if filters['temporal_coverage']:
+            self.parse_temporal_value(filters)
+
+        results_number, search_results = self.search_client.query_datasets(search_text, offset, page_size, filters)
         results = [Dataset.load_from_dict(hit) for hit in search_results]
         total_pages = round(results_number / page_size) or 1
         return results, results_number, total_pages
@@ -54,6 +65,12 @@ def find_one(self, dataset_id: str) -> Optional[Dataset]:
         except TypeError:
             return None
 
+    @staticmethod
+    def parse_temporal_value(filters):
+        parts = filters.pop('temporal_coverage')
+        filters['temporal_coverage_start'] = parts[:10]
+        filters['temporal_coverage_end'] = parts[11:]
+
 
 class ReuseService:
 
@@ -63,13 +80,17 @@ def __init__(self, search_client: ElasticClient):
     def feed(self, reuse: Reuse) -> None:
         self.search_client.index_reuse(reuse)
 
-    def search(self, search_text: str, page: int, page_size: int) -> Tuple[List[Reuse], int, int]:
+    def search(self, filters: dict) -> Tuple[List[Reuse], int, int]:
+        page = filters.pop('page')
+        page_size = filters.pop('page_size')
+        search_text = filters.pop('q')
+
         if page > 1:
             offset = page_size * (page - 1)
         else:
             offset = 0
 
-        results_number, search_results = self.search_client.query_reuses(search_text, offset, page_size)
+        results_number, search_results = self.search_client.query_reuses(search_text, offset, page_size, filters)
         results = [Reuse.load_from_dict(hit) for hit in search_results]
         total_pages = round(results_number / page_size) or 1
         return results, results_number, total_pages
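The slicing in parse_temporal_value is easier to follow with a worked example; this is illustrative only, assuming the temporal_coverage filter arrives as a single 'YYYY-MM-DD-YYYY-MM-DD' string:

# Illustrative only: how parse_temporal_value splits a combined date range.
filters = {'temporal_coverage': '2020-01-01-2020-12-31'}

parts = filters.pop('temporal_coverage')
filters['temporal_coverage_start'] = parts[:10]   # '2020-01-01'
filters['temporal_coverage_end'] = parts[11:]     # '2020-12-31'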