Posted almost 6 years ago by Jon Cuna
I have a long-running background task that spins up the Flask app again to do some auditing in the background. The front end is a web application and uses Socket.IO to communicate with the main Flask backend, which handles multiple async behaviors.
I make sure to fire the background task only when the main thread is created, and I call eventlet.monkey_patch() only at the very beginning of the script.
If the background thread has a lot of items to audit, it blocks the main thread; the more items in memory, the longer the block. The audit is not CPU intensive at all: just some DB inserts and logging.
The items that need to be audited are added to an object in memory from the main thread and passed by reference to the child thread (like an in-memory queue).
If I don't monkey patch eventlet, everything works fine, but then Flask's auto-reload won't work, and I need it for development.
I run the app with socketio.run(app) in dev.
The behavior persists when using gunicorn/eventlet.
When the background task is only sleeping (sleep(2)), there's no blocking.
import eventlet
eventlet.monkey_patch()

# stdlib imports used by the snippet below
import atexit
import socket
import struct
import threading
from time import sleep

# ... the rest of the code is a basic Flask create_app factory that at some
# point starts the new thread if it is the main thread.
# The async code that runs is the following:

class AsyncAuditor(threading.Thread):
    def __init__(self, tasks: list, stop: threading.Event):
        super().__init__()
        self.tasks = tasks
        self.stop_event = stop

    def run(self):
        from app import init_app
        from dal import db
        app = init_app(mode='sys')
        app.logger.info('starting async audit thread')
        with app.app_context():
            try:
                while not self.stop_event.is_set():
                    if len(self.tasks) > 0:
                        task: dict
                        for task in self.tasks:
                            app.logger.debug(threading.current_thread().name + ' new audit record')
                            task.payload = encryptor.encrypt(task.payload)
                            task.ip = struct.unpack("!I", socket.inet_aton(task.ip))[0]
                            db.session.add(task)
                        self.tasks.clear()
                        db.session.commit()
                    sleep(2)
                app.logger.info('exiting async audit thread')
            except BaseException:
                app.logger.exception('Exception')

# there's some code that tries to gracefully exit if the app needs to exit
stop_event = threading.Event()
async_task = AsyncAuditor(API.audit_tasks, stop_event)
async_task.start()

def exit_async_thread():
    stop_event.set()
    async_task.join()

atexit.register(exit_async_thread)
I expect that while the child thread is working, the main thread would not be blocked by any DB operations. In fact, as mentioned above, if I don't monkey patch eventlet, everything works fine in both the main thread and the child thread. Instead, I'm seeing 9- and even 30-second delays when hitting an endpoint in the Flask application while the background task is working.
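For context on why monkey patching changes the behavior: with eventlet.monkey_patch() in place, threading.Thread produces a green thread, so the audit loop only hands control back to the eventlet hub at yield points, and a long unyielding run of DB work starves the request handlers. Below is a minimal illustrative sketch, not the poster's code, of draining the backlog cooperatively; db and tasks stand in for the objects used above.

import eventlet

def drain_cooperatively(db, tasks):
    # Work on a snapshot of the shared backlog, yielding to the eventlet
    # hub between items so web requests keep getting scheduled.
    for task in list(tasks):
        db.session.add(task)
        eventlet.sleep(0)  # explicit cooperative yield point
    tasks.clear()
    db.session.commit()

Another option that comes up often is pushing the blocking flush onto a real OS thread with eventlet.tpool.execute, at the cost of managing SQLAlchemy session ownership across threads.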
Posted almost 6 years ago by Admiral-Chicken
I have two Python applications running simultaneously. One of them is a WSGI WebSocket server that clients connect to for real-time updates; it uses only the Python eventlet library. The second is a processing script that does some calculations. I need the processing script to notify the WebSocket server when a new calculation is done.
I figured the WebSocket server should also expose some REST routes that other applications and scripts can call, but once I run the command
wsgi.server(listener, dispatch)
to start the WSGI server, I am not sure how to add a REST endpoint as well, because the app starts listening strictly for WebSocket connections. Would creating a new process with something like an Eve REST endpoint be the best solution?
I noticed a lot of people have been using the SocketIO library to run REST endpoints and WebSockets simultaneously; however, SocketIO is not a package I have available for this application.
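One pattern that fits the setup described above, sketched here purely as an illustration (the paths and handler names are hypothetical, not taken from the post), is to keep a single eventlet WSGI server and branch inside the dispatch function: one path is wrapped with eventlet.websocket.WebSocketWSGI for the real-time clients, and another path is a plain HTTP endpoint the processing script can call.

import eventlet
from eventlet import wsgi, websocket

@websocket.WebSocketWSGI
def push_updates(ws):
    # Real-time clients connect here and receive updates.
    while True:
        ws.send('new calculation available')
        eventlet.sleep(5)

def dispatch(environ, start_response):
    path = environ.get('PATH_INFO', '')
    if path == '/ws':
        # Hand the connection over to the websocket handler.
        return push_updates(environ, start_response)
    if path == '/notify':
        # Plain REST-style endpoint the processing script can POST to.
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return [b'ok']
    start_response('404 Not Found', [('Content-Type', 'text/plain')])
    return [b'not found']

if __name__ == '__main__':
    wsgi.server(eventlet.listen(('0.0.0.0', 7000)), dispatch)

Whether this beats a separate process with something like Eve mostly depends on how much coupling between the two services is acceptable.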
Posted almost 6 years ago by Kisiel
I have a Flask application and I want to use flask-socketio to handle WebSockets with gunicorn and eventlet.
However, when I try to connect my test client (http://www.websocket.org/echo.html), I am receiving:
WebSocket connection to 'ws://localhost/socket.io?encoding=text' failed: Error during WebSocket handshake: Unexpected response code: 200
socketio_app.py
from flask import Flask, render_template
from flask_socketio import SocketIO

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, port=9090, host='0.0.0.0', async_mode='eventlet', debug=True)

@app.route('/socket.io')
def index():
    return render_template('index.html')

if __name__ == '__main__':
    socketio.run(app)
and I run it in this way:
gunicorn -k eventlet -w 1 socketio_app:app -b 0.0.0.0:9090 --error-logfile - --access-logfile - --log-level debug
Should I use it in another way? Should I manually modify my response like this?
@app.route('/socket.io')
def index():
    return Response(status=101, headers={
        'Connection': 'Upgrade',
        'Upgrade': 'websocket'
    })
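For comparison, the wiring below is roughly the minimal pattern the Flask-SocketIO documentation describes; it is a sketch with illustrative event names, and it assumes the client speaks the Socket.IO protocol rather than opening a raw WebSocket the way the echo test page does.

from flask import Flask, render_template
from flask_socketio import SocketIO, emit

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
# async_mode matches the gunicorn eventlet worker; host and port are given
# to socketio.run() when running the development server.
socketio = SocketIO(app, async_mode='eventlet')

@app.route('/')
def index():
    # Ordinary page route; Socket.IO traffic is handled by the middleware
    # under /socket.io, so no Flask route is defined for that path.
    return render_template('index.html')

@socketio.on('connect')
def on_connect():
    emit('server_message', {'status': 'connected'})

if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=9090, debug=True)

Run under gunicorn the same way as in the command above (gunicorn -k eventlet -w 1 socketio_app:app); a Socket.IO client then completes the handshake itself, so no hand-rolled 101 response should be needed.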
Posted about 6 years ago by mdod
I am trying to set up two eventlet servers that listen concurrently on different ports.
The first server in the code below is for a SocketIO implementation and the second is for an external connection. Both work separately, but not at the same time.
if __name__ == '__main__':
    eventlet.wsgi.server(eventlet.listen(('0.0.0.0', 4000)), app)
    s = eventlet.listen(('0.0.0.0', 6000))
    pool = eventlet.GreenPool(5)
    while True:
        c, address = s.accept()
        pool.spawn_n(function, c)
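As an illustration of one way to get both listeners running at once (a sketch, not the poster's full program; app and function are assumed from the snippet above): eventlet.wsgi.server() runs forever in the calling green thread, so it can be spawned as its own greenlet while the accept loop stays in the main one.

import eventlet
from eventlet import wsgi

if __name__ == '__main__':
    # Run the WSGI server in its own greenlet instead of calling it inline,
    # where it would block before the second listener is ever created.
    eventlet.spawn(wsgi.server, eventlet.listen(('0.0.0.0', 4000)), app)

    s = eventlet.listen(('0.0.0.0', 6000))
    pool = eventlet.GreenPool(5)
    while True:
        c, address = s.accept()
        pool.spawn_n(function, c)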
Posted about 6 years ago by ik4ru52k
After around 2-3 minutes, Celery throws the following traceback:
File "c:\program files (x86)\lib\site-packages\eventlet\hubs\selects.py", line 55, in wait
listeners.get(fileno, noop).cb(fileno)
[2019-06-29
... [More]
20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\celery\worker\pidbox.py", line 120, in loop
connection.drain_events(timeout=1.0)
[2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\kombu\connection.py", line 315, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
[2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\kombu\transport\pyamqp.py", line 103, in drain_events
return connection.drain_events(**kwargs)
[2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
[2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\connection.py", line 505, in blocking_read
frame = self.transport.read_frame()
[2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\transport.py", line 252, in read_frame
frame_header = read(7, True)
[2019-06-29 20:38:09,849: WARNING/MainProcess] File "c:\program files (x86)\lib\site-packages\amqp\transport.py", line 444, in _read
[2019-06-29 20:38:09,849: WARNING/MainProcess] OSError: Server unexpectedly closed connection
[2019-06-29 20:38:09,849: WARNING/MainProcess] Removing descriptor: 1044
Nevertheless, the worker of the current task continues its work. But within my web application I'm continuously polling a URL to get updates on the current task. After the above error occurs, the connection to RabbitMQ seems to be closed, so the application can no longer access the result backend (also RabbitMQ).
I've spent a lot of time figuring out what exactly could throw that error. I came to the very vague conclusion that it might be because the worker (currently eventlet, because it runs on Windows) cannot send heartbeats to RabbitMQ.
But I'm getting totally confused, because some advise configuring the heartbeat value to 0 (which I did, as you can see in my config below). That doesn't resolve the issue either.
This is my current configuration, which is some kind of desperate mishmash of all the best advice on GitHub and Stack Overflow:
CELERY_BROKER_URL = 'pyamqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
CELERY_BROKER_HEARTBEAT = 0
CELERY_BROKER_POOL_LIMIT = None
CELERY_BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
CELERY_BROKER_CONNECTION_TIMEOUT = 20
CELERY_BROKER_CONNECTION_RETRY = True
CELERY_BROKER_CONNECTION_MAX_RETRIES = 100
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_IGNORE_RESULT = False
SQLALCHEMY_DATABASE_URI = userpass + basedir + dbname
SQLALCHEMY_TRACK_MODIFICATIONS = False
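For reference, a minimal sketch of how Flask-style CELERY_* settings like these are usually handed to the Celery instance; the module and object names here are hypothetical, not taken from the post.

from celery import Celery
from flask import Flask

flask_app = Flask(__name__)
flask_app.config.from_object('config')  # the settings shown above

celery = Celery(
    flask_app.import_name,
    broker=flask_app.config['CELERY_BROKER_URL'],
    backend=flask_app.config['CELERY_RESULT_BACKEND'],
)
# Push the Flask config into the Celery config (the classic Flask/Celery
# integration pattern); Celery 4+ can also read a namespaced config via
# celery.config_from_object(obj, namespace='CELERY').
celery.conf.update(flask_app.config)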
I would expect the connection between RabbitMQ and Celery to be stable, with no connection errors.
The biggest problem for me at the moment is that I have absolutely no clue what exactly triggers the error. Is it a worker not sending a heartbeat to RabbitMQ? Is it RabbitMQ itself?
I'd be very happy for any kind of hint.
EDIT:
Forgot to mention that I also read this might have something to do with the hostname.
I start celery with:
celery -A tasks.celery worker --loglevel=info --pool=eventlet --without-mingle --without-gossip --without-heartbeat
and it's running on localhost (the web server runs via XAMPP).
If I check the RabbitMQ console, it's not "rabbit@localhost" but "rabbit@DESKTOP-xxxx". Maybe this leads to the error?
Posted about 6 years ago by CadentOrange
We have a Flask application that is served via gunicorn, using the eventlet worker. We're deploying the application in a kubernetes pod, with the idea of scaling the number of pods depending on workload.
The recommended setting for the number of workers in gunicorn is 2-4 x $NUM_CPUS (see the docs). I've previously deployed services on dedicated physical hardware where such calculations made sense. On a 4-core machine, having 16 workers sounds OK, and we eventually bumped it to 32 workers.
Does this calculation still apply in a kubernetes pod using an async worker, particularly as:
There could be multiple pods on a single node.
The same service will be run in multiple pods.
How should I set the number of gunicorn workers?
Set it to -w 1 and let kubernetes handle the scaling via pods?
Set it to 2-4 x $NUM_CPU on the kubernetes nodes. On one pod or multiple?
Something else entirely?
Update
We decided to go with the first option, which is our current approach: set the number of gunicorn workers to 1 and scale horizontally by increasing the number of pods. Otherwise there would be too many moving parts, and we wouldn't be leveraging Kubernetes to its full potential.
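To make the chosen approach concrete, a pod in this setup would run gunicorn with a config along these lines (illustrative values, not the poster's actual file):

# gunicorn.conf.py -- sketch of the "one async worker per pod" approach
bind = "0.0.0.0:8000"
worker_class = "eventlet"   # async worker, as in the setup described above
workers = 1                 # horizontal scaling is left to Kubernetes pods
worker_connections = 1000   # greenlet connections multiplexed per worker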