Friday, July 24, 2020

Azure/SQL table to Kafka topic in JSON format using jq

When working with any complex environment, using a static set of data makes developing and testing easier in the early stages. With Kafka, one way to do this is to send each row of a table as a message to a topic. Providing the rows as JSON (or similar) is better than CSV: there is extra overhead because the field names are repeated in every message, but it is still preferable to dealing with missing fields and the like in CSV.


I used a CentOS system with Kafka and the SQL command line (sqlcmd) installed. Starting from the command line given in my earlier post here, we can convert the table to JSON.


sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind -s~ -y 0  -Q "select * from orders" | jq -c --slurp --raw-input --raw-output  "split(\"\n\") |  map(split(\"~\")) |map({\"OrderID\":.[0], \"CustomerID\":.[1], \"EmployeeID\":.[2], \"OrderDate\":.[3], \"RequiredDate\":.[4], \"ShippedDate\":.[5], \"ShipVia\":.[6], \"Freight\":.[7], \"ShipName\":.[8], \"ShipAddress\":.[9],\"ShipCity\":.[10],\"ShipRegion\":.[11],\"ShipPostalCode\":.[12], \"ShipCountry\":.[13]})" 
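
For readability, here is a sketch of the same jq filter written with single quotes (equivalent in bash, where the inner double quotes no longer need escaping):

jq -c --slurp --raw-input --raw-output '
  split("\n")
  | map(split("~"))
  | map({OrderID: .[0], CustomerID: .[1], EmployeeID: .[2],
         OrderDate: .[3], RequiredDate: .[4], ShippedDate: .[5],
         ShipVia: .[6], Freight: .[7], ShipName: .[8],
         ShipAddress: .[9], ShipCity: .[10], ShipRegion: .[11],
         ShipPostalCode: .[12], ShipCountry: .[13]})'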

When sending messages to Kafka, each row has to be its own JSON object rather than an element of one big array. jq's array/object value iterator (.[]) does exactly that: it unpacks an array into a stream of its elements.
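
As a minimal illustration (a hypothetical two-element array, not the Northwind data):

# .[] unpacks the array into a stream of objects
echo '[{"a":1},{"a":2}]' | jq '.[]'
# {
#   "a": 1
# }
# {
#   "a": 2
# }

Appending .[] to the earlier pipeline gives: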

sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind -s~ -y 0  -Q "select * from orders" | jq -c --slurp --raw-input --raw-output  "split(\"\n\") |  map(split(\"~\")) |map({\"OrderID\":.[0], \"CustomerID\":.[1], \"EmployeeID\":.[2], \"OrderDate\":.[3], \"RequiredDate\":.[4], \"ShippedDate\":.[5], \"ShipVia\":.[6], \"Freight\":.[7], \"ShipName\":.[8], \"ShipAddress\":.[9],\"ShipCity\":.[10],\"ShipRegion\":.[11],\"ShipPostalCode\":.[12], \"ShipCountry\":.[13]})" | jq .[]

This is not ideal, as the second jq invocation pretty-prints the output, adding a newline after each field. Using the -c (compact) option in that second invocation solves the problem.
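
The same toy example with -c, each object now on a single line:

echo '[{"a":1},{"a":2}]' | jq -c '.[]'
# {"a":1}
# {"a":2}

The full pipeline then becomes: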

sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind -s~ -y 0  -Q "select * from orders" | jq -c --slurp --raw-input --raw-output  "split(\"\n\") |  map(split(\"~\")) |map({\"OrderID\":.[0], \"CustomerID\":.[1], \"EmployeeID\":.[2], \"OrderDate\":.[3], \"RequiredDate\":.[4], \"ShippedDate\":.[5], \"ShipVia\":.[6], \"Freight\":.[7], \"ShipName\":.[8], \"ShipAddress\":.[9],\"ShipCity\":.[10],\"ShipRegion\":.[11],\"ShipPostalCode\":.[12], \"ShipCountry\":.[13]})" | jq -c .[]

Now each row is emitted as a single compact JSON object per line, as below.
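
The output should look along these lines (the values here are illustrative, taken from the standard Northwind sample data; your exact formatting may differ, and note every field is a string because jq's split produces strings):

{"OrderID":"10248","CustomerID":"VINET","EmployeeID":"5","OrderDate":"1996-07-04 00:00:00.000","RequiredDate":"1996-08-01 00:00:00.000","ShippedDate":"1996-07-16 00:00:00.000","ShipVia":"3","Freight":"32.38","ShipName":"Vins et alcools Chevalier","ShipAddress":"59 rue de l'Abbaye","ShipCity":"Reims","ShipRegion":"NULL","ShipPostalCode":"51100","ShipCountry":"France"}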


The next task is to create a topic in Kafka, say orders, using the bundled command-line script with the default values:

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic orders --partitions 1 --replication-factor 1
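
To confirm the topic exists before producing to it, the same script can describe it:

/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic orders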

Now it is just a matter of sending the data to the Kafka topic using the bundled console producer script. Simply pipe the output of the sqlcmd/jq pipeline to the producer as below.

sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind -s~ -y 0  -Q "select * from orders" | jq -c --slurp --raw-input --raw-output  "split(\"\n\") |  map(split(\"~\")) |map({\"OrderID\":.[0], \"CustomerID\":.[1], \"EmployeeID\":.[2], \"OrderDate\":.[3], \"RequiredDate\":.[4], \"ShippedDate\":.[5], \"ShipVia\":.[6], \"Freight\":.[7], \"ShipName\":.[8], \"ShipAddress\":.[9],\"ShipCity\":.[10],\"ShipRegion\":.[11],\"ShipPostalCode\":.[12], \"ShipCountry\":.[13]})" | jq -c .[] | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders 
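
To verify that the messages landed in the topic, read them back with the bundled console consumer:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --from-beginning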
