跳至主要內容

Klustron(原KunlunBase) 产品使用和评测指南

Klustron大约 63 分钟

Klustron(原KunlunBase) 产品使用和评测指南

概述

本文档指导和帮助 Klustron 用户评测和验证 Klustron 的各项重要功能。用户遵循本文档操作即可安装好 Klustron 集群并且体验和使用到 Klustron 的各主要功能,把本文档作为使用 Klustron 的快速入门手册。 Klustron 的完整的文档请访问 www.doc.klustron.com 用户也可根据本文档来评测和验证 Klustron 各项功能工作正常。 Klustron 团队在进行 Klustron 产品研发过程中,会持续对其各项功能开发测试程序,并且使用自动化测试系统每天运行所有测试程序,以便及时发现和解决问题,确保 Klustron 各项功能工作正常。同时,我们会使用MySQL和PostgreSQL的测试集来针对 Klustron 的存储节点和计算节点进行功能测试。关于 Klustron 的自动化测试系统的日常测试结果,请访问 www.klustron.com 查看。

Klustron 集群架构

Klustron 是一个分布式关系数据库管理系统,面向 TB 和 PB 级别海量数据处理,支持高吞吐量和低延时的极致性能来处理海量数据的高并发读写请求。它支持水平弹性扩容,提供健壮的事务 ACID 保障,高效易用的分布式查询处理,高可扩展性,高可用性和透明的分库分表数据处理功能,业务层和终端用户无感知的水平扩展能力,是典型的 NewSQL 分布式数据库系统。

[图片: KunlunBase产品使用和评测指南相关截图]

Klustron 的上述技术能力确保用户可以在业务持续增长时,随时不停服增加服务器节点来扩大数据存储和处理&计算能力,并且在服务器节点发生故障或者断电时,不丢失数据,并且不停止数据读写服务。确保用户业务持续以最高质量运行。

应用软件开发者只需要按照使用单节点关系数据库相同的方法使用昆仑数据库,就可以得到所有上述 NewSQL 数据库的优点,完全不需要考虑数据的存储和管理的任何细节。用户可以使用PostgreSQL和MySQL两种连接协议连接到 Klustron 集群的计算节点并执行DDL和DML SQL语句。 Klustron 支持标准SQL,PostgreSQL的 DDL 语法,以及PostgreSQL和MySQL 的私有 DML 语法。因此,原本使用PostgreSQL和MySQL的应用程序不需要修改就可以使用 Klustron 。

Klustron 支持 所有SQL 数据分析功能,可以执行 TPC-H 和 TPC-DS 的所有 OLAP 分析 SQL 语句,因此,用户可以把各类业务系统中的数据持续流入 Klustron 集群,并且使用 Klustron 集群对这些持续更新的数据做 OLAP 分析和数据挖掘,以便迅速发现变化和趋势,抓住转瞬即逝的商机,及时应对突发状况,获得巨大的竞争优势。

即使用户的数据量只有若干 GB,也可以使用 Klustron 而获益。Klustron 的 fullsync 功能比MySQL 自带的 semisync(半同步)和 group replication 具有更高的可靠性 --- 这两种技术如果主备节点同时断电那么用户数据可能丢失或者不一致,而 fullsync 确保这种情况下不会丢失用户数据。此时只部署一个存储集群(例如 一主两备)即可。当用户数据和业务负载持续上升时,可以随时按需增加更多的存储集群和计算节点,从而扩大数据存储和查询能力。

同时,用户可以通过读写分离扩展 Klustron 的处理能力。通过使用读写分离技术从备机查询数据,并安排专属的计算节点用于 OLAP 分析查询,可以确保 OLAP 分析任务完全不会影响 OLTP 业务的性能。

使用 Klustron

为 Klustron 初始化服务器(BootStrap)

经过初始化(bootstrap)的服务器才可以安装 Klustron 集群。初始化服务器完成后,用户就可以使用 XPanel 集群管理工具安装 Klustron 集群。

本节介绍如何初始化计算机服务器。

使用脚本初始化服务器

  1. 下载脚本 git clone https://gitee.com/zettadb/cloudnative.git cd cloudnative/cluster

  2. 填写配置文件,详细参数说明参考: 使用集群管理安装脚本指导手册

  3. 填写完配置文件后,开始初始化服务器

     python setup_cluster_manager.py --config=cluster_and_node_mgr.json --action=install     bash -e clustermgr/install.sh
    

创建 Klustron 集群

以下操作皆需要在初始化服务器后进行

XPanel

  1. 进入 xpanel web端:http://host:port/KunlunXPanel

    a. host 就是下载 docker 镜像并运行的服务器对应 ip。

    b. port 是部署 docker 时指定的端口

  2. 首次进入,账号和密码都是 super_dba。

    [图片: KunlunBase产品使用和评测指南相关截图]

  3. 随后会需要修改密码,修改完密码后,进入到xpanel主界面

    [图片: KunlunBase产品使用和评测指南相关截图]

  4. 增加计算机列表

    a. 检查需要增加的计算机是否存在列表中,不存在则进行该步骤,存在则跳过

    b. 目前只能一个一个添加,该列表里的对应的服务器必须安装有 node_mgr,且配置正确

    c. 系统管理 -- 用户管理 -- 新增 -- 填写对应的参数 -- 确认

    [图片: KunlunBase产品使用和评测指南相关截图]

  5. 在新增完计算机后,就可以创建集群了

    a. 集群管理 -- 集群列表 -- 新增 [图片: KunlunBase产品使用和评测指南相关截图] b. 根据需求填写对应的参数值 [图片: KunlunBase产品使用和评测指南相关截图] c. 等集群安装好后,可以在 集群展示 界面查看对应的节点信息和主备关系 集群管理 -- 集群列表 -- 点击对应的集群 [图片: KunlunBase产品使用和评测指南相关截图]

使用 Klustron 做数据读写

各编程语言准备使用 Klustron

当前 Klustron 同时支持 postgres 和MySQL协议。

计算节点MySQL协议端口号可以通过进入计算节点 pg 端口,使用 show MySQL_port; 查看

命令行连接

  • 连接 Klustron pg 协议,可以通过 tcp 和 url 的方式进行连接

    • Tcp:psql -h host -p port -U username -d databaseName
    • url: psql postgres://userName:passwd@host:port/databaseName
  • 连接 KlustronMySQL协议,可以通过 tcp 和 socket 的方式进行连接

    • TCP:MySQL -h host -P port -ppassword -u userName
      • 注意:-p 和密码之间不能有空格

各语言 connector 下载方式

  • go 和 rust 都可以直接通过自带的包管理器下载所需依赖,不需要额外安装
    • go 推荐 1.17.5 版本或者更高版本
    • rust 推荐1.59.0 版本或者更高版本

java[图片: KunlunBase产品使用和评测指南相关截图]

在 maven 仓库选择好对应的 jar 包版本后,点击 jar 就可以下载对应jar包

  • MySQL connector/j

    • https://dev.MySQL.com/downloads/connector/j/ MySQL 官网下载
    • https://mvnrepository.com/artifact/MySQL/MySQL-connector-java maven 仓库
  • PostgreSQL JDBC Driver

    • https://mvnrepository.com/artifact/org.PostgreSQL/PostgreSQL maven 仓库
  • mariadb connector/j - 3.x 版本

    • https://mariadb.org/connector-java/all-releases/ mariadb官网,官网只有最新的 3.x 版本
    • https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client maven 仓库
  • mariadb connector/j - 2.x 版本

    • http://zettatech.tpddns.cn:14000/thirdparty/ 泽拓科技官网
    • 因为官方 2.0 版本在连接计算节点会有一些 bug,所以不建议直接使用 mariadb 官网的 2.x 版本 connector

python

  • Psycopg2

    • pip install psycopg2 通过 pip 安装
    • pip install psycopg2==$version 通过 pip 安装
    • https://github.com/psycopg/psycopg2 通过 setup.pg build
  • MySQL-connector/python

    • https://downloads.MySQL.com/archives/c-python/ MySQL官网
    • pip install MySQL-connector-python 通过 pip 安装 8.x 版本MySQL connector/python
    • https://github.com/MySQL/MySQL-connector-python 通过 setup.pg build
  • pyMySQL

    • pip install pyMySQL 通过 pip 安装
    • https://github.com/PyMySQL/PyMySQL 通过 setup.pg build

nodejs

  • pg

    • npm install pg 通过 npm 工具安装 pg connector
  • MySQL

    • npm install MySQL 通过npm工具安装MySQL connector
    • https://downloads.MySQL.com/archives/c-nodejs/ MySQL官网下载

php

  • MySQL
    • https://downloads.MySQL.com/archives/c-php/ MySQL官网下载

c

  • PostgreSQL

    • sudo apt-get install libpq++-dev -y ubuntu 安装 pg 依赖
  • MySQL

    • sudo apt-get install libMySQL++-dev -y ubuntu 安装MySQL依赖

c#

  • PostgreSQL

    • dotnet add package Npgsql
  • MySQL

    • dotnet add package MySQL.Data

c++

  • PostgreSQL
sudo apt install libpq-dev
git clone https://github.com/jtv/libpqxx.git
cd libpqxx
./configure
make
sudo make install
  • MySQL
  •  sudo apt-get install libMySQLclient-dev  安装 c++ 连接MySQL的依赖
    
  •  https://downloads.MySQL.com/archives/c-cpp/ MySQL官方 connector 下载
    

使用各编程语言对 Klustron 执行增删改查 DML 语句

以下所有示例可以在 https://gitee.com/zettadb/cloudnative 这里下载

  • pg 相关示例在 cloudnative/smoke 目录下
  • MySQL 相关示例在 cloudnative/smoke/somkeTest-MySQL目录下

java

  • PostgreSQL -JDBC-Driver
package kunlun.test;

/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 * combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
 */

import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

public class SmokeTest {

    static {
        try {
            Class.forName("org.PostgreSQL.Driver");
            //Class.forName("com.MySQL.cj.jdbc.Driver");
        } catch (Exception ex) {
        }
    }

    public static Connection getConnection(String user,
                                           String password,
                                           String host,
                                           int port,
                                           String dbname) {
        String proto = "PostgreSQL";
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        String url = "jdbc:" + proto+"://" + host + ":" + port + "/" + dbname;
        try {
            return DriverManager.getConnection(url, props);
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    public static void smokeTest(Connection conn) throws Exception{
        boolean autocommit = conn.getAutoCommit();
        System.out.println("default autocommit: " + autocommit);
        conn.setAutoCommit(true);
        Statement st =conn.createStatement();
        st.execute("SET client_min_messages TO 'warning';");
        st.execute("drop table if exists t1;");
        st.execute("RESET client_min_messages;");
        String createSql = "create table t1(id integer primary key, " +
                           "info text, wt integer);";
        st.execute(createSql);
        st.execute("insert into t1(id,info,wt) values(1, 'record1', 1);");
        st.execute("insert into t1(id,info,wt) values(2, 'record2', 2);");
        st.execute("update t1 set wt = 12 where id = 1;");
        ResultSet res1 = st.executeQuery("select * from t1;");
        System.out.printf("res1:%s%n", showResults(res1).toString());
        res1.close();
        st.close();

        String pstr = "select * from t1 where id=?";
        PreparedStatement ps = conn.prepareStatement(pstr);
        ps.setInt(1, 1);
        ResultSet pres = ps.executeQuery();
        System.out.printf("pres1:%s%n", showResults(pres).toString());
        ps.setInt(1, 2);
        pres = ps.executeQuery();
        System.out.printf("pres2:%s%n", showResults(pres).toString());
        ps.close();

        pstr = "update t1 set info=? , wt=? where id=?";
        ps = conn.prepareStatement(pstr);
        ps.setString(1, "Rec1");
        ps.setInt(2, 2);
        ps.setInt(3, 1);
        ps.execute();
        ps.setString(1, "Rec2");
        ps.setInt(2, 3);
        ps.setInt(3, 2);
        ps.execute();
        ps.close();

        st =conn.createStatement();
        ResultSet res2 = st.executeQuery("select * from t1;");
        System.out.printf("res2:%s%n", showResults(res2).toString());
        res2.close();
        st.execute("delete from t1 where id = 1;");
        ResultSet res3 = st.executeQuery("select * from t1;");
        System.out.printf("res3:%s%n", showResults(res3).toString());
        res3.close();
        st.execute("drop table t1;");
        st.close();
        conn.setAutoCommit(autocommit);
    }

    /*
     * We do the following actions:
     * 1 Create the able
     * 2 Insert two records
     * 3 Update the first record.
     * 4 Query the records(res1).
     * 5 Delete the second record.
     * 6 Query the records again(res2).
     * 7 Drop the table.
     */
    public static void smokeTestFile(Connection conn, String cmdfile) throws Exception{
        boolean autocommit = conn.getAutoCommit();
        System.out.println("default autocommit: " + autocommit);
        conn.setAutoCommit(true);
        Statement st =conn.createStatement();
        BufferedReader br = new BufferedReader(new FileReader(cmdfile));
        String cmd = null;
        do {
            cmd = br.readLine();
            if (cmd == null) {
                break;
            }
            if (cmd.toUpperCase().startsWith("SELECT")) {
                ResultSet res = st.executeQuery(cmd);
                System.out.printf("sql:%s, res:%s%n", cmd,
                                  showResults(res).toString());
                res.close();
            } else {
                st.execute(cmd);
            }
        } while (cmd != null);
        br.close();
        st.close();
        conn.setAutoCommit(autocommit);
    }

    private static List<List<String>> showResults(ResultSet res)
        throws Exception {
        LinkedList<List<String>> results = new LinkedList<>();
        int cols = res.getMetaData().getColumnCount();
        while (res.next()) {
            List<String> row = new ArrayList<>(cols);
            for (int i = 0; i < cols; i++) {
                row.add(res.getString(i + 1));
            }
            results.addLast(row);
        }
        return results;
    }

    public static void test1(String[] args) throws Exception{
        String host = args[0];
        int port = Integer.valueOf(args[1]);
        String user = "abc";
        String password = "abc";
        String database = "postgres";
        Connection conn = getConnection(user, password, host, port, database);
        smokeTest(conn);
        conn.close();
    }

    public static void main(String[] args) throws Exception {
        test1(args);
    }
}
  • MySQL connector/j
  • 编译:javac test.java
  • 运行:java -cp .:./MySQL-connector-java-$version.jar test
import java.sql.DriverManager;
import java.util.ArrayList;
import java.sql.*;
import java.util.*;

public class MySQL {
    // MySQL 8.0 以下版本 - JDBC 驱动名及数据库 URL
    static final String JDBC_DRIVER = "com.MySQL.jdbc.Driver";
    // MySQL 8.0 以上版本 - JDBC 驱动名及数据库 URL
    //static final String JDBC_DRIVER = "com.MySQL.cj.jdbc.Driver";
    // 数据库的用户名与密码,需要根据自己的设置
    static final String USER = "user";
    static final String PASS = "pwd";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;

        //处理传入的参数
        String host = Arrays.toString(args);
        String urls1 = "jdbc:MySQL://" + host + "/postgres";
        String urls2 = urls1.replace("[","");
        String urls = urls2.replace("]","");

        try{
            // 注册 JDBC 驱动
            Class.forName(JDBC_DRIVER);

            // 打开链接
            System.out.println("连接数据库" + host + "...");
            conn = DriverManager.getConnection(urls,USER,PASS);

            // 执行查询
            System.out.println(" 实例化Statement对象...");
            stmt = conn.createStatement();
            ArrayList sqls = new ArrayList();
            sqls.add("drop table if exists myjdbc_sm;");
            sqls.add("create table myjdbc_sm(a int primary key, b text);");
            sqls.add("insert into myjdbc_sm values(1, 'abc'),(2, 'bcd'),(3, 'cde');");
            sqls.add("select * from myjdbc_sm;");
            sqls.add("update myjdbc_sm set b = 'def' where a = 1;");
            sqls.add("select * from myjdbc_sm;");
            sqls.add("delete from myjdbc_sm where a = 3;");
            sqls.add("select * from myjdbc_sm;");

            for (int i = 0; i <= 7; ++i){
                String sql = (String) sqls.get(i);
                System.out.println(sql);

                if (sql == "select * from myjdbc_sm;"){
                    ResultSet rs = stmt.executeQuery(sql);
                    while(rs.next()) {
                        int a = rs.getInt("a");
                        String b = rs.getString("b");
                        System.out.print(" a: " + a + " b: " + b + "\n");
                    }
                    rs.close();
                }
                else {
                    stmt.execute(sql);
                }
            }

            stmt.close();
            conn.close();
        }catch(SQLException se){
            // 处理 JDBC 错误
            se.printStackTrace();
        }catch(Exception e){
            // 处理 Class.forName 错误
            e.printStackTrace();
        }finally{
            // 关闭资源
            try{
                if(stmt!=null) stmt.close();
            }catch(SQLException se2){
            }// 什么都不做
            try{
                if(conn!=null) conn.close();
            }catch(SQLException se){
                se.printStackTrace();
            }
        }
        System.out.println("Goodbye!");
    }
}

python

  • Psycopg2
import psycopg2
import sys

def test(hoststr, portstr):
        intport = int(portstr)
        conn = psycopg2.connect(host=hoststr, port=intport, user='abc', password='abc', database='postgres')
        conn.autocommit = True
        cur = conn.cursor()
        sqls=["SET client_min_messages TO 'warning';",
                "drop table if exists t1111",
                "RESET client_min_messages;",
                "create table t1111(id int primary key, info text, wt int)",
                "insert into t1111(id,info,wt) values(1, 'record1', 1)",
                "insert into t1111(id,info,wt) values(2, 'record2', 2)",
                "update t1111 set wt = 12 where id = 1", "select * from t1111",
                "delete from t1111 where id = 1", "select * from t1111",
                "prepare q1(int) as select*from t1111 where id=$1",
                "begin",
                "execute q1(1)",
                "execute q1(2)",
                "prepare q2(text,int, int) as update t1111 set info=$1 , wt=$2 where id=$3",
                "execute q2('Rec1',2,1)",
                "commit",
                "execute q2('Rec2',3,2)",
                "drop table t1111"]
        for sql in sqls:
                res = cur.execute(sql+";")
                print "command:%s, res:%s" % (sql, str(res))

if __name__ == '__main__':
        test(sys.argv[1], sys.argv[2])

  • MySQL connector/python
import argparse
from time import sleep
from MySQL import connector
def test(sql, host, port, user, pwd, db):
    conn = connector.connect(buffered=True, host=host, port=port, user=user, passwd=pwd, database=db, ssl_disabled=True)
    cur = conn.cursor()

    print(sql)
    if sql == 'select * from MySQL_connector_python;':
        cur.execute(sql)
        rs = cur.fetchall()
        srs = str(rs)
        srs = srs.replace('[(', '')
        srs = srs.replace(')]', '')
        srs = srs.replace('), (', '\n------\n')
        srs = srs.replace(',', ' |')
        srs = srs.replace('\'', '')
        print('--------\na | b\n------\n' + srs + '\n--------')
    else:
        cur.execute(sql)

    conn.commit()
    cur.close()
    conn.close()

def execSql(host, port, user, pwd, db):
    sql = ['drop table if exists MySQL_connector_python;',
            'create table MySQL_connector_python(a int primary key, b text);',
            "insert into MySQL_connector_python values(1,'a'),(2,'b'),(3,'c');",
            'select * from MySQL_connector_python;',
            "update MySQL_connector_python set b = 'abc' where a = 3;",
            'select * from MySQL_connector_python;',
            'delete from MySQL_connector_python where a = 3;',
            'select * from MySQL_connector_python;']
    for sqls in sql:
        test(sqls, host, port, user, pwd, db)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description = 'this script is use to test ddl replication!')
    parser.add_argument('--host', help='host')
    parser.add_argument('--port', default=3306, help='port')
    parser.add_argument('--db', default='MySQL', help='database name')
    parser.add_argument('--pwd', default='root', help='password')
    parser.add_argument('--user', default='root', help='user name')
    args = parser.parse_args()
    host = args.host
    port = args.port
    db   = args.db
    pwd  = args.pwd
    user = args.user

    print(host, str(port))
    execSql(host, port, user, pwd, db)

nodejs

  • PostgreSQL
const { findSourceMap } = require('module');
const { CLIENT_RENEG_LIMIT } = require('tls');
const pg=require('./node_modules/pg');

var arguments = process.argv.splice(2);
console.log('host:', arguments[0], 'port: ', arguments[1]);

var conString = ('postgres://abc:abc@'+arguments[0]+':'+arguments[1]+'/postgres');
var client = new pg.Client(conString);


client.connect(function(err){
    if(err){
        return console.error('数据库连接出错',err);
    }

    console.log("")
    console.log("=========== JS Driver ==============");
    client.query('drop table if exists smoketesttable_js;',function(err,data){
        if(err){
            return console.error('step 1 : droped table failed!',err);

        }else{
            console.log('step 1 : drop table success!')
        }
    })
  client.query('drop table if exists smoketesttable_js1;');//再运行一次的原因是因为如果失败了就只有一个failed!提示,没有报错信息。所以再运行一次让这个报错信息显示出来

    client.query('create table smoketesttable_js(id int primary key,name text,gender text);',function(err,data){
        if(err){
            return console.error('step 2 : create failed!',err);
        }else{
            console.log('step 2 : create table success!')
        }
    })
  client.query('create table smoketesttable_js1(id int primary key,name text,gender text);')

    client.query("insert into smoketesttable_js values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');",function(err,data){
        if(err){
            return console.error('step 3 : insert failed!',err);
        }else{
            console.log('step 3 : insert data success!')
        }
    })
  client.query("insert into smoketesttable_js1 values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');")

    client.query("delete from smoketesttable_js where id = 1;",function(err){
        if(err){
            return console.error('step 4 : delete failed!')
        }else{
            console.log("step 4 : delete data success!")
        }
    })
  client.query("delete from smoketesttable_js1 where id = 1;")

    client.query("update smoketesttable_js set gender = 'male' where id = 2;",function(err){
        if(err){
            return console.error("step 5 : update failed!")
        }else{
            console.log('step 5 : update gender success!')
        }
    })
  client.query("update smoketesttable_js1 set gender = 'male' where id = 2;")


    client.query("select * from smoketesttable_js;",function(err){
        if(err){
            return console.error("select failed!")
            client.query("step 6 : select * from smoktesttable_js;")
        }else{
            console.log('step 6 : select table success!')
        }
    })
    client.query("select * from smoketesttable_js1;")

    client.query("commit",function(err){
        if(err){
            return console.error("select failed!")
        }else{
            console.log('step 6 : commit success!')
        }
        client.end();
        console.log("====================================");
  console.log("")
    })


})
  • MySQL
// 使用process获取命令行传的参数
var arguments = process.argv.splice(2);
var hosts = arguments[0];
var ports  = arguments[1];

var MySQL  = require('MySQL');

var connection = MySQL.createConnection({
    host     : hosts,
    user     : 'abc',
    password : 'abc',
    port: ports,
    database: 'postgres'
});
//console.log(connection)


connection.connect();

var sql1 = 'drop table if exists myjs_sm';
var sql2 = 'create table if not exists myjs_sm(a int primary key,b text);'
var sql3 = 'insert into myjs_sm values(1,"abc")';
var sqls = 'select * from myjs_sm';
var sql4 = 'update myjs_sm set b = "asd" where a = 1';
var sql5 = 'delete from myjs_sm where a = 1';

var  execsql=function(arg1){
//查
  connection.query(arg1, function (err, result) {
                if(err){
                console.log(err.message);
                return;
        }

        console.log('-----------------------[ "' + arg1 + '" ]');
        console.log(result);
        console.log('------------------------------------------------------------\n\n');
  });

  //connection.end();
}

execsql(sql1);
execsql(sql2);
execsql(sql3);
execsql(sqls);
execsql(sql4);
execsql(sqls);
execsql(sql5);
connection.end();

php

  • PostgreSQL
    • php smoketest-pg.php host port
<?php
/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 * combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
 */

function showresults($ret) {
  echo "=== select results ===\n";
  while ($row = pg_fetch_row($ret)) {
        $str="";
        foreach ($row as $item){
                $str .= $item."  ";
        }
        echo "row: $str\n";
  }
}

function checkret($db, $cmd, $ret) {
  if (!$ret) {
        echo $cmd.pg_last_error($db)."\n";
        exit(1);
  } else {
        echo $cmd.":success\n";
  }
}

$host = "host=$argv[1]";
$port = "port=$argv[2]";
$dbname = "dbname=postgres";
$credentials = "user=abc password=abc";
$connstr="$host $port $dbname $credentials";

echo "conn string: $connstr\n";

$db = pg_connect($connstr);
if(!$db){
  echo "Error : Unable to open database\n";
  exit(1);
} else {
  echo "Opened database successfully\n";
}

$sql = "SET client_min_messages TO 'warning';";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "drop table if exists t1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "RESET client_min_messages;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "create table t1(id integer primary key, info text, wt integer);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "insert into t1(id,info,wt) values(1, 'record1', 1);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "insert into t1(id,info,wt) values(2, 'record2', 2);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "update t1 set wt = 12 where id = 1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "select info from t1;";
$ret = pg_query($db, $sql);
if ($ret) {
  showresults($ret);
}

$sql = 'select * from t1 where id=$1';
$ret = pg_prepare($db, "prep", $sql);
checkret($db,  "prep:".$sql, $ret);

$ret = pg_execute($db, "prep", array(1));
showresults($ret);

$ret = pg_execute($db, "prep", array(2));
showresults($ret);

$sql = 'update t1 set info=$1 , wt=$2 where id=$3';
$ret = pg_prepare($db, "prep2", $sql);
checkret($db, "prep:".$sql, $ret);

$ret = pg_execute($db, "prep2", array('Rec1', 2, 1));
checkret($db, "prep2:{Rec1,2,1}", $ret);

$ret = pg_execute($db, "prep2", array('Rec2', 3, 2));
checkret($db, "prep2:{Rec2,3,2}", $ret);

$sql = "select * from t1;";
$ret = pg_query($db, $sql);
if ($ret) {
  showresults($ret);
}

$sql = "delete from t1 where id = 1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "select * from t1;";
$ret = pg_query($db, $sql);
if ($ret) {
  showresults($ret);
}

pg_close($db)
?>

  • MySQL
    • php smoketest-my.php host port
<?php
$host = "$argv[1]";
$port = "$argv[2]";
$dbname = "postgres";
$user = "abc";
$pwd = "abc";

$conn = MySQLi_connect($host, $user, $pwd, $dbname, $port) or die("数据库连接错误!");

$sql1 = "drop table if  exists myphp_sm \n";
$sql2 = "create table if not exists myphp_sm(a int primary key, b text)\n";
$sql3 = "insert into myphp_sm values (1,'abc')\n";
$sql4 = "select * from myphp_sm\n";
$sql5 = "update myphp_sm set b = 'asd' where a = 1\n";
$sql6 = "select * from myphp_sm\n";
$sql7 = "delete from myphp_sm where a = 1\n";

$rs = MySQLi_query($conn, $sql1);
echo "$sql1 success\n";

$rs = MySQLi_query($conn, $sql2);
echo "$sql2 success\n";

$rs = MySQLi_query($conn, $sql3);
echo "$sql3 success\n";

$rs = MySQLi_query($conn, $sql4);
echo "$sql4 success\n";
$row = MySQLi_fetch_array($rs);
var_dump($row);

$rs = MySQLi_query($conn, $sql5);
echo "$sql5 success\n";

$rs = MySQLi_query($conn, $sql6);
echo "$sql6 success\n";
$row = MySQLi_fetch_array($rs);
var_dump($row);

$rs = MySQLi_query($conn, $sql7);
echo "$sql7 success\n";
?>

go

  • MySQL
    • 环境设置及编译
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.cn,direct
go mod init smoketest_my #初始化脚本
go mod tidy #该步go的包管理器会把依赖库自动处理好
go build # build
  •  ./smoketest_my -h host -p port
    
package main

import (
    "fmt"
    _ "github.com/go-sql-driver/MySQL"
    "github.com/jmoiron/sqlx"
    "flag"
)

func checkError(err error) {
        if err != nil {
                panic(err)
        }
}

func main() {
        var User string
        var Pwd string
        var Host string
        var Port int
        var Dbname string
        flag.StringVar(&Host,"h","","默认为空")
        flag.IntVar(&Port,"p",5001,"默认为5001")
        flag.StringVar(&Pwd,"pwd","abc","默认为abc")
        flag.StringVar(&Dbname,"d","postgres","默认为postgres")
        flag.StringVar(&User,"u","abc","默认为abc")
        flag.Parse()

        fmt.Println("============= Golang-MySQL ============")
        // Initialize connection string.
        var connectionString string = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", User, Pwd, Host, Port, Dbname)

        // Initialize connection object.
        db, err := sqlx.Open("MySQL", connectionString)
        checkError(err)

        err = db.Ping()
        checkError(err)
        fmt.Println("Successfully connection to database!!!")

        // Drop table
        _, err = db.Exec("drop table if exists mygo_sm;")
        checkError(err)
        fmt.Println("Successfully drop   table")

        // Create table.
        _, err = db.Exec("create table mygo_sm(id int primary key,name text,gender text);")
        checkError(err)
        fmt.Println("Successfully create table")

        // Insert
//        sql_statement := "insert into mygo_sm values ($1, $2, $3);"
        _, err = db.Exec("insert into mygo_sm values( 1, 'banana', 'male')")
        checkError(err)
        _, err = db.Exec("insert into mygo_sm values(2, 'orange', 'female')")
        checkError(err)
        _, err = db.Exec("insert into mygo_sm values(3, 'apple', 'male')")
        checkError(err)
        fmt.Println("Successfully insert table")

        _, err = db.Exec("delete from mygo_sm where id = 2")
        checkError(err)
        fmt.Println("Successfully delete table")

        _, err = db.Exec("update mygo_sm set name = 'update' where id = 3")
        checkError(err)
        fmt.Println("Successfully update table")

        _, err = db.Exec("select * from mygo_sm")
        checkError(err)
        fmt.Println("Successfully select table")

        fmt.Println("=================================")
}
  • PostgreSQL
    • 环境设置及编译,参考上条
    • ./smoketest -h host -p port
package main

import (
  "database/sql"
  "fmt"
  "flag"
  _ "github.com/lib/pq"
)

const (
  // Initialize connection constants.
  //HOST     = "mydemoserver.postgres.database.azure.com"
  //DATABASE = "postgres"
  //USER     = "abc"
  //PASSWORD = "abc"
)

func checkError(err error) {
  if err != nil {
        panic(err)
  }
}

func main() {
        var User string
        var Pwd string
        var Host string
        var Port int
  var Db string
  flag.StringVar(&Host,"h","","默认为空")
  flag.IntVar(&Port,"p",5001,"默认为5001")
  flag.StringVar(&Pwd,"pwd","abc","默认为abc")
  flag.StringVar(&Db,"d","postgres","默认为postgres")
  flag.StringVar(&User,"u","abc","默认为abc")
  flag.Parse()

  fmt.Println("============= Golang ============")
  // Initialize connection string.
  var connectionString string = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", Host, Port,User,Pwd,Db)

  // Initialize connection object.
  db, err := sql.Open("postgres", connectionString)
  checkError(err)

  err = db.Ping()
  checkError(err)
        fmt.Println("Successfully connection to database!!!")

  // Drop previous table of same name if one exists.
  _, err = db.Exec("drop table if exists SmokeTestTable_go;")
  checkError(err)
  fmt.Println("Successfully drop   table")

  // Create table.
  _, err = db.Exec("create table SmokeTestTable_go(id int primary key,name text,gender text);")
  checkError(err)
  fmt.Println("Successfully create table")

  // Insert some data into table.
  sql_statement := "insert into SmokeTestTable_go values ($1, $2, $3);"
  _, err = db.Exec(sql_statement, 1, "banana", "male")
  checkError(err)
  _, err = db.Exec(sql_statement, 2, "orange", "female")
  checkError(err)
  _, err = db.Exec(sql_statement, 3, "apple", "male")
  checkError(err)
  fmt.Println("Successfully insert table")

  _, err = db.Exec("delete from Smoketesttable_go where id = 2")
        checkError(err)
        fmt.Println("Successfully delete table")

        _, err = db.Exec("update SmokeTestTable_go set name = 'update' where id = 3")
        checkError(err)
        fmt.Println("Successfully update table")

  _, err = db.Exec("select * from SmokeTestTable_go")
        checkError(err)
        fmt.Println("Successfully select table")

  fmt.Println("=================================")
}

c

  • PostgreSQL
    • gcc -o smokeTest smokeTest.c -I/path/PostgreSQL-11.5-rel/include -L/path/PostgreSQL-11.5-rel/lib -lpq
    • ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=user password=pwd"
/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 * combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
 *
 * gcc -o smokeTest smokeTest.c -I/path/PostgreSQL-11.5-rel/include -L/path/PostgreSQL-11.5-rel/lib -lpq
 * ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc"
 *
 * Test the C version of libpq, the PostgreSQL frontend library.
 */
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

static void
exit_nicely(PGconn *conn)
{
        PQfinish(conn);
        exit(1);
}

int
main(int argc, char **argv)
{
        const char *conninfo;
        PGconn     *conn;
        PGresult   *res;
        int                     nFields;
        int                     i,
                                j;

        /*
         * If the user supplies a parameter on the command line, use it as the
         * conninfo string; otherwise default to setting dbname=postgres and using
         * environment variables or defaults for all other connection parameters.
         */
        if (argc > 1)
                conninfo = argv[1];
        else
                conninfo = "dbname = postgres host=192.168.0.104 port=6401 user=abc password=abc";

        /* Make a connection to the database */
        conn = PQconnectdb(conninfo);

        /* Check to see that the backend connection was successfully made */
        if (PQstatus(conn) != CONNECTION_OK)
        {
                fprintf(stderr, "Connection to database failed: %s",
                                PQerrorMessage(conn));
                exit_nicely(conn);
        }

        /* Set always-secure search path, so malicous users can't take control. */
        res = PQexec(conn, "SET client_min_messages TO 'warning';");
        PQclear(res);
        res = PQexec(conn, "drop table if exists t1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "drop table ok\n");
        }
        PQclear(res);
        res = PQexec(conn, "RESET client_min_messages;");
        PQclear(res);
        /*
         * Our test case here involves using a cursor, for which we must be inside
         * a transaction block.  We could do the whole thing with a single
         * PQexec() of "select * from pg_database", but that's too trivial to make
         * a good example.
         */

        /* Start a transaction block */
        res = PQexec(conn, "create table t1(id integer primary key,info text, wt integer)");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "create table command failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "create table ok\n");
        }
        PQclear(res);

        /*
         * Fetch rows from pg_database, the system catalog of databases
         */
        res = PQexec(conn, "insert into t1(id,info,wt) values(1, 'record1', 1)");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "insert record 1 failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "insert record 1 ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "insert into t1(id,info,wt) values(2, 'record2', 2)");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "insert record 2 failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "insert record 2 ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "update t1 set wt = 12 where id = 1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "update record failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "update record ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "select * from t1");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
                fprintf(stderr, "select failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                int nFields = PQnfields(res);
                for(i=0;i<nFields;i++)
                {
                        fprintf(stderr, "%s\t\t",PQfname(res,i));
                }
                fprintf(stderr, "\n");

                for(i=0;i<PQntuples(res);i++)
                {
                        for(j=0;j<nFields;j++)
                        {
                                fprintf(stderr, "%s\t\t",PQgetvalue(res,i,j));
                        }
                        fprintf(stderr, "\n");
                }
        }
        PQclear(res);

        res = PQexec(conn, "delete from t1 where id = 1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "delete record failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "delete record ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "drop table t1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "drop table ok\n");
        }
        PQclear(res);

        /* close the connection to the database and cleanup */
        PQfinish(conn);

        return 0;
}
  • MySQL
  • gcc -I/usr/include/MySQL -L/usr/lib/MySQL MySQL.c -lMySQLclient -o MySQL
  • ./MySQL host port
#include <stdio.h>
#include <MySQL.h>

int main(int argc,char** argv){
  printf("connect to %s:%s\n",argv[1], argv[2]);
  printf("version: %s\n", MySQL_get_client_info());
  MySQL* my = MySQL_init(NULL);
  int port = atoi(argv[2]);

  if(!MySQL_real_connect(my, ("%s", argv[1]), "abc", "abc", "postgres", port, NULL, 0)){
        printf("connect error !\n");
        MySQL_close(my);
  }

  printf("drop table if exists myc_sm;\n");
  MySQL_query(my, "drop table if exists myc_sm;");

  printf("create table myc_sm;\n");
  MySQL_query(my, "create table myc_sm(a int primary key, b text);");

  printf("insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')\n");
  MySQL_query(my, "insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')");

  void select(void)
  {
        printf("\n\nselect * from myc_sm;\n");
        int res = MySQL_query(my, "select * from myc_sm;");
        MySQL_RES* a = MySQL_store_result(my);
        int rows = MySQL_num_rows(a);
        int cols = MySQL_num_fields(a);
        printf("rows: %d, cols: %d\n", rows, cols);
        MySQL_FIELD *field = MySQL_fetch_fields(a);
        for(int i = 0; i < cols; i++)
        {
                printf("%-10s\t", field[i].name);
        }
        puts("");
                MySQL_ROW line;
        for(int i = 0; i < rows; i++)
                {
                        line =  MySQL_fetch_row(a);
                        for(int j = 0; j < cols; j++)
                        {
                                printf("%-10s\t", line[j]);
                        }
                        puts("");
        }
  }

  select();

  printf("update myc_sm set b = 'def' where a = 1;");
  MySQL_query(my, "update myc_sm set b = 'def' where a = 1;");
  select();

  printf("delete from myc_sm where a = 3;");
  MySQL_query(my, "delete from myc_sm where a = 3;");
  select();

  MySQL_close(my);
}

c#

  • PostgreSQL
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Npgsql;

namespace SmokeTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("\n========== C# Driver ============");
            string connSting = "Host=localhost;Port=5401;Username=abc;Password=abc;Database=postgres";
            var conn = new NpgsqlConnection(connSting);

            NpgsqlDataAdapter DA = new NpgsqlDataAdapter();
            //NpgsqlCommand cmd_select = new NpgsqlCommand("select * from SmokeTestTable_csharp");
            //DA.SelectCommand = cmd_select;




            string drop1 = "drop table if exists SmokeTestTable_csharp;";
            string create = "create table SmokeTestTable_csharp(id int primary key,name text,gender text);";
            string insert = "insert into SmokeTestTable_csharp values(1,'li','male'),(2,'delete_me','male'),(3,'update_me','female');";
            string create2 = "create table testfordelete(id int primary key);";
            string droptab = "drop table testfordelete;";
            string delete = "delete from Smoketesttable_csharp where id = 2";
            string update = "update SmokeTestTable_csharp set name = 'update' where id = 3";
            string select = "select * from SmokeTestTable_csharp";
            string drodb = "drop database if exists smoketestdb;";
            string commit = "commit;";
            string credb = "create database smoketestdb;";
            string swdb = "use smoketestdb";
            string dropdb = "drop database smoketestdb;";



            conn.Open();

            using (var com1 = new NpgsqlCommand(drop1, conn))
            using (var com2 = new NpgsqlCommand(create, conn))
            using (var com3 = new NpgsqlCommand(insert, conn))
            using (var com4 = new NpgsqlCommand(create2, conn))
            using (var com5 = new NpgsqlCommand(droptab, conn))
            using (var com6 = new NpgsqlCommand(delete, conn))
            using (var com7 = new NpgsqlCommand(update, conn))
            using (var com8 = new NpgsqlCommand(select, conn))
            using (var drobd1 = new NpgsqlCommand(drodb,conn))
            using (var credb1 = new NpgsqlCommand(credb, conn))
            using (var swdb1 = new NpgsqlCommand(swdb, conn))
            using (var dropdb1 = new NpgsqlCommand(dropdb, conn))
            using (var comm = new NpgsqlCommand(commit,conn))

            {

                //drobd1.ExecuteNonQuery();
                //Console.WriteLine("drop table success!");
                //credb1.ExecuteNonQuery();
                //Console.WriteLine("create database success!");
                //comm.ExecuteNonQuery();
                //Console.WriteLine("commit success!");
                //swdb1.ExecuteNonQuery();
                //Console.WriteLine("switch database success!");

                com1.ExecuteNonQuery();
                Console.WriteLine("drop   table success!");
                com2.ExecuteNonQuery();
                Console.WriteLine("create table success!");
                com3.ExecuteNonQuery();
                Console.WriteLine("insert table success!");
                com4.ExecuteNonQuery();
                Console.WriteLine("create table success!");
                com5.ExecuteNonQuery();
                Console.WriteLine("drop   table success!");
                com6.ExecuteNonQuery();
                Console.WriteLine("delete table success!");
                com7.ExecuteNonQuery();
                Console.WriteLine("update table success!");
                com8.ExecuteNonQuery();
                Console.WriteLine("select table success!");
                comm.ExecuteNonQuery();
                Console.WriteLine("commit table success!");
                //dropdb1.ExecuteNonQuery();
                //Console.WriteLine("drop database success!");
            }
            conn.Close();
            Console.WriteLine("=================================\n");
        }
    }
}
  • MySQL
using System;
using System.Collections.Generic;
using System.Data;
using MySQL.Data.MySQLClient;
using System.Text;

namespace MySQL
{
    class Program
    {
        static void Main(string[] args)
        {
        //server=127.0.0.1;port=3306;user=root;password=root; database=minecraftdb;
                string cs = "server=" + args[0] + ";user=abc;password=abc;port=" + args[1] + ";database=postgres";
                Console.WriteLine("testing MySQL: " + cs);

        MySQLConnection conn = null;
        conn = new MySQLConnection(cs);
        //conn.Open();

        //Console.WriteLine("drop table if exists mycs_sm;");
        //MySQLCommand cmd = new MySQLCommand("drop table if exists mycs_sm;", conn);
        //int n = cmd.ExecuteNonQuery();

        List<string> sqlList = new List<string>();
        sqlList.Add("drop table if exists mycs_sm;");
        sqlList.Add("create table mycs_sm(a int primary key, b text);");
        sqlList.Add("insert into mycs_sm values(1,'abc'),(2,'bcd'),(3,'cde');");
        sqlList.Add("select * from mycs_sm;");
        sqlList.Add("update mycs_sm set b = 'def' where a = 1;");
        sqlList.Add("select * from mycs_sm;");
        sqlList.Add("delete from mycs_sm where a = 3;");
        sqlList.Add("select * from mycs_sm;");

        foreach (string i in sqlList){
                Console.WriteLine(i);
                List<string> list = new List<string>();
                if (i == "select * from mycs_sm;"){
                        conn.Open();
                        MySQLCommand cmd = new MySQLCommand(i, conn);
                        MySQLDataReader reader = cmd.ExecuteReader();
                        while (reader.Read()){
                                string id = reader.GetString("a");
                                string name = reader.GetString("b");
                                Console.WriteLine(id + " : " + name);
                        }
                        conn.Close();
                }
                else {
                        conn.Open();
                        MySQLCommand cmd = new MySQLCommand(i, conn);
                        cmd.ExecuteNonQuery();
                        conn.Close();
                }
        }

        //conn.Close();
        }
    }
}

c++

  • PostgreSQL
    • 编译
      • g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17
/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 *
 * sudo apt install libpq-dev
 * git clone https://github.com/jtv/libpqxx.git
 * cd libpqxx
 * ./configure
 * make
 * sudo make install
 *
 * g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17
 * ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc"
 *
 * Test the C++ version of libpqxx, the PostgreSQL frontend library.
 */
#include <iostream>
#include <pqxx/pqxx>

using namespace std;
using namespace pqxx;

int
main(int argc, char **argv)
{
  const char *conninfo;

  if (argc > 1)
        conninfo = argv[1];
  else
        conninfo = "dbname = postgres user=abc password=abc hostaddr=127.0.0.1 port=5401";

  try{
        pqxx::connection db(conninfo);
        if (db.is_open()) {
                cout << "Opened database successfully: " << db.dbname() << endl;
        } else {
                cout << "Can't open database" << endl;
                return 1;
        }

        pqxx::nontransaction txn1{db};
        txn1.exec("drop table if exists t1");
        txn1.exec("create table t1(id integer primary key, info text, wt integer)");
        txn1.commit();

        pqxx::work txn2{db};
        txn2.exec("insert into t1(id,info,wt) values(1, 'record1', 1)");
        txn2.exec("insert into t1(id,info,wt) values(2, 'record2', 2)");
        txn2.exec("insert into t1(id,info,wt) values(3, 'record3', 3)");
        txn2.exec("update t1 set wt = 12 where id = 1");
        txn2.exec("delete from t1 where id = 2");

        pqxx::result r2{txn2.exec("select * from t1")};
        for (auto row: r2)
                std::cout << row[0] << " " << row[1] << " " << row[2] << std::endl;

        txn2.commit();

        pqxx::nontransaction txn3{db};
        txn3.exec("drop table t1");
        txn3.commit();

        db.close();
  }catch (const std::exception &e){
        cerr << e.what() << std::endl;
        return 1;
  }

  return 0;
}
  • MySQL
    • 编译
      • g++ MySQL.cpp -lMySQLcppconn -o MySQL
// g++ MySQL.cpp -lMySQLcppconn -o MySQLtest
// ./MySQLtest "tcp://192.168.0.113:5661"
#include "MySQL_connection.h"
#include <stdlib.h>
#include <iostream>
#include <cppconn/driver.h>
#include <cppconn/exception.h>
#include <cppconn/resultset.h>
#include <cppconn/statement.h>
using namespace std;
int main(int argc,char* argv[]){
  sql::Driver *driver;
        sql::Connection *con;
        sql::Statement *stmt;
        sql::ResultSet *res;

  /* Create a connection */
        driver = get_driver_instance();
  //string infos = sprintf("\"tcp://" , argv[1] , "\"");
  //string in1 = "\"tcp://";
  //string in2 = "\"";
  //string infos = in1 + argv[1] + in2;
  string infos = argv[1];
        con = driver->connect(infos, "abc", "abc");
  con->setSchema("postgres");
  stmt = con->createStatement();
        stmt->execute("drop table if exists mycpp_sm;");
  cout<<"drop table if exists mycpp_sm;"<<endl;
  stmt->execute("create table mycpp_sm(a int primary key, b text)");
  cout<<"create table mycpp_sm(a int primary key, b text)"<<endl;
  stmt->execute("insert into mycpp_sm values(1, 'abc'),(2,'bcd'),(3,'cde')");
  cout<<"insert into mycpp_sm(1, 'abc'),(2,'bcd'),(3, 'cde')"<<endl;
  stmt->executeQuery("select * from mycpp_sm");
  cout<<"select * from mycpp_sm"<<endl;
  stmt->execute("update mycpp_sm set b = 'qwer' where a = 2");
  cout<<"update mycpp_sm set b = 'qwer' where a = 2"<<endl;
  stmt->executeQuery("select * from mycpp_sm");
  cout<<"select * from mycpp_sm"<<endl;
        stmt->execute("delete from mycpp_sm where a = 3");
  cout<<"delete from mycpp_sm where a = 3"<<endl;
  stmt->executeQuery("select * from mycpp_sm");
  cout<<"select * from mycpp_sm"<<endl;
  delete stmt;
  delete con;
}

rust

  • PostgreSQL
    • 编译
      • cargo build(该步会把所有依赖下载并安装)
use clap::{App, Arg};
use postgres::{Client, NoTls};

const DEFAULT_HOST: &str = "localhost";
const DEFAULT_PORT: &str = "7999";
const DEFAULT_USER: &str = "abc";
const DEFAULT_PASS: &str = "abc";
const DEFAULT_DB: &str = "postgres";

struct ConnectionArgs {
    pub host: String,
    pub port: String,
    pub user: String,
    pub pass: String,
    pub db: String,
}

fn parse_args() -> ConnectionArgs {
    let matches = App::new("Execute SQL in Postgres")
        .arg(
            Arg::with_name("host")
                .short("h")
                .long("host")
                .value_name("HOSTNAME")
                .default_value(DEFAULT_HOST)
                .help("Sets the host name of PostgreSQL service")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("port")
                .short("p")
                .long("port")
                .value_name("PORT")
                .default_value(DEFAULT_PORT)
                .help("Sets the port of PostgreSQL service")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("user")
                .short("u")
                .long("user")
                .value_name("USERNAME")
                .default_value(DEFAULT_USER)
                .help("Sets the user to log into PostgreSQL")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("pass")
                .long("pass")
                .value_name("PASSWORD")
                .default_value(DEFAULT_PASS)
                .help("Sets the user's password")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("db")
                .short("d")
                .long("database")
                .value_name("DATABASE")
                .default_value(DEFAULT_DB)
                .help("Sets the database to use")
                .takes_value(true),
        )
        .get_matches();

    ConnectionArgs {
        host: matches.value_of("host").unwrap().to_string(),
        port: matches.value_of("port").unwrap().to_string(),
        user: matches.value_of("user").unwrap().to_string(),
        pass: matches.value_of("pass").unwrap().to_string(),
        db: matches.value_of("db").unwrap().to_string(),
    }
}

fn main() {
    let args = parse_args();

    let sqls = [
        "drop table if exists t1;",
        "create table t1(id integer primary key, info text, wt integer);",
        "insert into t1(id,info,wt) values(1, 'record1', 1);",
        "insert into t1(id,info,wt) values(2, 'record2', 2);",
        "update t1 set wt = 12 where id = 1;",
        "select * from t1;",
        "delete from t1 where id = 1;",
        "select * from t1;",
    ];

    let mut client = Client::connect(
        &format!(
            "postgres://{}:{}@{}:{}/{}",
            args.user, args.pass, args.host, args.port, args.db
        ),
        NoTls,
    )
    .unwrap();

    for sql in sqls {
        // 或者使用 let result = client.query(sql, &[]);
        // query 会输出结果,execute 只返回影响的行数
        let result = client.execute(sql, &[]);
        println!("command: {}, res: {:?}", sql, result);
    }
}
  • MySQL
    • 编译
      • cargo build(该步会把所有依赖下载并安装)
use MySQL::*;
use MySQL::prelude::*;
//use std::env;
use clap::{App, Arg};

fn main() {
    let matches = App::new("Execute SQL in Postgres")
        .arg(
            Arg::with_name("host")
                .short("h")
                .long("host")
                .value_name("HOSTNAME")
                .help("Sets the host name of PostgreSQL service")
                .takes_value(true),
        )
        .arg(
           Arg::with_name("port")
                .short("p")
                .long("port")
                .value_name("PORT")
                .help("Sets the port of PostgreSQL service")
                .takes_value(true),
        ).get_matches();
    let host = matches.value_of("host").unwrap_or("192.168.0.113").to_string();
    let port = matches.value_of("port").unwrap_or("5662").to_string();
    let front = "MySQL://pgx:pgx_pwd@".to_string();

    let hosts = front + &host + ":" + &port + "/MySQL";
    let url = hosts.to_string();
    // let mut hosts = front + &host + ":".to_string() + &port + "/MySQL".to_string();
//    let mut url = format!("MySQL://pgx:pgx_pwd@{:?}:{:?}/MySQL", host, port);

    let urls = "MySQL://abc:abc@192.168.0.113:5662/postgres";
    println!("{}", url);
    println!("{}", urls);
    let pool = Pool::new(urls).unwrap(); // 获取连接池
    let mut conn = pool.get_conn().unwrap();// 获取链接

    let sql = [
        "drop table if exists myrs_sm;",
        "create table myrs_sm(a int primary key, b text);",
        "insert into myrs_sm values(1, 'abc'),(2,'bcd'),(3,'cde');",
        "select * from myrs_sm;",
        "update myrs_sm set b = 'def' where a = 1",
        "select * from myrs_sm;",
        "delete from myrs_sm where a = 3",
        "select * from myrs_sm;"
    ];

    let selects = "select * from myrs_sm;";

    for sqls in sql{
        conn.query_iter(sqls).unwrap();
        println!();
        println!("{:?}", sqls);
        if sqls == "select * from myrs_sm;"{
            conn.query_iter(selects).unwrap()
                .for_each(|row|{
                let r:(i32,String)=from_row(row.unwrap());
                println!("result: a={},b='{}'",r.0,r.1);
            });
        }
    }
}

基本 DDL

Database

create database

CREATE DATABASE name
    [ [ WITH ] [ OWNER [=] user_name ]
           [ TEMPLATE [=] template ]
           [ ENCODING [=] encoding ]
           [ LOCALE [=] locale ]
           [ LC_COLLATE [=] lc_collate ]
           [ LC_CTYPE [=] lc_ctype ]
           [ TABLESPACE [=] tablespace_name ]
           [ ALLOW_CONNECTIONS [=] allowconn ]
           [ CONNECTION LIMIT [=] connlimit ]
           [ IS_TEMPLATE [=] istemplate ] ]

创建一个新的数据库

create database kunlun;

要在一个默认表空间pg_default中创建一个被用户 abc 有的新数据库 kunlun:

CREATE DATABASE kunlun OWNER abc TABLESPACE pg_default;

更多的参考链接 create databaseopen in new window

alter database

ALTER DATABASE name [ [ WITH ] option [ ... ] ]


这里 option 可以是:

    ALLOW_CONNECTIONS allowconn
    CONNECTION LIMIT connlimit
    IS_TEMPLATE istemplate

ALTER DATABASE name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }

ALTER DATABASE name SET TABLESPACE new_tablespace

ALTER DATABASE name SET configuration_parameter { TO | = } { value | DEFAULT }
ALTER DATABASE name SET configuration_parameter FROM CURRENT
ALTER DATABASE name RESET configuration_parameter
ALTER DATABASE name RESET ALL

修改数据库的所有者

ALTER DATABASE kunlun OWNER TO abc;

在数据库 kunlun 中禁用索引扫描

ALTER DATABASE kunlun SET enable_indexscan TO off;

修改数据库的最大链接数:

alter database kunlun connection limit 10;

更多的参考链接 alter databaseopen in new window

drop database

DROP DATABASE [ IF EXISTS ] name

更多的参考链接 drop databaseopen in new window

Schema

create schema

CREATE SCHEMA schema_name [ AUTHORIZATION role_specification ] [ schema_element [ ... ] ]
CREATE SCHEMA AUTHORIZATION role_specification [ schema_element [ ... ] ]
CREATE SCHEMA IF NOT EXISTS schema_name [ AUTHORIZATION role_specification ]
CREATE SCHEMA IF NOT EXISTS AUTHORIZATION role_specification

其中 role_specification 可以是:

    user_name
  | CURRENT_USER
  | SESSION_USER

创建一个新的模式

create schema myschema;

为用户创建 kunlun 创建一个模式,此模式也同时命名为 kunlun :

CREATE SCHEMA AUTHORIZATION kunlun;

更多的参考链接 create schemaopen in new window

alter schema

ALTER SCHEMA name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }

更改模式的所有者

 alter schema kunlun owner to vito;

更多的参考链接 alter schemaopen in new window

drop schema

DROP SCHEMA [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除名为 kunlun 的 schema:

drop schema kunlun;

更多的参考链接 drop schemaopen in new window

Table

create table

CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name ( [
  { column_name data_type [ COLLATE collation ] [ column_constraint [ ... ] ]
    | table_constraint
    | LIKE source_table [ like_option ... ] }
    [, ... ]
] )
[ INHERITS ( parent_table [, ... ] ) ]
[ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ]
[ USING method ]
[ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ]
[ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
[ TABLESPACE tablespace_name ]

CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name
    OF type_name [ (
  { column_name [ WITH OPTIONS ] [ column_constraint [ ... ] ]
    | table_constraint }
    [, ... ]
) ]
[ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ]
[ USING method ]
[ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ]
[ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
[ TABLESPACE tablespace_name ]

CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name
    PARTITION OF parent_table [ (
  { column_name [ WITH OPTIONS ] [ column_constraint [ ... ] ]
    | table_constraint }
    [, ... ]
) ] { FOR VALUES partition_bound_spec | DEFAULT }
[ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ]
[ USING method ]
[ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ]
[ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
[ TABLESPACE tablespace_name ]

其中 column_constraint 是:

[ CONSTRAINT constraint_name ]
{ NOT NULL |
  NULL |
  DEFAULT default_expr |
  GENERATED ALWAYS AS ( generation_expr ) STORED |
  GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] |
  UNIQUE index_parameters |
  PRIMARY KEY index_parameters |
  REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]
    [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

table_constraint 是:

[ CONSTRAINT constraint_name ]
{ 
  UNIQUE ( column_name [, ... ] ) index_parameters |
  PRIMARY KEY ( column_name [, ... ] ) index_parameters |
like_option 是:

{ INCLUDING | EXCLUDING } { COMMENTS | CONSTRAINTS | DEFAULTS | GENERATED | IDENTITY | INDEXES | STATISTICS | STORAGE | ALL }

partition_bound_spec 是:

IN ( partition_bound_expr [, ...] ) |
FROM ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] )
  TO ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] ) |
WITH ( MODULUS numeric_literal, REMAINDER numeric_literal )

UNIQUE、PRIMARY KEY以及EXCLUDE约束中的index_parameters是:

[ INCLUDE ( column_name [, ... ] ) ]
[ WITH ( storage_parameter [= value] [, ... ] ) ]
[ USING INDEX TABLESPACE tablespace_name ]

一个EXCLUDE约束中的exclude_element是:

{ column_name | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ]

创建表 t1:

create table t1(a int , b int);

创建临时表 v1:

create temp table v1(a int, b text);

建立 hash 分区表:

CREATE TABLE t2 (
    order_id     bigint not null,
    cust_id      bigint not null,
    status       text
) PARTITION BY HASH (order_id);

创建一个分区表:

CREATE TABLE t3 (
    logdate         date not null,
    peaktemp        int,
    unitsales       int
) PARTITION BY RANGE (logdate);

更多的参考链接

create tableopen in new window

alter table

ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]
    action [, ... ]
ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]
    RENAME [ COLUMN ] column_name TO new_column_name
ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]
    RENAME CONSTRAINT constraint_name TO new_constraint_name
ALTER TABLE [ IF EXISTS ] name
    RENAME TO new_name
ALTER TABLE [ IF EXISTS ] name
    SET SCHEMA new_schema
ALTER TABLE ALL IN TABLESPACE name [ OWNED BY role_name [, ... ] ]
    SET TABLESPACE new_tablespace [ NOWAIT ]
ALTER TABLE [ IF EXISTS ] name
    ATTACH PARTITION partition_name { FOR VALUES partition_bound_spec | DEFAULT }
ALTER TABLE [ IF EXISTS ] name
    DETACH PARTITION partition_name

其中action 是以下之一:

    ADD [ COLUMN ] [ IF NOT EXISTS ] column_name data_type [ COLLATE collation ] [ column_constraint [ ... ] ]
    DROP [ COLUMN ] [ IF EXISTS ] column_name [ RESTRICT | CASCADE ]
    ALTER [ COLUMN ] column_name [ SET DATA ] TYPE data_type [ COLLATE collation ] [ USING expression ]
    ALTER [ COLUMN ] column_name SET DEFAULT expression
    ALTER [ COLUMN ] column_name DROP DEFAULT
    ALTER [ COLUMN ] column_name { SET | DROP } NOT NULL
    ALTER [ COLUMN ] column_name DROP EXPRESSION [ IF EXISTS ]
    ALTER [ COLUMN ] column_name ADD GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ]
    ALTER [ COLUMN ] column_name { SET GENERATED { ALWAYS | BY DEFAULT } | SET sequence_option | RESTART [ [ WITH ] restart ] } [...]
    ALTER [ COLUMN ] column_name DROP IDENTITY [ IF EXISTS ]
    ALTER [ COLUMN ] column_name SET STATISTICS integer
    ALTER [ COLUMN ] column_name SET ( attribute_option = value [, ... ] )
    ALTER [ COLUMN ] column_name RESET ( attribute_option [, ... ] )
    ALTER [ COLUMN ] column_name SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }
    ADD table_constraint [ NOT VALID ]
    ADD table_constraint_using_index
    ALTER CONSTRAINT constraint_name [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
    VALIDATE CONSTRAINT constraint_name
    DROP CONSTRAINT [ IF EXISTS ]  constraint_name [ RESTRICT | CASCADE ]
    ENABLE REPLICA TRIGGER trigger_name
    ENABLE ALWAYS TRIGGER trigger_name
    DISABLE RULE rewrite_rule_name
    ENABLE RULE rewrite_rule_name
    ENABLE REPLICA RULE rewrite_rule_name
    ENABLE ALWAYS RULE rewrite_rule_name
    DISABLE ROW LEVEL SECURITY
    ENABLE ROW LEVEL SECURITY
    FORCE ROW LEVEL SECURITY
    NO FORCE ROW LEVEL SECURITY
    SET TABLESPACE new_tablespace
    SET ( storage_parameter [= value] [, ... ] )
    RESET ( storage_parameter [, ... ] )
    INHERIT parent_table
    NO INHERIT parent_table
    OF type_name
    NOT OF
    OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
    REPLICA IDENTITY { DEFAULT | USING INDEX index_name | FULL | NOTHING }

and partition_bound_spec is:

IN ( partition_bound_expr [, ...] ) |
FROM ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] )
  TO ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] ) |
WITH ( MODULUS numeric_literal, REMAINDER numeric_literal )

and column_constraint is:

[ CONSTRAINT constraint_name ]
{ NOT NULL |
  NULL |
  DEFAULT default_expr |
  GENERATED ALWAYS AS ( generation_expr ) STORED |
  GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] |
  UNIQUE index_parameters |
  PRIMARY KEY index_parameters |
  REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]
    [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

而table_constraint是:

[ CONSTRAINT constraint_name ]
{  UNIQUE ( column_name [, ... ] ) index_parameters |
  PRIMARY KEY ( column_name [, ... ] ) index_parameters |
  EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] |
    [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] }
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

并且 table_constraint_using_index 是:

    [ CONSTRAINT constraint_name ]
    { UNIQUE | PRIMARY KEY } USING INDEX index_name
    [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

UNIQUE、PRIMARY KEY以及EXCLUDE约束中的index_parameters是:

[ INCLUDE ( column_name [, ... ] ) ]
[ WITH ( storage_parameter [= value] [, ... ] ) ]
[ USING INDEX TABLESPACE tablespace_name ]

exclude_element in an EXCLUDE constraint is:

{ column_name | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ]

重命名表 kunlun 的名字:

create table kunlun(a int not null);
alter table kunlun rename to t1;

向 t1 表中增加一列:

alter table t1 add column bb text;

bb 字段重命名为 b:

alter table t1 rename column bb to b;

修改表中 b 字段的数据类型:

ALTER TABLE t1 ALTER COLUMN b type varchar(10);

向 b 字段增加唯一约束:

alter table t1 add constraint unique_t1_b unique (b);

为 b 列增加一个非空约束:

ALTER TABLE t1 ALTER COLUMN b SET NOT NULL;

移除 b 列的非空约束:

ALTER TABLE t1 ALTER COLUMN b drop NOT NULL;

把 t1 表移动至另一个模式中:

create schema kunlun;
ALTER TABLE public.t1 SET SCHEMA kunlun;

更多的参考链接 alter tableopen in new window

drop table

DROP TABLE [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除表 t1:

drop table t1;

更多的参考链接 drop tableopen in new window

Index

create index

CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] name ] ON [ ONLY ] table_name [ USING method ]
    ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
    [ INCLUDE ( column_name [, ...] ) ]
    [ WITH ( storage_parameter = value [, ... ] ) ]
    [ TABLESPACE tablespace_name ]
    [ WHERE predicate ]

更多的参考链接 create indexopen in new window

alter index

ALTER INDEX [ IF EXISTS ] name RENAME TO new_name
ALTER INDEX [ IF EXISTS ] name SET TABLESPACE tablespace_name
ALTER INDEX name ATTACH PARTITION index_name
ALTER INDEX name DEPENDS ON EXTENSION extension_name
ALTER INDEX [ IF EXISTS ] name SET ( storage_parameter = value [, ... ] )
ALTER INDEX [ IF EXISTS ] name RESET ( storage_parameter [, ... ] )
ALTER INDEX [ IF EXISTS ] name ALTER [ COLUMN ] column_number
    SET STATISTICS integer
ALTER INDEX ALL IN TABLESPACE name [ OWNED BY role_name [, ... ] ]
    SET TABLESPACE new_tablespace [ NOWAIT ]

重命名一个现有的索引:

ALTER INDEX distributors RENAME TO suppliers;

更多的参考链接 alter indexopen in new window

drop index

DROP INDEX [ CONCURRENTLY ] [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除一个存在的索引:

drop index suppliers;

更多的参考链接 drop indexopen in new window

Sequence

create sequence

CREATE [ TEMPORARY | TEMP ] SEQUENCE [ IF NOT EXISTS ] name [ INCREMENT [ BY ] increment ]
    [ MINVALUE minvalue | NO MINVALUE ] [ MAXVALUE maxvalue | NO MAXVALUE ]
    [ START [ WITH ] start ] [ CACHE cache ] [ [ NO ] CYCLE ]
    [ OWNED BY { table_name.column_name | NONE } ]

创建一个名为 kunlun 的序列从 100 开始:

CREATE SEQUENCE kunlun START 100;
SELECT nextval('kunlun');
 nextval
---------
     100
SELECT nextval('kunlun');
 nextval
---------
     101

更多的参考链接 create sequenceopen in new window

alter sequence

ALTER SEQUENCE [ IF EXISTS ] name
    [ AS data_type ]
    [ INCREMENT [ BY ] increment ]
    [ MINVALUE minvalue | NO MINVALUE ] [ MAXVALUE maxvalue | NO MAXVALUE ]
    [ START [ WITH ] start ]
    [ RESTART [ [ WITH ] restart ] ]
    [ CACHE cache ] [ [ NO ] CYCLE ]
    [ OWNED BY { table_name.column_name | NONE } ]
ALTER SEQUENCE [ IF EXISTS ] name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER SEQUENCE [ IF EXISTS ] name RENAME TO new_name
ALTER SEQUENCE [ IF EXISTS ] name SET SCHEMA new_schema

更改 kunlun 序列从 200 开始:

ALTER SEQUENCE kunlun RESTART WITH 200;
SELECT nextval('kunlun');
 nextval
---------
     200
SELECT nextval('kunlun');
 nextval
---------
     201

更多的参考链接 alter sequenceopen in new window

drop sequence

DROP SEQUENCE [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除 kunlun 序列:

DROP SEQUENCE kunlun;

更多的参考链接 drop sequenceopen in new window

View

create view

CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] [ RECURSIVE ] VIEW name [ ( column_name [, ...] ) ]
    [ WITH ( view_option_name [= view_option_value] [, ... ] ) ]
    AS query
    [ WITH [ CASCADED | LOCAL ] CHECK OPTION ]

创建一个为 v1 的视图:

create table t1(id int, a int);
insert into t1(id,a) values (1,2),(2,4),(3,8),(6,0);
CREATE VIEW v1 AS
    SELECT *
    FROM t1
    WHERE id<5;

更多的参考链接 create viewopen in new window

alter view

ALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name SET DEFAULT expression
ALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name DROP DEFAULT
ALTER VIEW [ IF EXISTS ] name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER VIEW [ IF EXISTS ] name RENAME TO new_name
ALTER VIEW [ IF EXISTS ] name SET SCHEMA new_schema
ALTER VIEW [ IF EXISTS ] name SET ( view_option_name [= view_option_value] [, ... ] )
ALTER VIEW [ IF EXISTS ] name RESET ( view_option_name [, ... ] )

更改 v1 视图的名称:

alter view v1 rename to vv1;

更多的参考链接 alter viewopen in new window

drop view

DROP VIEW [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除 vv1 视图:

drop view vv1;

更多的参考链接 drop viewopen in new window

存储过程

http://192.168.0.104/pgdocs/html/storage.html

用户和权限管理

create role

CREATE ROLE name [ [ WITH ] option [ ... ] ]

where option可以是:

      SUPERUSER | NOSUPERUSER
    | CREATEDB | NOCREATEDB
    | CREATEROLE | NOCREATEROLE
    | INHERIT | NOINHERIT
    | LOGIN | NOLOGIN
    | REPLICATION | NOREPLICATION
    | BYPASSRLS | NOBYPASSRLS
    | CONNECTION LIMIT connlimit
    | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL
    | VALID UNTIL 'timestamp'
    | IN ROLE role_name [, ...]
    | IN GROUP role_name [, ...]
    | ROLE role_name [, ...]
    | ADMIN role_name [, ...]
    | USER role_name [, ...]
    | SYSID uid

创建一个可以登录的角色 kunlun,它没有密码:

create role kunlun LOGIN;

创建一个可以登陆且有密码的用户:kunlun1: CREATE USERCREATE ROLE完全相同,除了它带有LOGIN;

CREATE USER kunlun1 WITH PASSWORD '12345678';

创建用户 kunlun2,密码有效期到 2024年1月1日:

CREATE ROLE kunlun2 WITH LOGIN PASSWORD '12345678' VALID UNTIL '2024-01-01';

创建一个可以创建数据库和管理角色的角色 kunlun3:

CREATE ROLE kunlun3 WITH CREATEDB CREATEROLE;

更多的参考链接 create roleopen in new window

alter role

ALTER ROLE role_specification [ WITH ] option [ ... ]

其中option可以是:

      SUPERUSER | NOSUPERUSER
    | CREATEDB | NOCREATEDB
    | CREATEROLE | NOCREATEROLE
    | INHERIT | NOINHERIT
    | LOGIN | NOLOGIN
    | REPLICATION | NOREPLICATION
    | BYPASSRLS | NOBYPASSRLS
    | CONNECTION LIMIT connlimit
    | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL
    | VALID UNTIL 'timestamp'

ALTER ROLE name RENAME TO new_name

ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] SET configuration_parameter { TO | = } { value | DEFAULT }
ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] SET configuration_parameter FROM CURRENT
ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] RESET configuration_parameter
ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] RESET ALL

其中role_specification可以是:

    role_name
  | CURRENT_USER
  | SESSION_USER

更改 kunlun1 的密码:

ALTER ROLE kunlun1 WITH PASSWORD '87654321';

移除 kunlun2 的密码:

ALTER ROLE kunlun2 WITH PASSWORD NULL;

更多的参考链接 alter roleopen in new window

drop role

DROP ROLE [ IF EXISTS ] name [, ...]

删除创建的角色 kunlun:

drop role kunlun;

更多的参考链接 drop roleopen in new window

grant

GRANT { { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { [ TABLE ] table_name [, ...]
         | ALL TABLES IN SCHEMA schema_name [, ...] }
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { SELECT | INSERT | UPDATE | REFERENCES } ( column_name [, ...] )
    [, ...] | ALL [ PRIVILEGES ] ( column_name [, ...] ) }
    ON [ TABLE ] table_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { USAGE | SELECT | UPDATE }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { SEQUENCE sequence_name [, ...]
         | ALL SEQUENCES IN SCHEMA schema_name [, ...] }
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }
    ON DATABASE database_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON DOMAIN domain_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN DATA WRAPPER fdw_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN SERVER server_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { EXECUTE | ALL [ PRIVILEGES ] }
    ON { { FUNCTION | PROCEDURE | ROUTINE } routine_name [ ( [ [ argmode ] [ arg_name ] arg_type [, ...] ] ) ] [, ...]
         | ALL { FUNCTIONS | PROCEDURES | ROUTINES } IN SCHEMA schema_name [, ...] }
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON LANGUAGE lang_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { SELECT | UPDATE } [, ...] | ALL [ PRIVILEGES ] }
    ON LARGE OBJECT loid [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
    ON SCHEMA schema_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { CREATE | ALL [ PRIVILEGES ] }
    ON TABLESPACE tablespace_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON TYPE type_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

其中role_specification可以是:

    [ GROUP ] role_name
  | PUBLIC
  | CURRENT_USER
  | SESSION_USER

GRANT role_name [, ...] TO role_name [, ...] [ WITH ADMIN OPTION ]

把表 t1 上的插入特权授予给所有用户:

GRANT INSERT ON t1 TO PUBLIC;

更多的参考链接 grantopen in new window

revoke

REVOKE [ GRANT OPTION FOR ]
    { { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { [ TABLE ] table_name [, ...]
         | ALL TABLES IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { SELECT | INSERT | UPDATE | REFERENCES } ( column_name [, ...] )
    [, ...] | ALL [ PRIVILEGES ] ( column_name [, ...] ) }
    ON [ TABLE ] table_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { USAGE | SELECT | UPDATE }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { SEQUENCE sequence_name [, ...]
         | ALL SEQUENCES IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }
    ON DATABASE database_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON DOMAIN domain_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN DATA WRAPPER fdw_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN SERVER server_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { EXECUTE | ALL [ PRIVILEGES ] }
    ON { { FUNCTION | PROCEDURE | ROUTINE } function_name [ ( [ [ argmode ] [ arg_name ] arg_type [, ...] ] ) ] [, ...]
         | ALL { FUNCTIONS | PROCEDURES | ROUTINES } IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON LANGUAGE lang_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { SELECT | UPDATE } [, ...] | ALL [ PRIVILEGES ] }
    ON LARGE OBJECT loid [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
    ON SCHEMA schema_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { CREATE | ALL [ PRIVILEGES ] }
    ON TABLESPACE tablespace_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON TYPE type_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ ADMIN OPTION FOR ]
    role_name [, ...] FROM role_name [, ...]
    [ CASCADE | RESTRICT ]

从 public 收回表t1上的插入特权:

REVOKE INSERT ON t1 FROM PUBLIC;

更多的参考链接 revokeopen in new window

其他DDL

truncateopen in new window

TRUNCATE [ TABLE ] [ ONLY ] name [ * ] [, ... ]
    [ RESTART IDENTITY | CONTINUE IDENTITY ] [ CASCADE | RESTRICT ]

更多的参考链接 truncateopen in new window

基本DML

insert

[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    [ OVERRIDING { SYSTEM | USER } VALUE ]
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
    [ ON CONFLICT [ conflict_target ] conflict_action ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

其中 conflict_target 可以是以下之一:

    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_name

并且 conflict_action 是以下之一:

    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
                    ( column_name [, ...] ) = ( sub-SELECT )
                  } [, ...]
              [ WHERE condition ]

向表 t1 中插入数据:

create table t1(id int, a int);
insert into t1(id,a) values (1,2);
insert into t1(id,a) values (2,4),(3,8),(6,0);

更多的参考链接 insertopen in new window

update

[ WITH [ RECURSIVE ] with_query [, ...] ]
UPDATE [ ONLY ] table_name [ * ] [ [ AS ] alias ]
    SET { column_name = { expression | DEFAULT } |
          ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
          ( column_name [, ...] ) = ( sub-SELECT )
        } [, ...]
    [ FROM from_list ]
    [ WHERE condition | WHERE CURRENT OF cursor_name ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

在 t1 表中 a 列中的 4 改为 3:

UPDATE t1 SET  a='3' WHERE id=2;

更多的参考链接 updateopen in new window

delete

[ WITH [ RECURSIVE ] with_query [, ...] ]
DELETE FROM [ ONLY ] table_name [ * ] [ [ AS ] alias ]
    [ USING from_item [, ...] ]
    [ WHERE condition | WHERE CURRENT OF cursor_name ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

在 t1 表中删除 id 等于 6 的行:

DELETE FROM t1 WHERE id=6;

更多的参考链接 deleteopen in new window

select

[ WITH [ RECURSIVE ] with_query [, ...] ]
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } { ONLY | WITH TIES } ]
    [ FOR { UPDATE | NO KEY UPDATE | SHARE | KEY SHARE } [ OF table_name [, ...] ] [ NOWAIT | SKIP LOCKED ] [...] ]

其中 from_item 可以是以下之一:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) ]

并且 grouping_element 可以是以下之一:

    ( )
    expression
    ( expression [, ...] )
    ROLLUP ( { expression | ( expression [, ...] ) } [, ...] )
    CUBE ( { expression | ( expression [, ...] ) } [, ...] )
    GROUPING SETS ( grouping_element [, ...] )

并且 with_query 是:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]

进行查看 t1 表,

select * from t1;

等价于

table t1;

更多的参考链接 selectopen in new window

MySQL 特有的DML

insert ignore

如果数据库没有内容,就插入新的数据,如果有数据的话就跳过这条数据:

drop table if exists t1;
create table t1 (a int PRIMARY KEY , b int not null,CONSTRAINT t1_b_key UNIQUE (b));
insert ignore into t1(a,b) values (4,4);
# 重复进行insert,因为已经存在而进行跳过
insert ignore into t1(a,b) values (4,4);

replace into

查看 t1 里面存在的内容:

insert into t1(a,b) values (2,3);
table t1;
 a | b
---+---
 2 | 3
 4 | 4
(2 rows)

如果存在冲突,则先删除其他冲突的元组,然后再进行插入:

replace into t1 values(1,1),(1,2);
table t1;
 a | b
---+---
 1 | 2
 2 | 3
 4 | 4
(3 rows)

Prepared Statement

PostgreSQL 语法和示例

(使用 client api 调用)

PREPARE name [ ( data_type [, ...] ) ] AS statement

MySQL 语法和示例

(使用 client api 调用)

事务处理

Klustron 支持与MySQL相同的事务处理功能并且与MySQL连接协议或者PostgreSQL连接协议无关,在这两类连接中具有相同的行为。

autocommit 事务

在一个客户端连接有一个 autocommit 变量,默认值是 true,用户可以在连接中动态修改 autocommit 的值。当 autocommit 为 true 时,用户发送的任何 DML 语句都会作为独立的事务来执行,语句执行结束时 Klustron 就自动提交这个事务。

DDL 语句永远作为一个独立的事务来执行,无论 autocommit 是 true 还是 false。 如果执行 DDL 时本连接中已有运行中的事务,那么 Klustron 会先提交这个事务,然后再执行这个 DDL。

显式事务

我们把不是 autocommit 的事务称为显式事务。一个显式事务可以显式或者隐式地启动和提交。

显式地启动和提交/回滚事务

  1. 用户执行 begin 或者 start transaction 显式开启一个事务

  2. 执行若干条 DML 语句

  3. 执行 commit 显式地提交这个事务;或者执行 rollback 显式地回滚这个事务

隐式开启和隐式提交事务

如果用户设置 autocommit 为 false,那么之后本客户端连接 C 中收到的第一条 DML 语句后,会先自动(隐式)启动一个事务 T,在这个事务 T 中执行这条语句。后续在这个连接 C 中收到更多 DML 语句时, Klustron 会继续在事务 T 中执行这些语句。

一个显示事务还可以被隐式地提交,当在显式事务 T 中 Klustron 收到 set autocommit=true 或者任何一个 DDL 语句时, Klustron 会(隐式)提交事务 T。

如果在显式事务中执行某个 DML 语句出错,那么事务 T 在 Klustron 内部会被自动回滚,之后用户在连接C中 的事务T中 发送任何DML语句都会被 Klustron 忽略,直到在连接 C 中收到commit或者rollback后, Klustron 会回滚事务 T 。

事务处理功能示例

autocommit 事务示例

在 autocommit 开启状态时,用户发送的任何 DML 语句都会作为独立的事务来执行,语句执行结束时 Klustron 就自动提交这个事务。

set autocommit=true;
show autocommit;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(11);
insert into t1(b) values(12);
insert into t1(b) values(13);
insert into t1(b) values(14);
drop table if exists t2;

[图片: KunlunBase产品使用和评测指南相关截图]

在 autocommit 关闭状态时,需要进行手动提交或者回滚

set autocommit=off;
show autocommit;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(11);

打开另一个会话查看 t1 表 [图片: KunlunBase产品使用和评测指南相关截图]

进行 commit 提交之后才会成功过插入 [图片: KunlunBase产品使用和评测指南相关截图][图片: KunlunBase产品使用和评测指南相关截图]

在向事务中插入一条数据,进行手动回滚 [图片: KunlunBase产品使用和评测指南相关截图]

DDL 语句永远作为一个独立的事务来执行

ddl 语句已经进行提交了,commit 提示没有检测到事务

set autocommit=true;
show autocommit;
begin;
create table t1(a serial primary key,  b int);
commit;

[图片: KunlunBase产品使用和评测指南相关截图]

将 autocommit 设置为 false,也是一样的

set autocommit=false;
show autocommit;
begin;
drop table t1;
commit;

[图片: KunlunBase产品使用和评测指南相关截图]

显性事务示例

先关闭隐式提交,在显示中进行事务

show autocommit;
set autocommit=off;
begin;
create table t1(a serial primary key,  b int);
insert into t1(b) values(21);
insert into t1(b) values(22);
insert into t1(b) values(23);
insert into t1(b) values(24);    

[图片: KunlunBase产品使用和评测指南相关截图]

打开另一个窗口,查看 t1 表,因为还没进行 commit;表 t1中是没有数据的 [图片: KunlunBase产品使用和评测指南相关截图]

我们继续进行 commit; 提交 [图片: KunlunBase产品使用和评测指南相关截图]

成功提交为 t1 表中插入数据 [图片: KunlunBase产品使用和评测指南相关截图]

无论 autocommit 是 true 还是 false。 如果执行 DDL 时本连接中已有运行中的事务,那么 Klustron 会先提交这个事务,然后再执行这个 DDL

show autocommit;
set autocommit=off;
begin;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(21);
insert into t1(b) values(22);
insert into t1(b) values(23);
insert into t1(b) values(24);

t1目前是没有数据的 [图片: KunlunBase产品使用和评测指南相关截图]

如果执行 ddl 语句会提交前面事务 [图片: KunlunBase产品使用和评测指南相关截图][图片: KunlunBase产品使用和评测指南相关截图]

现在已经提交了事务,再输入 commit;也是提示没有事务需要提交

commit;

[图片: KunlunBase产品使用和评测指南相关截图]

在显式事务中执行某个 DML 语句出错,那么事务 T 在 Klustron 内部会被自动回滚,之后用户在连接 C 中的事务T中 发送任何 DML 语句都会被 Klustron 忽略,直到在连接 C 中收到 commit 或者 rollback 后,Klustron 会回滚事务 T 。

set autocommit=off;
show autocommit;
begin;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(11);
insert into t1(c) values(12);
insert into t1(b) values(13);
insert into t1(b) values(14);
commit;

[图片: KunlunBase产品使用和评测指南相关截图]

所有数据都进行了回滚,t1 表中没有任何数据 [图片: KunlunBase产品使用和评测指南相关截图]

在应用程序中处理事务

应用软件代码中执行事务

应用软件中只需要逐一发送一个显式事务的 SQL 语句,注意务必要在执行完 DML 语句后提交这个事务。同时,在应用软件代码中,一个事务的所有逻辑应该在一个 try 块中,catch 所有可能的数据库异常,然后对于任何错误,都应该发送rollback语句来回滚这个事务。 例如:

try {
	conn.execute("begin");
	conn.execute("select*from orders where order_id=123");
	/*
	  处理订单数据的代码若干行
	*/
	conn.execute("update stock set remaining=456 where item_id=789");
	conn.execute("commit");
} catch (DatabaseException &e) {
	conn.execute("rollback");
}

对于没有异常处理的语言来说,可以通过检查语句执行的返回值确定是否成功,如果失败则回滚事务。

Klustron 事务执行时的错误处理

注意 Klustron 默认会在语句执行错误后直接在内部把这个事务设置为 aborted 状态,这样后续发送任何 DML 语句过来都会忽略,直到收到 commit 或者 rollback 语句,Klustron 会统一地回滚这个事务,即使收到的是commit语句也如此,此时 commit 代表的是这个事务结束了,可以做回滚这个事务了。这个做法与PostgreSQL完全相同。这种做法有很强的健壮性---即使客户端不做任何错误处理,这个执行出错的事务也会被回滚掉。

这个方式与MySQL不同,在一个显式事务中执行的 DML 语句出现错误后,MySQL 只是返回错误给客户端,用户来决定他是要继续执行这个事务还是回滚它。客户端必须检查错误,发现执行出错要根据错误号码和其他信息决定要回滚还是继续执行这个事务,如果继续执行那么执行到 commit 语句时这个事务会被提交。如果用户的业务代码没有检查错误,那么就会不知不觉地提交这个部分语句执行出错的事务,即使本该回滚它。所以这种方式要求用户代码必须检查每一个 SQL 语句的执行结果并且在数据库系统返回错误的情况下判断是否要回滚这个事务,如要回滚则立刻回滚,不能再执行这个事务剩余的 SQL 语句。

从 Klustron -1.1 开始也支持MySQL的这种事务处理模式,只要设置 enable_stmt_subxact=on 后,Klustron 就按照MySQL的这种事务处理方式工作,不过这有轻微的性能代价。

也可以在业务逻辑允许的情况下,设置 autocommit=on 后发送 DML 语句。只有根据应用逻辑,单个这样的语句确实是逻辑上完整的操作的情况下再使用 autocommit 事务。如果执行一个任务(事务)需要执行多条 DML 语句,那么应该显式地启动一个事务然后执行这些语句,这样才能确保这个事务的所有操作要么全部完成要么在出错时完全回滚,避免出现部分完成部分失败的问题。

对于 autocommit 事务来说,无论MySQL还是PostgreSQL还是 Klustron ,只要 SQL 语句出错,那么这个 autocommit 语句(事务)会被数据库系统自动回滚,不需要应用侧发送 rollback 语句;只要这个 SQL 语句执行成功,这个 autocommit 事务也会被数据库系统自动提交,不需要应用侧发送 commit 语句。

对于灌入数据等特殊场景,使用 autocommit 语句或者显式事务对于业务逻辑来说都可以,不过为了更好的性能,应该使用显式事务,并且在每个事务中插入较多的行。比如如果你要导入 10 亿条数据到一个表中,那么你可以开启一个显式事务,在其中插入 100 万行然后提交它,并且在每个 insert 语句中只插入1行。 重复这样的事务直到全部 10 亿行数据灌入数据库中。

在这个特定的场景中,如果灌入流程有失败,你一定希望可以继续灌入而不是回滚这个事务,所以你可以预先设置enable_stmt_subxact=on,这样一旦某个 insert 语句失败后(比如因为断电导致 Klustron 计算节点或者存储集群主节点宕机),再次执行它(只插入 1 行,因此不会有行冲突,当然,表必须要有主键或者唯一索引,这个方法才管用)即可。全部 100 万行插入后提交事务。

Klustron 提交事务的错误处理

对于 Klustron 来说,执行 commit 语句也可能因为网络故障或者集群节点宕机(比如断电)等情况而发生错误。执行 commit 发生错误时,客户端会收到错误(异常),不过不需要回滚这个事务。这个事务到底是否成功地提交,取决于发生错误的时机,比如在执行分布式事务两阶段提交算法的第一阶段提交时候发生错误的话,这个事务会被 Klustron 自动回滚,用户无需发送 rollback 语句;如果是在两阶段提交的提交第二阶段发生错误的话, Klustron 会提交这个事务,如果某个事务分支当时无法提交(比如发生了存储集群主节点宕机导致主备切换,需要若干秒完成切换), Klustron 也会在最早的时间提交剩余的事务分支,客户端也不需要发送 rollback 语句。

在这种情况下,如果应用程序希望判断这个事务提交是否成功,最简单的方法是检查这个事务的更新是否存在于相应的表中。不过这些更新可能恰好被随后并发执行的事务删除了,导致没有检查到,尽管那个事务确实成功提交了。所以确实有这个需要的话,业务侧就要设计更巧妙的方法来解决这个问题。

如果在应用程序执行除了 commit 之外的任何一条 SQL 语句时发生语句超时或者连接断开等错误,那么这个事务一定会被 Klustron 回滚,应用侧无需其他额外处理。如果执行 commit 时发生语句超时或者连接断开,那么 Klustron 的处理方法与上段所述完全相同,并且应用侧无需其他处理。

DDL 事务处理和复制

用户在 Klustron 集群的任何一个计算节点执行的 DDL, Klustron 会自动复制到这个集群的所有其他计算节点执行;并且用户在连接到多个计算节点的多个客户端连接中可以同时执行 DDL 语句, Klustron 会确保一个集群的所有计算节点按照相同的顺序执行用户发送给所有计算节点的 DDL 语句。因此一个 Klustron 集群的所有计算节点始终拥有相同的元数据从而正确运行。

当用户执行一个 DDL 语句时,Klustron 把这个语句作为一个独立的分布式事务来运行。如果这个事务运行过程中计算节点或者存储节点或者元数据节点发生故障或者断电,那么这个分布式事务会自动被回滚,清除所有中间状态,这样用户就可以再次执行这个 DDL 语句。

用户不需要知道 DDL 事务处理的技术细节和原理,只需要知道在一个 Klustron 集群的任何一个计算节点都可以执行 DDL 语句和 DML 语句,就像使用单机数据库一样。

体验 DDL 复制功能

这里是一个例子帮助用户体验 Klustron 集群确实具备 DDL 复制的能力。例如,在任何一个计算节点执行 create table 语句,在集群的其他计算节点都可以使用到那个 create table 语句创建的表。

我们首先需要准备一个两个计算节点集群,便于测试.

psql postgres://abc:abc@192.168.0.134:47001/postgres
psql postgres://abc:abc@192.168.0.134:47004/postgres

create

在计算节点 47001 中创建数据库 kunlundb; [图片: KunlunBase产品使用和评测指南相关截图]

在计算节点 47004 中进行查看 [图片: KunlunBase产品使用和评测指南相关截图]

接着在 47004 的节点中创建 t1 表

create table t1(a int, b int);
insert into t1 values (1,2);

[图片: KunlunBase产品使用和评测指南相关截图]

在 47001 计算节点中存在 kunlundb 数据库中的 t1 表 [图片: KunlunBase产品使用和评测指南相关截图]

insert

继续在 47001 中增加表 t1 的数据

insert into t1 values (3,4);
insert into t1 values (5,6);

[图片: KunlunBase产品使用和评测指南相关截图]

增加的数据在 47004 中也成功复制 [图片: KunlunBase产品使用和评测指南相关截图]

update

在计算节点 47004 中进行修改数据

update t1 set a='2' where a=3;
update t1 set a='3' where a=5;

[图片: KunlunBase产品使用和评测指南相关截图]

在另一个计算节点 47001 中,数据也是得到了同步 [图片: KunlunBase产品使用和评测指南相关截图]

delete

在计算节点 47001 中进行删除某行数据

delete from t1 where a=1;

[图片: KunlunBase产品使用和评测指南相关截图]

在另一个计算节点 47004 中,数据也是得到了同步 [图片: KunlunBase产品使用和评测指南相关截图]

drop

在计算节点 47004 中进行删除表 t1 [图片: KunlunBase产品使用和评测指南相关截图]

同样在另一个计算节点 47004 中也不在表 t1 [图片: KunlunBase产品使用和评测指南相关截图]

Fullsync 高可用

Fullsync 强同步机制

模拟备机故障体验 fullsync

  1. 购买测试集群,例如购买一主两备 shard,连接 shard 主节点可以正常进行创建库表,插入/删除数据等操作。

  2. 将该 shard 两个备 db 的MySQLd进程停止了。

  3. 连接 shard 主节点进行创建库表,插入/删除数据时,都会报错返回

监控和配置 Fullsync

1、通过 XPanel 监控和配置

设置 fullsync_consistency_level 实例变量,具体操作如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

获取 fullsync_consistency_level 实例变量,具体操作如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

集群展示页面,点击某个存储节点-》进入按钮,还可以查看当前节点的fullsync值。

[图片: KunlunBase产品使用和评测指南相关截图]

2、通过 SQL 语句监控和配置

设置 fullsync 配置 sql:

set global fullsync_consistency_level=2

查看 fullsync 设置:

show global variables like 'fullsync_consistency_level'

主备切换

模拟主节点故障

  1. 设置主节点只读,执行该 sql,set global read_only=true;

  2. 将主节点MySQLd 进程 kill 掉/调用 stop 脚本停止 cluster_mgr 检测到主节点异常后,触发计时任务,主节点连续故障时间超过设置值(默认 20s ),则触发容灾切换。切换后选出新主,可以正常对外提供服务。

主备切换流程记录

查看主备切换流程记录有两个地方:

  1. 元数据集群的 kunlun_metadata_db.rbr_consfailover 表中。

  2. xpanel 中集群管理->集群列表信息 -> 主备切换中。

[图片: KunlunBase产品使用和评测指南相关截图]

验证新的主节点工作正常并且没有丢失已提交的事务

cluster_mgr 提供验证测试程序,下载地址如下: https://github.com/zettadb/cluster_mgr/tree/main/test/suite/consfailover

弹性扩缩容

XPanel

1. 弹性扩容。

注意:

  • 一个集群中至少存在两个以上 shard 才能发起扩容。

  • 扩容分为自动扩容和手动扩容。自动扩容与手动扩容的区别:自动扩容用户不需要人为选 shard 表,由系统自动检测所需扩容表进行扩容操作;手动扩容就是用户自己选表搬迁来达到扩容目的。

(1)自动扩容。

进到 XPanel 中,点集群管理-》集群列表-》集群列表信息,进到集群列表页面,找到已经建好的多 shard 集群,发起自动扩容。如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

如果扩容成功,如图所示:

[图片: KunlunBase产品使用和评测指南相关截图]

如果扩容失败,如图所示:

[图片: KunlunBase产品使用和评测指南相关截图]

2. 手动扩容。

具体操作,如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

如果扩容成功,如图所示:

[图片: KunlunBase产品使用和评测指南相关截图]

如果扩容失败,如图所示:

[图片: KunlunBase产品使用和评测指南相关截图]

cluster-mgr API

1、使用 postman 创建 rbr 集群: [图片: KunlunBase产品使用和评测指南相关截图]

集群创建成功后,连接计算节点,如: psql postgres://abc:abc@192.168.0.132:59701/postgres

建立 student 表并写入数据。

2、增加 shard: [图片: KunlunBase产品使用和评测指南相关截图]

3、发起扩容: [图片: KunlunBase产品使用和评测指南相关截图]

扩容成功后,连接新增的 shard 的主,如:

MySQL -uclustmgr -pclustmgr_pwd -h192.168.0.129 -P59413
use postgres_$$_public
show tables;(检查是否有student表)

预期是这样: [图片: KunlunBase产品使用和评测指南相关截图]

故障恢复机制及验证方法

1. 弹性扩缩容原理介绍

弹性扩容的目标是将业务表,按需从一个 shard 上不停机的迁移到另一个 shard 上,从而实现数据的横向扩展,其过程有如下几个阶段:数据全量导出、数据全量导入、增量数据恢复、路由切换。

数据全量导出: 该阶段,会将待迁移表利用 mydumper 工具进行以逻辑导出,并将数据保存在磁盘上。

数据全量导入: 该阶段,会将上一阶段导出的数据,利用 myloader 工具导入到目标分片上。

增量数据恢复: 该阶段,会利用 MySQL 的主备复制机制,根据 mydumper 导出全量镜像时的一致性位点,创建一个只同步待迁移表的MySQL主备同步 channel ,并命名为 expand_id(ID 为全局的任务 id),其中该 channel 的源节点为表的迁出源 shard ,目标实例为扩容的目标 shard 的主节点。待增量数据重放完成后,在不停止整个数据同步通道的前提下,对源 shard 的待迁移表进行 rename 操作,从而阻断源 shard 对该表的读写。当在目标 shard 的主机上也发现该表已经被 rename 后,增量数据恢复阶段结束,并对数据同步通道进行清理,进入路由切换阶段。

路由切换阶段: 路由切换阶段,会先通过写入元数据集群的方式,通知所有的计算节点,该表的路由已经发生变化。在写入完成后,对目标 shard 上的已经在上一阶段被 rename 的表进行恢复操作,即再次 rename 为原表名。此时源 shard 上的表还是处在 rename 状态,目标 shard 上的表已经被恢复为业务的原表名。

所有的计算几点,如果已经更新了路由,则如果此时有业务访问该迁移表,则会路由到目标 shardd 上。如果该计算节点,在业务请求到来时,还没有更新到最新的路由变化,仍然访问原 shardd 此时由于源 shard 上的表名仍然处于 rename 状态,因此访问会失败,知道更新到最新的路由后,访问才能成功。

2. 故障场景及恢复机制

故障恢复机制主要需要解决的问题是:当扩容正在进行的过程中发生故障导致扩容流程中断的情况下,如何清理中间状态的数据或者当服务恢复后如何继续未完成的扩容任务的问题。

当任务中断在数据全量导出结束后: 此时任务重新发起即可。

当任务中断在数据全量导入之后: 此时任务重新发起即可

当任务中断在增量数据恢复之后: 则任务直接回滚,所有的中间状态的数据被清理掉。

集群数据备份和恢复

1、创建 rbr 集群: [图片: KunlunBase产品使用和评测指南相关截图]

此集群作为源集群; 集群创建成功后,连接计算节点,如: psql postgres://abc:abc@192.168.0.132:59701/postgres

建立 student 表并写入数据

2、发起备份操作:(需保证hdfs server已启动)

[图片: KunlunBase产品使用和评测指南相关截图] hdfs 下记录恢复的时间,如:2022-08-23 13:52 [图片: KunlunBase产品使用和评测指南相关截图]

3、创建另一个集群: 规格需与步骤1中的集群一致,参考步骤 1,作为目标集群。

4、发起恢复操作: [图片: KunlunBase产品使用和评测指南相关截图]

恢复成功后,链接步骤 3 中集群的计算节点,如: psql postgres://abc:abc@192.168.0.129:59701/postgres 检查 student 表是否同步到目标集群中。

XPanel 配置和发起备份

1、cluster_mgr 安装完成后,系统会自动上报备份存储目录,在XPanel的备份存储目标管理页面中可以查看相关备份存储目录列表。如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

2、建好 vito 集群。

[图片: KunlunBase产品使用和评测指南相关截图]

3、登录 vito 集群中的计算节点 47001 新建一个表,并加入两条数据。

[图片: KunlunBase产品使用和评测指南相关截图]

4、对 vito 集群发起全量备份,操作如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

5、到集群备份列表中,查看备份结果信息,如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

XPanel 启动集群恢复

1、对已备份的 vito 集群进行恢复操作。创建一个新的集群 vito3 作为恢复备份的集群。如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

2、vito3 的集群计算节点是 47003。登录节点查看没有任何表。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

3、将 vito 集群之前的备份在 vito3 集群中进行恢复。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

4、回档成功后登录 vito3 集群的计算节点 47003 查看验证是否存在 t2 表,下图所示表示恢复集群成功。

[图片: KunlunBase产品使用和评测指南相关截图]

其他集群管理功能 API 和 XPanel 操作

增加存储集群

  1. 使用 XPanel 增加存储集群。具体操作如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

增加、删除计算节点

1、使用 XPanel 增加计算节点。操作如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

2、使用 XPanel 删除计算节点。如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

重做备机节点

1、使用 XPanel 重做备机节点。下面是对 test1 集群中的 shard1 存储集群的备机节点 57001,进行重做备机,具体操作如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

其他集群管理功能 api:

1、使用 postman 创建 rbr 集群: [图片: KunlunBase产品使用和评测指南相关截图]

返回内容是这样: [图片: KunlunBase产品使用和评测指南相关截图] 下同,jobid 会跟着变化。

2、增加 comps: [图片: KunlunBase产品使用和评测指南相关截图]

3、增加 shards: [图片: KunlunBase产品使用和评测指南相关截图]

4、增加 nodes: [图片: KunlunBase产品使用和评测指南相关截图]

5、删除 comps: [图片: KunlunBase产品使用和评测指南相关截图]

6、删除 shards: [图片: KunlunBase产品使用和评测指南相关截图]

7、删除 nodes: [图片: KunlunBase产品使用和评测指南相关截图]

8、重做 shard 备机 [图片: KunlunBase产品使用和评测指南相关截图] 需保证 hdfs server 是正常运行的,才可以成功。

9、检查 clustmgr 处理 api 的状态: [图片: KunlunBase产品使用和评测指南相关截图]

10、删除集群: [图片: KunlunBase产品使用和评测指南相关截图]

查看集群节点日志

前置条件

1、目前 kunlun 集群支持配置ES来收集各模块输出日志,搭建集群时(boostrap)选配是否安装 ES,如果选择安装后,则自动完成对管理模块,计算节点和存储节点日志收集。

kibana 中查看日志: 查看 filebeat 上报的 index [图片: KunlunBase产品使用和评测指南相关截图]

配置 kibana index pattern [图片: KunlunBase产品使用和评测指南相关截图]

kibana discover页面操作 [图片: KunlunBase产品使用和评测指南相关截图]

目前日志按照机器级别上报,机器上可能有管理模块,计算节点或者存储节点。可以在 kibana 页面配置过滤规则来查看不同模块日志。

2、集群没有配置 ES,则需要登录各个模块所在集群查看日志。

查看集群节点状态

使用 XPanel 查看状态变化记录

1、XPanel 中查看集群节点的状态,如下图所示。

[图片: KunlunBase产品使用和评测指南相关截图]

[图片: KunlunBase产品使用和评测指南相关截图]

故障报告

找到 core 文件

需要用户在自己的 linux 机器上设置:

  1. ulimit -c unlimited

  2. sysctl -w kernel.core_pattern=core.%e.%p.%t 程序运行目录下生成core文件,这个需要root权限

使用带符号的程序文件打印调用栈

如果出现 core 文体,使用 gdb 打印调用栈 gdb -c core 文件 bin文件 回车 进入后,输入 bt 回车

END