I wrote the script below to load data obtained from the twitter JSON archive on archive.org into a PostgreSQL database.
I'm looking for optimizations in the code. It currently runs at ~1.7 seconds per file (of 50,000 files), or loads 3 million rows in an hour.
Would I be looking at multithreading? The first run took approximately 12 hours (see profiler below).
Also, my setup is as follows:
- Python 2.7, 32-bit, Windows 8
- PostgreSQL running on an external USB 3.0 hard drive
- TAR files are on that same USB hard drive
"""
Read the output of an extracted TAR twitter archive from:
https://archive.org/details/twitterstream
"""
import bz2
import datetime
import json
import os
import profile
import psycopg2
from pprint import pprint
# Read the PostgreSQL connection string from a side file so credentials stay
# out of the source; only the first line of the file is used.
with open("postgresConnecString.txt", 'r') as f:
    DB_CONNECTIONSTRING = f.readline()
# Single module-level connection shared by all loader functions below.
# NOTE(review): opened at import time — any importer pays the connection cost.
conn = psycopg2.connect(DB_CONNECTIONSTRING)
# Root directory holding the extracted .json.bz2 tweet archive files.
CACHE_DIR = "H:/Twitter datastream/PYTHONCACHE"
def load_bz2_json(filename):
    """Read a bz2-compressed file of newline-delimited JSON tweets.

    Parameters:
        filename: path to a .json.bz2 file, one JSON document per line.

    Returns:
        A list of tweet dictionaries. Blank lines are skipped and lines
        that fail to parse as JSON are silently dropped (the archive
        contains occasional corrupt/mis-encoded lines).
    """
    with open(filename, 'rb') as f:
        raw = bz2.decompress(f.read())
    # On Python 3 decompress() yields bytes; decode before splitting so
    # json.loads receives text. 'replace' keeps one bad byte from killing
    # the whole file.
    if isinstance(raw, bytes):
        raw = raw.decode('utf-8', 'replace')
    tweets = []
    for line in raw.split("\n"):
        if not line:
            # Blank line between records — nothing to parse. (The original
            # decremented an undefined `num_lines` here, which raised a
            # NameError that the bare `except` silently ate.)
            continue
        try:
            tweets.append(json.loads(line))
        except ValueError:
            # Malformed JSON line — skip it, keep the rest of the file.
            continue
    return tweets
def load_tweet(tweet, tweets_saved):
    """Insert one tweet (dict) into the PostgreSQL `tweets` table.

    Parameters:
        tweet: a decoded tweet dictionary from the archive.
        tweets_saved: running count of successfully inserted tweets.

    Returns:
        tweets_saved + 1 on success; tweets_saved unchanged when the tweet
        is missing a required key (e.g. delete notices) or the INSERT fails.
    """
    try:
        # Build the parameter tuple directly; the intermediate dict in the
        # original was immediately unpacked again and added only overhead.
        row = (tweet['id'],
               tweet['text'],
               tweet['lang'],
               tweet['created_at'],
               datetime.datetime.now(),
               json.dumps(tweet))
    except KeyError:
        # Not a status object (deletion notices etc. lack these keys).
        return tweets_saved
    cur = conn.cursor()
    try:
        cur.execute("""INSERT INTO tweets (tweet_id, tweet_text, tweet_locale, created_at_str, date_loaded, tweet_json)
                       VALUES (%s, %s, %s, %s, %s, %s);""", row)
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; ideally narrow this further to psycopg2.Error.
        return tweets_saved
    finally:
        cur.close()
    return tweets_saved + 1
def handle_file(filename, retry=False):
    """Load every tweet from one .json.bz2 archive file into PostgreSQL.

    Commits once per file after all rows are inserted. Always returns True.
    (`retry` is currently unused; kept for interface compatibility.)
    """
    saved = 0
    for tweet in load_bz2_json(filename):
        # load_tweet returns the updated count (unchanged on skip/failure).
        saved = load_tweet(tweet, saved)
    conn.commit()
    return True
def main():
    """Walk CACHE_DIR recursively and load every archive file found."""
    count = 0
    for root, _dirs, names in os.walk(CACHE_DIR):
        for name in names:
            count += 1
            full_path = os.path.join(root, name)
            print('Starting work on file ' + str(count) + '): ' + full_path)
            handle_file(full_path)
if __name__ == "__main__":
    pprint('Starting work!')
    # Run the loader under the pure-Python `profile` module to collect the
    # timings shown below; this adds substantial overhead versus cProfile.
    profile.run('main()')
    conn.close()
else:  # If running interactively in interpreter (Pycharm):
    # Convenience: preselect a sample archive file for manual experiments.
    filename = r"H:\Twitter datastream\PYTHONCACHE\2013\01\01\00\00.json.bz2"
I retried the profiler with less files (1,000); the following is the output:
ncalls lineno(function)tottime %oftime filename
2870689 0(execute) 916,953 40,77%
1000 0(decompress) 511,18 22,73%
3245379 372(raw_decode) 231,57 10,30% decoder.py
2870689 212(iterencode) 123,231 5,48% encoder.py
1000 0(read) 74,577 3,32%
1000 0(commit) 66,478 2,96%
1000 0(open) 53,977 2,40%
3245379 39(load_tweet) 43,261 1,92% pyTwitter.py
1098 0(_isdir) 31,037 1,38%
3245379 361(decode) 21,016 0,93% decoder.py
2870690 0(join) 20,995 0,93%
2870689 186(encode) 19,343 0,86% encoder.py
2870689 0(cursor) 18,513 0,82%
2870689 193(dumps) 16,998 0,76% __init__.py
2870689 0(now) 16,072 0,71%
1 77(main) 14,142 0,63% pyTwitter.py
1000 68(handle_file) 11,296 0,50% pyTwitter.py
1000 21(load_bz2_json) 9,872 0,44% pyTwitter.py
6490758 0(match) 8,277 0,37%
My first performance edit:
- Create a cursor once per file
- conn.commit() once per 10 files.
- Write one insert statement per file (following this post).
This is the profiler result for a full run on one extracted TAR file. The full code is available on GitHub.
# ncalls tottime percall cumtime percall filename:lineno(function)
# 39377 17316.266 0.440 17316.266 0.440 :0(decompress)
# 130997418 9288.108 0.000 9288.108 0.000 decoder.py:372(raw_decode)
# 39214 5283.592 0.135 5283.592 0.135 :0(execute)
# 46834100 1622.474 0.000 1622.474 0.000 encoder.py:212(iterencode)
# 46834100 904.214 0.000 904.216 0.000 :0(mogrify)
# 130997418 854.491 0.000 10779.594 0.000 decoder.py:361(decode)
# 39377 725.356 0.018 725.356 0.018 :0(open)
# 39377 600.551 0.015 600.551 0.015 :0(read)
# 1 560.083 560.083 41120.377 41120.377 pyTwitter.py:78(main)
# 130997418 546.896 0.000 3179.022 0.000 pyTwitter.py:39(load_tweet)
# 46873315/46873314 407.746 0.000 1384.412 0.000 :0(join)
# 39377 400.072 0.010 40491.833 1.028 pyTwitter.py:62(handle_file)
# 39377 399.955 0.010 30489.249 0.774 pyTwitter.py:21(load_bz2_json)
# 261994836 338.315 0.000 338.315 0.000 :0(match)
# 130997418 308.528 0.000 11088.122 0.000 __init__.py:293(loads)
# 46834100 261.071 0.000 2277.501 0.000 encoder.py:186(encode)
# 39215 256.085 0.007 256.085 0.007 :0(split)
# 46834100 227.774 0.000 2505.275 0.000 __init__.py:193(dumps)
# 261994836 208.695 0.000 208.695 0.000 :0(end)
# 177871780 165.239 0.000 165.239 0.000 :0(append)
# 46834100 126.850 0.000 126.850 0.000 :0(now)
# 131197028 90.289 0.000 90.289 0.000 :0(len)
# 93668202 87.131 0.000 87.131 0.000 :0(isinstance)
# 46873314 72.450 0.000 976.665 0.000 pyTwitter.py:74(<genexpr>)
# 723 27.384 0.038 27.384 0.038 :0(listdir)
# 40099 25.166 0.001 25.166 0.001 :0(_isdir)
# 392 12.197 0.031 12.197 0.031 :0(commit)
Use ujson for JSON handling; it's a drop-in replacement for the json module. – ferada Feb 4 at 13:54