基于大数据的机器学习:Apache SystemML 在 IBM BigInsights 的实践

我是创始人李岩:很抱歉!给自己产品做个广告,点击进来看看。  

基于大数据的机器学习:Apache SystemML 在 IBM BigInsights 的实践

作者:吴敏达

机器学习 (Machine Learning, ML) 是计算机可以学习而不需要事先编程的能力。 由于数字数据的广泛增长和大数据的计算能力提高,机器学习的时代已经到来。 Apache SystemML 是由 IBM 开源的机器学习系统,现在是 Apache 顶级项目,它所具备的能力在机器学习领域独领风骚。 IBM BigInsights 是业界领先的 Hadoop 企业级发行版本,在 IBM BigInsights 发行版中已经包含了 Apache SystemML 用于大数据平台的机器学习。 本文阐述了 Apache SystemML 的原理和基于 IBM BigInsights 的机器学习实践。

机器学习和 Apache SystemML

机器学习是让计算机从数据中学习的科学和艺术。换句话说,可以训练计算机来了解数据科学家创建的模型。该计算机将使用算法从其接收的数据中迭代学习,并发现该数据中的模式。当新数据进入时,计算机可以根据以前学习的模式进行预测。例如,像亚马逊和 Netflix 这样的公司利用机器学习算法来分析客户的历史产品购买数据或他们观看过的电影。亚马逊然后可以向您推荐新产品,Netfix 可以推荐您可能感兴趣的电影。另外一个例子,通过收集社交媒体情绪数据,零售商可以更多地了解客户的购买习惯,从而提供更令人满意的购物体验。还比如,随着搜索引擎收集越来越多的搜索和选择数据,引擎可以提取与其用户相关的更准确的信息,然后提供更相关的搜索结果,其原理是通过大量分析数据可以让您看到通常可能看不到的内容,从而更好地为客户服务。

ML 是机器学习 Machine Learning 的缩写,所以 SystemML 显而易见是机器学习系统,由 IBM 的 Almaden 实验室 10 年前开发。它用 Java 语言编写,可支持描述性分析、分类、聚类、回归、矩阵分解及生存分析等机器学习算法。IBM 人工智能 Waston 平台就整合了 SystemML 的功能,例如 SystemML 用于 Watson 医疗用于预测治疗结果的机器学习算法,精确度大幅度提高。

SystemML 在 2015 年由 IBM 开源,于 2015 年 8 月 27 日在 GitHub 上公开发布,并于 2015 年 11 月 2 日成为 Apache Incubator 孵化项目。Apache SystemML 作为开源大数据机学习平台受到广泛认可,在 Cadent Technology 和 IBM Watson Health 等客户实践中备受赞誉。Apache Software Foundation 在 2017 年 5 月 31 日宣布将 Apache SystemML 孵化毕业,自此成为 Apache 顶级项目。目前 SystemML 作为 Apache 顶级项目的最新版本是 0.14,支持 Spark 2.x。Apache SystemML 在 2016 年被 datamation.com 列为 15 款开源人工智能软件之一。在部署方面, SystemML 运行环境支持单机和分布式部署。单机部署显然有利于本地开发的工作,而分布式部署则可以真正发挥机器学习的威力,支持的框架包括 Hadoop 和 Spark。

Apache SystemML 目前支持的机器学习算法有:

  1. 描述性统计 Descriptive Statistics该类中的算法用于描述数据集的主要特征。它们提供了对不同观察或数据记录计算的有意义的摘要收集在研究中。这些摘要通常构成初步数据探索的基础,作为其中的一部分更广泛的统计分析。
    • 单变量统计 Univariate Statistics
    • 双变量统计 Bivariate Statistics
    • 分层双变量统计 Stratified Bivariate Statistics
  2. 分类 Classification该类中的算法用于基于一些预定义的类或对象对数据进行分组。这是监督学习的特点。分类算法的一个例子是将社交媒体的评论分为正面评价,负面评价或中立评价。
    • 多项 Logistic 回归 Multinomial Logistic Regression
    • 支持向量机 Support Vector Machines
      • 二进制类支持向量机 Binary-Class Support Vector Machines
      • 多类支持向量机 Multi-Class Support Vector Machines
    • 朴素贝叶斯 Naive Bayes
    • 决策树 Decision Trees
    • 随机森林 Random Forests
  3. 聚类 Clustering聚类是一种无监督的学习类算法。数据集中没有预定义的类 – 算法在数据中找到关系。聚类算法将数据排列或聚类成若干数量的逻辑组。例如,确定商店客户的购买模式。
    • K 均值聚类 K-Means Clustering
  4. 回归 Regression回归是另一类监督学习算法。该数据集中的目标变量是连续的。股票市场预测是回归算法的一个例子。这里的股票价格是目标变量,或者是我们想预测的,而且每天都有变化。
    • 线性回归 Linear Regression
    • 逐步线性回归 Stepwise Linear Regression
    • 广义线性模型 Generalized Linear Models
    • 逐步广义线性回归 Stepwise Generalized Linear Regression
    • 回归计分与预测 Regression Scoring and Prediction
  5. 矩阵分解 Matrix Factorization矩阵分解算法用于发现嵌入在不同实体之间的交互中的潜在特征。它们利用多个矩阵,当它们相乘时,生成一个类似于原先矩阵的新矩阵。亚马逊和 Netflix 使用矩阵因式分解算法来提出产品建议。例如每行代表您的一个客户,每列表示您的一个产品,矩阵是大而稀疏的。因此,每个单元代表由特定客户购买的特定产品。该矩阵首先填充历史数据,然后将原始矩阵分解为”产品因素”和”客户因素”两个因素。通过将这两个因子相乘在一起,我们产生添加到矩阵中的新的非零值。这些新的非零值表示产品建议。
    • 主成分分析 Principal Component Analysis
    • 通过交替最小化完成矩阵 Matrix Completion via Alternating Minimizations
  6. 生存分析 Survival Analysis生存分析检查感兴趣的特定事件发生所需的时间。换句话说,它们用于估计生存概率。 例如,在医学研究中,原型的这种事件是患者的死亡,但是该方法可以应用于其他应用领域,例如在心理实验中完成个人的任务或者在工程中的电气部件的故障。
    • Kaplan-Meier 生存分析 Kaplan-Meier Survival Analysis
    • Cox 比例风险回归模型 Cox Proportional Hazard Regression Model

Apache SystemML 特点和架构

Apache SystemML 具备两种非常的能力在机器学 习领 域独 领风骚 。声明式机器学 习 ( Declarative Machine Learning 简称 DML ) 使表达 ML 算法更容易和更自然。 算法可以用 类 似 R 的 语 法或 类 Python 语法来表示。 DML 通 过 提供表达自定 义 分析的完全灵活性以及与底 层输 入格式和物理数据表示形式的数据独立性, 显 着提高了数据科学家的生 产 力。其次, Apache SystemML 根据数据和集群特性提供自 动优 化,以确保效率和可 扩 展性。 Apache SystemML 为使用大数据的机器学习提供了最佳性能。它可以在 MapReduce 或 Spark 环境中运行,它可以自动优化并实现性能扩展,自动确定算法是在单机还是在集群上运行。

当对小数据量进行机器学习时,数据科学家可以利用 R 或 Python 编写专为数据解读而设计的高级代码。该代码可以在单机上上运行,返回给数据科学家的结果可能不是预期的,而是一个迭代的结果,然后开始修改代码和重新评估结果的过程,这一直持续到科学家认为结果是可以接受的,这种方法适用于小数据量。对于大数据的情况,则需要采用 Hadoop 或者 Spark 计算机集群,在这种情况下,数据科学家像以前一样写高级代码,但必须依靠程序员将代码重新实现或转换为分布式平台的代码,这经常需要消耗大量的时间和精力。进行第一次迭代的结果被传回给数据科学家,与小数据一样,结果不太可能是数据科学家预期,并且需要对算法进行一些调整。取决于算法的复杂性和要分析的数据量,迭代可能需要几天甚至几周才能运行,每个代码重写和重新转换都容易产生错误。

如图 1 所示,SystemML 的作用是翻译数据科学家的代码为可扩展的可执行程序,这有利于大大减少每次迭代的运行时间,同时 SystemML 将性能和可扩展性结合在一起,代码量远远小于其他方式。

基于大数据的机器学习:Apache SystemML 在 IBM BigInsights 的实践

图 1. Apache SystemML 是基于大数据的解决方案

为了说明 Apache SystemML 的性能,比较了三种不同的稀疏集合:小数据量,中等数据规模和大量数据。小数据量包含 1.2 GB 数据,中等数据量包含 12 GB 数据,大量数据 120 GB 数据,在 6 节点计算机集群上分别用 R、Spark MLlib 和 SystemML 运行同样的算法。首先,R 代码需要超过一天多的时间才能最终达成小数据量的计算, 由于内存 不足错误,R 从未完成在中型和大型数据集上的运行,显而易见 R 不是为大数据分析设计的。Spark MLlib 在小数据和中等数据情况表现很好, 可以却花 费 了超过一天的 时间 来运行大数据集。 Apache SystemML 超越了其他方案,在各种数据量下均能快速完成分析任务, 如图 2 所示。

基于大数据的机器学习:Apache SystemML 在 IBM BigInsights 的实践

图 2. Apache SystemML 的性能

Apache SystemML 能实现大数据的机器学习的能力和高性能源于 SystemML 优化器,用来自动执行关键的性能决策,分布式还是本地计算? 如何进行数据分片? 是否需要磁盘和内存交互? Apache SystemML 支持分布式和本地的混合运算,SystemML 优化器可以支持 Spark Driver 中的多线程计算,Spark Executors 中的分布式计算以及优化器进行基于成本的选择。如图 3 所示,优化器的输入是算法,输出则是生成的分布式计算代码。优化器分为三个部分,语言层、高层次操作层(HOP)、低层次操作层(LOP)。

  • 语言层完成了三种不同的操作:解析,变量分析和验证,输入代码被分成基本块,然后在适用的地方进行优化。
  • 高层次操作层(HOP)创建表示块的数据流图,根据数据统计信息确定作业分配。优化器从基于内存和成本估算的替代执行计划中选择,并确定操作符的顺序和选择,选择分布式、本地或混合运算方式。SystemML 有一个广泛的重写库,这些重写用于优化代码。
  • 低层次操作层(LOP)生成物理执行计划,进一步优化 Spark、Map-Reduce 的作业。

基于大数据的机器学习:Apache SystemML 在 IBM BigInsights 的实践

图 3. Apache SystemML 的架构和优化

Apache SystemML 执行模式

Apache SystemML 提供了多种执行模式,数据科学家可以在单机上开发一个算法,然后进行扩展,使用 Spark 或 Hadoop 将该算法用于分发群集。Apache SystemML 的执行模式分为以下五种,鉴于 SystemML 的主要目的是在大型分布式数据集上执行机器学习,调用 SystemML 的两个最重要的方法是 Hadoop Batch 和 Spark Batch 模式。

Spark MLContext

Spark MLContext API 提供了一个编程接口,用于使用 Scala,Java 和 Python 等语言从 Spark 与 SystemML 进行交互。 因此,它提供了一种方便的方式来与 Spark Shell 和 Notebook (如 Jupyter 和 Zeppelin)进行交互。

Spark Batch

Spark Batch 模式可以使用 spark-submit SystemML.jar 在批处理模式下调用 SystemML,调用的 DML 脚本在 -f 参数后面指定。

Hadoop Batch

Hadoop Batch 模式可以使用 hadoop jar SystemML.jar 在批处理模式下调用 SystemML,调用的 DML 脚本在 -f 参数后面指定。

Standalone

SystemML 的独立模式旨在允许数据科学家在单个机器上快速原型算法。 在独立模式下,所有操作均发生在非 Hadoop 环境中的单个节点上。 独立模式不适用于大型数据集。对于大规模生产环境,SystemML 算法执行可以使用 Apache Hadoop 或 Apache Spark 分布在多节点集群中。

JMLC

Java 机器学习连接器(Java Machine Learning Connector 简称 JMLC) API 是用于以嵌入式方式与 SystemML 交互的编程接口。为了使用 JMLC,由于 JMLC 在现有的 Java 虚拟机中调用了 SystemML,所以需要在 Java 应用程序的类路径中包含 SystemML jar 文件。这种可嵌入性使得 SystemML 成为生产流程的一部分,用于诸如评分等任务。JMLC 的主要目的是作为一个评分 API,您的评分功能使用 SystemML 的 DML (声明式机器学习)语言表达。在相当少量的输入数据上,单个 JVM 上的单个计算机上产生相对较小量的输出数据。

由于启动成本,往往是最佳做法做批量打分,例如一次记录 1000 条记录。对于大量数据,建议以 SystemML 的分布式模式(如 Spark 批处理模式或 Hadoop 批处理模式)运行 DML,以利用 SystemML 的分布式计算功能。 JMLC 以性能为代价提供可嵌入性,因此其使用取决于正在处理的业务用例的性质。

Apache SystemML 安装和部署

IBM Biginsights 是业界领先的 Hadoop 企业级发行版本,在世界著名 IT 行业独立研究公司 Forrester 从 2012 年到 2016 年发布的三次 Hadoop 解决方案的评测报告中,IBM BigInsights 一直处于领导者位置。IBM BigInsights 以 Apache Hadoop 及其相关开源项目作为核心组件,并在 Hadoop 开源框架的基础上进行了大量的企业化增强。IBM Biginsights 包含 Apache SystemML 最新的版本,可以直接部署。通常 Apache SystemML 会和 Spark 一起使用,systemml 必须和 spark 节点安装在一起,并且需要在多节点部署。IBM Biginsights 4.2 以后的版本,比如 版本 4.2.5 就可以通过 Ambari 来安装和管理 SystemML 组件,如图 4 所示。

基于大数据的机器学习:Apache SystemML 在 IBM BigInsights 的实践

图 4. IBM Biginsights 包含 Apache SystemML

IBM Biginsights 4.2 版本可以把 Apache SystemML 单独添加到现有的安装中。清单 1 给出了在 IBM Biginsights 4.2 一个节点上部署 Apache SystemML 的部署脚本。其他 Hadoop 发行版用户可以从 http://systemml.apache.org 下载 Apache SystemML 最新的版本按照类似清单 1 的方式部署。

清单 1. IBM BigInsights 4.2 部署 Apache SystemML

				[root@bi01 ~]# cd
				                      /var/www/html/repos/IOP/RHEL7/x86_64/4.2.0.0/systemml/noarch 
				[root@bi01 noarch]# yum install -y
				                      apache_systemml* 
				Installed: 
				  apache_systemml_4.2.0.0.noarch 0:0.10.0_IBM_2-000000 
				Complete!
			

基于 IBM BigInsights 的大数据机器学习实例

下面使用 IBM BigInsights 4.2 的 SystemML 0.10 版本来做一个实际机器学习的例子,数据来自互联网的航空数据,场景是预测飞机延误。

  1. 首先需要下载并加载到 HDFS 上,如清单 2 所示。清单 2. 数据准备
    						[spark@bi01 ~]$ wget
    						                      http://stat-computing.org/dataexpo/2009/2007.csv.bz2 
    						[spark@bi01 ~]$ hadoop fs -put
    						                      2007.csv.bz2 /user/spark/
    					
  2. 这个例子除了需要 SystemML 类库外,还需要用到 Spark 解析和查询 CSV 格式的类库,为了说明方便采用 spark-shell 的方式来执行这个例子,它的执行方式是 Spark MLContext,如清单 3 所示。清单 3. 启动 Apache SystemML 的 Spark 环境
    							[spark@bi01 ~]$ spark-shell --master
    							                      yarn-client --jars
    							                      /usr/iop/current/systemml-client/lib/systemml.jar --packages
    							                      com.databricks:spark-csv_2.10:1.4.0 --num-executors 3
    							                      --driver-memory 2G --executor-memory 6G
    						
  3. 如清单 4 所示使用 Spark CSV 类库将数据集加载到 DataFrame 中。清单 4. 数据加载
    							scala> import
    							                      sys.process._ 
    							scala> import
    							                      java.net.URL 
    							scala> import
    							                      java.io.File 
    							scala> import
    							                      org.apache.spark.sql.SQLContext 
    							scala> import
    							                      org.apache.spark.storage.StorageLevel 
    							scala> val sqlContext = new
    							                      SQLContext(sc) 
    							scala> val fmt =
    							                      sqlContext.read.format("com.databricks.spark.csv") 
    							scala> val opt =
    							                      fmt.options(Map("header"->"true",
    							                      "inferSchema"->"true")) 
    							scala> val localFilePath =
    							                      "hdfs://bi01.ibm.com:8020/user/spark/2007.csv.bz2" 
    							scala> val airline =
    							                      opt.load(localFilePath).na.replace( "*", Map("NA" -> "0.0")
    							                      ) 
    							   
    							scala>
    							                      airline.printSchema 
    							root 
    							 |-- Year: integer (nullable
    							                      = true) 
    							 |-- Month: integer (nullable
    							                      = true) 
    							 |-- DayofMonth: integer
    							                      (nullable = true) 
    							 |-- DayOfWeek: integer
    							                      (nullable = true) 
    							 |-- DepTime: string
    							                      (nullable = true) 
    							 |-- CRSDepTime: integer
    							                      (nullable = true) 
    							 |-- ArrTime: string
    							                      (nullable = true) 
    							 |-- CRSArrTime: integer
    							                      (nullable = true) 
    							 |-- UniqueCarrier: string
    							                      (nullable = true) 
    							 |-- FlightNum: integer
    							                      (nullable = true) 
    							 |-- TailNum: string
    							                      (nullable = true) 
    							 |-- ActualElapsedTime:
    							                      string (nullable = true) 
    							 |-- CRSElapsedTime: string
    							                      (nullable = true) 
    							 |-- AirTime: string
    							                      (nullable = true) 
    							 |-- ArrDelay: string
    							                      (nullable = true) 
    							 |-- DepDelay: string
    							                      (nullable = true) 
    							 |-- Origin: string (nullable
    							                      = true) 
    							 |-- Dest: string (nullable =
    							                      true) 
    							 |-- Distance: integer
    							                      (nullable = true) 
    							 |-- TaxiIn: integer
    							                      (nullable = true) 
    							 |-- TaxiOut: integer
    							                      (nullable = true) 
    							 |-- Cancelled: integer
    							                      (nullable = true) 
    							 |-- CancellationCode: string
    							                      (nullable = true) 
    							 |-- Diverted: integer
    							                      (nullable = true) 
    							 |-- CarrierDelay: integer
    							                      (nullable = true) 
    							 |-- WeatherDelay: integer
    							                      (nullable = true) 
    							 |-- NASDelay: integer
    							                      (nullable = true) 
    							 |-- SecurityDelay: integer
    							                      (nullable = true) 
    							 |-- LateAircraftDelay:
    							                      integer (nullable = true)
    						
  4. 如清单 5 所示使用 Spark SQL 查看哪些机场延误最多。清单 5. 数据探索
    							scala>
    							                      airline.registerTempTable("airline") 
    							scala> sqlContext.sql("""SELECT
    							                      Origin, count(*) conFlight, avg(DepDelay) delay 
    							     |                     FROM airline 
    							     |                     GROUP BY Origin 
    							|                     ORDER BY delay
    							                      DESC""").show 
    							+------+---------+------------------+ 
    							|Origin|conFlight|             delay| 
    							+------+---------+------------------+ 
    							|   PIR|        4|              45.5| 
    							|   ACK|      314|45.296178343949045| 
    							|   SOP|      195| 34.02051282051282| 
    							|   HHH|      997| 22.58776328986961| 
    							|   MCN|      992|22.496975806451612| 
    							|   AKN|      235|21.123404255319148| 
    							|   CEC|     1055|20.807582938388627| 
    							|   GNV|     1927| 20.69797612869746| 
    							|   EYW|     1052|20.224334600760457| 
    							|   ACY|      735|20.141496598639456| 
    							|   SPI|     1745|19.545558739255014| 
    							|   GST|       90|19.233333333333334| 
    							|   EWR|   154113|18.800853918877706| 
    							|   BRW|      726| 18.02754820936639| 
    							|   AGS|     2286|17.728346456692915| 
    							|   ORD|   375784|17.695756072637472| 
    							|   TRI|     1207| 17.63628831814416| 
    							|   SBN|     5128|17.505850234009362| 
    							|   FAY|     2185| 17.48970251716247| 
    							|   PHL|   104063|17.067776250924922| 
    							+------+---------+------------------+ 
    							only showing top 20 rows
    						
  5. 这个例子是预测出发地是 JFK 且延误超过 15 个航班,如果超过 15 则延误标记为 1.0,没有超过 15 标记为 2.0。如清单 6 所示进行数据预处理,随机把数据集分成 70% 的训练数据和 30% 的测试数据。模型训练的数据有 81307 条,用来验证模型的测试数据有 34882 条。清单 6. 数据预处理
    							scala>
    							                      sqlContext.udf.register("checkDelay", (depDelay:String) => try
    							                      { if(depDelay.toDouble > 15) 1.0 else 2.0 } catch { case
    							                      e:Exception => 1.0 }) 
    							scala> val tempSmallAirlineData =
    							                      sqlContext.sql("SELECT *, checkDelay(DepDelay) label FROM airline
    							                      WHERE Origin =
    							                      'JFK'").persist(StorageLevel.MEMORY_AND_DISK) 
    							scala> val popularDest =
    							                      tempSmallAirlineData.select("Dest").map(y =>
    							                      (y.get(0).toString, 1)).reduceByKey(_ + _).filter(_._2 >
    							                      1000).collect.toMap 
    							scala>
    							                      sqlContext.udf.register("onlyUsePopularDest", (x:String) =>
    							                      popularDest.contains(x)) 
    							scala>
    							                      tempSmallAirlineData.registerTempTable("tempAirline") 
    							scala> val smallAirlineData =
    							                      sqlContext.sql("SELECT * FROM tempAirline WHERE
    							                      onlyUsePopularDest(Dest)") 
    							scala> val datasets =
    							                      smallAirlineData.randomSplit(Array(0.7, 0.3)) 
    							scala> val trainDataset =
    							                      datasets(0).cache 
    							scala> val testDataset =
    							                      datasets(1).cache 
    							   
    							scala>
    							                      trainDataset.count 
    							res8: Long = 81307 
    							   
    							scala>
    							                      testDataset.count 
    							res9: Long = 34882
    						
  6. 使用 One-hot encoding 对目的地进行编码,并包括列 Year,Month,DayofMonth,DayOfWeek,Distance 。 One-hot encoding 是将一 列标签索引映射到一列二进制向量,最多只有一个单值。 该编码允许期望连续特征(例如逻辑回归)的算法使用分类特征。清单 7. 数据编码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    scala> import
                           org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer,
                           VectorAssembler} 
    scala> val indexer = new
                           StringIndexer().setInputCol("Dest").setOutputCol("DestIndex").setHandleInvalid("skip") 
    scala> val encoder = new
                           OneHotEncoder().setInputCol("DestIndex").setOutputCol("DestVec") 
    scala> val assembler = new
                           VectorAssembler().setInputCols(Array("Year","Month","DayofMonth","DayOfWeek","Distance","DestVec")).setOutputCol("features")
  7. 使用 SystemML 构建模型,实际调用 SystemML 的 MultiLogReg.dml(用于训练)如清单 7 所示,MultiLogReg.dml 脚本执行二项式和多项逻辑回归。在这个例子中设置了以下参数,并用 70% 的训练数据进行模型训练。
    • reg: (缺省值: 0.0) L2 正则化参数,设置为 1e-4
    • tol: (缺省值: 0.000001) 公差,用于收敛标准,设置为 1e-2
    • mii: (缺省值: 0) 内部迭代的最大数目,设置为 0
    • moi: (缺省值: 100) 最大外部迭代次数,设置为 100

    清单 7. 预测模型

    							scala> import
    							                      org.apache.spark.ml.Pipeline 
    							scala> import
    							                      org.apache.sysml.api.ml.LogisticRegression 
    							scala> val lr = new
    							                      LogisticRegression("log",
    							                      sc).setRegParam(1e-4).setTol(1e-2).setMaxInnerIter(0).setMaxOuterIter(100) 
    							scala> val pipeline = new
    							                      Pipeline().setStages(Array(indexer, encoder, assembler,
    							                      lr)) 
    							   
    							scala> val model =
    							                      pipeline.fit(trainDataset) 
    							BEGIN MULTINOMIAL LOGISTIC REGRESSION
    							                      SCRIPT 
    							Reading X... 
    							Reading Y... 
    							-- Initially:  Objective =
    							                      56332.764511287314,  Gradient Norm = 4.442549498590815E7,
    							                       Trust Delta = 0.001024586722033724 
    							-- Outer Iteration 1: Had 1 CG
    							                      iterations 
    							   -- Obj.Reduction:
    							                       Actual = 9183.298853323096,  Predicted =
    							                      8838.441994939885  (A/P: 1.039),  Trust Delta =
    							                      4.1405507117841964E-4 
    							   -- New Objective
    							                      = 47149.46565796422,  Beta Change Norm =
    							                      3.978995393407976E-4,  Gradient Norm =
    							                      3449234.078105791 
    							-- Outer Iteration 2: Had 2 CG
    							                      iterations 
    							   -- Obj.Reduction:
    							                       Actual = 105.15284603631153,  Predicted =
    							                      103.41131167333731  (A/P: 1.0168),  Trust Delta =
    							                      4.1405507117841964E-4 
    							   -- New Objective
    							                      = 47044.312811927906,  Beta Change Norm =
    							                      1.0251350174812022E-4,  Gradient Norm =
    							                      83156.34844588072 
    							Termination / Convergence condition
    							                      satisfied. 
    							model:
    							                      org.apache.spark.ml.PipelineModel =
    							                      pipeline_4bb95fa082de
    						
  8. 最后用 30% 的测试数据进行预测,实际调用 SystemML 的 GLM-predict.dml(用于预测)。如清单 8 所示,列出了航班预测的结果以及实际延误的情况,并最后计算出模型的均方根误差。清单 8. 模型评估
    							scala> val predictions =
    							                      model.transform(testDataset.withColumnRenamed("label",
    							                      "OriginalLabel")) 
    							   
    							scala>predictions.select("prediction",
    							                      "OriginalLabel").show 
    							+----------+-------------+ 
    							 |prediction|OriginalLabel| 
    							+----------+-------------+ 
    							|       1.0|          1.0| 
    							|       1.0|          1.0| 
    							|       1.0|          1.0| 
    							|       1.0|          1.0| 
    							|       1.0|          2.0| 
    							|       1.0|          2.0| 
    							|       1.0|          1.0| 
    							|       1.0|          1.0| 
    							|       1.0|          2.0| 
    							|       1.0|          2.0| 
    							|       1.0|          1.0| 
    							|       1.0|          2.0| 
    							|       1.0|          2.0| 
    							|       1.0|          2.0| 
    							|       1.0|          2.0| 
    							|       1.0|          1.0| 
    							|       1.0|          2.0| 
    							|       1.0|          2.0| 
    							|       1.0|          1.0| 
    							|       1.0|          1.0| 
    							+----------+-------------+ 
    							only showing top 20 rows 
    							   
    							scala>
    							                      sqlContext.udf.register("square", (x:Double) => Math.pow(x,
    							                      2.0)) 
    							scala>
    							                      predictions.registerTempTable("predictions") 
    							scala> sqlContext.sql("SELECT
    							                      sqrt(avg(square(OriginalLabel - prediction))) FROM
    							                      predictions").show 
    							+------------------+ 
    							|               _c0| 
    							+------------------+ 
    							|0.8567701236741244| 
    							+------------------+
    						

总结

本文通过理论联系实际的方式描述了如何在 IBM BigInsights 利用 Apache SystemML 进行大数据平台的机器学习。

SystemML 是一个灵活的,可 扩 展的机器学 习 系 统 。 SystemML 的特点是:

  • 通 过类 似 R 和 类 似 Python 的 语 言 进 行算法定制。
  • 多种 执 行模式,包括 Spark MLContext , Spark Batch , Hadoop Batch , Standalone 和 JMLC 。
  • 基于数据和集群特征的自 动优 化,以确保效率和可 扩 展性。

IBM BigInsights 是业界领先的 Hadoop 企业级发行版本,不仅包含 Apache SystemML 组件也对其进行企业级技术支持。IBM BigInsights 最新版本包含的 Jupyter Notebook 可以方便地进行 SystemML 的机器学习代码的编写和调试。

End.

转载请注明来自36大数据(36dsj.com): 36大数据 » 基于大数据的机器学习:Apache SystemML 在 IBM BigInsights 的实践

随意打赏

基于大数据分析基于大数据
提交建议
微信扫一扫,分享给好友吧。