RSS Feeds mit Logstash und Elasticsearch durchsuchen

von Peter Soth

In diesem Blogpost möchte ich aufzeigen, wie man RSS Feeds mit Logstash und Elasticsearch durchsuchen kann. Folgendes Logstash-Skript holt sich alle 2 Stunden Informationen aus der Logistikbranche.

input {

  rss {
    url => "http://www.dvz.de/rssallenews"
    interval => 7200
        tags => ["de", "rss", "dvz"]
  }

  rss {
    url => "http://www.openpr.de/rss/kategorie/12/Logistik-Transport.xml"
    interval => 7200
        tags => ["de", "rss", "openpr"]
  }

  rss {
    url => "http://www.pressebox.de/rss/flag/logistik"
    interval => 7200
        tags => ["de", "rss", "openpr"]
  }
}

filter {

}

output {
 elasticsearch {
  action => "index"
  hosts => "localhost"
  workers => 1
 }
 stdout {}
}

Und folgendes Python-Skript ermöglicht das Dursuchen des RSS-Feeds. Die Elasticsearch-Query liefert keine Dubletten (da ja alle 2 Stunden der RSS-Feed indiziert wird) zurück. Dies wird mit Hilfe einer top_hits Aggregation erreicht.

from elasticsearch import Elasticsearch
import html2text
from terminaltables import AsciiTable
import datetime

# init Elasticsearch
es = Elasticsearch([
    {'host': '0.0.0.0', 'port': 0}
])

def addRowTable(table, col1, col2, col3, col4):
    table.append([str(col1), str(col2), str(col3), str(col4)])

def queryElastic(query):
    # query Elasticsearch
    results = es.search(index='logstash*', body={
       "size":0,
       "query":{
          "query_string":{
             "query": query,
             "analyze_wildcard" : "true"
          }
       },
       "aggs":{
          "dedup":{
             "terms":{
                "field":"title.raw",
                "size":0
             },
             "aggs":{
                "dedup_docs":{
                   "top_hits":{
                      "size":1
                   }
                }
             }
          }
       }
    })
    # setting parameters for html2text
    plainhtml = html2text.HTML2Text()
    plainhtml.ignore_links = True
    plainhtml.ignore_images = True
    # setting ascii table cols
    table_data = [['Datum', 'Titel', 'Meldung', 'Link']]
    # sorting results by timestamp date
    results = sorted(results['aggregations']['dedup']['buckets'], key=lambda k: k['dedup_docs']['hits']['hits'][0]['_source'].get('@timestamp', 0), reverse=True)
    for result in results:
        datum = datetime.datetime.strptime(result['dedup_docs']['hits']['hits'][0]['_source']['@timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ').strftime('%d.%m.%y')
        titel = result['dedup_docs']['hits']['hits'][0]['_source']['title'].strip()
        meldung = plainhtml.handle(result['dedup_docs']['hits']['hits'][0]['_source']['message'].strip().replace('\n', ''))
        link = result['dedup_docs']['hits']['hits'][0]['_source']['link'].strip()
        addRowTable(table_data, datum, titel, meldung, link)
    # create ascii table
    table = AsciiTable(table_data)
    table.inner_row_border = True
    #print table
    print(table.table)

queryElastic(input("Enter RSS search string: "))

Und so sieht das Ganze in Aktion aus:

Natürlich kann die Abfrage auch in einer Web-Applikation bereitgestellt werden. Python wurde hier nur zur Veranschaulichung benutzt.

Kategorien: ElasticsearchLogstashPython

Zurück