Klustron Online Table Migration and Multi-Compute Node Application Test

Note:

Unless specifically stated otherwise, the version numbers mentioned in the text can be substituted with any released version number. For a list of all released versions, please visit: http://doc.klustron.com/zh/Release_notes.html

Objective of This Article:

This article runs applications against multiple compute nodes while some partitions of a table are concurrently migrated to a newly added shard. The aim is to verify that Klustron's online table migration has negligible impact on applications distributed across multiple compute nodes.

All tests are conducted via the XPanel console and PostgreSQL client to connect to the database cluster. The XPanel service is installed on a server with the IP address 192.168.56.112. To access it, open a browser on a device that can reach 192.168.56.112 and navigate to: http://192.168.56.112:18080/KunlunXPanel/#/login?redirect=%2Fdashboard

The default login username and password are: super_dba/super_dba. You are required to change the super_dba password upon your first login.

Once logged in, the homepage is displayed as follows:

01 Creating a Cluster

1.1 Click on “Cluster Management” and then “Cluster List”. In the cluster list interface, click the “Create” button.

1.2 Enter the cluster parameters as shown in the following diagram:

1.3 Click “Confirm” to check the status of the cluster creation task.

1.4 Once all statuses indicate “Running”, the cluster creation is complete.

02 Preparing Test Data

2.1 Open an SSH terminal window connected to the kunlundb_cluster (Compute Node IP: 192.168.56.112) and log in as the kunlun user. Then, connect to the cluster database using the PostgreSQL client.

[root@kunlun1 ~]# su - kunlun
[kunlun@kunlun1 ~]$ source /kunlun/env.sh
[kunlun@kunlun1 ~]$ psql -h 192.168.56.112 -p 47001 -U abc postgres

2.2 Create a test table in the database and insert test data.

postgres=# create table prod_part (id int primary key, name char(8)) partition by hash(id);
postgres=# create table prod_part_p1 partition of prod_part for values with (modulus 6, remainder 0);
postgres=# create table prod_part_p2 partition of prod_part for values with (modulus 6, remainder 1);
postgres=# create table prod_part_p3 partition of prod_part for values with (modulus 6, remainder 2);
postgres=# create table prod_part_p4 partition of prod_part for values with (modulus 6, remainder 3);
postgres=# create table prod_part_p5 partition of prod_part for values with (modulus 6, remainder 4);
postgres=# create table prod_part_p6 partition of prod_part for values with (modulus 6, remainder 5);
postgres=# insert into prod_part select i,'text'||i from generate_series(1,300) i;

2.3 Check the distribution of the data:

postgres=# analyze prod_part;
postgres=# select relname as table_name, reltuples as num_rows, name as shard_name from pg_class t1, pg_shard t2 where t1.relshardid = t2.id and t1.reltype <> 0 and t1.relname like 'prod_part%';

This shows that all six partitions of the prod_part table (prod_part_p1, prod_part_p2, prod_part_p3, prod_part_p4, prod_part_p5, and prod_part_p6) are stored on the storage node shard_1.
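
Because prod_part is hash-partitioned with modulus 6, the 300 inserted rows should be spread roughly evenly, about 50 rows per partition. As an optional cross-check (a minimal sketch, not part of the original test flow, and assuming the leaf partitions can be queried directly from the compute node), the per-partition row counts can be read with psycopg2:

import psycopg2

# Connect to Compute Node 1; connection parameters match section 2.1.
conn = psycopg2.connect(database='postgres', user='abc',
                        password='abc', host='192.168.56.112', port='47001')
cursor = conn.cursor()

# Count the rows in each hash partition of prod_part.
for n in range(1, 7):
    partition = 'prod_part_p%d' % n
    cursor.execute('select count(*) from ' + partition + ';')
    print(partition, cursor.fetchone()[0])

cursor.close()
conn.close()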

03 Expanding Storage Shard Nodes in the Cluster

3.1 Adding Storage Shard Nodes

3.1.1 In the cluster list, click “Settings” or the “+” button for the cluster that needs additional storage shard nodes.

3.1.2 In “Cluster Settings”, click on “Shard List”, and then click the “Add Shard” button.

3.1.3 Enter the parameters for adding a new shard node as illustrated.

3.1.4 Click “Confirm” to track the progress of adding the shard node.

3.1.5 Confirm that the new storage shard node, shard_2, is successfully added to the cluster.

3.2 Scaling Up Storage Shard Nodes

3.2.1 Open two database access windows, each running a Python script against the cluster database. In the first window, prepare the pyprod1.py script, which connects to Compute Node 1 (in this case the server at 192.168.56.112) and continuously performs select queries on the cluster database while the storage shard nodes are being expanded. The content of pyprod1.py is as follows:

import psycopg2.extras
from psycopg2 import DatabaseError
import time
import datetime

# Connect to Compute Node 1 (192.168.56.112).
conn = psycopg2.connect(database='postgres', user='abc',
                        password='abc', host='192.168.56.112', port='47001')
select_sql = ''' select * from prod_part where id=%s; '''

i = 1
try:
    # Issue one select per second, cycling id from 1 to 1000 indefinitely.
    while (i <= 1000):
        cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(select_sql, [i])
        res = cursor.fetchall()
        print(dict(res[0]))
        current_datetime = datetime.datetime.now()
        print("Select current date and time:", current_datetime)
        if (i == 1000):
            i = 1
        else:
            i = i + 1
        cursor.close()
        conn.commit()
        time.sleep(1)
except (Exception, DatabaseError) as e:
    # If the connection is interrupted, report the error, then reconnect
    # and resume the same select loop from the current id.
    print(e)
    input('Press any key and Enter to continue ~!')
    conn = psycopg2.connect(database='postgres', user='abc',
                            password='abc', host='192.168.56.112', port='47001')
    select_sql = ''' select * from prod_part where id=%s; '''
    while (i <= 1000):
        cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(select_sql, [i])
        res = cursor.fetchall()
        print(dict(res[0]))
        current_datetime = datetime.datetime.now()
        print("Select current date and time:", current_datetime)
        if (i == 1000):
            i = 1
        else:
            i = i + 1
        cursor.close()
        conn.commit()
        time.sleep(1)
finally:
    conn.close()

3.2.2 In the second window, prepare the pyprod2.py script, which connects to Compute Node 2 (in this case the server at 192.168.56.113) and continuously performs update operations on the cluster database while the storage shard nodes are being expanded. The content of pyprod2.py is as follows:

import psycopg2.extras
from psycopg2 import DatabaseError
import time
import datetime

# Connect to Compute Node 2 (192.168.56.113).
conn = psycopg2.connect(database='postgres', user='abc',
                        password='abc', host='192.168.56.113', port='47001')
update_sql = ''' update prod_part set name='test' where id=%s; '''

i = 1
try:
    # Issue one update per second, cycling id from 1 to 1000 indefinitely.
    while (i <= 1000):
        cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(update_sql, [i])
        current_datetime = datetime.datetime.now()
        print("Update current date and time:", current_datetime)
        if (i == 1000):
            i = 1
        else:
            i = i + 1
        cursor.close()
        conn.commit()
        time.sleep(1)
except (Exception, DatabaseError) as e:
    # If the connection is interrupted, report the error, then reconnect
    # and resume the same update loop from the current id.
    print(e)
    input('Press any key and Enter to continue ~!')
    conn = psycopg2.connect(database='postgres', user='abc',
                            password='abc', host='192.168.56.113', port='47001')
    update_sql = ''' update prod_part set name='test' where id=%s; '''
    while (i <= 1000):
        cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cursor.execute(update_sql, [i])
        current_datetime = datetime.datetime.now()
        print("Update current date and time:", current_datetime)
        if (i == 1000):
            i = 1
        else:
            i = i + 1
        cursor.close()
        conn.commit()
        time.sleep(1)
finally:
    conn.close()
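
Both scripts duplicate the loop body inside the exception handler so that the workload resumes after a single reconnect. The same behavior can be expressed more compactly by wrapping the loop in a function and retrying on any DatabaseError; the following is only a sketch of that variant, not part of the test scripts:

import psycopg2.extras
from psycopg2 import DatabaseError
import time
import datetime

def run_workload(host, sql, start_id=1):
    # Reconnect and resume from the current id whenever the connection drops.
    i = start_id
    while True:
        conn = psycopg2.connect(database='postgres', user='abc',
                                password='abc', host=host, port='47001')
        try:
            while True:
                cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
                cursor.execute(sql, [i])
                print(datetime.datetime.now(), 'executed for id =', i)
                i = 1 if i == 1000 else i + 1
                cursor.close()
                conn.commit()
                time.sleep(1)
        except (Exception, DatabaseError) as e:
            print('reconnecting after error:', e)
        finally:
            conn.close()

# Example: the update workload against Compute Node 2.
# run_workload('192.168.56.113', "update prod_part set name='test' where id=%s;")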

3.2.3 In the first window, run the pyprod1.py script to continuously perform select queries on the database.

[kunlun@kunlun1 scripts]$ python pyprod1.py

3.2.4 Simultaneously, in the second window, run the pyprod2.py script to continuously execute update operations on the database.

[kunlun@kunlun1 scripts]$ python pyprod2.py

3.2.5 In the cluster list, click the “Scale In/Out” button for the cluster that requires partition transfer between shards.

3.2.6 In the cluster expansion interface, select the database to be expanded from the “Select Database” dropdown list (in this example, the “postgres” database), and click “Confirm”.

3.2.7 Select the tables to be moved from shard_1 to shard_2 (here the partitions prod_part_p4, prod_part_p5, and prod_part_p6), choose “No” for “Keep Original Table?”, select “shard_2” as the target shard, and then click “Submit”.

3.2.8 Click the “Confirm” button on the cluster expansion confirmation screen to start the expansion process.

3.2.9 The expansion process completes successfully.

3.2.10 During the expansion, the select queries continue to access the tables being migrated without interruption.

3.2.11 During the expansion, the update operations continue to access the tables being migrated without interruption.
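
One simple way to quantify the impact is to compare the timestamps printed by the two scripts: each loop iteration sleeps for one second, so consecutive lines should be roughly one second apart, and any stall caused by the migration would show up as a larger gap. Below is a minimal sketch, assuming the script output was saved to a file (the name pyprod1.log is only an example) and that each "current date and time" line ends with the timestamp:

import datetime

# Parse the timestamps printed by pyprod1.py / pyprod2.py and report the
# largest gap between consecutive operations.
timestamps = []
with open('pyprod1.log') as f:
    for line in f:
        if 'current date and time:' in line:
            # Assumes the timestamp is the last two whitespace-separated
            # fields, e.g. "2023-08-01 10:00:01.123456".
            date_str = ' '.join(line.split()[-2:])
            timestamps.append(datetime.datetime.strptime(
                date_str, '%Y-%m-%d %H:%M:%S.%f'))

gaps = [(later - earlier).total_seconds()
        for earlier, later in zip(timestamps, timestamps[1:])]
if gaps:
    print('operations:', len(timestamps), 'max gap in seconds:', max(gaps))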

3.3 Data Verification After Cluster Expansion

3.3.1 Open an SSH terminal connected to kunlundb_cluster (Compute Node IP: 192.168.56.112), log in as 'kunlun', and connect to the cluster via the PostgreSQL client to run data queries.

[root@kunlun1 ~]# su - kunlun
[kunlun@kunlun1 ~]$ source /kunlun/env.sh
[kunlun@kunlun1 ~]$ psql -h 192.168.56.112 -p 47001 -U abc postgres

3.3.2 Review the distribution of data.

postgres=# analyze prod_part;
postgres=# select relname as table_name, reltuples as num_rows, name as shard_name from pg_class t1, pg_shard t2 where t1.relshardid = t2.id and t1.reltype <> 0 and t1.relname like 'prod_part%';

Connecting to Compute Node 192.168.56.112 shows that partitions prod_part_p1, prod_part_p2, and prod_part_p3 of the partitioned table prod_part remain on storage node shard_1, while partitions prod_part_p4, prod_part_p5, and prod_part_p6 have been migrated to storage node shard_2.

3.3.3 Open another SSH terminal window connected to the kunlundb_cluster (Compute Node IP: 192.168.56.113), log in as the kunlun user, and then use the PostgreSQL client to connect to the cluster and execute data query commands.

[root@kunlun1 ~]# su - kunlun
[kunlun@kunlun1 ~]$ source /kunlun/env.sh
[kunlun@kunlun1 ~]$ psql -h 192.168.56.113 -p 47001 -U abc postgres

3.3.4 Check the data distribution.

postgres=# analyze prod_part;
postgres=# select relname as table_name, reltuples as num_rows, name as shard_name from pg_class t1, pg_shard t2 where t1.relshardid = t2.id and t1.reltype <> 0 and t1.relname like 'prod_part%';

Connecting to Compute Node 192.168.56.113 shows the same distribution: partitions prod_part_p1, prod_part_p2, and prod_part_p3 are on storage node shard_1, and partitions prod_part_p4, prod_part_p5, and prod_part_p6 have been migrated to storage node shard_2.

The metadata of the partitioned table prod_part as seen on Compute Node 1 (192.168.56.112) and Compute Node 2 (192.168.56.113) is consistent.
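
The same consistency can be checked programmatically by running identical queries against both compute nodes and comparing the results. The following is a minimal sketch using psycopg2, with connection parameters as in the earlier scripts:

import psycopg2

# Run the same queries on both compute nodes and compare the results.
check_sql = """
    select t1.relname, t2.name
    from pg_class t1, pg_shard t2
    where t1.relshardid = t2.id and t1.reltype <> 0
      and t1.relname like 'prod_part%'
    order by t1.relname;
"""

results = {}
for host in ('192.168.56.112', '192.168.56.113'):
    conn = psycopg2.connect(database='postgres', user='abc',
                            password='abc', host=host, port='47001')
    cursor = conn.cursor()
    cursor.execute(check_sql)
    results[host] = cursor.fetchall()
    cursor.execute('select count(*) from prod_part;')
    print(host, 'row count:', cursor.fetchone()[0])
    cursor.close()
    conn.close()

print('partition-to-shard mapping consistent:',
      results['192.168.56.112'] == results['192.168.56.113'])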

This completes the test: applications running on multiple compute nodes operated on the table continuously while some of its partitions were migrated online to the new shard.

END