Content-based recommendation system using neural networks - Part 3



In this post, we will build our content-based model in the cloud. Training in the cloud allows us to work with large amounts of data without having to manage any infrastructure. We will use Cloud ML Engine to train our content-based DNN model in TensorFlow. To do this, we need to put our code into a Python package (i.e. add setup.py and __init__.py files) and organize the code into:

  • model.py
  • task.py
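
A minimal layout for this package might look like the sketch below. The setup.py contents are an assumption on my part (only the tensorflow-hub pin is taken from the versions noted in model.py); adjust the name and dependencies to your own project.

recommendation_package/
    setup.py
    trainer/
        __init__.py     (can be empty)
        model.py
        task.py

# setup.py -- a minimal sketch
from setuptools import setup, find_packages

setup(
    name='trainer',
    version='0.1',
    packages=find_packages(),
    # tensorflow-hub is assumed not to be pre-installed on the ML Engine 1.8 runtime,
    # so it is declared here and installed on the training workers at job start-up
    install_requires=['tensorflow-hub==0.2.0'],
)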




    Submitting a job to ML Engine

    task.py will be executed by ML Engine and it references our content-based model logic located in model.py.

import os
import tensorflow as tf
import numpy as np
import google.datalab.bigquery as bq

PROJECT = 'PROJECT' # REPLACE WITH YOUR PROJECT ID
BUCKET = 'BUCKET' # REPLACE WITH YOUR BUCKET NAME
REGION = 'us-central1' # REPLACE WITH YOUR BUCKET REGION e.g. us-central1

# do not change these
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = '1.8'


%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION



model.py

#!/usr/bin/env python
# coding=utf-8

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

# Import helpful libraries
import os
import shutil

import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.python.lib.io import file_io

# Pinned versions used for this model:
# tensorboard==1.8.0
# tensorflow==1.8.0
# tensorflow-hub==0.2.0



def build_model(args):


    # Column names used by serving_input_fn()
    NON_FACTOR_COLUMNS = 'next_content_id,visitor_id,content_id,category,title,author,months_since_epoch'.split(',')


    record_defaults = [["Unknown"], ["Unknown"],["Unknown"],["Unknown"],["Unknown"],[523],["Unknown"]]
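    # Per-column defaults for tf.decode_csv, in the same order as column_keys below; [523] is the numeric default for months_since_epoch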


    column_keys = ["visitor_id", "content_id", "category", "title", "author", "months_since_epoch", "next_content_id"]
    label_key = "next_content_id"

    def read_dataset(filename, mode, batch_size = 512):
      def _input_fn():
          def decode_csv(value_column):
              columns = tf.decode_csv(value_column,record_defaults=record_defaults)
    #           columns = tf.decode_csv(value_column,record_defaults=record_defaults, use_quote_delim=False)
              features = dict(zip(column_keys, columns))
              label = features.pop(label_key)
              return features, label

          # Create list of files that match pattern
          file_list = tf.gfile.Glob(filename)

          # Create dataset from file list
          dataset = tf.data.TextLineDataset(file_list).map(decode_csv)

          if mode == tf.estimator.ModeKeys.TRAIN:
              num_epochs = None # indefinitely
              dataset = dataset.shuffle(buffer_size = 10 * batch_size)
          else:
              num_epochs = 1 # end-of-input after this

          dataset = dataset.repeat(num_epochs).batch(batch_size)
          return dataset.make_one_shot_iterator().get_next()
      return _input_fn


    def create_feature_columns(args):

      embedded_title_column = hub.text_embedding_column(key="title", module_spec="https://tfhub.dev/google/nnlm-de-dim50/1", trainable=False)
      # Alternative: a hash-bucket + embedding column for the title instead of the TF-Hub text embedding
      # title_column = tf.feature_column.categorical_column_with_hash_bucket(key="title",
      #     hash_bucket_size=len(content_ids_list) + 1)
      # embedded_title_column = tf.feature_column.embedding_column(
      #     categorical_column=title_column,
      #     dimension=10)

      content_id_column = tf.feature_column.categorical_column_with_hash_bucket(
            key="content_id",
            hash_bucket_size=6088 + 1)

      embedded_content_column = tf.feature_column.embedding_column(
            categorical_column=content_id_column,
            dimension=10)

      author_column = tf.feature_column.categorical_column_with_hash_bucket(
            key="author",
            hash_bucket_size=459 + 1)

      embedded_author_column = tf.feature_column.embedding_column(
            categorical_column=author_column,
            dimension=3)

      category_column_categorical = tf.feature_column.categorical_column_with_vocabulary_list(
            key="category",
            vocabulary_list=['Facebook Instant Articles','Le sac de chips','Actualité','Opinions','Argent','Enquêtes','Monde','Spectacles','24heures','Sports','JM','Blogues','Weekend','Le Guide de l\'auto','Porte-monnaie','Voyages','Décès','En 5 minutes','publireportage','Qub Radio','Automatisation','Magasiner'],
            num_oov_buckets=1)

      # category_column_categorical = tf.feature_column.categorical_column_with_vocabulary_file(key = "category", vocabulary_file = tf.gfile.Glob(filename = "gs://numericanalytics/contentbased_dnn/labs/categories.txt"), num_oov_buckets = 1)


      category_column = tf.feature_column.indicator_column(category_column_categorical)

      months_since_epoch_boundaries = list(range(400,700,20))
      months_since_epoch_column = tf.feature_column.numeric_column(
            key="months_since_epoch")
      months_since_epoch_bucketized = tf.feature_column.bucketized_column(
            source_column = months_since_epoch_column,
            boundaries = months_since_epoch_boundaries)

      crossed_months_since_category_column = tf.feature_column.indicator_column(tf.feature_column.crossed_column(
          keys = [category_column_categorical, months_since_epoch_bucketized],
          hash_bucket_size = 330 + 1))

      feature_columns = [embedded_content_column,
                           embedded_author_column,
                           category_column,
                           embedded_title_column,
                           crossed_months_since_category_column]

      return feature_columns




    def model_fn(features, labels, mode, params):
      net = tf.feature_column.input_layer(features, params['feature_columns'])

      for units in params['hidden_units']:
          net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

      logits = tf.layers.dense(net, params['n_classes'], activation=None)

      predicted_classes = tf.argmax(logits, 1)
      values, predicted_test = tf.nn.top_k(logits, 3)
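
      # tf.argmax gives the single most likely next item, while tf.nn.top_k keeps the 3 highest-scoring
      # items; both are mapped back to content IDs below so the model can return top-3 recommendations
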
      with file_io.FileIO("gs://numericanalytics/contentbased_dnn/labs/content_ids.txt", mode = 'r') as ifp:
            content = tf.constant([x.rstrip() for x in ifp])

      # Gather predicted class names based on the predicted class indices
      predicted_class_names = tf.gather(content, predicted_classes)
      predicted_class_test = tf.gather(content, predicted_test)


      if mode == tf.estimator.ModeKeys.PREDICT:
          predictions = {
              'class_ids': predicted_classes[:, tf.newaxis],
              'class_names': predicted_class_names[:, tf.newaxis],
              'probabilities': tf.nn.softmax(logits),
              'logits': logits,
              'class_test': predicted_class_test[:, tf.newaxis]}
          # predictions = {'class_names': predicted_class_names[:, tf.newaxis]}

          # Create export outputs
          export_outputs = {"predict_export_outputs": tf.estimator.export.PredictOutput(outputs = predictions)}

          return tf.estimator.EstimatorSpec( # return early since we're done with what we need for prediction mode
              mode = mode,
              predictions = predictions,
              loss = None,
              train_op = None,
              eval_metric_ops = None,
              export_outputs = export_outputs)



      table = tf.contrib.lookup.index_table_from_file(vocabulary_file = "gs://numericanalytics/contentbased_dnn/labs/content_ids.txt")
      labels = table.lookup(labels)

          # Compute loss.
      loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

          # Compute evaluation metrics.
      accuracy = tf.metrics.accuracy(labels=labels, predictions=predicted_classes, name='acc_op')
      top_10_accuracy = tf.metrics.mean(tf.nn.in_top_k(predictions=logits, targets=labels, k=10))

      metrics = {
            'accuracy': accuracy,
            'top_10_accuracy' : top_10_accuracy}

      # tf.summary.scalar makes these metrics available to TensorBoard in both TRAIN and EVAL modes
      tf.summary.scalar('accuracy', accuracy[1])
      tf.summary.scalar('top_10_accuracy', top_10_accuracy[1])



      if mode == tf.estimator.ModeKeys.EVAL:
        # return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)
        return tf.estimator.EstimatorSpec( # return early since we're done with what we need for evaluation mode
                mode = mode,
                predictions = None,
                loss = loss,
                train_op = None,
                eval_metric_ops = metrics,
                export_outputs = None)



          # Create training op.
      assert mode == tf.estimator.ModeKeys.TRAIN

      optimizer = tf.train.AdagradOptimizer(learning_rate = params['learning_rate'])
      train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
      return tf.estimator.EstimatorSpec( # final return since we're done with what we need for training mode
            mode = mode,
            predictions = None,
            loss = loss,
            train_op = train_op,
            eval_metric_ops = None,
            export_outputs = None)


    # The serving_input_fn function defines the shapes and types of the inputs the model accepts when it is exported.
    # It is optional, but required for deploying the trained model to an endpoint.

    def serving_input_fn():
      # This function does the following:
      # 1. Defines placeholders that TensorFlow will feed with inference requests
      # 2. Preprocess input data
      # 3. Returns a tf.estimator.export.ServingInputReceiver or tf.estimator.export.TensorServingInputReceiver,
      # which packages the placeholders and the resulting feature Tensors together.
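        # String placeholders for every serving input except the label (next_content_id) and months_since_epoch, which gets its own numeric placeholder below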
        feature_placeholders = {
            colname : tf.placeholder(dtype = tf.string, shape = [None]) \
            for colname in NON_FACTOR_COLUMNS[1:-1]
        }
        feature_placeholders['months_since_epoch'] = tf.placeholder(dtype = tf.float32, shape = [None])

        features = {
            key: tf.expand_dims(tensor, -1) \
            for key, tensor in feature_placeholders.items()
        }

        return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)


    # Create train and evaluate loop to combine all of the pieces together.
    tf.logging.set_verbosity(tf.logging.INFO)
    def train_and_evaluate(args):
      estimator = tf.estimator.Estimator(
            model_fn=model_fn,
            model_dir = args['output_dir'],
            params={
              'feature_columns': create_feature_columns(args),
              # use the layer sizes parsed from --hidden_units instead of hard-coding them
              'hidden_units': args['hidden_units'],
              'bucket': args['bucket'],
              'n_classes': 6088,
              'learning_rate': args['learning_rate'],
            })





      train_spec = tf.estimator.TrainSpec(
            input_fn = read_dataset(filename = args['train_data_paths'], mode = tf.estimator.ModeKeys.TRAIN, batch_size = args['batch_size']), max_steps = args['train_steps'])



      exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
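      # The exporter is attached to the EvalSpec below; it writes SavedModels under
      # <output_dir>/export/exporter/, which is where the deployment step later looks for the model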



      eval_spec = tf.estimator.EvalSpec(
            input_fn = read_dataset(filename = args['eval_data_paths'], mode = tf.estimator.ModeKeys.EVAL, batch_size = args['batch_size']),
            steps = None,
            start_delay_secs = args['start_delay_secs'],
            throttle_secs = args['throttle_secs'],
            exporters = exporter)








      tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)


    # Call train_and_evaluate loop
    train_and_evaluate(args)



task.py

import argparse
import json
import os
import shutil

from . import model

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Input and output paths
    parser.add_argument(
        '--bucket',
        help = 'GCS bucket name',
        required = True
    )
    parser.add_argument(
        '--train_data_paths',
        help = 'GCS path to training data',
        required = True
    )
    parser.add_argument(
        '--eval_data_paths',
        help = 'GCS path to validation data',
        required = True
    )
    parser.add_argument(
        '--output_dir',
        help = 'GCS path to write checkpoints and export models',
        required = True
    )
    # Tunable hyperparameters
    parser.add_argument(
        '--batch_size',
        help = 'The number of examples in each mini-batch',
        type = int,
        default = 512
    )
    parser.add_argument(
        '--learning_rate',
        help = 'The learning rate for gradient descent',
        type = float,
        default = 0.1
    )
    parser.add_argument(
        '--hidden_units',
        help = 'Hidden layer sizes to use for DNN feature columns -- provide space-separated layers',
        type = str,
        default = "200 100 50"
    )
    parser.add_argument(
        '--train_steps',
        help = 'The number of steps/batches to train on',
        type = int,
        default = 2000
    )
    parser.add_argument(
        '--start_delay_secs',
        help = 'The number of seconds to delay before starting evaluation',
        type = int,
        default = 30
    )
    parser.add_argument(
        '--throttle_secs',
        help = 'The number of seconds between each evaluation',
        type = int,
        default = 60
    )
    parser.add_argument(
        '--job-dir',
        help = 'this model ignores this field, but it is required by gcloud',
        default = 'junk'
    )

    args = parser.parse_args()
    arguments = args.__dict__

    # unused args provided by service
    arguments.pop('job_dir', None)
    arguments.pop('job-dir', None)

    # Create hidden_units list
    arguments['hidden_units'] = [int(x) for x in arguments['hidden_units'].split(" ")]

    # Append trial_id to path if we are doing hptuning
    # This code can be removed if you are not using hyperparameter tuning
    arguments['output_dir'] = os.path.join(
        arguments['output_dir'],
        json.loads(
            os.environ.get('TF_CONFIG', '{}')
        ).get('task', {}).get('trial', '')
    )

    # Run the training job
    shutil.rmtree(arguments['output_dir'], ignore_errors = True) # start fresh each time
    model.build_model(arguments)



Run on Google Cloud ML Engine with Cloud Datalab in GCP

%bash
OUTDIR=gs://${BUCKET}/contentbased_dnn/labs/small_trained_model
JOBNAME=contentbased_recommendation_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ml-engine jobs submit training $JOBNAME \
  --region=$REGION \
  --module-name=trainer.task \
  --package-path=$(pwd)/trainer \
  --job-dir=$OUTDIR \
  --staging-bucket=gs://$BUCKET \
  --scale-tier=BASIC_GPU \
  --runtime-version=$TFVERSION \
  -- \
  --bucket=${BUCKET} \
  --train_data_paths=gs://${BUCKET}/contentbased_dnn/labs/training_set.csv* \
  --eval_data_paths=gs://${BUCKET}/contentbased_dnn/labs/test_set.csv* \
  --output_dir=${OUTDIR} \
  --batch_size=128 \
  --learning_rate=0.1 \
  --hidden_units="256 128 64" \
  --train_steps=1000 \
  --start_delay_secs=30 \
  --throttle_secs=30

TensorBoard

from google.datalab.ml import TensorBoard
TensorBoard().start('gs://numericanalytics/contentbased_dnn/labs/small_trained_model')

# When you are done, stop any running TensorBoard instances
for pid in TensorBoard.list()['pid']:
  TensorBoard().stop(pid)
  print("Stopped TensorBoard with pid {}".format(pid))

Create model & version on Cloud ML Engine

%bash
MODEL_NAME="Contentbased_DNN_test_top3"
MODEL_VERSION="ml_on_gcp"
MODEL_LOCATION=$(gsutil ls gs://numericanalytics/contentbased_dnn/labs/small_trained_model/export/exporter/ | tail -1)
echo "Deleting and deploying $MODEL_NAME $MODEL_VERSION from $MODEL_LOCATION ... this will take a few minutes"
#gcloud ml-engine versions delete ${MODEL_VERSION} --model ${MODEL_NAME}
#gcloud ml-engine models delete ${MODEL_NAME}
gcloud ml-engine models create ${MODEL_NAME} --regions $REGION
gcloud ml-engine versions create ${MODEL_VERSION} --model ${MODEL_NAME} --origin ${MODEL_LOCATION} --runtime-version $TFVERSION

Make Predictions


from oauth2client.client import GoogleCredentials
import requests
import json

MODEL_NAME = 'Contentbased_DNN_test_top3'
MODEL_VERSION = 'ml_on_gcp'

token = GoogleCredentials.get_application_default().get_access_token().access_token
api = 'https://ml.googleapis.com/v1/projects/{}/models/{}/versions/{}:predict' \
         .format(PROJECT, MODEL_NAME, MODEL_VERSION)
headers = {'Authorization': 'Bearer ' + token }
data = {
  'instances': [
    {
      'visitor_id': '1000056146838977869',
      'content_id': '/2018/12/10/un-olivier-dans-la-gorge',
      'category': 'Opinions',
      'title': 'Un Olivier dans la gorge',
      'author': 'Sophie Durocher',
      'months_since_epoch': 540
    },
  ]
}
response = requests.post(api, json=data, headers=headers)
print(response.content)
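
The response body is JSON. Assuming the standard ML Engine online-prediction format (a top-level 'predictions' list holding the output dictionary we built in model_fn), the recommendations can be pulled out roughly like this:

# A sketch for parsing the prediction response
# (assumes the usual ML Engine response shape: {"predictions": [{...}, ...]})
result = response.json()
for prediction in result.get('predictions', []):
    # 'class_names' holds the most likely next content_id,
    # 'class_test' holds the top 3 content_ids returned by tf.nn.top_k
    print('Top prediction:', prediction['class_names'])
    print('Top 3 recommendations:', prediction['class_test'])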