
[Java] [GCP Dataflow] Streaming from Pub/Sub to Firebase


Development environment




Log in to GCP - if the development project has no separately configured permissions, the default logged-in Google project is used
( Google Cloud SDK Shell )

---2018/08/21 updated---
gcloud auth login
gcloud auth application-default login
gcloud config set project {projId}


Deploy Dataflow
( Terminal )

1. Go to the project folder
cd {path}

2. Deploy with Gradle (run in the project directory)
gradle run -Pargs="--project={projId} --runner=DataflowRunner --tempLocation=gs://{bucket name}/temp --stagingLocation=gs://{bucket name}/staging"
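
The `run` block in build.gradle turns the `-Pargs` string into individual pipeline options by splitting it on whitespace. As a small illustration in plain Java (not part of the project; the class name `ArgsSplitDemo` and the bucket name `my-bucket` are made up), the sketch below shows why consecutive spaces matter: splitting on a single whitespace character yields empty arguments, while splitting on a run of whitespace does not.

```java
import java.util.Arrays;

class ArgsSplitDemo {

    // Split on every single whitespace character: runs of spaces produce empty strings.
    static String[] splitOnEachSpace(String args) {
        return args.split("\\s");
    }

    // Split on runs of whitespace: no empty strings between options.
    static String[] splitOnRuns(String args) {
        return args.trim().split("\\s+");
    }

    public static void main(String[] unused) {
        // "my-bucket" is a hypothetical bucket name used only for this demo.
        String args = "--runner=DataflowRunner    --tempLocation=gs://my-bucket/temp";
        System.out.println(Arrays.toString(splitOnEachSpace(args)));
        System.out.println(Arrays.toString(splitOnRuns(args)));
    }
}
```

For the same reason, placeholder values such as the bucket name should not contain spaces once they are filled in.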



Create a Realtime Database for the GCP project & obtain GCP auth credentials
http://beaminguna.blogspot.tw/2017/11/firebase-gcp-real-time-databse-gcp-auth.html


Create a Pub/Sub topic -> add a subscription -> publish a message -> receive the message
( Google Cloud SDK Shell )

gcloud init
gcloud components install beta
gcloud beta pubsub topics create myTopic
gcloud beta pubsub subscriptions create --topic myTopic mySubscription
gcloud beta pubsub topics publish myTopic --message "hello"
gcloud beta pubsub subscriptions pull --auto-ack mySubscription
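
The pipeline in this post parses each Pub/Sub message with Gson into a `LogData` object with the fields `time`, `act`, and `value`, so a plain `hello` is expected to fail parsing and be routed to the dead-letter output. A minimal sketch of building a payload in the expected shape follows. The helper `SampleMessage.build` and the sample values are made up for illustration, and the timestamp layout is assumed from the `yyyy-MM-dd'T'HH:mm` pattern used in the code.

```java
class SampleMessage {

    // Build a JSON payload with the three fields LogData declares: time, act, value.
    // Assumes time and act contain no characters that need JSON escaping.
    static String build(String time, String act, int value) {
        return String.format("{\"time\":\"%s\",\"act\":\"%s\",\"value\":%d}", time, act, value);
    }

    public static void main(String[] args) {
        // Sample values are illustrative only.
        System.out.println(build("2018-08-21T10:15:30", "click", 1));
    }
}
```

The printed string can then be used as the message body when publishing to the topic.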






Example Code
( IntelliJ )


[ build.gradle ]


group 'com.template'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = 1.8

mainClassName = 'com.dataflow.DataflowTemplate'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'

    // firebase-admin 5.5.0 & beam-sdks-java-core 2.2.0 have problems, so the versions below are used.
    compile group: 'com.google.cloud.dataflow', name:  'google-cloud-dataflow-java-sdk-all', version: '2.1.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.1.0'

    compile group: 'com.google.guava', name: 'guava', version: '23.0'


    compile group: 'com.google.firebase', name: 'firebase-admin', version: '5.3.1'

    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'

    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.25'
    compile 'com.google.cloud:google-cloud-logging:1.6.0'

}


task resources {
    def resourcesDir = new File('build/resources/main')
    resourcesDir.mkdirs()
}

run {
    if (project.hasProperty('args')) {
        // Split on runs of whitespace so that extra spaces do not create empty arguments.
        args project.args.split('\\s+')
    }
}

run.mustRunAfter 'resources'




[ streamingdataflow.java ]


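import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.firebase.FirebaseApp;
import com.google.firebase.FirebaseOptions;
import com.google.firebase.auth.FirebaseCredentials;
import com.google.firebase.database.DataSnapshot;
import com.google.firebase.database.DatabaseError;
import com.google.firebase.database.DatabaseReference;
import com.google.firebase.database.FirebaseDatabase;
import com.google.firebase.database.ValueEventListener;
import com.google.gson.Gson;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;

// ProjectLogging (package com.streaming) is the author's own logging helper; its source is not shown in this post.
import com.streaming.ProjectLogging;

public class streamingdataflow {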

    static final int WINDOW_SIZE = 10; // Default window duration in seconds

    static final com.streaming.ProjectLogging loggin = new ProjectLogging(streamingdataflow.class);
    static final String PubsubPath = "projects/{projId}/subscriptions/{Subscription Name}";
    static final String DatabaseUrl = "https://{projId}.firebaseio.com";

    @DefaultCoder(AvroCoder.class)
    static class LogData {

        public String getTime() {
            return time;
        }

        public Date getTimeFixed() {
            SimpleDateFormat groupformatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");

            try {
                return groupformatter.parse(time);
            } catch (ParseException e) {
                loggin.error("EasyStatisticFn_Error getTimeGroup --- " + e.toString());
                return null;
            }
        }

        public String getAct() {
            return act;
        }

        public Integer getValue() {
            return value;
        }

        private String time;
        private String act;
        private Integer value;

    }

    // ========================================
    // ========== Analyze data ================
    // ========================================
    private static class EasyStatisticFn extends DoFn<Iterable<streamingdataflow.LogData>, String> {

        private static final long serialVersionUID = 1L;

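        // -- init DoFn: make sure the Firebase app exists before a bundle is processed ---
        @StartBundle
        public void startBundle() {

            try {
                // startBundle runs once per bundle. Initializing the default app a second
                // time throws an exception, so skip it when an app already exists.
                if (!FirebaseApp.getApps().isEmpty()) {
                    return;
                }

                String FirebaseAuthString = "{.json string}";

                InputStream stream = new ByteArrayInputStream(
                        FirebaseAuthString.getBytes(StandardCharsets.UTF_8));
                FirebaseOptions options = new FirebaseOptions.Builder()
                        .setCredential(FirebaseCredentials.fromCertificate(stream)).setDatabaseUrl(DatabaseUrl).build();

                FirebaseApp.initializeApp(options);
            } catch (Exception e) {
                loggin.error("FirebaseInit_Error --- " + e.getMessage());
            }
        }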

        @ProcessElement
        public void processElement(ProcessContext c) {

            try {

                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
                SimpleDateFormat keyformatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");

                Date ndate = new Date();

                // -==========================================================
                FirebaseDatabase database = FirebaseDatabase.getInstance();
                DatabaseReference ref = database.getReference("");
                DatabaseReference listRef = ref.child("list");

                // -==========================================================
                // Convert the Iterable of one window into a List.
                List<streamingdataflow.LogData> datalist = Lists.newArrayList(c.element());
                loggin.error("EasyStatisticFn_ProcessElement_jsonArray:" + datalist.size());

                // Group records by their time truncated to the minute.
                // Note: a record whose time cannot be parsed yields a null key, which makes groupingBy throw.
                Map<Date, List<streamingdataflow.LogData>> timeMap = datalist.stream()
                        .collect(Collectors.groupingBy(streamingdataflow.LogData::getTimeFixed));

                for (Map.Entry<Date, List<streamingdataflow.LogData>> entrytime : timeMap.entrySet()) {
                    loggin.error("EasyStatisticFn_ProcessElement_fixtime --- " + formatter.format(entrytime.getKey()));

                    String gpTime = keyformatter.format(entrytime.getKey());
                    List<streamingdataflow.LogData> gpList = entrytime.getValue();

                    DatabaseReference gpRef = listRef.child(gpTime);

                    // Group by Act.
                    Map<String, List<streamingdataflow.LogData>> resultMap = gpList.stream()
                            .collect(Collectors.groupingBy(streamingdataflow.LogData::getAct));

                    int cnt = gpList.size();

                    ListenFirebaseValue(gpRef, "All", cnt);

                    for (Map.Entry<String, List<streamingdataflow.LogData>> entry : resultMap.entrySet()) {
                        List<streamingdataflow.LogData> gplist = entry.getValue();
                        // Log the group size; an Iterator's toString() carries no useful information.
                        loggin.error("EasyStatisticFn_eachdata:" + entry.getKey() + "=" + gplist.size());

                        ListenFirebaseValue(gpRef, entry.getKey(), gplist.size());
                    }
                }           

                c.output(formatter.format(ndate) + "  succeeded.");
            } catch (Exception e) {
                loggin.error("EasyStatisticFn_Error --- " + e.toString());
                c.output(e.toString());
            }
        }
    }

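    // Read the current count under KeyName and add Count to it (or set it if absent).
    // Note: this read-then-write is not atomic; concurrent updates to the same key can overwrite each other.
    private static void ListenFirebaseValue(DatabaseReference Ref, String KeyName, Integer Count) {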
        Ref.child(KeyName).addListenerForSingleValueEvent(new ValueEventListener() {
            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {

                if (dataSnapshot.getValue() != null) {
                    Integer post = dataSnapshot.getValue(Integer.class);
                    loggin.error("EasyStatisticFn_firebase : " + post);

                    Integer tmpsum = Count + post;
                    Ref.child(KeyName).setValue(tmpsum);
                } else {
                    Ref.child(KeyName).setValue(Count);
                }
            }

            @Override
            public void onCancelled(DatabaseError databaseError) {
                loggin.error("EasyStatisticFn_firebase_error : " + databaseError.getCode());
            }
        });
    }

 
    // ========================================
    // ======= Windows Collect ===================
    // ========================================
    public static class WindowsCollect
extends PTransform<PCollection<streamingdataflow.LogData>, PCollection<Iterable<streamingdataflow.LogData>>> {

        @Override
        public PCollection<Iterable<streamingdataflow.LogData>> expand(PCollection<streamingdataflow.LogData> logInfo) {

            return logInfo
                    .apply("Windows collect",
                            Window.<streamingdataflow.LogData>into(FixedWindows.of(Duration.standardSeconds(WINDOW_SIZE))))
                    .apply("Add keys", WithKeys.<Integer, streamingdataflow.LogData>of(1))
                    .apply("Group data", GroupByKey.<Integer, streamingdataflow.LogData>create())
                    .apply("Format data", Values.<Iterable<streamingdataflow.LogData>>create());
        }
    }


    // ========================================
    // ========== Run Dataflow ===================
    // ========================================
    public static void main(String[] args) {

        final TupleTag<streamingdataflow.LogData> successTag = new TupleTag<streamingdataflow.LogData>() {
        };
        final TupleTag<String> deadLetterTag = new TupleTag<String>() {
        };

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);

        Pipeline p = Pipeline.create(options);

        PCollection<String> dataList = p.apply("ReadMyFile", PubsubIO.readStrings().fromSubscription(PubsubPath));

        PCollectionTuple outputTuple = dataList.apply("TransferMessage", ParDo.of(new DoFn<String, streamingdataflow.LogData>() {

            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new Gson();
                String msg = c.element();

                try {
                    streamingdataflow.LogData logItem = gson.fromJson(msg, streamingdataflow.LogData.class);
                    c.output(logItem);
                } catch (Exception e) {
                    c.output(deadLetterTag, msg);
                }
            }
        }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

        // Handle the successfully parsed data.
        outputTuple.get(successTag)           
                .apply("seconds Collect", new streamingdataflow.WindowsCollect()).apply("Statistic", ParDo.of(new streamingdataflow.EasyStatisticFn()));

        // Send messages that failed to parse to a dead-letter Pub/Sub topic.
        outputTuple.get(deadLetterTag).apply("WriteDeadDataToPubsub",
                PubsubIO.writeStrings().to("projects/{projId}/topics/{Topic Name}"));
     
        p.run();
    }

}
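
`getTimeFixed` in the class above parses the `time` string with the pattern `yyyy-MM-dd'T'HH:mm`. `SimpleDateFormat.parse` does not have to consume the whole input, so trailing seconds are ignored and timestamps within the same minute map to the same `Date` key. A standalone sketch of that behaviour, assuming input such as `2018-08-21T10:15:30` (the sample values and the class name `MinuteBucketDemo` are made up):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

class MinuteBucketDemo {

    // Parse only up to the minute; anything after "HH:mm" in the input is ignored.
    static Date toMinute(String time) {
        try {
            // SimpleDateFormat is not thread-safe, so a new instance is created per call.
            return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm").parse(time);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable time: " + time, e);
        }
    }

    public static void main(String[] args) {
        Date a = toMinute("2018-08-21T10:15:05");
        Date b = toMinute("2018-08-21T10:15:59");
        Date c = toMinute("2018-08-21T10:16:00");
        System.out.println(a.equals(b)); // same minute bucket
        System.out.println(a.equals(c)); // different minute bucket
    }
}
```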

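
Inside `processElement`, the records of one minute are grouped by `act`, and the size of each group is written to Firebase together with an `All` total. The same counting can be sketched without Beam or Firebase using `Collectors.counting()`. The class name `ActCountDemo` and the sample actions are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class ActCountDemo {

    // Count how many times each action occurs; a TreeMap keeps the keys sorted.
    static Map<String, Long> countByAct(List<String> acts) {
        return acts.stream()
                .collect(Collectors.groupingBy(a -> a, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> acts = Arrays.asList("click", "view", "click", "buy", "view", "click");
        System.out.println("All=" + acts.size());
        System.out.println(countByAct(acts));
    }
}
```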

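
`WindowsCollect` assigns each element to a fixed window of `WINDOW_SIZE` seconds before grouping. With no offset, a fixed window starts at the element timestamp rounded down to a multiple of the window size. A plain-Java sketch of that arithmetic follows; the class and method names are made up, and it only illustrates the idea rather than reproducing Beam's implementation.

```java
class FixedWindowDemo {

    static final long WINDOW_SIZE_MILLIS = 10_000L; // 10 seconds, as in WINDOW_SIZE

    // Start of the window containing the timestamp (non-negative epoch millis assumed).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_SIZE_MILLIS);
    }

    public static void main(String[] args) {
        System.out.println(windowStart(12_345L)); // 10000
        System.out.println(windowStart(19_999L)); // 10000
        System.out.println(windowStart(20_000L)); // 20000
    }
}
```

Elements with timestamps 12345 and 19999 fall into the same window and are grouped together, while 20000 opens the next window.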

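
In `main`, messages that Gson parses successfully go to the main output (`successTag`), and messages that throw during parsing are emitted unchanged under `deadLetterTag`. The routing idea can be sketched with plain lists, with `Integer.parseInt` standing in for the Gson parsing step. The class `DeadLetterDemo` and its fields are hypothetical and only mirror the pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DeadLetterDemo {

    final List<Integer> parsed = new ArrayList<>();     // plays the role of the success output
    final List<String> deadLetters = new ArrayList<>(); // plays the role of the dead-letter output

    // Try to parse each message; keep the raw message when parsing fails.
    void process(List<String> messages) {
        for (String msg : messages) {
            try {
                parsed.add(Integer.parseInt(msg));
            } catch (NumberFormatException e) {
                deadLetters.add(msg);
            }
        }
    }

    public static void main(String[] args) {
        DeadLetterDemo demo = new DeadLetterDemo();
        demo.process(Arrays.asList("1", "hello", "42"));
        System.out.println("parsed=" + demo.parsed);
        System.out.println("deadLetters=" + demo.deadLetters);
    }
}
```

Keeping the raw message in the dead-letter output makes it possible to inspect or replay bad input later without stopping the streaming job.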

Related references
Quickstart: Using the gcloud Command-Line Tool
Create a Google Cloud Dataflow Project with Gradle
Apache Beam Mobile Gaming Pipeline Examples

java8 - stream api
open source stream api

