Skip to content

Commit

Permalink
add google analytics account summaries to s3 operator
Browse files Browse the repository at this point in the history
  • Loading branch information
benjamingregory committed Feb 7, 2018
1 parent 55372cc commit 93e7657
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
5 changes: 4 additions & 1 deletion __init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from airflow.plugins_manager import AirflowPlugin
from google_analytics_plugin.hooks.google_analytics_hook import GoogleAnalyticsHook
from google_analytics_plugin.operators.google_analytics_reporting_to_s3_operator import GoogleAnalyticsReportingToS3Operator
from google_analytics_plugin.operators.google_analytics_account_summaries_to_s3_operator import GoogleAnalyticsAccountSummariesToS3Operator


class GoogleAnalyticsPlugin(AirflowPlugin):
name = "google_analytics_plugin"
hooks = [GoogleAnalyticsHook]
operators = [GoogleAnalyticsReportingToS3Operator]
operators = [GoogleAnalyticsReportingToS3Operator,
GoogleAnalyticsAccountSummariesToS3Operator]
executors = []
macros = []
admin_views = []
Expand Down
63 changes: 63 additions & 0 deletions operators/google_analytics_account_summaries_to_s3_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import json
import os
import tempfile

from airflow.models import BaseOperator

from airflow.hooks.S3_hook import S3Hook

from GoogleAnalyticsPlugin.hooks.google_analytics_hook import GoogleAnalyticsHook


class GoogleAnalyticsAccountSummariesToS3Operator(BaseOperator):
    """Write Google Analytics account summaries to S3 as JSON Lines.

    The account summaries returned by ``GoogleAnalyticsHook`` are flattened
    into one record per (account, web property, profile) combination, written
    to a local temporary file with one JSON document per line, and uploaded
    to ``s3_bucket`` under ``s3_key``.

    :param google_analytics_conn_id: connection id passed to
        ``GoogleAnalyticsHook``.
    :param s3_conn_id: connection id passed to ``S3Hook``.
    :param s3_bucket: name of the destination bucket.
    :param s3_key: destination key (templated).
    :param brand: value stored in every record under ``pgv_brand``.
    :param space: value stored in every record under ``pgv_space``.
    """

    template_fields = ('s3_key',)

    def __init__(self,
                 google_analytics_conn_id,
                 s3_conn_id,
                 s3_bucket,
                 s3_key,
                 brand,
                 space,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)

        self.google_analytics_conn_id = google_analytics_conn_id
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.brand = brand
        self.space = space

    @staticmethod
    def _flatten_account_summaries(account_summaries, brand, space):
        """Return one flat dict per profile found in ``account_summaries``.

        :param account_summaries: mapping with an optional ``items`` list;
            each item has an ``id`` and an optional ``webProperties`` list,
            each of which has an ``id`` and an optional ``profiles`` list of
            mappings with ``id`` and ``name``.
        :param brand: value stored under ``pgv_brand``.
        :param space: value stored under ``pgv_space``.
        :return: list of dicts with the keys ``account_id``, ``pgv_brand``,
            ``pgv_space``, ``property_id``, ``profile_id``, ``profile_name``.
        :raises KeyError: if an account, property or profile lacks a
            required key.
        """
        rows = []
        for account in account_summaries.get('items', []):
            account_id = account['id']

            for web_property in account.get('webProperties', []):
                property_id = web_property['id']

                for profile in web_property.get('profiles', []):
                    # Build a fresh dict for every profile. Re-using a single
                    # dict per property would make every appended row alias
                    # the same object and end up with the last profile's
                    # values.
                    rows.append({
                        'account_id': account_id,
                        'pgv_brand': brand,
                        'pgv_space': space,
                        'property_id': property_id,
                        'profile_id': profile['id'],
                        'profile_name': profile['name'],
                    })
        return rows

    def execute(self, context):
        """Fetch the account summaries and upload them to S3.

        :param context: Airflow task context (unused).
        """
        ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id)
        s3_conn = S3Hook(self.s3_conn_id)

        rows = self._flatten_account_summaries(
            ga_conn.get_account_summaries(), self.brand, self.space)

        # Use a uniquely named temporary file instead of deriving the local
        # path from the S3 key: keys usually contain "/" prefixes whose
        # directories do not exist under /tmp, and a unique name avoids
        # collisions between concurrent task runs.
        tmp_file = tempfile.NamedTemporaryFile(
            mode='w', suffix='.jsonl', delete=False)
        try:
            # Close (and flush) the file before handing its path to the hook.
            with tmp_file:
                tmp_file.write('\n'.join(json.dumps(row) for row in rows))

            s3_conn.load_file(tmp_file.name,
                              self.s3_key,
                              bucket_name=self.s3_bucket,
                              replace=True)
        finally:
            # Always clean up the local copy, even when the upload fails.
            os.remove(tmp_file.name)

0 comments on commit 93e7657

Please sign in to comment.