10
I Use This!
Activity Not Available

News

Analyzed 4 months ago. based on code collected 5 months ago.
Posted over 4 years ago by veryfoolish
#!/usr/bin/env python3 import eventlet from argparse import ArgumentParser import eventlet.green.zmq as zmq from flask import Flask, request from flask_cors import CORS from datetime import datetime, timedelta from flask_socketio import ... [More] SocketIO from general_utils import log_util logger = log_util.get_logger() app = Flask(__name__) CORS(app) app.config['SECRET_KEY'] = 'secret!' socketio = SocketIO(app, cookie=None) hb_snapshot_address = "tcp://heartbeat:5589" hb_incremental_address = "tcp://heartbeat:5556" history_file = "history" clear_file = False def parse_args(): parser = ArgumentParser() parser.add_argument('-p', '--port', type=int, default=5000, help="Port") parser.add_argument('--syslog', action='store_true', help="Syslog") return parser.parse_args() def get_recent_entries(): global clear_file entries = [] with open(history_file, 'r') as f: lines = f.readlines() for line in lines: entries.append(eval(line)) output = [] for entry in entries: dtstring = entry['date']+" "+entry['time'] datetime_object = datetime.strptime(dtstring, "%Y-%m-%d %H:%M:%S.%f") if datetime_object > datetime.now() - timedelta(days=7): output.append(entry) else: clear_file = True return output def flush_history_file(content): with open(history_file, 'w') as f: for line in content: print(line,file=f) def hb_thread(incremental_address): logger.info('entering hb_thread') context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(incremental_address) socket.setsockopt_string(zmq.SUBSCRIBE, u"") while True: logger.info('waiting for heartbeat message') msg = socket.recv_string() logger.info(f'Received message {msg}') msg = parse_message(msg) with open(history_file, 'a') as f: f.write(str(msg)+'\n') logger.info(f'Sending message {msg}') def parse_message(msg): schema = [ 'date', 'time', 'health', 'uptime', 'host', 'name', 'pid', 'status' ] msg_split = msg.split() msg_dict = {schema[i]: msg_split[i] for i in range(len(schema))} return msg_dict def get_snapshot(snapshot_address): logger.info('Getting snapshot') context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(snapshot_address) socket.setsockopt_string(zmq.SUBSCRIBE, u"") data = socket.recv_string() socket.disconnect(snapshot_address) socket.close() context.term() data_dict = { f'{line.split()[0]}-{line.split()[1]}': { 'host': line.split()[0], 'name': line.split()[1] } for line in data.splitlines() } return data_dict @socketio.on('connect') def connect(): snapshot = get_snapshot(hb_snapshot_address) socketio.emit('snapshot', snapshot) logger.info(f'Connected and sent snapshot to {request.remote_addr}') @socketio.on('disconnect') def disconnect(): logger.info(f'Disconnected from {request.remote_addr}') @app.route('/v0/snapshot') def snapshot(): return get_snapshot(hb_snapshot_address) @app.route('/v0/history') def history(): global clear_file out = get_recent_entries() if clear_file: flush_history_file(out) clear_file = False else: logger.info('serving history without clearing file') return {'history' : out} if __name__ == '__main__': args = parse_args() eventlet.greenthread.spawn(hb_thread, hb_incremental_address) socketio.run(app, host='0.0.0.0', port=args.port, log_output=True) The above code is for a heartbeat monitoring application. It uses Eventlet's ZMQ and Flask. The Flask components work fine, but for some reason it stops logging heartbeat messages after several hours. I'm really stumped by this bug and was wondering if anybody had any ideas: The section that hangs eventually is: def hb_thread(incremental_address): logger.info('entering hb_thread') context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect(incremental_address) socket.setsockopt_string(zmq.SUBSCRIBE, u"") while True: logger.info('waiting for heartbeat message') msg = socket.recv_string() logger.info(f'Received message {msg}') msg = parse_message(msg) with open(history_file, 'a') as f: f.write(str(msg)+'\n') logger.info(f'Sending message {msg}') I'm currently running with strace to see if maybe I can spot the bug there. But maybe somebody here has a lot of experience with ZMQ/Eventlet. Thanks. [Less]
Posted over 4 years ago by Zhou
I am using Flask and Socketio. Here's a simplification of my code: from flask import Flask, url_for, render_template, send_from_directory, request, redirect from flask_socketio import SocketIO, emit, send app = Flask(__name__) socketio ... [More] = SocketIO(app, async_mode='threading') #socketio = SocketIO(app) # using eventlet, don't write "async_mode='threading' or it will ignore eventlet @socketio.on('xxx', namespace='/xxx') def start_bg(cid): # eventlet.spawn(bg_conversion, cid) socketio.start_background_task(bg_conversion, cid) # Do some time-consuming things print('test') if __name__ == '__main__': socketio.run(app, port=80) It works fine without eventlet, but now I have the need to stop a thread so I guess eventlet would be good. However, if I switch to the annotated code, the client side's callback function will only be invoked after the thread(bg_conversion) exits. I added a print after the spawning line, and it prints immediately - so it is running in the background. But why isn't the callback function on the client's side invoked immediately? Doesn't the function return immediately after the print? [Less]
Posted over 4 years ago by joe
I have a weird problem when trying to generate a pdf using PDFkit2 on Python 3.8 using Celery + RabbitMQ and Eventlet all running on Windows Server 2019. If I call an async function, e.g. gen_pdf.delay('source', 'target'), and the source ... [More] html table contains more than 99 rows, the wkhtmltopdf portion of PDFKit2 just hangs there forever, no error, no clue. If i call the function directly, e.g. gen_pdf('source', 'target'), from the calling app, it can do unlimited rows using the same html source file. I've tried everything to resolve this, the reason i need it to run asynchronously is that there are hundreds of reports that need to be iterated through and the main thread gets bogged down. Any help would be greatly appreciated as I've scoured the net to no avail. I've included a sample async task for review. import pdfkit from celery import Celery async_task = Celery('async_task') async_task.config_from_object('async.celeryconfig') @async_task.task(name='gen_pdf') def gen_pdf(source, target): folder = 'some path' options = { 'allow': 'F:\\inetpub\\root\\static\\', 'header-html': 'F:\\inetpub\\root\\static\\html\\pdf_header.html', 'footer-html': 'F:\\inetpub\\root\\static\\html\\pdf_footer.html', 'encoding': "UTF-8", 'page-size': 'Letter', 'orientation': 'Landscape', 'margin-top': '0.75in', 'margin-right': '0.5in', 'margin-bottom': '0.5in', 'margin-left': '0.5in', 'footer-right': '[page] of [topage]', } # converting html to PDF pdfkit.from_file(f'{folder}{source}', f'{folder}{target}', options=options) the celeryconfig BROKER_URL = 'amqp://user:password@localhost:5672//' BROKER_HEARTBEAT = 0 BROKER_POOL_LIMIT = None BROKER_TRANSPORT_OPTIONS = {'confirm_publish': True} BROKER_CONNECTION_TIMEOUT = 30 BROKER_CONNECTION_RETRY = True BROKER_CONNECTION_MAX_RETRIES = 100 CELERY_RESULT_BACKEND = 'db+sqlite:///async_tasks.db' CELERY_TRACK_STARTED = True CELERY_ACKS_LATE = True CELERYD_PREFETCH_MULTIPLIER = 1 CELERY_CREATE_MISSING_QUEUES = True and the startup_script celery worker --purge -A async_task -P eventlet -l INFO -n "PDF" -E --autoscale=25,5 [Less]
Posted over 4 years ago by ZhouW
On Python 3.8.5 I am using eventlet 0.30.0 and trying to run the following: import eventlet eventlet.monkey_patch() from multiprocessing.dummy import Pool as ThreadPool pool = ThreadPool(10) # never starts If however I do not call ... [More] eventlet.monkey_patch(), ThreadPool() starts fine. What is then the recommended way to do multiprocessing if using eventlet.monkey_patch()? [Less]
Posted over 4 years ago by babak abadkheir
I have this configuration on my flask project : in requrement.txt: flask_socketio eventlet in my app.py : from flask_socketio import SocketIO socketio = SocketIO(app, async_mode="eventlet") @socketio.on('register') async def ... [More] register(data): await my_asyncfunction(data['machine_id']) and my_asyncfunction: async def my_asyncfunction(data): .... and this is my error: /usr/local/lib/python3.7/threading.py:870: RuntimeWarning: coroutine 'my_asyncfunction' was never awaited self._target(*self._args, **self._kwargs)RuntimeWarning: Enable tracemalloc to get the object allocation traceback so I tried the option async_mode="eventlet" but same error :( [Less]
Posted over 4 years ago by John Targaryen
Currently, I am trying to add a signal handler that correctly updates the database state whenever our server is re-started/updated by listening for SIGTERM interrupts. Our Flask app is on a Gunicorn server monkey-patched with eventlet. ... [More] However, whenever the restart happens, we get the error from eventlet 'do not call blocking functions from the mainloop'. How do we get around this issue? Thank you! [Less]
Posted over 4 years ago by user14759488
I am trying to get with a qt websocket client data from my websocket server (python, eventlet). The thing is: I can send messages to my server - but when my server sends data back, my qt application does no recognize any message. I tried ... [More] to connect the signal / slot by following commands, but no command did call the "receiveMsg" function: qt application: #include #include QString message; QWebSocket m_ws_msg; void MainWindow::receiveMsg(QString message){ // this function will never be called } MainWindow::MainWindow(QWidget *parent) : QMainWindow(parent) , ui(new Ui::MainWindow) { connect(&m_ws_msg, &QWebSocket::textMessageReceived , this, &MainWindow::receiveMsg); auto ws_opened = [this]() { if(m_ws_msg.isValid()){ qDebug() << "Send text to server: " << message; m_ws_msg.sendTextMessage(message); m_ws_msg.close(QWebSocketProtocol::CloseCodeNormal,"Operation complete - closed by client"); } else { qDebug() << "Websocket is NOT valid" ; m_ws_msg.close(QWebSocketProtocol::CloseCodeAbnormalDisconnection,"Operation FAILED - closed"); } }; connect(&m_ws_msg, &QWebSocket::connected, ws_opened); message = "test"; m_ws_msg.open(ws_url); // send message to python webserver } Has someone any idea why I don't receive any data? Python websocket server: # main.py import eventlet import os from eventlet import wsgi, websocket @websocket.WebSocketWSGI def handle(ws): msg = ws.wait() if(msg != None): print('Message received: {}'.format(msg)) msg = "ECHO BACK" ws.send(msg) def dispatch(environ, start_response): global execTimer if environ['PATH_INFO'] == '/message': print('PATH_INFO == \'/message\'') return handle(environ, start_response) if __name__ == '__main__': listener = eventlet.listen(('127.0.0.1', 7000)) print('\nVisit http://localhost:7000/ in your websocket-capable browser.\n') wsgi.server(listener, dispatch) [Less]
Posted over 4 years ago by user14759488
I am trying to get with a qt websocket client data from my websocket server (python, eventlet). The thing is: I can send messages to my server - but when my server sends data back, my qt application does no recognize any message. I tried ... [More] to connect the signal / slot by following commands, but no command did call the "receiveMsg" function: connect(&m_ws_msg, &QWebSocket::binaryMessageReceived, this, &MainWindow::receiveMsg); // not working //connect(&m_ws_msg, &QWebSocket::binaryFrameReceived , this, &MainWindow::receiveMsg); // not working //connect(&m_ws_msg, &QWebSocket::textMessageReceived , this, &MainWindow::receiveMsg); // not working //connect(&m_ws_msg, &QWebSocket::textFrameReceived , this, &MainWindow::receiveMsg); // not working Has someone any idea why I don't receive any data? Python websocket server: # main.py import sqlite3 import json import eventlet import os import struct from eventlet import wsgi, websocket, greenthread import ctypes def str2bin(input): txt = json.dumps(input) bin_data = ctypes.create_string_buffer(len(txt)) pos = 0 for c in txt: struct.pack_into('c', bin_data, pos, c.encode('ascii')) pos = pos + 1 return bin_data @websocket.WebSocketWSGI def handle(ws): msg = ws.wait() if(msg != None): print('Message received: {}'.format(msg)) msg = "ECHO BACK" bin_data = str2bin(msg) ws.send(bin_data) def dispatch(environ, start_response): """ WEBSOCKETS """ global execTimer if environ['PATH_INFO'] == '/data': print('PATH_INFO == \'/data\'') return saveData(environ, start_response) elif environ['PATH_INFO'] == '/message': print('PATH_INFO == \'/message\'') return handle(environ, start_response) if __name__ == '__main__': listener = eventlet.listen(('127.0.0.1', 7000)) print('\nVisit http://localhost:7000/ in your websocket-capable browser.\n') wsgi.server(listener, dispatch) My qt application: MainWindow::MainWindow(QWidget *parent) : QMainWindow(parent) , ui(new Ui::MainWindow) { ui->setupUi(this); message = "connect"; /* SEND MESSAGE */ connect(&m_ws_msg, &QWebSocket::disconnected, [] { qDebug() << "m_ws_msg disconnected() called";}); connect(&m_ws_msg, &QWebSocket::binaryMessageReceived, this, &MainWindow::receiveMsg); // not working //connect(&m_ws_msg, &QWebSocket::binaryFrameReceived , this, &MainWindow::receiveMsg); // not working //connect(&m_ws_msg, &QWebSocket::textMessageReceived , this, &MainWindow::receiveMsg); // not working //connect(&m_ws_msg, &QWebSocket::textFrameReceived , this, &MainWindow::receiveMsg); // not working auto ws_opened = [this]() { if(m_ws_msg.isValid()){ qDebug() << "Send text to server: " << message; m_ws_msg.sendTextMessage(message); m_ws_msg.close(QWebSocketProtocol::CloseCodeNormal,"Operation complete - closed by client"); } else { qDebug() << "Websocket is NOT valid" ; m_ws_msg.close(QWebSocketProtocol::CloseCodeAbnormalDisconnection,"Operation FAILED - closed"); } }; connect(&m_ws_msg, &QWebSocket::connected, ws_opened); } // ... void MainWindow::receiveMsg(QString message){ messageCounter++; } [Less]
Posted over 4 years ago by firebush
Why am I getting the following import error when I try to import eventlet's SSL module: ModuleNotFoundError: No module named 'OpenSSL.tsafe' Is eventlet's OpenSSL not compatible with recent versions of pyOpenSSL? Reproduction Steps ... [More] Using the following Pipenv: [[source]] name = "pypi" url = "https://pypi.org/simple" [packages] pyOpenSSL = "*" eventlet = "*" [requires] python_version = "3" Create a pipenv using that file: $ pipenv install Creating a virtualenv for this project… Pipfile: /tmp/Pipfile Using /usr/local/bin/python3.8 (3.8.3) to create virtualenv… ... Successfully created virtual environment! ... Now import eventlet.green.OpenSSL.SSL: $ pipenv run python Python 3.8.3 (default, Jun 29 2020, 18:02:49) [GCC 8.3.1 20190311 (Red Hat 8.3.1-3)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> from eventlet.green.OpenSSL import SSL Traceback (most recent call last): File "", line 1, in File "/home/myuser/.local/share/virtualenvs/tmp-XVr6zr33/lib/python3.8/site-packages/eventlet/green/OpenSSL/__init__.py", line 3, in from . import tsafe File "/home/myuser/.local/share/virtualenvs/tmp-XVr6zr33/lib/python3.8/site-packages/eventlet/green/OpenSSL/tsafe.py", line 1, in from OpenSSL.tsafe import * ModuleNotFoundError: No module named 'OpenSSL.tsafe' >>> [Less]
Posted over 4 years ago by Яша Проценко
I am writing a website for flask. There is a need to use Apsheduler for periodic task execution. But the problem is that the task is not executed periodically due to a conflict with eventlet. On the forums, I found that you need to wrap ... [More] the blocking construct in eventlet.tpool.execute(). Tell me what exactly do I need to wrap? There is a class for working with tasks. from app import scheduler import json class SchedulerTask(object): schedulers_list_publish = None def __init__(self): self.schedulers_list_publish = list() def add_scheduler_publish(self, dev_id, mqtt, topic_req_res, m_req_state, qos_req, timer): id_sch = dev_id + "_scheduler" sc = scheduler.add_job(self.publish_async, args=[mqtt, topic_req_res, m_req_state, qos_req, timer, id_sch], id=id_sch, trigger='interval', seconds=timer) print(sc.id) self.schedulers_list_publish.append(id_sch) return id_sch def start_schedulers(self): print(self.schedulers_list_publish) scheduler.start() @staticmethod def del_schedulers(s_id): scheduler.remove_job(s_id) # передача запроса на получение данных @staticmethod def publish_async(mqtt, topic_req_res, m_req_state, qos_req, timer, id_sch): try: msg = json.dumps(m_req_state) mqtt.publish(topic_req_res, msg, qos_req) except Exception as ex: print("Error publish: " + str(ex)) I call from a function from another module: def _handle_connect(self, client, userdata, flags, rc): code_list = list() for dev in self.devices: if dev.device_code not in code_list: # запущен ли уже поток с таким кодом code_list.append(dev.device_code) mqtt.subscribe("BK" + dev.device_code + self.type_topic[1], self.qos_sub) self.schedulers_list.append(tpool.execute(sch_task.add_scheduler_publish, dev.device_code, mqtt, "BK" + dev.device_code + self.type_topic[0], self.m_request_state, self.qos_request, self.POOL_TIME)) sch_task.start_schedulers() [Less]