I'm trying to write a data feed script that collects real-time bitcoin data from the OkCoin exchange using their websocket API (documentation) and stores it in a database.
There are two channels I am interested in: trades and market depth. Once the socket is open I subscribe to both of them, so two types of messages come in, possibly a fraction of a second apart but never at exactly the same time, since there is only one websocket connection.
I am running Python 3.3 and using the websocket and sqlite3 modules. The way I have designed my code, each time a message arrives and on_message is fired, it writes to the database.
Now consider a frequent scenario: a message is followed by another one within a few milliseconds, so the second call to on_message is requested before the first call has completed. Shouldn't some sort of jam occur sooner or later? What confuses me is that I can run the following script for several hours and everything seems to work. On the other hand, I cannot be sure that all the data received was actually stored. Is there some sort of internal queuing system for on_message calls in the websocket package that handles this automatically? What would be the best way to implement what I'm trying to achieve, and which Python modules should I use to do that?
import json
import sqlite3

import websocket


class DataFeed:
    def __init__(self):
        # create database (IF NOT EXISTS so the script can be restarted)
        filename = "myDB.db"
        self.path = "C:/Users/Uporabnik/Desktop/OkCoin/data/" + filename
        self.db = sqlite3.connect(self.path)
        self.cursor = self.db.cursor()
        self.cursor.execute(
            """CREATE TABLE IF NOT EXISTS ok_btcusd_depth(
                bid REAL,
                bid_amount REAL,
                ask REAL,
                ask_amount REAL)
            """)
        self.cursor.execute(
            """CREATE TABLE IF NOT EXISTS ok_btcusd_trades(
                price REAL,
                amount REAL,
                time TEXT,
                type TEXT)
            """)
        self.db.commit()

        # connect to websocket
        url = "wss://real.okcoin.com:10440/websocket/okcoinapi"
        self.ws = websocket.WebSocketApp(url,
                                         on_message=self.on_message,
                                         on_error=self.on_error)
        self.ws.on_open = self.on_open
        self.ws.run_forever()  # blocks until the connection is closed

    def on_message(self, ws, msg):
        msg = json.loads(msg)  # parse the JSON text into a list
        table = msg[0]["channel"]
        data = msg[0]["data"]
        if table == "ok_btcusd_depth":
            # store only the first bid and first ask entries
            bid_info = data["bids"][0]
            ask_info = data["asks"][0]
            row = (bid_info[0], bid_info[1], ask_info[0], ask_info[1])
            self.cursor.execute(
                """INSERT INTO ok_btcusd_depth(
                    bid,bid_amount,ask,ask_amount)
                    VALUES(?,?,?,?)""", row)
        else:
            for trade in data:
                row = (float(trade[0]), float(trade[1]), trade[2], trade[3])
                self.cursor.execute(
                    """INSERT INTO ok_btcusd_trades(price,amount,time,type)
                        VALUES(?,?,?,?)""", row)
        self.db.commit()

    def on_error(self, ws, error):
        print(error)

    def on_open(self, ws):
        # subscribe to both channels with a single request
        request = [{"event": "addChannel", "channel": "ok_btcusd_depth"},
                   {"event": "addChannel", "channel": "ok_btcusd_trades"}]
        ws.send(json.dumps(request))


if __name__ == "__main__":
    DataFeed()
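
For reference, this is the kind of explicit queuing I would consider if it turns out to be necessary. It is only a minimal sketch: the names `writer`, `store_all` and `_STOP` are made up for illustration and are not part of the websocket package, and the loop over `incoming` merely stands in for on_message putting parsed trade rows on the queue. A separate thread owns the SQLite connection and drains the queue, so the code that receives messages never waits on a database write.

```python
import queue
import sqlite3
import threading

_STOP = object()  # hypothetical sentinel that tells the writer thread to finish


def writer(db_path, rows):
    """Drain the queue and insert each row; meant to run in its own thread."""
    # The connection is created and used only inside this thread.
    db = sqlite3.connect(db_path)
    db.execute("CREATE TABLE IF NOT EXISTS ok_btcusd_trades("
               "price REAL, amount REAL, time TEXT, type TEXT)")
    while True:
        row = rows.get()  # blocks until a row (or the sentinel) is available
        if row is _STOP:
            break
        db.execute("INSERT INTO ok_btcusd_trades VALUES (?,?,?,?)", row)
        db.commit()
    db.close()


def store_all(db_path, incoming):
    """Feed every row through the queue and return the table's row count."""
    rows = queue.Queue()
    thread = threading.Thread(target=writer, args=(db_path, rows))
    thread.start()
    for row in incoming:  # stand-in for on_message calling rows.put(...)
        rows.put(row)
    rows.put(_STOP)
    thread.join()  # wait until everything queued has been written
    db = sqlite3.connect(db_path)
    count = db.execute("SELECT COUNT(*) FROM ok_btcusd_trades").fetchone()[0]
    db.close()
    return count
```

Comparing the number of rows put on the queue with the count returned after the writer has finished would at least let me verify that nothing was dropped between receiving and storing.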