十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
今天就跟大家聊聊有关Spark Streaming结合Flume和Kafka的日志分析是怎样的,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
创新互联总部坐落于成都市区,致力网站建设服务有做网站、成都网站制作、网络营销策划、网页设计、网站维护、公众号搭建、微信平台小程序开发、软件开发等为企业提供一整套的信息化建设解决方案。创造真正意义上的网站建设,为互联网品牌在互动行销领域创造价值而不懈努力!
按照 http://my.oschina.net/sunmin/blog/692994
整合安装Flume+Kafka+SparkStreaming
将flume/conf/producer.conf将需要监控的日志输出文件修改为本地的log 路径: /var/log/nginx/www.eric.aysaas.com-access.log
(快捷键 Ctrl + Alt + Shift + s),点击Project Structure界面左侧的“Modules”显示下图界面
jar 包自己编译,或者去载 http://search.maven.org/#search|ga|1|g%3A%22org.apache.spark%22%20AND%20v%3A%221.6.1%22
import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils /** * flume+kafka+SparkStreaming 实时 nginx 日志获取 * Created by eric on 16/6/29. */ object KafkaLog { def main(agrs: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[1]").setAppName("StreamingTest") val ssc = new StreamingContext(sparkConf, Seconds(20))//代表一个给定的秒数的实例 val topic = "HappyBirthDayToAnYuan" val topicSet = topic.split(" ").toSet //用 brokers and topics 创建 direct kafka stream val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092") //直接从 kafka brokers 拉取信息,而不使用任何接收器. val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicSet ) val lines = messages.map(_._2) lines.print() val words: DStream[String] = lines.flatMap(_.split("\n")) words.count().print() //启动 ssc.start() ssc.awaitTermination() } }
访问本地页面产生日志 http://www.eric.aysaas.com/app/admin
在这20秒内总共产生的日志行数为:
看完上述内容,你们对Spark Streaming结合Flume和Kafka的日志分析是怎样的有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。