Skip to content

SQL工具类(sqlutils)模块JAVA版.html

Alex.Mo edited this page Jun 21, 2019 · 2 revisions
  • 一、SQL引擎模块概述及依赖说明java版

概述:
本部分主要介绍在处理表之间的复杂操作以及SQL类的数据统计查询时要用到的SQL工具类。

开发注意事项:

1.导入依赖Maven pom文件

        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.11.0</version>
        </dependency>

2.导入客户端类文件:

com.dksou.fitting.sqlutils.service.DKSQLConf.java
com.dksou.fitting.sqlutils.service.DKSQLEngine.java
com.dksou.fitting.sqlutils.service.ResultEntity.java
  • 二、基于Hive的方法

  1. Hive的SQL执行方法

    方法签名

    ResultEntity executeHQL(String hostIp, String port, String username, String password, String database, String sql, String queueName, DKSQLConf dkSqlConf)

    返回

    ResultEntity对象,通过status属性获取执行结果状态,通过message属性获取成功或异常信息,通过result属性获取执行结果

    方法参数说明

    hostIp:Hive连接ip地址
    port:Hive连接端口
    username:用户名
    password:密码
    database:要连接的数据库
    sql:要执行的sql语句
    queueName:指定MapReduce作业提交到的队列(为mapreduce.job.queuename此参数设置)
    dkSqlConf:
    KRB5_CONF:krb5.conf文件路径配置
    PRINCIPAL:principal主体名称配置
    KEYTAB:keytab文件路径配置
    

    程序清单

    package com.dksou.fitting.sqlutils.test;
    
    
    import com.dksou.fitting.sqlutils.service.DKSQLEngine;
    import com.dksou.fitting.sqlutils.service.DKSQLConf;
    import com.dksou.fitting.sqlutils.service.ResultEntity;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.junit.Before;
    import org.junit.Test;
    
    public class DKSQLEngineClientTest {
        // Thrift gateway endpoint that exposes the SQLEngineService.
        String serverIP = "localhost";
        int serverPort = 9871;
        TTransport transport;
        DKSQLEngine.Client client;
        DKSQLConf dkSqlConf;
        // CDH cluster connection settings used by the examples below.
        String hive_hostIp = "192.168.1.73";
        String impala_hostIp = "192.168.1.166";
        String port = "10000";
        String username = "root";
        String password = "123456";

        /**
         * Opens the Thrift socket and builds a multiplexed client bound to the
         * "SQLEngineService" service name.
         * NOTE(review): the transport is never closed after each test — consider
         * an @After method that calls transport.close().
         */
        @Before
        public void open() throws TTransportException {
            transport = new TSocket(serverIP, serverPort);
            transport.open();
            TProtocol binaryProtocol = new TBinaryProtocol(transport);
            TMultiplexedProtocol mux = new TMultiplexedProtocol(binaryProtocol, "SQLEngineService");
            client = new DKSQLEngine.Client(mux);

            dkSqlConf = new DKSQLConf();
            // To enable Kerberos support, configure the three properties below:
            //dkSqlConf.KRB5_CONF = "/etc/krb5.conf";
            //dkSqlConf.PRINCIPAL = "hive/[email protected]";
            //dkSqlConf.KEYTAB = "/root/hive.keytab";
        }

        /**
         * Runs a HiveQL statement that produces no result set (DDL or some DML
         * statements), then prints the status message and result returned by
         * the service.
         *
         * @throws Exception if the RPC call or the statement fails
         */
        @Test
        public void testExecuteHQL() throws Exception {
            String database = "default";
            // create database:
            String sql = "create database if not exists school2 comment  '学校数据仓库'";
            // create table:
            //String sql = "create table if not exists ViewRecord(ViewId string,ViewIp string,ViewPageUrl string,ViewTime string) comment 'website online record' row format delimited fields terminated by ',' STORED AS TEXTFILE";
            // load data:
            //String sql = "load data local inpath '/root/teacher.txt' overwrite into table teacher";
            //String sql = "";
            String queueName = "";
            ResultEntity result = client.executeHQL(hive_hostIp, port, username,
                    password, database, sql, queueName, dkSqlConf);
            System.out.println(result.getMessage());
            System.out.println(result.getResult());
        }
    }
  2. Hive的数据查询语句

    方法签名

    ResultEntity excuteQueryHQL(String hostIp, String port, String username, String password, String database, String sql, String orderBy, int startRow, int endRow, String queueName, DKSQLConf dkSqlConf)

    返回

    ResultEntity对象,通过status属性获取执行结果状态,通过message属性获取成功或异常信息,通过result属性获取执行结果

    方法参数说明

    hostIp    hive 的连接IP地址
    port      hive的连接端口
    username  用户名
    password  密码
    database  要连接的数据库
    sql       要执行的sql
    orderBy   排序字段
    startRow  开始行
    endRow    结束行
    queueName 指定MapReduce作业提交到的队列(为mapreduce.job.queuename此参数设置)
    dkSqlConf:
    KRB5_CONF:krb5.conf文件路径配置
    PRINCIPAL:principal主体名称配置
    KEYTAB:keytab文件路径配置
    

    程序清单

    package com.dksou.fitting.sqlutils.test;
    
    
    import com.dksou.fitting.sqlutils.service.DKSQLEngine;
    import com.dksou.fitting.sqlutils.service.DKSQLConf;
    import com.dksou.fitting.sqlutils.service.ResultEntity;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.junit.Before;
    import org.junit.Test;
    
    public class DKSQLEngineClientTest {
        // Thrift gateway endpoint that exposes the SQLEngineService.
        String serverIP = "localhost";
        int serverPort = 9871;
        TTransport transport;
        DKSQLEngine.Client client;
        DKSQLConf dkSqlConf;
        // CDH cluster connection settings used by the examples below.
        String hive_hostIp = "192.168.1.73";
        String impala_hostIp = "192.168.1.166";
        String port = "10000";
        String username = "root";
        String password = "123456";

        /**
         * Opens the Thrift socket and builds a multiplexed client bound to the
         * "SQLEngineService" service name.
         */
        @Before
        public void open() throws TTransportException {
            transport = new TSocket(serverIP, serverPort);
            transport.open();
            TProtocol binaryProtocol = new TBinaryProtocol(transport);
            TMultiplexedProtocol mux = new TMultiplexedProtocol(binaryProtocol, "SQLEngineService");
            client = new DKSQLEngine.Client(mux);

            dkSqlConf = new DKSQLConf();
            // To enable Kerberos support, configure the three properties below:
            //dkSqlConf.KRB5_CONF = "/etc/krb5.conf";
            //dkSqlConf.PRINCIPAL = "hive/[email protected]";
            //dkSqlConf.KEYTAB = "/root/hive.keytab";
        }

        /**
         * Runs a paged Hive query (rows startRow..endRow of the result set,
         * ordered by the given column) and prints the service response.
         *
         * @throws Exception if the RPC call or the query fails
         */
        @Test
        public void testExcuteQueryHQL() throws Exception {
            String database = "default";
            String sql = "select * from student";
            String orderBy = "id";
            int startRow = 1;
            int endRow = 10;
            String queueName = "";
            ResultEntity result = client.excuteQueryHQL(hive_hostIp, port, username,
                    password, database, sql, orderBy, startRow, endRow, queueName, dkSqlConf);
            System.out.println(result.getMessage());
            System.out.println(result.getResult());
        }
    }
  3. Hive的表记录数查询

    方法签名

    ResultEntity countHQL(String hostIp, String port, String username, String password, String database, String sql, String queueName, DKSQLConf dkSqlConf)

    返回

    ResultEntity对象,通过status属性获取执行结果状态,通过message属性获取成功或异常信息,通过result属性获取执行结果

    方法参数说明

    hostIp:hive的连接IP地址
    port:hive的连接端口
    username:用户名
    password:密码
    database:要连接的数据库
    sql:要执行的sql
    queueName:指定MapReduce作业提交到的队列(为mapreduce.job.queuename此参数设置)
    dkSqlConf:
    KRB5_CONF:krb5.conf文件路径配置
    PRINCIPAL:principal主体名称配置
    KEYTAB:keytab文件路径配置
    

    程序清单

    package com.dksou.fitting.sqlutils.test;
    
    
    import com.dksou.fitting.sqlutils.service.DKSQLEngine;
    import com.dksou.fitting.sqlutils.service.DKSQLConf;
    import com.dksou.fitting.sqlutils.service.ResultEntity;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.junit.Before;
    import org.junit.Test;
    
    public class DKSQLEngineClientTest {
        // Thrift gateway endpoint that exposes the SQLEngineService.
        String serverIP = "localhost";
        int serverPort = 9871;
        TTransport transport;
        DKSQLEngine.Client client;
        DKSQLConf dkSqlConf;
        // CDH cluster connection settings used by the examples below.
        String hive_hostIp = "192.168.1.73";
        String impala_hostIp = "192.168.1.166";
        String port = "10000";
        String username = "root";
        String password = "123456";

        /**
         * Opens the Thrift socket and builds a multiplexed client bound to the
         * "SQLEngineService" service name.
         */
        @Before
        public void open() throws TTransportException {
            transport = new TSocket(serverIP, serverPort);
            transport.open();
            TProtocol binaryProtocol = new TBinaryProtocol(transport);
            TMultiplexedProtocol mux = new TMultiplexedProtocol(binaryProtocol, "SQLEngineService");
            client = new DKSQLEngine.Client(mux);

            dkSqlConf = new DKSQLConf();
            // To enable Kerberos support, configure the three properties below:
            //dkSqlConf.KRB5_CONF = "/etc/krb5.conf";
            //dkSqlConf.PRINCIPAL = "hive/[email protected]";
            //dkSqlConf.KEYTAB = "/root/hive.keytab";
        }

        /**
         * Counts the records of a Hive table via a count query submitted to
         * the given YARN queue, then prints the service response.
         *
         * @throws Exception if the RPC call or the query fails
         */
        @Test
        public void testCountHQL() throws Exception {
            String database = "default";
            String sql = "select count(1) as total from student";
            String queueName = "a";
            ResultEntity result = client.countHQL(hive_hostIp, port, username,
                    password, database, sql, queueName, dkSqlConf);
            System.out.println(result.getMessage());
            System.out.println(result.getResult());
        }
    }
  • 三、基于Impala的方法

  1. Impala的SQL执行方法

    方法签名

    ResultEntity excuteISQL(String hostIp, String port, String database, String sql, DKSQLConf dkSqlConf)

    返回

    ResultEntity对象,通过status属性获取执行结果状态,通过message属性获取成功或异常信息,通过result属性获取执行结果

    方法参数说明

    hostIp:Impala连接ip地址
    port:Impala连接端口
    database:要连接的数据库
    sql:要执行的sql语句
    dkSqlConf:
    KRB5_CONF:krb5.conf文件路径配置
    PRINCIPAL:principal主体名称配置
    KEYTAB:keytab文件路径配置
    

    程序清单

    package com.dksou.fitting.sqlutils.test;
    
    
    import com.dksou.fitting.sqlutils.service.DKSQLEngine;
    import com.dksou.fitting.sqlutils.service.DKSQLConf;
    import com.dksou.fitting.sqlutils.service.ResultEntity;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.junit.Before;
    import org.junit.Test;
    
    public class DKSQLEngineClientTest {
        String serverIP = "localhost";
        int serverPort = 9871;
        TTransport transport;
        DKSQLEngine.Client client;
        DKSQLConf dkSqlConf;
        //cdh平台
        String hive_hostIp = "192.168.1.73";
        String impala_hostIp = "192.168.1.166";
    String impala_port = "21050";
        String port = "10000";
        String username = "root";
        String password = "123456";
    
        @Before
        public void open() throws TTransportException {
            transport = new TSocket(serverIP, serverPort);
            transport.open();
            TProtocol protocol = new TBinaryProtocol(transport);
            TMultiplexedProtocol tMultiplexedProtocol = new TMultiplexedProtocol(protocol, "SQLEngineService");
            client = new DKSQLEngine.Client(tMultiplexedProtocol);
    
            dkSqlConf = new DKSQLConf();
            //如果需要支持kerberos安全组件,需配置下面三个属性
            //dkSqlConf.KRB5_CONF = "/etc/krb5.conf";
            //dkSqlConf.PRINCIPAL = "hive/[email protected]";
            //dkSqlConf.KEYTAB = "/root/hive.keytab";
    
        }
        /**
         * 测试IMPALA SQL 执行一些无返回的SQL语句:DDL或者部分DML语句
         *
         * @throws Exception
         */
        @Test
        public void testExcuteISQL() throws Exception {
            String database = "default";
            String sql = "create if not exists database school";
            // String sql = "create table if not exists teacher(id int,name string,last_mod string) comment '教师信息' row format delimited fields terminated by ',' STORED AS TEXTFILE";
    //        dkSqlConf.PRINCIPAL = "impala/[email protected]";
    //        dkSqlConf.KEYTAB = "/root/impala.keytab";
            ResultEntity result = client.excuteISQL(impala_hostIp,impala_port,database, sql, dkSqlConf);
            System.out.println(result.getMessage());
            System.out.println(result.getResult());
        }
  2. Impala的数据查询

    方法签名

    ResultEntity excuteQueryISQL(String hostIp, String port, String database, String sql, DKSQLConf dkSqlConf)

    返回

    ResultEntity对象,通过status属性获取执行结果状态,通过message属性获取成功或异常信息,通过result属性获取执行结果

    方法参数说明

    hostIp   Impala的连接IP地址
    port     Impala连接端口
    database 要连接的数据库
    sql      要执行的sql
    dkSqlConf:
    KRB5_CONF:krb5.conf文件路径配置
    PRINCIPAL:principal主体名称配置
    KEYTAB:keytab文件路径配置
    

    程序清单

    package com.dksou.fitting.sqlutils.test;
    
    
    import com.dksou.fitting.sqlutils.service.DKSQLEngine;
    import com.dksou.fitting.sqlutils.service.DKSQLConf;
    import com.dksou.fitting.sqlutils.service.ResultEntity;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.junit.Before;
    import org.junit.Test;
    
    public class DKSQLEngineClientTest {
        // Thrift gateway endpoint that exposes the SQLEngineService.
        String serverIP = "localhost";
        int serverPort = 9871;
        TTransport transport;
        DKSQLEngine.Client client;
        DKSQLConf dkSqlConf;
        // CDH cluster connection settings used by the examples below.
        String hive_hostIp = "192.168.1.73";
        String impala_hostIp = "192.168.1.166";
        String impala_port = "21050";
        String port = "10000";
        String username = "root";
        String password = "123456";

        /**
         * Opens the Thrift socket and builds a multiplexed client bound to the
         * "SQLEngineService" service name.
         */
        @Before
        public void open() throws TTransportException {
            transport = new TSocket(serverIP, serverPort);
            transport.open();
            TProtocol binaryProtocol = new TBinaryProtocol(transport);
            TMultiplexedProtocol mux = new TMultiplexedProtocol(binaryProtocol, "SQLEngineService");
            client = new DKSQLEngine.Client(mux);

            dkSqlConf = new DKSQLConf();
            // To enable Kerberos support, configure the three properties below:
            //dkSqlConf.KRB5_CONF = "/etc/krb5.conf";
            //dkSqlConf.PRINCIPAL = "hive/[email protected]";
            //dkSqlConf.KEYTAB = "/root/hive.keytab";
        }

        /**
         * Runs an Impala query (with Kerberos credentials configured for the
         * Impala service) and prints the service response.
         *
         * @throws Exception if the RPC call or the query fails
         */
        @Test
        public void testExcuteQueryISQL() throws Exception {
            String database = "default";
            String sql = "select * from student";
            dkSqlConf.PRINCIPAL = "impala/[email protected]";
            dkSqlConf.KEYTAB = "/root/impala.keytab";
            ResultEntity result = client.excuteQueryISQL(impala_hostIp, impala_port, database, sql, dkSqlConf);
            System.out.println(result.getMessage());
            System.out.println(result.getResult());
        }
    }
  3. Impala的表记录数查询

    方法签名

    ResultEntity countIQL(String hostIp, String port, String database, String sql, DKSQLConf dkSqlConf)

    返回

    ResultEntity对象,通过status属性获取执行结果状态,通过message属性获取成功或异常信息,通过result属性获取执行结果

    方法参数说明

    hostIp   Impala的连接IP地址
    port     Impala的连接端口
    database 要连接的数据库
    sql      要执行的sql
    dkSqlConf:
    KRB5_CONF:krb5.conf文件路径配置
    PRINCIPAL:principal主体名称配置
    KEYTAB:keytab文件路径配置
    

    程序清单

    package com.dksou.fitting.sqlutils.test;
    
    
    import com.dksou.fitting.sqlutils.service.DKSQLEngine;
    import com.dksou.fitting.sqlutils.service.DKSQLConf;
    import com.dksou.fitting.sqlutils.service.ResultEntity;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TMultiplexedProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.junit.Before;
    import org.junit.Test;
    
    public class DKSQLEngineClientTest {
        // Thrift gateway endpoint that exposes the SQLEngineService.
        String serverIP = "localhost";
        int serverPort = 9871;
        TTransport transport;
        DKSQLEngine.Client client;
        DKSQLConf dkSqlConf;
        // CDH cluster connection settings used by the examples below.
        String hive_hostIp = "192.168.1.73";
        String impala_hostIp = "192.168.1.166";
        String port = "10000";
        // FIX: the original read `String impala_port=21050;` — an int literal
        // assigned to a String, which does not compile.
        String impala_port = "21050";
        String username = "root";
        String password = "123456";

        /**
         * Opens the Thrift socket and builds a multiplexed client bound to the
         * "SQLEngineService" service name.
         */
        @Before
        public void open() throws TTransportException {
            transport = new TSocket(serverIP, serverPort);
            transport.open();
            TProtocol protocol = new TBinaryProtocol(transport);
            TMultiplexedProtocol tMultiplexedProtocol = new TMultiplexedProtocol(protocol, "SQLEngineService");
            client = new DKSQLEngine.Client(tMultiplexedProtocol);

            dkSqlConf = new DKSQLConf();
            // To enable Kerberos support, configure the three properties below:
            //dkSqlConf.KRB5_CONF = "/etc/krb5.conf";
            //dkSqlConf.PRINCIPAL = "hive/[email protected]";
            //dkSqlConf.KEYTAB = "/root/hive.keytab";
        }

        /**
         * Counts the records of an Impala table and prints the service
         * response.
         *
         * @throws Exception if the RPC call or the query fails
         */
        @Test
        public void testCountIQL() throws Exception {
            String database = "default";
            String sql = "select count(*) from student";
            dkSqlConf.PRINCIPAL = "impala/[email protected]";
            dkSqlConf.KEYTAB = "/root/impala.keytab";
            // FIX: the original also passed username/password, but the documented
            // signature is countIQL(hostIp, port, database, sql, dkSqlConf).
            ResultEntity result = client.countIQL(impala_hostIp, impala_port, database, sql, dkSqlConf);
            System.out.println(result.getMessage());
            System.out.println(result.getResult());
        }
    }

<上一章-Rdmbs或hdfs数据导入至elasticsearch> <下一章-实时流(stream)模块JAVA版>

<回到-主页><回到-用户开发使用手册java版>

Clone this wiki locally