socialgekon.com
  • Главни
  • Уређивање
  • Укс Дизајн
  • Наука О Подацима И Базе Података
  • Дизајн Процес
Наука О Подацима И Базе Података

Водич за стримовање Апацхе Спарк-а: Идентификовање хасхтагова у тренду са Твиттера

Данас подаци расту и акумулирају се брже него раније. Тренутно је око 90% података генерисаних у нашем свету генерисано у последње две године. Због овог раста стопе, платформе Велики података морали су да усвоје радикална решења да би могли да задрже тако велике количине података.

Један од најзначајнијих извора података данас су друштвени медији. Дозволите ми да демонстрирам пример из стварног живота: управљање, анализирање и издвајање информација из података друштвених медија у реалном времену, користећи једно од еколошких решења у Велики података најважније ствари - Апацхе Спарк и Питхон.

Апацхе Спарк Стреаминг може се користити за издвајање информација из друштвених медија, попут трендовских Твиттер хештегова



У овом чланку ћу вам показати како да направите једноставну апликацију која чита Твиттер мрежне фидове помоћу Питхона, а затим обрађује твеетове користећи Апацхе Спарк Стреаминг да бисте идентификовали хасхтагове и на крају вратили најважније хасхтагове у тренду и приказали ове податке на контролној табли у реалном времену.

Стварање сопствених акредитива за Твиттер АПИ-је

Да бисте добили твитове са Твиттера, морате се регистровати на ТвиттерАппс Кликом на „Направи нову апликацију“ и након попуњавања доњег обрасца кликните на „Направи своју Твиттер апликацију“.

Снимак екрана: Како креирати своју Твиттер апликацију

Друго, идите на вашу новостворену апликацију и отворите прозор „Приступ идентификаторима и кључевима“. Затим кликните на „Генериши мој приступни идентификатор“.

Снимак екрана: Инсталирање акредитива, лозинки и приступних ИД-ова за Твиттер апликацију

Приказаће се ваши нови ИД-ови за пријаву као што је приказано доле.

Снимак екрана: Инсталирање приступних ИД-ова за апликацију Твиитер

И сада сте спремни за следећи корак.

Направите ХТТП Твиттер клијент

У овом кораку ћу вам показати како да направите једноставног клијента који ће достављати твеетове са Твиттер АПИ-ја користећи Питхон, а затим их проследити инстанци Спарк Стреаминг . Било кога би требало бити лако пратити питхон програмер професионални.

Прво ћемо створити датотеку под називом twitter_app.py а затим ћемо додати код заједно како се види доле.

Увезите библиотеке које ћемо користити како је приказано доле:

import socket import sys import requests import requests_oauthlib import json

И додајте променљиве које ће се користити у ОАутх-у за повезивање са Твиттер-ом као што је приказано доле:

# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Сада, креирајмо нову функцију која се зове get_tweets који ће позвати УРЛ Твиттер АПИ-ја и вратити одговор за низ твеетова.

def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

Затим креирате функцију која узима одговор из горњег приказа и извлачи текст твеетова из ЈСОН објекта пуних твеетова. После овога пошаљите сваки твит инстанци Спарк Стреаминг (о томе ће бити речи касније) преко ТЦП везе.

def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + ' ') except: e = sys.exc_info()[0] print('Error: %s' % e)

Сада ћемо урадити главни део. Ово ће учинити да апликација хостује везе утичница , са којим ће се касније повезати Искра . Овде ћемо конфигурисати ИП да буде localhost пошто ће се све изводити на истој машини и на порту 9009. Затим ћемо позвати методу get_tweets, што смо претходно урадили, да бисмо добили твитове са Твиттера и проследили ваш одговор са везом утичница а send_tweets_to_spark да пошаље твитове Спарк-у.

TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)

Инсталирање наше Апацхе Спарк Стреаминг апликације

Изградимо нашу апликацију Спарк Стреаминг , која ће извршити обраду у реалном времену за долазне твитове, извући из њих хасхтагове и израчунати колико је хасхтагова поменуто.

Илустрација: * Спарк стреаминг * омогућава обраду долазних твеетова у реалном времену и издвајање хасхтага

Прво, морамо створити инстанцу Спарк контекст sc, онда креирамо Стриминг контекст ssc од sc са интервалом од две секунде који ће извршити трансформацију у свим преносима примљеним сваке две секунде. Имајте на уму да смо ниво дневника поставили на ERROR да бисте могли да онемогућите већину дневника које пишете Искра .

Овде дефинишемо контролну тачку како бисмо могли да омогућимо периодичну проверу РДД-а; ово је обавезно за употребу у нашој апликацији, јер ћемо користити трансформације пожара са статусом (о томе ће бити речи касније у истом одељку).

Затим дефинишемо наш главни ДСтреам датаСтреам, који ће повезати сервер утичница које смо раније креирали у луци 9009 и читаће твеетове са те луке. Свака плоча на ДСтреам-у биће твит.

from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)

Сада ћемо дефинисати нашу логику трансформације. Прво ћемо све твеетове разбити на речи и ставити у РДД речи. Затим филтрирамо само хасхтагове свих речи и уцртамо их поред (hashtag, 1) и стављамо их у хасхтагове РДД.

Тада морамо израчунати колико пута је хасхтаг споменут. То можемо учинити помоћу функције reduceByKey. Ова функција израчунава колико је пута хеш-ознака поменута у свакој групи, односно ресетоваће налог у свакој групи.

У нашем случају морамо израчунати бројеве у свим групама, па ћемо користити другу функцију која се зове updateStateByKey с обзиром да вам ова функција омогућава да задржите РДД статус док га ажурирате новим подацима. Овај образац се назива Stateful Transformation.

Имајте на уму да да бисте користили updateStateByKey, морате да конфигуришете контролну тачку и оно што је урађено у претходном кораку.

# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()

updateStateByKey узима функцију као параметар који се назива update функција. Ово се извршава на свакој ставци у РДД-у и изводи жељену логику.

У нашем случају створили смо функцију ажурирања која се зове aggregate_tags_count који ће сабрати све new_values (нове вредности) за сваки хасхтаг и додати их у total_sum (укупан збир), што је збир свих група и чува податке у РДД tags_totals.

def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

Затим радимо РДД обраду tags_totals у свакој групи да би могли да га претворе у привремену табелу користећи Спарк СКЛ контекст а након овога дајте изјаву да бисте могли да узмете првих десет хасхтагова са својим рачунима и ставите их у оквир података hashtag_counts_df

def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)

Последњи корак у нашој апликацији Спарк је слање оквира података hashtag_counts_df до апликације на контролној табли. Тако ћемо оквир података претворити у две матрице, једну за хасхтагове и једну за њихове рачуне. Затим ћемо послати у апликацију на контролној табли преко РЕСТ АПИ-ја.

def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

На крају, ево примера резултата Спарк Стреаминг током покретања и штампања hashtag_counts_df. Приметићете да се излаз штампа тачно сваке две секунде за сваки интервал групе.

Пример излаза за Твиттер * Спарк стреаминг *, одштампан за свако подешавање интервала групе

Направите једноставну контролну таблу у реалном времену за представљање података

Сада ћемо створити једноставну апликацију на контролној табли коју ће Спарк ажурирати у реалном времену. Изградићемо га користећи Питхон, Фласк и Цхартс.јс .

Прво ћемо створити Питхон пројекат са структуром приказаном доле, преузети и додати датотеку Цхарт.јс у статичком директоријуму.

Илустрација: Направите пројекат Питхон за употребу у Твиттер Хасхтаг анализи

Затим, у датотеци app.py, креираћемо функцију која се зове update_data, а коју ће Спарк позвати преко УРЛ-а http://localhost:5001/updateData да бисте могли да ажурирате глобалне ознаке и низове вредности.

Слично томе, функција refresh_graph_data створен је да га АЈАКС захтев позове да врати нове ажуриране ознаке и низове вредности као ЈСОН. Функција get_chart_page ће напустити страницу chart.html кад се позове.

from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)

Сада ћемо створити једноставан граф у датотеци chart.html да бисте могли да прикажете податке хештега и ажурирате их у реалном времену. Као што је дефинисано доле, морамо да увеземо ЈаваСцрипт библиотеке, Chart.js и jquery.min.js.

У телу ознаке треба да креирамо платно и дамо му ИД да бисмо могли да се на њега позивамо током приказивања графикона када користимо ЈаваСцрипт у следећем кораку.

Top Trending Twitter Hashtags

Top Trending Twitter Hashtags

Сада ћемо створити графикон користећи ЈаваСцрипт код испод. Прво узмемо елемент платна, а затим креирамо нови објект графа и проследимо му елемент платна и дефинишемо објект података као што је приказано доле.

Имајте на уму да су ознаке података спојене са ознакама и променљивима вредности које се враћају приликом напуштања странице позивањем get_chart_page у датотеци app.py.

Последњи део је функција која је конфигурисана да сваке секунде даје Ајак захтев и позива УРЛ /refreshData, који ће извршити refresh_graph_data у app.py и вратиће нове ажуриране податке, а затим ажурирати графикон који нови подаци остављају.

var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);

Покрените апликације заједно

Покренућемо три апликације доле наведеним редоследом: 1. Твиттер Апп Цлиент. 2. Апликација Спарк 3. Веб апликација на контролној табли.

Тада можете приступити контролној табли у реалном времену тако што ћете потражити УРЛ

Сада можете видети да се графикон ажурира у наставку:

Анимација: Графикон трендовских хасхтагова на Твиттеру у реалном времену

Коришћење Апацхе Стреаминг-а у стварном животу

Научили смо да радимо једноставну анализу података у реалном времену помоћу Спарк Стреаминг-а и интегришемо их директно са једноставном контролном плочом, користећи РЕСТфул веб услугу. Из овог примера можемо видети колико је Спарк моћан, јер снима огроман ток података, трансформише га и издваја драгоцене информације које се лако могу користити за доношење одлука у кратком времену. Постоји много корисних случајева употребе који се могу применити и могу послужити различитим индустријама, попут вести или маркетинга.

Илустрација: Хасхтагови се могу користити за издвајање информација и расположења, што се може применити на више индустрија.

Пример индустрије вести

Можемо пратити најчешће помињане хасхтагове да бисмо сазнали о којим темама људи причају на друштвеним мрежама. Такође можемо пратити одређене хасхтагове и њихове твеетове да бисмо сазнали шта људи говоре о одређеним темама или догађајима у свету.

Пример маркетинга

Можемо прикупити пренос твеетова и анализом мишљења их категоризовати и утврдити интересе људи како бисмо им пружили понуде повезане са њиховим интересима.

Такође, постоји много случајева примене који се могу применити посебно за аналитику. Велики података а могу послужити многим индустријама. За више случајева коришћења Апацхе Спарк уопште, предлажем да погледате један од наших претходни постови .

Препоручујем вам да прочитате више о Спарк Стреаминг овде да бисте сазнали више о његовим могућностима и извршили напреднију трансформацију података за више информација у реалном времену када их користите.

Супер једноставан водич за иконографију

Уи Десигн

Супер једноставан водич за иконографију
Аутоматски ажурирајте еластични стог помоћу Ансибле Плаибоокс

Аутоматски ажурирајте еластични стог помоћу Ансибле Плаибоокс

Технологија

Популар Постс
Зашто се више предузетника одлучује за изградњу фондова за претрагу преко стартупа
Зашто се више предузетника одлучује за изградњу фондова за претрагу преко стартупа
Послови које треба обавити: Претворите потребе купаца у решења за производе
Послови које треба обавити: Претворите потребе купаца у решења за производе
Свет је наше сучеље - еволуција дизајна корисничког интерфејса
Свет је наше сучеље - еволуција дизајна корисничког интерфејса
Контекст свесне апликације и сложена архитектура обраде догађаја
Контекст свесне апликације и сложена архитектура обраде догађаја
Уметност фотографије цвећа: 15 савета за више уметничких резултата
Уметност фотографије цвећа: 15 савета за више уметничких резултата
 
Истински повраћај улагања у УКС: Студије случаја редизајнирања Б2Б
Истински повраћај улагања у УКС: Студије случаја редизајнирања Б2Б
Најузбудљивија класа имовине у приватном капиталу? Претражите средства из перспективе инвеститора
Најузбудљивија класа имовине у приватном капиталу? Претражите средства из перспективе инвеститора
Уобичајене грешке у комуникацији са купцима: Како не фрустрирати купца
Уобичајене грешке у комуникацији са купцима: Како не фрустрирати купца
Како спојити на ТикТок-у и дозволити другима да спајају ваше видео записе
Како спојити на ТикТок-у и дозволити другима да спајају ваше видео записе
Процена професионалне франшизе у спорту
Процена професионалне франшизе у спорту
Категорије
Људи И ТимовиОкретанПланирање И ПредвиђањеАгиле ТалентНачин ЖивотаИнвеститори И ФинансирањеИнжењерски МенаџментКпи И АналитикаМобилеПуцање

© 2023 | Сва Права Задржана

socialgekon.com