SQLAlchemy分为两个部分,一个是最常用的ORM对象映射,另一个是核心的SQL expression。 第一个是纯粹的ORM,后面这个不是ORM,而是DBAPI的封装,通过一些sql表达式来避免了直接写sql。 使用SQLAlchemy则可以分为三种方式。
- 使用ORM避免直接书写sql
- 使用raw sql直接书写sql
- 使用sql expression,通过SQLAlchemy的方法写sql表达式
安装
一般来讲对某个底层数据库需要安装相应的驱动,比如使用了mysql,那么需要安装python的mysql驱动,
有很多种选择,SQLAlchemy默认的MySQLdb/MySQL-Python,也可以使用PyMySQL,下面就用PyMySQL作为例子.
在centos上面安装MySQL-Python
- yum install mysql-devel
- pip install MySQL-python
安装PyMySQL: pip install PyMySQL
注意: MySQLdb仅仅支持python2,如果要支持python3,请安装PyMySQL.
定义映射
这里使用两个表来说明,一个用户表users,一个电子邮件表addresses,两者一对多的关系。我们先定义这两个映射:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy import ForeignKey from sqlalchemy.orm import relationship Base = declarative_base() class Address(Base): """电子邮件表""" __tablename__ = 'addresses' id = Column(Integer, primary_key=True) email_address = Column(String(30), nullable=False) user_id = Column(Integer, ForeignKey('users.id')) user = relationship("User", back_populates="addresses") def __repr__(self): return "<Address(email_address='{}')>".format(self.email_address) class User(Base): """用户表""" __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(10)) fullname = Column(String(20)) password = Column(String(20)) addresses = relationship("Address", order_by=Address.id, back_populates="user") def __repr__(self): return "<User(name='{}', fullname='{}', password='{}')>".format( self.name, self.fullname, self.password)
|
连接数据库并创建表
通过create_engine()
可以连接数据库,另外先要提前创建test这个测试数据库:
1 2 3 4 5 6 7 8 9 10
| from sqlalchemy import create_engine engine = create_engine('mysql+pymysql://root:mysql@127.0.0.1:3306/test', echo=True) Base.metadata.create_all(engine)
|
创建session
对数据库的操作必须先创建一个session,增删改查操作都有这个session负责,
首先我们先创建一个session工厂类,由它来负责后续的session创建
1 2
| from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine)
|
添加用户
1 2 3 4 5 6 7 8 9 10 11
| session = Session() ed_user = User(name='ed', fullname='Ed Jones', password='edspassword') session.add(ed_user) session.add_all([ User(name='wendy', fullname='Wendy Williams', password='foobar'), User(name='mary', fullname='Mary Contrary', password='xxg527'), User(name='fred', fullname='Fred Flinstone', password='blah')]) session.commit()
|
增删查改
查询
在session上面调用query()方法会创建一个Query对象
1 2 3 4 5 6 7 8 9 10
| for user in session.query(User).order_by(User.id): print(user.name, user.fullname) for name in session.query(User.name).filter_by(fullname='Ed Jones'): print(name) for name in session.query(User.name).filter(User.fullname=='Ed Jones'): print(name)
|
删除
1 2
| session.delete(ed_user) session.query(User).filter_by(name='ed').count()
|
关系映射
一对多的关系映射
sqlalchemy使用ForeignKey来指明一对多的关系,表示一对多的关系时,在子表类中通过 foreign key (外键)引用父表类。然后,在父表类中通过 relationship() 方法来引用子表的类:比如一个用户可有多个邮件地址,而一个邮件地址只属于一个用户。那么就是典型的一对多或多对一关系。
注意:两个类中都通过relationship()方法指明相互关系。
在Address类中,我们定义外键,还有对应所属的user对象
1 2 3 4 5 6 7 8 9 10 11
| class Parent(Base): __tablename__ = 'parent' id = Column(Integer, primary_key=True) children = relationship("Child") class Child(Base): __tablename__ = 'child' id = Column(Integer, primary_key=True) parent_id = Column(Integer, ForeignKey('parent.id'))
|
下面通过几个例子来操作一对多的关系映射
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| jack = User(name='jack', fullname='Jack Bean', password='gjffdd') jack.addresses = [Address(email_address='jack@google.com'), Address(email_address='j25@yahoo.com')] session.add(jack) session.commit() jack = session.query(User).filter_by(name='jack').one() jack.addresses session.query(User).join(Address).filter(Address.email_address=='jack@google.com').all()
|
有时候我们不想使用懒加载,而是要强制一次性加载某个关联数据,那么可以使用subqueryload或者joinedload
1 2 3 4 5 6 7
| from sqlalchemy.orm import subqueryload jack = session.query(User).options(subqueryload(User.addresses)).filter_by(name='jack').one() from sqlalchemy.orm import joinedload jack = session.query(User).options(joinedload(User.addresses)).filter_by(name='jack').one()
|
多对一映射
在多对一的关系中建立双向的关系,这样的话在对方看来这就是一个多对一的关系, 在子表类中附加一个relationship()方法,并且在双方的relationship()方法中使用relationship.back_populates方法参数:
1 2 3 4 5 6 7 8 9 10 11 12
| class Parent(Base): __tablename__ = 'parent' id = Column(Integer, primary_key=True) children = relationship("Child", back_populates="parent") class Child(Base): __tablename__ = 'child' id = Column(Integer, primary_key=True) parent_id = Column(Integer, ForeignKey('parent.id')) parent = relationship("Parent", back_populates="children")
|
这样的话子表将会在多对一的关系中获得父表的属性,或者可以在单一的relationship()方法中使用backref参数来代替back_populates参数, 推荐使用这种方式,可以少些几句话。
1 2 3 4 5 6 7 8 9
| class Parent(Base): __tablename__ = 'parent' id = Column(Integer, primary_key=True) children = relationship("Child", backref="parent") class Child(Base): __tablename__ = 'child' id = Column(Integer, primary_key=True) parent_id = Column(Integer, ForeignKey('parent.id'))
|
一对一
一对一就是多对一和一对多的一个特例,只需在relationship加上一个参数uselist=False替换多的一端就是一对一
一对多 => 一对一:
1 2 3 4 5 6 7 8 9
| class Parent(Base): __tablename__ = 'parent' id = Column(Integer, primary_key=True) child = relationship("Child", uselist=False, backref="parent") class Child(Base): __tablename__ = 'child' id = Column(Integer, primary_key=True) parent_id = Column(Integer, ForeignKey('parent.id'))
|
多对一 => 一对一:
1 2 3 4 5 6 7 8 9
| class Parent(Base): __tablename__ = 'parent' id = Column(Integer, primary_key=True) child_id = Column(Integer, ForeignKey('child.id')) child = relationship("Child", backref=backref("parent", uselist=False)) class Child(Base): __tablename__ = 'child' id = Column(Integer, primary_key=True)
|
多对多
多对多关系需要一个中间关联表,通过参数secondary来指定。backref会自动的为子表类加载同样的secondary参数,
所以为了简洁起见仍然推荐这种写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from sqlalchemy import Table, Text post_keywords = Table('post_keywords',Base.metadata, Column('post_id',Integer,ForeignKey('posts.id')), Column('keyword_id',Integer,ForeignKey('keywords.id')) ) class BlogPost(Base): __tablename__ = 'posts' id = Column(Integer,primary_key=True) body = Column(Text) keywords = relationship('Keyword',secondary=post_keywords,backref='posts') class Keyword(Base): __tablename__ = 'keywords' id = Column(Integer,primary_key = True) keyword = Column(String(50),nullable=False,unique=True)
|
如果使用back_populates,那么两个都要定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from sqlalchemy import Table, Text post_keywords = Table('post_keywords',Base.metadata, Column('post_id',Integer,ForeignKey('posts.id')), Column('keyword_id',Integer,ForeignKey('keywords.id')) ) class BlogPost(Base): __tablename__ = 'posts' id = Column(Integer,primary_key=True) body = Column(Text) keywords = relationship('Keyword',secondary=post_keywords,back_populates="parents") class Keyword(Base): __tablename__ = 'keywords' id = Column(Integer,primary_key = True) keyword = Column(String(50),nullable=False,unique=True) parents = relationship('BlogPost',secondary=post_keywords,back_populates="keywords")
|
一些重要参数
relationship()函数接收的参数非常多,比如:backref,secondary,primaryjoin等等。 下面列举一下我用到的参数
不设置cascade,删除parent时,其关联的chilren不会删除,只会把chilren关联的parent.id置为空,
设置cascade后就可以级联删除children
对象的四种状态
对象在session中可能存在的四种状态包括:
- Transient:实例还不在session中,还没有保存到数据库中去,没有数据库身份,像刚创建出来的对象比如User(),仅仅只有mapper()与之关联
- Pending:用add()一个transient对象后,就变成了一个pending对象,这时候仍然没有flushed到数据库中去,直到flush发生。
- Persistent:实例出现在session中而且在数据库中也有记录了,通常是通过flush一个pending实例变成Persistent或者从数据库中querying一个已经存在的实例。
- Detached:一个对象它有记录在数据库中,但是不在任何session中,
关联查询
查询:[http://docs.sqlalchemy.org/en/rel_1_1/orm/query.html]
关联查询: [http://docs.sqlalchemy.org/en/rel_1_1/orm/query.html#sqlalchemy.orm.query.Query.join]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| session.query(User).join(Address).filter(Address.email==”lzjun@qq.com”).all() q = session.query(User).join(Address, User.id==Address.user_id) q = session.query(User).join("orders", "items", "keywords") q = session.query(User).join(User.orders).join(Order.items).join(Item.keywords) address_subq = session.query(Address).\ filter(Address.email_address == 'ed@foo.com').\ subquery() q = session.query(User).join(address_subq, User.addresses) q = session.query(Address).select_from(User).\ join(User.addresses).\ filter(User.name == 'ed') SELECT address.* FROM user JOIN address ON user.id=address.user_id WHERE user.name = :name_1 q = session.query(Node).\ join("children", "children", aliased=True, isouter=True).\ filter(Node.name == 'grandchild 1')
|
参考