This is a script that builds a base DataFrame from a SQLite database, adds more data to it (also from SQLite), cleans it up, and formats everything into an Excel file. It feels incredibly verbose, and I suspect the code could be simplified quite a bit. One thing I definitely want to do is eliminate the direct references to the database path, /Users/meter/code/mm/vid_score/test.db, so that the script can be run on any machine.
Any thoughts on simple changes I can make to decrease the complexity and the amount of code?
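For the hard-coded path, I'm imagining something along these lines: an environment variable with a fallback next to the script. The VID_SCORE_DB variable name and the read_table helper are just placeholders I made up for illustration, not anything the script uses yet:

import os
import sqlite3 as db
from contextlib import closing

import pandas as pd

# Hypothetical: resolve the database location from the environment,
# falling back to a file next to the script, so nothing is machine-specific.
DB_PATH = os.environ.get('VID_SCORE_DB',
                         os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test.db'))

def read_table(table_name):
    # Open a connection, pull the whole table into a DataFrame, then close the connection.
    with closing(db.connect(DB_PATH)) as con:
        return pd.read_sql_query("SELECT * FROM {};".format(table_name), con)

Here is the full script as it stands: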
import pandas as pd
import numpy as np
import sqlite3 as db
from contextlib import closing
from datetime import date, timedelta
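# Reference dates: yesterday, two days ago, and eight days ago (a week before yesterday).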
ONE = date.today() - timedelta(1)
TWO = date.today() - timedelta(2)
SEVEN = date.today() - timedelta(8)
def video_base():
with closing(db.connect('/Users/meter/code/mm/vid_score/test.db')) as con:
df = pd.read_sql_query("SELECT * FROM video_played;", con)
df['time'] = pd.to_datetime(df['time'], unit='s').dt.date
df = df.drop('index', axis=1)
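    # Slice the plays into the three reporting dates.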
df_one = df.loc[df['time'] == ONE]
df_two = df.loc[df['time'] == TWO]
df_seven = df.loc[df['time'] == SEVEN]
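    # Aggregate per video: play count, unique watchers, average feed position, and summed watch time and video length.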
df_one = df_one.groupby('video_id').agg({"ios_id": {"count_watched": np.count_nonzero,
"unique_watched": pd.Series.nunique},
"feed_position": {"feed_position": np.average},
"time_watched": {"time_watched": np.sum},
"video_length": {"video_length": np.sum}})
df_one.columns = df_one.columns.droplevel(0)
df_one['avg_time_watched'] = df_one['time_watched'] / df_one['video_length']
df_two = df_two.groupby('video_id').agg({"ios_id": {"count_watched": np.count_nonzero,
"unique_watched": pd.Series.nunique},
"feed_position": {"feed_position": np.average},
"time_watched": {"time_watched": np.sum},
"video_length": {"video_length": np.sum}})
df_two.columns = df_two.columns.droplevel(0)
df_two = df_two.rename(columns={'count_watched': 'count_watched_yeterday'})
df_seven = df_seven.groupby('video_id').agg({"ios_id": {"count_watched": np.count_nonzero,
"unique_watched": pd.Series.nunique},
"feed_position": {"feed_position": np.average},
"time_watched": {"time_watched": np.sum},
"video_length": {"video_length": np.sum}})
df_seven.columns = df_seven.columns.droplevel(0)
df_seven = df_seven.rename(columns={'count_watched': 'count_watched_seven'})
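    # Attach the prior-day and prior-week play counts to yesterday's aggregates.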
video_base = pd.merge(df_one, df_two[['count_watched_yeterday']],
how='left', left_index=True, right_index=True)
video_base = pd.merge(video_base, df_seven[['count_watched_seven']],
how='left', left_index=True, right_index=True)
return video_base
def video_intent(video_base):
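    # For each engagement table, count yesterday's events per video and add the counts as a column on the base frame.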
# ITEM INFO #
with closing(db.connect('/Users/meter/code/mm/vid_score/test.db')) as con:
df = pd.read_sql_query("SELECT * FROM item_info_click;", con)
df['time'] = pd.to_datetime(df['time'], unit='s').dt.date
df = df.drop('index', axis=1)
df = df.loc[df['time'] == ONE]
df = df.groupby('video_id').agg({"ios_id": {"count_clicks": np.count_nonzero,
"unique_clicks": pd.Series.nunique},
"feed_position": {"feed_position": np.average}})
df.columns = df.columns.droplevel(0)
video_base['item_info_clicks'] = df['count_clicks']
# FAVED #
with closing(db.connect('/Users/meter/code/mm/vid_score/test.db')) as con:
df = pd.read_sql_query("SELECT * FROM faved;", con)
df['time'] = pd.to_datetime(df['time'], unit='s').dt.date
df = df.drop('index', axis=1)
df = df.loc[df['time'] == ONE]
df = df.groupby('video_id').agg({"ios_id": {"count_faves": np.count_nonzero,
"unique_clicks": pd.Series.nunique},
"feed_position": {"feed_position": np.average}})
df.columns = df.columns.droplevel(0)
video_base['faves'] = df['count_faves']
# REPLAYS #
with closing(db.connect('/Users/meter/code/mm/vid_score/test.db')) as con:
df = pd.read_sql_query("SELECT * FROM replay;", con)
df['time'] = pd.to_datetime(df['time'], unit='s').dt.date
df = df.drop('index', axis=1)
df = df.loc[df['time'] == ONE]
df = df.groupby('video_id').agg({"ios_id": {"replays": np.count_nonzero,
"unique_clicks": pd.Series.nunique},
"feed_position": {"feed_position": np.average}})
df.columns = df.columns.droplevel(0)
video_base['replays'] = df['replays']
# ADD TO CART #
with closing(db.connect('/Users/meter/code/mm/vid_score/test.db')) as con:
df = pd.read_sql_query("SELECT * FROM add_to_cart;", con)
df['time'] = pd.to_datetime(df['time'], unit='s').dt.date
df = df.drop('index', axis=1)
df = df.loc[df['time'] == ONE]
df = df.groupby('video_id').agg({"ios_id": {"add_to_cart": np.count_nonzero,
"unique_clicks": pd.Series.nunique},
"feed_position": {"feed_position": np.average}})
df.columns = df.columns.droplevel(0)
video_base['add_to_cart'] = df['add_to_cart']
# CAROUSEL #
with closing(db.connect('/Users/meter/code/mm/vid_score/test.db')) as con:
df = pd.read_sql_query("SELECT * FROM carousel;", con)
df['time'] = pd.to_datetime(df['time'], unit='s').dt.date
df = df.drop('index', axis=1)
df = df.loc[df['time'] == ONE]
df = df.groupby('video_id').agg({"ios_id": {"carousel": np.count_nonzero,
"unique_clicks": pd.Series.nunique},
"feed_position": {"feed_position": np.average}})
df.columns = df.columns.droplevel(0)
video_base['carousel'] = df['carousel']
return video_base
def cleanup(video_raw):
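    # Sort by plays, compute day-over-day and week-over-week changes, and rename columns for the report.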
video_raw = video_raw.sort('count_watched', ascending=False)
video_raw['percent_yesterday'] = (video_raw['count_watched'] - video_raw['count_watched_yeterday']) / video_raw[
'count_watched_yeterday']
video_raw['percent_seven'] = (video_raw['count_watched'] - video_raw['count_watched_seven']) / video_raw[
'count_watched_seven']
new_cols = ['count_watched', 'percent_yesterday', 'percent_seven',
'unique_watched', 'avg_time_watched', 'feed_position',
'replays', 'item_info_clicks', 'faves', 'add_to_cart', 'carousel']
video_clean = video_raw[new_cols]
video_clean = video_clean.rename(columns={'count_watched': 'Video Plays',
'percent_yesterday': '% Change from Yest.',
'percent_seven': '% Change from Last Week',
'unique_watched': 'Unique Watchers',
'avg_time_watched': 'Avg. Time Watched',
'feed_position': 'Avg. Feed Position',
'replays': 'Replays',
'item_info_clicks': 'Item Info Click',
'faves': 'Faved',
'add_to_cart': 'Add To Cart',
'carousel': 'Carousel'})
return video_clean
def feed_position():
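    # Pivot yesterday's plays into a video_id x feed_position matrix of play counts.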
with closing(db.connect('/Users/meter/code/mm/vid_score/test.db')) as con:
df = pd.read_sql_query("SELECT * FROM video_played;", con)
df['time'] = pd.to_datetime(df['time'], unit='s').dt.date
df = df.drop('index', axis=1)
df = df.loc[df['time'] == ONE]
df = df[['ios_id', 'video_id', 'feed_position']]
feed_data = df.pivot_table(index=['video_id'],
columns=['feed_position'],
values=['ios_id'],
aggfunc=np.count_nonzero)
feed_data.columns = feed_data.columns.droplevel(0)
return feed_data
def to_excel(video_report, feed):
# Create a Pandas Excel writer using XlsxWriter as the engine.
writer = pd.ExcelWriter('daily_report.xlsx', engine='xlsxwriter')
# Convert the dataframe to an XlsxWriter Excel object.
video_report.to_excel(writer, sheet_name='Video Overview', na_rep="-")
# Get the xlsxwriter objects from the dataframe writer object.
workbook = writer.book
worksheet = writer.sheets['Video Overview']
# Add some cell formats.
integer = workbook.add_format({'num_format': '0', 'align': 'center'})
decimal = workbook.add_format({'num_format': '0.00', 'align': 'center'})
percentage = workbook.add_format({'num_format': '0.0%', 'align': 'center'})
zebra = workbook.add_format({'bold': True})
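    # Set widths and number formats for the overview columns (B through L).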
worksheet.set_column('B:B', 13, integer)
worksheet.set_column('C:C', 17, percentage)
worksheet.set_column('D:D', 19, percentage)
worksheet.set_column('E:E', 15, integer)
worksheet.set_column('F:F', 15, percentage)
worksheet.set_column('G:G', 15, decimal)
worksheet.set_column('H:H', 13, integer)
worksheet.set_column('I:I', 13, integer)
worksheet.set_column('J:J', 13, integer)
worksheet.set_column('K:K', 13, integer)
worksheet.set_column('L:L', 13, integer)
worksheet.set_row(3, 20, zebra)
feed.to_excel(writer, sheet_name='Feed Position', na_rep="-")
workbook1 = writer.book
worksheet1 = writer.sheets['Feed Position']
integer = workbook1.add_format({'num_format': '0', 'align': 'center'})
worksheet1.set_column('B:HU', 4, integer)
writer.save()
def main():
video_data = video_base()
intent_added = video_intent(video_data)
cleaned = cleanup(intent_added)
feed_data = feed_position()
to_excel(cleaned, feed_data)
if __name__ == "__main__":
main()