Python: Working with YAML Files

YAML is a simplified markup language for data serialization. The format is kept simple and is a nice alternative to XML, JSON, and similar formats. It is also a great format for config files as an alternative to INI files.

A simple YAML file:

variable_str: value
variable_num: 33

list:
  - a
  - b
  - c
  - d

dictionary:
  - list1:
    - a
    - b
    - c
  - list2:
    - a
    - b
    - c

block: |
    block ..................
    block ..................
    block ..................
    block ..................
    block ..................
    block ..................
    block ..................

Test script:

import yaml
from pprint import pprint

with open("test3.yaml", "r") as f:
    # safe_load only builds plain Python objects and needs no Loader argument
    x = yaml.safe_load(f)

print "RAW Data:"
pprint(x)
print "----"
print "Block:"
print x['block']
print "Dictionary:"
print x['dictionary']
print "List:"
print x['list']
print "Var String:"
print x['variable_str']
print "Var Num:"
print x['variable_num']

Output:

RAW Data:
{'block': 'block ..................\nblock ..................\nblock ..................\nblock ..................\nblock ..................\nblock ..................\nblock ..................\n',
 'dictionary': [{'list1': ['a', 'b', 'c']}, {'list2': ['a', 'b', 'c']}],
 'list': ['a', 'b', 'c', 'd'],
 'variable_num': 33,
 'variable_str': 'value'}
----
Block:
block ..................
block ..................
block ..................
block ..................
block ..................
block ..................
block ..................

Dictionary:
[{'list1': ['a', 'b', 'c']}, {'list2': ['a', 'b', 'c']}]
List:
['a', 'b', 'c', 'd']
Var String:
value
Var Num:
33
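
Note that the value under the dictionary key is parsed as a list of single-key mappings, not as one mapping, because each entry in the YAML file starts with a dash. If a single dictionary is more convenient, the entries can be merged after loading. A minimal sketch in Python 3, assuming the parsed structure from the output above; the helper name merge_mappings is hypothetical:

```python
# Parsed value of the "dictionary" key, copied from the output above
parsed = [{'list1': ['a', 'b', 'c']}, {'list2': ['a', 'b', 'c']}]


def merge_mappings(items):
    """Merge a list of small dicts into one dict (later keys win)."""
    merged = {}
    for item in items:
        merged.update(item)
    return merged


merged = merge_mappings(parsed)
print(merged['list1'])
print(sorted(merged))
```

Alternatively, dropping the dashes in front of list1 and list2 in the YAML file should make the parser return a mapping directly.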

Dumping Python dictionaries, lists, etc. to a YAML file:

import yaml

t = {
    "bubu": {
            "a": "a",
            "b": "b",
            "c": "c",
            "d": "d",
            "e": "e",
            },
    "baba": {
            "a": "a",
            "b": "b",
            "c": "c",
            "d": "d",
            "e": "e",
            }
    }

with open('foobar.yaml', 'w') as f:
    yaml.dump(t, f, default_flow_style=False)

Output:

baba:
  a: a
  b: b
  c: c
  d: d
  e: e
bubu:
  a: a
  b: b
  c: c
  d: d
  e: e

 

 

Python: Removing Control Characters from ASCII Strings

Today I once again stumbled over a problem with ASCII strings. The string contained a CTRL+C, which is \x03 in the ASCII table. I came across a page that provides code snippets for removing control characters in all kinds of scripting and programming languages.

ASCII table on Wikipedia: https://de.wikipedia.org/wiki/American_Standard_Code_for_Information_Interchange

Here for Python:

stripped = lambda s: "".join(i for i in s if 31 < ord(i) < 127)
 
print(stripped("\ba\x00b\n\rc\fd\xc3"))

Output:

abcd

Source: https://rosettacode.org/wiki/Strip_control_codes_and_extended_characters_from_a_string
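
The one-liner above also drops tabs and line breaks, which is not always wanted. A small variation, sketched here in Python 3 under the assumption that tab and newline should survive, keeps a configurable set of characters; the function name strip_control and the keep parameter are made up for this example:

```python
def strip_control(text, keep="\t\n"):
    """Drop control and non-ASCII characters, except those listed in keep."""
    return "".join(ch for ch in text if ch in keep or 31 < ord(ch) < 127)


cleaned = strip_control("col1\tcol2\x03\n")
print(repr(cleaned))
```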

Alternatively, individual ASCII codes can be removed with the regex library (re).

Example:

import re

data = "ab\x03cd"
data_clean = re.sub(r"\x03","",data)

print data_clean

abcd
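
The same idea extends from a single code to the whole control range. The following Python 3 sketch assumes that every character from \x00 to \x1f plus DEL (\x7f) should go; the names CONTROL_CHARS and remove_control_chars are hypothetical:

```python
import re

# Character class covering the ASCII control range and DEL
CONTROL_CHARS = re.compile(r"[\x00-\x1f\x7f]")


def remove_control_chars(text):
    """Remove all ASCII control characters from text."""
    return CONTROL_CHARS.sub("", text)


data_clean = remove_control_chars("ab\x03cd\x07")
print(data_clean)
```

Unlike the join-based filter, this leaves characters above \x7f untouched.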

Have fun 🙂

Cisco/Python: Backing Up the Configuration to an External Server on a write Event

Various Cisco devices can push their configuration to another server, for example over HTTP, whenever a write event occurs.

Cisco Config:

archive
 path http://1.2.3.4/cisco_backup/put/$h-$t
 write-memory

Apache /etc/httpd/conf.d/zzz_cisco_backup.conf:

WSGIDaemonProcess cisco_backup user=apache group=apache threads=10
WSGIPythonPath /opt/cisco_backup/web_root
WSGIScriptAlias /cisco_backup /opt/cisco_backup/web_root/cisco_backup.wsgi

<Directory /opt/cisco_backup/web_root>
WSGIProcessGroup cisco_backup
WSGIApplicationGroup %{GLOBAL}
WSGIScriptReloading On
Order deny,allow
Allow from all

<Files cisco_backup.py>
Require all granted
</Files>
<Files cisco_backup.wsgi>
Require all granted
</Files>

</Directory>

cisco_backup.wsgi File:

import sys

sys.path.append("/opt/cisco_backup/web_root")

from cisco_backup import app as application

cisco_backup.py File:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from flask import Flask
from flask import request

app = Flask(__name__)

@app.route("/put/<cfg>", methods=['PUT'])
def get_config(cfg):
    # Store the request body under the name taken from the URL ($h-$t)
    with open('/opt/cisco_config/incoming_configs/%s' % cfg, "wb") as f:
        f.write(request.data)
    return "ok"

if __name__ == "__main__":
    app.run()
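
To try out the receiving side without a Cisco device, the PUT request can be simulated locally. The sketch below is Python 3 and uses only the standard library: a tiny http.server handler stands in for the Flask app, and http.client sends the kind of request a device would send. The handler, the temporary target directory and the file name router1-test are all assumptions made for this example.

```python
import http.client
import os
import tempfile
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Hypothetical stand-in for the incoming_configs directory
TARGET_DIR = tempfile.mkdtemp()


class PutHandler(BaseHTTPRequestHandler):
    def do_PUT(self):
        # Keep only the last path component so the name stays inside TARGET_DIR
        name = os.path.basename(self.path)
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        with open(os.path.join(TARGET_DIR, name), "wb") as f:
            f.write(body)
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")

    def log_message(self, *args):
        pass  # keep the demo output quiet


def simulate_push(name, config_text):
    """Serve exactly one request and send a PUT to it, like a device would."""
    server = HTTPServer(("127.0.0.1", 0), PutHandler)  # port 0 = any free port
    thread = threading.Thread(target=server.handle_request)
    thread.start()
    conn = http.client.HTTPConnection("127.0.0.1", server.server_port)
    conn.request("PUT", "/put/" + name, body=config_text.encode("ascii"))
    reply = conn.getresponse().read()
    conn.close()
    thread.join()
    server.server_close()
    return reply


reply = simulate_push("router1-test", "hostname router1\n")
print(reply)
```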

Have fun 😉

Python: Snippet – Sending E-Mail, an Alternative to Mailer

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import smtplib
from email.mime.text import MIMEText


def postmaster(mfrom, mto, msubject, message, smtphost):

    # Declare the charset so that non-ASCII text is labeled correctly
    msg = MIMEText(message.encode("utf-8"), "plain", "utf-8")
    msg['Subject'] = msubject
    msg['From'] = mfrom
    msg['To'] = mto

    s = smtplib.SMTP(smtphost)
    s.sendmail(msg['From'], msg['To'], msg.as_string())
    s.quit()
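
The message can also be built and inspected without contacting an SMTP server, which helps when checking headers and charset. A minimal Python 3 sketch, assuming several recipients; the function name build_message and the example.com addresses are placeholders:

```python
from email.mime.text import MIMEText


def build_message(mfrom, recipients, msubject, message):
    """Build a UTF-8 text mail; recipients is a list of addresses."""
    msg = MIMEText(message, "plain", "utf-8")
    msg["Subject"] = msubject
    msg["From"] = mfrom
    # The To header is a single comma-separated string
    msg["To"] = ", ".join(recipients)
    return msg


msg = build_message("a@example.com", ["b@example.com", "c@example.com"],
                    "Test", "Viele Grüße")
print(msg["To"])
print(msg.get_content_charset())
```

When sending, the list itself (not the joined header string) would be passed as the second argument of sendmail().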

 

Python: Snippet: SSH shell on Cisco devices

This snippet executes commands on a Cisco shell via SSH.

#!/usr/bin/env python

import paramiko
import sys


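def send_string_and_wait_for_string(command, wait_string, should_print):
    # Send the command to the interactive shell
    shell.send(command)

    receive_buffer = ""

    # Read until the expected prompt (wait_string) shows up
    while wait_string not in receive_buffer:
        chunk = shell.recv(1024)
        if not chunk:
            # Channel was closed by the device; stop instead of looping forever
            break
        receive_buffer += chunk

    if should_print:
        print receive_buffer

    return receive_buffer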

client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect("10.62.62.10", username="testuser", password="testpasswd", look_for_keys=False, allow_agent=False)

shell = client.invoke_shell()
send_string_and_wait_for_string("", "#", False)
send_string_and_wait_for_string("terminal length 0\n", "#", False)
output=send_string_and_wait_for_string("show logging\n", "#", False)
print output
client.close()

More info / source: http://blog.timmattison.com/archives/2014/06/25/automating-cisco-switch-interactions/
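
The read-until-prompt loop can be tested without a switch by feeding it canned data. The following Python 3 sketch assumes a channel object that only needs a recv() method returning bytes; FakeShell and read_until are hypothetical names, and the chunk limit is an added safeguard that is not part of the original snippet:

```python
class FakeShell:
    """Stand-in for an SSH channel that returns prepared chunks."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    def recv(self, size):
        # An empty result signals a closed channel
        return self._chunks.pop(0) if self._chunks else b""


def read_until(shell, wait_string, max_chunks=1000):
    """Collect output until wait_string appears or the channel closes."""
    buffer = ""
    for _ in range(max_chunks):
        chunk = shell.recv(1024)
        if not chunk:
            break
        buffer += chunk.decode("utf-8", "replace")
        if wait_string in buffer:
            break
    return buffer


output = read_until(FakeShell([b"show logging\r\n", b"line 1\r\nsw1#", b"unused"]), "#")
print(repr(output))
```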

Python: Snippet/Experiment – Syslog Server with Global and Host Filters

The code is not finished and started out as a lab experiment. Global and per-host filters can be defined; log lines that match them are additionally written to a separate file.

Config file:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Config definition


class CFG:

    def __init__(self):

        # Path for logfiles
        self.syslogpath = "/home/mthoma/_dev/syslog/log/"

        # Listener port
        self.port = 3702

        # Listener address
        self.host = "0.0.0.0"

        # Global Filter
        self.global_filter = {
            "filter": [
                ".*FOOBAR.*",
                ".*COFFEE.*"
            ]
        }

        # Host Filter
        self.host_filter = {
            "10.201.11.33": {
                "filter": [
                    ".*MACFLAP.*",
                    ".*BUBU.*",
                ]
            },
        }

Syslog Server:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Load config class
from config import CFG

# Load common classes
import re
import logging
import SocketServer
import socket
import os

# Load configuration file
C = CFG()

formatter = logging.Formatter('%(message)s')


def setup_logger(name, log_file, level=logging.INFO):
    # Loggers are cached by name; attach the file handler only once,
    # otherwise every received message would add another handler and
    # each line would be written multiple times.
    logger = logging.getLogger(name)

    if not logger.handlers:
        handler = logging.FileHandler(log_file)
        handler.setFormatter(formatter)
        logger.setLevel(level)
        logger.addHandler(handler)

    return logger


class SyslogUDPHandler(SocketServer.BaseRequestHandler):

    def handle(self):
        data = bytes.decode(self.request[0].strip())
        ip = str(self.client_address[0])

        # Try to resolve reverse record via DNS
        try:
            name, alias, addresslist = socket.gethostbyaddr(ip)
        except socket.error:
            name = ip

        # Set path
        path = C.syslogpath + name + "/"

        # Create path if it does not exist
        if not os.path.isdir(path):
            os.makedirs(path)

        # The path is part of the logger name so every host gets its own files
        logger = setup_logger('normal_log:' + path, path + "log")
        logger.info(data)

        logger_sp = setup_logger('special_log:' + path, path + "spec")

        # Global filters always apply; host filters are added for known IPs
        filters = list(C.global_filter['filter'])
        if ip in C.host_filter:
            filters = C.host_filter[ip]['filter'] + filters

        if re.match("|".join(filters), data):
            logger_sp.info(data)

        print "%s : " % ip, data



if __name__ == "__main__":

    try:
        server = SocketServer.UDPServer((C.host,C.port), SyslogUDPHandler)
        server.serve_forever(poll_interval=0.5)

    except (IOError, SystemExit):
        raise

    except KeyboardInterrupt:
        print "Ctrl+C pressed. Shutting down."
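
The filter decision itself can be isolated and checked without opening a socket. This is a Python 3 sketch that assumes the same filter layout as in the config file above; the function name is_special, the sample messages and the address 192.0.2.1 are made up for illustration:

```python
import re

# Same structure as in the config class above
GLOBAL_FILTER = {"filter": [".*FOOBAR.*", ".*COFFEE.*"]}
HOST_FILTER = {"10.201.11.33": {"filter": [".*MACFLAP.*", ".*BUBU.*"]}}


def is_special(ip, message, global_filter=GLOBAL_FILTER, host_filter=HOST_FILTER):
    """Return True if message matches a global filter or a filter for this ip."""
    patterns = list(global_filter["filter"])
    patterns += host_filter.get(ip, {}).get("filter", [])
    # All patterns are combined into one alternation, as in the server code
    return re.match("|".join(patterns), message) is not None


print(is_special("10.201.11.33", "%SW_MATM-4-MACFLAP_NOTIF: host moved"))
print(is_special("192.0.2.1", "%SW_MATM-4-MACFLAP_NOTIF: host moved"))
print(is_special("192.0.2.1", "need more COFFEE"))
```

The MACFLAP message only counts as special for the host that has this filter configured, while the COFFEE message matches for every sender.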

 

Python: Snippet Multiprocessing with Results

Example of running jobs in parallel and collecting their results as a list.

#!/usr/bin/env python
# -*- encoding: utf-8; py-indent-offset: 4 -*-

import os
from multiprocessing import Pool


def worker(job):
    x, y = job
    result = x ** y
    return os.getpid(), result
  
if __name__ == '__main__':
    jobs = [(1, 2), (3, 4), (5, 6), (11, 12), (13, 14), (15, 16), (21, 22), (23, 24), (25, 26)]
    
    result_buffer = []
  
    pool = Pool(processes=5)
    
    for job in jobs:
        result_buffer.append(pool.apply_async(worker, args=(job,)))
    
    pool.close()
    pool.join()
  
    results = [r.get() for r in result_buffer]

    print results
  
    for pid, result in results:
        print "working pid was: %s" % pid
        print "result is: %s" % result
        print "---"

Example output:

$ python mp_with_result.py

[(7992, 1), (7992, 81), (7992, 15625), (7992, 3138428376721L), (7992, 3937376385699289L), (7992, 6568408355712890625L), (7992, 122694327386105632949003612841L), (7992, 480250763996501976790165756943041L), (7992, 2220446049250313080847263336181640625L)]
working pid was: 7992
result is: 1
---
working pid was: 7992
result is: 81
---
working pid was: 7992
result is: 15625
---
working pid was: 7992
result is: 3138428376721
---
working pid was: 7992
result is: 3937376385699289
---
working pid was: 7992
result is: 6568408355712890625
---
working pid was: 7992
result is: 122694327386105632949003612841
---
working pid was: 7992
result is: 480250763996501976790165756943041
---
working pid was: 7992
result is: 2220446049250313080847263336181640625
---
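
If the process ID is not needed, Pool.map is a shorter alternative to collecting apply_async results by hand; it returns the results in the order of the input jobs. A minimal Python 3 sketch, assuming the same kind of (x, y) jobs; the function name power is hypothetical:

```python
from multiprocessing import Pool


def power(job):
    """Compute x ** y for a single (x, y) job."""
    x, y = job
    return x ** y


if __name__ == "__main__":
    jobs = [(1, 2), (3, 4), (5, 6)]

    # The context manager cleans up the pool when the block is left
    with Pool(processes=3) as pool:
        results = pool.map(power, jobs)

    print(results)
```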

 

Python: Experiment/Snippet – Compressing and Deleting Log Files After X Days

An approach for log directories in the format /log/<yyyy>/<mm>/<dd>/<various log files>

#!/usr/bin/env python

import gzip
import shutil
import os
import datetime
import time

#############################################
# Config
#############################################

# Path of Logfiles
# Structure is /opt/log/<YYYY>/<MM>/<DD>/
gpath='/opt/log/'

# hold logs for x days
hold_time=180



#############################################

def get_immediate_subdirectories(a_dir):
    return [name for name in os.listdir(a_dir) if os.path.isdir(os.path.join(a_dir, name))]

def delete_files(f):
    # delete file if older than hold time
    nowx = time.time()

    for file in os.listdir(f):    
        if os.stat(f+file).st_mtime < nowx - hold_time * 86400:
            f_path = f+file
            print "delete %s " % f_path
            os.remove(f_path)
            
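    # Remove the directory once it is empty
    try:
        os.rmdir(f)
    except OSError:
        # Directory still contains files that are within the hold time
        pass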
            
    
    
def compress_files(lpath):
    # Compress every file in lpath that is not gzipped yet
    print "Working on: " + lpath
    for f in os.listdir(lpath):
        if os.path.isfile(lpath+f) and not f.endswith(".gz"):
            with open(lpath+f,'rb') as f_in:
                with gzip.open(lpath+f+".gz",'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            # Remove the original only after both files are closed
            os.remove(lpath+f)

# compress everything that is older than today

now = datetime.datetime.now()
years = get_immediate_subdirectories(gpath)

for year in years:

    # delete empty directories
    if not os.listdir(gpath+year):
        os.rmdir(gpath+year)
    else:

        months = get_immediate_subdirectories(gpath+year)
    
        for month in months:

            # delete empty directories
            if not os.listdir(gpath+year+"/"+month):
                os.rmdir(gpath+year+"/"+month)

            else:
                days = get_immediate_subdirectories(gpath+year+"/"+month)
                        
                # Remove current day from compressing & cleaning
                # (works for zero-padded and unpadded directory names)
                if month.lstrip("0") == str(now.month) and year == str(now.year):
                    days = [d for d in days if d.lstrip("0") != str(now.day)]
        
                for day in days:
                    # delete empty directories
                    if not os.listdir(gpath+year+"/"+month+"/"+day+"/"):
                        os.rmdir(gpath+year+"/"+month+"/"+day+"/")
                    else:
                        # compress all files in folder
                        compress_files(gpath+year+"/"+month+"/"+day+"/")
                        
                        # delete old files
                        delete_files(gpath+year+"/"+month+"/"+day+"/")
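
The two building blocks, compressing a file and checking the hold time, can be tried out safely in a throwaway directory before pointing the script at real logs. The following is a Python 3 sketch; compress_file, is_expired and the file name messages.log are names chosen for this example, and the now parameter exists only to make the age check testable:

```python
import gzip
import os
import shutil
import tempfile
import time


def is_expired(path, hold_days, now=None):
    """True if the file was last modified more than hold_days ago."""
    now = time.time() if now is None else now
    return os.stat(path).st_mtime < now - hold_days * 86400


def compress_file(path):
    """Gzip path to path + '.gz' and remove the original afterwards."""
    gz_path = path + ".gz"
    with open(path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.remove(path)
    return gz_path


# Demo in a temporary directory
demo_dir = tempfile.mkdtemp()
log_path = os.path.join(demo_dir, "messages.log")
with open(log_path, "w") as f:
    f.write("line 1\nline 2\n")

gz_path = compress_file(log_path)
with gzip.open(gz_path, "rt") as f:
    restored = f.read()

# Pretend that 181 days have passed to exercise the hold time check
expired = is_expired(gz_path, hold_days=180, now=time.time() + 181 * 86400)
print(restored, expired)
```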

 

Python: Snippet – Repairing a Broken UTF-8 String

I got a string back from the database that was UTF-8 but came back with the wrong encoding. Geschäftsstelle had turned into Gesch├ñftsstelle.

The following snippet decodes such a broken string as UTF-8 again:

name_kaputt = 'Gesch\xc3\xa4ftsstelle'

name = ''.join(chr(ord(c)) for c in name_kaputt).decode("utf-8")

print name_kaputt
print name

Result:

Python 2.7.13 (v2.7.13:a06454b1afa1, Dec 17 2016, 20:42:59) [MSC v.1500 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> name_kaputt = 'Gesch\xc3\xa4ftsstelle'
>>> name = ''.join(chr(ord(c)) for c in name_kaputt).decode("utf-8")
>>> print name_kaputt
Geschäftsstelle
>>> print name
Geschäftsstelle
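
In Python 3, where str is always Unicode, the same repair is usually written as an encode/decode round trip: encode the garbled text with the encoding that was wrongly used for reading, then decode the resulting bytes as UTF-8. A minimal sketch; the function name repair_mojibake is hypothetical, and which wrong encoding applies (cp437 and latin-1 are shown here) has to be known or guessed:

```python
def repair_mojibake(broken, wrong_encoding):
    """Undo a wrong decode: recover the raw bytes, then read them as UTF-8."""
    return broken.encode(wrong_encoding).decode("utf-8")


# UTF-8 bytes that were read as code page 437 (typical for a Windows console)
print(repair_mojibake("Gesch├ñftsstelle", "cp437"))

# UTF-8 bytes that were read as Latin-1
print(repair_mojibake("GeschÃ¤ftsstelle", "latin-1"))
```

Both calls print Geschäftsstelle.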