
How to import a CSV into Elasticsearch

There are multiple ways to import a CSV file into Elasticsearch. In this how-to, we cover two of them: the first uses Logstash, and the second uses a Python script.

Prepare setup

First, we download the dataset. We will use the Divvy bike-share data, which consists of thousands of rides and makes a good training dataset. For this guide, I downloaded the Divvy_Trips_2020_Q1.zip file from the index page below.

https://divvy-tripdata.s3.amazonaws.com/index.html

Unzip the zip file.

unzip Divvy_Trips_2020_Q1.zip

This should result in a file called Divvy_Trips_2020_Q1.csv. Inspect the CSV header and the first few rows with the following command.

cat Divvy_Trips_2020_Q1.csv | head -n 5
ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
EACB19130B0CDA4A,docked_bike,2020-01-21 20:06:59,2020-01-21 20:14:30,Western Ave & Leland Ave,239,Clark St & Leland Ave,326,41.9665,-87.6884,41.9671,-87.6674,member
8FED874C809DC021,docked_bike,2020-01-30 14:22:39,2020-01-30 14:26:22,Clark St & Montrose Ave,234,Southport Ave & Irving Park Rd,318,41.9616,-87.666,41.9542,-87.6644,member
789F3C21E472CA96,docked_bike,2020-01-09 19:29:26,2020-01-09 19:32:17,Broadway & Belmont Ave,296,Wilton Ave & Belmont Ave,117,41.9401,-87.6455,41.9402,-87.653,member
C9A388DAC6ABF313,docked_bike,2020-01-06 16:17:07,2020-01-06 16:25:56,Clark St & Randolph St,51,Fairbanks Ct & Grand Ave,24,41.8846,-87.6319,41.8918,-87.6206,member

Logstash

The most straightforward way to import the data is with Logstash: a single CLI command reads the CSV and writes it directly into Elasticsearch.

Create the following file as import.conf. Make sure you update the file path, Elasticsearch host, and credentials to match your environment.

input {
  file {
    path => "/home/deepsearch/elastic/Divvy_Trips_2020_Q1.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}


filter {
  csv {
    separator => ","
    skip_header => true
    columns => [
      "ride_id",
      "rideable_type",
      "started_at",
      "ended_at",
      "start_station_name",
      "start_station_id",
      "end_station_name",
      "end_station_id",
      "start_lat",
      "start_lng",
      "end_lat",
      "end_lng",
      "member_casual"
    ]
  }


  mutate {
    convert => {
      "start_station_id" => "integer"
      "end_station_id" => "integer"
      "start_lat" => "float"
      "start_lng" => "float"
      "end_lat" => "float"
      "end_lng" => "float"
    }
  }

  date {
    match => [ "started_at", "YYYY-MM-dd HH:mm:ss" ]
    target => "@timestamp"
    timezone => "UTC"
  }
}

output {
  elasticsearch {
    hosts => ["elastic.deepsearch-foundry.com:9200"]
    index => "bike_rides"
    user => "elastic"
    password => "password"
    ssl_enabled => true
  }

  stdout {
    codec => rubydebug
  }
}
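With the config file in place, start the import by pointing Logstash at it. A typical invocation looks like this (assuming the logstash binary is on your PATH; on package installs it usually lives under /usr/share/logstash/bin):

logstash -f import.conf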

After running the import, you can check whether it succeeded. The following query should return the indexed documents.

curl -u elastic:password https://elastic.deepsearch-foundry.com:9200/bike_rides/_search?pretty
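If you only want to see how many documents were indexed, the _count endpoint is a quicker check (assuming the same host and credentials as above):

curl -u elastic:password https://elastic.deepsearch-foundry.com:9200/bike_rides/_count?pretty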

Python script

The second option is to import it through a Python script.

First, we need to install the necessary dependencies.

pip install pandas elasticsearch

After that, we create a new Python script called import.py.

import pandas as pd
from elasticsearch import Elasticsearch

df = pd.read_csv("Divvy_Trips_2020_Q1.csv")
# Replace NaN values with None so every row serializes cleanly to JSON
df = df.where(pd.notnull(df), None)

es = Elasticsearch(["https://elastic.deepsearch-foundry.com:9200"], basic_auth=('elastic', 'password'))

index_name = "bike_rides_python"

# Create the index if it does not exist yet (add explicit mappings here if needed)
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name)

# Index documents one at a time (simple, but slow for large datasets)
for _, row in df.iterrows():
    doc = row.to_dict()
    es.index(index=index_name, document=doc)

Now we can start the import script.

python3 import.py

This might take a while, especially because the script is not optimized for large imports. You can check the result by querying Elasticsearch.

curl -u elastic:password https://elastic.deepsearch-foundry.com:9200/bike_rides_python/_search?pretty

Final thoughts

There are many ways to import data into Elasticsearch, and this guide showed two of them. Use it as a starting point and optimize the process for larger imports and better speed. Here are a few things you should do in a production environment:

  • Use the _bulk endpoint (or a client bulk helper) for large imports, as in the sketch after this list
  • Define the index mapping beforehand
  • Monitor the ingest and verify the result
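As a starting point for the first two items, here is a minimal sketch of what the Python import could look like with an explicit mapping and the bulk helper from the elasticsearch client. The index name (bike_rides_bulk), the chosen field types, and the connection details are assumptions based on the examples above, not a definitive implementation:

import pandas as pd
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["https://elastic.deepsearch-foundry.com:9200"], basic_auth=('elastic', 'password'))
index_name = "bike_rides_bulk"  # hypothetical index name for this sketch

# Define the mapping up front so dates and coordinates get the right types
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, mappings={
        "properties": {
            "started_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "ended_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
            "start_lat": {"type": "float"},
            "start_lng": {"type": "float"},
            "end_lat": {"type": "float"},
            "end_lng": {"type": "float"},
            "member_casual": {"type": "keyword"}
        }
    })

df = pd.read_csv("Divvy_Trips_2020_Q1.csv")
df = df.where(pd.notnull(df), None)

# Build one action per row and let the bulk helper send them in batches
actions = (
    {"_index": index_name, "_source": row.to_dict()}
    for _, row in df.iterrows()
)
helpers.bulk(es, actions, chunk_size=1000)

Compared with indexing row by row, the bulk helper batches many documents per request, which is what makes it practical for large files.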

Good luck with the imports, and enjoy the rest of your Elastic journey!