News

Posted almost 6 years ago by Jon Cuna
I have a long-running background task that spins up the Flask app again to do some auditing in the background. The front end is a web application that uses Socket.IO to communicate with the main Flask backend and handle multiple async behaviors. I make sure to fire the background task only when the main thread is created, and I call eventlet.monkey_patch() only at the very beginning of the script.

If the background thread has a lot of stuff to audit, it blocks the main thread; the more items in memory, the longer it blocks. The audit is not CPU intensive at all, it's just some DB inserts and logging. The items that need to be audited are added to an object in memory by the main thread and are passed by reference to the child thread (like an in-memory queue). If I don't monkey patch eventlet, everything works fine, but then Flask's auto reload won't work, and I need it for development.

- I run the app with socketio.run(app) in dev.
- The behavior persists when using gunicorn/eventlet.
- While the background task is sleeping in sleep(2), there is no blocking.

    import eventlet
    eventlet.monkey_patch()

    # ... the rest of the code is a basic Flask create_app factory that at some
    # point starts the new thread if it's the main thread.
    # The async code that runs is the following:

    class AsyncAuditor(threading.Thread):
        def __init__(self, tasks: list, stop: threading.Event):
            super().__init__()
            self.tasks = tasks
            self.stop_event = stop

        def run(self):
            from app import init_app
            from dal import db

            app = init_app(mode='sys')
            app.logger.info('starting async audit thread')
            with app.app_context():
                try:
                    while not self.stop_event.is_set():
                        if len(self.tasks) > 0:
                            task: dict
                            for task in self.tasks:
                                app.logger.debug(threading.current_thread().name + ' new audit record')
                                task.payload = encryptor.encrypt(task.payload)
                                task.ip = struct.unpack("!I", socket.inet_aton(task.ip))[0]
                                db.session.add(task)
                            self.tasks.clear()
                            db.session.commit()
                        sleep(2)
                    app.logger.info('exiting async audit thread')
                except BaseException as e:
                    app.logger.exception('Exception')

    # there's some code that tries to gracefully exit if the app needs to exit
    stop_event = threading.Event()
    async_task = AsyncAuditor(API.audit_tasks, stop_event)
    async_task.start()

    def exit_async_thread():
        stop_event.set()
        async_task.join()

    atexit.register(exit_async_thread)

I expect that while the child thread is working, the main thread would not be blocked by any DB operations; in fact, as mentioned above, if I don't monkey patch eventlet, everything works fine in both the main thread and the child one. Instead, I'm getting 9 and even 30 second delays when hitting an endpoint in the Flask application while the background task is working.
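For context, here is a minimal sketch of the producer side described above: request handlers append audit records to the shared list (API.audit_tasks) and AsyncAuditor drains it. The AuditTask model and the route are placeholders, not taken from the original post.

    from dataclasses import dataclass
    from flask import Blueprint, request

    @dataclass
    class AuditTask:              # stand-in for the real audited SQLAlchemy model
        payload: bytes
        ip: str

    class API:
        audit_tasks = []          # shared list handed to AsyncAuditor by reference

    bp = Blueprint('audit_demo', __name__)

    @bp.route('/things', methods=['POST'])
    def create_thing():
        # the request thread only appends; the background thread encrypts,
        # packs the IP, inserts the rows, and clears the list
        API.audit_tasks.append(AuditTask(payload=request.get_data(),
                                         ip=request.remote_addr))
        return '', 201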
Posted almost 6 years ago by Admiral-Chicken
I have two Python applications running simultaneously. One of them is a WSGI web socket server that clients connect to for real-time updates; it uses only the Python eventlet library. The second is a processing script that does some calculations. I need the processing script to notify the web socket server when a new calculation is done.

I figured the web socket server should also expose some REST routes that other applications and scripts can call, but once I run wsgi.server(listener, dispatch) to start the WSGI server, I'm not sure how to add a REST endpoint as well, because the app starts listening strictly for web socket connections. Would creating a new process with something like an Eve REST endpoint be the best solution? I've noticed that a lot of people use the SocketIO library to run REST endpoints and web sockets simultaneously, but SocketIO is not a package I have available for this application.
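Not part of the original question, but for illustration: since the dispatch callable passed to wsgi.server is an ordinary WSGI app, one way to serve both from a single eventlet server is to branch on PATH_INFO. The /ws and /notify paths and the echo handler below are made up.

    import eventlet
    from eventlet import wsgi, websocket

    @websocket.WebSocketWSGI
    def handle_ws(ws):
        # placeholder websocket handler: echo messages back until the client disconnects
        while True:
            msg = ws.wait()
            if msg is None:
                break
            ws.send(msg)

    def dispatch(environ, start_response):
        if environ['PATH_INFO'] == '/ws':
            return handle_ws(environ, start_response)
        if environ['PATH_INFO'] == '/notify':
            # plain HTTP route the processing script could call
            start_response('200 OK', [('Content-Type', 'text/plain')])
            return [b'ok']
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return [b'not found']

    if __name__ == '__main__':
        wsgi.server(eventlet.listen(('0.0.0.0', 7000)), dispatch)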
Posted almost 6 years ago by Kisiel
I have a Flask application and I want to use Flask-SocketIO to handle websockets with gunicorn and eventlet. However, when I try to connect with my test client (http://www.websocket.org/echo.html) I receive:

    WebSocket connection to 'ws://localhost/socket.io?encoding=text' failed: Error during WebSocket handshake: Unexpected response code: 200

socketio_app.py:

    from flask import Flask, render_template
    from flask_socketio import SocketIO

    app = Flask(__name__)
    app.config['SECRET_KEY'] = 'secret!'
    socketio = SocketIO(app, port=9090, host='0.0.0.0', async_mode='eventlet', debug=True)

    @app.route('/socket.io')
    def index():
        return render_template('index.html')

    if __name__ == '__main__':
        socketio.run(app)

and I run it this way:

    gunicorn -k eventlet -w 1 socketio_app:app -b 0.0.0.0:9090 --error-logfile - --access-logfile - --log-level debug

Should I use it in another way? Should I manually modify my response like this?

    @app.route('/socket.io')
    def index():
        return Response(status=101, headers={
            'Connection': 'Upgrade',
            'Upgrade': 'websocket'
        })
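For comparison, a minimal Flask-SocketIO layout along the lines of the project's documentation might look like the sketch below; /socket.io is handled by the Socket.IO machinery itself, so the ordinary Flask route lives elsewhere. The 'echo' event name is made up.

    # socketio_app.py -- sketch of a conventional Flask-SocketIO setup
    from flask import Flask, render_template
    from flask_socketio import SocketIO

    app = Flask(__name__)
    app.config['SECRET_KEY'] = 'secret!'
    socketio = SocketIO(app, async_mode='eventlet')   # host/port are chosen at run time

    @app.route('/')                 # regular page, not /socket.io
    def index():
        return render_template('index.html')

    @socketio.on('echo')            # hypothetical event name
    def echo(data):
        socketio.emit('echo', data)

    if __name__ == '__main__':
        socketio.run(app, host='0.0.0.0', port=9090, debug=True)

The same gunicorn -k eventlet -w 1 socketio_app:app invocation should then serve both the HTTP routes and the Socket.IO endpoint from the single worker.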
Posted about 6 years ago by mdod
I am trying to set up two eventlet servers that listen concurrently on different ports. The first server in the code below is for a SocketIO implementation and the second is for an external connection. Both work separately, but not at the same time.

    if __name__ == '__main__':
        eventlet.wsgi.server(eventlet.listen(('0.0.0.0', 4000)), app)

        s = eventlet.listen(('0.0.0.0', 6000))
        pool = eventlet.GreenPool(5)
        while True:
            c, address = s.accept()
            pool.spawn_n(function, c)
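Not from the original post, but as an illustration of running both loops at once: eventlet.wsgi.server blocks its caller, so one option is to spawn it in a green thread and keep the accept loop in the main greenlet. The app and handle_conn functions below are placeholders.

    import eventlet
    import eventlet.wsgi

    def app(environ, start_response):          # placeholder WSGI app
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return [b'hello']

    def handle_conn(client):                   # placeholder per-connection handler
        client.sendall(b'connected\n')
        client.close()

    if __name__ == '__main__':
        # run the WSGI server in its own green thread so it does not block
        eventlet.spawn(eventlet.wsgi.server, eventlet.listen(('0.0.0.0', 4000)), app)

        s = eventlet.listen(('0.0.0.0', 6000))
        pool = eventlet.GreenPool(5)
        while True:
            c, address = s.accept()
            pool.spawn_n(handle_conn, c)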
Posted about 6 years ago by ik4ru52k
After around 2-3 minutes, Celery throws the following traceback:

    File "c:\program files (x86)\lib\site-packages\eventlet\hubs\selects.py", line 55, in wait
        listeners.get(fileno, noop).cb(fileno)
    [2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\celery\worker\pidbox.py", line 120, in loop
        connection.drain_events(timeout=1.0)
    [2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\kombu\connection.py", line 315, in drain_events
        return self.transport.drain_events(self.connection, **kwargs)
    [2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\kombu\transport\pyamqp.py", line 103, in drain_events
        return connection.drain_events(**kwargs)
    [2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\connection.py", line 500, in drain_events
        while not self.blocking_read(timeout):
    [2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\connection.py", line 505, in blocking_read
        frame = self.transport.read_frame()
    [2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\transport.py", line 252, in read_frame
        frame_header = read(7, True)
    [2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\transport.py", line 444, in _read
    [2019-06-29 20:38:09,849: WARNING/MainProcess] OSError: Server unexpectedly closed connection
    [2019-06-29 20:38:09,849: WARNING/MainProcess] Removing descriptor: 1044

Nevertheless, the worker running the current task continues its work. But within my web application I'm continuously polling a URL to get updates on the current task. After the above error occurs, the connection to RabbitMQ seems to be closed, so the application can no longer access the result backend (RabbitMQ as well).

I've spent a lot of time figuring out what exactly could throw that error. I came to the very vague conclusion that it might be because the worker (currently eventlet, because this runs on Windows) cannot send heartbeats to RabbitMQ. But I'm getting totally confused, because some advise configuring the heartbeat value to 0 (which I did, as you can see below in my config), and that doesn't resolve the issue either.

This is my current configuration, which is some kind of desperate mishmash of all the best advice on GitHub and Stack Overflow:

    CELERY_BROKER_URL = 'pyamqp://'
    CELERY_RESULT_BACKEND = 'amqp://'
    CELERY_BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
    CELERY_BROKER_HEARTBEAT = 0
    CELERY_BROKER_POOL_LIMIT = None
    CELERY_BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
    CELERY_BROKER_CONNECTION_TIMEOUT = 20
    CELERY_BROKER_CONNECTION_RETRY = True
    CELERY_BROKER_CONNECTION_MAX_RETRIES = 100
    CELERY_TIMEZONE = 'UTC'
    CELERY_ENABLE_UTC = True
    CELERY_IGNORE_RESULT = False
    SQLALCHEMY_DATABASE_URI = userpass + basedir + dbname
    SQLALCHEMY_TRACK_MODIFICATIONS = False

I would expect the connection between RabbitMQ and Celery to be stable, with no connection errors. The biggest problem for me right now is that I have absolutely no clue what exactly triggers the error. Is it a worker not sending a heartbeat to RabbitMQ? Is it RabbitMQ itself? I'm happy about every kind of hint.

EDIT: I forgot to mention that I also read this may have something to do with the hostname.
I start Celery with:

    celery -A tasks.celery worker --loglevel=info --pool=eventlet --without-mingle --without-gossip --without-heartbeat

and it's running on localhost (the web server is running via XAMPP). If I check the RabbitMQ console, it's not "rabbit@localhost" but "rabbit@DESKTOP-xxxx". Maybe this leads to the error?
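Not part of the original question, but for illustration: if the prefixed Flask-style keys above never reach Celery, the same options can be set directly on the Celery app using Celery 4's lowercase setting names. A sketch, assuming the celery object from the tasks module used in the command above:

    # tasks.py -- sketch: apply the broker settings directly on the Celery app
    from celery import Celery

    celery = Celery('tasks', broker='pyamqp://', backend='amqp://')

    celery.conf.update(
        broker_heartbeat=0,                 # disable AMQP heartbeats entirely
        broker_pool_limit=None,
        broker_connection_timeout=20,
        broker_connection_retry=True,
        broker_connection_max_retries=100,
        broker_transport_options={'confirm_publish': True},
        timezone='UTC',
        enable_utc=True,
        task_ignore_result=False,
    )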
Posted about 6 years ago by CadentOrange
We have a Flask application that is served via gunicorn, using the eventlet worker. We're deploying the application in a Kubernetes pod, with the idea of scaling the number of pods depending on workload.

The recommended setting for the number of gunicorn workers is 2-4 x $NUM_CPUS (see the docs). I've previously deployed services on dedicated physical hardware where such calculations made sense: on a 4-core machine, 16 workers sounds OK, and we eventually bumped it to 32. Does this calculation still apply in a Kubernetes pod using an async worker, particularly as:

- There could be multiple pods on a single node.
- The same service will be run in multiple pods.

How should I set the number of gunicorn workers?

1. Set it to -w 1 and let Kubernetes handle the scaling via pods?
2. Set it to 2-4 x $NUM_CPU on the Kubernetes nodes, in one pod or in multiple pods?
3. Something else entirely?

Update: We decided to go with the first option, which is our current approach: set the number of gunicorn workers to 1 and scale horizontally by increasing the number of pods. Otherwise there would be too many moving parts, plus we wouldn't be leveraging Kubernetes to its full potential.
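As an illustration of the chosen option, a minimal gunicorn configuration along these lines might look like the sketch below; the file name and bind address are assumptions, not from the original post.

    # gunicorn.conf.py -- one async worker per pod, scale with pod replicas
    bind = "0.0.0.0:8000"       # assumed container port
    workers = 1                 # a single eventlet worker; Kubernetes scales via pods
    worker_class = "eventlet"
    worker_connections = 1000   # concurrent greenlets handled per worker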