Now that we have developed classes to interact with both the Fitbit and Strava APIs, we need a way to store that data somewhere easily accessible. To do so we will roll out a Postgres database on Docker and use SQLAlchemy to simplify our interactions with the db.

Project Organization

Since I eventually want to set up a Flask website to display the information, I will start setting up the project as a Flask application factory and separating it into modules. There are much better tutorials out there for how to set this up and I recommend searching for them.
At this point my project structure looks as follows.

\app
  \static
    \examples
    \js
  \stats
    __init__.py
    models.py
  \templates
  __init__.py
stats_con.py
tasks.py

Test DB setup

Since we are still in the development stages, we choose to roll out a Docker-based db (in this case Postgres).

We can spin up the container with the following command

docker run --name sqlalchemy-orm-psql \
    -e POSTGRES_PASSWORD=pass \
    -e POSTGRES_USER=usr \
    -e POSTGRES_DB=sqlalchemy \
    -p 5432:5432 -d postgres

This will initialize a Postgres database at localhost:5432, create a user named usr with the password pass, and finally create a database called sqlalchemy.
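To make sure the container is actually accepting connections, a quick sanity check from Python looks something like this (this assumes psycopg2 is installed, which SQLAlchemy's Postgres dialect needs anyway):

# Quick sanity check that the Dockerized Postgres is up
# (assumes psycopg2 is installed; credentials match the docker run command above)
import psycopg2

conn = psycopg2.connect(host='localhost', port=5432,
                        user='usr', password='pass', dbname='sqlalchemy')
print(conn.server_version)  # e.g. 120003 for Postgres 12.3
conn.close()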

Creating the sqlalchemy classes

SQLAlchemy is a great tool for setting up our database interactions and tables (it also has a very useful Flask plugin for down the line).

The first step is setting up classes to allow us to interact with the database and to function as representations of our tables.

Since this isn't a particularly complex project, I only set up one folder to contain my classes, under the stats folder, making sure to create the __init__.py so it is treated as a module.

Upper Level Initialization

Since I will eventually move everything to Flask, we want to start structuring the project accordingly. Under the app subfolder we create an __init__.py file to initialize the package properly and allow for imports into the SQLAlchemy classes.

All we are doing in this file is creating the database connection and setting its parameters.

We import sqlalchemy and create the engine using the URI of the Postgres db we just created.

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://usr:pass@localhost:5432/sqlalchemy')

We use this engine to create a Session factory, which allows us to run queries against the database and functions as our main connection to it.

Session = sessionmaker(bind=engine)

Finally we create a Base from the declarative_base() function. This is the base class for our models; it makes sure that the correct Table objects are created and properly maps our objects to the database.

Base = declarative_base()

This will more than likely change when we move to Flask, since Flask-SQLAlchemy has additional functionality tied to using db.Model as opposed to Base = declarative_base().
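For reference, the Flask-SQLAlchemy version would look roughly like this (just a sketch of where we are headed, not code used in this post):

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()  # tied to the app later with db.init_app(app)

class Fitbit_Weight(db.Model):  # db.Model takes the place of Base
    __tablename__='fitbit_weight'
    id=db.Column(db.BigInteger(), primary_key=True)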

Fitbit Classes

Now that we have initialized the connection to our database and created the Base factory, we can create the actual classes for our data.

First we import our Base object and the necessary SQLAlchemy column types.

from .. import Base
from sqlalchemy import BigInteger, Column, String, Integer, ForeignKey, Float, DateTime, TIMESTAMP, func, Time

class Fitbit_Weight(Base):
    __tablename__='fitbit_weight'
    id=Column(BigInteger(), primary_key=True)
    weight=Column(Float(), nullable=False)
    bmi=Column(Float())
    fat=Column(Float())
    record_date=Column(DateTime())
    record_time=Column(Time())
    last_time = Column(TIMESTAMP, server_default=func.now(), onupdate=func.current_timestamp())

    def __repr__(self):
        return "<FITBIT WEIGHT '%s', weight='%s', bmi='%s', date='%s'>"%(self.id, self.weight, self.bmi, self.record_date)

class Fitbit_Calories(Base):
    __tablename__='fitbit_calories'
    id=Column(BigInteger(), primary_key=True)
    calories=Column(Float())
    record_date=Column(DateTime())
    last_time = Column(TIMESTAMP, server_default=func.now(), onupdate=func.current_timestamp())

    def __repr__(self):
        return "<FITBIT Calories '%s', calories='%s', date='%s'>"%(self.id, self.calories, self.record_date)

The process is pretty self-explanatory. The main thing to pay attention to is remembering to create a primary_key on each table.

We also used the func function to make sure that we store the time at which the records are updated.

Finally, to assist with debugging and logging to a certain degree, we create a __repr__ method to allow for a more informative message for each of the objects created with these classes.
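For example, with made-up values just to show the output:

import datetime

w = Fitbit_Weight(id=1, weight=80.5, bmi=24.2,
                  record_date=datetime.date(2020, 5, 1))
print(w)
# <FITBIT WEIGHT '1', weight='80.5', bmi='24.2', date='2020-05-01'>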

Strava Class

The same process is used for the Strava information.

class Strava_Activity(Base):
    __tablename__='strava_activity'
    #index=Column(Integer(), primary_key=True)
    id=Column(BigInteger(), primary_key=True)
    owner=Column(Integer())  # Probably a foreign key eventually
    activity_type=Column(String(50))
    distance=Column(Float())
    elapsed_time=Column(Float())
    average_speed=Column(Float())
    average_cadence=Column(Float())
    average_heartrate=Column(Float())
    name=Column(String(50))
    utc_offset=Column(Float())
    max_speed=Column(Float())
    max_heartrate=Column(Float())
    total_elevation_gain=Column(Float())
    upload_id=Column(BigInteger())
    moving_time=Column(Float())
    start_date=Column(DateTime())
    start_date_local=Column(DateTime())
    last_time = Column(TIMESTAMP, server_default=func.now(), onupdate=func.current_timestamp())

    def __repr__(self):
        return "<STRAVA ACTIVITY '%s', distance='%s', type='%s', date='%s'>"%(self.id, self.distance, self.activity_type, self.start_date_local)

Storing the data

Now that we have a well defined way to access our database we can get to the process of actually recording the data obtained from our API calls into the db.

The first step is to create all the tables we defined in our SQLAlchemy classes, if they don't already exist. Note that create_all needs the engine passed in, since our Base was not bound to one:

Base.metadata.create_all(engine)

We also initialize our db session with session = Session()

Importing the Strava and Fitbit classes from stats_con.py, we can use the previously defined functions to obtain our information.

from collections import defaultdict
from sqlalchemy import inspect

def Update_Strava_Activities():
    # Initialize Strava connection and get the data
    stv=Strava()
    data=stv.get_activities().json()

    # Get the required columns from our Strava class
    strava_params=[c for c in inspect(Strava_Activity).columns.keys()]

    # Remove the last_time parameter as that is autogenerated
    strava_params.remove('last_time')

    # We will first create all our model class instances for Strava_Activity
    acts=[]
    for dic in data:
        # Initialize a default dict so we don't get tripped up by missing keys
        d = defaultdict(lambda: None, dic)
        # Rename some columns from the API json so they match our class
        d['owner']=d['athlete']['id']
        d['activity_type']=d['type']

        # Pull the values needed by our class out of the API json
        update={}
        for val in strava_params:
            update[val]=d[val]

        log.info(update)

        # Initialize our model class from the dictionary
        act=Strava_Activity(**update)
        acts.append(act)

    # Merge our results into the database (I will rewrite the last 30 items
    # regardless of what is already stored). At the current moment I don't
    # need to check the API for deleted activities, but I might in the future.
    for act in acts:
        try:
            with session.begin_nested():
                session.merge(act)
            log.info("Updated: %s"%str(act))
        except Exception:
            log.info("Skipped %s"%str(act))
    session.commit()
    session.flush()

First we initialize our Strava connection using our previously developed class.
We obtain our data using stv.get_activities().json(), which returns the JSON with our information.
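Each element of that list is a dictionary describing one activity. Trimmed down to the fields we use, one entry looks roughly like this (values made up):

activity = {
    'id': 1234567890,
    'athlete': {'id': 42},
    'type': 'Run',
    'distance': 5012.3,
    'moving_time': 1621.0,
    'elapsed_time': 1800.0,
    'start_date': '2020-05-01T10:00:00Z',
}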

Since we need to fill in a value for each column, and since we are lazy, we get the names of the columns directly from the SQLAlchemy Strava_Activity class. We can do that using inspect and pushing the names into a list.

strava_params=[c for c in inspect(Strava_Activity).columns.keys()]

In the next code block we iterate through the list and push the values into a default dictionary. The reason we use a default dictionary is to prepopulate missing keys with None, since some activities will not have all the information. We also use this step to rename some of the obtained JSON values to the correct column names.
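A quick illustration of the default dictionary behaviour:

from collections import defaultdict

d = defaultdict(lambda: None, {'distance': 5012.3})
print(d['distance'])         # 5012.3
print(d['average_cadence'])  # None - missing keys fall back to None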

We use **update to pass a dictionary and initialize our SQLAlchemy object from it instead of typing it all out.
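In other words:

update = {'id': 1234567890, 'distance': 5012.3}
act = Strava_Activity(**update)
# equivalent to Strava_Activity(id=1234567890, distance=5012.3)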

Then we use our session object with begin_nested and try to merge our records and commit them; this is explained in more depth in the SQLAlchemy documentation.
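Conceptually, the loop body above does this (annotated sketch):

try:
    with session.begin_nested():  # emits a SAVEPOINT
        session.merge(act)        # INSERT or UPDATE, matched on primary key
except Exception:
    pass  # only the savepoint is rolled back; the rest of the batch survives
session.commit()                  # commits everything that merged cleanly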

We will also do the same for our Fitbit Weight:

def Update_Fitbit_Weight():
    fbt=Fitbit()
    wdata=fbt.get_weight().json()

    # Get the column names from the model, minus the autogenerated last_time
    fweight_params=[c for c in inspect(Fitbit_Weight).columns.keys()]
    fweight_params.remove('last_time')

    acts=[]

    for dic in wdata['weight']:
        d = defaultdict(lambda: None, dic)
        # Rename the API fields to match our column names
        d['id']=d['logId']
        d['record_date']=d['date']
        d['record_time']=d['time']

        update={}
        for val in fweight_params:
            update[val]=d[val]

        act=Fitbit_Weight(**update)
        acts.append(act)

    # Merge each record inside a savepoint so a single failure
    # doesn't abort the whole batch
    for act in acts:
        try:
            with session.begin_nested():
                session.merge(act)
            log.info("Updated: %s"%str(act))
        except Exception:
            log.info("Skipped %s"%str(act))
    session.commit()
    session.flush()

and the Fitbit Calories:

import datetime

def Update_Fitbit_Calories():
    fbt=Fitbit()
    # The calories don't have an ID, so create one out of the date
    cdata=fbt.get_calories().json()
    acts=[]
    for dic in cdata['foods-log-caloriesIn']:
        d = defaultdict(lambda: None, dic)
        update={}
        update['id']=int(datetime.datetime.strptime(d['dateTime'], '%Y-%m-%d').timestamp())
        update['record_date']=d['dateTime']
        update['calories']=d['value']

        act=Fitbit_Calories(**update)
        acts.append(act)

    for act in acts:
        try:
            with session.begin_nested():
                session.merge(act)
            log.info("Updated: %s"%str(act))
        except Exception:
            log.info("Skipped %s"%str(act))
    session.commit()
    session.flush()

So now we have a method of storing our data into our database.
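As a quick check that everything landed, we can read a record back through the same session (a sketch):

latest = (session.query(Strava_Activity)
          .order_by(Strava_Activity.start_date.desc())
          .first())
print(latest)  # uses the __repr__ we defined earlier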

Scheduling our Updates

Finally, we probably want to set this to update on a schedule. There are multiple ways of doing this, the most popular being crontab or a Celery scheduled worker. I have also been looking at using Airflow for things like this. However, at this moment we just want to keep this simple, and for that Prefect works pretty well.

All we have to do is create tasks and define a Flow and a Schedule; we can even set up dependent tasks that way.

So, for example, we update the Update_Strava_Activities function to be a Prefect task.

@task(max_retries=2, retry_delay=timedelta(seconds=2))
def Update_Strava_Activities():

This sets it to retry 2 times with a 2 second delay between retries.
We then define these tasks inside a flow, tied to a schedule.

from datetime import timedelta
import prefect
from prefect import Flow, Parameter, task, unmapped
from prefect.schedules import IntervalSchedule

...

schedule = IntervalSchedule(interval=timedelta(minutes=60))

with Flow("Data Updater", schedule) as flow:
    Update_Strava_Activities()
    Update_Fitbit_Weight()
    Update_Fitbit_Calories()

This will run all three of those tasks independently (so if one fails it won't prevent the others from running), and they will fire off every 60 minutes. All we have to do after is run the flow with flow.run() and the script will keep updating our data on that schedule.
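For completeness, kicking the whole thing off looks like this; flow.run() blocks and executes the tasks on each scheduled interval:

if __name__ == "__main__":
    flow.run()  # blocks; fires the tasks every 60 minutes per the schedule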

In the final part we will modify this basic project and develop a quick Flask dashboard to present a calendar view of our weekly stats.

Get the GitHub repo here.