Skip to content

Commit

Permalink
Merge pull request #38 from ShorensteinCenter/devel
Browse files Browse the repository at this point in the history
[Feature, N/A] Take snapshots of list analyses
  • Loading branch information
williamhakim10 authored Jan 15, 2019
2 parents 6c5872f + 270df65 commit fcbc439
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 361 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ benchmarks-env/
*.dmg
.coverage
.DS_Store
scratch/
31 changes: 21 additions & 10 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# Association table for many-to-many relationship between lists and users
# Association table backing the many-to-many relationship between email
# lists and app users (used for monthly-update subscriptions).
# NOTE: the diff residue duplicated this column with the pre-commit FK
# target ('list_stats.list_id'); only the post-commit FK to
# 'email_list.list_id' is kept.
list_users = db.Table( # pylint: disable=invalid-name
    'list_users',
    db.Column('list_id', db.String(64), db.ForeignKey('email_list.list_id'),
              primary_key=True),
    db.Column('user_id', db.Integer, db.ForeignKey('app_user.id'),
              primary_key=True))
Expand All @@ -31,13 +31,9 @@ def __repr__(self):
return '<AppUser {}>'.format(self.id)

class ListStats(db.Model): # pylint: disable=too-few-public-methods
"""Stores individual MailChimp lists and their associated stats."""
list_id = db.Column(db.String(64), primary_key=True)
list_name = db.Column(db.String(128))
org_id = db.Column(db.Integer, db.ForeignKey('organization.id',
name='fk_org_id'))
api_key = db.Column(db.String(64))
data_center = db.Column(db.String(64))
"""Stores stats associated with a MailChimp list."""
id = db.Column(db.Integer, primary_key=True)
analysis_timestamp = db.Column(db.DateTime, default=datetime.utcnow)
frequency = db.Column(db.Float)
subscribers = db.Column(db.Integer)
open_rate = db.Column(db.Float)
Expand All @@ -48,13 +44,28 @@ class ListStats(db.Model): # pylint: disable=too-few-public-methods
pending_pct = db.Column(db.Float)
high_open_rt_pct = db.Column(db.Float)
cur_yr_inactive_pct = db.Column(db.Float)
list_id = db.Column(db.String(64), db.ForeignKey('email_list.list_id',
name='fk_list_id'))

def __repr__(self):
return '<ListStats {}>'.format(self.id)

class EmailList(db.Model): # pylint: disable=too-few-public-methods
    """Stores individual MailChimp lists.

    Holds list identity/credentials plus the user's privacy options;
    per-analysis statistics live in ListStats rows linked via `analyses`.
    """
    # MailChimp list id is the natural primary key
    list_id = db.Column(db.String(64), primary_key=True)
    list_name = db.Column(db.String(128))
    # API credentials needed to re-pull the list for scheduled updates
    api_key = db.Column(db.String(64))
    data_center = db.Column(db.String(64))
    # Privacy options selected by the submitting user
    store_aggregates = db.Column(db.Boolean)
    monthly_updates = db.Column(db.Boolean)
    # Users who receive monthly reports for this list
    monthly_update_users = db.relationship(
        AppUser, secondary=list_users, backref='lists', lazy='subquery')
    org_id = db.Column(db.Integer, db.ForeignKey('organization.id',
                                                 name='fk_org_id'))
    # One-to-many: each analysis snapshot is a ListStats row;
    # backref 'list' lets a ListStats reach its EmailList
    analyses = db.relationship(ListStats, backref='list')

    def __repr__(self):
        # Diff residue also contained the superseded
        # "<ListStats …>" return; only the post-commit repr is kept.
        return '<EmailList {}>'.format(self.list_id)

class Organization(db.Model): # pylint: disable=too-few-public-methods
"""Stores a media or journalism organization."""
Expand All @@ -67,7 +78,7 @@ class Organization(db.Model): # pylint: disable=too-few-public-methods
employee_range = db.Column(db.String(32))
budget = db.Column(db.String(64))
affiliations = db.Column(db.String(512))
lists = db.relationship(ListStats, backref='org')
lists = db.relationship(EmailList, backref='org')
users = db.relationship(AppUser, secondary=users, backref='orgs')

def __repr__(self):
Expand Down
190 changes: 120 additions & 70 deletions app/tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
"""This module contains Celery tasks and functions associated with them."""
import os
import json
import random
import time
from datetime import datetime, timedelta, timezone
import requests
import numpy as np
from sqlalchemy import desc
from sqlalchemy.sql.functions import func
from celery.utils.log import get_task_logger
from app import celery, db
from app.emails import send_email
from app.lists import MailChimpList, MailChimpImportError, do_async_import
from app.models import ListStats
from app.models import EmailList, ListStats
from app.dbops import associate_user_with_list
from app.visualizations import (
draw_bar, draw_stacked_horizontal_bar, draw_histogram, draw_donuts)
Expand Down Expand Up @@ -81,12 +82,8 @@ def import_analyze_store_list(list_data, org_id, user_email=None):
mailing_list.calc_high_open_rate_pct()
mailing_list.calc_cur_yr_stats()

# Create a list object
# Create a set of stats
list_stats = ListStats(
list_id=list_data['list_id'],
list_name=list_data['list_name'],
api_key=list_data['key'],
data_center=list_data['data_center'],
frequency=mailing_list.frequency,
subscribers=mailing_list.subscribers,
open_rate=mailing_list.open_rate,
Expand All @@ -97,13 +94,23 @@ def import_analyze_store_list(list_data, org_id, user_email=None):
pending_pct=mailing_list.pending_pct,
high_open_rt_pct=mailing_list.high_open_rt_pct,
cur_yr_inactive_pct=mailing_list.cur_yr_inactive_pct,
store_aggregates=list_data['store_aggregates'],
monthly_updates=list_data['monthly_updates'],
org_id=org_id)
list_id=list_data['list_id'])

# If the user gave their permission, store the object in the database
# If the user gave their permission, store the stats in the database
if list_data['monthly_updates'] or list_data['store_aggregates']:
list_stats = db.session.merge(list_stats)

# Create a list object to go with the set of stats
email_list = EmailList(
list_id=list_data['list_id'],
list_name=list_data['list_name'],
api_key=list_data['key'],
data_center=list_data['data_center'],
store_aggregates=list_data['store_aggregates'],
monthly_updates=list_data['monthly_updates'],
org_id=org_id)
email_list = db.session.merge(email_list)

db.session.add(list_stats)
try:
db.session.commit()
except:
Expand All @@ -113,7 +120,7 @@ def import_analyze_store_list(list_data, org_id, user_email=None):
return list_stats

def send_report(stats, list_id, list_name, user_email_or_emails):
"""Generates charts using Pygal and emails them to the user.
"""Generates charts using Plotly and emails them to the user.
Args:
stats: a dictionary containing analysis results for a list.
Expand All @@ -122,22 +129,30 @@ def send_report(stats, list_id, list_name, user_email_or_emails):
user_email_or_emails: a list of emails to send the report to.
"""

# Generate aggregates for the database
# Only include lists where we have permission
# This subquery generates the most recent stats
# For each unique list_id in the database
# Where store_aggregates is True
subquery = ListStats.query.filter(
ListStats.list.has(store_aggregates=True)).order_by('list_id', desc(
'analysis_timestamp')).distinct(ListStats.list_id).subquery()

# Generate aggregates within the subquery
agg_stats = db.session.query(
func.avg(ListStats.subscribers),
func.avg(ListStats.subscribed_pct),
func.avg(ListStats.unsubscribed_pct),
func.avg(ListStats.cleaned_pct),
func.avg(ListStats.pending_pct),
func.avg(ListStats.open_rate),
func.avg(ListStats.high_open_rt_pct),
func.avg(ListStats.cur_yr_inactive_pct)).filter_by(
store_aggregates=True).first()
func.avg(subquery.columns.subscribers),
func.avg(subquery.columns.subscribed_pct),
func.avg(subquery.columns.unsubscribed_pct),
func.avg(subquery.columns.cleaned_pct),
func.avg(subquery.columns.pending_pct),
func.avg(subquery.columns.open_rate),
func.avg(subquery.columns.high_open_rt_pct),
func.avg(subquery.columns.cur_yr_inactive_pct)).first()

# Make sure we have no 'None' values
agg_stats = [agg if agg else 0 for agg in agg_stats]

# Convert subscribers average to an integer
agg_stats[0] = int(agg_stats[0])

# Generate epoch time (to get around image caching in webmail)
epoch_time = str(int(time.time()))

Expand Down Expand Up @@ -202,7 +217,7 @@ def send_report(stats, list_id, list_name, user_email_or_emails):
os.environ.get('SES_CONFIGURATION_SET') or None))

def extract_stats(list_object):
"""Extracts a stats dictionary from a list object from the database."""
"""Extracts a stats dictionary from a SQLAlchemy ListStats object."""
stats = {'subscribers': list_object.subscribers,
'open_rate': list_object.open_rate,
'hist_bin_counts': json.loads(list_object.hist_bin_counts),
Expand All @@ -218,10 +233,12 @@ def extract_stats(list_object):
def init_list_analysis(user_data, list_data, org_id):
"""Celery task wrapper for each stage of analyzing a list.
First checks if the list stats are cached, i.e. already in the
First checks if there is a recently cached analysis, i.e. already in the
database. If not, calls import_analyze_store_list() to generate
them. Then checks if the user is already associated with the list,
if not, create the relationship. Finally, generates a benchmarking
the ListStats and an associated EmailList. Next updates the user's
privacy options (e.g. store_aggregates, monthly_updates) if the list was
cached. Then checks if the user selected monthly updates, if so,
create the relationship. Finally, generates a benchmarking
report with the stats.
Args:
Expand All @@ -230,60 +247,88 @@ def init_list_analysis(user_data, list_data, org_id):
org_id: the id of the organization associated with the list.
"""

# Try to pull the list stats from database
# Try to pull the most recent ListStats from the database
# Otherwise generate them
list_object = (ListStats.query.filter_by(
list_id=list_data['list_id']).first() or
import_analyze_store_list(
list_data, org_id, user_data['email']))

# Associate the list with the user who requested the analysis
# If that user requested monthly updates
if list_data['monthly_updates']:
associate_user_with_list(user_data['user_id'], list_object)

stats = extract_stats(list_object)
most_recent_analysis = (ListStats.query.filter_by(
list_id=list_data['list_id']).order_by(desc(
'analysis_timestamp')).first() or import_analyze_store_list(
list_data, org_id, user_data['email']))

# If the user chose to store their data, there will be an associated
# EmailList object
list_object = EmailList.query.filter_by(
list_id=list_data['list_id']).first()

if list_object:

# Update the privacy options if they differ from previous selection
if (list_object.monthly_updates != list_data['monthly_updates']
or list_object.store_aggregates != list_data['store_aggregates']):
list_object.monthly_updates = list_data['monthly_updates']
list_object.store_aggregates = list_data['store_aggregates']
list_object = db.session.merge(list_object)
try:
db.session.commit()
except:
db.session.rollback()
raise

# Associate the list with the user who requested the analysis
# If that user requested monthly updates
if list_data['monthly_updates']:
associate_user_with_list(user_data['user_id'], list_object)

# Convert the ListStats object to an easier-to-use dictionary
stats = extract_stats(most_recent_analysis)
send_report(stats, list_data['list_id'],
list_data['list_name'], [user_data['email']])

@celery.task
def update_stored_data():
"""Celery task which goes through the database
and updates calculations using the most recent data.
and generates a new set of calculations for each list older than 30 days.
This task is called by Celery Beat, see the schedule in config.py.
"""

# Get the logger
logger = get_task_logger(__name__)

# Grab what we have in the database
list_objects = ListStats.query.with_entities(
ListStats.list_id, ListStats.list_name, ListStats.org_id,
ListStats.api_key, ListStats.data_center,
ListStats.store_aggregates, ListStats.monthly_updates).all()
# Grab the most recent analyses in the database
list_analyses = ListStats.query.order_by(
'list_id', desc('analysis_timestamp')).distinct(
ListStats.list_id).all()

if not list_objects:
logger.info('No lists to update!')
if not list_analyses:
logger.warning('No lists in the database!')
return

# Create a list of analyses which are more than 30 days old
now = datetime.now(timezone.utc)
one_month_ago = now - timedelta(days=30)
analyses_to_update = [
analysis for analysis in list_analyses
if (analysis.analysis_timestamp.replace(
tzinfo=timezone.utc)) < one_month_ago]

if not analyses_to_update:
logger.info('No old lists to update!')
return

# Placeholder for lists which failed during the update process
failed_updates = []

# Update 1/30th of the lists in the database (such that every list
# is updated about once per month, on average).
lists_to_update = random.sample(
list_objects, len(list_objects) // 31 if len(list_objects) // 31 else 1)

# Update each list's calculations in sequence
for list_to_update in lists_to_update:
for analysis in analyses_to_update:

logger.info('Updating list %s!', analysis.list_id)

logger.info('Updating list %s!', list_to_update.list_id)
# Get the list object associated with the analysis
associated_list_object = analysis.list

# Pull information about the list from the API
# This may have changed since we originally pulled the list data
request_uri = ('https://{}.api.mailchimp.com/3.0/lists/{}'.format(
list_to_update.data_center, list_to_update.list_id))
associated_list_object.data_center,
associated_list_object.list_id))
params = (
('fields', 'stats.member_count,'
'stats.unsubscribe_count,'
Expand All @@ -294,31 +339,31 @@ def update_stored_data():
)
response = requests.get(
request_uri, params=params,
auth=('shorenstein', list_to_update.api_key))
auth=('shorenstein', associated_list_object.api_key))
response_body = response.json()
response_stats = response_body['stats']
count = (response_stats['member_count'] +
response_stats['unsubscribe_count'] +
response_stats['cleaned_count'])

# Create a dictionary of list data
list_data = {'list_id': list_to_update.list_id,
'list_name': list_to_update.list_name,
'key': list_to_update.api_key,
'data_center': list_to_update.data_center,
'monthly_updates': list_to_update.monthly_updates,
'store_aggregates': list_to_update.store_aggregates,
list_data = {'list_id': analysis.list_id,
'list_name': associated_list_object.list_name,
'key': associated_list_object.api_key,
'data_center': associated_list_object.data_center,
'monthly_updates': associated_list_object.monthly_updates,
'store_aggregates': associated_list_object.store_aggregates,
'total_count': count,
'open_rate': response_stats['open_rate'],
'date_created': response_body['date_created'],
'campaign_count': response_stats['campaign_count']}

# Then re-run the calculations and update the database
try:
import_analyze_store_list(list_data, list_to_update.org_id)
import_analyze_store_list(list_data, associated_list_object.org_id)
except MailChimpImportError:
logger.error('Error updating list %s.', list_to_update.list_id)
failed_updates.append(list_to_update.list_id)
logger.error('Error updating list %s.', analysis.list_id)
failed_updates.append(analysis.list_id)

# If any updates failed, raise an exception to send an error email
if failed_updates:
Expand All @@ -336,7 +381,7 @@ def send_monthly_reports():
logger = get_task_logger(__name__)

# Grab info from the database
monthly_report_lists = ListStats.query.filter_by(
monthly_report_lists = EmailList.query.filter_by(
monthly_updates=True).all()

# Send an email report for each list
Expand All @@ -352,8 +397,13 @@ def send_monthly_reports():
monthly_report_list.list_name,
monthly_report_list.list_id)

# Get the most recent analysis for the list
stats_object = ListStats.query.filter_by(
list_id=monthly_report_list.list_id).order_by(
desc('analysis_timestamp')).first()

# Extract stats from the list object
stats = extract_stats(monthly_report_list)
stats = extract_stats(stats_object)
send_report(stats, monthly_report_list.list_id,
monthly_report_list.list_name,
users_to_email)
Loading

0 comments on commit fcbc439

Please sign in to comment.