Posted
over 4 years
ago
by
veryfoolish
#!/usr/bin/env python3
import eventlet
from argparse import ArgumentParser
import eventlet.green.zmq as zmq
from flask import Flask, request
from flask_cors import CORS
from datetime import datetime, timedelta
from flask_socketio import
... [More]
SocketIO
from general_utils import log_util
logger = log_util.get_logger()
app = Flask(__name__)
CORS(app)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, cookie=None)
hb_snapshot_address = "tcp://heartbeat:5589"
hb_incremental_address = "tcp://heartbeat:5556"
history_file = "history"
clear_file = False
def parse_args():
parser = ArgumentParser()
parser.add_argument('-p', '--port', type=int, default=5000, help="Port")
parser.add_argument('--syslog', action='store_true', help="Syslog")
return parser.parse_args()
def get_recent_entries():
global clear_file
entries = []
with open(history_file, 'r') as f:
lines = f.readlines()
for line in lines:
entries.append(eval(line))
output = []
for entry in entries:
dtstring = entry['date']+" "+entry['time']
datetime_object = datetime.strptime(dtstring, "%Y-%m-%d %H:%M:%S.%f")
if datetime_object > datetime.now() - timedelta(days=7):
output.append(entry)
else:
clear_file = True
return output
def flush_history_file(content):
with open(history_file, 'w') as f:
for line in content:
print(line,file=f)
def hb_thread(incremental_address):
logger.info('entering hb_thread')
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(incremental_address)
socket.setsockopt_string(zmq.SUBSCRIBE, u"")
while True:
logger.info('waiting for heartbeat message')
msg = socket.recv_string()
logger.info(f'Received message {msg}')
msg = parse_message(msg)
with open(history_file, 'a') as f:
f.write(str(msg)+'\n')
logger.info(f'Sending message {msg}')
def parse_message(msg):
schema = [
'date',
'time',
'health',
'uptime',
'host',
'name',
'pid',
'status'
]
msg_split = msg.split()
msg_dict = {schema[i]: msg_split[i] for i in range(len(schema))}
return msg_dict
def get_snapshot(snapshot_address):
logger.info('Getting snapshot')
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(snapshot_address)
socket.setsockopt_string(zmq.SUBSCRIBE, u"")
data = socket.recv_string()
socket.disconnect(snapshot_address)
socket.close()
context.term()
data_dict = {
f'{line.split()[0]}-{line.split()[1]}':
{
'host': line.split()[0],
'name': line.split()[1]
}
for line in data.splitlines()
}
return data_dict
@socketio.on('connect')
def connect():
snapshot = get_snapshot(hb_snapshot_address)
socketio.emit('snapshot', snapshot)
logger.info(f'Connected and sent snapshot to {request.remote_addr}')
@socketio.on('disconnect')
def disconnect():
logger.info(f'Disconnected from {request.remote_addr}')
@app.route('/v0/snapshot')
def snapshot():
return get_snapshot(hb_snapshot_address)
@app.route('/v0/history')
def history():
global clear_file
out = get_recent_entries()
if clear_file:
flush_history_file(out)
clear_file = False
else:
logger.info('serving history without clearing file')
return {'history' : out}
if __name__ == '__main__':
args = parse_args()
eventlet.greenthread.spawn(hb_thread, hb_incremental_address)
socketio.run(app, host='0.0.0.0', port=args.port, log_output=True)
The above code is for a heartbeat monitoring application. It uses Eventlet's ZMQ and Flask. The Flask components work fine, but for some reason it stops logging heartbeat messages after several hours. I'm really stumped by this bug and was wondering if anybody had any ideas:
The section that hangs eventually is:
def hb_thread(incremental_address):
logger.info('entering hb_thread')
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect(incremental_address)
socket.setsockopt_string(zmq.SUBSCRIBE, u"")
while True:
logger.info('waiting for heartbeat message')
msg = socket.recv_string()
logger.info(f'Received message {msg}')
msg = parse_message(msg)
with open(history_file, 'a') as f:
f.write(str(msg)+'\n')
logger.info(f'Sending message {msg}')
I'm currently running with strace to see if maybe I can spot the bug there. But maybe somebody here has a lot of experience with ZMQ/Eventlet.
Thanks.
[Less]
|
Posted
over 4 years
ago
by
Zhou
I am using Flask and Socketio. Here's a simplification of my code:
from flask import Flask, url_for, render_template, send_from_directory, request, redirect
from flask_socketio import SocketIO, emit, send
app = Flask(__name__)
socketio
... [More]
= SocketIO(app, async_mode='threading')
#socketio = SocketIO(app) # using eventlet, don't write "async_mode='threading' or it will ignore eventlet
@socketio.on('xxx', namespace='/xxx')
def start_bg(cid):
# eventlet.spawn(bg_conversion, cid)
socketio.start_background_task(bg_conversion, cid) # Do some time-consuming things
print('test')
if __name__ == '__main__':
socketio.run(app, port=80)
It works fine without eventlet, but now I have the need to stop a thread so I guess eventlet would be good.
However, if I switch to the annotated code, the client side's callback function will only be invoked after the thread(bg_conversion) exits.
I added a print after the spawning line, and it prints immediately - so it is running in the background. But why isn't the callback function on the client's side invoked immediately? Doesn't the function return immediately after the print?
[Less]
|
Posted
over 4 years
ago
by
joe
I have a weird problem when trying to generate a pdf using PDFkit2 on Python 3.8 using Celery + RabbitMQ and Eventlet all running on Windows Server 2019. If I call an async function, e.g. gen_pdf.delay('source', 'target'), and the source
... [More]
html table contains more than 99 rows, the wkhtmltopdf portion of PDFKit2 just hangs there forever, no error, no clue. If i call the function directly, e.g. gen_pdf('source', 'target'), from the calling app, it can do unlimited rows using the same html source file. I've tried everything to resolve this, the reason i need it to run asynchronously is that there are hundreds of reports that need to be iterated through and the main thread gets bogged down.
Any help would be greatly appreciated as I've scoured the net to no avail. I've included a sample async task for review.
import pdfkit
from celery import Celery
async_task = Celery('async_task')
async_task.config_from_object('async.celeryconfig')
@async_task.task(name='gen_pdf')
def gen_pdf(source, target):
folder = 'some path'
options = {
'allow': 'F:\\inetpub\\root\\static\\',
'header-html': 'F:\\inetpub\\root\\static\\html\\pdf_header.html',
'footer-html': 'F:\\inetpub\\root\\static\\html\\pdf_footer.html',
'encoding': "UTF-8",
'page-size': 'Letter',
'orientation': 'Landscape',
'margin-top': '0.75in',
'margin-right': '0.5in',
'margin-bottom': '0.5in',
'margin-left': '0.5in',
'footer-right': '[page] of [topage]',
}
# converting html to PDF
pdfkit.from_file(f'{folder}{source}', f'{folder}{target}', options=options)
the celeryconfig
BROKER_URL = 'amqp://user:password@localhost:5672//'
BROKER_HEARTBEAT = 0
BROKER_POOL_LIMIT = None
BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True}
BROKER_CONNECTION_TIMEOUT = 30
BROKER_CONNECTION_RETRY = True
BROKER_CONNECTION_MAX_RETRIES = 100
CELERY_RESULT_BACKEND = 'db+sqlite:///async_tasks.db'
CELERY_TRACK_STARTED = True
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
and the startup_script
celery worker --purge -A async_task -P eventlet -l INFO -n "PDF" -E --autoscale=25,5
[Less]
|
Posted
over 4 years
ago
by
ZhouW
On Python 3.8.5 I am using eventlet 0.30.0 and trying to run the following:
import eventlet
eventlet.monkey_patch()
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(10) # never starts
If however I do not call
... [More]
eventlet.monkey_patch(), ThreadPool() starts fine. What is then the recommended way to do multiprocessing if using eventlet.monkey_patch()?
[Less]
|
Posted
over 4 years
ago
by
babak abadkheir
I have this configuration on my flask project :
in requrement.txt:
flask_socketio
eventlet
in my app.py :
from flask_socketio import SocketIO
socketio = SocketIO(app, async_mode="eventlet")
@socketio.on('register')
async def
... [More]
register(data):
await my_asyncfunction(data['machine_id'])
and my_asyncfunction:
async def my_asyncfunction(data):
....
and this is my error:
/usr/local/lib/python3.7/threading.py:870: RuntimeWarning: coroutine
'my_asyncfunction' was never awaited self._target(*self._args,
**self._kwargs)RuntimeWarning: Enable tracemalloc to get the object allocation traceback
so I tried the option async_mode="eventlet" but same error :(
[Less]
|
Posted
over 4 years
ago
by
John Targaryen
Currently, I am trying to add a signal handler that correctly updates the database state whenever our server is re-started/updated by listening for SIGTERM interrupts. Our Flask app is on a Gunicorn server monkey-patched with eventlet.
... [More]
However, whenever the restart happens, we get the error from eventlet 'do not call blocking functions from the mainloop'. How do we get around this issue?
Thank you!
[Less]
|
Posted
over 4 years
ago
by
user14759488
I am trying to get with a qt websocket client data from my websocket server (python, eventlet).
The thing is: I can send messages to my server - but when my server sends data back, my qt application does no recognize any message.
I tried
... [More]
to connect the signal / slot by following commands, but no command did call the "receiveMsg" function:
qt application:
#include
#include
QString message;
QWebSocket m_ws_msg;
void MainWindow::receiveMsg(QString message){
// this function will never be called
}
MainWindow::MainWindow(QWidget *parent)
: QMainWindow(parent)
, ui(new Ui::MainWindow)
{
connect(&m_ws_msg, &QWebSocket::textMessageReceived , this, &MainWindow::receiveMsg);
auto ws_opened = [this]() {
if(m_ws_msg.isValid()){
qDebug() << "Send text to server: " << message;
m_ws_msg.sendTextMessage(message);
m_ws_msg.close(QWebSocketProtocol::CloseCodeNormal,"Operation complete - closed by client");
} else {
qDebug() << "Websocket is NOT valid" ;
m_ws_msg.close(QWebSocketProtocol::CloseCodeAbnormalDisconnection,"Operation FAILED - closed");
}
};
connect(&m_ws_msg, &QWebSocket::connected, ws_opened);
message = "test";
m_ws_msg.open(ws_url); // send message to python webserver
}
Has someone any idea why I don't receive any data?
Python websocket server:
# main.py
import eventlet
import os
from eventlet import wsgi, websocket
@websocket.WebSocketWSGI
def handle(ws):
msg = ws.wait()
if(msg != None):
print('Message received: {}'.format(msg))
msg = "ECHO BACK"
ws.send(msg)
def dispatch(environ, start_response):
global execTimer
if environ['PATH_INFO'] == '/message':
print('PATH_INFO == \'/message\'')
return handle(environ, start_response)
if __name__ == '__main__':
listener = eventlet.listen(('127.0.0.1', 7000))
print('\nVisit http://localhost:7000/ in your websocket-capable browser.\n')
wsgi.server(listener, dispatch)
[Less]
|
Posted
over 4 years
ago
by
user14759488
I am trying to get with a qt websocket client data from my websocket server (python, eventlet).
The thing is: I can send messages to my server - but when my server sends data back, my qt application does no recognize any message.
I tried
... [More]
to connect the signal / slot by following commands, but no command did call the "receiveMsg" function:
connect(&m_ws_msg, &QWebSocket::binaryMessageReceived, this, &MainWindow::receiveMsg); // not working
//connect(&m_ws_msg, &QWebSocket::binaryFrameReceived , this, &MainWindow::receiveMsg); // not working
//connect(&m_ws_msg, &QWebSocket::textMessageReceived , this, &MainWindow::receiveMsg); // not working
//connect(&m_ws_msg, &QWebSocket::textFrameReceived , this, &MainWindow::receiveMsg); // not working
Has someone any idea why I don't receive any data?
Python websocket server:
# main.py
import sqlite3
import json
import eventlet
import os
import struct
from eventlet import wsgi, websocket, greenthread
import ctypes
def str2bin(input):
txt = json.dumps(input)
bin_data = ctypes.create_string_buffer(len(txt))
pos = 0
for c in txt:
struct.pack_into('c', bin_data, pos, c.encode('ascii'))
pos = pos + 1
return bin_data
@websocket.WebSocketWSGI
def handle(ws):
msg = ws.wait()
if(msg != None):
print('Message received: {}'.format(msg))
msg = "ECHO BACK"
bin_data = str2bin(msg)
ws.send(bin_data)
def dispatch(environ, start_response):
"""
WEBSOCKETS
"""
global execTimer
if environ['PATH_INFO'] == '/data':
print('PATH_INFO == \'/data\'')
return saveData(environ, start_response)
elif environ['PATH_INFO'] == '/message':
print('PATH_INFO == \'/message\'')
return handle(environ, start_response)
if __name__ == '__main__':
listener = eventlet.listen(('127.0.0.1', 7000))
print('\nVisit http://localhost:7000/ in your websocket-capable browser.\n')
wsgi.server(listener, dispatch)
My qt application:
MainWindow::MainWindow(QWidget *parent)
: QMainWindow(parent)
, ui(new Ui::MainWindow)
{
ui->setupUi(this);
message = "connect";
/* SEND MESSAGE */
connect(&m_ws_msg, &QWebSocket::disconnected, [] { qDebug() << "m_ws_msg disconnected() called";});
connect(&m_ws_msg, &QWebSocket::binaryMessageReceived, this, &MainWindow::receiveMsg); // not working
//connect(&m_ws_msg, &QWebSocket::binaryFrameReceived , this, &MainWindow::receiveMsg); // not working
//connect(&m_ws_msg, &QWebSocket::textMessageReceived , this, &MainWindow::receiveMsg); // not working
//connect(&m_ws_msg, &QWebSocket::textFrameReceived , this, &MainWindow::receiveMsg); // not working
auto ws_opened = [this]() {
if(m_ws_msg.isValid()){
qDebug() << "Send text to server: " << message;
m_ws_msg.sendTextMessage(message);
m_ws_msg.close(QWebSocketProtocol::CloseCodeNormal,"Operation complete - closed by client");
} else {
qDebug() << "Websocket is NOT valid" ;
m_ws_msg.close(QWebSocketProtocol::CloseCodeAbnormalDisconnection,"Operation FAILED - closed");
}
};
connect(&m_ws_msg, &QWebSocket::connected, ws_opened);
}
// ...
void MainWindow::receiveMsg(QString message){
messageCounter++;
}
[Less]
|
Posted
over 4 years
ago
by
firebush
Why am I getting the following import error when I try to import eventlet's SSL module:
ModuleNotFoundError: No module named 'OpenSSL.tsafe'
Is eventlet's OpenSSL not compatible with recent versions of pyOpenSSL?
Reproduction Steps
... [More]
Using the following Pipenv:
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
[packages]
pyOpenSSL = "*"
eventlet = "*"
[requires]
python_version = "3"
Create a pipenv using that file:
$ pipenv install
Creating a virtualenv for this project…
Pipfile: /tmp/Pipfile
Using /usr/local/bin/python3.8 (3.8.3) to create virtualenv…
...
Successfully created virtual environment!
...
Now import eventlet.green.OpenSSL.SSL:
$ pipenv run python
Python 3.8.3 (default, Jun 29 2020, 18:02:49)
[GCC 8.3.1 20190311 (Red Hat 8.3.1-3)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from eventlet.green.OpenSSL import SSL
Traceback (most recent call last):
File "", line 1, in
File "/home/myuser/.local/share/virtualenvs/tmp-XVr6zr33/lib/python3.8/site-packages/eventlet/green/OpenSSL/__init__.py", line 3, in
from . import tsafe
File "/home/myuser/.local/share/virtualenvs/tmp-XVr6zr33/lib/python3.8/site-packages/eventlet/green/OpenSSL/tsafe.py", line 1, in
from OpenSSL.tsafe import *
ModuleNotFoundError: No module named 'OpenSSL.tsafe'
>>>
[Less]
|
Posted
over 4 years
ago
by
Яша Проценко
I am writing a website for flask. There is a need to use Apsheduler for periodic task execution. But the problem is that the task is not executed periodically due to a conflict with eventlet. On the forums, I found that you need to wrap
... [More]
the blocking construct in eventlet.tpool.execute(). Tell me what exactly do I need to wrap?
There is a class for working with tasks.
from app import scheduler
import json
class SchedulerTask(object):
schedulers_list_publish = None
def __init__(self):
self.schedulers_list_publish = list()
def add_scheduler_publish(self, dev_id, mqtt, topic_req_res, m_req_state, qos_req, timer):
id_sch = dev_id + "_scheduler"
sc = scheduler.add_job(self.publish_async, args=[mqtt, topic_req_res, m_req_state, qos_req, timer, id_sch],
id=id_sch, trigger='interval', seconds=timer)
print(sc.id)
self.schedulers_list_publish.append(id_sch)
return id_sch
def start_schedulers(self):
print(self.schedulers_list_publish)
scheduler.start()
@staticmethod
def del_schedulers(s_id):
scheduler.remove_job(s_id)
# передача запроса на получение данных
@staticmethod
def publish_async(mqtt, topic_req_res, m_req_state, qos_req, timer, id_sch):
try:
msg = json.dumps(m_req_state)
mqtt.publish(topic_req_res, msg, qos_req)
except Exception as ex:
print("Error publish: " + str(ex))
I call from a function from another module:
def _handle_connect(self, client, userdata, flags, rc):
code_list = list()
for dev in self.devices:
if dev.device_code not in code_list: # запущен ли уже поток с таким кодом
code_list.append(dev.device_code)
mqtt.subscribe("BK" + dev.device_code + self.type_topic[1], self.qos_sub)
self.schedulers_list.append(tpool.execute(sch_task.add_scheduler_publish, dev.device_code,
mqtt,
"BK" + dev.device_code + self.type_topic[0],
self.m_request_state,
self.qos_request,
self.POOL_TIME))
sch_task.start_schedulers()
[Less]
|