Quick Start


This tutorial provides a quick introduction to using Tangram.

For a complete programming guide, please refer to wiki/Programming Guide.

For the complete Tangram API, please refer to wiki/API.

For more applications, please refer to wiki/Applications.

Tangram's MapUpdate API naturally and efficiently supports a variety of workloads, from bulk processing to machine learning and graph analytics. For simplicity, we'll build a simple word count application with Tangram step by step.

Word Count

The C++ Program

Although Tangram programs currently have to be written in C++, the API is high-level, and writing Tangram code is no more difficult than writing in other high-level languages such as Python or Scala.

Define the WC User-defined Type

To begin with, we need to create a user-defined type WC to store the count of each word in all documents.

struct WC {
  using KeyT = std::string;
  KeyT word;
  int count = 0;

  WC() = default;
  WC(KeyT key) : word(key) {}
  KeyT Key() const { return word; }

  friend xyz::SArrayBinStream& operator<<(xyz::SArrayBinStream& stream, const WC& wc) {
    stream << wc.word << wc.count;
    return stream;
  }
  friend xyz::SArrayBinStream& operator>>(xyz::SArrayBinStream& stream, WC& wc) {
    stream >> wc.word >> wc.count;
    return stream;
  }
};

It is basically a key-value pair with the word (std::string) as the key and the count (int) as the value. Note that we need to define the key type KeyT, the key function KeyT Key() const, and the serialization operators (operator<< and operator>>) in the WC type.
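As a quick sanity check, these operators can be exercised directly. The following is a minimal sketch, assuming SArrayBinStream is default-constructible and already knows how to stream std::string and int (as its use inside the operators above implies):

xyz::SArrayBinStream stream;

WC in("tangram");
in.count = 3;
stream << in;   // serializes in.word and in.count via the operator<< above

WC out;
stream >> out;  // restores them via the operator>> above
// out.word == "tangram", out.count == 3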

Construct the Workflow

At the beginning of the main function, we need to initialize the system with the command-line argument:

Runner::Init(argc, argv);

First, we create a collection named lines, with each line of the HDFS file as an object:

auto lines = Context::load(FLAGS_url, [](std::string content) {
  return content;
});

FLAGS_url is the path to the file in HDFS and will be provided as a command-line argument by our launching script. The lambda is the parse function that turns each line into an object; in our case, we simply return the line as-is.
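The parse function can also do real work. As a purely hypothetical variant (not part of the word count program), we could lowercase each line before it becomes an object in the collection:

// Hypothetical: normalize lines to lowercase while loading.
// Requires <algorithm> and <cctype>.
auto lines = Context::load(FLAGS_url, [](std::string content) {
  std::transform(content.begin(), content.end(), content.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return content;
});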

Then, we create an empty placeholder collection with FLAGS_num_parts partitions.

auto wordcount = Context::placeholder<WC>(FLAGS_num_parts);

Next, we create a MapUpdate plan for the word count.

Context::mapupdate(lines, wordcount, 
    [](std::string line, output<std::string, int>* o) {
      boost::char_separator<char> sep(" \t\n");
      boost::tokenizer<boost::char_separator<char>> tok(line, sep);
      for (auto& w : tok) {
        o->Add(w, 1);
      }
    },
    [](WC* wc, int c) {
      wc->count += c;
    });

MapUpdate requires 4 parameters. The first two are the map collection and the update collection. Here, the map collection is lines and the update collection is wordcount. The next two parameters are the map function and the update function respectively.

The map function takes two parameters: the first is the element, and the second is an output object to which our key/value pairs are emitted. In the map function, we split the line into words and, for each word, emit a <word, 1> pair to the output object. The update function also takes two parameters: the first is a pointer to the WC object, and the second is the emitted value corresponding to that key from the map function. In the update function, we simply increase the word's count by c.

Tangram applies the map function to each object in the map collection. It then shuffles the output of the map function and groups the key/value pairs by key. The grouped key/value pairs are applied to the corresponding objects in the update collection by invoking the update function.
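Conceptually, the plan above computes the same result as the following single-machine loop. This is only a local sketch that stands in for map, shuffle, and update with a std::unordered_map; it is not Tangram API:

// Local illustration only. Requires <string>, <vector>, <unordered_map>,
// and boost/tokenizer.hpp.
std::vector<std::string> local_lines = {"the quick fox", "the lazy dog"};
std::unordered_map<std::string, int> counts;   // plays the role of wordcount
for (const auto& line : local_lines) {         // "map": tokenize each line
  boost::char_separator<char> sep(" \t\n");
  boost::tokenizer<boost::char_separator<char>> tok(line, sep);
  for (const auto& w : tok) {
    counts[w] += 1;                            // "shuffle" + "update" collapsed locally
  }
}
// counts["the"] == 2, counts["quick"] == 1, counts["fox"] == 1, ...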

Finally, we need to submit all the plans by calling:

Runner::Run();

The complete C++ code for the above word count application is attached below.

The Launch Script

In the launch script, we need to specify the hostfile, the program file and the scheduler file:

hostfile = "machinefiles/20nodes"
progfile = "release/WordCount"
schedulerfile = "release/SchedulerMain"

The hostfile lists all worker machines, one per line:

worker1
worker2
worker3
...

Then we specify some common parameters, e.g., the scheduler's hostname and port, and the HDFS namenode and port.

common_params = {
    "scheduler" : "proj99",
    "scheduler_port" : "33254",
    "hdfs_namenode" : "proj99",
    "hdfs_port" : 9000,
}

Next, we specify the command-line arguments for our program:

program_params = {
    "url" : "/datasets/corpus/enwiki-21g",
    "num_parts" : 1000,
}
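These keys match the gflags definitions in the C++ program (see the complete code below), which is how the launcher passes them to the program on the command line:

DEFINE_string(url, "", "The url for hdfs file");
DEFINE_int32(num_parts, 100, "# word partitions");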

Finally, we can launch the program by constructing a Launcher and calling Launch (the scheduler_params and env_params dictionaries are shown in the complete script below):

dump_core = False
l = Launcher(schedulerfile, progfile, hostfile,
             common_params, scheduler_params, program_params, env_params,
             dump_core)

l.Launch(sys.argv)

The Python launch script handles distributing and launching the program. It launches a Scheduler and multiple Workers on the machines specified in the hostfile via ssh.

The complete Python launching script is attached below.

Launching the Program

Launching the program is simple:

python /path/to/your/script

To kill your Tangram program:

python /path/to/your/script kill

Complete C++ Code

#include "core/plan/runner.hpp"
#include "boost/tokenizer.hpp"

DEFINE_string(url, "", "The url for hdfs file");
DEFINE_int32(num_parts, 100, "# word partitions");
DEFINE_string(combine_type, "kDirectCombine", "kShuffleCombine, kDirectCombine, kNoCombine, timeout");

using namespace xyz;

struct WC {
  using KeyT = std::string;
  KeyT word;
  int count = 0;

  WC() = default;
  WC(KeyT key) : word(key) {}
  KeyT Key() const { return word; }

  friend SArrayBinStream& operator<<(xyz::SArrayBinStream& stream, const WC& wc) {
    stream << wc.word << wc.count;
    return stream;
  }
  friend SArrayBinStream& operator>>(xyz::SArrayBinStream& stream, WC& wc) {
    stream >> wc.word >> wc.count;
    return stream;
  }
};

int main(int argc, char **argv) {
  Runner::Init(argc, argv);
  const int combine_timeout = ParseCombineTimeout(FLAGS_combine_type);
  if (FLAGS_node_id == 0) {
    LOG(INFO) << "combine_type: " << FLAGS_combine_type << ", timeout: " << combine_timeout;
  }

  auto lines = Context::load(FLAGS_url, [](std::string content) {
    return content;
  });
  auto wordcount = Context::placeholder<WC>(FLAGS_num_parts);
  Context::mapupdate(lines, wordcount, 
    [](std::string line, output<std::string, int>* o) {
      boost::char_separator<char> sep(" \t\n");
      boost::tokenizer<boost::char_separator<char>> tok(line, sep);
      for (auto& w : tok) {
        o->Add(w, 1);
      }
    },
    [](WC* wc, int c) {
      wc->count += c;
    });

  Runner::Run();
}

Complete Python Launching Script

#!/usr/bin/env python

import sys
from os.path import dirname, realpath 

proj_dir = dirname(dirname(dirname(realpath(__file__))))
sys.path.append(proj_dir+"/scripts/")

from launcher import Launcher

hostfile = "machinefiles/20nodes"
progfile = "release/WordCount"
schedulerfile = "release/SchedulerMain"

common_params = {
    "scheduler" : "proj99",
    "scheduler_port" : "33254",
    "hdfs_namenode" : "proj99",
    "hdfs_port" : 9000,
}

program_params = {
    "url" : "/datasets/corpus/enwiki-21g",
    "num_parts" : 1000,
}

scheduler_params = {
    "dag_runner_type" : "sequential",
}

env_params = (
  "GLOG_logtostderr=true "
  "GLOG_v=-1 "
  "GLOG_minloglevel=0 "
  "LIBHDFS3_CONF=/data/opt/hadoop-2.6.0/etc/hadoop/hdfs-site.xml"
  )

dump_core = False
l = Launcher(schedulerfile, progfile, hostfile,
             common_params, scheduler_params, program_params, env_params,
             dump_core)

l.Launch(sys.argv)

You may refer to example/tfidf for the complete code.