最简单的操作
import pymysql import timeclass mysql: def init(self, host, user, password, db, port=3306, charset='utf8', timeout=3000): self.host = host self.user = user self.password = password self.db = db self.port = port self.charset = charset self.timeout = timeout self.conn = None self.__conn()
def __conn(self): try: self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password, charset=self.charset, db=self.db, connect_timeout=self.timeout) return True except Exception as e: pass def __reconn(self, num=3, sleep=3): ''' 检查是否已连接如未连接,则重新连接 :param num: :param sleep: :return: ''' r_times = 0 s_conn = False while s_conn == False and r_times <= num: try: self.conn.ping() s_conn = True except: if self.__conn(): s_conn = False break r_times += 1 time.sleep(sleep) def excute_sql(self, sql, param=[]): try: self.__reconn() cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute(sql, param) self.conn.commit() # result = cursor.fetchall() # result = cursor.fetchone() cursor.close() return True, cursor.rowcount # , result ''' fetchone():该方法获取下一个查询结果集。结果集是一个对象。 fetchall():接收全部返回结果行。 rowcount:这是一个只读属性,返回执行execute()方法后影响的行数。 ''' except Exception as e: return False, e def select(self, sql='', param=[]): try: self.__reconn() cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) cursor.execute(sql) result = cursor.fetchall() cursor.close() return True, result except Exception as e: return False, e def select_limit(self, sql='', offset=0, length=20, param=[]): sql = '%s limit %d , %d ;' % (sql, offset, length) return self.select(sql, param) def close(self): self.conn.close()if name == 'main': db = mysql(host="127.0.0.1", port=3306, user='root', password='123456', charset='utf8', db='tdb') print(db.excute_sql('insert into t(name) values("dddd")')) print(db.excute_sql('insert into t(name) values(%s)', [''dd'])) print(db.excute_sql('delete from t where name="dddd"')) print(db.select('select * from t'))

评论