Tuesday, July 20, 2021

Upsert Parquet Data Incrementally

Incremental data loads are very easy nowadays. Next-generation Databricks Delta allows us to upsert and delete records efficiently in data lakes. However, it is a bit tedious to emulate a function that can upsert a plain Parquet table incrementally the way Delta does. In this article I'm going to throw some light on the subject.
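For context, this is the kind of upsert Delta gives you out of the box. The sketch below is in PySpark rather than R; the path, the updates_df DataFrame and the week_id key are illustrative assumptions, and it assumes a runtime where Delta Lake and a SparkSession named spark are available (for example a Databricks notebook).

# Hedged sketch of a Delta MERGE (upsert); "updates_df" stands for the incoming weekly data.
from delta.tables import DeltaTable

master = DeltaTable.forPath(spark, "/mnt/master/sales_delta")

(master.alias("m")
    .merge(updates_df.alias("u"), "m.week_id = u.week_id")
    .whenMatchedUpdateAll()       # restated weeks take the latest values
    .whenNotMatchedInsertAll()    # brand-new weeks are appended
    .execute())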

Hadoop follows WORM (write once, read many times), so we cannot delete or update individual rows in data that has already been written. The question, then, is how to handle restatements: when last week's data changes in the current week, we need to update the corresponding rows in the master table with the latest values.

In this example we'll process the historical and the latest data together before overwriting the existing table; for large data sets, however, this will hurt performance. Ideally we should segregate the input data so that only the partitions that actually changed get rewritten, and that is where careful data wrangling matters most.
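One way to touch only the changed partitions with plain Parquet is dynamic partition overwrite (Spark 2.3 and later), sketched here in PySpark; the path, the week_id partition column and changed_weeks_df are illustrative assumptions, not objects from the post.

# With dynamic partition overwrite, an "overwrite" write replaces only the partitions
# present in the incoming DataFrame instead of truncating the whole table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(changed_weeks_df.write
    .mode("overwrite")
    .partitionBy("week_id")       # the master table is partitioned by week_id
    .parquet("/mnt/master/sales_parquet"))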

In the example DataFrames below, assume Table 1 holds our previous week's data and Table 2 has been received in the current week. The significant point is that the row for week ID 260 (its Sales value) changed in Table 2, and we have to carry that change into the master data.

Let's prepare three test DataFrames and apply the upsert function to the first two. Using the "testthat" R library, we'll then compare the result with the third DataFrame.
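The post builds this function in R and verifies it with testthat; as a rough equivalent, here is a PySpark sketch of the same upsert idea, with all names (paths, key columns, the ordering column) being illustrative assumptions.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def upsert_parquet(spark, master_path, incoming_df, key_cols, order_col):
    """Union master and incoming rows, keep the newest row per key, rewrite the table."""
    master_df = spark.read.parquet(master_path)

    # Tag the source so that, for a duplicated key, the incoming row wins.
    combined = (master_df.withColumn("_src", F.lit(0))
                .unionByName(incoming_df.withColumn("_src", F.lit(1))))

    w = Window.partitionBy(*key_cols).orderBy(F.col("_src").desc(), F.col(order_col).desc())
    latest = (combined.withColumn("_rn", F.row_number().over(w))
              .filter(F.col("_rn") == 1)
              .drop("_src", "_rn"))

    # Parquet cannot be overwritten while it is still being read lazily, so
    # materialise to a temporary path first and then rewrite the master path.
    tmp_path = master_path + "_tmp"
    latest.write.mode("overwrite").parquet(tmp_path)
    spark.read.parquet(tmp_path).write.mode("overwrite").parquet(master_path)

A call such as upsert_parquet(spark, "/mnt/master/sales", table2_df, ["week_id"], "load_ts") would then fold the current week's restatements into the master table.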

Sunday, October 25, 2020

Spark Processing - Leveraging Databricks Jobs

 

Running a Spark application in Databricks requires many architectural considerations. Everything from choosing the right cluster to writing the code is a million-dollar question. And it doesn't end there: after implementation, monitoring job performance and optimizing ETL jobs is a continuous process of improvement.

Here, we'll discuss a few points that can boost job performance and get reports to the business sooner.

Databricks offers a choice of cluster types and runtimes for different workloads. Choosing the Databricks runtime that matches your working area and domain is the first and most important point to consider.

The next point is selecting the worker and driver types. Before going into the different options, let's look at what each of them does.

Driver: The driver is one of the nodes in the cluster. It does not run the distributed computations itself; it plays the role of the master node in the Spark cluster. When you collect results back from the executors (for example with collect()), the data is sent to the driver.

Worker: Workers run the Spark executors and the other services required for proper functioning of the cluster. The distributed workload is processed on the workers. Databricks runs one executor per worker node; therefore, the terms executor and worker are used interchangeably. Executors are JVMs that run on worker nodes, and these JVMs are what run tasks on partitions of the data.
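A tiny PySpark sketch of that division of labour (assuming the spark session a Databricks notebook provides): transformations run on the executors, and only the final collect() ships rows to the driver.

df = spark.range(0, 1000000)                   # data is distributed across executors
doubled = df.selectExpr("id * 2 AS doubled")   # transformation: planned lazily, executed on executors
preview = doubled.limit(5).collect()           # action: only these few rows travel to the driver
print(preview)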

There are two cluster modes: standard and high concurrency. High concurrency provides better resource utilization, isolation for each notebook (by creating a separate environment for each one), security, and sharing between multiple concurrently active users. Sharing is accomplished by preempting tasks to enforce fair sharing between users, and preemption is configurable.

With that, it would seem easy to understand the requirement and select worker and driver types. Not quite: we also need to consider the prices offered by the different services. Databricks is essentially a PaaS that runs on top of the major instance providers, AWS and Azure. The Databricks pricing page lets you pick either of the two and compare; the Microsoft Azure figures change regularly, so please check the latest offer rather than relying on old numbers.



Now we are getting serious about selecting the platform to run our ETL jobs. Both Databricks and Azure are well documented, but the documentation is fragmented, so the information above should help you grasp the concepts quickly.

Hopefully we have now selected a near-perfect platform based on our type of work and budget. Next comes choosing a language we're comfortable with. Databricks supports multiple languages, but we'll generally get the best performance with JVM-based options such as Spark SQL, Java, and Scala. On top of that, Apache Spark itself is written in Scala, so writing ETL in Scala can be advantageous. Every language has its own strengths, though: Python is more popular, while R is best for plotting. Alongside performance, we should pick the language based on the capability and availability of our people.

The next point that comes to mind is the choice of file format for data stored in Apache Hadoop, including CSV, JSON, Apache Avro, and Apache Parquet. Most people have moved away from the text formats (CSV and JSON), leaving Avro and Parquet as the main contenders. A general observation of Databricks jobs is that when we read and write Parquet, the number of partitions at shuffle time is higher than with CSV, and the distribution of tasks and the degree of parallelism also look better optimized.
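A hedged sketch of the usual move: land the text file once, convert it to Parquet, and let downstream jobs read the columnar copy. Paths and options here are illustrative, and spark is assumed to be the session a Databricks notebook provides.

# Read the CSV landing file (schema inference is convenient but costs an extra pass).
raw = (spark.read
       .option("header", True)
       .option("inferSchema", True)
       .csv("/mnt/landing/sales.csv"))

# Write it out as Parquet so downstream jobs get columnar reads and better parallelism.
raw.write.mode("overwrite").parquet("/mnt/curated/sales")

sales = spark.read.parquet("/mnt/curated/sales")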


When it comes to choosing a Hadoop file format, many factors are involved, such as integration with third-party applications, schema evolution requirements, data type availability, and performance. If performance matters most, benchmarks show that Parquet is the format to choose. Beyond that, Databricks Delta extends Apache Spark to simplify data reliability and boost Spark's performance.

Apart from that, autoscaling and Databricks pools can improve the performance of Spark jobs, although there is a cost involved. Databricks does not charge DBUs while instances are idle in a pool, but instance provider billing does still apply.
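As a sketch, an autoscaling cluster backed by a pool might be defined like this when submitted to the Databricks Clusters REST API. The field names reflect the API as I recall it, and the workspace host, token, pool ID and sizes are placeholders, so verify against the current documentation before using it.

import requests

# Illustrative payload for POST /api/2.0/clusters/create (check fields against current docs).
cluster_spec = {
    "cluster_name": "etl-autoscaling",
    "spark_version": "7.3.x-scala2.12",          # pick a supported runtime
    "instance_pool_id": "pool-1234567890",       # idle pool instances avoid DBU charges
    "autoscale": {"min_workers": 2, "max_workers": 8},
    "autotermination_minutes": 30,
}

resp = requests.post(
    "https://<workspace-host>/api/2.0/clusters/create",
    headers={"Authorization": "Bearer <personal-access-token>"},
    json=cluster_spec,
)
resp.raise_for_status()
print(resp.json())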

Finally, code optimization may be the last lever for boosting performance. I'll discuss that in a separate post here.


Saturday, November 23, 2019

Using gRPC Client in CI/CD Pipeline

To expedite delivery, almost every project has merged its development and operational activities under a common pipeline. The philosophy has been implemented in different ways across the globe while keeping the DevOps principles intact. Simplicity and security are among the most important of those principles: they reduce operational overhead and improve return on investment.

Keeping that in mind, I prefer GitLab, which provides everything essential for the DevOps lifecycle. Every commit gets tested rigorously, then an image is built and deployed to Docker Swarm or a Kubernetes cluster, so every feature reaches the end user quickly. There are various ways to achieve this. Continuous deployment is usually configured through webhooks: a common pattern is to run an HTTP server locally that listens for incoming requests from the repository and triggers a specific deployment command on every push.

Instead of using BaseHTTPRequestHandler to listen for incoming requests, I used gRPC to call a function defined on the server side. gRPC is claimed to be about 7 times faster than REST when receiving data and roughly 10 times faster than REST when sending data for a specific payload. It has many other advantages, such as:

  •  Easy to understand.
  •  Loose coupling between client and server makes changes easy.
  •  Low latency; highly scalable, distributed systems.
  •  Language independent.

Enough talking, let's jump into the actual implementation.
The design is very simple:
  •  Run a gRPC server inside the controller system.
  •  Write a simple bash/shell script containing the specific deployment command.
  •  Run the gRPC client in the pipeline on every push.

Let's define a hook: the actual method that will be called remotely by the gRPC client.

My gRPC hook:
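The original screenshot of the hook isn't reproduced here; the sketch below shows roughly what such a hook can look like, not the post's exact code. The service, message and field names (DeployHook, HookRequest.command, HookReply.message) are assumptions that match the protobuf sketch further down, while glb_hook_pb2 and glb_hook_pb2_grpc are the modules protoc generates from glb_hook.proto.

# server.py -- hedged sketch of the gRPC hook.
import subprocess
from concurrent import futures

import grpc
import glb_hook_pb2          # generated from glb_hook.proto
import glb_hook_pb2_grpc     # generated from glb_hook.proto

class HookServicer(glb_hook_pb2_grpc.DeployHookServicer):
    def RunHook(self, request, context):
        # Launch the deployment script named in the request as a new process.
        proc = subprocess.Popen(["bash", request.command])
        return glb_hook_pb2.HookReply(message="started %s (pid %d)" % (request.command, proc.pid))

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    glb_hook_pb2_grpc.add_DeployHookServicer_to_server(HookServicer(), server)

    # Use the self-signed certificate generated later in the post for a secure port.
    with open("server.key", "rb") as f:
        private_key = f.read()
    with open("server.crt", "rb") as f:
        certificate = f.read()
    creds = grpc.ssl_server_credentials([(private_key, certificate)])
    server.add_secure_port("[::]:50051", creds)

    server.start()
    server.wait_for_termination()

if __name__ == "__main__":
    serve()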



gRPC uses Protocol Buffers as its interface description language and provides features such as authentication, bidirectional streaming and flow control, blocking or non-blocking bindings, and cancellation and timeouts. Protocol Buffers is also the default serialization format for sending data between clients and servers.
Let's define a protobuf:
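The original protobuf screenshot isn't reproduced either; a minimal glb_hook.proto along these lines would do, with the service and message names being the same assumptions used in the server sketch above.

// glb_hook.proto -- hedged sketch of the interface description.
syntax = "proto3";

package glbhook;

// One RPC: the client sends the name of a deployment command/script to run.
service DeployHook {
  rpc RunHook (HookRequest) returns (HookReply);
}

message HookRequest {
  string command = 1;
}

message HookReply {
  string message = 1;
}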

From this description, grpc_tools will generate two modules, glb_hook_pb2.py and glb_hook_pb2_grpc.py. Run the following command to generate the gRPC classes.



$ python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. glb_hook.proto

Along with simplicity we must consider security, so generate a self-signed certificate.


$ openssl req -newkey rsa:2048 -nodes -keyout server.key -x509 -days 365 -out server.crt


For more details, visit the gRPC documentation page and generate the server and client code.

Create the deployment script and put it in the same directory where the gRPC server runs.


Configure the client inside the CI/CD pipeline to call the deployment script remotely on every push (a sketch of such a client appears below). Once the pipeline is ready, start the server:

$ nohup python server.py &

At this point, on every commit and push to the repository, the pipeline will execute its jobs.
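The job the pipeline runs can be as small as this client sketch, which reuses the assumed names from the proto above; the controller host, port and the grpc.ssl_target_name_override workaround for a self-signed certificate are illustrative.

# client.py -- hedged sketch of the pipeline-side caller.
import grpc
import glb_hook_pb2
import glb_hook_pb2_grpc

with open("server.crt", "rb") as f:
    creds = grpc.ssl_channel_credentials(root_certificates=f.read())

# For a self-signed certificate the target name may need to match the certificate's CN.
channel = grpc.secure_channel(
    "controller.example.com:50051",
    creds,
    options=[("grpc.ssl_target_name_override", "controller.example.com")],
)

stub = glb_hook_pb2_grpc.DeployHookStub(channel)
reply = stub.RunHook(glb_hook_pb2.HookRequest(command="test.sh"), timeout=60)
print(reply.message)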


Once the client calls the function defined inside the server with a valid string value (the command), a new process is opened on the server side, which in this case runs the script test.sh. If we look at the server, we find Docker commands pulling the latest image and running it in detached mode on a specific port. Once deployment is done, hit the URL and we'll see the change.



Tuesday, November 12, 2019

IoT


Studying and analyzing medical data is healthcare analytics, which has the potential to extend the human life span by predicting various signs of disease in advance. To protect our health, we watch the quality of everything we take in; at the same time, we go through different medical tests periodically to check how our inner systems are performing. Analysis of excretion is one of them. Excretion is the process that removes the waste products of digestion and metabolism from the body; it gets rid of by-products the body cannot use, many of which are toxic and incompatible with life. Analyzing data from human excretion yields a range of preventive medical information about an individual. A simple urinalysis is one way to find certain illnesses, such as kidney disease, liver problems, and diabetes, in their earlier stages.

This article is not about the possibilities of capturing metrics by testing samples, but rather about how this testing can be done automatically so that individuals are alerted. The Smart Sanitary System (sCube) is one way to accomplish that: wherever you relieve yourself, publicly or privately, the smart device will analyze the sample and report back to you.

Healthcare is one of the most important criteria for a smart city; without proper treatment and medication, a city will never survive as one. With the help of IoT it is possible to help people live smartly: tracking the energy the body releases, periodic weight gain or loss, step counts and walking analysis, and so on can all be done with AI and IoT. There are also opportunities for health and life insurance companies to serve their customers better and run their business more accurately.

For example, a comparison of the Iris iQ200 ELITE (Iris Diagnostics, USA) and the Dirui FUS-200 (DIRUI Industrial Co., China) found that the degree of concordance between the two instruments was better than the concordance between the manual microscopic method and either individual device. So if samples can be analyzed automatically by instruments, collecting and monitoring the data would not be a challenge.

Site Reliability Engineering


It is assumed that the DevOps philosophy has been adopted by every project in its own way. The true implementation of DevOps is hidden in SRE, Site Reliability Engineering.

It seems every organization has its own SRE team in some fragmented form: whenever there is an issue, everyone jumps in and brings the business back on track per the SLA. SRE talks about two more layers, SLI and SLO, which sit beneath the SLA. At any point in time, a particular metric says yes or no about system health; these are the Service Level Indicators. The binding targets set on SLIs are the Service Level Objectives, and an SLO never promises 100% availability of the site. Based on these SLOs, Service Level Agreements are prepared transparently.

Transparently, because it accepts a known level of risk: the amount of failure we can absorb within our SLO. It is nearly impossible to assure 100% availability, even if we provide the service over our own fiber network and backbone with customized, secure software; the least reliable component in the system means we can never guarantee 100% availability all the time. The error budget clearly shows the permissible loss up front. SRE treats failure as normal and determines how much failure we can tolerate, and the error budget helps decide whether shipping new products quickly or releasing reliable features is the prime goal at any given moment.
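As a back-of-the-envelope illustration (the 99.9% figure is an assumed example, not one from the post):

# Error budget for an assumed 99.9% monthly availability SLO.
slo = 0.999
minutes_per_month = 30 * 24 * 60             # 43,200 minutes in a 30-day month
error_budget = (1 - slo) * minutes_per_month
print("Allowed downtime per month: about %.0f minutes" % error_budget)   # ~43 minutes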

SRE also defines, quite deliberately, how to avoid toil, the operational overhead, by eliminating manual tasks wherever possible. Manual, repetitive, automatable, tactical, and devoid of long-term value: these are the characteristics of toil. Doing such work by hand in front of a computer is not an intelligent use of time. At the same time, investing 20 hours to automate a single task that takes 20 minutes once a month is not wise either; at that rate it would take about five years of runs just to pay back the automation effort.

Altogether, it seems the later a service organization adopts SRE, the sooner it will disappear from the market. Every organization should therefore have a defined SRE framework or model if it doesn't have one already. Experts say SRE is the class that implements the interface of DevOps; a case study of implementing SRE on an existing DevOps project can serve as a proof of concept.

SCM and evolution of DevOps


Software Configuration Management (SCM) is the application of Configuration Management (CM) principles in the context of Software Engineering (SE) in Computer Science (CS).

Software Configuration Management (SCM)  identifies and tracks the configuration of artifacts at various points in time and performs systematic control of changes to the configuration of artifacts for the purpose of maintaining integrity and traceability throughout the whole software life cycle.
It is essential for every project and applicable to every methodology of project governance. To balance the demand for rapid software delivery with return on investment, organizations started implementing automation everywhere. The domains and disciplines defined in SCM were categorized, tools were developed for each domain or for several disciplines at once, and all of them were interconnected into a toolchain expected to expedite software delivery.

In the era of rapid development and delivery, we first introduced continuous integration, though before that parallel development, source code merging, and standardized commits enforced by hooks were already in place. Many projects used customized scripts to integrate different software modules and package them together: the typical three-team, three-tier enterprise structure. Along with speed, automation also drives continuous improvement, which is obvious. As continuous integration (CI) matured, we began to think about continuous delivery (CD). Every project started talking about CI/CD, but only a few really managed to deliver continuously. Integrating software modules is purely technical, whereas delivery involves many non-technical, functional, and client-centric project activities that demand agility; that makes it hard to be truly continuous, at least as far as monolithic applications are concerned.

Merging development, operations, and system administration with the help of automation introduced a new concept called DevOps. The name reveals itself, Development + Operations; beyond that there is no straightforward definition, and no rules or policies guide us in implementing the concept uniformly. Of course it has principles, or at least a goal: develop to requirements, then integrate, deploy, and test automatically, gather feedback for further improvement, release, and finally monitor for continuous operation. In the background every change is registered, ensuring the possibility of rollback at any time. It resembles the best practices of the ITIL service lifecycle, yet it is not a sequential framework; it is agile, an iterative approach where MVPs pass through the lifecycle and are delivered in a very short period. Altogether it is an operations pipeline that accomplishes the goal through a set of toolchains.

Managing automation and agility at the same time is not simply a matter of letting the stream loose. To overcome the hurdle, monolithic applications are split into microservices while the infrastructure is condensed into containers. Combining microservices with containerization, organizations are experiencing the proven benefits of DevOps. Orchestrating huge numbers of containers is no longer a big hurdle, and the security and segregation mechanisms built into orchestration frameworks are reducing coding complexity.

The principles of SCM are still quietly maintained inside DevOps engineering. Version identification, version control, artifact versioning, and issue tracking remain essential disciplines of every software project. The evolution of DevOps continues; eliminating or integrating tools may give it different names, but the core concept is automating SCM and IM to deliver a rigorously tested product in an agile way.