Using DataFrames and SQL in Apache Spark 2.0

Author | Dr. Christoph Schubert

Editor | Guo Yan

First Steps to Using DataFrames and SQL in Spark 2.0

One motivation for the development of Spark 2.0 was to make it accessible to a wider audience, in particular data analysts and business analysts who lack programming skills but may be very familiar with SQL. As a result, Spark 2.0 is easier to use than ever before. In this section I will introduce how to get started with Apache Spark 2.0, focusing on DataFrames as the untyped version of the new Dataset API.

Up to Spark 1.3, the Resilient Distributed Dataset (RDD) was the main abstraction in Spark. The RDD API is modeled after the Scala collection framework and therefore provides programming primitives familiar both from Hadoop MapReduce and from functional programming (map, filter, reduce). Although the RDD API is more expressive than the MapReduce paradigm, expressing complex queries with it is still cumbersome, especially for users with a typical data-analysis background who may be familiar with SQL or with data frames from R or Python.

Spark 1.3 introduced DataFrames as a new abstraction on top of RDDs. A DataFrame is a collection of rows with named columns, modeled after the corresponding R and Python packages.

Spark 1.6 introduced the Dataset class as a typed version of DataFrame. In Spark 2.0, DataFrames are in fact a special kind of Dataset: we have type DataFrame = Dataset[Row], so the DataFrame and Dataset APIs are unified.

On the surface, a DataFrame resembles an SQL table. Spark 2.0 takes this relationship to the next level: we can use SQL to modify and query Datasets and DataFrames. By restricting the set of available expressions, Spark can optimize queries more aggressively. Datasets are also tightly integrated with the Catalyst optimizer, which greatly improves the execution speed of Spark code. New development should therefore make use of DataFrames.

In this article I will focus on the basic usage of DataFrames in Spark 2.0. I will try to emphasize the similarities between the Dataset API and SQL, and show how SQL and the Dataset API can be used interchangeably to query data. Thanks to whole-stage code generation and the Catalyst optimizer, both versions compile to the same efficient code.

Code examples are given in the Scala programming language, which I consider the clearest, since Spark itself is written in Scala.

➤SparkSession

The SparkSession class replaces SparkContext and SQLContext in Apache Spark 2.0 and provides a single entry point to a Spark cluster.

For backward compatibility, the SparkSession object exposes the SparkContext and SQLContext objects, as shown below. When we use the interactive Spark shell, a SparkSession object named spark is created for us.
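A minimal sketch of creating a SparkSession in a standalone application (the application name and master URL here are illustrative; in the spark-shell the session already exists as spark):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("DataFramesAndSQL")   // illustrative application name
      .master("local[*]")            // for a local test run
      .getOrCreate()

    // The legacy entry points remain accessible for backward compatibility:
    val sc = spark.sparkContext
    val sqlContext = spark.sqlContext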

Create DataFrames

A DataFrame is a table with named columns. The simplest DataFrame is created using the SparkSession's range method:
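    // For illustration (the range bounds are arbitrary):
    val idsDF = spark.range(1, 6)   // five rows with a single column named "id"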

The show method gives us a tabular representation of a DataFrame, and describe gives an overview of its numeric columns. describe itself returns a DataFrame:
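    idsDF.show()              // prints the rows of the DataFrame as a table
    idsDF.describe().show()   // count, mean, stddev, min and max of the numeric column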

Observe that Spark chose the name id for the only column in the DataFrame. For a more interesting example, consider the following data set:
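The following sketch builds a small customer data set; the names and values are purely illustrative:

    import spark.implicits._   // already imported in the spark-shell

    // customer name, age, province, order total
    val customers = Seq(
      ("Alice", 18, "Beijing",  300.50),
      ("Bob",   25, "Zhejiang", 120.00),
      ("Carol", 35, "Jiangsu",   80.00),
      ("Dave",  42, "Zhejiang", 420.75)
    )
    val customerDF = customers.toDF()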

In this case, the customerDF object has columns named _1, _2, _3 and _4, which somewhat defeats the purpose of named columns. This can be remedied by renaming the columns:
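    // Re-create the DataFrame with meaningful column names
    // (in the spark-shell we can simply re-assign customerDF):
    val customerDF = customers.toDF("customer", "age", "province", "total")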

printSchema and describe now produce the following output:
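    customerDF.printSchema()
    // root
    //  |-- customer: string (nullable = true)
    //  |-- age: integer (nullable = false)
    //  |-- province: string (nullable = true)
    //  |-- total: double (nullable = false)

    customerDF.describe().show()   // statistics for the numeric columns age and total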

In general, we will load data from a file. The SparkSession class provides the following methods:
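    // Assumed file name and layout, for illustration:
    val customerDF = spark.read
      .option("header", "true")        // take the column names from the first line
      .option("inferSchema", "true")   // infer numeric types for age and total
      .csv("customers.csv")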

Here we let Spark extract the column names from the first line of the CSV file (by setting the header option to true) and convert the numeric columns (age and total) to the corresponding data types by setting the inferSchema option.

Other supported data formats include Parquet files, and it is also possible to read data over a JDBC connection.

Basic data operations

We will now look at the basic operations on the data in a DataFrame and compare them with SQL.

Lineage, transformations, actions, and whole-stage code generation

The concept of lineage and the distinction between transformations and actions apply to Datasets just as they do to RDDs. Most of the DataFrame operations discussed below produce a new DataFrame but do not actually perform any computation. To trigger computation, one of the actions must be invoked, such as show (print the first rows of the DataFrame as a table), collect (return an Array of Row objects), count (return the number of rows in the DataFrame), or foreach (apply a function to each row). This is the usual concept of lazy evaluation.

All of the Dataset methods that follow create a new Dataset from existing ones and in fact only build up a directed acyclic graph (DAG) of all the Datasets involved. This is called the Dataset's lineage. Only when an action is invoked does the Catalyst optimizer analyze all the transformations in the lineage and generate the actual code. This is called whole-stage code generation and is responsible for the performance improvements of Datasets over RDDs.

Row objects

The Row class acts as a container for the untyped data values of one row of a DataFrame. Normally we do not create Row objects ourselves; instead we obtain them from a DataFrame:
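    // Rows are usually obtained from an existing DataFrame, for example:
    val row  = customerDF.first()    // the first Row of the DataFrame
    val rows = customerDF.collect()  // an Array[Row] containing all rows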

The elements of a Row object are accessed by position (starting from 0) using apply, which produces a value of type Any. It is better to use one of the typed getter methods, such as getString, getInt or getAs, because these avoid the overhead of boxing primitive types. We can also use the isNullAt method to check whether an entry of the row is null:
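    row(0)                        // access by position, returns Any
    row.getString(0)              // typed access: the customer name
    row.getInt(1)                 // typed access: the age, without boxing
    row.getAs[Double]("total")    // typed access by column name
    row.isNullAt(2)               // true if the province entry is null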

Let us now take a look at the most common transformations of the DataFrame class.

Select

The first transformation we will look at is "select", which allows us to project and transform the columns of a DataFrame.

Referencing columns

There are two ways to refer to a DataFrame column by name: you can pass the name as a string, or you can use the apply or col method, which take the column name as a parameter and return a Column object. Thus customerDF.col("customer") and customerDF("customer") both refer to the first column of customerDF.

Selecting and transforming columns

The simplest form of the select transformation allows us to project a DataFrame onto a DataFrame containing fewer columns. The following four expressions all return a DataFrame that contains only the customer and province columns:
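    // Four equivalent ways of selecting the two columns:
    customerDF.select("customer", "province")
    customerDF.select($"customer", $"province")
    customerDF.select(customerDF("customer"), customerDF("province"))
    customerDF.select(customerDF.col("customer"), customerDF.col("province"))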

String and Column parameters cannot be mixed in a single select call: customerDF.select("customer", $"province") results in an error.

Using the operators defined on the Column class, we can construct complex column expressions:
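    // An illustrative example: increment the age and test the province
    customerDF.select($"customer", $"age" + 1, $"province" === "Zhejiang")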

Applying show gives the following result:

Column aliases

The column names of the new DataFrame are derived from the expressions used to create them. We can use alias or as to change the column names to something more mnemonic:
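    customerDF.select(
      $"customer".alias("name"),
      ($"age" + 1).as("newAge"),
      ($"province" === "Zhejiang").as("isZJ"))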

This generates a DataFrame with the same content as before, but with columns named name, newAge and isZJ.

The Column class contains a variety of useful methods for performing basic data-analysis tasks; we refer the reader to the documentation for more information.

Finally, we can use the lit function to add a column with a constant value, and re-encode column values using when and otherwise. For example, we add a new column ageGroup that is 1 if age < 20, 2 if age < 30 and 3 otherwise, and a column trusted that is always false:
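A sketch of how this might look (the name customerAgeGroupDF is reused in the aggregation examples below):

    import org.apache.spark.sql.functions.{lit, when}

    val customerAgeGroupDF = customerDF
      .withColumn("ageGroup",
        when($"age" < 20, 1).when($"age" < 30, 2).otherwise(3))
      .withColumn("trusted", lit(false))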

This gives the following DataFrame:

drop is the counterpart of select: it returns a DataFrame in which some columns of the original DataFrame have been removed.
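    customerAgeGroupDF.drop("trusted")   // all columns except trusted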

Finally, the distinct method returns a DataFrame containing the distinct values of the original DataFrame:
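    customerDF.select("province").distinct().show()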

This returns a DataFrame with a single column and three rows containing the values "Beijing", "Jiangsu" and "Zhejiang".

Filter

The second DataFrame transformation is the filter method, which selects rows of a DataFrame. It has two overloaded versions: one accepts a Column and the other a SQL expression (a String). For example, here are two equivalent ways to filter all customers older than 30:
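    customerDF.filter($"age" > 30)   // using a Column expression
    customerDF.filter("age > 30")    // using a SQL expression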

The filter transformation accepts the usual Boolean connectives and and or:
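    customerDF.filter($"age" > 30 && $"province" === "Zhejiang")
    customerDF.filter("age > 30 and province = 'Zhejiang'")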

Note that we use a single equals sign in the SQL version and the triple equals sign === (a method of the Column class) in the Column version; using Scala's == operator instead causes an error. Again, we refer the reader to the documentation of the Column class for other useful methods.

Aggregation

Performing aggregations is one of the most fundamental tasks in data analysis. For example, we may be interested in the total amount of each order or, more specifically, in the total or average amount spent per province or per age group. We might also want to know which age groups spend more than the average total. In SQL we would use GROUP BY expressions to answer such questions. DataFrames provide similar functionality: rows can be grouped according to the values of certain columns, which again can be specified either as strings or as Column objects.

The withColumn method adds a new column or replaces an existing column.

Aggregating data is a two-step process: a call to the groupBy method groups the rows that have equal values in particular columns, and then an aggregate function such as sum, max or avg is applied to each group of rows of the original DataFrame. Technically, groupBy returns an object of the RelationalGroupedDataset class. RelationalGroupedDataset provides the max, min, avg, mean and sum methods, all of which perform the specified operation on the numeric columns of the DataFrame and accept String parameters to restrict which numeric columns are operated on. In addition, there is a count method for counting the number of rows in each group, and a generic agg method that allows us to specify more general aggregate functions. All of these methods return a DataFrame.
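For example (using the customerAgeGroupDF defined above):

    customerAgeGroupDF.groupBy("ageGroup").count().show()
    customerAgeGroupDF.groupBy("ageGroup").sum("total").show()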

customerAgeGroupDF.groupBy("agegroup").max().show gives the following output:

Finally, customerAgeGroupDF.groupBy("agegroup").min("age", "total").show gives the following output:

There is also a generic agg method that accepts complex column expressions. agg is available both on RelationalGroupedDataset and on Dataset; the latter performs the aggregation over the entire data set. Both methods allow us to pass a list of column expressions:
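    import org.apache.spark.sql.functions.{avg, max, sum}

    customerAgeGroupDF.groupBy("ageGroup")
      .agg(sum($"total"), avg($"total"), max($"age"))
      .show()

    customerAgeGroupDF.agg(sum($"total"), avg($"age")).show()   // over the whole Dataset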

The available aggregate functions are defined in org.apache.spark.sql.functions. The RelationalGroupedDataset class was called GroupedData in Apache Spark 1.x. Another feature of RelationalGroupedDataset is that we can pivot on the values of a column. For example, the following lists the total per province for each age group:
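    customerAgeGroupDF.groupBy("ageGroup")
      .pivot("province")
      .sum("total")
      .show()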

A null value indicates that there is no data for that combination of province and age group. An overloaded version of pivot accepts a list of values to pivot on. On the one hand this lets us limit the number of columns, on the other hand it is more efficient, because Spark does not need to compute all the distinct values of the pivot column. For example:
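    customerAgeGroupDF.groupBy("ageGroup")
      .pivot("province", Seq("Zhejiang", "Jiangsu"))
      .sum("total")
      .show()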

Finally, pivoting can also be combined with a more complex aggregation:
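One possible example (the condition is purely illustrative):

    import org.apache.spark.sql.functions.{sum, when}

    customerAgeGroupDF.groupBy("ageGroup")
      .pivot("province")
      .agg(sum(when($"trusted" =!= true, $"total")))
      .show()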

Here =!= is the "not equal to" method of the Column class.

Sorting and Limiting

The orderBy method allows us to sort the contents of a Dataset by one or more columns. As before, we can use strings or Column objects to specify the columns: customerDF.orderBy("age") and customerDF.orderBy($"age") give the same result. The default sort order is ascending. To sort in descending order, we can use the desc method of the Column class or the desc function:
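    import org.apache.spark.sql.functions.desc

    customerDF.orderBy($"age".desc).show()            // desc method of Column
    customerDF.orderBy(desc("age"), $"total").show()  // desc function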

Observe that since the desc function returns a Column object, any additional sort column must also be specified as a Column object.

Finally, the limit method returns a DataFrame containing the first n rows of the original DataFrame.

➤ DataFrame Methods and SQL Compared

We have seen that the basic methods of the DataFrame class correspond closely to the clauses of an SQL SELECT statement. The following table summarizes this correspondence:

Joins have so far been missing from our discussion. Spark DataFrames do support joins, which we discuss in the next part of the article.

The fully typed Dataset API, joins, and user-defined functions (UDFs) are discussed below.

➤ Using SQL to Process DataFrames

We can also execute SQL statements directly in Apache Spark 2.0; the SparkSession's sql method returns a DataFrame. In addition, the DataFrame's selectExpr method allows us to specify SQL expressions for individual columns, as shown below. In order to reference a DataFrame in an SQL expression, it first has to be registered as a temporary table, called a temporary view (or temp view) in Spark 2. DataFrames provide two methods for this:

createTempView creates a new view, throwing an exception if a view with that name already exists;

createOrReplaceTempView creates a temporary view, replacing any existing view of the same name.

Both methods take the view name as their only parameter.

After registering the view, you can use the SparkSession's sql method to execute SQL statements:
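A sketch of what this might look like (the view name and query are illustrative):

    customerDF.createOrReplaceTempView("customers")

    val zjDF = spark.sql(
      "SELECT customer, age FROM customers WHERE province = 'Zhejiang'")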

This returns a DataFrame with the following content:

The catalog field of the SparkSession class is an object of the Catalog class, which offers a variety of methods for working with the tables and views registered in the session. For example, the listTables method returns a Dataset containing information about all registered tables:
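    spark.catalog.listTables().show()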

Similarly, listColumns returns a Dataset containing information about the columns of a registered table, for example:
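    spark.catalog.listColumns("customers").show()   // the view registered above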

In addition, you can use the Dataset's selectExpr method to evaluate SQL expressions on individual columns, such as:
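    customerDF.selectExpr("customer AS name", "age + 1 AS newAge")
    customerDF.selectExpr("concat(customer, ' (', province, ')')")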

Both produce DataFrame objects.

Conclusion of the First Steps

We hope to have convinced the reader that the unified API of Apache Spark 2.0 flattens the Spark learning curve for analysts who are already familiar with SQL. The next section introduces the typed Dataset API, user-defined functions, and joins between Datasets. We will also discuss some pitfalls of the new Dataset API.

Second Steps to Using DataFrames and SQL in Spark 2.0

The first part of this article used the untyped DataFrame API, in which every row represents a Row object. In what follows, we will use the newer, typed Dataset API. Datasets were introduced in Apache Spark 1.6 and were unified with DataFrames in Spark 2.0: we now have type DataFrame = Dataset[Row], where the square brackets [ and ] denote generic types in Scala, analogous to < and > in Java. Therefore, all of the methods discussed above, such as select, filter, groupBy, agg, orderBy and limit, can be used in exactly the same way.

➤ Datasets: Getting Type Information Back

The pre-2.0 DataFrame API is essentially untyped, which means that type information is not available at compile time, so many mistakes that the compiler could otherwise catch only show up at runtime.

As before, the examples use Scala, which I believe is the most concise. In all of the examples, spark denotes a SparkSession object representing our Spark cluster.

Example: Analyzing Apache access logs

We will use data in the Apache access log format. Let us first review a typical line of an Apache log:
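A reconstructed example line, matching the fields described below (the byte count at the end is illustrative):

    127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839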

This line contains the following sections:

127.0.0.1 is the IP address (or host name, if available) of the client (remote host) making the request to the server;

The first - indicates that the requested information (the identity of the user as determined by the remote machine) is not available;

The second - indicates that the requested information (the identity of the user as determined by a local login) is not available;

[01/Aug/1995:00:00:01 -0400] is the time at which the server finished processing the request, in the format [day/month/year:hour:minute:second timezone];

"GET /images/launch-logo.gif HTTP/1.0" is the request line, which consists of three parts:

The request method (e.g. GET, POST, etc.);

The endpoint (Uniform Resource Identifier);

And the client's protocol version ("HTTP/1.0").

200 is the status code that the server sent back to the client. This information is very valuable: it distinguishes successful responses (codes starting with 2), redirects (codes starting with 3), errors caused by the client (codes starting with 4) and server errors (codes starting with 5). The last entry is the size of the object returned to the client; if nothing was returned, it is - or 0.

The first task is to define a suitable type to hold the information of one log line, so we use a Scala case class, as follows:
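A sketch of such a case class; apart from host and endpoint, which follow the description above, the field names are my own choices:

    case class ApacheLog(
      host: String,
      clientId: String,
      userId: String,
      dateTime: String,
      method: String,
      endpoint: String,
      protocol: String,
      responseCode: Int,
      contentSize: Long)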

By default, case class objects are immutable, and they are compared by their values rather than by object reference.

Having defined a suitable data structure for a log entry, we now need to convert the Strings representing log lines into ApacheLog objects. We will use a regular expression to do this, along the following lines:
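One possible regular expression with nine capture groups (a simplified sketch, not guaranteed to cover every legal log line):

    val logPattern = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+)""".r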

You can see that the regular expression contains 9 capture groups representing the fields of the ApacheLog class.

When using regular expressions to parse access logs, you face the following issues:

Some log lines have a content size of -, which we want to convert to 0;

Some log lines do not conform to the format given by the selected regular expression.

To deal with the second problem, we use Scala's Option type to discard malformed lines in a well-defined way. Option is a generic type; an object of type Option[ApacheLog] can take one of two forms:

None indicates that there is no value (other languages might use null here);

Some(log) for an ApacheLog object log.

The following function parses a single log line and returns None for entries that cannot be parsed:
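A sketch of such a function, using the pattern and case class defined above:

    def parse_logline(line: String): Option[ApacheLog] =
      line match {
        case logPattern(host, clientId, userId, dateTime,
                        method, endpoint, protocol, status, size) =>
          Some(ApacheLog(host, clientId, userId, dateTime, method, endpoint,
                         protocol, status.toInt,
                         if (size == "-") 0L else size.toLong))   // convert "-" to 0
        case _ => None
      }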

Ideally one would refine the regular expression until it captures all log entries, but Option is a common technique for handling errors and unparsable entries gracefully.

Putting it all together, we will now analyze a real data set: the well-known NASA Apache access log data set, which can be downloaded at ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz.

After downloading and unzipping the file, we first open it as a Dataset of Strings and then parse it with the regular expression:
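    import spark.implicits._

    // The file path is assumed; adjust it to wherever the unzipped file lives.
    val rawData    = spark.read.text("NASA_access_log_Jul95").as[String]
    val apacheLogs = rawData.flatMap(line => parse_logline(line))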

The spark.read.text method opens the text file and returns a DataFrame whose rows are the lines of the file. The Dataset's as method converts it into a Dataset of Strings (instead of Rows containing a String); the import of spark.implicits._ makes it possible to create Datasets of Strings and other primitive types.

flatMap applies the parse_logline function to every row of rawData and collects all results of the form Some(ApacheLog) into apacheLogs, discarding all unparsable log lines (the results of the form None).

We can now analyze the Dataset just as we would a DataFrame. The column names of the Dataset are simply the field names of the ApacheLog case class.

For example, the following code prints the ten endpoints that generated the most 404 responses:
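A sketch of such a query (the column names follow the case class defined above):

    import org.apache.spark.sql.functions.desc

    apacheLogs.filter($"responseCode" === 404)
      .groupBy("endpoint")
      .count()
      .orderBy(desc("count"))
      .limit(10)
      .show()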

As mentioned earlier, we can also register the Dataset as a temporary view and then run the query with SQL:
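    apacheLogs.createOrReplaceTempView("apacheLogs")

    spark.sql("""
      SELECT endpoint, count(*) AS cnt
      FROM apacheLogs
      WHERE responseCode = 404
      GROUP BY endpoint
      ORDER BY cnt DESC
      LIMIT 10""").show()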

This SQL query produces the same result as the Scala code above.

User-defined functions (UDFs)

Spark SQL offers a wide range of built-in functions, including functions for processing dates, basic statistics and other mathematical operations. Spark's built-in functions are defined in the org.apache.spark.sql.functions object.

As an example, we use the following function to extract the top-level domain from a host name:
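A deliberately simple sketch (it does not handle IP addresses or other edge cases):

    def extractTLD(host: String): String = host.split('.').last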

To use this function in SQL queries, it first has to be registered. This is done via the SparkSession's udf object:
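    val extractTLD_UDF = spark.udf.register("extractTLD", extractTLD _)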

The trailing underscore after the function name turns extractTLD into a partially applied function and is necessary; omitting it causes an error. The register method returns a UserDefinedFunction object that can be applied to column expressions.

Once registered, we can use extractTLD in SQL queries:
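    spark.sql("SELECT extractTLD(host), endpoint FROM apacheLogs").show()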

To get an overview of the registered user-defined functions, we can use the listFunctions method of the spark.catalog object, which returns a DataFrame of all functions defined in the SparkSession:
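    spark.catalog.listFunctions().show()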

Note that Spark SQL follows the usual SQL convention of being case-insensitive. That is, the following SQL expressions are all valid and equivalent: select extractTLD(host) from apacheLogs, select extracttld(host) from apacheLogs, select EXTRACTTLD(host) from apacheLogs. The function names returned by spark.catalog.listFunctions are always lowercase.

In addition to using UDFs in SQL queries, we can also apply them directly to column expressions. The following expression returns all requests in the .net domain:
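    apacheLogs.filter(extractTLD_UDF($"host") === "net").show()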

It is worth noting that, in contrast to Spark's built-in methods such as filter, select and so on, user-defined functions accept only Column expressions as arguments, not strings: writing extractTLD_UDF("host") results in an error.

Besides registering a UDF in the catalog and using it in Column expressions and SQL, we can also create a UDF with the udf function from the org.apache.spark.sql.functions object:
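    import org.apache.spark.sql.functions.udf

    // The name extractTLD2 is an arbitrary choice for this sketch.
    val extractTLD2 = udf(extractTLD _)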

A UDF created this way can be applied to Column expressions (for example inside a filter) as follows:
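    apacheLogs.filter(extractTLD2($"host") === "net").show()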

However, it cannot be used in SQL queries because it has not been registered by name.

The Catalyst optimizer, which optimizes all queries involving Datasets, treats user-defined functions as black boxes. In particular, when a filter involves a UDF, the filter may not be "pushed down" below a join. We illustrate this with the following example.
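A hypothetical comparison (the join itself is only there to show where the filter ends up; explain prints the physical plan so the placement of the filter can be inspected):

    val hosts404 = apacheLogs.filter($"responseCode" === 404).select("host")

    // Built-in Column expression: the filter can be pushed below the join.
    apacheLogs.join(hosts404, Seq("host"))
      .filter($"host".endsWith(".gov"))
      .explain()

    // UDF-based filter: treated as a black box, it may stay above the join.
    apacheLogs.join(hosts404, Seq("host"))
      .filter(extractTLD_UDF($"host") === "gov")
      .explain()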

In general, it is better not to rely on UDFs but to combine Spark's built-in Column expressions wherever possible.

Joins

Finally, we discuss how to join Datasets using the following two Dataset methods:

join, which returns a DataFrame;

joinWith, which returns a Dataset of pairs.

The following example joins the two tables 1 and 2 (taken from Wikipedia):

Table 1 Employee

Table 2 Department

We define two case classes, encode the two tables as sequences of case class objects, and finally create two Dataset objects:
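A sketch using the classic Wikipedia employee/department example; the field names lastname and depID follow the text below:

    case class Employee(lastname: String, depID: Option[Int])
    case class Department(depID: Int, name: String)

    val employeeData = Seq(
      Employee("Rafferty", Some(31)), Employee("Jones", Some(33)),
      Employee("Heisenberg", Some(33)), Employee("Robinson", Some(34)),
      Employee("Smith", Some(34)), Employee("Williams", None))

    val departmentData = Seq(
      Department(31, "Sales"), Department(33, "Engineering"),
      Department(34, "Clerical"), Department(35, "Marketing"))

    val employees   = employeeData.toDS()
    val departments = departmentData.toDS()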

To perform an inner equi-join, we simply provide the name of the column to join on as a String:
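    val joined = employees.join(departments, "depID")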

Spark automatically removes the duplicated column, and joined.show gives the following output:

Table 3 output

In the above, joined is a DataFrame and no longer a Dataset. The columns to join on can also be given as a Seq of column names, and we can additionally specify which type of equi-join to perform (inner, outer, left_outer, right_outer or leftsemi). If you want to specify the join type, you must use the Seq notation for the join columns. Note that if we perform a self-join (for example, to obtain all pairs of employees working in the same department), employees.join(employees, Seq("depID")), we have no way to access the other columns of the joined DataFrame: employees.join(employees, Seq("depID")).select("lastname") fails because of the duplicate column names. The way to handle this situation is to rename some of the columns first:
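For instance (the new column name is an arbitrary choice):

    val employees2 = employees.withColumnRenamed("lastname", "lastname2")

    employees.join(employees2, Seq("depID"))
      .select("lastname", "lastname2")
      .show()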

In addition to equi-joins, we can also supply more complex join expressions, such as the following query, which joins each department with the employees whose department ID is missing or who do not work in that department:
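    departments.join(employees,
      employees("depID").isNull || employees("depID") =!= departments("depID"))
      .show()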

We can also perform a Cartesian join between two Datasets by not specifying any join condition: departments.join(employees).show.

Finally, the Dataset's joinWith method returns a Dataset containing Scala tuples of the matching rows from the two original Datasets:
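A sketch of such a call:

    employees.joinWith(departments,
      employees("depID") === departments("depID")).show()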

Table 4 The returned Dataset

This can be used to work around the problem of inaccessible columns after a join.

The Catalyst optimizer tries to optimize joins by "pushing down" filter operations wherever possible, so that they are executed before the actual join.

For this to work, user-defined functions (UDFs) should not be used inside join conditions, because Catalyst treats them as black boxes.

Conclusion

We have discussed the use of the typed Dataset API in Apache Spark 2.0, how to define and use user-defined functions in Apache Spark, and the dangers of doing so. The main difficulty with UDFs is that the Catalyst optimizer treats them as black boxes.

About the author: Dr. Christoph Schubert (Chinese name Ma Xiao) is a professor of data analysis and big data computing at Zhejiang University of Finance and Economics. After receiving his Ph.D. in mathematics from the University of Bremen in Germany in 2006, he carried out research and teaching at the software engineering institute of Dortmund University until coming to China in 2011. His research focuses on big data technology, NoSQL databases, functional programming, stochastic models of computation, and modal logic. He is also chairman of the International Big Data Analysis Conference.

