Tuesday 3 October 2017

Big Data Beginners Guide

Let us go straight to a simple setup that worked for us in maintaining a big data cluster despite limited resources.

Big Data infrastructure components :-

1) HDFS
2) YARN
3) AMBARI
4) ZOOKEEPER
5) HIVE SERVER
6) HBASE

The diagram below is very helpful in understanding the big data landscape.

The above architecture has served our team very well and is a general architectural overview for a big data platform.

One can also separate out the data-node roles, i.e. run the YARN containers/executors and the HBase region servers on different machines. To save on large machines, we clubbed the executor and region server processes together on the same nodes.

Tips and tricks to follow soon.

1) What is the ideal replication factor in Hadoop ?

Always keep the default replication factor at 3. But to manage HDFS disk usage, you can set a file-specific replication factor.
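
As a hedged sketch (not from the original post), a file-specific replication factor can be set through the Hadoop FileSystem API; the path below is just a placeholder:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // keep dfs.replication = 3 as the cluster-wide default,
    // but lower it for one bulky, easily re-creatable file
    val fs = FileSystem.get(new Configuration())
    fs.setReplication(new Path("/data/archive/bulk_dump.csv"), 2.toShort)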

Saturday 27 August 2016

Insights into Spark

Spark is a processing engine. You submit code and Spark runs it for you: it creates execution tasks for your code and uses its distributed processing power to produce results for you.

This infrastructure has a single driver node and multiple worker (executor) nodes. On the driver node, you need to provide a set of configurations to start a Spark context. These configurations include the YARN resource manager settings, the path to the Hive jars, driver and executor memory settings, port configurations, the Hadoop version, and many other settings that are mandatory to start a SparkContext. Once it is created, you can start submitting jobs to the Spark context.
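
A minimal sketch of what that looks like in code; the app name, master and memory values here are illustrative placeholders, not settings from this post:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("my-spark-app")
      .setMaster("yarn")                       // hand resource management to YARN
      .set("spark.executor.memory", "8g")      // executor heap size
      .set("spark.executor.instances", "10")   // number of executors to request
    // driver memory is normally passed at submit time (spark-submit --driver-memory),
    // because the driver JVM is already running when this code executes

    val sc = new SparkContext(conf)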

Data in Spark lives in RDDs (Resilient Distributed Datasets) on the executor nodes; only for some operations is data brought to the driver node. Spark has extensive features for data shuffling, repartitioning and a host of other techniques for performing data transformations in an optimised way.

First, let's break the execution into steps and discuss its aspects one by one :-
  • First of all, the Spark driver's JVM starts executing your code. Whenever it encounters RDD transformation code, Spark's DAG scheduler creates a stage. The scheduler keeps interpreting the code, creating stages as it goes, until it encounters an action. An action is an external step which Spark cannot manage internally; a database/Hadoop/file write is an example of a Spark action. By the time an action is interpreted, the DAG already has many stages (remember, a stage is a block of RDD transformation code), and these stages can only run on executors (as the data lives only in the executor JVMs).
  • At this point, Spark's DAG is complete and comprises many stages. These stages are executed sequentially, unless the DAG contains stages that can run in parallel. The RDD transformation logic now needs to be executed on the executors. Here is the trick: execution does not happen one whole stage at a time.


                                            Diagram 1.1: Spark Action Basics (Job -> Stage -> Task)
  • An RDD is present on multiple executors, and on each executor it is split into multiple partitions. So the transformation logic of your stage is executed in parallel across partitions. This unit of execution is called a task (see the word-count sketch after this list).
  • While executing, the driver node holds the exact details of each RDD partition, and the same details are available in Spark's UI console. Once all the tasks' transformations are complete, the stage completes. Some tasks may fail, and those failed tasks need to be re-computed; if tasks keep failing repeatedly, the stage may fail and thereby the entire Spark job fails.
  • Once a stage succeeds, the next stage runs, and when the last stage completes, your Spark job is complete. At this point your Spark action finishes.
  • The DAG interpreter then resumes interpreting the code after the previous action until it finds the next action, after which the entire execution phase repeats.
  • To summarize, your code may contain multiple actions, resulting in multiple Spark jobs, and execution finishes only after all Spark jobs are complete.
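
To make the Job -> Stage -> Task breakdown concrete, here is a classic word-count sketch (the HDFS paths are placeholders):

    val lines  = sc.textFile("hdfs:///data/input.txt")   // RDD split into partitions across executors
    val counts = lines.flatMap(_.split(" "))              // narrow transformation: stays in the same stage
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)                  // shuffle boundary: the DAG scheduler starts a new stage
    counts.saveAsTextFile("hdfs:///data/output")           // action: triggers one Spark job; each stage runs as parallel tasks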

Some specific questions :-

1) Why does JobProgressListener take so much heap memory on the driver ?
Spark's UI data is very memory-intensive: the data for every job, stage, task and SQL query accumulates into a huge chunk of memory. Storing the full data for 1000 Spark jobs, assuming 5 stages per job, can take around 5 GB of the driver's heap.
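
One hedged way to keep this listener data bounded is to cap how much UI history the driver retains; the numbers below are illustrative, not recommendations from this post:

    import org.apache.spark.SparkConf

    // limit the job/stage/task history kept for the Spark UI
    val conf = new SparkConf()
      .set("spark.ui.retainedJobs",   "200")
      .set("spark.ui.retainedStages", "500")
      .set("spark.ui.retainedTasks",  "10000")   // available in newer Spark versions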

2) What is a spark data frame ?
A Spark data frame is a distributed data set with a schema associated with it. A data frame can be queried much like a table in a relational database. Imagine a data frame as a list of objects, where every object is a sequence of values. A schema array stores the field name and datatype for each index of that value sequence. This list of objects is present on every executor.
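
A small hedged sketch, assuming Spark 2.x and a placeholder JSON file:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("df-example").getOrCreate()
    val df = spark.read.json("hdfs:///data/people.json")   // placeholder path
    df.printSchema()                                        // the schema: field names and datatypes
    df.filter(df("age") > 30).show()                        // queried much like a relational table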

3) Does spark also send serialized objects to executor nodes along with the code from driver?
Yes, Spark serializes all objects that are referenced inside the stage's code block and sends them to the executor nodes. So, to avoid runtime serialization exceptions, all objects referenced inside a Spark stage should be serializable. Objects like loggers and some types of application contexts are not serializable.
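
A hedged sketch of the rule; the Multiplier class here is made up purely for illustration:

    // any object referenced inside the closure is serialized and shipped to the executors,
    // so it must be Serializable or the job fails with a NotSerializableException
    class Multiplier(factor: Int) extends Serializable {
      def apply(x: Int): Int = x * factor
    }

    val m       = new Multiplier(3)
    val rdd     = sc.parallelize(1 to 10)
    val tripled = rdd.map(x => m(x))   // m is serialized and sent along with the task code
    println(tripled.collect().toList)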

4) Why do we get an error saying "spark stage should not have a return statement."?
This error is built into the Spark code. A Spark stage is an RDD transformation block, and that code runs in your executor JVMs, while the calling function executes in the driver JVM. A return statement inside the block would be a non-local return that tries to jump back across that boundary, so Spark rejects it.
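
A hedged illustration of the anti-pattern; the method and values are made up:

    import org.apache.spark.rdd.RDD

    def countEvens(rdd: RDD[Int]): Long = {
      rdd.filter { x =>
        // don't do this: `return` would try to exit countEvens, which runs on the driver,
        // from code that runs on the executors
        // if (x < 0) return 0L
        x % 2 == 0               // instead, express the logic as a plain expression
      }.count()
    }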

5) What is an accumulator in spark ?
An accumulator is a value aggregate. Using it, you can aggregate values on the executors and have the aggregated value across all executors available at the driver. It is a very important tool for generating metrics.
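
A minimal accumulator sketch, assuming Spark 2.x's longAccumulator and a placeholder input path:

    // count bad records on the executors, read the total back on the driver
    val badRecords = sc.longAccumulator("badRecords")
    val parsed = sc.textFile("hdfs:///data/input.txt").flatMap { line =>
      if (line.trim.isEmpty) { badRecords.add(1); None } else Some(line)
    }
    parsed.count()                                 // accumulators update reliably only inside actions
    println(s"bad records: ${badRecords.value}")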

6) I do not want to use accumulators as they make the working network heavy and chatty. What can I do ?
Yes, this is a very good point. It is still a very good idea to collect metrics at every executor. This can be done by writing values and executor-level aggregates to a file; a file parser can then parse this file and push the metrics to another system, say a metric management system. This facilitates separation of concerns, because all aggregates and monitoring rules corresponding to a metric name can be defined in a system that is completely isolated from the metric generation process.
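
A hedged sketch of that idea: aggregate per partition on the executor and append the aggregate to a local file for a parser to pick up later (the path and metric name are placeholders):

    // runs on the executors: one line of aggregated metrics per partition
    rdd.foreachPartition { iter =>
      val processed = iter.size
      val writer = new java.io.FileWriter("/var/log/myapp/executor-metrics.log", true)
      try writer.write(s"records_processed=$processed\n")
      finally writer.close()
    }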


7) How can I monitor Spark infra metrics ?
Create your own Spark metrics.properties and set its location via spark.metrics.conf in the SparkConf of your SparkContext. You can then monitor the number of active Spark jobs in your Spark context. This is particularly important for ensuring that the Spark job load on a production server is evenly distributed throughout the day.
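
A hedged sketch, assuming a custom metrics.properties that enables a sink; the file path and sink choice are placeholders:

    import org.apache.spark.SparkConf

    // /etc/spark/custom-metrics.properties might enable, for example, a CSV sink:
    //   *.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
    //   *.sink.csv.directory=/var/log/spark-metrics
    val conf = new SparkConf()
      .set("spark.metrics.conf", "/etc/spark/custom-metrics.properties")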

8) In spark console job names do not make sense. What can I do ?
In the Spark context, set the job description to text describing your job; it will show up as the job name in the Spark UI console. The description only applies to the particular thread from which your Spark job is launched, so it will not affect other Spark jobs. Be sure to clear the description when your Spark job finishes.
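
A minimal sketch; the description text is a placeholder:

    // applies only to jobs launched from the current thread
    sc.setJobDescription("daily-report: aggregate user events")
    // ... run the jobs for this piece of work ...
    sc.setJobDescription(null)   // clear it so later jobs on this thread are not mislabelled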

9) Is it good to have a long-running Spark context ?
No, it is not good to have a long-running Spark context. Running several Spark jobs in parallel on a single Spark context is not the desirable way to run. In doing so you would need to anticipate the cumulative heap memory usage of all the jobs sharing that Spark context. Hive queries in particular take a lot of driver memory due to heavy broadcast and HadoopConf objects.

more to follow .......





--------------------------------------------------------------------------------------------------------------------------

Other links :

JVM Performance Tuning
Big Data Guide
GC Tuning Guide