
Analyze Twitter data using Apache Hive and Apache Hadoop on HDInsight

Learn how to use Apache Hive to process Twitter data. The result is a list of the Twitter users who sent the most tweets containing a certain word.

Important

The steps in this document were tested on HDInsight 3.6.

Get the data

Twitter allows you to retrieve the data for each tweet as a JavaScript Object Notation (JSON) document through a REST API. OAuth is required to authenticate to the API.
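For reference, the JSON document for a tweet is deeply nested. The following is a heavily abbreviated sketch with placeholder values; real tweet documents contain many more fields. The paths shown here (id_str, created_at, text, entities, user) are the ones the Hive query later in this article extracts:

{
    "id_str": "1234567890",
    "created_at": "Mon Jan 01 12:00:00 +0000 2018",
    "text": "Example tweet about #azure from @someuser",
    "retweet_count": 0,
    "entities": {
        "hashtags": [ { "text": "azure" } ],
        "user_mentions": [ { "screen_name": "someuser" } ]
    },
    "user": {
        "screen_name": "exampleuser",
        "name": "Example User",
        "followers_count": 100,
        "friends_count": 50,
        "lang": "en",
        "location": "Redmond, WA",
        "time_zone": "Pacific Time (US & Canada)"
    }
}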

Create a Twitter application

From the Twitter developer portal, register a new application to obtain the consumer key, consumer secret, access token, and access token secret used by the script in the next section. If the registration form requires a website URL, you can enter a placeholder such as https://www.myhdinsightapp.com.

Download tweets

The following Python code downloads 100 tweets from Twitter (controlled by the max_tweets value in the script) and saves them to a file named tweets.txt.

Note

Perform the following steps on the HDInsight cluster, since it already has Python installed.

# Connect to the cluster with SSH
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net

# Install the build dependencies needed by the Python packages
sudo apt install python-dev libffi-dev libssl-dev
sudo apt remove python-openssl

# Create and activate an isolated Python environment
python -m pip install virtualenv
mkdir gettweets
cd gettweets
virtualenv gettweets
source gettweets/bin/activate

# Install the libraries the script uses. Note that the script below
# targets the tweepy 3.x streaming API (StreamListener); tweepy 4.x
# changed this interface.
pip install tweepy progressbar pyOpenSSL requests[security]

# Create the script file
nano gettweets.py

Use the following code as the contents of the gettweets.py file:
#!/usr/bin/python

from tweepy import Stream, OAuthHandler
from tweepy.streaming import StreamListener
from progressbar import ProgressBar, Percentage, Bar
import json
import sys

#Twitter app information
consumer_secret='Your consumer secret'
consumer_key='Your consumer key'
access_token='Your access token'
access_token_secret='Your access token secret'

#The number of tweets we want to get
max_tweets=100

#Create the listener class that receives and saves tweets
class listener(StreamListener):
    #On init, set the counter to zero and create a progress bar
    def __init__(self, api=None):
        self.num_tweets = 0
        self.pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=max_tweets).start()

    #When data is received, do this
    def on_data(self, data):
        #Append the tweet to the 'tweets.txt' file
        with open('tweets.txt', 'a') as tweet_file:
            tweet_file.write(data)
            #Increment the number of tweets
            self.num_tweets += 1
            #Check to see if we have hit max_tweets and exit if so
            if self.num_tweets >= max_tweets:
                self.pbar.finish()
                sys.exit(0)
            else:
                #increment the progress bar
                self.pbar.update(self.num_tweets)
        return True

    #Handle any errors that may occur
    def on_error(self, status):
        print(status)

#Get the OAuth token
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
#Use the listener class for stream processing
twitterStream = Stream(auth, listener())
#Filter for these topics
twitterStream.filter(track=["azure","cloud","hdinsight"])
Save the file by using Ctrl+X, then Y, and then run the script:

python gettweets.py

上传数据

To upload the data to HDInsight storage, use the following commands:

hdfs dfs -mkdir -p /tutorials/twitter/data
hdfs dfs -put tweets.txt /tutorials/twitter/data/tweets.txt

These commands store the data in a location that all nodes in the cluster can access.
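You can verify that the file arrived by listing the directory:

hdfs dfs -ls /tutorials/twitter/data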

Run a HiveQL job

Use the following command to create a file that contains HiveQL statements:

nano twitter.hql

Use the following text as the contents of the file:
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
-- Drop table, if it exists
DROP TABLE IF EXISTS tweets_raw;
-- Create it, pointing toward the tweets logged from Twitter
CREATE EXTERNAL TABLE tweets_raw (
    json_response STRING
)
STORED AS TEXTFILE LOCATION '/tutorials/twitter/data';
-- Drop and recreate the destination table
DROP TABLE IF EXISTS tweets;
CREATE TABLE tweets
(
    id BIGINT,
    created_at STRING,
    created_at_date STRING,
    created_at_year STRING,
    created_at_month STRING,
    created_at_day STRING,
    created_at_time STRING,
    in_reply_to_user_id_str STRING,
    text STRING,
    contributors STRING,
    retweeted STRING,
    truncated STRING,
    coordinates STRING,
    source STRING,
    retweet_count INT,
    url STRING,
    hashtags array<STRING>,
    user_mentions array<STRING>,
    first_hashtag STRING,
    first_user_mention STRING,
    screen_name STRING,
    name STRING,
    followers_count INT,
    listed_count INT,
    friends_count INT,
    lang STRING,
    user_location STRING,
    time_zone STRING,
    profile_image_url STRING,
    json_response STRING
);
-- Select tweets from the imported data, parse the JSON,
-- and insert into the tweets table
FROM tweets_raw
INSERT OVERWRITE TABLE tweets
SELECT
    cast(get_json_object(json_response, '$.id_str') as BIGINT),
    get_json_object(json_response, '$.created_at'),
    concat(substr (get_json_object(json_response, '$.created_at'),1,10),' ',
    substr (get_json_object(json_response, '$.created_at'),27,4)),
    substr (get_json_object(json_response, '$.created_at'),27,4),
    case substr (get_json_object(json_response, '$.created_at'),5,3)
        when "Jan" then "01"
        when "Feb" then "02"
        when "Mar" then "03"
        when "Apr" then "04"
        when "May" then "05"
        when "Jun" then "06"
        when "Jul" then "07"
        when "Aug" then "08"
        when "Sep" then "09"
        when "Oct" then "10"
        when "Nov" then "11"
        when "Dec" then "12" end,
    substr (get_json_object(json_response, '$.created_at'),9,2),
    substr (get_json_object(json_response, '$.created_at'),12,8),
    get_json_object(json_response, '$.in_reply_to_user_id_str'),
    get_json_object(json_response, '$.text'),
    get_json_object(json_response, '$.contributors'),
    get_json_object(json_response, '$.retweeted'),
    get_json_object(json_response, '$.truncated'),
    get_json_object(json_response, '$.coordinates'),
    get_json_object(json_response, '$.source'),
    cast (get_json_object(json_response, '$.retweet_count') as INT),
    get_json_object(json_response, '$.entities.display_url'),
    array(
        trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[1].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[2].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[3].text'))),
        trim(lower(get_json_object(json_response, '$.entities.hashtags[4].text')))),
    array(
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
        trim(lower(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
    trim(lower(get_json_object(json_response, '$.entities.hashtags[0].text'))),
    trim(lower(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
    get_json_object(json_response, '$.user.screen_name'),
    get_json_object(json_response, '$.user.name'),
    cast (get_json_object(json_response, '$.user.followers_count') as INT),
    cast (get_json_object(json_response, '$.user.listed_count') as INT),
    cast (get_json_object(json_response, '$.user.friends_count') as INT),
    get_json_object(json_response, '$.user.lang'),
    get_json_object(json_response, '$.user.location'),
    get_json_object(json_response, '$.user.time_zone'),
    get_json_object(json_response, '$.user.profile_image_url'),
    json_response
WHERE (length(json_response) > 500);
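The script relies on Hive's get_json_object function, which evaluates a JSONPath expression against a string column. If you want to experiment with a path before running the full job, you can query the raw table directly once it has been created; for example:

SELECT get_json_object(json_response, '$.user.screen_name') AS screen_name
FROM tweets_raw
LIMIT 5;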
Save the file, and then use the following command to run the HiveQL it contains:

beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' -i twitter.hql

When the statements finish, beeline leaves you at an interactive prompt. From this prompt, use the following query to retrieve the 10 Twitter users who sent the most tweets containing the word Azure:
SELECT name, screen_name, count(1) as cc
FROM tweets
WHERE text like "%Azure%"
GROUP BY name,screen_name
ORDER BY cc DESC LIMIT 10;
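Because the tweets table also captures up to five hashtags per tweet in the hashtags array column, you can ask the same kind of question about hashtags. The following is a sketch of such a query; the WHERE clause filters out the NULL entries that the import produces for tweets with fewer than five hashtags:

SELECT hashtag, COUNT(1) AS cnt
FROM tweets
LATERAL VIEW explode(hashtags) ht AS hashtag
WHERE hashtag IS NOT NULL
GROUP BY hashtag
ORDER BY cnt DESC
LIMIT 10;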

Next steps

You have learned how to transform an unstructured JSON dataset into a structured Apache Hive table. To learn more about Hive on HDInsight, see the following documents: