1
\$\begingroup\$

I have the following working code for a generic WebSocket component:

import json
import aiohttp
import asyncio
import gzip

import asyncio
from threading import Thread

class WebSocket:
    """Generic WebSocket client that runs its own asyncio loop in a daemon thread.

    The constructor starts a background thread which connects to ``url``,
    awaits ``on_connect(self)`` once, and then calls ``on_msg(data)`` for every
    received message while sending a keepalive ping every
    ``KEEPALIVE_INTERVAL_S`` seconds.

    Args:
        url: WebSocket endpoint to connect to.
        on_connect: Coroutine function awaited with this instance right after
            the connection is established.
        on_msg: Plain callable invoked (on the worker thread) with each
            decoded message.
    """

    KEEPALIVE_INTERVAL_S = 10

    def __init__(self, url, on_connect, on_msg):
        self.url = url
        self.on_connect = on_connect
        self.on_msg = on_msg

        self.streams = {}

        # Assigned by the worker thread once the connection is up; defined here
        # so the attribute always exists, even before the connect completes.
        self.ws = None

        # Thread.start() returns None, so the Thread object must be stored
        # before it is started; otherwise worker_thread would always be None.
        self.worker_thread = Thread(name='WebSocket', target=self.thread_func, daemon=True)
        self.worker_thread.start()

    def thread_func(self):
        """Worker-thread entry point: run the asyncio side until it finishes."""
        asyncio.run(self.aio_run())

    async def aio_run(self):
        """Connect, notify ``on_connect``, then run keepalive and receive loops.

        Returns as soon as either loop finishes and re-raises that loop's
        exception, if any. The other loop is cancelled so the session can be
        closed instead of leaving the keepalive running forever.
        """
        async with aiohttp.ClientSession() as session:
            self.ws = await session.ws_connect(self.url)

            await self.on_connect(self)

            tasks = [
                asyncio.create_task(self._keepalive()),
                asyncio.create_task(self._receive_loop()),
            ]
            try:
                done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    # Propagate a failure from whichever loop stopped first.
                    task.result()
            finally:
                for task in tasks:
                    task.cancel()
                # Wait for the cancellations to land (and consume any leftover
                # exception) before the session is torn down.
                await asyncio.gather(*tasks, return_exceptions=True)

    async def _keepalive(self):
        """Ping the socket every ``KEEPALIVE_INTERVAL_S`` seconds until cancelled."""
        while True:
            print('KEEPALIVE')
            await self.ws.ping()
            await asyncio.sleep(self.KEEPALIVE_INTERVAL_S)

    async def _receive_loop(self):
        """Decode every incoming message and hand it to ``on_msg``."""
        async for msg in self.ws:
            self.on_msg(self._extract_data(msg))

    @staticmethod
    def _extract_data(msg):
        """Turn one received message into the value passed to ``on_msg``.

        BINARY frames are gunzipped, decoded as UTF-8 and parsed as JSON;
        TEXT frames are parsed as JSON. Any other frame type is returned as
        its raw ``msg.data`` (ERROR frames are reported on stdout first).
        """
        if msg.type == aiohttp.WSMsgType.BINARY:
            as_bytes = gzip.decompress(msg.data)
            return json.loads(as_bytes.decode('utf8'))

        if msg.type == aiohttp.WSMsgType.TEXT:
            return json.loads(msg.data)

        if msg.type == aiohttp.WSMsgType.ERROR:
            print('⛔️ aiohttp.WSMsgType.ERROR')

        return msg.data

    async def send_json(self, J):
        """Send ``J`` as a JSON message.

        Must be awaited on the worker thread's event loop (for example from
        ``on_connect``), after the connection has been established.
        """
        await self.ws.send_json(J)

I'm creating a daemon thread, from which I invoke asyncio.run(...), which allows me to leverage asyncio functionality while allowing the component to be executed from non-asyncio code.

Here's an example consumer of the component:

class WS_HotBit:
    """HotBit consumer that subscribes to trade streams over ``WebSocket``.

    Args:
        on_trades: Callable invoked as ``on_trades(symbol, trades)`` for every
            trade packet received.
    """

    URL = 'wss://ws.hotbit.io'
    KEY = None

    def __init__(self, on_trades):
        self.on_trades = on_trades
        self.websocket = WebSocket(WS_HotBit.URL, self._on_connect, self._on_msg)

    async def _on_connect(self, ws):
        """Subscribe to the trade streams as soon as the socket is connected."""
        await ws.send_json({'method': 'deals.subscribe', 'params': ['ETHUSDT', 'BTCUSDT'], 'id': 1})

    def _on_msg(self, data):
        """Classify one decoded packet and forward trade batches to ``on_trades``.

        ``data`` is whatever the WebSocket layer decoded, which is not
        necessarily a dict (raw frame data is forwarded for non-JSON frames,
        and JSON may decode to a list). Anything that is not a dict is
        reported as an unknown packet instead of raising inside the receive
        loop.
        """
        print(data)

        keys = set(data) if isinstance(data, dict) else set()

        if keys == {'error', 'result', 'id'}:
            print(data)
            return

        if keys != {'method', 'id', 'ts', 'params'}:
            print('⚠️ Unknown packet from WebSocket')
            print(data)
            return

        stream = data['method']
        id_ = data['id']
        timestamp = data['ts']
        payload = data['params']

        # payload:  ['ETHUSDT', [{'id': 2304578176, 'time': 1622984716.977204, 'price': '2680', 'amount': '0.00984', 'type': 'sell'},...], ...]

        if not isinstance(payload, (list, tuple)) or len(payload) != 2:
            print('⚠️ Unknown WebSocket packet format')
            print(payload)
            return

        symbol, trades = payload
        print(f'id: {id_}', stream, timestamp, symbol, f'got {len(trades)} trades')

        self.on_trades(symbol, trades)

And this code tests the consumer:

from time import sleep

if __name__ == '__main__':

    def on_trades(symbol, trades):
        """Print the symbol followed by the amount of every trade in the batch."""
        amounts = [trade['amount'] for trade in trades]
        print(symbol, amounts)

    # Constructing the consumer starts the background WebSocket thread.
    ws_hotbit = WS_HotBit(on_trades)

    # Keep the main thread alive: the WebSocket worker is a daemon thread and
    # would be torn down as soon as the main thread returned.
    while True:
        sleep(3)
        print('Main TICK')

The consumer is able to send packets into the websocket, and thus subscribe to streams, albeit with the limitation that this is done all in one place, as I'm using a 'did-connect' callback. I can't see a clean way of bypassing this limitation (and allowing packets to be sent from non-asyncio code at any time).

I'm also aware that there is no correct destruction sequence.

This is my first foray into asyncio, and my feeling is that it could be cleaner.

Forever grateful if an engineer with more experience/expertise would care to point out the biggest flashing red lights.

\$\endgroup\$

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service, privacy policy and cookie policy

Browse other questions tagged or ask your own question.