diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..5fe94e8 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,49 @@ +name: CI + +on: + push: + branches: + - "**" + pull_request: + branches: + - "**" + +permissions: + contents: read + +jobs: + build: + runs-on: ${{ matrix.os }} + services: + redis: + image: redis + ports: + - 6379:6379 + strategy: + matrix: + os: [ubuntu-latest] + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + pip install --upgrade pip setuptools + pip install -r requirements.txt + + - name: Install test dependencies + run: | + pip install -r test-requirements.txt + + - name: Run Pylama + run: | + pylama * -i E501 + # - name: Run MyPy strict + # run: | + # mypy --strict + - name: Run Unit Tests + run: | + python -m unittest discover diff --git a/README.md b/README.md index 9815848..23a3a7a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Meesee -[![Build Status](https://travis-ci.com/Attumm/meesee.svg?branch=main)](https://travis-ci.com/Attumm/meesee) +[![CI](https://github.com/Attumm/meesee/actions/workflows/ci.yml/badge.svg)](https://github.com/Attumm/meesee/actions/workflows/ci.yml) Task queue, Long lived workers process parallelization, with Redis as backend. The project is used in production by three different companies. diff --git a/examples/example_consume.py b/examples/example_consume.py index b186e27..e995263 100644 --- a/examples/example_consume.py +++ b/examples/example_consume.py @@ -14,5 +14,5 @@ def my_func(item, worker_id): if __name__ == '__main__': - workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10 + workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 startapp(my_func, workers=workers, config=config) diff --git a/examples/example_failure_main.py b/examples/example_failure_main.py index 932abfd..1e4706a 100644 --- a/examples/example_failure_main.py +++ b/examples/example_failure_main.py @@ -28,10 +28,9 @@ def my_failure_func(item, exception, r_instance, worker_id): print('failure callback', item, str(exception), worker_id) int_item = int(item.decode('utf-8')) # handle item, and resend to queue with item minus one - r_instance.send(int_item-1) + r_instance.send(int_item - 1) if __name__ == "__main__": produce(10) startapp(my_func, workers=10, config=config, on_failure_func=my_failure_func) - diff --git a/examples/example_health_check.py b/examples/example_health_check.py index 4dd913f..c6a7347 100644 --- a/examples/example_health_check.py +++ b/examples/example_health_check.py @@ -1,6 +1,8 @@ import smtplib from email.message import EmailMessage +from meesee import RedisQueue + def send_email(msg, to, from_, subject='default'): msg_email = EmailMessage() @@ -15,6 +17,7 @@ def send_email(msg, to, from_, subject='default'): s.quit() return True + config = { "namespace": "removeme", "key": "tasks", @@ -22,7 +25,6 @@ def send_email(msg, to, from_, subject='default'): "maxsize": 100 } -from meesee import RedisQueue # Note that max_allowed should be alteast one less then maxsize of the config max_allowed = 10000 diff --git a/examples/example_multi_config.py b/examples/example_multi_config.py index 95a66f8..7f35935 100644 --- a/examples/example_multi_config.py +++ b/examples/example_multi_config.py @@ -21,18 +21,22 @@ "timeout": 1, }] + def func_a(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item)) + def func_b(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item)) + def func_c(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item)) funcs = [func_a, func_b, func_c] + def produce(items, configs): r = RedisQueue(**configs[0]) for config in configs: @@ -40,7 +44,8 @@ def produce(items, configs): for _ in range(items): r.send(config['key']) + if __name__ == '__main__': produce(3, configs) - workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10 + workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 startapp(funcs, workers=workers, config=configs) diff --git a/examples/example_multi_func.py b/examples/example_multi_func.py index 7b3da79..6343716 100644 --- a/examples/example_multi_func.py +++ b/examples/example_multi_func.py @@ -13,9 +13,11 @@ def func_a(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_a', worker_id, item)) + def func_b(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_b', worker_id, item)) + def func_c(item, worker_id): print('func: {}, worker_id: {}, item: {}'.format('func_c', worker_id, item)) @@ -23,5 +25,5 @@ def func_c(item, worker_id): funcs = [func_a, func_b, func_c] if __name__ == '__main__': - workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10 + workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 startapp(funcs, workers=workers, config=config) diff --git a/examples/example_produce.py b/examples/example_produce.py index a82b61b..92b2617 100644 --- a/examples/example_produce.py +++ b/examples/example_produce.py @@ -16,5 +16,5 @@ def produce(items): if __name__ == "__main__": - amount = int(sys.argv[sys.argv.index('-p')+1]) if '-p' in sys.argv else 10 + amount = int(sys.argv[sys.argv.index('-p') + 1]) if '-p' in sys.argv else 10 produce(amount) diff --git a/examples/example_sys_signal_doesnt_add_item.py b/examples/example_sys_signal_doesnt_add_item.py index dc57683..41260f2 100644 --- a/examples/example_sys_signal_doesnt_add_item.py +++ b/examples/example_sys_signal_doesnt_add_item.py @@ -1,4 +1,3 @@ -import time from meesee import RedisQueue from meesee import startapp @@ -22,13 +21,13 @@ def my_func(item, worker_id): print('regression: item is None') print('got item {}'.format(locals())) + def raise_sys_exit(item, worker_id): print('raise sys_exit') - raise SystemExit + raise SystemExit if __name__ == "__main__": produce(1) startapp(raise_sys_exit, workers=1, config=config) startapp(my_func, workers=1, config=config) - diff --git a/examples/example_timout_consumer.py b/examples/example_timout_consumer.py index 695230c..f32a2a1 100644 --- a/examples/example_timout_consumer.py +++ b/examples/example_timout_consumer.py @@ -15,5 +15,5 @@ def my_func(item, worker_id): if __name__ == '__main__': - workers = int(sys.argv[sys.argv.index('-w')+1]) if '-w' in sys.argv else 10 + workers = int(sys.argv[sys.argv.index('-w') + 1]) if '-w' in sys.argv else 10 startapp(my_func, workers=workers, config=config) diff --git a/examples/example_tls.py b/examples/example_tls.py index f118b53..d89fd61 100644 --- a/examples/example_tls.py +++ b/examples/example_tls.py @@ -2,13 +2,13 @@ from meesee import RedisQueue, startapp redis_config = { - "host": '127.0.0.1', + "host": "127.0.0.1", "port": 6380, "ssl": True, - "ssl_keyfile":'test_redis_key.pem', - "ssl_certfile":'test_redis_cert.pem', - "ssl_cert_reqs":'required', - "ssl_ca_certs":'test_redis_cert.pem' + "ssl_keyfile": "test_redis_key.pem", + "ssl_certfile": "test_redis_cert.pem", + "ssl_cert_reqs": "required", + "ssl_ca_certs": "test_redis_cert.pem", } @@ -28,7 +28,8 @@ def produce(items): def my_func(item, worker_id): - print('worker: {worker_id} hello, look at me, msg: {item}'.format(worker_id=worker_id, item=item)) + print("worker: {worker_id} hello, look at me, msg: {item}".format(worker_id=worker_id, item=item)) + if __name__ == "__main__": # Create self-signed certs diff --git a/meesee.py b/meesee.py index 3e92a34..e7d4969 100644 --- a/meesee.py +++ b/meesee.py @@ -16,9 +16,9 @@ class RedisQueue: def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None): - # TCP check if connection is alive, Sane defaults - redis_config.setdefault('socket_timeout', 30) - redis_config.setdefault('socket_keepalive', True) + # TCP check if connection is alive + # redis_config.setdefault('socket_timeout', 30) + # redis_config.setdefault('socket_keepalive', True) # Ping check if connection is alive # redis_config.setdefault('health_check_interval', 30) self.r = redis.Redis(**redis_config) @@ -105,7 +105,7 @@ def setup_init_items(func_kwargs, init_kwargs): return {name: func_kwargs[name] for name in init_kwargs.keys()} -def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs): +def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs): # noqa:C901 if isinstance(func, list): func = func[worker_id % len(func)] if isinstance(config, list): @@ -148,7 +148,7 @@ def startapp(func, func_kwargs={}, workers=10, config=config, on_failure_func=No try: p.starmap(run_worker, args) except (KeyboardInterrupt, SystemExit): - sys.stdout.write('Starting Graceful exit\n') - p.close() - p.join() + sys.stdout.write('Starting Graceful exit\n') + p.close() + p.join() sys.stdout.write('Clean shut down\n') diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 0000000..b949767 --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1 @@ +pylama==8.4.1 diff --git a/tests.py b/tests.py index 7b7afbb..f6bd887 100644 --- a/tests.py +++ b/tests.py @@ -20,7 +20,7 @@ def produce(amount): r = RedisQueue(**example_config) - for i in range(1, amount+1): + for i in range(1, amount + 1): r.send(i) @@ -54,9 +54,9 @@ def tearDown(self): def test_incr_key_equals_produces_single_worker(self): expected = 100 produce(expected) - + key = 'test:amount' - kwargs = {'key':key, 'r': RedisQueue} + kwargs = {'key': key, 'r': RedisQueue} init_kwargs = {'r': example_config} startapp(increment_by_one, workers=1, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs) @@ -67,22 +67,22 @@ def test_incr_key_equals_produces_single_worker(self): def test_incr_key_equals_produces_five_workers(self): expected = 100 produce(expected) - + key = 'test:amount' - kwargs = {'key':key, 'r': RedisQueue} + kwargs = {'key': key, 'r': RedisQueue} init_kwargs = {'r': example_config} startapp(increment_by_one, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs) result = int(redis_instance.get(key)) self.assertEqual(result, expected) - + def test_incr_key_equals_produces_multiple_workers(self): - expected = 123 + expected = 123 produce(expected) - + key = 'test:amount' - kwargs = {'key':key, 'r': RedisQueue} + kwargs = {'key': key, 'r': RedisQueue} init_kwargs = {'r': example_config} startapp(increment_by_one, workers=7, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs) @@ -93,13 +93,13 @@ def test_incr_key_equals_produces_multiple_workers(self): def test_all_workers_are_present(self): expected = 268 expected_workers_amount = 5 - expected_workers = {i for i in range(1, expected_workers_amount+1)} + expected_workers = {i for i in range(1, expected_workers_amount + 1)} produce(expected) - + key = 'test:amount' key_workerids = 'test:workerids' - kwargs = {'key':key, 'r': RedisQueue} + kwargs = {'key': key, 'r': RedisQueue} init_kwargs = {'r': example_config} startapp(incr_and_append_worker_id, workers=expected_workers_amount, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs) @@ -109,9 +109,9 @@ def test_all_workers_are_present(self): result_workers = redis_instance.lrange(key_workerids, 0, -1) result_workers_set = {int(i) for i in sorted(result_workers)} - + self.assertEqual(result_workers_set, expected_workers) - + def append_item(item, worker_id, key, r, test_result_key): r.r.lpush(test_result_key, item) @@ -126,10 +126,10 @@ def tearDown(self): def test_items_send_are_handled_single_worker(self): expected = ['1', '2', '3'] produce_items(expected) - + key = 'test:items' result_key = 'test:result' - kwargs = {'key':key, 'r': RedisQueue, 'test_result_key': result_key} + kwargs = {'key': key, 'r': RedisQueue, 'test_result_key': result_key} init_kwargs = {'r': example_config} startapp(append_item, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs) @@ -140,10 +140,10 @@ def test_items_send_are_handled_single_worker(self): def test_items_send_are_handled_multiple_worker(self): expected = ['1', '2', '3'] produce_items(expected) - + key = 'test:items' result_key = 'test:result' - kwargs = {'key':key, 'r': RedisQueue, 'test_result_key': result_key} + kwargs = {'key': key, 'r': RedisQueue, 'test_result_key': result_key} init_kwargs = {'r': example_config} startapp(append_item, workers=5, config=example_config, func_kwargs=kwargs, init_kwargs=init_kwargs) @@ -178,4 +178,3 @@ def test_handle_sys(self): if __name__ == '__main__': unittest.main() -