Scala Fun: Create Your Custom Config

Ani
Jun 3, 2022

Work hard and figure out how to be useful and don’t try to imitate anybody else’s success. Figure out how to do it for yourself with yourself.
Harrison Ford

DIY

As a data engineer, I find it fun to build small utilities that cut down on repetitive work. Of course, there are plenty of alternatives out there, open source or freebies.

I was doing some research on my own machine with Kafka, and for that I was loading multiple files into several Kafka topics in different combinations. I wondered how I could build a config that would help me do that without invoking Spark.

My love for case classes helped me with that. If you are not familiar with case classes in Scala, here is a quick introduction.

A case class is like a regular class, but it is designed for modeling immutable data. It is also useful in pattern matching. It is defined with the modifier case, and thanks to that keyword the compiler generates the boilerplate for us, the kind of code that would otherwise have to be repeated in many places with little or no alteration. A minimal case class needs the keywords case class, an identifier, and a parameter list, which may be empty.
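The original snippet is not reproduced here, so this is a minimal sketch of what such a case class could look like. The field names (name, author, year) and the BookDemo object are assumptions made for illustration; only the printed sentence is taken from this post.

```scala
// Hypothetical reconstruction: the field names are assumptions, not the original snippet.
case class Book(name: String, author: String, year: Int)

object BookDemo {
  // Builds the one-line description of a book from its fields.
  def describe(book: Book): String =
    s"The book name is : ${book.name} which was written by : ${book.author} published in : ${book.year}"

  def main(args: Array[String]): Unit = {
    // No `new` needed: the compiler generates an apply method on the companion object.
    val book = Book("Harry Potter and the Deathly Hallows", "J. K. Rowling", 2007)
    println(describe(book))
  }
}
```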

Here is the output. Look how easy it is to create an instance and to access its elements.

The book name is : Harry Potter and the Deathly Hallows which was written by : J. K. Rowling published in : 2007
Process finished with exit code 0

Now, back to the config I was talking about; it is a great tool to have. As I mentioned before, the task was to load data into different Kafka topics with different parameters, such as the number of partitions, and many more. I have cut the config short for this article.

{
  "pipeline": {
    "pipeline_name": "kafka-tutorial"
  },
  "kafka_setup": {
    "kafka_connection": {
      "kafkaHost": "127.0.0.1",
      "kafkaPort": "9092"
    },
    "kafka_topic_setup": [
      {
        "kafka_topic_name": "spark-streaming-1",
        "topic_replication_factor": 1,
        "topic_partitions": 3,
        "kafka_topic_test_data": "src/main/resources/StructuredStreamingData/stream_input_1.csv"
      },
      {
        "kafka_topic_name": "spark-streaming-2",
        "topic_replication_factor": 1,
        "topic_partitions": 3,
        "kafka_topic_test_data": "src/main/resources/StructuredStreamingData/stream_input_3.csv"
      },
      {
        "kafka_topic_name": "spark-streaming-3",
        "topic_replication_factor": 1,
        "topic_partitions": 5,
        "kafka_topic_test_data": "src/main/resources/StructuredStreamingData/stream_input_2.csv"
      },
      {
        "kafka_topic_name": "spark-streaming-4",
        "topic_replication_factor": 1,
        "topic_partitions": 6,
        "kafka_topic_test_data": "src/main/resources/StructuredStreamingData/stream_input_4.csv"
      }
    ]
  }
}

To impose a data structure on the JSON file I framed for the configuration, I created, what? Yeah, a case class: a top-level parent along with its nested parents. Just like the image below. You have probably already figured out that I modeled the Kafka topic setup as an array of objects, each holding the Kafka topic properties and the file to be loaded.
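The image is not reproduced here. As a rough sketch, the case classes could look something like the following, assuming the field names mirror the JSON keys one to one. Only KafkaConfig is named in this post; Pipeline, KafkaSetup, KafkaConnection and KafkaTopicSetup are hypothetical names chosen for illustration.

```scala
// Sketch only: every class name except KafkaConfig is an assumption.
// Field names mirror the JSON keys so they can be matched by name.
case class Pipeline(pipeline_name: String)

case class KafkaConnection(kafkaHost: String, kafkaPort: String)

case class KafkaTopicSetup(
  kafka_topic_name: String,
  topic_replication_factor: Int,
  topic_partitions: Int,
  kafka_topic_test_data: String
)

case class KafkaSetup(
  kafka_connection: KafkaConnection,
  kafka_topic_setup: List[KafkaTopicSetup] // the JSON array of topic objects
)

// Top-level parent: one instance represents the whole config file.
case class KafkaConfig(pipeline: Pipeline, kafka_setup: KafkaSetup)
```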

Now for the reading part: I am using ObjectMapper from com.fasterxml.jackson.databind, and I enable the DeserializationFeature ACCEPT_SINGLE_VALUE_AS_ARRAY so that a single value is also accepted where an array is expected. This helps a lot.

Here is a simple method that reads the config file and imposes the case class structure (or schema, whatever you prefer to call it) on it. Notice that the method's return type is KafkaConfig, a.k.a. our case class for the config.
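The method itself is not reproduced here, so the following is a hedged sketch of how it might be written. It assumes the jackson-module-scala dependency (DefaultScalaModule) is on the classpath so that Jackson can build case classes, which this post does not mention explicitly. The nested case class names are hypothetical and are declared inline so the snippet compiles on its own; mapperConfig and KafkaConfig are the names used in this post.

```scala
import java.io.File
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical case classes mirroring the JSON keys; only KafkaConfig is named in the post.
case class Pipeline(pipeline_name: String)
case class KafkaConnection(kafkaHost: String, kafkaPort: String)
case class KafkaTopicSetup(kafka_topic_name: String, topic_replication_factor: Int,
                           topic_partitions: Int, kafka_topic_test_data: String)
case class KafkaSetup(kafka_connection: KafkaConnection, kafka_topic_setup: List[KafkaTopicSetup])
case class KafkaConfig(pipeline: Pipeline, kafka_setup: KafkaSetup)

object ConfigReader {
  // Assumption: DefaultScalaModule is registered so Jackson understands
  // case classes and Scala collections.
  private val mapper: ObjectMapper = new ObjectMapper()
    .registerModule(DefaultScalaModule)
    .configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true)

  /** Reads the JSON file at `configFile` and maps it onto KafkaConfig. */
  def mapperConfig(configFile: String): KafkaConfig =
    mapper.readValue(new File(configFile), classOf[KafkaConfig])
}
```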

Now it is easy! I can access all the top-level elements in one go.

val data = mapperConfig(configFile)
val kafka_topic_setup = data.kafka_setup.kafka_topic_setup
val kafka_connection_details = data.kafka_setup.kafka_connection

To insert values into the Kafka topics, I have a method here.

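import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{struct, to_json}

/** Writes every row of fileDf to the given Kafka topic as a JSON string. */
def writeToKafkaTopics(fileDf: DataFrame, topicName: String, host: String, port: String): Unit = {
  fileDf
    // Pack all columns into a single JSON string; the Kafka sink expects a "value" column.
    .select(to_json(struct("*")).alias("value"))
    .write
    .format("kafka")
    .options(Map("topic" -> topicName, "kafka.bootstrap.servers" -> s"$host:$port"))
    .save()
}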

Let’s see how we can access the config entries that live in the array. It is very simple: you can do it with a plain “for” loop or with flatMap.
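The loop is not reproduced here, so this is a small sketch of the idea in plain Scala. The case class names, TopicLoader, loadAll and the load parameter are all hypothetical; load stands in for "read the file into a DataFrame, then write it to the topic", so the iteration can be shown without Spark on the classpath.

```scala
// Hypothetical shapes mirroring the JSON keys; the class names are assumptions.
case class KafkaConnection(kafkaHost: String, kafkaPort: String)
case class KafkaTopicSetup(kafka_topic_name: String, topic_replication_factor: Int,
                           topic_partitions: Int, kafka_topic_test_data: String)

object TopicLoader {
  /**
   * Calls `load(filePath, topicName, host, port)` once per configured topic,
   * in the order the topics appear in the config.
   */
  def loadAll(topics: List[KafkaTopicSetup], connection: KafkaConnection)
             (load: (String, String, String, String) => Unit): Unit =
    for (setup <- topics)
      load(setup.kafka_topic_test_data, setup.kafka_topic_name,
           connection.kafkaHost, connection.kafkaPort)
}
```

With Spark available, the action passed in might be something like `(path, topic, host, port) => writeToKafkaTopics(spark.read.option("header", "true").csv(path), topic, host, port)`, where `spark` is an existing SparkSession and the header option is only a guess about the CSV files.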

Bingo!

In the above code block, I read each file through its setup.kafka_topic_test_data element and insert the data into the corresponding topic from the same list entry.

Isn’t it fun? Reach out to me if you need help building an ETL framework or want to brainstorm a solution. I am always here to talk.

For any kind of help with career counselling, resume building, discussing designs, or learning more about the latest data engineering trends and technologies, reach out to me at anigos.

P.S.: I don’t charge money.

Ani

Big Data Architect — Passionate about designing robust distributed systems