
Klustron Product Usage and Evaluation Guide


Overview

This document guides Klustron users through evaluating and verifying Klustron's important functions. By following it, users can install a Klustron cluster and try out its main features, so it also serves as a quick-start manual. For the complete Klustron documentation, please visit www.doc.klustron.com. Users can likewise use this document to verify that the various functions of Klustron work properly. During product development, the Klustron team continuously writes test programs for each feature and runs all of them daily in an automated test system, so that problems are found and fixed promptly. In addition, the MySQL and PostgreSQL test suites are used to functionally test Klustron's storage nodes and computing nodes. The daily results of Klustron's automated test system are published at www.Klustron.com.

Klustron cluster architecture

Klustron is a distributed relational database management system for terabyte- and petabyte-scale data processing. It delivers high throughput and low latency for highly concurrent reads and writes of massive data sets, supports horizontal elastic scaling, provides robust ACID transaction guarantees, efficient and easy-to-use distributed query processing, high scalability, high availability, and transparent sharding (partitioned databases and tables), and scales out with no awareness required from the business layer or end users. It is a typical NewSQL distributed database system.

![img](KunlunBase 产品使用和评测指南/1.png)

These technical capabilities ensure that as the business grows, users can add server nodes at any time, without interruption, to expand data storage and computing capacity, and that when server nodes fail or lose power, no data is lost and read/write services do not stop. User services keep running at the highest quality.

Application software developers only need to use Klustron the same way they use a single-node relational database, and they gain all the NewSQL advantages described above without considering any details of data storage and management. Users can connect to the computing nodes of a Klustron cluster with either the PostgreSQL or the MySQL connection protocol and execute DDL and DML SQL statements. Klustron supports standard SQL, the DDL syntax of PostgreSQL, and the private DML syntaxes of PostgreSQL and MySQL, so applications originally written for PostgreSQL or MySQL can use Klustron without modification.

Klustron supports the full range of SQL data analysis features and can execute all the OLAP analysis SQL statements of TPC-H and TPC-DS. Users can therefore continuously stream data from their business systems into a Klustron cluster, and run OLAP analysis and data mining on this continuously updated data to quickly discover changes and trends, seize fleeting business opportunities, respond to emergencies in time, and gain a significant competitive advantage.

Even users whose data volume is only a few GB can benefit from Klustron. Klustron's fullsync replication is more reliable than MySQL's built-in semisync (semi-synchronous) replication and group replication: if all replicas lose power at the same time, those two technologies may lose user data or leave it inconsistent, whereas fullsync ensures that no user data is lost in this situation. At this scale, a single storage cluster (for example, one primary and two standbys) is enough. As user data and workload grow, more storage clusters and computing nodes can be added on demand at any time to expand storage and query capacity.

Users can also expand Klustron's processing capacity through read/write separation. By routing queries to standby nodes and dedicating specific computing nodes to OLAP analysis and queries, users can ensure that OLAP analysis tasks do not affect the performance of OLTP workloads at all.

Use KunlunBase

Initialize servers for Klustron (bootstrap)

A Klustron cluster can only be installed on servers that have been initialized (bootstrapped). After the servers are initialized, the user can install a Klustron cluster with the XPanel cluster management tool.

This section describes how to initialize the servers.

Initialize the server using a script

  1. Download the scripts:

     git clone https://gitee.com/zettadb/cloudnative.git
     cd cloudnative/cluster

  2. Fill in the configuration file. For detailed parameter descriptions, refer to: Using the Cluster Management Installation Script Instruction Manual

  3. After filling out the configuration file, initialize the servers:

     python setup_cluster_manager.py --config=cluster_and_node_mgr.json --action=install
     bash -e clustermgr/install.sh
    

Create a Klustron cluster

The following operations are performed after the servers have been initialized.

XPanel

  1. Open the XPanel web console: http://host:port/KunlunXPanel

    a. host is the IP of the server on which the XPanel docker image was downloaded and started.

    b. port is the port specified when deploying docker.

  2. On first login, the account and the password are both super_dba.

    ![img](KunlunBase 产品使用和评测指南/2.png)

  3. You will then be asked to change the password. After changing it, you enter the main interface of XPanel.

    ![img](KunlunBase 产品使用和评测指南/3.png)

  4. Add servers to the computer list

    a. Check whether the computer to be added is already in the list; if it is not, proceed with this step, otherwise skip it.

    b. Currently computers can only be added one at a time. Each server added to the list must have node_mgr installed and correctly configured.

    c. System Management -- User Management -- Add -- fill in the corresponding parameters -- Confirm

    ![img](KunlunBase 产品使用和评测指南/4.png)

  5. After adding computers, you can create a cluster

    a. Cluster Management -- Cluster List -- New

    ![img](KunlunBase 产品使用和评测指南/5.png)

    b. Fill in the parameter values as required

    ![img](KunlunBase 产品使用和评测指南/6.png)

    c. After the cluster is installed, you can view the node information and primary/standby relationships on the cluster display page: Cluster Management -- Cluster List -- click the corresponding cluster

    ![img](KunlunBase 产品使用和评测指南/7.png)

Use Klustron to read and write data

Preparing each programming language to use Klustron

Currently Klustron supports both the PostgreSQL and MySQL protocols.

To find the MySQL protocol port of a computing node, connect to the node's PostgreSQL port and run show mysql_port; (a runnable sketch follows the connection list below).

Command line connection

  • Connect with the Klustron PostgreSQL protocol, via TCP or a URL
    • TCP: psql -h host -p port -U username -d databaseName
    • URL: psql postgres://userName:passwd@host:port/databaseName
  • Connect with the Klustron MySQL protocol, via TCP or a socket
    • TCP: mysql -h host -P port -ppassword -u userName
      • Note: there must be no space between -p and the password
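
As a quick sanity check, the sketch below connects through the PostgreSQL protocol with psycopg2 and asks the computing node for its MySQL protocol port. The host, port, user and password are placeholder assumptions; substitute your own:

import psycopg2  # pip install psycopg2

# Placeholder connection parameters -- replace with your computing node's
# PostgreSQL-protocol address and credentials.
conn = psycopg2.connect(host='127.0.0.1', port=5401,
                        user='abc', password='abc', database='postgres')
conn.autocommit = True
cur = conn.cursor()
cur.execute('show mysql_port;')  # the computing node reports its MySQL protocol port
print('MySQL protocol port:', cur.fetchone()[0])
cur.close()
conn.close()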

How to download connectors in each language

  • Both Go and Rust can download the required dependencies directly through their built-in package managers, without additional installation
    • Go 1.17.5 or later is recommended
    • Rust 1.59.0 or later is recommended

java

![img](KunlunBase 产品使用和评测指南/8.png)

After selecting the desired jar package version in the Maven repository, click "jar" to download it.

  • MySQL Connector/J
  • PostgreSQL JDBC Driver
    • https://mvnrepository.com/artifact/org.postgresql/postgresql Maven repository
  • MariaDB Connector/J - version 3.x
    • https://mariadb.org/connector-java/all-releases/ MariaDB official website; only the latest 3.x releases are listed there
    • https://mvnrepository.com/artifact/org.mariadb.jdbc/mariadb-java-client Maven repository
  • MariaDB Connector/J - version 2.x
    • http://zettatech.tpddns.cn:14000/thirdparty/ Zettatech download site
    • Because the official 2.x releases have some bugs when connecting to computing nodes, it is not recommended to use the 2.x connectors from the MariaDB official website directly

python

  • psycopg2
    • pip install psycopg2 (install via pip)
    • pip install psycopg2==$version (install a specific version via pip)
    • https://github.com/psycopg/psycopg2 (build via setup.py)
  • mysql-connector-python
  • PyMySQL
    • pip install pymysql (install via pip)
    • https://github.com/PyMySQL/PyMySQL (build via setup.py)

nodejs
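
  • pg: npm install pg (install via npm; used by the PostgreSQL example below)
  • mysql: npm install mysql (install via npm; used by the MySQL example below)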

php
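
  • The pg_* and mysqli_* functions used in the examples below are provided by PHP's pgsql and mysqli extensions (for example, on Ubuntu: sudo apt-get install php-pgsql php-mysql)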

c

  • PostgreSQL
    • sudo apt-get install libpq++-dev -y (install the PostgreSQL dependencies on Ubuntu)
  • MySQL
    • sudo apt-get install libmysql++-dev -y (install the MySQL dependencies on Ubuntu)

c#

  • PostgreSQL
    • dotnet add package Npgsql
  • MySQL
    • dotnet add package MySql.Data

c++

  • PostgreSQL

    sudo apt install libpq-dev
    git clone https://github.com/jtv/libpqxx.git
    cd libpqxx
    ./configure
    make
    sudo make install
    
  • MySQL

Use various programming languages to run DML statements (insert, delete, update, query) on Klustron

All of the following examples can be downloaded from https://gitee.com/zettadb/cloudnative

  • PostgreSQL-related examples are in the cloudnative/smoke directory
  • MySQL-related examples are in the cloudnative/smoke/somkeTest-MySQL directory

java

  • PostgreSQL JDBC Driver
package kunlun.test;

/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 * combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
 */

import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

public class SmokeTest {

    static {
        try {
            Class.forName("org.PostgreSQL.Driver");
            //Class.forName("com.MySQL.cj.jdbc.Driver");
        } catch (Exception ex) {
        }
    }

    public static Connection getConnection(String user,
                                           String password,
                                           String host,
                                           int port,
                                           String dbname) {
        String proto = "postgresql";
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        String url = "jdbc:" + proto+"://" + host + ":" + port + "/" + dbname;
        try {
            return DriverManager.getConnection(url, props);
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    public static void smokeTest(Connection conn) throws Exception{
        boolean autocommit = conn.getAutoCommit();
        System.out.println("default autocommit: " + autocommit);
        conn.setAutoCommit(true);
        Statement st =conn.createStatement();
        st.execute("SET client_min_messages TO 'warning';");
        st.execute("drop table if exists t1;");
        st.execute("RESET client_min_messages;");
        String createSql = "create table t1(id integer primary key, " +
                           "info text, wt integer);";
        st.execute(createSql);
        st.execute("insert into t1(id,info,wt) values(1, 'record1', 1);");
        st.execute("insert into t1(id,info,wt) values(2, 'record2', 2);");
        st.execute("update t1 set wt = 12 where id = 1;");
        ResultSet res1 = st.executeQuery("select * from t1;");
        System.out.printf("res1:%s%n", showResults(res1).toString());
        res1.close();
        st.close();

        String pstr = "select * from t1 where id=?";
        PreparedStatement ps = conn.prepareStatement(pstr);
        ps.setInt(1, 1);
        ResultSet pres = ps.executeQuery();
        System.out.printf("pres1:%s%n", showResults(pres).toString());
        ps.setInt(1, 2);
        pres = ps.executeQuery();
        System.out.printf("pres2:%s%n", showResults(pres).toString());
        ps.close();

        pstr = "update t1 set info=? , wt=? where id=?";
        ps = conn.prepareStatement(pstr);
        ps.setString(1, "Rec1");
        ps.setInt(2, 2);
        ps.setInt(3, 1);
        ps.execute();
        ps.setString(1, "Rec2");
        ps.setInt(2, 3);
        ps.setInt(3, 2);
        ps.execute();
        ps.close();

        st =conn.createStatement();
        ResultSet res2 = st.executeQuery("select * from t1;");
        System.out.printf("res2:%s%n", showResults(res2).toString());
        res2.close();
        st.execute("delete from t1 where id = 1;");
        ResultSet res3 = st.executeQuery("select * from t1;");
        System.out.printf("res3:%s%n", showResults(res3).toString());
        res3.close();
        st.execute("drop table t1;");
        st.close();
        conn.setAutoCommit(autocommit);
    }

    /*
     * We do the following actions:
     * 1 Create the table
     * 2 Insert two records
     * 3 Update the first record.
     * 4 Query the records(res1).
     * 5 Delete the second record.
     * 6 Query the records again(res2).
     * 7 Drop the table.
     */
    public static void smokeTestFile(Connection conn, String cmdfile) throws Exception{
        boolean autocommit = conn.getAutoCommit();
        System.out.println("default autocommit: " + autocommit);
        conn.setAutoCommit(true);
        Statement st =conn.createStatement();
        BufferedReader br = new BufferedReader(new FileReader(cmdfile));
        String cmd = null;
        do {
            cmd = br.readLine();
            if (cmd == null) {
                break;
            }
            if (cmd.toUpperCase().startsWith("SELECT")) {
                ResultSet res = st.executeQuery(cmd);
                System.out.printf("sql:%s, res:%s%n", cmd,
                                  showResults(res).toString());
                res.close();
            } else {
                st.execute(cmd);
            }
        } while (cmd != null);
        br.close();
        st.close();
        conn.setAutoCommit(autocommit);
    }

    private static List<List<String>> showResults(ResultSet res)
        throws Exception {
        LinkedList<List<String>> results = new LinkedList<>();
        int cols = res.getMetaData().getColumnCount();
        while (res.next()) {
            List<String> row = new ArrayList<>(cols);
            for (int i = 0; i < cols; i++) {
                row.add(res.getString(i + 1));
            }
            results.addLast(row);
        }
        return results;
    }

    public static void test1(String[] args) throws Exception{
        String host = args[0];
        int port = Integer.valueOf(args[1]);
        String user = "abc";
        String password = "abc";
        String database = "postgres";
        Connection conn = getConnection(user, password, host, port, database);
        smokeTest(conn);
        conn.close();
    }

    public static void main(String[] args) throws Exception {
        test1(args);
    }
}
  • MySQL Connector/J
    • Compile: javac MySQL.java
    • Run: java -cp .:./mysql-connector-java-$version.jar MySQL
import java.sql.DriverManager;
import java.util.ArrayList;
import java.sql.*;
import java.util.*;

public class MySQL {
    // JDBC driver name for MySQL versions below 8.0
    static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    // JDBC driver name for MySQL 8.0 and above
    //static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
    // Database user name and password; adjust to your own settings
    static final String USER = "user";
    static final String PASS = "pwd";

    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;

        // Process the command line arguments
        String host = Arrays.toString(args);
        String urls1 = "jdbc:mysql://" + host + "/postgres";
        String urls2 = urls1.replace("[","");
        String urls = urls2.replace("]","");

        try{
            // Register the JDBC driver
            Class.forName(JDBC_DRIVER);

            // Open the connection
            System.out.println("Connecting to database " + host + "...");
            conn = DriverManager.getConnection(urls,USER,PASS);

            // Execute the statements
            System.out.println("Instantiating Statement object...");
            stmt = conn.createStatement();
            ArrayList sqls = new ArrayList();
            sqls.add("drop table if exists myjdbc_sm;");
            sqls.add("create table myjdbc_sm(a int primary key, b text);");
            sqls.add("insert into myjdbc_sm values(1, 'abc'),(2, 'bcd'),(3, 'cde');");
            sqls.add("select * from myjdbc_sm;");
            sqls.add("update myjdbc_sm set b = 'def' where a = 1;");
            sqls.add("select * from myjdbc_sm;");
            sqls.add("delete from myjdbc_sm where a = 3;");
            sqls.add("select * from myjdbc_sm;");

            for (int i = 0; i <= 7; ++i){
                String sql = (String) sqls.get(i);
                System.out.println(sql);

                // Use equals() for string comparison; == compares references
                if (sql.equals("select * from myjdbc_sm;")){
                    ResultSet rs = stmt.executeQuery(sql);
                    while(rs.next()) {
                        int a = rs.getInt("a");
                        String b = rs.getString("b");
                        System.out.print(" a: " + a + " b: " + b + "\n");
                    }
                    rs.close();
                }
                else {
                    stmt.execute(sql);
                }
            }

            stmt.close();
            conn.close();
        }catch(SQLException se){
            // Handle JDBC errors
            se.printStackTrace();
        }catch(Exception e){
            // Handle Class.forName errors
            e.printStackTrace();
        }finally{
            // Release resources
            try{
                if(stmt!=null) stmt.close();
            }catch(SQLException se2){
            }// do nothing
            try{
                if(conn!=null) conn.close();
            }catch(SQLException se){
                se.printStackTrace();
            }
        }
        System.out.println("Goodbye!");
    }
}

python

  • Psycopg2
import psycopg2
import sys

def test(hoststr, portstr):
        intport = int(portstr)
        conn = psycopg2.connect(host=hoststr, port=intport, user='abc', password='abc', database='postgres')
        conn.autocommit = True
        cur = conn.cursor()
        sqls=["SET client_min_messages TO 'warning';",
                "drop table if exists t1111",
                "RESET client_min_messages;",
                "create table t1111(id int primary key, info text, wt int)",
                "insert into t1111(id,info,wt) values(1, 'record1', 1)",
                "insert into t1111(id,info,wt) values(2, 'record2', 2)",
                "update t1111 set wt = 12 where id = 1", "select * from t1111",
                "delete from t1111 where id = 1", "select * from t1111",
                "prepare q1(int) as select*from t1111 where id=$1",
                "begin",
                "execute q1(1)",
                "execute q1(2)",
                "prepare q2(text,int, int) as update t1111 set info=$1 , wt=$2 where id=$3",
                "execute q2('Rec1',2,1)",
                "commit",
                "execute q2('Rec2',3,2)",
                "drop table t1111"]
        for sql in sqls:
                res = cur.execute(sql+";")
                print "command:%s, res:%s" % (sql, str(res))

if __name__ == '__main__':
        test(sys.argv[1], sys.argv[2])
  • MySQL connector/python
import argparse
from time import sleep
from mysql import connector
def test(sql, host, port, user, pwd, db):
    conn = connector.connect(buffered=True, host=host, port=port, user=user, passwd=pwd, database=db, ssl_disabled=True)
    cur = conn.cursor()

    print(sql)
    if sql == 'select * from mysql_connector_python;':
        cur.execute(sql)
        rs = cur.fetchall()
        srs = str(rs)
        srs = srs.replace('[(', '')
        srs = srs.replace(')]', '')
        srs = srs.replace('), (', '\n------\n')
        srs = srs.replace(',', ' |')
        srs = srs.replace('\'', '')
        print('--------\na | b\n------\n' + srs + '\n--------')
    else:
        cur.execute(sql)

    conn.commit()
    cur.close()
    conn.close()

def execSql(host, port, user, pwd, db):
    sql = ['drop table if exists mysql_connector_python;',
            'create table mysql_connector_python(a int primary key, b text);',
            "insert into mysql_connector_python values(1,'a'),(2,'b'),(3,'c');",
            'select * from mysql_connector_python;',
            "update mysql_connector_python set b = 'abc' where a = 3;",
            'select * from mysql_connector_python;',
            'delete from mysql_connector_python where a = 3;',
            'select * from mysql_connector_python;']
    for sqls in sql:
        test(sqls, host, port, user, pwd, db)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description = 'this script is used to test DDL replication!')
    parser.add_argument('--host', help='host')
    parser.add_argument('--port', default=3306, help='port')
    parser.add_argument('--db', default='mysql', help='database name')
    parser.add_argument('--pwd', default='root', help='password')
    parser.add_argument('--user', default='root', help='user name')
    args = parser.parse_args()
    host = args.host
    port = args.port
    db   = args.db
    pwd  = args.pwd
    user = args.user

    print(host, str(port))
    execSql(host, port, user, pwd, db)

nodejs

  • PostgreSQL
const pg = require('pg');

var arguments = process.argv.splice(2);
console.log('host:', arguments[0], 'port: ', arguments[1]);

var conString = ('postgres://abc:abc@'+arguments[0]+':'+arguments[1]+'/postgres');
var client = new pg.Client(conString);


client.connect(function(err){
    if(err){
        return console.error('database connection error',err);
    }

    console.log("")
    console.log("=========== JS Driver ==============");
    client.query('drop table if exists smoketesttable_js;',function(err,data){
        if(err){
            return console.error('step 1 : droped table failed!',err);

        }else{
            console.log('step 1 : drop table success!')
        }
    })
  client.query('drop table if exists smoketesttable_js1;'); // run a second drop because a failure above only prints "failed!" without details; this second call surfaces the error message

    client.query('create table smoketesttable_js(id int primary key,name text,gender text);',function(err,data){
        if(err){
            return console.error('step 2 : create failed!',err);
        }else{
            console.log('step 2 : create table success!')
        }
    })
  client.query('create table smoketesttable_js1(id int primary key,name text,gender text);')

    client.query("insert into smoketesttable_js values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');",function(err,data){
        if(err){
            return console.error('step 3 : insert failed!',err);
        }else{
            console.log('step 3 : insert data success!')
        }
    })
  client.query("insert into smoketesttable_js1 values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');")

    client.query("delete from smoketesttable_js where id = 1;",function(err){
        if(err){
            return console.error('step 4 : delete failed!')
        }else{
            console.log("step 4 : delete data success!")
        }
    })
  client.query("delete from smoketesttable_js1 where id = 1;")

    client.query("update smoketesttable_js set gender = 'male' where id = 2;",function(err){
        if(err){
            return console.error("step 5 : update failed!")
        }else{
            console.log('step 5 : update gender success!')
        }
    })
  client.query("update smoketesttable_js1 set gender = 'male' where id = 2;")


    client.query("select * from smoketesttable_js;",function(err){
        if(err){
            return console.error("select failed!")
            client.query("step 6 : select * from smoktesttable_js;")
        }else{
            console.log('step 6 : select table success!')
        }
    })
    client.query("select * from smoketesttable_js1;")

    client.query("commit",function(err){
        if(err){
            return console.error("select failed!")
        }else{
            console.log('step 6 : commit success!')
        }
        client.end();
        console.log("====================================");
  console.log("")
    })


})
  • MySQL
// Use process to get the command line arguments
var arguments = process.argv.splice(2);
var hosts = arguments[0];
var ports  = arguments[1];

var mysql  = require('mysql');

var connection = mysql.createConnection({
    host     : hosts,
    user     : 'abc',
    password : 'abc',
    port: ports,
    database: 'postgres'
});
//console.log(connection)


connection.connect();

var sql1 = 'drop table if exists myjs_sm';
var sql2 = 'create table if not exists myjs_sm(a int primary key,b text);'
var sql3 = 'insert into myjs_sm values(1,"abc")';
var sqls = 'select * from myjs_sm';
var sql4 = 'update myjs_sm set b = "asd" where a = 1';
var sql5 = 'delete from myjs_sm where a = 1';

var  execsql=function(arg1){
// execute the statement and print the result
  connection.query(arg1, function (err, result) {
                if(err){
                console.log(err.message);
                return;
        }

        console.log('-----------------------[ "' + arg1 + '" ]');
        console.log(result);
        console.log('------------------------------------------------------------\n\n');
  });

  //connection.end();
}

execsql(sql1);
execsql(sql2);
execsql(sql3);
execsql(sqls);
execsql(sql4);
execsql(sqls);
execsql(sql5);
connection.end();

php

  • PostgreSQL
    • php smoketest-pg.php host port
<?php
/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 * combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
 */

function showresults($ret) {
  echo "=== select results ===\n";
  while ($row = pg_fetch_row($ret)) {
        $str="";
        foreach ($row as $item){
                $str .= $item."  ";
        }
        echo "row: $str\n";
  }
}

function checkret($db, $cmd, $ret) {
  if (!$ret) {
        echo $cmd.pg_last_error($db)."\n";
        exit(1);
  } else {
        echo $cmd.":success\n";
  }
}

$host = "host=$argv[1]";
$port = "port=$argv[2]";
$dbname = "dbname=postgres";
$credentials = "user=abc password=abc";
$connstr="$host $port $dbname $credentials";

echo "conn string: $connstr\n";

$db = pg_connect($connstr);
if(!$db){
  echo "Error : Unable to open database\n";
  exit(1);
} else {
  echo "Opened database successfully\n";
}

$sql = "SET client_min_messages TO 'warning';";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "drop table if exists t1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "RESET client_min_messages;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "create table t1(id integer primary key, info text, wt integer);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "insert into t1(id,info,wt) values(1, 'record1', 1);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "insert into t1(id,info,wt) values(2, 'record2', 2);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "update t1 set wt = 12 where id = 1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "select info from t1;";
$ret = pg_query($db, $sql);
if ($ret) {
  showresults($ret);
}

$sql = 'select * from t1 where id=$1';
$ret = pg_prepare($db, "prep", $sql);
checkret($db,  "prep:".$sql, $ret);

$ret = pg_execute($db, "prep", array(1));
showresults($ret);

$ret = pg_execute($db, "prep", array(2));
showresults($ret);

$sql = 'update t1 set info=$1 , wt=$2 where id=$3';
$ret = pg_prepare($db, "prep2", $sql);
checkret($db, "prep:".$sql, $ret);

$ret = pg_execute($db, "prep2", array('Rec1', 2, 1));
checkret($db, "prep2:{Rec1,2,1}", $ret);

$ret = pg_execute($db, "prep2", array('Rec2', 3, 2));
checkret($db, "prep2:{Rec2,3,2}", $ret);

$sql = "select * from t1;";
$ret = pg_query($db, $sql);
if ($ret) {
  showresults($ret);
}

$sql = "delete from t1 where id = 1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);

$sql = "select * from t1;";
$ret = pg_query($db, $sql);
if ($ret) {
  showresults($ret);
}

pg_close($db)
?>
  • MySQL
    • php smoketest-my.php host port
<?php
$host = "$argv[1]";
$port = "$argv[2]";
$dbname = "postgres";
$user = "abc";
$pwd = "abc";

$conn = mysqli_connect($host, $user, $pwd, $dbname, $port) or die("database connection error!");

$sql1 = "drop table if  exists myphp_sm \n";
$sql2 = "create table if not exists myphp_sm(a int primary key, b text)\n";
$sql3 = "insert into myphp_sm values (1,'abc')\n";
$sql4 = "select * from myphp_sm\n";
$sql5 = "update myphp_sm set b = 'asd' where a = 1\n";
$sql6 = "select * from myphp_sm\n";
$sql7 = "delete from myphp_sm where a = 1\n";

$rs = mysqli_query($conn, $sql1);
echo "$sql1 success\n";

$rs = mysqli_query($conn, $sql2);
echo "$sql2 success\n";

$rs = mysqli_query($conn, $sql3);
echo "$sql3 success\n";

$rs = mysqli_query($conn, $sql4);
echo "$sql4 success\n";
$row = mysqli_fetch_array($rs);
var_dump($row);

$rs = mysqli_query($conn, $sql5);
echo "$sql5 success\n";

$rs = mysqli_query($conn, $sql6);
echo "$sql6 success\n";
$row = mysqli_fetch_array($rs);
var_dump($row);

$rs = mysqli_query($conn, $sql7);
echo "$sql7 success\n";
?>

go

  • MySQL
    • Environment setup and compilation
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.cn,direct
go mod init smoketest_my # initialize the module
go mod tidy # the Go package manager resolves all dependencies automatically
go build # build
  • ./smoketest_my -h host -p port
package main

import (
    "fmt"
    _ "github.com/go-sql-driver/MySQL"
    "github.com/jmoiron/sqlx"
    "flag"
)

func checkError(err error) {
        if err != nil {
                panic(err)
        }
}

func main() {
        var User string
        var Pwd string
        var Host string
        var Port int
        var Dbname string
        flag.StringVar(&Host,"h","","默认为空")
        flag.IntVar(&Port,"p",5001,"默认为5001")
        flag.StringVar(&Pwd,"pwd","abc","默认为abc")
        flag.StringVar(&Dbname,"d","postgres","默认为postgres")
        flag.StringVar(&User,"u","abc","默认为abc")
        flag.Parse()

        fmt.Println("============= Golang-MySQL ============")
        // Initialize connection string.
        var connectionString string = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", User, Pwd, Host, Port, Dbname)

        // Initialize connection object.
        db, err := sqlx.Open("mysql", connectionString)
        checkError(err)

        err = db.Ping()
        checkError(err)
        fmt.Println("Successfully connection to database!!!")

        // Drop table
        _, err = db.Exec("drop table if exists mygo_sm;")
        checkError(err)
        fmt.Println("Successfully drop   table")

        // Create table.
        _, err = db.Exec("create table mygo_sm(id int primary key,name text,gender text);")
        checkError(err)
        fmt.Println("Successfully create table")

        // Insert
//        sql_statement := "insert into mygo_sm values ($1, $2, $3);"
        _, err = db.Exec("insert into mygo_sm values( 1, 'banana', 'male')")
        checkError(err)
        _, err = db.Exec("insert into mygo_sm values(2, 'orange', 'female')")
        checkError(err)
        _, err = db.Exec("insert into mygo_sm values(3, 'apple', 'male')")
        checkError(err)
        fmt.Println("Successfully insert table")

        _, err = db.Exec("delete from mygo_sm where id = 2")
        checkError(err)
        fmt.Println("Successfully delete table")

        _, err = db.Exec("update mygo_sm set name = 'update' where id = 3")
        checkError(err)
        fmt.Println("Successfully update table")

        _, err = db.Exec("select * from mygo_sm")
        checkError(err)
        fmt.Println("Successfully select table")

        fmt.Println("=================================")
}
  • PostgreSQL
    • For environment setting and compilation, refer to the previous article
    • ./smoketest -h host -p port
package main

import (
  "database/sql"
  "fmt"
  "flag"
  _ "github.com/lib/pq"
)

const (
  // Initialize connection constants.
  //HOST     = "mydemoserver.postgres.database.azure.com"
  //DATABASE = "postgres"
  //USER     = "abc"
  //PASSWORD = "abc"
)

func checkError(err error) {
  if err != nil {
        panic(err)
  }
}

func main() {
        var User string
        var Pwd string
        var Host string
        var Port int
  var Db string
  flag.StringVar(&Host,"h","","默认为空")
  flag.IntVar(&Port,"p",5001,"默认为5001")
  flag.StringVar(&Pwd,"pwd","abc","默认为abc")
  flag.StringVar(&Db,"d","postgres","默认为postgres")
  flag.StringVar(&User,"u","abc","默认为abc")
  flag.Parse()

  fmt.Println("============= Golang ============")
  // Initialize connection string.
  var connectionString string = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", Host, Port,User,Pwd,Db)

  // Initialize connection object.
  db, err := sql.Open("postgres", connectionString)
  checkError(err)

  err = db.Ping()
  checkError(err)
        fmt.Println("Successfully connection to database!!!")

  // Drop previous table of same name if one exists.
  _, err = db.Exec("drop table if exists SmokeTestTable_go;")
  checkError(err)
  fmt.Println("Successfully drop   table")

  // Create table.
  _, err = db.Exec("create table SmokeTestTable_go(id int primary key,name text,gender text);")
  checkError(err)
  fmt.Println("Successfully create table")

  // Insert some data into table.
  sql_statement := "insert into SmokeTestTable_go values ($1, $2, $3);"
  _, err = db.Exec(sql_statement, 1, "banana", "male")
  checkError(err)
  _, err = db.Exec(sql_statement, 2, "orange", "female")
  checkError(err)
  _, err = db.Exec(sql_statement, 3, "apple", "male")
  checkError(err)
  fmt.Println("Successfully insert table")

  _, err = db.Exec("delete from Smoketesttable_go where id = 2")
        checkError(err)
        fmt.Println("Successfully delete table")

        _, err = db.Exec("update SmokeTestTable_go set name = 'update' where id = 3")
        checkError(err)
        fmt.Println("Successfully update table")

  _, err = db.Exec("select * from SmokeTestTable_go")
        checkError(err)
        fmt.Println("Successfully select table")

  fmt.Println("=================================")
}

c

  • PostgreSQL
    • gcc -o smokeTest smokeTest.c -I/path/PostgreSQL-11.5-rel/include -L/path/PostgreSQL-11.5-rel/lib -lpq
    • ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=user password=pwd"
/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 * combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
 *
 * gcc -o smokeTest smokeTest.c -I/path/PostgreSQL-11.5-rel/include -L/path/PostgreSQL-11.5-rel/lib -lpq
 * ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc"
 *
 * Test the C version of libpq, the PostgreSQL frontend library.
 */
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"

static void
exit_nicely(PGconn *conn)
{
        PQfinish(conn);
        exit(1);
}

int
main(int argc, char **argv)
{
        const char *conninfo;
        PGconn     *conn;
        PGresult   *res;
        int                     nFields;
        int                     i,
                                j;

        /*
         * If the user supplies a parameter on the command line, use it as the
         * conninfo string; otherwise default to setting dbname=postgres and using
         * environment variables or defaults for all other connection parameters.
         */
        if (argc > 1)
                conninfo = argv[1];
        else
                conninfo = "dbname = postgres host=192.168.0.104 port=6401 user=abc password=abc";

        /* Make a connection to the database */
        conn = PQconnectdb(conninfo);

        /* Check to see that the backend connection was successfully made */
        if (PQstatus(conn) != CONNECTION_OK)
        {
                fprintf(stderr, "Connection to database failed: %s",
                                PQerrorMessage(conn));
                exit_nicely(conn);
        }

        /* Suppress notices from the following DROP TABLE IF EXISTS. */
        res = PQexec(conn, "SET client_min_messages TO 'warning';");
        PQclear(res);
        res = PQexec(conn, "drop table if exists t1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "drop table ok\n");
        }
        PQclear(res);
        res = PQexec(conn, "RESET client_min_messages;");
        PQclear(res);
        /* Create the test table */
        res = PQexec(conn, "create table t1(id integer primary key,info text, wt integer)");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "create table command failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "create table ok\n");
        }
        PQclear(res);

        /* Insert two records */
        res = PQexec(conn, "insert into t1(id,info,wt) values(1, 'record1', 1)");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "insert record 1 failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "insert record 1 ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "insert into t1(id,info,wt) values(2, 'record2', 2)");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "insert record 2 failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "insert record 2 ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "update t1 set wt = 12 where id = 1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "update record failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "update record ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "select * from t1");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
                fprintf(stderr, "select failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                int nFields = PQnfields(res);
                for(i=0;i<nFields;i++)
                {
                        fprintf(stderr, "%s\t\t",PQfname(res,i));
                }
                fprintf(stderr, "\n");

                for(i=0;i<PQntuples(res);i++)
                {
                        for(j=0;j<nFields;j++)
                        {
                                fprintf(stderr, "%s\t\t",PQgetvalue(res,i,j));
                        }
                        fprintf(stderr, "\n");
                }
        }
        PQclear(res);

        res = PQexec(conn, "delete from t1 where id = 1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "delete record failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "delete record ok\n");
        }
        PQclear(res);

        res = PQexec(conn, "drop table t1");
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
                fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));
                PQclear(res);
                exit_nicely(conn);
        } else {
                fprintf(stderr, "drop table ok\n");
        }
        PQclear(res);

        /* close the connection to the database and cleanup */
        PQfinish(conn);

        return 0;
}
  • MySQL
    • gcc -I/usr/include/mysql -L/usr/lib/mysql mysql.c -lmysqlclient -o mysqltest
    • ./mysqltest host port
#include <stdio.h>
#include <stdlib.h>
#include <mysql.h>

int main(int argc,char** argv){
  printf("connect to %s:%s\n",argv[1], argv[2]);
  printf("version: %s\n", mysql_get_client_info());
  MYSQL* my = mysql_init(NULL);
  int port = atoi(argv[2]);

  if(!mysql_real_connect(my, argv[1], "abc", "abc", "postgres", port, NULL, 0)){
        printf("connect error !\n");
        mysql_close(my);
        return 1;
  }

  printf("drop table if exists myc_sm;\n");
  mysql_query(my, "drop table if exists myc_sm;");

  printf("create table myc_sm;\n");
  mysql_query(my, "create table myc_sm(a int primary key, b text);");

  printf("insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')\n");
  mysql_query(my, "insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')");

  /* Nested function: a GCC extension, so compile with gcc. */
  void select_all(void)
  {
        printf("\n\nselect * from myc_sm;\n");
        mysql_query(my, "select * from myc_sm;");
        MYSQL_RES* a = mysql_store_result(my);
        int rows = mysql_num_rows(a);
        int cols = mysql_num_fields(a);
        printf("rows: %d, cols: %d\n", rows, cols);
        MYSQL_FIELD *field = mysql_fetch_fields(a);
        for(int i = 0; i < cols; i++)
        {
                printf("%-10s\t", field[i].name);
        }
        puts("");
        MYSQL_ROW line;
        for(int i = 0; i < rows; i++)
        {
                line = mysql_fetch_row(a);
                for(int j = 0; j < cols; j++)
                {
                        printf("%-10s\t", line[j]);
                }
                puts("");
        }
        mysql_free_result(a);
  }

  select_all();

  printf("update myc_sm set b = 'def' where a = 1;");
  mysql_query(my, "update myc_sm set b = 'def' where a = 1;");
  select_all();

  printf("delete from myc_sm where a = 3;");
  mysql_query(my, "delete from myc_sm where a = 3;");
  select_all();

  mysql_close(my);
  return 0;
}

c#

  • PostgreSQL
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Npgsql;

namespace SmokeTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("\n========== C# Driver ============");
            string connString = "Host=localhost;Port=5401;Username=abc;Password=abc;Database=postgres";
            var conn = new NpgsqlConnection(connString);

            NpgsqlDataAdapter DA = new NpgsqlDataAdapter();
            //NpgsqlCommand cmd_select = new NpgsqlCommand("select * from SmokeTestTable_csharp");
            //DA.SelectCommand = cmd_select;




            string drop1 = "drop table if exists SmokeTestTable_csharp;";
            string create = "create table SmokeTestTable_csharp(id int primary key,name text,gender text);";
            string insert = "insert into SmokeTestTable_csharp values(1,'li','male'),(2,'delete_me','male'),(3,'update_me','female');";
            string create2 = "create table testfordelete(id int primary key);";
            string droptab = "drop table testfordelete;";
            string delete = "delete from Smoketesttable_csharp where id = 2";
            string update = "update SmokeTestTable_csharp set name = 'update' where id = 3";
            string select = "select * from SmokeTestTable_csharp";
            string drodb = "drop database if exists smoketestdb;";
            string commit = "commit;";
            string credb = "create database smoketestdb;";
            string swdb = "use smoketestdb";
            string dropdb = "drop database smoketestdb;";



            conn.Open();

            using (var com1 = new NpgsqlCommand(drop1, conn))
            using (var com2 = new NpgsqlCommand(create, conn))
            using (var com3 = new NpgsqlCommand(insert, conn))
            using (var com4 = new NpgsqlCommand(create2, conn))
            using (var com5 = new NpgsqlCommand(droptab, conn))
            using (var com6 = new NpgsqlCommand(delete, conn))
            using (var com7 = new NpgsqlCommand(update, conn))
            using (var com8 = new NpgsqlCommand(select, conn))
            using (var drobd1 = new NpgsqlCommand(drodb,conn))
            using (var credb1 = new NpgsqlCommand(credb, conn))
            using (var swdb1 = new NpgsqlCommand(swdb, conn))
            using (var dropdb1 = new NpgsqlCommand(dropdb, conn))
            using (var comm = new NpgsqlCommand(commit,conn))

            {

                //drobd1.ExecuteNonQuery();
                //Console.WriteLine("drop table success!");
                //credb1.ExecuteNonQuery();
                //Console.WriteLine("create database success!");
                //comm.ExecuteNonQuery();
                //Console.WriteLine("commit success!");
                //swdb1.ExecuteNonQuery();
                //Console.WriteLine("switch database success!");

                com1.ExecuteNonQuery();
                Console.WriteLine("drop   table success!");
                com2.ExecuteNonQuery();
                Console.WriteLine("create table success!");
                com3.ExecuteNonQuery();
                Console.WriteLine("insert table success!");
                com4.ExecuteNonQuery();
                Console.WriteLine("create table success!");
                com5.ExecuteNonQuery();
                Console.WriteLine("drop   table success!");
                com6.ExecuteNonQuery();
                Console.WriteLine("delete table success!");
                com7.ExecuteNonQuery();
                Console.WriteLine("update table success!");
                com8.ExecuteNonQuery();
                Console.WriteLine("select table success!");
                comm.ExecuteNonQuery();
                Console.WriteLine("commit table success!");
                //dropdb1.ExecuteNonQuery();
                //Console.WriteLine("drop database success!");
            }
            conn.Close();
            Console.WriteLine("=================================\n");
        }
    }
}
  • MySQL
using System;
using System.Collections.Generic;
using System.Data;
using MySql.Data.MySqlClient;
using System.Text;

namespace MySQL
{
    class Program
    {
        static void Main(string[] args)
        {
        //server=127.0.0.1;port=3306;user=root;password=root; database=minecraftdb;
                string cs = "server=" + args[0] + ";user=abc;password=abc;port=" + args[1] + ";database=postgres";
                Console.WriteLine("testing MySQL: " + cs);

        MySqlConnection conn = null;
        conn = new MySqlConnection(cs);
        //conn.Open();

        //Console.WriteLine("drop table if exists mycs_sm;");
        //MySqlCommand cmd = new MySqlCommand("drop table if exists mycs_sm;", conn);
        //int n = cmd.ExecuteNonQuery();

        List<string> sqlList = new List<string>();
        sqlList.Add("drop table if exists mycs_sm;");
        sqlList.Add("create table mycs_sm(a int primary key, b text);");
        sqlList.Add("insert into mycs_sm values(1,'abc'),(2,'bcd'),(3,'cde');");
        sqlList.Add("select * from mycs_sm;");
        sqlList.Add("update mycs_sm set b = 'def' where a = 1;");
        sqlList.Add("select * from mycs_sm;");
        sqlList.Add("delete from mycs_sm where a = 3;");
        sqlList.Add("select * from mycs_sm;");

        foreach (string i in sqlList){
                Console.WriteLine(i);
                List<string> list = new List<string>();
                if (i == "select * from mycs_sm;"){
                        conn.Open();
                        MySqlCommand cmd = new MySqlCommand(i, conn);
                        MySqlDataReader reader = cmd.ExecuteReader();
                        while (reader.Read()){
                                string id = reader.GetString("a");
                                string name = reader.GetString("b");
                                Console.WriteLine(id + " : " + name);
                        }
                        conn.Close();
                }
                else {
                        conn.Open();
                        MySqlCommand cmd = new MySqlCommand(i, conn);
                        cmd.ExecuteNonQuery();
                        conn.Close();
                }
        }

        //conn.Close();
        }
    }
}

c++

  • PostgreSQL
    • compile
      • g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17
/*
 * Copyright (c) 2019 ZettaDB inc. All rights reserved.
 * This source code is licensed under Apache 2.0 License,
 *
 * sudo apt install libpq-dev
 * git clone https://github.com/jtv/libpqxx.git
 * cd libpqxx
 * ./configure
 * make
 * sudo make install
 *
 * g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17
 * ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc"
 *
 * Test the C++ version of libpqxx, the PostgreSQL frontend library.
 */
#include <iostream>
#include <pqxx/pqxx>

using namespace std;
using namespace pqxx;

int
main(int argc, char **argv)
{
  const char *conninfo;

  if (argc > 1)
        conninfo = argv[1];
  else
        conninfo = "dbname = postgres user=abc password=abc hostaddr=127.0.0.1 port=5401";

  try{
        pqxx::connection db(conninfo);
        if (db.is_open()) {
                cout << "Opened database successfully: " << db.dbname() << endl;
        } else {
                cout << "Can't open database" << endl;
                return 1;
        }

        pqxx::nontransaction txn1{db};
        txn1.exec("drop table if exists t1");
        txn1.exec("create table t1(id integer primary key, info text, wt integer)");
        txn1.commit();

        pqxx::work txn2{db};
        txn2.exec("insert into t1(id,info,wt) values(1, 'record1', 1)");
        txn2.exec("insert into t1(id,info,wt) values(2, 'record2', 2)");
        txn2.exec("insert into t1(id,info,wt) values(3, 'record3', 3)");
        txn2.exec("update t1 set wt = 12 where id = 1");
        txn2.exec("delete from t1 where id = 2");

        pqxx::result r2{txn2.exec("select * from t1")};
        for (auto row: r2)
                std::cout << row[0] << " " << row[1] << " " << row[2] << std::endl;

        txn2.commit();

        pqxx::nontransaction txn3{db};
        txn3.exec("drop table t1");
        txn3.commit();

        db.close();
  }catch (const std::exception &e){
        cerr << e.what() << std::endl;
        return 1;
  }

  return 0;
}
  • MySQL
    • compile
      • g++ mysql.cpp -lmysqlcppconn -o mysqltest
// g++ mysql.cpp -lmysqlcppconn -o mysqltest
// ./mysqltest "tcp://192.168.0.113:5661"
#include "mysql_connection.h"
#include <stdlib.h>
#include <iostream>
#include <cppconn/driver.h>
#include <cppconn/exception.h>
#include <cppconn/resultset.h>
#include <cppconn/statement.h>
using namespace std;
int main(int argc,char* argv[]){
  sql::Driver *driver;
        sql::Connection *con;
        sql::Statement *stmt;
        sql::ResultSet *res;

  /* Create a connection */
        driver = get_driver_instance();
  //string infos = sprintf("\"tcp://" , argv[1] , "\"");
  //string in1 = "\"tcp://";
  //string in2 = "\"";
  //string infos = in1 + argv[1] + in2;
  string infos = argv[1];
        con = driver->connect(infos, "abc", "abc");
  con->setSchema("postgres");
  stmt = con->createStatement();
        stmt->execute("drop table if exists mycpp_sm;");
  cout<<"drop table if exists mycpp_sm;"<<endl;
  stmt->execute("create table mycpp_sm(a int primary key, b text)");
  cout<<"create table mycpp_sm(a int primary key, b text)"<<endl;
  stmt->execute("insert into mycpp_sm values(1, 'abc'),(2,'bcd'),(3,'cde')");
  cout<<"insert into mycpp_sm(1, 'abc'),(2,'bcd'),(3, 'cde')"<<endl;
  stmt->executeQuery("select * from mycpp_sm");
  cout<<"select * from mycpp_sm"<<endl;
  stmt->execute("update mycpp_sm set b = 'qwer' where a = 2");
  cout<<"update mycpp_sm set b = 'qwer' where a = 2"<<endl;
  stmt->executeQuery("select * from mycpp_sm");
  cout<<"select * from mycpp_sm"<<endl;
        stmt->execute("delete from mycpp_sm where a = 3");
  cout<<"delete from mycpp_sm where a = 3"<<endl;
  stmt->executeQuery("select * from mycpp_sm");
  cout<<"select * from mycpp_sm"<<endl;
  delete stmt;
  delete con;
}

rust

  • PostgreSQL
    • compile
      • cargo build (this step will download and install all dependencies)
use clap::{App, Arg};
use postgres::{Client, NoTls};

const DEFAULT_HOST: &str = "localhost";
const DEFAULT_PORT: &str = "7999";
const DEFAULT_USER: &str = "abc";
const DEFAULT_PASS: &str = "abc";
const DEFAULT_DB: &str = "postgres";

struct ConnectionArgs {
    pub host: String,
    pub port: String,
    pub user: String,
    pub pass: String,
    pub db: String,
}

fn parse_args() -> ConnectionArgs {
    let matches = App::new("Execute SQL in Postgres")
        .arg(
            Arg::with_name("host")
                .short("h")
                .long("host")
                .value_name("HOSTNAME")
                .default_value(DEFAULT_HOST)
                .help("Sets the host name of PostgreSQL service")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("port")
                .short("p")
                .long("port")
                .value_name("PORT")
                .default_value(DEFAULT_PORT)
                .help("Sets the port of PostgreSQL service")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("user")
                .short("u")
                .long("user")
                .value_name("USERNAME")
                .default_value(DEFAULT_USER)
                .help("Sets the user to log into PostgreSQL")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("pass")
                .long("pass")
                .value_name("PASSWORD")
                .default_value(DEFAULT_PASS)
                .help("Sets the user's password")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("db")
                .short("d")
                .long("database")
                .value_name("DATABASE")
                .default_value(DEFAULT_DB)
                .help("Sets the database to use")
                .takes_value(true),
        )
        .get_matches();

    ConnectionArgs {
        host: matches.value_of("host").unwrap().to_string(),
        port: matches.value_of("port").unwrap().to_string(),
        user: matches.value_of("user").unwrap().to_string(),
        pass: matches.value_of("pass").unwrap().to_string(),
        db: matches.value_of("db").unwrap().to_string(),
    }
}

fn main() {
    let args = parse_args();

    let sqls = [
        "drop table if exists t1;",
        "create table t1(id integer primary key, info text, wt integer);",
        "insert into t1(id,info,wt) values(1, 'record1', 1);",
        "insert into t1(id,info,wt) values(2, 'record2', 2);",
        "update t1 set wt = 12 where id = 1;",
        "select * from t1;",
        "delete from t1 where id = 1;",
        "select * from t1;",
    ];

    let mut client = Client::connect(
        &format!(
            "postgres://{}:{}@{}:{}/{}",
            args.user, args.pass, args.host, args.port, args.db
        ),
        NoTls,
    )
    .unwrap();

    for sql in sqls {
        // Alternatively: let result = client.query(sql, &[]);
        // query returns the result rows, while execute only returns the number of affected rows
        let result = client.execute(sql, &[]);
        println!("command: {}, res: {:?}", sql, result);
    }
}
  • MySQL
    • compile
      • cargo build (this step will download and install all dependencies)
use mysql::*;
use mysql::prelude::*;
//use std::env;
use clap::{App, Arg};

fn main() {
    let matches = App::new("Execute SQL in Postgres")
        .arg(
            Arg::with_name("host")
                .short("h")
                .long("host")
                .value_name("HOSTNAME")
                .help("Sets the host name of PostgreSQL service")
                .takes_value(true),
        )
        .arg(
           Arg::with_name("port")
                .short("p")
                .long("port")
                .value_name("PORT")
                .help("Sets the port of PostgreSQL service")
                .takes_value(true),
        ).get_matches();
    let host = matches.value_of("host").unwrap_or("192.168.0.113").to_string();
    let port = matches.value_of("port").unwrap_or("5662").to_string();
    let front = "MySQL://pgx:pgx_pwd@".to_string();

    let hosts = front + &host + ":" + &port + "/MySQL";
    let url = hosts.to_string();
    // let mut hosts = front + &host + ":".to_string() + &port + "/MySQL".to_string();
//    let mut url = format!("MySQL://pgx:pgx_pwd@{:?}:{:?}/MySQL", host, port);

    let urls = "MySQL://abc:abc@192.168.0.113:5662/postgres";
    println!("{}", url);
    println!("{}", urls);
    let pool = Pool::new(urls).unwrap(); // 获取连接池
    let mut conn = pool.get_conn().unwrap();// 获取链接

    let sql = [
        "drop table if exists myrs_sm;",
        "create table myrs_sm(a int primary key, b text);",
        "insert into myrs_sm values(1, 'abc'),(2,'bcd'),(3,'cde');",
        "select * from myrs_sm;",
        "update myrs_sm set b = 'def' where a = 1",
        "select * from myrs_sm;",
        "delete from myrs_sm where a = 3",
        "select * from myrs_sm;"
    ];

    for s in sql {
        println!();
        println!("{:?}", s);
        if s == "select * from myrs_sm;" {
            // print the rows returned by the select statement
            conn.query_iter(s).unwrap().for_each(|row| {
                let r: (i32, String) = from_row(row.unwrap());
                println!("result: a={}, b='{}'", r.0, r.1);
            });
        } else {
            conn.query_iter(s).unwrap();
        }
    }
}

Basic DDL

Database

create database

CREATE DATABASE name
    [ [ WITH ] [ OWNER [=] user_name ]
           [ TEMPLATE [=] template ]
           [ ENCODING [=] encoding ]
           [ LOCALE [=] locale ]
           [ LC_COLLATE [=] lc_collate ]
           [ LC_CTYPE [=] lc_ctype ]
           [ TABLESPACE [=] tablespace_name ]
           [ ALLOW_CONNECTIONS [=] allowconn ]
           [ CONNECTION LIMIT [=] connlimit ]
           [ IS_TEMPLATE [=] istemplate ] ]

create a new database

create database kunlun;

To create a new database kunlun owned by user abc in the default tablespace pg_default:

CREATE DATABASE kunlun OWNER abc TABLESPACE pg_default;

More reference link: [create database](http://klustron.com:14000/docs/html/sql-createdatabase.html)

alter database

ALTER DATABASE name [ [ WITH ] option [ ... ] ]


where option can be:

    ALLOW_CONNECTIONS allowconn
    CONNECTION LIMIT connlimit
    IS_TEMPLATE istemplate

ALTER DATABASE name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }

ALTER DATABASE name SET TABLESPACE new_tablespace

ALTER DATABASE name SET configuration_parameter { TO | = } { value | DEFAULT }
ALTER DATABASE name SET configuration_parameter FROM CURRENT
ALTER DATABASE name RESET configuration_parameter
ALTER DATABASE name RESET ALL

Change the owner of the database

ALTER DATABASE kunlun OWNER TO abc;

Disable index scans in database kunlun

ALTER DATABASE kunlun SET enable_indexscan TO off;

Modify the maximum number of connections to the database:

alter database kunlun connection limit 10;

More reference link: alter database

drop database

DROP DATABASE [ IF EXISTS ] name

More reference link: drop database

Schema

create schema

CREATE SCHEMA schema_name [ AUTHORIZATION role_specification ] [ schema_element [ ... ] ]
CREATE SCHEMA AUTHORIZATION role_specification [ schema_element [ ... ] ]
CREATE SCHEMA IF NOT EXISTS schema_name [ AUTHORIZATION role_specification ]
CREATE SCHEMA IF NOT EXISTS AUTHORIZATION role_specification

where role_specification can be:

    user_name
  | CURRENT_USER
  | SESSION_USER

Create a new schema

create schema myschema;

Create a schema for user kunlun, which is also named kunlun:

CREATE SCHEMA AUTHORIZATION kunlun;

More reference link: create schema

alter schema

ALTER SCHEMA name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }

Change the owner of the schema

 alter schema kunlun owner to vito;

More reference link: alter schema

drop schema

DROP SCHEMA [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

Delete the schema named kunlun:

drop schema kunlun;

More reference link: drop schema

Table

create table

CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name ( [
  { column_name data_type [ COLLATE collation ] [ column_constraint [ ... ] ]
    | table_constraint
    | LIKE source_table [ like_option ... ] }
    [, ... ]
] )
[ INHERITS ( parent_table [, ... ] ) ]
[ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ]
[ USING method ]
[ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ]
[ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
[ TABLESPACE tablespace_name ]

CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name
    OF type_name [ (
  { column_name [ WITH OPTIONS ] [ column_constraint [ ... ] ]
    | table_constraint }
    [, ... ]
) ]
[ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ]
[ USING method ]
[ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ]
[ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
[ TABLESPACE tablespace_name ]

CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name
    PARTITION OF parent_table [ (
  { column_name [ WITH OPTIONS ] [ column_constraint [ ... ] ]
    | table_constraint }
    [, ... ]
) ] { FOR VALUES partition_bound_spec | DEFAULT }
[ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ]
[ USING method ]
[ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ]
[ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ]
[ TABLESPACE tablespace_name ]

where column_constraint is:

[ CONSTRAINT constraint_name ]
{ NOT NULL |
  NULL |
  DEFAULT default_expr |
  GENERATED ALWAYS AS ( generation_expr ) STORED |
  GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] |
  UNIQUE index_parameters |
  PRIMARY KEY index_parameters |
  REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]
    [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

and table_constraint is:

[ CONSTRAINT constraint_name ]
{ UNIQUE ( column_name [, ... ] ) index_parameters |
  PRIMARY KEY ( column_name [, ... ] ) index_parameters |
  EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] |
  FOREIGN KEY ( column_name [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ]
    [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

and like_option is:

{ INCLUDING | EXCLUDING } { COMMENTS | CONSTRAINTS | DEFAULTS | GENERATED | IDENTITY | INDEXES | STATISTICS | STORAGE | ALL }

and partition_bound_spec is:

IN ( partition_bound_expr [, ...] ) |
FROM ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] )
  TO ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] ) |
WITH ( MODULUS numeric_literal, REMAINDER numeric_literal )

index_parameters in UNIQUE, PRIMARY KEY, and EXCLUDE constraints is:

[ INCLUDE ( column_name [, ... ] ) ]
[ WITH ( storage_parameter [= value] [, ... ] ) ]
[ USING INDEX TABLESPACE tablespace_name ]

exclude_element in an EXCLUDE constraint is:

{ column_name | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ]

Create table t1:

create table t1(a int , b int);

Create temporary table v1:

create temp table v1(a int, b text);

Create a hash partition table:

CREATE TABLE t2 (
    order_id     bigint not null,
    cust_id      bigint not null,
    status       text
) PARTITION BY HASH (order_id);

Create a partition table:

CREATE TABLE t3 (
    logdate         date not null,
    peaktemp        int,
    unitsales       int
) PARTITION BY RANGE (logdate);

More reference link: create table

alter table

ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]
    action [, ... ]
ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]
    RENAME [ COLUMN ] column_name TO new_column_name
ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]
    RENAME CONSTRAINT constraint_name TO new_constraint_name
ALTER TABLE [ IF EXISTS ] name
    RENAME TO new_name
ALTER TABLE [ IF EXISTS ] name
    SET SCHEMA new_schema
ALTER TABLE ALL IN TABLESPACE name [ OWNED BY role_name [, ... ] ]
    SET TABLESPACE new_tablespace [ NOWAIT ]
ALTER TABLE [ IF EXISTS ] name
    ATTACH PARTITION partition_name { FOR VALUES partition_bound_spec | DEFAULT }
ALTER TABLE [ IF EXISTS ] name
    DETACH PARTITION partition_name

where action is one of:

    ADD [ COLUMN ] [ IF NOT EXISTS ] column_name data_type [ COLLATE collation ] [ column_constraint [ ... ] ]
    DROP [ COLUMN ] [ IF EXISTS ] column_name [ RESTRICT | CASCADE ]
    ALTER [ COLUMN ] column_name [ SET DATA ] TYPE data_type [ COLLATE collation ] [ USING expression ]
    ALTER [ COLUMN ] column_name SET DEFAULT expression
    ALTER [ COLUMN ] column_name DROP DEFAULT
    ALTER [ COLUMN ] column_name { SET | DROP } NOT NULL
    ALTER [ COLUMN ] column_name DROP EXPRESSION [ IF EXISTS ]
    ALTER [ COLUMN ] column_name ADD GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ]
    ALTER [ COLUMN ] column_name { SET GENERATED { ALWAYS | BY DEFAULT } | SET sequence_option | RESTART [ [ WITH ] restart ] } [...]
    ALTER [ COLUMN ] column_name DROP IDENTITY [ IF EXISTS ]
    ALTER [ COLUMN ] column_name SET STATISTICS integer
    ALTER [ COLUMN ] column_name SET ( attribute_option = value [, ... ] )
    ALTER [ COLUMN ] column_name RESET ( attribute_option [, ... ] )
    ALTER [ COLUMN ] column_name SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }
    ADD table_constraint [ NOT VALID ]
    ADD table_constraint_using_index
    ALTER CONSTRAINT constraint_name [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
    VALIDATE CONSTRAINT constraint_name
    DROP CONSTRAINT [ IF EXISTS ]  constraint_name [ RESTRICT | CASCADE ]
    ENABLE REPLICA TRIGGER trigger_name
    ENABLE ALWAYS TRIGGER trigger_name
    DISABLE RULE rewrite_rule_name
    ENABLE RULE rewrite_rule_name
    ENABLE REPLICA RULE rewrite_rule_name
    ENABLE ALWAYS RULE rewrite_rule_name
    DISABLE ROW LEVEL SECURITY
    ENABLE ROW LEVEL SECURITY
    FORCE ROW LEVEL SECURITY
    NO FORCE ROW LEVEL SECURITY
    SET TABLESPACE new_tablespace
    SET ( storage_parameter [= value] [, ... ] )
    RESET ( storage_parameter [, ... ] )
    INHERIT parent_table
    NO INHERIT parent_table
    OF type_name
    NOT OF
    OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
    REPLICA IDENTITY { DEFAULT | USING INDEX index_name | FULL | NOTHING }

and partition_bound_spec is:

IN ( partition_bound_expr [, ...] ) |
FROM ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] )
  TO ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] ) |
WITH ( MODULUS numeric_literal, REMAINDER numeric_literal )

and column_constraint is:

[ CONSTRAINT constraint_name ]
{ NOT NULL |
  NULL |
  DEFAULT default_expr |
  GENERATED ALWAYS AS ( generation_expr ) STORED |
  GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] |
  UNIQUE index_parameters |
  PRIMARY KEY index_parameters |
  REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]
    [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

and table_constraint is:

[ CONSTRAINT constraint_name ]
{ UNIQUE ( column_name [, ... ] ) index_parameters |
  PRIMARY KEY ( column_name [, ... ] ) index_parameters |
  EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] |
  FOREIGN KEY ( column_name [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ]
    [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

and table_constraint_using_index is:

    [ CONSTRAINT constraint_name ]
    { UNIQUE | PRIMARY KEY } USING INDEX index_name
    [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

index_parameters in UNIQUE, PRIMARY KEY, and EXCLUDE constraints is:

[ INCLUDE ( column_name [, ... ] ) ]
[ WITH ( storage_parameter [= value] [, ... ] ) ]
[ USING INDEX TABLESPACE tablespace_name ]

exclude_element in an EXCLUDE constraint is:

{ column_name | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ]

Rename the table kunlun:

create table kunlun(a int not null);
alter table kunlun rename to t1;

Add a column to the t1 table:

alter table t1 add column bb text;

Rename the bb column to b:

alter table t1 rename column bb to b;

Modify the data type of field b in the table:

ALTER TABLE t1 ALTER COLUMN b type varchar(10);

Add a unique constraint to the b field:

alter table t1 add constraint unique_t1_b unique (b);

Add a not-null constraint to column b:

ALTER TABLE t1 ALTER COLUMN b SET NOT NULL;

Remove the not-null constraint on column b:

ALTER TABLE t1 ALTER COLUMN b drop NOT NULL;

Move the t1 table into another schema:

create schema kunlun;
ALTER TABLE public.t1 SET SCHEMA kunlun;

More reference link: alter table

drop table

DROP TABLE [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

Drop table t1:

drop table t1;

More reference link: drop table

Index

create index

CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] name ] ON [ ONLY ] table_name [ USING method ]
    ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
    [ INCLUDE ( column_name [, ...] ) ]
    [ WITH ( storage_parameter = value [, ... ] ) ]
    [ TABLESPACE tablespace_name ]
    [ WHERE predicate ]

More reference link: create index

alter index

ALTER INDEX [ IF EXISTS ] name RENAME TO new_name
ALTER INDEX [ IF EXISTS ] name SET TABLESPACE tablespace_name
ALTER INDEX name ATTACH PARTITION index_name
ALTER INDEX name DEPENDS ON EXTENSION extension_name
ALTER INDEX [ IF EXISTS ] name SET ( storage_parameter = value [, ... ] )
ALTER INDEX [ IF EXISTS ] name RESET ( storage_parameter [, ... ] )
ALTER INDEX [ IF EXISTS ] name ALTER [ COLUMN ] column_number
    SET STATISTICS integer
ALTER INDEX ALL IN TABLESPACE name [ OWNED BY role_name [, ... ] ]
    SET TABLESPACE new_tablespace [ NOWAIT ]

Rename an existing index:

ALTER INDEX distributors RENAME TO suppliers;

More reference link: alter index

drop index

DROP INDEX [ CONCURRENTLY ] [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

Drop an existing index:

drop index suppliers;

More reference link: drop index

Sequence

create sequence

CREATE [ TEMPORARY | TEMP ] SEQUENCE [ IF NOT EXISTS ] name [ INCREMENT [ BY ] increment ]
    [ MINVALUE minvalue | NO MINVALUE ] [ MAXVALUE maxvalue | NO MAXVALUE ]
    [ START [ WITH ] start ] [ CACHE cache ] [ [ NO ] CYCLE ]
    [ OWNED BY { table_name.column_name | NONE } ]

Create a sequence called kunlun starting at 100:

CREATE SEQUENCE kunlun START 100;
SELECT nextval('kunlun');
 nextval
---------
     100
SELECT nextval('kunlun');
 nextval
---------
     101

More reference link: create sequence

alter sequence

ALTER SEQUENCE [ IF EXISTS ] name
    [ AS data_type ]
    [ INCREMENT [ BY ] increment ]
    [ MINVALUE minvalue | NO MINVALUE ] [ MAXVALUE maxvalue | NO MAXVALUE ]
    [ START [ WITH ] start ]
    [ RESTART [ [ WITH ] restart ] ]
    [ CACHE cache ] [ [ NO ] CYCLE ]
    [ OWNED BY { table_name.column_name | NONE } ]
ALTER SEQUENCE [ IF EXISTS ] name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER SEQUENCE [ IF EXISTS ] name RENAME TO new_name
ALTER SEQUENCE [ IF EXISTS ] name SET SCHEMA new_schema

Change the kunlun sequence to start with 200:

ALTER SEQUENCE kunlun RESTART WITH 200;
SELECT nextval('kunlun');
 nextval
---------
     200
SELECT nextval('kunlun');
 nextval
---------
     201

More reference link: alter sequence

drop sequence

DROP SEQUENCE [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

Delete the kunlun sequence:

DROP SEQUENCE kunlun;

More reference link: drop sequence

View

create view

CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] [ RECURSIVE ] VIEW name [ ( column_name [, ...] ) ]
    [ WITH ( view_option_name [= view_option_value] [, ... ] ) ]
    AS query
    [ WITH [ CASCADED | LOCAL ] CHECK OPTION ]

Create view v1 over table t1:

create table t1(id int, a int);
insert into t1(id,a) values (1,2),(2,4),(3,8),(6,0);
CREATE VIEW v1 AS
    SELECT *
    FROM t1
    WHERE id<5;

More reference link: create view

alter view

ALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name SET DEFAULT expression
ALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name DROP DEFAULT
ALTER VIEW [ IF EXISTS ] name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }
ALTER VIEW [ IF EXISTS ] name RENAME TO new_name
ALTER VIEW [ IF EXISTS ] name SET SCHEMA new_schema
ALTER VIEW [ IF EXISTS ] name SET ( view_option_name [= view_option_value] [, ... ] )
ALTER VIEW [ IF EXISTS ] name RESET ( view_option_name [, ... ] )

Change the name of the v1 view:

alter view v1 rename to vv1;

More reference link: alter view

drop view

DROP VIEW [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

Delete the vv1 view:

drop view vv1;

More reference link: drop view

stored procedure

http://192.168.0.104/pgdocs/html/storage.html

User and permission management

create role

CREATE ROLE name [ [ WITH ] option [ ... ] ]

where option can be:

      SUPERUSER | NOSUPERUSER
    | CREATEDB | NOCREATEDB
    | CREATEROLE | NOCREATEROLE
    | INHERIT | NOINHERIT
    | LOGIN | NOLOGIN
    | REPLICATION | NOREPLICATION
    | BYPASSRLS | NOBYPASSRLS
    | CONNECTION LIMIT connlimit
    | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL
    | VALID UNTIL 'timestamp'
    | IN ROLE role_name [, ...]
    | IN GROUP role_name [, ...]
    | ROLE role_name [, ...]
    | ADMIN role_name [, ...]
    | USER role_name [, ...]
    | SYSID uid

Create a role kunlun that can log in but has no password:

create role kunlun LOGIN;

Create a user kunlun1 who can log in and has a password. CREATE USER is exactly the same as CREATE ROLE, except that it implies LOGIN:

CREATE USER kunlun1 WITH PASSWORD '12345678';

Create user kunlun2, the password is valid until January 1, 2024:

CREATE ROLE kunlun2 WITH LOGIN PASSWORD '12345678' VALID UNTIL '2024-01-01';

Create a role kunlun3 that can create databases and manage roles:

CREATE ROLE kunlun3 WITH CREATEDB CREATEROLE;

More reference link: create role

alter role

ALTER ROLE role_specification [ WITH ] option [ ... ]

where option can be:

      SUPERUSER | NOSUPERUSER
    | CREATEDB | NOCREATEDB
    | CREATEROLE | NOCREATEROLE
    | INHERIT | NOINHERIT
    | LOGIN | NOLOGIN
    | REPLICATION | NOREPLICATION
    | BYPASSRLS | NOBYPASSRLS
    | CONNECTION LIMIT connlimit
    | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL
    | VALID UNTIL 'timestamp'

ALTER ROLE name RENAME TO new_name

ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] SET configuration_parameter { TO | = } { value | DEFAULT }
ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] SET configuration_parameter FROM CURRENT
ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] RESET configuration_parameter
ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] RESET ALL

where role_specification can be:

    role_name
  | CURRENT_USER
  | SESSION_USER

Change the password of kunlun1:

ALTER ROLE kunlun1 WITH PASSWORD '87654321';

Remove the password for kunlun2:

ALTER ROLE kunlun2 WITH PASSWORD NULL;

More reference link: alter role

drop role

DROP ROLE [ IF EXISTS ] name [, ...]

Delete the created role kunlun:

drop role kunlun;

More reference link: drop role

grant

GRANT { { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { [ TABLE ] table_name [, ...]
         | ALL TABLES IN SCHEMA schema_name [, ...] }
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { SELECT | INSERT | UPDATE | REFERENCES } ( column_name [, ...] )
    [, ...] | ALL [ PRIVILEGES ] ( column_name [, ...] ) }
    ON [ TABLE ] table_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { USAGE | SELECT | UPDATE }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { SEQUENCE sequence_name [, ...]
         | ALL SEQUENCES IN SCHEMA schema_name [, ...] }
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }
    ON DATABASE database_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON DOMAIN domain_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN DATA WRAPPER fdw_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN SERVER server_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { EXECUTE | ALL [ PRIVILEGES ] }
    ON { { FUNCTION | PROCEDURE | ROUTINE } routine_name [ ( [ [ argmode ] [ arg_name ] arg_type [, ...] ] ) ] [, ...]
         | ALL { FUNCTIONS | PROCEDURES | ROUTINES } IN SCHEMA schema_name [, ...] }
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON LANGUAGE lang_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { SELECT | UPDATE } [, ...] | ALL [ PRIVILEGES ] }
    ON LARGE OBJECT loid [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
    ON SCHEMA schema_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { CREATE | ALL [ PRIVILEGES ] }
    ON TABLESPACE tablespace_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

GRANT { USAGE | ALL [ PRIVILEGES ] }
    ON TYPE type_name [, ...]
    TO role_specification [, ...] [ WITH GRANT OPTION ]

where role_specification can be:

    [ GROUP ] role_name
  | PUBLIC
  | CURRENT_USER
  | SESSION_USER

GRANT role_name [, ...] TO role_name [, ...] [ WITH ADMIN OPTION ]

Grant insert privileges on table t1 to all users:

GRANT INSERT ON t1 TO PUBLIC;
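
Building on the same syntax, grant kunlun1 usage on the kunlun schema and read access to all of its tables (role and schema names taken from the earlier examples):

GRANT USAGE ON SCHEMA kunlun TO kunlun1;
GRANT SELECT ON ALL TABLES IN SCHEMA kunlun TO kunlun1;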

More reference link: grant

revoke

REVOKE [ GRANT OPTION FOR ]
    { { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { [ TABLE ] table_name [, ...]
         | ALL TABLES IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { SELECT | INSERT | UPDATE | REFERENCES } ( column_name [, ...] )
    [, ...] | ALL [ PRIVILEGES ] ( column_name [, ...] ) }
    ON [ TABLE ] table_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { USAGE | SELECT | UPDATE }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { SEQUENCE sequence_name [, ...]
         | ALL SEQUENCES IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }
    ON DATABASE database_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON DOMAIN domain_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN DATA WRAPPER fdw_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN SERVER server_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { EXECUTE | ALL [ PRIVILEGES ] }
    ON { { FUNCTION | PROCEDURE | ROUTINE } function_name [ ( [ [ argmode ] [ arg_name ] arg_type [, ...] ] ) ] [, ...]
         | ALL { FUNCTIONS | PROCEDURES | ROUTINES } IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON LANGUAGE lang_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { SELECT | UPDATE } [, ...] | ALL [ PRIVILEGES ] }
    ON LARGE OBJECT loid [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
    ON SCHEMA schema_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { CREATE | ALL [ PRIVILEGES ] }
    ON TABLESPACE tablespace_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON TYPE type_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ ADMIN OPTION FOR ]
    role_name [, ...] FROM role_name [, ...]
    [ CASCADE | RESTRICT ]

Revoke insert privileges on table t1 from public:

REVOKE INSERT ON t1 FROM PUBLIC;

More reference link: revoke

Other DDLs

truncate

TRUNCATE [ TABLE ] [ ONLY ] name [ * ] [, ... ]
    [ RESTART IDENTITY | CONTINUE IDENTITY ] [ CASCADE | RESTRICT ]
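
For example, to remove all rows from t1 in one statement:

TRUNCATE t1;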

More reference link: truncate

Basic DML

insert

[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    [ OVERRIDING { SYSTEM | USER } VALUE ]
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
    [ ON CONFLICT [ conflict_target ] conflict_action ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

where conflict_target can be one of:

    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_name

and conflict_action is one of:

    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
                    ( column_name [, ...] ) = ( sub-SELECT )
                  } [, ...]
              [ WHERE condition ]

Insert data into table t1:

create table t1(id int, a int);
insert into t1(id,a) values (1,2);
insert into t1(id,a) values (2,4),(3,8),(6,0);

More reference link: insert

update

[ WITH [ RECURSIVE ] with_query [, ...] ]
UPDATE [ ONLY ] table_name [ * ] [ [ AS ] alias ]
    SET { column_name = { expression | DEFAULT } |
          ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
          ( column_name [, ...] ) = ( sub-SELECT )
        } [, ...]
    [ FROM from_list ]
    [ WHERE condition | WHERE CURRENT OF cursor_name ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

Change the value 4 in column a of table t1 to 3 (the row with id 2):

UPDATE t1 SET a = 3 WHERE id = 2;

More reference link: update

delete

[ WITH [ RECURSIVE ] with_query [, ...] ]
DELETE FROM [ ONLY ] table_name [ * ] [ [ AS ] alias ]
    [ USING from_item [, ...] ]
    [ WHERE condition | WHERE CURRENT OF cursor_name ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

Delete the row with id equal to 6 in table t1:

DELETE FROM t1 WHERE id=6;

More reference link: delete

select

[ WITH [ RECURSIVE ] with_query [, ...] ]
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } { ONLY | WITH TIES } ]
    [ FOR { UPDATE | NO KEY UPDATE | SHARE | KEY SHARE } [ OF table_name [, ...] ] [ NOWAIT | SKIP LOCKED ] [...] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )
    ROLLUP ( { expression | ( expression [, ...] ) } [, ...] )
    CUBE ( { expression | ( expression [, ...] ) } [, ...] )
    GROUPING SETS ( grouping_element [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]

To view the t1 table:

select * from t1;

This is equivalent to:

table t1;

More reference link: select

MySQL-specific DML

insert ignore

insert ignore inserts the new row if no conflicting row exists; if a conflicting row already exists, the new row is skipped:

drop table if exists t1;
create table t1 (a int PRIMARY KEY , b int not null,CONSTRAINT t1_b_key UNIQUE (b));
insert ignore into t1(a,b) values (4,4);
# repeat the insert; it is skipped because the row already exists
insert ignore into t1(a,b) values (4,4);

replace into

Insert another row and view the current contents of t1:

insert into t1(a,b) values (2,3);
table t1;
 a | b
---+---
 2 | 3
 4 | 4
(2 rows)

If the new rows conflict with existing rows, the conflicting tuples are deleted before the insert:

replace into t1 values(1,1),(1,2);
table t1;
 a | b
---+---
 1 | 2
 2 | 3
 4 | 4
(3 rows)

Prepared Statement

PostgreSQL syntax and examples

(call using client api)

PREPARE name [ ( data_type [, ...] ) ] AS statement
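
For example, using the t1(id, a) table from the DML examples above:

PREPARE t1_insert (int, int) AS INSERT INTO t1(id, a) VALUES ($1, $2);
EXECUTE t1_insert(7, 9);
DEALLOCATE t1_insert;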

MySQL syntax and examples

(call using client api)
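
A sketch of the equivalent SQL-level commands over a MySQL protocol connection, assuming the server accepts MySQL's standard PREPARE/EXECUTE/DEALLOCATE statements in addition to the client API:

PREPARE s1 FROM 'insert into t1(id, a) values(?, ?)';
SET @x = 7;
SET @y = 9;
EXECUTE s1 USING @x, @y;
DEALLOCATE PREPARE s1;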

transaction processing

Klustron provides the same transaction processing functionality as MySQL, independent of whether the client connects via the MySQL protocol or the PostgreSQL protocol; behavior is identical in both types of connections.

autocommit transaction

Each client connection has an autocommit variable whose default value is true; the user can change its value dynamically within the connection. When autocommit is true, every DML statement sent by the user is executed as an independent transaction, and Klustron automatically commits the transaction when the statement finishes executing.

DDL statements are always executed as independent transactions, regardless of whether autocommit is true or false. If a transaction is already running in the connection when a DDL statement is executed, Klustron first commits that transaction and then executes the DDL.

explicit transaction

We call transactions that are not autocommit explicit transactions. An explicit transaction can be started and committed explicitly or implicitly.

Explicitly start and commit/rollback transactions

  1. The user executes begin or start transaction to explicitly start a transaction
  2. Execute several DML statements
  3. Execute commit to explicitly commit the transaction; or execute rollback to explicitly roll back the transaction

Implicitly start and implicitly commit transactions

If the user sets autocommit to false, then when the first DML statement arrives on a client connection C, Klustron automatically (implicitly) starts a transaction T and executes the statement inside T. As more DML statements arrive on connection C, Klustron keeps executing them in transaction T.

An explicit transaction can also be committed implicitly: when Klustron receives set autocommit=true or any DDL statement inside an explicit transaction T, it (implicitly) commits transaction T.

If a DML statement inside an explicit transaction T fails, Klustron automatically marks T as aborted internally; any further DML statements the user sends in T on connection C are ignored until a commit or rollback is received on C, at which point Klustron rolls back transaction T.

Examples of transaction processing functions

autocommit transaction example

When autocommit is enabled, any DML statement sent by the user will be executed as an independent transaction, and Klustron will automatically commit the transaction when the statement execution ends.

set autocommit=true;
show autocommit;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(11);
insert into t1(b) values(12);
insert into t1(b) values(13);
insert into t1(b) values(14);
drop table if exists t2;

![img](KunlunBase 产品使用和评测指南/9.png)

When autocommit is off, a manual commit or rollback is required

set autocommit=off;
show autocommit;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(11);

Open another session to view the t1 table ![img](KunlunBase 产品使用和评测指南/10.png)

The inserted row only becomes visible after the commit succeeds ![img](KunlunBase 产品使用和评测指南/11.png) ![img](KunlunBase 产品使用和评测指南/12.png)

Insert a row in the transaction and then roll it back manually ![img](KunlunBase 产品使用和评测指南/13.png)

DDL statements are always executed as a separate transaction

The DDL statement is committed on its own, so the following commit reports that there is no transaction in progress

set autocommit=true;
show autocommit;
begin;
create table t1(a serial primary key,  b int);
commit;

![img](KunlunBase 产品使用和评测指南/14.png)

Setting autocommit to false makes no difference

set autocommit=false;
show autocommit;
begin;
drop table t1;
commit;

![img](KunlunBase 产品使用和评测指南/15.png)

Explicit transaction example

First turn off autocommit, then run an explicit transaction

show autocommit;
set autocommit=off;
begin;
create table t1(a serial primary key,  b int);
insert into t1(b) values(21);
insert into t1(b) values(22);
insert into t1(b) values(23);
insert into t1(b) values(24);

![img](KunlunBase 产品使用和评测指南/16.png)

Open another window and check the t1 table; since no commit has been issued yet, table t1 contains no data ![img](KunlunBase 产品使用和评测指南/17.png)

Now go ahead and commit ![img](KunlunBase 产品使用和评测指南/18.png)

After the commit succeeds, the data is visible in the t1 table ![img](KunlunBase 产品使用和评测指南/19.png)

Whether autocommit is true or false, if a transaction is already running in the connection when a DDL statement is executed, Klustron first commits that transaction and then executes the DDL

show autocommit;
set autocommit=off;
begin;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(21);
insert into t1(b) values(22);
insert into t1(b) values(23);
insert into t1(b) values(24);

t1 currently has no data ![img](KunlunBase 产品使用和评测指南/20.png)

Once the DDL statement executes, the preceding transaction is committed ![img](KunlunBase 产品使用和评测指南/21.png) ![img](KunlunBase 产品使用和评测指南/22.png)

Since the transaction has already been committed, entering commit again also prompts that there is no transaction to commit

commit;

![img](KunlunBase 产品使用和评测指南/23.png)

If a DML statement inside an explicit transaction fails, the transaction T is automatically rolled back inside Klustron, and any further DML statements the user sends in T on connection C are ignored until a commit or rollback is received on C, at which point Klustron rolls back T.

set autocommit=off;
show autocommit;
begin;
drop table if exists t1;
create table t1(a serial primary key,  b int);
insert into t1(b) values(11);
insert into t1(c) values(12);
insert into t1(b) values(13);
insert into t1(b) values(14);
commit;

![img](KunlunBase 产品使用和评测指南/24.png)

All data has been rolled back, there is no data in the t1 table ![img](KunlunBase 产品使用和评测指南/25.png)

Handle transactions in the application

Executing Transactions in Application Code

The application simply sends the SQL statements of an explicit transaction one by one, and must be sure to commit the transaction after the DML statements have executed. All of a transaction's logic should sit inside a try block that catches every possible database exception; on any error, send a rollback statement to roll back the transaction. For example:

try {
    conn.execute("begin");
    conn.execute("select * from orders where order_id = 123");
    /*
      several lines of code that process the order data
    */
    conn.execute("update stock set remaining = 456 where item_id = 789");
    conn.execute("commit");
} catch (DatabaseException &e) {
    conn.execute("rollback");
}

For languages without exception handling, check the return value of each statement execution to determine whether it succeeded, and roll back the transaction on failure.

Error handling during Klustron transaction execution

Note that by default, after a statement inside a transaction fails, Klustron directly sets the transaction to the aborted state internally, so all subsequent DML statements are ignored until a commit or rollback statement arrives, at which point Klustron uniformly rolls the transaction back; this happens even if the statement received is commit, which in this case merely ends the transaction so that it can be rolled back. This behavior is exactly the same as PostgreSQL's, and it is very robust: even if the client does no error handling at all, a transaction that hit an error will be rolled back.

This differs from MySQL. In MySQL, after a DML statement in an explicit transaction fails, the server just returns an error to the client, and the user decides whether to continue the transaction or roll it back. The client must check for errors; on an execution error it must decide, based on the error number and other information, whether to roll back or continue, and if it continues, the transaction is committed when the commit statement executes. If the business code does not check for errors, it will unknowingly commit a transaction in which some statements failed, even though it should have been rolled back. This mode therefore requires the user code to check the result of every SQL statement; if the database returns an error and the transaction should be rolled back, it must be rolled back immediately, without executing the transaction's remaining SQL statements.

Starting from Klustron 1.1, MySQL's transaction processing mode is also supported: when enable_stmt_subxact=on is set, Klustron works in MySQL's mode, at a slight performance cost.

If the business logic allows, you can also send DML statements after setting autocommit=on. Use autocommit transactions only when, according to the application logic, a single such statement really is a logically complete operation. If executing a task (transaction) requires multiple DML statements, explicitly start a transaction and execute the statements inside it, so that all of the transaction's operations either complete fully or are rolled back completely on error, avoiding the problem of partial completion and partial failure.

For autocommit transactions, in MySQL, PostgreSQL, and Klustron alike, if the SQL statement fails, the autocommit statement (transaction) is rolled back automatically by the database system and the application does not need to send a rollback statement; if the SQL statement succeeds, the autocommit transaction is likewise committed automatically, without the application sending a commit statement.

For special scenarios such as bulk data loading, either autocommit statements or explicit transactions work for the business logic, but for better performance use explicit transactions and insert many rows per transaction. For example, to import 1 billion rows into a table, start an explicit transaction, insert 1 million rows (one row per insert statement), then commit. Repeat this transaction until all 1 billion rows have been flushed into the database.

In this specific scenario, if the loading process fails partway you will want to continue loading rather than roll back, so pre-set enable_stmt_subxact=on. Then if an individual insert statement fails (for example because a power failure brings down the Klustron computing node or the master node of a storage cluster), simply execute it again; since each statement inserts only one row there will be no row conflicts (of course, the table must have a primary key or a unique index for this method to work). Commit the transaction after all 1 million rows have been inserted.
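
A minimal sketch of this loading pattern (hypothetical table name big_t; whether enable_stmt_subxact is set per session or globally depends on your deployment):

set enable_stmt_subxact = on;
begin;
insert into big_t values (1, 'row 1');
insert into big_t values (2, 'row 2');
-- ... one row per insert statement, 1,000,000 inserts per transaction;
-- if an insert fails, simply execute the same statement again
commit;
-- repeat the transaction until all 1,000,000,000 rows are loaded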

Error Handling of Klustron Committed Transactions

For Klustron, an error may occur while executing the commit statement, due to a network failure or a cluster node going down (for example from a power failure). When commit fails, the client receives an error (exception), but there is no need to roll back the transaction; whether the transaction actually committed depends on when the error occurred. If the error occurs during the first phase of the distributed two-phase commit algorithm, Klustron rolls the transaction back automatically and the user does not need to send a rollback statement. If the error occurs during the second phase, Klustron commits the transaction; if some transaction branch cannot be committed right away (for example, the master node of a storage cluster is down and a master-standby switchover taking several seconds is in progress), Klustron commits the remaining transaction branches at the earliest opportunity, and again the client does not need to send a rollback statement.

In this situation, if the application wants to determine whether the commit succeeded, the simplest way is to check whether the transaction's updates are present in the corresponding tables. However, those updates may happen to have been deleted by subsequent concurrent transactions, so the check can come up empty even though the transaction did commit successfully. If such a need exists, the application must design a more careful method to answer the question.

If an error such as a statement timeout or a disconnection occurs while executing any SQL statement other than commit, the transaction is rolled back by Klustron and the application needs no additional handling. If a timeout or disconnection happens while executing commit, Klustron handles it exactly as described in the previous paragraph, and again no extra handling is needed on the application side.

DDL transactions and replication

A DDL statement executed by a user on any computing node of a Klustron cluster is automatically replicated by Klustron to all other computing nodes in the cluster and executed there. Users can even execute DDL statements concurrently on multiple client connections to multiple computing nodes; Klustron guarantees that all computing nodes of the cluster execute those DDL statements in the same order. As a result, all computing nodes of a Klustron cluster always have identical metadata and run correctly.

When a user executes a DDL statement, Klustron runs it as an independent distributed transaction. If a computing node, storage node, or metadata node fails or loses power while this transaction is running, the distributed transaction is automatically rolled back and all intermediate state is cleared, so the user can simply execute the DDL statement again.

Users do not need to know the technical details and principles of DDL transaction processing, but only need to know that any computing node in a Klustron cluster can execute DDL statements and DML statements, just like using a stand-alone database.

Experience the DDL replication function

Here is an example to help users experience that the Klustron cluster really does replicate DDL: after a create table statement is executed on one computing node, the other computing nodes of the cluster can use the table it created.

We first need to prepare a cluster of two computing nodes for testing.

psql postgres://abc:abc@192.168.0.134:47001/postgres
psql postgres://abc:abc@192.168.0.134:47004/postgres

create

Create database kunlundb in computing node 47001; ![img](KunlunBase 产品使用和评测指南/26.png)

View in compute node 47004 ![img](KunlunBase 产品使用和评测指南/27.png)

Then create the t1 table in the 47004 node

create table t1(a int, b int);
insert into t1 values (1,2);

![img](KunlunBase 产品使用和评测指南/28.png)

The t1 table in the kunlundb database exists in the 47001 computing node ![img](KunlunBase 产品使用和评测指南/29.png)

insert

Continue adding data to table t1 on 47001

insert into t1 values (3,4);
insert into t1 values (5,6);

![img](KunlunBase 产品使用和评测指南/30.png)

The added data is also successfully replicated in 47004 ![img](KunlunBase 产品使用和评测指南/31.png)

update

Modify data in computing node 47004

update t1 set a='2' where a=3;
update t1 set a='3' where a=5;

![img](KunlunBase 产品使用和评测指南/32.png)

In another computing node 47001, the data is also synchronized ![img](KunlunBase 产品使用和评测指南/33.png)

delete

Delete a row of data in computing node 47001

delete from t1 where a=1;

![img](KunlunBase 产品使用和评测指南/34.png)

In another computing node 47004, the data is also synchronized ![img](KunlunBase 产品使用和评测指南/35.png)

drop

Delete table t1 in compute node 47004 ![img](KunlunBase 产品使用和评测指南/36.png)

Table t1 is gone from the other computing node 47001 as well ![img](KunlunBase 产品使用和评测指南/37.png)

Fullsync High Availability

Fullsync strong synchronization mechanism

Simulate standby node failure to experience fullsync

  1. Create a test cluster, for example one shard with one primary and two standbys, connect to the shard's primary node, and verify that creating tables and inserting/deleting data works normally.
  2. Stop the mysqld process on both standby nodes of the shard (see the sketch after this list).
  3. Now, creating tables or inserting/deleting data through the shard's primary node returns an error.
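
For step 2, something like the following on each standby machine does the job (a sketch; adapt it to however mysqld is managed in your installation):

pkill mysqld

With both standbys stopped, fullsync can no longer get an acknowledgement from any standby, which is why the DML in step 3 fails.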

Monitor and configure Fullsync

1、Monitoring and configuration via XPanel

Set the fullsync_consistency_level instance variable, as shown in the figure below.

![img](KunlunBase 产品使用和评测指南/38.png)

![img](KunlunBase 产品使用和评测指南/39.png)

![img](KunlunBase 产品使用和评测指南/40.png)

Obtain the fullsync_consistency_level instance variable, and the specific operation is shown in the figure below.

![img](KunlunBase 产品使用和评测指南/41.png)

![img](KunlunBase 产品使用和评测指南/42.png)

![img](KunlunBase 产品使用和评测指南/43.png)

On the cluster display page, click a storage node -> Enter button; the fullsync value of the current node can also be viewed there.

![img](KunlunBase 产品使用和评测指南/44.png)

2、Monitoring and configuration through SQL statements

SQL to set the fullsync configuration:

set global fullsync_consistency_level=2

View fullsync settings:

show global variables like 'fullsync_consistency_level'

Active/standby switchover

Simulate master node failure

  1. Set the master node read-only by executing: set global read_only=true;
  2. Kill the master node's mysqld process, or stop it with the stop script. After cluster_mgr detects the master node anomaly, a timed task is triggered; if the master stays faulty longer than the configured value (default 20s), a failover is triggered. After the switchover, a new master is elected and the shard serves requests normally again.

Active/standby switchover process record

There are two places to view the records of the active/standby switchover process:

  1. In the kunlun_metadata_db.rbr_consfailover table of the metadata cluster (see the query sketch below the figure).
  2. In xpanel, cluster management -> cluster list information -> active/standby switching.

![img](KunlunBase 产品使用和评测指南/45.png)
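
For the first method, the failover records can be queried directly from the metadata cluster (a sketch; the table's exact columns depend on the installed version):

select * from kunlun_metadata_db.rbr_consfailover;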

Verify that the new primary is working properly and has not lost committed transactions

cluster_mgr provides a verification test program, the download address is as follows: https://github.com/zettadb/cluster_mgr/tree/main/test/suite/consfailover

Elastic scaling

XPanel

1. Elastic expansion.

Notice:

  • A cluster must contain at least two shards before expansion can be initiated.
  • Expansion comes in two forms, automatic and manual. The difference: with automatic expansion the user does not select the shard tables manually; the system detects which tables need to be relocated and performs the expansion. With manual expansion the user selects the tables to relocate.

(1) Automatic expansion.

Enter XPanel, click Cluster Management -> Cluster List -> Cluster List Information to open the cluster list page, find the multi-shard cluster that has been built, and initiate automatic expansion. As shown below.

![img](KunlunBase 产品使用和评测指南/46.png)

![img](KunlunBase 产品使用和评测指南/47.png)

![img](KunlunBase 产品使用和评测指南/48.png)

![img](KunlunBase 产品使用和评测指南/49.png)

If the expansion is successful, as shown in the figure:

![img](KunlunBase 产品使用和评测指南/50.png)

If the expansion fails, as shown in the figure:

![img](KunlunBase 产品使用和评测指南/51.png)

2. Manual expansion.

The specific operation is shown in the figure below.

![img](KunlunBase 产品使用和评测指南/52.png)

![img](KunlunBase 产品使用和评测指南/53.png)

![img](KunlunBase 产品使用和评测指南/54.png)

If the expansion is successful, as shown in the figure:

![img](KunlunBase 产品使用和评测指南/55.png)

If the expansion fails, as shown in the figure:

![img](KunlunBase 产品使用和评测指南/56.png)

cluster-mgr API

1、Use postman to create an rbr cluster: ![img](KunlunBase 产品使用和评测指南/57.png)

After the cluster is successfully created, connect to the computing node, such as: psql postgres://abc:abc@192.168.0.132:59701/postgres

Create a student table and write data.

2、Add shards: ![img](KunlunBase 产品使用和评测指南/58.png)

3、Initiate expansion: ![img](KunlunBase 产品使用和评测指南/59.png)

After successful expansion, connect to the master of the newly added shard, such as:

mysql -uclustmgr -pclustmgr_pwd -h192.168.0.129 -P59413
use postgres_$$_public
show tables; (check that the student table exists)

Expected something like this: ![img](KunlunBase 产品使用和评测指南/60.png)

Failure Recovery Mechanism and Verification Method

1. Introduction to the principle of elastic expansion and contraction

The goal of elastic scaling is to migrate business tables from one shard to another on demand, without stopping service, thereby scaling data horizontally. The process has the following stages: full data export, full data import, incremental data catch-up, and routing switchover.

Full data export: in this stage the tables to be migrated are exported logically with the mydumper tool and the data is saved to disk.

Full data import: in this stage the data exported in the previous stage is imported into the target shard with the myloader tool.
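
Roughly, the first two stages amount to invocations like the following (a sketch with hypothetical hosts, ports, and paths; in reality cluster_mgr drives these tools automatically, they are not run by hand):

mydumper -h 192.168.0.129 -P 59413 -u clustmgr -p clustmgr_pwd -B postgres_$$_public -T student -o /data/expand_dump
myloader -h 192.168.0.130 -P 59416 -u clustmgr -p clustmgr_pwd -B postgres_$$_public -d /data/expand_dump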

Incremental data catch-up: in this stage, MySQL's master-slave replication mechanism is used to create a replication channel that synchronizes only the tables being migrated, starting from the consistency point recorded when mydumper exported the full snapshot. The channel is named expand_<id>, where id is the global task id; its source is the shard the tables are migrating away from, and its target instance is the master node of the target shard. After the incremental data has been replayed, without stopping the synchronization channel, the tables being migrated are renamed on the source shard, which blocks the source shard from reading and writing them. When the master of the target shard observes that the tables have been renamed, the incremental catch-up stage ends, the synchronization channel is cleaned up, and the process enters the routing switchover stage.

Routing switchover stage: all computing nodes are notified, via writes to the metadata cluster, that the tables' routing has changed. Once the writes complete, the renamed tables on the target shard are restored, i.e. renamed back to their original names. At this point the tables on the source shard are still in the renamed state, while the tables on the target shard carry the original business table names again.

For any computing node whose routing has been updated, business accesses to a migrated table are routed to the target shard. If a computing node has not yet picked up the latest routing change when a business request arrives and still accesses the source shard, the access fails because the table on the source shard is still in the renamed state; once the computing node refreshes to the latest routing, the access succeeds.

2. Failure scenarios and recovery mechanisms

The main problem the failure recovery mechanism solves is: when a fault interrupts the expansion process, how to clean up intermediate-state data, or how to continue the unfinished expansion task after service is restored.

If the task is interrupted after the full data export has completed: the task can simply be re-initiated.

If the task is interrupted after the full data import has completed: the task can likewise be re-initiated.

If the task is interrupted in the incremental data catch-up stage: the task is rolled back directly and all intermediate-state data is cleared.

Cluster data backup and recovery

1、Create an rbr cluster: ![img](KunlunBase 产品使用和评测指南/61.png)

This cluster is used as the source cluster; after the cluster is successfully created, connect to the computing node, such as: psql postgres://abc:abc@192.168.0.132:59701/postgres

Create student table and write data

2、Initiate a backup operation: (need to ensure that the hdfs server has been started)

![img](KunlunBase 产品使用和评测指南/62.png) Record the recovery time under hdfs, such as: 2022-08-23 13:52 ![img](KunlunBase 产品使用和评测指南/63.png)

3、Create another cluster with the same specifications as the cluster in step 1 (refer to step 1); it serves as the target cluster.

4、Initiate a recovery operation: ![img](KunlunBase 产品使用和评测指南/63.png)

After the recovery succeeds, connect to the computing node of the cluster from step 3, for example: psql postgres://abc:abc@192.168.0.129:59701/postgres, and check whether the student table has been restored to the target cluster.

XPanel Configuration and Initiating Backups

1、After the cluster_mgr installation completes, the system automatically reports the backup storage directory, and you can view the list of backup storage directories on the backup storage target management page of XPanel, as shown below.

![img](KunlunBase 产品使用和评测指南/65.png)

2、Build the vito cluster.

![img](KunlunBase 产品使用和评测指南/66.png)

3、Log in to computing node 47001 in the vito cluster, create a new table, and add two rows of data.

![img](KunlunBase 产品使用和评测指南/67.png)

4、Initiate a full backup of the vito cluster, as shown in the figure below.

![img](KunlunBase 产品使用和评测指南/68.png)

5、Go to the cluster backup list and view the backup result information, as shown in the figure below.

![img](KunlunBase 产品使用和评测指南/69.png)

XPanel initiates cluster recovery

1、Perform a recovery of the backed-up vito cluster: create a new cluster vito3 as the cluster into which the backup will be restored, as shown below.

![img](KunlunBase 产品使用和评测指南/70.png)

2、The computing node of the vito3 cluster is 47003. Log in to the node and confirm that it contains no tables.

![img](KunlunBase 产品使用和评测指南/71.png)

![img](KunlunBase 产品使用和评测指南/72.png)

3、Restore the previous backup of the vito cluster in the vito3 cluster.

![img](KunlunBase 产品使用和评测指南/73.png)

![img](KunlunBase 产品使用和评测指南/74.png)

![img](KunlunBase 产品使用和评测指南/75.png)

4、After the restore succeeds, log in to computing node 47003 of the vito3 cluster and verify that the t2 table exists. The figure below shows a successful cluster recovery (a command-line check follows the figure).

![img](KunlunBase 产品使用和评测指南/76.png)
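A quick command-line check, assuming the same psql credentials as earlier (the computing node's host is a placeholder):

```bash
# Verify that the t2 table and its two rows were restored on vito3.
psql postgres://abc:abc@<vito3-host>:47003/postgres -c "SELECT * FROM t2;"
```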

Other cluster management functions API and XPanel operations

Add storage cluster

1、Use XPanel to add storage clusters. The specific operations are shown in the figures below.

![img](KunlunBase 产品使用和评测指南/77.png)

![img](KunlunBase 产品使用和评测指南/78.png)

![img](KunlunBase 产品使用和评测指南/79.png)

Add and delete computing nodes

1、Use XPanel to add computing nodes. The operation is shown in the figure below.

![img](KunlunBase 产品使用和评测指南/80.png)

![img](KunlunBase 产品使用和评测指南/81.png)

![img](KunlunBase 产品使用和评测指南/82.png)

![img](KunlunBase 产品使用和评测指南/83.png)

2、Use XPanel to delete computing nodes. As shown below.

![img](KunlunBase 产品使用和评测指南/84.png)

![img](KunlunBase 产品使用和评测指南/85.png)

![img](KunlunBase 产品使用和评测指南/86.png)

![img](KunlunBase 产品使用和评测指南/87.png)

![img](KunlunBase 产品使用和评测指南/88.png)

Redo the standby node

1、Use XPanel to redo a standby node. The following redoes standby node 57001 of the shard1 storage cluster in the test1 cluster; the specific operations are shown in the figures below.

![img](KunlunBase 产品使用和评测指南/89.png)

![img](KunlunBase 产品使用和评测指南/90.png)

![img](KunlunBase 产品使用和评测指南/91.png)

![img](KunlunBase 产品使用和评测指南/92.png)

Other cluster management function APIs:

1、Use postman to create an rbr cluster: ![img](KunlunBase 产品使用和评测指南/93.png)

The returned content is as follows (the same applies to the calls below; the jobid changes accordingly): ![img](KunlunBase 产品使用和评测指南/94.png)

2、Add comps: ![img](KunlunBase 产品使用和评测指南/95.png)

3、Add shards: ![img](KunlunBase 产品使用和评测指南/96.png)

4、Add nodes: ![img](KunlunBase 产品使用和评测指南/97.png)

5、Delete comps: ![img](KunlunBase 产品使用和评测指南/98.png)

6、Delete shards: ![img](KunlunBase 产品使用和评测指南/99.png)

7、Delete nodes: ![img](KunlunBase 产品使用和评测指南/100.png)

8、Redo a shard standby node: ![img](KunlunBase 产品使用和评测指南/101.png) The hdfs server must be running normally for this to succeed.

9、Check the processing status of a cluster_mgr job via the API (a curl sketch follows this list): ![img](KunlunBase 产品使用和评测指南/102.png)

10、Delete the cluster: ![img](KunlunBase 产品使用和评测指南/103.png)
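For reference, the curl sketch below mirrors what postman does in the screenshots above: post a JSON job request to cluster_mgr, then poll with the returned jobid. The endpoint path, port, and field names here are placeholders for illustration only; the exact request shape is the one shown in the screenshots.

```bash
# Hypothetical sketch only: endpoint, port, and JSON fields are placeholders,
# not the documented cluster_mgr interface.
curl -s -X POST 'http://<cluster_mgr-host>:<port>/<api-path>' \
     -H 'Content-Type: application/json' \
     -d '{"job_type": "create_cluster", "paras": {"...": "..."}}'

# The response returns a jobid; poll the same endpoint to check its status.
curl -s -X POST 'http://<cluster_mgr-host>:<port>/<api-path>' \
     -H 'Content-Type: application/json' \
     -d '{"job_type": "get_status", "job_id": "<jobid>"}'
```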

View cluster node logs

Preconditions

1、At present, a Klustron cluster supports configuring ES (Elasticsearch) to collect the output logs of each module. When bootstrapping a cluster, you can choose whether to install ES; if you do, log collection for the management modules, computing nodes, and storage nodes is set up automatically.

View logs in Kibana: view the indices reported by filebeat. ![img](KunlunBase 产品使用和评测指南/104.png)

Configure a Kibana index pattern. ![img](KunlunBase 产品使用和评测指南/105.png)

Operate on the Kibana Discover page. ![img](KunlunBase 产品使用和评测指南/106.png)

Currently, logs are reported at the machine level, and a machine may host management modules, computing nodes, or storage nodes. Filtering rules can be configured on the Kibana page to view the logs of different modules.
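If you want to confirm from the command line that filebeat is shipping logs, you can query Elasticsearch directly (the host is whatever your ES deployment uses):

```bash
# List the filebeat indices that back the Kibana index pattern.
curl -s 'http://<es-host>:9200/_cat/indices/filebeat-*?v'
```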

2、If the cluster is not configured with ES, you need to log in to the machine where each module is located to view its logs, as in the sketch below.
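Without ES, log viewing is manual: ssh to each machine and tail the relevant files. The paths below are placeholders, since the actual locations depend on where each module was installed:

```bash
# Placeholders only: substitute the real installation/data directories.
tail -f <cluster_mgr-install-dir>/log/*.log    # management module logs
tail -f <computing-node-datadir>/*.log         # computing node (PostgreSQL) logs
tail -f <storage-node-datadir>/mysqld.err      # storage node (MySQL) error log
```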

View cluster node status

Use XPanel to view status change records

1、View the status of cluster nodes in XPanel, as shown in the figure below.

![img](KunlunBase 产品使用和评测指南/107.png)

![img](KunlunBase 产品使用和评测指南/108.png)

Error report

Find the core file

Users need to configure the following on their Linux machines:

  1. ulimit -c unlimited
  2. sysctl -w kernel.core_pattern=core.%e.%p.%t (the core file is then generated in the program's working directory; this command requires root privileges)

If a core file appears, use gdb to print the call stack: run gdb -c <core file> <bin file>, press Enter, then type bt and press Enter (see the sketch below).
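Putting the steps together (the binary and core file names are examples):

```bash
# 1. Allow core dumps of unlimited size in the current shell.
ulimit -c unlimited

# 2. Write core files as core.<executable>.<pid>.<timestamp> in the
#    crashed program's working directory (requires root).
sudo sysctl -w kernel.core_pattern=core.%e.%p.%t

# After a crash, open the core file with the matching binary and
# print the call stack.
gdb -c core.mysqld.12345.1661230000 /path/to/mysqld
# At the (gdb) prompt, type:
#   bt
```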

END