ObjectRocket

Using Logstash to load CSV data into Elasticsearch

Do you have a brand new Elasticsearch instance, but all your useful data you’d like to search lives in a CSV file? No problem. Logstash makes turning almost any data into something easily searchable in an Elasticsearch index.

To start with, you need some data (and a unix-like environment to use these examples – Windows works fine with some minor adjustments); In my case, I wanted to take an export of the data from my Davis Vantage Pro2 weather station, in CSV format, and create a new index with it.

I started out with a few million lines that looked like this, stored in a local file:

$ head -3 /home/erik/weather.csv
HumOut,TempIn,DewPoint,HumIn,WindDir,RainMonth,WindSpeed,RainDay,BatteryVolts,WindChill,Pressure,time,TempOut,WindSpeed10Min,RainRate
76,78.0,78.227017302825,44,109,2.0,2,0.0,1.236328125,90.87261657090625,29.543,2015-06-18T17:49:29Z,86.5,1,0.0
76,78.0,78.227017302825,44,107,2.0,2,0.0,1.236328125,90.87261657090625,29.543,2015-06-18T17:49:45Z,86.5,1,0.0
76,78.0,78.32406784157725,44,107,2.0,0,0.0,1.236328125,90.83340000000001,29.543,2015-06-18T17:50:00Z,86.59999999999999,1,0.0

Don’t have any interesting data already? Here are a few fun sources:

Ok, so we have data, let’s get started. First, make sure you have a version of Java installed:

$ java -version
openjdk version "1.8.0_51"

Pretty much any JVM is fine for this – OpenJDK, Oracle, etc.

$ curl -O https://download.elastic.co/logstash/logstash/logstash-1.5.4.tar.gz
$ tar xfz logstash-1.5.4.tar.gz
$ cd logstash-1.5.4
$ mkdir conf

Now it’s time to build a configuration file.

First, we’ll define an ‘input’ section where we tell Logstash where to find the data.

input {
    file {
        path => "/home/erik/weather.csv"
        start_position => beginning

    }
}

This just tells it where to look, and that we want to load from the beginning of the file.

Next we need a filter – Logstash has loads of filter plugins available by default. We’re going to use a couple different ones to parse our data. So far, Logstash doesn’t really know anything about the data in the file – we need to tell it the format, and any other specifics on how to handle various fields.

filter {
    csv {
        columns => [
          "HumOut",
          "TempIn",
          "DewPoint",
          "HumIn",
          "WindDir",
          "RainMonth",
          "WindSpeed",
          "RainDay",
          "BatteryVolts",
          "WindChill",
          "Pressure",
          "time",
          "TempOut",
          "WindSpeed10Min",
          "RainRate"
        ]
        separator => ","
        remove_field => ["message"]
        }
    date {
        match => ["time", "ISO8601"]
    }
    mutate {
        convert => ["TempOut", "float"]
    }
}

I left the real columns in that correspond to my data; They should be pretty self explanatory, but there are a couple important pieces. First, I tell it to remove the ‘message’ field – which is an entry containing the entire row; I won’t need it, since I’m searching on specific attributes. Second, I tell it that the ‘time’ field contains an ISO8601-formatted date, so that Elasticsearch knows not to treat it as a plain string. Finally, I use the mutate function to convert the ‘TempOut’ value into a floating point number.

Ok, so now we’re set up to ingest the data, and parse it – but now we need to store it in Elasticsearch:

output {
    elasticsearch {
        protocol => "https"
        host => ["iad1-20999-0.es.objectrocket.com:20999"]
        user => "erik"
        password => "mysupersecretpassword"
        action => "index"
        index => "eriks_weather_index"
    }
    stdout { }
}

This should be pretty self-explanatory too – just configure for your host/port, authentication data, and the name of the index you’d like to store it in.

Ok, let’s fire it up! If it’s working, it should look something like this:

$ bin/logstash -f conf/logstash.conf -v
Logstash startup completed

Did it work? Let’s ask Elasticsearch:

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/_cat/indices?v'
health status index               pri rep docs.count store.size pri.store.size
green  open   eriks_weather_index 5   1   294854     95.8mb     48.5mb

Looks like my documents are there! Let’s query for one:

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/eriks_weather_index/_search?q=TempOut:>75&pretty&terminate_after=1'

This tells Elasticsearch to find documents with TempOut greater than 75 (Tempout:>75), to format it for human consumption (pretty), and to return no more than one result per shard (terminate_after=1). It should return something like this:

{
  "took" : 4,
  "timed_out" : false,
  "terminated_early" : true,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "eriks_weather_index",
      "_type" : "logs",
      "_id" : "AU-yXZJIJb3HnhKvpdNC",
      "_score" : 1.0,
      "_source":{"@version":"1","@timestamp":"2015-06-22T10:24:23.000Z","host":"kibana","path":"/home/erik/weather.csv","HumOut":"86","TempIn":"79.7","DewPoint":"70.65179649787358","HumIn":"46","WindDir":"161","RainMonth":"2.7","WindSpeed":"0","RainDay":"0.36","BatteryVolts":"1.125","WindChill":"82.41464999999999","Pressure":"29.611","time":"2015-06-22T10:24:23Z","TempOut":75.1,"WindSpeed10Min":"0","RainRate":"0.0"}
    } ]
  }
}

Success. Logstash is a great Swiss army knife for turning any data you have laying around into something you can easily play with in Elasticsearch – have fun!

Exit mobile version