Skip to content

Commit

Permalink
Merge pull request #16 from Attumm/add_ci
Browse files Browse the repository at this point in the history
Add ci
  • Loading branch information
Attumm authored May 13, 2024
2 parents 045b721 + 05d877a commit 722858e
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 44 deletions.
49 changes: 49 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: CI

on:
push:
branches:
- "**"
pull_request:
branches:
- "**"

permissions:
contents: read

jobs:
build:
runs-on: ${{ matrix.os }}
services:
redis:
image: redis
ports:
- 6379:6379
strategy:
matrix:
os: [ubuntu-latest]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
pip install --upgrade pip setuptools
pip install -r requirements.txt
- name: Install test dependencies
run: |
pip install -r test-requirements.txt
- name: Run Pylama
run: |
pylama * -i E501
# - name: Run MyPy strict
# run: |
# mypy --strict
- name: Run Unit Tests
run: |
python -m unittest discover
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Meesee
[![Build Status](https://travis-ci.com/Attumm/meesee.svg?branch=main)](https://travis-ci.com/Attumm/meesee)
[![CI](https://github.com/Attumm/meesee/actions/workflows/ci.yml/badge.svg)](https://github.com/Attumm/meesee/actions/workflows/ci.yml)

Task queue, Long lived workers process parallelization, with Redis as backend.
The project is used in production by three different companies.
Expand Down
2 changes: 1 addition & 1 deletion examples/example_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ def my_func(item, worker_id):


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(my_func, workers=workers, config=config)
3 changes: 1 addition & 2 deletions examples/example_failure_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ def my_failure_func(item, exception, r_instance, worker_id):
print('failure callback', item, str(exception), worker_id)
int_item = int(item.decode('utf-8'))
# handle item, and resend to queue with item minus one
r_instance.send(int_item-1)
r_instance.send(int_item - 1)


if __name__ == "__main__":
produce(10)
startapp(my_func, workers=10, config=config, on_failure_func=my_failure_func)

4 changes: 3 additions & 1 deletion examples/example_health_check.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import smtplib
from email.message import EmailMessage

from meesee import RedisQueue


def send_email(msg, to, from_, subject='default'):
msg_email = EmailMessage()
Expand All @@ -15,14 +17,14 @@ def send_email(msg, to, from_, subject='default'):
s.quit()
return True


config = {
"namespace": "removeme",
"key": "tasks",
"redis_config": {},
"maxsize": 100
}

from meesee import RedisQueue

# Note that max_allowed should be alteast one less then maxsize of the config
max_allowed = 10000
Expand Down
7 changes: 6 additions & 1 deletion examples/example_multi_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,31 @@
"timeout": 1,
}]


def func_a(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item))


def func_b(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item))


def func_c(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item))


funcs = [func_a, func_b, func_c]


def produce(items, configs):
r = RedisQueue(**configs[0])
for config in configs:
r.set_list_key(key=config['key'], namespace=config['namespace'])
for _ in range(items):
r.send(config['key'])


if __name__ == '__main__':
produce(3, configs)
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(funcs, workers=workers, config=configs)
4 changes: 3 additions & 1 deletion examples/example_multi_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
def func_a(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item))


def func_b(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item))


def func_c(item, worker_id):
print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item))


funcs = [func_a, func_b, func_c]

if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(funcs, workers=workers, config=config)
2 changes: 1 addition & 1 deletion examples/example_produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ def produce(items):


if __name__ == "__main__":
amount = int(sys.argv[sys.argv.index('-p')+1]) if '-p' in sys.argv else 10
amount = int(sys.argv[sys.argv.index('-p') + 1]) if '-p' in sys.argv else 10
produce(amount)
5 changes: 2 additions & 3 deletions examples/example_sys_signal_doesnt_add_item.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
from meesee import RedisQueue
from meesee import startapp

Expand All @@ -22,13 +21,13 @@ def my_func(item, worker_id):
print('regression: item is None')
print('got item {}'.format(locals()))


def raise_sys_exit(item, worker_id):
print('raise sys_exit')
raise SystemExit
raise SystemExit


if __name__ == "__main__":
produce(1)
startapp(raise_sys_exit, workers=1, config=config)
startapp(my_func, workers=1, config=config)

2 changes: 1 addition & 1 deletion examples/example_timout_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ def my_func(item, worker_id):


if __name__ == '__main__':
workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10
workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10
startapp(my_func, workers=workers, config=config)
13 changes: 7 additions & 6 deletions examples/example_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from meesee import RedisQueue, startapp

redis_config = {
"host": '127.0.0.1',
"host": "127.0.0.1",
"port": 6380,
"ssl": True,
"ssl_keyfile":'test_redis_key.pem',
"ssl_certfile":'test_redis_cert.pem',
"ssl_cert_reqs":'required',
"ssl_ca_certs":'test_redis_cert.pem'
"ssl_keyfile": "test_redis_key.pem",
"ssl_certfile": "test_redis_cert.pem",
"ssl_cert_reqs": "required",
"ssl_ca_certs": "test_redis_cert.pem",
}


Expand All @@ -28,7 +28,8 @@ def produce(items):


def my_func(item, worker_id):
print('worker: {worker_id} hello, look at me, msg: {item}'.format(worker_id=worker_id, item=item))
print("worker: {worker_id} hello, look at me, msg: {item}".format(worker_id=worker_id, item=item))


if __name__ == "__main__":
# Create self-signed certs
Expand Down
14 changes: 7 additions & 7 deletions meesee.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

class RedisQueue:
def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None):
# TCP check if connection is alive, Sane defaults
redis_config.setdefault('socket_timeout', 30)
redis_config.setdefault('socket_keepalive', True)
# TCP check if connection is alive
# redis_config.setdefault('socket_timeout', 30)
# redis_config.setdefault('socket_keepalive', True)
# Ping check if connection is alive
# redis_config.setdefault('health_check_interval', 30)
self.r = redis.Redis(**redis_config)
Expand Down Expand Up @@ -105,7 +105,7 @@ def setup_init_items(func_kwargs, init_kwargs):
return {name: func_kwargs[name] for name in init_kwargs.keys()}


def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs):
def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs): # noqa:C901
if isinstance(func, list):
func = func[worker_id % len(func)]
if isinstance(config, list):
Expand Down Expand Up @@ -148,7 +148,7 @@ def startapp(func, func_kwargs={}, workers=10, config=config, on_failure_func=No
try:
p.starmap(run_worker, args)
except (KeyboardInterrupt, SystemExit):
sys.stdout.write('Starting Graceful exit\n')
p.close()
p.join()
sys.stdout.write('Starting Graceful exit\n')
p.close()
p.join()
sys.stdout.write('Clean shut down\n')
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pylama==8.4.1
37 changes: 18 additions & 19 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

def produce(amount):
r = RedisQueue(**example_config)
for i in range(1, amount+1):
for i in range(1, amount + 1):
r.send(i)


Expand Down Expand Up @@ -54,9 +54,9 @@ def tearDown(self):
def test_incr_key_equals_produces_single_worker(self):
expected = 100
produce(expected)

key = 'test:amount'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(increment_by_one, workers=1, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -67,22 +67,22 @@ def test_incr_key_equals_produces_single_worker(self):
def test_incr_key_equals_produces_five_workers(self):
expected = 100
produce(expected)

key = 'test:amount'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(increment_by_one, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)

result = int(redis_instance.get(key))
self.assertEqual(result, expected)

def test_incr_key_equals_produces_multiple_workers(self):
expected = 123
expected = 123
produce(expected)

key = 'test:amount'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(increment_by_one, workers=7, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -93,13 +93,13 @@ def test_incr_key_equals_produces_multiple_workers(self):
def test_all_workers_are_present(self):
expected = 268
expected_workers_amount = 5
expected_workers = {i for i in range(1, expected_workers_amount+1)}
expected_workers = {i for i in range(1, expected_workers_amount + 1)}

produce(expected)

key = 'test:amount'
key_workerids = 'test:workerids'
kwargs = {'key':key, 'r': RedisQueue}
kwargs = {'key': key, 'r': RedisQueue}
init_kwargs = {'r': example_config}

startapp(incr_and_append_worker_id, workers=expected_workers_amount, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -109,9 +109,9 @@ def test_all_workers_are_present(self):

result_workers = redis_instance.lrange(key_workerids, 0, -1)
result_workers_set = {int(i) for i in sorted(result_workers)}

self.assertEqual(result_workers_set, expected_workers)


def append_item(item, worker_id, key, r, test_result_key):
r.r.lpush(test_result_key, item)
Expand All @@ -126,10 +126,10 @@ def tearDown(self):
def test_items_send_are_handled_single_worker(self):
expected = ['1', '2', '3']
produce_items(expected)

key = 'test:items'
result_key = 'test:result'
kwargs = {'key':key, 'r': RedisQueue, 'test_result_key': result_key}
kwargs = {'key': key, 'r': RedisQueue, 'test_result_key': result_key}
init_kwargs = {'r': example_config}

startapp(append_item, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand All @@ -140,10 +140,10 @@ def test_items_send_are_handled_single_worker(self):
def test_items_send_are_handled_multiple_worker(self):
expected = ['1', '2', '3']
produce_items(expected)

key = 'test:items'
result_key = 'test:result'
kwargs = {'key':key, 'r': RedisQueue, 'test_result_key': result_key}
kwargs = {'key': key, 'r': RedisQueue, 'test_result_key': result_key}
init_kwargs = {'r': example_config}

startapp(append_item, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs)
Expand Down Expand Up @@ -178,4 +178,3 @@ def test_handle_sys(self):

if __name__ == '__main__':
unittest.main()

0 comments on commit 722858e

Please sign in to comment.