spooq

Table of Contents

  1. Overview
  2. Getting Started
  3. Play with the tool using Docker
  4. How does it work?
  5. Stream Processing
  6. Interactive Mode
  7. Thrift Server (Experimental)
  8. Reference Documentation
    1. Configuration Overview
    2. Steps Kind
    3. Launch Parameters
  9. Rest API
  10. Download
  11. How to compile the code
  12. Cookbook

Overview

Spooq is a Big Data ETL tool built on the Apache Spark framework. It simplifies the use of Spark by letting you implement data pipelines declaratively, through simple configuration files, and express transformations primarily in SQL.

The name is clearly inspired by the Apache Sqoop project. Spooq is proposed as a replacement for it that exploits the potential of Spark instead of the original tool’s engine, which was based on Hadoop’s MapReduce.

Spooq unlike its “predecessor” is capable, by leveraging the capabilities of Apache Spark, of:

Getting Started

For the impatient, here is a quick guide to getting started with the tool. In keeping with tradition, we start with the classic “hello world” example (don’t worry if it doesn’t make much sense from a functional point of view; it serves to illustrate the basic philosophy of the tool).

The tool takes as input a configuration file (HOCON, JSON or YAML format can be used) with some identifying information about the flow and a sequence of processing steps. For example, using the following configuration:

id = "helloWorld"
desc = "sample 'hello world' job"

steps = [
{
    id = hello
    shortDesc = "execute 'hello world' sql query"
    kind = sql
    sql = "select 'hello world!' as message"
    show = true
}
]

Launching the application we will get the following output: [Asciinema recording]
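As a rough sketch, the example above can be tried from a shell by saving the configuration to a file and passing it to the launcher with the -c option. The launcher location (bin/spooq under SPOOQ_HOME, falling back to ./spooq) is an assumption based on the paths used elsewhere in this document; adjust it to your installation.

```bash
# Save the "hello world" configuration shown above to conf/hello.conf
mkdir -p conf
cat > conf/hello.conf <<'EOF'
id = "helloWorld"
desc = "sample 'hello world' job"

steps = [
{
    id = hello
    shortDesc = "execute 'hello world' sql query"
    kind = sql
    sql = "select 'hello world!' as message"
    show = true
}
]
EOF

# Assumed launcher location; only run it if it is actually there
SPOOQ_BIN="${SPOOQ_HOME:-./spooq}/bin/spooq"
if [ -x "$SPOOQ_BIN" ]; then
    "$SPOOQ_BIN" -c conf/hello.conf
else
    echo "Spooq launcher not found at $SPOOQ_BIN (configuration saved to conf/hello.conf)"
fi
```

If the launcher is not found, the script only writes the configuration file, which can then be used with the Docker image described below.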

Okay, now let’s try something more useful: for example, importing a Postgres table over a JDBC connection and writing it out in CSV format:

id = "sample job"
desc = "sample spooq job that ingest data from database table through jdbc connection"

steps = [
{
    id = customers
    shortDesc = "load from jdbc"
    desc = "load 'customer' table from postgres database"
    kind = input
    format = jdbc
    options = {
        url = "jdbc:postgresql://kandula.db.elephantsql.com:5432/wllbjgnv"
        driver = "org.postgresql.Driver"
        dbtable = "public.customer"
        user = "wllbjgnv"
        password = "**********"
        numPartitions = "1"
    }
    cache = true
    show = true
},
{
      id = out
      shortDesc = "write to fs"
      dependsOn = ["customers"]
      desc = "write 'customer' table to fs using csv format"
      kind = output
      source = customers
      format = csv
      options = {
        header = "true"
      }
      mode = overwrite
      path = "/tmp/customer.csv"
},
]

Please note: to connect to Postgres via JDBC, the connector must be available in the classpath. We will explain this step in more detail later.

Let us now run the tool with the above configuration file: [Asciinema recording]

Let’s try adding a data preparation step (we can use as many as we need):

id = "sample job"
desc = "sample spooq job that ingest data from database table through jdbc connection"
steps = [
{
    id = customers
    shortDesc = "load from jdbc"
    desc = "load 'customer' table from postgres database"
    kind = input
    format = jdbc
    options = {
        url = "jdbc:postgresql://kandula.db.elephantsql.com:5432/wllbjgnv"
        driver = "org.postgresql.Driver"
        dbtable = "public.customer"
        user = "wllbjgnv"
        password = "**********"
        numPartitions = "1"
    }
    cache = true
},
{
    id = prepared
    shortDesc = "filter and prepare output"
    kind = sql
    sql = "select customer_id,first_name,last_name,email from customers"
    show = true
},
{
      id = out
      shortDesc = "write to fs"
      desc = "write 'customer' table to fs using parquet format"
      kind = output
      source = prepared
      format = parquet
      mode = overwrite
      path = "/tmp/customer.parquet"
},
]

Let’s launch the tool again using the new configuration: [Asciinema recording]

Play with the tool using Docker

The best way to start testing the tool is to use the prepackaged Docker image by following these simple steps:

  1. In a folder of your choice create two subfolders named conf and data;
  2. Inside the conf folder create a hello.conf file with the following content:

id = "helloWorld"
desc = "sample 'hello world' job"

steps = [
{
    id = hello
    shortDesc = "execute 'hello world' sql query"
    kind = sql
    sql = "select 'hello world!' as message"
    show = true
},
{
    id = out
    shortDesc = "sample output"
    kind = output
    source = hello
    format = json
    mode = overwrite
    path = /opt/spooq/data/hello.json
}
]

  3. Launch the Docker image with the following command:

docker run -v $(pwd)/conf:/opt/spooq/conf -v $(pwd)/data:/opt/spooq/data -it mcartia/spooq -c conf/hello.conf

  4. If everything went smoothly you should find the job output in the data/hello.json/ directory. Congratulations!

Please note: You can pass any arguments supported by spooq. The application will run locally on a standalone Spark installation embedded in the docker image.

It is possible to load additional dependencies by setting the SPOOQ_PACKAGES environment variable. For example, to launch the application with the PostgreSQL JDBC connector loaded, it is enough to use:

docker run -v $(pwd)/conf:/opt/spooq/conf -v $(pwd)/data:/opt/spooq/data -e SPOOQ_PACKAGES=org.postgresql:postgresql:42.4.0 -it mcartia/spooq -c conf/your.conf

(to load multiple dependencies just separate them with a comma ,)
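When several dependencies are needed, the comma-separated value can be assembled with a small helper. The following is a minimal sketch: join_packages is a hypothetical helper name, and the two coordinates are the ones used as examples in this document.

```bash
# Join Maven coordinates with commas, the separator SPOOQ_PACKAGES expects.
join_packages() {
    local IFS=','
    echo "$*"
}

SPOOQ_PACKAGES=$(join_packages \
    "org.postgresql:postgresql:42.4.0" \
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
echo "$SPOOQ_PACKAGES"

# The value can then be passed to the container, for example:
# docker run -v $(pwd)/conf:/opt/spooq/conf -v $(pwd)/data:/opt/spooq/data \
#     -e SPOOQ_PACKAGES="$SPOOQ_PACKAGES" -it mcartia/spooq -c conf/your.conf
```

Keeping the coordinates one per line makes the list easier to review than a single long string.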

How does it work?

The tool is a generic Spark application that can be launched either in standalone mode (master=local[*], via a fat jar that bundles the Apache Spark framework and other libraries) or on a cluster via spark-submit.

It is also possible to use the tool as a library within other Spark applications or from spark-shell: [Asciinema recording]

In this mode the output is an object containing all the dataframes and variables created while running the job defined in the configuration, so we can easily continue processing them interactively in the REPL.

The downloadable distribution also contains sample launch scripts for the various modes. This is the one that uses spark-submit:

#!/bin/bash
source $SPOOQ_HOME/bin/loadenv

JAR=$SPOOQ_HOME/lib/spooq-spark3.jar
MAIN=com.github.supermariolabs.spooq.Application

# Example
# SPOOQ_PACKAGES=org.postgresql:postgresql:42.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0

if [ -n "$SPOOQ_PACKAGES" ]
then
    echo "PACKAGES_CMD=--packages $SPOOQ_PACKAGES"
    PACKAGES_CMD="--packages $SPOOQ_PACKAGES"
else
    PACKAGES_CMD=
fi

ARGS="$@"

if [ -z "$JAVA_HOME" ]
then
    echo "JAVA_HOME not defined!"
else
    echo "Using JAVA_HOME=$JAVA_HOME"
fi

if [ -z "$SPARK_HOME" ]
then
    echo "SPARK_HOME not defined!"
else
    echo "Using SPARK_HOME=$SPARK_HOME"
    $SPARK_HOME/bin/spark-submit \
	--class $MAIN \
	--master local[*] \
	--conf spark.executor.extraJavaOptions=-Dlog4j.configurationFile=$SPOOQ_HOME/conf/log4j2.properties \
	--conf spark.driver.extraJavaOptions=-Dlog4j.configurationFile=$SPOOQ_HOME/conf/log4j2.properties \
	$PACKAGES_CMD \
	$JAR \
	$ARGS
fi

As you can see, dependencies can be loaded from Maven-compatible repositories using the --packages (and --repositories) options of the spark-submit command.
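The script above only forwards --packages. As a sketch of how --repositories could be handled in the same way, the helper below builds both options from two comma-separated values. Note that dependency_opts and the SPOOQ_REPOSITORIES variable are hypothetical names introduced for illustration; the bundled script does not read SPOOQ_REPOSITORIES.

```bash
# Build the dependency options for spark-submit.
# $1: comma-separated Maven coordinates (optional)
# $2: comma-separated repository URLs (optional)
dependency_opts() {
    opts=""
    if [ -n "$1" ]; then
        opts="--packages $1"
    fi
    if [ -n "$2" ]; then
        # Add a separating space only when --packages is already present
        opts="${opts:+$opts }--repositories $2"
    fi
    echo "$opts"
}

# SPOOQ_REPOSITORIES is a hypothetical variable used only in this sketch
dependency_opts "${SPOOQ_PACKAGES:-}" "${SPOOQ_REPOSITORIES:-}"
```

The printed string could take the place of $PACKAGES_CMD in the spark-submit invocation; when both values are empty it prints an empty line, so no option is added.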

Stream Processing

The framework also supports the Structured Streaming API via step blocks with input-stream and output-stream kind. Let us look at an example and also introduce the use of UDFs to enrich the already large collection of built-in SQL functions provided by Spark:

id = "sample streaming job"

steps = [
{
    id = stream
    shortDesc = "generate fake stream"
    kind = input-stream
    format = rate
    options = {
        rowsPerSecond = "2"
    }
},
{
    id = randomCustomer
    shortDesc = "load sample udf"
    kind = udf
    claz = com.github.supermariolabs.spooq.udf.example.FakeCustomerUDF
},
{
    id = enriched
    shortDesc = "enrich stream using sql and udf"
    kind = sql
    sql = "select customer.* from (select randomCustomer(value) customer from stream)"
},
{
    id = outStream
    source = enriched
    shortDesc = "stream dump"
    kind = output-stream
    format = console
    outputMode = "append"
    trigger = {
                policy = "processingTime"
                value = "10 seconds"
    }
}
]

Once launched, this will produce: [Asciinema recording]

Interactive Mode

Interactive mode lets you execute SQL queries once the pipeline execution has finished. This mode is very useful during pipeline development and debugging, as well as for interactive analysis of data from multiple sources using Spark’s distributed SQL engine.

This mode is triggered by the use of the --interactive (or -i) switch when launching the application. Let’s use this simple configuration:

id = "sample job"
steps = [
{
    id = customers
    shortDesc = "load customer.csv file"
    kind = input
    format = csv
    schema = "customer_id int ,store_id int ,first_name string,last_name string,email string,address_id string,activebool string,create_date string,last_update string,active string"
    options = {
        header = "true"
    }
    path = "/tmp/customer.csv"
    cache = true
}]

we will get: [Asciinema recording]

Thrift Server (Experimental)

An experimental feature allows a built-in Thrift server to be started (on a configurable port, default: 10001), so that a client using the standard Hive JDBC driver can run SQL queries on the views (temporary tables) created during processing.

For example, you can use the open-source DBeaver Community client.

Reference Documentation

Configuration Overview

Configuration files can be written in HOCON, JSON or YAML format. Decoding is implemented through the use of the Circe library. When the format is not specified as a launch parameter, the application tries to infer it from the file extension.
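The extension-based inference can be pictured with a small shell function. This is only an illustration: the infer_format name and the extension-to-format mapping (.conf/.hocon, .json, .yaml/.yml) are assumptions, not Spooq’s actual implementation.

```bash
# Guess the configuration format from the file extension (illustrative mapping).
infer_format() {
    case "${1##*.}" in
        conf|hocon) echo "hocon" ;;
        json)       echo "json" ;;
        yaml|yml)   echo "yaml" ;;
        *)          echo "unknown" ;;
    esac
}

infer_format conf/hello.conf
infer_format job.json
infer_format job.yml
```

A file whose extension is not recognised falls through to "unknown", which is the case where the format would have to be given explicitly as a launch parameter.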

All configuration blocks (including the root) have an identifier (id, mandatory), a short description (shortDesc, optional) and an extended description (desc, optional). The root block also has a list of steps (mandatory, but it can be empty). The following is an example of a valid minimal configuration in HOCON format:

id = minimal
shortDesc = "Minimal configuration example"
desc = "This example configuration does nothing but is formally valid"
steps = []

Which will produce: [image: Minimal configuration]

Steps Kind

input

A step with kind input loads a DataFrame starting from a data source natively supported by Spark or using a third-party connector.

For each DataFrame a corresponding temporary view is also created, whose name is the id of the corresponding step block. This makes it possible to reference the DataFrame in SQL queries within subsequent blocks.

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

Examples:

steps = [
   {
      id = customers
      shortDesc = "load customer.csv file"
      kind = input
      format = csv
      schema = "customer_id int ,store_id int ,first_name string,last_name string,email string,address_id string,activebool string,create_date string,last_update string,active string"
      options = {
         header = "true"
      }
      path = "/tmp/customer.csv"
      cache = true
      show = true
   },
   #...
   {
      id = jdbc
      shortDesc = "load from jdbc"
      desc = "load 'customer' table from postgres database"
      kind = input
      format = jdbc
      options = {
         url = "jdbc:postgresql://kandula.db.elephantsql.com:5432/wllbjgnv"
         driver = "org.postgresql.Driver"
         dbtable = "public.customer"
         user = "wllbjgnv"
         password = "**********"
         numPartitions = "1"
      }
      cache = true
      show = true
   }
]

customInput

A step with kind customInput creates a dataframe and the corresponding temporary view by executing custom code defined in a class that extends the CustomInputStep trait:

package com.github.supermariolabs.spooq.etl

import com.github.supermariolabs.spooq.model.Step
import org.apache.spark.sql.DataFrame

trait CustomInputStep extends Serializable {
  def run(dfMap: Map[String, DataFrame], variables: Map[String, Any], args : Map[String,String], customInputStep : Step): DataFrame
}

In our example we developed a custom input step that reads the JSON response of a REST API authenticated via OAuth (experimental):

steps = [
{
    id = exampleRestSource
    shortDesc = "load a json response from a REST api"
    kind = customInput
    format = json
    options = {
        api_rest_authentication_body = "{\"client_id\": \"12345678\",\"client_secret\": \"12345678\",\"grant_type\": \"client_credentials\"}",
        api_rest_method = "POST",
        multiline = "true",
        header = "true",
        api_rest_body = "{\"flowDate\": \"2022-04-04\",\"hours\": [14,15],\"zones\": [\"CNOR\",\"CSUD\"],\"statuses\": [\"null\",\"REP\",\"SENT_OK\"]}",
        api_rest_authentication_host = "https://exampleHost.com/oauth/token"
    }
    claz = com.github.supermariolabs.spooq.etl.RestApiStep
    path = "https://exampleHostToDoTheCallTo.com/examplePath"
    cache = true
    show = true
},
#...

input-stream

A step with kind input-stream loads a (streaming) DataFrame starting from a data source natively supported by Spark or using a third-party connector. The feature uses Spark’s Structured Streaming API.

For each (streaming) DataFrame a corresponding temporary view is also created, whose name is the id of the corresponding step block. This makes it possible to reference the DataFrame in SQL queries within subsequent blocks.

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

Examples:

steps = [
{
    id = stream
    shortDesc = "generate fake stream"
    kind = input-stream
    format = rate
    options = {
        rowsPerSecond = "2"
    }
},
#...
   {
      id = kafka
      shortDesc = "kafka topic input"
      kind = input-stream
      format = kafka
      options = {
         "kafka.bootstrap.servers" = "localhost:9092"
         subscribe = "spooq"
         includeHeaders = "true"
      }
   }
]

sql

A step with kind sql creates a dataframe (in batch or streaming mode) and the corresponding temporary view starting from an SQL query that makes use of dataframes previously created in other blocks (input, input-stream or other sql blocks).

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

Examples:

steps = [
{
    id = customers
    shortDesc = "load customer.csv file"
    kind = input
    format = csv
    options = {
        header = "true"
    }
    path = "/tmp/customer.csv"
    cache = true
    show = true
},
{
    id = filter
    shortDesc = "filter customers"
    kind = sql
    sql = "select * from customers where substr(first_name,0,1)='B'"
    show = true
},
{
    id = prepared
    shortDesc = "prepare output"
    kind = sql
    sql = "select first_name, upper(last_name), email from filter"
    show = true
},
#...

variable

A step with kind variable creates a variable that is put into the variables map and can be referenced in sql blocks through placeholders.

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

Examples:

steps = [
  {
    id = helloVar
    kind = variable
    sql = "select 'hello world!'"
    show = true
  },
  {
    id = hello
    shortDesc = "execute hello world sql query"
    kind = sql
    sql = """select '#{variables.helloVar}' as message"""
    show = true
  },
#...

script (experimental)

A step with kind script creates a dataframe and the corresponding temporary view evaluating a script interpreted by a JSR 223 engine.

Inside the snippet we can use the variables sc (SparkContext), spark (SparkSession) and logger (slf4j Logger), as well as all the dataframes and variables created in the previous blocks (referenced using the id as a name).

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

custom

A step with kind custom runs custom code defined in a class that implements the SimpleStep trait, whose run method returns a DataFrame:

import org.apache.spark.sql.DataFrame

trait SimpleStep extends Serializable {
  def run(dfMap: Map[String, DataFrame], variables: Map[String, Any]): DataFrame
}

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

- claz: class name including package
- cache: whether to apply dataframe caching (N.B. lazy as default on Spark)
- show: whether to display a diagnostic dataframe data sample

Examples:

steps = [
{
    id = customers
    shortDesc = "load customer.csv file"
    kind = input
    format = csv
    schema = "customer_id int ,store_id int ,first_name string,last_name string,email string,address_id string,activebool string,create_date string,last_update string,active string"
    options = {
        header = "true"
    }
    path = "/tmp/customer.csv"
    cache = true
    show = true
},
{
    id = custom
    shortDesc = "custom step"
    kind = custom
    claz = com.github.supermariolabs.spooq.etl.SampleStep
},
#...

udf

A step with kind udf registers a custom user defined function that can be used inside subsequent sql blocks.

The function must be implemented (and available in the classpath at runtime) by extending the SimpleUDF trait:

package com.github.supermariolabs.spooq.udf

import org.apache.spark.sql.expressions.UserDefinedFunction

trait SimpleUDF extends Serializable {
   val udf: UserDefinedFunction
}

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

Examples:

steps = [
{
    id = stream
    shortDesc = "generate fake stream"
    kind = input-stream
    format = rate
    options = {
        rowsPerSecond = "2"
    }
},
{
    id = randomCustomer
    shortDesc = "load sample udf"
    kind = udf
    claz = com.github.supermariolabs.spooq.udf.example.FakeCustomerUDF
},
{
    id = enriched
    shortDesc = "enrich stream using sql and udf"
    kind = sql
    sql = "select customer.* from (select randomCustomer(value) customer from stream)"
},
#...

output

A step with kind output writes a DataFrame to any data source natively supported by Spark or using a third-party connector.

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

Examples:

steps = [
{
    id = customers
    shortDesc = "load customer.csv file"
    kind = input
    format = csv
    schema = "customer_id int ,store_id int ,first_name string,last_name string,email string,address_id string,activebool string,create_date string,last_update string,active string"
    path = "/tmp/customer.csv"
    cache = true
    show = true
},
{
    id = filter
    shortDesc = "filter customers"
    kind = sql
    sql = "select * from customers where substr(first_name,0,1)='B'"
    show = true
},
{
    id = prepared
    shortDesc = "prepare output"
    kind = sql
    sql = "select first_name, upper(last_name), email from filter"
    show = true
},
{
      id = out
      shortDesc = "write to fs"
      dependsOn = ["filter"]
      desc = "write filtered customers table to fs using json format"
      kind = output
      source = prepared
      format = json
      mode = overwrite
      path = "/tmp/customer.json"
},
#...

output-stream

A step with kind output-stream creates a streaming DataFrame that writes to a data source natively supported by Spark or using a third-party connector. The feature uses Spark’s Structured Streaming API.

The properties supported by this type of block (in addition to the common ones id, shortDesc and desc) are:

Examples:

steps = [
{
    id = stream
    shortDesc = "generate fake stream"
    kind = input-stream
    format = rate
    options = {
        rowsPerSecond = "2"
    }
},
{
    id = randomCustomer
    shortDesc = "load sample udf"
    kind = udf
    claz = com.github.supermariolabs.spooq.udf.example.FakeCustomerUDF
},
{
    id = enriched
    shortDesc = "enrich stream using sql and udf"
    kind = sql
    sql = "select customer.* from (select randomCustomer(value) customer from stream)"
},
{
    id = outStream
    source = enriched
    shortDesc = "stream dump"
    kind = output-stream
    format = console
    outputMode = "append"
    trigger = {
                policy = "processingTime"
                value = "10 seconds"
    }
},
#...

Launch Parameters

The command parameters supported by the application are:

Rest API

You can also interact with Spooq through a REST API (remember to pass the --http option when you launch Spooq). Example:

./spooq/bin/spooq -c 'spooq/conf/example.conf' --http

Below are all the REST APIs, with examples of how to call them (4242 is the default port the service listens on):

Add a step

curl -X POST localhost:4242/step

request body (json):

{
      "id":"orders_spooq",
      "desc":"load orders_spooq.csv file",
      "kind":"input",
      "format":"csv",
      "options": {
         "header":"true"
      },
      "path":"s3a://test/poc/orders_spooq.csv",
      "cache":true
}
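The curl command above does not show how the body is attached. One possible way, sketched below, is to save the JSON to a file and send it with --data. The Content-Type header, the step.json file name and the SPOOQ_API and DRY_RUN variables are assumptions made for this sketch; by default it only prints the command instead of contacting the server.

```bash
# Address of the service: 4242 is the default port mentioned above.
SPOOQ_API="${SPOOQ_API:-localhost:4242}"   # hypothetical variable name

# Save the request body shown above to a file
cat > step.json <<'EOF'
{
      "id":"orders_spooq",
      "desc":"load orders_spooq.csv file",
      "kind":"input",
      "format":"csv",
      "options": {
         "header":"true"
      },
      "path":"s3a://test/poc/orders_spooq.csv",
      "cache":true
}
EOF

# DRY_RUN (hypothetical) defaults to 1: print the command rather than run it
if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "curl -X POST $SPOOQ_API/step -H 'Content-Type: application/json' --data @step.json"
else
    curl -X POST "$SPOOQ_API/step" -H 'Content-Type: application/json' --data @step.json
fi
```

Set DRY_RUN=0 to send the request once Spooq is running with the --http option.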

Cache a dataframe

curl -X GET localhost:4242/cache/{dataframeName}

path variable:

dataframeName: name of the dataframe you want to cache

Unpersist a dataframe

curl -X GET localhost:4242/unpersist/{dataframeName}

path variable:

dataframeName: name of the dataframe you want to unpersist

Delete a dataframe

curl -X DELETE localhost:4242/step/{dataframeName}

path variable:

dataframeName: name of the dataframe you want to delete
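The three dataframe endpoints above differ only in the HTTP method and the path prefix, so they can be wrapped in small shell functions. This is a sketch under stated assumptions: the function names and the SPOOQ_API variable are hypothetical, and the service is assumed to be listening on the default port.

```bash
# Address of the service: 4242 is the default port mentioned above.
SPOOQ_API="${SPOOQ_API:-localhost:4242}"   # hypothetical variable name

# Build the URL for an endpoint prefix (cache, unpersist, step) and a dataframe name.
spooq_url() {
    echo "$SPOOQ_API/$1/$2"
}

# Thin wrappers around the curl calls listed above
spooq_cache()     { curl -X GET    "$(spooq_url cache "$1")"; }
spooq_unpersist() { curl -X GET    "$(spooq_url unpersist "$1")"; }
spooq_delete()    { curl -X DELETE "$(spooq_url step "$1")"; }

# Show the URL that spooq_cache would call for the orders_spooq dataframe
spooq_url cache orders_spooq
```

With Spooq running, `spooq_cache orders_spooq` would then issue the corresponding GET request.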

Download

Spooq 0.9.9beta Spark3 (scala 2.12)

Spooq 0.9.9beta Spark3 (scala 2.12) standalone

Spooq 0.9.9beta Spark2 (scala 2.11)

Spooq 0.9.9beta Spark2 (scala 2.11) standalone

How to compile the code

To compile and create a custom build you can use sbt. In particular it is possible to compile the software with Scala 2.11 for Spark 2.x and with Scala 2.12 for Spark 3.x.

It is also possible to build the software with or without the Spark embedded libraries (depending on whether you want to use it standalone or on a cluster with spark-submit).

Examples:

# compile using Scala 2.11/Spark 2.x building with spark provided dependencies
$ sbt -Dbuild.spark.version=2 configString assembly

# compile using Scala 2.12/Spark 3.x building fat jar (standalone)
$ sbt -Dbuild.spark.version=3 -Dstandalone=true configString assembly

...
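The build variants can be summarised in a small helper that prints the sbt command for a given Spark major version and packaging. This is a sketch: sbt_build_cmd is a hypothetical name, and it assumes that the two system properties shown above can be combined freely.

```bash
# Print the sbt command for a build variant.
# $1: Spark major version (2 or 3)
# $2: "true" for a standalone fat jar, anything else for Spark provided dependencies
sbt_build_cmd() {
    cmd="sbt -Dbuild.spark.version=$1"
    if [ "$2" = "true" ]; then
        cmd="$cmd -Dstandalone=true"
    fi
    echo "$cmd configString assembly"
}

sbt_build_cmd 2 false   # Scala 2.11/Spark 2.x, Spark provided dependencies
sbt_build_cmd 3 true    # Scala 2.12/Spark 3.x, fat jar (standalone)
```

The helper only prints the commands, so they can be reviewed before being run from the project root.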

Cookbook

TODO