Pythonによるssh自動接続用スクリプト(試作)

2017/08/18 追記

Golangで書き直してgithubに上げてるので、ちゃんと使うならそっちのほうがいいかも。


(すっごい久々にブログの記事書いた気がする…)

この度、転職が決まったんですが新しい職場だとクライアントはMacを使える事になった。
今までの職場だとTeratermのlistboxを使用したマクロでサーバにログインしてたので、似たようなものをMac用に作ることにしてみた。

で、最初bashで書こうかと思ったんだけど、勉強がてらPythonで書いてみようと思いPythonにした。
とりあえず、今の状態で出来ることとしては以下。

  • host.listを元に接続先を選択してログイン
  • suでユーザのスイッチ可
  • 操作ログの自動取得(ログ中にタイムスタンプの挿入不可)

なお、sshでの接続にはpexpectを用いているので、それだけは別途インストールが必要になる。
ログイン時に実行させる任意のコマンドをファイルに書いといたり、操作ログに操作時のタイムスタンプを挿入させたりしたいけど、まだ出来てない…どうもpexpectなどで直接ログ取得時にタイムスタンプを付与すると、コマンドの実行結果行に中途半端なタイムスタンプがついちゃったり、入力文字列の1つ1つにタイムスタンプがついて大変うざったい事になるので、scriptコマンドの時と同様に別プロセスで付与させるしかないのかな?

今のトコ、必要になるファイルはスクリプトファイル本体とhost.list(接続先ホストの情報を記述している)の2ファイルのみ。
あんまりセキュリティは考えていない…(今後の事考えるとよろしくないかも)。

1.host.list

基本的に、今までのTeratermマクロのものを使いまわせるよう、同じフォーマットとしている。

host.list

# SSHAutoLogin.ttl用 ログインホスト設定ファイル
# 左から順に、以下の内容を記述していく。
# HOST_NAME(任意の名称。接続するホストの選択時やログ名に利用する。必須。)
# HOST_IP(ホストのIPアドレス。接続時に利用する。必須。)
# HOST_ROOT_USER(管理者ユーザ名。ログインユーザを設定してスイッチする場合はrootユーザを設定。必須。)
# HOST_ROOT_PASS(管理者ユーザパスワード。必須。)
# HOST_USER(ログインユーザ名。管理者ユーザで直接ログイン出来る場合は不要。)
# HOST_PASS(ログインユーザパスワード。管理者ユーザで直接ログイン出来る場合は不要。)
# SUDO_FLAG(スイッチ文を「sudo su -」とするか否か。何かしら(0でも1でも)入力すると有効になる。スイッチ時のコマンドが「su -」の場合は不要)

# HOST_NAME, HOST_IP, HOST_ROOT_USER, HOST_ROOT_PASS, HOST_USER, HOST_PASS, SUDO_FLAG

# CentOS(「su -」)でスイッチする例
Test-CentOS1,172.28.0.101,root,password,test,password

# CentOS(「su -」)でスイッチせずに直接ログインする例
Test-CentOS2,172.28.0.102,root,password

# Ubuntu(sudo su -)でスイッチする例
Test-Ubuntu1,172.28.0.103,test,password,test,password,1

本当は、Macで使うこと前提ならssh接続時に背景画像を変える事を前提に、その辺の設定を記述しても良さそうだけど、今はいいや…

2.ssh_connect.py

スクリプト本体。
あまりいい出来ではない…少しづつ直していこう。

ssh_connect.py

#! /usr/bin/python
# -*- coding: utf-8 -*-

################
# モジュール読込 #
################
from __future__ import division
import curses
from math import *
import pexpect
import locale
locale.setlocale(locale.LC_ALL, "")
from datetime import datetime
import logging

######
# 関数 #
######

def ssh_connect(line_no):
    # 接続時に使用する情報を取得
    connect_host_name      = "".join(map(str,host_name_list[line_no]))
    connect_host_ip        = "".join(map(str,host_ip_list[line_no]))
    connect_host_root_user = "".join(map(str,host_root_user_list[line_no]))
    connect_host_root_pass = "".join(map(str,host_root_pass_list[line_no]))
    connect_host_user      = "".join(map(str,host_user_list[line_no]))
    connect_host_pass      = "".join(map(str,host_pass_list[line_no]))
    connect_sudo_flag      = "".join(map(str,sudo_flag_list[line_no]))

    # ログインユーザーの指定有無を確認
    if connect_host_user == "":
        connect_user = connect_host_root_user
        connect_pass = connect_host_root_pass
    else:
        connect_user = connect_host_user
        connect_pass = connect_host_pass

    foo = pexpect.spawn('ssh -o "StrictHostKeyChecking=no" %s@%s' % (connect_user, connect_host_ip))
    foo.logfile_read = open('%s%s.log.%s' % ( log_dir,connect_host_name,now_time ),"w")
    foo.expect(['.*ssword:','パスワード:'])
    print foo.before.rstrip()
    print foo.after.rstrip()
    foo.sendline(connect_pass)
    if not connect_host_user == "":
        foo.expect(['$','#'])
        print foo.before.strip()
        print foo.after.strip()
        foo.sendline('su - %s' % (connect_host_root_user))

        foo.expect(['.*ssword:','パスワード:'])
        print foo.before.strip()
        print foo.after.strip()
        foo.sendline(connect_host_root_pass)
    foo.interact()

##########
# 設定関連 #
##########
max_row      = 20
list_file    = '/tmp/data.csv'
command_file = ''
log_dir      = '/tmp/'
now_time     = datetime.now().strftime('%Y%m%d_%H%M%S')

################
# curses関連設定 #
################
# リスト表示件数
screen        = curses.initscr()
curses.noecho()
curses.cbreak()
curses.start_color()
screen.keypad( 1 )
curses.init_pair(1,curses.COLOR_BLACK, curses.COLOR_CYAN)
highlightText = curses.color_pair( 1 )
normalText    = curses.A_NORMAL
screen.border( 0 )
curses.curs_set( 0 )
box           = curses.newwin( max_row + 2, 100, 1, 1 )
box.box()

################
# リスト情報取得 #
################
f = open(list_file)
line = f.readline()
i = 0
host_name_list      = []
host_ip_list        = []
host_root_user_list = []
host_root_pass_list = []
host_user_list      = []
host_pass_list      = []
sudo_flag_list      = []
while line:
    line = line.strip()
    # コメント・空行を除外
    if line.startswith( "#" ) or len( line ) == 0:
        line = f.readline()
        continue
    # リストの各要素を取得
    line_column = len( line.split(',') )
    if line_column == 4:
        host_name      = line.split(',')[0]
        host_ip        = line.split(',')[1]
        host_root_user = line.split(',')[2]
        host_root_pass = line.split(',')[3]
        host_user      = ""
        host_pass      = ""
        sudo_flag      = ""
    elif line_column == 6:
        host_name      = line.split(',')[0]
        host_ip        = line.split(',')[1]
        host_root_user = line.split(',')[2]
        host_root_pass = line.split(',')[3]
        host_user      = line.split(',')[4]
        host_pass      = line.split(',')[5]
        sudo_flag      = ""
    elif line_column == 7:
        host_name      = line.split(',')[0]
        host_ip        = line.split(',')[1]
        host_root_user = line.split(',')[2]
        host_root_pass = line.split(',')[3]
        host_user      = line.split(',')[4]
        host_pass      = line.split(',')[5]
        sudo_flag      = line.split(',')[6]
    else:
        exit(1)
        print "リストに誤りがあります。"
        print line
    host_name_list      += [[host_name]]
    host_ip_list        += [[host_ip]]
    host_root_user_list += [[host_root_user]]
    host_root_pass_list += [[host_root_pass]]
    host_user_list      += [[host_user]]
    host_pass_list      += [[host_pass]]
    sudo_flag_list      += [[sudo_flag]]
    line = f.readline()
f.close
row_num = len( host_name_list )

############
# リスト表示 #
############
pages = int( ceil( row_num / max_row ) )
position = 1
page = 1
for i in range( 1, max_row + 1 ):
    if row_num == 0:
        box.addstr( 1, 1, "There aren't host_name_list", highlightText )
    else:
        if (i == position):
            box.addstr( i, 2, str( i ) + " - " + "".join(map(str,host_name_list[ i - 1 ])), highlightText )
        else:
            box.addstr( i, 2, str( i ) + " - " + "".join(map(str,host_name_list[ i - 1 ])), normalText )
        if i == row_num:
            break
screen.refresh()
box.refresh()

########################
# ホスト選択画面キー操作 #
########################
x = screen.getch()
while x != 27:

    # 下キー押下時の動作
    if x == curses.KEY_DOWN:
        if page == 1:
            if position < i:
                 position = position + 1
             else:
                 if pages > 1:
                    page = page + 1
                    position = 1 + ( max_row * ( page - 1 ) )
        elif page == pages:
            if position < row_num:
                position = position + 1
        else:
            if position < max_row + ( max_row * ( page - 1 ) ):
                 position = position + 1
             else:
                 page = page + 1
                 position = 1 + ( max_row * ( page - 1 ) )
     # 上キー押下時の動作
     if x == curses.KEY_UP:
         if page == 1:
             if position > 1:
                position = position - 1
        else:
            if position > ( 1 + ( max_row * ( page - 1 ) ) ):
                position = position - 1
            else:
                page = page - 1
                position = max_row + ( max_row * ( page - 1 ) )

    # 左キー押下時の動作
    if x == curses.KEY_LEFT:
        if page > 1:
            page = page - 1
            position = 1 + ( max_row * ( page - 1 ) )

    # 右キー押下時の動作
    if x == curses.KEY_RIGHT:
        if page < pages:
            page = page + 1
            position = ( 1 + ( max_row * ( page - 1 ) ) )

    # Enter押下時(決定)の処理
    if x == ord( "\n" ) and row_num != 0:
        screen.erase()
        screen.border( 0 )

        curses.endwin()
        ssh_connect(position - 1)
        exit()

    box.erase()
    screen.border( 0 )
    box.border( 0 )

    for i in range( 1 + ( max_row * ( page - 1 ) ), max_row + 1 + ( max_row * ( page - 1 ) ) ):
        if row_num == 0:
            box.addstr( 1, 1, "There aren't host_name_list",  highlightText )
        else:
            if ( i + ( max_row * ( page - 1 ) ) == position + ( max_row * ( page - 1 ) ) ):
                box.addstr( i - ( max_row * ( page - 1 ) ), 2, str( i ) + " - " + "".join(map(str,host_name_list[ i - 1 ])), highlightText )
            else:
                box.addstr( i - ( max_row * ( page - 1 ) ), 2, str( i ) + " - " + "".join(map(str,host_name_list[ i - 1 ])), normalText )
            if i == row_num:
                break

    screen.refresh()
    box.refresh()
    x = screen.getch()

curses.endwin()
exit()

実際に使用した際のイメージがこちら
コマンドを実行すると…

接続先のリストが表示されるので、接続するホストを矢印キーで選択しEnter。

ログイン完了。
rootにスイッチする場合は、host.listで指定してればスイッチするようにしている。

まぁ、これからある程度使っていくだろうし、少しづつブラッシュアップしていこう。