一、连接器的安装和配置
pymongo: MongoDB 官方提供的 Python 工具包。官方文档: https://pymongo.readthedocs.io/en/stable/ pip安装,命令如下:
管理 MongoDB 的主要步骤如下:
- 连接到 MongoDB 数据库系统
- 管理 MongoDB 数据库
- 管理 MongoDB 中的集合
- 管理 MongoDB 中的文档
第一步,连接 MongoDB:
- # 方式一: 使用默认的配置
- client = MongoClient()
- # 方式二: 指定主机地址和端口号
- client = MongoClient('localhost', 27017)
- # 方式三: 使用URI连接参数
- client = MongoClient('mongodb://localhost:27017/')
第二步,管理数据库:
- #通过MongoClient对象来管理多个数据库获取数据库(逻辑库)对象
- db = client.DATABASE_NAME
- db = client["DATABASE_NAME"]
- db = client.get_database(name=None, *args)
- # 查看所有的数据库(逻辑库)
- client.list_databases()
- # 删除数据库(逻辑库)
- client.drop_database(name_or_database, *args)
第三步,管理集合
- # 通过数据库对象db来管理集合
- # 获取集合对象
- db = client.DATABASE_NAME
- db.COLLECTION_NAME
- client.DATABASE_NAME.COLLECTION_NAME
- db.get_collection(name, *args)
- # 查看当前数据库下的集合列表
- db.list_collection_names()
- # 删除集合
- db.drop_collection(name_or_collection, *args)
基础操作示例:
- # -*- coding: utf-8 -*-
- # @Time : 2023-03-17 1:47
- # @Author : AmoXiang
- # @File : 1.数据库连接.py
- # @Software: PyCharm
- # @Blog : https://blog.csdn.net/xw1680
-
- from pymongo import MongoClient
-
- # 使用默认配置连接到数据库
- # client = MongoClient()
- # print(client)
- # client.close()
- # 指定主机地址和端口号连接到数据库
- # client = MongoClient('localhost', 27017)
- # 使用URI连接参数连接到数据库
- client = MongoClient('mongodb://localhost:27017/')
- print(client)
- # client.close()
- #
- # # 访问数据库
- # db = client.test
- # db = client["test"]
- # print(db)
- # db = client.get_database('test')
- # client.close()
- # print(db)
- #
- # 查看有哪些数据库
- db_list = client.list_databases()
- # # db_list = client.list_database_names()
- for item in db_list:
- print(item)
- #
- # 查看数据库下有哪些集合
- db_test = client["test"]
- for item in db_test.list_collection_names():
- print(item)
- #
- # # 集合对象操作
- # data = db_test.students.find_one()
- # data = client.test.students.find_one()
- data = client.test.get_collection('students').find_one()
- print(data)
-
二、新增文档
说明:pymongo 在插入数据时可以将 python 的对象转换成 BSON
insert_one():插入一个文档
- # 调用方法
- result = db.COLLECTION_NAME.insert_one(doc)
- # 返回插入的文档ID
- result.inserted _id
insert_many():批量新增文档。调用方法:
- doc_list = [doc1,doc2]
- result = db.COLLECTION_NAME.insert_many(doc_list)
示例:
三、查询文档
pymongo 可以将查询的结果转换成 python 中的对象
常用方法:
- find_one(): 按条件查询一个文档
- find(): 按条件查询多个文档
- count_documents(): 统计满足条件的文档总数
- aggregate(): 聚合统计
- .sort(): 排序
- .skip().limit(): 分页
示例代码:
- # -*- coding: utf-8 -*-
- # @Time : 2023-03-18 15:03
- # @Author : AmoXiang
- # @File : 5.查询文档.py
- # @Software: PyCharm
- # @Blog : https://blog.csdn.net/xw1680
-
- from bson.objectid import ObjectId
- from pymongo import MongoClient, ASCENDING, DESCENDING
-
-
- class LearnMongoDBSearch(object):
- """ MongoDB查询练习 """
-
- def __init__(self):
- self.client = MongoClient()
-
- def search_one(self):
- """ 查询一个文档 """
- temp_obj = self.client.test.newdb.find_one()
- print(temp_obj)
- print('喜欢分数:', temp_obj['likes'])
- # print('注册时间:', temp_obj['reg_date'].date())
- print('姓名:', temp_obj['uname'])
-
- def search_user_by_pk(self, pk):
- obj_id = ObjectId(pk)
- # user_obj = self.client.test.newdb.find_one({'_id': obj_id})
- # 面向对象的方法,有代码提示
- db = self.client.get_database('test')
- users = db.get_collection('newdb')
- user_obj = users.find_one({'_id': obj_id})
- print(user_obj)
-
- def search_many(self):
- """ 查询多个文档 """
- db = self.client.get_database('test')
- students = db.get_collection('students')
- # stu_list = students.find()
- # for item in stu_list:
- # print(item)
-
- # 查询年龄大于12岁的学生
- stu_list = students.find({'age': {'$gt': 12}}, {'stu_name': 1, 'class_name': 1, 'age': 1})
- for item in stu_list:
- # 注意: mongo中存储的整数转换成了浮点数
- print(item)
-
- def paginate(self, page=1, page_size=10):
- """
- 分页处理
- :param page: 当前的页
- :param page_size: 每一页数据大小
- :return:
- """
- db = self.client.get_database('test')
- students = db.get_collection('students')
- offset = (page - 1) * page_size
- stu_list = students.find().skip(offset).limit(page_size)
- return stu_list
-
- def sort_data(self):
- """ 排序 """
- db = self.client.get_database('test')
- grades = db.get_collection('grades')
- # // 将学生的语文成绩从高到低排序
- # db.grades.find({"grade.course_name": "语文"}).sort({"grade.score": -1});
- #
- # //将学生的语文成绩按照年龄和成绩排序
- # db.grades.find({"grade.course_name": "语文"}).sort({"age": -1, "grade.score": -1});
- # db.grades.find({"grade.course_name": "语文"}).sort({"grade.score": -1, "age": -1});
-
- # 按某一列排序
- # data_list = grades.find({"grade.course_name": "语文"}).sort("grade.score", DESCENDING);
- # for item in data_list:
- # print(item)
-
- # 按多列排序
- data_list = grades.find({"grade.course_name": "语文"}).sort([('age', DESCENDING),("grade.score", DESCENDING),])
- for item in data_list:
- print(item)
-
- def counter_students(self):
- """ 统计newdb中文档总数 """
- db = self.client.get_database('test')
- newdb = db.get_collection('newdb')
- # result = newdb.count_documents({})
- result = newdb.count_documents({"uname": {"$eq": "张三"}})
- print(result)
-
- def test_aggregate(self):
- """
- 聚合统计:及格的学生成绩
- """
- db = self.client.get_database('test')
- grades = db.get_collection('grades')
- result = grades.aggregate([
- # //where
- {
- '$match': {"grade.score": {'$gte': 60}}
- },
- # //group by
- {
- '$group': {
- '_id': "$stu_no",
- 'total': {'$sum': 1}
- }
- },
- # // having
- {
- '$match': {
- 'total': {'$eq': 3}
- }
- }
- ])
- for item in result:
- print(item)
-
-
- if __name__ == '__main__':
- obj = LearnMongoDBSearch()
- # obj.search_one()
- # obj.search_user_by_pk('6411ee77b6170000b4003f95')
- # obj.search_many()
- # stu_list = obj.paginate(page=3)
- # for item in stu_list:
- # print(item)
-
- # obj.sort_data()
- # obj.counter_students()
- obj.test_aggregate()
-
四、更新文档
回顾,更新数据表达式,如下表所示:

修改一个文档,调用方法:
- update_one(filter, update, *args)
替换一个文档,调用方法:
- replace_one(filter, replacement, *args)
批量修改文档,调用方法:
- replace_one(filter, replacement, *args)
快捷方法:
- find_one_and_update(filter, update, *args) # 修改一个文档
- find_one_and_replace(filter, replacement, *args) # 替换一个文档
- # 注意返回值的不同
返回结果:
acknowledged:结果是否已经被确认
modified_count:修改的文档数
matched_count:满足条件的文档数
raw_result:原始数据
upserted_id:更新的ID (upsert=True)
示例代码:
- # -*- coding: utf-8 -*-
- # @Time : 2023-03-18 14:56
- # @Author : AmoXiang
- # @File : 4.修改文档.py
- # @Software: PyCharm
- # @Blog : https://blog.csdn.net/xw1680
-
- from pymongo import MongoClient
-
- class LearnMongoDBUpdate(object):
- """ MongoDB更新练习 """
-
- def __init__(self):
- self.client = MongoClient()
-
- def test_update_one(self):
- """ 更新一个文档 """
- db = self.client.get_database('test')
- newdb = db.get_collection('newdb')
- result = newdb.update_one({}, {'$set': {'likes': 70}})
- print(result.modified_count)
-
- def test_replace_one(self):
- """ 替换一个文档 """
- db = self.client.get_database('test')
- newdb = db.get_collection('newdb')
- result = newdb.replace_one({}, {'uname': '张三'})
- print(result.modified_count)
-
- def test_update_many(self):
- """ 批量更新文档 """
- db = self.client.get_database('test')
- newdb = db.get_collection('newdb')
- result = newdb.update_many({}, {'$set': {'likes': 90}})
- print('修改的数量:', result.modified_count)
- print('满足条件的数量:', result.matched_count)
-
- def test_update_shortcut(self):
- """ 更新文档的快捷方法 """
- db = self.client.get_database('test')
- newdb = db.get_collection('newdb')
- # 此处返回一个文档对象
- result = newdb.find_one_and_update({}, {'$set': {'sex': '未知'}})
- print(result['_id'])
- if __name__ == '__main__':
- obj = LearnMongoDBUpdate()
- # obj.test_update_one()
- # obj.test_replace_one()
- # obj.test_update_many()
- obj.test_update_shortcut()
-
五、删除文档
删除一个文档,调用方法如下:
- result = db.COLLECTION_NAME.delete_one(filter, *args)
- # 返回已经删除的记录数 result.deleted_count
- # 删除时返回文档对象
- find_one_and_delete(filter, *args)
批量删除文档,调用方法:
- result = db.COLLECTION_NAME.delete_many(filter, *args)
- # 返回已经删除的记录数
- result. deleted_count
示例代码:
- # -*- coding: utf-8 -*-
- # @Time : 2023-03-18 14:34
- # @Author : AmoXiang
- # @File : 3.删除文档.py
- # @Software: PyCharm
- # @Blog : https://blog.csdn.net/xw1680
-
- from pymongo import MongoClient
-
-
- class LearnMongoDBDelete(object):
- """ MongoDB删除练习 """
-
- def __init__(self):
- self.client = MongoClient()
-
- def test_delete_one(self):
- """ 删除一个文档 """
- db = self.client.get_database('test')
- newdb = db.get_collection('newdb')
- result = newdb.delete_one({})
- print(result.deleted_count)
-
- def test_delete_many(self):
- """ 批量删除文档 """
- db = self.client.get_database('test')
- users = db.get_collection('newdb')
- # 删除所有的数据
- result = users.delete_many({"likes": {"$lte": 90}})
- print(result.deleted_count)
-
- def test_delete_shortcut(self):
- """ 删除文档的快捷方法 """
- db = self.client.get_database('test')
- newdb = db.get_collection('newdb')
- result = newdb.find_one_and_delete({})
- print(result['title'])
-
-
- if __name__ == '__main__':
- obj = LearnMongoDBDelete()
- # obj.test_delete_one()
- # obj.test_delete_shortcut()
- obj.test_delete_many()
-
至此今天的学习就到此结束了,笔者在这里声明,笔者写文章只是为了学习交流,以及让更多学习数据库的读者少走一些弯路,节省时间,并不用做其他用途,如有侵权,联系博主删除即可。感谢您阅读本篇博文,希望本文能成为您编程路上的领航者。祝您阅读愉快!
到此这篇关于Python 操作 MongoDB ----非 ODM的文章就介绍到这了,更多相关Python 操作 MongoDB内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!