開發環境
- Java
+ IntelliJ IDEA - Terminal
- GCP Cloud SDK
- Firebase
登入GCP - 開發專案如果權限沒有另外設定,依預設登入的google專案
( Google Cloud SDK Shell )
---2018/08/21 updated---
gcloud auth login
gcloud auth application-default login
gcloud config set project {projId}
Deploy Dataflow
( Terminal )
1.到專案的資料夾
cd {path}
2.deploy gradle (在專案的目錄下執行)
gradle run -Pargs="--project={projId} --runner=DataflowRunner --tempLocation=gs://{bucket name}/temp --stagingLocation=gs://{bucket name}/staging"
新增GCP專案 Real-time database & 取得 GCP Auth
http://beaminguna.blogspot.tw/2017/11/firebase-gcp-real-time-databse-gcp-auth.html
建立Pubsub -> 新增訂閱 -> 發布訊息 -> 接收訊息
( Google Cloud SDK Shell )
gcloud init
gcloud components install beta
gcloud beta pubsub topics create myTopic
gcloud beta pubsub subscriptions create --topic myTopic mySubscription
gcloud beta pubsub topics publish myTopic "hello"
gcloud beta pubsub subscriptions pull --auto-ack mySubscription
Example Code
( IntelliJ )
[ build.gradle ]
// Gradle build for the Dataflow example: compiles the pipeline and launches it via `gradle run`.
group 'com.template'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = 1.8

// Entry point executed by the `run` task of the application plugin.
mainClassName = 'com.dataflow.DataflowTemplate'

repositories {
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'

    // NOTE (original author): firebase-admin 5.5.0 together with beam-sdks-java-core 2.2.0
    // caused problems, so the older versions below are used.
    compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.1.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.1.0'
    compile group: 'com.google.guava', name: 'guava', version: '23.0'
    compile group: 'com.google.firebase', name: 'firebase-admin', version: '5.3.1'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25'
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.25'
    compile 'com.google.cloud:google-cloud-logging:1.6.0'
}

// Ensures the main resources output directory (build/resources/main by default) exists.
// The directory is created at execution time (doLast) and resolved against the project,
// not at configuration time against the process working directory.
// NOTE(review): presumably needed so that the runtime classpath entry exists even when the
// project ships no resources - confirm.
task resources {
    doLast {
        sourceSets.main.output.resourcesDir.mkdirs()
    }
}

run {
    // Forward -Pargs="..." to the pipeline's main(); splitting on runs of whitespace avoids
    // passing empty-string arguments when options are separated by more than one space.
    if (project.hasProperty('args')) {
        args project.args.trim().split('\\s+')
    }
}

// dependsOn (rather than mustRunAfter) so that `gradle run` actually schedules `resources`.
run.dependsOn 'resources'
[ .java ]
/**
 * Streaming Apache Beam / Dataflow pipeline.
 *
 * <p>Reads JSON messages from a Pub/Sub subscription, converts them to {@link LogData},
 * collects them into fixed windows of {@link #WINDOW_SIZE} seconds and adds per-minute
 * counters (total and per {@code act}) to a Firebase Realtime Database under {@code list/}.
 * Messages that cannot be converted are published to a dead-letter Pub/Sub topic.
 */
public class streamingdataflow {
    static final int WINDOW_SIZE = 10; // Default window duration in seconds
    static final com.streaming.ProjectLogging loggin = new ProjectLogging(streamingdataflow.class);
    static final String PubsubPath = "projects/{projId}/subscriptions/{Subscription Name}";
    static final String DatabaseUrl = "https://{projId}.firebaseio.com";

    /** One log record as deserialized by Gson from a Pub/Sub message. */
    @DefaultCoder(AvroCoder.class)
    static class LogData {
        public String getTime() {
            return time;
        }

        /**
         * Parses {@code time} with the pattern {@code yyyy-MM-dd'T'HH:mm}, i.e. at minute
         * resolution.
         *
         * @return the parsed date, or {@code null} when {@code time} is missing or cannot be parsed
         */
        public Date getTimeFixed() {
            if (time == null) {
                // SimpleDateFormat.parse(null) would throw NullPointerException, which the
                // catch below does not cover; report it the same way as a parse failure.
                loggin.error("EasyStatisticFn_Error getTimeGroup --- time is null");
                return null;
            }
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            SimpleDateFormat groupformatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
            try {
                return groupformatter.parse(time);
            } catch (ParseException e) {
                loggin.error("EasyStatisticFn_Error getTimeGroup --- " + e.toString());
                return null;
            }
        }

        public String getAct() {
            return act;
        }

        public Integer getValue() {
            return value;
        }

        private String time;
        private String act;
        private Integer value;
    }

    // ========================================
    // ========== Analyze data ================
    // ========================================
    /**
     * Takes all records of one window, groups them by minute and by {@code act}, and adds the
     * group sizes to the matching Firebase counters. Emits one status string per window.
     */
    private static class EasyStatisticFn extends DoFn<Iterable<LogData>, String> {
        private static final long serialVersionUID = 1L;

        /**
         * Initializes the default {@code FirebaseApp} once per JVM.
         *
         * <p>This method runs for every bundle, but {@code FirebaseApp.initializeApp} may only
         * be called once for the default app, so initialization is skipped when an app is
         * already registered.
         */
        @StartBundle
        public void startBundle() {
            // Several DoFn instances may start bundles concurrently in the same JVM.
            synchronized (EasyStatisticFn.class) {
                if (!FirebaseApp.getApps().isEmpty()) {
                    return;
                }
                try {
                    String FirebaseAuthString = "{.json string}";
                    InputStream stream = new ByteArrayInputStream(
                            FirebaseAuthString.getBytes(StandardCharsets.UTF_8));
                    FirebaseOptions options = new FirebaseOptions.Builder()
                            .setCredential(FirebaseCredentials.fromCertificate(stream))
                            .setDatabaseUrl(DatabaseUrl)
                            .build();
                    FirebaseApp.initializeApp(options);
                } catch (Exception e) {
                    loggin.error("FirebaseInit_Error --- " + e.getMessage());
                }
            }
        }

        /**
         * Processes one window of records.
         *
         * <p>Records without an {@code act} or without a parseable {@code time} are skipped:
         * {@code Collectors.groupingBy} rejects {@code null} keys, so a single such record
         * would otherwise abort the statistics for the whole window.
         *
         * @param c context holding the window's records; receives a status string as output
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            try {
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
                SimpleDateFormat keyformatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
                Date ndate = new Date();

                FirebaseDatabase database = FirebaseDatabase.getInstance();
                DatabaseReference listRef = database.getReference("").child("list");

                // Materialize the window so that it can be sized and streamed repeatedly.
                List<LogData> datalist = Lists.newArrayList(c.element());
                loggin.error("EasyStatisticFn_ProcessElement_jsonArray:" + datalist.size());

                List<LogData> usable = datalist.stream()
                        .filter(item -> item != null
                                && item.getAct() != null
                                && item.getTimeFixed() != null)
                        .collect(Collectors.toList());
                int skipped = datalist.size() - usable.size();
                if (skipped > 0) {
                    loggin.error("EasyStatisticFn_ProcessElement_skipped:" + skipped);
                }

                // Group by minute.
                Map<Date, List<LogData>> timeMap = usable.stream()
                        .collect(Collectors.groupingBy(LogData::getTimeFixed));
                for (Map.Entry<Date, List<LogData>> entrytime : timeMap.entrySet()) {
                    loggin.error("EasyStatisticFn_ProcessElement_fixtime --- "
                            + formatter.format(entrytime.getKey()));
                    String gpTime = keyformatter.format(entrytime.getKey());
                    List<LogData> gpList = entrytime.getValue();
                    DatabaseReference gpRef = listRef.child(gpTime);

                    // Total for this minute, then one counter per act.
                    ListenFirebaseValue(gpRef, "All", gpList.size());
                    Map<String, List<LogData>> resultMap = gpList.stream()
                            .collect(Collectors.groupingBy(LogData::getAct));
                    for (Map.Entry<String, List<LogData>> entry : resultMap.entrySet()) {
                        List<LogData> gplist = entry.getValue();
                        loggin.error("EasyStatisticFn_eachdata:" + gplist.iterator().toString());
                        ListenFirebaseValue(gpRef, entry.getKey(), gplist.size());
                    }
                }
                c.output(formatter.format(ndate) + " successed.");
            } catch (Exception e) {
                loggin.error("EasyStatisticFn_Error --- " + e.toString());
                c.output(e.toString());
            }
        }
    }

    /**
     * Adds {@code Count} to the integer stored at {@code Ref/KeyName}, creating the value when
     * it does not exist yet. The update happens asynchronously in the listener callback.
     *
     * <p>NOTE(review): the value is read and then written back in two separate steps, so
     * concurrent updates of the same key can overwrite each other. Consider
     * {@code DatabaseReference.runTransaction} if exact counts are required.
     *
     * @param Ref parent reference of the counter
     * @param KeyName child key of the counter
     * @param Count amount to add
     */
    private static void ListenFirebaseValue(DatabaseReference Ref, String KeyName, Integer Count) {
        Ref.child(KeyName).addListenerForSingleValueEvent(new ValueEventListener() {
            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {
                if (dataSnapshot.getValue() != null) {
                    Integer post = dataSnapshot.getValue(Integer.class);
                    loggin.error("EasyStatisticFn_firebase : " + post);
                    Integer tmpsum = Count + post;
                    Ref.child(KeyName).setValue(tmpsum);
                } else {
                    Ref.child(KeyName).setValue(Count);
                }
            }

            @Override
            public void onCancelled(DatabaseError databaseError) {
                loggin.error("EasyStatisticFn_firebase_error : " + databaseError.getCode());
            }
        });
    }

    // ========================================
    // ======= Windows Collect ================
    // ========================================
    /**
     * Collects all records of each fixed window of {@link #WINDOW_SIZE} seconds into a single
     * {@code Iterable} by assigning every record the same key and grouping by that key.
     */
    public static class WindowsCollect
            extends PTransform<PCollection<LogData>, PCollection<Iterable<LogData>>> {
        @Override
        public PCollection<Iterable<LogData>> expand(PCollection<LogData> logInfo) {
            return logInfo
                    .apply("Windows collect",
                            Window.<LogData>into(FixedWindows.of(Duration.standardSeconds(WINDOW_SIZE))))
                    .apply("Add keys", WithKeys.<Integer, LogData>of(1))
                    .apply("Gruop data", GroupByKey.<Integer, LogData>create())
                    .apply("Formate data", Values.<Iterable<LogData>>create());
        }
    }

    // ========================================
    // ========== Run Dataflow ================
    // ========================================
    /**
     * Builds and starts the streaming pipeline.
     *
     * @param args pipeline options, e.g. {@code --project}, {@code --runner},
     *     {@code --tempLocation}, {@code --stagingLocation}
     */
    public static void main(String[] args) {
        // Anonymous subclasses keep the generic element type available at runtime.
        final TupleTag<LogData> successTag = new TupleTag<LogData>() {
        };
        final TupleTag<String> deadLetterTag = new TupleTag<String>() {
        };

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PCollection<String> dataList =
                p.apply("ReadMyFile", PubsubIO.readStrings().fromSubscription(PubsubPath));

        PCollectionTuple outputTuple = dataList.apply("TransferMessage",
                ParDo.of(new DoFn<String, LogData>() {
                    private static final long serialVersionUID = 1L;

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        Gson gson = new Gson();
                        String msg = c.element();
                        try {
                            LogData logItem = gson.fromJson(msg, LogData.class);
                            // fromJson yields null for an empty or "null" payload, and the
                            // statistics step groups by time and act, so records lacking
                            // either field are treated as undeliverable as well.
                            if (logItem == null
                                    || logItem.getTime() == null
                                    || logItem.getAct() == null) {
                                c.output(deadLetterTag, msg);
                                return;
                            }
                            c.output(logItem);
                        } catch (Exception e) {
                            c.output(deadLetterTag, msg);
                        }
                    }
                }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

        // Successfully parsed records: window, then aggregate into Firebase.
        outputTuple.get(successTag)
                .apply("seconds Collect", new WindowsCollect())
                .apply("Statistic", ParDo.of(new EasyStatisticFn()));

        // Unparseable messages: forward to the dead-letter topic.
        outputTuple.get(deadLetterTag).apply("WriteDeadDataToPubsub",
                PubsubIO.writeStrings().to("projects/{projId}/topics/{Sunscript Name}"));

        p.run();
    }
}
相關參考
Quickstart: Using the gcloud Command-Line Tool
Create a Google Cloud Dataflow Project with Gradle
Apache Beam Mobile Gaming Pipeline Examples
java8 - stream api
open source stream api
留言
張貼留言