When working with a complex environment, a static set of data makes development and testing easier in the early stages. With Kafka, one way to do this is to send each row in a table as a message. Providing the rows as JSON or similar is better than CSV: there is more overhead, because the field names are repeated in each message, but that is still preferable to dealing with missing fields and the like in CSV.
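To make the trade-off concrete, here is the same illustrative row in both formats; the JSON version repeats every field name, but a missing value stays unambiguous:

10248,VINET,5,1996-07-04,,32.38
{"OrderID":"10248","CustomerID":"VINET","EmployeeID":"5","OrderDate":"1996-07-04","ShippedDate":"","Freight":"32.38"}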
I used a CentOS system with Kafka and the SQL Server command-line tools (sqlcmd) installed. Starting from the command line given in my earlier post here, we can convert the table to JSON.
sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind \
    -s~ -y 0 -Q "select * from orders" |
jq -c --slurp --raw-input --raw-output '
    split("\n") | map(split("~")) |
    map({"OrderID": .[0], "CustomerID": .[1], "EmployeeID": .[2],
         "OrderDate": .[3], "RequiredDate": .[4], "ShippedDate": .[5],
         "ShipVia": .[6], "Freight": .[7], "ShipName": .[8],
         "ShipAddress": .[9], "ShipCity": .[10], "ShipRegion": .[11],
         "ShipPostalCode": .[12], "ShipCountry": .[13]})'
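One caveat, depending on your sqlcmd settings: by default sqlcmd also prints a column-header line, a separator line of dashes, and a "(N rows affected)" footer, and the trailing newline leaves an empty entry after split("\n"); each of these would turn into a bogus JSON object. The -h -1 flag suppresses the header lines, and a select() guard in the jq program can drop anything that does not split into the full 14 fields of the orders table. A sketch of such a guard, with the object construction from above elided as {...}:

split("\n") | map(split("~")) | map(select(length == 14)) | map({...})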
When sending messages to Kafka, each row has to be its own JSON object rather than an element of one big array. jq's array/object value iterator (.[]) does exactly that: it emits each element of the array as a separate output.
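As a minimal illustration of the iterator on its own (with made-up values), the following

echo '[{"OrderID":"10248"},{"OrderID":"10249"}]' | jq '.[]'

prints the two objects one after the other instead of one array.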
sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind \
    -s~ -y 0 -Q "select * from orders" |
jq -c --slurp --raw-input --raw-output '
    split("\n") | map(split("~")) |
    map({"OrderID": .[0], "CustomerID": .[1], "EmployeeID": .[2],
         "OrderDate": .[3], "RequiredDate": .[4], "ShippedDate": .[5],
         "ShipVia": .[6], "Freight": .[7], "ShipName": .[8],
         "ShipAddress": .[9], "ShipCity": .[10], "ShipRegion": .[11],
         "ShipPostalCode": .[12], "ShipCountry": .[13]})' |
jq '.[]'
This is not ideal, because jq pretty-prints its output by default, putting each field on its own line. The -c (compact output) option solves the problem.
sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind \
    -s~ -y 0 -Q "select * from orders" |
jq -c --slurp --raw-input --raw-output '
    split("\n") | map(split("~")) |
    map({"OrderID": .[0], "CustomerID": .[1], "EmployeeID": .[2],
         "OrderDate": .[3], "RequiredDate": .[4], "ShippedDate": .[5],
         "ShipVia": .[6], "Freight": .[7], "ShipName": .[8],
         "ShipAddress": .[9], "ShipCity": .[10], "ShipRegion": .[11],
         "ShipPostalCode": .[12], "ShipCountry": .[13]})' |
jq -c '.[]'
Now each row is a single one-line JSON object, as below.
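For instance, the first row of the standard Northwind orders table should come out roughly as follows; the exact date and number formatting depends on your sqlcmd settings:

{"OrderID":"10248","CustomerID":"VINET","EmployeeID":"5","OrderDate":"1996-07-04 00:00:00.000","RequiredDate":"1996-08-01 00:00:00.000","ShippedDate":"1996-07-16 00:00:00.000","ShipVia":"3","Freight":"32.3800","ShipName":"Vins et alcools Chevalier","ShipAddress":"59 rue de l'Abbaye","ShipCity":"Reims","ShipRegion":"NULL","ShipPostalCode":"51100","ShipCountry":"France"}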
The next task is to create a topic in Kafka, say orders, using the bundled command-line script and the default values:
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic orders --partitions 1 --replication-factor 1
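This is the older, ZooKeeper-based form of the tool. Since Kafka 2.2 the script can talk to a broker directly, and the --zookeeper option was removed in Kafka 3.0, so on newer versions the equivalent command would be:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic orders --partitions 1 --replication-factor 1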
Now it is just a matter of sending the data to the Kafka topic with the bundled producer script: simply pipe the output of the pipeline above into the command-line producer, as below.
sqlcmd -S <youraccount>.database.windows.net -U user -P PW -d northwind \
    -s~ -y 0 -Q "select * from orders" |
jq -c --slurp --raw-input --raw-output '
    split("\n") | map(split("~")) |
    map({"OrderID": .[0], "CustomerID": .[1], "EmployeeID": .[2],
         "OrderDate": .[3], "RequiredDate": .[4], "ShippedDate": .[5],
         "ShipVia": .[6], "Freight": .[7], "ShipName": .[8],
         "ShipAddress": .[9], "ShipCity": .[10], "ShipRegion": .[11],
         "ShipPostalCode": .[12], "ShipCountry": .[13]})' |
jq -c '.[]' |
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders
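To check that the messages actually landed in the topic, read them back with the bundled console consumer:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --from-beginning

This should print the same one-line JSON objects that were just produced.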