标签:
SQLAlchemy
ORM
db first 数据库操作类
code first 类操作数据库
1、自定义生成表 class 类(base):
列1
列2
根据类去创建表
2、使用类操作表
以后通过类和对象操作数据库
paramiko
堡垒机
ORM
连表 一对多 1、创建表,主动指定外键约束 2、操作 类:repr 单表 连表 session.query(表1).join(表2).all() 多对多 1、创建表,额外的关系表 2、filter() == in_( 都可以是另外一个查询) 3、relationship A AB ==> fk, 关系 B 4、简单 A 关系(B,ABTable对象) AB ==>fk, B 操作时,简单 Table对象: A 关系(B,Table对象方法) Table对象方法 AB ==>fk, B 操作时,简单 A 关系(B,AB.__table__) AB ==>fk, B 操作时,简单 1、创建表 2、操作表 #单表 #连表 .join 关系: 一对多 fk,关系 多对多, 多一张表 fk,fk 1、 关系表:关系 2、A:关系,(B,AB)
#定义数据库
from sqlalchemy.ext.declarative import declarative_base #创建base 后面类里要继承base
#创建连接
from sqlalchemy import Column,Integer,String,ForeignKey,UniqueConstraint,Index #表类型
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine#创建连接
# Engine for the local MySQL database "s13", reached through the pymysql driver.
engine = create_engine(
    "mysql+pymysql://root:@127.0.0.1:3306/s13",
    max_overflow=5,
)

# Declarative base class; every mapped model in this file subclasses it.
Base = declarative_base()
#一对多
class Group(Base):
    """Row of the ``group`` table; ``User.group_id`` points at ``nid``."""

    __tablename__ = 'group'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    caption = Column(String(50))
class User(Base):
    """Row of the ``user`` table, linked to one ``Group`` through ``group_id``."""

    __tablename__ = 'user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey("group.nid"))
    # user.group gives the related Group; the backref adds Group.uuu, which
    # lists every user attached to that group.
    group = relationship("Group", backref="uuu")

    def __repr__(self):
        """Return "nid username group_id" so printed rows are readable."""
        return "%s %s %s" % (self.nid, self.username, self.group_id)
#多对多
class Host(Base):
    """Row of the ``host`` table: one machine identified by name, port and IP."""

    __tablename__ = 'host'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(32))
    # The port is stored as text in this schema, not as an integer.
    port = Column(String(32))
    ip = Column(String(32))
class HostUser(Base):
    """Row of the ``host_user`` table: an account name usable on hosts."""

    __tablename__ = 'host_user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(32))
class HostToHostUser(Base):
    """Association row joining one ``Host`` to one ``HostUser`` (many-to-many)."""

    __tablename__ = 'host_to_host_user'

    nid = Column(Integer, primary_key=True, autoincrement=True)
    host_id = Column(Integer, ForeignKey('host.nid'))
    host_user_id = Column(Integer, ForeignKey('host_user.nid'))

    # The backrefs expose the association rows from both ends:
    # Host.h and HostUser.u each list the HostToHostUser rows that point at them.
    host = relationship("Host", backref='h')
    host_user = relationship("HostUser", backref='u')
def init_db():
    """Create every table registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.create_all(engine)
def drop_db():
    """Drop every table registered on ``Base.metadata`` using ``engine``."""
    Base.metadata.drop_all(engine)
# init_db()

Session = sessionmaker(bind=engine)
session = Session()

# Seed data for the one-to-many example (left disabled, as in the original):
# session.add(Group(caption='dba'))
# session.add(Group(caption='ddd'))
# session.commit()
# session.add_all([
#     # User(username='alex1', group_id=1),
#     User(username='alex2', group_id=2),
# ])
# session.commit()

# Seed data for the many-to-many example. The commit calls are disabled, so
# these rows are only pending in the session and are never committed here.
session.add_all([
    Host(hostname='c1', port='22', ip='1.1.1.1'),
    Host(hostname='c2', port='22', ip='1.1.1.2'),
    Host(hostname='c3', port='22', ip='1.1.1.3'),
    Host(hostname='c4', port='22', ip='1.1.1.4'),
    Host(hostname='c5', port='22', ip='1.1.1.5'),
])
# session.commit()

session.add_all([
    HostUser(username='root'),
    HostUser(username='db'),
    HostUser(username='nb'),
    HostUser(username='sb'),
])
# session.commit()

# Hard-coded ids: assumes the hosts and host users above received ids 1..n
# in insertion order -- TODO confirm against the actual table contents.
session.add_all([
    HostToHostUser(host_id=1, host_user_id=1),
    HostToHostUser(host_id=1, host_user_id=2),
    HostToHostUser(host_id=1, host_user_id=3),
    HostToHostUser(host_id=2, host_user_id=2),
    HostToHostUser(host_id=2, host_user_id=4),
    HostToHostUser(host_id=2, host_user_id=3),
])
# session.commit()
# The bare triple-quoted strings below hold disabled example code; as plain
# string expressions they have no effect when the script runs.
# Example 1: join queries written by hand, without using relationship().
# NOTE(review): the note inside the string calls the plain .join() a
# "left join"; only the isouter=True variants request an outer join -- confirm.
"""
#原始方法,不加relationship之前
ret = session.query(User).join(Group)
#ret = session.query(User).join(Group,isouter=True)
#print(ret) #查看生成的sql语句
ret1 = session.query(User).join(Group).all() #表示查询两张表放在一起把数据全部查到
#相当于 select * from user left join group on user.group_id = group.nid
#print(ret1)
#ret2 = session.query(User).join(Group,isouter=True).all() #以left方式
#session.query(User,Group).join(Group,isouter=True).all() #以left方式把两个表中的所有数据拿出来
#session.query(User.username,Group.caption).join(Group,isouter=True).all() #拿出相应的值
"""
# Example 2: reverse lookup from a Group to its users through the "uuu" backref.
"""
#新方式反向查询
obj = session.query(Group).filter(Group.caption == "DBA").first()
print(obj.nid)
print(obj.caption)
print(obj.uuu) #反向查找
"""
# Example 3: many-to-many lookup done manually in three separate queries
# (host -> association rows -> host users).
"""
host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first()
# host_obj.nid
session.query()
session.query(Host.nid).filter(Host.hostname == ‘c1‘)
所有用户ID
host_obj = session.query(Host).filter(Host.hostname == ‘c1‘).first()
host_2_host_user = session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == host_obj.nid).all()
#print(host_2_host_user)
#[(1,),(2,),(2,)]
r = zip(*host_2_host_user)
#print(list(r)[0])
#[1,2,3,]
users = session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all()
print(users)
"""
# The same lookup written the original way, as one statement of nested subqueries:
# session.query(HostUser.username).filter(HostUser.nid.in_(session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_id == session.query(Host.nid).filter(Host.hostname == 'c1'))))
# Example 4: the same lookup through the relationship backref Host.h.
"""
host_obj = session.query(Host).filter(Host.hostname==‘c1‘).first()
print(host_obj.nid)
print(host_obj.hostname)
# 第三表对应的对象
print(host_obj.h)
# 循环获取的第三表对应的对象
for item in host_obj.h:
print(item.host_user,item.host_user.nid,item.host_user.username)
"""
# Many-to-many lookup through the association table. Host.h is the backref
# declared on HostToHostUser.host, so each item is a HostToHostUser row and
# item.host_user is the HostUser it points to.
host_obj = session.query(Host).filter(Host.hostname == 'c1').first()
for item in host_obj.h:
    print(item.host_user.username)
paramiko
上一章节有详细一点的介绍
既可以发送命令也可以上传文件
import paramiko
import uuid
class SSHConnection(object):
    """Wrap one paramiko ``Transport`` used for remote commands and SFTP uploads.

    Call :meth:`connect` before :meth:`cmd` or :meth:`upload`, and
    :meth:`close` when finished.
    """

    def __init__(self, host='192.168.11.61', port=22, username='alex', pwd='alex3714'):
        """Store the connection settings; no network traffic happens here."""
        self.host = host
        self.port = port
        self.username = username
        self.pwd = pwd
        # Set by connect(). Initialised here so close() is safe beforehand.
        self.__transport = None

    def run(self):
        """Open the connection and close it again (placeholder for real work)."""
        self.connect()
        try:
            pass
        finally:
            self.close()

    def connect(self):
        """Open a transport to ``host:port`` and authenticate with the password."""
        transport = paramiko.Transport((self.host, self.port))
        try:
            transport.connect(username=self.username, password=self.pwd)
        except Exception:
            # Do not leak the socket when authentication or negotiation fails.
            transport.close()
            raise
        self.__transport = transport

    def close(self):
        """Close the transport if one is open; safe to call more than once."""
        if self.__transport is not None:
            self.__transport.close()
            self.__transport = None

    def cmd(self, command):
        """Run ``command`` on the remote host and return what it wrote to stdout."""
        ssh = paramiko.SSHClient()
        # Reuse the already authenticated transport instead of reconnecting.
        ssh._transport = self.__transport
        stdin, stdout, stderr = ssh.exec_command(command)
        return stdout.read()

    def upload(self, local_path, target_path):
        """Copy the local file ``local_path`` to ``target_path`` on the remote host."""
        sftp = paramiko.SFTPClient.from_transport(self.__transport)
        try:
            sftp.put(local_path, target_path)
        finally:
            # Close only the SFTP client; the shared transport stays open.
            sftp.close()
ssh = SSHConnection()
ssh.connect()
try:
    r1 = ssh.cmd('df')
    ssh.upload('s2.py', "/home/alex/s7.py")
finally:
    # Release the connection even if the command or the upload fails.
    ssh.close()
堡垒机
终端输入
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:QL
import paramiko
import sys
import os
import socket
import select
import getpass
from paramiko.py3compat import u #python2.7需要把这行注释
tran = paramiko.Transport(('10.211.55.4', 22,))
try:
    tran.start_client()
    tran.auth_password('wupeiqi', '123')

    # Open a session channel, request a pseudo-terminal and start a shell on it.
    chan = tran.open_session()
    try:
        chan.get_pty()
        chan.invoke_shell()

        while True:
            # Wait up to one second for either side to have data:
            # sys.stdin carries the user's input, chan carries the server's output.
            readable, writeable, error = select.select([chan, sys.stdin], [], [], 1)

            if chan in readable:
                try:
                    # On Python 2.7 use: x = chan.recv(1024)
                    x = u(chan.recv(1024))
                    if len(x) == 0:
                        print('\r\n*** EOF\r\n')
                        break
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass

            if sys.stdin in readable:
                inp = sys.stdin.readline()
                chan.sendall(inp)
    finally:
        chan.close()
finally:
    # Close the transport on every exit path, including errors.
    tran.close()
完整版
import paramiko
import sys
import os
import socket
import getpass
# from paramiko.py3compat import u
# Windows does not have termios, so probe for it once at import time and
# record the result in has_termios.
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False
def interactive_shell(chan):
    """Run an interactive session on ``chan`` with the platform's shell loop.

    Uses ``posix_shell`` when ``has_termios`` is true, otherwise
    ``windows_shell``.
    """
    if has_termios:
        posix_shell(chan)
    else:
        windows_shell(chan)
def posix_shell(chan):
    """Relay data between the local terminal and ``chan``, logging typed input.

    The terminal is switched to raw/cbreak mode for the session and restored
    afterwards. Every typed character except Tab is appended to
    ``handle.log``. After a Tab, the next chunk received from the channel is
    logged as well unless it starts with ``"\\r\\n"``.

    Args:
        chan: channel object providing ``recv``, ``send`` and ``settimeout``.
    """
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)

        # The with-block closes the log file on every exit path.
        with open('handle.log', 'a+') as f:
            tab_flag = False
            while True:
                r, _, _ = select.select([chan, sys.stdin], [], [])

                if chan in r:
                    try:
                        x = chan.recv(1024)
                        if len(x) == 0:
                            sys.stdout.write('\r\n*** EOF\r\n')
                            break
                        # recv() returns bytes on Python 3; decode so the
                        # text-mode writes and str comparison below work.
                        if not isinstance(x, str):
                            x = x.decode('utf-8', 'replace')
                        if tab_flag:
                            if not x.startswith('\r\n'):
                                f.write(x)
                                f.flush()
                            tab_flag = False
                        sys.stdout.write(x)
                        sys.stdout.flush()
                    except socket.timeout:
                        pass

                if sys.stdin in r:
                    x = sys.stdin.read(1)
                    if len(x) == 0:
                        break
                    if x == '\t':
                        tab_flag = True
                    else:
                        f.write(x)
                        f.flush()
                    chan.send(x)
    finally:
        # Always give the user their terminal settings back.
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
def windows_shell(chan):
    """Relay data between the console and ``chan`` without termios.

    A background thread copies channel output to stdout while this function
    forwards stdin to the channel one character at a time until EOF.

    Args:
        chan: channel object providing ``recv`` and ``send``.
    """
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        """Copy everything received on ``sock`` to stdout until it closes."""
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            # recv() returns bytes on Python 3; stdout expects text.
            if not isinstance(data, str):
                data = data.decode('utf-8', 'replace')
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass
def run():
    """Let the operator pick a predefined host and open a shell on it.

    Prints the numbered host list, reads a 1-based number from the prompt,
    authenticates with the stored password and hands the channel to
    ``interactive_shell``.

    Raises:
        ValueError: if the entered text is not an integer.
        IndexError: if the number does not match a listed host.
    """
    # Python 2 calls the line-reading builtin raw_input; Python 3 calls it input.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    host_list = [
        {'host': "192.168.11.139", 'username': 'oldboy', 'pwd': "123"},
        {'host': "192.168.11.138", 'username': 'oldboy', 'pwd': "123"},
        {'host': "192.168.11.137", 'username': 'oldboy', 'pwd': "123"},
    ]
    # enumerate() yields (index, entry) pairs, so unpack before indexing.
    for index, item in enumerate(host_list, 1):
        print('%s. %s' % (index, item['host']))

    num = read_line('序号:')
    position = int(num)
    # Reject 0 and negatives explicitly; plain list indexing would wrap around.
    if not 1 <= position <= len(host_list):
        raise IndexError('host number out of range: %s' % num)
    sel_host = host_list[position - 1]

    hostname = sel_host['host']
    username = sel_host['username']
    pwd = sel_host['pwd']
    # NOTE(review): this echoes the password to the terminal -- confirm it is wanted.
    print(hostname, username, pwd)

    tran = paramiko.Transport((hostname, 22,))
    try:
        tran.start_client()
        tran.auth_password(username, pwd)

        # Open a session channel, request a pseudo-terminal and start a shell.
        chan = tran.open_session()
        try:
            chan.get_pty()
            chan.invoke_shell()
            interactive_shell(chan)
        finally:
            chan.close()
    finally:
        # Close the transport on every exit path, including errors.
        tran.close()
if __name__ == '__main__':
    run()
数据库表
from sqlalchemy import create_engine, and_, or_, func, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, DateTime
from sqlalchemy.orm import sessionmaker, relationship

# Base class for every ORM model below.
Base = declarative_base()

# Association table between bastion login users and server accounts: one
# person may hold several server accounts, and one server account may be
# shared by several people.
UserProfile2HostUser = Table(
    'userprofile_2_hostuser', Base.metadata,
    Column('userprofile_id', ForeignKey('user_profile.id'), primary_key=True),
    Column('hostuser_id', ForeignKey('host_user.id'), primary_key=True),
)


class Host(Base):
    """A managed server, identified by a unique hostname and IP address."""

    __tablename__ = 'host'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
    ip_addr = Column(String(128), unique=True, nullable=False)
    port = Column(Integer, default=22)


class HostUser(Base):
    """A login account on one ``Host``."""

    __tablename__ = 'host_user'

    id = Column(Integer, primary_key=True)

    # (stored value, display label) pairs for auth_type.
    AuthTypes = [
        (u'ssh-passwd', u'SSH/Password'),
        (u'ssh-key', u'SSH/KEY'),
    ]
    auth_type = Column(String(64))
    # NOTE(review): unique=True already forbids reusing a username on a second
    # host, which makes the (host_id, username) constraint below redundant --
    # confirm which of the two rules is intended.
    username = Column(String(64), unique=True, nullable=False)
    password = Column(String(255))

    host_id = Column(Integer, ForeignKey('host.id'))
    # host_user.host is the Host; the backref adds Host.uu with its accounts.
    host = relationship('Host', backref='uu')

    __table_args__ = (UniqueConstraint('host_id', 'username', name='_host_username_uc'),)

    # Example query returning (username, password, hostname, port):
    # obj = session.query(HostUser.username, HostUser.password, Host.hostname, Host.port).join(Host).filter(HostUser.id == 1).first()


class UserProfile(Base):
    """A person who logs in to the bastion host."""

    __tablename__ = 'user_profile'

    id = Column(Integer, primary_key=True)
    username = Column(String(64), unique=True, nullable=False)
    password = Column(String(255), nullable=False)

    # Use this column if a person may belong to only one group.
    # NOTE(review): no 'group' table with an 'id' column is defined in this
    # snippet -- confirm that it exists elsewhere before creating the tables.
    group_id = Column(Integer, ForeignKey('group.id'))

    # Server accounts this person may use; the backref adds HostUser.userprofiles.
    host_list = relationship('HostUser', secondary=UserProfile2HostUser, backref='userprofiles')

    # Usage sketch (pseudo-code):
    #   obj = session.query(UserProfile).filter_by(
    #       username=<entered username>, password=<entered password>).first()
    #   if not obj: ...
    #   # obj is the bastion login user; obj.host_list lists all of that
    #   # person's server accounts.
    #   for item in obj.host_list:
    #       # item is a HostUser: item.username, item.password
    #       # item.host is a Host: item.host.hostname, item.host.port
    #   host_obj = <the HostUser selected by the operator>
    #   session.add(AuditLog(userprofile_id=obj.id, hostuser_id=host_obj.id, cmd="ifconfig"))


class AuditLog(Base):
    """One command run by a bastion user through a given server account."""

    __tablename__ = 'audit_log'

    id = Column(Integer, primary_key=True)
    userprofile_id = Column(Integer, ForeignKey('user_profile.id'))
    hostuser_id = Column(Integer, ForeignKey('host_user.id'))
    cmd = Column(String(255))
    date = Column(DateTime)


# Disabled sketch of a lazily created, shared session holder:
# class Session:
#     session = None
#
#     def __init__(self):
#         engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5)
#         ss = sessionmaker(bind=engine)
#         obj = ss()
#         Session.session = obj
#
#     @classmethod
#     def instance(cls):
#         if not cls.session:
#             cls()
#         return cls.session
标签:
原文地址:http://www.cnblogs.com/QL8533/p/5733907.html