python数据库操作 | 臭大佬

臭大佬 2020-03-18 21:15:32 1347
简介 python数据库操作

介绍

在 Python 中操作数据库,一般来说有两种方式:

  • 使用 ORM(对象关系映射)模型,例如 SQLALchemy;
  • 使用 Python 的特定数据库软件接口,例如 pymysql;

准备工作

准备一张数据表

CREATE TABLE `articles` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  `img_url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

pymysql操作数据

安装

pip install pymysql

爬取数据并存储

# coding:utf-8
import requests
from bs4 import BeautifulSoup
import pymysql


def getData():
    '''
    获取数据
    :return:
    '''
    url = "https://www.choudalao.com/"
    wbdata = requests.get(url).text
    soup = BeautifulSoup(wbdata, 'lxml')
    news = soup.select("#body_app > article > div.blogs > ul > li > h3 > a")
    return news


def pymysqlAction():
    '''
    pymysql操作
    :return:
    '''
    # 建立一个连接
    conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', db='python_test', charset='utf8')
    # 创建一个游标
    cursor = conn.cursor()
    news = getData()
    for d in news:
        # 提取出标题和连接信息
        title = d.get_text()
        link = d.get("href")
        sql = "INSERT INTO articles(title,url)VALUES('{0}','{1}');".format(title, link)
        # 执行 SQL 语句
        cursor.execute(sql)
        # 提交
        conn.commit()
    # 关闭游标
    cursor.close()
    # 关闭数据库连接
    conn.close()

if __name__ == '__main__':
    pymysqlAction()

读取数据

# 建立一个连接
conn = pymysql.connect(host='localhost', port=3306, user='root', password='root', db='python_test', charset='utf8')
# 创建一个游标
cursor = conn.cursor()
sql = "select * from articles"
cursor.execute(sql)
# 执行sql语句,获取所有结果
result = cursor.fetchall()
# 执行sql语句,得到一条结果
# result = cursor.fetchone()
print(result)
# 断开连接
cursor.close()
conn.close()

SQLALchemy操作数据

安装

pip install sqlalchemy  -i https://pypi.douban.com/simple

直接使用会报Warning: (1366, "Incorrect string value: '\\xD6\\xD0\\xB9\\xFA\\xB1\\xEA...' for column 'VARIABLE_VALUE' at row 518") result = self._query(query)的错误,建议安装mysql-connector-python驱动

pip install mysql-connector-python  -i https://pypi.douban.com/simple

详情点击查看

执行原生sql

# coding:utf-8
from sqlalchemy import create_engine

def selectDb():
    conn = engine.raw_connection()
    cursor = conn.cursor()
    cursor.execute(
        "select * from articles"
    )
    result = cursor.fetchall()
    print(result)
    cursor.close()
    conn.close()


if __name__ == '__main__':
    engine = create_engine(
        "mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    selectDb()

orm使用

表操作
# coding:utf-8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
# Base是declarative_base的实例化对象
Base = declarative_base()

# 定义一个常量
ENGINE = create_engine("mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test")

# 每个类都要继承Base
class Articls(Base):
    __tablename__ = 'articles'  # 数据库表名称
    # Column是列的意思,固定写法 Column(字段类型, 参数)
    # primary_key主键、index索引、nullable是否可以为空
    id = Column(Integer, primary_key=True)  # id 主键
    title = Column(String(256), nullable=False)  # title列,不可为空
    url = Column(String(256), nullable=False)  # url列,不可为空
    # extra = Column(Text, nullable=True)
    # 相当于Django的ORM的class Meta,是一些元信息
    __table_args__ = (

    )

if __name__ == '__main__':
    # 根据类创建数据库表
    Base.metadata.create_all(ENGINE)
    # 根据类删除数据库表
    #Base.metadata.drop_all(ENGINE)
    pass

增伤改查

数据表

CREATE TABLE `users` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL,
  PRIMARY KEY (`id`),
  KEY `ix_users_name` (`name`)
) ENGINE=MyISAM AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

目录结构

run.py

'''
-*- coding: utf-8 -*-
'''
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from models.Users import Users

ENGINE = create_engine("mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test")
# 每次执行数据库操作的时候,都需要创建一个session,相当于管理器(相当于Django的ORM的objects)
Session = sessionmaker(bind=ENGINE)
# 线程安全,基于本地线程实现每个线程用同一个session
session = scoped_session(Session)

# =======执行ORM操作==========
def select():
    # res = session.query(Users).all()
    # res = session.query(Users).get(1)  # 查询id=1的记录
    # res = session.query(Users).filter(Users.name == "wjf").all()
    # res = session.query(Users).filter_by(name="wjf").all()
    res = session.query(Users).filter_by(name="wjf").first()
    return res

def add(name=''):
    if name == '':
        return False
    obj = Users(name=name)
    session.add(obj)
    # # 批量
    # session.add_all([
    #     Users(name="aaa"),
    #     Users(name="bbb"),
    #     Hosts(name="ccc"),
    # ])
    # 提交
    session.commit()
    # 关闭session
    session.close()

def update(name=''):
    '''
    更新
    :return:
    '''
    if name == '':
        return False
    session.query(Users).filter(Users.id > 0).update({"name" :name})
    #session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
    session.commit()
    session.close()

def delete():
    '''
    删除
    :return:
    '''
    session.query(Users).filter(Users.id > 2).delete()
    session.commit()
    session.close()

if __name__ == '__main__':
    add('德玛西亚')

Users.py

# coding:utf-8
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# Base是declarative_base的实例化对象
Base = declarative_base()
# 定义一个常量
ENGINE = create_engine("mysql+mysqlconnector://root:root@127.0.0.1:3306/python_test")


# 每个类都要继承Base
class Users(Base):
    __tablename__ = 'users'
    # Column是列的意思,固定写法 Column(字段类型, 参数)
    # primary_key主键、index索引、nullable是否可以为空
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    # 相当于Django的ORM的class Meta,是一些元信息
    __table_args__ = (

    )

常规操作

以下操作并没有全部验证,使用前请自行验证

# 条件查询
ret1 = session.query(Users).filter_by(id=1).first()
res2 = session.query(Users).filter(Users.id > 1, Users.name == "wjf").all()
res3 = session.query(Users).filter(Users.id.between(1, 10)).all()
res4 = session.query(Users).filter(~Users.id.in_([1, 10])).first()


from sqlalchemy import and_, or_

res5 = session.query(Users).filter(and_(Users.id > 1, Users.name == "wjf")).first()
res6 = session.query(Users).filter(or_(Users.id > 1, Users.name == "wjf")).first()
res7 = session.query(Users).filter(or_(
    Users.id > 1,
    and_(Users.id > 3, Users.name == "wjf")
)).all()

# 通配符
res8 = session.query(Users).filter(Users.name.like("L%")).all()
res9 = session.query(Users).filter(~Users.name.like("L%")).all()

# 限制
res10 = session.query(Users).filter(~Users.name.like("L%")).all()[1:2]

# 排序
res11 = session.query(Users).order_by(Users.id.desc()).all()  # 倒序
res12 = session.query(Users).order_by(Users.id.asc()).all()  # 正序

# 分组
res13 = session.query(Users.name).group_by(Users.name).all()

# 聚合函数
from sqlalchemy.sql import func

res14 = session.query(
    func.max(Users.id),
    func.sum(Users.name),
    func.min(Users.id)
).group_by(Users.name).having(func.max(Users.id > 22)).all()

# 连表
# print(res15) 得到一个列表套元组 元组里是两个对象
# [(user_obj1, hobby_obj1), (user_obj2, hobby_obj2), ]
res15 = session.query(UserInfo, Hobby).filter(UserInfo.hobby_id == Hobby.id).all()

# print(res16) 得到列表里面是前一个对象,join相当于inner join
# [user_obj1, user_obj2, ]
res16 = session.query(UserInfo).join(Hobby).all()

# 相当于inner join
# for i in res16:
#     # print(i[0].name, i[1].name)
#     print(i.hobby.name)

# 指定isouter=True相当于left join
res17 = session.query(Hobby).join(UserInfo, isouter=True).all()
res17_1 = session.query(UserInfo).join(Hobby, isouter=True).all()

# 或者直接用outerjoin也是相当于left join
res18 = session.query(Hobby).outerjoin(UserInfo).all()
res18_1 = session.query(UserInfo).outerjoin(Hobby).all()
print(res17)
print(res17_1)
print(res18)
print(res18_1)